1
votes

we are trying to create a project that does simple modifications to a file over tcp sockets. We've used Asio to create classes that send and receive data to a port:

//ReceiveData.hpp
#pragma once

#include <string>
#include <thread>
#include <chrono>
#include <vector>
#include <asio.hpp>
#include <iostream>
#include "SendReceiveConsts.hpp" //contains global io service object

using asio::ip::tcp;

typedef unsigned short ushort;

class ReceiveData {
private:
    asio::io_service service;
    tcp::acceptor acceptor;
    tcp::socket socket;
public:
    ReceiveData(ushort port = 8008) : acceptor(SendReceive::global_io_service, tcp::endpoint(tcp::v4(), port)),
                           socket(SendReceive::global_io_service) { }

    // Can return any amount on the socket stream
    template<size_t N>
    inline std::string receive() {
        std::string message;
        try {
            if (!this->socket.is_open()) {
                this->acceptor.accept(socket);
            }

            SendReceive::global_io_service.run();

            std::array<char, N> buf;
            asio::error_code error;
            size_t len = this->socket.read_some(asio::buffer(buf), error);

            if(error)
                throw asio::system_error(error);

            std::copy(buf.begin(), buf.end(), std::back_inserter(message));

        } catch(asio::error_code& e) {
            std::cout << e.message() << std::endl;
            return "-1";
        } catch (std::exception& e) {
            std::cout << e.what() << std::endl;
            return "-1";
        }
        return message;
    }

    inline void stop() {
        asio::error_code error;
        socket.shutdown(tcp::socket::shutdown_type::shutdown_send, error);
    }
};

.

//SendData.hpp
#pragma once

#ifdef __GNUC__
#define DEPRECATED(func) func __attribute__ ((deprecated))
#elif defined(_MSC_VER)
#define DEPRECATED(func) __declspec(deprecated) func
#else
#pragma message("WARNING: You need to implement DEPRECATED for this compiler")
#define DEPRECATED(func) func
#endif

#include <asio.hpp>
#include <chrono>
#include <vector>
#include <string>
#include <tuple>
#include <iostream>

#include "SendReceiveConsts.hpp"

using asio::ip::tcp;
typedef unsigned short ushort;

class SendData {
private:
    tcp::resolver resolver;
    tcp::resolver::query query;
    tcp::socket socket;
    tcp::resolver::iterator endpoint_iterator;

    std::string IP;
    ushort port;

    inline void send_string(std::string dataToSend, const char &separator = '\0') {
        if(!this->socket.is_open()) {
            asio::connect(this->socket, this->endpoint_iterator);
        }

        SendReceive::global_io_service.run();

        std::string MISTCompliant = dataToSend;
        MISTCompliant.push_back(separator);
        printf("Sent %lu to %s\n", asio::write(socket, asio::buffer(MISTCompliant.c_str(), MISTCompliant.length())), IP.c_str());
    };

public:
    SendData(std::string IP, ushort port)
              : resolver(SendReceive::global_io_service),
                query(IP, std::to_string(port)),
                socket(SendReceive::global_io_service) {
       this->IP = IP;
       this->port = port;
       this->endpoint_iterator = resolver.resolve(this->query);
   }

   ~SendData() { stop(); }

    DEPRECATED(void simple_send(std::string data));

    inline void send(std::string data, const char &separator = '\0') {
        send_string(data, separator);
    }

    inline void stop() {
        asio::error_code error;
        this->socket.shutdown(tcp::socket::shutdown_type::shutdown_receive, error);
        if(error) {
            printf("An error occurred when shutting down SendData socket: %s (File: %s, Line %i)\n", error.message().c_str(), __FILE__, __LINE__);
        }
        this->socket.close();
        printf("Socket closed.\n");
    }

    inline std::tuple<std::string, ushort> get_raw_info() {
        return std::tuple<std::string, ushort>(this->IP, this->port);
    }
};

Using these classes, we are able to send data from the master two slave machines, which are able to receive the data reliably. However, the master is never able to receive data back from the slaves. Here is the code running on the master:

//HashFile.cpp
#include <string>
#include <fstream>
#include <time.h>
#include <thread>
#include <chrono>

#include <networking/SendData.hpp>
#include <networking/ReceiveData.hpp>
#include <MIST.hpp>
#include <Machine.hpp>

#define FILE_SIZE 60

std::string random_salt(std::string s) {
    std::string copy = "";
    std::string chars = "abcdefghijklmnopqrstufwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ12345678910!@#$%^&*()_+-=";
    for(std::string::iterator i = s.begin(); i != s.end(); i++) {
       copy.push_back(*i);
       srand(time(0));
        if((rand() % 100) < 10) {
            srand(time(0));
            copy.push_back(chars.at(rand() % chars.length()));
        }
    }
    return copy;
}

std::string add_salt(std::string s) {
    std::string copy = "";
    for(std::string::iterator i = s.begin(); i != s.end(); i++) {
        copy.push_back(*i);
        if(*i == '6') {
            copy.push_back('w');
            copy.push_back('h');
            copy.push_back('a');
            copy.push_back('t');
        } else if(*i == 'c' && *i == '9' && *i == 'D') { //c9D
            //HG6v
            copy.pop_back();
            copy.push_back(*i);
            copy.push_back('G');
            copy.push_back('6');
            copy.push_back('v');
        } else if(tolower(*i) == 'm' && tolower(*(i + 1)) == 'i') {
            copy.push_back('s');
            copy.push_back('t');
        }
    }
    return copy;
}

int main() {
    std::vector<MIST::Machine> machines_used = { MIST::Machine("local"), MIST::Machine("Helper 1", "25.88.30.47", false), MIST::Machine("Helper 2", "25.88.123.114", false) }; //Hamachi IP addresses
    auto mist = MIST::MIST(true, machines_used);

    std::ifstream hash;
    std::string data1 = "";
    std::string data2 = "";
    std::string mydata = "";
    printf("Dangerously large file being imported into code...\n");
    hash.open("testfile_smol", std::fstream::binary);
    if(hash.is_open()) {
        try {
            char chunk;
            int counter = 0;
            while(hash.get(chunk)) {
               if(counter < FILE_SIZE / 3) {
                    data1 += chunk;
                    counter++;
                } else if(counter < FILE_SIZE * (2.0f / 3.0f)) {
                    data2 += chunk;
                    counter++;
                } else {
                    mydata += chunk;
                    counter++;
                }
            }
        } catch(std::exception& e) {
            std::cerr << "Error encountered: " << e.what() << std::endl;
        }
    }
    hash.close();

    printf("data1: %s data2: %s mydata: %s", data1.substr(0, 10).c_str(), data2.substr(0, 10).c_str(), mydata.substr(0, 10).c_str());

    ProtobufMIST::Task task;
    task.set_task_name("hash");
    std::string serialized;
    task.SerializeToString(&serialized);

    const char c = 185;

    printf("Send all!\n");
    std::string s1 = "1" + data1 + c + serialized;
    std::string s2 = "2" + data2 + c + serialized;

    mist.send_task(s1, "Helper 1", 1025);
    printf("Updated first task!\n");

    mist.send_task(s2, "Helper 2", 1025); //Just a wrapper for SendData, as described in SendData.h
    printf("Updated first task!\n");

    std::string mydata_salted = add_salt(random_salt(mydata)); //TODO: Add pepper

    printf("Old mydata size: %zu\nNew mydata size: %zu\n", mydata.length(), mydata_salted.length());

    std::string one(""), two("");
    unsigned short port1 = 1026;
    unsigned short port2 = 1027;

    auto receive_slaves = [=](unsigned short& port, std::string& out) {
        bool got = false;
        printf("Looking for string on port %u\n", port);
        while(!got) {
            auto slave = new ReceiveData(port); //As defined in ReceiveData.hpp
            std::string x = slave->receive<1>();
            printf("Got chunk: %s\n", x.c_str());
            if(!(x.find((char)182) != std::string::npos || x == "-1")) {
                out += x;
            } else {
                got = true;
            }
            delete slave;
        }
        printf("Received full string!\n");
    };

    printf("Openning both receive channels...\n");
    printf("Waiting for strings...\n");
    //THIS IS WHERE IT BREAKS
    receive_slaves(port2, two); //Never gets response
    receive_slaves(port1, one); //Never gets response
    printf("Received all parts!\n");

    printf("Removing delimiters...\n");
    one.erase(std::remove(one.begin(), one.end(), (char)182), one.end());
    two.erase(std::remove(two.begin(), two.end(), (char)182), two.end());

    std::ofstream output;
    output.open("Hashed");
    output << one << two << mydata_salted;
    output.close();

    printf("Aloha!\n");

    return 0;
}

As you can see, the entire file is meant to read file of fixed size, send a third to two slaves, and "salt" the final third with random characters and other transformations. However, as mentioned above, the program gets stuck receiving the other finished parts. Here is what the slave code looks like

//HashFile.cpp (Slave)
#include <string>
#include <fstream>
#include <thread>
#include <chrono>
#include <time.h>
#include <MIST.pb.h>
#include <networking/SendData.hpp>
#include <networking/ReceiveData.hpp>
#include <MIST.hpp>
#include <Machine.hpp>
#include <stdlib.h>

std::string data;
std::string task;
std::string firstTwoChars;
ProtobufMIST::Task _task;
const char d = 182;
const char d_spc = 185;
int part;
std::string random_salt(std::string s) {
    std::string copy = "";
    std::string chars = "abcdefghijklmnopqrstufwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ12345678910!@#$%^&*()_+-=";
    for (std::string::iterator i = s.begin(); i != s.end(); i++) {
        copy.push_back(*i);
        srand(time(0));
        if ((rand() % 100) < 10) {
            srand(time(0));
            copy.push_back(chars.at(rand() % chars.length()));
        }
    }
    return copy;
}
std::string add_salt(std::string s) {
    std::string copy = "";
    for (std::string::iterator i = s.begin(); i != s.end(); i++) {
        copy.push_back(*i);
        if (*i == '6') {
            copy.push_back('w');
            copy.push_back('h');
            copy.push_back('a');
            copy.push_back('t');
        }
        else if (*i == 'c' && *i == '9' && *i == 'D') { //c9D
            copy.pop_back();
            copy.push_back(*i);
            copy.push_back('G');
            copy.push_back('6');
            copy.push_back('v');
        }
        else if (tolower(*i) == 'm' && tolower(*(i + 1)) == 'i') {
            copy.push_back('s');
            copy.push_back('t');
        }
    }
    return copy;
}
//Previously defined salting functions
void hash()
{
    data = random_salt(data); //randomly salt
    data = add_salt(data); //add random chars
}
int main()
{
    MIST::Task taskThing("hash", *hash);
    ReceiveData * rObj = new ReceiveData(1025);
    SendData sObj("25.88.220.173", 1027);
    std::cout << "Receiving first char \n";
    firstTwoChars = rObj->receive<1>();
    //delete rObj;
    std::cout << firstTwoChars << std::endl;
    std::cout << "Received first char \n";
    int slavePart;
if (firstTwoChars == "1") {
        slavePart = 1;
    }
    else if (firstTwoChars == "2") {
        slavePart = 2;
    }
    else
        std::cout << "You messed up, what part is it? \n";
    std::cout << "Is part " << slavePart << std::endl;

    bool dataRecieved = false;
    while (!dataRecieved)
    {
    std::string chunk = rObj->receive<1>();
        if (chunk == "-1" || chunk.find((char)182) != std::string::npos) {
            std::cout << "Data recieved \n";
            dataRecieved = true;
        }
        else
        {
            data += chunk;
            std::cout << "Added chunk: " << chunk << std::endl;
        }
        chunk.clear();
    }
    std::cout << "All Data recieved! \n Parsing now \n";

    if (data.find(d_spc) != std::string::npos)
    {
        size_t data_before = data.find(d_spc); //find where data ends and task begins
        std::cout << "Data found at " << data_before << "bytes. \n";
        std::string task = data.substr(data_before); //copy task to new string
        std::cout << "Task copied: " << task << std::endl;
        data.erase(data_before);//erase everything that was the task from it's original string
    }
    else {
        std::cout << "Did not find d_spc \n";
        std::abort();
    }
    std::cout << "Data parsed \n Data: \n";
    std::cout << data << std::endl;
    std::cout << "Task: " << task << std::endl;

    if (_task.ParseFromString(task))
    {
        std::cout << "Task parsed properly \n";
    }
    else {
        std::cout << "I messed up parsing, trying again \n";
        if (_task.ParseFromString(task)) {
            task.pop_back();
            std::cout << "Worked the second time! \n";
        }
        else {
            std::cout << "Still messed up \n";
            std::abort();
        }
    }
    if(_task.task_name() == "hash")
        taskThing.run();
    std::cout << taskThing.getID();
    std::cout << "Sending... \n";
    std::this_thread::sleep_for(std::chrono::milliseconds(500));
    std::string t_str = std::to_string(slavePart) + data;

    //THIS IS WHERE IT BREAKS
    sObj.send(t_str, d); //error

    int x;
    std::cin >> x;
    std::cout << "Sent! \n";
}

Here is the error that appears when trying to reconnect to the master:

connect: A connection attempt failed because the connected party did not properly respond after a period of time, or established connection failed because the host has failed to respond.:connect: A connection attempt failed because the connected party did not properly respond after a period of time, or established connection failed because the host has failed to respond.

Which, from where we're standing, doesn't make sense, because there is a listening process running on master. We have a feeling it has to do with the way we wrap Asio in our send and receive classes, but we're not sure where.

For reference, this is a cross platform project. The master is running on Ubuntu 16.10 and the two slaves are running on Windows 10. We've disabled firewalls and are running the project over Hamachi. Also notice that each slave sends the finished part of the string on their own port.

1

1 Answers

0
votes

It looks like you create SendData on the slave program before there is a socket available on the other end, causing resolver (which is run when you create the SendData object) to be unable to resolve the ip address you use. Try creating SendData right before you send the data.

Also, nice wrapper for ASIO. It could use some work but it's a start and it seems a bit simpler to use than just straight ASIO.