
I implemented my own circular buffer and want to test it with two threads: one continuously writes to the buffer while the other continuously reads from it. Once a certain amount of data has been read, the program prints a benchmark.

The idea behind this circular buffer is that writes and reads never access the same memory, so there should be no race.

    #include <stdio.h>
    #include <stdlib.h>
    #include <string.h>
    #include <pthread.h>
    #include <time.h>
    #include <unistd.h>

    #define MAX_NUM_ITEM 10000
    #define HASH_SIZE 32

    long write_count = 0;
    long read_count = 0;

    struct cBuf {
        int first;
        int last;
        int max_items;
        int item_size;
        int valid_items;
        unsigned char *buffer;
    };

    void init_cBuf(struct cBuf *buf, int max_items, int item_size) {
        buf->first = 0;
        buf->last = 0;
        buf->max_items = max_items;
        buf->item_size = item_size;
        buf->valid_items = 0;
        buf->buffer = calloc(max_items, item_size);
        return;
    }

    int isEmpty(struct cBuf *buf) {
        if (buf->valid_items == 0) {
            return 1;
        } else {
            return 0;
        }
    }

    int push(struct cBuf *buf, unsigned char *data) {
        if (buf->valid_items >= buf->max_items) {
            // buffer full
            return -1;
        } else {
            // push data into the buffer
            memcpy(buf->buffer + (buf->last) * (buf->item_size), data, buf->item_size);
            // update cBuf info
            buf->valid_items++;
            buf->last = (buf->last + 1) % (buf->max_items);
            return 0;
        }
    }

    int pop(struct cBuf *buf, unsigned char *new_buf) {
        if (isEmpty(buf)) {
            // buffer empty
            return -1;
        } else {
            // read data
            memcpy(new_buf, buf->buffer + (buf->first) * (buf->item_size), buf->item_size);
            // update cBuf info
            buf->first = (buf->first + 1) % (buf->max_items);
            buf->valid_items--;
            return 0;
        }
    }

    void *write_hash(void *ptr) {
        struct cBuf *buf = (struct cBuf *)(ptr);
        while (1) {
            unsigned char *hash = malloc(HASH_SIZE); // for simplicity I just create some data with 32-byte.
            if (push(buf, hash) == 0) {
                write_count++;
                //printf("put %lu items into the buffer. valid_items: %d\n", write_count, buf->valid_items);
            }
            free(hash);
            if (write_count == MAX_NUM_ITEM) {
                break;
            }
        }
        printf(" thread id = %lu\n", (long unsigned) (pthread_self()));
        printf(" total write = %lu\n\n", write_count);
        return NULL;
    }

    void *read_hash(void *ptr) {
        struct cBuf *buf = (struct cBuf *)(ptr);
        unsigned char *new_buf = malloc(HASH_SIZE);
        while (1) {
            if (pop(buf, new_buf) == 0) {
                read_count++;
                //printf("pop %lu items from the buffer. valid_items: %d\n", read_count, buf->valid_items);
            }
            if (read_count == MAX_NUM_ITEM) {
                break;
            }
        }
        free(new_buf);
        printf(" thread id = %lu\n", (long unsigned) (pthread_self()));
        printf(" total read = %lu\n\n", read_count);
    }

    int main(int argc, char const *argv[]) {
        struct cBuf buf;
        init_cBuf(&buf, 200, HASH_SIZE);
        pthread_t write_thd, read_thd;
        double diff = 0.0, t1 = 0.0, t2 = 0.0;
        t1 = clock();
        pthread_create(&read_thd, NULL, read_hash, &buf);
        pthread_create(&write_thd, NULL, write_hash, &buf);
        pthread_join(write_thd, NULL);
        pthread_join(read_thd, NULL);
        t2 = clock();
        diff = (double)((t2 - t1) / CLOCKS_PER_SEC);
        printf("----------------\nTotal time: %lf second\n", diff);
        printf("Total write: %lu\n", write_count);
        printf("write per-second: %lf\n\n", write_count / diff);
        return 0;
    }

Strangely, the program never terminates if I leave the printf calls in write_hash and read_hash commented out. If I uncomment these two printf calls, the program prints all the push and pop info and eventually exits successfully.

Please help me find out whether something is wrong with my circular buffer implementation, or whether the problem is somewhere else.

  • C11 draft standard n1570, 5.1.2.4 Multi-threaded executions and data races. Paragraph 4: "Two expression evaluations conflict if one of them modifies a memory location and the other one reads or modifies the same memory location." Paragraph 25: "The execution of a program contains a data race if it contains two conflicting actions in different threads, at least one of which is not atomic, and neither happens before the other. Any such data race results in undefined behavior." Commented Jun 29, 2016 at 22:51
  • @EOF The purpose of my circular buffer implementation is to avoid such a race, because writes and reads to the buffer always access different memory. Commented Jun 29, 2016 at 22:54
  • Both push() and pop() modify buf->valid_items, and you pass the same buf to both of them. Race condition, undefined behavior. Commented Jun 29, 2016 at 22:55
  • @EOF I think this is the answer! Commented Jun 29, 2016 at 22:57
  • Well, it's an answer. I'm quite sure you could find half a dozen race conditions in your code if you spent a bit of time looking for them. Unfortunately I don't feel like doing that, so a definitive answer won't be coming from me. Commented Jun 29, 2016 at 22:58

1 Answer


Luke, like EOF said, you're going to have race conditions in your code (probably several). Here are a few thoughts I had when I saw your code:

When threads share memory, you need to have protections on that memory so that only one thread can access it at a time. You can do this with a mutex (see pthread_mutex_lock and pthread_mutex_unlock).
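As a minimal sketch of that idea (not your buffer itself), here are two threads updating one shared counter, which stands in for valid_items. The names shared_items, items_mutex, add_items, remove_items, run_counter_demo and ITERATIONS are all made up for illustration. Unlike your buffer, the counter here may briefly go negative; the only point is that every read-modify-write happens under the lock.

```c
#include <pthread.h>

#define ITERATIONS 100000  /* hypothetical iteration count for the demo */

/* Hypothetical stand-in for the shared valid_items field. */
static long shared_items = 0;
static pthread_mutex_t items_mutex = PTHREAD_MUTEX_INITIALIZER;

/* Plays the role of push(): increments the shared count under the lock. */
static void *add_items(void *arg)
{
    (void)arg;
    for (int i = 0; i < ITERATIONS; i++) {
        pthread_mutex_lock(&items_mutex);
        shared_items++;
        pthread_mutex_unlock(&items_mutex);
    }
    return NULL;
}

/* Plays the role of pop(): decrements the shared count under the lock. */
static void *remove_items(void *arg)
{
    (void)arg;
    for (int i = 0; i < ITERATIONS; i++) {
        pthread_mutex_lock(&items_mutex);
        shared_items--;
        pthread_mutex_unlock(&items_mutex);
    }
    return NULL;
}

/* Runs both threads to completion and returns the final count. */
long run_counter_demo(void)
{
    pthread_t adder, remover;

    shared_items = 0;
    pthread_create(&adder, NULL, add_items, NULL);
    pthread_create(&remover, NULL, remove_items, NULL);
    pthread_join(adder, NULL);
    pthread_join(remover, NULL);
    return shared_items;
}
```

Call run_counter_demo() from main and build with -pthread. Because each increment and decrement is done while holding items_mutex, the two threads cannot interleave inside an update, so the increments and decrements cancel out exactly.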

Also, I noticed that the if statements in your read and write threads check the return value of push and pop to see whether an item was actually pushed or popped. That might work, but it makes both threads spin while they wait, so you should probably use semaphores to synchronize your threads instead. Here's what I mean: after your write thread pushes an item into the buffer, it should call something like sem_post(&buff_count_sem);. Then, before your read thread pops an item, it should call sem_wait(&buff_count_sem); so that it knows there is at least one item in the buffer.

The same principle applies in the other direction. I suggest that the read thread call sem_post(&space_left_sem) just after popping an item from the buffer, and that the write thread call sem_wait(&space_left_sem) just before pushing, to ensure that there is room in the buffer for the item you are trying to write.
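To show the two-semaphore handshake in isolation, here is a small sketch that passes ints through a tiny ring and counts any item that arrives out of order. Everything in it (ring, SLOTS, N_ITEMS, items_available, slots_free, producer, consumer, run_handoff_demo) is a hypothetical name, and it assumes a platform with unnamed POSIX semaphores, such as Linux. Each thread keeps its own private index here, so there is no shared counter to protect; your buffer keeps valid_items, which both threads modify, so it still needs the mutex.

```c
#include <pthread.h>
#include <semaphore.h>

#define SLOTS 4        /* hypothetical ring size */
#define N_ITEMS 1000   /* hypothetical number of items to transfer */

static int ring[SLOTS];
static sem_t items_available;  /* counts filled slots, starts at 0 */
static sem_t slots_free;       /* counts empty slots, starts at SLOTS */
static int out_of_order = 0;   /* written only by the consumer thread */

static void *producer(void *arg)
{
    (void)arg;
    for (int i = 0; i < N_ITEMS; i++) {
        sem_wait(&slots_free);       /* blocks while the ring is full */
        ring[i % SLOTS] = i;
        sem_post(&items_available);  /* tell the consumer an item is ready */
    }
    return NULL;
}

static void *consumer(void *arg)
{
    (void)arg;
    for (int i = 0; i < N_ITEMS; i++) {
        sem_wait(&items_available);  /* blocks while the ring is empty */
        if (ring[i % SLOTS] != i) {
            out_of_order++;
        }
        sem_post(&slots_free);       /* hand the slot back to the producer */
    }
    return NULL;
}

/* Transfers N_ITEMS items and returns how many arrived out of order. */
int run_handoff_demo(void)
{
    pthread_t prod, cons;

    out_of_order = 0;
    sem_init(&items_available, 0, 0);
    sem_init(&slots_free, 0, SLOTS);
    pthread_create(&cons, NULL, consumer, NULL);
    pthread_create(&prod, NULL, producer, NULL);
    pthread_join(prod, NULL);
    pthread_join(cons, NULL);
    sem_destroy(&items_available);
    sem_destroy(&slots_free);
    return out_of_order;
}
```

Neither thread polls: the producer sleeps in sem_wait when the ring is full and the consumer sleeps when it is empty, which is the behavior the semaphores are meant to replace your retry loops with.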

Here are my suggested changes to your code (not tested):

    #include <stdio.h>
    #include <stdlib.h>
    #include <string.h>
    #include <pthread.h>
    #include <semaphore.h>   // sem_t, sem_init, sem_wait, sem_post
    #include <time.h>
    #include <unistd.h>

    #define MAX_NUM_ITEM 10000
    #define HASH_SIZE 32

    long write_count = 0;
    long read_count = 0;

    pthread_mutex_t buff_mutex;   // protects the cBuf fields
    sem_t buff_count_sem;         // number of items in the buffer
    sem_t space_left_sem;         // number of free slots in the buffer

    struct cBuf {
        int first;
        int last;
        int max_items;
        int item_size;
        int valid_items;
        unsigned char *buffer;
    };

    void init_cBuf(struct cBuf *buf, int max_items, int item_size) {
        buf->first = 0;
        buf->last = 0;
        buf->max_items = max_items;
        buf->item_size = item_size;
        buf->valid_items = 0;
        buf->buffer = calloc(max_items, item_size);
        return;
    }

    // Called from pop() with buff_mutex already held.
    int isEmpty(struct cBuf *buf) {
        if (buf->valid_items == 0) {
            return 1;
        } else {
            return 0;
        }
    }

    int push(struct cBuf *buf, unsigned char *data) {
        pthread_mutex_lock(&buff_mutex);
        if (buf->valid_items >= buf->max_items) {
            // buffer full
            pthread_mutex_unlock(&buff_mutex);
            return -1;
        } else {
            // push data into the buffer
            memcpy(buf->buffer + (buf->last) * (buf->item_size), data, buf->item_size);
            // update cBuf info
            buf->valid_items++;
            buf->last = (buf->last + 1) % (buf->max_items);
            pthread_mutex_unlock(&buff_mutex);
            return 0;
        }
    }

    int pop(struct cBuf *buf, unsigned char *new_buf) {
        pthread_mutex_lock(&buff_mutex);
        if (isEmpty(buf)) {
            // buffer empty
            pthread_mutex_unlock(&buff_mutex);
            return -1;
        } else {
            // read data
            memcpy(new_buf, buf->buffer + (buf->first) * (buf->item_size), buf->item_size);
            // update cBuf info
            buf->first = (buf->first + 1) % (buf->max_items);
            buf->valid_items--;
            pthread_mutex_unlock(&buff_mutex);
            return 0;
        }
    }

    void *write_hash(void *ptr) {
        struct cBuf *buf = (struct cBuf *)(ptr);
        while (1) {
            unsigned char *hash = malloc(HASH_SIZE); // for simplicity I just create some data with 32-byte.
            sem_wait(&space_left_sem);   // wait until there is room
            push(buf, hash);
            write_count++;
            sem_post(&buff_count_sem);   // one more item available
            free(hash);
            if (write_count == MAX_NUM_ITEM) {
                break;
            }
        }
        printf(" thread id = %lu\n", (long unsigned) (pthread_self()));
        printf(" total write = %lu\n\n", write_count);
        return NULL;
    }

    void *read_hash(void *ptr) {
        struct cBuf *buf = (struct cBuf *)(ptr);
        unsigned char *new_buf = malloc(HASH_SIZE);
        while (1) {
            sem_wait(&buff_count_sem);   // wait until there is an item
            pop(buf, new_buf);
            read_count++;
            sem_post(&space_left_sem);   // one more free slot
            if (read_count == MAX_NUM_ITEM) {
                break;
            }
        }
        free(new_buf);
        printf(" thread id = %lu\n", (long unsigned) (pthread_self()));
        printf(" total read = %lu\n\n", read_count);
        return NULL;
    }

    int main(int argc, char const *argv[]) {
        struct cBuf buf;
        init_cBuf(&buf, 200, HASH_SIZE);
        pthread_t write_thd, read_thd;
        pthread_mutex_init(&buff_mutex, NULL);   // NULL selects the default attributes
        sem_init(&buff_count_sem, 0, 0); // the last parameter is the initial value - initially 0 if no data is in the buffer
        sem_init(&space_left_sem, 0, buf.max_items);
        double diff = 0.0, t1 = 0.0, t2 = 0.0;
        t1 = clock();
        pthread_create(&read_thd, NULL, read_hash, &buf);
        pthread_create(&write_thd, NULL, write_hash, &buf);
        pthread_join(write_thd, NULL);
        pthread_join(read_thd, NULL);
        t2 = clock();
        diff = (double)((t2 - t1) / CLOCKS_PER_SEC);
        printf("----------------\nTotal time: %lf second\n", diff);
        printf("Total write: %lu\n", write_count);
        printf("write per-second: %lf\n\n", write_count / diff);
        return 0;
    }

I suggest reading up on how to properly use threads.

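UPDATE: 1. Fixed a typo. 2. You'll also want pthread_mutex_lock and pthread_mutex_unlock around isEmpty, and around any other function you write for your circular buffer, wherever multiple threads might access the same buffer. The exception is a call made while the lock is already held: pop calls isEmpty after locking buff_mutex, and locking a default mutex a second time from the same thread is undefined and typically deadlocks, so only take the lock for isEmpty when it is called from outside push and pop.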
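One way to add locking to isEmpty without having pop lock the same mutex twice is to split it into an unlocked helper and a locking wrapper. The sketch below uses a cut-down, hypothetical struct mini_buf with the mutex stored inside it (rather than the global buff_mutex), and the names mini_buf_init, is_empty_unlocked, is_empty and take_one are made up for illustration.

```c
#include <pthread.h>

/* Cut-down, hypothetical version of struct cBuf: only what this sketch needs. */
struct mini_buf {
    int valid_items;
    pthread_mutex_t lock;
};

void mini_buf_init(struct mini_buf *buf, int items)
{
    buf->valid_items = items;
    pthread_mutex_init(&buf->lock, NULL);
}

/* Internal helper: the caller must already hold buf->lock. */
static int is_empty_unlocked(const struct mini_buf *buf)
{
    return buf->valid_items == 0;
}

/* Public query: takes the lock itself, for callers that do not hold it. */
int is_empty(struct mini_buf *buf)
{
    pthread_mutex_lock(&buf->lock);
    int empty = is_empty_unlocked(buf);
    pthread_mutex_unlock(&buf->lock);
    return empty;
}

/* pop-style function: it already holds the lock, so it uses the helper. */
int take_one(struct mini_buf *buf)
{
    pthread_mutex_lock(&buf->lock);
    if (is_empty_unlocked(buf)) {
        pthread_mutex_unlock(&buf->lock);
        return -1;   /* nothing to take */
    }
    buf->valid_items--;
    pthread_mutex_unlock(&buf->lock);
    return 0;
}
```

Keep in mind that the result of is_empty can be stale as soon as it returns, because another thread may push or pop right after the unlock. Treat it as a hint, and rely on the return value of the pop-style call for the real answer.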
