1
votes

I'm trying to send the large amount of data to the server which should accept the data and parse it. So as I know, when you send() the data in blocking mode in one call, it splits data into chunks and then sends the chunks to the target. But I need to mark each chunk with a small identifier in the beginning of the data (let's say I'm placing a header in each chunk), so I decided to use non- blocking send. I thought, when I do non-blocking send, it sends the max the buffer allows and then returns, leaving the chunking work for me, but it seems that's not happening.

My code is:

    struct sockaddr_in target;

    SOCKET connection = socket(AF_INET, SOCK_STREAM, IPPROTO_TCP);  
    target.sin_family = AF_INET;
    target.sin_addr.s_addr = inet_addr(host);
    target.sin_port = htons(port);

     ULONG NonBlock;
     NonBlock = 1;
     if (ioctlsocket(connection, FIONBIO, &NonBlock) == SOCKET_ERROR)
     {

         return WSAGetLastError();

     }

      fd_set write_fds;

        FD_ZERO(&write_fds);            
        FD_SET(connection, &write_fds);
        struct timeval tv;
        tv.tv_sec=1;
        tv.tv_usec=0;
    int result = connect(connection,(SOCKADDR*)&target,sizeof(target));
    if(result==SOCKET_ERROR)
    {
        while(true)
        {
            result= select(connection+1,NULL,&write_fds,NULL,&tv);
            printf("connect: result=%d\r\n",result);
            if(result== -1)
            {
                return WSAGetLastError();
            }
            else break;
        }

    }

    //later on

fd_set write_set;
int bytes_sent= 0;
int total_sent = 0;
int length = 0;
char *tmp = malloc(sizeof(header)+data_size); //data_size is the size of the large buffer
memcpy(tmp,packet,sizeof(header));
memcpy(tmp+sizeof(header),data,data_size);


int result;

FD_ZERO(&write_set);
FD_SET(connection,&write_set);

struct timeval time_out;

time_out.tv_sec=0;
time_out.tv_usec=1500;

while(total_sent < data_size)
{
    length= (data_size+sizeof(my_header))-total_sent;

    result = select(connection+1,NULL,&write_set,NULL,&time_out);

    if(result== SOCKET_ERROR) return -1;
    if(result!=0 && FD_ISSET(connection, &write_set))
    {
        bytes_sent = send(connection,tmp,length,0);
    }

    if(bytes_sent == SOCKET_ERROR)
    {
        return SOCKET_ERROR;
    }
    if(bytes_sent > 0)
    {
        //here i need to append a header to the new chunk
    }
    else break;


}

So basically my question is: why the send on non-blocking socket, still blocks and doesn't return after sending the first chunk, and acts just like regular blocking send? What i want to achieve is send() sending one chunk of data of the length that the system allows, so i put the length of the whole data, assuming that non-blocking send will return after sending the first chunk, because the buffer is to big, to be sent as one block.

UPDATE some runnable code:

#include <windows.h>
#include <winsock.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <types.h>

typedef struct hdr{
 uint8_t super_id;
}my_header,*pmy_header;

SOCKET connection;
int start_winsock()
{
    WSADATA check;

    int result = WSAStartup(MAKEWORD(2,2),&check);  
    return result;
}
int create_connection(char* host,int port)
{
     struct sockaddr_in target;

    connection = socket(AF_INET, SOCK_STREAM, IPPROTO_TCP); 
    target.sin_family = AF_INET;
    target.sin_addr.s_addr = inet_addr(host);
    target.sin_port = htons(port);
    int result =    UnblockSocket();
    if(result!=0) return WSAGetLastError();

        fd_set write_fds;
        FD_ZERO(&write_fds);            
        FD_SET(connection, &write_fds);
         struct timeval tv;

        tv.tv_sec=1;
        tv.tv_usec=0;

        result = connect(connection,(SOCKADDR*)&target,sizeof(target));
        if(result==SOCKET_ERROR)
        {
          while(true)
          {
            result= select(connection+1,NULL,&write_fds,NULL,&tv);
            if(result== -1)
            {
                 return WSAGetLastError();
            }
             else break;
        }

    }

    return 0;

}


 int UnblockSocket()
 {
   ULONG NonBlock;
   NonBlock = 1;
   if (ioctlsocket(connection, FIONBIO, &NonBlock) == SOCKET_ERROR)
   {

      return WSAGetLastError();

   }
    return 0;
 }



 int SendMyData(pmy_header header,char * data,int data_size)
 {

    fd_set write_set;
    int bytes_sent= 0;
    int total_sent = 0;
    int length = 0;
    char *tmp = malloc(sizeof(my_header)+data_size);
    memcpy(tmp,packet,sizeof(my_header));
    memcpy(tmp+sizeof(my_header),data,data_size);

    int result;

    FD_ZERO(&write_set);
    FD_SET(connection,&write_set);

    struct timeval time_out;

    time_out.tv_sec=0;
    time_out.tv_usec=1500;

    header->super_id=0xdead;
    while(total_sent < data_size)
    {
        length= (data_size+sizeof(my_header))-total_sent;

        if(result== SOCKET_ERROR) return -1;
        if(result!=0 && FD_ISSET(connection, &write_set))
        {
            bytes_sent = send(connection,tmp,length,0);
        }

            printf("bytes sent per iteration=%d\n",bytes_sent);
        if(bytes_sent == SOCKET_ERROR)
        {
            return SOCKET_ERROR;
        }
        if(bytes_sent > 0)
        {
         total_sent+= bytes_sent-sizeof(my_header);

         tmp = realloc(tmp,sizeof(my_header)+(data_size-total_sent));


         memcpy(tmp,header,sizeof(my_header));
         memcpy(tmp+sizeof(my_header),data,data_size-total_sent);
        }
        else break;


    }
    free(tmp);
    return total_sent;
}

int main(int argc, char *argv[])
{
     start_winsock();
     int result = create_connection("2.2.2.2",88);
     if(result!=0) { printf("Cannot connect\n"); return 0; }

     pmy_header *header = malloc(sizeof(my_header));

     int buf_size = 500000;
     char buffer_test[buf_size];
     ZeroMemory(buffer_test,buf_size);
     int count=0;
     for(count;count<buf_size;count++)
     {
       strcat(buffer_test,"4");
     }
     result = SendMyData(header,buffer_test,buf_size);

} 
1
I strongly recommend using boost::asio instead of doing any major deployments over raw socket. Will save you from almost all the trouble - Tymoteusz Paul
Because you are blocking in select()? - jxh
@Puciek: It is probably just because I've done more with raw sockets but I actually find doing it that way much easier to read than boost::asio. When boost fails to work it does so in mysterrrrrious ways. - Zan Lynx
@ZanLynx while I too come from the "raw socket era", where things like asio were immature, it certainly isn't the case any more. At least not in case of ASIO, as I cannot speak for the whole monster that boost is. It took quite a bit of getting used to, but once you do - it just makes the code shorter, cleaner and less prone to errors. - Tymoteusz Paul
No you see, i thought that send won't loop continiously trying to send the whole buffer inside like in blocking mode, i thought it will send as much as possible (probably arount 4-5 kb) and return after, but it still blocks until the whole "length" is sent. - Vanya

1 Answers

2
votes

send() is not guaranteed to send everything you ask it to send. It may send less. You MUST take the return value into account. If it is less than the amount you requested, you have to call send() again to re-send the remaining bytes, before then sending new bytes. And in the case of non-blocking, you have to take WSAEWOULDBLOCK into account as well.

And you don't put on a header on each chunk that send() sends. You put a header on each chunk you tell send() to send. You do your own chunking, don't worry about the chunking that TCP does internally. That is a network implementation, it does not affect your protocol. The receiver should be paying attention to your chunk headers, calling recv() as many times as needed to receive your full header and data to account for TCPs chunking.

Try something more like this instead:

SOCKET connection = socket(AF_INET, SOCK_STREAM, IPPROTO_TCP);  
if (connection == INVALID_SOCKET)
{
     return WSAGetLastError();
}

ULONG NonBlock = 1;
in result = ioctlsocket(connection, FIONBIO, &NonBlock);
if (result == SOCKET_ERROR)
{
    result = WSAGetLastError();
    closesocket(connection);
    return result;
}

struct sockaddr_in target;
memset(&target, 0, sizeof(target));
target.sin_family = AF_INET;
target.sin_addr.s_addr = inet_addr(host);
target.sin_port = htons(port);

result = connect(connection, (SOCKADDR*)&target, sizeof(target));
if (result == SOCKET_ERROR)
{
    result = WSAGetLastError();
    if (result != WSAEWOULDBLOCK)
    {
        closesocket(connection);
        return result;
    }

    fd_set write_fds;
    FD_ZERO(&write_fds);            
    FD_SET(connection, &write_fds);

    struct timeval tv;
    tv.tv_sec = 5;
    tv.tv_usec = 0;

    result = select(0, NULL, &write_fds, NULL, &tv);
    if (result == SOCKET_ERROR)
    {
        result = WSAGetLastError();
        closesocket(connection);
        return result;
    }

    if (result == 0)
    {
        closesocket(connection);
        return WSAETIMEDOUT;
    }
}

char *tmp_data = data;
int data_remaining = data_size;

while (data_remaining > 0)
{
    int pkt_data_size = min(data_remaining, 1024); // replace 1024 with whatever maximum chunk size you want...

    int pkt_size = sizeof(header) + pkt_data_size;
    char *pkt = malloc(pkt_size);
    if (!pkt) return -1;

    // fill header as needed...
    memcpy(pkt+sizeof(header), tmp_data, pkt_data_size);

    tmp_data += pkt_data_size;
    data_remaining -= pkt_data_size;

    char *tmp_pkt = pkt;
    while (pkt_size > 0)
    {
        result = send(connection, tmp_pkt, pkt_size, 0);
        if (result == SOCKET_ERROR)
        {
            result = WSAGetLastError();
            if (result != WSAEWOULDBLOCK)
            {
                free(pkt);
                return -1;
            }

            fd_set write_set;
            FD_ZERO(&write_set);
            FD_SET(connection, &write_set);

            struct timeval time_out;
            time_out.tv_sec = 5;
            time_out.tv_usec = 0;

            result = select(0, NULL, &write_set, NULL, &time_out);
            if (result != 1)
            {
                free(pkt);
                return -1;
            }

            continue;
        }

        tmp_pkt += result;
        pkt_size -= result;
    }

    free(pkt);
}