2
votes

I'm developing an event-driven application using various Boost functions. I've been planning on basing my multi-threading architecture on an ASIO's io_service object whereby each atom of work will be dispatched by a thread group of 1 or more threads each having called run().

An earlier "proof of concept" version of this engine used a single io_service object to dispatch a number of different tasks, including deadline timers, network i/o and posted operations. As these early renditions can sometimes be, this earlier version didn't have more than a few events scheduled for dispatch at a time. Having convinced myself that I was on the right track, I refactored a portion of the engine to support finer granularity and increased extensibility. But this new version is crashing with what appears to be a bad pointer in the io_service object.

I'll try to develop a simplified and reproducible test case for the problem I'm experiencing. But before I do, I'd like confirmation of the assumption my architecture is based on…

A single io_service object can be shared among a multitude of networking objects -- tcp & udp resolves, sockets, timers and any other beast that takes an io_service object reference.

The reason I ask is that I've not been able to find this unambiguously stated in the documentation or in online discussions. Another hint that something is wrong with my io_service is that the crash I'm experiencing somewhere downstream from a call to a tcp::socket's async_connect() with a valid endpoint and handler. The last line of the implementation of async_connect() calls this->get_service(). The pointer the stream_socket_service get_service() is supposed to return ends up being 0x2, which hasn't been a great pointer value since the ENIAC.

My environment…

  • I've tried debugging this problem with Boost versions 48 through 52.

  • I'm developing on OSX and have tried various gcc 4.x compiler versions from 4.2 to 4.7.3.

  • The async operations I've done earlier in the session prior to this corruption issue include some timers, a udp resolve and a tcp resolve.

  • The socket I'm doing the async_connect() on was alloc'd in the heap just prior to the call and was passed the io_service in it's constructor.

  • I have an io_service::work object.

  • I'm not using strands (yet).

Is this enough information for anybody to help, or do I need to submit a compilable piece of code? I'd also LOVE a primer on what io_service services are, as would other SO readers I'm sure.

Update #1: Here is the minimal characterization of the problem I'm experiencing that I've confirmed still crashes. I built it using Boost 1.52.0, gcc 4.6.3 on latest osx ML.

#include <stdlib.h>
#include <string>
#include <boost/bind.hpp>
#include <boost/function.hpp>
#include <boost/thread.hpp>
#include <boost/thread/mutex.hpp>
#include <boost/asio.hpp>

namespace foo {

namespace asio = boost::asio;

class ios_threads : private boost::noncopyable
{
public:
    ios_threads(bool strt = false)
    : work_(new asio::io_service::work(ios_))
    {
        if (strt)
            start();
    }

    static asio::io_service &ios()
    {
        return ios_;
    }

    void start()
    {
        threads_.create_thread(boost::bind(&ios_threads::run, this));
    }

    void wait()
    {
        threads_.join_all();
    }

private:
    void run()
    {
        while (true) {
            try {
                ios_.run();
            }
            catch (std::exception &e) {
                delete work_;
                break;
            }
        }

        printf("Shutting down.\n");
    }

    static asio::io_service ios_;

    asio::io_service::work *work_;
    boost::thread_group threads_;

};

asio::io_service ios_threads::ios_;

struct op;

typedef op * opPtr;

struct op
{
    typedef boost::recursive_mutex mutex_type;

    op(op *del)
    : delegate_(del)
    {
    }

    virtual ~op()
    {
    }

    bool start_async()
    {
        boost::unique_lock< mutex_type > lock(mutex_);

        return start_it();
    }

protected:
    virtual bool start_it()
    {
        return false;
    }

    virtual void did_it(const boost::system::error_code& error)
    {
    }

    void completion_handler(const boost::system::error_code& error)
    {
        boost::unique_lock< mutex_type > lock(mutex_);

        did_it(error);
    }

    opPtr delegate_;
    mutable mutex_type mutex_;
};

struct interface_search : public op
{
    typedef op super;

    interface_search(op *del)
    : super(del),
      udp_resolver_(ios_threads::ios())
    {
        it_ = NULL;
    }

    bool start_it()
    {
        try {
            std::string hostname = boost::asio::ip::host_name();
            asio::ip::udp::resolver::query query(hostname, "", asio::ip::resolver_query_base::numeric_service | boost::asio::ip::resolver_query_base::passive);
            udp_resolver_.async_resolve(query, boost::bind(&interface_search::udp_handler, this, asio::placeholders::error, asio::placeholders::iterator));
        }
        catch (std::exception& e) {
            printf("UDP resolve operation failed. Exception: %s", e.what());
         }

        return super::start_it();
    }

protected:
    void udp_handler(const boost::system::error_code& error, asio::ip::udp::resolver::iterator it)
    {
        it_ = &it;
        completion_handler(error);
    }

    void did_it(const boost::system::error_code& error)
    {
        if (error == asio::error::operation_aborted)
            return;

        op *del = delegate_;

        if (del)
            del->start_async();
    }

    asio::ip::udp::resolver udp_resolver_;
    asio::ip::udp::resolver::iterator *it_;
};


struct google_connect : public op
{
    typedef op super;

    google_connect()
    : super(NULL),
      socket_(ios_threads::ios())
    {
    }

    void endpoint(asio::ip::tcp::endpoint &endpoint)
    {
        endpoint_ = endpoint;
    }

    bool start_it()
    {
        try {
            // Crashes in the following call!
            socket_.async_connect(endpoint_, boost::bind(&google_connect::connect_handler, this, asio::placeholders::error));
        }
        catch (std::exception& e) {
            printf(e.what());
         }

        return super::start_it();
}

    void connect_handler(const boost::system::error_code& error)
    {
        completion_handler(error);
    }

    void did_it(const boost::system::error_code& error)
    {
        if (error == asio::error::operation_aborted)
            return;

        boost::asio::ip::address addr = socket_.local_endpoint().address();

        printf(addr.to_string().c_str());
    }

    asio::ip::tcp::socket socket_;
    asio::ip::tcp::endpoint endpoint_;
};

struct google_resolve : public op
{
    typedef op super;

    google_resolve()
    : super(new google_connect()),
      resolver_(ios_threads::ios())
    {
        it_ = NULL;
    }

    bool start_it()
    {
        try {
            asio::ip::tcp::resolver::query query(asio::ip::tcp::v4(), "google.com", "http");
            resolver_.async_resolve(query, boost::bind(&google_resolve::tcp_handler, this, asio::placeholders::error, asio::placeholders::iterator));
        }
        catch (std::exception& e) {
            printf(e.what());
         }

        return super::start_it();
}

protected:
    void tcp_handler(const boost::system::error_code& error, asio::ip::tcp::resolver::iterator it)
    {
        it_ = &it;
        completion_handler(error);
    }

    void did_it(const boost::system::error_code& error)
    {
        if (error == asio::error::operation_aborted)
            return;

        asio::ip::tcp::resolver::iterator last;

        if (*it_ != last) {
            google_connect *gc = static_cast< google_connect * >(delegate_);

            if (gc) {
                asio::ip::tcp::endpoint ep = **it_;
                gc->endpoint(ep);
                gc->start_async();

                super::did_it(error);
            }
         }
    }

    asio::ip::tcp::resolver resolver_;
    asio::ip::tcp::resolver::iterator *it_;
};

}    // namespace foo

int main(int argc, const char * argv[])
{
    try {
        foo::ios_threads threads(false);
        foo::opPtr ops_;

        ops_ = new foo::interface_search(
            new foo::google_resolve()
        );

        ops_->start_async();

        threads.start();
        threads.wait();
    }
    catch (std::exception& e) {
        printf(e.what());
    }

    return 0;
}
1
The code can definetly better explain what is going to happen - PSIAlt
Your assumption is correct. I guess, you attempt to access an object which is already destroyed. Use shared_from_this idiom. - Igor R.
if you are unable to post code demonstrating the problem, use a tool like valgrind to find errors in your code. - Sam Miller
Igor, I've convinced myself that the io_service and any other objects are not being free'd out from under me, as I think the sample code I've added shows. - neobobkrause
It's my understanding that valgrind isn't reliable under osx 10.8. - neobobkrause

1 Answers

2
votes

For the benefit of others, I'll answer this question myself.

The cause of the problem I've described was that after transitioning from gcc 4.2 to gcc 4.6.3 and enabling c++0x language support, I needed to link with the libstdc++ library that I built when I built the compiler. The runtime error no longer occurs now that I'm linking with the proper runtime library.