Skip to content

Commit

Permalink
began impl of thread safe queue
Browse files Browse the repository at this point in the history
  • Loading branch information
Alex Young committed May 8, 2015
1 parent 03b6f19 commit 312d4d0
Show file tree
Hide file tree
Showing 4 changed files with 78 additions and 49 deletions.
2 changes: 1 addition & 1 deletion examples/Makefile
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
CC = gcc
STD = -std=c99
CFLAGS = $(STD) -I ../src
CFLAGS = $(STD) -pthread -I ../src

simple.out: simple.c ../src/queue.h
$(CC) $(CFLAGS) simple.c -o simple.out
3 changes: 2 additions & 1 deletion examples/simple.c
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,8 @@ int main(void) {
struct msg *msgs; // message queue
struct msg m1, *m2;

msgs = NULL;
msgs = malloc(sizeof (struct msg));
QUEUE_INIT(msgs);

m1.content = "abc";
QUEUE_PUSH(msgs, &m1);
Expand Down
114 changes: 71 additions & 43 deletions src/queue.h
Original file line number Diff line number Diff line change
Expand Up @@ -2,71 +2,99 @@
#define QUEUE_H_

#include <stdlib.h> /* malloc, free */
#include <pthread.h>

typedef struct {
pthread_mutex_t mutex; // here or in queue_handle so locks/waits can be done
pthread_cond_t cond; // with as little stuff as pos being read
void *front;
void *back;
void *backqh;
unsigned int size;
} queue_t;
} queue_core;

typedef struct queue_handle {
queue_t *q;
queue_core *qc;
void *next;
} queue_handle;

#define QUEUE_PUSH(queue, element) \
/*
pthread_mutex_lock(&queue_mutex);
while (QUEUE_SIZE(queue) == 0) {
pthread_cond_wait(&queue_cond, &queue_mutex);
}
QUEUE_POP(queue, client);
pthread_mutex_unlock(&queue_mutex);
pthread_mutex_lock(&queue_mutex);
QUEUE_PUSH(queue, client);
pthread_mutex_unlock(&queue_mutex);
pthread_cond_signal(&queue_cond); // broadcast to all consumers??
*/

#define QUEUE_INIT(q) \
do { \
(q)->qh.qc = malloc(sizeof (queue_core)); \
queue_core *qc = (q)->qh.qc; \
pthread_mutex_init(&qc->mutex, NULL); \
pthread_cond_init(&qc->cond, NULL); \
qc->front = qc->back = NULL; \
qc->backqh = NULL; \
qc->size = 0; \
(q)->qh.next = NULL; \
} while (0)

#define QUEUE_PUSH(q, e) \
do { \
if (!(queue)) { \
(element)->qh.q = malloc(sizeof (queue_t)); \
queue_t *q = (element)->qh.q; \
\
q->front = q->back = (element); \
q->backqh = &((element)->qh); \
q->size = 0; \
(element)->qh.next = NULL; \
(queue) = (element); \
} else { \
queue_t *q = (queue)->qh.q; \
queue_handle *backqh = q->backqh; \
\
(element)->qh.q = q; \
(element)->qh.next = NULL; \
\
/* set current back's next to the new element */ \
backqh->next = (element); \
\
/* set back to the new element */ \
q->back = (element); \
\
/* set the new backs queue handle to backqh */ \
backqh = &((element)->qh); \
\
/* set the queue's backqh to the new elements queue handle */ \
q->backqh = backqh; \
if (q && (q)->qh.qc) { \
queue_core *qc = (q)->qh.qc; \
queue_handle *backqh; \
pthread_mutex_lock(&qc->mutex); \
(e)->qh.qc = qc; \
(e)->qh.next = NULL; \
backqh = qc->backqh; \
if (!qc->front) { /* empty queue */ \
qc->front = qc->back = (e); \
} else { /* non-empty queue */ \
backqh->next = (e); \
qc->back = (e); \
} \
backqh = &(e)->qh; \
qc->size++; \
pthread_mutex_unlock(&qc->mutex); \
pthread_cond_signal(&qc->cond); /* broadcast to all? */ \
} \
(queue)->qh.q->size++; \
} while (0)

#define QUEUE_POP(queue, element) \
#define QUEUE_POP(q, e) \
do { \
(element) = NULL; \
if (queue) { \
if ((queue)->qh.q->front != NULL) { \
(element) = (queue)->qh.q->front; \
(queue)->qh.q->front = (element)->qh.next; \
(queue)->qh.q->size--; \
(e) = NULL; \
if (q && (q)->qh.qc) { \
queue_core *qc = (q)->qh.qc; \
pthread_mutex_lock(&qc->mutex); \
while (QUEUE_SIZE(q) == 0) { \
pthread_cond_wait(&qc->cond, &qc->mutex); \
} \
if ((q)->qh.qc->front != NULL) { \
(e) = (q)->qh.qc->front; \
(q)->qh.qc->front = (e)->qh.next; \
(q)->qh.qc->size--; \
} \
pthread_mutex_unlock(&qc->mutex); \
} \
} while (0)

#define QUEUE_SIZE(queue) \
((queue) ? queue->qh.q->size : 0U)
#define QUEUE_SIZE(q) \
((q) ? (q)->qh.qc->size : 0U)

#define QUEUE_FREE(queue) \
#define QUEUE_FREE(q) \
do { \
if (queue && queue->qh.q) { \
free(queue->qh.q); \
if ((q) && (q)->qh.qc) { \
queue_core *qc = (q)->qh.qc; \
pthread_cond_destroy(&qc->cond); \
pthread_mutex_destroy(&qc->mutex); \
free(qc); \
} \
} while (0)

Expand Down
8 changes: 4 additions & 4 deletions tests/Makefile
Original file line number Diff line number Diff line change
@@ -1,11 +1,11 @@
CC = gcc
STD = -std=c99
CFLAGS = $(STD) -I ../src
CFLAGS = $(STD) -pthread -I ../src
LIBS = -lcheck

check: check_queue.c ../src/queue.h
$(CC) $(CFLAGS) check_queue.c -o check_queue $(LIBS)
./check_queue
rm check_queue
$(CC) $(CFLAGS) check_queue.c -o check_queue.out $(LIBS)
./check_queue.out
rm check_queue.out

.PHONY: check

0 comments on commit 312d4d0

Please sign in to comment.