From 312d4d05a7ba0d7b42bd5c886837d26ee93208bd Mon Sep 17 00:00:00 2001 From: Alex Young Date: Fri, 8 May 2015 01:32:48 +0100 Subject: [PATCH] began impl of thread safe queue --- examples/Makefile | 2 +- examples/simple.c | 3 +- src/queue.h | 114 +++++++++++++++++++++++++++++----------------- tests/Makefile | 8 ++-- 4 files changed, 78 insertions(+), 49 deletions(-) diff --git a/examples/Makefile b/examples/Makefile index 91612bc..c2a88fb 100644 --- a/examples/Makefile +++ b/examples/Makefile @@ -1,6 +1,6 @@ CC = gcc STD = -std=c99 -CFLAGS = $(STD) -I ../src +CFLAGS = $(STD) -pthread -I ../src simple.out: simple.c ../src/queue.h $(CC) $(CFLAGS) simple.c -o simple.out diff --git a/examples/simple.c b/examples/simple.c index c0270ac..234a27b 100644 --- a/examples/simple.c +++ b/examples/simple.c @@ -11,7 +11,8 @@ int main(void) { struct msg *msgs; // message queue struct msg m1, *m2; - msgs = NULL; + msgs = malloc(sizeof (struct msg)); + QUEUE_INIT(msgs); m1.content = "abc"; QUEUE_PUSH(msgs, &m1); diff --git a/src/queue.h b/src/queue.h index 6621d5a..5d5f428 100644 --- a/src/queue.h +++ b/src/queue.h @@ -2,71 +2,99 @@ #define QUEUE_H_ #include /* malloc, free */ +#include typedef struct { + pthread_mutex_t mutex; // here or in queue_handle so locks/waits can be done + pthread_cond_t cond; // with as little stuff as pos being read void *front; void *back; void *backqh; unsigned int size; -} queue_t; +} queue_core; typedef struct queue_handle { - queue_t *q; + queue_core *qc; void *next; } queue_handle; -#define QUEUE_PUSH(queue, element) \ +/* +pthread_mutex_lock(&queue_mutex); +while (QUEUE_SIZE(queue) == 0) { + pthread_cond_wait(&queue_cond, &queue_mutex); +} +QUEUE_POP(queue, client); +pthread_mutex_unlock(&queue_mutex); + + +pthread_mutex_lock(&queue_mutex); +QUEUE_PUSH(queue, client); +pthread_mutex_unlock(&queue_mutex); +pthread_cond_signal(&queue_cond); // broadcast to all consumers?? +*/ + +#define QUEUE_INIT(q) \ + do { \ + (q)->qh.qc = malloc(sizeof (queue_core)); \ + queue_core *qc = (q)->qh.qc; \ + pthread_mutex_init(&qc->mutex, NULL); \ + pthread_cond_init(&qc->cond, NULL); \ + qc->front = qc->back = NULL; \ + qc->backqh = NULL; \ + qc->size = 0; \ + (q)->qh.next = NULL; \ + } while (0) + +#define QUEUE_PUSH(q, e) \ do { \ - if (!(queue)) { \ - (element)->qh.q = malloc(sizeof (queue_t)); \ - queue_t *q = (element)->qh.q; \ - \ - q->front = q->back = (element); \ - q->backqh = &((element)->qh); \ - q->size = 0; \ - (element)->qh.next = NULL; \ - (queue) = (element); \ - } else { \ - queue_t *q = (queue)->qh.q; \ - queue_handle *backqh = q->backqh; \ - \ - (element)->qh.q = q; \ - (element)->qh.next = NULL; \ - \ - /* set current back's next to the new element */ \ - backqh->next = (element); \ - \ - /* set back to the new element */ \ - q->back = (element); \ - \ - /* set the new backs queue handle to backqh */ \ - backqh = &((element)->qh); \ - \ - /* set the queue's backqh to the new elements queue handle */ \ - q->backqh = backqh; \ + if (q && (q)->qh.qc) { \ + queue_core *qc = (q)->qh.qc; \ + queue_handle *backqh; \ + pthread_mutex_lock(&qc->mutex); \ + (e)->qh.qc = qc; \ + (e)->qh.next = NULL; \ + backqh = qc->backqh; \ + if (!qc->front) { /* empty queue */ \ + qc->front = qc->back = (e); \ + } else { /* non-empty queue */ \ + backqh->next = (e); \ + qc->back = (e); \ + } \ + backqh = &(e)->qh; \ + qc->size++; \ + pthread_mutex_unlock(&qc->mutex); \ + pthread_cond_signal(&qc->cond); /* broadcast to all? */ \ } \ - (queue)->qh.q->size++; \ } while (0) -#define QUEUE_POP(queue, element) \ +#define QUEUE_POP(q, e) \ do { \ - (element) = NULL; \ - if (queue) { \ - if ((queue)->qh.q->front != NULL) { \ - (element) = (queue)->qh.q->front; \ - (queue)->qh.q->front = (element)->qh.next; \ - (queue)->qh.q->size--; \ + (e) = NULL; \ + if (q && (q)->qh.qc) { \ + queue_core *qc = (q)->qh.qc; \ + pthread_mutex_lock(&qc->mutex); \ + while (QUEUE_SIZE(q) == 0) { \ + pthread_cond_wait(&qc->cond, &qc->mutex); \ + } \ + if ((q)->qh.qc->front != NULL) { \ + (e) = (q)->qh.qc->front; \ + (q)->qh.qc->front = (e)->qh.next; \ + (q)->qh.qc->size--; \ } \ + pthread_mutex_unlock(&qc->mutex); \ } \ } while (0) -#define QUEUE_SIZE(queue) \ - ((queue) ? queue->qh.q->size : 0U) +#define QUEUE_SIZE(q) \ + ((q) ? (q)->qh.qc->size : 0U) -#define QUEUE_FREE(queue) \ +#define QUEUE_FREE(q) \ do { \ - if (queue && queue->qh.q) { \ - free(queue->qh.q); \ + if ((q) && (q)->qh.qc) { \ + queue_core *qc = (q)->qh.qc; \ + pthread_cond_destroy(&qc->cond); \ + pthread_mutex_destroy(&qc->mutex); \ + free(qc); \ } \ } while (0) diff --git a/tests/Makefile b/tests/Makefile index abe170e..80d5304 100644 --- a/tests/Makefile +++ b/tests/Makefile @@ -1,11 +1,11 @@ CC = gcc STD = -std=c99 -CFLAGS = $(STD) -I ../src +CFLAGS = $(STD) -pthread -I ../src LIBS = -lcheck check: check_queue.c ../src/queue.h - $(CC) $(CFLAGS) check_queue.c -o check_queue $(LIBS) - ./check_queue - rm check_queue + $(CC) $(CFLAGS) check_queue.c -o check_queue.out $(LIBS) + ./check_queue.out + rm check_queue.out .PHONY: check