Merge pull request #1576 from owntone/thread_httpd4

[httpd] Multithreaded httpd and refactor of streaming
This commit is contained in:
ejurgensen 2023-03-09 00:08:15 +01:00 committed by GitHub
commit 98ee49dca5
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
39 changed files with 4004 additions and 3427 deletions

View File

@ -50,7 +50,7 @@ AC_CHECK_HEADERS([sys/wait.h sys/param.h dirent.h getopt.h stdint.h], [],
[AC_MSG_ERROR([[Missing header required to build OwnTone]])])
AC_CHECK_HEADERS([time.h], [],
[AC_MSG_ERROR([[Missing header required to build OwnTone]])])
AC_CHECK_FUNCS_ONCE([posix_fadvise pipe2])
AC_CHECK_FUNCS_ONCE([posix_fadvise pipe2 syscall])
AC_CHECK_FUNCS([strptime strtok_r], [],
[AC_MSG_ERROR([[Missing function required to build OwnTone]])])
@ -148,13 +148,10 @@ OWNTONE_MODULES_CHECK([COMMON], [SQLITE3], [sqlite3 >= 3.5.0],
[AC_MSG_RESULT([[runtime will tell]])])
])
OWNTONE_MODULES_CHECK([OWNTONE], [LIBEVENT], [libevent >= 2],
[event_base_new], [event2/event.h],
[dnl check for old version
PKG_CHECK_EXISTS([libevent >= 2.1.4], [],
[AC_DEFINE([HAVE_LIBEVENT2_OLD], 1,
[Define to 1 if you have libevent 2 (<2.1.4)])])
])
OWNTONE_MODULES_CHECK([OWNTONE], [LIBEVENT], [libevent >= 2.1.4],
[event_base_new], [event2/event.h])
OWNTONE_MODULES_CHECK([OWNTONE], [LIBEVENT_PTHREADS], [libevent_pthreads],
[evthread_use_pthreads], [event2/thread.h])
dnl Check for evhttp_connection_get_peer() signature
AC_COMPILE_IFELSE([AC_LANG_PROGRAM([[
@ -269,11 +266,6 @@ OWNTONE_ARG_WITH_CHECK([OWNTONE_OPTS], [libwebsockets support], [libwebsockets],
[libwebsockets >= 2.0.2])
AM_CONDITIONAL([COND_LIBWEBSOCKETS], [[test "x$with_libwebsockets" = "xyes"]])
dnl Build with libevent_pthreads
OWNTONE_ARG_WITH_CHECK([OWNTONE_OPTS], [libevent_pthreads support],
[libevent_pthreads], [LIBEVENT_PTHREADS], [libevent_pthreads],
[evthread_use_pthreads], [event2/thread.h])
dnl Build with Avahi (or Bonjour if not)
OWNTONE_ARG_WITH_CHECK([OWNTONE_OPTS], [Avahi mDNS], [avahi], [AVAHI],
[avahi-client >= 0.6.24], [avahi_client_new], [avahi-client/client.h])

View File

@ -224,7 +224,7 @@ Libraries:
from <http://ffmpeg.org/>
- libconfuse
from <http://www.nongnu.org/confuse/>
- libevent 2.0+ (best with 2.1.4+)
- libevent 2.1.4+
from <http://libevent.org/>
- MiniXML (aka mxml or libmxml)
from <http://minixml.org/software.php>

View File

@ -92,14 +92,15 @@ owntone_SOURCES = main.c \
library.c library.h \
$(MDNS_SRC) mdns.h \
remote_pairing.c remote_pairing.h \
httpd.c httpd.h \
httpd_rsp.c httpd_rsp.h \
httpd_libevhttp.c \
httpd.c httpd.h httpd_internal.h \
httpd_rsp.c \
httpd_daap.c httpd_daap.h \
httpd_dacp.c httpd_dacp.h \
httpd_jsonapi.c httpd_jsonapi.h \
httpd_streaming.c httpd_streaming.h \
httpd_oauth.c httpd_oauth.h \
httpd_artworkapi.c httpd_artworkapi.h \
httpd_dacp.c \
httpd_jsonapi.c \
httpd_streaming.c \
httpd_oauth.c \
httpd_artworkapi.c \
http.c http.h \
dmap_common.c dmap_common.h \
transcode.c transcode.h \
@ -117,9 +118,11 @@ owntone_SOURCES = main.c \
outputs/rtp_common.h outputs/rtp_common.c \
outputs/raop.c outputs/airplay.c $(PAIR_AP_SRC) \
outputs/airplay_events.c outputs/airplay_events.h \
outputs/streaming.c outputs/dummy.c outputs/fifo.c outputs/rcp.c \
outputs/streaming.c outputs/streaming.h \
outputs/dummy.c outputs/fifo.c outputs/rcp.c \
$(ALSA_SRC) $(PULSEAUDIO_SRC) $(CHROMECAST_SRC) \
evrtsp/rtsp.c evrtsp/evrtsp.h evrtsp/rtsp-internal.h evrtsp/log.h \
evthr.c evthr.h \
$(SPOTIFY_SRC) \
$(LASTFM_SRC) \
$(MPD_SRC) \

View File

@ -582,20 +582,6 @@ size_calculate(int *dst_w, int *dst_h, int src_w, int src_h, int max_w, int max_
DPRINTF(E_DBG, L_ART, "Rescale required, destination width %d height %d\n", *dst_w, *dst_h);
}
#ifdef HAVE_LIBEVENT2_OLD
// This is not how this function is actually defined in libevent 2.1+, but it
// works as a less optimal stand-in
int
evbuffer_add_buffer_reference(struct evbuffer *outbuf, struct evbuffer *inbuf)
{
uint8_t *buf = evbuffer_pullup(inbuf, -1);
if (!buf)
return -1;
return evbuffer_add_reference(outbuf, buf, evbuffer_get_length(inbuf), NULL, NULL);
}
#endif
/*
* Either gets the artwork file given in "path" (rescaled if needed) or rescales
* the artwork given in "inbuf".

View File

@ -63,8 +63,8 @@ command_cb_async(struct commands_base *cmdbase, struct command *cmd)
// Command is executed asynchronously
cmdstate = cmd->func(cmd->arg, &cmd->ret);
// Only free arg if there are no pending events (used in worker.c)
if (cmdstate != COMMAND_PENDING && cmd->arg)
// Only free arg if there are no pending events (used in httpd.c)
if (cmdstate != COMMAND_PENDING)
free(cmd->arg);
free(cmd);

View File

@ -25,7 +25,6 @@
#include "db.h"
#include "misc.h"
#include "httpd.h"
#include "logger.h"
#include "dmap_common.h"
#include "parsers/daap_parser.h"
@ -360,31 +359,6 @@ dmap_error_make(struct evbuffer *evbuf, const char *container, const char *errms
dmap_add_string(evbuf, "msts", errmsg);
}
void
dmap_send_error(struct evhttp_request *req, const char *container, const char *errmsg)
{
struct evbuffer *evbuf;
if (!req)
return;
evbuf = evbuffer_new();
if (!evbuf)
{
DPRINTF(E_LOG, L_DMAP, "Could not allocate evbuffer for DMAP error\n");
httpd_send_error(req, HTTP_SERVUNAVAIL, "Internal Server Error");
return;
}
dmap_error_make(evbuf, container, errmsg);
httpd_send_reply(req, HTTP_OK, "OK", evbuf, HTTPD_SEND_NO_GZIP);
evbuffer_free(evbuf);
}
int
dmap_encode_file_metadata(struct evbuffer *songlist, struct evbuffer *song, struct db_media_file_info *dbmfi, const struct dmap_field **meta, int nmeta, int sort_tags, int force_wav)
{

View File

@ -78,10 +78,6 @@ dmap_add_field(struct evbuffer *evbuf, const struct dmap_field *df, char *strval
void
dmap_error_make(struct evbuffer *evbuf, const char *container, const char *errmsg);
void
dmap_send_error(struct evhttp_request *req, const char *container, const char *errmsg);
int
dmap_encode_file_metadata(struct evbuffer *songlist, struct evbuffer *song, struct db_media_file_info *dbmfi, const struct dmap_field **meta, int nmeta, int sort_tags, int force_wav);

View File

@ -63,17 +63,6 @@
#include "log.h"
#include "rtsp-internal.h"
// For compability with libevent 2.0 (HAVE_LIBEVENT2_OLD)
#if defined(_EVENT_HAVE_GETNAMEINFO)
# define EVENT__HAVE_GETNAMEINFO 1
#endif
#if defined(_EVENT_HAVE_GETADDRINFO)
# define EVENT__HAVE_GETADDRINFO 1
#endif
#if defined(_EVENT_HAVE_STRSEP)
# define EVENT__HAVE_STRSEP 1
#endif
#ifndef EVENT__HAVE_GETNAMEINFO
#define NI_MAXSERV 32
#define NI_MAXHOST 1025

467
src/evthr.c Normal file
View File

@ -0,0 +1,467 @@
/*
------------------- Thread handling borrowed from libevhtp ---------------------
BSD 3-Clause License
Copyright (c) 2010-2018, Mark Ellzey, Nathan French, Marcus Sundberg
All rights reserved.
Redistribution and use in source and binary forms, with or without
modification, are permitted provided that the following conditions are met:
* Redistributions of source code must retain the above copyright notice, this
list of conditions and the following disclaimer.
* Redistributions in binary form must reproduce the above copyright notice,
this list of conditions and the following disclaimer in the documentation
and/or other materials provided with the distribution.
* Neither the name of the copyright holder nor the names of its
contributors may be used to endorse or promote products derived from
this software without specific prior written permission.
THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE
FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
*/
#include <stdio.h>
#include <stdlib.h>
#include <stdint.h>
#include <limits.h>
#include <unistd.h>
#include <pthread.h>
#include <sys/queue.h>
#include <sys/ioctl.h>
#include <event2/event.h>
#include <event2/thread.h>
#include "evthr.h"
#ifndef TAILQ_FOREACH_SAFE
#define TAILQ_FOREACH_SAFE(var, head, field, tvar) \
for ((var) = TAILQ_FIRST((head)); \
(var) && ((tvar) = TAILQ_NEXT((var), field), 1); \
(var) = (tvar))
#endif
#define _evthr_read(thr, cmd, sock) \
(recv(sock, cmd, sizeof(struct evthr_cmd), 0) == sizeof(struct evthr_cmd)) ? 1 : 0
#define EVTHR_SHARED_PIPE 1
struct evthr_cmd {
uint8_t stop;
void *args;
evthr_cb cb;
} __attribute__((packed));
struct evthr_pool {
#ifdef EVTHR_SHARED_PIPE
int rdr;
int wdr;
#endif
int nthreads;
TAILQ_HEAD(evthr_pool_slist, evthr) threads;
};
struct evthr {
int rdr;
int wdr;
char err;
struct event *event;
struct event_base *evbase;
pthread_mutex_t lock;
pthread_t *thr;
evthr_init_cb init_cb;
evthr_exit_cb exit_cb;
void *arg;
void *aux;
#ifdef EVTHR_SHARED_PIPE
int pool_rdr;
struct event *shared_pool_ev;
#endif
TAILQ_ENTRY(evthr) next;
};
static void
_evthr_read_cmd(evutil_socket_t sock, short which, void *args)
{
struct evthr *thread;
struct evthr_cmd cmd;
int stopped;
if (!(thread = (struct evthr *)args)) {
return;
}
stopped = 0;
if (_evthr_read(thread, &cmd, sock) == 1) {
stopped = cmd.stop;
if (cmd.cb != NULL) {
(cmd.cb)(thread, cmd.args, thread->arg);
}
}
if (stopped == 1) {
event_base_loopbreak(thread->evbase);
}
return;
}
static void *
_evthr_loop(void *args)
{
struct evthr *thread;
if (!(thread = (struct evthr *)args)) {
return NULL;
}
if (thread == NULL || thread->thr == NULL) {
pthread_exit(NULL);
}
thread->evbase = event_base_new();
thread->event = event_new(thread->evbase, thread->rdr,
EV_READ | EV_PERSIST, _evthr_read_cmd, args);
event_add(thread->event, NULL);
#ifdef EVTHR_SHARED_PIPE
if (thread->pool_rdr > 0) {
thread->shared_pool_ev = event_new(thread->evbase, thread->pool_rdr,
EV_READ | EV_PERSIST, _evthr_read_cmd, args);
event_add(thread->shared_pool_ev, NULL);
}
#endif
pthread_mutex_lock(&thread->lock);
if (thread->init_cb != NULL) {
(thread->init_cb)(thread, thread->arg);
}
pthread_mutex_unlock(&thread->lock);
event_base_loop(thread->evbase, 0);
pthread_mutex_lock(&thread->lock);
if (thread->exit_cb != NULL) {
(thread->exit_cb)(thread, thread->arg);
}
pthread_mutex_unlock(&thread->lock);
pthread_exit(NULL);
}
static enum evthr_res
evthr_defer(struct evthr *thread, evthr_cb cb, void *arg)
{
struct evthr_cmd cmd = {
.cb = cb,
.args = arg,
.stop = 0
};
if (send(thread->wdr, &cmd, sizeof(cmd), 0) <= 0) {
return EVTHR_RES_RETRY;
}
return EVTHR_RES_OK;
}
static enum evthr_res
evthr_stop(struct evthr *thread)
{
struct evthr_cmd cmd = {
.cb = NULL,
.args = NULL,
.stop = 1
};
if (send(thread->wdr, &cmd, sizeof(struct evthr_cmd), 0) < 0) {
return EVTHR_RES_RETRY;
}
pthread_join(*thread->thr, NULL);
return EVTHR_RES_OK;
}
struct event_base *
evthr_get_base(struct evthr * thr)
{
return thr ? thr->evbase : NULL;
}
void
evthr_set_aux(struct evthr *thr, void *aux)
{
if (thr) {
thr->aux = aux;
}
}
void *
evthr_get_aux(struct evthr *thr)
{
return thr ? thr->aux : NULL;
}
static void
evthr_free(struct evthr *thread)
{
if (thread == NULL) {
return;
}
if (thread->rdr > 0) {
close(thread->rdr);
}
if (thread->wdr > 0) {
close(thread->wdr);
}
if (thread->thr) {
free(thread->thr);
}
if (thread->event) {
event_free(thread->event);
}
#ifdef EVTHR_SHARED_PIPE
if (thread->shared_pool_ev) {
event_free(thread->shared_pool_ev);
}
#endif
if (thread->evbase) {
event_base_free(thread->evbase);
}
free(thread);
}
static struct evthr *
evthr_wexit_new(evthr_init_cb init_cb, evthr_exit_cb exit_cb, void *args)
{
struct evthr *thread;
int fds[2];
if (evutil_socketpair(AF_UNIX, SOCK_STREAM, 0, fds) == -1) {
return NULL;
}
evutil_make_socket_nonblocking(fds[0]);
evutil_make_socket_nonblocking(fds[1]);
if (!(thread = calloc(1, sizeof(struct evthr)))) {
return NULL;
}
thread->thr = malloc(sizeof(pthread_t));
thread->arg = args;
thread->rdr = fds[0];
thread->wdr = fds[1];
thread->init_cb = init_cb;
thread->exit_cb = exit_cb;
if (pthread_mutex_init(&thread->lock, NULL)) {
evthr_free(thread);
return NULL;
}
return thread;
}
static int
evthr_start(struct evthr *thread)
{
if (thread == NULL || thread->thr == NULL) {
return -1;
}
if (pthread_create(thread->thr, NULL, _evthr_loop, (void *)thread)) {
return -1;
}
return 0;
}
void
evthr_pool_free(struct evthr_pool *pool)
{
struct evthr *thread;
struct evthr *save;
if (pool == NULL) {
return;
}
TAILQ_FOREACH_SAFE(thread, &pool->threads, next, save) {
TAILQ_REMOVE(&pool->threads, thread, next);
evthr_free(thread);
}
free(pool);
}
enum evthr_res
evthr_pool_stop(struct evthr_pool *pool)
{
struct evthr *thr;
struct evthr *save;
if (pool == NULL) {
return EVTHR_RES_FATAL;
}
TAILQ_FOREACH_SAFE(thr, &pool->threads, next, save) {
evthr_stop(thr);
}
return EVTHR_RES_OK;
}
static inline int
get_backlog_(struct evthr *thread)
{
int backlog = 0;
ioctl(thread->rdr, FIONREAD, &backlog);
return (int)(backlog / sizeof(struct evthr_cmd));
}
enum evthr_res
evthr_pool_defer(struct evthr_pool *pool, evthr_cb cb, void *arg)
{
#ifdef EVTHR_SHARED_PIPE
struct evthr_cmd cmd = {
.cb = cb,
.args = arg,
.stop = 0
};
if (send(pool->wdr, &cmd, sizeof(cmd), 0) == -1) {
return EVTHR_RES_RETRY;
}
return EVTHR_RES_OK;
#endif
struct evthr *thread = NULL;
struct evthr *min_thread = NULL;
int min_backlog = 0;
if (pool == NULL) {
return EVTHR_RES_FATAL;
}
if (cb == NULL) {
return EVTHR_RES_NOCB;
}
TAILQ_FOREACH(thread, &pool->threads, next) {
int backlog = get_backlog_(thread);
if (backlog == 0) {
min_thread = thread;
break;
}
if (min_thread == NULL || backlog < min_backlog) {
min_thread = thread;
min_backlog = backlog;
}
}
return evthr_defer(min_thread, cb, arg);
}
struct evthr_pool *
evthr_pool_wexit_new(int nthreads, evthr_init_cb init_cb, evthr_exit_cb exit_cb, void *shared)
{
struct evthr_pool *pool;
int i;
#ifdef EVTHR_SHARED_PIPE
int fds[2];
#endif
if (nthreads == 0) {
return NULL;
}
if (!(pool = calloc(1, sizeof(struct evthr_pool)))) {
return NULL;
}
pool->nthreads = nthreads;
TAILQ_INIT(&pool->threads);
#ifdef EVTHR_SHARED_PIPE
if (evutil_socketpair(AF_UNIX, SOCK_DGRAM, 0, fds) == -1) {
return NULL;
}
evutil_make_socket_nonblocking(fds[0]);
evutil_make_socket_nonblocking(fds[1]);
pool->rdr = fds[0];
pool->wdr = fds[1];
#endif
for (i = 0; i < nthreads; i++) {
struct evthr * thread;
if (!(thread = evthr_wexit_new(init_cb, exit_cb, shared))) {
evthr_pool_free(pool);
return NULL;
}
#ifdef EVTHR_SHARED_PIPE
thread->pool_rdr = fds[0];
#endif
TAILQ_INSERT_TAIL(&pool->threads, thread, next);
}
return pool;
}
int
evthr_pool_start(struct evthr_pool *pool)
{
struct evthr *evthr = NULL;
if (pool == NULL) {
return -1;
}
TAILQ_FOREACH(evthr, &pool->threads, next) {
if (evthr_start(evthr) < 0) {
return -1;
}
usleep(5000);
}
return 0;
}

43
src/evthr.h Normal file
View File

@ -0,0 +1,43 @@
#ifndef __EVTHR_H__
#define __EVTHR_H__
enum evthr_res {
EVTHR_RES_OK = 0,
EVTHR_RES_BACKLOG,
EVTHR_RES_RETRY,
EVTHR_RES_NOCB,
EVTHR_RES_FATAL
};
struct evthr_pool;
struct evthr;
typedef void (*evthr_cb)(struct evthr *thr, void *cmd_arg, void *shared);
typedef void (*evthr_init_cb)(struct evthr *thr, void *shared);
typedef void (*evthr_exit_cb)(struct evthr *thr, void *shared);
struct event_base *
evthr_get_base(struct evthr *thr);
void
evthr_set_aux(struct evthr *thr, void *aux);
void *
evthr_get_aux(struct evthr *thr);
enum evthr_res
evthr_pool_defer(struct evthr_pool *pool, evthr_cb cb, void *arg);
struct evthr_pool *
evthr_pool_wexit_new(int nthreads, evthr_init_cb init_cb, evthr_exit_cb exit_cb, void *shared);
void
evthr_pool_free(struct evthr_pool *pool);
enum evthr_res
evthr_pool_stop(struct evthr_pool *pool);
int
evthr_pool_start(struct evthr_pool *pool);
#endif /* !__EVTHR_H__ */

View File

@ -40,7 +40,6 @@
#include <curl/curl.h>
#include "http.h"
#include "httpd.h"
#include "logger.h"
#include "misc.h"
#include "conffile.h"
@ -240,9 +239,11 @@ http_form_urlencode(struct keyval *kv)
int
http_stream_setup(char **stream, const char *url)
{
CURLU *url_handle;
CURLUcode rc;
struct http_client_ctx ctx;
struct httpd_uri_parsed *parsed;
struct evbuffer *evbuf;
char *path;
const char *ext;
char *line;
char *pos;
@ -253,17 +254,28 @@ http_stream_setup(char **stream, const char *url)
*stream = NULL;
parsed = httpd_uri_parse(url);
if (!parsed)
CHECK_NULL(L_HTTP, url_handle = curl_url());
rc = curl_url_set(url_handle, CURLUPART_URL, url, 0);
if (rc != 0)
{
DPRINTF(E_LOG, L_HTTP, "Couldn't parse internet playlist: '%s'\n", url);
curl_url_cleanup(url_handle);
return -1;
}
// parsed->path does not include query or fragment, so should work with any url's
rc = curl_url_get(url_handle, CURLUPART_PATH, &path, 0);
if (rc != 0)
{
DPRINTF(E_LOG, L_HTTP, "Couldn't find internet playlist path: '%s'\n", url);
curl_url_cleanup(url_handle);
return -1;
}
// path does not include query or fragment, so should work with any url's
// e.g. http://yp.shoutcast.com/sbin/tunein-station.pls?id=99179772#Air Jazz
pl_format = PLAYLIST_UNK;
if (parsed->path && (ext = strrchr(parsed->path, '.')))
if (path && (ext = strrchr(path, '.')))
{
if (strcasecmp(ext, ".m3u") == 0)
pl_format = PLAYLIST_M3U;
@ -271,7 +283,9 @@ http_stream_setup(char **stream, const char *url)
pl_format = PLAYLIST_PLS;
}
httpd_uri_free(parsed);
curl_free(path);
curl_url_cleanup(url_handle);
if (pl_format==PLAYLIST_UNK)
{

File diff suppressed because it is too large Load Diff

View File

@ -2,112 +2,7 @@
#ifndef __HTTPD_H__
#define __HTTPD_H__
#include <stdbool.h>
#include <regex.h>
#include <time.h>
#include <event2/http.h>
#include <event2/buffer.h>
#include <event2/keyvalq_struct.h>
enum httpd_send_flags
{
HTTPD_SEND_NO_GZIP = (1 << 0),
};
/*
* Contains a parsed version of the URI httpd got. The URI may have been
* complete:
* scheme:[//[user[:password]@]host[:port]][/path][?query][#fragment]
* or relative:
* [/path][?query][#fragment]
*
* We are interested in the path and the query, so they are disassembled to
* path_parts and ev_query. If the request is http://x:3689/foo/bar?key1=val1,
* then part_parts[0] is "foo", [1] is "bar" and the rest is null.
*
* Each path_part is an allocated URI decoded string.
*/
struct httpd_uri_parsed
{
const char *uri;
struct evhttp_uri *ev_uri;
struct evkeyvalq ev_query;
char *uri_decoded;
char *path;
char *path_parts[31];
};
/*
* A collection of pointers to request data that the reply handlers may need.
* Also has the function pointer to the reply handler and a pointer to a reply
* evbuffer.
*/
struct httpd_request {
// User-agent (if available)
const char *user_agent;
// The parsed request URI given to us by httpd_uri_parse
struct httpd_uri_parsed *uri_parsed;
// Shortcut to &uri_parsed->ev_query
struct evkeyvalq *query;
// http request struct (if available)
struct evhttp_request *req;
// Source IP address (ipv4 or ipv6) and port of the request (if available)
const char *peer_address;
unsigned short peer_port;
// A pointer to extra data that the module handling the request might need
void *extra_data;
// Reply evbuffer
struct evbuffer *reply;
// A pointer to the handler that will process the request
int (*handler)(struct httpd_request *hreq);
};
/*
* Maps a regex of the request path to a handler of the request
*/
struct httpd_uri_map
{
int method;
char *regexp;
int (*handler)(struct httpd_request *hreq);
regex_t preg;
};
/*
* Helper to free the parsed uri struct
*/
void
httpd_uri_free(struct httpd_uri_parsed *parsed);
/*
* Parse an URI into the struct
*/
struct httpd_uri_parsed *
httpd_uri_parse(const char *uri);
/*
* Parse a request into the httpd_request struct. It can later be freed with
* free(), unless the module has allocated something to *extra_data. Note that
* the pointers in the returned struct are only valid as long as the inputs are
* still valid. If req is not null, then we will find the user-agent from the
* request headers, except if provided as an argument to this function.
*/
struct httpd_request *
httpd_request_parse(struct evhttp_request *req, struct httpd_uri_parsed *uri_parsed, const char *user_agent, struct httpd_uri_map *uri_map);
void
httpd_stream_file(struct evhttp_request *req, int id);
bool
httpd_request_not_modified_since(struct evhttp_request *req, time_t mtime);
bool
httpd_request_etag_matches(struct evhttp_request *req, const char *etag);
void
httpd_response_not_cachable(struct evhttp_request *req);
/*
* Gzips an evbuffer
@ -118,52 +13,6 @@ httpd_response_not_cachable(struct evhttp_request *req);
struct evbuffer *
httpd_gzip_deflate(struct evbuffer *in);
/*
* This wrapper around evhttp_send_reply should be used whenever a request may
* come from a browser. It will automatically gzip if feasible, but the caller
* may direct it not to. It will set CORS headers as appropriate. Should be
* thread safe.
*
* @in req The evhttp request struct
* @in code HTTP code, e.g. 200
* @in reason A brief explanation of the error - if NULL the standard meaning
of the error code will be used
* @in evbuf Data for the response body
* @in flags See flags above
*/
void
httpd_send_reply(struct evhttp_request *req, int code, const char *reason, struct evbuffer *evbuf, enum httpd_send_flags flags);
/*
* This is a substitute for evhttp_send_error that should be used whenever an
* error may be returned to a browser. It will set CORS headers as appropriate,
* which is not possible with evhttp_send_error, because it clears the headers.
* Should be thread safe.
*
* @in req The evhttp request struct
* @in error HTTP code, e.g. 200
* @in reason A brief explanation of the error - if NULL the standard meaning
of the error code will be used
*/
void
httpd_send_error(struct evhttp_request *req, int error, const char *reason);
/*
* Redirects to the given path
*/
void
httpd_redirect_to(struct evhttp_request *req, const char *path);
bool
httpd_admin_check_auth(struct evhttp_request *req);
int
httpd_basic_auth(struct evhttp_request *req, const char *user, const char *passwd, const char *realm);
void
httpd_peer_get(const char **address, ev_uint16_t *port, struct evhttp_connection *evcon);
int
httpd_init(const char *webroot);

View File

@ -20,12 +20,11 @@
# include <config.h>
#endif
#include <regex.h>
#include <stdlib.h>
#include <stdint.h>
#include <string.h>
#include "httpd_artworkapi.h"
#include "httpd_internal.h"
#include "logger.h"
#include "misc.h"
#include "player.h"
@ -40,20 +39,20 @@ request_process(struct httpd_request *hreq, uint32_t *max_w, uint32_t *max_h)
*max_w = 0;
*max_h = 0;
param = evhttp_find_header(hreq->query, "maxwidth");
param = httpd_query_value_find(hreq->query, "maxwidth");
if (param)
{
ret = safe_atou32(param, max_w);
if (ret < 0)
DPRINTF(E_LOG, L_WEB, "Invalid width in request: '%s'\n", hreq->uri_parsed->uri);
DPRINTF(E_LOG, L_WEB, "Invalid width in request: '%s'\n", hreq->uri);
}
param = evhttp_find_header(hreq->query, "maxheight");
param = httpd_query_value_find(hreq->query, "maxheight");
if (param)
{
ret = safe_atou32(param, max_h);
if (ret < 0)
DPRINTF(E_LOG, L_WEB, "Invalid height in request: '%s'\n", hreq->uri_parsed->uri);
DPRINTF(E_LOG, L_WEB, "Invalid height in request: '%s'\n", hreq->uri);
}
return 0;
@ -62,14 +61,10 @@ request_process(struct httpd_request *hreq, uint32_t *max_w, uint32_t *max_h)
static int
response_process(struct httpd_request *hreq, int format)
{
struct evkeyvalq *headers;
headers = evhttp_request_get_output_headers(hreq->req);
if (format == ART_FMT_PNG)
evhttp_add_header(headers, "Content-Type", "image/png");
httpd_header_add(hreq->out_headers, "Content-Type", "image/png");
else if (format == ART_FMT_JPEG)
evhttp_add_header(headers, "Content-Type", "image/jpeg");
httpd_header_add(hreq->out_headers, "Content-Type", "image/jpeg");
else
return HTTP_NOCONTENT;
@ -92,7 +87,7 @@ artworkapi_reply_nowplaying(struct httpd_request *hreq)
if (ret != 0)
return HTTP_NOTFOUND;
ret = artwork_get_item(hreq->reply, id, max_w, max_h, 0);
ret = artwork_get_item(hreq->out_body, id, max_w, max_h, 0);
return response_process(hreq, ret);
}
@ -109,11 +104,11 @@ artworkapi_reply_item(struct httpd_request *hreq)
if (ret != 0)
return ret;
ret = safe_atou32(hreq->uri_parsed->path_parts[2], &id);
ret = safe_atou32(hreq->path_parts[2], &id);
if (ret != 0)
return HTTP_BADREQUEST;
ret = artwork_get_item(hreq->reply, id, max_w, max_h, 0);
ret = artwork_get_item(hreq->out_body, id, max_w, max_h, 0);
return response_process(hreq, ret);
}
@ -130,111 +125,73 @@ artworkapi_reply_group(struct httpd_request *hreq)
if (ret != 0)
return ret;
ret = safe_atou32(hreq->uri_parsed->path_parts[2], &id);
ret = safe_atou32(hreq->path_parts[2], &id);
if (ret != 0)
return HTTP_BADREQUEST;
ret = artwork_get_group(hreq->reply, id, max_w, max_h, 0);
ret = artwork_get_group(hreq->out_body, id, max_w, max_h, 0);
return response_process(hreq, ret);
}
static struct httpd_uri_map artworkapi_handlers[] =
{
{ EVHTTP_REQ_GET, "^/artwork/nowplaying$", artworkapi_reply_nowplaying },
{ EVHTTP_REQ_GET, "^/artwork/item/[[:digit:]]+$", artworkapi_reply_item },
{ EVHTTP_REQ_GET, "^/artwork/group/[[:digit:]]+$", artworkapi_reply_group },
{ HTTPD_METHOD_GET, "^/artwork/nowplaying$", artworkapi_reply_nowplaying },
{ HTTPD_METHOD_GET, "^/artwork/item/[[:digit:]]+$", artworkapi_reply_item },
{ HTTPD_METHOD_GET, "^/artwork/group/[[:digit:]]+$", artworkapi_reply_group },
{ 0, NULL, NULL }
};
/* ------------------------------- API --------------------------------- */
void
artworkapi_request(struct evhttp_request *req, struct httpd_uri_parsed *uri_parsed)
static void
artworkapi_request(struct httpd_request *hreq)
{
struct httpd_request *hreq;
int status_code;
DPRINTF(E_DBG, L_WEB, "Artwork api request: '%s'\n", uri_parsed->uri);
if (!httpd_admin_check_auth(req))
if (!httpd_admin_check_auth(hreq))
return;
hreq = httpd_request_parse(req, uri_parsed, NULL, artworkapi_handlers);
if (!hreq)
if (!hreq->handler)
{
DPRINTF(E_LOG, L_WEB, "Unrecognized path '%s' in artwork api request: '%s'\n", uri_parsed->path, uri_parsed->uri);
DPRINTF(E_LOG, L_WEB, "Unrecognized path in artwork api request: '%s'\n", hreq->uri);
httpd_send_error(req, HTTP_BADREQUEST, "Bad Request");
httpd_send_error(hreq, HTTP_BADREQUEST, "Bad Request");
return;
}
CHECK_NULL(L_WEB, hreq->reply = evbuffer_new());
status_code = hreq->handler(hreq);
switch (status_code)
{
case HTTP_OK: /* 200 OK */
httpd_send_reply(req, status_code, "OK", hreq->reply, HTTPD_SEND_NO_GZIP);
httpd_send_reply(hreq, status_code, "OK", HTTPD_SEND_NO_GZIP);
break;
case HTTP_NOCONTENT: /* 204 No Content */
httpd_send_reply(req, status_code, "No Content", hreq->reply, HTTPD_SEND_NO_GZIP);
httpd_send_reply(hreq, status_code, "No Content", HTTPD_SEND_NO_GZIP);
break;
case HTTP_NOTMODIFIED: /* 304 Not Modified */
httpd_send_reply(req, HTTP_NOTMODIFIED, NULL, NULL, HTTPD_SEND_NO_GZIP);
httpd_send_reply(hreq, HTTP_NOTMODIFIED, NULL, HTTPD_SEND_NO_GZIP);
break;
case HTTP_BADREQUEST: /* 400 Bad Request */
httpd_send_error(req, status_code, "Bad Request");
httpd_send_error(hreq, status_code, "Bad Request");
break;
case HTTP_NOTFOUND: /* 404 Not Found */
httpd_send_error(req, status_code, "Not Found");
httpd_send_error(hreq, status_code, "Not Found");
break;
case HTTP_INTERNAL: /* 500 Internal Server Error */
default:
httpd_send_error(req, HTTP_INTERNAL, "Internal Server Error");
httpd_send_error(hreq, HTTP_INTERNAL, "Internal Server Error");
}
evbuffer_free(hreq->reply);
free(hreq);
}
int
artworkapi_is_request(const char *path)
struct httpd_module httpd_artworkapi =
{
if (strncmp(path, "/artwork/", strlen("/artwork/")) == 0)
return 1;
return 0;
}
int
artworkapi_init(void)
{
char buf[64];
int i;
int ret;
for (i = 0; artworkapi_handlers[i].handler; i++)
{
ret = regcomp(&artworkapi_handlers[i].preg, artworkapi_handlers[i].regexp, REG_EXTENDED | REG_NOSUB);
if (ret != 0)
{
regerror(ret, &artworkapi_handlers[i].preg, buf, sizeof(buf));
DPRINTF(E_FATAL, L_WEB, "artwork api init failed; regexp error: %s\n", buf);
return -1;
}
}
return 0;
}
void
artworkapi_deinit(void)
{
int i;
for (i = 0; artworkapi_handlers[i].handler; i++)
regfree(&artworkapi_handlers[i].preg);
}
.name = "Artwork API",
.type = MODULE_ARTWORKAPI,
.logdomain = L_WEB,
.subpaths = { "/artwork/", NULL },
.handlers = artworkapi_handlers,
.request = artworkapi_request,
};

View File

@ -1,18 +0,0 @@
#ifndef __HTTPD_ARTWORK_H__
#define __HTTPD_ARTWORK_H__
#include "httpd.h"
int
artworkapi_init(void);
void
artworkapi_deinit(void);
void
artworkapi_request(struct evhttp_request *req, struct httpd_uri_parsed *uri_parsed);
int
artworkapi_is_request(const char *path);
#endif

File diff suppressed because it is too large Load Diff

View File

@ -2,20 +2,6 @@
#ifndef __HTTPD_DAAP_H__
#define __HTTPD_DAAP_H__
#include "httpd.h"
int
daap_init(void);
void
daap_deinit(void);
void
daap_request(struct evhttp_request *req, struct httpd_uri_parsed *uri_parsed);
int
daap_is_request(const char *path);
int
daap_session_is_valid(int id);

File diff suppressed because it is too large Load Diff

View File

@ -1,19 +0,0 @@
#ifndef __HTTPD_DACP_H__
#define __HTTPD_DACP_H__
#include "httpd.h"
int
dacp_init(void);
void
dacp_deinit(void);
void
dacp_request(struct evhttp_request *req, struct httpd_uri_parsed *uri_parsed);
int
dacp_is_request(const char *path);
#endif /* !__HTTPD_DACP_H__ */

366
src/httpd_internal.h Normal file
View File

@ -0,0 +1,366 @@
#ifndef __HTTPD_INTERNAL_H__
#define __HTTPD_INTERNAL_H__
#include <stdbool.h>
#include <time.h>
#include <event2/event.h>
#ifdef HAVE_CONFIG_H
# include <config.h>
#endif
/* Response codes from event2/http.h */
#define HTTP_CONTINUE 100 /**< client should proceed to send */
#define HTTP_SWITCH_PROTOCOLS 101 /**< switching to another protocol */
#define HTTP_PROCESSING 102 /**< processing the request, but no response is available yet */
#define HTTP_EARLYHINTS 103 /**< return some response headers */
#define HTTP_OK 200 /**< request completed ok */
#define HTTP_CREATED 201 /**< new resource is created */
#define HTTP_ACCEPTED 202 /**< accepted for processing */
#define HTTP_NONAUTHORITATIVE 203 /**< returning a modified version of the origin's response */
#define HTTP_NOCONTENT 204 /**< request does not have content */
#define HTTP_MOVEPERM 301 /**< the uri moved permanently */
#define HTTP_MOVETEMP 302 /**< the uri moved temporarily */
#define HTTP_NOTMODIFIED 304 /**< page was not modified from last */
#define HTTP_BADREQUEST 400 /**< invalid http request was made */
#define HTTP_UNAUTHORIZED 401 /**< authentication is required */
#define HTTP_PAYMENTREQUIRED 402 /**< user exceeded limit on requests */
#define HTTP_FORBIDDEN 403 /**< user not having the necessary permissions */
#define HTTP_NOTFOUND 404 /**< could not find content for uri */
#define HTTP_BADMETHOD 405 /**< method not allowed for this uri */
#define HTTP_ENTITYTOOLARGE 413 /**< request is larger than the server is able to process */
#define HTTP_EXPECTATIONFAILED 417 /**< we can't handle this expectation */
#define HTTP_INTERNAL 500 /**< internal error */
#define HTTP_NOTIMPLEMENTED 501 /**< not implemented */
#define HTTP_BADGATEWAY 502 /**< received an invalid response from the upstream */
#define HTTP_SERVUNAVAIL 503 /**< the server is not available */
struct httpd_request;
// Declaring here instead of including event2/http.h makes it easier to support
// other backends than evhttp in the future, e.g. libevhtp
struct httpd_server;
struct evhttp_connection;
struct evhttp_request;
struct evkeyvalq;
struct httpd_uri_parsed;
typedef struct httpd_server httpd_server;
typedef struct evhttp_connection httpd_connection;
typedef struct evhttp_request httpd_backend;
typedef struct evkeyvalq httpd_headers;
typedef struct evkeyvalq httpd_query;
typedef struct httpd_uri_parsed httpd_uri_parsed;
typedef struct httpd_backend_data httpd_backend_data;
typedef char *httpd_uri_path_parts[31];
typedef void (*httpd_request_cb)(struct httpd_request *hreq, void *arg);
typedef void (*httpd_close_cb)(void *arg);
typedef void (*httpd_connection_chunkcb)(httpd_connection *conn, void *arg);
typedef void (*httpd_query_iteratecb)(const char *key, const char *val, void *arg);
enum httpd_methods
{
HTTPD_METHOD_GET = 1 << 0,
HTTPD_METHOD_POST = 1 << 1,
HTTPD_METHOD_HEAD = 1 << 2,
HTTPD_METHOD_PUT = 1 << 3,
HTTPD_METHOD_DELETE = 1 << 4,
HTTPD_METHOD_OPTIONS = 1 << 5,
HTTPD_METHOD_TRACE = 1 << 6,
HTTPD_METHOD_CONNECT = 1 << 7,
HTTPD_METHOD_PATCH = 1 << 8,
};
#define HTTPD_F_REPLY_LAST (1 << 15)
enum httpd_reply_type
{
HTTPD_REPLY_START = 1,
HTTPD_REPLY_CHUNK = 2,
HTTPD_REPLY_END = HTTPD_F_REPLY_LAST | 1,
HTTPD_REPLY_COMPLETE = HTTPD_F_REPLY_LAST | 2,
};
enum httpd_send_flags
{
HTTPD_SEND_NO_GZIP = (1 << 0),
};
/*---------------------------------- MODULES ---------------------------------*/
// Must be in sync with modules[] in httpd.c
enum httpd_modules
{
MODULE_DACP,
MODULE_DAAP,
MODULE_JSONAPI,
MODULE_ARTWORKAPI,
MODULE_STREAMING,
MODULE_OAUTH,
MODULE_RSP,
};
enum httpd_handler_flags
{
// Most requests are pushed to a worker thread, but some handlers deal with
// requests that must be answered quickly. Can only be used for nonblocking
// handlers.
HTTPD_HANDLER_REALTIME = (1 << 0),
};
struct httpd_module
{
const char *name;
enum httpd_modules type;
char initialized;
int logdomain;
// Null-terminated list of URL subpath that the module accepts e.g., /subpath/morepath/file.mp3
const char *subpaths[16];
// Null-terminated list of URL fullparhs that the module accepts e.g., /fullpath
const char *fullpaths[16];
// Pointer to the module's handler definitions
struct httpd_uri_map *handlers;
int (*init)(void);
void (*deinit)(void);
void (*request)(struct httpd_request *);
};
/*
* Maps a regex of the request path to a handler of the request
*/
struct httpd_uri_map
{
enum httpd_methods method;
char *regexp;
int (*handler)(struct httpd_request *hreq);
void *preg;
int flags; // See enum httpd_handler_flags
};
/*------------------------------- HTTPD STRUCTS ------------------------------*/
/*
* A collection of pointers to request data that the reply handlers may need.
* Also has the function pointer to the reply handler and a pointer to a reply
* evbuffer.
*/
struct httpd_request {
// Request method
enum httpd_methods method;
// Backend private request object
httpd_backend *backend;
// For storing data that the actual backend doesn't have readily available
httpd_backend_data *backend_data;
// User-agent (if available)
const char *user_agent;
// Source IP address (ipv4 or ipv6) and port of the request (if available)
const char *peer_address;
unsigned short peer_port;
// The original, request URI. The URI may have been complete:
// scheme:[//[user[:password]@]host[:port]][/path][?query][#fragment]
// or relative:
// [/path][?query][#fragment]
const char *uri;
// URI decoded path from the request URI
const char *path;
// If the request is http://x:3689/foo/bar?key1=val1, then part_parts[0] is
// "foo", [1] is "bar" and the rest is null. Each path_part is an allocated
// URI decoded string.
httpd_uri_path_parts path_parts;
// Struct with the query, used with httpd_query_ functions
httpd_query *query;
// Backend private parser URI object
httpd_uri_parsed *uri_parsed;
// Request headers
httpd_headers *in_headers;
// Request body
struct evbuffer *in_body;
// Response headers
httpd_headers *out_headers;
// Response body
struct evbuffer *out_body;
// Our httpd module that will process this request
struct httpd_module *module;
// A pointer to the handler that will process the request
int (*handler)(struct httpd_request *hreq);
// Is the processing defered to a worker thread
bool is_async;
// Handler thread's evbase in case the handler needs to scehdule an event
struct event_base *evbase;
// A pointer to extra data that the module handling the request might need
void *extra_data;
};
/*------------------------------ HTTPD FUNCTIONS -----------------------------*/
void
httpd_stream_file(struct httpd_request *hreq, int id);
void
httpd_request_handler_set(struct httpd_request *hreq);
bool
httpd_request_not_modified_since(struct httpd_request *hreq, time_t mtime);
bool
httpd_request_etag_matches(struct httpd_request *hreq, const char *etag);
void
httpd_response_not_cachable(struct httpd_request *hreq);
/*
* This wrapper around evhttp_send_reply should be used whenever a request may
* come from a browser. It will automatically gzip if feasible, but the caller
* may direct it not to. It will set CORS headers as appropriate. Should be
* thread safe.
*
* @in req The http request struct
* @in code HTTP code, e.g. 200
* @in reason A brief explanation of the error - if NULL the standard meaning
of the error code will be used
* @in flags See flags above
*/
void
httpd_send_reply(struct httpd_request *hreq, int code, const char *reason, enum httpd_send_flags flags);
void
httpd_send_reply_start(struct httpd_request *hreq, int code, const char *reason);
void
httpd_send_reply_chunk(struct httpd_request *hreq, httpd_connection_chunkcb cb, void *arg);
void
httpd_send_reply_end(struct httpd_request *hreq);
/*
* This is a substitute for evhttp_send_error that should be used whenever an
* error may be returned to a browser. It will set CORS headers as appropriate,
* which is not possible with evhttp_send_error, because it clears the headers.
* Should be thread safe.
*
* @in req The http request struct
* @in error HTTP code, e.g. 200
* @in reason A brief explanation of the error - if NULL the standard meaning
of the error code will be used
*/
void
httpd_send_error(struct httpd_request *hreq, int error, const char *reason);
void
httpd_redirect_to(struct httpd_request *hreq, const char *path);
bool
httpd_admin_check_auth(struct httpd_request *hreq);
int
httpd_basic_auth(struct httpd_request *hreq, const char *user, const char *passwd, const char *realm);
/*-------------------------- WRAPPERS FOR EVHTTP -----------------------------*/
const char *
httpd_query_value_find(httpd_query *query, const char *key);
void
httpd_query_iterate(httpd_query *query, httpd_query_iteratecb cb, void *arg);
void
httpd_query_clear(httpd_query *query);
const char *
httpd_header_find(httpd_headers *headers, const char *key);
void
httpd_header_remove(httpd_headers *headers, const char *key);
void
httpd_header_add(httpd_headers *headers, const char *key, const char *val);
void
httpd_headers_clear(httpd_headers *headers);
void
httpd_request_close_cb_set(struct httpd_request *hreq, httpd_close_cb cb, void *arg);
void
httpd_request_free(struct httpd_request *hreq);
struct httpd_request *
httpd_request_new(httpd_backend *backend, httpd_server *server, const char *uri, const char *user_agent);
void
httpd_server_free(httpd_server *server);
httpd_server *
httpd_server_new(struct event_base *evbase, unsigned short port, httpd_request_cb cb, void *arg);
void
httpd_server_allow_origin_set(httpd_server *server, bool allow);
/*----------------- Only called by httpd.c to send raw replies ---------------*/
void
httpd_send(struct httpd_request *hreq, enum httpd_reply_type type, int code, const char *reason,
httpd_connection_chunkcb cb, void *cbarg);
/*---------- Only called by httpd.c to populate struct httpd_request ---------*/
httpd_backend_data *
httpd_backend_data_create(httpd_backend *backend, httpd_server *server);
void
httpd_backend_data_free(httpd_backend_data *backend_data);
struct event_base *
httpd_backend_evbase_get(httpd_backend *backend);
const char *
httpd_backend_uri_get(httpd_backend *backend, httpd_backend_data *backend_data);
httpd_headers *
httpd_backend_input_headers_get(httpd_backend *backend);
httpd_headers *
httpd_backend_output_headers_get(httpd_backend *backend);
struct evbuffer *
httpd_backend_input_buffer_get(httpd_backend *backend);
int
httpd_backend_peer_get(const char **addr, uint16_t *port, httpd_backend *backend, httpd_backend_data *backend_data);
int
httpd_backend_method_get(enum httpd_methods *method, httpd_backend *backend);
httpd_uri_parsed *
httpd_uri_parsed_create(httpd_backend *backend);
httpd_uri_parsed *
httpd_uri_parsed_create_fromuri(const char *uri);
void
httpd_uri_parsed_free(httpd_uri_parsed *uri_parsed);
httpd_query *
httpd_uri_query_get(httpd_uri_parsed *parsed);
const char *
httpd_uri_path_get(httpd_uri_parsed *parsed);
void
httpd_uri_path_parts_get(httpd_uri_path_parts *part_parts, httpd_uri_parsed *parsed);
#endif /* !__HTTPD_INTERNAL_H__ */

File diff suppressed because it is too large Load Diff

View File

@ -1,19 +0,0 @@
#ifndef __HTTPD_JSONAPI_H__
#define __HTTPD_JSONAPI_H__
#include "httpd.h"
int
jsonapi_init(void);
void
jsonapi_deinit(void);
void
jsonapi_request(struct evhttp_request *req, struct httpd_uri_parsed *uri_parsed);
int
jsonapi_is_request(const char *path);
#endif /* !__HTTPD_JSONAPI_H__ */

652
src/httpd_libevhttp.c Normal file
View File

@ -0,0 +1,652 @@
/*
* Copyright (C) 2023 Espen Jürgensen <espenjurgensen@gmail.com>
*
* This program is free software; you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation; either version 2 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
*/
#ifdef HAVE_CONFIG_H
# include <config.h>
#endif
#include <stdlib.h>
#include <string.h>
#include <unistd.h>
#include <sys/queue.h> // TAILQ_FOREACH
#include <sys/socket.h> // listen()
#include <event2/http.h>
#include <event2/http_struct.h> // flags in struct evhttp
#include <event2/keyvalq_struct.h>
#include <event2/buffer.h>
#include <event2/bufferevent.h>
#include <pthread.h>
#include "misc.h"
#include "logger.h"
#include "commands.h"
#include "httpd_internal.h"
// #define DEBUG_ALLOC 1
#ifdef DEBUG_ALLOC
static pthread_mutex_t debug_alloc_lck = PTHREAD_MUTEX_INITIALIZER;
static int debug_alloc_count;
#endif
struct httpd_uri_parsed
{
struct evhttp_uri *ev_uri;
struct evkeyvalq query;
char *path;
httpd_uri_path_parts path_parts;
};
struct httpd_server
{
int fd;
struct evhttp *evhttp;
struct commands_base *cmdbase;
httpd_request_cb request_cb;
void *request_cb_arg;
};
struct httpd_reply
{
struct httpd_request *hreq;
enum httpd_reply_type type;
int code;
const char *reason;
httpd_connection_chunkcb chunkcb;
void *cbarg;
};
struct httpd_disconnect
{
pthread_mutex_t lock;
struct event *ev;
httpd_close_cb cb;
void *cbarg;
};
struct httpd_backend_data
{
// Pointer to server instance processing the request
struct httpd_server *server;
// If caller wants a callback on disconnect
struct httpd_disconnect disconnect;
};
// Forward
static void
closecb_worker(evutil_socket_t fd, short event, void *arg);
const char *
httpd_query_value_find(httpd_query *query, const char *key)
{
return evhttp_find_header(query, key);
}
void
httpd_query_iterate(httpd_query *query, httpd_query_iteratecb cb, void *arg)
{
struct evkeyval *param;
TAILQ_FOREACH(param, query, next)
{
cb(param->key, param->value, arg);
}
}
void
httpd_query_clear(httpd_query *query)
{
evhttp_clear_headers(query);
}
const char *
httpd_header_find(httpd_headers *headers, const char *key)
{
return evhttp_find_header(headers, key);
}
void
httpd_header_remove(httpd_headers *headers, const char *key)
{
evhttp_remove_header(headers, key);
}
void
httpd_header_add(httpd_headers *headers, const char *key, const char *val)
{
evhttp_add_header(headers, key, val);
}
void
httpd_headers_clear(httpd_headers *headers)
{
evhttp_clear_headers(headers);
}
void
httpd_request_close_cb_set(struct httpd_request *hreq, httpd_close_cb cb, void *arg)
{
struct httpd_disconnect *disconnect = &hreq->backend_data->disconnect;
pthread_mutex_lock(&disconnect->lock);
disconnect->cb = cb;
disconnect->cbarg = arg;
if (hreq->is_async)
{
if (disconnect->ev)
event_free(disconnect->ev);
if (disconnect->cb)
disconnect->ev = event_new(hreq->evbase, -1, 0, closecb_worker, hreq);
else
disconnect->ev = NULL;
}
pthread_mutex_unlock(&disconnect->lock);
}
void
httpd_request_free(struct httpd_request *hreq)
{
#ifdef DEBUG_ALLOC
pthread_mutex_lock(&debug_alloc_lck);
debug_alloc_count--;
pthread_mutex_unlock(&debug_alloc_lck);
DPRINTF(E_DBG, L_HTTPD, "DEALLOC hreq - count is %d\n", debug_alloc_count);
#endif
if (!hreq)
return;
if (hreq->out_body)
evbuffer_free(hreq->out_body);
httpd_uri_parsed_free(hreq->uri_parsed);
httpd_backend_data_free(hreq->backend_data);
free(hreq);
}
struct httpd_request *
httpd_request_new(httpd_backend *backend, httpd_server *server, const char *uri, const char *user_agent)
{
struct httpd_request *hreq;
httpd_backend_data *backend_data;
CHECK_NULL(L_HTTPD, hreq = calloc(1, sizeof(struct httpd_request)));
#ifdef DEBUG_ALLOC
pthread_mutex_lock(&debug_alloc_lck);
debug_alloc_count++;
pthread_mutex_unlock(&debug_alloc_lck);
DPRINTF(E_DBG, L_HTTPD, "ALLOC hreq - count is %d\n", debug_alloc_count);
#endif
// Populate hreq by getting values from the backend (or from the caller)
hreq->backend = backend;
if (backend)
{
backend_data = httpd_backend_data_create(backend, server);
hreq->backend_data = backend_data;
hreq->uri = httpd_backend_uri_get(backend, backend_data);
hreq->uri_parsed = httpd_uri_parsed_create(backend);
hreq->in_headers = httpd_backend_input_headers_get(backend);
hreq->out_headers = httpd_backend_output_headers_get(backend);
hreq->in_body = httpd_backend_input_buffer_get(backend);
httpd_backend_method_get(&hreq->method, backend);
httpd_backend_peer_get(&hreq->peer_address, &hreq->peer_port, backend, backend_data);
hreq->user_agent = httpd_header_find(hreq->in_headers, "User-Agent");
}
else
{
hreq->uri = uri;
hreq->uri_parsed = httpd_uri_parsed_create_fromuri(uri);
hreq->user_agent = user_agent;
}
if (!hreq->uri_parsed)
{
DPRINTF(E_LOG, L_HTTPD, "Unable to parse URI '%s' in request from '%s'\n", hreq->uri, hreq->peer_address);
goto error;
}
// Don't write directly to backend's buffer. This way we are sure we own the
// buffer even if there is no backend.
CHECK_NULL(L_HTTPD, hreq->out_body = evbuffer_new());
hreq->path = httpd_uri_path_get(hreq->uri_parsed);
hreq->query = httpd_uri_query_get(hreq->uri_parsed);
httpd_uri_path_parts_get(&hreq->path_parts, hreq->uri_parsed);
return hreq;
error:
httpd_request_free(hreq);
return NULL;
}
// Since this is async, libevent will already have closed the connection, so
// the parts of hreq that are from httpd_connection will now be invalid e.g.
// peer_address.
static void
closecb_worker(evutil_socket_t fd, short event, void *arg)
{
struct httpd_request *hreq = arg;
struct httpd_disconnect *disconnect = &hreq->backend_data->disconnect;
pthread_mutex_lock(&disconnect->lock);
if (disconnect->cb)
disconnect->cb(disconnect->cbarg);
pthread_mutex_unlock(&disconnect->lock);
httpd_send_reply_end(hreq); // hreq is now deallocated
}
static void
closecb_httpd(httpd_connection *conn, void *arg)
{
struct httpd_request *hreq = arg;
struct httpd_disconnect *disconnect = &hreq->backend_data->disconnect;
DPRINTF(E_WARN, hreq->module->logdomain, "Connection to '%s' was closed\n", hreq->peer_address);
// The disconnect event may occur while a worker thread is accessing hreq, or
// has an event scheduled that will do so, so we have to be careful to let it
// finish and cancel events.
pthread_mutex_lock(&disconnect->lock);
if (hreq->is_async)
{
if (disconnect->cb)
event_active(disconnect->ev, 0, 0);
pthread_mutex_unlock(&disconnect->lock);
return;
}
pthread_mutex_unlock(&disconnect->lock);
if (!disconnect->cb)
return;
disconnect->cb(disconnect->cbarg);
httpd_send_reply_end(hreq); // hreq is now deallocated
}
static void
gencb_httpd(httpd_backend *backend, void *arg)
{
httpd_server *server = arg;
struct httpd_request *hreq;
struct bufferevent *bufev;
// Clear the proxy request flag set by evhttp if the request URI was absolute.
// It has side-effects on Connection: keep-alive
backend->flags &= ~EVHTTP_PROXY_REQUEST;
// This is a workaround for some versions of libevent (2.0 and 2.1) that don't
// detect if the client hangs up, and thus don't clean up and never call the
// connection close cb(). See github issue #870 and
// https://github.com/libevent/libevent/issues/666. It should probably be
// removed again in the future.
bufev = evhttp_connection_get_bufferevent(evhttp_request_get_connection(backend));
if (bufev)
bufferevent_enable(bufev, EV_READ);
hreq = httpd_request_new(backend, server, NULL, NULL);
if (!hreq)
{
evhttp_send_error(backend, HTTP_INTERNAL, "Internal error");
return;
}
// We must hook connection close, so we can assure that conn close callbacks
// to handlers running in a worker are made in the same thread.
evhttp_connection_set_closecb(evhttp_request_get_connection(backend), closecb_httpd, hreq);
server->request_cb(hreq, server->request_cb_arg);
}
void
httpd_server_free(httpd_server *server)
{
if (!server)
return;
if (server->fd > 0)
close(server->fd);
if (server->evhttp)
evhttp_free(server->evhttp);
commands_base_free(server->cmdbase);
free(server);
}
httpd_server *
httpd_server_new(struct event_base *evbase, unsigned short port, httpd_request_cb cb, void *arg)
{
httpd_server *server;
int ret;
CHECK_NULL(L_HTTPD, server = calloc(1, sizeof(httpd_server)));
CHECK_NULL(L_HTTPD, server->evhttp = evhttp_new(evbase));
CHECK_NULL(L_HTTPD, server->cmdbase = commands_base_new(evbase, NULL));
server->request_cb = cb;
server->request_cb_arg = arg;
server->fd = net_bind_with_reuseport(&port, SOCK_STREAM | SOCK_NONBLOCK, "httpd");
if (server->fd <= 0)
goto error;
// Backlog of 128 is the same that libevent uses
ret = listen(server->fd, 128);
if (ret < 0)
goto error;
ret = evhttp_accept_socket(server->evhttp, server->fd);
if (ret < 0)
goto error;
evhttp_set_gencb(server->evhttp, gencb_httpd, server);
return server;
error:
httpd_server_free(server);
return NULL;
}
void
httpd_server_allow_origin_set(httpd_server *server, bool allow)
{
evhttp_set_allowed_methods(server->evhttp, EVHTTP_REQ_GET | EVHTTP_REQ_POST | EVHTTP_REQ_PUT | EVHTTP_REQ_DELETE | EVHTTP_REQ_HEAD | EVHTTP_REQ_OPTIONS);
}
// No locking of hreq required here, we're in the httpd thread, and the worker
// thread is waiting at commands_exec_sync()
static void
send_reply_and_free(struct httpd_reply *reply)
{
struct httpd_request *hreq = reply->hreq;
httpd_connection *conn;
// DPRINTF(E_DBG, L_HTTPD, "Send from httpd thread, type %d, backend %p\n", reply->type, hreq->backend);
if (reply->type & HTTPD_F_REPLY_LAST)
{
conn = evhttp_request_get_connection(hreq->backend);
if (conn)
evhttp_connection_set_closecb(conn, NULL, NULL);
}
switch (reply->type)
{
case HTTPD_REPLY_COMPLETE:
evhttp_send_reply(hreq->backend, reply->code, reply->reason, hreq->out_body);
break;
case HTTPD_REPLY_START:
evhttp_send_reply_start(hreq->backend, reply->code, reply->reason);
break;
case HTTPD_REPLY_CHUNK:
evhttp_send_reply_chunk_with_cb(hreq->backend, hreq->out_body, reply->chunkcb, reply->cbarg);
break;
case HTTPD_REPLY_END:
evhttp_send_reply_end(hreq->backend);
break;
}
}
static enum command_state
send_reply_and_free_cb(void *arg, int *retval)
{
struct httpd_reply *reply = arg;
send_reply_and_free(reply);
return COMMAND_END;
}
void
httpd_send(struct httpd_request *hreq, enum httpd_reply_type type, int code, const char *reason, httpd_connection_chunkcb cb, void *cbarg)
{
struct httpd_server *server = hreq->backend_data->server;
struct httpd_reply reply = {
.hreq = hreq,
.type = type,
.code = code,
.chunkcb = cb,
.cbarg = cbarg,
.reason = reason,
};
if (type & HTTPD_F_REPLY_LAST)
httpd_request_close_cb_set(hreq, NULL, NULL);
// Sending async is not a option, because then the worker thread might touch
// hreq before we have completed sending the current chunk
if (hreq->is_async)
commands_exec_sync(server->cmdbase, send_reply_and_free_cb, NULL, &reply);
else
send_reply_and_free(&reply);
if (type & HTTPD_F_REPLY_LAST)
httpd_request_free(hreq);
}
httpd_backend_data *
httpd_backend_data_create(httpd_backend *backend, httpd_server *server)
{
httpd_backend_data *backend_data;
CHECK_NULL(L_HTTPD, backend_data = calloc(1, sizeof(httpd_backend_data)));
CHECK_ERR(L_HTTPD, mutex_init(&backend_data->disconnect.lock));
backend_data->server = server;
return backend_data;
}
void
httpd_backend_data_free(httpd_backend_data *backend_data)
{
if (!backend_data)
return;
if (backend_data->disconnect.ev)
event_free(backend_data->disconnect.ev);
free(backend_data);
}
struct event_base *
httpd_backend_evbase_get(httpd_backend *backend)
{
httpd_connection *conn = evhttp_request_get_connection(backend);
if (!conn)
return NULL;
return evhttp_connection_get_base(conn);
}
const char *
httpd_backend_uri_get(httpd_backend *backend, httpd_backend_data *backend_data)
{
return evhttp_request_get_uri(backend);
}
httpd_headers *
httpd_backend_input_headers_get(httpd_backend *backend)
{
return evhttp_request_get_input_headers(backend);
}
httpd_headers *
httpd_backend_output_headers_get(httpd_backend *backend)
{
return evhttp_request_get_output_headers(backend);
}
struct evbuffer *
httpd_backend_input_buffer_get(httpd_backend *backend)
{
return evhttp_request_get_input_buffer(backend);
}
struct evbuffer *
httpd_backend_output_buffer_get(httpd_backend *backend)
{
return evhttp_request_get_output_buffer(backend);
}
int
httpd_backend_peer_get(const char **addr, uint16_t *port, httpd_backend *backend, httpd_backend_data *backend_data)
{
httpd_connection *conn = evhttp_request_get_connection(backend);
if (!conn)
return -1;
#ifdef HAVE_EVHTTP_CONNECTION_GET_PEER_CONST_CHAR
evhttp_connection_get_peer(conn, addr, port);
#else
evhttp_connection_get_peer(conn, (char **)addr, port);
#endif
return 0;
}
int
httpd_backend_method_get(enum httpd_methods *method, httpd_backend *backend)
{
enum evhttp_cmd_type cmd = evhttp_request_get_command(backend);
switch (cmd)
{
case EVHTTP_REQ_GET: *method = HTTPD_METHOD_GET; break;
case EVHTTP_REQ_POST: *method = HTTPD_METHOD_POST; break;
case EVHTTP_REQ_HEAD: *method = HTTPD_METHOD_HEAD; break;
case EVHTTP_REQ_PUT: *method = HTTPD_METHOD_PUT; break;
case EVHTTP_REQ_DELETE: *method = HTTPD_METHOD_DELETE; break;
case EVHTTP_REQ_OPTIONS: *method = HTTPD_METHOD_OPTIONS; break;
case EVHTTP_REQ_TRACE: *method = HTTPD_METHOD_TRACE; break;
case EVHTTP_REQ_CONNECT: *method = HTTPD_METHOD_CONNECT; break;
case EVHTTP_REQ_PATCH: *method = HTTPD_METHOD_PATCH; break;
default: *method = HTTPD_METHOD_GET; return -1;
}
return 0;
}
httpd_uri_parsed *
httpd_uri_parsed_create(httpd_backend *backend)
{
const char *uri = evhttp_request_get_uri(backend);
return httpd_uri_parsed_create_fromuri(uri);
}
httpd_uri_parsed *
httpd_uri_parsed_create_fromuri(const char *uri)
{
struct httpd_uri_parsed *parsed;
const char *query;
char *path = NULL;
char *path_part;
char *ptr;
int i;
parsed = calloc(1, sizeof(struct httpd_uri_parsed));
if (!parsed)
goto error;
parsed->ev_uri = evhttp_uri_parse_with_flags(uri, EVHTTP_URI_NONCONFORMANT);
if (!parsed->ev_uri)
goto error;
query = evhttp_uri_get_query(parsed->ev_uri);
if (query && strchr(query, '=') && evhttp_parse_query_str(query, &(parsed->query)) < 0)
goto error;
path = strdup(evhttp_uri_get_path(parsed->ev_uri));
if (!path || !(parsed->path = evhttp_uridecode(path, 0, NULL)))
goto error;
path_part = strtok_r(path, "/", &ptr);
for (i = 0; (i < ARRAY_SIZE(parsed->path_parts) && path_part); i++)
{
parsed->path_parts[i] = evhttp_uridecode(path_part, 0, NULL);
path_part = strtok_r(NULL, "/", &ptr);
}
// If "path_part" is not NULL, we have path tokens that could not be parsed into the "parsed->path_parts" array
if (path_part)
goto error;
free(path);
return parsed;
error:
free(path);
httpd_uri_parsed_free(parsed);
return NULL;
}
void
httpd_uri_parsed_free(httpd_uri_parsed *parsed)
{
int i;
if (!parsed)
return;
free(parsed->path);
for (i = 0; i < ARRAY_SIZE(parsed->path_parts); i++)
free(parsed->path_parts[i]);
httpd_query_clear(&(parsed->query));
if (parsed->ev_uri)
evhttp_uri_free(parsed->ev_uri);
free(parsed);
}
httpd_query *
httpd_uri_query_get(httpd_uri_parsed *parsed)
{
return &parsed->query;
}
const char *
httpd_uri_path_get(httpd_uri_parsed *parsed)
{
return parsed->path;
}
void
httpd_uri_path_parts_get(httpd_uri_path_parts *path_parts, httpd_uri_parsed *parsed)
{
memcpy(path_parts, parsed->path_parts, sizeof(httpd_uri_path_parts));
}

View File

@ -28,7 +28,7 @@
#include <stdint.h>
#include <inttypes.h>
#include "httpd_oauth.h"
#include "httpd_internal.h"
#include "logger.h"
#include "misc.h"
#include "conffile.h"
@ -54,12 +54,12 @@ oauth_reply_spotify(struct httpd_request *hreq)
ret = spotifywebapi_oauth_callback(hreq->query, redirect_uri, &errmsg);
if (ret < 0)
{
DPRINTF(E_LOG, L_WEB, "Could not parse Spotify OAuth callback '%s': %s\n", hreq->uri_parsed->uri, errmsg);
httpd_send_error(hreq->req, HTTP_INTERNAL, errmsg);
DPRINTF(E_LOG, L_WEB, "Could not parse Spotify OAuth callback '%s': %s\n", hreq->uri, errmsg);
httpd_send_error(hreq, HTTP_INTERNAL, errmsg);
return -1;
}
httpd_redirect_to(hreq->req, "/#/settings/online-services");
httpd_redirect_to(hreq, "/#/settings/online-services");
return 0;
}
@ -69,7 +69,7 @@ oauth_reply_spotify(struct httpd_request *hreq)
{
DPRINTF(E_LOG, L_WEB, "This version was built without support for Spotify\n");
httpd_send_error(hreq->req, HTTP_NOTFOUND, "This version was built without support for Spotify");
httpd_send_error(hreq, HTTP_NOTFOUND, "This version was built without support for Spotify");
return -1;
}
@ -90,65 +90,27 @@ static struct httpd_uri_map oauth_handlers[] =
/* ------------------------------- OAUTH API -------------------------------- */
void
oauth_request(struct evhttp_request *req, struct httpd_uri_parsed *uri_parsed)
static void
oauth_request(struct httpd_request *hreq)
{
struct httpd_request *hreq;
DPRINTF(E_LOG, L_WEB, "OAuth request: '%s'\n", uri_parsed->uri);
hreq = httpd_request_parse(req, uri_parsed, NULL, oauth_handlers);
if (!hreq)
if (!hreq->handler)
{
DPRINTF(E_LOG, L_WEB, "Unrecognized path '%s' in OAuth request: '%s'\n", uri_parsed->path, uri_parsed->uri);
DPRINTF(E_LOG, L_WEB, "Unrecognized path in OAuth request: '%s'\n", hreq->uri);
httpd_send_error(req, HTTP_NOTFOUND, NULL);
httpd_send_error(hreq, HTTP_NOTFOUND, NULL);
return;
}
hreq->handler(hreq);
free(hreq);
}
int
oauth_is_request(const char *path)
struct httpd_module httpd_oauth =
{
if (strncmp(path, "/oauth/", strlen("/oauth/")) == 0)
return 1;
if (strcmp(path, "/oauth") == 0)
return 1;
return 0;
}
int
oauth_init(void)
{
char buf[64];
int i;
int ret;
for (i = 0; oauth_handlers[i].handler; i++)
{
ret = regcomp(&oauth_handlers[i].preg, oauth_handlers[i].regexp, REG_EXTENDED | REG_NOSUB);
if (ret != 0)
{
regerror(ret, &oauth_handlers[i].preg, buf, sizeof(buf));
DPRINTF(E_FATAL, L_WEB, "OAuth init failed; regexp error: %s\n", buf);
return -1;
}
}
return 0;
}
void
oauth_deinit(void)
{
int i;
for (i = 0; oauth_handlers[i].handler; i++)
regfree(&oauth_handlers[i].preg);
}
.name = "OAuth",
.type = MODULE_OAUTH,
.logdomain = L_WEB,
.subpaths = { "/oauth/", NULL },
.fullpaths = { "/oauth", NULL },
.handlers = oauth_handlers,
.request = oauth_request,
};

View File

@ -1,18 +0,0 @@
#ifndef __HTTPD_OAUTH_H__
#define __HTTPD_OAUTH_H__
#include "httpd.h"
void
oauth_request(struct evhttp_request *req, struct httpd_uri_parsed *uri_parsed);
int
oauth_is_request(const char *path);
int
oauth_init(void);
void
oauth_deinit(void);
#endif /* !__HTTPD_OAUTH_H__ */

View File

@ -32,12 +32,11 @@
#include "mxml-compat.h"
#include "httpd_rsp.h"
#include "httpd_internal.h"
#include "logger.h"
#include "db.h"
#include "conffile.h"
#include "misc.h"
#include "httpd.h"
#include "transcode.h"
#include "parsers/rsp_parser.h"
@ -120,28 +119,17 @@ static const struct field_map rsp_fields[] =
/* -------------------------------- HELPERS --------------------------------- */
static struct evbuffer *
mxml_to_evbuf(mxml_node_t *tree)
static int
mxml_to_evbuf(struct evbuffer *evbuf, mxml_node_t *tree)
{
struct evbuffer *evbuf;
char *xml;
int ret;
evbuf = evbuffer_new();
if (!evbuf)
{
DPRINTF(E_LOG, L_RSP, "Could not create evbuffer for RSP reply\n");
return NULL;
}
xml = mxmlSaveAllocString(tree, MXML_NO_CALLBACK);
if (!xml)
{
DPRINTF(E_LOG, L_RSP, "Could not finalize RSP reply\n");
evbuffer_free(evbuf);
return NULL;
return -1;
}
ret = evbuffer_add(evbuf, xml, strlen(xml));
@ -149,22 +137,19 @@ mxml_to_evbuf(mxml_node_t *tree)
if (ret < 0)
{
DPRINTF(E_LOG, L_RSP, "Could not load evbuffer for RSP reply\n");
evbuffer_free(evbuf);
return NULL;
return -1;
}
return evbuf;
return 0;
}
static void
rsp_send_error(struct evhttp_request *req, char *errmsg)
rsp_send_error(struct httpd_request *hreq, char *errmsg)
{
struct evbuffer *evbuf;
struct evkeyvalq *headers;
mxml_node_t *reply;
mxml_node_t *status;
mxml_node_t *node;
int ret;
/* We'd use mxmlNewXML(), but then we can't put any attributes
* on the root node and we need some.
@ -187,23 +172,19 @@ rsp_send_error(struct evhttp_request *req, char *errmsg)
node = mxmlNewElement(status, "totalrecords");
mxmlNewText(node, 0, "0");
evbuf = mxml_to_evbuf(reply);
ret = mxml_to_evbuf(hreq->out_body, reply);
mxmlDelete(reply);
if (!evbuf)
if (ret < 0)
{
httpd_send_error(req, HTTP_SERVUNAVAIL, "Internal Server Error");
httpd_send_error(hreq, HTTP_SERVUNAVAIL, "Internal Server Error");
return;
}
headers = evhttp_request_get_output_headers(req);
evhttp_add_header(headers, "Content-Type", "text/xml; charset=utf-8");
evhttp_add_header(headers, "Connection", "close");
httpd_header_add(hreq->out_headers, "Content-Type", "text/xml; charset=utf-8");
httpd_header_add(hreq->out_headers, "Connection", "close");
httpd_send_reply(req, HTTP_OK, "OK", evbuf, HTTPD_SEND_NO_GZIP);
evbuffer_free(evbuf);
httpd_send_reply(hreq, HTTP_OK, "OK", HTTPD_SEND_NO_GZIP);
}
static int
@ -215,25 +196,25 @@ query_params_set(struct query_params *qp, struct httpd_request *hreq)
int ret;
qp->offset = 0;
param = evhttp_find_header(hreq->query, "offset");
param = httpd_query_value_find(hreq->query, "offset");
if (param)
{
ret = safe_atoi32(param, &qp->offset);
if (ret < 0)
{
rsp_send_error(hreq->req, "Invalid offset");
rsp_send_error(hreq, "Invalid offset");
return -1;
}
}
qp->limit = 0;
param = evhttp_find_header(hreq->query, "limit");
param = httpd_query_value_find(hreq->query, "limit");
if (param)
{
ret = safe_atoi32(param, &qp->limit);
if (ret < 0)
{
rsp_send_error(hreq->req, "Invalid limit");
rsp_send_error(hreq, "Invalid limit");
return -1;
}
}
@ -244,7 +225,7 @@ query_params_set(struct query_params *qp, struct httpd_request *hreq)
qp->idx_type = I_NONE;
qp->filter = NULL;
param = evhttp_find_header(hreq->query, "query");
param = httpd_query_value_find(hreq->query, "query");
if (param)
{
ret = snprintf(query, sizeof(query), "%s", param);
@ -278,28 +259,23 @@ query_params_set(struct query_params *qp, struct httpd_request *hreq)
}
static void
rsp_send_reply(struct evhttp_request *req, mxml_node_t *reply)
rsp_send_reply(struct httpd_request *hreq, mxml_node_t *reply)
{
struct evbuffer *evbuf;
struct evkeyvalq *headers;
int ret;
evbuf = mxml_to_evbuf(reply);
ret = mxml_to_evbuf(hreq->out_body, reply);
mxmlDelete(reply);
if (!evbuf)
if (ret < 0)
{
rsp_send_error(req, "Could not finalize reply");
rsp_send_error(hreq, "Could not finalize reply");
return;
}
headers = evhttp_request_get_output_headers(req);
evhttp_add_header(headers, "Content-Type", "text/xml; charset=utf-8");
evhttp_add_header(headers, "Connection", "close");
httpd_header_add(hreq->out_headers, "Content-Type", "text/xml; charset=utf-8");
httpd_header_add(hreq->out_headers, "Connection", "close");
httpd_send_reply(req, HTTP_OK, "OK", evbuf, 0);
evbuffer_free(evbuf);
httpd_send_reply(hreq, HTTP_OK, "OK", 0);
}
static int
@ -318,7 +294,7 @@ rsp_request_authorize(struct httpd_request *hreq)
DPRINTF(E_DBG, L_RSP, "Checking authentication for library\n");
// We don't care about the username
ret = httpd_basic_auth(hreq->req, NULL, passwd, cfg_getstr(cfg_getsec(cfg, "library"), "name"));
ret = httpd_basic_auth(hreq, NULL, passwd, cfg_getstr(cfg_getsec(cfg, "library"), "name"));
if (ret != 0)
{
DPRINTF(E_LOG, L_RSP, "Unsuccessful library authorization attempt from '%s'\n", hreq->peer_address);
@ -382,7 +358,7 @@ rsp_reply_info(struct httpd_request *hreq)
node = mxmlNewElement(info, "name");
mxmlNewText(node, 0, library);
rsp_send_reply(hreq->req, reply);
rsp_send_reply(hreq, reply);
return 0;
}
@ -411,7 +387,7 @@ rsp_reply_db(struct httpd_request *hreq)
{
DPRINTF(E_LOG, L_RSP, "Could not start query\n");
rsp_send_error(hreq->req, "Could not start query");
rsp_send_error(hreq, "Could not start query");
return -1;
}
@ -465,7 +441,7 @@ rsp_reply_db(struct httpd_request *hreq)
mxmlDelete(reply);
db_query_end(&qp);
rsp_send_error(hreq->req, "Error fetching query results");
rsp_send_error(hreq, "Error fetching query results");
return -1;
}
@ -479,7 +455,7 @@ rsp_reply_db(struct httpd_request *hreq)
db_query_end(&qp);
rsp_send_reply(hreq->req, reply);
rsp_send_reply(hreq, reply);
return 0;
}
@ -489,7 +465,6 @@ rsp_reply_playlist(struct httpd_request *hreq)
{
struct query_params qp;
struct db_media_file_info dbmfi;
struct evkeyvalq *headers;
const char *param;
const char *ua;
const char *client_codecs;
@ -508,10 +483,10 @@ rsp_reply_playlist(struct httpd_request *hreq)
memset(&qp, 0, sizeof(struct query_params));
ret = safe_atoi32(hreq->uri_parsed->path_parts[2], &qp.id);
ret = safe_atoi32(hreq->path_parts[2], &qp.id);
if (ret < 0)
{
rsp_send_error(hreq->req, "Invalid playlist ID");
rsp_send_error(hreq, "Invalid playlist ID");
return -1;
}
@ -523,7 +498,7 @@ rsp_reply_playlist(struct httpd_request *hreq)
qp.sort = S_NAME;
mode = F_FULL;
param = evhttp_find_header(hreq->query, "type");
param = httpd_query_value_find(hreq->query, "type");
if (param)
{
if (strcasecmp(param, "full") == 0)
@ -547,7 +522,7 @@ rsp_reply_playlist(struct httpd_request *hreq)
{
DPRINTF(E_LOG, L_RSP, "Could not start query\n");
rsp_send_error(hreq->req, "Could not start query");
rsp_send_error(hreq, "Could not start query");
if (qp.filter)
free(qp.filter);
@ -587,10 +562,8 @@ rsp_reply_playlist(struct httpd_request *hreq)
/* Items block (all items) */
while ((ret = db_query_fetch_file(&dbmfi, &qp)) == 0)
{
headers = evhttp_request_get_input_headers(hreq->req);
ua = evhttp_find_header(headers, "User-Agent");
client_codecs = evhttp_find_header(headers, "Accept-Codecs");
ua = httpd_header_find(hreq->in_headers, "User-Agent");
client_codecs = httpd_header_find(hreq->in_headers, "Accept-Codecs");
transcode = transcode_needed(ua, client_codecs, dbmfi.codectype);
@ -658,7 +631,7 @@ rsp_reply_playlist(struct httpd_request *hreq)
mxmlDelete(reply);
db_query_end(&qp);
rsp_send_error(hreq->req, "Error fetching query results");
rsp_send_error(hreq, "Error fetching query results");
return -1;
}
@ -672,7 +645,7 @@ rsp_reply_playlist(struct httpd_request *hreq)
db_query_end(&qp);
rsp_send_reply(hreq->req, reply);
rsp_send_reply(hreq, reply);
return 0;
}
@ -691,34 +664,34 @@ rsp_reply_browse(struct httpd_request *hreq)
memset(&qp, 0, sizeof(struct query_params));
if (strcmp(hreq->uri_parsed->path_parts[3], "artist") == 0)
if (strcmp(hreq->path_parts[3], "artist") == 0)
{
qp.type = Q_BROWSE_ARTISTS;
}
else if (strcmp(hreq->uri_parsed->path_parts[3], "genre") == 0)
else if (strcmp(hreq->path_parts[3], "genre") == 0)
{
qp.type = Q_BROWSE_GENRES;
}
else if (strcmp(hreq->uri_parsed->path_parts[3], "album") == 0)
else if (strcmp(hreq->path_parts[3], "album") == 0)
{
qp.type = Q_BROWSE_ALBUMS;
}
else if (strcmp(hreq->uri_parsed->path_parts[3], "composer") == 0)
else if (strcmp(hreq->path_parts[3], "composer") == 0)
{
qp.type = Q_BROWSE_COMPOSERS;
}
else
{
DPRINTF(E_LOG, L_RSP, "Unsupported browse type '%s'\n", hreq->uri_parsed->path_parts[3]);
DPRINTF(E_LOG, L_RSP, "Unsupported browse type '%s'\n", hreq->path_parts[3]);
rsp_send_error(hreq->req, "Unsupported browse type");
rsp_send_error(hreq, "Unsupported browse type");
return -1;
}
ret = safe_atoi32(hreq->uri_parsed->path_parts[2], &qp.id);
ret = safe_atoi32(hreq->path_parts[2], &qp.id);
if (ret < 0)
{
rsp_send_error(hreq->req, "Invalid playlist ID");
rsp_send_error(hreq, "Invalid playlist ID");
return -1;
}
@ -731,7 +704,7 @@ rsp_reply_browse(struct httpd_request *hreq)
{
DPRINTF(E_LOG, L_RSP, "Could not start query\n");
rsp_send_error(hreq->req, "Could not start query");
rsp_send_error(hreq, "Could not start query");
if (qp.filter)
free(qp.filter);
@ -784,7 +757,7 @@ rsp_reply_browse(struct httpd_request *hreq)
mxmlDelete(reply);
db_query_end(&qp);
rsp_send_error(hreq->req, "Error fetching query results");
rsp_send_error(hreq, "Error fetching query results");
return -1;
}
@ -798,7 +771,7 @@ rsp_reply_browse(struct httpd_request *hreq)
db_query_end(&qp);
rsp_send_reply(hreq->req, reply);
rsp_send_reply(hreq, reply);
return 0;
}
@ -809,14 +782,14 @@ rsp_stream(struct httpd_request *hreq)
int id;
int ret;
ret = safe_atoi32(hreq->uri_parsed->path_parts[2], &id);
ret = safe_atoi32(hreq->path_parts[2], &id);
if (ret < 0)
{
httpd_send_error(hreq->req, HTTP_BADREQUEST, "Bad Request");
httpd_send_error(hreq, HTTP_BADREQUEST, "Bad Request");
return -1;
}
httpd_stream_file(hreq->req, id);
httpd_stream_file(hreq, id);
return 0;
}
@ -852,7 +825,7 @@ static struct httpd_uri_map rsp_handlers[] =
},
{
.regexp = "^/rsp/stream/[[:digit:]]+$",
.handler = rsp_stream
.handler = rsp_stream,
},
{
.regexp = NULL,
@ -863,74 +836,45 @@ static struct httpd_uri_map rsp_handlers[] =
/* -------------------------------- RSP API --------------------------------- */
void
rsp_request(struct evhttp_request *req, struct httpd_uri_parsed *uri_parsed)
static void
rsp_request(struct httpd_request *hreq)
{
struct httpd_request *hreq;
int ret;
DPRINTF(E_DBG, L_RSP, "RSP request: '%s'\n", uri_parsed->uri);
hreq = httpd_request_parse(req, uri_parsed, NULL, rsp_handlers);
if (!hreq)
if (!hreq->handler)
{
DPRINTF(E_LOG, L_RSP, "Unrecognized path '%s' in RSP request: '%s'\n", uri_parsed->path, uri_parsed->uri);
DPRINTF(E_LOG, L_RSP, "Unrecognized path in RSP request: '%s'\n", hreq->uri);
rsp_send_error(req, "Server error");
rsp_send_error(hreq, "Server error");
return;
}
ret = rsp_request_authorize(hreq);
if (ret < 0)
{
rsp_send_error(req, "Access denied");
rsp_send_error(hreq, "Access denied");
free(hreq);
return;
}
hreq->handler(hreq);
free(hreq);
}
int
rsp_is_request(const char *path)
{
if (strncmp(path, "/rsp/", strlen("/rsp/")) == 0)
return 1;
return 0;
}
int
static int
rsp_init(void)
{
char buf[64];
int i;
int ret;
snprintf(rsp_filter_files, sizeof(rsp_filter_files), "f.data_kind = %d", DATA_KIND_FILE);
for (i = 0; rsp_handlers[i].handler; i++)
{
ret = regcomp(&rsp_handlers[i].preg, rsp_handlers[i].regexp, REG_EXTENDED | REG_NOSUB);
if (ret != 0)
{
regerror(ret, &rsp_handlers[i].preg, buf, sizeof(buf));
DPRINTF(E_FATAL, L_RSP, "RSP init failed; regexp error: %s\n", buf);
return -1;
}
}
return 0;
}
void
rsp_deinit(void)
struct httpd_module httpd_rsp =
{
int i;
for (i = 0; rsp_handlers[i].handler; i++)
regfree(&rsp_handlers[i].preg);
}
.name = "RSP",
.type = MODULE_RSP,
.logdomain = L_RSP,
.subpaths = { "/rsp/", NULL },
.handlers = rsp_handlers,
.init = rsp_init,
.request = rsp_request,
};

View File

@ -1,19 +0,0 @@
#ifndef __HTTPD_RSP_H__
#define __HTTPD_RSP_H__
#include "httpd.h"
int
rsp_init(void);
void
rsp_deinit(void);
void
rsp_request(struct evhttp_request *req, struct httpd_uri_parsed *uri_parsed);
int
rsp_is_request(const char *path);
#endif /* !__HTTPD_RSP_H__ */

View File

@ -1,5 +1,5 @@
/*
* Copyright (C) 2015 Espen Jürgensen <espenjurgensen@gmail.com>
* Copyright (C) 2023 Espen Jürgensen <espenjurgensen@gmail.com>
*
* This program is free software; you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
@ -23,265 +23,66 @@
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <errno.h>
#include <fcntl.h>
#include <uninorm.h>
#include <unistd.h>
#include <pthread.h>
#include <errno.h>
#include <event2/event.h>
#include <event2/buffer.h>
#include "httpd_streaming.h"
#include "httpd_internal.h"
#include "outputs/streaming.h"
#include "logger.h"
#include "conffile.h"
#include "transcode.h"
#include "player.h"
#include "listener.h"
#include "db.h"
/* httpd event base, from httpd.c */
extern struct event_base *evbase_httpd;
// Seconds between sending silence when player is idle
// (to prevent client from hanging up)
#define STREAMING_SILENCE_INTERVAL 1
// How many bytes we try to read at a time from the httpd pipe
#define STREAMING_READ_SIZE STOB(352, 16, 2)
#define STREAMING_MP3_SAMPLE_RATE 44100
#define STREAMING_MP3_BPS 16
#define STREAMING_MP3_CHANNELS 2
#define STREAMING_MP3_BIT_RATE 192000
// Linked list of mp3 streaming requests
struct streaming_session {
struct evhttp_request *req;
struct streaming_session *next;
struct httpd_request *hreq;
bool require_icy; // Client requested icy meta
size_t bytes_sent; // Audio bytes sent since last metablock
int fd;
struct event *readev;
struct evbuffer *readbuf;
size_t bytes_sent;
bool icy_is_requested;
size_t icy_remaining;
};
static pthread_mutex_t streaming_sessions_lck;
static struct streaming_session *streaming_sessions;
// Means we're not able to encode to mp3
static bool streaming_not_supported;
static struct media_quality streaming_default_quality = {
.sample_rate = 44100,
.bits_per_sample = 16,
.channels = 2,
.bit_rate = 128000,
};
// Interval for sending silence when playback is paused
static struct timeval streaming_silence_tv = { STREAMING_SILENCE_INTERVAL, 0 };
// Input buffer, output buffer and encoding ctx for transcode
static struct encode_ctx *streaming_encode_ctx;
static struct evbuffer *streaming_encoded_data;
static struct media_quality streaming_quality_in;
static struct media_quality streaming_quality_out = { STREAMING_MP3_SAMPLE_RATE, STREAMING_MP3_BPS, STREAMING_MP3_CHANNELS, STREAMING_MP3_BIT_RATE };
/* ------------------------------ ICY metadata -------------------------------*/
// Used for pushing events and data from the player
static struct event *streamingev;
static struct event *metaev;
static struct player_status streaming_player_status;
static int streaming_player_changed;
static int streaming_pipe[2];
static int streaming_meta[2];
// To test mp3 and ICY tagm it is good to use:
// mpv --display-tags=* http://localhost:3689/stream.mp3
#define STREAMING_ICY_METALEN_MAX 4080 // 255*16 incl header/footer (16bytes)
#define STREAMING_ICY_METATITLELEN_MAX 4064 // STREAMING_ICY_METALEN_MAX -16 (not incl header/footer)
/* As streaming quality goes up, we send more data to the remote client. With a
* smaller ICY_METAINT value we have to splice metadata more frequently - on
* some devices with small input buffers, a higher quality stream and low
* ICY_METAINT can lead to stuttering as observed on a Roku Soundbridge
*/
#define STREAMING_ICY_METAINT_DEFAULT 16384
static unsigned short streaming_icy_metaint = STREAMING_ICY_METAINT_DEFAULT;
static unsigned streaming_icy_clients;
// As streaming quality goes up, we send more data to the remote client. With a
// smaller ICY_METAINT value we have to splice metadata more frequently - on
// some devices with small input buffers, a higher quality stream and low
// ICY_METAINT can lead to stuttering as observed on a Roku Soundbridge
static unsigned short streaming_icy_metaint = 16384;
static pthread_mutex_t streaming_metadata_lck;
static char streaming_icy_title[STREAMING_ICY_METATITLELEN_MAX];
static void
streaming_close_cb(struct evhttp_connection *evcon, void *arg)
{
struct streaming_session *this;
struct streaming_session *session;
struct streaming_session *prev;
const char *address;
ev_uint16_t port;
this = (struct streaming_session *)arg;
httpd_peer_get(&address, &port, evcon);
DPRINTF(E_INFO, L_STREAMING, "Stopping mp3 streaming to %s:%d\n", address, (int)port);
pthread_mutex_lock(&streaming_sessions_lck);
if (!streaming_sessions)
{
// This close comes during deinit() - we don't free `this` since it is
// already a dangling ptr (free'd in deinit()) at this stage
pthread_mutex_unlock(&streaming_sessions_lck);
return;
}
prev = NULL;
for (session = streaming_sessions; session; session = session->next)
{
if (session->req == this->req)
break;
prev = session;
}
if (!session)
{
DPRINTF(E_LOG, L_STREAMING, "Bug! Got a failure callback for an unknown stream (%s:%d)\n", address, (int)port);
free(this);
pthread_mutex_unlock(&streaming_sessions_lck);
return;
}
if (!prev)
streaming_sessions = session->next;
else
prev->next = session->next;
if (session->require_icy)
--streaming_icy_clients;
// Valgrind says libevent doesn't free the request on disconnect (even though it owns it - libevent bug?),
// so we do it with a reply end
evhttp_send_reply_end(session->req);
free(session);
if (!streaming_sessions)
{
DPRINTF(E_INFO, L_STREAMING, "No more clients, will stop streaming\n");
event_del(streamingev);
event_del(metaev);
}
pthread_mutex_unlock(&streaming_sessions_lck);
}
static void
streaming_end(void)
{
struct streaming_session *session;
struct evhttp_connection *evcon;
const char *address;
ev_uint16_t port;
pthread_mutex_lock(&streaming_sessions_lck);
for (session = streaming_sessions; streaming_sessions; session = streaming_sessions)
{
evcon = evhttp_request_get_connection(session->req);
if (evcon)
{
evhttp_connection_set_closecb(evcon, NULL, NULL);
httpd_peer_get(&address, &port, evcon);
DPRINTF(E_INFO, L_STREAMING, "Force close stream to %s:%d\n", address, (int)port);
}
evhttp_send_reply_end(session->req);
streaming_sessions = session->next;
free(session);
}
pthread_mutex_unlock(&streaming_sessions_lck);
event_del(streamingev);
event_del(metaev);
}
static void
streaming_meta_cb(evutil_socket_t fd, short event, void *arg)
{
struct media_quality quality;
struct decode_ctx *decode_ctx;
int ret;
transcode_encode_cleanup(&streaming_encode_ctx);
ret = read(fd, &quality, sizeof(struct media_quality));
if (ret != sizeof(struct media_quality))
goto error;
decode_ctx = NULL;
if (quality.bits_per_sample == 16)
decode_ctx = transcode_decode_setup_raw(XCODE_PCM16, &quality);
else if (quality.bits_per_sample == 24)
decode_ctx = transcode_decode_setup_raw(XCODE_PCM24, &quality);
else if (quality.bits_per_sample == 32)
decode_ctx = transcode_decode_setup_raw(XCODE_PCM32, &quality);
if (!decode_ctx)
goto error;
streaming_encode_ctx = transcode_encode_setup(XCODE_MP3, &streaming_quality_out, decode_ctx, NULL, 0, 0);
transcode_decode_cleanup(&decode_ctx);
if (!streaming_encode_ctx)
{
DPRINTF(E_LOG, L_STREAMING, "Will not be able to stream MP3, libav does not support MP3 encoding: %d/%d/%d @ %d\n", streaming_quality_out.sample_rate, streaming_quality_out.bits_per_sample, streaming_quality_out.channels, streaming_quality_out.bit_rate);
streaming_not_supported = 1;
streaming_end();
return;
}
streaming_quality_in = quality;
streaming_not_supported = 0;
return;
error:
DPRINTF(E_LOG, L_STREAMING, "Unknown or unsupported quality of input data (%d/%d/%d), cannot MP3 encode\n", quality.sample_rate, quality.bits_per_sample, quality.channels);
streaming_not_supported = 1;
streaming_end();
}
static int
encode_buffer(uint8_t *buffer, size_t size)
{
transcode_frame *frame;
int samples;
int ret;
if (streaming_not_supported)
{
DPRINTF(E_LOG, L_STREAMING, "Streaming unsupported\n");
return -1;
}
if (streaming_quality_in.channels == 0)
{
DPRINTF(E_LOG, L_STREAMING, "Streaming quality is zero (%d/%d/%d)\n", streaming_quality_in.sample_rate, streaming_quality_in.bits_per_sample, streaming_quality_in.channels);
return -1;
}
samples = BTOS(size, streaming_quality_in.bits_per_sample, streaming_quality_in.channels);
frame = transcode_frame_new(buffer, size, samples, &streaming_quality_in);
if (!frame)
{
DPRINTF(E_LOG, L_STREAMING, "Could not convert raw PCM to frame\n");
return -1;
}
ret = transcode_encode(streaming_encoded_data, streaming_encode_ctx, frame, 0);
transcode_frame_free(frame);
return ret;
}
/* We know that the icymeta is limited to 1+255*16 (ie 4081) bytes so caller must
* provide a buf of this size to avoid needless mallocs
*
* The icy meta block is defined by a single byte indicating how many double byte
* words used for the actual meta. Unused bytes are null padded
*
* https://stackoverflow.com/questions/4911062/pulling-track-info-from-an-audio-stream-using-php/4914538#4914538
* http://www.smackfu.com/stuff/programming/shoutcast.html
*/
// We know that the icymeta is limited to 1+255*16 (ie 4081) bytes so caller must
// provide a buf of this size to avoid needless mallocs
//
// The icy meta block is defined by a single byte indicating how many double byte
// words used for the actual meta. Unused bytes are null padded
//
// https://stackoverflow.com/questions/4911062/pulling-track-info-from-an-audio-stream-using-php/4914538#4914538
// http://www.smackfu.com/stuff/programming/shoutcast.html
static uint8_t *
streaming_icy_meta_create(uint8_t buf[STREAMING_ICY_METALEN_MAX+1], const char *title, unsigned *buflen)
icy_meta_create(uint8_t buf[STREAMING_ICY_METALEN_MAX+1], unsigned *buflen, const char *title)
{
unsigned titlelen;
unsigned metalen;
@ -322,317 +123,203 @@ streaming_icy_meta_create(uint8_t buf[STREAMING_ICY_METALEN_MAX+1], const char *
return buf;
}
static uint8_t *
streaming_icy_meta_splice(const uint8_t *data, size_t datalen, off_t offset, size_t *len)
static void
icy_meta_splice(struct evbuffer *out, struct evbuffer *in, size_t *icy_remaining)
{
uint8_t meta[STREAMING_ICY_METALEN_MAX+1]; // Buffer, of max sz, for the created icymeta
unsigned metalen; // How much of the buffer is in use
uint8_t *buf; // Client returned buffer; contains the audio (from data) spliced w/meta (from meta)
uint8_t meta[STREAMING_ICY_METALEN_MAX + 1];
unsigned metalen;
size_t buf_remaining;
size_t consume;
if (data == NULL || datalen == 0)
return NULL;
for (buf_remaining = evbuffer_get_length(in); buf_remaining > 0; buf_remaining -= consume)
{
consume = MIN(*icy_remaining, buf_remaining);
evbuffer_remove_buffer(in, out, consume);
*icy_remaining -= consume;
if (*icy_remaining == 0)
{
pthread_mutex_lock(&streaming_metadata_lck);
icy_meta_create(meta, &metalen, streaming_icy_title);
pthread_mutex_unlock(&streaming_metadata_lck);
memset(meta, 0, sizeof(meta));
streaming_icy_meta_create(meta, streaming_icy_title, &metalen);
*len = datalen + metalen;
// DPRINTF(E_DBG, L_STREAMING, "splicing meta, audio block=%d bytes, offset=%d, metalen=%d new buflen=%d\n", datalen, offset, metalen, *len);
buf = malloc(*len);
memcpy(buf, data, offset);
memcpy(buf+offset, &meta[0], metalen);
memcpy(buf+offset+metalen, data+offset, datalen-offset);
return buf;
evbuffer_add(out, meta, metalen);
*icy_remaining = streaming_icy_metaint;
}
}
}
// Thread: player. TODO Would be nice to avoid the lock. Consider moving all the
// ICY tag stuff to streaming.c and make a STREAMING_FORMAT_MP3_ICY?
static void
streaming_player_status_update(void)
icy_metadata_cb(char *metadata)
{
struct db_queue_item *queue_item;
uint32_t prev_id;
prev_id = streaming_player_status.id;
player_get_status(&streaming_player_status);
if (prev_id == streaming_player_status.id || !streaming_icy_clients)
{
return;
}
queue_item = db_queue_fetch_byfileid(streaming_player_status.id);
if (!queue_item)
{
streaming_icy_title[0] = '\0';
return;
}
snprintf(streaming_icy_title, sizeof(streaming_icy_title), "%s - %s", queue_item->title, queue_item->artist);
free_queue_item(queue_item, 0);
pthread_mutex_lock(&streaming_metadata_lck);
snprintf(streaming_icy_title, sizeof(streaming_icy_title), "%s", metadata);
pthread_mutex_unlock(&streaming_metadata_lck);
}
/* ----------------------------- Session helpers ---------------------------- */
static void
streaming_send_cb(evutil_socket_t fd, short event, void *arg)
session_free(struct streaming_session *session)
{
if (!session)
return;
if (session->readev)
{
streaming_session_deregister(session->fd);
event_free(session->readev);
}
evbuffer_free(session->readbuf);
free(session);
}
static struct streaming_session *
session_new(struct httpd_request *hreq, bool icy_is_requested)
{
struct streaming_session *session;
struct evbuffer *evbuf;
uint8_t rawbuf[STREAMING_READ_SIZE];
uint8_t *buf;
uint8_t *splice_buf = NULL;
size_t splice_len;
size_t count;
int overflow;
CHECK_NULL(L_STREAMING, session = calloc(1, sizeof(struct streaming_session)));
CHECK_NULL(L_STREAMING, session->readbuf = evbuffer_new());
session->hreq = hreq;
session->icy_is_requested = icy_is_requested;
session->icy_remaining = streaming_icy_metaint;
return session;
}
/* ----------------------------- Event callbacks ---------------------------- */
static void
conn_close_cb(void *arg)
{
struct streaming_session *session = arg;
session_free(session);
}
static void
read_cb(evutil_socket_t fd, short event, void *arg)
{
struct streaming_session *session = arg;
struct httpd_request *hreq;
int len;
int ret;
// Player wrote data to the pipe (EV_READ)
if (event & EV_READ)
CHECK_NULL(L_STREAMING, hreq = session->hreq);
len = evbuffer_read(session->readbuf, fd, -1);
if (len < 0 && errno != EAGAIN)
{
while (1)
{
ret = read(fd, &rawbuf, sizeof(rawbuf));
if (ret <= 0)
break;
DPRINTF(E_INFO, L_STREAMING, "Stopping mp3 streaming to %s:%d\n", session->hreq->peer_address, (int)session->hreq->peer_port);
if (streaming_player_changed)
{
streaming_player_changed = 0;
streaming_player_status_update();
}
ret = encode_buffer(rawbuf, ret);
if (ret < 0)
return;
}
httpd_send_reply_end(session->hreq);
session_free(session);
return;
}
// Event timed out, let's see what the player is doing and send silence if it is paused
if (session->icy_is_requested)
icy_meta_splice(hreq->out_body, session->readbuf, &session->icy_remaining);
else
{
if (streaming_player_changed)
{
streaming_player_changed = 0;
streaming_player_status_update();
}
evbuffer_add_buffer(hreq->out_body, session->readbuf);
if (streaming_player_status.status != PLAY_PAUSED)
return;
httpd_send_reply_chunk(hreq, NULL, NULL);
memset(&rawbuf, 0, sizeof(rawbuf));
ret = encode_buffer(rawbuf, sizeof(rawbuf));
if (ret < 0)
return;
}
len = evbuffer_get_length(streaming_encoded_data);
if (len == 0)
return;
// Send data
evbuf = evbuffer_new();
pthread_mutex_lock(&streaming_sessions_lck);
for (session = streaming_sessions; session; session = session->next)
{
// Does this session want ICY meta data and is it time to send?
count = session->bytes_sent+len;
if (session->require_icy && count > streaming_icy_metaint)
{
overflow = count%streaming_icy_metaint;
buf = evbuffer_pullup(streaming_encoded_data, -1);
// DPRINTF(E_DBG, L_STREAMING, "session=%x sent=%ld len=%ld overflow=%ld\n", session, session->bytes_sent, len, overflow);
// Splice the 'icy title' in with the encoded audio data
splice_len = 0;
splice_buf = streaming_icy_meta_splice(buf, len, len-overflow, &splice_len);
evbuffer_add(evbuf, splice_buf, splice_len);
free(splice_buf);
splice_buf = NULL;
evhttp_send_reply_chunk(session->req, evbuf);
if (session->next == NULL)
{
// We're the last session, drop the contents of the encoded buffer
evbuffer_drain(streaming_encoded_data, len);
}
session->bytes_sent = overflow;
}
else
{
if (session->next)
{
buf = evbuffer_pullup(streaming_encoded_data, -1);
evbuffer_add(evbuf, buf, len);
evhttp_send_reply_chunk(session->req, evbuf);
}
else
{
evhttp_send_reply_chunk(session->req, streaming_encoded_data);
}
session->bytes_sent += len;
}
}
pthread_mutex_unlock(&streaming_sessions_lck);
evbuffer_free(evbuf);
session->bytes_sent += len;
}
// Thread: player (not fully thread safe, but hey...)
static void
player_change_cb(short event_mask)
{
streaming_player_changed = 1;
}
// Thread: player (also prone to race conditions, mostly during deinit)
void
streaming_write(struct output_buffer *obuf)
{
int ret;
/* -------------------------- Module implementation ------------------------- */
// Explicit no-lock - let the write to pipes fail if during deinit
if (!streaming_sessions)
return;
if (!quality_is_equal(&obuf->data[0].quality, &streaming_quality_in))
{
ret = write(streaming_meta[1], &obuf->data[0].quality, sizeof(struct media_quality));
if (ret < 0)
{
if (errno == EBADF)
DPRINTF(E_LOG, L_STREAMING, "streaming pipe already closed\n");
else
DPRINTF(E_LOG, L_STREAMING, "Error writing to streaming pipe: %s\n", strerror(errno));
return;
}
}
ret = write(streaming_pipe[1], obuf->data[0].buffer, obuf->data[0].bufsize);
if (ret < 0)
{
if (errno == EAGAIN)
DPRINTF(E_WARN, L_STREAMING, "Streaming pipe full, skipping write\n");
else
{
if (errno == EBADF)
DPRINTF(E_LOG, L_STREAMING, "Streaming pipe already closed\n");
else
DPRINTF(E_LOG, L_STREAMING, "Error writing to streaming pipe: %s\n", strerror(errno));
}
}
}
int
streaming_request(struct evhttp_request *req, struct httpd_uri_parsed *uri_parsed)
static int
streaming_mp3_handler(struct httpd_request *hreq)
{
struct streaming_session *session;
struct evhttp_connection *evcon;
struct evkeyvalq *output_headers;
cfg_t *lib;
const char *name;
const char *address;
ev_uint16_t port;
const char *name = cfg_getstr(cfg_getsec(cfg, "library"), "name");
const char *param;
bool require_icy = false;
bool icy_is_requested;
char buf[9];
if (streaming_not_supported)
param = httpd_header_find(hreq->in_headers, "Icy-MetaData");
icy_is_requested = (param && strcmp(param, "1") == 0);
if (icy_is_requested)
{
DPRINTF(E_LOG, L_STREAMING, "Got MP3 streaming request, but cannot encode to MP3\n");
evhttp_send_error(req, HTTP_NOTFOUND, "Not Found");
return -1;
httpd_header_add(hreq->out_headers, "icy-name", name);
snprintf(buf, sizeof(buf), "%d", streaming_icy_metaint);
httpd_header_add(hreq->out_headers, "icy-metaint", buf);
}
evcon = evhttp_request_get_connection(req);
httpd_peer_get(&address, &port, evcon);
param = evhttp_find_header( evhttp_request_get_input_headers(req), "Icy-MetaData");
if (param && strcmp(param, "1") == 0)
require_icy = true;
DPRINTF(E_INFO, L_STREAMING, "Beginning mp3 streaming (with icy=%d, icy_metaint=%d) to %s:%d\n", require_icy, streaming_icy_metaint, address, (int)port);
lib = cfg_getsec(cfg, "library");
name = cfg_getstr(lib, "name");
output_headers = evhttp_request_get_output_headers(req);
evhttp_add_header(output_headers, "Content-Type", "audio/mpeg");
evhttp_add_header(output_headers, "Server", PACKAGE_NAME "/" VERSION);
evhttp_add_header(output_headers, "Cache-Control", "no-cache");
evhttp_add_header(output_headers, "Pragma", "no-cache");
evhttp_add_header(output_headers, "Expires", "Mon, 31 Aug 2015 06:00:00 GMT");
if (require_icy)
{
++streaming_icy_clients;
evhttp_add_header(output_headers, "icy-name", name);
snprintf(buf, sizeof(buf)-1, "%d", streaming_icy_metaint);
evhttp_add_header(output_headers, "icy-metaint", buf);
}
evhttp_add_header(output_headers, "Access-Control-Allow-Origin", "*");
evhttp_add_header(output_headers, "Access-Control-Allow-Methods", "GET, POST, PUT, DELETE, OPTIONS");
evhttp_send_reply_start(req, HTTP_OK, "OK");
session = calloc(1, sizeof(struct streaming_session));
session = session_new(hreq, icy_is_requested);
if (!session)
{
DPRINTF(E_LOG, L_STREAMING, "Out of memory for streaming request\n");
return -1;
evhttp_send_error(req, HTTP_SERVUNAVAIL, "Internal Server Error");
return -1;
}
// Ask streaming output module for a fd to read mp3 from
session->fd = streaming_session_register(STREAMING_FORMAT_MP3, streaming_default_quality);
pthread_mutex_lock(&streaming_sessions_lck);
CHECK_NULL(L_STREAMING, session->readev = event_new(hreq->evbase, session->fd, EV_READ | EV_PERSIST, read_cb, session));
event_add(session->readev, NULL);
if (!streaming_sessions)
{
event_add(streamingev, &streaming_silence_tv);
event_add(metaev, NULL);
}
httpd_request_close_cb_set(hreq, conn_close_cb, session);
session->req = req;
session->next = streaming_sessions;
session->require_icy = require_icy;
session->bytes_sent = 0;
streaming_sessions = session;
httpd_header_add(hreq->out_headers, "Content-Type", "audio/mpeg");
httpd_header_add(hreq->out_headers, "Server", PACKAGE_NAME "/" VERSION);
httpd_header_add(hreq->out_headers, "Cache-Control", "no-cache");
httpd_header_add(hreq->out_headers, "Pragma", "no-cache");
httpd_header_add(hreq->out_headers, "Expires", "Mon, 31 Aug 2015 06:00:00 GMT");
pthread_mutex_unlock(&streaming_sessions_lck);
evhttp_connection_set_closecb(evcon, streaming_close_cb, session);
httpd_send_reply_start(hreq, HTTP_OK, "OK");
return 0;
}
int
streaming_is_request(const char *path)
{
char *ptr;
static struct httpd_uri_map streaming_handlers[] =
{
{
.regexp = "^/stream.mp3$",
.handler = streaming_mp3_handler,
.flags = HTTPD_HANDLER_REALTIME,
},
{
.regexp = NULL,
.handler = NULL
}
};
ptr = strrchr(path, '/');
if (ptr && (strcasecmp(ptr, "/stream.mp3") == 0))
return 1;
return 0;
}
int
streaming_init(void)
static void
streaming_request(struct httpd_request *hreq)
{
int ret;
cfg_t *cfgsec;
if (!hreq->handler)
{
DPRINTF(E_LOG, L_STREAMING, "Unrecognized path in streaming request: '%s'\n", hreq->uri);
httpd_send_error(hreq, HTTP_NOTFOUND, NULL);
return;
}
ret = hreq->handler(hreq);
if (ret < 0)
httpd_send_error(hreq, HTTP_INTERNAL, NULL);
}
static int
streaming_init(void)
{
int val;
cfgsec = cfg_getsec(cfg, "streaming");
val = cfg_getint(cfgsec, "sample_rate");
val = cfg_getint(cfg_getsec(cfg, "streaming"), "sample_rate");
// Validate against the variations of libmp3lame's supported sample rates: 32000/44100/48000
if (val % 11025 > 0 && val % 12000 > 0 && val % 8000 > 0)
DPRINTF(E_LOG, L_STREAMING, "Non standard streaming sample_rate=%d, defaulting\n", val);
DPRINTF(E_LOG, L_STREAMING, "Unsupported streaming sample_rate=%d, defaulting\n", val);
else
streaming_quality_out.sample_rate = val;
streaming_default_quality.sample_rate = val;
val = cfg_getint(cfgsec, "bit_rate");
val = cfg_getint(cfg_getsec(cfg, "streaming"), "bit_rate");
switch (val)
{
case 64:
@ -640,107 +327,37 @@ streaming_init(void)
case 128:
case 192:
case 320:
streaming_quality_out.bit_rate = val*1000;
streaming_default_quality.bit_rate = val*1000;
break;
default:
DPRINTF(E_LOG, L_STREAMING, "Unsuppported streaming bit_rate=%d, supports: 64/96/128/192/320, defaulting\n", val);
}
DPRINTF(E_INFO, L_STREAMING, "Streaming quality: %d/%d/%d @ %dkbps\n", streaming_quality_out.sample_rate, streaming_quality_out.bits_per_sample, streaming_quality_out.channels, streaming_quality_out.bit_rate/1000);
DPRINTF(E_INFO, L_STREAMING, "Streaming quality: %d/%d/%d @ %dkbps\n",
streaming_default_quality.sample_rate, streaming_default_quality.bits_per_sample,
streaming_default_quality.channels, streaming_default_quality.bit_rate/1000);
val = cfg_getint(cfgsec, "icy_metaint");
val = cfg_getint(cfg_getsec(cfg, "streaming"), "icy_metaint");
// Too low a value forces server to send more meta than data
if (val >= 4096 && val <= 131072)
streaming_icy_metaint = val;
else
DPRINTF(E_INFO, L_STREAMING, "Unsupported icy_metaint=%d, supported range: 4096..131072, defaulting to %d\n", val, streaming_icy_metaint);
ret = mutex_init(&streaming_sessions_lck);
if (ret < 0)
{
DPRINTF(E_FATAL, L_STREAMING, "Could not initialize mutex (%d): %s\n", ret, strerror(ret));
goto error;
}
// Non-blocking because otherwise httpd and player thread may deadlock
#ifdef HAVE_PIPE2
ret = pipe2(streaming_pipe, O_CLOEXEC | O_NONBLOCK);
#else
if ( pipe(streaming_pipe) < 0 ||
fcntl(streaming_pipe[0], F_SETFL, O_CLOEXEC | O_NONBLOCK) < 0 ||
fcntl(streaming_pipe[1], F_SETFL, O_CLOEXEC | O_NONBLOCK) < 0 )
ret = -1;
else
ret = 0;
#endif
if (ret < 0)
{
DPRINTF(E_FATAL, L_STREAMING, "Could not create pipe: %s\n", strerror(errno));
goto error;
}
#ifdef HAVE_PIPE2
ret = pipe2(streaming_meta, O_CLOEXEC | O_NONBLOCK);
#else
if ( pipe(streaming_meta) < 0 ||
fcntl(streaming_meta[0], F_SETFL, O_CLOEXEC | O_NONBLOCK) < 0 ||
fcntl(streaming_meta[1], F_SETFL, O_CLOEXEC | O_NONBLOCK) < 0 )
ret = -1;
else
ret = 0;
#endif
if (ret < 0)
{
DPRINTF(E_FATAL, L_STREAMING, "Could not create pipe: %s\n", strerror(errno));
goto error;
}
// Listen to playback changes so we don't have to poll to check for pausing
ret = listener_add(player_change_cb, LISTENER_PLAYER);
if (ret < 0)
{
DPRINTF(E_FATAL, L_STREAMING, "Could not add listener\n");
goto error;
}
// Initialize buffer for encoded mp3 audio and event for pipe reading
CHECK_NULL(L_STREAMING, streaming_encoded_data = evbuffer_new());
CHECK_NULL(L_STREAMING, streamingev = event_new(evbase_httpd, streaming_pipe[0], EV_TIMEOUT | EV_READ | EV_PERSIST, streaming_send_cb, NULL));
CHECK_NULL(L_STREAMING, metaev = event_new(evbase_httpd, streaming_meta[0], EV_READ | EV_PERSIST, streaming_meta_cb, NULL));
streaming_icy_clients = 0;
CHECK_ERR(L_STREAMING, mutex_init(&streaming_metadata_lck));
streaming_metadatacb_register(icy_metadata_cb);
return 0;
error:
close(streaming_pipe[0]);
close(streaming_pipe[1]);
close(streaming_meta[0]);
close(streaming_meta[1]);
return -1;
}
void
streaming_deinit(void)
struct httpd_module httpd_streaming =
{
streaming_end();
event_free(metaev);
event_free(streamingev);
streamingev = NULL;
listener_remove(player_change_cb);
close(streaming_pipe[0]);
close(streaming_pipe[1]);
close(streaming_meta[0]);
close(streaming_meta[1]);
transcode_encode_cleanup(&streaming_encode_ctx);
evbuffer_free(streaming_encoded_data);
pthread_mutex_destroy(&streaming_sessions_lck);
}
.name = "Streaming",
.type = MODULE_STREAMING,
.logdomain = L_STREAMING,
.fullpaths = { "/stream.mp3", NULL },
.handlers = streaming_handlers,
.init = streaming_init,
.request = streaming_request,
};

View File

@ -1,29 +0,0 @@
#ifndef __HTTPD_STREAMING_H__
#define __HTTPD_STREAMING_H__
#include "httpd.h"
#include "outputs.h"
/* httpd_streaming takes care of incoming requests to /stream.mp3
* It will receive decoded audio from the player, and encode it, and
* stream it to one or more clients. It will not be available
* if a suitable ffmpeg/libav encoder is not present at runtime.
*/
void
streaming_write(struct output_buffer *obuf);
int
streaming_request(struct evhttp_request *req, struct httpd_uri_parsed *uri_parsed);
int
streaming_is_request(const char *path);
int
streaming_init(void);
void
streaming_deinit(void);
#endif /* !__HTTPD_STREAMING_H__ */

View File

@ -47,9 +47,7 @@
#include <getopt.h>
#include <event2/event.h>
#ifdef HAVE_LIBEVENT_PTHREADS
# include <event2/thread.h>
#endif
#include <event2/thread.h>
#include <libavutil/avutil.h>
#include <libavutil/log.h>
#include <libavformat/avformat.h>
@ -730,9 +728,7 @@ main(int argc, char **argv)
/* Initialize event base (after forking) */
CHECK_NULL(L_MAIN, evbase_main = event_base_new());
#ifdef HAVE_LIBEVENT_PTHREADS
CHECK_ERR(L_MAIN, evthread_use_pthreads());
#endif
DPRINTF(E_LOG, L_MAIN, "mDNS init\n");
ret = mdns_init();

View File

@ -52,6 +52,8 @@
#include <arpa/inet.h>
#include <ifaddrs.h> // getifaddrs
#include <event2/http.h> // evhttp_bind
#include <unistr.h>
#include <uniconv.h>
@ -276,8 +278,8 @@ net_connect(const char *addr, unsigned short port, int type, const char *log_ser
// If *port is 0 then a random port will be assigned, and *port will be updated
// with the port number
int
net_bind(short unsigned *port, int type, const char *log_service_name)
static int
net_bind_impl(short unsigned *port, int type, const char *log_service_name, bool reuseport)
{
struct addrinfo hints = { 0 };
struct addrinfo *servinfo;
@ -315,6 +317,14 @@ net_bind(short unsigned *port, int type, const char *log_service_name)
if (fd < 0)
continue;
// Makes us able to attach multiple threads to the same port
if (reuseport)
ret = setsockopt(fd, SOL_SOCKET, SO_REUSEPORT, &yes, sizeof(yes));
else
ret = setsockopt(fd, SOL_SOCKET, SO_REUSEPORT, &no, sizeof(no));
if (ret < 0)
continue;
// TODO libevent sets this, we do the same?
ret = setsockopt(fd, SOL_SOCKET, SO_KEEPALIVE, &yes, sizeof(yes));
if (ret < 0)
@ -375,7 +385,19 @@ net_bind(short unsigned *port, int type, const char *log_service_name)
}
int
net_evhttp_bind(struct evhttp *evhttp, short unsigned port, const char *log_service_name)
net_bind(short unsigned *port, int type, const char *log_service_name)
{
return net_bind_impl(port, type, log_service_name, false);
}
int
net_bind_with_reuseport(short unsigned *port, int type, const char *log_service_name)
{
return net_bind_impl(port, type, log_service_name, true);
}
int
net_evhttp_bind(struct evhttp *evhttp, unsigned short port, const char *log_service_name)
{
const char *bind_address;
bool v6_enabled;

View File

@ -14,7 +14,6 @@
#include <sys/socket.h>
#include <netinet/in.h>
#include <event2/http.h>
#ifndef SOCK_NONBLOCK
#include <fcntl.h>
@ -55,7 +54,13 @@ int
net_bind(short unsigned *port, int type, const char *log_service_name);
int
net_evhttp_bind(struct evhttp *evhttp, short unsigned port, const char *log_service_name);
net_bind_with_reuseport(short unsigned *port, int type, const char *log_service_name);
// To avoid polluting namespace too much we don't include event2/http.h here
struct evhttp;
int
net_evhttp_bind(struct evhttp *evhttp, unsigned short port, const char *log_service_name);
// Just checks if the protocol is http or https
bool

View File

@ -47,7 +47,6 @@
#include "commands.h"
#include "conffile.h"
#include "db.h"
#include "httpd.h"
#include "library.h"
#include "listener.h"
#include "logger.h"
@ -4651,7 +4650,7 @@ artwork_cb(struct evhttp_request *req, void *arg)
if (evhttp_request_get_command(req) != EVHTTP_REQ_GET)
{
DPRINTF(E_LOG, L_MPD, "Unsupported request type for artwork\n");
httpd_send_error(req, HTTP_BADMETHOD, "Method not allowed");
evhttp_send_error(req, HTTP_BADMETHOD, "Method not allowed");
return;
}
@ -4662,7 +4661,7 @@ artwork_cb(struct evhttp_request *req, void *arg)
if (!decoded)
{
DPRINTF(E_LOG, L_MPD, "Bad artwork request with uri '%s'\n", uri);
httpd_send_error(req, HTTP_BADREQUEST, 0);
evhttp_send_error(req, HTTP_BADREQUEST, 0);
return;
}
@ -4670,7 +4669,7 @@ artwork_cb(struct evhttp_request *req, void *arg)
if (!path)
{
DPRINTF(E_LOG, L_MPD, "Invalid path from artwork request with uri '%s'\n", uri);
httpd_send_error(req, HTTP_BADREQUEST, 0);
evhttp_send_error(req, HTTP_BADREQUEST, 0);
evhttp_uri_free(decoded);
return;
}
@ -4679,7 +4678,7 @@ artwork_cb(struct evhttp_request *req, void *arg)
if (!decoded_path)
{
DPRINTF(E_LOG, L_MPD, "Error decoding path from artwork request with uri '%s'\n", uri);
httpd_send_error(req, HTTP_BADREQUEST, 0);
evhttp_send_error(req, HTTP_BADREQUEST, 0);
evhttp_uri_free(decoded);
return;
}
@ -4694,7 +4693,7 @@ artwork_cb(struct evhttp_request *req, void *arg)
if (!itemid)
{
DPRINTF(E_WARN, L_MPD, "No item found for path '%s' from request uri '%s'\n", decoded_path, uri);
httpd_send_error(req, HTTP_NOTFOUND, "Document was not found");
evhttp_send_error(req, HTTP_NOTFOUND, "Document was not found");
evhttp_uri_free(decoded);
free(decoded_path);
return;
@ -4704,7 +4703,7 @@ artwork_cb(struct evhttp_request *req, void *arg)
if (!evbuffer)
{
DPRINTF(E_LOG, L_MPD, "Could not allocate an evbuffer for artwork request\n");
httpd_send_error(req, HTTP_INTERNAL, "Document was not found");
evhttp_send_error(req, HTTP_INTERNAL, "Document was not found");
evhttp_uri_free(decoded);
free(decoded_path);
return;
@ -4713,7 +4712,7 @@ artwork_cb(struct evhttp_request *req, void *arg)
format = artwork_get_item(evbuffer, itemid, ART_DEFAULT_WIDTH, ART_DEFAULT_HEIGHT, 0);
if (format < 0)
{
httpd_send_error(req, HTTP_NOTFOUND, "Document was not found");
evhttp_send_error(req, HTTP_NOTFOUND, "Document was not found");
}
else
{
@ -4728,7 +4727,7 @@ artwork_cb(struct evhttp_request *req, void *arg)
break;
}
httpd_send_reply(req, HTTP_OK, "OK", evbuffer, HTTPD_SEND_NO_GZIP);
evhttp_send_reply(req, HTTP_OK, "OK", evbuffer);
}
evbuffer_free(evbuffer);

View File

@ -467,6 +467,16 @@ metadata_cb_prepare(void *arg)
event_active(metadata->ev, 0, 0);
}
static void
metadata_free(struct output_metadata *metadata)
{
if (!metadata)
return;
if (metadata->ev)
event_free(metadata->ev);
free(metadata);
}
static void
metadata_send(enum output_types type, uint32_t item_id, bool startup, output_metadata_finalize_cb cb)
{
@ -689,6 +699,11 @@ outputs_cb(int callback_id, uint64_t device_id, enum output_device_state state)
event_active(outputs_deferredev, 0, 0);
}
void
outputs_metadata_free(struct output_metadata *metadata)
{
metadata_free(metadata);
}
/* ---------------------------- Called by player ---------------------------- */

View File

@ -252,7 +252,7 @@ struct output_definition
// Called from worker thread for async preparation of metadata (e.g. getting
// artwork, which might involce downloading image data). The prepared data is
// saved to metadata->data, which metadata_send() can use.
// saved to metadata->priv, which metadata_send() can use.
void *(*metadata_prepare)(struct output_metadata *metadata);
// Send metadata to outputs. Ownership of *metadata is transferred.
@ -284,6 +284,9 @@ outputs_quality_unsubscribe(struct media_quality *quality);
void
outputs_cb(int callback_id, uint64_t device_id, enum output_device_state);
void
outputs_metadata_free(struct output_metadata *metadata);
/* ---------------------------- Called by player ---------------------------- */
// Ownership of *add is transferred, so don't address after calling. Instead you

View File

@ -1,5 +1,5 @@
/*
* Copyright (C) 2016 Espen Jürgensen <espenjurgensen@gmail.com>
* Copyright (C) 2023 Espen Jürgensen <espenjurgensen@gmail.com>
*
* This program is free software; you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
@ -16,13 +16,593 @@
* Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
*/
#ifdef HAVE_CONFIG_H
# include <config.h>
#endif
#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <stdbool.h>
#include <stdint.h>
#include <unistd.h>
#include <uninorm.h>
#include <fcntl.h>
#include "streaming.h"
#include "outputs.h"
#include "httpd_streaming.h"
#include "misc.h"
#include "worker.h"
#include "transcode.h"
#include "logger.h"
#include "db.h"
/* About
*
* This output takes the writes from the player thread, gives them to a worker
* thread for mp3 encoding, and then the mp3 is written to a fd for the httpd
* request handler to read and pass to clients. If there is no writing from the
* player, but there are clients, it instead writes silence to the fd.
*/
// Seconds between sending silence when player is idle
// (to prevent client from hanging up)
#define STREAMING_SILENCE_INTERVAL 1
// How many bytes of silence we encode with the above interval. There is no
// particular reason for using this size, just that it seems to have worked for
// a while.
#define SILENCE_BUF_SIZE STOB(352, 16, 2)
// The wanted structure represents a particular format and quality that should
// be produced for one or more sessions. A pipe pair is created for each session
// for the i/o.
#define WANTED_PIPES_MAX 8
struct pipepair
{
int writefd;
int readfd;
};
struct streaming_wanted
{
int refcount;
struct pipepair pipes[WANTED_PIPES_MAX];
enum streaming_format format;
struct media_quality quality_in;
struct media_quality quality_out;
struct encode_ctx *xcode_ctx;
struct evbuffer *encoded_data;
struct streaming_wanted *next;
};
struct streaming_ctx
{
struct streaming_wanted *wanted;
struct event *silenceev;
struct timeval silencetv;
struct media_quality last_quality;
// seqnum may wrap around so must be unsigned
unsigned int seqnum;
unsigned int seqnum_encode_next;
// callback with new metadata, e.g. for ICY tags
void (*metadatacb)(char *metadata);
};
struct encode_cmdarg
{
uint8_t *buf;
size_t bufsize;
int samples;
unsigned int seqnum;
struct media_quality quality;
};
static pthread_mutex_t streaming_wanted_lck;
static pthread_cond_t streaming_sequence_cond;
static struct streaming_ctx streaming =
{
.silencetv = { STREAMING_SILENCE_INTERVAL, 0 },
};
extern struct event_base *evbase_player;
/* ------------------------------- Helpers ---------------------------------- */
static int
pipe_open(struct pipepair *p)
{
int fd[2];
int ret;
#ifdef HAVE_PIPE2
ret = pipe2(fd, O_CLOEXEC | O_NONBLOCK);
#else
if ( pipe(fd) < 0 ||
fcntl(fd[0], F_SETFL, O_CLOEXEC | O_NONBLOCK) < 0 ||
fcntl(fd[1], F_SETFL, O_CLOEXEC | O_NONBLOCK) < 0 )
ret = -1;
else
ret = 0;
#endif
if (ret < 0)
{
DPRINTF(E_LOG, L_STREAMING, "Could not create pipe: %s\n", strerror(errno));
return -1;
}
p->writefd = fd[1];
p->readfd = fd[0];
return 0;
}
static void
pipe_close(struct pipepair *p)
{
if (p->readfd >= 0)
close(p->readfd);
if (p->writefd >= 0)
close(p->writefd);
p->writefd = -1;
p->readfd = -1;
}
static void
wanted_free(struct streaming_wanted *w)
{
if (!w)
return;
for (int i = 0; i < WANTED_PIPES_MAX; i++)
pipe_close(&w->pipes[i]);
transcode_encode_cleanup(&w->xcode_ctx);
evbuffer_free(w->encoded_data);
free(w);
}
static struct streaming_wanted *
wanted_new(enum streaming_format format, struct media_quality quality)
{
struct streaming_wanted *w;
CHECK_NULL(L_STREAMING, w = calloc(1, sizeof(struct streaming_wanted)));
CHECK_NULL(L_STREAMING, w->encoded_data = evbuffer_new());
w->quality_out = quality;
w->format = format;
for (int i = 0; i < WANTED_PIPES_MAX; i++)
{
w->pipes[i].writefd = -1;
w->pipes[i].readfd = -1;
}
return w;
}
static void
wanted_remove(struct streaming_wanted **wanted, struct streaming_wanted *remove)
{
struct streaming_wanted *prev = NULL;
struct streaming_wanted *w;
for (w = *wanted; w; w = w->next)
{
if (w == remove)
break;
prev = w;
}
if (!w)
return;
if (!prev)
*wanted = remove->next;
else
prev->next = remove->next;
wanted_free(remove);
}
static struct streaming_wanted *
wanted_add(struct streaming_wanted **wanted, enum streaming_format format, struct media_quality quality)
{
struct streaming_wanted *w;
w = wanted_new(format, quality);
w->next = *wanted;
*wanted = w;
return w;
}
static struct streaming_wanted *
wanted_find_byformat(struct streaming_wanted *wanted, enum streaming_format format, struct media_quality quality)
{
struct streaming_wanted *w;
for (w = wanted; w; w = w->next)
{
if (w->format == format && quality_is_equal(&w->quality_out, &quality))
return w;
}
return NULL;
}
static struct streaming_wanted *
wanted_find_byreadfd(struct streaming_wanted *wanted, int readfd)
{
struct streaming_wanted *w;
int i;
for (w = wanted; w; w = w->next)
for (i = 0; i < WANTED_PIPES_MAX; i++)
{
if (w->pipes[i].readfd == readfd)
return w;
}
return NULL;
}
static int
wanted_session_add(struct pipepair *p, struct streaming_wanted *w)
{
int ret;
int i;
for (i = 0; i < WANTED_PIPES_MAX; i++)
{
if (w->pipes[i].writefd != -1) // In use
continue;
ret = pipe_open(&w->pipes[i]);
if (ret < 0)
return -1;
memcpy(p, &w->pipes[i], sizeof(struct pipepair));
break;
}
if (i == WANTED_PIPES_MAX)
{
DPRINTF(E_LOG, L_STREAMING, "Cannot add streaming session, max pipe limit reached\n");
return -1;
}
w->refcount++;
DPRINTF(E_DBG, L_STREAMING, "Session register readfd %d, wanted->refcount=%d\n", p->readfd, w->refcount);
return 0;
}
static void
wanted_session_remove(struct streaming_wanted *w, int readfd)
{
int i;
for (i = 0; i < WANTED_PIPES_MAX; i++)
{
if (w->pipes[i].readfd != readfd)
continue;
pipe_close(&w->pipes[i]);
break;
}
if (i == WANTED_PIPES_MAX)
{
DPRINTF(E_LOG, L_STREAMING, "Cannot remove streaming session, readfd %d not found\n", readfd);
return;
}
w->refcount--;
DPRINTF(E_DBG, L_STREAMING, "Session deregister readfd %d, wanted->refcount=%d\n", readfd, w->refcount);
}
/* ----------------------------- Thread: Worker ----------------------------- */
static int
encode_reset(struct streaming_wanted *w, struct media_quality quality_in)
{
struct media_quality quality_out = w->quality_out;
struct decode_ctx *decode_ctx = NULL;
transcode_encode_cleanup(&w->xcode_ctx);
if (quality_in.bits_per_sample == 16)
decode_ctx = transcode_decode_setup_raw(XCODE_PCM16, &quality_in);
else if (quality_in.bits_per_sample == 24)
decode_ctx = transcode_decode_setup_raw(XCODE_PCM24, &quality_in);
else if (quality_in.bits_per_sample == 32)
decode_ctx = transcode_decode_setup_raw(XCODE_PCM32, &quality_in);
if (!decode_ctx)
{
DPRINTF(E_LOG, L_STREAMING, "Error setting up decoder for input quality sr %d, bps %d, ch %d, cannot MP3 encode\n",
quality_in.sample_rate, quality_in.bits_per_sample, quality_in.channels);
goto error;
}
w->quality_in = quality_in;
w->xcode_ctx = transcode_encode_setup(XCODE_MP3, &quality_out, decode_ctx, NULL, 0, 0);
if (!w->xcode_ctx)
{
DPRINTF(E_LOG, L_STREAMING, "Error setting up encoder for output quality sr %d, bps %d, ch %d, cannot MP3 encode\n",
quality_out.sample_rate, quality_out.bits_per_sample, quality_out.channels);
goto error;
}
transcode_decode_cleanup(&decode_ctx);
return 0;
error:
transcode_decode_cleanup(&decode_ctx);
return -1;
}
static int
encode_frame(struct streaming_wanted *w, struct media_quality quality_in, transcode_frame *frame)
{
int ret;
if (!w->xcode_ctx || !quality_is_equal(&quality_in, &w->quality_in))
{
DPRINTF(E_DBG, L_STREAMING, "Resetting transcode context\n");
if (encode_reset(w, quality_in) < 0)
return -1;
}
ret = transcode_encode(w->encoded_data, w->xcode_ctx, frame, 0);
if (ret < 0)
{
return -1;
}
return 0;
}
static void
encode_write(uint8_t *buf, size_t buflen, struct streaming_wanted *w, struct pipepair *p)
{
int ret;
if (p->writefd < 0)
return;
ret = write(p->writefd, buf, buflen);
if (ret < 0)
{
DPRINTF(E_LOG, L_STREAMING, "Error writing to stream pipe %d (format %d): %s\n", p->writefd, w->format, strerror(errno));
wanted_session_remove(w, p->readfd);
}
}
static void
encode_data_cb(void *arg)
{
struct encode_cmdarg *ctx = arg;
transcode_frame *frame;
struct streaming_wanted *w;
struct streaming_wanted *next;
uint8_t *buf;
size_t len;
int ret;
int i;
frame = transcode_frame_new(ctx->buf, ctx->bufsize, ctx->samples, &ctx->quality);
if (!frame)
{
DPRINTF(E_LOG, L_STREAMING, "Could not convert raw PCM to frame\n");
goto out;
}
pthread_mutex_lock(&streaming_wanted_lck);
// To make sure we process the frames in order
while (ctx->seqnum != streaming.seqnum_encode_next)
pthread_cond_wait(&streaming_sequence_cond, &streaming_wanted_lck);
for (w = streaming.wanted; w; w = next)
{
next = w->next;
ret = encode_frame(w, ctx->quality, frame);
if (ret < 0)
wanted_remove(&streaming.wanted, w); // This will close all the fds, so readers get an error
len = evbuffer_get_length(w->encoded_data);
if (len == 0)
continue;
buf = evbuffer_pullup(w->encoded_data, -1);
for (i = 0; i < WANTED_PIPES_MAX; i++)
encode_write(buf, len, w, &w->pipes[i]);
evbuffer_drain(w->encoded_data, -1);
if (w->refcount == 0)
wanted_remove(&streaming.wanted, w);
}
streaming.seqnum_encode_next++;
pthread_cond_broadcast(&streaming_sequence_cond);
pthread_mutex_unlock(&streaming_wanted_lck);
out:
transcode_frame_free(frame);
free(ctx->buf);
}
static void *
streaming_metadata_prepare(struct output_metadata *metadata)
{
struct db_queue_item *queue_item;
char *title;
queue_item = db_queue_fetch_byitemid(metadata->item_id);
if (!queue_item)
{
DPRINTF(E_LOG, L_STREAMING, "Could not fetch queue item id %d for new metadata\n", metadata->item_id);
return NULL;
}
title = safe_asprintf("%s - %s", queue_item->title, queue_item->artist);
free_queue_item(queue_item, 0);
return title;
}
/* ----------------------------- Thread: httpd ------------------------------ */
int
streaming_session_register(enum streaming_format format, struct media_quality quality)
{
struct streaming_wanted *w;
struct pipepair pipe;
int ret;
pthread_mutex_lock(&streaming_wanted_lck);
w = wanted_find_byformat(streaming.wanted, format, quality);
if (!w)
w = wanted_add(&streaming.wanted, format, quality);
ret = wanted_session_add(&pipe, w);
if (ret < 0)
pipe.readfd = -1;
pthread_mutex_unlock(&streaming_wanted_lck);
return pipe.readfd;
}
void
streaming_session_deregister(int readfd)
{
struct streaming_wanted *w;
pthread_mutex_lock(&streaming_wanted_lck);
w = wanted_find_byreadfd(streaming.wanted, readfd);
if (!w)
goto out;
wanted_session_remove(w, readfd);
if (w->refcount == 0)
wanted_remove(&streaming.wanted, w);
out:
pthread_mutex_unlock(&streaming_wanted_lck);
}
// Not thread safe, but only called once during httpd init
void
streaming_metadatacb_register(streaming_metadatacb cb)
{
streaming.metadatacb = cb;
}
/* ----------------------------- Thread: Player ----------------------------- */
static void
encode_worker_invoke(uint8_t *buf, size_t bufsize, int samples, struct media_quality quality)
{
struct encode_cmdarg ctx;
if (quality.channels == 0)
{
DPRINTF(E_LOG, L_STREAMING, "Streaming quality is zero (%d/%d/%d)\n",
quality.sample_rate, quality.bits_per_sample, quality.channels);
return;
}
CHECK_NULL(L_STREAMING, ctx.buf = malloc(bufsize));
memcpy(ctx.buf, buf, bufsize);
ctx.bufsize = bufsize;
ctx.samples = samples;
ctx.quality = quality;
ctx.seqnum = streaming.seqnum;
streaming.seqnum++;
worker_execute(encode_data_cb, &ctx, sizeof(struct encode_cmdarg), 0);
}
static void
silenceev_cb(evutil_socket_t fd, short event, void *arg)
{
uint8_t silence[SILENCE_BUF_SIZE] = { 0 };
int samples;
// No lock since this is just an early exit, it doesn't need to be accurate
if (!streaming.wanted)
return;
samples = BTOS(SILENCE_BUF_SIZE, streaming.last_quality.bits_per_sample, streaming.last_quality.channels);
encode_worker_invoke(silence, SILENCE_BUF_SIZE, samples, streaming.last_quality);
evtimer_add(streaming.silenceev, &streaming.silencetv);
}
static void
streaming_write(struct output_buffer *obuf)
{
// No lock since this is just an early exit, it doesn't need to be accurate
if (!streaming.wanted)
return;
encode_worker_invoke(obuf->data[0].buffer, obuf->data[0].bufsize, obuf->data[0].samples, obuf->data[0].quality);
streaming.last_quality = obuf->data[0].quality;
// In case this is the last player write() we want to start streaming silence
evtimer_add(streaming.silenceev, &streaming.silencetv);
}
static void
streaming_metadata_send(struct output_metadata *metadata)
{
char *title = metadata->priv;
// Calls back to httpd_streaming to update the title
if (streaming.metadatacb)
streaming.metadatacb(title);
free(title);
outputs_metadata_free(metadata);
}
static int
streaming_init(void)
{
CHECK_NULL(L_STREAMING, streaming.silenceev = event_new(evbase_player, -1, 0, silenceev_cb, NULL));
CHECK_ERR(L_STREAMING, mutex_init(&streaming_wanted_lck));
CHECK_ERR(L_STREAMING, pthread_cond_init(&streaming_sequence_cond, NULL));
return 0;
}
static void
streaming_deinit(void)
{
event_free(streaming.silenceev);
}
struct output_definition output_streaming =
{
@ -30,5 +610,9 @@ struct output_definition output_streaming =
.type = OUTPUT_TYPE_STREAMING,
.priority = 0,
.disabled = 0,
.init = streaming_init,
.deinit = streaming_deinit,
.write = streaming_write,
.metadata_prepare = streaming_metadata_prepare,
.metadata_send = streaming_metadata_send,
};

23
src/outputs/streaming.h Normal file
View File

@ -0,0 +1,23 @@
#ifndef __STREAMING_H__
#define __STREAMING_H__
#include "misc.h" // struct media_quality
typedef void (*streaming_metadatacb)(char *metadata);
enum streaming_format
{
STREAMING_FORMAT_MP3,
};
int
streaming_session_register(enum streaming_format format, struct media_quality quality);
void
streaming_session_deregister(int readfd);
void
streaming_metadatacb_register(streaming_metadatacb cb);
#endif /* !__STREAMING_H__ */

View File

@ -29,6 +29,10 @@
#include <time.h>
#include <string.h>
#include <errno.h>
#include <sys/queue.h>
#include <unistd.h>
#include <pthread.h>
#include <event2/event.h>
@ -36,9 +40,17 @@
#include "db.h"
#include "logger.h"
#include "worker.h"
#include "commands.h"
#include "evthr.h"
#include "misc.h"
#define THREADPOOL_NTHREADS 4
static struct evthr_pool *worker_threadpool;
static __thread struct evthr *worker_thr;
/* ----------------------------- CALLBACK EXECUTION ------------------------- */
/* Worker threads */
struct worker_arg
{
@ -49,19 +61,6 @@ struct worker_arg
};
/* --- Globals --- */
// worker thread
static pthread_t tid_worker;
// Event base, pipes and events
struct event_base *evbase_worker;
static int g_initialized;
static struct commands_base *cmdbase;
/* ---------------------------- CALLBACK EXECUTION ------------------------- */
/* Thread: worker */
static void
execute_cb(int fd, short what, void *arg)
{
@ -74,64 +73,47 @@ execute_cb(int fd, short what, void *arg)
free(cmdarg);
}
static enum command_state
execute(void *arg, int *retval)
static void
execute(struct evthr *thr, void *arg, void *shared)
{
struct worker_arg *cmdarg = arg;
struct timeval tv = { cmdarg->delay, 0 };
struct event_base *evbase;
if (cmdarg->delay)
{
cmdarg->timer = evtimer_new(evbase_worker, execute_cb, cmdarg);
evbase = evthr_get_base(thr);
cmdarg->timer = evtimer_new(evbase, execute_cb, cmdarg);
evtimer_add(cmdarg->timer, &tv);
*retval = 0;
return COMMAND_PENDING; // Not done yet, ask caller not to free cmd
return;
}
cmdarg->cb(cmdarg->cb_arg);
free(cmdarg->cb_arg);
*retval = 0;
return COMMAND_END;
free(cmdarg);
}
/* --------------------------------- MAIN --------------------------------- */
/* Thread: worker */
static void *
worker(void *arg)
static void
init_cb(struct evthr *thr, void *shared)
{
int ret;
CHECK_ERR(L_MAIN, db_perthread_init());
ret = db_perthread_init();
if (ret < 0)
{
DPRINTF(E_LOG, L_MAIN, "Error: DB init failed (worker thread)\n");
pthread_exit(NULL);
}
worker_thr = thr;
g_initialized = 1;
thread_setname(pthread_self(), "worker");
}
event_base_dispatch(evbase_worker);
if (g_initialized)
{
DPRINTF(E_LOG, L_MAIN, "Worker event loop terminated ahead of time!\n");
g_initialized = 0;
}
static void
exit_cb(struct evthr *thr, void *shared)
{
worker_thr = NULL;
db_perthread_deinit();
pthread_exit(NULL);
}
/* ---------------------------- Our worker API --------------------------- */
/* Thread: player */
void
worker_execute(void (*cb)(void *), void *cb_arg, size_t arg_size, int delay)
{
@ -164,7 +146,13 @@ worker_execute(void (*cb)(void *), void *cb_arg, size_t arg_size, int delay)
cmdarg->cb_arg = argcpy;
cmdarg->delay = delay;
commands_exec_async(cmdbase, execute, cmdarg);
evthr_pool_defer(worker_threadpool, execute, cmdarg);
}
struct event_base *
worker_evbase_get(void)
{
return evthr_get_base(worker_thr);
}
int
@ -172,51 +160,30 @@ worker_init(void)
{
int ret;
evbase_worker = event_base_new();
if (!evbase_worker)
worker_threadpool = evthr_pool_wexit_new(THREADPOOL_NTHREADS, init_cb, exit_cb, NULL);
if (!worker_threadpool)
{
DPRINTF(E_LOG, L_MAIN, "Could not create an event base\n");
goto evbase_fail;
DPRINTF(E_LOG, L_MAIN, "Could not create worker thread pool\n");
goto error;
}
cmdbase = commands_base_new(evbase_worker, NULL);
ret = pthread_create(&tid_worker, NULL, worker, NULL);
ret = evthr_pool_start(worker_threadpool);
if (ret < 0)
{
DPRINTF(E_LOG, L_MAIN, "Could not spawn worker thread: %s\n", strerror(errno));
goto thread_fail;
DPRINTF(E_LOG, L_MAIN, "Could not spawn worker threads\n");
goto error;
}
thread_setname(tid_worker, "worker");
return 0;
thread_fail:
commands_base_free(cmdbase);
event_base_free(evbase_worker);
evbase_worker = NULL;
evbase_fail:
error:
worker_deinit();
return -1;
}
void
worker_deinit(void)
{
int ret;
g_initialized = 0;
commands_base_destroy(cmdbase);
ret = pthread_join(tid_worker, NULL);
if (ret != 0)
{
DPRINTF(E_FATAL, L_MAIN, "Could not join worker thread: %s\n", strerror(errno));
return;
}
// Free event base (should free events too)
event_base_free(evbase_worker);
evthr_pool_stop(worker_threadpool);
evthr_pool_free(worker_threadpool);
}

View File

@ -2,8 +2,10 @@
#ifndef __WORKER_H__
#define __WORKER_H__
#include <event2/event.h>
/* The worker thread is made for running asyncronous tasks from a real time
* thread, mainly the player thread.
* thread.
* The worker_execute() function will trigger a callback from the worker thread.
* Before returning the function will copy the argument given, so the caller
@ -19,6 +21,11 @@
void
worker_execute(void (*cb)(void *), void *cb_arg, size_t arg_size, int delay);
/* Can be called within a callback to get the worker thread's event base
*/
struct event_base *
worker_evbase_get(void);
int
worker_init(void);