[worker] Use generic inter thread commands util

This commit is contained in:
chme 2016-05-14 07:03:30 +02:00
parent 3823900394
commit 32944d7273

View File

@ -39,29 +39,15 @@
#include "db.h"
#include "logger.h"
#include "worker.h"
#include "commands.h"
struct worker_command;
typedef int (*cmd_func)(struct worker_command *cmd);
struct worker_command
struct worker_arg
{
pthread_mutex_t lck;
pthread_cond_t cond;
cmd_func func;
int nonblock;
struct {
void (*cb)(void *);
void *cb_arg;
int delay;
struct event *timer;
} arg;
int ret;
void (*cb)(void *);
void *cb_arg;
int delay;
struct event *timer;
};
@ -73,9 +59,9 @@ static pthread_t tid_worker;
struct event_base *evbase_worker;
static int g_initialized;
static int g_exit_pipe[2];
static int g_cmd_pipe[2];
static struct event *g_exitev;
static struct event *g_cmdev;
static struct commands_base *cmdbase;
/* ---------------------------- CALLBACK EXECUTION ------------------------- */
/* Thread: worker */
@ -83,71 +69,39 @@ static struct event *g_cmdev;
static void
execute_cb(int fd, short what, void *arg)
{
struct worker_command *cmd = arg;
struct worker_arg *cmdarg = arg;
cmd->arg.cb(cmd->arg.cb_arg);
cmdarg->cb(cmdarg->cb_arg);
event_free(cmd->arg.timer);
free(cmd->arg.cb_arg);
free(cmd);
event_free(cmdarg->timer);
free(cmdarg->cb_arg);
free(cmdarg);
}
static int
execute(struct worker_command *cmd)
static enum command_state
execute(void *arg, int *retval)
{
struct timeval tv = { cmd->arg.delay, 0 };
struct worker_arg *cmdarg = arg;
struct timeval tv = { cmdarg->delay, 0 };
if (cmd->arg.delay)
if (cmdarg->delay)
{
cmd->arg.timer = evtimer_new(evbase_worker, execute_cb, cmd);
evtimer_add(cmd->arg.timer, &tv);
cmdarg->timer = evtimer_new(evbase_worker, execute_cb, cmdarg);
evtimer_add(cmdarg->timer, &tv);
return 1; // Not done yet, ask caller not to free cmd
*retval = 0;
return COMMAND_PENDING; // Not done yet, ask caller not to free cmd
}
cmd->arg.cb(cmd->arg.cb_arg);
free(cmd->arg.cb_arg);
cmdarg->cb(cmdarg->cb_arg);
free(cmdarg->cb_arg);
return 0;
*retval = 0;
return COMMAND_END;
}
/* ---------------------------- COMMAND EXECUTION -------------------------- */
static int
send_command(struct worker_command *cmd)
{
int ret;
if (!cmd->func)
{
DPRINTF(E_LOG, L_MAIN, "BUG: cmd->func is NULL!\n");
return -1;
}
ret = write(g_cmd_pipe[1], &cmd, sizeof(cmd));
if (ret != sizeof(cmd))
{
DPRINTF(E_LOG, L_MAIN, "Could not send command: %s\n", strerror(errno));
return -1;
}
return 0;
}
static int
nonblock_command(struct worker_command *cmd)
{
int ret;
ret = send_command(cmd);
if (ret < 0)
return -1;
return 0;
}
/* Thread: main */
static void
thread_exit(void)
@ -209,40 +163,6 @@ exit_cb(int fd, short what, void *arg)
event_add(g_exitev, NULL);
}
static void
command_cb(int fd, short what, void *arg)
{
struct worker_command *cmd;
int ret;
ret = read(g_cmd_pipe[0], &cmd, sizeof(cmd));
if (ret != sizeof(cmd))
{
DPRINTF(E_LOG, L_MAIN, "Could not read command! (read %d): %s\n", ret, (ret < 0) ? strerror(errno) : "-no error-");
goto readd;
}
if (cmd->nonblock)
{
ret = cmd->func(cmd);
if (ret == 0)
free(cmd);
goto readd;
}
pthread_mutex_lock(&cmd->lck);
ret = cmd->func(cmd);
cmd->ret = ret;
pthread_cond_signal(&cmd->cond);
pthread_mutex_unlock(&cmd->lck);
readd:
event_add(g_cmdev, NULL);
}
/* ---------------------------- Our worker API --------------------------- */
@ -250,19 +170,19 @@ command_cb(int fd, short what, void *arg)
void
worker_execute(void (*cb)(void *), void *cb_arg, size_t arg_size, int delay)
{
struct worker_command *cmd;
struct worker_arg *cmdarg;
void *argcpy;
DPRINTF(E_DBG, L_MAIN, "Got worker execute request\n");
cmd = (struct worker_command *)malloc(sizeof(struct worker_command));
if (!cmd)
cmdarg = (struct worker_arg *)malloc(sizeof(struct worker_arg));
if (!cmdarg)
{
DPRINTF(E_LOG, L_MAIN, "Could not allocate worker_command\n");
DPRINTF(E_LOG, L_MAIN, "Could not allocate worker_arg\n");
return;
}
memset(cmd, 0, sizeof(struct worker_command));
memset(cmdarg, 0, sizeof(struct worker_arg));
argcpy = malloc(arg_size);
if (!argcpy)
@ -273,15 +193,11 @@ worker_execute(void (*cb)(void *), void *cb_arg, size_t arg_size, int delay)
memcpy(argcpy, cb_arg, arg_size);
cmd->nonblock = 1;
cmd->func = execute;
cmd->arg.cb = cb;
cmd->arg.cb_arg = argcpy;
cmd->arg.delay = delay;
cmdarg->cb = cb;
cmdarg->cb_arg = argcpy;
cmdarg->delay = delay;
nonblock_command(cmd);
return;
commands_exec_async(cmdbase, execute, cmdarg);
}
int
@ -300,17 +216,6 @@ worker_init(void)
goto exit_fail;
}
#ifdef HAVE_PIPE2
ret = pipe2(g_cmd_pipe, O_CLOEXEC);
#else
ret = pipe(g_cmd_pipe);
#endif
if (ret < 0)
{
DPRINTF(E_LOG, L_MAIN, "Could not create command pipe: %s\n", strerror(errno));
goto cmd_fail;
}
evbase_worker = event_base_new();
if (!evbase_worker)
{
@ -325,15 +230,9 @@ worker_init(void)
goto evnew_fail;
}
g_cmdev = event_new(evbase_worker, g_cmd_pipe[0], EV_READ, command_cb, NULL);
if (!g_cmdev)
{
DPRINTF(E_LOG, L_MAIN, "Could not create cmd event\n");
goto evnew_fail;
}
cmdbase = commands_base_new(evbase_worker);
event_add(g_exitev, NULL);
event_add(g_cmdev, NULL);
ret = pthread_create(&tid_worker, NULL, worker, NULL);
if (ret < 0)
@ -352,15 +251,12 @@ worker_init(void)
return 0;
thread_fail:
commands_base_free(cmdbase);
evnew_fail:
event_base_free(evbase_worker);
evbase_worker = NULL;
evbase_fail:
close(g_cmd_pipe[0]);
close(g_cmd_pipe[1]);
cmd_fail:
close(g_exit_pipe[0]);
close(g_exit_pipe[1]);
@ -383,11 +279,10 @@ worker_deinit(void)
}
// Free event base (should free events too)
commands_base_free(cmdbase);
event_base_free(evbase_worker);
// Close pipes
close(g_cmd_pipe[0]);
close(g_cmd_pipe[1]);
close(g_exit_pipe[0]);
close(g_exit_pipe[1]);
}