|
|
|
@ -41,9 +41,7 @@
|
|
|
|
|
#include "asterisk/stasis_bridges.h"
|
|
|
|
|
#include "asterisk/stasis_endpoints.h"
|
|
|
|
|
#include "asterisk/config_options.h"
|
|
|
|
|
#ifdef AST_DEVMODE
|
|
|
|
|
#include "asterisk/cli.h"
|
|
|
|
|
#endif
|
|
|
|
|
|
|
|
|
|
/*** DOCUMENTATION
|
|
|
|
|
<managerEvent language="en_US" name="UserEvent">
|
|
|
|
@ -307,6 +305,16 @@ static struct ast_threadpool *pool;
|
|
|
|
|
|
|
|
|
|
STASIS_MESSAGE_TYPE_DEFN(stasis_subscription_change_type);
|
|
|
|
|
|
|
|
|
|
#if defined(LOW_MEMORY)
|
|
|
|
|
|
|
|
|
|
#define TOPIC_ALL_BUCKETS 257
|
|
|
|
|
|
|
|
|
|
#else
|
|
|
|
|
|
|
|
|
|
#define TOPIC_ALL_BUCKETS 997
|
|
|
|
|
|
|
|
|
|
#endif
|
|
|
|
|
|
|
|
|
|
#ifdef AST_DEVMODE
|
|
|
|
|
|
|
|
|
|
/*! The number of buckets to use for topic statistics */
|
|
|
|
@ -372,9 +380,37 @@ struct stasis_topic {
|
|
|
|
|
int subscriber_id;
|
|
|
|
|
|
|
|
|
|
/*! Name of the topic */
|
|
|
|
|
char name[0];
|
|
|
|
|
char *name;
|
|
|
|
|
|
|
|
|
|
/*! Detail of the topic */
|
|
|
|
|
char *detail;
|
|
|
|
|
|
|
|
|
|
/*! Creation time */
|
|
|
|
|
struct timeval *creationtime;
|
|
|
|
|
};
|
|
|
|
|
|
|
|
|
|
struct ao2_container *topic_all;
|
|
|
|
|
|
|
|
|
|
struct topic_proxy {
|
|
|
|
|
AO2_WEAKPROXY();
|
|
|
|
|
|
|
|
|
|
char *name;
|
|
|
|
|
char *detail;
|
|
|
|
|
|
|
|
|
|
struct timeval creationtime;
|
|
|
|
|
|
|
|
|
|
char buf[0];
|
|
|
|
|
};
|
|
|
|
|
|
|
|
|
|
AO2_STRING_FIELD_HASH_FN(topic_proxy, name);
|
|
|
|
|
AO2_STRING_FIELD_CMP_FN(topic_proxy, name);
|
|
|
|
|
AO2_STRING_FIELD_CASE_SORT_FN(topic_proxy, name);
|
|
|
|
|
|
|
|
|
|
static void proxy_dtor(void *weakproxy, void *data)
|
|
|
|
|
{
|
|
|
|
|
ao2_unlink(topic_all, weakproxy);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
/* Forward declarations for the tightly-coupled subscription object */
|
|
|
|
|
static int topic_add_subscription(struct stasis_topic *topic,
|
|
|
|
|
struct stasis_subscription *sub);
|
|
|
|
@ -394,6 +430,9 @@ static void topic_dtor(void *obj)
|
|
|
|
|
{
|
|
|
|
|
struct stasis_topic *topic = obj;
|
|
|
|
|
|
|
|
|
|
ast_debug(2, "Destroying topic. name: %s, detail: %s\n",
|
|
|
|
|
topic->name, topic->detail);
|
|
|
|
|
|
|
|
|
|
/* Subscribers hold a reference to topics, so they should all be
|
|
|
|
|
* unsubscribed before we get here. */
|
|
|
|
|
ast_assert(AST_VECTOR_SIZE(&topic->subscribers) == 0);
|
|
|
|
@ -442,40 +481,145 @@ static struct stasis_topic_statistics *stasis_topic_statistics_create(struct sta
|
|
|
|
|
}
|
|
|
|
|
#endif
|
|
|
|
|
|
|
|
|
|
struct stasis_topic *stasis_topic_create(const char *name)
|
|
|
|
|
static int link_topic_proxy(struct stasis_topic *topic, const char *name, const char *detail)
|
|
|
|
|
{
|
|
|
|
|
struct topic_proxy *proxy;
|
|
|
|
|
struct stasis_topic* topic_tmp;
|
|
|
|
|
|
|
|
|
|
if (!topic || !name || !strlen(name) || !detail) {
|
|
|
|
|
return -1;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
ao2_wrlock(topic_all);
|
|
|
|
|
|
|
|
|
|
topic_tmp = stasis_topic_get(name);
|
|
|
|
|
if (topic_tmp) {
|
|
|
|
|
ast_log(LOG_ERROR, "The same topic is already exist. name: %s\n", name);
|
|
|
|
|
ao2_ref(topic_tmp, -1);
|
|
|
|
|
ao2_unlock(topic_all);
|
|
|
|
|
|
|
|
|
|
return -1;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
proxy = ao2_t_weakproxy_alloc(
|
|
|
|
|
sizeof(*proxy) + strlen(name) + 1 + strlen(detail) + 1, NULL, topic->name);
|
|
|
|
|
if (!proxy) {
|
|
|
|
|
ao2_unlock(topic_all);
|
|
|
|
|
|
|
|
|
|
return -1;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
/* set the proxy info */
|
|
|
|
|
proxy->name = proxy->buf;
|
|
|
|
|
proxy->detail = proxy->name + strlen(name) + 1;
|
|
|
|
|
|
|
|
|
|
strcpy(proxy->name, name); /* SAFE */
|
|
|
|
|
strcpy(proxy->detail, detail); /* SAFE */
|
|
|
|
|
proxy->creationtime = ast_tvnow();
|
|
|
|
|
|
|
|
|
|
/* We have exclusive access to proxy, no need for locking here. */
|
|
|
|
|
if (ao2_t_weakproxy_set_object(proxy, topic, OBJ_NOLOCK, "weakproxy link")) {
|
|
|
|
|
ao2_cleanup(proxy);
|
|
|
|
|
ao2_unlock(topic_all);
|
|
|
|
|
|
|
|
|
|
return -1;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
if (ao2_weakproxy_subscribe(proxy, proxy_dtor, NULL, OBJ_NOLOCK)) {
|
|
|
|
|
ao2_cleanup(proxy);
|
|
|
|
|
ao2_unlock(topic_all);
|
|
|
|
|
|
|
|
|
|
return -1;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
/* setting the topic point to the proxy */
|
|
|
|
|
topic->name = proxy->name;
|
|
|
|
|
topic->detail = proxy->detail;
|
|
|
|
|
topic->creationtime = &(proxy->creationtime);
|
|
|
|
|
|
|
|
|
|
ao2_link_flags(topic_all, proxy, OBJ_NOLOCK);
|
|
|
|
|
ao2_ref(proxy, -1);
|
|
|
|
|
|
|
|
|
|
ao2_unlock(topic_all);
|
|
|
|
|
|
|
|
|
|
return 0;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
struct stasis_topic *stasis_topic_create_with_detail(
|
|
|
|
|
const char *name, const char* detail
|
|
|
|
|
)
|
|
|
|
|
{
|
|
|
|
|
struct stasis_topic *topic;
|
|
|
|
|
int res = 0;
|
|
|
|
|
|
|
|
|
|
topic = ao2_t_alloc(sizeof(*topic) + strlen(name) + 1, topic_dtor, name);
|
|
|
|
|
if (!name|| !strlen(name) || !detail) {
|
|
|
|
|
return NULL;
|
|
|
|
|
}
|
|
|
|
|
ast_debug(2, "Creating topic. name: %s, detail: %s\n", name, detail);
|
|
|
|
|
|
|
|
|
|
topic = stasis_topic_get(name);
|
|
|
|
|
if (topic) {
|
|
|
|
|
ast_debug(2, "Topic is already exist. name: %s, detail: %s\n",
|
|
|
|
|
name, detail);
|
|
|
|
|
return topic;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
topic = ao2_t_alloc(sizeof(*topic), topic_dtor, name);
|
|
|
|
|
if (!topic) {
|
|
|
|
|
return NULL;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
strcpy(topic->name, name); /* SAFE */
|
|
|
|
|
res |= AST_VECTOR_INIT(&topic->subscribers, INITIAL_SUBSCRIBERS_MAX);
|
|
|
|
|
res |= AST_VECTOR_INIT(&topic->upstream_topics, 0);
|
|
|
|
|
ast_debug(1, "Topic '%s': %p created\n", topic->name, topic);
|
|
|
|
|
if (res) {
|
|
|
|
|
ao2_ref(topic, -1);
|
|
|
|
|
return NULL;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
/* link to the proxy */
|
|
|
|
|
if (link_topic_proxy(topic, name, detail)) {
|
|
|
|
|
ao2_ref(topic, -1);
|
|
|
|
|
return NULL;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
#ifdef AST_DEVMODE
|
|
|
|
|
topic->statistics = stasis_topic_statistics_create(topic);
|
|
|
|
|
if (!topic->name || !topic->statistics || res)
|
|
|
|
|
#else
|
|
|
|
|
if (!topic->name || res)
|
|
|
|
|
#endif
|
|
|
|
|
{
|
|
|
|
|
if (!topic->statistics) {
|
|
|
|
|
ao2_ref(topic, -1);
|
|
|
|
|
return NULL;
|
|
|
|
|
}
|
|
|
|
|
#endif
|
|
|
|
|
ast_debug(1, "Topic '%s': %p created\n", topic->name, topic);
|
|
|
|
|
|
|
|
|
|
return topic;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
struct stasis_topic *stasis_topic_create(const char *name)
|
|
|
|
|
{
|
|
|
|
|
return stasis_topic_create_with_detail(name, "");
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
struct stasis_topic *stasis_topic_get(const char *name)
|
|
|
|
|
{
|
|
|
|
|
return ao2_weakproxy_find(topic_all, name, OBJ_SEARCH_KEY, "");
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
const char *stasis_topic_name(const struct stasis_topic *topic)
|
|
|
|
|
{
|
|
|
|
|
if (!topic) {
|
|
|
|
|
return NULL;
|
|
|
|
|
}
|
|
|
|
|
return topic->name;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
const char *stasis_topic_detail(const struct stasis_topic *topic)
|
|
|
|
|
{
|
|
|
|
|
if (!topic) {
|
|
|
|
|
return NULL;
|
|
|
|
|
}
|
|
|
|
|
return topic->detail;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
size_t stasis_topic_subscribers(const struct stasis_topic *topic)
|
|
|
|
|
{
|
|
|
|
|
return AST_VECTOR_SIZE(&topic->subscribers);
|
|
|
|
@ -2134,6 +2278,142 @@ STASIS_MESSAGE_TYPE_DEFN(ast_multi_user_event_type,
|
|
|
|
|
|
|
|
|
|
/*! @} */
|
|
|
|
|
|
|
|
|
|
/*!
|
|
|
|
|
* \internal
|
|
|
|
|
* \brief CLI command implementation for 'stasis show topics'
|
|
|
|
|
*/
|
|
|
|
|
static char *stasis_show_topics(struct ast_cli_entry *e, int cmd, struct ast_cli_args *a)
|
|
|
|
|
{
|
|
|
|
|
struct ao2_iterator iter;
|
|
|
|
|
struct topic_proxy *topic;
|
|
|
|
|
struct ao2_container *tmp_container;
|
|
|
|
|
int count = 0;
|
|
|
|
|
#define FMT_HEADERS "%-64s %-64s\n"
|
|
|
|
|
#define FMT_FIELDS "%-64s %-64s\n"
|
|
|
|
|
|
|
|
|
|
switch (cmd) {
|
|
|
|
|
case CLI_INIT:
|
|
|
|
|
e->command = "stasis show topics";
|
|
|
|
|
e->usage =
|
|
|
|
|
"Usage: stasis show topics\n"
|
|
|
|
|
" Shows a list of topics\n";
|
|
|
|
|
return NULL;
|
|
|
|
|
case CLI_GENERATE:
|
|
|
|
|
return NULL;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
if (a->argc != e->args) {
|
|
|
|
|
return CLI_SHOWUSAGE;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
ast_cli(a->fd, "\n" FMT_HEADERS, "Name", "Detail");
|
|
|
|
|
|
|
|
|
|
tmp_container = ao2_container_alloc_list(AO2_ALLOC_OPT_LOCK_NOLOCK, 0,
|
|
|
|
|
topic_proxy_sort_fn, NULL);
|
|
|
|
|
|
|
|
|
|
if (!tmp_container || ao2_container_dup(tmp_container, topic_all, OBJ_SEARCH_OBJECT)) {
|
|
|
|
|
ao2_cleanup(tmp_container);
|
|
|
|
|
|
|
|
|
|
return NULL;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
/* getting all topic in order */
|
|
|
|
|
iter = ao2_iterator_init(tmp_container, AO2_ITERATOR_UNLINK);
|
|
|
|
|
while ((topic = ao2_iterator_next(&iter))) {
|
|
|
|
|
ast_cli(a->fd, FMT_FIELDS, topic->name, topic->detail);
|
|
|
|
|
ao2_ref(topic, -1);
|
|
|
|
|
++count;
|
|
|
|
|
}
|
|
|
|
|
ao2_iterator_destroy(&iter);
|
|
|
|
|
ao2_cleanup(tmp_container);
|
|
|
|
|
|
|
|
|
|
ast_cli(a->fd, "\n%d Total topics\n\n", count);
|
|
|
|
|
|
|
|
|
|
#undef FMT_HEADERS
|
|
|
|
|
#undef FMT_FIELDS
|
|
|
|
|
|
|
|
|
|
return CLI_SUCCESS;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
/*!
|
|
|
|
|
* \internal
|
|
|
|
|
* \brief CLI tab completion for topic names
|
|
|
|
|
*/
|
|
|
|
|
static char *topic_complete_name(const char *word)
|
|
|
|
|
{
|
|
|
|
|
struct topic_proxy *topic;
|
|
|
|
|
struct ao2_iterator it;
|
|
|
|
|
int wordlen = strlen(word);
|
|
|
|
|
int ret;
|
|
|
|
|
|
|
|
|
|
it = ao2_iterator_init(topic_all, 0);
|
|
|
|
|
while ((topic = ao2_iterator_next(&it))) {
|
|
|
|
|
if (!strncasecmp(word, topic->name, wordlen)) {
|
|
|
|
|
ret = ast_cli_completion_add(ast_strdup(topic->name));
|
|
|
|
|
if (ret) {
|
|
|
|
|
ao2_ref(topic, -1);
|
|
|
|
|
break;
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
ao2_ref(topic, -1);
|
|
|
|
|
}
|
|
|
|
|
ao2_iterator_destroy(&it);
|
|
|
|
|
return NULL;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
/*!
|
|
|
|
|
* \internal
|
|
|
|
|
* \brief CLI command implementation for 'stasis show topic'
|
|
|
|
|
*/
|
|
|
|
|
static char *stasis_show_topic(struct ast_cli_entry *e, int cmd, struct ast_cli_args *a)
|
|
|
|
|
{
|
|
|
|
|
struct stasis_topic *topic;
|
|
|
|
|
char print_time[32];
|
|
|
|
|
|
|
|
|
|
switch (cmd) {
|
|
|
|
|
case CLI_INIT:
|
|
|
|
|
e->command = "stasis show topic";
|
|
|
|
|
e->usage =
|
|
|
|
|
"Usage: stasis show topic <name>\n"
|
|
|
|
|
" Show stasis topic detail info.\n";
|
|
|
|
|
return NULL;
|
|
|
|
|
case CLI_GENERATE:
|
|
|
|
|
if (a->pos == 3) {
|
|
|
|
|
return topic_complete_name(a->word);
|
|
|
|
|
} else {
|
|
|
|
|
return NULL;
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
if (a->argc != 4) {
|
|
|
|
|
return CLI_SHOWUSAGE;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
topic = stasis_topic_get(a->argv[3]);
|
|
|
|
|
if (!topic) {
|
|
|
|
|
ast_cli(a->fd, "Specified topic '%s' does not exist\n", a->argv[3]);
|
|
|
|
|
return CLI_FAILURE;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
ast_cli(a->fd, "Name: %s\n", topic->name);
|
|
|
|
|
ast_cli(a->fd, "Detail: %s\n", topic->detail);
|
|
|
|
|
ast_cli(a->fd, "Subscribers count: %lu\n", AST_VECTOR_SIZE(&topic->subscribers));
|
|
|
|
|
ast_cli(a->fd, "Forwarding topic count: %lu\n", AST_VECTOR_SIZE(&topic->upstream_topics));
|
|
|
|
|
ast_format_duration_hh_mm_ss(ast_tvnow().tv_sec - topic->creationtime->tv_sec, print_time, sizeof(print_time));
|
|
|
|
|
ast_cli(a->fd, "Duration time: %s\n", print_time);
|
|
|
|
|
|
|
|
|
|
ao2_ref(topic, -1);
|
|
|
|
|
|
|
|
|
|
return CLI_SUCCESS;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
static struct ast_cli_entry cli_stasis[] = {
|
|
|
|
|
AST_CLI_DEFINE(stasis_show_topics, "Show all topics"),
|
|
|
|
|
AST_CLI_DEFINE(stasis_show_topic, "Show topic"),
|
|
|
|
|
};
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
#ifdef AST_DEVMODE
|
|
|
|
|
|
|
|
|
|
AO2_STRING_FIELD_SORT_FN(stasis_subscription_statistics, uniqueid);
|
|
|
|
@ -2646,6 +2926,9 @@ static void stasis_cleanup(void)
|
|
|
|
|
ao2_cleanup(subscription_statistics);
|
|
|
|
|
ao2_cleanup(topic_statistics);
|
|
|
|
|
#endif
|
|
|
|
|
ast_cli_unregister_multiple(cli_stasis, ARRAY_LEN(cli_stasis));
|
|
|
|
|
ao2_cleanup(topic_all);
|
|
|
|
|
topic_all = NULL;
|
|
|
|
|
ast_threadpool_shutdown(pool);
|
|
|
|
|
pool = NULL;
|
|
|
|
|
STASIS_MESSAGE_TYPE_CLEANUP(stasis_subscription_change_type);
|
|
|
|
@ -2740,6 +3023,16 @@ int stasis_init(void)
|
|
|
|
|
return -1;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
topic_all = ao2_container_alloc_hash(AO2_ALLOC_OPT_LOCK_MUTEX, 0, TOPIC_ALL_BUCKETS,
|
|
|
|
|
topic_proxy_hash_fn, 0, topic_proxy_cmp_fn);
|
|
|
|
|
if (!topic_all) {
|
|
|
|
|
return -1;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
if (ast_cli_register_multiple(cli_stasis, ARRAY_LEN(cli_stasis))) {
|
|
|
|
|
return -1;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
#ifdef AST_DEVMODE
|
|
|
|
|
/* Statistics information is stored separately so that we don't alter or interrupt the lifetime of the underlying
|
|
|
|
|
* topic or subscripton.
|
|
|
|
|