Skip to content

A fast lock free generic data queue written in C

Notifications You must be signed in to change notification settings

shaneyuee/shmqueue

Repository files navigation

[中文描述]

一个基于CAS实现的无锁共享内存队列,支持任意大块数据的超高性能存储,在Linux下可以使用fifo进行事件通知,消费方可以同时轮询网络socket FD和队列生成的FIFO FD,满足实时性的消费需求。目前已经支持linux、mac、windows操作系统,开发语言支持golang和python调用。

特点

  • 1,高性能,实测800w/s以上;
  • 2,实时通知,可用epoll侦听;
  • 3,高并发,支持同时多写多读。
  • 4,支持多语言、多平台,目前可以支持Go和Python语言,也可以在Windows运行。

主要功能

  • 1,高性能访问,读写可以达到解决访问内存的性能;
  • 2,高并发访问,无锁解决读写、读读、写写冲突;
  • 3,带fifo fd通知,可以select或epoll通知句柄;
  • 4,基于共享内存创建;
  • 5,大小变化时自动销毁并重新创建共享内存;
  • 6,使用分块存储策略,便于高性能访问;
  • 7,发现错误块时跳过错误的块,快速自动恢复;

[English Description]

This is a very fast lock-free generic data queue implemented based on share memory (Linux shmget/shmat API, Windows CreateFileMapping/OpenFileMapping API), it is lock-free so that multiple writers and readers can access the same queue synchronously without need of locking.

Now we have supported Windows platform and golang (with cgo wrapper) / python, we will cover more platforms and languages as needed.

Usage example

//
// Below is a test program, please compile with:
// gcc -o sqtest test.c shm_queue.c
//

static char m[1024*1024];

void test_put(struct shm_queue *queue, int proc_count, int count, char *msg)
{
	int i;
	int pid = 0;

	for(i=1; i<=proc_count; i++)
	{
		if(fork()==0)
		{
			pid=i;
			break;
		}
	}

	for(i=0; pid && i<count; i++)
	{
		sprintf(m, "[%d:%d] %s", pid, i, msg);
		if(sq_put(queue, m, strlen(m))<0)
		{
			printf("put msg[%d] failed: %s\n", i, sq_errorstr(queue));
			return;
		}
	}
	if(pid) exit(0);
	while(wait(NULL)>0);
	printf("put successfully\n");
}

void test_get(struct shm_queue *queue, int proc_count, int count)
{
	int i;
	int pid = 0;

	for(i=1; i<=proc_count; i++)
	{
		if(fork()==0)
		{
			pid=i;
			break;
		}
	}

	if(pid)
	{
		// Note: each process has a uniq event fd, which is bond to process id
		// If your server forks several processes, this function should be called after fork()
		int event_fd = sq_get_eventfd(queue);
		printf("child %d event_fd=%d\n", pid, event_fd);

		for(i=0; i<count; i++)
		{
			fd_set fdset;
			FD_ZERO(&fdset);
			FD_SET(event_fd, &fdset);
			struct timeval to;
			to.tv_sec = 100;
			to.tv_usec = 0;

			sq_sigon(queue); // now we are entering into sleeping sys call
			int ret = select(event_fd+1, &fdset, NULL, NULL, &to);
			sq_sigoff(queue); // no longer needs signal
			if(ret<0)
			{
				printf("select failed: %s\n", strerror(errno));
				continue;
			}

			if(FD_ISSET(event_fd, &fdset))
				sq_consume_event(queue);

			struct timeval tv;
			int l = sq_get(queue, m, sizeof(m), &tv);
			if(l<0)
			{
				printf("sq_get failed: %s\n", sq_errorstr(queue));
				break;
			}
			if(l==0)// no data
			{
				i --;
				continue;
			}
			// if we are able to retrieve data from queue, always
			// try it without sleeping
			m[l] = 0;
			printf("pid[%d] msg[%d] len[%d]: %s\n", pid, i, l, m);
		}
		exit(0);
	}
	while(wait(NULL)>0);
}

void press_test(struct shm_queue *queue, uint32_t record_count, uint32_t record_size)
{
	struct timeval tv;
	int put_count=0,get_count=0;
	for(; put_count<record_count; )
	{
		while(put_count<record_count && sq_put(queue, m, record_size)==0) put_count ++;
		while(sq_get(queue, m, sizeof(m), &tv)>0) get_count ++;
	}
	printf("put %u, get %u finished\n", put_count, get_count);
}

int main(int argc, char *argv[])
{
	struct shm_queue *queue;
	long key;

	if(argc<3)
	{
badarg:
		printf("Usage: \n");
		printf("     %s open <key> \n", argv[0]);
		printf("     %s openid <shm_id>\n", argv[0]);
		printf("     %s create <key> <element_size> <element_count>\n", argv[0]);
		printf("     %s press <key> <record_count> <record_size>\n", argv[0]);
		printf("key can be IPC_PRIVATE or 0\n");
		return -1;
	}

	const char *oper = "Open";
	if(strcasecmp(argv[2], "IPC_PRIVATE")==0)
		key = IPC_PRIVATE;
	else if(strncasecmp(argv[2], "0x", 2)==0)
		key = strtoul(argv[2]+2, NULL, 16);
	else
		key = strtoul(argv[2], NULL, 10); 

	if(strcmp(argv[1], "open")==0 || strcmp(argv[1], "press")==0)
	{
		queue = sq_open(key);
	}
	else if(strcmp(argv[1], "openid")==0)
	{
		queue = sq_open_by_shmid(key);
	}
	else if(strcmp(argv[1], "create")==0 && argc==5)
	{
		oper = "Create";
		queue = sq_create(key, strtoul(argv[3], NULL, 10), strtoul(argv[4], NULL, 10), 1, 2);
	}
	else
	{
		goto badarg;
	}

	if(queue==NULL)
	{
		printf("Failed to open shm queue: %s\n", sq_errorstr(NULL));
		return -1;
	}
	printf("%s shm (0x%lX) successfully, shm ID is %d.\n", oper, key, sq_get_shmid(queue));

	if(strcmp(argv[1], "press")==0)
	{
		if(argc!=5) goto badarg;
		press_test(queue, strtoul(argv[3], NULL, 10), strtoul(argv[4], NULL, 10));
		return 0;
	}

	while(1)
	{
		static char cmd[1024*1024];
		printf("Available commands: \n");
		printf("  put <concurrent_proc_count> <msg_count> <msg>\n");
		printf("  get <concurrent_proc_count> <msg_count>\n");
		printf("  quit\n");
		printf("cmd>"); fflush(stdout);
		if(gets(cmd)==NULL)
			return 0;
		if(strncmp(cmd, "put ", 4)==0)
		{
			char *pstr = cmd + 4;
			while(isspace(*pstr)) pstr ++;
			int proc_count = atoi(pstr);
			if(proc_count<1) proc_count = 1;
			while(isdigit(*pstr)) pstr ++;
			while(isspace(*pstr)) pstr ++;
			int count = atoi(pstr);
			if(count<1) count = 1;
			while(isdigit(*pstr)) pstr ++;
			while(isspace(*pstr)) pstr ++;
			test_put(queue, proc_count, count, pstr);
		}
		else if(strncmp(cmd, "get ", 4)==0)
		{
			char *pstr = cmd + 4;
			while(isspace(*pstr)) pstr ++;
			int proc_count = atoi(pstr);
			if(proc_count<1) proc_count = 1;
			while(isdigit(*pstr)) pstr ++;
			while(isspace(*pstr)) pstr ++;
			int count = atoi(pstr);
			if(count<1) count = 1;
			test_get(queue, proc_count, count);
		}
		else if(strncmp(cmd, "quit", 4)==0 || strncmp(cmd, "exit", 4)==0)
		{
			return 0;
		}
	}
	return 0;
}

About

A fast lock free generic data queue written in C

Resources

Stars

Watchers

Forks

Releases

No releases published

Packages

No packages published