diff --git a/src/Makefile.am b/src/Makefile.am index 454dd7f2..a64b5299 100644 --- a/src/Makefile.am +++ b/src/Makefile.am @@ -122,6 +122,7 @@ owntone_SOURCES = main.c \ outputs/dummy.c outputs/fifo.c outputs/rcp.c \ $(ALSA_SRC) $(PULSEAUDIO_SRC) $(CHROMECAST_SRC) \ evrtsp/rtsp.c evrtsp/evrtsp.h evrtsp/rtsp-internal.h evrtsp/log.h \ + evthr.c evthr.h \ $(SPOTIFY_SRC) \ $(LASTFM_SRC) \ $(MPD_SRC) \ diff --git a/src/evthr.c b/src/evthr.c new file mode 100644 index 00000000..0c236cdc --- /dev/null +++ b/src/evthr.c @@ -0,0 +1,467 @@ +/* +------------------- Thread handling borrowed from libevhtp --------------------- + +BSD 3-Clause License + +Copyright (c) 2010-2018, Mark Ellzey, Nathan French, Marcus Sundberg +All rights reserved. + +Redistribution and use in source and binary forms, with or without +modification, are permitted provided that the following conditions are met: + +* Redistributions of source code must retain the above copyright notice, this + list of conditions and the following disclaimer. + +* Redistributions in binary form must reproduce the above copyright notice, + this list of conditions and the following disclaimer in the documentation + and/or other materials provided with the distribution. + +* Neither the name of the copyright holder nor the names of its + contributors may be used to endorse or promote products derived from + this software without specific prior written permission. + +THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" +AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE +IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE +DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE +FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL +DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR +SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER +CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, +OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE +OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. +*/ + +#include +#include +#include +#include +#include +#include + +#include +#include + +#include +#include + +#include "evthr.h" + +#ifndef TAILQ_FOREACH_SAFE +#define TAILQ_FOREACH_SAFE(var, head, field, tvar) \ + for ((var) = TAILQ_FIRST((head)); \ + (var) && ((tvar) = TAILQ_NEXT((var), field), 1); \ + (var) = (tvar)) +#endif + +#define _evthr_read(thr, cmd, sock) \ + (recv(sock, cmd, sizeof(struct evthr_cmd), 0) == sizeof(struct evthr_cmd)) ? 1 : 0 + +#define EVTHR_SHARED_PIPE 1 + +struct evthr_cmd { + uint8_t stop; + void *args; + evthr_cb cb; +} __attribute__((packed)); + +struct evthr_pool { +#ifdef EVTHR_SHARED_PIPE + int rdr; + int wdr; +#endif + int nthreads; + TAILQ_HEAD(evthr_pool_slist, evthr) threads; +}; + +struct evthr { + int rdr; + int wdr; + char err; + struct event *event; + struct event_base *evbase; + pthread_mutex_t lock; + pthread_t *thr; + evthr_init_cb init_cb; + evthr_exit_cb exit_cb; + void *arg; + void *aux; +#ifdef EVTHR_SHARED_PIPE + int pool_rdr; + struct event *shared_pool_ev; +#endif + TAILQ_ENTRY(evthr) next; +}; + + +static void +_evthr_read_cmd(evutil_socket_t sock, short which, void *args) +{ + struct evthr *thread; + struct evthr_cmd cmd; + int stopped; + + if (!(thread = (struct evthr *)args)) { + return; + } + + stopped = 0; + + if (_evthr_read(thread, &cmd, sock) == 1) { + stopped = cmd.stop; + + if (cmd.cb != NULL) { + (cmd.cb)(thread, cmd.args, thread->arg); + } + } + + if (stopped == 1) { + event_base_loopbreak(thread->evbase); + } + + return; +} + +static void * +_evthr_loop(void *args) +{ + struct evthr *thread; + + if (!(thread = (struct evthr *)args)) { + return NULL; + } + + if (thread == NULL || thread->thr == NULL) { + pthread_exit(NULL); + } + + thread->evbase = event_base_new(); + thread->event = event_new(thread->evbase, thread->rdr, + EV_READ | EV_PERSIST, _evthr_read_cmd, args); + + event_add(thread->event, NULL); + +#ifdef EVTHR_SHARED_PIPE + if (thread->pool_rdr > 0) { + thread->shared_pool_ev = event_new(thread->evbase, thread->pool_rdr, + EV_READ | EV_PERSIST, _evthr_read_cmd, args); + event_add(thread->shared_pool_ev, NULL); + } +#endif + + pthread_mutex_lock(&thread->lock); + if (thread->init_cb != NULL) { + (thread->init_cb)(thread, thread->arg); + } + + pthread_mutex_unlock(&thread->lock); + + event_base_loop(thread->evbase, 0); + + pthread_mutex_lock(&thread->lock); + if (thread->exit_cb != NULL) { + (thread->exit_cb)(thread, thread->arg); + } + + pthread_mutex_unlock(&thread->lock); + + pthread_exit(NULL); +} + +static enum evthr_res +evthr_defer(struct evthr *thread, evthr_cb cb, void *arg) +{ + struct evthr_cmd cmd = { + .cb = cb, + .args = arg, + .stop = 0 + }; + + if (send(thread->wdr, &cmd, sizeof(cmd), 0) <= 0) { + return EVTHR_RES_RETRY; + } + + return EVTHR_RES_OK; +} + +static enum evthr_res +evthr_stop(struct evthr *thread) +{ + struct evthr_cmd cmd = { + .cb = NULL, + .args = NULL, + .stop = 1 + }; + + if (send(thread->wdr, &cmd, sizeof(struct evthr_cmd), 0) < 0) { + return EVTHR_RES_RETRY; + } + + pthread_join(*thread->thr, NULL); + return EVTHR_RES_OK; +} + +struct event_base * +evthr_get_base(struct evthr * thr) +{ + return thr ? thr->evbase : NULL; +} + +void +evthr_set_aux(struct evthr *thr, void *aux) +{ + if (thr) { + thr->aux = aux; + } +} + +void * +evthr_get_aux(struct evthr *thr) +{ + return thr ? thr->aux : NULL; +} + +static void +evthr_free(struct evthr *thread) +{ + if (thread == NULL) { + return; + } + + if (thread->rdr > 0) { + close(thread->rdr); + } + + if (thread->wdr > 0) { + close(thread->wdr); + } + + if (thread->thr) { + free(thread->thr); + } + + if (thread->event) { + event_free(thread->event); + } + +#ifdef EVTHR_SHARED_PIPE + if (thread->shared_pool_ev) { + event_free(thread->shared_pool_ev); + } +#endif + + if (thread->evbase) { + event_base_free(thread->evbase); + } + + free(thread); +} + +static struct evthr * +evthr_wexit_new(evthr_init_cb init_cb, evthr_exit_cb exit_cb, void *args) +{ + struct evthr *thread; + int fds[2]; + + if (evutil_socketpair(AF_UNIX, SOCK_STREAM, 0, fds) == -1) { + return NULL; + } + + evutil_make_socket_nonblocking(fds[0]); + evutil_make_socket_nonblocking(fds[1]); + + if (!(thread = calloc(1, sizeof(struct evthr)))) { + return NULL; + } + + thread->thr = malloc(sizeof(pthread_t)); + thread->arg = args; + thread->rdr = fds[0]; + thread->wdr = fds[1]; + + thread->init_cb = init_cb; + thread->exit_cb = exit_cb; + + if (pthread_mutex_init(&thread->lock, NULL)) { + evthr_free(thread); + return NULL; + } + + return thread; +} + +static int +evthr_start(struct evthr *thread) +{ + if (thread == NULL || thread->thr == NULL) { + return -1; + } + + if (pthread_create(thread->thr, NULL, _evthr_loop, (void *)thread)) { + return -1; + } + + return 0; +} + +void +evthr_pool_free(struct evthr_pool *pool) +{ + struct evthr *thread; + struct evthr *save; + + if (pool == NULL) { + return; + } + + TAILQ_FOREACH_SAFE(thread, &pool->threads, next, save) { + TAILQ_REMOVE(&pool->threads, thread, next); + + evthr_free(thread); + } + + free(pool); +} + +enum evthr_res +evthr_pool_stop(struct evthr_pool *pool) +{ + struct evthr *thr; + struct evthr *save; + + if (pool == NULL) { + return EVTHR_RES_FATAL; + } + + TAILQ_FOREACH_SAFE(thr, &pool->threads, next, save) { + evthr_stop(thr); + } + + return EVTHR_RES_OK; +} + +static inline int +get_backlog_(struct evthr *thread) +{ + int backlog = 0; + + ioctl(thread->rdr, FIONREAD, &backlog); + + return (int)(backlog / sizeof(struct evthr_cmd)); +} + +enum evthr_res +evthr_pool_defer(struct evthr_pool *pool, evthr_cb cb, void *arg) +{ +#ifdef EVTHR_SHARED_PIPE + struct evthr_cmd cmd = { + .cb = cb, + .args = arg, + .stop = 0 + }; + + if (send(pool->wdr, &cmd, sizeof(cmd), 0) == -1) { + return EVTHR_RES_RETRY; + } + + return EVTHR_RES_OK; +#endif + struct evthr *thread = NULL; + struct evthr *min_thread = NULL; + int min_backlog = 0; + + if (pool == NULL) { + return EVTHR_RES_FATAL; + } + + if (cb == NULL) { + return EVTHR_RES_NOCB; + } + + TAILQ_FOREACH(thread, &pool->threads, next) { + int backlog = get_backlog_(thread); + + if (backlog == 0) { + min_thread = thread; + break; + } + + if (min_thread == NULL || backlog < min_backlog) { + min_thread = thread; + min_backlog = backlog; + } + } + + return evthr_defer(min_thread, cb, arg); +} + +struct evthr_pool * +evthr_pool_wexit_new(int nthreads, evthr_init_cb init_cb, evthr_exit_cb exit_cb, void *shared) +{ + struct evthr_pool *pool; + int i; + +#ifdef EVTHR_SHARED_PIPE + int fds[2]; +#endif + + if (nthreads == 0) { + return NULL; + } + + if (!(pool = calloc(1, sizeof(struct evthr_pool)))) { + return NULL; + } + + pool->nthreads = nthreads; + TAILQ_INIT(&pool->threads); + +#ifdef EVTHR_SHARED_PIPE + if (evutil_socketpair(AF_UNIX, SOCK_DGRAM, 0, fds) == -1) { + return NULL; + } + + evutil_make_socket_nonblocking(fds[0]); + evutil_make_socket_nonblocking(fds[1]); + + pool->rdr = fds[0]; + pool->wdr = fds[1]; +#endif + + for (i = 0; i < nthreads; i++) { + struct evthr * thread; + + if (!(thread = evthr_wexit_new(init_cb, exit_cb, shared))) { + evthr_pool_free(pool); + return NULL; + } + +#ifdef EVTHR_SHARED_PIPE + thread->pool_rdr = fds[0]; +#endif + + TAILQ_INSERT_TAIL(&pool->threads, thread, next); + } + + return pool; +} + +int +evthr_pool_start(struct evthr_pool *pool) +{ + struct evthr *evthr = NULL; + + if (pool == NULL) { + return -1; + } + + TAILQ_FOREACH(evthr, &pool->threads, next) { + if (evthr_start(evthr) < 0) { + return -1; + } + + usleep(5000); + } + + return 0; +} diff --git a/src/evthr.h b/src/evthr.h new file mode 100644 index 00000000..7f724f93 --- /dev/null +++ b/src/evthr.h @@ -0,0 +1,43 @@ +#ifndef __EVTHR_H__ +#define __EVTHR_H__ + +enum evthr_res { + EVTHR_RES_OK = 0, + EVTHR_RES_BACKLOG, + EVTHR_RES_RETRY, + EVTHR_RES_NOCB, + EVTHR_RES_FATAL +}; + +struct evthr_pool; +struct evthr; + +typedef void (*evthr_cb)(struct evthr *thr, void *cmd_arg, void *shared); +typedef void (*evthr_init_cb)(struct evthr *thr, void *shared); +typedef void (*evthr_exit_cb)(struct evthr *thr, void *shared); + +struct event_base * +evthr_get_base(struct evthr *thr); + +void +evthr_set_aux(struct evthr *thr, void *aux); + +void * +evthr_get_aux(struct evthr *thr); + +enum evthr_res +evthr_pool_defer(struct evthr_pool *pool, evthr_cb cb, void *arg); + +struct evthr_pool * +evthr_pool_wexit_new(int nthreads, evthr_init_cb init_cb, evthr_exit_cb exit_cb, void *shared); + +void +evthr_pool_free(struct evthr_pool *pool); + +enum evthr_res +evthr_pool_stop(struct evthr_pool *pool); + +int +evthr_pool_start(struct evthr_pool *pool); + +#endif /* !__EVTHR_H__ */ diff --git a/src/httpd.c b/src/httpd.c index c11645fc..fc11489b 100644 --- a/src/httpd.c +++ b/src/httpd.c @@ -30,13 +30,11 @@ #include #include #include -#include #include #include #include #include -#include #include // get thread ID #include @@ -49,6 +47,7 @@ #include "conffile.h" #include "misc.h" #include "worker.h" +#include "evthr.h" #include "httpd.h" #include "httpd_internal.h" #include "transcode.h" @@ -133,369 +132,9 @@ static int httpd_port; #define THREADPOOL_NTHREADS 4 -struct evthr_pool; - static struct evthr_pool *httpd_threadpool; -/* ----------------- Thread handling borrowed from libevhtp ----------------- */ - -#ifndef TAILQ_FOREACH_SAFE -#define TAILQ_FOREACH_SAFE(var, head, field, tvar) \ - for ((var) = TAILQ_FIRST((head)); \ - (var) && ((tvar) = TAILQ_NEXT((var), field), 1); \ - (var) = (tvar)) -#endif - -#define _evthr_read(thr, cmd, sock) \ - (recv(sock, cmd, sizeof(struct evthr_cmd), 0) == sizeof(struct evthr_cmd)) ? 1 : 0 - -#define EVTHR_SHARED_PIPE 1 - -enum evthr_res { - EVTHR_RES_OK = 0, - EVTHR_RES_BACKLOG, - EVTHR_RES_RETRY, - EVTHR_RES_NOCB, - EVTHR_RES_FATAL -}; - -struct evthr; - -typedef void (*evthr_cb)(struct evthr *thr, void *cmd_arg, void *shared); -typedef void (*evthr_init_cb)(struct evthr *thr, void *shared); -typedef void (*evthr_exit_cb)(struct evthr *thr, void *shared); - -struct evthr_cmd { - uint8_t stop; - void *args; - evthr_cb cb; -} __attribute__((packed)); - -struct evthr_pool { -#ifdef EVTHR_SHARED_PIPE - int rdr; - int wdr; -#endif - int nthreads; - TAILQ_HEAD(evthr_pool_slist, evthr) threads; -}; - -struct evthr { - int rdr; - int wdr; - char err; - struct event *event; - struct event_base *evbase; - httpd_server *server; - pthread_mutex_t lock; - pthread_t *thr; - evthr_init_cb init_cb; - evthr_exit_cb exit_cb; - void *arg; - void *aux; -#ifdef EVTHR_SHARED_PIPE - int pool_rdr; - struct event *shared_pool_ev; -#endif - TAILQ_ENTRY(evthr) next; -}; - - -static void -_evthr_read_cmd(evutil_socket_t sock, short which, void *args) -{ - struct evthr *thread; - struct evthr_cmd cmd; - int stopped; - - if (!(thread = (struct evthr *)args)) { - return; - } - - stopped = 0; - - if (_evthr_read(thread, &cmd, sock) == 1) { - stopped = cmd.stop; - - if (cmd.cb != NULL) { - (cmd.cb)(thread, cmd.args, thread->arg); - } - } - - if (stopped == 1) { - event_base_loopbreak(thread->evbase); - } - - return; -} - -static void * -_evthr_loop(void *args) -{ - struct evthr *thread; - - if (!(thread = (struct evthr *)args)) { - return NULL; - } - - if (thread == NULL || thread->thr == NULL) { - pthread_exit(NULL); - } - - thread->evbase = event_base_new(); - thread->event = event_new(thread->evbase, thread->rdr, - EV_READ | EV_PERSIST, _evthr_read_cmd, args); - - event_add(thread->event, NULL); - -#ifdef EVTHR_SHARED_PIPE - if (thread->pool_rdr > 0) { - thread->shared_pool_ev = event_new(thread->evbase, thread->pool_rdr, - EV_READ | EV_PERSIST, _evthr_read_cmd, args); - event_add(thread->shared_pool_ev, NULL); - } -#endif - - pthread_mutex_lock(&thread->lock); - if (thread->init_cb != NULL) { - (thread->init_cb)(thread, thread->arg); - } - - pthread_mutex_unlock(&thread->lock); - - CHECK_ERR(L_MAIN, thread->err); - - event_base_loop(thread->evbase, 0); - - pthread_mutex_lock(&thread->lock); - if (thread->exit_cb != NULL) { - (thread->exit_cb)(thread, thread->arg); - } - - pthread_mutex_unlock(&thread->lock); - - pthread_exit(NULL); -} - -static enum evthr_res -evthr_stop(struct evthr *thread) -{ - struct evthr_cmd cmd = { - .cb = NULL, - .args = NULL, - .stop = 1 - }; - - if (send(thread->wdr, &cmd, sizeof(struct evthr_cmd), 0) < 0) { - return EVTHR_RES_RETRY; - } - - pthread_join(*thread->thr, NULL); - return EVTHR_RES_OK; -} - -static void -evthr_free(struct evthr *thread) -{ - if (thread == NULL) { - return; - } - - if (thread->rdr > 0) { - close(thread->rdr); - } - - if (thread->wdr > 0) { - close(thread->wdr); - } - - if (thread->thr) { - free(thread->thr); - } - - if (thread->event) { - event_free(thread->event); - } - -#ifdef EVTHR_SHARED_PIPE - if (thread->shared_pool_ev) { - event_free(thread->shared_pool_ev); - } -#endif - - if (thread->evbase) { - event_base_free(thread->evbase); - } - - free(thread); -} - -static struct evthr * -evthr_wexit_new(evthr_init_cb init_cb, evthr_exit_cb exit_cb, void *args) -{ - struct evthr *thread; - int fds[2]; - - if (evutil_socketpair(AF_UNIX, SOCK_STREAM, 0, fds) == -1) { - return NULL; - } - - evutil_make_socket_nonblocking(fds[0]); - evutil_make_socket_nonblocking(fds[1]); - - if (!(thread = calloc(1, sizeof(struct evthr)))) { - return NULL; - } - - thread->thr = malloc(sizeof(pthread_t)); - thread->arg = args; - thread->rdr = fds[0]; - thread->wdr = fds[1]; - - thread->init_cb = init_cb; - thread->exit_cb = exit_cb; - - if (pthread_mutex_init(&thread->lock, NULL)) { - evthr_free(thread); - return NULL; - } - - return thread; -} - -static int -evthr_start(struct evthr *thread) -{ - if (thread == NULL || thread->thr == NULL) { - return -1; - } - - if (pthread_create(thread->thr, NULL, _evthr_loop, (void *)thread)) { - return -1; - } - - return 0; -} - -static void -evthr_pool_free(struct evthr_pool *pool) -{ - struct evthr *thread; - struct evthr *save; - - if (pool == NULL) { - return; - } - - TAILQ_FOREACH_SAFE(thread, &pool->threads, next, save) { - TAILQ_REMOVE(&pool->threads, thread, next); - - evthr_free(thread); - } - - free(pool); -} - -static enum evthr_res -evthr_pool_stop(struct evthr_pool *pool) -{ - struct evthr *thr; - struct evthr *save; - - if (pool == NULL) { - return EVTHR_RES_FATAL; - } - - TAILQ_FOREACH_SAFE(thr, &pool->threads, next, save) { - evthr_stop(thr); - } - - return EVTHR_RES_OK; -} - -static inline int -get_backlog_(struct evthr *thread) -{ - int backlog = 0; - - ioctl(thread->rdr, FIONREAD, &backlog); - - return (int)(backlog / sizeof(struct evthr_cmd)); -} - -static struct evthr_pool * -evthr_pool_wexit_new(int nthreads, evthr_init_cb init_cb, evthr_exit_cb exit_cb, void *shared) -{ - struct evthr_pool *pool; - int i; - -#ifdef EVTHR_SHARED_PIPE - int fds[2]; -#endif - - if (nthreads == 0) { - return NULL; - } - - if (!(pool = calloc(1, sizeof(struct evthr_pool)))) { - return NULL; - } - - pool->nthreads = nthreads; - TAILQ_INIT(&pool->threads); - -#ifdef EVTHR_SHARED_PIPE - if (evutil_socketpair(AF_UNIX, SOCK_DGRAM, 0, fds) == -1) { - return NULL; - } - - evutil_make_socket_nonblocking(fds[0]); - evutil_make_socket_nonblocking(fds[1]); - - pool->rdr = fds[0]; - pool->wdr = fds[1]; -#endif - - for (i = 0; i < nthreads; i++) { - struct evthr * thread; - - if (!(thread = evthr_wexit_new(init_cb, exit_cb, shared))) { - evthr_pool_free(pool); - return NULL; - } - -#ifdef EVTHR_SHARED_PIPE - thread->pool_rdr = fds[0]; -#endif - - TAILQ_INSERT_TAIL(&pool->threads, thread, next); - } - - return pool; -} - -static int -evthr_pool_start(struct evthr_pool *pool) -{ - struct evthr *evthr = NULL; - - if (pool == NULL) { - return -1; - } - - TAILQ_FOREACH(evthr, &pool->threads, next) { - if (evthr_start(evthr) < 0) { - return -1; - } - - usleep(5000); - } - - return 0; -} - - /* -------------------------------- HELPERS --------------------------------- */ static int @@ -1747,37 +1386,44 @@ httpd_basic_auth(struct httpd_request *hreq, const char *user, const char *passw return -1; } +static int +bind_test(short unsigned port) +{ + int fd; + + fd = net_bind(&port, SOCK_STREAM, "httpd init"); + if (fd < 0) + return -1; + + close(fd); + return 0; +} + static void thread_init_cb(struct evthr *thr, void *shared) { - int ret; + struct event_base *evbase; + httpd_server *server; thread_setname(pthread_self(), "httpd"); - ret = db_perthread_init(); - if (ret < 0) - { - DPRINTF(E_FATAL, L_HTTPD, "Error: DB init failed\n"); - thr->err = EIO; - return; - } - - thr->server = httpd_server_new(thr->evbase, httpd_port, request_cb, NULL); - if (!thr->server) - { - DPRINTF(E_FATAL, L_HTTPD, "Could not create HTTP server on port %d (server already running?)\n", httpd_port); - thr->err = EIO; - return; - } + CHECK_ERR(L_HTTPD, db_perthread_init()); + CHECK_NULL(L_HTTPD, evbase = evthr_get_base(thr)); + CHECK_NULL(L_HTTPD, server = httpd_server_new(evbase, httpd_port, request_cb, NULL)); // For CORS headers - httpd_server_allow_origin_set(thr->server, httpd_allow_origin); + httpd_server_allow_origin_set(server, httpd_allow_origin); + + evthr_set_aux(thr, server); } static void thread_exit_cb(struct evthr *thr, void *shared) { - httpd_server_free(thr->server); + httpd_server *server; + + server = evthr_get_aux(thr); + httpd_server_free(server); db_perthread_deinit(); } @@ -1813,6 +1459,15 @@ httpd_init(const char *webroot) if (strlen(httpd_allow_origin) == 0) httpd_allow_origin = NULL; + // Test that the port is free. We do it here because we can make a nicer exit + // than we can in thread_init_cb(), where the actual binding takes place. + ret = bind_test(httpd_port); + if (ret < 0) + { + DPRINTF(E_FATAL, L_HTTPD, "Could not create HTTP server on port %d (server already running?)\n", httpd_port); + return -1; + } + // Prepare modules, e.g. httpd_daap ret = modules_init(); if (ret < 0) diff --git a/src/httpd_internal.h b/src/httpd_internal.h index 267328da..4d28fcef 100644 --- a/src/httpd_internal.h +++ b/src/httpd_internal.h @@ -235,9 +235,7 @@ httpd_send_reply_end(struct httpd_request *hreq); void httpd_send_error(struct httpd_request *hreq, int error, const char *reason); -/* - * Redirects to the given path - */ + void httpd_redirect_to(struct httpd_request *hreq, const char *path); diff --git a/src/httpd_libevhttp.c b/src/httpd_libevhttp.c index e8641244..c4402b1f 100644 --- a/src/httpd_libevhttp.c +++ b/src/httpd_libevhttp.c @@ -278,7 +278,7 @@ httpd_server_new(struct event_base *evbase, unsigned short port, httpd_request_c server->request_cb = cb; server->request_cb_arg = arg; - server->fd = net_bind(&port, SOCK_STREAM | SOCK_NONBLOCK, "httpd"); + server->fd = net_bind_with_reuseport(&port, SOCK_STREAM | SOCK_NONBLOCK, "httpd"); if (server->fd <= 0) goto error; diff --git a/src/misc.c b/src/misc.c index 18513e91..0f7c7d78 100644 --- a/src/misc.c +++ b/src/misc.c @@ -278,8 +278,8 @@ net_connect(const char *addr, unsigned short port, int type, const char *log_ser // If *port is 0 then a random port will be assigned, and *port will be updated // with the port number -int -net_bind(short unsigned *port, int type, const char *log_service_name) +static int +net_bind_impl(short unsigned *port, int type, const char *log_service_name, bool reuseport) { struct addrinfo hints = { 0 }; struct addrinfo *servinfo; @@ -318,7 +318,10 @@ net_bind(short unsigned *port, int type, const char *log_service_name) continue; // Makes us able to attach multiple threads to the same port - ret = setsockopt(fd, SOL_SOCKET, SO_REUSEPORT, &yes, sizeof(yes)); + if (reuseport) + ret = setsockopt(fd, SOL_SOCKET, SO_REUSEPORT, &yes, sizeof(yes)); + else + ret = setsockopt(fd, SOL_SOCKET, SO_REUSEPORT, &no, sizeof(no)); if (ret < 0) continue; @@ -381,6 +384,18 @@ net_bind(short unsigned *port, int type, const char *log_service_name) return -1; } +int +net_bind(short unsigned *port, int type, const char *log_service_name) +{ + return net_bind_impl(port, type, log_service_name, false); +} + +int +net_bind_with_reuseport(short unsigned *port, int type, const char *log_service_name) +{ + return net_bind_impl(port, type, log_service_name, true); +} + int net_evhttp_bind(struct evhttp *evhttp, unsigned short port, const char *log_service_name) { diff --git a/src/misc.h b/src/misc.h index 97f5da9b..d2716e73 100644 --- a/src/misc.h +++ b/src/misc.h @@ -53,6 +53,9 @@ net_connect(const char *addr, unsigned short port, int type, const char *log_ser int net_bind(short unsigned *port, int type, const char *log_service_name); +int +net_bind_with_reuseport(short unsigned *port, int type, const char *log_service_name); + // To avoid polluting namespace too much we don't include event2/http.h here struct evhttp; diff --git a/src/worker.c b/src/worker.c index 098fba03..9244a8bf 100644 --- a/src/worker.c +++ b/src/worker.c @@ -31,7 +31,6 @@ #include #include -#include #include #include @@ -41,438 +40,14 @@ #include "db.h" #include "logger.h" #include "worker.h" +#include "evthr.h" #include "misc.h" #define THREADPOOL_NTHREADS 2 -struct evthr_pool; - static struct evthr_pool *worker_threadpool; -/* ----------------- Thread handling borrowed from libevhtp ----------------- */ - -#ifndef TAILQ_FOREACH_SAFE -#define TAILQ_FOREACH_SAFE(var, head, field, tvar) \ - for ((var) = TAILQ_FIRST((head)); \ - (var) && ((tvar) = TAILQ_NEXT((var), field), 1); \ - (var) = (tvar)) -#endif - -#define _evthr_read(thr, cmd, sock) \ - (recv(sock, cmd, sizeof(struct evthr_cmd), 0) == sizeof(struct evthr_cmd)) ? 1 : 0 - -#define EVTHR_SHARED_PIPE 1 - -enum evthr_res { - EVTHR_RES_OK = 0, - EVTHR_RES_BACKLOG, - EVTHR_RES_RETRY, - EVTHR_RES_NOCB, - EVTHR_RES_FATAL -}; - -struct evthr; - -typedef void (*evthr_cb)(struct evthr *thr, void *cmd_arg, void *shared); -typedef void (*evthr_init_cb)(struct evthr *thr, void *shared); -typedef void (*evthr_exit_cb)(struct evthr *thr, void *shared); - -struct evthr_cmd { - uint8_t stop; - void *args; - evthr_cb cb; -} __attribute__((packed)); - -struct evthr_pool { -#ifdef EVTHR_SHARED_PIPE - int rdr; - int wdr; -#endif - int nthreads; - TAILQ_HEAD(evthr_pool_slist, evthr) threads; -}; - -struct evthr { - int rdr; - int wdr; - char err; - struct event *event; - struct event_base *evbase; - pthread_mutex_t lock; - pthread_t *thr; - evthr_init_cb init_cb; - evthr_exit_cb exit_cb; - void *arg; - void *aux; -#ifdef EVTHR_SHARED_PIPE - int pool_rdr; - struct event *shared_pool_ev; -#endif - TAILQ_ENTRY(evthr) next; -}; - - -static void -_evthr_read_cmd(evutil_socket_t sock, short which, void *args) -{ - struct evthr *thread; - struct evthr_cmd cmd; - int stopped; - - if (!(thread = (struct evthr *)args)) { - return; - } - - stopped = 0; - - if (_evthr_read(thread, &cmd, sock) == 1) { - stopped = cmd.stop; - - if (cmd.cb != NULL) { - (cmd.cb)(thread, cmd.args, thread->arg); - } - } - - if (stopped == 1) { - event_base_loopbreak(thread->evbase); - } - - return; -} - -static void * -_evthr_loop(void *args) -{ - struct evthr *thread; - - if (!(thread = (struct evthr *)args)) { - return NULL; - } - - if (thread == NULL || thread->thr == NULL) { - pthread_exit(NULL); - } - - thread->evbase = event_base_new(); - thread->event = event_new(thread->evbase, thread->rdr, - EV_READ | EV_PERSIST, _evthr_read_cmd, args); - - event_add(thread->event, NULL); - -#ifdef EVTHR_SHARED_PIPE - if (thread->pool_rdr > 0) { - thread->shared_pool_ev = event_new(thread->evbase, thread->pool_rdr, - EV_READ | EV_PERSIST, _evthr_read_cmd, args); - event_add(thread->shared_pool_ev, NULL); - } -#endif - - pthread_mutex_lock(&thread->lock); - if (thread->init_cb != NULL) { - (thread->init_cb)(thread, thread->arg); - } - - pthread_mutex_unlock(&thread->lock); - - CHECK_ERR(L_MAIN, thread->err); - - event_base_loop(thread->evbase, 0); - - pthread_mutex_lock(&thread->lock); - if (thread->exit_cb != NULL) { - (thread->exit_cb)(thread, thread->arg); - } - - pthread_mutex_unlock(&thread->lock); - - pthread_exit(NULL); -} - -static enum evthr_res -evthr_defer(struct evthr *thread, evthr_cb cb, void *arg) -{ - struct evthr_cmd cmd = { - .cb = cb, - .args = arg, - .stop = 0 - }; - - if (send(thread->wdr, &cmd, sizeof(cmd), 0) <= 0) { - return EVTHR_RES_RETRY; - } - - return EVTHR_RES_OK; -} - -static enum evthr_res -evthr_stop(struct evthr *thread) -{ - struct evthr_cmd cmd = { - .cb = NULL, - .args = NULL, - .stop = 1 - }; - - if (send(thread->wdr, &cmd, sizeof(struct evthr_cmd), 0) < 0) { - return EVTHR_RES_RETRY; - } - - pthread_join(*thread->thr, NULL); - return EVTHR_RES_OK; -} - -static struct event_base * -evthr_get_base(struct evthr * thr) -{ - return thr ? thr->evbase : NULL; -} - -static void -evthr_free(struct evthr *thread) -{ - if (thread == NULL) { - return; - } - - if (thread->rdr > 0) { - close(thread->rdr); - } - - if (thread->wdr > 0) { - close(thread->wdr); - } - - if (thread->thr) { - free(thread->thr); - } - - if (thread->event) { - event_free(thread->event); - } - -#ifdef EVTHR_SHARED_PIPE - if (thread->shared_pool_ev) { - event_free(thread->shared_pool_ev); - } -#endif - - if (thread->evbase) { - event_base_free(thread->evbase); - } - - free(thread); -} - -static struct evthr * -evthr_wexit_new(evthr_init_cb init_cb, evthr_exit_cb exit_cb, void *args) -{ - struct evthr *thread; - int fds[2]; - - if (evutil_socketpair(AF_UNIX, SOCK_STREAM, 0, fds) == -1) { - return NULL; - } - - evutil_make_socket_nonblocking(fds[0]); - evutil_make_socket_nonblocking(fds[1]); - - if (!(thread = calloc(1, sizeof(struct evthr)))) { - return NULL; - } - - thread->thr = malloc(sizeof(pthread_t)); - thread->arg = args; - thread->rdr = fds[0]; - thread->wdr = fds[1]; - - thread->init_cb = init_cb; - thread->exit_cb = exit_cb; - - if (pthread_mutex_init(&thread->lock, NULL)) { - evthr_free(thread); - return NULL; - } - - return thread; -} - -static int -evthr_start(struct evthr *thread) -{ - if (thread == NULL || thread->thr == NULL) { - return -1; - } - - if (pthread_create(thread->thr, NULL, _evthr_loop, (void *)thread)) { - return -1; - } - - return 0; -} - -static void -evthr_pool_free(struct evthr_pool *pool) -{ - struct evthr *thread; - struct evthr *save; - - if (pool == NULL) { - return; - } - - TAILQ_FOREACH_SAFE(thread, &pool->threads, next, save) { - TAILQ_REMOVE(&pool->threads, thread, next); - - evthr_free(thread); - } - - free(pool); -} - -static enum evthr_res -evthr_pool_stop(struct evthr_pool *pool) -{ - struct evthr *thr; - struct evthr *save; - - if (pool == NULL) { - return EVTHR_RES_FATAL; - } - - TAILQ_FOREACH_SAFE(thr, &pool->threads, next, save) { - evthr_stop(thr); - } - - return EVTHR_RES_OK; -} - -static inline int -get_backlog_(struct evthr *thread) -{ - int backlog = 0; - - ioctl(thread->rdr, FIONREAD, &backlog); - - return (int)(backlog / sizeof(struct evthr_cmd)); -} - -static enum evthr_res -evthr_pool_defer(struct evthr_pool *pool, evthr_cb cb, void *arg) -{ -#ifdef EVTHR_SHARED_PIPE - struct evthr_cmd cmd = { - .cb = cb, - .args = arg, - .stop = 0 - }; - - if (send(pool->wdr, &cmd, sizeof(cmd), 0) == -1) { - return EVTHR_RES_RETRY; - } - - return EVTHR_RES_OK; -#endif - struct evthr *thread = NULL; - struct evthr *min_thread = NULL; - int min_backlog = 0; - - if (pool == NULL) { - return EVTHR_RES_FATAL; - } - - if (cb == NULL) { - return EVTHR_RES_NOCB; - } - - TAILQ_FOREACH(thread, &pool->threads, next) { - int backlog = get_backlog_(thread); - - if (backlog == 0) { - min_thread = thread; - break; - } - - if (min_thread == NULL || backlog < min_backlog) { - min_thread = thread; - min_backlog = backlog; - } - } - - return evthr_defer(min_thread, cb, arg); -} - -static struct evthr_pool * -evthr_pool_wexit_new(int nthreads, evthr_init_cb init_cb, evthr_exit_cb exit_cb, void *shared) -{ - struct evthr_pool *pool; - int i; - -#ifdef EVTHR_SHARED_PIPE - int fds[2]; -#endif - - if (nthreads == 0) { - return NULL; - } - - if (!(pool = calloc(1, sizeof(struct evthr_pool)))) { - return NULL; - } - - pool->nthreads = nthreads; - TAILQ_INIT(&pool->threads); - -#ifdef EVTHR_SHARED_PIPE - if (evutil_socketpair(AF_UNIX, SOCK_DGRAM, 0, fds) == -1) { - return NULL; - } - - evutil_make_socket_nonblocking(fds[0]); - evutil_make_socket_nonblocking(fds[1]); - - pool->rdr = fds[0]; - pool->wdr = fds[1]; -#endif - - for (i = 0; i < nthreads; i++) { - struct evthr * thread; - - if (!(thread = evthr_wexit_new(init_cb, exit_cb, shared))) { - evthr_pool_free(pool); - return NULL; - } - -#ifdef EVTHR_SHARED_PIPE - thread->pool_rdr = fds[0]; -#endif - - TAILQ_INSERT_TAIL(&pool->threads, thread, next); - } - - return pool; -} - -static int -evthr_pool_start(struct evthr_pool *pool) -{ - struct evthr *evthr = NULL; - - if (pool == NULL) { - return -1; - } - - TAILQ_FOREACH(evthr, &pool->threads, next) { - if (evthr_start(evthr) < 0) { - return -1; - } - - usleep(5000); - } - - return 0; -} - /* ----------------------------- CALLBACK EXECUTION ------------------------- */ /* Worker threads */ @@ -521,15 +96,7 @@ execute(struct evthr *thr, void *arg, void *shared) static void init_cb(struct evthr *thr, void *shared) { - int ret; - - ret = db_perthread_init(); - if (ret < 0) - { - DPRINTF(E_FATAL, L_MAIN, "Error: DB init failed (worker thread)\n"); - thr->err = EIO; - return; - } + CHECK_ERR(L_MAIN, db_perthread_init()); thread_setname(pthread_self(), "worker"); }