
I implemented a multi-threaded asynchronous service based on https://github.com/grpc/grpc/blob/master/examples/cpp/helloworld/greeter_async_client2.cc . I have 64 threads that each do some execution and then contact the remote server asynchronously. However, when I run my code, it usually gets stuck in pthread_join on some of the threads. Sometimes both of my nodes successfully join all of their worker threads, and sometimes only one node does. I then ran info threads at the point where it hangs, and this is the result:

* 1    Thread 0x7ffff7fe2100 (LWP 10567) "rundb" 0x00007ffff64f4d2d in __GI___pthread_timedjoin_ex (threadid=140732532782848, thread_return=0x0, abstime=0x0, 
    block=<optimized out>) at pthread_join_common.c:89
  2    Thread 0x7ffff55ff700 (LWP 10568) "rundb" 0x00007ffff64f99f3 in futex_wait_cancelable (private=<optimized out>, expected=0, futex_word=0x7ffff4134118)
    at ../sysdeps/unix/sysv/linux/futex-internal.h:88
  3    Thread 0x7ffff4dfe700 (LWP 10569) "rundb" 0x00007ffff621cbb7 in epoll_wait (epfd=25, events=0x7ffff3c446b0, maxevents=100, timeout=-1)
    at ../sysdeps/unix/sysv/linux/epoll_wait.c:30
  4    Thread 0x7ffff3bff700 (LWP 10570) "default-executo" 0x00007ffff64f99f3 in futex_wait_cancelable (private=<optimized out>, expected=0, 
    futex_word=0x7ffff4108064)
    at ../sysdeps/unix/sysv/linux/futex-internal.h:88
  5    Thread 0x7ffff33fe700 (LWP 10571) "resolver-execut" 0x00007ffff64f99f3 in futex_wait_cancelable (private=<optimized out>, expected=0, 
    futex_word=0x7ffff410b860)
    at ../sysdeps/unix/sysv/linux/futex-internal.h:88
  6    Thread 0x7ffff17ff700 (LWP 10572) "grpc_global_tim" 0x00007ffff64f99f3 in futex_wait_cancelable (private=<optimized out>, expected=0, 
    futex_word=0xd5a668 <g_cv_wait+40>)
    at ../sysdeps/unix/sysv/linux/futex-internal.h:88
  7    Thread 0x7ffff07fd700 (LWP 10574) "grpc_health_che" 0x00007ffff64f99f3 in futex_wait_cancelable (private=<optimized out>, expected=0, 
    futex_word=0x7ffff07fb1e8)
    at ../sysdeps/unix/sysv/linux/futex-internal.h:88
  8    Thread 0x7ffff0ffe700 (LWP 10573) "grpc_health_che" 0x00007ffff64f99f3 in futex_wait_cancelable (private=<optimized out>, expected=0, 
    futex_word=0x7ffff0ffc1e8)
    at ../sysdeps/unix/sysv/linux/futex-internal.h:88
  9    Thread 0x7fffef7fb700 (LWP 10575) "grpcpp_sync_ser" 0x00007ffff64f9ed9 in futex_reltimed_wait_cancelable (private=<optimized out>, 
    reltime=0x7fffef7f8f60, expected=0, futex_word=0x7fffef7f90d8)
    at ../sysdeps/unix/sysv/linux/futex-internal.h:142
  27   Thread 0x7fffcdbff700 (LWP 10594) "grpc_global_tim" 0x00007ffff64f9ed9 in futex_reltimed_wait_cancelable (private=<optimized out>, 
    reltime=0x7fffcdbfd2d0, expected=0, futex_word=0xd5a668 <g_cv_wait+40>)
    at ../sysdeps/unix/sysv/linux/futex-internal.h:142
  41   Thread 0x7ffed89ff700 (LWP 10608) "rundb" 0x00007ffff64f99f3 in futex_wait_cancelable (private=<optimized out>, expected=0, futex_word=0x7ffed89fcfb0)
    at ../sysdeps/unix/sysv/linux/futex-internal.h:88
  81   Thread 0x7ffec49d7700 (LWP 10648) "rundb" 0x00007ffff64f99f3 in futex_wait_cancelable (private=<optimized out>, expected=0, futex_word=0x7ffec49d4fb0)
    at ../sysdeps/unix/sysv/linux/futex-internal.h:88
  83   Thread 0x7ffec39d5700 (LWP 10650) "rundb" 0x00007ffff621cbb7 in epoll_wait (epfd=3, events=0x7ffff404c2b0, maxevents=100, timeout=-1)
    at ../sysdeps/unix/sysv/linux/epoll_wait.c:30
  91   Thread 0x7ffebf9cd700 (LWP 10658) "rundb" 0x0000000000429e7f in LogManager::run (this=0x7ffff5a3a1e0) at storage/log.cpp:79
  152  Thread 0x7ffeb81fd700 (LWP 10719) "default-executo" 0x00007ffff64f99f3 in futex_wait_cancelable (private=<optimized out>, expected=0, 
    futex_word=0x7ffff410810c)
    at ../sysdeps/unix/sysv/linux/futex-internal.h:88
  545  Thread 0x7ffeb9fff700 (LWP 11112) "default-executo" 0x00007ffff64f99f3 in futex_wait_cancelable (private=<optimized out>, expected=0, 
    futex_word=0x7ffff41081b0)
    at ../sysdeps/unix/sysv/linux/futex-internal.h:88
  682  Thread 0x7fffefffc700 (LWP 11249) "default-executo" 0x00007ffff64f99f3 in futex_wait_cancelable (private=<optimized out>, expected=0, 
    futex_word=0x7ffff4108258)
    at ../sysdeps/unix/sysv/linux/futex-internal.h:88
  1397 Thread 0x7fffccbff700 (LWP 11964) "default-executo" 0x00007ffff64f99f3 in futex_wait_cancelable (private=<optimized out>, expected=0, 
    futex_word=0x7ffff4108300)
    at ../sysdeps/unix/sysv/linux/futex-internal.h:88
  3470 Thread 0x7ffeb89fe700 (LWP 14041) "default-executo" 0x00007ffff64f99f3 in futex_wait_cancelable (private=<optimized out>, expected=0, 
    futex_word=0x7ffff41083a8)
    at ../sysdeps/unix/sysv/linux/futex-internal.h:88
  4766 Thread 0x7ffeb91ff700 (LWP 15337) "default-executo" 0x00007ffff64f99f3 in futex_wait_cancelable (private=<optimized out>, expected=0, 
    futex_word=0x7ffff4108450)
    at ../sysdeps/unix/sysv/linux/futex-internal.h:88
  7122 Thread 0x7ffe9f8ff700 (LWP 17693) "default-executo" 0x00007ffff64f99f3 in futex_wait_cancelable (private=<optimized out>, expected=0, 
    futex_word=0x7ffff41084f8)
    at ../sysdeps/unix/sysv/linux/futex-internal.h:88
  7132 Thread 0x7ffeacdff700 (LWP 17703) "grpcpp_sync_ser" 0x00007ffff64f9ed9 in futex_reltimed_wait_cancelable (private=<optimized out>, 
    reltime=0x7ffeacdfce90, expected=0, futex_word=0x7ffeacdfd050)
    at ../sysdeps/unix/sysv/linux/futex-internal.h:142
  7135 Thread 0x7ffe651ff700 (LWP 17706) "grpcpp_sync_ser" 0x00007ffff621cbb7 in epoll_wait (epfd=14, events=0x7ffedb88a0b0, maxevents=100, timeout=9992)
    at ../sysdeps/unix/sysv/linux/epoll_wait.c:30

I compared this output with that of a run that joined successfully, and the main differences are threads 41, 81 and 83. I then switched to thread 41, thread 81 and thread 83 and got the following backtraces.

thread 41

#0  0x00007ffff64f99f3 in futex_wait_cancelable (private=<optimized out>, 
    expected=0, futex_word=0x7ffed89fcfb0)
    at ../sysdeps/unix/sysv/linux/futex-internal.h:88
#1  __pthread_cond_wait_common (abstime=0x0, mutex=0x7ffff404c278, 
    cond=0x7ffed89fcf88) at pthread_cond_wait.c:502
#2  __pthread_cond_wait (cond=0x7ffed89fcf88, mutex=0x7ffff404c278)
    at pthread_cond_wait.c:655
#3  0x000000000072f05a in gpr_cv_wait ()
#4  0x00000000006a12b0 in begin_worker ()
#5  0x00000000006a1757 in pollset_work ()
#6  0x000000000067dd5a in pollset_work(grpc_pollset*, grpc_pollset_worker**, long) ()
#7  0x00000000005219ea in grpc_pollset_work(grpc_pollset*, grpc_pollset_worker**, long) ()
#8  0x000000000054fa23 in cq_next(grpc_completion_queue*, gpr_timespec, void*)
    ()
#9  0x000000000054ff18 in grpc_completion_queue_next ()
#10 0x0000000000470676 in grpc_impl::CompletionQueue::AsyncNextInternal(void**, bool*, gpr_timespec) ()
#11 0x00000000004434f3 in grpc_impl::CompletionQueue::Next (
    this=0x7ffebe0370c0, tag=0x7ffed89fd2d8, ok=0x7ffed89fd2d3)
    at /local/include/grpcpp/impl/codegen/completion_queue_impl.h:179
#12 0x0000000000443152 in Sundial_Async_Client::contactRemoteDone (
    this=0x7ffff5a08240, cq=0x7ffebe0370c0, txn=0x7ffebe00a380, node_id=1, 
    response=0x0, count=1) at grpc/grpc_async_client.cpp:70
#13 0x000000000043b2dc in TxnManager::process_2pc_phase2 (this=0x7ffebe00a380, 
    rc=ABORT, cq=0x7ffebe0370c0) at system/txn.cpp:462
#14 0x000000000043a38c in TxnManager::start (this=0x7ffebe00a380)
    at system/txn.cpp:166
#15 0x000000000043f8c2 in WorkerThread::run (this=0x7fffdb133440)
    at system/worker_thread.cpp:92
#16 0x0000000000441ff9 in start_thread (thread=0x7fffdb133440)
    at system/main.cpp:204
#17 0x00007ffff64f36db in start_thread (arg=0x7ffed89ff700)
    at pthread_create.c:463
#18 0x00007ffff621c88f in clone ()
    at ../sysdeps/unix/sysv/linux/x86_64/clone.S:95

thread 81

#0  0x00007ffff64f99f3 in futex_wait_cancelable (private=<optimized out>, 
    expected=0, futex_word=0x7ffec49d4fb0)
    at ../sysdeps/unix/sysv/linux/futex-internal.h:88
#1  __pthread_cond_wait_common (abstime=0x0, mutex=0x7ffff404c278, 
    cond=0x7ffec49d4f88) at pthread_cond_wait.c:502
#2  __pthread_cond_wait (cond=0x7ffec49d4f88, mutex=0x7ffff404c278)
    at pthread_cond_wait.c:655
#3  0x000000000072f05a in gpr_cv_wait ()
#4  0x00000000006a12b0 in begin_worker ()
#5  0x00000000006a1757 in pollset_work ()
#6  0x000000000067dd5a in pollset_work(grpc_pollset*, grpc_pollset_worker**, long) ()
#7  0x00000000005219ea in grpc_pollset_work(grpc_pollset*, grpc_pollset_worker**, long) ()
#8  0x000000000054fa23 in cq_next(grpc_completion_queue*, gpr_timespec, void*)
    ()
#9  0x000000000054ff18 in grpc_completion_queue_next ()
#10 0x0000000000470676 in grpc_impl::CompletionQueue::AsyncNextInternal(void**, bool*, gpr_timespec) ()
#11 0x00000000004434f3 in grpc_impl::CompletionQueue::Next (
    this=0x7ffebb632000, tag=0x7ffec49d52d8, ok=0x7ffec49d52d3)
    at /local/include/grpcpp/impl/codegen/completion_queue_impl.h:179
#12 0x0000000000443152 in Sundial_Async_Client::contactRemoteDone (
    this=0x7ffff5a08240, cq=0x7ffebb632000, txn=0x7ffebb608000, node_id=1, 
    response=0x0, count=1) at grpc/grpc_async_client.cpp:70
#13 0x000000000043b2dc in TxnManager::process_2pc_phase2 (this=0x7ffebb608000, 
    rc=ABORT, cq=0x7ffebb632000) at system/txn.cpp:462
#14 0x000000000043a38c in TxnManager::start (this=0x7ffebb608000)
    at system/txn.cpp:166
#15 0x000000000043f8c2 in WorkerThread::run (this=0x7fffdb133e40)
    at system/worker_thread.cpp:92
#16 0x0000000000441ff9 in start_thread (thread=0x7fffdb133e40)
    at system/main.cpp:204
#17 0x00007ffff64f36db in start_thread (arg=0x7ffec49d7700)
    at pthread_create.c:463
#18 0x00007ffff621c88f in clone ()
    at ../sysdeps/unix/sysv/linux/x86_64/clone.S:95

thread 83

#0  0x00007ffff621cbb7 in epoll_wait (epfd=3, events=0x7ffff404c2b0, 
    maxevents=100, timeout=-1) at ../sysdeps/unix/sysv/linux/epoll_wait.c:30
#1  0x00000000006a0c7e in pollable_epoll(pollable*, long) ()
#2  0x00000000006a17c5 in pollset_work ()
#3  0x000000000067dd5a in pollset_work(grpc_pollset*, grpc_pollset_worker**, long) ()
#4  0x00000000005219ea in grpc_pollset_work(grpc_pollset*, grpc_pollset_worker**, long) ()
#5  0x000000000054fa23 in cq_next(grpc_completion_queue*, gpr_timespec, void*)
    ()
#6  0x000000000054ff18 in grpc_completion_queue_next ()
#7  0x0000000000470676 in grpc_impl::CompletionQueue::AsyncNextInternal(void**, bool*, gpr_timespec) ()
#8  0x00000000004434f3 in grpc_impl::CompletionQueue::Next (
    this=0x7ffebbe000c0, tag=0x7ffec39d32d8, ok=0x7ffec39d32d3)
    at /local/include/grpcpp/impl/codegen/completion_queue_impl.h:179
#9  0x0000000000443152 in Sundial_Async_Client::contactRemoteDone (
    this=0x7ffff5a08240, cq=0x7ffebbe000c0, txn=0x7ffebbe037e0, node_id=1, 
    response=0x0, count=1) at grpc/grpc_async_client.cpp:70
#10 0x000000000043b2dc in TxnManager::process_2pc_phase2 (this=0x7ffebbe037e0, 
    rc=ABORT, cq=0x7ffebbe000c0) at system/txn.cpp:462
#11 0x000000000043a38c in TxnManager::start (this=0x7ffebbe037e0)
    at system/txn.cpp:166
#12 0x000000000043f8c2 in WorkerThread::run (this=0x7fffdb133ec0)
    at system/worker_thread.cpp:92
#13 0x0000000000441ff9 in start_thread (thread=0x7fffdb133ec0)
    at system/main.cpp:204
#14 0x00007ffff64f36db in start_thread (arg=0x7ffec39d5700)
    at pthread_create.c:463
#15 0x00007ffff621c88f in clone ()
    at ../sysdeps/unix/sysv/linux/x86_64/clone.S:95
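Because pthread_join has no timeout, a hang like this only shows up as a frozen process (glibc does offer the non-portable pthread_timedjoin_np). As a debugging aid, here is a minimal sketch. It assumes the workers are launched with std::async instead of the raw pthread_create used in the question, so each worker's std::future can be polled against one shared deadline to report which workers are stuck. find_stuck_workers is a hypothetical helper, not part of the original code.

```cpp
#include <cassert>
#include <chrono>
#include <cstddef>
#include <future>
#include <vector>

// Hypothetical debugging helper: returns the indices of workers that have not
// finished within `timeout`, instead of blocking forever like a plain join.
std::vector<std::size_t> find_stuck_workers(std::vector<std::future<void>>& workers,
                                            std::chrono::milliseconds timeout) {
    std::vector<std::size_t> stuck;
    // One shared deadline for all workers, so the total wait is bounded by `timeout`.
    const auto deadline = std::chrono::steady_clock::now() + timeout;
    for (std::size_t i = 0; i < workers.size(); ++i) {
        // wait_until reports future_status::ready once the worker has returned.
        if (workers[i].wait_until(deadline) != std::future_status::ready) {
            stuck.push_back(i);
        }
    }
    return stuck;
}
```

A reported index can then be matched to its thread in gdb. Note that a std::future returned by std::async blocks in its destructor until its task finishes, so stuck workers still need to be unblocked (or the process aborted) after they are reported.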

It seems that the asynchronous client occasionally gets stuck at cq->Next(&got_tag, &ok). I have run this execution more than 10,000 times, so I now suspect that the RPC was lost in transit: the server never processed it, so the RPC never returned. I'm wondering whether I need a way to guarantee delivery. Below are my send-request function and my check-response function. Any help on what to do at this stuck point would be appreciated.

Status Sundial_Async_Client::contactRemote(CompletionQueue* cq, TxnManager* txn, uint64_t node_id, SundialRequest& request, SundialResponse** response) {

    // Call object to store RPC data
    AsyncClientCall* call = new AsyncClientCall;

    // stub_->PrepareAsynccontactRemote() creates an RPC object, returning
    // an instance to store in "call", but does not actually start the RPC.
    // Because we are using the asynchronous API, we need to hold on to
    // the "call" instance in order to get updates on the ongoing RPC.
    call->response_reader =
        stub_->PrepareAsynccontactRemote(&call->context, request, cq);

    // StartCall initiates the RPC call
    call->response_reader->StartCall();
    call->reply = *response;
    // Request that, upon completion of the RPC, "reply" be updated with the
    // server's response; "status" with the indication of whether the operation
    // was successful. Tag the request with the memory address of the call object.
    call->response_reader->Finish(call->reply, &call->status, (void*)call);

    return Status::OK;
}

Status Sundial_Async_Client::contactRemoteDone(CompletionQueue* cq, TxnManager* txn, uint64_t node_id, SundialResponse* response, int count) {
    void* got_tag;
    bool ok = false;
    int local_count = 0;
    // Block until the next result is available in the completion queue "cq".
    while (cq->Next(&got_tag, &ok)) {
        local_count++;
        // The tag in this example is the memory location of the call object
        AsyncClientCall* call = static_cast<AsyncClientCall*>(got_tag);

        // Verify that the request was completed successfully. Note that "ok"
        // corresponds solely to the request for updates introduced by Finish().
        GPR_ASSERT(ok);

        // Record response statistics
        glob_stats->_stats[GET_THD_ID]->_resp_msg_count[call->reply->response_type()]++;
        glob_stats->_stats[GET_THD_ID]->_resp_msg_size[call->reply->response_type()] += call->reply->SpaceUsedLong();

        // Once we're complete, deallocate the call object.
        delete call;
        if (local_count == count) {
            break;
        }
    }

    //txn->rpc_semaphore->decr();
    return Status::OK;
}
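contactRemoteDone blocks in cq->Next(), which only returns when a tag arrives, so if a Finish() tag is never posted the worker waits forever and pthread_join never returns. gRPC's CompletionQueue also offers AsyncNext(tag, ok, deadline), which returns NextStatus::TIMEOUT once the deadline passes, and ClientContext::set_deadline() makes an unanswered RPC finish with DEADLINE_EXCEEDED instead of staying pending. The sketch below is a library-free model of the drain loop under those assumptions: `poll` stands in for a deadline-bounded AsyncNext call, and PollStatus, DrainResult and drain_completions are hypothetical names, not the real gRPC types.

```cpp
#include <cassert>
#include <functional>

// Hypothetical status values modeled on the idea of grpc::CompletionQueue::NextStatus.
enum class PollStatus { GOT_EVENT, TIMEOUT, SHUTDOWN };

// Hypothetical summary of one drain pass.
struct DrainResult {
    int received = 0;        // tags handed back by the queue
    int failed = 0;          // tags that came back with ok == false
    bool timed_out = false;  // true if the deadline passed before `count` tags arrived
};

// Waits for up to `count` completions. `poll` stands in for a deadline-bounded
// call such as cq->AsyncNext(&tag, &ok, deadline); it fills in tag and ok.
DrainResult drain_completions(const std::function<PollStatus(void**, bool*)>& poll,
                              int count) {
    assert(count >= 0);
    DrainResult result;
    // Check the counter before waiting, so count == 0 never blocks.
    while (result.received < count) {
        void* tag = nullptr;
        bool ok = false;
        const PollStatus status = poll(&tag, &ok);
        if (status != PollStatus::GOT_EVENT) {
            // TIMEOUT: an expected tag never arrived; SHUTDOWN: the queue was shut down.
            result.timed_out = (status == PollStatus::TIMEOUT);
            break;
        }
        ++result.received;
        if (!ok) {
            ++result.failed;  // report the failure instead of aborting like GPR_ASSERT(ok)
        }
        // In the real client, `tag` would be cast back to AsyncClientCall* here,
        // its stats recorded, and the call object deleted.
    }
    return result;
}
```

Checking the counter before waiting means count == 0 returns immediately, whereas the original while (cq->Next(...)) loop calls Next before it compares local_count with count. A timed-out result then tells the caller exactly how many of the expected responses never showed up.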

1 Answer


I think this is more likely a logical issue. Are you sure you send out the request every time before you check the response?
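One way to check this is per-transaction bookkeeping. The minimal sketch below assumes that every contactRemote() call that reaches Finish() records a send, and that the wait side asks only for the number of tags still outstanding. In the backtraces above, all three stuck threads are in the rc=ABORT branch of TxnManager::process_2pc_phase2, calling contactRemoteDone with count=1, so that path is a natural place to verify that a request to node 1 was actually sent. PendingRpcs is a hypothetical class, not part of the original code.

```cpp
#include <cassert>

// Hypothetical per-transaction counter of RPCs sent vs. completions consumed.
class PendingRpcs {
public:
    // Call right after Finish() has been issued for a request.
    void on_sent() { ++sent_; }

    // Call once per tag taken off the completion queue.
    void on_completed() {
        assert(completed_ < sent_ && "completion without a matching send");
        ++completed_;
    }

    // Number of tags it is still safe to wait for; 0 means do not call Next at all.
    int outstanding() const { return sent_ - completed_; }

private:
    int sent_ = 0;
    int completed_ = 0;
};
```

With this in place, contactRemoteDone would be called only when outstanding() is greater than zero, and with outstanding() as its count, so the waiting side can never ask for a response to a request that was never sent.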