diff --git a/apps/app_confbridge.c b/apps/app_confbridge.c index a4e5c67a8d..edb7e03659 100644 --- a/apps/app_confbridge.c +++ b/apps/app_confbridge.c @@ -1111,13 +1111,15 @@ static void destroy_conference_bridge(void *obj) if (conference->playback_queue) { struct hangup_data hangup; hangup_data_init(&hangup, conference); - ast_taskprocessor_push(conference->playback_queue, hangup_playback, &hangup); - ast_mutex_lock(&hangup.lock); - while (!hangup.hungup) { - ast_cond_wait(&hangup.cond, &hangup.lock); + if (!ast_taskprocessor_push(conference->playback_queue, hangup_playback, &hangup)) { + ast_mutex_lock(&hangup.lock); + while (!hangup.hungup) { + ast_cond_wait(&hangup.cond, &hangup.lock); + } + ast_mutex_unlock(&hangup.lock); } - ast_mutex_unlock(&hangup.lock); + hangup_data_destroy(&hangup); } else { /* Playback queue is not yet allocated. Just hang up the channel straight */ diff --git a/include/asterisk/taskprocessor.h b/include/asterisk/taskprocessor.h index 7c79036b37..f74989a3c5 100644 --- a/include/asterisk/taskprocessor.h +++ b/include/asterisk/taskprocessor.h @@ -213,7 +213,8 @@ void *ast_taskprocessor_unreference(struct ast_taskprocessor *tps); * \retval -1 failure * \since 1.6.1 */ -int ast_taskprocessor_push(struct ast_taskprocessor *tps, int (*task_exe)(void *datap), void *datap); +int ast_taskprocessor_push(struct ast_taskprocessor *tps, int (*task_exe)(void *datap), void *datap) + attribute_warn_unused_result; /*! \brief Local data parameter */ struct ast_taskprocessor_local { @@ -239,7 +240,8 @@ struct ast_taskprocessor_local { * \since 12.0.0 */ int ast_taskprocessor_push_local(struct ast_taskprocessor *tps, - int (*task_exe)(struct ast_taskprocessor_local *local), void *datap); + int (*task_exe)(struct ast_taskprocessor_local *local), void *datap) + attribute_warn_unused_result; /*! * \brief Indicate the taskprocessor is suspended. diff --git a/include/asterisk/threadpool.h b/include/asterisk/threadpool.h index 0f360c7a4a..77ab8a84a3 100644 --- a/include/asterisk/threadpool.h +++ b/include/asterisk/threadpool.h @@ -186,7 +186,8 @@ void ast_threadpool_set_size(struct ast_threadpool *threadpool, unsigned int siz * \retval 0 success * \retval -1 failure */ -int ast_threadpool_push(struct ast_threadpool *pool, int (*task)(void *data), void *data); +int ast_threadpool_push(struct ast_threadpool *pool, int (*task)(void *data), void *data) + attribute_warn_unused_result; /*! * \brief Shut down a threadpool and destroy it diff --git a/main/stasis.c b/main/stasis.c index 51f01c0b0c..ed838733b9 100644 --- a/main/stasis.c +++ b/main/stasis.c @@ -561,7 +561,10 @@ struct stasis_subscription *stasis_unsubscribe(struct stasis_subscription *sub) /* When all that's done, remove the ref the mailbox has on the sub */ if (sub->mailbox) { - ast_taskprocessor_push(sub->mailbox, sub_cleanup, sub); + if (ast_taskprocessor_push(sub->mailbox, sub_cleanup, sub)) { + /* Nothing we can do here, the conditional is just to keep + * the compiler happy that we're not ignoring the result. */ + } } /* Unsubscribing unrefs the subscription */ diff --git a/main/taskprocessor.c b/main/taskprocessor.c index 91eb7d9930..33acb37220 100644 --- a/main/taskprocessor.c +++ b/main/taskprocessor.c @@ -235,7 +235,11 @@ static void default_listener_shutdown(struct ast_taskprocessor_listener *listene /* Hold a reference during shutdown */ ao2_t_ref(listener->tps, +1, "tps-shutdown"); - ast_taskprocessor_push(listener->tps, default_listener_die, pvt); + if (ast_taskprocessor_push(listener->tps, default_listener_die, pvt)) { + /* This will cause the thread to exit early without completing tasks already + * in the queue. This is probably the least bad option in this situation. */ + default_listener_die(pvt); + } ast_assert(pvt->poll_thread != AST_PTHREADT_NULL); diff --git a/main/threadpool.c b/main/threadpool.c index e7abc8f8cf..7729930545 100644 --- a/main/threadpool.c +++ b/main/threadpool.c @@ -658,7 +658,9 @@ static void threadpool_tps_emptied(struct ast_taskprocessor_listener *listener) } if (pool->listener && pool->listener->callbacks->emptied) { - ast_taskprocessor_push(pool->control_tps, queued_emptied, pool); + if (ast_taskprocessor_push(pool->control_tps, queued_emptied, pool)) { + /* Nothing to do here but we need the check to keep the compiler happy. */ + } } } diff --git a/tests/test_taskprocessor.c b/tests/test_taskprocessor.c index 273e045d5b..642874625c 100644 --- a/tests/test_taskprocessor.c +++ b/tests/test_taskprocessor.c @@ -151,7 +151,10 @@ AST_TEST_DEFINE(default_taskprocessor) return AST_TEST_FAIL; } - ast_taskprocessor_push(tps, task, task_data); + if (ast_taskprocessor_push(tps, task, task_data)) { + ast_test_status_update(test, "Failed to queue task\n"); + return AST_TEST_FAIL; + } res = task_wait(task_data); if (res != 0) { @@ -240,7 +243,11 @@ AST_TEST_DEFINE(default_taskprocessor_load) for (i = 0; i < NUM_TASKS; ++i) { rand_data[i] = ast_random(); - ast_taskprocessor_push(tps, load_task, &rand_data[i]); + if (ast_taskprocessor_push(tps, load_task, &rand_data[i])) { + ast_test_status_update(test, "Failed to queue task\n"); + res = AST_TEST_FAIL; + goto test_end; + } } ast_mutex_lock(&load_task_results.lock); @@ -438,14 +445,22 @@ AST_TEST_DEFINE(taskprocessor_listener) goto test_exit; } - ast_taskprocessor_push(tps, listener_test_task, NULL); + if (ast_taskprocessor_push(tps, listener_test_task, NULL)) { + ast_test_status_update(test, "Failed to queue task\n"); + res = AST_TEST_FAIL; + goto test_exit; + } if (check_stats(test, pvt, 1, 0, 1) < 0) { res = AST_TEST_FAIL; goto test_exit; } - ast_taskprocessor_push(tps, listener_test_task, NULL); + if (ast_taskprocessor_push(tps, listener_test_task, NULL)) { + ast_test_status_update(test, "Failed to queue task\n"); + res = AST_TEST_FAIL; + goto test_exit; + } if (check_stats(test, pvt, 2, 0, 1) < 0) { res = AST_TEST_FAIL; @@ -710,7 +725,10 @@ AST_TEST_DEFINE(taskprocessor_push_local) local_data = 0; ast_taskprocessor_set_local(tps, &local_data); - ast_taskprocessor_push_local(tps, local_task_exe, task_data); + if (ast_taskprocessor_push_local(tps, local_task_exe, task_data)) { + ast_test_status_update(test, "Failed to queue task\n"); + return AST_TEST_FAIL; + } res = task_wait(task_data); if (res != 0) { diff --git a/tests/test_threadpool.c b/tests/test_threadpool.c index d8acf26f9f..3fb4430d98 100644 --- a/tests/test_threadpool.c +++ b/tests/test_threadpool.c @@ -127,6 +127,18 @@ static struct simple_task_data *simple_task_data_alloc(void) return std; } +static void simple_task_data_free(struct simple_task_data *std) +{ + if (!std) { + return; + } + + ast_mutex_destroy(&std->lock); + ast_cond_destroy(&std->cond); + + ast_free(std); +} + static int simple_task(void *data) { struct simple_task_data *std = data; @@ -319,7 +331,9 @@ AST_TEST_DEFINE(threadpool_push) goto end; } - ast_threadpool_push(pool, simple_task, std); + if (ast_threadpool_push(pool, simple_task, std)) { + goto end; + } wait_for_task_pushed(listener); @@ -328,7 +342,7 @@ AST_TEST_DEFINE(threadpool_push) end: ast_threadpool_shutdown(pool); ao2_cleanup(listener); - ast_free(std); + simple_task_data_free(std); ast_free(tld); return res; } @@ -635,11 +649,13 @@ AST_TEST_DEFINE(threadpool_thread_timeout_thrash) } ast_mutex_unlock(&tld->lock); - ast_threadpool_push(pool, simple_task, std); - - res = wait_for_completion(test, std); + if (ast_threadpool_push(pool, simple_task, std)) { + res = AST_TEST_FAIL; + } else { + res = wait_for_completion(test, std); + } - ast_free(std); + simple_task_data_free(std); if (res == AST_TEST_FAIL) { goto end; @@ -707,7 +723,9 @@ AST_TEST_DEFINE(threadpool_one_task_one_thread) goto end; } - ast_threadpool_push(pool, simple_task, std); + if (ast_threadpool_push(pool, simple_task, std)) { + goto end; + } ast_threadpool_set_size(pool, 1); @@ -736,7 +754,7 @@ AST_TEST_DEFINE(threadpool_one_task_one_thread) end: ast_threadpool_shutdown(pool); ao2_cleanup(listener); - ast_free(std); + simple_task_data_free(std); ast_free(tld); return res; @@ -796,7 +814,10 @@ AST_TEST_DEFINE(threadpool_one_thread_one_task) goto end; } - ast_threadpool_push(pool, simple_task, std); + if (ast_threadpool_push(pool, simple_task, std)) { + res = AST_TEST_FAIL; + goto end; + } res = wait_for_completion(test, std); if (res == AST_TEST_FAIL) { @@ -819,7 +840,7 @@ AST_TEST_DEFINE(threadpool_one_thread_one_task) end: ast_threadpool_shutdown(pool); ao2_cleanup(listener); - ast_free(std); + simple_task_data_free(std); ast_free(tld); return res; } @@ -882,9 +903,18 @@ AST_TEST_DEFINE(threadpool_one_thread_multiple_tasks) goto end; } - ast_threadpool_push(pool, simple_task, std1); - ast_threadpool_push(pool, simple_task, std2); - ast_threadpool_push(pool, simple_task, std3); + res = AST_TEST_FAIL; + if (ast_threadpool_push(pool, simple_task, std1)) { + goto end; + } + + if (ast_threadpool_push(pool, simple_task, std2)) { + goto end; + } + + if (ast_threadpool_push(pool, simple_task, std3)) { + goto end; + } res = wait_for_completion(test, std1); if (res == AST_TEST_FAIL) { @@ -914,9 +944,9 @@ AST_TEST_DEFINE(threadpool_one_thread_multiple_tasks) end: ast_threadpool_shutdown(pool); ao2_cleanup(listener); - ast_free(std1); - ast_free(std2); - ast_free(std3); + simple_task_data_free(std1); + simple_task_data_free(std2); + simple_task_data_free(std3); ast_free(tld); return res; } @@ -1011,7 +1041,9 @@ AST_TEST_DEFINE(threadpool_auto_increment) goto end; } - ast_threadpool_push(pool, simple_task, std1); + if (ast_threadpool_push(pool, simple_task, std1)) { + goto end; + } /* Pushing the task should result in the threadpool growing * by three threads. This will allow the task to actually execute @@ -1034,9 +1066,19 @@ AST_TEST_DEFINE(threadpool_auto_increment) /* Now push three tasks into the pool and ensure the pool does not * grow. */ - ast_threadpool_push(pool, simple_task, std2); - ast_threadpool_push(pool, simple_task, std3); - ast_threadpool_push(pool, simple_task, std4); + res = AST_TEST_FAIL; + + if (ast_threadpool_push(pool, simple_task, std2)) { + goto end; + } + + if (ast_threadpool_push(pool, simple_task, std3)) { + goto end; + } + + if (ast_threadpool_push(pool, simple_task, std4)) { + goto end; + } res = wait_for_completion(test, std2); if (res == AST_TEST_FAIL) { @@ -1064,10 +1106,10 @@ AST_TEST_DEFINE(threadpool_auto_increment) end: ast_threadpool_shutdown(pool); ao2_cleanup(listener); - ast_free(std1); - ast_free(std2); - ast_free(std3); - ast_free(std4); + simple_task_data_free(std1); + simple_task_data_free(std2); + simple_task_data_free(std3); + simple_task_data_free(std4); ast_free(tld); return res; } @@ -1121,7 +1163,9 @@ AST_TEST_DEFINE(threadpool_max_size) goto end; } - ast_threadpool_push(pool, simple_task, std); + if (ast_threadpool_push(pool, simple_task, std)) { + goto end; + } res = wait_for_completion(test, std); if (res == AST_TEST_FAIL) { @@ -1137,7 +1181,7 @@ AST_TEST_DEFINE(threadpool_max_size) end: ast_threadpool_shutdown(pool); ao2_cleanup(listener); - ast_free(std); + simple_task_data_free(std); ast_free(tld); return res; } @@ -1193,7 +1237,9 @@ AST_TEST_DEFINE(threadpool_reactivation) goto end; } - ast_threadpool_push(pool, simple_task, std1); + if (ast_threadpool_push(pool, simple_task, std1)) { + goto end; + } ast_threadpool_set_size(pool, 1); @@ -1218,7 +1264,10 @@ AST_TEST_DEFINE(threadpool_reactivation) } /* Now make sure the threadpool reactivates when we add a second task */ - ast_threadpool_push(pool, simple_task, std2); + if (ast_threadpool_push(pool, simple_task, std2)) { + res = AST_TEST_FAIL; + goto end; + } res = wait_for_completion(test, std2); if (res == AST_TEST_FAIL) { @@ -1240,8 +1289,8 @@ AST_TEST_DEFINE(threadpool_reactivation) end: ast_threadpool_shutdown(pool); ao2_cleanup(listener); - ast_free(std1); - ast_free(std2); + simple_task_data_free(std1); + simple_task_data_free(std2); ast_free(tld); return res; @@ -1269,6 +1318,19 @@ static struct complex_task_data *complex_task_data_alloc(void) return ctd; } +static void complex_task_data_free(struct complex_task_data *ctd) +{ + if (!ctd) { + return; + } + + ast_mutex_destroy(&ctd->lock); + ast_cond_destroy(&ctd->stall_cond); + ast_cond_destroy(&ctd->notify_cond); + + ast_free(ctd); +} + static int complex_task(void *data) { struct complex_task_data *ctd = data; @@ -1400,8 +1462,13 @@ AST_TEST_DEFINE(threadpool_task_distribution) goto end; } - ast_threadpool_push(pool, complex_task, ctd1); - ast_threadpool_push(pool, complex_task, ctd2); + if (ast_threadpool_push(pool, complex_task, ctd1)) { + goto end; + } + + if (ast_threadpool_push(pool, complex_task, ctd2)) { + goto end; + } ast_threadpool_set_size(pool, 2); @@ -1438,8 +1505,8 @@ AST_TEST_DEFINE(threadpool_task_distribution) end: ast_threadpool_shutdown(pool); ao2_cleanup(listener); - ast_free(ctd1); - ast_free(ctd2); + complex_task_data_free(ctd1); + complex_task_data_free(ctd2); ast_free(tld); return res; } @@ -1496,8 +1563,13 @@ AST_TEST_DEFINE(threadpool_more_destruction) goto end; } - ast_threadpool_push(pool, complex_task, ctd1); - ast_threadpool_push(pool, complex_task, ctd2); + if (ast_threadpool_push(pool, complex_task, ctd1)) { + goto end; + } + + if (ast_threadpool_push(pool, complex_task, ctd2)) { + goto end; + } ast_threadpool_set_size(pool, 4); @@ -1549,8 +1621,8 @@ AST_TEST_DEFINE(threadpool_more_destruction) end: ast_threadpool_shutdown(pool); ao2_cleanup(listener); - ast_free(ctd1); - ast_free(ctd2); + complex_task_data_free(ctd1); + complex_task_data_free(ctd2); ast_free(tld); return res; } @@ -1666,9 +1738,9 @@ end: poke_worker(data3); ast_taskprocessor_unreference(uut); ast_threadpool_shutdown(pool); - ast_free(data1); - ast_free(data2); - ast_free(data3); + complex_task_data_free(data1); + complex_task_data_free(data2); + complex_task_data_free(data3); return res; }