1
votes

I have a multithread application which uses boost::asio and boost::coroutine via its integration in boost::asio. Every thread has its own io_service object. The only shared state between threads are connection pools which are locked with mutex when connection is get or returned from/to the connection pool. When there is not enough connections in the pool I push infinite asio::steady_tiemer in internal structure of the pool and asynchronously waiting on it and I yielding from the couroutine function. When other thread returns connection to the pool it checks whether there is waiting timers, it gets waiting timer from the internal structure, it gets its io_service object and posts a lambda which wakes up the timer to resume the suspended coroutine. I have random crashes in the application. I try to investigate the problem with valgrind. It founds some issues but I cannot understand them because they happen in boost::coroutine and boost::asio internals. Here are fragments from my code and from valgrind output. Can someone see and explain the problem?

Here is the calling code:

template <class ContextsType>
void executeRequests(ContextsType& avlRequestContexts)
{
    AvlRequestDataList allRequests;
    for(auto& requestContext : avlRequestContexts)
    {
        if(!requestContext.pullProvider || !requestContext.toAskGDS())
            continue;

        auto& requests = requestContext.pullProvider->getRequestsData();
        copy(requests.begin(), requests.end(), back_inserter(allRequests));
    }

    if(allRequests.size() == 0)
        return;

    boost::asio::io_service ioService;
    curl::AsioMultiplexer multiplexer(ioService);

    for(auto& request : allRequests)
    {
        using namespace boost::asio;

        spawn(ioService, [&multiplexer, &request](yield_context yield)
        {
            request->prepare(multiplexer, yield);
        });
    }

    while(true)
    {
        try
        {
            VLOG_DEBUG(avlGeneralLogger, "executeRequests: Starting ASIO event loop.");
            ioService.run();
            VLOG_DEBUG(avlGeneralLogger, "executeRequests: ASIO event loop finished.");
            break;
        }
        catch(const std::exception& e)
        {
            VLOG_ERROR(avlGeneralLogger, "executeRequests: Error while executing GDS request: " << e.what());
        }
        catch(...)
        {
            VLOG_ERROR(avlGeneralLogger, "executeRequests: Unknown error while executing GDS request.");
        }
    }
}

Here is the prepare function implementation which is called in spawned lambda:

void AvlRequestData::prepareImpl(curl::AsioMultiplexer& multiplexer,
                                 boost::asio::yield_context yield)
{
    auto& ioService = multiplexer.getIoService();
    _connection = _pool.getConnection(ioService, yield);
    _connection->prepareRequest(xmlRequest, xmlResponse, requestTimeoutMS);

    multiplexer.addEasyHandle(_connection->getHandle(),
                              [this](const curl::EasyHandleResult& result)
    {
        if(0 == result.responseCode)
            returnQuota();
        VLOG_DEBUG(lastSeatLogger, "Response " << id << ": " << xmlResponse);
        _pool.addConnection(std::move(_connection));
    });
}


void AvlRequestData::prepare(curl::AsioMultiplexer& multiplexer,
                             boost::asio::yield_context yield)
{
    try
    {
        prepareImpl(multiplexer, yield);
    }
    catch(const std::exception& e)
    {
        VLOG_ERROR(lastSeatLogger, "Error wile preparing request: " << e.what());
        returnQuota();
    }
    catch(...)
    {
        VLOG_ERROR(lastSeatLogger, "Unknown error while preparing request.");
        returnQuota();
    }
}

The returnQuota function is pure virtual method of the AvlRequestData class and its implementation for the TravelportRequestData class which is used in all my tests is the following:

void returnQuota() const override
{
    auto& avlQuotaManager = AvlQuotaManager::getInstance();
    avlQuotaManager.consumeQuotaTravelport(-1);
}

Here are push and pop methods of the connection pool.

auto AvlConnectionPool::getConnection(
        TimerPtr timer,
        asio::yield_context yield) -> ConnectionPtr
{
    lock_guard<mutex> lock(_mutex);

    while(_connections.empty())
    {
        _timers.emplace_back(timer);
        timer->expires_from_now(
            asio::steady_timer::clock_type::duration::max());

        _mutex.unlock();
        coroutineAsyncWait(*timer, yield);
        _mutex.lock();
    }

    ConnectionPtr connection = std::move(_connections.front());
    _connections.pop_front();

    VLOG_TRACE(defaultLogger, str(format("Getted connection from pool: %s. Connections count %d.")
                                  % _connectionPoolName % _connections.size()));

    ++_connectionsGiven;

    return connection;
}

void AvlConnectionPool::addConnection(ConnectionPtr connection,
                                      Side side /* = Back */)
{
    lock_guard<mutex> lock(_mutex);

    if(Front == side)
        _connections.emplace_front(std::move(connection));
    else
        _connections.emplace_back(std::move(connection));

    VLOG_TRACE(defaultLogger, str(format("Added connection to pool: %s. Connections count %d.")
                                  % _connectionPoolName % _connections.size()));

    if(_timers.empty())
        return;

    auto timer = _timers.back();
    _timers.pop_back();

    auto& ioService = timer->get_io_service();
    ioService.post([timer](){ timer->cancel(); });

    VLOG_TRACE(defaultLogger, str(format("Connection pool %s: Waiting thread resumed.")
                                  % _connectionPoolName));
}

This is implementation of coroutineAsyncWait.

inline void coroutineAsyncWait(boost::asio::steady_timer& timer,
                               boost::asio::yield_context yield)
{
    boost::system::error_code ec;
    timer.async_wait(yield[ec]);
    if(ec && ec != boost::asio::error::operation_aborted)
        throw std::runtime_error(ec.message());
}

And finally the first part of the valgrind output:

==8189== Thread 41:
==8189== Invalid read of size 8
==8189== at 0x995F84: void boost::coroutines::detail::trampoline_push_void, void, boost::asio::detail::coro_entry_point, void (anonymous namespace)::executeRequests > >(std::vector<(anonymous namespace)::AvlRequestContext, std::allocator<(anonymous namespace)::AvlRequestContext> >&)::{lambda(boost::asio::basic_yield_context >)#1}>&, boost::coroutines::basic_standard_stack_allocator > >(long) (trampoline_push.hpp:65)
==8189== Address 0x2e3b5528 is not stack'd, malloc'd or (recently) free'd

When I use valgrind with debugger attached it stops in the following function in trampoline_push.hpp in boost::coroutine library.

53│ template< typename Coro >
54│ void trampoline_push_void( intptr_t vp)
55│ {
56│     typedef typename Coro::param_type   param_type;
57│
58│     BOOST_ASSERT( vp);
59│
60│     param_type * param(
61│         reinterpret_cast< param_type * >( vp) );
62│     BOOST_ASSERT( 0 != param);
63│
64│     Coro * coro(
65├>        reinterpret_cast< Coro * >( param->coro) );
66│     BOOST_ASSERT( 0 != coro);
67│
68│     coro->run();
69│ }
1
Please post your returnQuota method body.PSIAlt
In a cursory glance, the catch-all suppression in AvlRequestData::prepare() is suspicious and violates a Boost.Coroutine requirement (see here). Does the problem persists if you catch const boost::coroutines::detail::forced_unwind& and rethrow it?Tanner Sansbury
@Tanner Sansbury = 10x for spotting this. I added rethrowing of the forced_unwind exception but the problem persists.bobeff
@PSIAlt - I added the returnQuota function in the question.bobeff
I posted another question with proof of concept to simulate the problem in isolation.bobeff

1 Answers

2
votes

Ultimately I found that when objects need to be deleted, boost::asio doesn't handle it gracefully without proper use of shared_ptr and weak_ptr. When crashes do occur, they are very difficult to debug, because its hard to look into what the io_service queue is doing at the time of failure.

After doing a full asynchronous client architecture recently and running into random crashing issues, I have a few tips to offer. Unfortunately, I cannot know whether these will solve your issues, but hopefully it provides a good start in the right direction.

Boost Asio Coroutine Usage Tips

  1. Use boost::asio::asio_handler_invoke instead of io_service.post():

    auto& ioService = timer->get_io_service();

    ioService.post(timer{ timer->cancel(); });

    Using post/dispatch within a coroutine is usually a bad idea. Always use the asio_handler_invoke when you are called from a coroutine. In this case, however, you can probably safely call timer->cancel() without posting it to the message loop anyways.

  2. Your timers do not appear to use shared_ptr objects. Regardless of what is going on in the rest of your application, there is no way to know for sure when these objects should be destroyed. I would highly recommend using shared_ptr objects for all of your timer objects. Also, any pointer to class methods should use shared_from_this() as well. Using a plain this can be quite dangerous if this is destructed (on the stack) or goes out of scope somewhere else in a shared_ptr. Whatever you do, do not use shared_from_this() in the constructor of an object!

    If you're getting a crash when a handler within the io_service is being executed, but part of the handler is no longer valid, this is a seriously difficult thing to debug. The handler object that is pumped into the io_service includes any pointers to timers, or pointers to objects that might be necessary to execute the handler.

    I highly recommend going overboard with shared_ptr objects wrapped around any asio classes. If the problem goes away, then its likely order of destruction issues.

  3. Is the failure address location on the heap somewhere or is it pointing to the stack? This will help you diagnose whether its an object going out of scope in a method at the wrong time, or if it is something else. For instance, this proved to me that all of my timers must become shared_ptr objects even within a single threaded application.