From bd6f38282cc824056ada258e358cfb3d6ac3a5d6 Mon Sep 17 00:00:00 2001 From: ejurgensen Date: Tue, 17 Jan 2023 17:14:04 +0100 Subject: [PATCH] [httpd] Main commit that adds multithreading to httpd Also refactor streaming implementation to make it more like an output and to let the design support multithreading. --- src/Makefile.am | 5 +- src/httpd.c | 628 ++++++++++++++++++++++++++--------- src/httpd_daap.c | 14 +- src/httpd_dacp.c | 289 +++++++--------- src/httpd_internal.h | 3 +- src/httpd_jsonapi.c | 2 +- src/httpd_libevhttp.c | 89 ++--- src/httpd_rsp.c | 2 +- src/httpd_streaming.c | 709 +++++++--------------------------------- src/httpd_streaming.h | 16 - src/misc.c | 5 + src/outputs/streaming.c | 525 ++++++++++++++++++++++++++++- src/outputs/streaming.h | 18 + src/worker.c | 2 +- 14 files changed, 1300 insertions(+), 1007 deletions(-) delete mode 100644 src/httpd_streaming.h create mode 100644 src/outputs/streaming.h diff --git a/src/Makefile.am b/src/Makefile.am index a23c032f..454dd7f2 100644 --- a/src/Makefile.am +++ b/src/Makefile.am @@ -98,7 +98,7 @@ owntone_SOURCES = main.c \ httpd_daap.c httpd_daap.h \ httpd_dacp.c \ httpd_jsonapi.c \ - httpd_streaming.c httpd_streaming.h \ + httpd_streaming.c \ httpd_oauth.c \ httpd_artworkapi.c \ http.c http.h \ @@ -118,7 +118,8 @@ owntone_SOURCES = main.c \ outputs/rtp_common.h outputs/rtp_common.c \ outputs/raop.c outputs/airplay.c $(PAIR_AP_SRC) \ outputs/airplay_events.c outputs/airplay_events.h \ - outputs/streaming.c outputs/dummy.c outputs/fifo.c outputs/rcp.c \ + outputs/streaming.c outputs/streaming.h \ + outputs/dummy.c outputs/fifo.c outputs/rcp.c \ $(ALSA_SRC) $(PULSEAUDIO_SRC) $(CHROMECAST_SRC) \ evrtsp/rtsp.c evrtsp/evrtsp.h evrtsp/rtsp-internal.h evrtsp/log.h \ $(SPOTIFY_SRC) \ diff --git a/src/httpd.c b/src/httpd.c index 5694b997..c11645fc 100644 --- a/src/httpd.c +++ b/src/httpd.c @@ -36,11 +36,9 @@ #include #include +#include #include // get thread ID -#ifdef HAVE_EVENTFD -# include -#endif #include #include @@ -128,22 +126,376 @@ static const struct content_type_map ext2ctype[] = }; static char webroot_directory[PATH_MAX]; -static struct event_base *evbase_httpd; - -#ifdef HAVE_EVENTFD -static int exit_efd; -#else -static int exit_pipe[2]; -#endif -static int httpd_exit; -static struct event *exitev; -static httpd_server *httpd_serv; -static pthread_t tid_httpd; static const char *httpd_allow_origin; static int httpd_port; +#define THREADPOOL_NTHREADS 4 + +struct evthr_pool; + +static struct evthr_pool *httpd_threadpool; + + +/* ----------------- Thread handling borrowed from libevhtp ----------------- */ + +#ifndef TAILQ_FOREACH_SAFE +#define TAILQ_FOREACH_SAFE(var, head, field, tvar) \ + for ((var) = TAILQ_FIRST((head)); \ + (var) && ((tvar) = TAILQ_NEXT((var), field), 1); \ + (var) = (tvar)) +#endif + +#define _evthr_read(thr, cmd, sock) \ + (recv(sock, cmd, sizeof(struct evthr_cmd), 0) == sizeof(struct evthr_cmd)) ? 1 : 0 + +#define EVTHR_SHARED_PIPE 1 + +enum evthr_res { + EVTHR_RES_OK = 0, + EVTHR_RES_BACKLOG, + EVTHR_RES_RETRY, + EVTHR_RES_NOCB, + EVTHR_RES_FATAL +}; + +struct evthr; + +typedef void (*evthr_cb)(struct evthr *thr, void *cmd_arg, void *shared); +typedef void (*evthr_init_cb)(struct evthr *thr, void *shared); +typedef void (*evthr_exit_cb)(struct evthr *thr, void *shared); + +struct evthr_cmd { + uint8_t stop; + void *args; + evthr_cb cb; +} __attribute__((packed)); + +struct evthr_pool { +#ifdef EVTHR_SHARED_PIPE + int rdr; + int wdr; +#endif + int nthreads; + TAILQ_HEAD(evthr_pool_slist, evthr) threads; +}; + +struct evthr { + int rdr; + int wdr; + char err; + struct event *event; + struct event_base *evbase; + httpd_server *server; + pthread_mutex_t lock; + pthread_t *thr; + evthr_init_cb init_cb; + evthr_exit_cb exit_cb; + void *arg; + void *aux; +#ifdef EVTHR_SHARED_PIPE + int pool_rdr; + struct event *shared_pool_ev; +#endif + TAILQ_ENTRY(evthr) next; +}; + + +static void +_evthr_read_cmd(evutil_socket_t sock, short which, void *args) +{ + struct evthr *thread; + struct evthr_cmd cmd; + int stopped; + + if (!(thread = (struct evthr *)args)) { + return; + } + + stopped = 0; + + if (_evthr_read(thread, &cmd, sock) == 1) { + stopped = cmd.stop; + + if (cmd.cb != NULL) { + (cmd.cb)(thread, cmd.args, thread->arg); + } + } + + if (stopped == 1) { + event_base_loopbreak(thread->evbase); + } + + return; +} + +static void * +_evthr_loop(void *args) +{ + struct evthr *thread; + + if (!(thread = (struct evthr *)args)) { + return NULL; + } + + if (thread == NULL || thread->thr == NULL) { + pthread_exit(NULL); + } + + thread->evbase = event_base_new(); + thread->event = event_new(thread->evbase, thread->rdr, + EV_READ | EV_PERSIST, _evthr_read_cmd, args); + + event_add(thread->event, NULL); + +#ifdef EVTHR_SHARED_PIPE + if (thread->pool_rdr > 0) { + thread->shared_pool_ev = event_new(thread->evbase, thread->pool_rdr, + EV_READ | EV_PERSIST, _evthr_read_cmd, args); + event_add(thread->shared_pool_ev, NULL); + } +#endif + + pthread_mutex_lock(&thread->lock); + if (thread->init_cb != NULL) { + (thread->init_cb)(thread, thread->arg); + } + + pthread_mutex_unlock(&thread->lock); + + CHECK_ERR(L_MAIN, thread->err); + + event_base_loop(thread->evbase, 0); + + pthread_mutex_lock(&thread->lock); + if (thread->exit_cb != NULL) { + (thread->exit_cb)(thread, thread->arg); + } + + pthread_mutex_unlock(&thread->lock); + + pthread_exit(NULL); +} + +static enum evthr_res +evthr_stop(struct evthr *thread) +{ + struct evthr_cmd cmd = { + .cb = NULL, + .args = NULL, + .stop = 1 + }; + + if (send(thread->wdr, &cmd, sizeof(struct evthr_cmd), 0) < 0) { + return EVTHR_RES_RETRY; + } + + pthread_join(*thread->thr, NULL); + return EVTHR_RES_OK; +} + +static void +evthr_free(struct evthr *thread) +{ + if (thread == NULL) { + return; + } + + if (thread->rdr > 0) { + close(thread->rdr); + } + + if (thread->wdr > 0) { + close(thread->wdr); + } + + if (thread->thr) { + free(thread->thr); + } + + if (thread->event) { + event_free(thread->event); + } + +#ifdef EVTHR_SHARED_PIPE + if (thread->shared_pool_ev) { + event_free(thread->shared_pool_ev); + } +#endif + + if (thread->evbase) { + event_base_free(thread->evbase); + } + + free(thread); +} + +static struct evthr * +evthr_wexit_new(evthr_init_cb init_cb, evthr_exit_cb exit_cb, void *args) +{ + struct evthr *thread; + int fds[2]; + + if (evutil_socketpair(AF_UNIX, SOCK_STREAM, 0, fds) == -1) { + return NULL; + } + + evutil_make_socket_nonblocking(fds[0]); + evutil_make_socket_nonblocking(fds[1]); + + if (!(thread = calloc(1, sizeof(struct evthr)))) { + return NULL; + } + + thread->thr = malloc(sizeof(pthread_t)); + thread->arg = args; + thread->rdr = fds[0]; + thread->wdr = fds[1]; + + thread->init_cb = init_cb; + thread->exit_cb = exit_cb; + + if (pthread_mutex_init(&thread->lock, NULL)) { + evthr_free(thread); + return NULL; + } + + return thread; +} + +static int +evthr_start(struct evthr *thread) +{ + if (thread == NULL || thread->thr == NULL) { + return -1; + } + + if (pthread_create(thread->thr, NULL, _evthr_loop, (void *)thread)) { + return -1; + } + + return 0; +} + +static void +evthr_pool_free(struct evthr_pool *pool) +{ + struct evthr *thread; + struct evthr *save; + + if (pool == NULL) { + return; + } + + TAILQ_FOREACH_SAFE(thread, &pool->threads, next, save) { + TAILQ_REMOVE(&pool->threads, thread, next); + + evthr_free(thread); + } + + free(pool); +} + +static enum evthr_res +evthr_pool_stop(struct evthr_pool *pool) +{ + struct evthr *thr; + struct evthr *save; + + if (pool == NULL) { + return EVTHR_RES_FATAL; + } + + TAILQ_FOREACH_SAFE(thr, &pool->threads, next, save) { + evthr_stop(thr); + } + + return EVTHR_RES_OK; +} + +static inline int +get_backlog_(struct evthr *thread) +{ + int backlog = 0; + + ioctl(thread->rdr, FIONREAD, &backlog); + + return (int)(backlog / sizeof(struct evthr_cmd)); +} + +static struct evthr_pool * +evthr_pool_wexit_new(int nthreads, evthr_init_cb init_cb, evthr_exit_cb exit_cb, void *shared) +{ + struct evthr_pool *pool; + int i; + +#ifdef EVTHR_SHARED_PIPE + int fds[2]; +#endif + + if (nthreads == 0) { + return NULL; + } + + if (!(pool = calloc(1, sizeof(struct evthr_pool)))) { + return NULL; + } + + pool->nthreads = nthreads; + TAILQ_INIT(&pool->threads); + +#ifdef EVTHR_SHARED_PIPE + if (evutil_socketpair(AF_UNIX, SOCK_DGRAM, 0, fds) == -1) { + return NULL; + } + + evutil_make_socket_nonblocking(fds[0]); + evutil_make_socket_nonblocking(fds[1]); + + pool->rdr = fds[0]; + pool->wdr = fds[1]; +#endif + + for (i = 0; i < nthreads; i++) { + struct evthr * thread; + + if (!(thread = evthr_wexit_new(init_cb, exit_cb, shared))) { + evthr_pool_free(pool); + return NULL; + } + +#ifdef EVTHR_SHARED_PIPE + thread->pool_rdr = fds[0]; +#endif + + TAILQ_INSERT_TAIL(&pool->threads, thread, next); + } + + return pool; +} + +static int +evthr_pool_start(struct evthr_pool *pool) +{ + struct evthr *evthr = NULL; + + if (pool == NULL) { + return -1; + } + + TAILQ_FOREACH(evthr, &pool->threads, next) { + if (evthr_start(evthr) < 0) { + return -1; + } + + usleep(5000); + } + + return 0; +} + + /* -------------------------------- HELPERS --------------------------------- */ static int @@ -220,7 +572,7 @@ modules_handlers_set(struct httpd_uri_map *uri_map) } static int -modules_init(struct event_base *evbase) +modules_init(void) { struct httpd_module **ptr; struct httpd_module *m; @@ -228,7 +580,7 @@ modules_init(struct event_base *evbase) for (ptr = httpd_modules; *ptr; ptr++) { m = *ptr; - m->initialized = (!m->init || m->init(evbase) == 0); + m->initialized = (!m->init || m->init() == 0); if (!m->initialized) { DPRINTF(E_FATAL, L_HTTPD, "%s init failed\n", m->name); @@ -291,6 +643,35 @@ modules_search(const char *path) /* --------------------------- REQUEST HELPERS ------------------------------ */ +static void +cors_headers_add(struct httpd_request *hreq, const char *allow_origin) +{ + if (allow_origin) + httpd_header_add(hreq->out_headers, "Access-Control-Allow-Origin", httpd_allow_origin); + + httpd_header_add(hreq->out_headers, "Access-Control-Allow-Methods", "GET, POST, PUT, DELETE, OPTIONS"); + httpd_header_add(hreq->out_headers, "Access-Control-Allow-Headers", "authorization"); +} + +static int +handle_cors_preflight(struct httpd_request *hreq, const char *allow_origin) +{ + bool is_cors_preflight; + + is_cors_preflight = ( hreq->method == HTTPD_METHOD_OPTIONS && hreq->in_headers && allow_origin && + httpd_header_find(hreq->in_headers, "Origin") && + httpd_header_find(hreq->in_headers, "Access-Control-Request-Method") ); + if (!is_cors_preflight) + return -1; + + cors_headers_add(hreq, allow_origin); + + // In this case there is no reason to go through httpd_send_reply + httpd_backend_reply_send(hreq->backend, HTTP_OK, "OK", NULL); + httpd_request_free(hreq); + return 0; +} + void httpd_request_handler_set(struct httpd_request *hreq) { @@ -750,57 +1131,6 @@ stream_fail_cb(httpd_connection *conn, void *arg) /* ---------------------------- MAIN HTTPD THREAD --------------------------- */ -static void * -httpd(void *arg) -{ - int ret; - - ret = db_perthread_init(); - if (ret < 0) - { - DPRINTF(E_LOG, L_HTTPD, "Error: DB init failed\n"); - - pthread_exit(NULL); - } - - event_base_dispatch(evbase_httpd); - - if (!httpd_exit) - DPRINTF(E_FATAL, L_HTTPD, "HTTPd event loop terminated ahead of time!\n"); - - db_perthread_deinit(); - - pthread_exit(NULL); -} - -static void -exit_cb(int fd, short event, void *arg) -{ - event_base_loopbreak(evbase_httpd); - - httpd_exit = 1; -} - -static int -handle_cors_preflight(struct httpd_request *hreq, const char *allow_origin) -{ - bool is_cors_preflight; - - is_cors_preflight = ( hreq->method == HTTPD_METHOD_OPTIONS && hreq->in_headers && allow_origin && - httpd_header_find(hreq->in_headers, "Origin") && - httpd_header_find(hreq->in_headers, "Access-Control-Request-Method") ); - if (!is_cors_preflight) - return -1; - - httpd_header_add(hreq->out_headers, "Access-Control-Allow-Origin", allow_origin); - httpd_header_add(hreq->out_headers, "Access-Control-Allow-Methods", "GET, POST, PUT, DELETE, OPTIONS"); - httpd_header_add(hreq->out_headers, "Access-Control-Allow-Headers", "authorization"); - - // In this case there is no reason to go through httpd_send_reply - httpd_backend_reply_send(hreq->backend, HTTP_OK, "OK", NULL); - return 0; -} - static void request_cb(struct httpd_request *hreq, void *arg) { @@ -850,6 +1180,7 @@ httpd_stream_file(struct httpd_request *hreq, int id) void (*stream_cb)(int fd, short event, void *arg); struct stat sb; struct timeval tv; + struct event_base *evbase; const char *param; const char *param_end; const char *client_codecs; @@ -1041,7 +1372,9 @@ httpd_stream_file(struct httpd_request *hreq, int id) goto out_cleanup; } - st->ev = event_new(evbase_httpd, -1, EV_TIMEOUT, stream_cb, st); + evbase = httpd_request_evbase_get(hreq); + + st->ev = event_new(evbase, -1, EV_TIMEOUT, stream_cb, st); evutil_timerclear(&tv); if (!st->ev || (event_add(st->ev, &tv) < 0)) { @@ -1215,8 +1548,7 @@ httpd_send_reply(struct httpd_request *hreq, int code, const char *reason, struc (strstr(param, "gzip") || strstr(param, "*")) ); - if (httpd_allow_origin) - httpd_header_add(hreq->out_headers, "Access-Control-Allow-Origin", httpd_allow_origin); + cors_headers_add(hreq, httpd_allow_origin); if (do_gzip && (gzbuf = httpd_gzip_deflate(evbuf))) { @@ -1233,11 +1565,15 @@ httpd_send_reply(struct httpd_request *hreq, int code, const char *reason, struc { httpd_backend_reply_send(hreq->backend, code, reason, evbuf); } + + httpd_request_free(hreq); } void httpd_send_reply_start(struct httpd_request *hreq, int code, const char *reason) { + cors_headers_add(hreq, httpd_allow_origin); + httpd_backend_reply_start_send(hreq->backend, code, reason); } @@ -1251,6 +1587,7 @@ void httpd_send_reply_end(struct httpd_request *hreq) { httpd_backend_reply_end_send(hreq->backend); + httpd_request_free(hreq); } // This is a modified version of evhttp_send_error (credit libevent) @@ -1261,8 +1598,8 @@ httpd_send_error(struct httpd_request *hreq, int error, const char *reason) httpd_headers_clear(hreq->out_headers); - if (httpd_allow_origin) - httpd_header_add(hreq->out_headers, "Access-Control-Allow-Origin", httpd_allow_origin); + cors_headers_add(hreq, httpd_allow_origin); + httpd_header_add(hreq->out_headers, "Content-Type", "text/html"); httpd_header_add(hreq->out_headers, "Connection", "close"); @@ -1276,6 +1613,8 @@ httpd_send_error(struct httpd_request *hreq, int error, const char *reason) if (evbuf) evbuffer_free(evbuf); + + httpd_request_free(hreq); } bool @@ -1408,12 +1747,46 @@ httpd_basic_auth(struct httpd_request *hreq, const char *user, const char *passw return -1; } +static void +thread_init_cb(struct evthr *thr, void *shared) +{ + int ret; + + thread_setname(pthread_self(), "httpd"); + + ret = db_perthread_init(); + if (ret < 0) + { + DPRINTF(E_FATAL, L_HTTPD, "Error: DB init failed\n"); + thr->err = EIO; + return; + } + + thr->server = httpd_server_new(thr->evbase, httpd_port, request_cb, NULL); + if (!thr->server) + { + DPRINTF(E_FATAL, L_HTTPD, "Could not create HTTP server on port %d (server already running?)\n", httpd_port); + thr->err = EIO; + return; + } + + // For CORS headers + httpd_server_allow_origin_set(thr->server, httpd_allow_origin); +} + +static void +thread_exit_cb(struct evthr *thr, void *shared) +{ + httpd_server_free(thr->server); + + db_perthread_deinit(); +} + /* Thread: main */ int httpd_init(const char *webroot) { struct stat sb; - int exit_fd; int ret; DPRINTF(E_DBG, L_HTTPD, "Starting web server with root directory '%s'\n", webroot); @@ -1434,42 +1807,18 @@ httpd_init(const char *webroot) return -1; } - CHECK_NULL(L_HTTPD, evbase_httpd = event_base_new()); - -#ifdef HAVE_EVENTFD - CHECK_ERRNO(L_HTTPD, exit_efd = eventfd(0, EFD_CLOEXEC)); - exit_fd = exit_efd; -#else -# ifdef HAVE_PIPE2 - CHECK_ERRNO(L_HTTPD, pipe2(exit_pipe, O_CLOEXEC)); -# else - CHECK_ERRNO(L_HTTPD, pipe(exit_pipe)); -# endif - exit_fd = exit_pipe[0]; -#endif /* HAVE_EVENTFD */ - CHECK_NULL(L_HTTPD, exitev = event_new(evbase_httpd, exit_fd, EV_READ, exit_cb, NULL)); - event_add(exitev, NULL); - + // Read config httpd_port = cfg_getint(cfg_getsec(cfg, "library"), "port"); - httpd_serv = httpd_server_new(evbase_httpd, httpd_port, request_cb, NULL); - if (!httpd_serv) - { - DPRINTF(E_FATAL, L_HTTPD, "Could not create HTTP server on port %d (server already running?)\n", httpd_port); - goto server_fail; - } - - // For CORS headers httpd_allow_origin = cfg_getstr(cfg_getsec(cfg, "general"), "allow_origin"); if (strlen(httpd_allow_origin) == 0) httpd_allow_origin = NULL; - httpd_server_allow_origin_set(httpd_serv, httpd_allow_origin); // Prepare modules, e.g. httpd_daap - ret = modules_init(evbase_httpd); + ret = modules_init(); if (ret < 0) { DPRINTF(E_FATAL, L_HTTPD, "Modules init failed\n"); - goto modules_fail; + goto error; } #ifdef HAVE_LIBWEBSOCKETS @@ -1477,39 +1826,28 @@ httpd_init(const char *webroot) if (ret < 0) { DPRINTF(E_FATAL, L_HTTPD, "Websocket init failed\n"); - goto websocket_fail; + goto error; } #endif - ret = pthread_create(&tid_httpd, NULL, httpd, NULL); - if (ret != 0) + httpd_threadpool = evthr_pool_wexit_new(THREADPOOL_NTHREADS, thread_init_cb, thread_exit_cb, NULL); + if (!httpd_threadpool) { - DPRINTF(E_FATAL, L_HTTPD, "Could not spawn HTTPd thread: %s\n", strerror(errno)); - goto thread_fail; + DPRINTF(E_LOG, L_HTTPD, "Could not create httpd thread pool\n"); + goto error; } - thread_setname(tid_httpd, "httpd"); + ret = evthr_pool_start(httpd_threadpool); + if (ret < 0) + { + DPRINTF(E_LOG, L_HTTPD, "Could not spawn worker threads\n"); + goto error; + } return 0; - thread_fail: -#ifdef HAVE_LIBWEBSOCKETS - websocket_deinit(); - websocket_fail: -#endif - modules_deinit(); - modules_fail: - httpd_server_free(httpd_serv); - server_fail: - event_free(exitev); -#ifdef HAVE_EVENTFD - close(exit_efd); -#else - close(exit_pipe[0]); - close(exit_pipe[1]); -#endif - event_base_free(evbase_httpd); - + error: + httpd_deinit(); return -1; } @@ -1517,49 +1855,13 @@ httpd_init(const char *webroot) void httpd_deinit(void) { - int ret; - -#ifdef HAVE_EVENTFD - ret = eventfd_write(exit_efd, 1); - if (ret < 0) - { - DPRINTF(E_FATAL, L_HTTPD, "Could not send exit event: %s\n", strerror(errno)); - - return; - } -#else - int dummy = 42; - - ret = write(exit_pipe[1], &dummy, sizeof(dummy)); - if (ret != sizeof(dummy)) - { - DPRINTF(E_FATAL, L_HTTPD, "Could not write to exit fd: %s\n", strerror(errno)); - - return; - } -#endif - - ret = pthread_join(tid_httpd, NULL); - if (ret != 0) - { - DPRINTF(E_FATAL, L_HTTPD, "Could not join HTTPd thread: %s\n", strerror(errno)); - - return; - } - + // Give modules a chance to hang up connections nicely modules_deinit(); #ifdef HAVE_LIBWEBSOCKETS websocket_deinit(); #endif -#ifdef HAVE_EVENTFD - close(exit_efd); -#else - close(exit_pipe[0]); - close(exit_pipe[1]); -#endif - event_free(exitev); - httpd_server_free(httpd_serv); - event_base_free(evbase_httpd); + evthr_pool_stop(httpd_threadpool); + evthr_pool_free(httpd_threadpool); } diff --git a/src/httpd_daap.c b/src/httpd_daap.c index 7dca5f94..73a2eca1 100644 --- a/src/httpd_daap.c +++ b/src/httpd_daap.c @@ -243,12 +243,14 @@ daap_session_add(bool is_remote, int request_session_id) return s; } - /* ---------------------- UPDATE REQUESTS HANDLERS -------------------------- */ static void update_free(struct daap_update_request *ur) { + if (!ur) + return; + if (ur->timeout) event_free(ur->timeout); @@ -315,7 +317,9 @@ update_fail_cb(httpd_connection *conn, void *arg) httpd_request_closecb_set(ur->hreq, NULL, NULL); - httpd_request_backend_free(ur->hreq); // TODO check if still necessary + // Peer won't get this, it is just to make sure hreq and evhttp's request get + // freed + httpd_send_error(ur->hreq, HTTP_BADREQUEST, "Bad Request"); update_remove(ur); } @@ -667,7 +671,7 @@ daap_reply_send(struct httpd_request *hreq, enum daap_reply_result result) switch (result) { case DAAP_REPLY_LOGOUT: - httpd_send_reply(hreq, 204, "Logout Successful", hreq->out_body, 0); + httpd_send_reply(hreq, HTTP_NOCONTENT, "Logout Successful", hreq->out_body, 0); break; case DAAP_REPLY_NO_CONTENT: httpd_send_reply(hreq, HTTP_NOCONTENT, "No Content", hreq->out_body, HTTPD_SEND_NO_GZIP); @@ -680,7 +684,7 @@ daap_reply_send(struct httpd_request *hreq, enum daap_reply_result result) httpd_send_reply(hreq, HTTP_OK, "OK", hreq->out_body, HTTPD_SEND_NO_GZIP); break; case DAAP_REPLY_FORBIDDEN: - httpd_send_error(hreq, 403, "Forbidden"); + httpd_send_error(hreq, HTTP_FORBIDDEN, "Forbidden"); break; case DAAP_REPLY_BAD_REQUEST: httpd_send_error(hreq, HTTP_BADREQUEST, "Bad Request"); @@ -2349,7 +2353,7 @@ daap_reply_build(const char *uri, const char *user_agent, int is_remote) } static int -daap_init(struct event_base *evbase) +daap_init(void) { srand((unsigned)time(NULL)); current_rev = 2; diff --git a/src/httpd_dacp.c b/src/httpd_dacp.c index f4c3be4c..2eda58a7 100644 --- a/src/httpd_dacp.c +++ b/src/httpd_dacp.c @@ -30,10 +30,7 @@ #include #include -#ifdef HAVE_EVENTFD -# include -#endif - +#include #include #include "httpd_internal.h" @@ -51,6 +48,7 @@ struct dacp_update_request { struct httpd_request *hreq; + struct event *updateev; struct dacp_update_request *next; }; @@ -125,25 +123,13 @@ dacp_propset_userrating(const char *value, struct httpd_request *hreq); /* gperf static hash, dacp_prop.gperf */ #include "dacp_prop_hash.h" - -/* Play status update */ -#ifdef HAVE_EVENTFD -static int update_efd; -#else -static int update_pipe[2]; -#endif -static struct event *updateev; -/* Next revision number the client should call with */ -static int current_rev; - -/* Play status update requests */ +// Play status update requests static struct dacp_update_request *update_requests; +static pthread_mutex_t update_request_lck; +// Next revision number the client should call with +static int update_current_rev; -/* Seek timer */ -static struct event *seek_timer; -static int seek_target; - -/* If an item is removed from the library while in the queue, we replace it with this */ +// If an item is removed from the library while in the queue, we replace it with this static struct media_file_info dummy_mfi; static struct db_queue_item dummy_queue_item; @@ -600,6 +586,8 @@ speaker_volume_step(struct player_speaker_info *speaker_info, int step) static void seek_timer_cb(int fd, short what, void *arg) { + intptr_t seek_target_packed = (intptr_t)arg; + int seek_target = seek_target_packed; int ret; DPRINTF(E_DBG, L_DACP, "Seek timer expired, target %d ms\n", seek_target); @@ -660,7 +648,7 @@ dacp_request_authorize(struct httpd_request *hreq) /* ---------------------- UPDATE REQUESTS HANDLERS -------------------------- */ static int -make_playstatusupdate(struct evbuffer *evbuf) +make_playstatusupdate(struct evbuffer *evbuf, int current_rev) { struct player_status status; struct db_queue_item *queue_item = NULL; @@ -726,109 +714,45 @@ make_playstatusupdate(struct evbuffer *evbuf) } static void -playstatusupdate_cb(int fd, short what, void *arg) +playstatusupdate_cb(int fd, short what, void *arg); + +static struct dacp_update_request * +update_request_new(struct httpd_request *hreq) { + struct event_base *evbase; struct dacp_update_request *ur; - struct evbuffer *evbuf; - struct evbuffer *update; - uint8_t *buf; - size_t len; - int ret; -#ifdef HAVE_EVENTFD - eventfd_t count; + evbase = httpd_request_evbase_get(hreq); - ret = eventfd_read(update_efd, &count); - if (ret < 0) - { - DPRINTF(E_LOG, L_DACP, "Could not read playstatusupdate event counter: %s\n", strerror(errno)); + CHECK_NULL(L_DACP, ur = calloc(1, sizeof(struct dacp_update_request))); + CHECK_NULL(L_DACP, ur->updateev = event_new(evbase, -1, 0, playstatusupdate_cb, ur)); + ur->hreq = hreq; - goto readd; - } -#else - int dummy; - - read(update_pipe[0], &dummy, sizeof(dummy)); -#endif - - current_rev++; - - if (!update_requests) - goto readd; - - CHECK_NULL(L_DACP, evbuf = evbuffer_new()); - CHECK_NULL(L_DACP, update = evbuffer_new()); - - ret = make_playstatusupdate(update); - if (ret < 0) - goto out_free_update; - - len = evbuffer_get_length(update); - - for (ur = update_requests; update_requests; ur = update_requests) - { - update_requests = ur->next; - - httpd_request_closecb_set(ur->hreq, NULL, NULL); - - // Only copy buffer if we actually need to reuse it - if (ur->next) - { - buf = evbuffer_pullup(update, -1); - evbuffer_add(evbuf, buf, len); - httpd_send_reply(ur->hreq, HTTP_OK, "OK", evbuf, 0); - } - else - httpd_send_reply(ur->hreq, HTTP_OK, "OK", update, 0); - - free(ur); - } - - out_free_update: - evbuffer_free(update); - evbuffer_free(evbuf); - readd: - ret = event_add(updateev, NULL); - if (ret < 0) - DPRINTF(E_LOG, L_DACP, "Couldn't re-add event for playstatusupdate\n"); -} - -/* Thread: player */ -static void -dacp_playstatus_update_handler(short event_mask) -{ - int ret; - -#ifdef HAVE_EVENTFD - ret = eventfd_write(update_efd, 1); - if (ret < 0) - DPRINTF(E_LOG, L_DACP, "Could not send status update event: %s\n", strerror(errno)); -#else - int dummy = 42; - - ret = write(update_pipe[1], &dummy, sizeof(dummy)); - if (ret != sizeof(dummy)) - DPRINTF(E_LOG, L_DACP, "Could not write to status update fd: %s\n", strerror(errno)); -#endif + return ur; } static void -update_fail_cb(httpd_connection *conn, void *arg) +update_request_free(struct dacp_update_request *ur) +{ + if (!ur) + return; + + if (ur->updateev) + event_free(ur->updateev); + + free(ur); +} + +static void +update_request_remove(struct dacp_update_request **head, struct dacp_update_request *ur) { - struct dacp_update_request *ur; struct dacp_update_request *p; - ur = (struct dacp_update_request *)arg; - - DPRINTF(E_DBG, L_DACP, "Update request: client closed connection\n"); - - httpd_request_closecb_set(ur->hreq, NULL, NULL); - - if (ur == update_requests) - update_requests = ur->next; + if (ur == *head) + *head = ur->next; else { - for (p = update_requests; p && (p->next != ur); p = p->next) + for (p = *head; p && (p->next != ur); p = p->next) ; if (!p) @@ -840,8 +764,65 @@ update_fail_cb(httpd_connection *conn, void *arg) p->next = ur->next; } - httpd_request_backend_free(ur->hreq); // TODO check if still necessary - free(ur); + update_request_free(ur); +} + +static void +playstatusupdate_cb(int fd, short what, void *arg) +{ + struct dacp_update_request *ur = arg; + struct evbuffer *update; + int ret; + + CHECK_NULL(L_DACP, update = evbuffer_new()); + + ret = make_playstatusupdate(update, update_current_rev); + if (ret < 0) + goto error; + + httpd_request_closecb_set(ur->hreq, NULL, NULL); + + httpd_send_reply(ur->hreq, HTTP_OK, "OK", update, 0); + + pthread_mutex_lock(&update_request_lck); + update_request_remove(&update_requests, ur); + pthread_mutex_unlock(&update_request_lck); + + error: + evbuffer_free(update); +} + +static void +update_fail_cb(httpd_connection *conn, void *arg) +{ + struct dacp_update_request *ur = arg; + + DPRINTF(E_DBG, L_DACP, "Update request: client closed connection\n"); + + httpd_request_closecb_set(ur->hreq, NULL, NULL); + + // Peer won't get this, it is just to make sure hreq and evhttp's request get + // freed + httpd_send_error(ur->hreq, HTTP_BADREQUEST, "Bad Request"); + + pthread_mutex_lock(&update_request_lck); + update_request_remove(&update_requests, ur); + pthread_mutex_unlock(&update_request_lck); +} + +/* Thread: player */ +static void +dacp_playstatus_update_handler(short event_mask) +{ + struct dacp_update_request *ur; + + pthread_mutex_lock(&update_request_lck); + update_current_rev++; + for (ur = update_requests; ur; ur = ur->next) + { + event_active(ur->updateev, 0, 0); + } + pthread_mutex_unlock(&update_request_lck); } @@ -1047,7 +1028,10 @@ dacp_propset_devicebusy(const char *value, struct httpd_request *hreq) static void dacp_propset_playingtime(const char *value, struct httpd_request *hreq) { + struct event_base *evbase; struct timeval tv; + int seek_target; + intptr_t seek_target_packed; int ret; ret = safe_atoi32(value, &seek_target); @@ -1058,9 +1042,13 @@ dacp_propset_playingtime(const char *value, struct httpd_request *hreq) return; } + seek_target_packed = seek_target; + evutil_timerclear(&tv); tv.tv_usec = 200 * 1000; - evtimer_add(seek_timer, &tv); + + evbase = httpd_request_evbase_get(hreq); + event_base_once(evbase, -1, EV_TIMEOUT, seek_timer_cb, (void *)seek_target_packed, &tv); } static void @@ -2275,9 +2263,9 @@ dacp_reply_playstatusupdate(struct httpd_request *hreq) // Caller didn't use current revision number. It was probably his first // request so we will give him status immediately, incl. which revision number // to use when he calls again. - if (reqd_rev != current_rev) + if (reqd_rev != update_current_rev) { - ret = make_playstatusupdate(hreq->out_body); + ret = make_playstatusupdate(hreq->out_body, update_current_rev); if (ret < 0) httpd_send_error(hreq, 500, "Internal Server Error"); else @@ -2287,7 +2275,7 @@ dacp_reply_playstatusupdate(struct httpd_request *hreq) } // Else, just let the request hang until we have changes to push back - ur = calloc(1, sizeof(struct dacp_update_request)); + ur = update_request_new(hreq); if (!ur) { DPRINTF(E_LOG, L_DACP, "Out of memory for update request\n"); @@ -2296,10 +2284,10 @@ dacp_reply_playstatusupdate(struct httpd_request *hreq) return -1; } - ur->hreq = hreq; - + pthread_mutex_lock(&update_request_lck); ur->next = update_requests; update_requests = ur; + pthread_mutex_unlock(&update_request_lck); /* If the connection fails before we have an update to push out * to the client, we need to know. @@ -2865,15 +2853,9 @@ dacp_request(struct httpd_request *hreq) hreq->handler(hreq); } -// Forward -static void -dacp_deinit(void); - static int -dacp_init(struct event_base *evbase) +dacp_init(void) { - current_rev = 2; - dummy_mfi.id = DB_MEDIA_FILE_NON_PERSISTENT_ID; dummy_mfi.title = CFG_NAME_UNKNOWN_TITLE; dummy_mfi.artist = CFG_NAME_UNKNOWN_ARTIST; @@ -2886,41 +2868,11 @@ dacp_init(struct event_base *evbase) dummy_queue_item.album = CFG_NAME_UNKNOWN_ALBUM; dummy_queue_item.genre = CFG_NAME_UNKNOWN_GENRE; -#ifdef HAVE_EVENTFD - update_efd = eventfd(0, EFD_CLOEXEC); - if (update_efd < 0) - { - DPRINTF(E_LOG, L_DACP, "Could not create update eventfd: %s\n", strerror(errno)); - goto error; - } - - CHECK_NULL(L_DACP, updateev = event_new(evbase, update_efd, EV_READ, playstatusupdate_cb, NULL)); -#else -# ifdef HAVE_PIPE2 - int ret = pipe2(update_pipe, O_CLOEXEC); -# else - int ret = pipe(update_pipe); -# endif - if (ret < 0) - { - DPRINTF(E_LOG, L_DACP, "Could not create update pipe: %s\n", strerror(errno)); - goto error; - } - - CHECK_NULL(L_DACP, updateev = event_new(evbase, update_pipe[0], EV_READ, playstatusupdate_cb, NULL)); -#endif /* HAVE_EVENTFD */ - - event_add(updateev, NULL); - - CHECK_NULL(L_DACP, seek_timer = evtimer_new(evbase, seek_timer_cb, NULL)); - + CHECK_ERR(L_DACP, mutex_init(&update_request_lck)); + update_current_rev = 2; listener_add(dacp_playstatus_update_handler, LISTENER_PLAYER | LISTENER_VOLUME | LISTENER_QUEUE); return 0; - - error: - dacp_deinit(); - return -1; } static void @@ -2929,6 +2881,8 @@ dacp_deinit(void) struct dacp_update_request *ur; httpd_connection *conn; + listener_remove(dacp_playstatus_update_handler); + for (ur = update_requests; update_requests; ur = update_requests) { update_requests = ur->next; @@ -2937,23 +2891,8 @@ dacp_deinit(void) conn = httpd_request_connection_get(ur->hreq); httpd_connection_free(conn); // TODO necessary? - free(ur); + update_request_free(ur); } - - listener_remove(dacp_playstatus_update_handler); - - if (seek_timer) - event_free(seek_timer); - - if (updateev) - event_free(updateev); - -#ifdef HAVE_EVENTFD - close(update_efd); -#else - close(update_pipe[0]); - close(update_pipe[1]); -#endif } struct httpd_module httpd_dacp = diff --git a/src/httpd_internal.h b/src/httpd_internal.h index 76c4bce6..267328da 100644 --- a/src/httpd_internal.h +++ b/src/httpd_internal.h @@ -108,7 +108,7 @@ struct httpd_module // Pointer to the module's handler definitions struct httpd_uri_map *handlers; - int (*init)(struct event_base *); + int (*init)(void); void (*deinit)(void); void (*request)(struct httpd_request *); }; @@ -138,7 +138,6 @@ struct httpd_request { // Backend private request object httpd_backend *backend; // For storing data that the actual backend doesn't have readily available - // e.g. peer address string for libevhtp httpd_backend_data *backend_data; // User-agent (if available) const char *user_agent; diff --git a/src/httpd_jsonapi.c b/src/httpd_jsonapi.c index 58a5a2f8..7fd3afb4 100644 --- a/src/httpd_jsonapi.c +++ b/src/httpd_jsonapi.c @@ -4755,7 +4755,7 @@ jsonapi_request(struct httpd_request *hreq) } static int -jsonapi_init(struct event_base *evbase) +jsonapi_init(void) { char *temp_path; diff --git a/src/httpd_libevhttp.c b/src/httpd_libevhttp.c index 49eac9f0..e8641244 100644 --- a/src/httpd_libevhttp.c +++ b/src/httpd_libevhttp.c @@ -22,16 +22,18 @@ #include #include +#include #include +#include // listen() #include #include #include #include #include +#include -#include "misc.h" // For net_evhttp_bind -#include "worker.h" +#include "misc.h" #include "logger.h" #include "httpd_internal.h" @@ -46,17 +48,12 @@ struct httpd_uri_parsed struct httpd_server { + int fd; struct evhttp *evhttp; httpd_request_cb request_cb; void *request_cb_arg; }; -struct cmdargs -{ - httpd_server *server; - httpd_backend *backend; -}; - const char * httpd_query_value_find(httpd_query *query, const char *key) @@ -141,15 +138,20 @@ struct event_base * httpd_request_evbase_get(struct httpd_request *hreq) { httpd_connection *conn = httpd_request_connection_get(hreq); - if (conn) + if (!conn) return NULL; return evhttp_connection_get_base(conn); } +int alloc_count; + void httpd_request_free(struct httpd_request *hreq) { + alloc_count--; + DPRINTF(E_LOG, L_HTTPD, "DEALLOC - COUNT %d\n", alloc_count); + if (!hreq) return; @@ -169,6 +171,9 @@ httpd_request_new(httpd_backend *backend, const char *uri, const char *user_agen CHECK_NULL(L_HTTPD, hreq = calloc(1, sizeof(struct httpd_request))); + alloc_count++; + DPRINTF(E_LOG, L_HTTPD, "ALLOC - COUNT %d\n", alloc_count); + // Populate hreq by getting values from the backend (or from the caller) hreq->backend = backend; if (backend) @@ -217,21 +222,24 @@ httpd_request_new(httpd_backend *backend, const char *uri, const char *user_agen } static void -request_free_cb(httpd_backend *backend, void *arg) +gencb_httpd(httpd_backend *backend, void *arg) { - struct httpd_request *hreq = arg; - - httpd_request_free(hreq); -} - -// Executed in a worker thread -static void -gencb_worker_cb(void *arg) -{ - struct cmdargs *cmd = arg; - httpd_server *server = cmd->server; - httpd_backend *backend = cmd->backend; + httpd_server *server = arg; struct httpd_request *hreq; + struct bufferevent *bufev; + + // Clear the proxy request flag set by evhttp if the request URI was absolute. + // It has side-effects on Connection: keep-alive + backend->flags &= ~EVHTTP_PROXY_REQUEST; + + // This is a workaround for some versions of libevent (2.0 and 2.1) that don't + // detect if the client hangs up, and thus don't clean up and never call the + // connection close cb(). See github issue #870 and + // https://github.com/libevent/libevent/issues/666. It should probably be + // removed again in the future. + bufev = evhttp_connection_get_bufferevent(evhttp_request_get_connection(backend)); + if (bufev) + bufferevent_enable(bufev, EV_READ); hreq = httpd_request_new(backend, NULL, NULL); if (!hreq) @@ -240,35 +248,21 @@ gencb_worker_cb(void *arg) return; } - evhttp_request_set_on_complete_cb(backend, request_free_cb, hreq); - server->request_cb(hreq, server->request_cb_arg); } -// Callback from evhttp in httpd thread -static void -gencb_httpd(httpd_backend *backend, void *server) -{ - struct cmdargs cmd; - - cmd.server = server; - cmd.backend = backend; - - // Clear the proxy request flag set by evhttp if the request URI was absolute. - // It has side-effects on Connection: keep-alive - backend->flags &= ~EVHTTP_PROXY_REQUEST; - - // Defer the execution to a worker thread - worker_execute(gencb_worker_cb, &cmd, sizeof(cmd), 0); -} - void httpd_server_free(httpd_server *server) { if (!server) return; - evhttp_free(server->evhttp); + if (server->fd > 0) + close(server->fd); + + if (server->evhttp) + evhttp_free(server->evhttp); + free(server); } @@ -284,7 +278,16 @@ httpd_server_new(struct event_base *evbase, unsigned short port, httpd_request_c server->request_cb = cb; server->request_cb_arg = arg; - ret = net_evhttp_bind(server->evhttp, port, "httpd"); + server->fd = net_bind(&port, SOCK_STREAM | SOCK_NONBLOCK, "httpd"); + if (server->fd <= 0) + goto error; + + // Backlog of 128 is the same libevent uses + ret = listen(server->fd, 128); + if (ret < 0) + goto error; + + ret = evhttp_accept_socket(server->evhttp, server->fd); if (ret < 0) goto error; diff --git a/src/httpd_rsp.c b/src/httpd_rsp.c index edbf3e3f..38702626 100644 --- a/src/httpd_rsp.c +++ b/src/httpd_rsp.c @@ -880,7 +880,7 @@ rsp_request(struct httpd_request *hreq) } static int -rsp_init(struct event_base *evbase) +rsp_init(void) { snprintf(rsp_filter_files, sizeof(rsp_filter_files), "f.data_kind = %d", DATA_KIND_FILE); diff --git a/src/httpd_streaming.c b/src/httpd_streaming.c index 65994c49..28fccc13 100644 --- a/src/httpd_streaming.c +++ b/src/httpd_streaming.c @@ -1,5 +1,5 @@ /* - * Copyright (C) 2015 Espen Jürgensen + * Copyright (C) 2023 Espen Jürgensen * * This program is free software; you can redistribute it and/or modify * it under the terms of the GNU General Public License as published by @@ -23,498 +23,160 @@ #include #include #include -#include -#include - #include #include -#include +#include #include +#include #include "httpd_internal.h" -#include "httpd_streaming.h" +#include "outputs/streaming.h" #include "logger.h" #include "conffile.h" -#include "transcode.h" -#include "player.h" -#include "listener.h" -#include "db.h" -// Seconds between sending silence when player is idle -// (to prevent client from hanging up) -#define STREAMING_SILENCE_INTERVAL 1 -// How many bytes we try to read at a time from the httpd pipe -#define STREAMING_READ_SIZE STOB(352, 16, 2) - -#define STREAMING_MP3_SAMPLE_RATE 44100 -#define STREAMING_MP3_BPS 16 -#define STREAMING_MP3_CHANNELS 2 -#define STREAMING_MP3_BIT_RATE 192000 - - -// Linked list of mp3 streaming requests struct streaming_session { struct httpd_request *hreq; - struct streaming_session *next; - bool require_icy; // Client requested icy meta - size_t bytes_sent; // Audio bytes sent since last metablock + int fd; + struct event *readev; + bool require_icy; + size_t bytes_sent; }; -static pthread_mutex_t streaming_sessions_lck; -static struct streaming_session *streaming_sessions; -// Means we're not able to encode to mp3 -static bool streaming_not_supported; - -// Interval for sending silence when playback is paused -static struct timeval streaming_silence_tv = { STREAMING_SILENCE_INTERVAL, 0 }; - -// Input buffer, output buffer and encoding ctx for transcode -static struct encode_ctx *streaming_encode_ctx; -static struct evbuffer *streaming_encoded_data; -static struct media_quality streaming_quality_in; -static struct media_quality streaming_quality_out = { STREAMING_MP3_SAMPLE_RATE, STREAMING_MP3_BPS, STREAMING_MP3_CHANNELS, STREAMING_MP3_BIT_RATE }; - -// Used for pushing events and data from the player -static struct event *streamingev; -static struct event *metaev; -static struct player_status streaming_player_status; -static int streaming_player_changed; -static int streaming_pipe[2]; -static int streaming_meta[2]; - -#define STREAMING_ICY_METALEN_MAX 4080 // 255*16 incl header/footer (16bytes) -#define STREAMING_ICY_METATITLELEN_MAX 4064 // STREAMING_ICY_METALEN_MAX -16 (not incl header/footer) +static struct media_quality streaming_default_quality = { + .sample_rate = 44100, + .bits_per_sample = 16, + .channels = 2, + .bit_rate = 128000, +}; /* As streaming quality goes up, we send more data to the remote client. With a - * smaller ICY_METAINT value we have to splice metadata more frequently - on - * some devices with small input buffers, a higher quality stream and low + * smaller ICY_METAINT value we have to splice metadata more frequently - on + * some devices with small input buffers, a higher quality stream and low * ICY_METAINT can lead to stuttering as observed on a Roku Soundbridge */ -#define STREAMING_ICY_METAINT_DEFAULT 16384 -static unsigned short streaming_icy_metaint = STREAMING_ICY_METAINT_DEFAULT; -static unsigned streaming_icy_clients; -static char streaming_icy_title[STREAMING_ICY_METATITLELEN_MAX]; +static unsigned short streaming_icy_metaint = 16384; +/* ----------------------------- Session helpers ---------------------------- */ + static void -streaming_close_cb(httpd_connection *conn, void *arg) +session_free(struct streaming_session *session) { - struct streaming_session *this; - struct streaming_session *session; - struct streaming_session *prev; - - this = (struct streaming_session *)arg; - - DPRINTF(E_INFO, L_STREAMING, "Stopping mp3 streaming to %s:%d\n", this->hreq->peer_address, (int)this->hreq->peer_port); - - pthread_mutex_lock(&streaming_sessions_lck); - if (!streaming_sessions) - { - // This close comes during deinit() - we don't free `this` since it is - // already a dangling ptr (free'd in deinit()) at this stage - pthread_mutex_unlock(&streaming_sessions_lck); - return; - } - - prev = NULL; - for (session = streaming_sessions; session; session = session->next) - { - if (session->hreq == this->hreq) - break; - - prev = session; - } - if (!session) + return; + + if (session->readev) { - DPRINTF(E_LOG, L_STREAMING, "Bug! Got a failure callback for an unknown stream (%s:%d)\n", this->hreq->peer_address, (int)this->hreq->peer_port); - free(this); - pthread_mutex_unlock(&streaming_sessions_lck); - return; + streaming_session_deregister(session->fd); + event_free(session->readev); } - if (!prev) - streaming_sessions = session->next; - else - prev->next = session->next; - - if (session->require_icy) - --streaming_icy_clients; - - // Valgrind says libevent doesn't free the request on disconnect (even though it owns it - libevent bug?), - // so we do it with a reply end - httpd_send_reply_end(session->hreq); free(session); - - if (!streaming_sessions) - { - DPRINTF(E_INFO, L_STREAMING, "No more clients, will stop streaming\n"); - event_del(streamingev); - event_del(metaev); - } - - pthread_mutex_unlock(&streaming_sessions_lck); } -static void -streaming_end(void) +static struct streaming_session * +session_new(struct httpd_request *hreq, bool require_icy) { struct streaming_session *session; - pthread_mutex_lock(&streaming_sessions_lck); - for (session = streaming_sessions; streaming_sessions; session = streaming_sessions) - { - DPRINTF(E_INFO, L_STREAMING, "Force close stream to %s:%d\n", session->hreq->peer_address, (int)session->hreq->peer_port); + CHECK_NULL(L_STREAMING, session = calloc(1, sizeof(struct streaming_session))); - httpd_request_closecb_set(session->hreq, NULL, NULL); - httpd_send_reply_end(session->hreq); + session->hreq = hreq; + session->require_icy = require_icy; - streaming_sessions = session->next; - free(session); - } - pthread_mutex_unlock(&streaming_sessions_lck); - - event_del(streamingev); - event_del(metaev); + return session; } static void -streaming_meta_cb(evutil_socket_t fd, short event, void *arg) +session_end(struct streaming_session *session) { - struct media_quality quality; - struct decode_ctx *decode_ctx; - int ret; + DPRINTF(E_INFO, L_STREAMING, "Stopping mp3 streaming to %s:%d\n", session->hreq->peer_address, (int)session->hreq->peer_port); - transcode_encode_cleanup(&streaming_encode_ctx); - - ret = read(fd, &quality, sizeof(struct media_quality)); - if (ret != sizeof(struct media_quality)) - goto error; - - decode_ctx = NULL; - if (quality.bits_per_sample == 16) - decode_ctx = transcode_decode_setup_raw(XCODE_PCM16, &quality); - else if (quality.bits_per_sample == 24) - decode_ctx = transcode_decode_setup_raw(XCODE_PCM24, &quality); - else if (quality.bits_per_sample == 32) - decode_ctx = transcode_decode_setup_raw(XCODE_PCM32, &quality); - - if (!decode_ctx) - goto error; - - streaming_encode_ctx = transcode_encode_setup(XCODE_MP3, &streaming_quality_out, decode_ctx, NULL, 0, 0); - transcode_decode_cleanup(&decode_ctx); - if (!streaming_encode_ctx) - { - DPRINTF(E_LOG, L_STREAMING, "Will not be able to stream MP3, libav does not support MP3 encoding: %d/%d/%d @ %d\n", streaming_quality_out.sample_rate, streaming_quality_out.bits_per_sample, streaming_quality_out.channels, streaming_quality_out.bit_rate); - streaming_not_supported = 1; - streaming_end(); - return; - } - - streaming_quality_in = quality; - streaming_not_supported = 0; - - return; - - error: - DPRINTF(E_LOG, L_STREAMING, "Unknown or unsupported quality of input data (%d/%d/%d), cannot MP3 encode\n", quality.sample_rate, quality.bits_per_sample, quality.channels); - streaming_not_supported = 1; - streaming_end(); + // Valgrind says libevent doesn't free the request on disconnect (even though + // it owns it - libevent bug?), so we do it with a reply end. This also makes + // sure the hreq gets freed. + httpd_send_reply_end(session->hreq); + session_free(session); } -static int -encode_buffer(uint8_t *buffer, size_t size) + +/* ----------------------------- Event callbacks ---------------------------- */ + +static void +conn_close_cb(httpd_connection *conn, void *arg) { - transcode_frame *frame; - int samples; - int ret; + struct streaming_session *session = arg; - if (streaming_not_supported) - { - DPRINTF(E_LOG, L_STREAMING, "Streaming unsupported\n"); - return -1; - } - - if (streaming_quality_in.channels == 0) - { - DPRINTF(E_LOG, L_STREAMING, "Streaming quality is zero (%d/%d/%d)\n", streaming_quality_in.sample_rate, streaming_quality_in.bits_per_sample, streaming_quality_in.channels); - return -1; - } - - samples = BTOS(size, streaming_quality_in.bits_per_sample, streaming_quality_in.channels); - - frame = transcode_frame_new(buffer, size, samples, &streaming_quality_in); - if (!frame) - { - DPRINTF(E_LOG, L_STREAMING, "Could not convert raw PCM to frame\n"); - return -1; - } - - ret = transcode_encode(streaming_encoded_data, streaming_encode_ctx, frame, 0); - transcode_frame_free(frame); - - return ret; -} - -/* We know that the icymeta is limited to 1+255*16 (ie 4081) bytes so caller must - * provide a buf of this size to avoid needless mallocs - * - * The icy meta block is defined by a single byte indicating how many double byte - * words used for the actual meta. Unused bytes are null padded - * - * https://stackoverflow.com/questions/4911062/pulling-track-info-from-an-audio-stream-using-php/4914538#4914538 - * http://www.smackfu.com/stuff/programming/shoutcast.html - */ -static uint8_t * -streaming_icy_meta_create(uint8_t buf[STREAMING_ICY_METALEN_MAX+1], const char *title, unsigned *buflen) -{ - unsigned titlelen; - unsigned metalen; - uint8_t no16s; - - *buflen = 0; - - if (title == NULL) - { - no16s = 0; - memcpy(buf, &no16s, 1); - - *buflen = 1; - } - else - { - titlelen = strlen(title); - if (titlelen > STREAMING_ICY_METATITLELEN_MAX) - titlelen = STREAMING_ICY_METATITLELEN_MAX; // dont worry about the null byte - - // [0] 1x byte N, indicate the total number of 16 bytes words required - // to represent the meta data - // [1..N] meta data book ended by "StreamTitle='" and "';" - // - // The '15' is strlen of StreamTitle=' + '; - no16s = (15 + titlelen)/16 +1; - metalen = 1 + no16s*16; - memset(buf, 0, metalen); - - memcpy(buf, &no16s, 1); - memcpy(buf+1, (const uint8_t*)"StreamTitle='", 13); - memcpy(buf+14, title, titlelen); - memcpy(buf+14+titlelen, (const uint8_t*)"';", 2); - - *buflen = metalen; - } - - return buf; -} - -static uint8_t * -streaming_icy_meta_splice(const uint8_t *data, size_t datalen, off_t offset, size_t *len) -{ - uint8_t meta[STREAMING_ICY_METALEN_MAX+1]; // Buffer, of max sz, for the created icymeta - unsigned metalen; // How much of the buffer is in use - uint8_t *buf; // Client returned buffer; contains the audio (from data) spliced w/meta (from meta) - - if (data == NULL || datalen == 0) - return NULL; - - memset(meta, 0, sizeof(meta)); - streaming_icy_meta_create(meta, streaming_icy_title, &metalen); - - *len = datalen + metalen; - // DPRINTF(E_DBG, L_STREAMING, "splicing meta, audio block=%d bytes, offset=%d, metalen=%d new buflen=%d\n", datalen, offset, metalen, *len); - buf = malloc(*len); - memcpy(buf, data, offset); - memcpy(buf+offset, &meta[0], metalen); - memcpy(buf+offset+metalen, data+offset, datalen-offset); - - return buf; + session_end(session); } static void -streaming_player_status_update(void) +read_cb(evutil_socket_t fd, short event, void *arg) { - struct db_queue_item *queue_item; - uint32_t prev_id; - - prev_id = streaming_player_status.id; - player_get_status(&streaming_player_status); - - if (prev_id == streaming_player_status.id || !streaming_icy_clients) - { - return; - } - - queue_item = db_queue_fetch_byfileid(streaming_player_status.id); - if (!queue_item) - { - streaming_icy_title[0] = '\0'; - return; - } - - snprintf(streaming_icy_title, sizeof(streaming_icy_title), "%s - %s", queue_item->title, queue_item->artist); - free_queue_item(queue_item, 0); -} - -static void -streaming_send_cb(evutil_socket_t fd, short event, void *arg) -{ - struct streaming_session *session; - struct evbuffer *evbuf; - uint8_t rawbuf[STREAMING_READ_SIZE]; - uint8_t *buf; - uint8_t *splice_buf = NULL; - size_t splice_len; - size_t count; - int overflow; + struct streaming_session *session = arg; + struct httpd_request *hreq; int len; - int ret; - // Player wrote data to the pipe (EV_READ) - if (event & EV_READ) + CHECK_NULL(L_STREAMING, hreq = session->hreq); + + len = evbuffer_read(hreq->out_body, fd, -1); + if (len < 0 && errno != EAGAIN) { - while (1) - { - ret = read(fd, &rawbuf, sizeof(rawbuf)); - if (ret <= 0) - break; - - if (streaming_player_changed) - { - streaming_player_changed = 0; - streaming_player_status_update(); - } - - ret = encode_buffer(rawbuf, ret); - if (ret < 0) - return; - } - } - // Event timed out, let's see what the player is doing and send silence if it is paused - else - { - if (streaming_player_changed) - { - streaming_player_changed = 0; - streaming_player_status_update(); - } - - if (streaming_player_status.status != PLAY_PAUSED) - return; - - memset(&rawbuf, 0, sizeof(rawbuf)); - ret = encode_buffer(rawbuf, sizeof(rawbuf)); - if (ret < 0) - return; + httpd_request_closecb_set(hreq, NULL, NULL); + session_end(session); + return; } - len = evbuffer_get_length(streaming_encoded_data); - if (len == 0) - return; + httpd_send_reply_chunk(hreq, hreq->out_body, NULL, NULL); - // Send data - evbuf = evbuffer_new(); - pthread_mutex_lock(&streaming_sessions_lck); - for (session = streaming_sessions; session; session = session->next) - { - // Does this session want ICY meta data and is it time to send? - count = session->bytes_sent+len; - if (session->require_icy && count > streaming_icy_metaint) - { - overflow = count%streaming_icy_metaint; - buf = evbuffer_pullup(streaming_encoded_data, -1); - - // DPRINTF(E_DBG, L_STREAMING, "session=%x sent=%ld len=%ld overflow=%ld\n", session, session->bytes_sent, len, overflow); - - // Splice the 'icy title' in with the encoded audio data - splice_len = 0; - splice_buf = streaming_icy_meta_splice(buf, len, len-overflow, &splice_len); - - evbuffer_add(evbuf, splice_buf, splice_len); - - free(splice_buf); - splice_buf = NULL; - - httpd_send_reply_chunk(session->hreq, evbuf, NULL, NULL); - - if (session->next == NULL) - { - // We're the last session, drop the contents of the encoded buffer - evbuffer_drain(streaming_encoded_data, len); - } - session->bytes_sent = overflow; - } - else - { - if (session->next) - { - buf = evbuffer_pullup(streaming_encoded_data, -1); - evbuffer_add(evbuf, buf, len); - httpd_send_reply_chunk(session->hreq, evbuf, NULL, NULL); - } - else - { - httpd_send_reply_chunk(session->hreq, streaming_encoded_data, NULL, NULL); - } - session->bytes_sent += len; - } - } - pthread_mutex_unlock(&streaming_sessions_lck); - - evbuffer_free(evbuf); + session->bytes_sent += len; } -// Thread: player (not fully thread safe, but hey...) -static void -player_change_cb(short event_mask) -{ - streaming_player_changed = 1; -} -// Thread: player (also prone to race conditions, mostly during deinit) -void -streaming_write(struct output_buffer *obuf) -{ - int ret; +/* -------------------------- Module implementation ------------------------- */ - // Explicit no-lock - let the write to pipes fail if during deinit - if (!streaming_sessions) - return; - - if (!quality_is_equal(&obuf->data[0].quality, &streaming_quality_in)) - { - ret = write(streaming_meta[1], &obuf->data[0].quality, sizeof(struct media_quality)); - if (ret < 0) - { - if (errno == EBADF) - DPRINTF(E_LOG, L_STREAMING, "streaming pipe already closed\n"); - else - DPRINTF(E_LOG, L_STREAMING, "Error writing to streaming pipe: %s\n", strerror(errno)); - return; - } - } - - ret = write(streaming_pipe[1], obuf->data[0].buffer, obuf->data[0].bufsize); - if (ret < 0) - { - if (errno == EAGAIN) - DPRINTF(E_WARN, L_STREAMING, "Streaming pipe full, skipping write\n"); - else - { - if (errno == EBADF) - DPRINTF(E_LOG, L_STREAMING, "Streaming pipe already closed\n"); - else - DPRINTF(E_LOG, L_STREAMING, "Error writing to streaming pipe: %s\n", strerror(errno)); - } - } -} - -// Since streaming is a one-trick pony it doesn't need handlers static int -streaming_dummy_handler(struct httpd_request *hreq) +streaming_mp3_handler(struct httpd_request *hreq) { + struct streaming_session *session; + struct event_base *evbase; + const char *name = cfg_getstr(cfg_getsec(cfg, "library"), "name"); + const char *param; + bool require_icy; + char buf[9]; + + param = httpd_header_find(hreq->in_headers, "Icy-MetaData"); + require_icy = (param && strcmp(param, "1") == 0); + if (require_icy) + { + httpd_header_add(hreq->out_headers, "icy-name", name); + snprintf(buf, sizeof(buf)-1, "%d", streaming_icy_metaint); + httpd_header_add(hreq->out_headers, "icy-metaint", buf); + } + + session = session_new(hreq, require_icy); + if (!session) + return -1; + + // Ask streaming output module for a fd to read mp3 from + session->fd = streaming_session_register(STREAMING_FORMAT_MP3, streaming_default_quality); + + CHECK_NULL(L_STREAMING, evbase = httpd_request_evbase_get(hreq)); + CHECK_NULL(L_STREAMING, session->readev = event_new(evbase, session->fd, EV_READ | EV_PERSIST, read_cb, session)); + event_add(session->readev, NULL); + + httpd_request_closecb_set(hreq, conn_close_cb, session); + + httpd_header_add(hreq->out_headers, "Content-Type", "audio/mpeg"); + httpd_header_add(hreq->out_headers, "Server", PACKAGE_NAME "/" VERSION); + httpd_header_add(hreq->out_headers, "Cache-Control", "no-cache"); + httpd_header_add(hreq->out_headers, "Pragma", "no-cache"); + httpd_header_add(hreq->out_headers, "Expires", "Mon, 31 Aug 2015 06:00:00 GMT"); + + httpd_send_reply_start(hreq, HTTP_OK, "OK"); + return 0; } @@ -522,7 +184,7 @@ static struct httpd_uri_map streaming_handlers[] = { { .regexp = "^/stream.mp3$", - .handler = streaming_dummy_handler + .handler = streaming_mp3_handler }, { .regexp = NULL, @@ -533,92 +195,34 @@ static struct httpd_uri_map streaming_handlers[] = static void streaming_request(struct httpd_request *hreq) { - struct streaming_session *session; - cfg_t *lib; - const char *name; - const char *param; - bool require_icy = false; - char buf[9]; + int ret; - if (streaming_not_supported) + if (!hreq->handler) { - DPRINTF(E_LOG, L_STREAMING, "Got MP3 streaming request, but cannot encode to MP3\n"); + DPRINTF(E_LOG, L_STREAMING, "Unrecognized path in streaming request: '%s'\n", hreq->uri); - httpd_send_error(hreq, HTTP_NOTFOUND, "Not Found"); + httpd_send_error(hreq, HTTP_NOTFOUND, NULL); return; } - param = httpd_header_find(hreq->in_headers, "Icy-MetaData"); - if (param && strcmp(param, "1") == 0) - require_icy = true; - - DPRINTF(E_INFO, L_STREAMING, "Beginning mp3 streaming (with icy=%d, icy_metaint=%d) to %s:%d\n", require_icy, streaming_icy_metaint, hreq->peer_address, (int)hreq->peer_port); - - lib = cfg_getsec(cfg, "library"); - name = cfg_getstr(lib, "name"); - - httpd_header_add(hreq->out_headers, "Content-Type", "audio/mpeg"); - httpd_header_add(hreq->out_headers, "Server", PACKAGE_NAME "/" VERSION); - httpd_header_add(hreq->out_headers, "Cache-Control", "no-cache"); - httpd_header_add(hreq->out_headers, "Pragma", "no-cache"); - httpd_header_add(hreq->out_headers, "Expires", "Mon, 31 Aug 2015 06:00:00 GMT"); - if (require_icy) - { - ++streaming_icy_clients; - httpd_header_add(hreq->out_headers, "icy-name", name); - snprintf(buf, sizeof(buf)-1, "%d", streaming_icy_metaint); - httpd_header_add(hreq->out_headers, "icy-metaint", buf); - } - httpd_header_add(hreq->out_headers, "Access-Control-Allow-Origin", "*"); - httpd_header_add(hreq->out_headers, "Access-Control-Allow-Methods", "GET, POST, PUT, DELETE, OPTIONS"); - - httpd_send_reply_start(hreq, HTTP_OK, "OK"); - - session = calloc(1, sizeof(struct streaming_session)); - if (!session) - { - DPRINTF(E_LOG, L_STREAMING, "Out of memory for streaming request\n"); - - httpd_send_error(hreq, HTTP_SERVUNAVAIL, "Internal Server Error"); - return; - } - - pthread_mutex_lock(&streaming_sessions_lck); - - if (!streaming_sessions) - { - event_add(streamingev, &streaming_silence_tv); - event_add(metaev, NULL); - } - - session->hreq = hreq; - session->next = streaming_sessions; - session->require_icy = require_icy; - session->bytes_sent = 0; - streaming_sessions = session; - - pthread_mutex_unlock(&streaming_sessions_lck); - - httpd_request_closecb_set(hreq, streaming_close_cb, session); + ret = hreq->handler(hreq); + if (ret < 0) + httpd_send_error(hreq, HTTP_INTERNAL, NULL); } static int -streaming_init(struct event_base *evbase) +streaming_init(void) { - int ret; - cfg_t *cfgsec; int val; - cfgsec = cfg_getsec(cfg, "streaming"); - - val = cfg_getint(cfgsec, "sample_rate"); + val = cfg_getint(cfg_getsec(cfg, "streaming"), "sample_rate"); // Validate against the variations of libmp3lame's supported sample rates: 32000/44100/48000 if (val % 11025 > 0 && val % 12000 > 0 && val % 8000 > 0) - DPRINTF(E_LOG, L_STREAMING, "Non standard streaming sample_rate=%d, defaulting\n", val); + DPRINTF(E_LOG, L_STREAMING, "Unsupported streaming sample_rate=%d, defaulting\n", val); else - streaming_quality_out.sample_rate = val; + streaming_default_quality.sample_rate = val; - val = cfg_getint(cfgsec, "bit_rate"); + val = cfg_getint(cfg_getsec(cfg, "streaming"), "bit_rate"); switch (val) { case 64: @@ -626,109 +230,25 @@ streaming_init(struct event_base *evbase) case 128: case 192: case 320: - streaming_quality_out.bit_rate = val*1000; + streaming_default_quality.bit_rate = val*1000; break; default: DPRINTF(E_LOG, L_STREAMING, "Unsuppported streaming bit_rate=%d, supports: 64/96/128/192/320, defaulting\n", val); } - DPRINTF(E_INFO, L_STREAMING, "Streaming quality: %d/%d/%d @ %dkbps\n", streaming_quality_out.sample_rate, streaming_quality_out.bits_per_sample, streaming_quality_out.channels, streaming_quality_out.bit_rate/1000); + DPRINTF(E_INFO, L_STREAMING, "Streaming quality: %d/%d/%d @ %dkbps\n", + streaming_default_quality.sample_rate, streaming_default_quality.bits_per_sample, + streaming_default_quality.channels, streaming_default_quality.bit_rate/1000); - val = cfg_getint(cfgsec, "icy_metaint"); + val = cfg_getint(cfg_getsec(cfg, "streaming"), "icy_metaint"); // Too low a value forces server to send more meta than data if (val >= 4096 && val <= 131072) streaming_icy_metaint = val; else DPRINTF(E_INFO, L_STREAMING, "Unsupported icy_metaint=%d, supported range: 4096..131072, defaulting to %d\n", val, streaming_icy_metaint); - ret = mutex_init(&streaming_sessions_lck); - if (ret < 0) - { - DPRINTF(E_FATAL, L_STREAMING, "Could not initialize mutex (%d): %s\n", ret, strerror(ret)); - goto error; - } - - // Non-blocking because otherwise httpd and player thread may deadlock -#ifdef HAVE_PIPE2 - ret = pipe2(streaming_pipe, O_CLOEXEC | O_NONBLOCK); -#else - if ( pipe(streaming_pipe) < 0 || - fcntl(streaming_pipe[0], F_SETFL, O_CLOEXEC | O_NONBLOCK) < 0 || - fcntl(streaming_pipe[1], F_SETFL, O_CLOEXEC | O_NONBLOCK) < 0 ) - ret = -1; - else - ret = 0; -#endif - if (ret < 0) - { - DPRINTF(E_FATAL, L_STREAMING, "Could not create pipe: %s\n", strerror(errno)); - goto error; - } - -#ifdef HAVE_PIPE2 - ret = pipe2(streaming_meta, O_CLOEXEC | O_NONBLOCK); -#else - if ( pipe(streaming_meta) < 0 || - fcntl(streaming_meta[0], F_SETFL, O_CLOEXEC | O_NONBLOCK) < 0 || - fcntl(streaming_meta[1], F_SETFL, O_CLOEXEC | O_NONBLOCK) < 0 ) - ret = -1; - else - ret = 0; -#endif - if (ret < 0) - { - DPRINTF(E_FATAL, L_STREAMING, "Could not create pipe: %s\n", strerror(errno)); - goto error; - } - - // Listen to playback changes so we don't have to poll to check for pausing - ret = listener_add(player_change_cb, LISTENER_PLAYER); - if (ret < 0) - { - DPRINTF(E_FATAL, L_STREAMING, "Could not add listener\n"); - goto error; - } - - // Initialize buffer for encoded mp3 audio and event for pipe reading - CHECK_NULL(L_STREAMING, streaming_encoded_data = evbuffer_new()); - - CHECK_NULL(L_STREAMING, streamingev = event_new(evbase, streaming_pipe[0], EV_TIMEOUT | EV_READ | EV_PERSIST, streaming_send_cb, NULL)); - CHECK_NULL(L_STREAMING, metaev = event_new(evbase, streaming_meta[0], EV_READ | EV_PERSIST, streaming_meta_cb, NULL)); - - streaming_icy_clients = 0; - return 0; - - error: - close(streaming_pipe[0]); - close(streaming_pipe[1]); - close(streaming_meta[0]); - close(streaming_meta[1]); - - return -1; -} - -static void -streaming_deinit(void) -{ - streaming_end(); - - event_free(metaev); - event_free(streamingev); - streamingev = NULL; - - listener_remove(player_change_cb); - - close(streaming_pipe[0]); - close(streaming_pipe[1]); - close(streaming_meta[0]); - close(streaming_meta[1]); - - transcode_encode_cleanup(&streaming_encode_ctx); - evbuffer_free(streaming_encoded_data); - - pthread_mutex_destroy(&streaming_sessions_lck); } struct httpd_module httpd_streaming = @@ -739,6 +259,5 @@ struct httpd_module httpd_streaming = .fullpaths = { "/stream.mp3", NULL }, .handlers = streaming_handlers, .init = streaming_init, - .deinit = streaming_deinit, .request = streaming_request, }; diff --git a/src/httpd_streaming.h b/src/httpd_streaming.h deleted file mode 100644 index e32d6094..00000000 --- a/src/httpd_streaming.h +++ /dev/null @@ -1,16 +0,0 @@ - -#ifndef __HTTPD_STREAMING_H__ -#define __HTTPD_STREAMING_H__ - -#include "outputs.h" - -/* httpd_streaming takes care of incoming requests to /stream.mp3 - * It will receive decoded audio from the player, and encode it, and - * stream it to one or more clients. It will not be available - * if a suitable ffmpeg/libav encoder is not present at runtime. - */ - -void -streaming_write(struct output_buffer *obuf); - -#endif /* !__HTTPD_STREAMING_H__ */ diff --git a/src/misc.c b/src/misc.c index 625b09a2..18513e91 100644 --- a/src/misc.c +++ b/src/misc.c @@ -317,6 +317,11 @@ net_bind(short unsigned *port, int type, const char *log_service_name) if (fd < 0) continue; + // Makes us able to attach multiple threads to the same port + ret = setsockopt(fd, SOL_SOCKET, SO_REUSEPORT, &yes, sizeof(yes)); + if (ret < 0) + continue; + // TODO libevent sets this, we do the same? ret = setsockopt(fd, SOL_SOCKET, SO_KEEPALIVE, &yes, sizeof(yes)); if (ret < 0) diff --git a/src/outputs/streaming.c b/src/outputs/streaming.c index a6d02413..9929e7ca 100644 --- a/src/outputs/streaming.c +++ b/src/outputs/streaming.c @@ -1,5 +1,5 @@ /* - * Copyright (C) 2016 Espen Jürgensen + * Copyright (C) 2023 Espen Jürgensen * * This program is free software; you can redistribute it and/or modify * it under the terms of the GNU General Public License as published by @@ -16,13 +16,530 @@ * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */ +#ifdef HAVE_CONFIG_H +# include +#endif + #include #include -#include +#include #include +#include +#include +#include +#include "streaming.h" #include "outputs.h" -#include "httpd_streaming.h" +#include "misc.h" +#include "worker.h" +#include "transcode.h" +#include "logger.h" + +/* About + * + * This output takes the writes from the player thread, gives them to a worker + * thread for mp3 encoding, and then the mp3 is written to a fd for the httpd + * request handler to read and pass to clients. If there is no writing from the + * player, but there are clients, it instead writes silence to the fd. + */ + +// How many times per second we send silence when player is idle (to prevent +// client from hanging up). This value matches the player tick interval. +#define SILENCE_TICKS_PER_SEC 100 + +// The wanted structure represents a particular format and quality that should +// be produced for one or more sessions. A pipe pair is created for each session +// for the i/o. +#define WANTED_PIPES_MAX 8 + +struct pipepair +{ + int writefd; + int readfd; +}; + +struct streaming_wanted +{ + int refcount; + struct pipepair pipes[WANTED_PIPES_MAX]; + + enum streaming_format format; + struct media_quality quality_in; + struct media_quality quality_out; + + struct encode_ctx *xcode_ctx; + struct evbuffer *encoded_data; + + struct streaming_wanted *next; +}; + +struct streaming_ctx +{ + struct streaming_wanted *wanted; + struct event *silenceev; + struct timeval silencetv; + struct media_quality last_quality; +}; + +struct encode_cmdarg +{ + uint8_t *buf; + size_t bufsize; + int samples; + struct media_quality quality; +}; + +static pthread_mutex_t streaming_wanted_lck; +static struct streaming_ctx streaming = +{ + .silencetv = { 0, (1000000 / SILENCE_TICKS_PER_SEC) }, +}; + +extern struct event_base *evbase_player; + + +/* ------------------------------- Helpers ---------------------------------- */ + +static int +pipe_open(struct pipepair *pipe) +{ + int fd[2]; + int ret; + +#ifdef HAVE_PIPE2 + ret = pipe2(fd, O_CLOEXEC | O_NONBLOCK); +#else + if ( pipe(fd) < 0 || + fcntl(fd[0], F_SETFL, O_CLOEXEC | O_NONBLOCK) < 0 || + fcntl(fd[1], F_SETFL, O_CLOEXEC | O_NONBLOCK) < 0 ) + ret = -1; + else + ret = 0; +#endif + if (ret < 0) + { + DPRINTF(E_LOG, L_STREAMING, "Could not create pipe: %s\n", strerror(errno)); + return -1; + } + + pipe->writefd = fd[1]; + pipe->readfd = fd[0]; + return 0; +} + +static void +pipe_close(struct pipepair *pipe) +{ + if (pipe->readfd >= 0) + close(pipe->readfd); + if (pipe->writefd >= 0) + close(pipe->writefd); + + pipe->writefd = -1; + pipe->readfd = -1; +} + +static void +wanted_free(struct streaming_wanted *w) +{ + if (!w) + return; + + for (int i = 0; i < WANTED_PIPES_MAX; i++) + pipe_close(&w->pipes[i]); + + transcode_encode_cleanup(&w->xcode_ctx); + evbuffer_free(w->encoded_data); + free(w); +} + +static struct streaming_wanted * +wanted_new(enum streaming_format format, struct media_quality quality) +{ + struct streaming_wanted *w; + + CHECK_NULL(L_STREAMING, w = calloc(1, sizeof(struct streaming_wanted))); + CHECK_NULL(L_STREAMING, w->encoded_data = evbuffer_new()); + + w->quality_out = quality; + w->format = format; + + for (int i = 0; i < WANTED_PIPES_MAX; i++) + { + w->pipes[i].writefd = -1; + w->pipes[i].readfd = -1; + } + + return w; +} + +static void +wanted_remove(struct streaming_wanted **wanted, struct streaming_wanted *remove) +{ + struct streaming_wanted *prev = NULL; + struct streaming_wanted *w; + + for (w = *wanted; w; w = w->next) + { + if (w == remove) + break; + + prev = w; + } + + if (!w) + return; + + if (!prev) + *wanted = remove->next; + else + prev->next = remove->next; + + wanted_free(remove); +} + +static struct streaming_wanted * +wanted_add(struct streaming_wanted **wanted, enum streaming_format format, struct media_quality quality) +{ + struct streaming_wanted *w; + + w = wanted_new(format, quality); + w->next = *wanted; + *wanted = w; + + return w; +} + +static struct streaming_wanted * +wanted_find_byformat(struct streaming_wanted *wanted, enum streaming_format format, struct media_quality quality) +{ + struct streaming_wanted *w; + + for (w = wanted; w; w = w->next) + { + if (w->format == format && quality_is_equal(&w->quality_out, &quality)) + return w; + } + + return NULL; +} + +static struct streaming_wanted * +wanted_find_byreadfd(struct streaming_wanted *wanted, int readfd) +{ + struct streaming_wanted *w; + int i; + + for (w = wanted; w; w = w->next) + for (i = 0; i < WANTED_PIPES_MAX; i++) + { + if (w->pipes[i].readfd == readfd) + return w; + } + + return NULL; +} + +static int +wanted_session_add(struct pipepair *pipe, struct streaming_wanted *w) +{ + int ret; + int i; + + for (i = 0; i < WANTED_PIPES_MAX; i++) + { + if (w->pipes[i].writefd != -1) // In use + continue; + + ret = pipe_open(&w->pipes[i]); + if (ret < 0) + return -1; + + memcpy(pipe, &w->pipes[i], sizeof(struct pipepair)); + break; + } + + if (i == WANTED_PIPES_MAX) + { + DPRINTF(E_LOG, L_STREAMING, "Cannot add streaming session, max pipe limit reached\n"); + return -1; + } + + w->refcount++; + DPRINTF(E_DBG, L_STREAMING, "Session register readfd %d, wanted->refcount=%d\n", pipe->readfd, w->refcount); + return 0; +} + + +static void +wanted_session_remove(struct streaming_wanted *w, int readfd) +{ + int i; + + for (i = 0; i < WANTED_PIPES_MAX; i++) + { + if (w->pipes[i].readfd != readfd) + continue; + + pipe_close(&w->pipes[i]); + break; + } + + if (i == WANTED_PIPES_MAX) + { + DPRINTF(E_LOG, L_STREAMING, "Cannot remove streaming session, readfd %d not found\n", readfd); + return; + } + + w->refcount--; + DPRINTF(E_DBG, L_STREAMING, "Session deregister readfd %d, wanted->refcount=%d\n", readfd, w->refcount); +} + + +/* ----------------------------- Thread: Worker ----------------------------- */ + +static int +encode_reset(struct streaming_wanted *w, struct media_quality quality_in) +{ + struct media_quality quality_out = w->quality_out; + struct decode_ctx *decode_ctx = NULL; + + transcode_encode_cleanup(&w->xcode_ctx); + + if (quality_in.bits_per_sample == 16) + decode_ctx = transcode_decode_setup_raw(XCODE_PCM16, &quality_in); + else if (quality_in.bits_per_sample == 24) + decode_ctx = transcode_decode_setup_raw(XCODE_PCM24, &quality_in); + else if (quality_in.bits_per_sample == 32) + decode_ctx = transcode_decode_setup_raw(XCODE_PCM32, &quality_in); + + if (!decode_ctx) + { + DPRINTF(E_LOG, L_STREAMING, "Error setting up decoder for input quality sr %d, bps %d, ch %d, cannot MP3 encode\n", + quality_in.sample_rate, quality_in.bits_per_sample, quality_in.channels); + goto error; + } + + w->quality_in = quality_in; + w->xcode_ctx = transcode_encode_setup(XCODE_MP3, &quality_out, decode_ctx, NULL, 0, 0); + if (!w->xcode_ctx) + { + DPRINTF(E_LOG, L_STREAMING, "Error setting up encoder for output quality sr %d, bps %d, ch %d, cannot MP3 encode\n", + quality_out.sample_rate, quality_out.bits_per_sample, quality_out.channels); + goto error; + } + + transcode_decode_cleanup(&decode_ctx); + return 0; + + error: + transcode_decode_cleanup(&decode_ctx); + return -1; +} + +static int +encode_frame(struct streaming_wanted *w, struct media_quality quality_in, transcode_frame *frame) +{ + int ret; + + if (!w->xcode_ctx || !quality_is_equal(&quality_in, &w->quality_in)) + { + DPRINTF(E_DBG, L_STREAMING, "Resetting transcode context\n"); + if (encode_reset(w, quality_in) < 0) + return -1; + } + + ret = transcode_encode(w->encoded_data, w->xcode_ctx, frame, 0); + if (ret < 0) + { + return -1; + } + + return 0; +} + +static void +encode_write(uint8_t *buf, size_t buflen, struct streaming_wanted *w, struct pipepair *pipe) +{ + int ret; + + if (pipe->writefd < 0) + return; + + ret = write(pipe->writefd, buf, buflen); + if (ret < 0) + { + DPRINTF(E_LOG, L_STREAMING, "Error writing to stream pipe %d (format %d): %s\n", pipe->writefd, w->format, strerror(errno)); + wanted_session_remove(w, pipe->readfd); + } +} + +static void +encode_data_cb(void *arg) +{ + struct encode_cmdarg *ctx = arg; + transcode_frame *frame; + struct streaming_wanted *w; + struct streaming_wanted *next; + uint8_t *buf; + size_t len; + int ret; + int i; + + frame = transcode_frame_new(ctx->buf, ctx->bufsize, ctx->samples, &ctx->quality); + if (!frame) + { + DPRINTF(E_LOG, L_STREAMING, "Could not convert raw PCM to frame\n"); + goto out; + } + + pthread_mutex_lock(&streaming_wanted_lck); + for (w = streaming.wanted; w; w = next) + { + next = w->next; + ret = encode_frame(w, ctx->quality, frame); + if (ret < 0) + wanted_remove(&streaming.wanted, w); // This will close all the fds, so readers get an error + + len = evbuffer_get_length(w->encoded_data); + if (len == 0) + continue; + + buf = evbuffer_pullup(w->encoded_data, -1); + + for (i = 0; i < WANTED_PIPES_MAX; i++) + encode_write(buf, len, w, &w->pipes[i]); + + evbuffer_drain(w->encoded_data, -1); + + if (w->refcount == 0) + wanted_remove(&streaming.wanted, w); + } + pthread_mutex_unlock(&streaming_wanted_lck); + + out: + transcode_frame_free(frame); + free(ctx->buf); +} + + +/* ----------------------------- Thread: Player ----------------------------- */ + +static void +encode_worker_invoke(uint8_t *buf, size_t bufsize, int samples, struct media_quality quality) +{ + struct encode_cmdarg ctx; + + if (quality.channels == 0) + { + DPRINTF(E_LOG, L_STREAMING, "Streaming quality is zero (%d/%d/%d)\n", + quality.sample_rate, quality.bits_per_sample, quality.channels); + return; + } + + ctx.buf = buf; + ctx.bufsize = bufsize; + ctx.samples = samples; + ctx.quality = quality; + + worker_execute(encode_data_cb, &ctx, sizeof(struct encode_cmdarg), 0); +} + +static void +streaming_write(struct output_buffer *obuf) +{ + uint8_t *rawbuf; + + if (!streaming.wanted) + return; + + // Need to make a copy since it will be passed of to the async worker + CHECK_NULL(L_STREAMING, rawbuf = malloc(obuf->data[0].bufsize)); + memcpy(rawbuf, obuf->data[0].buffer, obuf->data[0].bufsize); + + encode_worker_invoke(rawbuf, obuf->data[0].bufsize, obuf->data[0].samples, obuf->data[0].quality); + + streaming.last_quality = obuf->data[0].quality; + + // In case this is the last player write() we want to start streaming silence + evtimer_add(streaming.silenceev, &streaming.silencetv); +} + +static void +silenceev_cb(evutil_socket_t fd, short event, void *arg) +{ + uint8_t *rawbuf; + size_t bufsize; + int samples; + + // TODO what if everyone has disconnected? Check for streaming.wanted? + + samples = streaming.last_quality.sample_rate / SILENCE_TICKS_PER_SEC; + bufsize = STOB(samples, streaming.last_quality.bits_per_sample, streaming.last_quality.channels); + + CHECK_NULL(L_STREAMING, rawbuf = calloc(1, bufsize)); + + encode_worker_invoke(rawbuf, bufsize, samples, streaming.last_quality); + + evtimer_add(streaming.silenceev, &streaming.silencetv); +} + +/* ----------------------------- Thread: httpd ------------------------------ */ + +int +streaming_session_register(enum streaming_format format, struct media_quality quality) +{ + struct streaming_wanted *w; + struct pipepair pipe; + int ret; + + pthread_mutex_lock(&streaming_wanted_lck); + w = wanted_find_byformat(streaming.wanted, format, quality); + if (!w) + w = wanted_add(&streaming.wanted, format, quality); + + ret = wanted_session_add(&pipe, w); + if (ret < 0) + pipe.readfd = -1; + + pthread_mutex_unlock(&streaming_wanted_lck); + + return pipe.readfd; +} + +void +streaming_session_deregister(int readfd) +{ + struct streaming_wanted *w; + + pthread_mutex_lock(&streaming_wanted_lck); + w = wanted_find_byreadfd(streaming.wanted, readfd); + if (!w) + goto out; + + wanted_session_remove(w, readfd); + + if (w->refcount == 0) + wanted_remove(&streaming.wanted, w); + + out: + pthread_mutex_unlock(&streaming_wanted_lck); +} + +static int +streaming_init(void) +{ + CHECK_NULL(L_STREAMING, streaming.silenceev = event_new(evbase_player, -1, 0, silenceev_cb, NULL)); + CHECK_ERR(L_STREAMING, mutex_init(&streaming_wanted_lck)); + + return 0; +} + +static void +streaming_deinit(void) +{ + event_free(streaming.silenceev); +} struct output_definition output_streaming = { @@ -30,5 +547,7 @@ struct output_definition output_streaming = .type = OUTPUT_TYPE_STREAMING, .priority = 0, .disabled = 0, + .init = streaming_init, + .deinit = streaming_deinit, .write = streaming_write, }; diff --git a/src/outputs/streaming.h b/src/outputs/streaming.h new file mode 100644 index 00000000..901112ae --- /dev/null +++ b/src/outputs/streaming.h @@ -0,0 +1,18 @@ + +#ifndef __STREAMING_H__ +#define __STREAMING_H__ + +#include "misc.h" // struct media_quality + +enum streaming_format +{ + STREAMING_FORMAT_MP3, +}; + +int +streaming_session_register(enum streaming_format format, struct media_quality quality); + +void +streaming_session_deregister(int readfd); + +#endif /* !__STREAMING_H__ */ diff --git a/src/worker.c b/src/worker.c index f665fc40..098fba03 100644 --- a/src/worker.c +++ b/src/worker.c @@ -43,7 +43,7 @@ #include "worker.h" #include "misc.h" -#define THREADPOOL_NTHREADS 4 +#define THREADPOOL_NTHREADS 2 struct evthr_pool;