diff --git a/src/worker.c b/src/worker.c index da6ba097..dbd4a61b 100644 --- a/src/worker.c +++ b/src/worker.c @@ -39,29 +39,15 @@ #include "db.h" #include "logger.h" #include "worker.h" +#include "commands.h" -struct worker_command; - -typedef int (*cmd_func)(struct worker_command *cmd); - -struct worker_command +struct worker_arg { - pthread_mutex_t lck; - pthread_cond_t cond; - - cmd_func func; - - int nonblock; - - struct { - void (*cb)(void *); - void *cb_arg; - int delay; - struct event *timer; - } arg; - - int ret; + void (*cb)(void *); + void *cb_arg; + int delay; + struct event *timer; }; @@ -73,9 +59,9 @@ static pthread_t tid_worker; struct event_base *evbase_worker; static int g_initialized; static int g_exit_pipe[2]; -static int g_cmd_pipe[2]; static struct event *g_exitev; -static struct event *g_cmdev; +static struct commands_base *cmdbase; + /* ---------------------------- CALLBACK EXECUTION ------------------------- */ /* Thread: worker */ @@ -83,71 +69,39 @@ static struct event *g_cmdev; static void execute_cb(int fd, short what, void *arg) { - struct worker_command *cmd = arg; + struct worker_arg *cmdarg = arg; - cmd->arg.cb(cmd->arg.cb_arg); + cmdarg->cb(cmdarg->cb_arg); - event_free(cmd->arg.timer); - free(cmd->arg.cb_arg); - free(cmd); + event_free(cmdarg->timer); + free(cmdarg->cb_arg); + free(cmdarg); } -static int -execute(struct worker_command *cmd) +static enum command_state +execute(void *arg, int *retval) { - struct timeval tv = { cmd->arg.delay, 0 }; + struct worker_arg *cmdarg = arg; + struct timeval tv = { cmdarg->delay, 0 }; - if (cmd->arg.delay) + if (cmdarg->delay) { - cmd->arg.timer = evtimer_new(evbase_worker, execute_cb, cmd); - evtimer_add(cmd->arg.timer, &tv); + cmdarg->timer = evtimer_new(evbase_worker, execute_cb, cmdarg); + evtimer_add(cmdarg->timer, &tv); - return 1; // Not done yet, ask caller not to free cmd + *retval = 0; + return COMMAND_PENDING; // Not done yet, ask caller not to free cmd } - cmd->arg.cb(cmd->arg.cb_arg); - free(cmd->arg.cb_arg); + cmdarg->cb(cmdarg->cb_arg); + free(cmdarg->cb_arg); - return 0; + *retval = 0; + return COMMAND_END; } -/* ---------------------------- COMMAND EXECUTION -------------------------- */ - -static int -send_command(struct worker_command *cmd) -{ - int ret; - - if (!cmd->func) - { - DPRINTF(E_LOG, L_MAIN, "BUG: cmd->func is NULL!\n"); - return -1; - } - - ret = write(g_cmd_pipe[1], &cmd, sizeof(cmd)); - if (ret != sizeof(cmd)) - { - DPRINTF(E_LOG, L_MAIN, "Could not send command: %s\n", strerror(errno)); - return -1; - } - - return 0; -} - -static int -nonblock_command(struct worker_command *cmd) -{ - int ret; - - ret = send_command(cmd); - if (ret < 0) - return -1; - - return 0; -} - /* Thread: main */ static void thread_exit(void) @@ -209,40 +163,6 @@ exit_cb(int fd, short what, void *arg) event_add(g_exitev, NULL); } -static void -command_cb(int fd, short what, void *arg) -{ - struct worker_command *cmd; - int ret; - - ret = read(g_cmd_pipe[0], &cmd, sizeof(cmd)); - if (ret != sizeof(cmd)) - { - DPRINTF(E_LOG, L_MAIN, "Could not read command! (read %d): %s\n", ret, (ret < 0) ? strerror(errno) : "-no error-"); - goto readd; - } - - if (cmd->nonblock) - { - ret = cmd->func(cmd); - - if (ret == 0) - free(cmd); - goto readd; - } - - pthread_mutex_lock(&cmd->lck); - - ret = cmd->func(cmd); - cmd->ret = ret; - - pthread_cond_signal(&cmd->cond); - pthread_mutex_unlock(&cmd->lck); - - readd: - event_add(g_cmdev, NULL); -} - /* ---------------------------- Our worker API --------------------------- */ @@ -250,19 +170,19 @@ command_cb(int fd, short what, void *arg) void worker_execute(void (*cb)(void *), void *cb_arg, size_t arg_size, int delay) { - struct worker_command *cmd; + struct worker_arg *cmdarg; void *argcpy; DPRINTF(E_DBG, L_MAIN, "Got worker execute request\n"); - cmd = (struct worker_command *)malloc(sizeof(struct worker_command)); - if (!cmd) + cmdarg = (struct worker_arg *)malloc(sizeof(struct worker_arg)); + if (!cmdarg) { - DPRINTF(E_LOG, L_MAIN, "Could not allocate worker_command\n"); + DPRINTF(E_LOG, L_MAIN, "Could not allocate worker_arg\n"); return; } - memset(cmd, 0, sizeof(struct worker_command)); + memset(cmdarg, 0, sizeof(struct worker_arg)); argcpy = malloc(arg_size); if (!argcpy) @@ -273,15 +193,11 @@ worker_execute(void (*cb)(void *), void *cb_arg, size_t arg_size, int delay) memcpy(argcpy, cb_arg, arg_size); - cmd->nonblock = 1; - cmd->func = execute; - cmd->arg.cb = cb; - cmd->arg.cb_arg = argcpy; - cmd->arg.delay = delay; + cmdarg->cb = cb; + cmdarg->cb_arg = argcpy; + cmdarg->delay = delay; - nonblock_command(cmd); - - return; + commands_exec_async(cmdbase, execute, cmdarg); } int @@ -300,17 +216,6 @@ worker_init(void) goto exit_fail; } -#ifdef HAVE_PIPE2 - ret = pipe2(g_cmd_pipe, O_CLOEXEC); -#else - ret = pipe(g_cmd_pipe); -#endif - if (ret < 0) - { - DPRINTF(E_LOG, L_MAIN, "Could not create command pipe: %s\n", strerror(errno)); - goto cmd_fail; - } - evbase_worker = event_base_new(); if (!evbase_worker) { @@ -325,15 +230,9 @@ worker_init(void) goto evnew_fail; } - g_cmdev = event_new(evbase_worker, g_cmd_pipe[0], EV_READ, command_cb, NULL); - if (!g_cmdev) - { - DPRINTF(E_LOG, L_MAIN, "Could not create cmd event\n"); - goto evnew_fail; - } + cmdbase = commands_base_new(evbase_worker); event_add(g_exitev, NULL); - event_add(g_cmdev, NULL); ret = pthread_create(&tid_worker, NULL, worker, NULL); if (ret < 0) @@ -352,15 +251,12 @@ worker_init(void) return 0; thread_fail: + commands_base_free(cmdbase); evnew_fail: event_base_free(evbase_worker); evbase_worker = NULL; evbase_fail: - close(g_cmd_pipe[0]); - close(g_cmd_pipe[1]); - - cmd_fail: close(g_exit_pipe[0]); close(g_exit_pipe[1]); @@ -383,11 +279,10 @@ worker_deinit(void) } // Free event base (should free events too) + commands_base_free(cmdbase); event_base_free(evbase_worker); // Close pipes - close(g_cmd_pipe[0]); - close(g_cmd_pipe[1]); close(g_exit_pipe[0]); close(g_exit_pipe[1]); }