I'm developing an event-driven application using various Boost functions. I've been planning on basing my multi-threading architecture on an ASIO's io_service object whereby each atom of work will be dispatched by a thread group of 1 or more threads each having called run().
An earlier "proof of concept" version of this engine used a single io_service object to dispatch a number of different tasks, including deadline timers, network I/O and posted operations. As early renditions often are, that version was simple: it never had more than a few events scheduled for dispatch at a time. Having convinced myself that I was on the right track, I refactored a portion of the engine to support finer granularity and increased extensibility. But this new version is crashing with what appears to be a bad pointer in the io_service object.
I'll try to develop a simplified and reproducible test case for the problem I'm experiencing. But before I do, I'd like confirmation of the assumption my architecture is based on…
A single io_service object can be shared among a multitude of networking objects -- TCP and UDP resolvers, sockets, timers and any other beast that takes an io_service object reference.
The reason I ask is that I've not been able to find this unambiguously stated in the documentation or in online discussions. Another hint that something is wrong with my io_service is that the crash I'm experiencing occurs somewhere downstream from a call to a tcp::socket's async_connect() with a valid endpoint and handler. The last line of the implementation of async_connect() calls this->get_service(). The stream_socket_service pointer that get_service() is supposed to return ends up being 0x2, which hasn't been a great pointer value since the ENIAC.
My environment…
I've tried debugging this problem with Boost versions 1.48 through 1.52.
I'm developing on OSX and have tried various gcc 4.x compiler versions from 4.2 to 4.7.3.
The async operations I've done earlier in the session prior to this corruption issue include some timers, a udp resolve and a tcp resolve.
The socket I'm calling `async_connect()` on was allocated on the heap just prior to the call and was passed the io_service in its constructor.
I have an `io_service::work` object.
I'm not using strands (yet).
Is this enough information for anybody to help, or do I need to submit a compilable piece of code? I'd also LOVE a primer on what io_service services are, as would other SO readers I'm sure.
Update #1: Here is the minimal characterization of the problem I'm experiencing, which I've confirmed still crashes. I built it using Boost 1.52.0 and gcc 4.6.3 on the latest OS X Mountain Lion.
#include <stdlib.h>
#include <cstdio>
#include <exception>
#include <string>
#include <boost/bind.hpp>
#include <boost/function.hpp>
#include <boost/thread.hpp>
#include <boost/thread/mutex.hpp>
#include <boost/asio.hpp>
namespace foo {
namespace asio = boost::asio;
class ios_threads : private boost::noncopyable
{
public:
ios_threads(bool strt = false)
: work_(new asio::io_service::work(ios_))
{
if (strt)
start();
}
static asio::io_service &ios()
{
return ios_;
}
void start()
{
threads_.create_thread(boost::bind(&ios_threads::run, this));
}
void wait()
{
threads_.join_all();
}
private:
void run()
{
while (true) {
try {
ios_.run();
}
catch (std::exception &e) {
delete work_;
break;
}
}
printf("Shutting down.\n");
}
static asio::io_service ios_;
asio::io_service::work *work_;
boost::thread_group threads_;
};
asio::io_service ios_threads::ios_;
struct op;
typedef op * opPtr;
struct op
{
typedef boost::recursive_mutex mutex_type;
op(op *del)
: delegate_(del)
{
}
virtual ~op()
{
}
bool start_async()
{
boost::unique_lock< mutex_type > lock(mutex_);
return start_it();
}
protected:
virtual bool start_it()
{
return false;
}
virtual void did_it(const boost::system::error_code& error)
{
}
void completion_handler(const boost::system::error_code& error)
{
boost::unique_lock< mutex_type > lock(mutex_);
did_it(error);
}
opPtr delegate_;
mutable mutex_type mutex_;
};
/// Resolves the local host name over UDP (passive, numeric service) on the
/// shared io_service, then starts the delegate operation, if any.
struct interface_search : public op
{
    typedef op super;

    /// @param del  operation to start once the resolve completes (may be NULL).
    explicit interface_search(op *del)
        : super(del),
          udp_resolver_(ios_threads::ios())
    {
    }

    bool start_it()
    {
        try {
            std::string hostname = asio::ip::host_name();
            asio::ip::udp::resolver::query query(hostname, "", asio::ip::resolver_query_base::numeric_service | asio::ip::resolver_query_base::passive);
            udp_resolver_.async_resolve(query, boost::bind(&interface_search::udp_handler, this, asio::placeholders::error, asio::placeholders::iterator));
        }
        catch (const std::exception& e) {
            printf("UDP resolve operation failed. Exception: %s", e.what());
        }
        return super::start_it();
    }

protected:
    void udp_handler(const boost::system::error_code& error, asio::ip::udp::resolver::iterator it)
    {
        // Keep a copy of the iterator. The original stored `&it`, the address
        // of this by-value parameter, which dangles once the handler returns.
        it_ = it;
        completion_handler(error);
    }

    void did_it(const boost::system::error_code& error)
    {
        if (error == asio::error::operation_aborted)
            return;
        op *del = delegate_;
        if (del)
            del->start_async();
    }

    asio::ip::udp::resolver udp_resolver_;
    asio::ip::udp::resolver::iterator it_;   // result of the last resolve
};
struct google_connect : public op
{
typedef op super;
google_connect()
: super(NULL),
socket_(ios_threads::ios())
{
}
void endpoint(asio::ip::tcp::endpoint &endpoint)
{
endpoint_ = endpoint;
}
bool start_it()
{
try {
// Crashes in the following call!
socket_.async_connect(endpoint_, boost::bind(&google_connect::connect_handler, this, asio::placeholders::error));
}
catch (std::exception& e) {
printf(e.what());
}
return super::start_it();
}
void connect_handler(const boost::system::error_code& error)
{
completion_handler(error);
}
void did_it(const boost::system::error_code& error)
{
if (error == asio::error::operation_aborted)
return;
boost::asio::ip::address addr = socket_.local_endpoint().address();
printf(addr.to_string().c_str());
}
asio::ip::tcp::socket socket_;
asio::ip::tcp::endpoint endpoint_;
};
/// Resolves "google.com"/"http" over IPv4 TCP on the shared io_service and
/// hands the first resulting endpoint to a google_connect delegate.
struct google_resolve : public op
{
    typedef op super;

    google_resolve()
        // NOTE(review): this google_connect is never deleted; op does not own
        // its delegate.
        : super(new google_connect()),
          resolver_(ios_threads::ios())
    {
    }

    bool start_it()
    {
        try {
            asio::ip::tcp::resolver::query query(asio::ip::tcp::v4(), "google.com", "http");
            resolver_.async_resolve(query, boost::bind(&google_resolve::tcp_handler, this, asio::placeholders::error, asio::placeholders::iterator));
        }
        catch (const std::exception& e) {
            // Never use runtime text as a printf format string.
            printf("%s\n", e.what());
        }
        return super::start_it();
    }

protected:
    void tcp_handler(const boost::system::error_code& error, asio::ip::tcp::resolver::iterator it)
    {
        // Keep a copy of the iterator. The original stored `&it`, the address
        // of this by-value parameter, which dangles once the handler returns.
        it_ = it;
        completion_handler(error);
    }

    void did_it(const boost::system::error_code& error)
    {
        if (error == asio::error::operation_aborted)
            return;
        if (error) {
            printf("TCP resolve failed: %s\n", error.message().c_str());
            return;
        }
        asio::ip::tcp::resolver::iterator last;
        if (it_ != last) {
            // The constructor always installs a google_connect as delegate_.
            google_connect *gc = static_cast< google_connect * >(delegate_);
            if (gc) {
                asio::ip::tcp::endpoint ep = *it_;
                gc->endpoint(ep);
                gc->start_async();
                super::did_it(error);
            }
        }
    }

    asio::ip::tcp::resolver resolver_;
    asio::ip::tcp::resolver::iterator it_;   // result of the last resolve
};
} // namespace foo
int main(int argc, const char * argv[])
{
try {
foo::ios_threads threads(false);
foo::opPtr ops_;
ops_ = new foo::interface_search(
new foo::google_resolve()
);
ops_->start_async();
threads.start();
threads.wait();
}
catch (std::exception& e) {
printf(e.what());
}
return 0;
}