
I'm currently working on a small servlet that sends simulation data over TCP, using boost::asio for the networking part. I've managed to get communication working between two processes on my machine (the simple client is written in Python). The problem is that the same data keeps being sent over the socket instead of the updated data.

I'm using two threads. One runs a simulation, creating data, and updates the server's connection object with the current data. The other runs the server and periodically writes the current data to the socket. I've created a minimal example to share here (it compiles with MSVC++ 12.0 and reproduces the problem, if you care to replicate it).

tcp_server * server;
bool connected = false;

void runServer() {
    try
    {
        boost::asio::io_service io_service;
        server = new tcp_server(io_service);

        connected = true;
        io_service.run();
    }
    catch (std::exception& e)
    {
        std::cerr << e.what() << std::endl;
    }
}

void runSim() {
    for (int i = 0; i < 1000; i++) {
        if (connected)
            server->setData("Current Message: " + std::to_string(i));

        boost::this_thread::sleep(boost::posix_time::seconds(1));
    }
}

int _tmain(int argc, _TCHAR* argv[])
{
    boost::thread serverThread(runServer);
    boost::thread simThread(runSim);

    simThread.join();
    serverThread.join();

    return 0;
}

Here are the two classes, tcp_connection and tcp_server. They closely follow the ones in the boost::asio tutorials currently on the Boost website.

class tcp_connection
    : public boost::enable_shared_from_this<tcp_connection>
{
public:
    typedef boost::shared_ptr<tcp_connection> pointer;

    static pointer create(boost::asio::io_service& io_service)
    {
        return pointer(new tcp_connection(io_service));
    }

    tcp::socket& socket()
    {
        return socket_;
    }

    void start()
    {
        message_ = make_daytime_string();

        boost::asio::async_write(socket_, boost::asio::buffer(message_),
            boost::bind(&tcp_connection::handle_write, shared_from_this()));
    }

    void setData(std::string msg) {
        boost::unique_lock<boost::shared_mutex> msgLock(msgMutex, boost::try_to_lock);
        if (msgLock.owns_lock()) {
            message_ = msg;
        }
    }

private:
    tcp_connection(boost::asio::io_service& io_service)
        : socket_(io_service)
    {
        timer_ = new boost::asio::deadline_timer(io_service,boost::posix_time::milliseconds(250));
    }

    void handle_write()
    {
        boost::shared_lock<boost::shared_mutex> msgLock(msgMutex);
        std::cout << "Writing to socket: " << message_ << std::endl;
        boost::asio::write(socket_, boost::asio::buffer(message_));     

        timer_->expires_at(timer_->expires_at() + boost::posix_time::milliseconds(1500));
        timer_->async_wait(boost::bind(&tcp_connection::handle_write, shared_from_this())); 
    }

    tcp::socket socket_;
    std::string message_;
    int counter_;
    boost::asio::deadline_timer * timer_;
    boost::shared_mutex msgMutex;
};

class tcp_server
{
public:
    tcp_server(boost::asio::io_service& io_service)
        : acceptor_(io_service, tcp::endpoint(tcp::v4(), 13))
    {
        start_accept();
    }

    void setData(std::string msg) {
        if (current_connection != NULL) {
            current_connection->setData(msg);
        }
    }

private:
    void start_accept()
    {
        tcp_connection::pointer new_connection =
            tcp_connection::create(acceptor_.get_io_service());

        acceptor_.async_accept(new_connection->socket(),
            boost::bind(&tcp_server::handle_accept, this, new_connection,
            boost::asio::placeholders::error));

        current_connection = new_connection;
    }

    void handle_accept(tcp_connection::pointer new_connection,
        const boost::system::error_code& error)
    {
        if (!error)
        {
            new_connection->start();
            std::cout << "New Connection on 127.0.0.1" << std::endl;
        }

        start_accept();
    }

    tcp::acceptor acceptor_;
    tcp_connection::pointer current_connection;
};

Through judicious use of std::cout, I've determined that the server thread is getting the data from the simulation thread, and that it is being passed on to the connection object as well (the setData() method is called when it is supposed to be). For whatever reason, it looks like the connection's member message_ is not being updated. I also know that the connection is not being reset or recreated, judging by the "New Connection" message on the console.

You appear to be mixing a synchronous boost::asio::write() with asynchronous boost::asio::async_write() operations. Similarly, why is the completion handler for async_wait() writing another stream of data on the socket? - Sam Miller

Well, I do that because it's the only way I could figure out how to have a set period between transmissions (the async_wait is on a deadline timer with the specified period). My idea is for the write to happen repetitively, with the data being updated by the other thread, which is why the completion handler starts another write and wait. - gankoji

I should add that I have only added code to use the blocking boost::asio::write(). The async_write() is from the tutorial I used as the base of this app, and could easily be changed to a blocking write if there's a need or a reason to use only one or the other. - gankoji

I suggest you rethink your design. It can be achieved using solely asynchronous operations. Get rid of the synchronous write and the mutex, use a strand instead. Use separate completion handlers for async_wait compared to async_write. You will need to maintain an outgoing message queue to avoid interleaved writes. You should also check the error parameters given to your asynchronous completion handlers. - Sam Miller

Yes, you need to ensure at most one write operation is outstanding on a given socket. A queue is how that happens. - Sam Miller

1 Answer


Okay, Sam Miller gets the credit for the answer here, but he posted it as a comment, so I'm answering to close the question now that I have it figured out. Ultimately, the bug was most likely caused by interleaved write calls and unsynchronized access to the object's data. I've rewritten my example code to use only one class (instead of the two above), following the guidelines Sam provided in his other answer, already linked to. I've also made all the write operations asynchronous. Here's the code now:

#include <iostream>
#include <string>
#include <boost/bind.hpp>
#include <boost/asio.hpp>
#include <boost/thread.hpp>
#include <deque>

using boost::asio::ip::tcp;
using namespace std;

class tcp_server {
public:
    tcp_server(boost::asio::io_service& io_service)
        : _acceptor(io_service, tcp::endpoint(tcp::v4(), 5005)), _socket(io_service)
    {
        messages = std::deque<std::string> (1,"Hello from Jake's shitty server");

        timer_ = new boost::asio::deadline_timer(io_service, boost::posix_time::milliseconds(250));

        start_accept();
    }

    void write(std::string message) {
        boost::unique_lock<boost::shared_mutex> queueLock(queueMutex);
        messages.push_back(message);
        if (messages.size() <= 1)
            handle_write();
    }
private:
    void start_accept() {
        _acceptor.async_accept(_socket,
            boost::bind(&tcp_server::handle_accept, this,
            boost::asio::placeholders::error));
    }

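    // Used both as the accept handler and as the timer handler: sends the
    // next queued message, if there is one.
    void handle_accept(const boost::system::error_code& e) {
        if (e) {
            std::cerr << "accept/timer error: " << e.message() << std::endl;
            return;
        }

        // write() pushes from another thread, so hold the lock while touching the queue.
        boost::unique_lock<boost::shared_mutex> queueLock(queueMutex);
        if (!messages.empty()) {
            // Copy into a member: boost::asio::buffer does not own the data,
            // so the string must outlive the asynchronous write.
            _message = messages.front();
            messages.pop_front();

            boost::asio::async_write(_socket, boost::asio::buffer(_message),
                boost::bind(&tcp_server::handle_write, this));
        }
    }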

    void handle_write() {

        if (!messages.empty()) {
            timer_->expires_at(timer_->expires_at() + boost::posix_time::milliseconds(1500));
            timer_->async_wait(boost::bind(&tcp_server::handle_accept, this, boost::asio::placeholders::error));
        }

        return;
    }

    std::string _message;
    std::deque<std::string> messages;

    tcp::acceptor _acceptor;
    tcp::socket _socket;
    boost::asio::deadline_timer * timer_;
    boost::shared_mutex queueMutex;


};


tcp_server * server;

void addMessages() {
    for (int i = 0; i < 10; i++) {
        server->write("New Message. Count: " + std::to_string(i) + ".\n");
    }
}


int _tmain(int argc, _TCHAR* argv[])
{
    boost::asio::io_service io_service;
    server = new tcp_server(io_service);

    server->write("Hey there sexy");
    boost::thread messenger(addMessages);

    io_service.run();

    return 0;
}

TL;DR: use a message queue, and don't mix asynchronous and synchronous writes.
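As a rough illustration of the queue idea, here is a minimal sketch that uses only the standard library. FakeSocket and OutgoingQueue are hypothetical names made up for this example; they are not part of boost::asio, and this is not the code above. It only shows the rule Sam described: a new write starts only when none is in flight, and the completion handler starts the next one. The sketch is single-threaded; with asio I'd assume all of these calls have to run on the io_service thread (for example by posting to it or by using a strand).

```cpp
#include <cassert>
#include <cstddef>
#include <deque>
#include <functional>
#include <string>
#include <utility>
#include <vector>

// Hypothetical stand-in for a socket: records what was "sent" and stores the
// completion handler so the caller decides when the write finishes.
struct FakeSocket {
    std::vector<std::string> sent;
    std::function<void()> pending;  // handler of the write currently in flight

    void async_write(const std::string& data, std::function<void()> on_done) {
        assert(!pending && "a second write was started while one was in flight");
        sent.push_back(data);
        pending = std::move(on_done);
    }

    // Simulates the event loop reporting that the write in flight has finished.
    void complete_one() {
        std::function<void()> done = std::move(pending);
        pending = nullptr;
        if (done) done();
    }
};

// Hypothetical outgoing queue: at most one write is outstanding at any time.
class OutgoingQueue {
public:
    explicit OutgoingQueue(FakeSocket& socket) : socket_(socket) {}

    // Queue a message; start writing only if nothing is in flight.
    void send(std::string message) {
        const bool idle = queue_.empty();
        queue_.push_back(std::move(message));
        if (idle) write_front();
    }

    std::size_t queued() const { return queue_.size(); }

private:
    void write_front() {
        // The front element stays in the deque until the write completes, so
        // the data handed to the socket stays valid for the whole operation.
        socket_.async_write(queue_.front(), [this] { on_write_done(); });
    }

    void on_write_done() {
        queue_.pop_front();
        if (!queue_.empty()) write_front();
    }

    FakeSocket& socket_;
    std::deque<std::string> queue_;
};
```

Calling send() three times in a row puts only the first message on the fake socket. The other two go out one at a time, each after complete_one() reports the previous write as finished.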

Also, a funny issue I ran into while working on this: I was filling the boost::asio::buffer with a temporary string popped from the message queue. This kept failing a debug assertion in VS 2013 saying a string iterator wasn't dereferencable. boost::asio::buffer does not copy the string, so the string has to stay alive until the asynchronous write completes. Once I added the _message member to the class and used it to build the buffer, everything worked great. I found that tip here: "Expression: string iterator not dereferencable while using boost regex". Thanks for your help, Sam!
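To make the lifetime point concrete, here is a small standard-library sketch. BufferView, FakeLoop and send_owned are hypothetical names invented for this example and only mimic the relevant behaviour: the "buffer" is a non-owning pointer plus length, and the data is read later, when the loop runs. Instead of a class member, this variant keeps the string alive with a shared_ptr captured by the completion handler, which is another common way to do it. Treat it as an illustration under those assumptions, not as asio code.

```cpp
#include <cassert>
#include <cstddef>
#include <functional>
#include <memory>
#include <string>
#include <utility>
#include <vector>

// Hypothetical stand-in for a non-owning buffer: pointer and length only.
struct BufferView {
    const char* data;
    std::size_t size;
};

// Hypothetical stand-in for an event loop: writes are only recorded when they
// are started; the buffers are actually read later, inside run().
struct FakeLoop {
    struct Op {
        BufferView view;
        std::function<void()> on_done;
    };
    std::vector<Op> ops;
    std::string wire;  // bytes "sent" so far

    void async_write(BufferView view, std::function<void()> on_done) {
        assert(view.data != nullptr);
        ops.push_back(Op{view, std::move(on_done)});
    }

    void run() {
        std::vector<Op> ready;
        ready.swap(ops);  // handlers may start new operations safely
        for (Op& op : ready) {
            wire.append(op.view.data, op.view.size);  // the buffer is read here
            op.on_done();
        }
    }
};

// The string is owned by a shared_ptr that the completion handler captures,
// so it lives at least until the operation has completed.
void send_owned(FakeLoop& loop, std::string message) {
    auto owned = std::make_shared<std::string>(std::move(message));
    loop.async_write(BufferView{owned->data(), owned->size()},
                     [owned] { /* the string is released once the handler is destroyed */ });
}
```

Had send_owned() built the BufferView from its local message parameter instead, the pointer would dangle by the time run() reads it, which is the same kind of mistake as building the buffer from a temporary popped off the queue.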