I implemented a multi-threaded asynchronous service based on https://github.com/grpc/grpc/blob/master/examples/cpp/helloworld/greeter_async_client2.cc . I have 64 threads that each do some work and then contact the remote server asynchronously. However, when I run my code, it usually gets stuck in pthread_join on some of the threads. Sometimes both of my nodes successfully pthread_join all of their worker threads, and sometimes only one node does. I ran info threads at the point where it was stuck, and this is the result I got:
* 1 Thread 0x7ffff7fe2100 (LWP 10567) "rundb" 0x00007ffff64f4d2d in __GI___pthread_timedjoin_ex (threadid=140732532782848, thread_return=0x0, abstime=0x0,
block=<optimized out>) at pthread_join_common.c:89
2 Thread 0x7ffff55ff700 (LWP 10568) "rundb" 0x00007ffff64f99f3 in futex_wait_cancelable (private=<optimized out>, expected=0, futex_word=0x7ffff4134118)
at ../sysdeps/unix/sysv/linux/futex-internal.h:88
3 Thread 0x7ffff4dfe700 (LWP 10569) "rundb" 0x00007ffff621cbb7 in epoll_wait (epfd=25, events=0x7ffff3c446b0, maxevents=100, timeout=-1)
at ../sysdeps/unix/sysv/linux/epoll_wait.c:30
4 Thread 0x7ffff3bff700 (LWP 10570) "default-executo" 0x00007ffff64f99f3 in futex_wait_cancelable (private=<optimized out>, expected=0,
futex_word=0x7ffff4108064)
at ../sysdeps/unix/sysv/linux/futex-internal.h:88
5 Thread 0x7ffff33fe700 (LWP 10571) "resolver-execut" 0x00007ffff64f99f3 in futex_wait_cancelable (private=<optimized out>, expected=0,
futex_word=0x7ffff410b860)
at ../sysdeps/unix/sysv/linux/futex-internal.h:88
6 Thread 0x7ffff17ff700 (LWP 10572) "grpc_global_tim" 0x00007ffff64f99f3 in futex_wait_cancelable (private=<optimized out>, expected=0,
futex_word=0xd5a668 <g_cv_wait+40>)
at ../sysdeps/unix/sysv/linux/futex-internal.h:88
7 Thread 0x7ffff07fd700 (LWP 10574) "grpc_health_che" 0x00007ffff64f99f3 in---Type <return> to continue, or q <return> to quit---ret
futex_wait_cancelable (private=<optimized out>, expected=0,
futex_word=0x7ffff07fb1e8)
at ../sysdeps/unix/sysv/linux/futex-internal.h:88
8 Thread 0x7ffff0ffe700 (LWP 10573) "grpc_health_che" 0x00007ffff64f99f3 in futex_wait_cancelable (private=<optimized out>, expected=0,
futex_word=0x7ffff0ffc1e8)
at ../sysdeps/unix/sysv/linux/futex-internal.h:88
9 Thread 0x7fffef7fb700 (LWP 10575) "grpcpp_sync_ser" 0x00007ffff64f9ed9 in futex_reltimed_wait_cancelable (private=<optimized out>,
reltime=0x7fffef7f8f60, expected=0, futex_word=0x7fffef7f90d8)
at ../sysdeps/unix/sysv/linux/futex-internal.h:142
27 Thread 0x7fffcdbff700 (LWP 10594) "grpc_global_tim" 0x00007ffff64f9ed9 in futex_reltimed_wait_cancelable (private=<optimized out>,
reltime=0x7fffcdbfd2d0, expected=0, futex_word=0xd5a668 <g_cv_wait+40>)
at ../sysdeps/unix/sysv/linux/futex-internal.h:142
41 Thread 0x7ffed89ff700 (LWP 10608) "rundb" 0x00007ffff64f99f3 in futex_wait_cancelable (private=<optimized out>, expected=0, futex_word=0x7ffed89fcfb0)
at ../sysdeps/unix/sysv/linux/futex-internal.h:88
81 Thread 0x7ffec49d7700 (LWP 10648) "rundb" 0x00007ffff64f99f3 in futex_wait_cancelable (private=<optimized out>, expected=0, futex_word=0x7ffec49d4fb0)
at ../sysdeps/unix/sysv/linux/futex-internal.h:88
83 Thread 0x7ffec39d5700 (LWP 10650) "rundb" 0x00007ffff621cbb7 in epoll_wait (epfd=3, events=0x7ffff404c2b0, maxevents=100, timeout=-1)
---Type <return> to continue, or q <return> to quit---ret
at ../sysdeps/unix/sysv/linux/epoll_wait.c:30
91 Thread 0x7ffebf9cd700 (LWP 10658) "rundb" 0x0000000000429e7f in LogManager::run (this=0x7ffff5a3a1e0) at storage/log.cpp:79
152 Thread 0x7ffeb81fd700 (LWP 10719) "default-executo" 0x00007ffff64f99f3 in futex_wait_cancelable (private=<optimized out>, expected=0,
futex_word=0x7ffff410810c)
at ../sysdeps/unix/sysv/linux/futex-internal.h:88
545 Thread 0x7ffeb9fff700 (LWP 11112) "default-executo" 0x00007ffff64f99f3 in futex_wait_cancelable (private=<optimized out>, expected=0,
futex_word=0x7ffff41081b0)
at ../sysdeps/unix/sysv/linux/futex-internal.h:88
682 Thread 0x7fffefffc700 (LWP 11249) "default-executo" 0x00007ffff64f99f3 in futex_wait_cancelable (private=<optimized out>, expected=0,
futex_word=0x7ffff4108258)
at ../sysdeps/unix/sysv/linux/futex-internal.h:88
1397 Thread 0x7fffccbff700 (LWP 11964) "default-executo" 0x00007ffff64f99f3 in futex_wait_cancelable (private=<optimized out>, expected=0,
futex_word=0x7ffff4108300)
at ../sysdeps/unix/sysv/linux/futex-internal.h:88
3470 Thread 0x7ffeb89fe700 (LWP 14041) "default-executo" 0x00007ffff64f99f3 in futex_wait_cancelable (private=<optimized out>, expected=0,
futex_word=0x7ffff41083a8)
at ../sysdeps/unix/sysv/linux/futex-internal.h:88
---Type <return> to continue, or q <return> to quit---re
4766 Thread 0x7ffeb91ff700 (LWP 15337) "default-executo" 0x00007ffff64f99f3 in futex_wait_cancelable (private=<optimized out>, expected=0,
futex_word=0x7ffff4108450)
at ../sysdeps/unix/sysv/linux/futex-internal.h:88
7122 Thread 0x7ffe9f8ff700 (LWP 17693) "default-executo" 0x00007ffff64f99f3 in futex_wait_cancelable (private=<optimized out>, expected=0,
futex_word=0x7ffff41084f8)
at ../sysdeps/unix/sysv/linux/futex-internal.h:88
7132 Thread 0x7ffeacdff700 (LWP 17703) "grpcpp_sync_ser" 0x00007ffff64f9ed9 in futex_reltimed_wait_cancelable (private=<optimized out>,
reltime=0x7ffeacdfce90, expected=0, futex_word=0x7ffeacdfd050)
at ../sysdeps/unix/sysv/linux/futex-internal.h:142
7135 Thread 0x7ffe651ff700 (LWP 17706) "grpcpp_sync_ser" 0x00007ffff621cbb7 in epoll_wait (epfd=14, events=0x7ffedb88a0b0, maxevents=100, timeout=9992)
at ../sysdeps/unix/sysv/linux/epoll_wait.c:30
I compared this with the output of a run in which all threads were joined successfully, and found that the threads that differ are 41, 81 and 83. I then inspected thread 41, thread 81 and thread 83, and got the following backtraces.
thread 41
#0 0x00007ffff64f99f3 in futex_wait_cancelable (private=<optimized out>,
expected=0, futex_word=0x7ffed89fcfb0)
at ../sysdeps/unix/sysv/linux/futex-internal.h:88
#1 __pthread_cond_wait_common (abstime=0x0, mutex=0x7ffff404c278,
cond=0x7ffed89fcf88) at pthread_cond_wait.c:502
#2 __pthread_cond_wait (cond=0x7ffed89fcf88, mutex=0x7ffff404c278)
at pthread_cond_wait.c:655
#3 0x000000000072f05a in gpr_cv_wait ()
#4 0x00000000006a12b0 in begin_worker ()
#5 0x00000000006a1757 in pollset_work ()
#6 0x000000000067dd5a in pollset_work(grpc_pollset*, grpc_pollset_worker**, long) ()
#7 0x00000000005219ea in grpc_pollset_work(grpc_pollset*, grpc_pollset_worker**, long) ()
#8 0x000000000054fa23 in cq_next(grpc_completion_queue*, gpr_timespec, void*)
()
#9 0x000000000054ff18 in grpc_completion_queue_next ()
#10 0x0000000000470676 in grpc_impl::CompletionQueue::AsyncNextInternal(void**, bool*, gpr_timespec) ()
#11 0x00000000004434f3 in grpc_impl::CompletionQueue::Next (
this=0x7ffebe0370c0, tag=0x7ffed89fd2d8, ok=0x7ffed89fd2d3)
at /local/include/grpcpp/impl/codegen/completion_queue_impl.h:179
#12 0x0000000000443152 in Sundial_Async_Client::contactRemoteDone (
---Type <return> to continue, or q <return> to quit---ret
this=0x7ffff5a08240, cq=0x7ffebe0370c0, txn=0x7ffebe00a380, node_id=1,
response=0x0, count=1) at grpc/grpc_async_client.cpp:70
#13 0x000000000043b2dc in TxnManager::process_2pc_phase2 (this=0x7ffebe00a380,
rc=ABORT, cq=0x7ffebe0370c0) at system/txn.cpp:462
#14 0x000000000043a38c in TxnManager::start (this=0x7ffebe00a380)
at system/txn.cpp:166
#15 0x000000000043f8c2 in WorkerThread::run (this=0x7fffdb133440)
at system/worker_thread.cpp:92
#16 0x0000000000441ff9 in start_thread (thread=0x7fffdb133440)
at system/main.cpp:204
#17 0x00007ffff64f36db in start_thread (arg=0x7ffed89ff700)
at pthread_create.c:463
#18 0x00007ffff621c88f in clone ()
at ../sysdeps/unix/sysv/linux/x86_64/clone.S:95
thread 81
#0 0x00007ffff64f99f3 in futex_wait_cancelable (private=<optimized out>,
expected=0, futex_word=0x7ffec49d4fb0)
at ../sysdeps/unix/sysv/linux/futex-internal.h:88
#1 __pthread_cond_wait_common (abstime=0x0, mutex=0x7ffff404c278,
cond=0x7ffec49d4f88) at pthread_cond_wait.c:502
#2 __pthread_cond_wait (cond=0x7ffec49d4f88, mutex=0x7ffff404c278)
at pthread_cond_wait.c:655
#3 0x000000000072f05a in gpr_cv_wait ()
#4 0x00000000006a12b0 in begin_worker ()
#5 0x00000000006a1757 in pollset_work ()
#6 0x000000000067dd5a in pollset_work(grpc_pollset*, grpc_pollset_worker**, long) ()
#7 0x00000000005219ea in grpc_pollset_work(grpc_pollset*, grpc_pollset_worker**, long) ()
#8 0x000000000054fa23 in cq_next(grpc_completion_queue*, gpr_timespec, void*)
()
#9 0x000000000054ff18 in grpc_completion_queue_next ()
#10 0x0000000000470676 in grpc_impl::CompletionQueue::AsyncNextInternal(void**, bool*, gpr_timespec) ()
#11 0x00000000004434f3 in grpc_impl::CompletionQueue::Next (
this=0x7ffebb632000, tag=0x7ffec49d52d8, ok=0x7ffec49d52d3)
at /local/include/grpcpp/impl/codegen/completion_queue_impl.h:179
#12 0x0000000000443152 in Sundial_Async_Client::contactRemoteDone (
---Type <return> to continue, or q <return> to quit---ret
this=0x7ffff5a08240, cq=0x7ffebb632000, txn=0x7ffebb608000, node_id=1,
response=0x0, count=1) at grpc/grpc_async_client.cpp:70
#13 0x000000000043b2dc in TxnManager::process_2pc_phase2 (this=0x7ffebb608000,
rc=ABORT, cq=0x7ffebb632000) at system/txn.cpp:462
#14 0x000000000043a38c in TxnManager::start (this=0x7ffebb608000)
at system/txn.cpp:166
#15 0x000000000043f8c2 in WorkerThread::run (this=0x7fffdb133e40)
at system/worker_thread.cpp:92
#16 0x0000000000441ff9 in start_thread (thread=0x7fffdb133e40)
at system/main.cpp:204
#17 0x00007ffff64f36db in start_thread (arg=0x7ffec49d7700)
at pthread_create.c:463
#18 0x00007ffff621c88f in clone ()
at ../sysdeps/unix/sysv/linux/x86_64/clone.S:95
thread 83
#0 0x00007ffff621cbb7 in epoll_wait (epfd=3, events=0x7ffff404c2b0,
maxevents=100, timeout=-1) at ../sysdeps/unix/sysv/linux/epoll_wait.c:30
#1 0x00000000006a0c7e in pollable_epoll(pollable*, long) ()
#2 0x00000000006a17c5 in pollset_work ()
#3 0x000000000067dd5a in pollset_work(grpc_pollset*, grpc_pollset_worker**, long) ()
#4 0x00000000005219ea in grpc_pollset_work(grpc_pollset*, grpc_pollset_worker**, long) ()
#5 0x000000000054fa23 in cq_next(grpc_completion_queue*, gpr_timespec, void*)
()
#6 0x000000000054ff18 in grpc_completion_queue_next ()
#7 0x0000000000470676 in grpc_impl::CompletionQueue::AsyncNextInternal(void**, bool*, gpr_timespec) ()
#8 0x00000000004434f3 in grpc_impl::CompletionQueue::Next (
this=0x7ffebbe000c0, tag=0x7ffec39d32d8, ok=0x7ffec39d32d3)
at /local/include/grpcpp/impl/codegen/completion_queue_impl.h:179
#9 0x0000000000443152 in Sundial_Async_Client::contactRemoteDone (
this=0x7ffff5a08240, cq=0x7ffebbe000c0, txn=0x7ffebbe037e0, node_id=1,
response=0x0, count=1) at grpc/grpc_async_client.cpp:70
#10 0x000000000043b2dc in TxnManager::process_2pc_phase2 (this=0x7ffebbe037e0,
rc=ABORT, cq=0x7ffebbe000c0) at system/txn.cpp:462
#11 0x000000000043a38c in TxnManager::start (this=0x7ffebbe037e0)
at system/txn.cpp:166
---Type <return> to continue, or q <return> to quit---ret
#12 0x000000000043f8c2 in WorkerThread::run (this=0x7fffdb133ec0)
at system/worker_thread.cpp:92
#13 0x0000000000441ff9 in start_thread (thread=0x7fffdb133ec0)
at system/main.cpp:204
#14 0x00007ffff64f36db in start_thread (arg=0x7ffec39d5700)
at pthread_create.c:463
#15 0x00007ffff621c88f in clone ()
at ../sysdeps/unix/sysv/linux/x86_64/clone.S:95
It seems that the asynchronous client occasionally gets stuck at cq->Next(&got_tag, &ok). I ran this execution over 10,000 times, so I now suspect that the RPC was lost in transit: the server never processes it, and therefore no response is ever returned. I'm wondering whether I need to find a way to guarantee delivery. Below are my send-request function and my check-response function. I would appreciate any help on what to do about this stuck point.
/**
 * Start one asynchronous contactRemote RPC whose completion is delivered on `cq`.
 *
 * A heap-allocated AsyncClientCall holds the per-RPC state (client context,
 * response reader, status and reply pointer). Its address is passed to
 * Finish() as the completion tag; contactRemoteDone() casts the tag it
 * receives from the queue back to AsyncClientCall* and deletes the object.
 *
 * @param cq        Completion queue the RPC is bound to.
 * @param txn       Not used by this function.
 * @param node_id   Not used by this function.
 * @param request   Request message handed to the stub.
 * @param response  Address of the caller's reply pointer; `*response` is the
 *                  message that Finish() is asked to fill in. It is
 *                  dereferenced unconditionally, so it must not be null.
 * @return Always Status::OK. The outcome of the RPC itself is written to the
 *         call object's `status` member when the RPC finishes.
 */
Status Sundial_Async_Client::contactRemote(CompletionQueue* cq, TxnManager* txn, uint64_t node_id,
                                           SundialRequest& request, SundialResponse** response) {
    // Per-RPC state; must stay alive until its tag is returned by the queue.
    AsyncClientCall* const pending = new AsyncClientCall;

    // Create the RPC object bound to `cq`, then explicitly start it.
    pending->response_reader = stub_->PrepareAsynccontactRemote(&pending->context, request, cq);
    pending->response_reader->StartCall();

    // Remember where the caller wants the server's reply written.
    pending->reply = *response;

    // Ask for the reply and final status; the call object's address is the tag
    // that identifies this RPC when it comes back out of the completion queue.
    void* const tag = static_cast<void*>(pending);
    pending->response_reader->Finish(pending->reply, &pending->status, tag);

    return Status::OK;
}
/**
 * Consume up to `count` completion events from `cq`, update the calling
 * thread's response statistics for each one, and free the call objects.
 *
 * Each tag taken from the queue is interpreted as the AsyncClientCall* that
 * was allocated in contactRemote(). The function blocks in cq->Next() until
 * an event is available, so it only returns once `count` events have been
 * received or Next() returns false.
 *
 * @param cq        Completion queue to read from.
 * @param txn       Not used by this function.
 * @param node_id   Not used by this function.
 * @param response  Not used by this function; replies are reached through
 *                  the `reply` pointer stored in each call object.
 * @param count     Number of completion events to consume. If it is zero or
 *                  negative the function returns immediately without
 *                  touching the queue.
 * @return Always Status::OK. The per-RPC `status` stored in each call object
 *         is not inspected here.
 */
Status Sundial_Async_Client::contactRemoteDone(CompletionQueue* cq, TxnManager* txn, uint64_t node_id,
                                               SundialResponse* response, int count) {
    void* got_tag = nullptr;
    bool ok = false;
    int local_count = 0;

    // Test the counter BEFORE calling Next(): Next() blocks until an event is
    // available, so it must not be called when no further event is wanted
    // (count <= 0, or all `count` events already consumed).
    while (local_count < count && cq->Next(&got_tag, &ok)) {
        ++local_count;

        // The tag is the address of the call object created when the RPC was issued.
        AsyncClientCall* call = static_cast<AsyncClientCall*>(got_tag);

        // "ok" refers solely to the Finish() request that produced this event.
        GPR_ASSERT(ok);

        // Per-thread accounting of received responses, keyed by response type.
        const auto resp_type = call->reply->response_type();
        glob_stats->_stats[GET_THD_ID]->_resp_msg_count[resp_type]++;
        glob_stats->_stats[GET_THD_ID]->_resp_msg_size[resp_type] += call->reply->SpaceUsedLong();

        // The call object was allocated with new when the RPC was issued;
        // release it now that its completion has been processed.
        delete call;
    }
    //txn->rpc_semaphore->decr();
    return Status::OK;
}