0
votes

I'm trying to make a simple example that accepts clients over a TCP socket and handles them with Winsock's select function.

The problem is that whenever I call select, it returns -1 (error), and WSAGetLastError returns 10022.

I can't figure out what I'm doing wrong, because the fd_set is being populated with the sockets properly and max_sd is set to the right value.

#include "stdafx.h"
#include <stdio.h>
#include <thread>
#include <queue>
#include <string.h>
#include <WinSock2.h>
#include <iostream>
#include <Windows.h>

using namespace std;

// Thread entry points; each receives the task queue created in _tmain.
void SlaveThread(queue<char*>* tasks);
void MasterThread(queue<char*>* tasks);

// State written by MasterThread (accept loop) and read by _tmain (select loop).
// NOTE(review): no lock is visible around any of these — confirm whether
// access from the two threads is synchronized elsewhere.
fd_set readfds;                 // sockets registered for readability checks
int max_sd = 0;                 // largest socket value registered so far; select() gets max_sd + 1
deque<SOCKET> socketsQueue;     // accepted client sockets, newest at the front
int nSocketsAmount = 0;         // count of sockets pushed into socketsQueue

int _tmain(int argc, _TCHAR* argv[])
{

    queue<char*>* tasksQueue = new queue<char*>();
    FD_ZERO(&readfds);
    thread SecondThread(MasterThread,tasksQueue);
    thread FirstThread(SlaveThread,tasksQueue);

    int nReady;
    struct timeval timeout={0, 0};
    timeout.tv_sec=10;
    timeout.tv_usec=0;
    while (true)
    {
        int i;

        nReady = select(max_sd + 1, &readfds, NULL, NULL, &timeout);

         for (i=0; i < nSocketsAmount && nReady > 0; i++)
         {
             SOCKET temp = socketsQueue[i];
             if (FD_ISSET(temp, &readfds)) {

                char buffer[200];
                memset(buffer, 0, 200);
                recv(temp, buffer, 200, 0);
                tasksQueue->push(buffer);
                nReady--;
            }
         }
    }

    FirstThread.join();
    SecondThread.join();
    return 0;
};

// Consumer loop: once per second, print and discard the oldest queued task
// (if there is one) together with the queue size at that moment.
void SlaveThread(queue<char*>* tasks)
{
    // The pause lives in the loop's increment step so that it runs after
    // every pass, including the passes that bail out early via `continue`.
    for (;; Sleep(1000))
    {
        if (tasks->empty())
            continue;

        char* oldest = tasks->front();
        cout << oldest << " Queue size : " << tasks->size() << endl;
        tasks->pop();
    }
}

/*
 * Accept loop: initializes Winsock, listens on 10.0.0.7:27015 and registers
 * every accepted client in readfds / socketsQueue for the select loop.
 *
 * tasks is accepted to match the thread signature; this function does not
 * use it.
 *
 * NOTE(review): readfds, socketsQueue, nSocketsAmount and max_sd are updated
 * here while another thread reads them; no lock is visible in this file.
 */
void MasterThread(queue<char*>* tasks)
{
    WSAData WinSockData;
    WORD Version = MAKEWORD(2, 1);

    int startupResult = WSAStartup(Version, &WinSockData);
    if (startupResult != 0)
    {
        // WSAStartup returns the error code directly.
        cout << "WSAStartup failed. Error: " << startupResult << endl;
        return;
    }

    /* Create socket structure */
    SOCKADDR_IN Server = {0};
    Server.sin_addr.s_addr = inet_addr("10.0.0.7");
    Server.sin_family = AF_INET;
    Server.sin_port = htons(27015);

    SOCKET ListenSock = socket(AF_INET, SOCK_STREAM, IPPROTO_TCP);
    if (ListenSock == INVALID_SOCKET)
    {
        cout << "Cannot create listening socket. Error: " << WSAGetLastError() << endl;
        WSACleanup();
        return;
    }

    if (::bind(ListenSock, (SOCKADDR*)&Server, sizeof(Server)) == SOCKET_ERROR)
    {
        cout << "Cannot bind listening socket. Error: " << WSAGetLastError() << endl;
        closesocket(ListenSock);
        WSACleanup();
        return;
    }

    if (listen(ListenSock, 1) == SOCKET_ERROR)
    {
        cout << "Cannot listen on port 27015. Error: " << WSAGetLastError() << endl;
        closesocket(ListenSock);
        WSACleanup();
        return;
    }

    cout << "Listening on port 27015" << endl;

    while (true)
    {
        // accept() writes the peer address and its length; use a separate
        // struct and reset the length on every call.
        SOCKADDR_IN Client = {0};
        int size = sizeof(Client);

        // accept() reports failure with INVALID_SOCKET, which is non-zero, so
        // the result must be compared explicitly rather than used as a bool.
        SOCKET Connect = accept(ListenSock, (SOCKADDR*)&Client, &size);
        if (Connect == INVALID_SOCKET)
        {
            cout << "accept failed. Error: " << WSAGetLastError() << endl;
            Sleep(100); // do not spin on a persistent failure
            continue;
        }

        cout << "Connection established..." << endl;
        FD_SET(Connect, &readfds);

        socketsQueue.push_front(Connect);
        nSocketsAmount++;

        if (Connect > max_sd)
        {
            max_sd = Connect;
        }
    }

    WSACleanup();
}

The master thread adds the sockets to the fd_set and to the sockets queue, and main uses select to find the sockets that need to be read from.

Any suggestions as to why this happens?

Thank you.

2
Try moving the WSAStartup stuff to main, before calling select. – Ben Voigt
Have you considered looking up what WSA error 10022 means? – user207421
Yes, I did. It says "invalid argument", but I don't know what is wrong with the arguments I pass to select. – asaf
You should have included that information in your question. – user207421

2 Answers

2
votes

Who (what documentation page) gave you permission to change readfds while select is using it?

When you pass a data structure to an API function, that function owns it until it returns (or longer, in the case of buffers for overlapped I/O). You can't go overwriting it from another thread.

You need to combine your main select loop and your "master" thread. Luckily, select is perfectly capable of waiting for incoming connections on a listening socket.

The actual error you are getting is from violating this requirement found in the documentation:

Any two of the parameters, readfds, writefds, or exceptfds, can be given as null. At least one must be non-null, and any non-null descriptor set must contain at least one handle to a socket.

After applying the redesign I describe above, the set will always contain the listening socket, fixing this problem as well.

0
votes

10022 is WSAEINVAL. You are passing an invalid argument to select(). One thing you are not taking into account is that select() modifies the provided fd_set structs on exit, so you have to reset them every time you call select(). You are also not taking into account that you are modifying the fd_set while select() may still be using it. And you are not protecting your task queues from concurrent thread access, either.

Try something more like this instead:

#include "stdafx.h"
#include <stdio.h>
#include <thread>
#include <queue>
#include <mutex>
#include <string.h>
#include <WinSock2.h>
#include <iostream>
#include <Windows.h>

using namespace std;

// Task queue bundled with the mutex that guards it.
struct taskQueue
{
    queue<char*> items;   // pending messages; _tmain pushes new[]-allocated buffers, SlaveThread delete[]s them
    mutex itemsLock;      // held by both threads around every access to items
};

// Thread entry points; each receives a pointer to the task queue owned by _tmain.
void SlaveThread(taskQueue *tasks);
void MasterThread(taskQueue *tasks);

// Accepted client sockets: MasterThread pushes to the front, _tmain iterates.
deque<SOCKET> socketsQueue;
// Guards socketsQueue; both threads lock it around every access.
mutex socketsQueueLock;

int _tmain(int argc, _TCHAR* argv[])
{
    WSAData WinSockData;
    WORD Version = MAKEWORD(2, 1);

    WSAStartup(Version, &WinSockData);

    taskQueue tasks;
    thread SecondThread(MasterThread, &tasks);
    thread FirstThread(SlaveThread, &tasks);

    char *buffer = NULL;

    while (true)
    {
        fd_set readfds;
        FD_ZERO(&readfds);

        struct timeval timeout = {0};
        timeout.tv_sec = 10;
        timeout.tv_usec = 0;

        int nReady = 0;

        {
            lock_guard<mutex> lock(socketsQueueLock);
            for (deque<SOCKET>::iterator iter = socketsQueue.begin(); iter != socketsQueue.end(); ++iter)
            {
                FD_SET(*iter, &readfds);
                ++nReady;
            }
        }

        if (nReady == 0)
        {
            Sleep(100);
            continue;
        }

        nReady = select(0, &readfds, NULL, NULL, &timeout);
        if (nReady > 0)
        {
            lock_guard<mutex> lock(socketsQueueLock);
            for (deque<SOCKET>::iterator iter = socketsQueue.begin(); iter != socketsQueue.end(); ++iter)
            {
                SOCKET temp = *iter;
                if (FD_ISSET(temp, &readfds))
                {
                    if (!buffer)
                        buffer = new char[201];
                    memset(buffer, 0, 200);

                    nReady = recv(temp, buffer, 200, 0);
                    if (nReady > 0)
                    {
                        buffer[nReady] = 0;
                        lock_guard<mutex> lock2(tasks.itemsLock);
                        tasks.items.push(buffer);
                        buffer = NULL;
                    }
                }
            }
        }
    }

    FirstThread.join();
    SecondThread.join();

    delete[] buffer;

    WSACleanup();
    return 0;
};

// Consumer loop: once per second, take the oldest queued message (if any),
// print it with the queue size at that moment, and free its buffer.
void SlaveThread(taskQueue *tasks)
{
    // The pause sits in the loop's increment step: it runs after the body's
    // scope has ended, so the lock below is already released while sleeping,
    // and it also runs on passes that leave early via `continue`.
    for (;; Sleep(1000))
    {
        lock_guard<mutex> guard(tasks->itemsLock);

        if (tasks->items.empty())
            continue;

        char *message = tasks->items.front();
        cout << message << " Queue size : " << tasks->items.size() << endl;
        tasks->items.pop();
        delete[] message;
    }
}

/*
 * Accept loop: listens on 10.0.0.7:27015 and pushes every accepted client
 * socket onto socketsQueue (under socketsQueueLock) for the select loop.
 *
 * tasks is accepted to match the thread signature; this function does not
 * use it. Returns early, after logging, if the listening socket cannot be
 * created, bound or put into the listening state.
 */
void MasterThread(taskQueue *tasks)
{
    /* Create socket structure */
    SOCKADDR_IN Server = {0};
    Server.sin_addr.s_addr = inet_addr("10.0.0.7");
    Server.sin_family = AF_INET;
    Server.sin_port = htons(27015);

    SOCKET ListenSock = socket(AF_INET, SOCK_STREAM, IPPROTO_TCP);
    if (ListenSock == INVALID_SOCKET)
    {
        cout << "Cannot create listening socket. Error: " << WSAGetLastError() << endl;
        return;
    }

    if (::bind(ListenSock, (SOCKADDR*)&Server, sizeof(Server)) == SOCKET_ERROR)
    {
        cout << "Cannot bind listening socket. Error: " << WSAGetLastError() << endl;
        closesocket(ListenSock);
        return;
    }

    if (listen(ListenSock, 1) == SOCKET_ERROR)
    {
        cout << "Cannot listen on port 27015. Error: " << WSAGetLastError() << endl;
        closesocket(ListenSock);
        return;
    }

    cout << "Listening on port 27015" << endl;

    while (true)
    {
        // accept() fills in the peer's address; keep it separate from the
        // listening address and reset the length before every call.
        SOCKADDR_IN Client = {0};
        int size = sizeof(Client);

        SOCKET Connect = accept(ListenSock, (SOCKADDR*)&Client, &size);
        if (Connect == INVALID_SOCKET)
        {
            cout << "Cannot accept connection. Error: " << WSAGetLastError() << endl;
            Sleep(100); // do not spin on a persistent failure
            continue;
        }

        cout << "Connection established..." << endl;

        lock_guard<mutex> lock(socketsQueueLock);
        socketsQueue.push_front(Connect);
    }
}