I am using Winsock and C++11 threads for a TCP server. For every client, I create a new ReceiveThread object, which owns a std::thread. The TCP client is written in Java. I would like to implement a simple broadcast feature: if someone sends a message, the server forwards it to everybody. The client sockets are stored in a wrapper class that includes a mutex (a synchronized unordered_map). Every message is structured: the first byte is the length of the data, the second byte indicates the type, and the actual data follows.
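For reference, here is a minimal sketch of how that framing could be decoded on the server side. It assumes the length byte counts only the data bytes (not the two header bytes) and that TCP may deliver a message in pieces or several messages in one recv call. Frame and FrameBuffer are hypothetical names, not part of the code in this question.

```cpp
#include <cstddef>
#include <cstdint>
#include <string>
#include <vector>

// Hypothetical helper types for illustration only.
struct Frame {
    std::uint8_t type;    // second byte of the message
    std::string payload;  // data bytes that follow the two-byte header
};

class FrameBuffer {
public:
    // Append whatever one recv() call returned; it may contain a partial
    // frame, exactly one frame, or several frames back to back.
    void append(const char* data, std::size_t len) {
        pending_.insert(pending_.end(), data, data + len);
    }

    // Extract the next complete frame into `out`.
    // Returns false if more bytes are needed.
    bool next(Frame& out) {
        const std::size_t headerSize = 2;  // length byte + type byte
        if (pending_.size() < headerSize) return false;

        // Assumption: the length byte counts the payload only.
        const std::size_t payloadLen = static_cast<std::uint8_t>(pending_[0]);
        const std::size_t frameSize = headerSize + payloadLen;
        if (pending_.size() < frameSize) return false;

        out.type = static_cast<std::uint8_t>(pending_[1]);
        out.payload.assign(pending_.begin() + headerSize,
                           pending_.begin() + frameSize);
        // Drop the consumed bytes, keep any trailing partial frame.
        pending_.erase(pending_.begin(), pending_.begin() + frameSize);
        return true;
    }

private:
    std::vector<char> pending_;
};
```

Each receive loop would own one FrameBuffer, call append() with the bytes it got from recv, and then call next() in a loop until it returns false.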

[EDIT] My existing code works fine with one client. When a second client connects, it can also send messages, and both clients receive them. But if I then send a message from the first client, the server receives it on the second thread, the one that belongs to the second client (the message itself arrives correctly). After this, the server doesn't receive anything from the first client. (I removed the 'forward to everybody' part because the problem appears in the receiving part. I also edited ReceiveThread::receive(): it now calls recv() only once, and I will process the data later.)

Server.cpp

#include "Server.h"
#include <thread>
#include <string>
#include <winsock2.h>
#include <iostream>
#include "ReceiveThread.h"


using namespace std;

Server::Server(string ip, int port):ip(ip),port(port){
    init();
}

int Server::init(){

    //init the winsock library
    WSADATA wsaData;
    int iResult = WSAStartup(MAKEWORD(2, 2), &wsaData);
    if (iResult != NO_ERROR){
        cout << "Error WSAStartup!";
        return -1;
    }

    // Create a SOCKET for listening for incoming connection requests.
    listenSocket = socket(AF_INET, SOCK_STREAM, IPPROTO_TCP);
    if (listenSocket == INVALID_SOCKET) {
        cout << "Error at socket(): " << WSAGetLastError();
        WSACleanup();
        return -1;
    }

    //----------------------
    // The sockaddr_in structure specifies the address family,
    // IP address, and port for the socket that is being bound.
    sockaddr_in service;
    service.sin_family = AF_INET;
    service.sin_addr.s_addr = inet_addr(ip.c_str());
    service.sin_port = htons(port);

    if ( ::bind(listenSocket, (SOCKADDR*)&service, sizeof(service)) == SOCKET_ERROR) {
        cout << "Bind error.";
        closesocket(listenSocket);
        WSACleanup();
        return -1;
    }

    //----------------------
    // Listen for incoming connection requests.
    // on the created socket
    if (listen(listenSocket, 1) == SOCKET_ERROR) {
        cout << "Error listening on socket.\n";
        closesocket(listenSocket);
        WSACleanup();
        return -1;
    }

    // Accept connections
    acceptConnections();
    return 0;

}
void Server::acceptConnections(){
    // Create a SOCKET for accepting incoming request.
    SOCKET acceptSocket;
    cout << "Waiting for clients.";
    int counter = 0;
    while (1){
        acceptSocket = accept(listenSocket, NULL, NULL);
        if (acceptSocket == INVALID_SOCKET){
            cout << "Accept error";
            closesocket(listenSocket);
            WSACleanup();
            return;
        }
        else{
            clientSockets.add(acceptSocket);
            cout << "Client connected.";
            // create a new receive thread object for every client
            counter++;
            ReceiveThread receiveThread(clientSockets, acceptSocket,counter);
        }
    }
}

ReceiveThread.cpp

#include "ReceiveThread.h"
#include <winsock2.h>
#include "Message.h"
#include <iostream>
#include <string>

using namespace std;

ReceiveThread::ReceiveThread(ClientSockList &clients, SOCKET &socket,int counter) :clients(clients), socket(socket),counter(counter){
    //cout << clients.getList().size();
    receiveThread = new thread(&ReceiveThread::receive, this);
}

void ReceiveThread::terminateThread(){
    terminated = true;
}
void ReceiveThread::receive(){
    int res;
    while (!terminated){

        char recvbuf[BUF_SIZE]; // BUF_SIZE = 1024
        int recv_len = 0;
        res = recv(socket, recvbuf + recv_len, BUF_SIZE - recv_len, 0);
        if (!checkSocket(res)) break;
        cout << "[" << counter << "] ";
        for (int i = 0; i < res; ++i){
            cout << recvbuf[i];
        }
        cout << endl;
    }
    //delete receiveThread;
}

bool ReceiveThread::checkSocket(int res){
    if (res == SOCKET_ERROR || res == 0){
        terminated = true;
        cout << endl << "Terminated" << endl;
        clients.remove(socket);
        closesocket(socket);
        return false;
    }
    else{
        return true;
    }
}

This is how I send messages from the client:

public void sendMessageForBroadcast(String message) throws IOException {
    //String m = buildMessage(message,Message.TYPE_BROADCAST);
    StringBuffer buff = new StringBuffer();
    buff.append(Character.toChars(message.length()));
    buff.append(Character.toChars(1));
    buff.append(message);

    //System.out.println("Sending message: " + m + "["+m.length()+"]");
    outputStream.write(buff.toString().getBytes("UTF8"));
    outputStream.flush();
}

[EDIT] Scenario:

  1. connect with client1
  2. send message with client1
  3. receive message on server (thread 1)
  4. connect with client2
  5. send message with client2
  6. receive message on server (thread 2)
  7. send message with client1
  8. receive message on server (thread 2)
  9. from now the server doesn't receive anything from client1
That's quite a lot of code to read; please try to narrow it down using the debugger. When the code seems to lock up, break into it with the debugger to see where it is stuck: in a blocking function or in an infinite loop. - Some programmer dude
Note: I see nothing to indicate Message is rule-of-three safe. And you do realize that your ReceiveThread receiveThread(clientSockets, acceptSocket,counter); logic invokes undefined behavior, because the member function references an object (this) that has long been destroyed once it falls out of scope (which is immediately after construction)? This is a sizable amount of code to have written without intermediate testing and validation. I would strongly suggest you take a half dozen steps back and reconsider what you're really trying to do. - WhozCraig
I have just edited the question. I removed the Message classes and edited the ReceiveThread::receive() method. Only the minimal code is left, and the problem is still there. By the way, how can I avoid that undefined behavior? How can I start the threads in a better way? Is the whole design bad? Could you give some advice? - cylon

1 Answer

You have a serious case of undefined behavior.

It all starts with this line:

ReceiveThread receiveThread(clientSockets, acceptSocket,counter);

That creates a ReceiveThread object (whose constructor creates your thread referencing this). The problem is that the declaration is the last statement in its block, so the variable goes out of scope immediately and the object is destructed while the thread is still running.

Once the object is destroyed, any code that dereferences the former object's this pointer (which is all code that uses the object's non-static member variables or functions) accesses a destructed object, leading to said undefined behavior.

I suggest you keep a collection of pointers to the thread objects, both to keep each object alive until it needs to be destructed and to keep a reference to it in case you later need to use it from other threads.
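A minimal sketch of that idea, kept free of Winsock so it compiles anywhere: Worker is a hypothetical stand-in for ReceiveThread, and the int handle plays the role of the SOCKET. Note that the worker stores its handle by value. In the question the constructor takes SOCKET &socket, and if the member is also declared as a reference (the header is not shown, so this is an assumption), every thread would alias the single acceptSocket variable that the accept loop keeps overwriting.

```cpp
#include <atomic>
#include <memory>
#include <thread>
#include <vector>

// Hypothetical stand-in for ReceiveThread.
class Worker {
public:
    Worker(int handle, std::atomic<int>& sum)
        : handle_(handle), sum_(sum), thread_(&Worker::run, this) {}

    // Join before the members are destroyed, so run() never touches
    // a dead object.
    ~Worker() {
        if (thread_.joinable()) thread_.join();
    }

    // The running thread holds `this`, so the object must not be copied.
    Worker(const Worker&) = delete;
    Worker& operator=(const Worker&) = delete;

private:
    // Placeholder for the receive loop; it only uses members of *this.
    void run() { sum_ += handle_; }

    const int handle_;       // stored by value, not by reference
    std::atomic<int>& sum_;  // shared state owned by the caller
    std::thread thread_;     // declared last: starts after the other members exist
};

// Starts one worker per handle, keeps them alive in a container,
// and returns the accumulated result once every thread has finished.
int runWorkers(const std::vector<int>& handles) {
    std::atomic<int> sum(0);
    {
        std::vector<std::unique_ptr<Worker>> workers;
        for (int h : handles) {
            workers.push_back(std::unique_ptr<Worker>(new Worker(h, sum)));
        }
    }  // the vector is destroyed here; each ~Worker joins its thread
    return sum.load();
}
```

In the server, the container would live in the Server object next to clientSockets, so each worker stays alive for as long as its client is connected.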


There's also another possible source of undefined behavior: I don't see you initialize the member variable terminated, which means its value will be indeterminate when you read it in the thread.
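One way to handle the flag, as a sketch: give it an in-class initializer and make it std::atomic<bool>, since it is written by one thread and read by another. StoppableLoop is a hypothetical class, and the sleep stands in for the real work of the loop.

```cpp
#include <atomic>
#include <chrono>
#include <thread>

// Hypothetical class showing an initialized, thread-safe stop flag.
class StoppableLoop {
public:
    StoppableLoop() : thread_(&StoppableLoop::run, this) {}
    ~StoppableLoop() { stop(); }

    StoppableLoop(const StoppableLoop&) = delete;
    StoppableLoop& operator=(const StoppableLoop&) = delete;

    // Ask the loop to finish and wait until the thread has exited.
    void stop() {
        terminated_ = true;
        if (thread_.joinable()) thread_.join();
    }

    bool terminated() const { return terminated_.load(); }
    int iterations() const { return iterations_.load(); }

private:
    void run() {
        while (!terminated_) {
            ++iterations_;
            // Placeholder for the real work (recv in the question's code).
            std::this_thread::sleep_for(std::chrono::milliseconds(1));
        }
    }

    std::atomic<bool> terminated_{false};  // always starts out false
    std::atomic<int> iterations_{0};
    std::thread thread_;  // declared last: the flags exist before the thread starts
};
```

With a blocking recv the loop only checks the flag after recv returns, so in the real server the stopping side would typically also have to shut down or close the socket to wake the thread up.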