45
votes

I am trying to explore Protocol Buffer (PB) in Linux platform and my coding language is C++. I found examples in the protocol buffer online docs but nothing specific to socket send and receive (Or I have missed it completely :) ). So I decided to add the message Length before the actual message and send it across socket. I would appreciate if anyone can suggest a better solution than what I am planning to do and also is there anything ready made in PB for creating such packets.

But I still end up with a problem at server side where I have to decode the packet. Say if the client sends a packet of 10 byte in which first 4 byte is the length of the packet; But it is impossible to know the length before decoding the packet. So even if i read the first 4 byte how do i deduce the the value with half read packet using Protocol Buffer.

2

2 Answers

42
votes

At last I could get it working . I am posting the code here so that one can review and comment on it as well as if some one wants to implement it in c++, this piece of code can help. Its a shabby code my intention was to get Protobuf working in length prefixed manner. I have taken the code of client server from some site which I don't remember and I have modified it to accommodate protobuf. Here the server first peeks into the socket and gets the length of the total packet and then actual socket read is done to read the entire packet. There can be zillion ways to do it but for quick solution I did it in this manner. But I need to find a better way to avoid 2 recv per packets, but in my condition all the messages are of different size, so this is the only way I guess.

Proto file

  message log_packet {
  required fixed64 log_time =1;
  required fixed32 log_micro_sec =2;
  required fixed32 sequence_no =3;
  required fixed32 shm_app_id =4;
  required string packet_id =5;
  required string log_level=6;
  required string log_msg=7;
  }

Protocol buffer Client Code

#include <unistd.h>
#include "message.pb.h"
#include <iostream>
#include <google/protobuf/message.h>
#include <google/protobuf/descriptor.h>
#include <google/protobuf/io/zero_copy_stream_impl.h>
#include <google/protobuf/io/coded_stream.h>
#include <google/protobuf/io/zero_copy_stream_impl_lite.h>


using namespace google::protobuf::io;

using namespace std;
int main(int argv, char** argc){

/* Coded output stram */

log_packet payload ;

payload.set_log_time(10);
payload.set_log_micro_sec(10);
payload.set_sequence_no(1);
payload.set_shm_app_id(101);
payload.set_packet_id("TST");
payload.set_log_level("DEBUG");
payload.set_log_msg("What shall we say then");

cout<<"size after serilizing is "<<payload.ByteSize()<<endl;
int siz = payload.ByteSize()+4;
char *pkt = new char [siz];
google::protobuf::io::ArrayOutputStream aos(pkt,siz);
CodedOutputStream *coded_output = new CodedOutputStream(&aos);
coded_output->WriteVarint32(payload.ByteSize());
payload.SerializeToCodedStream(coded_output);

        int host_port= 1101;
        char* host_name="127.0.0.1";

        struct sockaddr_in my_addr;

        char buffer[1024];
        int bytecount;
        int buffer_len=0;

        int hsock;
        int * p_int;
        int err;

        hsock = socket(AF_INET, SOCK_STREAM, 0);
        if(hsock == -1){
                printf("Error initializing socket %d\n",errno);
                goto FINISH;
        }

        p_int = (int*)malloc(sizeof(int));
        *p_int = 1;

        if( (setsockopt(hsock, SOL_SOCKET, SO_REUSEADDR, (char*)p_int, sizeof(int)) == -1 )||
                (setsockopt(hsock, SOL_SOCKET, SO_KEEPALIVE, (char*)p_int, sizeof(int)) == -1 ) ){
                printf("Error setting options %d\n",errno);
                free(p_int);
                goto FINISH;
        }
        free(p_int);

        my_addr.sin_family = AF_INET ;
        my_addr.sin_port = htons(host_port);

        memset(&(my_addr.sin_zero), 0, 8);
        my_addr.sin_addr.s_addr = inet_addr(host_name);
        if( connect( hsock, (struct sockaddr*)&my_addr, sizeof(my_addr)) == -1 ){
                if((err = errno) != EINPROGRESS){
                        fprintf(stderr, "Error connecting socket %d\n", errno);
                        goto FINISH;
                }
        }




       for (int i =0;i<10000;i++){
          for (int j = 0 ;j<10;j++) {

                if( (bytecount=send(hsock, (void *) pkt,siz,0))== -1 ) {
                        fprintf(stderr, "Error sending data %d\n", errno);
                        goto FINISH;
                }
                printf("Sent bytes %d\n", bytecount);
                usleep(1);
         }
        }
        delete pkt;

FINISH:
        close(hsock);

}

Protocol buffer Server Code

#include <fcntl.h>
#include <string.h>
#include <stdlib.h>
#include <errno.h>
#include <stdio.h>
#include <netinet/in.h>
#include <resolv.h>
#include <sys/socket.h>
#include <arpa/inet.h>
#include <unistd.h>
#include <pthread.h>
#include "message.pb.h"
#include <iostream>
#include <google/protobuf/io/coded_stream.h>
#include <google/protobuf/io/zero_copy_stream_impl.h>

using namespace std;
using namespace google::protobuf::io;



void* SocketHandler(void*);

int main(int argv, char** argc){

        int host_port= 1101;

        struct sockaddr_in my_addr;

        int hsock;
        int * p_int ;
        int err;

        socklen_t addr_size = 0;
        int* csock;
        sockaddr_in sadr;
        pthread_t thread_id=0;

        hsock = socket(AF_INET, SOCK_STREAM, 0);
        if(hsock == -1){
                printf("Error initializing socket %d\n", errno);
                goto FINISH;
        }

        p_int = (int*)malloc(sizeof(int));
        *p_int = 1;

        if( (setsockopt(hsock, SOL_SOCKET, SO_REUSEADDR, (char*)p_int, sizeof(int)) == -1 )||
                (setsockopt(hsock, SOL_SOCKET, SO_KEEPALIVE, (char*)p_int, sizeof(int)) == -1 ) ){
                printf("Error setting options %d\n", errno);
                free(p_int);
                goto FINISH;
        }
        free(p_int);

        my_addr.sin_family = AF_INET ;
        my_addr.sin_port = htons(host_port);

        memset(&(my_addr.sin_zero), 0, 8);
        my_addr.sin_addr.s_addr = INADDR_ANY ;

        if( bind( hsock, (sockaddr*)&my_addr, sizeof(my_addr)) == -1 ){
                fprintf(stderr,"Error binding to socket, make sure nothing else is listening on this port %d\n",errno);
                goto FINISH;
        }
        if(listen( hsock, 10) == -1 ){
                fprintf(stderr, "Error listening %d\n",errno);
                goto FINISH;
        }

        //Now lets do the server stuff

        addr_size = sizeof(sockaddr_in);

        while(true){
                printf("waiting for a connection\n");
                csock = (int*)malloc(sizeof(int));
                if((*csock = accept( hsock, (sockaddr*)&sadr, &addr_size))!= -1){
                        printf("---------------------\nReceived connection from %s\n",inet_ntoa(sadr.sin_addr));
                        pthread_create(&thread_id,0,&SocketHandler, (void*)csock );
                        pthread_detach(thread_id);
                }
                else{
                        fprintf(stderr, "Error accepting %d\n", errno);
                }
        }

FINISH:
;//oops
}

google::protobuf::uint32 readHdr(char *buf)
{
  google::protobuf::uint32 size;
  google::protobuf::io::ArrayInputStream ais(buf,4);
  CodedInputStream coded_input(&ais);
  coded_input.ReadVarint32(&size);//Decode the HDR and get the size
  cout<<"size of payload is "<<size<<endl;
  return size;
}

void readBody(int csock,google::protobuf::uint32 siz)
{
  int bytecount;
  log_packet payload;
  char buffer [siz+4];//size of the payload and hdr
  //Read the entire buffer including the hdr
  if((bytecount = recv(csock, (void *)buffer, 4+siz, MSG_WAITALL))== -1){
                fprintf(stderr, "Error receiving data %d\n", errno);
        }
  cout<<"Second read byte count is "<<bytecount<<endl;
  //Assign ArrayInputStream with enough memory 
  google::protobuf::io::ArrayInputStream ais(buffer,siz+4);
  CodedInputStream coded_input(&ais);
  //Read an unsigned integer with Varint encoding, truncating to 32 bits.
  coded_input.ReadVarint32(&siz);
  //After the message's length is read, PushLimit() is used to prevent the CodedInputStream 
  //from reading beyond that length.Limits are used when parsing length-delimited 
  //embedded messages
  google::protobuf::io::CodedInputStream::Limit msgLimit = coded_input.PushLimit(siz);
  //De-Serialize
  payload.ParseFromCodedStream(&coded_input);
  //Once the embedded message has been parsed, PopLimit() is called to undo the limit
  coded_input.PopLimit(msgLimit);
  //Print the message
  cout<<"Message is "<<payload.DebugString();

}

void* SocketHandler(void* lp){
    int *csock = (int*)lp;

        char buffer[4];
        int bytecount=0;
        string output,pl;
        log_packet logp;

        memset(buffer, '\0', 4);

        while (1) {
        //Peek into the socket and get the packet size
        if((bytecount = recv(*csock,
                         buffer,
                                 4, MSG_PEEK))== -1){
                fprintf(stderr, "Error receiving data %d\n", errno);
        }else if (bytecount == 0)
                break;
        cout<<"First read byte count is "<<bytecount<<endl;
        readBody(*csock,readHdr(buffer));
        }

FINISH:
        free(csock);
    return 0;
}
19
votes

Unfortunately, protobuf does not provide a way for "packaging" (delimiting) your messages:

If you want to write multiple messages to a single file or stream, it is up to you to keep track of where one message ends and the next begins. The Protocol Buffer wire format is not self-delimiting, so protocol buffer parsers cannot determine where a message ends on their own. The easiest way to solve this problem is to write the size of each message before you write the message itself.

(from their documentation)

So, they basically recommend the same solution you arrived at.