0
votes

I need some algorithm help with a multithreaded program I'm writing. It's basically the cp command in unix, but with a read thread and a write thread. I'm using semaphores for thread synchronization. I have structs for buffer and thread data defined as

struct bufType {
    char buf[BUFFER_SIZE];
    int numBytes;
};

struct threadData {
    int fd;
    bufType buf;
};

and a global array of bufType. Code for my main is

int main(int argc, const char * argv[])
{
    int in, out;
    pthread_t Producer, Consumer;
    threadData producerData, consumerData;

    if (argc != 3)
    {
        cout << "Error: incorrect number of params" << endl;
        exit(0);
    }
    if ((in = open(argv[1], O_RDONLY, 0666)) == -1)
    {
        cout << "Error: cannot open input file" << endl;
        exit(0);
    }
    if ((out = open(argv[2], O_WRONLY | O_CREAT, 0666)) == -1)
    {
        cout << "Cannot create output file" << endl;
        exit(0);
    }

    sem_init(&sem_empty, 0, NUM_BUFFERS);
    sem_init(&sem_full, 0, 0);

    pthread_create (&Producer, NULL, read_thread, (void *) &producerData);
    pthread_create (&Consumer, NULL, write_thread, (void *) &consumerData);

    pthread_join(Producer, NULL);
    pthread_join(Consumer, NULL);

    return 0;
}

and read and write threads:

void *read_thread(void *data)
{
    threadData *thread_data;
    thread_data = (threadData *) data;

    while((thread_data->buf.numBytes = slow_read(thread_data->fd, thread_data->buf.buf, BUFFER_SIZE)) != 0)
    {
        sem_post(&sem_full);
        sem_wait(&sem_empty);
    }

    pthread_exit(0);
}

void *write_thread(void *data)
{
    threadData *thread_data;
    thread_data = (threadData *) data;

    sem_wait(&sem_full);
    slow_write(thread_data->fd, thread_data->buf.buf, thread_data->buf.numBytes);
    sem_post(&sem_empty);

    pthread_exit(0);
}

So my issue is in what to assign to my threadData variables in main, and my semaphore logic in the read and write threads. I appreciate any help you're able to give

2

2 Answers

0
votes

You could use a common buffer pool, either a circular array or a linked lists. Here is a link to a zip of a Windows example that is similar to what you're asking, using linked lists as part of a inter-thread messaging system to buffer data. Other than the creation of the mutexes, semaphores, and the write thread, the functions are small and simple. mtcopy.zip .

0
votes

Being a windows guy who does not use file descriptors I might be wrong with the in's and out's but I think this needs to be done in your main in order to setup the threadData structures.

producerData.fd = in;
consumerData.fd = out;

Then declare ONE SINGLE object of type bufType for both structures. Change for example the definition of threadData to

struct threadData {
    int fd;
    bufType* buf;
};

and in your Main, you write

bufType buffer;
producerData.buf = &buffer;
consumerData.buf = &buffer;

Then both threads will use a common buffer. Otherwise you would be writing to the producerData buffer, but the consumerData buffer will stay empty (and this is where your writer thread is looking for data)

Then you need to change your signalling logic. Right now your program cannot accept input that exceeds BUFFER_SIZE, because your write thread will only write once. There needs to be a loop around it. And then you need some mechanism that signals the writer thread that no more data will be sent. For example you could do this

void *read_thread(void *data)
{
    threadData *thread_data;
    thread_data = (threadData *) data;

    while((thread_data->buf->numBytes = slow_read(thread_data->fd, thread_data->buf->buf, BUFFER_SIZE)) > 0)
    {
        sem_post(&sem_full);
        sem_wait(&sem_empty);
    }
    sem_post(&sem_full); // Note that thread_data->buf->numBytes <= 0 now

    pthread_exit(0);
}

void *write_thread(void *data)
{
    threadData *thread_data;
    thread_data = (threadData *) data;


    sem_wait(&sem_full);
    while (thread_data->buf->numBytes > 0)
    {
        slow_write(thread_data->fd, thread_data->buf->buf, thread_data->buf->numBytes);
        sem_post(&sem_empty);
        sem_wait(&sem_full);
    }
    pthread_exit(0);
}

Hope there are no more errors, did not test solution. But the concept should be what you were asking for.