[httpd] Multithread solution using worker threads instead of httpd threads

Using worker threads instead of httpd threads means that we, not libevent,
decide which requests get handled by which threads. This means that we can
make sure blocking requests (e.g. volume changes) don't get in the way of
realtime(ish) stuff like mp3 streaming.

Includes refactor of httpd_stream_file() since it was a bit of a monster.
This commit is contained in:
ejurgensen 2023-01-27 23:13:46 +01:00
parent 81922e147e
commit 18a80f15dd
12 changed files with 611 additions and 598 deletions

View File

@ -63,8 +63,8 @@ command_cb_async(struct commands_base *cmdbase, struct command *cmd)
// Command is executed asynchronously
cmdstate = cmd->func(cmd->arg, &cmd->ret);
// Only free arg if there are no pending events (used in worker.c)
if (cmdstate != COMMAND_PENDING && cmd->arg)
// Only free arg if there are no pending events (used in httpd.c)
if (cmdstate != COMMAND_PENDING)
free(cmd->arg);
free(cmd);

View File

@ -27,7 +27,6 @@
#include <fcntl.h>
#include <limits.h>
#include <errno.h>
#include <pthread.h>
#include <time.h>
#include <sys/param.h>
#include <sys/types.h>
@ -99,8 +98,6 @@ struct content_type_map {
struct stream_ctx {
struct httpd_request *hreq;
uint8_t *buf;
struct evbuffer *evbuf;
struct event *ev;
int id;
int fd;
@ -132,7 +129,18 @@ static const char *httpd_allow_origin;
static int httpd_port;
#define THREADPOOL_NTHREADS 4
// The server is designed around a single thread listening for requests. When
// received, the request is passed to a thread from the worker pool, where a
// handler will process it and prepare a response for the httpd thread to send
// back. The idea is that the httpd thread never blocks. The handler in the
// worker thread can block, but shouldn't hold the thread if it is a long-
// running request (e.g. a long poll), because then we can run out of worker
// threads. The handler should use events to avoid this. Handlers, that are non-
// blocking and where the response must not be delayed can use
// HTTPD_HANDLER_REALTIME, then the httpd thread calls it directly (sync)
// instead of the async worker. In short, you shouldn't need to increase the
// below.
#define THREADPOOL_NTHREADS 1
static struct evthr_pool *httpd_threadpool;
@ -294,23 +302,12 @@ cors_headers_add(struct httpd_request *hreq, const char *allow_origin)
httpd_header_add(hreq->out_headers, "Access-Control-Allow-Headers", "authorization");
}
static int
handle_cors_preflight(struct httpd_request *hreq, const char *allow_origin)
static bool
is_cors_preflight(struct httpd_request *hreq, const char *allow_origin)
{
bool is_cors_preflight;
is_cors_preflight = ( hreq->method == HTTPD_METHOD_OPTIONS && hreq->in_headers && allow_origin &&
httpd_header_find(hreq->in_headers, "Origin") &&
httpd_header_find(hreq->in_headers, "Access-Control-Request-Method") );
if (!is_cors_preflight)
return -1;
cors_headers_add(hreq, allow_origin);
// In this case there is no reason to go through httpd_send_reply
httpd_backend_reply_send(hreq->backend, HTTP_OK, "OK", NULL);
httpd_request_free(hreq);
return 0;
return ( hreq->method == HTTPD_METHOD_OPTIONS && hreq->in_headers && allow_origin &&
httpd_header_find(hreq->in_headers, "Origin") &&
httpd_header_find(hreq->in_headers, "Access-Control-Request-Method") );
}
void
@ -337,6 +334,7 @@ httpd_request_handler_set(struct httpd_request *hreq)
continue;
hreq->handler = map->handler;
hreq->is_async = !(map->flags & HTTPD_HANDLER_REALTIME);
break;
}
}
@ -346,7 +344,7 @@ httpd_redirect_to(struct httpd_request *hreq, const char *path)
{
httpd_header_add(hreq->out_headers, "Location", path);
httpd_send_reply(hreq, HTTP_MOVETEMP, "Moved", NULL, HTTPD_SEND_NO_GZIP);
httpd_send_reply(hreq, HTTP_MOVETEMP, "Moved", HTTPD_SEND_NO_GZIP);
}
/*
@ -430,7 +428,6 @@ serve_file(struct httpd_request *hreq)
char path[PATH_MAX];
char deref[PATH_MAX];
char *ctype;
struct evbuffer *evbuf;
struct stat sb;
int fd;
int i;
@ -511,23 +508,14 @@ serve_file(struct httpd_request *hreq)
{
DPRINTF(E_WARN, L_HTTPD, "Access to file outside the web root dir forbidden: %s\n", deref);
httpd_send_error(hreq, 403, "Forbidden");
httpd_send_error(hreq, HTTP_FORBIDDEN, "Forbidden");
return;
}
if (httpd_request_not_modified_since(hreq, sb.st_mtime))
{
httpd_send_reply(hreq, HTTP_NOTMODIFIED, NULL, NULL, HTTPD_SEND_NO_GZIP);
return;
}
evbuf = evbuffer_new();
if (!evbuf)
{
DPRINTF(E_LOG, L_HTTPD, "Could not create evbuffer\n");
httpd_send_error(hreq, HTTP_SERVUNAVAIL, "Internal error");
httpd_send_reply(hreq, HTTP_NOTMODIFIED, NULL, HTTPD_SEND_NO_GZIP);
return;
}
@ -537,11 +525,10 @@ serve_file(struct httpd_request *hreq)
DPRINTF(E_LOG, L_HTTPD, "Could not open %s: %s\n", deref, strerror(errno));
httpd_send_error(hreq, HTTP_NOTFOUND, "Not Found");
evbuffer_free(evbuf);
return;
}
ret = evbuffer_expand(evbuf, sb.st_size);
ret = evbuffer_expand(hreq->out_body, sb.st_size);
if (ret < 0)
{
DPRINTF(E_LOG, L_HTTPD, "Out of memory for htdocs-file\n");
@ -549,7 +536,7 @@ serve_file(struct httpd_request *hreq)
}
while ((ret = read(fd, buf, sizeof(buf))) > 0)
evbuffer_add(evbuf, buf, ret);
evbuffer_add(hreq->out_body, buf, ret);
if (ret < 0)
{
@ -573,42 +560,53 @@ serve_file(struct httpd_request *hreq)
httpd_header_add(hreq->out_headers, "Content-Type", ctype);
httpd_send_reply(hreq, HTTP_OK, "OK", evbuf, HTTPD_SEND_NO_GZIP);
httpd_send_reply(hreq, HTTP_OK, "OK", HTTPD_SEND_NO_GZIP);
evbuffer_free(evbuf);
close(fd);
return;
out_fail:
httpd_send_error(hreq, HTTP_SERVUNAVAIL, "Internal error");
evbuffer_free(evbuf);
close(fd);
}
/* ---------------------------- STREAM HANDLING ----------------------------- */
// This will triggered in a httpd thread, but since the reading may be in a
// worker thread we just want to trigger the read loop
static void
stream_chunk_resched_cb(httpd_connection *conn, void *arg)
{
struct stream_ctx *st = arg;
// TODO not thread safe if st was freed in worker thread, but maybe not possible?
event_active(st->ev, 0, 0);
}
static void
stream_free(struct stream_ctx *st)
{
if (!st)
return;
if (st->ev)
event_free(st->ev);
if (st->fd >= 0)
close(st->fd);
transcode_cleanup(&st->xcode);
free(st);
}
static void
stream_end(struct stream_ctx *st)
{
httpd_request_closecb_set(st->hreq, NULL, NULL);
DPRINTF(E_DBG, L_HTTPD, "Ending stream %d\n", st->id);
// Alwayss send reply, even if connection failed, otherwise we memleak hreq
// and possibly the evhttp req as well.
httpd_send_reply_end(st->hreq);
evbuffer_free(st->evbuf);
event_free(st->ev);
if (st->xcode)
transcode_cleanup(&st->xcode);
else
{
free(st->buf);
close(st->fd);
}
free(st);
stream_free(st);
}
static void
@ -626,36 +624,134 @@ stream_end_register(struct stream_ctx *st)
}
}
static void
stream_chunk_resched_cb(httpd_connection *conn, void *arg)
static struct stream_ctx *
stream_new(struct media_file_info *mfi, struct httpd_request *hreq, event_callback_fn stream_cb)
{
struct stream_ctx *st;
struct timeval tv;
CHECK_NULL(L_HTTPD, st = calloc(1, sizeof(struct stream_ctx)));
st->fd = -1;
st->ev = event_new(hreq->evbase, -1, EV_PERSIST, stream_cb, st);
if (!st->ev)
{
DPRINTF(E_LOG, L_HTTPD, "Could not create event for streaming\n");
httpd_send_error(hreq, HTTP_SERVUNAVAIL, "Internal Server Error");
goto error;
}
event_active(st->ev, 0, 0);
st->id = mfi->id;
st->hreq = hreq;
return st;
error:
stream_free(st);
return NULL;
}
static struct stream_ctx *
stream_new_transcode(struct media_file_info *mfi, struct httpd_request *hreq, int64_t offset, int64_t end_offset, event_callback_fn stream_cb)
{
struct stream_ctx *st;
struct media_quality quality = { HTTPD_STREAM_SAMPLE_RATE, HTTPD_STREAM_BPS, HTTPD_STREAM_CHANNELS, 0 };
st = stream_new(mfi, hreq, stream_cb);
if (!st)
{
goto error;
}
st->xcode = transcode_setup(XCODE_PCM16_HEADER, &quality, mfi->data_kind, mfi->path, mfi->song_length, &st->size);
if (!st->xcode)
{
DPRINTF(E_WARN, L_HTTPD, "Transcoding setup failed, aborting streaming\n");
httpd_send_error(hreq, HTTP_SERVUNAVAIL, "Internal Server Error");
goto error;
}
st->stream_size = st->size - offset;
if (end_offset > 0)
st->stream_size -= (st->size - end_offset);
st->start_offset = offset;
return st;
error:
stream_free(st);
return NULL;
}
static struct stream_ctx *
stream_new_raw(struct media_file_info *mfi, struct httpd_request *hreq, int64_t offset, int64_t end_offset, event_callback_fn stream_cb)
{
struct stream_ctx *st;
struct stat sb;
off_t pos;
int ret;
st = (struct stream_ctx *)arg;
st = stream_new(mfi, hreq, stream_cb);
if (!st)
{
goto error;
}
evutil_timerclear(&tv);
ret = event_add(st->ev, &tv);
st->fd = open(mfi->path, O_RDONLY);
if (st->fd < 0)
{
DPRINTF(E_LOG, L_HTTPD, "Could not open %s: %s\n", mfi->path, strerror(errno));
httpd_send_error(hreq, HTTP_NOTFOUND, "Not Found");
goto error;
}
ret = stat(mfi->path, &sb);
if (ret < 0)
{
DPRINTF(E_LOG, L_HTTPD, "Could not re-add one-shot event for streaming\n");
DPRINTF(E_LOG, L_HTTPD, "Could not stat() %s: %s\n", mfi->path, strerror(errno));
stream_end(st);
httpd_send_error(hreq, HTTP_NOTFOUND, "Not Found");
goto error;
}
st->size = sb.st_size;
st->stream_size = st->size - offset;
if (end_offset > 0)
st->stream_size -= (st->size - end_offset);
st->start_offset = offset;
st->offset = offset;
st->end_offset = end_offset;
pos = lseek(st->fd, offset, SEEK_SET);
if (pos == (off_t) -1)
{
DPRINTF(E_LOG, L_HTTPD, "Could not seek into %s: %s\n", mfi->path, strerror(errno));
httpd_send_error(hreq, HTTP_BADREQUEST, "Bad Request");
goto error;
}
return st;
error:
stream_free(st);
return NULL;
}
static void
stream_chunk_xcode_cb(int fd, short event, void *arg)
{
struct stream_ctx *st;
struct timeval tv;
struct stream_ctx *st = arg;
int xcoded;
int ret;
st = (struct stream_ctx *)arg;
xcoded = transcode(st->evbuf, NULL, st->xcode, STREAM_CHUNK_SIZE);
xcoded = transcode(st->hreq->out_body, NULL, st->xcode, STREAM_CHUNK_SIZE);
if (xcoded <= 0)
{
if (xcoded == 0)
@ -669,58 +765,45 @@ stream_chunk_xcode_cb(int fd, short event, void *arg)
DPRINTF(E_DBG, L_HTTPD, "Got %d bytes from transcode; streaming file id %d\n", xcoded, st->id);
/* Consume transcoded data until we meet start_offset */
// Consume transcoded data until we meet start_offset
if (st->start_offset > st->offset)
{
ret = st->start_offset - st->offset;
if (ret < xcoded)
{
evbuffer_drain(st->evbuf, ret);
evbuffer_drain(st->hreq->out_body, ret);
st->offset += ret;
ret = xcoded - ret;
}
else
{
evbuffer_drain(st->evbuf, xcoded);
evbuffer_drain(st->hreq->out_body, xcoded);
st->offset += xcoded;
goto consume;
// Reschedule immediately - consume up to start_offset
event_active(st->ev, 0, 0);
return;
}
}
else
ret = xcoded;
httpd_send_reply_chunk(st->hreq, st->evbuf, stream_chunk_resched_cb, st);
httpd_send_reply_chunk(st->hreq, stream_chunk_resched_cb, st);
st->offset += ret;
stream_end_register(st);
return;
consume: /* reschedule immediately - consume up to start_offset */
evutil_timerclear(&tv);
ret = event_add(st->ev, &tv);
if (ret < 0)
{
DPRINTF(E_LOG, L_HTTPD, "Could not re-add one-shot event for streaming (xcode)\n");
stream_end(st);
return;
}
}
static void
stream_chunk_raw_cb(int fd, short event, void *arg)
{
struct stream_ctx *st;
struct stream_ctx *st = arg;
size_t chunk_size;
int ret;
st = (struct stream_ctx *)arg;
if (st->end_offset && (st->offset > st->end_offset))
{
stream_end(st);
@ -732,7 +815,7 @@ stream_chunk_raw_cb(int fd, short event, void *arg)
else
chunk_size = STREAM_CHUNK_SIZE;
ret = read(st->fd, st->buf, chunk_size);
ret = evbuffer_read(st->hreq->out_body, st->fd, chunk_size);
if (ret <= 0)
{
if (ret == 0)
@ -746,9 +829,7 @@ stream_chunk_raw_cb(int fd, short event, void *arg)
DPRINTF(E_DBG, L_HTTPD, "Read %d bytes; streaming file id %d\n", ret, st->id);
evbuffer_add(st->evbuf, st->buf, ret);
httpd_send_reply_chunk(st->hreq, st->evbuf, stream_chunk_resched_cb, st);
httpd_send_reply_chunk(st->hreq, stream_chunk_resched_cb, st);
st->offset += ret;
@ -756,33 +837,41 @@ stream_chunk_raw_cb(int fd, short event, void *arg)
}
static void
stream_fail_cb(httpd_connection *conn, void *arg)
stream_fail_cb(struct httpd_request *hreq, void *arg)
{
struct stream_ctx *st;
struct stream_ctx *st = arg;
st = (struct stream_ctx *)arg;
DPRINTF(E_WARN, L_HTTPD, "Connection failed; stopping streaming of file ID %d\n", st->id);
/* Stop streaming */
event_del(st->ev);
stream_end(st);
stream_free(st);
}
/* ---------------------------- MAIN HTTPD THREAD --------------------------- */
/* ---------------------------- REQUEST CALLBACKS --------------------------- */
// Worker thread, invoked by request_cb() below
static void
request_async_cb(void *arg)
{
struct httpd_request *hreq = *(struct httpd_request **)arg;
#ifdef HAVE_SYSCALL
DPRINTF(E_DBG, hreq->module->logdomain, "%s request '%s' in worker thread %ld\n", hreq->module->name, hreq->uri, syscall(SYS_gettid));
#endif
// Some handlers require an evbase to schedule events
hreq->evbase = worker_evbase_get();
hreq->module->request(hreq);
}
// httpd thread
static void
request_cb(struct httpd_request *hreq, void *arg)
{
// Did we get a CORS preflight request?
if (handle_cors_preflight(hreq, httpd_allow_origin) == 0)
if (is_cors_preflight(hreq, httpd_allow_origin))
{
httpd_send_reply(hreq, HTTP_OK, "OK", HTTPD_SEND_NO_GZIP);
return;
}
if (!hreq->uri || !hreq->uri_parsed)
else if (!hreq->uri || !hreq->uri_parsed)
{
DPRINTF(E_WARN, L_HTTPD, "Invalid URI in request: '%s'\n", hreq->uri);
httpd_redirect_to(hreq, "/");
@ -796,13 +885,14 @@ request_cb(struct httpd_request *hreq, void *arg)
}
httpd_request_handler_set(hreq);
if (hreq->module)
if (hreq->module && hreq->is_async)
{
worker_execute(request_async_cb, &hreq, sizeof(struct httpd_request *), 0);
}
else if (hreq->module)
{
#ifdef HAVE_SYSCALL
DPRINTF(E_DBG, hreq->module->logdomain, "%s request: '%s' (thread %ld)\n", hreq->module->name, hreq->uri, syscall(SYS_gettid));
#else
DPRINTF(E_DBG, hreq->module->logdomain, "%s request: '%s'\n", hreq->module->name, hreq->uri);
#endif
hreq->evbase = httpd_backend_evbase_get(hreq->backend);
hreq->module->request(hreq);
}
else
@ -811,35 +901,26 @@ request_cb(struct httpd_request *hreq, void *arg)
DPRINTF(E_DBG, L_HTTPD, "HTTP request: '%s'\n", hreq->uri);
serve_file(hreq);
}
// Don't touch hreq here, if async it has been passed to a worker thread
}
/* ------------------------------- HTTPD API -------------------------------- */
/* Thread: httpd */
void
httpd_stream_file(struct httpd_request *hreq, int id)
{
struct media_quality quality = { HTTPD_STREAM_SAMPLE_RATE, HTTPD_STREAM_BPS, HTTPD_STREAM_CHANNELS, 0 };
struct media_file_info *mfi;
struct stream_ctx *st;
void (*stream_cb)(int fd, short event, void *arg);
struct stat sb;
struct timeval tv;
struct event_base *evbase;
struct media_file_info *mfi = NULL;
struct stream_ctx *st = NULL;
const char *param;
const char *param_end;
const char *client_codecs;
char buf[64];
int64_t offset;
int64_t end_offset;
off_t pos;
int64_t offset = 0;
int64_t end_offset = 0;
int transcode;
int ret;
offset = 0;
end_offset = 0;
param = httpd_header_find(hreq->in_headers, "Range");
if (param)
{
@ -880,103 +961,45 @@ httpd_stream_file(struct httpd_request *hreq, int id)
DPRINTF(E_LOG, L_HTTPD, "Item %d not found\n", id);
httpd_send_error(hreq, HTTP_NOTFOUND, "Not Found");
return;
goto error;
}
if (mfi->data_kind != DATA_KIND_FILE)
{
DPRINTF(E_LOG, L_HTTPD, "Could not serve '%s' to client, not a file\n", mfi->path);
httpd_send_error(hreq, 500, "Cannot stream non-file content");
goto out_free_mfi;
httpd_send_error(hreq, HTTP_INTERNAL, "Cannot stream non-file content");
goto error;
}
st = (struct stream_ctx *)malloc(sizeof(struct stream_ctx));
if (!st)
{
DPRINTF(E_LOG, L_HTTPD, "Out of memory for struct stream_ctx\n");
httpd_send_error(hreq, HTTP_SERVUNAVAIL, "Internal Server Error");
goto out_free_mfi;
}
memset(st, 0, sizeof(struct stream_ctx));
st->fd = -1;
client_codecs = httpd_header_find(hreq->in_headers, "Accept-Codecs");
transcode = transcode_needed(hreq->user_agent, client_codecs, mfi->codectype);
param = httpd_header_find(hreq->in_headers, "Accept-Codecs");
transcode = transcode_needed(hreq->user_agent, param, mfi->codectype);
if (transcode)
{
DPRINTF(E_INFO, L_HTTPD, "Preparing to transcode %s\n", mfi->path);
stream_cb = stream_chunk_xcode_cb;
st->xcode = transcode_setup(XCODE_PCM16_HEADER, &quality, mfi->data_kind, mfi->path, mfi->song_length, &st->size);
if (!st->xcode)
{
DPRINTF(E_WARN, L_HTTPD, "Transcoding setup failed, aborting streaming\n");
httpd_send_error(hreq, HTTP_SERVUNAVAIL, "Internal Server Error");
goto out_free_st;
}
st = stream_new_transcode(mfi, hreq, offset, end_offset, stream_chunk_xcode_cb);
if (!st)
goto error;
if (!httpd_header_find(hreq->out_headers, "Content-Type"))
httpd_header_add(hreq->out_headers, "Content-Type", "audio/wav");
}
else
{
/* Stream the raw file */
DPRINTF(E_INFO, L_HTTPD, "Preparing to stream %s\n", mfi->path);
st->buf = (uint8_t *)malloc(STREAM_CHUNK_SIZE);
if (!st->buf)
{
DPRINTF(E_LOG, L_HTTPD, "Out of memory for raw streaming buffer\n");
st = stream_new_raw(mfi, hreq, offset, end_offset, stream_chunk_raw_cb);
if (!st)
goto error;
httpd_send_error(hreq, HTTP_SERVUNAVAIL, "Internal Server Error");
goto out_free_st;
}
stream_cb = stream_chunk_raw_cb;
st->fd = open(mfi->path, O_RDONLY);
if (st->fd < 0)
{
DPRINTF(E_LOG, L_HTTPD, "Could not open %s: %s\n", mfi->path, strerror(errno));
httpd_send_error(hreq, HTTP_NOTFOUND, "Not Found");
goto out_cleanup;
}
ret = stat(mfi->path, &sb);
if (ret < 0)
{
DPRINTF(E_LOG, L_HTTPD, "Could not stat() %s: %s\n", mfi->path, strerror(errno));
httpd_send_error(hreq, HTTP_NOTFOUND, "Not Found");
goto out_cleanup;
}
st->size = sb.st_size;
pos = lseek(st->fd, offset, SEEK_SET);
if (pos == (off_t) -1)
{
DPRINTF(E_LOG, L_HTTPD, "Could not seek into %s: %s\n", mfi->path, strerror(errno));
httpd_send_error(hreq, HTTP_BADREQUEST, "Bad Request");
goto out_cleanup;
}
st->offset = offset;
st->end_offset = end_offset;
/* Content-Type for video files is different than for audio files
* and overrides whatever may have been set previously, like
* application/x-dmap-tagged when we're speaking DAAP.
*/
// Content-Type for video files is different than for audio files and
// overrides whatever may have been set previously, like
// application/x-dmap-tagged when we're speaking DAAP.
if (mfi->has_video)
{
/* Front Row and others expect video/<type> */
// Front Row and others expect video/<type>
ret = snprintf(buf, sizeof(buf), "video/%s", mfi->type);
if ((ret < 0) || (ret >= sizeof(buf)))
DPRINTF(E_LOG, L_HTTPD, "Content-Type too large for buffer, dropping\n");
@ -986,10 +1009,9 @@ httpd_stream_file(struct httpd_request *hreq, int id)
httpd_header_add(hreq->out_headers, "Content-Type", buf);
}
}
/* If no Content-Type has been set and we're streaming audio, add a proper
* Content-Type for the file we're streaming. Remember DAAP streams audio
* with application/x-dmap-tagged as the Content-Type (ugh!).
*/
// If no Content-Type has been set and we're streaming audio, add a proper
// Content-Type for the file we're streaming. Remember DAAP streams audio
// with application/x-dmap-tagged as the Content-Type (ugh!).
else if (!httpd_header_find(hreq->out_headers, "Content-Type") && mfi->type)
{
ret = snprintf(buf, sizeof(buf), "audio/%s", mfi->type);
@ -1000,47 +1022,11 @@ httpd_stream_file(struct httpd_request *hreq, int id)
}
}
st->evbuf = evbuffer_new();
if (!st->evbuf)
{
DPRINTF(E_LOG, L_HTTPD, "Could not allocate an evbuffer for streaming\n");
httpd_send_error(hreq, HTTP_SERVUNAVAIL, "Internal Server Error");
goto out_cleanup;
}
ret = evbuffer_expand(st->evbuf, STREAM_CHUNK_SIZE);
if (ret != 0)
{
DPRINTF(E_LOG, L_HTTPD, "Could not expand evbuffer for streaming\n");
httpd_send_error(hreq, HTTP_SERVUNAVAIL, "Internal Server Error");
goto out_cleanup;
}
evbase = httpd_request_evbase_get(hreq);
st->ev = event_new(evbase, -1, EV_TIMEOUT, stream_cb, st);
evutil_timerclear(&tv);
if (!st->ev || (event_add(st->ev, &tv) < 0))
{
DPRINTF(E_LOG, L_HTTPD, "Could not add one-shot event for streaming\n");
httpd_send_error(hreq, HTTP_SERVUNAVAIL, "Internal Server Error");
goto out_cleanup;
}
st->id = mfi->id;
st->start_offset = offset;
st->stream_size = st->size;
st->hreq = hreq;
if ((offset == 0) && (end_offset == 0))
{
/* If we are not decoding, send the Content-Length. We don't do
* that if we are decoding because we can only guesstimate the
* size in this case and the error margin is unknown and variable.
*/
// If we are not decoding, send the Content-Length. We don't do that if we
// are decoding because we can only guesstimate the size in this case and
// the error margin is unknown and variable.
if (!transcode)
{
ret = snprintf(buf, sizeof(buf), "%" PRIi64, (int64_t)st->size);
@ -1054,11 +1040,6 @@ httpd_stream_file(struct httpd_request *hreq, int id)
}
else
{
if (offset > 0)
st->stream_size -= offset;
if (end_offset > 0)
st->stream_size -= (st->size - end_offset);
DPRINTF(E_DBG, L_HTTPD, "Stream request with range %" PRIi64 "-%" PRIi64 "\n", offset, end_offset);
ret = snprintf(buf, sizeof(buf), "bytes %" PRIi64 "-%" PRIi64 "/%" PRIi64,
@ -1080,7 +1061,7 @@ httpd_stream_file(struct httpd_request *hreq, int id)
#ifdef HAVE_POSIX_FADVISE
if (!transcode)
{
/* Hint the OS */
// Hint the OS
if ( (ret = posix_fadvise(st->fd, st->start_offset, st->stream_size, POSIX_FADV_WILLNEED)) != 0 ||
(ret = posix_fadvise(st->fd, st->start_offset, st->stream_size, POSIX_FADV_SEQUENTIAL)) != 0 ||
(ret = posix_fadvise(st->fd, st->start_offset, st->stream_size, POSIX_FADV_NOREUSE)) != 0 )
@ -1088,26 +1069,15 @@ httpd_stream_file(struct httpd_request *hreq, int id)
}
#endif
httpd_request_closecb_set(hreq, stream_fail_cb, st);
httpd_request_close_cb_set(hreq, stream_fail_cb, st);
DPRINTF(E_INFO, L_HTTPD, "Kicking off streaming for %s\n", mfi->path);
free_mfi(mfi, 0);
return;
out_cleanup:
if (st->evbuf)
evbuffer_free(st->evbuf);
if (st->xcode)
transcode_cleanup(&st->xcode);
if (st->buf)
free(st->buf);
if (st->fd > 0)
close(st->fd);
out_free_st:
free(st);
out_free_mfi:
error:
stream_free(st);
free_mfi(mfi, 0);
}
@ -1178,10 +1148,18 @@ httpd_gzip_deflate(struct evbuffer *in)
return NULL;
}
// The httpd_send functions below can be called from a worker thread (with
// hreq->is_async) or directly from the httpd thread. In the former case, they
// will command sending from the httpd thread, since it is not safe to access
// the backend (evhttp) from a worker thread. hreq will be freed (again,
// possibly async) if the type is either _COMPLETE or _END.
void
httpd_send_reply(struct httpd_request *hreq, int code, const char *reason, struct evbuffer *evbuf, enum httpd_send_flags flags)
httpd_send_reply(struct httpd_request *hreq, int code, const char *reason, enum httpd_send_flags flags)
{
struct evbuffer *gzbuf;
struct evbuffer *save;
const char *param;
int do_gzip;
@ -1189,30 +1167,24 @@ httpd_send_reply(struct httpd_request *hreq, int code, const char *reason, struc
return;
do_gzip = ( (!(flags & HTTPD_SEND_NO_GZIP)) &&
evbuf && (evbuffer_get_length(evbuf) > 512) &&
(evbuffer_get_length(hreq->out_body) > 512) &&
(param = httpd_header_find(hreq->in_headers, "Accept-Encoding")) &&
(strstr(param, "gzip") || strstr(param, "*"))
);
cors_headers_add(hreq, httpd_allow_origin);
if (do_gzip && (gzbuf = httpd_gzip_deflate(evbuf)))
if (do_gzip && (gzbuf = httpd_gzip_deflate(hreq->out_body)))
{
DPRINTF(E_DBG, L_HTTPD, "Gzipping response\n");
httpd_header_add(hreq->out_headers, "Content-Encoding", "gzip");
httpd_backend_reply_send(hreq->backend, code, reason, gzbuf);
evbuffer_free(gzbuf);
// Drain original buffer, as would be after evhttp_send_reply()
evbuffer_drain(evbuf, evbuffer_get_length(evbuf));
}
else
{
httpd_backend_reply_send(hreq->backend, code, reason, evbuf);
save = hreq->out_body;
hreq->out_body = gzbuf;
evbuffer_free(save);
}
httpd_request_free(hreq);
httpd_send(hreq, HTTPD_REPLY_COMPLETE, code, reason, NULL, NULL);
}
void
@ -1220,28 +1192,26 @@ httpd_send_reply_start(struct httpd_request *hreq, int code, const char *reason)
{
cors_headers_add(hreq, httpd_allow_origin);
httpd_backend_reply_start_send(hreq->backend, code, reason);
httpd_send(hreq, HTTPD_REPLY_START, code, reason, NULL, NULL);
}
void
httpd_send_reply_chunk(struct httpd_request *hreq, struct evbuffer *evbuf, httpd_connection_chunkcb cb, void *arg)
httpd_send_reply_chunk(struct httpd_request *hreq, httpd_connection_chunkcb cb, void *arg)
{
httpd_backend_reply_chunk_send(hreq->backend, evbuf, cb, arg);
httpd_send(hreq, HTTPD_REPLY_CHUNK, 0, NULL, cb, arg);
}
void
httpd_send_reply_end(struct httpd_request *hreq)
{
httpd_backend_reply_end_send(hreq->backend);
httpd_request_free(hreq);
httpd_send(hreq, HTTPD_REPLY_END, 0, NULL, NULL, NULL);
}
// This is a modified version of evhttp_send_error (credit libevent)
void
httpd_send_error(struct httpd_request *hreq, int error, const char *reason)
{
struct evbuffer *evbuf;
evbuffer_drain(hreq->out_body, -1);
httpd_headers_clear(hreq->out_headers);
cors_headers_add(hreq, httpd_allow_origin);
@ -1249,18 +1219,9 @@ httpd_send_error(struct httpd_request *hreq, int error, const char *reason)
httpd_header_add(hreq->out_headers, "Content-Type", "text/html");
httpd_header_add(hreq->out_headers, "Connection", "close");
evbuf = evbuffer_new();
if (!evbuf)
DPRINTF(E_LOG, L_HTTPD, "Could not allocate evbuffer for error page\n");
else
evbuffer_add_printf(evbuf, ERR_PAGE, error, reason, reason);
evbuffer_add_printf(hreq->out_body, ERR_PAGE, error, reason, reason);
httpd_backend_reply_send(hreq->backend, error, reason, evbuf);
if (evbuf)
evbuffer_free(evbuf);
httpd_request_free(hreq);
httpd_send(hreq, HTTPD_REPLY_COMPLETE, error, reason, NULL, NULL);
}
bool
@ -1277,7 +1238,7 @@ httpd_admin_check_auth(struct httpd_request *hreq)
{
DPRINTF(E_LOG, L_HTTPD, "Web interface request to '%s' denied: No password set in the config\n", hreq->uri);
httpd_send_error(hreq, 403, "Forbidden");
httpd_send_error(hreq, HTTP_FORBIDDEN, "Forbidden");
return false;
}
@ -1300,7 +1261,6 @@ httpd_admin_check_auth(struct httpd_request *hreq)
int
httpd_basic_auth(struct httpd_request *hreq, const char *user, const char *passwd, const char *realm)
{
struct evbuffer *evbuf;
char header[256];
const char *auth;
char *authuser;
@ -1375,20 +1335,11 @@ httpd_basic_auth(struct httpd_request *hreq, const char *user, const char *passw
return -1;
}
evbuf = evbuffer_new();
if (!evbuf)
{
httpd_send_error(hreq, HTTP_SERVUNAVAIL, "Internal Server Error");
return -1;
}
httpd_header_add(hreq->out_headers, "WWW-Authenticate", header);
evbuffer_add_printf(evbuf, ERR_PAGE, 401, "Unauthorized", "Authorization required");
evbuffer_add_printf(hreq->out_body, ERR_PAGE, HTTP_UNAUTHORIZED, "Unauthorized", "Authorization required");
httpd_send_reply(hreq, 401, "Unauthorized", evbuf, HTTPD_SEND_NO_GZIP);
evbuffer_free(evbuf);
httpd_send_reply(hreq, HTTP_UNAUTHORIZED, "Unauthorized", HTTPD_SEND_NO_GZIP);
return -1;
}

View File

@ -166,13 +166,13 @@ artworkapi_request(struct httpd_request *hreq)
switch (status_code)
{
case HTTP_OK: /* 200 OK */
httpd_send_reply(hreq, status_code, "OK", hreq->out_body, HTTPD_SEND_NO_GZIP);
httpd_send_reply(hreq, status_code, "OK", HTTPD_SEND_NO_GZIP);
break;
case HTTP_NOCONTENT: /* 204 No Content */
httpd_send_reply(hreq, status_code, "No Content", hreq->out_body, HTTPD_SEND_NO_GZIP);
httpd_send_reply(hreq, status_code, "No Content", HTTPD_SEND_NO_GZIP);
break;
case HTTP_NOTMODIFIED: /* 304 Not Modified */
httpd_send_reply(hreq, HTTP_NOTMODIFIED, NULL, NULL, HTTPD_SEND_NO_GZIP);
httpd_send_reply(hreq, HTTP_NOTMODIFIED, NULL, HTTPD_SEND_NO_GZIP);
break;
case HTTP_BADREQUEST: /* 400 Bad Request */
httpd_send_error(hreq, status_code, "Bad Request");

View File

@ -284,42 +284,28 @@ update_remove(struct daap_update_request *ur)
static void
update_refresh_cb(int fd, short event, void *arg)
{
struct daap_update_request *ur;
struct evbuffer *reply;
ur = (struct daap_update_request *)arg;
CHECK_NULL(L_DAAP, reply = evbuffer_new());
CHECK_ERR(L_DAAP, evbuffer_expand(reply, 32));
struct daap_update_request *ur = arg;
struct httpd_request *hreq = ur->hreq;
current_rev++;
/* Send back current revision */
dmap_add_container(reply, "mupd", 24);
dmap_add_int(reply, "mstt", 200); /* 12 */
dmap_add_int(reply, "musr", current_rev); /* 12 */
dmap_add_container(hreq->out_body, "mupd", 24);
dmap_add_int(hreq->out_body, "mstt", 200); /* 12 */
dmap_add_int(hreq->out_body, "musr", current_rev); /* 12 */
httpd_request_closecb_set(ur->hreq, NULL, NULL);
httpd_send_reply(ur->hreq, HTTP_OK, "OK", reply, 0);
httpd_send_reply(hreq, HTTP_OK, "OK", 0);
update_remove(ur);
}
static void
update_fail_cb(httpd_connection *conn, void *arg)
update_fail_cb(struct httpd_request *hreq, void *arg)
{
struct daap_update_request *ur;
ur = (struct daap_update_request *)arg;
struct daap_update_request *ur = arg;
DPRINTF(E_DBG, L_DAAP, "Update request: client closed connection\n");
httpd_request_closecb_set(ur->hreq, NULL, NULL);
// Peer won't get this, it is just to make sure hreq and evhttp's request get
// freed
httpd_send_error(ur->hreq, HTTP_BADREQUEST, "Bad Request");
update_remove(ur);
}
@ -671,17 +657,17 @@ daap_reply_send(struct httpd_request *hreq, enum daap_reply_result result)
switch (result)
{
case DAAP_REPLY_LOGOUT:
httpd_send_reply(hreq, HTTP_NOCONTENT, "Logout Successful", hreq->out_body, 0);
httpd_send_reply(hreq, HTTP_NOCONTENT, "Logout Successful", 0);
break;
case DAAP_REPLY_NO_CONTENT:
httpd_send_reply(hreq, HTTP_NOCONTENT, "No Content", hreq->out_body, HTTPD_SEND_NO_GZIP);
httpd_send_reply(hreq, HTTP_NOCONTENT, "No Content", HTTPD_SEND_NO_GZIP);
break;
case DAAP_REPLY_OK:
httpd_send_reply(hreq, HTTP_OK, "OK", hreq->out_body, 0);
httpd_send_reply(hreq, HTTP_OK, "OK", 0);
break;
case DAAP_REPLY_OK_NO_GZIP:
case DAAP_REPLY_ERROR:
httpd_send_reply(hreq, HTTP_OK, "OK", hreq->out_body, HTTPD_SEND_NO_GZIP);
httpd_send_reply(hreq, HTTP_OK, "OK", HTTPD_SEND_NO_GZIP);
break;
case DAAP_REPLY_FORBIDDEN:
httpd_send_error(hreq, HTTP_FORBIDDEN, "Forbidden");
@ -724,7 +710,7 @@ daap_request_authorize(struct httpd_request *hreq)
{
DPRINTF(E_LOG, L_DAAP, "Unauthorized request from '%s', DAAP session not found: '%s'\n", hreq->peer_address, hreq->uri);
httpd_send_error(hreq, 401, "Unauthorized");
httpd_send_error(hreq, HTTP_UNAUTHORIZED, "Unauthorized");;
return -1;
}
@ -760,7 +746,7 @@ daap_request_authorize(struct httpd_request *hreq)
/* --------------------------- REPLY HANDLERS ------------------------------- */
/* Note that some handlers can be called without a connection (needed for */
/* cache regeneration), while others cannot. Those that cannot should check */
/* that httpd_request_connection_get(hreq) is not null. */
/* that hreq->backend is not null. */
static enum daap_reply_result
daap_reply_server_info(struct httpd_request *hreq)
@ -773,7 +759,7 @@ daap_reply_server_info(struct httpd_request *hreq)
int mpro;
int apro;
if (!httpd_request_connection_get(hreq))
if (!hreq->backend)
{
DPRINTF(E_LOG, L_DAAP, "Bug! daap_reply_server_info() cannot be called without an actual connection\n");
return DAAP_REPLY_NO_CONNECTION;
@ -986,12 +972,11 @@ static enum daap_reply_result
daap_reply_update(struct httpd_request *hreq)
{
struct daap_update_request *ur;
struct event_base *evbase;
const char *param;
int reqd_rev;
int ret;
if (!httpd_request_connection_get(hreq))
if (!hreq->backend)
{
DPRINTF(E_LOG, L_DAAP, "Bug! daap_reply_update() cannot be called without an actual connection\n");
return DAAP_REPLY_NO_CONNECTION;
@ -1039,9 +1024,7 @@ daap_reply_update(struct httpd_request *hreq)
if (DAAP_UPDATE_REFRESH > 0)
{
evbase = httpd_request_evbase_get(hreq);
ur->timeout = evtimer_new(evbase, update_refresh_cb, ur);
ur->timeout = evtimer_new(hreq->evbase, update_refresh_cb, ur);
if (ur->timeout)
ret = evtimer_add(ur->timeout, &daap_update_refresh_tv);
else
@ -1066,7 +1049,7 @@ daap_reply_update(struct httpd_request *hreq)
/* If the connection fails before we have an update to push out
* to the client, we need to know.
*/
httpd_request_closecb_set(hreq, update_fail_cb, ur);
httpd_request_close_cb_set(hreq, update_fail_cb, ur);
return DAAP_REPLY_NONE;
}
@ -1948,7 +1931,7 @@ daap_reply_extra_data(struct httpd_request *hreq)
int max_h;
int ret;
if (!httpd_request_connection_get(hreq))
if (!hreq->backend)
{
DPRINTF(E_LOG, L_DAAP, "Bug! daap_reply_extra_data() cannot be called without an actual connection\n");
return DAAP_REPLY_NO_CONNECTION;
@ -2028,7 +2011,7 @@ daap_stream(struct httpd_request *hreq)
int id;
int ret;
if (!httpd_request_connection_get(hreq))
if (!hreq->backend)
{
DPRINTF(E_LOG, L_DAAP, "Bug! daap_stream() cannot be called without an actual connection\n");
return DAAP_REPLY_NO_CONNECTION;
@ -2153,7 +2136,7 @@ static struct httpd_uri_map daap_handlers[] =
},
{
.regexp = "^/update$",
.handler = daap_reply_update
.handler = daap_reply_update,
},
{
.regexp = "^/activity$",
@ -2229,8 +2212,7 @@ daap_request(struct httpd_request *hreq)
if (!hreq->handler)
{
DPRINTF(E_LOG, L_DAAP, "Unrecognized path in DAAP request: '%s'\n", hreq->uri);
httpd_send_error(hreq, HTTP_BADREQUEST, "Bad Request");
daap_reply_send(hreq, DAAP_REPLY_BAD_REQUEST);
return;
}
@ -2273,7 +2255,7 @@ daap_request(struct httpd_request *hreq)
{
// The cache will return the data gzipped, so httpd_send_reply won't need to do it
httpd_header_add(hreq->out_headers, "Content-Encoding", "gzip");
httpd_send_reply(hreq, HTTP_OK, "OK", hreq->out_body, HTTPD_SEND_NO_GZIP); // TODO not all want this reply
httpd_send_reply(hreq, HTTP_OK, "OK", HTTPD_SEND_NO_GZIP); // TODO not all want this reply
return;
}
@ -2317,7 +2299,7 @@ daap_reply_build(const char *uri, const char *user_agent, int is_remote)
DPRINTF(E_DBG, L_DAAP, "Building reply for DAAP request: '%s'\n", uri);
hreq = httpd_request_new(NULL, uri, user_agent);
hreq = httpd_request_new(NULL, NULL, uri, user_agent);
if (!hreq)
{
DPRINTF(E_LOG, L_DAAP, "Error building request: '%s'\n", uri);
@ -2366,7 +2348,6 @@ daap_deinit(void)
{
struct daap_session *s;
struct daap_update_request *ur;
httpd_connection *conn;
for (s = daap_sessions; daap_sessions; s = daap_sessions)
{
@ -2378,10 +2359,7 @@ daap_deinit(void)
{
update_requests = ur->next;
httpd_request_closecb_set(ur->hreq, NULL, NULL);
conn = httpd_request_connection_get(ur->hreq);
httpd_connection_free(conn); // TODO necessary?
daap_reply_send(ur->hreq, DAAP_REPLY_SERVUNAVAIL);
update_free(ur);
}
}

View File

@ -139,25 +139,12 @@ static struct db_queue_item dummy_queue_item;
static void
dacp_send_error(struct httpd_request *hreq, const char *container, const char *errmsg)
{
struct evbuffer *evbuf;
if (!hreq)
return;
evbuf = evbuffer_new();
if (!evbuf)
{
DPRINTF(E_LOG, L_DACP, "Could not allocate evbuffer for DMAP error\n");
dmap_error_make(hreq->out_body, container, errmsg);
httpd_send_error(hreq, HTTP_SERVUNAVAIL, "Internal Server Error");
return;
}
dmap_error_make(evbuf, container, errmsg);
httpd_send_reply(hreq, HTTP_OK, "OK", evbuf, HTTPD_SEND_NO_GZIP);
evbuffer_free(evbuf);
httpd_send_reply(hreq, HTTP_OK, "OK", HTTPD_SEND_NO_GZIP);
}
static void
@ -640,7 +627,7 @@ dacp_request_authorize(struct httpd_request *hreq)
invalid:
DPRINTF(E_LOG, L_DACP, "Unauthorized request '%s' from '%s' (is peer trusted in your config?)\n", hreq->uri, hreq->peer_address);
httpd_send_error(hreq, 403, "Forbidden");
httpd_send_error(hreq, HTTP_FORBIDDEN, "Forbidden");
return -1;
}
@ -719,13 +706,10 @@ playstatusupdate_cb(int fd, short what, void *arg);
static struct dacp_update_request *
update_request_new(struct httpd_request *hreq)
{
struct event_base *evbase;
struct dacp_update_request *ur;
evbase = httpd_request_evbase_get(hreq);
CHECK_NULL(L_DACP, ur = calloc(1, sizeof(struct dacp_update_request)));
CHECK_NULL(L_DACP, ur->updateev = event_new(evbase, -1, 0, playstatusupdate_cb, ur));
CHECK_NULL(L_DACP, ur->updateev = event_new(hreq->evbase, -1, 0, playstatusupdate_cb, ur));
ur->hreq = hreq;
return ur;
@ -771,40 +755,30 @@ static void
playstatusupdate_cb(int fd, short what, void *arg)
{
struct dacp_update_request *ur = arg;
struct evbuffer *update;
struct httpd_request *hreq = ur->hreq;
int ret;
CHECK_NULL(L_DACP, update = evbuffer_new());
ret = make_playstatusupdate(update, update_current_rev);
ret = make_playstatusupdate(hreq->out_body, update_current_rev);
if (ret < 0)
goto error;
httpd_request_closecb_set(ur->hreq, NULL, NULL);
httpd_send_reply(ur->hreq, HTTP_OK, "OK", update, 0);
httpd_send_reply(hreq, HTTP_OK, "OK", 0);
pthread_mutex_lock(&update_request_lck);
update_request_remove(&update_requests, ur);
pthread_mutex_unlock(&update_request_lck);
error:
evbuffer_free(update);
return;
}
static void
update_fail_cb(httpd_connection *conn, void *arg)
update_fail_cb(struct httpd_request *hreq, void *arg)
{
struct dacp_update_request *ur = arg;
DPRINTF(E_DBG, L_DACP, "Update request: client closed connection\n");
httpd_request_closecb_set(ur->hreq, NULL, NULL);
// Peer won't get this, it is just to make sure hreq and evhttp's request get
// freed
httpd_send_error(ur->hreq, HTTP_BADREQUEST, "Bad Request");
pthread_mutex_lock(&update_request_lck);
update_request_remove(&update_requests, ur);
pthread_mutex_unlock(&update_request_lck);
@ -1028,7 +1002,6 @@ dacp_propset_devicebusy(const char *value, struct httpd_request *hreq)
static void
dacp_propset_playingtime(const char *value, struct httpd_request *hreq)
{
struct event_base *evbase;
struct timeval tv;
int seek_target;
intptr_t seek_target_packed;
@ -1047,8 +1020,7 @@ dacp_propset_playingtime(const char *value, struct httpd_request *hreq)
evutil_timerclear(&tv);
tv.tv_usec = 200 * 1000;
evbase = httpd_request_evbase_get(hreq);
event_base_once(evbase, -1, EV_TIMEOUT, seek_timer_cb, (void *)seek_target_packed, &tv);
event_base_once(hreq->evbase, -1, EV_TIMEOUT, seek_timer_cb, (void *)seek_target_packed, &tv);
}
static void
@ -1194,7 +1166,7 @@ dacp_reply_ctrlint(struct httpd_request *hreq)
dmap_add_char(hreq->out_body, "cmrl", 1); /* 9, unknown */
dmap_add_long(hreq->out_body, "ceSX", (1 << 1 | 1)); /* 16, unknown dacp - lowest bit announces support for playqueue-contents/-edit */
httpd_send_reply(hreq, HTTP_OK, "OK", hreq->out_body, 0);
httpd_send_reply(hreq, HTTP_OK, "OK", 0);
return 0;
}
@ -1338,7 +1310,7 @@ dacp_reply_cue_play(struct httpd_request *hreq)
dmap_add_int(hreq->out_body, "mstt", 200); /* 12 */
dmap_add_int(hreq->out_body, "miid", status.id);/* 12 */
httpd_send_reply(hreq, HTTP_OK, "OK", hreq->out_body, 0);
httpd_send_reply(hreq, HTTP_OK, "OK", 0);
return 0;
}
@ -1358,7 +1330,7 @@ dacp_reply_cue_clear(struct httpd_request *hreq)
dmap_add_int(hreq->out_body, "mstt", 200); /* 12 */
dmap_add_int(hreq->out_body, "miid", 0); /* 12 */
httpd_send_reply(hreq, HTTP_OK, "OK", hreq->out_body, 0);
httpd_send_reply(hreq, HTTP_OK, "OK", 0);
return 0;
}
@ -1407,12 +1379,12 @@ dacp_reply_play(struct httpd_request *hreq)
ret = player_playback_start();
if (ret < 0)
{
httpd_send_error(hreq, 500, "Internal Server Error");
httpd_send_error(hreq, HTTP_INTERNAL, "Internal Server Error");
return -1;
}
/* 204 No Content is the canonical reply */
httpd_send_reply(hreq, HTTP_NOCONTENT, "No Content", hreq->out_body, HTTPD_SEND_NO_GZIP);
httpd_send_reply(hreq, HTTP_NOCONTENT, "No Content", HTTPD_SEND_NO_GZIP);
return 0;
}
@ -1544,11 +1516,11 @@ dacp_reply_playspec(struct httpd_request *hreq)
}
/* 204 No Content is the canonical reply */
httpd_send_reply(hreq, HTTP_NOCONTENT, "No Content", hreq->out_body, HTTPD_SEND_NO_GZIP);
httpd_send_reply(hreq, HTTP_NOCONTENT, "No Content", HTTPD_SEND_NO_GZIP);
return 0;
out_fail:
httpd_send_error(hreq, 500, "Internal Server Error");
httpd_send_error(hreq, HTTP_INTERNAL, "Internal Server Error");
return -1;
}
@ -1565,7 +1537,7 @@ dacp_reply_stop(struct httpd_request *hreq)
player_playback_stop();
/* 204 No Content is the canonical reply */
httpd_send_reply(hreq, HTTP_NOCONTENT, "No Content", hreq->out_body, HTTPD_SEND_NO_GZIP);
httpd_send_reply(hreq, HTTP_NOCONTENT, "No Content", HTTPD_SEND_NO_GZIP);
return 0;
}
@ -1582,7 +1554,7 @@ dacp_reply_pause(struct httpd_request *hreq)
player_playback_pause();
/* 204 No Content is the canonical reply */
httpd_send_reply(hreq, HTTP_NOCONTENT, "No Content", hreq->out_body, HTTPD_SEND_NO_GZIP);
httpd_send_reply(hreq, HTTP_NOCONTENT, "No Content", HTTPD_SEND_NO_GZIP);
return 0;
}
@ -1609,13 +1581,13 @@ dacp_reply_playpause(struct httpd_request *hreq)
{
DPRINTF(E_LOG, L_DACP, "Player returned an error for start after pause\n");
httpd_send_error(hreq, 500, "Internal Server Error");
httpd_send_error(hreq, HTTP_INTERNAL, "Internal Server Error");
return -1;
}
}
/* 204 No Content is the canonical reply */
httpd_send_reply(hreq, HTTP_NOCONTENT, "No Content", hreq->out_body, HTTPD_SEND_NO_GZIP);
httpd_send_reply(hreq, HTTP_NOCONTENT, "No Content", HTTPD_SEND_NO_GZIP);
return 0;
}
@ -1634,7 +1606,7 @@ dacp_reply_nextitem(struct httpd_request *hreq)
{
DPRINTF(E_LOG, L_DACP, "Player returned an error for nextitem\n");
httpd_send_error(hreq, 500, "Internal Server Error");
httpd_send_error(hreq, HTTP_INTERNAL, "Internal Server Error");
return -1;
}
@ -1643,12 +1615,12 @@ dacp_reply_nextitem(struct httpd_request *hreq)
{
DPRINTF(E_LOG, L_DACP, "Player returned an error for start after nextitem\n");
httpd_send_error(hreq, 500, "Internal Server Error");
httpd_send_error(hreq, HTTP_INTERNAL, "Internal Server Error");
return -1;
}
/* 204 No Content is the canonical reply */
httpd_send_reply(hreq, HTTP_NOCONTENT, "No Content", hreq->out_body, HTTPD_SEND_NO_GZIP);
httpd_send_reply(hreq, HTTP_NOCONTENT, "No Content", HTTPD_SEND_NO_GZIP);
return 0;
}
@ -1667,7 +1639,7 @@ dacp_reply_previtem(struct httpd_request *hreq)
{
DPRINTF(E_LOG, L_DACP, "Player returned an error for previtem\n");
httpd_send_error(hreq, 500, "Internal Server Error");
httpd_send_error(hreq, HTTP_INTERNAL, "Internal Server Error");
return -1;
}
@ -1676,12 +1648,12 @@ dacp_reply_previtem(struct httpd_request *hreq)
{
DPRINTF(E_LOG, L_DACP, "Player returned an error for start after previtem\n");
httpd_send_error(hreq, 500, "Internal Server Error");
httpd_send_error(hreq, HTTP_INTERNAL, "Internal Server Error");
return -1;
}
/* 204 No Content is the canonical reply */
httpd_send_reply(hreq, HTTP_NOCONTENT, "No Content", hreq->out_body, HTTPD_SEND_NO_GZIP);
httpd_send_reply(hreq, HTTP_NOCONTENT, "No Content", HTTPD_SEND_NO_GZIP);
return 0;
}
@ -1698,7 +1670,7 @@ dacp_reply_beginff(struct httpd_request *hreq)
/* TODO */
/* 204 No Content is the canonical reply */
httpd_send_reply(hreq, HTTP_NOCONTENT, "No Content", hreq->out_body, HTTPD_SEND_NO_GZIP);
httpd_send_reply(hreq, HTTP_NOCONTENT, "No Content", HTTPD_SEND_NO_GZIP);
return 0;
}
@ -1715,7 +1687,7 @@ dacp_reply_beginrew(struct httpd_request *hreq)
/* TODO */
/* 204 No Content is the canonical reply */
httpd_send_reply(hreq, HTTP_NOCONTENT, "No Content", hreq->out_body, HTTPD_SEND_NO_GZIP);
httpd_send_reply(hreq, HTTP_NOCONTENT, "No Content", HTTPD_SEND_NO_GZIP);
return 0;
}
@ -1732,7 +1704,7 @@ dacp_reply_playresume(struct httpd_request *hreq)
/* TODO */
/* 204 No Content is the canonical reply */
httpd_send_reply(hreq, HTTP_NOCONTENT, "No Content", hreq->out_body, HTTPD_SEND_NO_GZIP);
httpd_send_reply(hreq, HTTP_NOCONTENT, "No Content", HTTPD_SEND_NO_GZIP);
return 0;
}
@ -1884,7 +1856,7 @@ dacp_reply_playqueuecontents(struct httpd_request *hreq)
dmap_add_char(hreq->out_body, "apsm", status.shuffle); /* 9, daap.playlistshufflemode - not part of mlcl container */
dmap_add_char(hreq->out_body, "aprm", status.repeat); /* 9, daap.playlistrepeatmode - not part of mlcl container */
httpd_send_reply(hreq, HTTP_OK, "OK", hreq->out_body, 0);
httpd_send_reply(hreq, HTTP_OK, "OK", 0);
return 0;
@ -1922,7 +1894,7 @@ dacp_reply_playqueueedit_clear(struct httpd_request *hreq)
dmap_add_int(hreq->out_body, "mstt", 200); /* 12 */
dmap_add_int(hreq->out_body, "miid", 0); /* 12 */
httpd_send_reply(hreq, HTTP_OK, "OK", hreq->out_body, 0);
httpd_send_reply(hreq, HTTP_OK, "OK", 0);
return 0;
}
@ -2060,7 +2032,7 @@ dacp_reply_playqueueedit_add(struct httpd_request *hreq)
}
/* 204 No Content is the canonical reply */
httpd_send_reply(hreq, HTTP_NOCONTENT, "No Content", hreq->out_body, HTTPD_SEND_NO_GZIP);
httpd_send_reply(hreq, HTTP_NOCONTENT, "No Content", HTTPD_SEND_NO_GZIP);
return 0;
}
@ -2108,7 +2080,7 @@ dacp_reply_playqueueedit_move(struct httpd_request *hreq)
}
/* 204 No Content is the canonical reply */
httpd_send_reply(hreq, HTTP_NOCONTENT, "No Content", hreq->out_body, HTTPD_SEND_NO_GZIP);
httpd_send_reply(hreq, HTTP_NOCONTENT, "No Content", HTTPD_SEND_NO_GZIP);
return 0;
}
@ -2144,7 +2116,7 @@ dacp_reply_playqueueedit_remove(struct httpd_request *hreq)
}
/* 204 No Content is the canonical reply */
httpd_send_reply(hreq, HTTP_NOCONTENT, "No Content", hreq->out_body, HTTPD_SEND_NO_GZIP);
httpd_send_reply(hreq, HTTP_NOCONTENT, "No Content", HTTPD_SEND_NO_GZIP);
return 0;
}
@ -2267,9 +2239,9 @@ dacp_reply_playstatusupdate(struct httpd_request *hreq)
{
ret = make_playstatusupdate(hreq->out_body, update_current_rev);
if (ret < 0)
httpd_send_error(hreq, 500, "Internal Server Error");
httpd_send_error(hreq, HTTP_INTERNAL, "Internal Server Error");
else
httpd_send_reply(hreq, HTTP_OK, "OK", hreq->out_body, 0);
httpd_send_reply(hreq, HTTP_OK, "OK", 0);
return ret;
}
@ -2289,10 +2261,9 @@ dacp_reply_playstatusupdate(struct httpd_request *hreq)
update_requests = ur;
pthread_mutex_unlock(&update_request_lck);
/* If the connection fails before we have an update to push out
* to the client, we need to know.
*/
httpd_request_closecb_set(hreq, update_fail_cb, ur);
// If the connection fails before we have an update to push out to the client,
// we need to know.
httpd_request_close_cb_set(hreq, update_fail_cb, ur);
return 0;
}
@ -2370,7 +2341,7 @@ dacp_reply_nowplayingartwork(struct httpd_request *hreq)
snprintf(clen, sizeof(clen), "%ld", (long)len);
httpd_header_add(hreq->out_headers, "Content-Length", clen);
httpd_send_reply(hreq, HTTP_OK, "OK", hreq->out_body, HTTPD_SEND_NO_GZIP);
httpd_send_reply(hreq, HTTP_OK, "OK", HTTPD_SEND_NO_GZIP);
return 0;
no_artwork:
@ -2471,7 +2442,7 @@ dacp_reply_getproperty(struct httpd_request *hreq)
evbuffer_free(proplist);
httpd_send_reply(hreq, HTTP_OK, "OK", hreq->out_body, 0);
httpd_send_reply(hreq, HTTP_OK, "OK", 0);
return 0;
@ -2526,7 +2497,7 @@ dacp_reply_setproperty(struct httpd_request *hreq)
httpd_query_iterate(hreq->query, setproperty_cb, hreq);
/* 204 No Content is the canonical reply */
httpd_send_reply(hreq, HTTP_NOCONTENT, "No Content", hreq->out_body, HTTPD_SEND_NO_GZIP);
httpd_send_reply(hreq, HTTP_NOCONTENT, "No Content", HTTPD_SEND_NO_GZIP);
return 0;
}
@ -2554,7 +2525,7 @@ dacp_reply_getspeakers(struct httpd_request *hreq)
evbuffer_free(spklist);
httpd_send_reply(hreq, HTTP_OK, "OK", hreq->out_body, 0);
httpd_send_reply(hreq, HTTP_OK, "OK", 0);
return 0;
}
@ -2638,13 +2609,13 @@ dacp_reply_setspeakers(struct httpd_request *hreq)
if (ret == -2)
httpd_send_error(hreq, 902, "");
else
httpd_send_error(hreq, 500, "Internal Server Error");
httpd_send_error(hreq, HTTP_INTERNAL, "Internal Server Error");
return -1;
}
/* 204 No Content is the canonical reply */
httpd_send_reply(hreq, HTTP_NOCONTENT, "No Content", hreq->out_body, HTTPD_SEND_NO_GZIP);
httpd_send_reply(hreq, HTTP_NOCONTENT, "No Content", HTTPD_SEND_NO_GZIP);
return 0;
}
@ -2670,7 +2641,7 @@ dacp_reply_volumeup(struct httpd_request *hreq)
}
/* 204 No Content is the canonical reply */
httpd_send_reply(hreq, HTTP_NOCONTENT, "No Content", hreq->out_body, HTTPD_SEND_NO_GZIP);
httpd_send_reply(hreq, HTTP_NOCONTENT, "No Content", HTTPD_SEND_NO_GZIP);
return 0;
}
@ -2696,7 +2667,7 @@ dacp_reply_volumedown(struct httpd_request *hreq)
}
/* 204 No Content is the canonical reply */
httpd_send_reply(hreq, HTTP_NOCONTENT, "No Content", hreq->out_body, HTTPD_SEND_NO_GZIP);
httpd_send_reply(hreq, HTTP_NOCONTENT, "No Content", HTTPD_SEND_NO_GZIP);
return 0;
}
@ -2718,12 +2689,12 @@ dacp_reply_mutetoggle(struct httpd_request *hreq)
ret = speaker_info.selected ? player_speaker_disable(speaker_info.id) : player_speaker_enable(speaker_info.id);
if (ret < 0)
{
httpd_send_error(hreq, 500, "Internal Server Error");
httpd_send_error(hreq, HTTP_INTERNAL, "Internal Server Error");
return -1;
}
/* 204 No Content is the canonical reply */
httpd_send_reply(hreq, HTTP_NOCONTENT, "No Content", hreq->out_body, HTTPD_SEND_NO_GZIP);
httpd_send_reply(hreq, HTTP_NOCONTENT, "No Content", HTTPD_SEND_NO_GZIP);
return 0;
}
@ -2784,7 +2755,7 @@ static struct httpd_uri_map dacp_handlers[] =
},
{
.regexp = "^/ctrl-int/[[:digit:]]+/playstatusupdate$",
.handler = dacp_reply_playstatusupdate
.handler = dacp_reply_playstatusupdate,
},
{
.regexp = "^/ctrl-int/[[:digit:]]+/playqueue-contents$",
@ -2879,7 +2850,6 @@ static void
dacp_deinit(void)
{
struct dacp_update_request *ur;
httpd_connection *conn;
listener_remove(dacp_playstatus_update_handler);
@ -2887,10 +2857,7 @@ dacp_deinit(void)
{
update_requests = ur->next;
httpd_request_closecb_set(ur->hreq, NULL, NULL);
conn = httpd_request_connection_get(ur->hreq);
httpd_connection_free(conn); // TODO necessary?
httpd_send_error(ur->hreq, HTTP_SERVUNAVAIL, "Service Unavailable");
update_request_free(ur);
}
}

View File

@ -53,11 +53,11 @@ typedef struct evhttp_request httpd_backend;
typedef struct evkeyvalq httpd_headers;
typedef struct evkeyvalq httpd_query;
typedef struct httpd_uri_parsed httpd_uri_parsed;
typedef void httpd_backend_data; // Not used for evhttp
typedef struct httpd_backend_data httpd_backend_data;
typedef char *httpd_uri_path_parts[31];
typedef void (*httpd_request_cb)(struct httpd_request *hreq, void *arg);
typedef void (*httpd_connection_closecb)(httpd_connection *conn, void *arg);
typedef void (*httpd_close_cb)(struct httpd_request *hreq, void *arg);
typedef void (*httpd_connection_chunkcb)(httpd_connection *conn, void *arg);
typedef void (*httpd_query_iteratecb)(const char *key, const char *val, void *arg);
@ -74,6 +74,15 @@ enum httpd_methods
HTTPD_METHOD_PATCH = 1 << 8,
};
#define HTTPD_F_REPLY_LAST (1 << 15)
enum httpd_reply_type
{
HTTPD_REPLY_START = 1,
HTTPD_REPLY_CHUNK = 2,
HTTPD_REPLY_END = HTTPD_F_REPLY_LAST | 1,
HTTPD_REPLY_COMPLETE = HTTPD_F_REPLY_LAST | 2,
};
enum httpd_send_flags
{
HTTPD_SEND_NO_GZIP = (1 << 0),
@ -94,6 +103,14 @@ enum httpd_modules
MODULE_RSP,
};
enum httpd_handler_flags
{
// Most requests are pushed to a worker thread, but some handlers deal with
// requests that must be answered quickly. Can only be used for nonblocking
// handlers.
HTTPD_HANDLER_REALTIME = (1 << 0),
};
struct httpd_module
{
const char *name;
@ -122,6 +139,7 @@ struct httpd_uri_map
char *regexp;
int (*handler)(struct httpd_request *hreq);
void *preg;
int flags; // See enum httpd_handler_flags
};
@ -174,6 +192,10 @@ struct httpd_request {
struct httpd_module *module;
// A pointer to the handler that will process the request
int (*handler)(struct httpd_request *hreq);
// Is the processing defered to a worker thread
bool is_async;
// Handler thread's evbase in case the handler needs to scehdule an event
struct event_base *evbase;
// A pointer to extra data that the module handling the request might need
void *extra_data;
};
@ -206,17 +228,16 @@ httpd_response_not_cachable(struct httpd_request *hreq);
* @in code HTTP code, e.g. 200
* @in reason A brief explanation of the error - if NULL the standard meaning
of the error code will be used
* @in evbuf Data for the response body
* @in flags See flags above
*/
void
httpd_send_reply(struct httpd_request *hreq, int code, const char *reason, struct evbuffer *evbuf, enum httpd_send_flags flags);
httpd_send_reply(struct httpd_request *hreq, int code, const char *reason, enum httpd_send_flags flags);
void
httpd_send_reply_start(struct httpd_request *hreq, int code, const char *reason);
void
httpd_send_reply_chunk(struct httpd_request *hreq, struct evbuffer *evbuf, httpd_connection_chunkcb cb, void *arg);
httpd_send_reply_chunk(struct httpd_request *hreq, httpd_connection_chunkcb cb, void *arg);
void
httpd_send_reply_end(struct httpd_request *hreq);
@ -270,25 +291,13 @@ void
httpd_headers_clear(httpd_headers *headers);
void
httpd_connection_free(httpd_connection *conn);
httpd_connection *
httpd_request_connection_get(struct httpd_request *hreq);
void
httpd_request_backend_free(struct httpd_request *hreq);
int
httpd_request_closecb_set(struct httpd_request *hreq, httpd_connection_closecb cb, void *arg);
struct event_base *
httpd_request_evbase_get(struct httpd_request *hreq);
httpd_request_close_cb_set(struct httpd_request *hreq, httpd_close_cb cb, void *arg);
void
httpd_request_free(struct httpd_request *hreq);
struct httpd_request *
httpd_request_new(httpd_backend *backend, const char *uri, const char *user_agent);
httpd_request_new(httpd_backend *backend, httpd_server *server, const char *uri, const char *user_agent);
void
httpd_server_free(httpd_server *server);
@ -303,28 +312,20 @@ httpd_server_allow_origin_set(httpd_server *server, bool allow);
/*----------------- Only called by httpd.c to send raw replies ---------------*/
void
httpd_backend_reply_send(httpd_backend *backend, int code, const char *reason, struct evbuffer *evbuf);
void
httpd_backend_reply_start_send(httpd_backend *backend, int code, const char *reason);
void
httpd_backend_reply_chunk_send(httpd_backend *backend, struct evbuffer *evbuf, httpd_connection_chunkcb cb, void *arg);
void
httpd_backend_reply_end_send(httpd_backend *backend);
httpd_send(struct httpd_request *hreq, enum httpd_reply_type type, int code, const char *reason,
httpd_connection_chunkcb cb, void *cbarg);
/*---------- Only called by httpd.c to populate struct httpd_request ---------*/
httpd_backend_data *
httpd_backend_data_create(httpd_backend *backend);
httpd_backend_data_create(httpd_backend *backend, httpd_server *server);
void
httpd_backend_data_free(httpd_backend_data *backend_data);
httpd_connection *
httpd_backend_connection_get(httpd_backend *backend);
struct event_base *
httpd_backend_evbase_get(httpd_backend *backend);
const char *
httpd_backend_uri_get(httpd_backend *backend, httpd_backend_data *backend_data);

View File

@ -4727,13 +4727,13 @@ jsonapi_request(struct httpd_request *hreq)
{
case HTTP_OK: /* 200 OK */
httpd_header_add(hreq->out_headers, "Content-Type", "application/json");
httpd_send_reply(hreq, status_code, "OK", hreq->out_body, HTTPD_SEND_NO_GZIP);
httpd_send_reply(hreq, status_code, "OK", HTTPD_SEND_NO_GZIP);
break;
case HTTP_NOCONTENT: /* 204 No Content */
httpd_send_reply(hreq, status_code, "No Content", hreq->out_body, HTTPD_SEND_NO_GZIP);
httpd_send_reply(hreq, status_code, "No Content", HTTPD_SEND_NO_GZIP);
break;
case HTTP_NOTMODIFIED: /* 304 Not Modified */
httpd_send_reply(hreq, HTTP_NOTMODIFIED, NULL, NULL, HTTPD_SEND_NO_GZIP);
httpd_send_reply(hreq, HTTP_NOTMODIFIED, NULL, HTTPD_SEND_NO_GZIP);
break;
case HTTP_BADREQUEST: /* 400 Bad Request */
httpd_send_error(hreq, status_code, "Bad Request");

View File

@ -32,14 +32,16 @@
#include <event2/buffer.h>
#include <event2/bufferevent.h>
#include <pthread.h>
#include "misc.h"
#include "logger.h"
#include "commands.h"
#include "httpd_internal.h"
#define DEBUG_ALLOC 1
// #define DEBUG_ALLOC 1
#ifdef DEBUG_ALLOC
#include <pthread.h>
static pthread_mutex_t debug_alloc_lck = PTHREAD_MUTEX_INITIALIZER;
static int debug_alloc_count;
#endif
@ -56,10 +58,41 @@ struct httpd_server
{
int fd;
struct evhttp *evhttp;
struct commands_base *cmdbase;
httpd_request_cb request_cb;
void *request_cb_arg;
};
struct httpd_reply
{
struct httpd_request *hreq;
enum httpd_reply_type type;
int code;
const char *reason;
httpd_connection_chunkcb chunkcb;
void *cbarg;
};
struct httpd_disconnect
{
pthread_mutex_t lock;
struct event *ev;
httpd_close_cb cb;
void *cbarg;
};
struct httpd_backend_data
{
// Pointer to server instance processing the request
struct httpd_server *server;
// If caller wants a callback on disconnect
struct httpd_disconnect disconnect;
};
// Forward
static void
closecb_worker(evutil_socket_t fd, short event, void *arg);
const char *
httpd_query_value_find(httpd_query *query, const char *key)
@ -109,45 +142,27 @@ httpd_headers_clear(httpd_headers *headers)
}
void
httpd_connection_free(httpd_connection *conn)
httpd_request_close_cb_set(struct httpd_request *hreq, httpd_close_cb cb, void *arg)
{
if (!conn)
return;
struct httpd_disconnect *disconnect = &hreq->backend_data->disconnect;
evhttp_connection_free(conn);
}
pthread_mutex_lock(&disconnect->lock);
httpd_connection *
httpd_request_connection_get(struct httpd_request *hreq)
{
return httpd_backend_connection_get(hreq->backend);
}
disconnect->cb = cb;
disconnect->cbarg = arg;
void
httpd_request_backend_free(struct httpd_request *hreq)
{
evhttp_request_free(hreq->backend);
}
if (hreq->is_async)
{
if (disconnect->ev)
event_free(disconnect->ev);
int
httpd_request_closecb_set(struct httpd_request *hreq, httpd_connection_closecb cb, void *arg)
{
httpd_connection *conn = httpd_request_connection_get(hreq);
if (!conn)
return -1;
if (disconnect->cb)
disconnect->ev = event_new(hreq->evbase, -1, 0, closecb_worker, hreq);
else
disconnect->ev = NULL;
}
evhttp_connection_set_closecb(conn, cb, arg);
return 0;
}
struct event_base *
httpd_request_evbase_get(struct httpd_request *hreq)
{
httpd_connection *conn = httpd_request_connection_get(hreq);
if (!conn)
return NULL;
return evhttp_connection_get_base(conn);
pthread_mutex_unlock(&disconnect->lock);
}
void
@ -172,7 +187,7 @@ httpd_request_free(struct httpd_request *hreq)
}
struct httpd_request *
httpd_request_new(httpd_backend *backend, const char *uri, const char *user_agent)
httpd_request_new(httpd_backend *backend, httpd_server *server, const char *uri, const char *user_agent)
{
struct httpd_request *hreq;
httpd_backend_data *backend_data;
@ -190,7 +205,7 @@ httpd_request_new(httpd_backend *backend, const char *uri, const char *user_agen
hreq->backend = backend;
if (backend)
{
backend_data = httpd_backend_data_create(backend);
backend_data = httpd_backend_data_create(backend, server);
hreq->backend_data = backend_data;
hreq->uri = httpd_backend_uri_get(backend, backend_data);
@ -233,6 +248,51 @@ httpd_request_new(httpd_backend *backend, const char *uri, const char *user_agen
return NULL;
}
static void
closecb_worker(evutil_socket_t fd, short event, void *arg)
{
struct httpd_request *hreq = arg;
struct httpd_disconnect *disconnect = &hreq->backend_data->disconnect;
pthread_mutex_lock(&disconnect->lock);
if (disconnect->cb)
disconnect->cb(hreq, disconnect->cbarg);
pthread_mutex_unlock(&disconnect->lock);
httpd_send_reply_end(hreq); // hreq is now deallocated
}
static void
closecb_httpd(httpd_connection *conn, void *arg)
{
struct httpd_request *hreq = arg;
struct httpd_disconnect *disconnect = &hreq->backend_data->disconnect;
DPRINTF(E_WARN, hreq->module->logdomain, "Connection to '%s' was closed\n", hreq->peer_address);
// The disconnect event may occur while a worker thread is accessing hreq, or
// has an event scheduled that will do so, so we have to be careful to let it
// finish and cancel events.
pthread_mutex_lock(&disconnect->lock);
if (hreq->is_async)
{
if (disconnect->cb)
event_active(disconnect->ev, 0, 0);
pthread_mutex_unlock(&disconnect->lock);
return;
}
pthread_mutex_unlock(&disconnect->lock);
if (!disconnect->cb)
return;
disconnect->cb(hreq, disconnect->cbarg);
httpd_send_reply_end(hreq); // hreq is now deallocated
}
static void
gencb_httpd(httpd_backend *backend, void *arg)
{
@ -253,13 +313,17 @@ gencb_httpd(httpd_backend *backend, void *arg)
if (bufev)
bufferevent_enable(bufev, EV_READ);
hreq = httpd_request_new(backend, NULL, NULL);
hreq = httpd_request_new(backend, server, NULL, NULL);
if (!hreq)
{
evhttp_send_error(backend, HTTP_INTERNAL, "Internal error");
return;
}
// We must hook connection close, so we can assure that conn close callbacks
// to handlers running in a worker are made in the same thread.
evhttp_connection_set_closecb(evhttp_request_get_connection(backend), closecb_httpd, hreq);
server->request_cb(hreq, server->request_cb_arg);
}
@ -275,6 +339,7 @@ httpd_server_free(httpd_server *server)
if (server->evhttp)
evhttp_free(server->evhttp);
commands_base_free(server->cmdbase);
free(server);
}
@ -286,6 +351,7 @@ httpd_server_new(struct event_base *evbase, unsigned short port, httpd_request_c
CHECK_NULL(L_HTTPD, server = calloc(1, sizeof(httpd_server)));
CHECK_NULL(L_HTTPD, server->evhttp = evhttp_new(evbase));
CHECK_NULL(L_HTTPD, server->cmdbase = commands_base_new(evbase, NULL));
server->request_cb = cb;
server->request_cb_arg = arg;
@ -294,7 +360,7 @@ httpd_server_new(struct event_base *evbase, unsigned short port, httpd_request_c
if (server->fd <= 0)
goto error;
// Backlog of 128 is the same libevent uses
// Backlog of 128 is the same that libevent uses
ret = listen(server->fd, 128);
if (ret < 0)
goto error;
@ -318,46 +384,109 @@ httpd_server_allow_origin_set(httpd_server *server, bool allow)
evhttp_set_allowed_methods(server->evhttp, EVHTTP_REQ_GET | EVHTTP_REQ_POST | EVHTTP_REQ_PUT | EVHTTP_REQ_DELETE | EVHTTP_REQ_HEAD | EVHTTP_REQ_OPTIONS);
}
void
httpd_backend_reply_send(httpd_backend *backend, int code, const char *reason, struct evbuffer *evbuf)
// No locking of hreq required here, we're in the httpd thread, and the worker
// thread is waiting at commands_exec_sync()
static void
send_reply_and_free(struct httpd_reply *reply)
{
evhttp_send_reply(backend, code, reason, evbuf);
struct httpd_request *hreq = reply->hreq;
httpd_connection *conn;
// DPRINTF(E_DBG, L_HTTPD, "Send from httpd thread, type %d, backend %p\n", reply->type, hreq->backend);
if (reply->type & HTTPD_F_REPLY_LAST)
{
conn = evhttp_request_get_connection(hreq->backend);
if (conn)
evhttp_connection_set_closecb(conn, NULL, NULL);
}
switch (reply->type)
{
case HTTPD_REPLY_COMPLETE:
evhttp_send_reply(hreq->backend, reply->code, reply->reason, hreq->out_body);
break;
case HTTPD_REPLY_START:
evhttp_send_reply_start(hreq->backend, reply->code, reply->reason);
break;
case HTTPD_REPLY_CHUNK:
evhttp_send_reply_chunk_with_cb(hreq->backend, hreq->out_body, reply->chunkcb, reply->cbarg);
break;
case HTTPD_REPLY_END:
evhttp_send_reply_end(hreq->backend);
break;
}
}
static enum command_state
send_reply_and_free_cb(void *arg, int *retval)
{
struct httpd_reply *reply = arg;
send_reply_and_free(reply);
return COMMAND_END;
}
void
httpd_backend_reply_start_send(httpd_backend *backend, int code, const char *reason)
httpd_send(struct httpd_request *hreq, enum httpd_reply_type type, int code, const char *reason, httpd_connection_chunkcb cb, void *cbarg)
{
evhttp_send_reply_start(backend, code, reason);
}
struct httpd_server *server = hreq->backend_data->server;
struct httpd_reply reply = {
.hreq = hreq,
.type = type,
.code = code,
.chunkcb = cb,
.cbarg = cbarg,
.reason = reason,
};
void
httpd_backend_reply_chunk_send(httpd_backend *backend, struct evbuffer *evbuf, httpd_connection_chunkcb cb, void *arg)
{
evhttp_send_reply_chunk_with_cb(backend, evbuf, cb, arg);
}
if (type & HTTPD_F_REPLY_LAST)
httpd_request_close_cb_set(hreq, NULL, NULL);
void
httpd_backend_reply_end_send(httpd_backend *backend)
{
evhttp_send_reply_end(backend);
// Sending async is not a option, because then the worker thread might touch
// hreq before we have completed sending the current chunk
if (hreq->is_async)
commands_exec_sync(server->cmdbase, send_reply_and_free_cb, NULL, &reply);
else
send_reply_and_free(&reply);
if (type & HTTPD_F_REPLY_LAST)
httpd_request_free(hreq);
}
httpd_backend_data *
httpd_backend_data_create(httpd_backend *backend)
httpd_backend_data_create(httpd_backend *backend, httpd_server *server)
{
return "dummy";
httpd_backend_data *backend_data;
CHECK_NULL(L_HTTPD, backend_data = calloc(1, sizeof(httpd_backend_data)));
CHECK_ERR(L_HTTPD, mutex_init(&backend_data->disconnect.lock));
backend_data->server = server;
return backend_data;
}
void
httpd_backend_data_free(httpd_backend_data *backend_data)
{
// Nothing to do
if (!backend_data)
return;
if (backend_data->disconnect.ev)
event_free(backend_data->disconnect.ev);
free(backend_data);
}
httpd_connection *
httpd_backend_connection_get(httpd_backend *backend)
struct event_base *
httpd_backend_evbase_get(httpd_backend *backend)
{
return evhttp_request_get_connection(backend);
httpd_connection *conn = evhttp_request_get_connection(backend);
if (!conn)
return NULL;
return evhttp_connection_get_base(conn);
}
const char *
@ -393,7 +522,7 @@ httpd_backend_output_buffer_get(httpd_backend *backend)
int
httpd_backend_peer_get(const char **addr, uint16_t *port, httpd_backend *backend, httpd_backend_data *backend_data)
{
httpd_connection *conn = httpd_backend_connection_get(backend);
httpd_connection *conn = evhttp_request_get_connection(backend);
if (!conn)
return -1;

View File

@ -119,28 +119,17 @@ static const struct field_map rsp_fields[] =
/* -------------------------------- HELPERS --------------------------------- */
static struct evbuffer *
mxml_to_evbuf(mxml_node_t *tree)
static int
mxml_to_evbuf(struct evbuffer *evbuf, mxml_node_t *tree)
{
struct evbuffer *evbuf;
char *xml;
int ret;
evbuf = evbuffer_new();
if (!evbuf)
{
DPRINTF(E_LOG, L_RSP, "Could not create evbuffer for RSP reply\n");
return NULL;
}
xml = mxmlSaveAllocString(tree, MXML_NO_CALLBACK);
if (!xml)
{
DPRINTF(E_LOG, L_RSP, "Could not finalize RSP reply\n");
evbuffer_free(evbuf);
return NULL;
return -1;
}
ret = evbuffer_add(evbuf, xml, strlen(xml));
@ -148,21 +137,19 @@ mxml_to_evbuf(mxml_node_t *tree)
if (ret < 0)
{
DPRINTF(E_LOG, L_RSP, "Could not load evbuffer for RSP reply\n");
evbuffer_free(evbuf);
return NULL;
return -1;
}
return evbuf;
return 0;
}
static void
rsp_send_error(struct httpd_request *hreq, char *errmsg)
{
struct evbuffer *evbuf;
mxml_node_t *reply;
mxml_node_t *status;
mxml_node_t *node;
int ret;
/* We'd use mxmlNewXML(), but then we can't put any attributes
* on the root node and we need some.
@ -185,22 +172,19 @@ rsp_send_error(struct httpd_request *hreq, char *errmsg)
node = mxmlNewElement(status, "totalrecords");
mxmlNewText(node, 0, "0");
evbuf = mxml_to_evbuf(reply);
ret = mxml_to_evbuf(hreq->out_body, reply);
mxmlDelete(reply);
if (!evbuf)
if (ret < 0)
{
httpd_send_error(hreq, HTTP_SERVUNAVAIL, "Internal Server Error");
return;
}
httpd_header_add(hreq->out_headers, "Content-Type", "text/xml; charset=utf-8");
httpd_header_add(hreq->out_headers, "Connection", "close");
httpd_send_reply(hreq, HTTP_OK, "OK", evbuf, HTTPD_SEND_NO_GZIP);
evbuffer_free(evbuf);
httpd_send_reply(hreq, HTTP_OK, "OK", HTTPD_SEND_NO_GZIP);
}
static int
@ -277,24 +261,21 @@ query_params_set(struct query_params *qp, struct httpd_request *hreq)
static void
rsp_send_reply(struct httpd_request *hreq, mxml_node_t *reply)
{
struct evbuffer *evbuf;
int ret;
evbuf = mxml_to_evbuf(reply);
ret = mxml_to_evbuf(hreq->out_body, reply);
mxmlDelete(reply);
if (!evbuf)
if (ret < 0)
{
rsp_send_error(hreq, "Could not finalize reply");
return;
}
httpd_header_add(hreq->out_headers, "Content-Type", "text/xml; charset=utf-8");
httpd_header_add(hreq->out_headers, "Connection", "close");
httpd_send_reply(hreq, HTTP_OK, "OK", evbuf, 0);
evbuffer_free(evbuf);
httpd_send_reply(hreq, HTTP_OK, "OK", 0);
}
static int
@ -844,7 +825,7 @@ static struct httpd_uri_map rsp_handlers[] =
},
{
.regexp = "^/rsp/stream/[[:digit:]]+$",
.handler = rsp_stream
.handler = rsp_stream,
},
{
.regexp = NULL,

View File

@ -192,27 +192,15 @@ session_new(struct httpd_request *hreq, bool icy_is_requested)
return session;
}
static void
session_end(struct streaming_session *session)
{
DPRINTF(E_INFO, L_STREAMING, "Stopping mp3 streaming to %s:%d\n", session->hreq->peer_address, (int)session->hreq->peer_port);
// Valgrind says libevent doesn't free the request on disconnect (even though
// it owns it - libevent bug?), so we do it with a reply end. This also makes
// sure the hreq gets freed.
httpd_send_reply_end(session->hreq);
session_free(session);
}
/* ----------------------------- Event callbacks ---------------------------- */
static void
conn_close_cb(httpd_connection *conn, void *arg)
conn_close_cb(struct httpd_request *hreq, void *arg)
{
struct streaming_session *session = arg;
session_end(session);
session_free(session);
}
static void
@ -227,8 +215,10 @@ read_cb(evutil_socket_t fd, short event, void *arg)
len = evbuffer_read(session->readbuf, fd, -1);
if (len < 0 && errno != EAGAIN)
{
httpd_request_closecb_set(hreq, NULL, NULL);
session_end(session);
DPRINTF(E_INFO, L_STREAMING, "Stopping mp3 streaming to %s:%d\n", session->hreq->peer_address, (int)session->hreq->peer_port);
httpd_send_reply_end(session->hreq);
session_free(session);
return;
}
@ -237,7 +227,7 @@ read_cb(evutil_socket_t fd, short event, void *arg)
else
evbuffer_add_buffer(hreq->out_body, session->readbuf);
httpd_send_reply_chunk(hreq, hreq->out_body, NULL, NULL);
httpd_send_reply_chunk(hreq, NULL, NULL);
session->bytes_sent += len;
}
@ -249,7 +239,6 @@ static int
streaming_mp3_handler(struct httpd_request *hreq)
{
struct streaming_session *session;
struct event_base *evbase;
const char *name = cfg_getstr(cfg_getsec(cfg, "library"), "name");
const char *param;
bool icy_is_requested;
@ -271,11 +260,10 @@ streaming_mp3_handler(struct httpd_request *hreq)
// Ask streaming output module for a fd to read mp3 from
session->fd = streaming_session_register(STREAMING_FORMAT_MP3, streaming_default_quality);
CHECK_NULL(L_STREAMING, evbase = httpd_request_evbase_get(hreq));
CHECK_NULL(L_STREAMING, session->readev = event_new(evbase, session->fd, EV_READ | EV_PERSIST, read_cb, session));
CHECK_NULL(L_STREAMING, session->readev = event_new(hreq->evbase, session->fd, EV_READ | EV_PERSIST, read_cb, session));
event_add(session->readev, NULL);
httpd_request_closecb_set(hreq, conn_close_cb, session);
httpd_request_close_cb_set(hreq, conn_close_cb, session);
httpd_header_add(hreq->out_headers, "Content-Type", "audio/mpeg");
httpd_header_add(hreq->out_headers, "Server", PACKAGE_NAME "/" VERSION);
@ -292,7 +280,8 @@ static struct httpd_uri_map streaming_handlers[] =
{
{
.regexp = "^/stream.mp3$",
.handler = streaming_mp3_handler
.handler = streaming_mp3_handler,
.flags = HTTPD_HANDLER_REALTIME,
},
{
.regexp = NULL,

View File

@ -43,10 +43,10 @@
#include "evthr.h"
#include "misc.h"
#define THREADPOOL_NTHREADS 2
#define THREADPOOL_NTHREADS 4
static struct evthr_pool *worker_threadpool;
static __thread struct evthr *worker_thr;
/* ----------------------------- CALLBACK EXECUTION ------------------------- */
@ -98,12 +98,16 @@ init_cb(struct evthr *thr, void *shared)
{
CHECK_ERR(L_MAIN, db_perthread_init());
worker_thr = thr;
thread_setname(pthread_self(), "worker");
}
static void
exit_cb(struct evthr *thr, void *shared)
{
worker_thr = NULL;
db_perthread_deinit();
}
@ -145,6 +149,12 @@ worker_execute(void (*cb)(void *), void *cb_arg, size_t arg_size, int delay)
evthr_pool_defer(worker_threadpool, execute, cmdarg);
}
struct event_base *
worker_evbase_get(void)
{
return evthr_get_base(worker_thr);
}
int
worker_init(void)
{

View File

@ -2,8 +2,10 @@
#ifndef __WORKER_H__
#define __WORKER_H__
#include <event2/event.h>
/* The worker thread is made for running asyncronous tasks from a real time
* thread, mainly the player thread.
* thread.
* The worker_execute() function will trigger a callback from the worker thread.
* Before returning the function will copy the argument given, so the caller
@ -19,6 +21,11 @@
void
worker_execute(void (*cb)(void *), void *cb_arg, size_t arg_size, int delay);
/* Can be called within a callback to get the worker thread's event base
*/
struct event_base *
worker_evbase_get(void);
int
worker_init(void);