@ -48,6 +48,21 @@ static pj_bool_t thread_quit_flag;
static pj_status_t last_error ;
static unsigned last_error_counter ;
/* Limit the number of send/receive to this value. The recommended value is
* zero , to limit the test by some duration . IMO setting this value to non - zero
* will cause the test to terminate prematurely :
* a ) setting it to 10000 ( the old behavior ) causes the test to complete in 1 ms .
* b ) any thread that first reaches this value will cause other threads to
* quit , even if that other threads have not processed anything .
*/
//#define LIMIT_TRANSFER 10000
# define LIMIT_TRANSFER 0
/* Silenced error(s):
* - 120011 : EAGAIN ( Resource temporarily unavailable )
*/
# define IS_ERROR_SILENCED(e) (e==PJ_STATUS_FROM_OS(EAGAIN))
/* Descriptor for each producer/consumer pair. */
typedef struct test_item
{
@ -89,12 +104,14 @@ static void on_read_complete(pj_ioqueue_key_t *key,
rc = ( pj_status_t ) - bytes_read ;
if ( rc ! = last_error ) {
//last_error = rc;
if ( ! IS_ERROR_SILENCED ( rc ) ) {
pj_strerror ( rc , errmsg , sizeof ( errmsg ) ) ;
PJ_LOG ( 3 , ( THIS_FILE , " ...error: read error, bytes_read=%d (%s) " ,
bytes_read , errmsg ) ) ;
PJ_LOG ( 3 , ( THIS_FILE ,
" .....additional info: total read=%u, total sent=%u " ,
item - > bytes_recv , item - > bytes_sent ) ) ;
}
} else {
last_error_counter + + ;
}
@ -109,7 +126,7 @@ static void on_read_complete(pj_ioqueue_key_t *key,
/* To assure that the test quits, even if main thread
* doesn ' t have time to run .
*/
if ( item- > bytes_recv > item - > buffer_size * 10000 )
if ( LIMIT_TRANSFER & & item- > bytes_recv > item - > buffer_size * LIMIT_TRANSFER )
thread_quit_flag = 1 ;
bytes_read = item - > buffer_size ;
@ -136,6 +153,8 @@ static void on_read_complete(pj_ioqueue_key_t *key,
item - > outgoing_buffer , & sent , 0 ) ;
if ( rc ! = PJ_SUCCESS & & rc ! = PJ_EPENDING ) {
app_perror ( " ...error: write error " , rc ) ;
} else if ( rc = = PJ_SUCCESS ) {
item - > bytes_sent + = sent ;
}
item - > has_pending_send = ( rc = = PJ_EPENDING ) ;
@ -159,7 +178,6 @@ static void on_write_complete(pj_ioqueue_key_t *key,
return ;
item - > has_pending_send = 0 ;
item - > bytes_sent + = bytes_sent ;
if ( bytes_sent < = 0 ) {
PJ_LOG ( 3 , ( THIS_FILE , " ...error: sending stopped. bytes_sent=%d " ,
@ -168,11 +186,14 @@ static void on_write_complete(pj_ioqueue_key_t *key,
else {
pj_status_t rc ;
item - > bytes_sent + = bytes_sent ;
bytes_sent = item - > buffer_size ;
rc = pj_ioqueue_send ( item - > client_key , op_key ,
item - > outgoing_buffer , & bytes_sent , 0 ) ;
if ( rc ! = PJ_SUCCESS & & rc ! = PJ_EPENDING ) {
app_perror ( " ...error: write error " , rc ) ;
} else if ( rc = = PJ_SUCCESS ) {
item - > bytes_sent + = bytes_sent ;
}
item - > has_pending_send = ( rc = = PJ_EPENDING ) ;
@ -183,7 +204,9 @@ struct thread_arg
{
int id ;
pj_ioqueue_t * ioqueue ;
unsigned counter ;
unsigned loop_cnt ,
err_cnt ,
event_cnt ;
} ;
/* The worker thread. */
@ -195,7 +218,7 @@ static int worker_thread(void *p)
while ( ! thread_quit_flag ) {
+ + arg - > cou nter ;
+ + arg - > loop_ cnt;
rc = pj_ioqueue_poll ( arg - > ioqueue , & timeout ) ;
//TRACE_((THIS_FILE, " thread: poll returned rc=%d", rc));
if ( rc < 0 ) {
@ -204,8 +227,11 @@ static int worker_thread(void *p)
PJ_LOG ( 3 , ( THIS_FILE ,
" ...error in pj_ioqueue_poll() in thread %d "
" after %d loop: %s [pj_status_t=%d] " ,
arg - > id , arg - > cou nter , errmsg , - rc ) ) ;
arg - > id , arg - > loop_ cnt, errmsg , - rc ) ) ;
//return -1;
+ + arg - > err_cnt ;
} else if ( rc > 0 ) {
+ + arg - > event_cnt ;
}
}
return 0 ;
@ -226,16 +252,18 @@ static int perform_test(pj_bool_t allow_concur,
int sock_type , const char * type_name ,
unsigned thread_cnt , unsigned sockpair_cnt ,
pj_size_t buffer_size ,
pj_bool_t display_report ,
pj_size_t * p_bandwidth )
{
enum { MSEC_DURATION = 5000 } ;
pj_pool_t * pool ;
test_item * items ;
pj_thread_t * * thread ;
struct thread_arg * args ;
pj_ioqueue_t * ioqueue ;
pj_status_t rc ;
pj_ioqueue_callback ioqueue_callback ;
pj_ uint32 _t total_elapsed_usec , total_received ;
pj_ size _t total_elapsed_usec , total_received ;
pj_highprec_t bandwidth ;
pj_timestamp start , stop ;
unsigned i ;
@ -251,7 +279,7 @@ static int perform_test(pj_bool_t allow_concur,
if ( ! pool )
return - 10 ;
items = ( test_item * ) pj_pool_ alloc( pool , sockpair_cnt * sizeof ( test_item ) ) ;
items = ( test_item * ) pj_pool_ c alloc( pool , sockpair_cnt , sizeof ( test_item ) ) ;
thread = ( pj_thread_t * * )
pj_pool_alloc ( pool , thread_cnt * sizeof ( pj_thread_t * ) ) ;
@ -335,19 +363,20 @@ static int perform_test(pj_bool_t allow_concur,
if ( rc ! = PJ_SUCCESS & & rc ! = PJ_EPENDING ) {
app_perror ( " ...error: pj_ioqueue_write " , rc ) ;
return - 76 ;
} else if ( rc = = PJ_SUCCESS ) {
items [ i ] . bytes_sent + = bytes ;
}
items [ i ] . has_pending_send = ( rc = = PJ_EPENDING ) ;
}
/* Create the threads. */
args = ( struct thread_arg * ) pj_pool_calloc ( pool , thread_cnt ,
sizeof ( struct thread_arg ) ) ;
for ( i = 0 ; i < thread_cnt ; + + i ) {
struct thread_arg * arg ;
arg = ( struct thread_arg * ) pj_pool_zalloc ( pool , sizeof ( * arg ) ) ;
struct thread_arg * arg = & args [ i ] ;
arg - > id = i ;
arg - > ioqueue = ioqueue ;
arg - > counter = 0 ;
rc = pj_thread_create ( pool , NULL ,
& worker_thread ,
@ -390,7 +419,7 @@ static int perform_test(pj_bool_t allow_concur,
break ;
}
if ( pj_elapsed_usec ( & start , & stop ) < MSEC_DURATION * 1000 ) {
if ( pj_elapsed_usec ( & start , & stop ) > MSEC_DURATION * 1000 ) {
TRACE_ ( ( THIS_FILE , " time limit reached.. " ) ) ;
break ;
}
@ -406,6 +435,9 @@ static int perform_test(pj_bool_t allow_concur,
pj_thread_join ( thread [ i ] ) ;
}
/* Calculate actual time in usec. */
total_elapsed_usec = pj_elapsed_usec ( & start , & stop ) ;
/* Close all sockets. */
TRACE_ ( ( THIS_FILE , " closing all sockets.. " ) ) ;
for ( i = 0 ; i < sockpair_cnt ; + + i ) {
@ -422,13 +454,10 @@ static int perform_test(pj_bool_t allow_concur,
TRACE_ ( ( THIS_FILE , " destroying ioqueue.. " ) ) ;
pj_ioqueue_destroy ( ioqueue ) ;
/* Calculate actual time in usec. */
total_elapsed_usec = pj_elapsed_usec ( & start , & stop ) ;
/* Calculate total bytes received. */
total_received = 0 ;
for ( i = 0 ; i < sockpair_cnt ; + + i ) {
total_received = ( pj_uint32_t ) items [ i ] . bytes_recv ;
total_received + = items [ i ] . bytes_recv ;
}
/* bandwidth = total_received*1000/total_elapsed_usec */
@ -438,9 +467,37 @@ static int perform_test(pj_bool_t allow_concur,
* p_bandwidth = ( pj_uint32_t ) bandwidth ;
if ( display_report ) {
PJ_LOG ( 3 , ( THIS_FILE , " %s %d threads, %d pairs " , type_name ,
thread_cnt , sockpair_cnt ) ) ;
PJ_LOG ( 3 , ( THIS_FILE , " Elapsed : %u msec " , total_elapsed_usec / 1000 ) ) ;
PJ_LOG ( 3 , ( THIS_FILE , " Bandwidth: %d KB/s " , * p_bandwidth ) ) ;
PJ_LOG ( 3 , ( THIS_FILE , " Threads statistics: " ) ) ;
PJ_LOG ( 3 , ( THIS_FILE , " ============================= " ) ) ;
PJ_LOG ( 3 , ( THIS_FILE , " Thread Loops Events Errors " ) ) ;
PJ_LOG ( 3 , ( THIS_FILE , " ============================= " ) ) ;
for ( unsigned i = 0 ; i < thread_cnt ; + + i ) {
struct thread_arg * arg = & args [ i ] ;
PJ_LOG ( 3 , ( THIS_FILE , " %6d %6d %6d %6d " ,
arg - > id , arg - > loop_cnt , arg - > event_cnt , arg - > err_cnt ) ) ;
}
PJ_LOG ( 3 , ( THIS_FILE , " ============================= " ) ) ;
PJ_LOG ( 3 , ( THIS_FILE , " Socket-pair statistics: " ) ) ;
PJ_LOG ( 3 , ( THIS_FILE , " =================================== " ) ) ;
PJ_LOG ( 3 , ( THIS_FILE , " Pair Sent Recv Pct total " ) ) ;
PJ_LOG ( 3 , ( THIS_FILE , " =================================== " ) ) ;
for ( unsigned i = 0 ; i < sockpair_cnt ; + + i ) {
test_item * item = & items [ i ] ;
PJ_LOG ( 3 , ( THIS_FILE , " %4d %5.1f MB %5.1f MB %5.1f%% " ,
i , item - > bytes_sent / 1000000.0 ,
item - > bytes_recv / 1000000.0 ,
item - > bytes_recv * 100.0 / total_received ) ) ;
}
} else {
PJ_LOG ( 3 , ( THIS_FILE , " %.4s %2d %2d %8d KB/s " ,
type_name , thread_cnt , sockpair_cnt ,
* p_bandwidth ) ) ;
}
/* Done. */
pj_pool_release ( pool ) ;
@ -461,49 +518,16 @@ static int ioqueue_perf_test_imp(pj_bool_t allow_concur)
} test_param [ ] =
{
{ pj_SOCK_DGRAM ( ) , " udp " , 1 , 1 } ,
{ pj_SOCK_DGRAM ( ) , " udp " , 1 , 2 } ,
{ pj_SOCK_DGRAM ( ) , " udp " , 1 , 4 } ,
{ pj_SOCK_DGRAM ( ) , " udp " , 1 , 8 } ,
{ pj_SOCK_DGRAM ( ) , " udp " , 2 , 1 } ,
{ pj_SOCK_DGRAM ( ) , " udp " , 2 , 2 } ,
{ pj_SOCK_DGRAM ( ) , " udp " , 2 , 4 } ,
{ pj_SOCK_DGRAM ( ) , " udp " , 2 , 8 } ,
{ pj_SOCK_DGRAM ( ) , " udp " , 4 , 1 } ,
{ pj_SOCK_DGRAM ( ) , " udp " , 4 , 2 } ,
{ pj_SOCK_DGRAM ( ) , " udp " , 4 , 4 } ,
{ pj_SOCK_DGRAM ( ) , " udp " , 4 , 8 } ,
{ pj_SOCK_DGRAM ( ) , " udp " , 4 , 16 } ,
{ pj_SOCK_DGRAM ( ) , " udp " , 8 , 8 } ,
{ pj_SOCK_DGRAM ( ) , " udp " , 16 , 16 } ,
{ pj_SOCK_STREAM ( ) , " tcp " , 1 , 1 } ,
{ pj_SOCK_STREAM ( ) , " tcp " , 1 , 2 } ,
{ pj_SOCK_STREAM ( ) , " tcp " , 1 , 4 } ,
{ pj_SOCK_STREAM ( ) , " tcp " , 1 , 8 } ,
{ pj_SOCK_STREAM ( ) , " tcp " , 2 , 1 } ,
{ pj_SOCK_STREAM ( ) , " tcp " , 2 , 2 } ,
{ pj_SOCK_STREAM ( ) , " tcp " , 2 , 4 } ,
{ pj_SOCK_STREAM ( ) , " tcp " , 2 , 8 } ,
{ pj_SOCK_STREAM ( ) , " tcp " , 4 , 1 } ,
{ pj_SOCK_STREAM ( ) , " tcp " , 4 , 2 } ,
{ pj_SOCK_STREAM ( ) , " tcp " , 4 , 4 } ,
{ pj_SOCK_STREAM ( ) , " tcp " , 4 , 8 } ,
{ pj_SOCK_STREAM ( ) , " tcp " , 4 , 16 } ,
/*
{ pj_SOCK_DGRAM ( ) , " udp " , 32 , 1 } ,
{ pj_SOCK_DGRAM ( ) , " udp " , 32 , 1 } ,
{ pj_SOCK_DGRAM ( ) , " udp " , 32 , 1 } ,
{ pj_SOCK_DGRAM ( ) , " udp " , 32 , 1 } ,
{ pj_SOCK_DGRAM ( ) , " udp " , 1 , 32 } ,
{ pj_SOCK_DGRAM ( ) , " udp " , 1 , 32 } ,
{ pj_SOCK_DGRAM ( ) , " udp " , 1 , 32 } ,
{ pj_SOCK_DGRAM ( ) , " udp " , 1 , 32 } ,
{ pj_SOCK_STREAM ( ) , " tcp " , 32 , 1 } ,
{ pj_SOCK_STREAM ( ) , " tcp " , 32 , 1 } ,
{ pj_SOCK_STREAM ( ) , " tcp " , 32 , 1 } ,
{ pj_SOCK_STREAM ( ) , " tcp " , 32 , 1 } ,
{ pj_SOCK_STREAM ( ) , " tcp " , 1 , 32 } ,
{ pj_SOCK_STREAM ( ) , " tcp " , 1 , 32 } ,
{ pj_SOCK_STREAM ( ) , " tcp " , 1 , 32 } ,
{ pj_SOCK_STREAM ( ) , " tcp " , 1 , 32 } ,
*/
{ pj_SOCK_STREAM ( ) , " tcp " , 8 , 8 } ,
{ pj_SOCK_STREAM ( ) , " tcp " , 16 , 16 } ,
} ;
pj_size_t best_bandwidth ;
int best_index = 0 ;
@ -524,6 +548,7 @@ static int ioqueue_perf_test_imp(pj_bool_t allow_concur)
test_param [ i ] . thread_cnt ,
test_param [ i ] . sockpair_cnt ,
BUF_SIZE ,
PJ_FALSE ,
& bandwidth ) ;
if ( rc ! = 0 )
return rc ;
@ -553,8 +578,29 @@ static int ioqueue_perf_test_imp(pj_bool_t allow_concur)
*/
int ioqueue_perf_test ( void )
{
pj_size_t bandwidth ;
int rc ;
PJ_LOG ( 3 , ( THIS_FILE , " Detailed perf (concurrency=1): " ) ) ;
rc = perform_test ( PJ_TRUE ,
pj_SOCK_DGRAM ( ) ,
" udp " ,
8 ,
8 ,
512 ,
PJ_TRUE ,
& bandwidth ) ;
PJ_LOG ( 3 , ( THIS_FILE , " Detailed perf (concurrency=0): " ) ) ;
rc = perform_test ( PJ_FALSE ,
pj_SOCK_DGRAM ( ) ,
" udp " ,
8 ,
8 ,
512 ,
PJ_TRUE ,
& bandwidth ) ;
rc = ioqueue_perf_test_imp ( PJ_TRUE ) ;
if ( rc ! = 0 )
return rc ;