[worker] Switch worker to use evthr threadpool

This commit is contained in:
ejurgensen 2023-01-10 16:22:06 +01:00
parent 68d66c3229
commit 3377faffb8

@ -29,16 +29,455 @@
#include <time.h>
#include <string.h>
#include <errno.h>
#include <limits.h>
#include <sys/queue.h>
#include <sys/ioctl.h>
#include <unistd.h>
#include <pthread.h>
#include <event2/event.h>
#include <event2/thread.h>
#include "db.h"
#include "logger.h"
#include "worker.h"
#include "commands.h"
#include "misc.h"
#define THREADPOOL_NTHREADS 4
struct evthr_pool;
static struct evthr_pool *worker_threadpool;
/* ----------------- Thread handling borrowed from libevhtp ----------------- */
#ifndef TAILQ_FOREACH_SAFE
#define TAILQ_FOREACH_SAFE(var, head, field, tvar) \
for ((var) = TAILQ_FIRST((head)); \
(var) && ((tvar) = TAILQ_NEXT((var), field), 1); \
(var) = (tvar))
#endif
#define _evthr_read(thr, cmd, sock) \
(recv(sock, cmd, sizeof(struct evthr_cmd), 0) == sizeof(struct evthr_cmd)) ? 1 : 0
#define EVTHR_SHARED_PIPE 1
enum evthr_res {
EVTHR_RES_OK = 0,
EVTHR_RES_BACKLOG,
EVTHR_RES_RETRY,
EVTHR_RES_NOCB,
EVTHR_RES_FATAL
};
struct evthr;
typedef void (*evthr_cb)(struct evthr *thr, void *cmd_arg, void *shared);
typedef void (*evthr_init_cb)(struct evthr *thr, void *shared);
typedef void (*evthr_exit_cb)(struct evthr *thr, void *shared);
struct evthr_cmd {
uint8_t stop;
void *args;
evthr_cb cb;
} __attribute__((packed));
struct evthr_pool {
#ifdef EVTHR_SHARED_PIPE
int rdr;
int wdr;
#endif
int nthreads;
TAILQ_HEAD(evthr_pool_slist, evthr) threads;
};
struct evthr {
int rdr;
int wdr;
char err;
struct event *event;
struct event_base *evbase;
pthread_mutex_t lock;
pthread_t *thr;
evthr_init_cb init_cb;
evthr_exit_cb exit_cb;
void *arg;
void *aux;
#ifdef EVTHR_SHARED_PIPE
int pool_rdr;
struct event *shared_pool_ev;
#endif
TAILQ_ENTRY(evthr) next;
};
static void
_evthr_read_cmd(evutil_socket_t sock, short which, void *args)
{
struct evthr *thread;
struct evthr_cmd cmd;
int stopped;
if (!(thread = (struct evthr *)args)) {
return;
}
stopped = 0;
if (_evthr_read(thread, &cmd, sock) == 1) {
stopped = cmd.stop;
if (cmd.cb != NULL) {
(cmd.cb)(thread, cmd.args, thread->arg);
}
}
if (stopped == 1) {
event_base_loopbreak(thread->evbase);
}
return;
}
static void *
_evthr_loop(void *args)
{
struct evthr *thread;
if (!(thread = (struct evthr *)args)) {
return NULL;
}
if (thread == NULL || thread->thr == NULL) {
pthread_exit(NULL);
}
thread->evbase = event_base_new();
thread->event = event_new(thread->evbase, thread->rdr,
EV_READ | EV_PERSIST, _evthr_read_cmd, args);
event_add(thread->event, NULL);
#ifdef EVTHR_SHARED_PIPE
if (thread->pool_rdr > 0) {
thread->shared_pool_ev = event_new(thread->evbase, thread->pool_rdr,
EV_READ | EV_PERSIST, _evthr_read_cmd, args);
event_add(thread->shared_pool_ev, NULL);
}
#endif
pthread_mutex_lock(&thread->lock);
if (thread->init_cb != NULL) {
(thread->init_cb)(thread, thread->arg);
}
pthread_mutex_unlock(&thread->lock);
CHECK_ERR(L_MAIN, thread->err);
event_base_loop(thread->evbase, 0);
pthread_mutex_lock(&thread->lock);
if (thread->exit_cb != NULL) {
(thread->exit_cb)(thread, thread->arg);
}
pthread_mutex_unlock(&thread->lock);
pthread_exit(NULL);
}
static enum evthr_res
evthr_defer(struct evthr *thread, evthr_cb cb, void *arg)
{
struct evthr_cmd cmd = {
.cb = cb,
.args = arg,
.stop = 0
};
if (send(thread->wdr, &cmd, sizeof(cmd), 0) <= 0) {
return EVTHR_RES_RETRY;
}
return EVTHR_RES_OK;
}
static enum evthr_res
evthr_stop(struct evthr *thread)
{
struct evthr_cmd cmd = {
.cb = NULL,
.args = NULL,
.stop = 1
};
if (send(thread->wdr, &cmd, sizeof(struct evthr_cmd), 0) < 0) {
return EVTHR_RES_RETRY;
}
pthread_join(*thread->thr, NULL);
return EVTHR_RES_OK;
}
static struct event_base *
evthr_get_base(struct evthr * thr)
{
return thr ? thr->evbase : NULL;
}
static void
evthr_free(struct evthr *thread)
{
if (thread == NULL) {
return;
}
if (thread->rdr > 0) {
close(thread->rdr);
}
if (thread->wdr > 0) {
close(thread->wdr);
}
if (thread->thr) {
free(thread->thr);
}
if (thread->event) {
event_free(thread->event);
}
#ifdef EVTHR_SHARED_PIPE
if (thread->shared_pool_ev) {
event_free(thread->shared_pool_ev);
}
#endif
if (thread->evbase) {
event_base_free(thread->evbase);
}
free(thread);
}
static struct evthr *
evthr_wexit_new(evthr_init_cb init_cb, evthr_exit_cb exit_cb, void *args)
{
struct evthr *thread;
int fds[2];
if (evutil_socketpair(AF_UNIX, SOCK_STREAM, 0, fds) == -1) {
return NULL;
}
evutil_make_socket_nonblocking(fds[0]);
evutil_make_socket_nonblocking(fds[1]);
if (!(thread = calloc(1, sizeof(struct evthr)))) {
return NULL;
}
thread->thr = malloc(sizeof(pthread_t));
thread->arg = args;
thread->rdr = fds[0];
thread->wdr = fds[1];
thread->init_cb = init_cb;
thread->exit_cb = exit_cb;
if (pthread_mutex_init(&thread->lock, NULL)) {
evthr_free(thread);
return NULL;
}
return thread;
}
static int
evthr_start(struct evthr *thread)
{
if (thread == NULL || thread->thr == NULL) {
return -1;
}
if (pthread_create(thread->thr, NULL, _evthr_loop, (void *)thread)) {
return -1;
}
return 0;
}
static void
evthr_pool_free(struct evthr_pool *pool)
{
struct evthr *thread;
struct evthr *save;
if (pool == NULL) {
return;
}
TAILQ_FOREACH_SAFE(thread, &pool->threads, next, save) {
TAILQ_REMOVE(&pool->threads, thread, next);
evthr_free(thread);
}
free(pool);
}
static enum evthr_res
evthr_pool_stop(struct evthr_pool *pool)
{
struct evthr *thr;
struct evthr *save;
if (pool == NULL) {
return EVTHR_RES_FATAL;
}
TAILQ_FOREACH_SAFE(thr, &pool->threads, next, save) {
evthr_stop(thr);
}
return EVTHR_RES_OK;
}
static inline int
get_backlog_(struct evthr *thread)
{
int backlog = 0;
ioctl(thread->rdr, FIONREAD, &backlog);
return (int)(backlog / sizeof(struct evthr_cmd));
}
static enum evthr_res
evthr_pool_defer(struct evthr_pool *pool, evthr_cb cb, void *arg)
{
#ifdef EVTHR_SHARED_PIPE
struct evthr_cmd cmd = {
.cb = cb,
.args = arg,
.stop = 0
};
if (send(pool->wdr, &cmd, sizeof(cmd), 0) == -1) {
return EVTHR_RES_RETRY;
}
return EVTHR_RES_OK;
#endif
struct evthr *thread = NULL;
struct evthr *min_thread = NULL;
int min_backlog = 0;
if (pool == NULL) {
return EVTHR_RES_FATAL;
}
if (cb == NULL) {
return EVTHR_RES_NOCB;
}
TAILQ_FOREACH(thread, &pool->threads, next) {
int backlog = get_backlog_(thread);
if (backlog == 0) {
min_thread = thread;
break;
}
if (min_thread == NULL || backlog < min_backlog) {
min_thread = thread;
min_backlog = backlog;
}
}
return evthr_defer(min_thread, cb, arg);
}
static struct evthr_pool *
evthr_pool_wexit_new(int nthreads, evthr_init_cb init_cb, evthr_exit_cb exit_cb, void *shared)
{
struct evthr_pool *pool;
int i;
#ifdef EVTHR_SHARED_PIPE
int fds[2];
#endif
if (nthreads == 0) {
return NULL;
}
if (!(pool = calloc(1, sizeof(struct evthr_pool)))) {
return NULL;
}
pool->nthreads = nthreads;
TAILQ_INIT(&pool->threads);
#ifdef EVTHR_SHARED_PIPE
if (evutil_socketpair(AF_UNIX, SOCK_DGRAM, 0, fds) == -1) {
return NULL;
}
evutil_make_socket_nonblocking(fds[0]);
evutil_make_socket_nonblocking(fds[1]);
pool->rdr = fds[0];
pool->wdr = fds[1];
#endif
for (i = 0; i < nthreads; i++) {
struct evthr * thread;
if (!(thread = evthr_wexit_new(init_cb, exit_cb, shared))) {
evthr_pool_free(pool);
return NULL;
}
#ifdef EVTHR_SHARED_PIPE
thread->pool_rdr = fds[0];
#endif
TAILQ_INSERT_TAIL(&pool->threads, thread, next);
}
return pool;
}
static int
evthr_pool_start(struct evthr_pool *pool)
{
struct evthr *evthr = NULL;
if (pool == NULL) {
return -1;
}
TAILQ_FOREACH(evthr, &pool->threads, next) {
if (evthr_start(evthr) < 0) {
return -1;
}
usleep(5000);
}
return 0;
}
/* ----------------------------- CALLBACK EXECUTION ------------------------- */
/* Worker threads */
struct worker_arg
{
@ -49,19 +488,6 @@ struct worker_arg
};
/* --- Globals --- */
// worker thread
static pthread_t tid_worker;
// Event base, pipes and events
struct event_base *evbase_worker;
static int g_initialized;
static struct commands_base *cmdbase;
/* ---------------------------- CALLBACK EXECUTION ------------------------- */
/* Thread: worker */
static void
execute_cb(int fd, short what, void *arg)
{
@ -74,64 +500,51 @@ execute_cb(int fd, short what, void *arg)
free(cmdarg);
}
static enum command_state
execute(void *arg, int *retval)
static void
execute(struct evthr *thr, void *arg, void *shared)
{
struct worker_arg *cmdarg = arg;
struct timeval tv = { cmdarg->delay, 0 };
struct event_base *evbase;
if (cmdarg->delay)
{
cmdarg->timer = evtimer_new(evbase_worker, execute_cb, cmdarg);
evbase = evthr_get_base(thr);
cmdarg->timer = evtimer_new(evbase, execute_cb, cmdarg);
evtimer_add(cmdarg->timer, &tv);
*retval = 0;
return COMMAND_PENDING; // Not done yet, ask caller not to free cmd
return;
}
cmdarg->cb(cmdarg->cb_arg);
free(cmdarg->cb_arg);
*retval = 0;
return COMMAND_END;
free(cmdarg);
}
/* --------------------------------- MAIN --------------------------------- */
/* Thread: worker */
static void *
worker(void *arg)
static void
init_cb(struct evthr *thr, void *shared)
{
int ret;
ret = db_perthread_init();
if (ret < 0)
{
DPRINTF(E_LOG, L_MAIN, "Error: DB init failed (worker thread)\n");
pthread_exit(NULL);
DPRINTF(E_FATAL, L_MAIN, "Error: DB init failed (worker thread)\n");
thr->err = EIO;
return;
}
g_initialized = 1;
thread_setname(pthread_self(), "worker");
}
event_base_dispatch(evbase_worker);
if (g_initialized)
static void
exit_cb(struct evthr *thr, void *shared)
{
DPRINTF(E_LOG, L_MAIN, "Worker event loop terminated ahead of time!\n");
g_initialized = 0;
}
db_perthread_deinit();
pthread_exit(NULL);
}
/* ---------------------------- Our worker API --------------------------- */
/* Thread: player */
void
worker_execute(void (*cb)(void *), void *cb_arg, size_t arg_size, int delay)
{
@ -164,7 +577,7 @@ worker_execute(void (*cb)(void *), void *cb_arg, size_t arg_size, int delay)
cmdarg->cb_arg = argcpy;
cmdarg->delay = delay;
commands_exec_async(cmdbase, execute, cmdarg);
evthr_pool_defer(worker_threadpool, execute, cmdarg);
}
int
@ -172,51 +585,30 @@ worker_init(void)
{
int ret;
evbase_worker = event_base_new();
if (!evbase_worker)
worker_threadpool = evthr_pool_wexit_new(THREADPOOL_NTHREADS, init_cb, exit_cb, NULL);
if (!worker_threadpool)
{
DPRINTF(E_LOG, L_MAIN, "Could not create an event base\n");
goto evbase_fail;
DPRINTF(E_LOG, L_MAIN, "Could not create worker thread pool\n");
goto error;
}
cmdbase = commands_base_new(evbase_worker, NULL);
ret = pthread_create(&tid_worker, NULL, worker, NULL);
ret = evthr_pool_start(worker_threadpool);
if (ret < 0)
{
DPRINTF(E_LOG, L_MAIN, "Could not spawn worker thread: %s\n", strerror(errno));
goto thread_fail;
DPRINTF(E_LOG, L_MAIN, "Could not spawn worker threads\n");
goto error;
}
thread_setname(tid_worker, "worker");
return 0;
thread_fail:
commands_base_free(cmdbase);
event_base_free(evbase_worker);
evbase_worker = NULL;
evbase_fail:
error:
worker_deinit();
return -1;
}
void
worker_deinit(void)
{
int ret;
g_initialized = 0;
commands_base_destroy(cmdbase);
ret = pthread_join(tid_worker, NULL);
if (ret != 0)
{
DPRINTF(E_FATAL, L_MAIN, "Could not join worker thread: %s\n", strerror(errno));
return;
}
// Free event base (should free events too)
event_base_free(evbase_worker);
evthr_pool_stop(worker_threadpool);
evthr_pool_free(worker_threadpool);
}