Skip to content

Commit

Permalink
command line option to switch between threads and processes
Browse files Browse the repository at this point in the history
This commit enables the choice between using threads or (forked)
processes when using parallel seeding. The default is to use a single
thread. Support for multiple processes is disabled on windows platforms
due to the lack of ipc message queues.
Setting the size of the ipc message queue (msg_qbytes) might be
unsupported on osx, needs checking.
  • Loading branch information
tbonfort committed Aug 20, 2012
1 parent fd78257 commit 31db17a
Showing 1 changed file with 121 additions and 80 deletions.
201 changes: 121 additions & 80 deletions util/mapcache_seed.c
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@
#include <time.h>
#ifndef _WIN32
#include <unistd.h>
#define nUSE_FORK
#define USE_FORK
#include <sys/time.h>
#endif

Expand All @@ -47,10 +47,11 @@
int msqid;
#include <sys/ipc.h>
#include <sys/msg.h>
#else
#include <errno.h>
#endif

#include <apr_queue.h>
apr_queue_t *work_queue;
#endif

#if defined(USE_OGR) && defined(USE_GEOS)
#define USE_CLIPPERS
Expand All @@ -71,7 +72,8 @@ apr_table_t *dimensions;
int minzoom=-1;
int maxzoom=-1;
mapcache_grid_link *grid_link;
int nthreads=1;
int nthreads=0;
int nprocesses=0;
int quiet = 0;
int verbose = 0;
int force = 0;
Expand Down Expand Up @@ -118,67 +120,72 @@ cmd mode = MAPCACHE_CMD_SEED; /* the mode the utility will be running in: either
int push_queue(struct seed_cmd cmd)
{
#ifdef USE_FORK
struct msg_cmd mcmd;
mcmd.mtype = 1;
mcmd.cmd = cmd;
if (msgsnd(msqid, &mcmd, sizeof(struct seed_cmd), 0) == -1) {
printf("failed to push tile %d %d %d\n",cmd.z,cmd.y,cmd.x);
return APR_EGENERAL;
if(nprocesses > 1) {
struct msg_cmd mcmd;
mcmd.mtype = 1;
mcmd.cmd = cmd;
if (msgsnd(msqid, &mcmd, sizeof(struct seed_cmd), 0) == -1) {
printf("failed to push tile %d %d %d\n",cmd.z,cmd.y,cmd.x);
return APR_EGENERAL;
}
return APR_SUCCESS;
}
return APR_SUCCESS;
#else
#endif
struct seed_cmd *pcmd = calloc(1,sizeof(struct seed_cmd));
*pcmd = cmd;
return apr_queue_push(work_queue,pcmd);
#endif
}

int pop_queue(struct seed_cmd *cmd)
{
#ifdef USE_FORK
struct msg_cmd mcmd;
if (msgrcv(msqid, &mcmd, sizeof(struct seed_cmd), 1, 0) == -1) {
printf("failed to pop tile\n");
return APR_EGENERAL;
}
*cmd = mcmd.cmd;
return APR_SUCCESS;
#else
int ret;
struct seed_cmd *pcmd;

#ifdef USE_FORK
if(nprocesses > 1) {
struct msg_cmd mcmd;
if (msgrcv(msqid, &mcmd, sizeof(struct seed_cmd), 1, 0) == -1) {
printf("failed to pop tile\n");
return APR_EGENERAL;
}
*cmd = mcmd.cmd;
return APR_SUCCESS;
}
#endif

ret = apr_queue_pop(work_queue, (void**)&pcmd);
if(ret == APR_SUCCESS) {
*cmd = *pcmd;
free(pcmd);
}
return ret;
#endif
}

int trypop_queue(struct seed_cmd *cmd)
{
#ifdef USE_FORK
int ret;
struct msg_cmd mcmd;
ret = msgrcv(msqid, &mcmd, sizeof(struct seed_cmd), 1, IPC_NOWAIT);
if(errno == ENOMSG) return APR_EAGAIN;
if(ret>0) {
*cmd = mcmd.cmd;
return APR_SUCCESS;
} else {
printf("failed to trypop tile\n");
return APR_EGENERAL;
}
#else
int ret;
struct seed_cmd *pcmd;

#ifdef USE_FORK
if(nprocesses>1) {
struct msg_cmd mcmd;
ret = msgrcv(msqid, &mcmd, sizeof(struct seed_cmd), 1, IPC_NOWAIT);
if(errno == ENOMSG) return APR_EAGAIN;
if(ret>0) {
*cmd = mcmd.cmd;
return APR_SUCCESS;
} else {
printf("failed to trypop tile\n");
return APR_EGENERAL;
}
}
#endif
ret = apr_queue_trypop(work_queue,(void**)&pcmd);
if(ret == APR_SUCCESS) {
*cmd = *pcmd;
free(pcmd);
}
return ret;
#endif
}

static const apr_getopt_option_t seed_options[] = {
Expand All @@ -189,7 +196,8 @@ static const apr_getopt_option_t seed_options[] = {
{ "zoom", 'z', TRUE, "min and max zoomlevels to seed, separated by a comma. eg 0,6" },
{ "metasize", 'M', TRUE, "override metatile size while seeding, eg 8,8" },
{ "extent", 'e', TRUE, "extent to seed, format: minx,miny,maxx,maxy" },
{ "nthreads", 'n', TRUE, "number of parallel threads to use" },
{ "nthreads", 'n', TRUE, "number of parallel threads to use (incompatible with -p/--nprocesses)" },
{ "nprocesses", 'p', TRUE, "number of parallel processes to use (incompatible with -n/--nthreads)" },
{ "mode", 'm', TRUE, "mode: seed (default), delete or transfer" },
{ "older", 'o', TRUE, "reseed tiles older than supplied date (format: year/month/day hour:minute, eg: 2011/01/31 20:45" },
{ "dimension", 'D', TRUE, "set the value of a dimension (format DIMENSIONNAME=VALUE). Can be used multiple times for multiple dimensions" },
Expand Down Expand Up @@ -277,7 +285,8 @@ void progresslog(int x, int y, int z)
{
char msg[1024];
if(quiet) return;

int nworkers = nthreads;
if(nprocesses >= 1) nworkers = nprocesses;

sprintf(msg,"seeding tile %d %d %d",x,y,z);
if(lastmsglen) {
Expand All @@ -293,11 +302,11 @@ void progresslog(int x, int y, int z)
fflush(NULL);
return;

if(queuedtilestot>nthreads) {
if(queuedtilestot>nworkers) {
struct mctimeval now_t;
float duration;
float totalduration;
seededtilestot = queuedtilestot - nthreads;
seededtilestot = queuedtilestot - nworkers;

mapcache_gettimeofday(&now_t,NULL);
duration = ((now_t.tv_sec-lastlogtime.tv_sec)*1000000+(now_t.tv_usec-lastlogtime.tv_usec))/1000000.0;
Expand Down Expand Up @@ -503,14 +512,16 @@ void cmd_recurse(mapcache_context *cmd_ctx, mapcache_tile *tile)
tile->z = curz;
}

void cmd_thread()
void cmd_worker()
{
int n;
mapcache_tile *tile;
int z = minzoom;
int x = grid_link->grid_limits[z][0];
int y = grid_link->grid_limits[z][1];
mapcache_context cmd_ctx = ctx;
int nworkers = nthreads;
if(nprocesses >= 1) nworkers = nprocesses;
apr_pool_create(&cmd_ctx.pool,ctx.pool);
tile = mapcache_tileset_tile_create(ctx.pool, tileset, grid_link);
tile->dimensions = dimensions;
Expand Down Expand Up @@ -578,7 +589,7 @@ void cmd_thread()
}
//instruct rendering threads to stop working

for(n=0; n<nthreads; n++) {
for(n=0; n<nworkers; n++) {
struct seed_cmd cmd;
cmd.command = MAPCACHE_CMD_STOP;
push_queue(cmd);
Expand All @@ -589,11 +600,8 @@ void cmd_thread()
}
}

#ifdef USE_FORK
int seed_thread(void *data)
#else
static void* APR_THREAD_FUNC seed_thread(apr_thread_t *thread, void *data)
#endif

void seed_worker()
{
mapcache_tile *tile;
mapcache_context seed_ctx = ctx;
Expand Down Expand Up @@ -637,12 +645,17 @@ static void* APR_THREAD_FUNC seed_thread(apr_thread_t *thread, void *data)
ctx.log(&ctx,MAPCACHE_INFO,seed_ctx.get_error_message(&seed_ctx));
}
}
}

#ifdef USE_FORK
int seed_process() {
seed_worker();
return 0;
#else
apr_thread_exit(thread,MAPCACHE_SUCCESS);
return NULL;
}
#endif
static void* APR_THREAD_FUNC seed_thread(apr_thread_t *thread, void *data) {
seed_worker();
return NULL;
}

void
Expand Down Expand Up @@ -703,10 +716,8 @@ int main(int argc, const char **argv)
/* initialize apr_getopt_t */
apr_getopt_t *opt;
const char *configfile=NULL;
#ifndef USE_FORK
apr_thread_t **threads;
apr_threadattr_t *thread_attrs;
#endif
const char *tileset_name=NULL;
const char *tileset_transfer_name=NULL;
const char *grid_name = NULL;
Expand Down Expand Up @@ -734,11 +745,6 @@ int main(int argc, const char **argv)
apr_pool_create(&ctx.pool,NULL);
mapcache_context_init(&ctx);
ctx.process_pool = ctx.pool;
#ifndef USE_FORK
apr_thread_mutex_create((apr_thread_mutex_t**)&ctx.threadlock,APR_THREAD_MUTEX_DEFAULT,ctx.pool);
#else
ctx.threadlock = NULL;
#endif
cfg = mapcache_configuration_create(ctx.pool);
ctx.config = cfg;
ctx.log= mapcache_context_seeding_log;
Expand Down Expand Up @@ -790,7 +796,18 @@ int main(int argc, const char **argv)
break;
case 'n':
nthreads = (int)strtol(optarg, NULL, 10);
if(nthreads <=0 )
return usage(argv[0], "failed to parse nthreads, expecting positive integer");
break;
case 'p':
#ifdef USE_FORK
nprocesses = (int)strtol(optarg, NULL, 10);
if(nprocesses <=0 )
return usage(argv[0], "failed to parse nprocesses, expecting positive integer");
break;
#else
return usage(argv[0], "multi process seeding not available on this platform");
#endif
case 'e':
if ( MAPCACHE_SUCCESS != mapcache_util_extract_double_list(&ctx, (char*)optarg, ",", &extent, &n) ||
n != 4 || extent[0] >= extent[2] || extent[1] >= extent[3] ) {
Expand Down Expand Up @@ -1079,40 +1096,65 @@ int main(int argc, const char **argv)

}

if( ! nthreads ) {
return usage(argv[0],"failed to parse nthreads, must be int");
} else {

if(nthreads == 0 && nprocesses == 0) {
nthreads = 1;
}
if(nthreads >= 1 && nprocesses >= 1) {
return usage(argv[0],"cannot set both nthreads and nprocesses");
}
if(nprocesses > 1) {
#ifdef USE_FORK
key_t key;
int i;
pid_t *pids = malloc(nthreads*sizeof(pid_t));
pid_t *pids = malloc(nprocesses*sizeof(pid_t));
struct msqid_ds queue_ds;
ctx.threadlock = NULL;
key = ftok(argv[0], 'B');
if ((msqid = msgget(key, 0644 | IPC_CREAT|S_IRUSR|S_IWUSR)) == -1) {
return usage(argv[0],"failed to create sysv ipc message queue");
}
if (-1 == msgctl(msqid, IPC_STAT, &queue_ds)) {
return usage(argv[0], "\nFailure in msgctl() 1");
return usage(argv[0], "\nFailure in msgctl() stat");
}
queue_ds.msg_qbytes = nprocesses*sizeof(struct seed_cmd);
if(-1 == msgctl(msqid, IPC_SET, &queue_ds)) {
switch(errno) {
case EACCES:
return usage(argv[0], "\nFailure in msgctl() set qbytes: EACCESS (should not happen here)");
case EFAULT:
return usage(argv[0], "\nFailure in msgctl() set qbytes: EFAULT queue not accessible");
case EIDRM:
return usage(argv[0], "\nFailure in msgctl() set qbytes: EIDRM message queue removed");
case EINVAL:
return usage(argv[0], "\nFailure in msgctl() set qbytes: EINVAL invalid value for msg_qbytes");
case EPERM:
return usage(argv[0], "\nFailure in msgctl() set qbytes: EPERM permission denied on msg_qbytes");
default:
return usage(argv[0], "\nFailure in msgctl() set qbytes: unknown");
}
}

for(i=0; i<nthreads; i++) {
for(i=0; i<nprocesses; i++) {
int pid = fork();
if(pid==0) {
seed_thread(NULL);
seed_process();
exit(0);
} else {
pids[i] = pid;
}
}
cmd_thread();
for(i=0; i<nthreads; i++) {
cmd_worker();
for(i=0; i<nprocesses; i++) {
int stat_loc;
waitpid(pids[i],&stat_loc,0);
}
msgctl(msqid,IPC_RMID,NULL);
#else
return usage(argv[0],"bug: multi process support not available");
#endif
} else {
//start the thread that will populate the queue.
apr_thread_mutex_create((apr_thread_mutex_t**)&ctx.threadlock,APR_THREAD_MUTEX_DEFAULT,ctx.pool);
//create the queue where tile requests will be put
apr_queue_create(&work_queue,nthreads,ctx.pool);

Expand All @@ -1122,22 +1164,21 @@ int main(int argc, const char **argv)
for(n=0; n<nthreads; n++) {
apr_thread_create(&threads[n], thread_attrs, seed_thread, NULL, ctx.pool);
}
cmd_thread();
cmd_worker();
for(n=0; n<nthreads; n++) {
apr_thread_join(&rv, threads[n]);
}
#endif
if(ctx.get_error(&ctx)) {
printf("%s",ctx.get_error_message(&ctx));
}
}
if(ctx.get_error(&ctx)) {
printf("%s",ctx.get_error_message(&ctx));
}

if(seededtilestot>0) {
struct mctimeval now_t;
float duration;
mapcache_gettimeofday(&now_t,NULL);
duration = ((now_t.tv_sec-starttime.tv_sec)*1000000+(now_t.tv_usec-starttime.tv_usec))/1000000.0;
printf("\nseeded %d metatiles at %g tiles/sec\n",seededtilestot, seededtilestot/duration);
}
if(seededtilestot>0) {
struct mctimeval now_t;
float duration;
mapcache_gettimeofday(&now_t,NULL);
duration = ((now_t.tv_sec-starttime.tv_sec)*1000000+(now_t.tv_usec-starttime.tv_usec))/1000000.0;
printf("\nseeded %d metatiles at %g tiles/sec\n",seededtilestot, seededtilestot/duration);
}
apr_terminate();
return 0;
Expand Down

0 comments on commit 31db17a

Please sign in to comment.