mirror of
https://github.com/owntone/owntone-server.git
synced 2024-12-26 23:25:56 -05:00
[evhtr] Consolidate pool threading in evthr.c
Also reinstate check for server already running
This commit is contained in:
parent
bd6f38282c
commit
4d0c297901
@ -122,6 +122,7 @@ owntone_SOURCES = main.c \
|
|||||||
outputs/dummy.c outputs/fifo.c outputs/rcp.c \
|
outputs/dummy.c outputs/fifo.c outputs/rcp.c \
|
||||||
$(ALSA_SRC) $(PULSEAUDIO_SRC) $(CHROMECAST_SRC) \
|
$(ALSA_SRC) $(PULSEAUDIO_SRC) $(CHROMECAST_SRC) \
|
||||||
evrtsp/rtsp.c evrtsp/evrtsp.h evrtsp/rtsp-internal.h evrtsp/log.h \
|
evrtsp/rtsp.c evrtsp/evrtsp.h evrtsp/rtsp-internal.h evrtsp/log.h \
|
||||||
|
evthr.c evthr.h \
|
||||||
$(SPOTIFY_SRC) \
|
$(SPOTIFY_SRC) \
|
||||||
$(LASTFM_SRC) \
|
$(LASTFM_SRC) \
|
||||||
$(MPD_SRC) \
|
$(MPD_SRC) \
|
||||||
|
467
src/evthr.c
Normal file
467
src/evthr.c
Normal file
@ -0,0 +1,467 @@
|
|||||||
|
/*
|
||||||
|
------------------- Thread handling borrowed from libevhtp ---------------------
|
||||||
|
|
||||||
|
BSD 3-Clause License
|
||||||
|
|
||||||
|
Copyright (c) 2010-2018, Mark Ellzey, Nathan French, Marcus Sundberg
|
||||||
|
All rights reserved.
|
||||||
|
|
||||||
|
Redistribution and use in source and binary forms, with or without
|
||||||
|
modification, are permitted provided that the following conditions are met:
|
||||||
|
|
||||||
|
* Redistributions of source code must retain the above copyright notice, this
|
||||||
|
list of conditions and the following disclaimer.
|
||||||
|
|
||||||
|
* Redistributions in binary form must reproduce the above copyright notice,
|
||||||
|
this list of conditions and the following disclaimer in the documentation
|
||||||
|
and/or other materials provided with the distribution.
|
||||||
|
|
||||||
|
* Neither the name of the copyright holder nor the names of its
|
||||||
|
contributors may be used to endorse or promote products derived from
|
||||||
|
this software without specific prior written permission.
|
||||||
|
|
||||||
|
THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
|
||||||
|
AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
|
||||||
|
IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
|
||||||
|
DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE
|
||||||
|
FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
|
||||||
|
DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
|
||||||
|
SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
|
||||||
|
CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
|
||||||
|
OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
|
||||||
|
OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
|
||||||
|
*/
|
||||||
|
|
||||||
|
#include <stdio.h>
|
||||||
|
#include <stdlib.h>
|
||||||
|
#include <stdint.h>
|
||||||
|
#include <limits.h>
|
||||||
|
#include <unistd.h>
|
||||||
|
#include <pthread.h>
|
||||||
|
|
||||||
|
#include <sys/queue.h>
|
||||||
|
#include <sys/ioctl.h>
|
||||||
|
|
||||||
|
#include <event2/event.h>
|
||||||
|
#include <event2/thread.h>
|
||||||
|
|
||||||
|
#include "evthr.h"
|
||||||
|
|
||||||
|
#ifndef TAILQ_FOREACH_SAFE
|
||||||
|
#define TAILQ_FOREACH_SAFE(var, head, field, tvar) \
|
||||||
|
for ((var) = TAILQ_FIRST((head)); \
|
||||||
|
(var) && ((tvar) = TAILQ_NEXT((var), field), 1); \
|
||||||
|
(var) = (tvar))
|
||||||
|
#endif
|
||||||
|
|
||||||
|
#define _evthr_read(thr, cmd, sock) \
|
||||||
|
(recv(sock, cmd, sizeof(struct evthr_cmd), 0) == sizeof(struct evthr_cmd)) ? 1 : 0
|
||||||
|
|
||||||
|
#define EVTHR_SHARED_PIPE 1
|
||||||
|
|
||||||
|
struct evthr_cmd {
|
||||||
|
uint8_t stop;
|
||||||
|
void *args;
|
||||||
|
evthr_cb cb;
|
||||||
|
} __attribute__((packed));
|
||||||
|
|
||||||
|
struct evthr_pool {
|
||||||
|
#ifdef EVTHR_SHARED_PIPE
|
||||||
|
int rdr;
|
||||||
|
int wdr;
|
||||||
|
#endif
|
||||||
|
int nthreads;
|
||||||
|
TAILQ_HEAD(evthr_pool_slist, evthr) threads;
|
||||||
|
};
|
||||||
|
|
||||||
|
struct evthr {
|
||||||
|
int rdr;
|
||||||
|
int wdr;
|
||||||
|
char err;
|
||||||
|
struct event *event;
|
||||||
|
struct event_base *evbase;
|
||||||
|
pthread_mutex_t lock;
|
||||||
|
pthread_t *thr;
|
||||||
|
evthr_init_cb init_cb;
|
||||||
|
evthr_exit_cb exit_cb;
|
||||||
|
void *arg;
|
||||||
|
void *aux;
|
||||||
|
#ifdef EVTHR_SHARED_PIPE
|
||||||
|
int pool_rdr;
|
||||||
|
struct event *shared_pool_ev;
|
||||||
|
#endif
|
||||||
|
TAILQ_ENTRY(evthr) next;
|
||||||
|
};
|
||||||
|
|
||||||
|
|
||||||
|
static void
|
||||||
|
_evthr_read_cmd(evutil_socket_t sock, short which, void *args)
|
||||||
|
{
|
||||||
|
struct evthr *thread;
|
||||||
|
struct evthr_cmd cmd;
|
||||||
|
int stopped;
|
||||||
|
|
||||||
|
if (!(thread = (struct evthr *)args)) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
stopped = 0;
|
||||||
|
|
||||||
|
if (_evthr_read(thread, &cmd, sock) == 1) {
|
||||||
|
stopped = cmd.stop;
|
||||||
|
|
||||||
|
if (cmd.cb != NULL) {
|
||||||
|
(cmd.cb)(thread, cmd.args, thread->arg);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if (stopped == 1) {
|
||||||
|
event_base_loopbreak(thread->evbase);
|
||||||
|
}
|
||||||
|
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
static void *
|
||||||
|
_evthr_loop(void *args)
|
||||||
|
{
|
||||||
|
struct evthr *thread;
|
||||||
|
|
||||||
|
if (!(thread = (struct evthr *)args)) {
|
||||||
|
return NULL;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (thread == NULL || thread->thr == NULL) {
|
||||||
|
pthread_exit(NULL);
|
||||||
|
}
|
||||||
|
|
||||||
|
thread->evbase = event_base_new();
|
||||||
|
thread->event = event_new(thread->evbase, thread->rdr,
|
||||||
|
EV_READ | EV_PERSIST, _evthr_read_cmd, args);
|
||||||
|
|
||||||
|
event_add(thread->event, NULL);
|
||||||
|
|
||||||
|
#ifdef EVTHR_SHARED_PIPE
|
||||||
|
if (thread->pool_rdr > 0) {
|
||||||
|
thread->shared_pool_ev = event_new(thread->evbase, thread->pool_rdr,
|
||||||
|
EV_READ | EV_PERSIST, _evthr_read_cmd, args);
|
||||||
|
event_add(thread->shared_pool_ev, NULL);
|
||||||
|
}
|
||||||
|
#endif
|
||||||
|
|
||||||
|
pthread_mutex_lock(&thread->lock);
|
||||||
|
if (thread->init_cb != NULL) {
|
||||||
|
(thread->init_cb)(thread, thread->arg);
|
||||||
|
}
|
||||||
|
|
||||||
|
pthread_mutex_unlock(&thread->lock);
|
||||||
|
|
||||||
|
event_base_loop(thread->evbase, 0);
|
||||||
|
|
||||||
|
pthread_mutex_lock(&thread->lock);
|
||||||
|
if (thread->exit_cb != NULL) {
|
||||||
|
(thread->exit_cb)(thread, thread->arg);
|
||||||
|
}
|
||||||
|
|
||||||
|
pthread_mutex_unlock(&thread->lock);
|
||||||
|
|
||||||
|
pthread_exit(NULL);
|
||||||
|
}
|
||||||
|
|
||||||
|
static enum evthr_res
|
||||||
|
evthr_defer(struct evthr *thread, evthr_cb cb, void *arg)
|
||||||
|
{
|
||||||
|
struct evthr_cmd cmd = {
|
||||||
|
.cb = cb,
|
||||||
|
.args = arg,
|
||||||
|
.stop = 0
|
||||||
|
};
|
||||||
|
|
||||||
|
if (send(thread->wdr, &cmd, sizeof(cmd), 0) <= 0) {
|
||||||
|
return EVTHR_RES_RETRY;
|
||||||
|
}
|
||||||
|
|
||||||
|
return EVTHR_RES_OK;
|
||||||
|
}
|
||||||
|
|
||||||
|
static enum evthr_res
|
||||||
|
evthr_stop(struct evthr *thread)
|
||||||
|
{
|
||||||
|
struct evthr_cmd cmd = {
|
||||||
|
.cb = NULL,
|
||||||
|
.args = NULL,
|
||||||
|
.stop = 1
|
||||||
|
};
|
||||||
|
|
||||||
|
if (send(thread->wdr, &cmd, sizeof(struct evthr_cmd), 0) < 0) {
|
||||||
|
return EVTHR_RES_RETRY;
|
||||||
|
}
|
||||||
|
|
||||||
|
pthread_join(*thread->thr, NULL);
|
||||||
|
return EVTHR_RES_OK;
|
||||||
|
}
|
||||||
|
|
||||||
|
struct event_base *
|
||||||
|
evthr_get_base(struct evthr * thr)
|
||||||
|
{
|
||||||
|
return thr ? thr->evbase : NULL;
|
||||||
|
}
|
||||||
|
|
||||||
|
void
|
||||||
|
evthr_set_aux(struct evthr *thr, void *aux)
|
||||||
|
{
|
||||||
|
if (thr) {
|
||||||
|
thr->aux = aux;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
void *
|
||||||
|
evthr_get_aux(struct evthr *thr)
|
||||||
|
{
|
||||||
|
return thr ? thr->aux : NULL;
|
||||||
|
}
|
||||||
|
|
||||||
|
static void
|
||||||
|
evthr_free(struct evthr *thread)
|
||||||
|
{
|
||||||
|
if (thread == NULL) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (thread->rdr > 0) {
|
||||||
|
close(thread->rdr);
|
||||||
|
}
|
||||||
|
|
||||||
|
if (thread->wdr > 0) {
|
||||||
|
close(thread->wdr);
|
||||||
|
}
|
||||||
|
|
||||||
|
if (thread->thr) {
|
||||||
|
free(thread->thr);
|
||||||
|
}
|
||||||
|
|
||||||
|
if (thread->event) {
|
||||||
|
event_free(thread->event);
|
||||||
|
}
|
||||||
|
|
||||||
|
#ifdef EVTHR_SHARED_PIPE
|
||||||
|
if (thread->shared_pool_ev) {
|
||||||
|
event_free(thread->shared_pool_ev);
|
||||||
|
}
|
||||||
|
#endif
|
||||||
|
|
||||||
|
if (thread->evbase) {
|
||||||
|
event_base_free(thread->evbase);
|
||||||
|
}
|
||||||
|
|
||||||
|
free(thread);
|
||||||
|
}
|
||||||
|
|
||||||
|
static struct evthr *
|
||||||
|
evthr_wexit_new(evthr_init_cb init_cb, evthr_exit_cb exit_cb, void *args)
|
||||||
|
{
|
||||||
|
struct evthr *thread;
|
||||||
|
int fds[2];
|
||||||
|
|
||||||
|
if (evutil_socketpair(AF_UNIX, SOCK_STREAM, 0, fds) == -1) {
|
||||||
|
return NULL;
|
||||||
|
}
|
||||||
|
|
||||||
|
evutil_make_socket_nonblocking(fds[0]);
|
||||||
|
evutil_make_socket_nonblocking(fds[1]);
|
||||||
|
|
||||||
|
if (!(thread = calloc(1, sizeof(struct evthr)))) {
|
||||||
|
return NULL;
|
||||||
|
}
|
||||||
|
|
||||||
|
thread->thr = malloc(sizeof(pthread_t));
|
||||||
|
thread->arg = args;
|
||||||
|
thread->rdr = fds[0];
|
||||||
|
thread->wdr = fds[1];
|
||||||
|
|
||||||
|
thread->init_cb = init_cb;
|
||||||
|
thread->exit_cb = exit_cb;
|
||||||
|
|
||||||
|
if (pthread_mutex_init(&thread->lock, NULL)) {
|
||||||
|
evthr_free(thread);
|
||||||
|
return NULL;
|
||||||
|
}
|
||||||
|
|
||||||
|
return thread;
|
||||||
|
}
|
||||||
|
|
||||||
|
static int
|
||||||
|
evthr_start(struct evthr *thread)
|
||||||
|
{
|
||||||
|
if (thread == NULL || thread->thr == NULL) {
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (pthread_create(thread->thr, NULL, _evthr_loop, (void *)thread)) {
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
void
|
||||||
|
evthr_pool_free(struct evthr_pool *pool)
|
||||||
|
{
|
||||||
|
struct evthr *thread;
|
||||||
|
struct evthr *save;
|
||||||
|
|
||||||
|
if (pool == NULL) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
TAILQ_FOREACH_SAFE(thread, &pool->threads, next, save) {
|
||||||
|
TAILQ_REMOVE(&pool->threads, thread, next);
|
||||||
|
|
||||||
|
evthr_free(thread);
|
||||||
|
}
|
||||||
|
|
||||||
|
free(pool);
|
||||||
|
}
|
||||||
|
|
||||||
|
enum evthr_res
|
||||||
|
evthr_pool_stop(struct evthr_pool *pool)
|
||||||
|
{
|
||||||
|
struct evthr *thr;
|
||||||
|
struct evthr *save;
|
||||||
|
|
||||||
|
if (pool == NULL) {
|
||||||
|
return EVTHR_RES_FATAL;
|
||||||
|
}
|
||||||
|
|
||||||
|
TAILQ_FOREACH_SAFE(thr, &pool->threads, next, save) {
|
||||||
|
evthr_stop(thr);
|
||||||
|
}
|
||||||
|
|
||||||
|
return EVTHR_RES_OK;
|
||||||
|
}
|
||||||
|
|
||||||
|
static inline int
|
||||||
|
get_backlog_(struct evthr *thread)
|
||||||
|
{
|
||||||
|
int backlog = 0;
|
||||||
|
|
||||||
|
ioctl(thread->rdr, FIONREAD, &backlog);
|
||||||
|
|
||||||
|
return (int)(backlog / sizeof(struct evthr_cmd));
|
||||||
|
}
|
||||||
|
|
||||||
|
enum evthr_res
|
||||||
|
evthr_pool_defer(struct evthr_pool *pool, evthr_cb cb, void *arg)
|
||||||
|
{
|
||||||
|
#ifdef EVTHR_SHARED_PIPE
|
||||||
|
struct evthr_cmd cmd = {
|
||||||
|
.cb = cb,
|
||||||
|
.args = arg,
|
||||||
|
.stop = 0
|
||||||
|
};
|
||||||
|
|
||||||
|
if (send(pool->wdr, &cmd, sizeof(cmd), 0) == -1) {
|
||||||
|
return EVTHR_RES_RETRY;
|
||||||
|
}
|
||||||
|
|
||||||
|
return EVTHR_RES_OK;
|
||||||
|
#endif
|
||||||
|
struct evthr *thread = NULL;
|
||||||
|
struct evthr *min_thread = NULL;
|
||||||
|
int min_backlog = 0;
|
||||||
|
|
||||||
|
if (pool == NULL) {
|
||||||
|
return EVTHR_RES_FATAL;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (cb == NULL) {
|
||||||
|
return EVTHR_RES_NOCB;
|
||||||
|
}
|
||||||
|
|
||||||
|
TAILQ_FOREACH(thread, &pool->threads, next) {
|
||||||
|
int backlog = get_backlog_(thread);
|
||||||
|
|
||||||
|
if (backlog == 0) {
|
||||||
|
min_thread = thread;
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (min_thread == NULL || backlog < min_backlog) {
|
||||||
|
min_thread = thread;
|
||||||
|
min_backlog = backlog;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return evthr_defer(min_thread, cb, arg);
|
||||||
|
}
|
||||||
|
|
||||||
|
struct evthr_pool *
|
||||||
|
evthr_pool_wexit_new(int nthreads, evthr_init_cb init_cb, evthr_exit_cb exit_cb, void *shared)
|
||||||
|
{
|
||||||
|
struct evthr_pool *pool;
|
||||||
|
int i;
|
||||||
|
|
||||||
|
#ifdef EVTHR_SHARED_PIPE
|
||||||
|
int fds[2];
|
||||||
|
#endif
|
||||||
|
|
||||||
|
if (nthreads == 0) {
|
||||||
|
return NULL;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (!(pool = calloc(1, sizeof(struct evthr_pool)))) {
|
||||||
|
return NULL;
|
||||||
|
}
|
||||||
|
|
||||||
|
pool->nthreads = nthreads;
|
||||||
|
TAILQ_INIT(&pool->threads);
|
||||||
|
|
||||||
|
#ifdef EVTHR_SHARED_PIPE
|
||||||
|
if (evutil_socketpair(AF_UNIX, SOCK_DGRAM, 0, fds) == -1) {
|
||||||
|
return NULL;
|
||||||
|
}
|
||||||
|
|
||||||
|
evutil_make_socket_nonblocking(fds[0]);
|
||||||
|
evutil_make_socket_nonblocking(fds[1]);
|
||||||
|
|
||||||
|
pool->rdr = fds[0];
|
||||||
|
pool->wdr = fds[1];
|
||||||
|
#endif
|
||||||
|
|
||||||
|
for (i = 0; i < nthreads; i++) {
|
||||||
|
struct evthr * thread;
|
||||||
|
|
||||||
|
if (!(thread = evthr_wexit_new(init_cb, exit_cb, shared))) {
|
||||||
|
evthr_pool_free(pool);
|
||||||
|
return NULL;
|
||||||
|
}
|
||||||
|
|
||||||
|
#ifdef EVTHR_SHARED_PIPE
|
||||||
|
thread->pool_rdr = fds[0];
|
||||||
|
#endif
|
||||||
|
|
||||||
|
TAILQ_INSERT_TAIL(&pool->threads, thread, next);
|
||||||
|
}
|
||||||
|
|
||||||
|
return pool;
|
||||||
|
}
|
||||||
|
|
||||||
|
int
|
||||||
|
evthr_pool_start(struct evthr_pool *pool)
|
||||||
|
{
|
||||||
|
struct evthr *evthr = NULL;
|
||||||
|
|
||||||
|
if (pool == NULL) {
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
|
||||||
|
TAILQ_FOREACH(evthr, &pool->threads, next) {
|
||||||
|
if (evthr_start(evthr) < 0) {
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
|
||||||
|
usleep(5000);
|
||||||
|
}
|
||||||
|
|
||||||
|
return 0;
|
||||||
|
}
|
43
src/evthr.h
Normal file
43
src/evthr.h
Normal file
@ -0,0 +1,43 @@
|
|||||||
|
#ifndef __EVTHR_H__
|
||||||
|
#define __EVTHR_H__
|
||||||
|
|
||||||
|
enum evthr_res {
|
||||||
|
EVTHR_RES_OK = 0,
|
||||||
|
EVTHR_RES_BACKLOG,
|
||||||
|
EVTHR_RES_RETRY,
|
||||||
|
EVTHR_RES_NOCB,
|
||||||
|
EVTHR_RES_FATAL
|
||||||
|
};
|
||||||
|
|
||||||
|
struct evthr_pool;
|
||||||
|
struct evthr;
|
||||||
|
|
||||||
|
typedef void (*evthr_cb)(struct evthr *thr, void *cmd_arg, void *shared);
|
||||||
|
typedef void (*evthr_init_cb)(struct evthr *thr, void *shared);
|
||||||
|
typedef void (*evthr_exit_cb)(struct evthr *thr, void *shared);
|
||||||
|
|
||||||
|
struct event_base *
|
||||||
|
evthr_get_base(struct evthr *thr);
|
||||||
|
|
||||||
|
void
|
||||||
|
evthr_set_aux(struct evthr *thr, void *aux);
|
||||||
|
|
||||||
|
void *
|
||||||
|
evthr_get_aux(struct evthr *thr);
|
||||||
|
|
||||||
|
enum evthr_res
|
||||||
|
evthr_pool_defer(struct evthr_pool *pool, evthr_cb cb, void *arg);
|
||||||
|
|
||||||
|
struct evthr_pool *
|
||||||
|
evthr_pool_wexit_new(int nthreads, evthr_init_cb init_cb, evthr_exit_cb exit_cb, void *shared);
|
||||||
|
|
||||||
|
void
|
||||||
|
evthr_pool_free(struct evthr_pool *pool);
|
||||||
|
|
||||||
|
enum evthr_res
|
||||||
|
evthr_pool_stop(struct evthr_pool *pool);
|
||||||
|
|
||||||
|
int
|
||||||
|
evthr_pool_start(struct evthr_pool *pool);
|
||||||
|
|
||||||
|
#endif /* !__EVTHR_H__ */
|
415
src/httpd.c
415
src/httpd.c
@ -30,13 +30,11 @@
|
|||||||
#include <pthread.h>
|
#include <pthread.h>
|
||||||
#include <time.h>
|
#include <time.h>
|
||||||
#include <sys/param.h>
|
#include <sys/param.h>
|
||||||
#include <sys/queue.h>
|
|
||||||
#include <sys/types.h>
|
#include <sys/types.h>
|
||||||
#include <sys/stat.h>
|
#include <sys/stat.h>
|
||||||
#include <stdint.h>
|
#include <stdint.h>
|
||||||
#include <inttypes.h>
|
#include <inttypes.h>
|
||||||
|
|
||||||
#include <sys/ioctl.h>
|
|
||||||
#include <syscall.h> // get thread ID
|
#include <syscall.h> // get thread ID
|
||||||
|
|
||||||
#include <event2/event.h>
|
#include <event2/event.h>
|
||||||
@ -49,6 +47,7 @@
|
|||||||
#include "conffile.h"
|
#include "conffile.h"
|
||||||
#include "misc.h"
|
#include "misc.h"
|
||||||
#include "worker.h"
|
#include "worker.h"
|
||||||
|
#include "evthr.h"
|
||||||
#include "httpd.h"
|
#include "httpd.h"
|
||||||
#include "httpd_internal.h"
|
#include "httpd_internal.h"
|
||||||
#include "transcode.h"
|
#include "transcode.h"
|
||||||
@ -133,369 +132,9 @@ static int httpd_port;
|
|||||||
|
|
||||||
#define THREADPOOL_NTHREADS 4
|
#define THREADPOOL_NTHREADS 4
|
||||||
|
|
||||||
struct evthr_pool;
|
|
||||||
|
|
||||||
static struct evthr_pool *httpd_threadpool;
|
static struct evthr_pool *httpd_threadpool;
|
||||||
|
|
||||||
|
|
||||||
/* ----------------- Thread handling borrowed from libevhtp ----------------- */
|
|
||||||
|
|
||||||
#ifndef TAILQ_FOREACH_SAFE
|
|
||||||
#define TAILQ_FOREACH_SAFE(var, head, field, tvar) \
|
|
||||||
for ((var) = TAILQ_FIRST((head)); \
|
|
||||||
(var) && ((tvar) = TAILQ_NEXT((var), field), 1); \
|
|
||||||
(var) = (tvar))
|
|
||||||
#endif
|
|
||||||
|
|
||||||
#define _evthr_read(thr, cmd, sock) \
|
|
||||||
(recv(sock, cmd, sizeof(struct evthr_cmd), 0) == sizeof(struct evthr_cmd)) ? 1 : 0
|
|
||||||
|
|
||||||
#define EVTHR_SHARED_PIPE 1
|
|
||||||
|
|
||||||
enum evthr_res {
|
|
||||||
EVTHR_RES_OK = 0,
|
|
||||||
EVTHR_RES_BACKLOG,
|
|
||||||
EVTHR_RES_RETRY,
|
|
||||||
EVTHR_RES_NOCB,
|
|
||||||
EVTHR_RES_FATAL
|
|
||||||
};
|
|
||||||
|
|
||||||
struct evthr;
|
|
||||||
|
|
||||||
typedef void (*evthr_cb)(struct evthr *thr, void *cmd_arg, void *shared);
|
|
||||||
typedef void (*evthr_init_cb)(struct evthr *thr, void *shared);
|
|
||||||
typedef void (*evthr_exit_cb)(struct evthr *thr, void *shared);
|
|
||||||
|
|
||||||
struct evthr_cmd {
|
|
||||||
uint8_t stop;
|
|
||||||
void *args;
|
|
||||||
evthr_cb cb;
|
|
||||||
} __attribute__((packed));
|
|
||||||
|
|
||||||
struct evthr_pool {
|
|
||||||
#ifdef EVTHR_SHARED_PIPE
|
|
||||||
int rdr;
|
|
||||||
int wdr;
|
|
||||||
#endif
|
|
||||||
int nthreads;
|
|
||||||
TAILQ_HEAD(evthr_pool_slist, evthr) threads;
|
|
||||||
};
|
|
||||||
|
|
||||||
struct evthr {
|
|
||||||
int rdr;
|
|
||||||
int wdr;
|
|
||||||
char err;
|
|
||||||
struct event *event;
|
|
||||||
struct event_base *evbase;
|
|
||||||
httpd_server *server;
|
|
||||||
pthread_mutex_t lock;
|
|
||||||
pthread_t *thr;
|
|
||||||
evthr_init_cb init_cb;
|
|
||||||
evthr_exit_cb exit_cb;
|
|
||||||
void *arg;
|
|
||||||
void *aux;
|
|
||||||
#ifdef EVTHR_SHARED_PIPE
|
|
||||||
int pool_rdr;
|
|
||||||
struct event *shared_pool_ev;
|
|
||||||
#endif
|
|
||||||
TAILQ_ENTRY(evthr) next;
|
|
||||||
};
|
|
||||||
|
|
||||||
|
|
||||||
static void
|
|
||||||
_evthr_read_cmd(evutil_socket_t sock, short which, void *args)
|
|
||||||
{
|
|
||||||
struct evthr *thread;
|
|
||||||
struct evthr_cmd cmd;
|
|
||||||
int stopped;
|
|
||||||
|
|
||||||
if (!(thread = (struct evthr *)args)) {
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
|
|
||||||
stopped = 0;
|
|
||||||
|
|
||||||
if (_evthr_read(thread, &cmd, sock) == 1) {
|
|
||||||
stopped = cmd.stop;
|
|
||||||
|
|
||||||
if (cmd.cb != NULL) {
|
|
||||||
(cmd.cb)(thread, cmd.args, thread->arg);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
if (stopped == 1) {
|
|
||||||
event_base_loopbreak(thread->evbase);
|
|
||||||
}
|
|
||||||
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
|
|
||||||
static void *
|
|
||||||
_evthr_loop(void *args)
|
|
||||||
{
|
|
||||||
struct evthr *thread;
|
|
||||||
|
|
||||||
if (!(thread = (struct evthr *)args)) {
|
|
||||||
return NULL;
|
|
||||||
}
|
|
||||||
|
|
||||||
if (thread == NULL || thread->thr == NULL) {
|
|
||||||
pthread_exit(NULL);
|
|
||||||
}
|
|
||||||
|
|
||||||
thread->evbase = event_base_new();
|
|
||||||
thread->event = event_new(thread->evbase, thread->rdr,
|
|
||||||
EV_READ | EV_PERSIST, _evthr_read_cmd, args);
|
|
||||||
|
|
||||||
event_add(thread->event, NULL);
|
|
||||||
|
|
||||||
#ifdef EVTHR_SHARED_PIPE
|
|
||||||
if (thread->pool_rdr > 0) {
|
|
||||||
thread->shared_pool_ev = event_new(thread->evbase, thread->pool_rdr,
|
|
||||||
EV_READ | EV_PERSIST, _evthr_read_cmd, args);
|
|
||||||
event_add(thread->shared_pool_ev, NULL);
|
|
||||||
}
|
|
||||||
#endif
|
|
||||||
|
|
||||||
pthread_mutex_lock(&thread->lock);
|
|
||||||
if (thread->init_cb != NULL) {
|
|
||||||
(thread->init_cb)(thread, thread->arg);
|
|
||||||
}
|
|
||||||
|
|
||||||
pthread_mutex_unlock(&thread->lock);
|
|
||||||
|
|
||||||
CHECK_ERR(L_MAIN, thread->err);
|
|
||||||
|
|
||||||
event_base_loop(thread->evbase, 0);
|
|
||||||
|
|
||||||
pthread_mutex_lock(&thread->lock);
|
|
||||||
if (thread->exit_cb != NULL) {
|
|
||||||
(thread->exit_cb)(thread, thread->arg);
|
|
||||||
}
|
|
||||||
|
|
||||||
pthread_mutex_unlock(&thread->lock);
|
|
||||||
|
|
||||||
pthread_exit(NULL);
|
|
||||||
}
|
|
||||||
|
|
||||||
static enum evthr_res
|
|
||||||
evthr_stop(struct evthr *thread)
|
|
||||||
{
|
|
||||||
struct evthr_cmd cmd = {
|
|
||||||
.cb = NULL,
|
|
||||||
.args = NULL,
|
|
||||||
.stop = 1
|
|
||||||
};
|
|
||||||
|
|
||||||
if (send(thread->wdr, &cmd, sizeof(struct evthr_cmd), 0) < 0) {
|
|
||||||
return EVTHR_RES_RETRY;
|
|
||||||
}
|
|
||||||
|
|
||||||
pthread_join(*thread->thr, NULL);
|
|
||||||
return EVTHR_RES_OK;
|
|
||||||
}
|
|
||||||
|
|
||||||
static void
|
|
||||||
evthr_free(struct evthr *thread)
|
|
||||||
{
|
|
||||||
if (thread == NULL) {
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
|
|
||||||
if (thread->rdr > 0) {
|
|
||||||
close(thread->rdr);
|
|
||||||
}
|
|
||||||
|
|
||||||
if (thread->wdr > 0) {
|
|
||||||
close(thread->wdr);
|
|
||||||
}
|
|
||||||
|
|
||||||
if (thread->thr) {
|
|
||||||
free(thread->thr);
|
|
||||||
}
|
|
||||||
|
|
||||||
if (thread->event) {
|
|
||||||
event_free(thread->event);
|
|
||||||
}
|
|
||||||
|
|
||||||
#ifdef EVTHR_SHARED_PIPE
|
|
||||||
if (thread->shared_pool_ev) {
|
|
||||||
event_free(thread->shared_pool_ev);
|
|
||||||
}
|
|
||||||
#endif
|
|
||||||
|
|
||||||
if (thread->evbase) {
|
|
||||||
event_base_free(thread->evbase);
|
|
||||||
}
|
|
||||||
|
|
||||||
free(thread);
|
|
||||||
}
|
|
||||||
|
|
||||||
static struct evthr *
|
|
||||||
evthr_wexit_new(evthr_init_cb init_cb, evthr_exit_cb exit_cb, void *args)
|
|
||||||
{
|
|
||||||
struct evthr *thread;
|
|
||||||
int fds[2];
|
|
||||||
|
|
||||||
if (evutil_socketpair(AF_UNIX, SOCK_STREAM, 0, fds) == -1) {
|
|
||||||
return NULL;
|
|
||||||
}
|
|
||||||
|
|
||||||
evutil_make_socket_nonblocking(fds[0]);
|
|
||||||
evutil_make_socket_nonblocking(fds[1]);
|
|
||||||
|
|
||||||
if (!(thread = calloc(1, sizeof(struct evthr)))) {
|
|
||||||
return NULL;
|
|
||||||
}
|
|
||||||
|
|
||||||
thread->thr = malloc(sizeof(pthread_t));
|
|
||||||
thread->arg = args;
|
|
||||||
thread->rdr = fds[0];
|
|
||||||
thread->wdr = fds[1];
|
|
||||||
|
|
||||||
thread->init_cb = init_cb;
|
|
||||||
thread->exit_cb = exit_cb;
|
|
||||||
|
|
||||||
if (pthread_mutex_init(&thread->lock, NULL)) {
|
|
||||||
evthr_free(thread);
|
|
||||||
return NULL;
|
|
||||||
}
|
|
||||||
|
|
||||||
return thread;
|
|
||||||
}
|
|
||||||
|
|
||||||
static int
|
|
||||||
evthr_start(struct evthr *thread)
|
|
||||||
{
|
|
||||||
if (thread == NULL || thread->thr == NULL) {
|
|
||||||
return -1;
|
|
||||||
}
|
|
||||||
|
|
||||||
if (pthread_create(thread->thr, NULL, _evthr_loop, (void *)thread)) {
|
|
||||||
return -1;
|
|
||||||
}
|
|
||||||
|
|
||||||
return 0;
|
|
||||||
}
|
|
||||||
|
|
||||||
static void
|
|
||||||
evthr_pool_free(struct evthr_pool *pool)
|
|
||||||
{
|
|
||||||
struct evthr *thread;
|
|
||||||
struct evthr *save;
|
|
||||||
|
|
||||||
if (pool == NULL) {
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
|
|
||||||
TAILQ_FOREACH_SAFE(thread, &pool->threads, next, save) {
|
|
||||||
TAILQ_REMOVE(&pool->threads, thread, next);
|
|
||||||
|
|
||||||
evthr_free(thread);
|
|
||||||
}
|
|
||||||
|
|
||||||
free(pool);
|
|
||||||
}
|
|
||||||
|
|
||||||
static enum evthr_res
|
|
||||||
evthr_pool_stop(struct evthr_pool *pool)
|
|
||||||
{
|
|
||||||
struct evthr *thr;
|
|
||||||
struct evthr *save;
|
|
||||||
|
|
||||||
if (pool == NULL) {
|
|
||||||
return EVTHR_RES_FATAL;
|
|
||||||
}
|
|
||||||
|
|
||||||
TAILQ_FOREACH_SAFE(thr, &pool->threads, next, save) {
|
|
||||||
evthr_stop(thr);
|
|
||||||
}
|
|
||||||
|
|
||||||
return EVTHR_RES_OK;
|
|
||||||
}
|
|
||||||
|
|
||||||
static inline int
|
|
||||||
get_backlog_(struct evthr *thread)
|
|
||||||
{
|
|
||||||
int backlog = 0;
|
|
||||||
|
|
||||||
ioctl(thread->rdr, FIONREAD, &backlog);
|
|
||||||
|
|
||||||
return (int)(backlog / sizeof(struct evthr_cmd));
|
|
||||||
}
|
|
||||||
|
|
||||||
static struct evthr_pool *
|
|
||||||
evthr_pool_wexit_new(int nthreads, evthr_init_cb init_cb, evthr_exit_cb exit_cb, void *shared)
|
|
||||||
{
|
|
||||||
struct evthr_pool *pool;
|
|
||||||
int i;
|
|
||||||
|
|
||||||
#ifdef EVTHR_SHARED_PIPE
|
|
||||||
int fds[2];
|
|
||||||
#endif
|
|
||||||
|
|
||||||
if (nthreads == 0) {
|
|
||||||
return NULL;
|
|
||||||
}
|
|
||||||
|
|
||||||
if (!(pool = calloc(1, sizeof(struct evthr_pool)))) {
|
|
||||||
return NULL;
|
|
||||||
}
|
|
||||||
|
|
||||||
pool->nthreads = nthreads;
|
|
||||||
TAILQ_INIT(&pool->threads);
|
|
||||||
|
|
||||||
#ifdef EVTHR_SHARED_PIPE
|
|
||||||
if (evutil_socketpair(AF_UNIX, SOCK_DGRAM, 0, fds) == -1) {
|
|
||||||
return NULL;
|
|
||||||
}
|
|
||||||
|
|
||||||
evutil_make_socket_nonblocking(fds[0]);
|
|
||||||
evutil_make_socket_nonblocking(fds[1]);
|
|
||||||
|
|
||||||
pool->rdr = fds[0];
|
|
||||||
pool->wdr = fds[1];
|
|
||||||
#endif
|
|
||||||
|
|
||||||
for (i = 0; i < nthreads; i++) {
|
|
||||||
struct evthr * thread;
|
|
||||||
|
|
||||||
if (!(thread = evthr_wexit_new(init_cb, exit_cb, shared))) {
|
|
||||||
evthr_pool_free(pool);
|
|
||||||
return NULL;
|
|
||||||
}
|
|
||||||
|
|
||||||
#ifdef EVTHR_SHARED_PIPE
|
|
||||||
thread->pool_rdr = fds[0];
|
|
||||||
#endif
|
|
||||||
|
|
||||||
TAILQ_INSERT_TAIL(&pool->threads, thread, next);
|
|
||||||
}
|
|
||||||
|
|
||||||
return pool;
|
|
||||||
}
|
|
||||||
|
|
||||||
static int
|
|
||||||
evthr_pool_start(struct evthr_pool *pool)
|
|
||||||
{
|
|
||||||
struct evthr *evthr = NULL;
|
|
||||||
|
|
||||||
if (pool == NULL) {
|
|
||||||
return -1;
|
|
||||||
}
|
|
||||||
|
|
||||||
TAILQ_FOREACH(evthr, &pool->threads, next) {
|
|
||||||
if (evthr_start(evthr) < 0) {
|
|
||||||
return -1;
|
|
||||||
}
|
|
||||||
|
|
||||||
usleep(5000);
|
|
||||||
}
|
|
||||||
|
|
||||||
return 0;
|
|
||||||
}
|
|
||||||
|
|
||||||
|
|
||||||
/* -------------------------------- HELPERS --------------------------------- */
|
/* -------------------------------- HELPERS --------------------------------- */
|
||||||
|
|
||||||
static int
|
static int
|
||||||
@ -1747,37 +1386,44 @@ httpd_basic_auth(struct httpd_request *hreq, const char *user, const char *passw
|
|||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
static int
|
||||||
|
bind_test(short unsigned port)
|
||||||
|
{
|
||||||
|
int fd;
|
||||||
|
|
||||||
|
fd = net_bind(&port, SOCK_STREAM, "httpd init");
|
||||||
|
if (fd < 0)
|
||||||
|
return -1;
|
||||||
|
|
||||||
|
close(fd);
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
static void
|
static void
|
||||||
thread_init_cb(struct evthr *thr, void *shared)
|
thread_init_cb(struct evthr *thr, void *shared)
|
||||||
{
|
{
|
||||||
int ret;
|
struct event_base *evbase;
|
||||||
|
httpd_server *server;
|
||||||
|
|
||||||
thread_setname(pthread_self(), "httpd");
|
thread_setname(pthread_self(), "httpd");
|
||||||
|
|
||||||
ret = db_perthread_init();
|
CHECK_ERR(L_HTTPD, db_perthread_init());
|
||||||
if (ret < 0)
|
CHECK_NULL(L_HTTPD, evbase = evthr_get_base(thr));
|
||||||
{
|
CHECK_NULL(L_HTTPD, server = httpd_server_new(evbase, httpd_port, request_cb, NULL));
|
||||||
DPRINTF(E_FATAL, L_HTTPD, "Error: DB init failed\n");
|
|
||||||
thr->err = EIO;
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
|
|
||||||
thr->server = httpd_server_new(thr->evbase, httpd_port, request_cb, NULL);
|
|
||||||
if (!thr->server)
|
|
||||||
{
|
|
||||||
DPRINTF(E_FATAL, L_HTTPD, "Could not create HTTP server on port %d (server already running?)\n", httpd_port);
|
|
||||||
thr->err = EIO;
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
|
|
||||||
// For CORS headers
|
// For CORS headers
|
||||||
httpd_server_allow_origin_set(thr->server, httpd_allow_origin);
|
httpd_server_allow_origin_set(server, httpd_allow_origin);
|
||||||
|
|
||||||
|
evthr_set_aux(thr, server);
|
||||||
}
|
}
|
||||||
|
|
||||||
static void
|
static void
|
||||||
thread_exit_cb(struct evthr *thr, void *shared)
|
thread_exit_cb(struct evthr *thr, void *shared)
|
||||||
{
|
{
|
||||||
httpd_server_free(thr->server);
|
httpd_server *server;
|
||||||
|
|
||||||
|
server = evthr_get_aux(thr);
|
||||||
|
httpd_server_free(server);
|
||||||
|
|
||||||
db_perthread_deinit();
|
db_perthread_deinit();
|
||||||
}
|
}
|
||||||
@ -1813,6 +1459,15 @@ httpd_init(const char *webroot)
|
|||||||
if (strlen(httpd_allow_origin) == 0)
|
if (strlen(httpd_allow_origin) == 0)
|
||||||
httpd_allow_origin = NULL;
|
httpd_allow_origin = NULL;
|
||||||
|
|
||||||
|
// Test that the port is free. We do it here because we can make a nicer exit
|
||||||
|
// than we can in thread_init_cb(), where the actual binding takes place.
|
||||||
|
ret = bind_test(httpd_port);
|
||||||
|
if (ret < 0)
|
||||||
|
{
|
||||||
|
DPRINTF(E_FATAL, L_HTTPD, "Could not create HTTP server on port %d (server already running?)\n", httpd_port);
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
|
||||||
// Prepare modules, e.g. httpd_daap
|
// Prepare modules, e.g. httpd_daap
|
||||||
ret = modules_init();
|
ret = modules_init();
|
||||||
if (ret < 0)
|
if (ret < 0)
|
||||||
|
@ -235,9 +235,7 @@ httpd_send_reply_end(struct httpd_request *hreq);
|
|||||||
void
|
void
|
||||||
httpd_send_error(struct httpd_request *hreq, int error, const char *reason);
|
httpd_send_error(struct httpd_request *hreq, int error, const char *reason);
|
||||||
|
|
||||||
/*
|
|
||||||
* Redirects to the given path
|
|
||||||
*/
|
|
||||||
void
|
void
|
||||||
httpd_redirect_to(struct httpd_request *hreq, const char *path);
|
httpd_redirect_to(struct httpd_request *hreq, const char *path);
|
||||||
|
|
||||||
|
@ -278,7 +278,7 @@ httpd_server_new(struct event_base *evbase, unsigned short port, httpd_request_c
|
|||||||
server->request_cb = cb;
|
server->request_cb = cb;
|
||||||
server->request_cb_arg = arg;
|
server->request_cb_arg = arg;
|
||||||
|
|
||||||
server->fd = net_bind(&port, SOCK_STREAM | SOCK_NONBLOCK, "httpd");
|
server->fd = net_bind_with_reuseport(&port, SOCK_STREAM | SOCK_NONBLOCK, "httpd");
|
||||||
if (server->fd <= 0)
|
if (server->fd <= 0)
|
||||||
goto error;
|
goto error;
|
||||||
|
|
||||||
|
21
src/misc.c
21
src/misc.c
@ -278,8 +278,8 @@ net_connect(const char *addr, unsigned short port, int type, const char *log_ser
|
|||||||
|
|
||||||
// If *port is 0 then a random port will be assigned, and *port will be updated
|
// If *port is 0 then a random port will be assigned, and *port will be updated
|
||||||
// with the port number
|
// with the port number
|
||||||
int
|
static int
|
||||||
net_bind(short unsigned *port, int type, const char *log_service_name)
|
net_bind_impl(short unsigned *port, int type, const char *log_service_name, bool reuseport)
|
||||||
{
|
{
|
||||||
struct addrinfo hints = { 0 };
|
struct addrinfo hints = { 0 };
|
||||||
struct addrinfo *servinfo;
|
struct addrinfo *servinfo;
|
||||||
@ -318,7 +318,10 @@ net_bind(short unsigned *port, int type, const char *log_service_name)
|
|||||||
continue;
|
continue;
|
||||||
|
|
||||||
// Makes us able to attach multiple threads to the same port
|
// Makes us able to attach multiple threads to the same port
|
||||||
ret = setsockopt(fd, SOL_SOCKET, SO_REUSEPORT, &yes, sizeof(yes));
|
if (reuseport)
|
||||||
|
ret = setsockopt(fd, SOL_SOCKET, SO_REUSEPORT, &yes, sizeof(yes));
|
||||||
|
else
|
||||||
|
ret = setsockopt(fd, SOL_SOCKET, SO_REUSEPORT, &no, sizeof(no));
|
||||||
if (ret < 0)
|
if (ret < 0)
|
||||||
continue;
|
continue;
|
||||||
|
|
||||||
@ -381,6 +384,18 @@ net_bind(short unsigned *port, int type, const char *log_service_name)
|
|||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
int
|
||||||
|
net_bind(short unsigned *port, int type, const char *log_service_name)
|
||||||
|
{
|
||||||
|
return net_bind_impl(port, type, log_service_name, false);
|
||||||
|
}
|
||||||
|
|
||||||
|
int
|
||||||
|
net_bind_with_reuseport(short unsigned *port, int type, const char *log_service_name)
|
||||||
|
{
|
||||||
|
return net_bind_impl(port, type, log_service_name, true);
|
||||||
|
}
|
||||||
|
|
||||||
int
|
int
|
||||||
net_evhttp_bind(struct evhttp *evhttp, unsigned short port, const char *log_service_name)
|
net_evhttp_bind(struct evhttp *evhttp, unsigned short port, const char *log_service_name)
|
||||||
{
|
{
|
||||||
|
@ -53,6 +53,9 @@ net_connect(const char *addr, unsigned short port, int type, const char *log_ser
|
|||||||
int
|
int
|
||||||
net_bind(short unsigned *port, int type, const char *log_service_name);
|
net_bind(short unsigned *port, int type, const char *log_service_name);
|
||||||
|
|
||||||
|
int
|
||||||
|
net_bind_with_reuseport(short unsigned *port, int type, const char *log_service_name);
|
||||||
|
|
||||||
// To avoid polluting namespace too much we don't include event2/http.h here
|
// To avoid polluting namespace too much we don't include event2/http.h here
|
||||||
struct evhttp;
|
struct evhttp;
|
||||||
|
|
||||||
|
437
src/worker.c
437
src/worker.c
@ -31,7 +31,6 @@
|
|||||||
#include <errno.h>
|
#include <errno.h>
|
||||||
#include <sys/queue.h>
|
#include <sys/queue.h>
|
||||||
|
|
||||||
#include <sys/ioctl.h>
|
|
||||||
#include <unistd.h>
|
#include <unistd.h>
|
||||||
|
|
||||||
#include <pthread.h>
|
#include <pthread.h>
|
||||||
@ -41,438 +40,14 @@
|
|||||||
#include "db.h"
|
#include "db.h"
|
||||||
#include "logger.h"
|
#include "logger.h"
|
||||||
#include "worker.h"
|
#include "worker.h"
|
||||||
|
#include "evthr.h"
|
||||||
#include "misc.h"
|
#include "misc.h"
|
||||||
|
|
||||||
#define THREADPOOL_NTHREADS 2
|
#define THREADPOOL_NTHREADS 2
|
||||||
|
|
||||||
struct evthr_pool;
|
|
||||||
|
|
||||||
static struct evthr_pool *worker_threadpool;
|
static struct evthr_pool *worker_threadpool;
|
||||||
|
|
||||||
|
|
||||||
/* ----------------- Thread handling borrowed from libevhtp ----------------- */
|
|
||||||
|
|
||||||
#ifndef TAILQ_FOREACH_SAFE
|
|
||||||
#define TAILQ_FOREACH_SAFE(var, head, field, tvar) \
|
|
||||||
for ((var) = TAILQ_FIRST((head)); \
|
|
||||||
(var) && ((tvar) = TAILQ_NEXT((var), field), 1); \
|
|
||||||
(var) = (tvar))
|
|
||||||
#endif
|
|
||||||
|
|
||||||
#define _evthr_read(thr, cmd, sock) \
|
|
||||||
(recv(sock, cmd, sizeof(struct evthr_cmd), 0) == sizeof(struct evthr_cmd)) ? 1 : 0
|
|
||||||
|
|
||||||
#define EVTHR_SHARED_PIPE 1
|
|
||||||
|
|
||||||
enum evthr_res {
|
|
||||||
EVTHR_RES_OK = 0,
|
|
||||||
EVTHR_RES_BACKLOG,
|
|
||||||
EVTHR_RES_RETRY,
|
|
||||||
EVTHR_RES_NOCB,
|
|
||||||
EVTHR_RES_FATAL
|
|
||||||
};
|
|
||||||
|
|
||||||
struct evthr;
|
|
||||||
|
|
||||||
typedef void (*evthr_cb)(struct evthr *thr, void *cmd_arg, void *shared);
|
|
||||||
typedef void (*evthr_init_cb)(struct evthr *thr, void *shared);
|
|
||||||
typedef void (*evthr_exit_cb)(struct evthr *thr, void *shared);
|
|
||||||
|
|
||||||
struct evthr_cmd {
|
|
||||||
uint8_t stop;
|
|
||||||
void *args;
|
|
||||||
evthr_cb cb;
|
|
||||||
} __attribute__((packed));
|
|
||||||
|
|
||||||
struct evthr_pool {
|
|
||||||
#ifdef EVTHR_SHARED_PIPE
|
|
||||||
int rdr;
|
|
||||||
int wdr;
|
|
||||||
#endif
|
|
||||||
int nthreads;
|
|
||||||
TAILQ_HEAD(evthr_pool_slist, evthr) threads;
|
|
||||||
};
|
|
||||||
|
|
||||||
struct evthr {
|
|
||||||
int rdr;
|
|
||||||
int wdr;
|
|
||||||
char err;
|
|
||||||
struct event *event;
|
|
||||||
struct event_base *evbase;
|
|
||||||
pthread_mutex_t lock;
|
|
||||||
pthread_t *thr;
|
|
||||||
evthr_init_cb init_cb;
|
|
||||||
evthr_exit_cb exit_cb;
|
|
||||||
void *arg;
|
|
||||||
void *aux;
|
|
||||||
#ifdef EVTHR_SHARED_PIPE
|
|
||||||
int pool_rdr;
|
|
||||||
struct event *shared_pool_ev;
|
|
||||||
#endif
|
|
||||||
TAILQ_ENTRY(evthr) next;
|
|
||||||
};
|
|
||||||
|
|
||||||
|
|
||||||
static void
|
|
||||||
_evthr_read_cmd(evutil_socket_t sock, short which, void *args)
|
|
||||||
{
|
|
||||||
struct evthr *thread;
|
|
||||||
struct evthr_cmd cmd;
|
|
||||||
int stopped;
|
|
||||||
|
|
||||||
if (!(thread = (struct evthr *)args)) {
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
|
|
||||||
stopped = 0;
|
|
||||||
|
|
||||||
if (_evthr_read(thread, &cmd, sock) == 1) {
|
|
||||||
stopped = cmd.stop;
|
|
||||||
|
|
||||||
if (cmd.cb != NULL) {
|
|
||||||
(cmd.cb)(thread, cmd.args, thread->arg);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
if (stopped == 1) {
|
|
||||||
event_base_loopbreak(thread->evbase);
|
|
||||||
}
|
|
||||||
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
|
|
||||||
static void *
|
|
||||||
_evthr_loop(void *args)
|
|
||||||
{
|
|
||||||
struct evthr *thread;
|
|
||||||
|
|
||||||
if (!(thread = (struct evthr *)args)) {
|
|
||||||
return NULL;
|
|
||||||
}
|
|
||||||
|
|
||||||
if (thread == NULL || thread->thr == NULL) {
|
|
||||||
pthread_exit(NULL);
|
|
||||||
}
|
|
||||||
|
|
||||||
thread->evbase = event_base_new();
|
|
||||||
thread->event = event_new(thread->evbase, thread->rdr,
|
|
||||||
EV_READ | EV_PERSIST, _evthr_read_cmd, args);
|
|
||||||
|
|
||||||
event_add(thread->event, NULL);
|
|
||||||
|
|
||||||
#ifdef EVTHR_SHARED_PIPE
|
|
||||||
if (thread->pool_rdr > 0) {
|
|
||||||
thread->shared_pool_ev = event_new(thread->evbase, thread->pool_rdr,
|
|
||||||
EV_READ | EV_PERSIST, _evthr_read_cmd, args);
|
|
||||||
event_add(thread->shared_pool_ev, NULL);
|
|
||||||
}
|
|
||||||
#endif
|
|
||||||
|
|
||||||
pthread_mutex_lock(&thread->lock);
|
|
||||||
if (thread->init_cb != NULL) {
|
|
||||||
(thread->init_cb)(thread, thread->arg);
|
|
||||||
}
|
|
||||||
|
|
||||||
pthread_mutex_unlock(&thread->lock);
|
|
||||||
|
|
||||||
CHECK_ERR(L_MAIN, thread->err);
|
|
||||||
|
|
||||||
event_base_loop(thread->evbase, 0);
|
|
||||||
|
|
||||||
pthread_mutex_lock(&thread->lock);
|
|
||||||
if (thread->exit_cb != NULL) {
|
|
||||||
(thread->exit_cb)(thread, thread->arg);
|
|
||||||
}
|
|
||||||
|
|
||||||
pthread_mutex_unlock(&thread->lock);
|
|
||||||
|
|
||||||
pthread_exit(NULL);
|
|
||||||
}
|
|
||||||
|
|
||||||
static enum evthr_res
|
|
||||||
evthr_defer(struct evthr *thread, evthr_cb cb, void *arg)
|
|
||||||
{
|
|
||||||
struct evthr_cmd cmd = {
|
|
||||||
.cb = cb,
|
|
||||||
.args = arg,
|
|
||||||
.stop = 0
|
|
||||||
};
|
|
||||||
|
|
||||||
if (send(thread->wdr, &cmd, sizeof(cmd), 0) <= 0) {
|
|
||||||
return EVTHR_RES_RETRY;
|
|
||||||
}
|
|
||||||
|
|
||||||
return EVTHR_RES_OK;
|
|
||||||
}
|
|
||||||
|
|
||||||
static enum evthr_res
|
|
||||||
evthr_stop(struct evthr *thread)
|
|
||||||
{
|
|
||||||
struct evthr_cmd cmd = {
|
|
||||||
.cb = NULL,
|
|
||||||
.args = NULL,
|
|
||||||
.stop = 1
|
|
||||||
};
|
|
||||||
|
|
||||||
if (send(thread->wdr, &cmd, sizeof(struct evthr_cmd), 0) < 0) {
|
|
||||||
return EVTHR_RES_RETRY;
|
|
||||||
}
|
|
||||||
|
|
||||||
pthread_join(*thread->thr, NULL);
|
|
||||||
return EVTHR_RES_OK;
|
|
||||||
}
|
|
||||||
|
|
||||||
static struct event_base *
|
|
||||||
evthr_get_base(struct evthr * thr)
|
|
||||||
{
|
|
||||||
return thr ? thr->evbase : NULL;
|
|
||||||
}
|
|
||||||
|
|
||||||
static void
|
|
||||||
evthr_free(struct evthr *thread)
|
|
||||||
{
|
|
||||||
if (thread == NULL) {
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
|
|
||||||
if (thread->rdr > 0) {
|
|
||||||
close(thread->rdr);
|
|
||||||
}
|
|
||||||
|
|
||||||
if (thread->wdr > 0) {
|
|
||||||
close(thread->wdr);
|
|
||||||
}
|
|
||||||
|
|
||||||
if (thread->thr) {
|
|
||||||
free(thread->thr);
|
|
||||||
}
|
|
||||||
|
|
||||||
if (thread->event) {
|
|
||||||
event_free(thread->event);
|
|
||||||
}
|
|
||||||
|
|
||||||
#ifdef EVTHR_SHARED_PIPE
|
|
||||||
if (thread->shared_pool_ev) {
|
|
||||||
event_free(thread->shared_pool_ev);
|
|
||||||
}
|
|
||||||
#endif
|
|
||||||
|
|
||||||
if (thread->evbase) {
|
|
||||||
event_base_free(thread->evbase);
|
|
||||||
}
|
|
||||||
|
|
||||||
free(thread);
|
|
||||||
}
|
|
||||||
|
|
||||||
static struct evthr *
|
|
||||||
evthr_wexit_new(evthr_init_cb init_cb, evthr_exit_cb exit_cb, void *args)
|
|
||||||
{
|
|
||||||
struct evthr *thread;
|
|
||||||
int fds[2];
|
|
||||||
|
|
||||||
if (evutil_socketpair(AF_UNIX, SOCK_STREAM, 0, fds) == -1) {
|
|
||||||
return NULL;
|
|
||||||
}
|
|
||||||
|
|
||||||
evutil_make_socket_nonblocking(fds[0]);
|
|
||||||
evutil_make_socket_nonblocking(fds[1]);
|
|
||||||
|
|
||||||
if (!(thread = calloc(1, sizeof(struct evthr)))) {
|
|
||||||
return NULL;
|
|
||||||
}
|
|
||||||
|
|
||||||
thread->thr = malloc(sizeof(pthread_t));
|
|
||||||
thread->arg = args;
|
|
||||||
thread->rdr = fds[0];
|
|
||||||
thread->wdr = fds[1];
|
|
||||||
|
|
||||||
thread->init_cb = init_cb;
|
|
||||||
thread->exit_cb = exit_cb;
|
|
||||||
|
|
||||||
if (pthread_mutex_init(&thread->lock, NULL)) {
|
|
||||||
evthr_free(thread);
|
|
||||||
return NULL;
|
|
||||||
}
|
|
||||||
|
|
||||||
return thread;
|
|
||||||
}
|
|
||||||
|
|
||||||
static int
|
|
||||||
evthr_start(struct evthr *thread)
|
|
||||||
{
|
|
||||||
if (thread == NULL || thread->thr == NULL) {
|
|
||||||
return -1;
|
|
||||||
}
|
|
||||||
|
|
||||||
if (pthread_create(thread->thr, NULL, _evthr_loop, (void *)thread)) {
|
|
||||||
return -1;
|
|
||||||
}
|
|
||||||
|
|
||||||
return 0;
|
|
||||||
}
|
|
||||||
|
|
||||||
static void
|
|
||||||
evthr_pool_free(struct evthr_pool *pool)
|
|
||||||
{
|
|
||||||
struct evthr *thread;
|
|
||||||
struct evthr *save;
|
|
||||||
|
|
||||||
if (pool == NULL) {
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
|
|
||||||
TAILQ_FOREACH_SAFE(thread, &pool->threads, next, save) {
|
|
||||||
TAILQ_REMOVE(&pool->threads, thread, next);
|
|
||||||
|
|
||||||
evthr_free(thread);
|
|
||||||
}
|
|
||||||
|
|
||||||
free(pool);
|
|
||||||
}
|
|
||||||
|
|
||||||
static enum evthr_res
|
|
||||||
evthr_pool_stop(struct evthr_pool *pool)
|
|
||||||
{
|
|
||||||
struct evthr *thr;
|
|
||||||
struct evthr *save;
|
|
||||||
|
|
||||||
if (pool == NULL) {
|
|
||||||
return EVTHR_RES_FATAL;
|
|
||||||
}
|
|
||||||
|
|
||||||
TAILQ_FOREACH_SAFE(thr, &pool->threads, next, save) {
|
|
||||||
evthr_stop(thr);
|
|
||||||
}
|
|
||||||
|
|
||||||
return EVTHR_RES_OK;
|
|
||||||
}
|
|
||||||
|
|
||||||
static inline int
|
|
||||||
get_backlog_(struct evthr *thread)
|
|
||||||
{
|
|
||||||
int backlog = 0;
|
|
||||||
|
|
||||||
ioctl(thread->rdr, FIONREAD, &backlog);
|
|
||||||
|
|
||||||
return (int)(backlog / sizeof(struct evthr_cmd));
|
|
||||||
}
|
|
||||||
|
|
||||||
static enum evthr_res
|
|
||||||
evthr_pool_defer(struct evthr_pool *pool, evthr_cb cb, void *arg)
|
|
||||||
{
|
|
||||||
#ifdef EVTHR_SHARED_PIPE
|
|
||||||
struct evthr_cmd cmd = {
|
|
||||||
.cb = cb,
|
|
||||||
.args = arg,
|
|
||||||
.stop = 0
|
|
||||||
};
|
|
||||||
|
|
||||||
if (send(pool->wdr, &cmd, sizeof(cmd), 0) == -1) {
|
|
||||||
return EVTHR_RES_RETRY;
|
|
||||||
}
|
|
||||||
|
|
||||||
return EVTHR_RES_OK;
|
|
||||||
#endif
|
|
||||||
struct evthr *thread = NULL;
|
|
||||||
struct evthr *min_thread = NULL;
|
|
||||||
int min_backlog = 0;
|
|
||||||
|
|
||||||
if (pool == NULL) {
|
|
||||||
return EVTHR_RES_FATAL;
|
|
||||||
}
|
|
||||||
|
|
||||||
if (cb == NULL) {
|
|
||||||
return EVTHR_RES_NOCB;
|
|
||||||
}
|
|
||||||
|
|
||||||
TAILQ_FOREACH(thread, &pool->threads, next) {
|
|
||||||
int backlog = get_backlog_(thread);
|
|
||||||
|
|
||||||
if (backlog == 0) {
|
|
||||||
min_thread = thread;
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
|
|
||||||
if (min_thread == NULL || backlog < min_backlog) {
|
|
||||||
min_thread = thread;
|
|
||||||
min_backlog = backlog;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
return evthr_defer(min_thread, cb, arg);
|
|
||||||
}
|
|
||||||
|
|
||||||
static struct evthr_pool *
|
|
||||||
evthr_pool_wexit_new(int nthreads, evthr_init_cb init_cb, evthr_exit_cb exit_cb, void *shared)
|
|
||||||
{
|
|
||||||
struct evthr_pool *pool;
|
|
||||||
int i;
|
|
||||||
|
|
||||||
#ifdef EVTHR_SHARED_PIPE
|
|
||||||
int fds[2];
|
|
||||||
#endif
|
|
||||||
|
|
||||||
if (nthreads == 0) {
|
|
||||||
return NULL;
|
|
||||||
}
|
|
||||||
|
|
||||||
if (!(pool = calloc(1, sizeof(struct evthr_pool)))) {
|
|
||||||
return NULL;
|
|
||||||
}
|
|
||||||
|
|
||||||
pool->nthreads = nthreads;
|
|
||||||
TAILQ_INIT(&pool->threads);
|
|
||||||
|
|
||||||
#ifdef EVTHR_SHARED_PIPE
|
|
||||||
if (evutil_socketpair(AF_UNIX, SOCK_DGRAM, 0, fds) == -1) {
|
|
||||||
return NULL;
|
|
||||||
}
|
|
||||||
|
|
||||||
evutil_make_socket_nonblocking(fds[0]);
|
|
||||||
evutil_make_socket_nonblocking(fds[1]);
|
|
||||||
|
|
||||||
pool->rdr = fds[0];
|
|
||||||
pool->wdr = fds[1];
|
|
||||||
#endif
|
|
||||||
|
|
||||||
for (i = 0; i < nthreads; i++) {
|
|
||||||
struct evthr * thread;
|
|
||||||
|
|
||||||
if (!(thread = evthr_wexit_new(init_cb, exit_cb, shared))) {
|
|
||||||
evthr_pool_free(pool);
|
|
||||||
return NULL;
|
|
||||||
}
|
|
||||||
|
|
||||||
#ifdef EVTHR_SHARED_PIPE
|
|
||||||
thread->pool_rdr = fds[0];
|
|
||||||
#endif
|
|
||||||
|
|
||||||
TAILQ_INSERT_TAIL(&pool->threads, thread, next);
|
|
||||||
}
|
|
||||||
|
|
||||||
return pool;
|
|
||||||
}
|
|
||||||
|
|
||||||
static int
|
|
||||||
evthr_pool_start(struct evthr_pool *pool)
|
|
||||||
{
|
|
||||||
struct evthr *evthr = NULL;
|
|
||||||
|
|
||||||
if (pool == NULL) {
|
|
||||||
return -1;
|
|
||||||
}
|
|
||||||
|
|
||||||
TAILQ_FOREACH(evthr, &pool->threads, next) {
|
|
||||||
if (evthr_start(evthr) < 0) {
|
|
||||||
return -1;
|
|
||||||
}
|
|
||||||
|
|
||||||
usleep(5000);
|
|
||||||
}
|
|
||||||
|
|
||||||
return 0;
|
|
||||||
}
|
|
||||||
|
|
||||||
|
|
||||||
/* ----------------------------- CALLBACK EXECUTION ------------------------- */
|
/* ----------------------------- CALLBACK EXECUTION ------------------------- */
|
||||||
/* Worker threads */
|
/* Worker threads */
|
||||||
@ -521,15 +96,7 @@ execute(struct evthr *thr, void *arg, void *shared)
|
|||||||
static void
|
static void
|
||||||
init_cb(struct evthr *thr, void *shared)
|
init_cb(struct evthr *thr, void *shared)
|
||||||
{
|
{
|
||||||
int ret;
|
CHECK_ERR(L_MAIN, db_perthread_init());
|
||||||
|
|
||||||
ret = db_perthread_init();
|
|
||||||
if (ret < 0)
|
|
||||||
{
|
|
||||||
DPRINTF(E_FATAL, L_MAIN, "Error: DB init failed (worker thread)\n");
|
|
||||||
thr->err = EIO;
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
|
|
||||||
thread_setname(pthread_self(), "worker");
|
thread_setname(pthread_self(), "worker");
|
||||||
}
|
}
|
||||||
|
Loading…
Reference in New Issue
Block a user