diff --git a/src/commands.c b/src/commands.c index 7043bff1..822c65e5 100644 --- a/src/commands.c +++ b/src/commands.c @@ -63,8 +63,8 @@ command_cb_async(struct commands_base *cmdbase, struct command *cmd) // Command is executed asynchronously cmdstate = cmd->func(cmd->arg, &cmd->ret); - // Only free arg if there are no pending events (used in worker.c) - if (cmdstate != COMMAND_PENDING && cmd->arg) + // Only free arg if there are no pending events (used in httpd.c) + if (cmdstate != COMMAND_PENDING) free(cmd->arg); free(cmd); diff --git a/src/httpd.c b/src/httpd.c index 9ece0c90..4d1b241e 100644 --- a/src/httpd.c +++ b/src/httpd.c @@ -27,7 +27,6 @@ #include #include #include -#include #include #include #include @@ -99,8 +98,6 @@ struct content_type_map { struct stream_ctx { struct httpd_request *hreq; - uint8_t *buf; - struct evbuffer *evbuf; struct event *ev; int id; int fd; @@ -132,7 +129,18 @@ static const char *httpd_allow_origin; static int httpd_port; -#define THREADPOOL_NTHREADS 4 +// The server is designed around a single thread listening for requests. When +// received, the request is passed to a thread from the worker pool, where a +// handler will process it and prepare a response for the httpd thread to send +// back. The idea is that the httpd thread never blocks. The handler in the +// worker thread can block, but shouldn't hold the thread if it is a long- +// running request (e.g. a long poll), because then we can run out of worker +// threads. The handler should use events to avoid this. Handlers, that are non- +// blocking and where the response must not be delayed can use +// HTTPD_HANDLER_REALTIME, then the httpd thread calls it directly (sync) +// instead of the async worker. In short, you shouldn't need to increase the +// below. +#define THREADPOOL_NTHREADS 1 static struct evthr_pool *httpd_threadpool; @@ -294,23 +302,12 @@ cors_headers_add(struct httpd_request *hreq, const char *allow_origin) httpd_header_add(hreq->out_headers, "Access-Control-Allow-Headers", "authorization"); } -static int -handle_cors_preflight(struct httpd_request *hreq, const char *allow_origin) +static bool +is_cors_preflight(struct httpd_request *hreq, const char *allow_origin) { - bool is_cors_preflight; - - is_cors_preflight = ( hreq->method == HTTPD_METHOD_OPTIONS && hreq->in_headers && allow_origin && - httpd_header_find(hreq->in_headers, "Origin") && - httpd_header_find(hreq->in_headers, "Access-Control-Request-Method") ); - if (!is_cors_preflight) - return -1; - - cors_headers_add(hreq, allow_origin); - - // In this case there is no reason to go through httpd_send_reply - httpd_backend_reply_send(hreq->backend, HTTP_OK, "OK", NULL); - httpd_request_free(hreq); - return 0; + return ( hreq->method == HTTPD_METHOD_OPTIONS && hreq->in_headers && allow_origin && + httpd_header_find(hreq->in_headers, "Origin") && + httpd_header_find(hreq->in_headers, "Access-Control-Request-Method") ); } void @@ -337,6 +334,7 @@ httpd_request_handler_set(struct httpd_request *hreq) continue; hreq->handler = map->handler; + hreq->is_async = !(map->flags & HTTPD_HANDLER_REALTIME); break; } } @@ -346,7 +344,7 @@ httpd_redirect_to(struct httpd_request *hreq, const char *path) { httpd_header_add(hreq->out_headers, "Location", path); - httpd_send_reply(hreq, HTTP_MOVETEMP, "Moved", NULL, HTTPD_SEND_NO_GZIP); + httpd_send_reply(hreq, HTTP_MOVETEMP, "Moved", HTTPD_SEND_NO_GZIP); } /* @@ -430,7 +428,6 @@ serve_file(struct httpd_request *hreq) char path[PATH_MAX]; char deref[PATH_MAX]; char *ctype; - struct evbuffer *evbuf; struct stat sb; int fd; int i; @@ -511,23 +508,14 @@ serve_file(struct httpd_request *hreq) { DPRINTF(E_WARN, L_HTTPD, "Access to file outside the web root dir forbidden: %s\n", deref); - httpd_send_error(hreq, 403, "Forbidden"); + httpd_send_error(hreq, HTTP_FORBIDDEN, "Forbidden"); return; } if (httpd_request_not_modified_since(hreq, sb.st_mtime)) { - httpd_send_reply(hreq, HTTP_NOTMODIFIED, NULL, NULL, HTTPD_SEND_NO_GZIP); - return; - } - - evbuf = evbuffer_new(); - if (!evbuf) - { - DPRINTF(E_LOG, L_HTTPD, "Could not create evbuffer\n"); - - httpd_send_error(hreq, HTTP_SERVUNAVAIL, "Internal error"); + httpd_send_reply(hreq, HTTP_NOTMODIFIED, NULL, HTTPD_SEND_NO_GZIP); return; } @@ -537,11 +525,10 @@ serve_file(struct httpd_request *hreq) DPRINTF(E_LOG, L_HTTPD, "Could not open %s: %s\n", deref, strerror(errno)); httpd_send_error(hreq, HTTP_NOTFOUND, "Not Found"); - evbuffer_free(evbuf); return; } - ret = evbuffer_expand(evbuf, sb.st_size); + ret = evbuffer_expand(hreq->out_body, sb.st_size); if (ret < 0) { DPRINTF(E_LOG, L_HTTPD, "Out of memory for htdocs-file\n"); @@ -549,7 +536,7 @@ serve_file(struct httpd_request *hreq) } while ((ret = read(fd, buf, sizeof(buf))) > 0) - evbuffer_add(evbuf, buf, ret); + evbuffer_add(hreq->out_body, buf, ret); if (ret < 0) { @@ -573,42 +560,53 @@ serve_file(struct httpd_request *hreq) httpd_header_add(hreq->out_headers, "Content-Type", ctype); - httpd_send_reply(hreq, HTTP_OK, "OK", evbuf, HTTPD_SEND_NO_GZIP); + httpd_send_reply(hreq, HTTP_OK, "OK", HTTPD_SEND_NO_GZIP); - evbuffer_free(evbuf); close(fd); return; out_fail: httpd_send_error(hreq, HTTP_SERVUNAVAIL, "Internal error"); - evbuffer_free(evbuf); close(fd); } /* ---------------------------- STREAM HANDLING ----------------------------- */ +// This will triggered in a httpd thread, but since the reading may be in a +// worker thread we just want to trigger the read loop +static void +stream_chunk_resched_cb(httpd_connection *conn, void *arg) +{ + struct stream_ctx *st = arg; + + // TODO not thread safe if st was freed in worker thread, but maybe not possible? + event_active(st->ev, 0, 0); +} + +static void +stream_free(struct stream_ctx *st) +{ + if (!st) + return; + + if (st->ev) + event_free(st->ev); + if (st->fd >= 0) + close(st->fd); + + transcode_cleanup(&st->xcode); + free(st); +} + static void stream_end(struct stream_ctx *st) { - httpd_request_closecb_set(st->hreq, NULL, NULL); + DPRINTF(E_DBG, L_HTTPD, "Ending stream %d\n", st->id); - // Alwayss send reply, even if connection failed, otherwise we memleak hreq - // and possibly the evhttp req as well. httpd_send_reply_end(st->hreq); - evbuffer_free(st->evbuf); - event_free(st->ev); - - if (st->xcode) - transcode_cleanup(&st->xcode); - else - { - free(st->buf); - close(st->fd); - } - - free(st); + stream_free(st); } static void @@ -626,36 +624,134 @@ stream_end_register(struct stream_ctx *st) } } -static void -stream_chunk_resched_cb(httpd_connection *conn, void *arg) +static struct stream_ctx * +stream_new(struct media_file_info *mfi, struct httpd_request *hreq, event_callback_fn stream_cb) { struct stream_ctx *st; - struct timeval tv; + + CHECK_NULL(L_HTTPD, st = calloc(1, sizeof(struct stream_ctx))); + st->fd = -1; + + st->ev = event_new(hreq->evbase, -1, EV_PERSIST, stream_cb, st); + if (!st->ev) + { + DPRINTF(E_LOG, L_HTTPD, "Could not create event for streaming\n"); + + httpd_send_error(hreq, HTTP_SERVUNAVAIL, "Internal Server Error"); + goto error; + } + + event_active(st->ev, 0, 0); + + st->id = mfi->id; + st->hreq = hreq; + return st; + + error: + stream_free(st); + return NULL; +} + +static struct stream_ctx * +stream_new_transcode(struct media_file_info *mfi, struct httpd_request *hreq, int64_t offset, int64_t end_offset, event_callback_fn stream_cb) +{ + struct stream_ctx *st; + struct media_quality quality = { HTTPD_STREAM_SAMPLE_RATE, HTTPD_STREAM_BPS, HTTPD_STREAM_CHANNELS, 0 }; + + st = stream_new(mfi, hreq, stream_cb); + if (!st) + { + goto error; + } + + st->xcode = transcode_setup(XCODE_PCM16_HEADER, &quality, mfi->data_kind, mfi->path, mfi->song_length, &st->size); + if (!st->xcode) + { + DPRINTF(E_WARN, L_HTTPD, "Transcoding setup failed, aborting streaming\n"); + + httpd_send_error(hreq, HTTP_SERVUNAVAIL, "Internal Server Error"); + goto error; + } + + st->stream_size = st->size - offset; + if (end_offset > 0) + st->stream_size -= (st->size - end_offset); + + st->start_offset = offset; + + return st; + + error: + stream_free(st); + return NULL; +} + +static struct stream_ctx * +stream_new_raw(struct media_file_info *mfi, struct httpd_request *hreq, int64_t offset, int64_t end_offset, event_callback_fn stream_cb) +{ + struct stream_ctx *st; + struct stat sb; + off_t pos; int ret; - st = (struct stream_ctx *)arg; + st = stream_new(mfi, hreq, stream_cb); + if (!st) + { + goto error; + } - evutil_timerclear(&tv); - ret = event_add(st->ev, &tv); + st->fd = open(mfi->path, O_RDONLY); + if (st->fd < 0) + { + DPRINTF(E_LOG, L_HTTPD, "Could not open %s: %s\n", mfi->path, strerror(errno)); + + httpd_send_error(hreq, HTTP_NOTFOUND, "Not Found"); + goto error; + } + + ret = stat(mfi->path, &sb); if (ret < 0) { - DPRINTF(E_LOG, L_HTTPD, "Could not re-add one-shot event for streaming\n"); + DPRINTF(E_LOG, L_HTTPD, "Could not stat() %s: %s\n", mfi->path, strerror(errno)); - stream_end(st); + httpd_send_error(hreq, HTTP_NOTFOUND, "Not Found"); + goto error; } + + st->size = sb.st_size; + + st->stream_size = st->size - offset; + if (end_offset > 0) + st->stream_size -= (st->size - end_offset); + + st->start_offset = offset; + st->offset = offset; + st->end_offset = end_offset; + + pos = lseek(st->fd, offset, SEEK_SET); + if (pos == (off_t) -1) + { + DPRINTF(E_LOG, L_HTTPD, "Could not seek into %s: %s\n", mfi->path, strerror(errno)); + + httpd_send_error(hreq, HTTP_BADREQUEST, "Bad Request"); + goto error; + } + + return st; + + error: + stream_free(st); + return NULL; } static void stream_chunk_xcode_cb(int fd, short event, void *arg) { - struct stream_ctx *st; - struct timeval tv; + struct stream_ctx *st = arg; int xcoded; int ret; - st = (struct stream_ctx *)arg; - - xcoded = transcode(st->evbuf, NULL, st->xcode, STREAM_CHUNK_SIZE); + xcoded = transcode(st->hreq->out_body, NULL, st->xcode, STREAM_CHUNK_SIZE); if (xcoded <= 0) { if (xcoded == 0) @@ -669,58 +765,45 @@ stream_chunk_xcode_cb(int fd, short event, void *arg) DPRINTF(E_DBG, L_HTTPD, "Got %d bytes from transcode; streaming file id %d\n", xcoded, st->id); - /* Consume transcoded data until we meet start_offset */ + // Consume transcoded data until we meet start_offset if (st->start_offset > st->offset) { ret = st->start_offset - st->offset; if (ret < xcoded) { - evbuffer_drain(st->evbuf, ret); + evbuffer_drain(st->hreq->out_body, ret); st->offset += ret; ret = xcoded - ret; } else { - evbuffer_drain(st->evbuf, xcoded); + evbuffer_drain(st->hreq->out_body, xcoded); st->offset += xcoded; - goto consume; + // Reschedule immediately - consume up to start_offset + event_active(st->ev, 0, 0); + return; } } else ret = xcoded; - httpd_send_reply_chunk(st->hreq, st->evbuf, stream_chunk_resched_cb, st); + httpd_send_reply_chunk(st->hreq, stream_chunk_resched_cb, st); st->offset += ret; stream_end_register(st); - - return; - - consume: /* reschedule immediately - consume up to start_offset */ - evutil_timerclear(&tv); - ret = event_add(st->ev, &tv); - if (ret < 0) - { - DPRINTF(E_LOG, L_HTTPD, "Could not re-add one-shot event for streaming (xcode)\n"); - - stream_end(st); - return; - } } static void stream_chunk_raw_cb(int fd, short event, void *arg) { - struct stream_ctx *st; + struct stream_ctx *st = arg; size_t chunk_size; int ret; - st = (struct stream_ctx *)arg; - if (st->end_offset && (st->offset > st->end_offset)) { stream_end(st); @@ -732,7 +815,7 @@ stream_chunk_raw_cb(int fd, short event, void *arg) else chunk_size = STREAM_CHUNK_SIZE; - ret = read(st->fd, st->buf, chunk_size); + ret = evbuffer_read(st->hreq->out_body, st->fd, chunk_size); if (ret <= 0) { if (ret == 0) @@ -746,9 +829,7 @@ stream_chunk_raw_cb(int fd, short event, void *arg) DPRINTF(E_DBG, L_HTTPD, "Read %d bytes; streaming file id %d\n", ret, st->id); - evbuffer_add(st->evbuf, st->buf, ret); - - httpd_send_reply_chunk(st->hreq, st->evbuf, stream_chunk_resched_cb, st); + httpd_send_reply_chunk(st->hreq, stream_chunk_resched_cb, st); st->offset += ret; @@ -756,33 +837,41 @@ stream_chunk_raw_cb(int fd, short event, void *arg) } static void -stream_fail_cb(httpd_connection *conn, void *arg) +stream_fail_cb(struct httpd_request *hreq, void *arg) { - struct stream_ctx *st; + struct stream_ctx *st = arg; - st = (struct stream_ctx *)arg; - - DPRINTF(E_WARN, L_HTTPD, "Connection failed; stopping streaming of file ID %d\n", st->id); - - /* Stop streaming */ - event_del(st->ev); - - stream_end(st); + stream_free(st); } -/* ---------------------------- MAIN HTTPD THREAD --------------------------- */ +/* ---------------------------- REQUEST CALLBACKS --------------------------- */ +// Worker thread, invoked by request_cb() below +static void +request_async_cb(void *arg) +{ + struct httpd_request *hreq = *(struct httpd_request **)arg; + +#ifdef HAVE_SYSCALL + DPRINTF(E_DBG, hreq->module->logdomain, "%s request '%s' in worker thread %ld\n", hreq->module->name, hreq->uri, syscall(SYS_gettid)); +#endif + + // Some handlers require an evbase to schedule events + hreq->evbase = worker_evbase_get(); + hreq->module->request(hreq); +} + +// httpd thread static void request_cb(struct httpd_request *hreq, void *arg) { - // Did we get a CORS preflight request? - if (handle_cors_preflight(hreq, httpd_allow_origin) == 0) + if (is_cors_preflight(hreq, httpd_allow_origin)) { + httpd_send_reply(hreq, HTTP_OK, "OK", HTTPD_SEND_NO_GZIP); return; } - - if (!hreq->uri || !hreq->uri_parsed) + else if (!hreq->uri || !hreq->uri_parsed) { DPRINTF(E_WARN, L_HTTPD, "Invalid URI in request: '%s'\n", hreq->uri); httpd_redirect_to(hreq, "/"); @@ -796,13 +885,14 @@ request_cb(struct httpd_request *hreq, void *arg) } httpd_request_handler_set(hreq); - if (hreq->module) + if (hreq->module && hreq->is_async) + { + worker_execute(request_async_cb, &hreq, sizeof(struct httpd_request *), 0); + } + else if (hreq->module) { -#ifdef HAVE_SYSCALL - DPRINTF(E_DBG, hreq->module->logdomain, "%s request: '%s' (thread %ld)\n", hreq->module->name, hreq->uri, syscall(SYS_gettid)); -#else DPRINTF(E_DBG, hreq->module->logdomain, "%s request: '%s'\n", hreq->module->name, hreq->uri); -#endif + hreq->evbase = httpd_backend_evbase_get(hreq->backend); hreq->module->request(hreq); } else @@ -811,35 +901,26 @@ request_cb(struct httpd_request *hreq, void *arg) DPRINTF(E_DBG, L_HTTPD, "HTTP request: '%s'\n", hreq->uri); serve_file(hreq); } + + // Don't touch hreq here, if async it has been passed to a worker thread } /* ------------------------------- HTTPD API -------------------------------- */ -/* Thread: httpd */ void httpd_stream_file(struct httpd_request *hreq, int id) { - struct media_quality quality = { HTTPD_STREAM_SAMPLE_RATE, HTTPD_STREAM_BPS, HTTPD_STREAM_CHANNELS, 0 }; - struct media_file_info *mfi; - struct stream_ctx *st; - void (*stream_cb)(int fd, short event, void *arg); - struct stat sb; - struct timeval tv; - struct event_base *evbase; + struct media_file_info *mfi = NULL; + struct stream_ctx *st = NULL; const char *param; const char *param_end; - const char *client_codecs; char buf[64]; - int64_t offset; - int64_t end_offset; - off_t pos; + int64_t offset = 0; + int64_t end_offset = 0; int transcode; int ret; - offset = 0; - end_offset = 0; - param = httpd_header_find(hreq->in_headers, "Range"); if (param) { @@ -880,103 +961,45 @@ httpd_stream_file(struct httpd_request *hreq, int id) DPRINTF(E_LOG, L_HTTPD, "Item %d not found\n", id); httpd_send_error(hreq, HTTP_NOTFOUND, "Not Found"); - return; + goto error; } if (mfi->data_kind != DATA_KIND_FILE) { DPRINTF(E_LOG, L_HTTPD, "Could not serve '%s' to client, not a file\n", mfi->path); - httpd_send_error(hreq, 500, "Cannot stream non-file content"); - goto out_free_mfi; + httpd_send_error(hreq, HTTP_INTERNAL, "Cannot stream non-file content"); + goto error; } - st = (struct stream_ctx *)malloc(sizeof(struct stream_ctx)); - if (!st) - { - DPRINTF(E_LOG, L_HTTPD, "Out of memory for struct stream_ctx\n"); - - httpd_send_error(hreq, HTTP_SERVUNAVAIL, "Internal Server Error"); - goto out_free_mfi; - } - memset(st, 0, sizeof(struct stream_ctx)); - st->fd = -1; - - client_codecs = httpd_header_find(hreq->in_headers, "Accept-Codecs"); - - transcode = transcode_needed(hreq->user_agent, client_codecs, mfi->codectype); + param = httpd_header_find(hreq->in_headers, "Accept-Codecs"); + transcode = transcode_needed(hreq->user_agent, param, mfi->codectype); if (transcode) { DPRINTF(E_INFO, L_HTTPD, "Preparing to transcode %s\n", mfi->path); - stream_cb = stream_chunk_xcode_cb; - - st->xcode = transcode_setup(XCODE_PCM16_HEADER, &quality, mfi->data_kind, mfi->path, mfi->song_length, &st->size); - if (!st->xcode) - { - DPRINTF(E_WARN, L_HTTPD, "Transcoding setup failed, aborting streaming\n"); - - httpd_send_error(hreq, HTTP_SERVUNAVAIL, "Internal Server Error"); - goto out_free_st; - } + st = stream_new_transcode(mfi, hreq, offset, end_offset, stream_chunk_xcode_cb); + if (!st) + goto error; if (!httpd_header_find(hreq->out_headers, "Content-Type")) httpd_header_add(hreq->out_headers, "Content-Type", "audio/wav"); } else { - /* Stream the raw file */ DPRINTF(E_INFO, L_HTTPD, "Preparing to stream %s\n", mfi->path); - st->buf = (uint8_t *)malloc(STREAM_CHUNK_SIZE); - if (!st->buf) - { - DPRINTF(E_LOG, L_HTTPD, "Out of memory for raw streaming buffer\n"); + st = stream_new_raw(mfi, hreq, offset, end_offset, stream_chunk_raw_cb); + if (!st) + goto error; - httpd_send_error(hreq, HTTP_SERVUNAVAIL, "Internal Server Error"); - goto out_free_st; - } - - stream_cb = stream_chunk_raw_cb; - - st->fd = open(mfi->path, O_RDONLY); - if (st->fd < 0) - { - DPRINTF(E_LOG, L_HTTPD, "Could not open %s: %s\n", mfi->path, strerror(errno)); - - httpd_send_error(hreq, HTTP_NOTFOUND, "Not Found"); - goto out_cleanup; - } - - ret = stat(mfi->path, &sb); - if (ret < 0) - { - DPRINTF(E_LOG, L_HTTPD, "Could not stat() %s: %s\n", mfi->path, strerror(errno)); - - httpd_send_error(hreq, HTTP_NOTFOUND, "Not Found"); - goto out_cleanup; - } - st->size = sb.st_size; - - pos = lseek(st->fd, offset, SEEK_SET); - if (pos == (off_t) -1) - { - DPRINTF(E_LOG, L_HTTPD, "Could not seek into %s: %s\n", mfi->path, strerror(errno)); - - httpd_send_error(hreq, HTTP_BADREQUEST, "Bad Request"); - goto out_cleanup; - } - st->offset = offset; - st->end_offset = end_offset; - - /* Content-Type for video files is different than for audio files - * and overrides whatever may have been set previously, like - * application/x-dmap-tagged when we're speaking DAAP. - */ + // Content-Type for video files is different than for audio files and + // overrides whatever may have been set previously, like + // application/x-dmap-tagged when we're speaking DAAP. if (mfi->has_video) { - /* Front Row and others expect video/ */ + // Front Row and others expect video/ ret = snprintf(buf, sizeof(buf), "video/%s", mfi->type); if ((ret < 0) || (ret >= sizeof(buf))) DPRINTF(E_LOG, L_HTTPD, "Content-Type too large for buffer, dropping\n"); @@ -986,10 +1009,9 @@ httpd_stream_file(struct httpd_request *hreq, int id) httpd_header_add(hreq->out_headers, "Content-Type", buf); } } - /* If no Content-Type has been set and we're streaming audio, add a proper - * Content-Type for the file we're streaming. Remember DAAP streams audio - * with application/x-dmap-tagged as the Content-Type (ugh!). - */ + // If no Content-Type has been set and we're streaming audio, add a proper + // Content-Type for the file we're streaming. Remember DAAP streams audio + // with application/x-dmap-tagged as the Content-Type (ugh!). else if (!httpd_header_find(hreq->out_headers, "Content-Type") && mfi->type) { ret = snprintf(buf, sizeof(buf), "audio/%s", mfi->type); @@ -1000,47 +1022,11 @@ httpd_stream_file(struct httpd_request *hreq, int id) } } - st->evbuf = evbuffer_new(); - if (!st->evbuf) - { - DPRINTF(E_LOG, L_HTTPD, "Could not allocate an evbuffer for streaming\n"); - - httpd_send_error(hreq, HTTP_SERVUNAVAIL, "Internal Server Error"); - goto out_cleanup; - } - - ret = evbuffer_expand(st->evbuf, STREAM_CHUNK_SIZE); - if (ret != 0) - { - DPRINTF(E_LOG, L_HTTPD, "Could not expand evbuffer for streaming\n"); - - httpd_send_error(hreq, HTTP_SERVUNAVAIL, "Internal Server Error"); - goto out_cleanup; - } - - evbase = httpd_request_evbase_get(hreq); - - st->ev = event_new(evbase, -1, EV_TIMEOUT, stream_cb, st); - evutil_timerclear(&tv); - if (!st->ev || (event_add(st->ev, &tv) < 0)) - { - DPRINTF(E_LOG, L_HTTPD, "Could not add one-shot event for streaming\n"); - - httpd_send_error(hreq, HTTP_SERVUNAVAIL, "Internal Server Error"); - goto out_cleanup; - } - - st->id = mfi->id; - st->start_offset = offset; - st->stream_size = st->size; - st->hreq = hreq; - if ((offset == 0) && (end_offset == 0)) { - /* If we are not decoding, send the Content-Length. We don't do - * that if we are decoding because we can only guesstimate the - * size in this case and the error margin is unknown and variable. - */ + // If we are not decoding, send the Content-Length. We don't do that if we + // are decoding because we can only guesstimate the size in this case and + // the error margin is unknown and variable. if (!transcode) { ret = snprintf(buf, sizeof(buf), "%" PRIi64, (int64_t)st->size); @@ -1054,11 +1040,6 @@ httpd_stream_file(struct httpd_request *hreq, int id) } else { - if (offset > 0) - st->stream_size -= offset; - if (end_offset > 0) - st->stream_size -= (st->size - end_offset); - DPRINTF(E_DBG, L_HTTPD, "Stream request with range %" PRIi64 "-%" PRIi64 "\n", offset, end_offset); ret = snprintf(buf, sizeof(buf), "bytes %" PRIi64 "-%" PRIi64 "/%" PRIi64, @@ -1080,7 +1061,7 @@ httpd_stream_file(struct httpd_request *hreq, int id) #ifdef HAVE_POSIX_FADVISE if (!transcode) { - /* Hint the OS */ + // Hint the OS if ( (ret = posix_fadvise(st->fd, st->start_offset, st->stream_size, POSIX_FADV_WILLNEED)) != 0 || (ret = posix_fadvise(st->fd, st->start_offset, st->stream_size, POSIX_FADV_SEQUENTIAL)) != 0 || (ret = posix_fadvise(st->fd, st->start_offset, st->stream_size, POSIX_FADV_NOREUSE)) != 0 ) @@ -1088,26 +1069,15 @@ httpd_stream_file(struct httpd_request *hreq, int id) } #endif - httpd_request_closecb_set(hreq, stream_fail_cb, st); + httpd_request_close_cb_set(hreq, stream_fail_cb, st); DPRINTF(E_INFO, L_HTTPD, "Kicking off streaming for %s\n", mfi->path); free_mfi(mfi, 0); - return; - out_cleanup: - if (st->evbuf) - evbuffer_free(st->evbuf); - if (st->xcode) - transcode_cleanup(&st->xcode); - if (st->buf) - free(st->buf); - if (st->fd > 0) - close(st->fd); - out_free_st: - free(st); - out_free_mfi: + error: + stream_free(st); free_mfi(mfi, 0); } @@ -1178,10 +1148,18 @@ httpd_gzip_deflate(struct evbuffer *in) return NULL; } + +// The httpd_send functions below can be called from a worker thread (with +// hreq->is_async) or directly from the httpd thread. In the former case, they +// will command sending from the httpd thread, since it is not safe to access +// the backend (evhttp) from a worker thread. hreq will be freed (again, +// possibly async) if the type is either _COMPLETE or _END. + void -httpd_send_reply(struct httpd_request *hreq, int code, const char *reason, struct evbuffer *evbuf, enum httpd_send_flags flags) +httpd_send_reply(struct httpd_request *hreq, int code, const char *reason, enum httpd_send_flags flags) { struct evbuffer *gzbuf; + struct evbuffer *save; const char *param; int do_gzip; @@ -1189,30 +1167,24 @@ httpd_send_reply(struct httpd_request *hreq, int code, const char *reason, struc return; do_gzip = ( (!(flags & HTTPD_SEND_NO_GZIP)) && - evbuf && (evbuffer_get_length(evbuf) > 512) && + (evbuffer_get_length(hreq->out_body) > 512) && (param = httpd_header_find(hreq->in_headers, "Accept-Encoding")) && (strstr(param, "gzip") || strstr(param, "*")) ); cors_headers_add(hreq, httpd_allow_origin); - if (do_gzip && (gzbuf = httpd_gzip_deflate(evbuf))) + if (do_gzip && (gzbuf = httpd_gzip_deflate(hreq->out_body))) { DPRINTF(E_DBG, L_HTTPD, "Gzipping response\n"); httpd_header_add(hreq->out_headers, "Content-Encoding", "gzip"); - httpd_backend_reply_send(hreq->backend, code, reason, gzbuf); - evbuffer_free(gzbuf); - - // Drain original buffer, as would be after evhttp_send_reply() - evbuffer_drain(evbuf, evbuffer_get_length(evbuf)); - } - else - { - httpd_backend_reply_send(hreq->backend, code, reason, evbuf); + save = hreq->out_body; + hreq->out_body = gzbuf; + evbuffer_free(save); } - httpd_request_free(hreq); + httpd_send(hreq, HTTPD_REPLY_COMPLETE, code, reason, NULL, NULL); } void @@ -1220,28 +1192,26 @@ httpd_send_reply_start(struct httpd_request *hreq, int code, const char *reason) { cors_headers_add(hreq, httpd_allow_origin); - httpd_backend_reply_start_send(hreq->backend, code, reason); + httpd_send(hreq, HTTPD_REPLY_START, code, reason, NULL, NULL); } void -httpd_send_reply_chunk(struct httpd_request *hreq, struct evbuffer *evbuf, httpd_connection_chunkcb cb, void *arg) +httpd_send_reply_chunk(struct httpd_request *hreq, httpd_connection_chunkcb cb, void *arg) { - httpd_backend_reply_chunk_send(hreq->backend, evbuf, cb, arg); + httpd_send(hreq, HTTPD_REPLY_CHUNK, 0, NULL, cb, arg); } void httpd_send_reply_end(struct httpd_request *hreq) { - httpd_backend_reply_end_send(hreq->backend); - httpd_request_free(hreq); + httpd_send(hreq, HTTPD_REPLY_END, 0, NULL, NULL, NULL); } // This is a modified version of evhttp_send_error (credit libevent) void httpd_send_error(struct httpd_request *hreq, int error, const char *reason) { - struct evbuffer *evbuf; - + evbuffer_drain(hreq->out_body, -1); httpd_headers_clear(hreq->out_headers); cors_headers_add(hreq, httpd_allow_origin); @@ -1249,18 +1219,9 @@ httpd_send_error(struct httpd_request *hreq, int error, const char *reason) httpd_header_add(hreq->out_headers, "Content-Type", "text/html"); httpd_header_add(hreq->out_headers, "Connection", "close"); - evbuf = evbuffer_new(); - if (!evbuf) - DPRINTF(E_LOG, L_HTTPD, "Could not allocate evbuffer for error page\n"); - else - evbuffer_add_printf(evbuf, ERR_PAGE, error, reason, reason); + evbuffer_add_printf(hreq->out_body, ERR_PAGE, error, reason, reason); - httpd_backend_reply_send(hreq->backend, error, reason, evbuf); - - if (evbuf) - evbuffer_free(evbuf); - - httpd_request_free(hreq); + httpd_send(hreq, HTTPD_REPLY_COMPLETE, error, reason, NULL, NULL); } bool @@ -1277,7 +1238,7 @@ httpd_admin_check_auth(struct httpd_request *hreq) { DPRINTF(E_LOG, L_HTTPD, "Web interface request to '%s' denied: No password set in the config\n", hreq->uri); - httpd_send_error(hreq, 403, "Forbidden"); + httpd_send_error(hreq, HTTP_FORBIDDEN, "Forbidden"); return false; } @@ -1300,7 +1261,6 @@ httpd_admin_check_auth(struct httpd_request *hreq) int httpd_basic_auth(struct httpd_request *hreq, const char *user, const char *passwd, const char *realm) { - struct evbuffer *evbuf; char header[256]; const char *auth; char *authuser; @@ -1375,20 +1335,11 @@ httpd_basic_auth(struct httpd_request *hreq, const char *user, const char *passw return -1; } - evbuf = evbuffer_new(); - if (!evbuf) - { - httpd_send_error(hreq, HTTP_SERVUNAVAIL, "Internal Server Error"); - return -1; - } - httpd_header_add(hreq->out_headers, "WWW-Authenticate", header); - evbuffer_add_printf(evbuf, ERR_PAGE, 401, "Unauthorized", "Authorization required"); + evbuffer_add_printf(hreq->out_body, ERR_PAGE, HTTP_UNAUTHORIZED, "Unauthorized", "Authorization required"); - httpd_send_reply(hreq, 401, "Unauthorized", evbuf, HTTPD_SEND_NO_GZIP); - - evbuffer_free(evbuf); + httpd_send_reply(hreq, HTTP_UNAUTHORIZED, "Unauthorized", HTTPD_SEND_NO_GZIP); return -1; } diff --git a/src/httpd_artworkapi.c b/src/httpd_artworkapi.c index d1292f87..48ed9161 100644 --- a/src/httpd_artworkapi.c +++ b/src/httpd_artworkapi.c @@ -166,13 +166,13 @@ artworkapi_request(struct httpd_request *hreq) switch (status_code) { case HTTP_OK: /* 200 OK */ - httpd_send_reply(hreq, status_code, "OK", hreq->out_body, HTTPD_SEND_NO_GZIP); + httpd_send_reply(hreq, status_code, "OK", HTTPD_SEND_NO_GZIP); break; case HTTP_NOCONTENT: /* 204 No Content */ - httpd_send_reply(hreq, status_code, "No Content", hreq->out_body, HTTPD_SEND_NO_GZIP); + httpd_send_reply(hreq, status_code, "No Content", HTTPD_SEND_NO_GZIP); break; case HTTP_NOTMODIFIED: /* 304 Not Modified */ - httpd_send_reply(hreq, HTTP_NOTMODIFIED, NULL, NULL, HTTPD_SEND_NO_GZIP); + httpd_send_reply(hreq, HTTP_NOTMODIFIED, NULL, HTTPD_SEND_NO_GZIP); break; case HTTP_BADREQUEST: /* 400 Bad Request */ httpd_send_error(hreq, status_code, "Bad Request"); diff --git a/src/httpd_daap.c b/src/httpd_daap.c index 73a2eca1..800283ae 100644 --- a/src/httpd_daap.c +++ b/src/httpd_daap.c @@ -284,42 +284,28 @@ update_remove(struct daap_update_request *ur) static void update_refresh_cb(int fd, short event, void *arg) { - struct daap_update_request *ur; - struct evbuffer *reply; - - ur = (struct daap_update_request *)arg; - - CHECK_NULL(L_DAAP, reply = evbuffer_new()); - CHECK_ERR(L_DAAP, evbuffer_expand(reply, 32)); + struct daap_update_request *ur = arg; + struct httpd_request *hreq = ur->hreq; current_rev++; /* Send back current revision */ - dmap_add_container(reply, "mupd", 24); - dmap_add_int(reply, "mstt", 200); /* 12 */ - dmap_add_int(reply, "musr", current_rev); /* 12 */ + dmap_add_container(hreq->out_body, "mupd", 24); + dmap_add_int(hreq->out_body, "mstt", 200); /* 12 */ + dmap_add_int(hreq->out_body, "musr", current_rev); /* 12 */ - httpd_request_closecb_set(ur->hreq, NULL, NULL); - - httpd_send_reply(ur->hreq, HTTP_OK, "OK", reply, 0); + httpd_send_reply(hreq, HTTP_OK, "OK", 0); update_remove(ur); } static void -update_fail_cb(httpd_connection *conn, void *arg) +update_fail_cb(struct httpd_request *hreq, void *arg) { - struct daap_update_request *ur; - - ur = (struct daap_update_request *)arg; + struct daap_update_request *ur = arg; DPRINTF(E_DBG, L_DAAP, "Update request: client closed connection\n"); - httpd_request_closecb_set(ur->hreq, NULL, NULL); - - // Peer won't get this, it is just to make sure hreq and evhttp's request get - // freed - httpd_send_error(ur->hreq, HTTP_BADREQUEST, "Bad Request"); update_remove(ur); } @@ -671,17 +657,17 @@ daap_reply_send(struct httpd_request *hreq, enum daap_reply_result result) switch (result) { case DAAP_REPLY_LOGOUT: - httpd_send_reply(hreq, HTTP_NOCONTENT, "Logout Successful", hreq->out_body, 0); + httpd_send_reply(hreq, HTTP_NOCONTENT, "Logout Successful", 0); break; case DAAP_REPLY_NO_CONTENT: - httpd_send_reply(hreq, HTTP_NOCONTENT, "No Content", hreq->out_body, HTTPD_SEND_NO_GZIP); + httpd_send_reply(hreq, HTTP_NOCONTENT, "No Content", HTTPD_SEND_NO_GZIP); break; case DAAP_REPLY_OK: - httpd_send_reply(hreq, HTTP_OK, "OK", hreq->out_body, 0); + httpd_send_reply(hreq, HTTP_OK, "OK", 0); break; case DAAP_REPLY_OK_NO_GZIP: case DAAP_REPLY_ERROR: - httpd_send_reply(hreq, HTTP_OK, "OK", hreq->out_body, HTTPD_SEND_NO_GZIP); + httpd_send_reply(hreq, HTTP_OK, "OK", HTTPD_SEND_NO_GZIP); break; case DAAP_REPLY_FORBIDDEN: httpd_send_error(hreq, HTTP_FORBIDDEN, "Forbidden"); @@ -724,7 +710,7 @@ daap_request_authorize(struct httpd_request *hreq) { DPRINTF(E_LOG, L_DAAP, "Unauthorized request from '%s', DAAP session not found: '%s'\n", hreq->peer_address, hreq->uri); - httpd_send_error(hreq, 401, "Unauthorized"); + httpd_send_error(hreq, HTTP_UNAUTHORIZED, "Unauthorized");; return -1; } @@ -760,7 +746,7 @@ daap_request_authorize(struct httpd_request *hreq) /* --------------------------- REPLY HANDLERS ------------------------------- */ /* Note that some handlers can be called without a connection (needed for */ /* cache regeneration), while others cannot. Those that cannot should check */ -/* that httpd_request_connection_get(hreq) is not null. */ +/* that hreq->backend is not null. */ static enum daap_reply_result daap_reply_server_info(struct httpd_request *hreq) @@ -773,7 +759,7 @@ daap_reply_server_info(struct httpd_request *hreq) int mpro; int apro; - if (!httpd_request_connection_get(hreq)) + if (!hreq->backend) { DPRINTF(E_LOG, L_DAAP, "Bug! daap_reply_server_info() cannot be called without an actual connection\n"); return DAAP_REPLY_NO_CONNECTION; @@ -986,12 +972,11 @@ static enum daap_reply_result daap_reply_update(struct httpd_request *hreq) { struct daap_update_request *ur; - struct event_base *evbase; const char *param; int reqd_rev; int ret; - if (!httpd_request_connection_get(hreq)) + if (!hreq->backend) { DPRINTF(E_LOG, L_DAAP, "Bug! daap_reply_update() cannot be called without an actual connection\n"); return DAAP_REPLY_NO_CONNECTION; @@ -1039,9 +1024,7 @@ daap_reply_update(struct httpd_request *hreq) if (DAAP_UPDATE_REFRESH > 0) { - evbase = httpd_request_evbase_get(hreq); - - ur->timeout = evtimer_new(evbase, update_refresh_cb, ur); + ur->timeout = evtimer_new(hreq->evbase, update_refresh_cb, ur); if (ur->timeout) ret = evtimer_add(ur->timeout, &daap_update_refresh_tv); else @@ -1066,7 +1049,7 @@ daap_reply_update(struct httpd_request *hreq) /* If the connection fails before we have an update to push out * to the client, we need to know. */ - httpd_request_closecb_set(hreq, update_fail_cb, ur); + httpd_request_close_cb_set(hreq, update_fail_cb, ur); return DAAP_REPLY_NONE; } @@ -1948,7 +1931,7 @@ daap_reply_extra_data(struct httpd_request *hreq) int max_h; int ret; - if (!httpd_request_connection_get(hreq)) + if (!hreq->backend) { DPRINTF(E_LOG, L_DAAP, "Bug! daap_reply_extra_data() cannot be called without an actual connection\n"); return DAAP_REPLY_NO_CONNECTION; @@ -2028,7 +2011,7 @@ daap_stream(struct httpd_request *hreq) int id; int ret; - if (!httpd_request_connection_get(hreq)) + if (!hreq->backend) { DPRINTF(E_LOG, L_DAAP, "Bug! daap_stream() cannot be called without an actual connection\n"); return DAAP_REPLY_NO_CONNECTION; @@ -2153,7 +2136,7 @@ static struct httpd_uri_map daap_handlers[] = }, { .regexp = "^/update$", - .handler = daap_reply_update + .handler = daap_reply_update, }, { .regexp = "^/activity$", @@ -2229,8 +2212,7 @@ daap_request(struct httpd_request *hreq) if (!hreq->handler) { DPRINTF(E_LOG, L_DAAP, "Unrecognized path in DAAP request: '%s'\n", hreq->uri); - - httpd_send_error(hreq, HTTP_BADREQUEST, "Bad Request"); + daap_reply_send(hreq, DAAP_REPLY_BAD_REQUEST); return; } @@ -2273,7 +2255,7 @@ daap_request(struct httpd_request *hreq) { // The cache will return the data gzipped, so httpd_send_reply won't need to do it httpd_header_add(hreq->out_headers, "Content-Encoding", "gzip"); - httpd_send_reply(hreq, HTTP_OK, "OK", hreq->out_body, HTTPD_SEND_NO_GZIP); // TODO not all want this reply + httpd_send_reply(hreq, HTTP_OK, "OK", HTTPD_SEND_NO_GZIP); // TODO not all want this reply return; } @@ -2317,7 +2299,7 @@ daap_reply_build(const char *uri, const char *user_agent, int is_remote) DPRINTF(E_DBG, L_DAAP, "Building reply for DAAP request: '%s'\n", uri); - hreq = httpd_request_new(NULL, uri, user_agent); + hreq = httpd_request_new(NULL, NULL, uri, user_agent); if (!hreq) { DPRINTF(E_LOG, L_DAAP, "Error building request: '%s'\n", uri); @@ -2366,7 +2348,6 @@ daap_deinit(void) { struct daap_session *s; struct daap_update_request *ur; - httpd_connection *conn; for (s = daap_sessions; daap_sessions; s = daap_sessions) { @@ -2378,10 +2359,7 @@ daap_deinit(void) { update_requests = ur->next; - httpd_request_closecb_set(ur->hreq, NULL, NULL); - conn = httpd_request_connection_get(ur->hreq); - httpd_connection_free(conn); // TODO necessary? - + daap_reply_send(ur->hreq, DAAP_REPLY_SERVUNAVAIL); update_free(ur); } } diff --git a/src/httpd_dacp.c b/src/httpd_dacp.c index 2eda58a7..6de4f342 100644 --- a/src/httpd_dacp.c +++ b/src/httpd_dacp.c @@ -139,25 +139,12 @@ static struct db_queue_item dummy_queue_item; static void dacp_send_error(struct httpd_request *hreq, const char *container, const char *errmsg) { - struct evbuffer *evbuf; - if (!hreq) return; - evbuf = evbuffer_new(); - if (!evbuf) - { - DPRINTF(E_LOG, L_DACP, "Could not allocate evbuffer for DMAP error\n"); + dmap_error_make(hreq->out_body, container, errmsg); - httpd_send_error(hreq, HTTP_SERVUNAVAIL, "Internal Server Error"); - return; - } - - dmap_error_make(evbuf, container, errmsg); - - httpd_send_reply(hreq, HTTP_OK, "OK", evbuf, HTTPD_SEND_NO_GZIP); - - evbuffer_free(evbuf); + httpd_send_reply(hreq, HTTP_OK, "OK", HTTPD_SEND_NO_GZIP); } static void @@ -640,7 +627,7 @@ dacp_request_authorize(struct httpd_request *hreq) invalid: DPRINTF(E_LOG, L_DACP, "Unauthorized request '%s' from '%s' (is peer trusted in your config?)\n", hreq->uri, hreq->peer_address); - httpd_send_error(hreq, 403, "Forbidden"); + httpd_send_error(hreq, HTTP_FORBIDDEN, "Forbidden"); return -1; } @@ -719,13 +706,10 @@ playstatusupdate_cb(int fd, short what, void *arg); static struct dacp_update_request * update_request_new(struct httpd_request *hreq) { - struct event_base *evbase; struct dacp_update_request *ur; - evbase = httpd_request_evbase_get(hreq); - CHECK_NULL(L_DACP, ur = calloc(1, sizeof(struct dacp_update_request))); - CHECK_NULL(L_DACP, ur->updateev = event_new(evbase, -1, 0, playstatusupdate_cb, ur)); + CHECK_NULL(L_DACP, ur->updateev = event_new(hreq->evbase, -1, 0, playstatusupdate_cb, ur)); ur->hreq = hreq; return ur; @@ -771,40 +755,30 @@ static void playstatusupdate_cb(int fd, short what, void *arg) { struct dacp_update_request *ur = arg; - struct evbuffer *update; + struct httpd_request *hreq = ur->hreq; int ret; - CHECK_NULL(L_DACP, update = evbuffer_new()); - - ret = make_playstatusupdate(update, update_current_rev); + ret = make_playstatusupdate(hreq->out_body, update_current_rev); if (ret < 0) goto error; - httpd_request_closecb_set(ur->hreq, NULL, NULL); - - httpd_send_reply(ur->hreq, HTTP_OK, "OK", update, 0); + httpd_send_reply(hreq, HTTP_OK, "OK", 0); pthread_mutex_lock(&update_request_lck); update_request_remove(&update_requests, ur); pthread_mutex_unlock(&update_request_lck); error: - evbuffer_free(update); + return; } static void -update_fail_cb(httpd_connection *conn, void *arg) +update_fail_cb(struct httpd_request *hreq, void *arg) { struct dacp_update_request *ur = arg; DPRINTF(E_DBG, L_DACP, "Update request: client closed connection\n"); - httpd_request_closecb_set(ur->hreq, NULL, NULL); - - // Peer won't get this, it is just to make sure hreq and evhttp's request get - // freed - httpd_send_error(ur->hreq, HTTP_BADREQUEST, "Bad Request"); - pthread_mutex_lock(&update_request_lck); update_request_remove(&update_requests, ur); pthread_mutex_unlock(&update_request_lck); @@ -1028,7 +1002,6 @@ dacp_propset_devicebusy(const char *value, struct httpd_request *hreq) static void dacp_propset_playingtime(const char *value, struct httpd_request *hreq) { - struct event_base *evbase; struct timeval tv; int seek_target; intptr_t seek_target_packed; @@ -1047,8 +1020,7 @@ dacp_propset_playingtime(const char *value, struct httpd_request *hreq) evutil_timerclear(&tv); tv.tv_usec = 200 * 1000; - evbase = httpd_request_evbase_get(hreq); - event_base_once(evbase, -1, EV_TIMEOUT, seek_timer_cb, (void *)seek_target_packed, &tv); + event_base_once(hreq->evbase, -1, EV_TIMEOUT, seek_timer_cb, (void *)seek_target_packed, &tv); } static void @@ -1194,7 +1166,7 @@ dacp_reply_ctrlint(struct httpd_request *hreq) dmap_add_char(hreq->out_body, "cmrl", 1); /* 9, unknown */ dmap_add_long(hreq->out_body, "ceSX", (1 << 1 | 1)); /* 16, unknown dacp - lowest bit announces support for playqueue-contents/-edit */ - httpd_send_reply(hreq, HTTP_OK, "OK", hreq->out_body, 0); + httpd_send_reply(hreq, HTTP_OK, "OK", 0); return 0; } @@ -1338,7 +1310,7 @@ dacp_reply_cue_play(struct httpd_request *hreq) dmap_add_int(hreq->out_body, "mstt", 200); /* 12 */ dmap_add_int(hreq->out_body, "miid", status.id);/* 12 */ - httpd_send_reply(hreq, HTTP_OK, "OK", hreq->out_body, 0); + httpd_send_reply(hreq, HTTP_OK, "OK", 0); return 0; } @@ -1358,7 +1330,7 @@ dacp_reply_cue_clear(struct httpd_request *hreq) dmap_add_int(hreq->out_body, "mstt", 200); /* 12 */ dmap_add_int(hreq->out_body, "miid", 0); /* 12 */ - httpd_send_reply(hreq, HTTP_OK, "OK", hreq->out_body, 0); + httpd_send_reply(hreq, HTTP_OK, "OK", 0); return 0; } @@ -1407,12 +1379,12 @@ dacp_reply_play(struct httpd_request *hreq) ret = player_playback_start(); if (ret < 0) { - httpd_send_error(hreq, 500, "Internal Server Error"); + httpd_send_error(hreq, HTTP_INTERNAL, "Internal Server Error"); return -1; } /* 204 No Content is the canonical reply */ - httpd_send_reply(hreq, HTTP_NOCONTENT, "No Content", hreq->out_body, HTTPD_SEND_NO_GZIP); + httpd_send_reply(hreq, HTTP_NOCONTENT, "No Content", HTTPD_SEND_NO_GZIP); return 0; } @@ -1544,11 +1516,11 @@ dacp_reply_playspec(struct httpd_request *hreq) } /* 204 No Content is the canonical reply */ - httpd_send_reply(hreq, HTTP_NOCONTENT, "No Content", hreq->out_body, HTTPD_SEND_NO_GZIP); + httpd_send_reply(hreq, HTTP_NOCONTENT, "No Content", HTTPD_SEND_NO_GZIP); return 0; out_fail: - httpd_send_error(hreq, 500, "Internal Server Error"); + httpd_send_error(hreq, HTTP_INTERNAL, "Internal Server Error"); return -1; } @@ -1565,7 +1537,7 @@ dacp_reply_stop(struct httpd_request *hreq) player_playback_stop(); /* 204 No Content is the canonical reply */ - httpd_send_reply(hreq, HTTP_NOCONTENT, "No Content", hreq->out_body, HTTPD_SEND_NO_GZIP); + httpd_send_reply(hreq, HTTP_NOCONTENT, "No Content", HTTPD_SEND_NO_GZIP); return 0; } @@ -1582,7 +1554,7 @@ dacp_reply_pause(struct httpd_request *hreq) player_playback_pause(); /* 204 No Content is the canonical reply */ - httpd_send_reply(hreq, HTTP_NOCONTENT, "No Content", hreq->out_body, HTTPD_SEND_NO_GZIP); + httpd_send_reply(hreq, HTTP_NOCONTENT, "No Content", HTTPD_SEND_NO_GZIP); return 0; } @@ -1609,13 +1581,13 @@ dacp_reply_playpause(struct httpd_request *hreq) { DPRINTF(E_LOG, L_DACP, "Player returned an error for start after pause\n"); - httpd_send_error(hreq, 500, "Internal Server Error"); + httpd_send_error(hreq, HTTP_INTERNAL, "Internal Server Error"); return -1; } } /* 204 No Content is the canonical reply */ - httpd_send_reply(hreq, HTTP_NOCONTENT, "No Content", hreq->out_body, HTTPD_SEND_NO_GZIP); + httpd_send_reply(hreq, HTTP_NOCONTENT, "No Content", HTTPD_SEND_NO_GZIP); return 0; } @@ -1634,7 +1606,7 @@ dacp_reply_nextitem(struct httpd_request *hreq) { DPRINTF(E_LOG, L_DACP, "Player returned an error for nextitem\n"); - httpd_send_error(hreq, 500, "Internal Server Error"); + httpd_send_error(hreq, HTTP_INTERNAL, "Internal Server Error"); return -1; } @@ -1643,12 +1615,12 @@ dacp_reply_nextitem(struct httpd_request *hreq) { DPRINTF(E_LOG, L_DACP, "Player returned an error for start after nextitem\n"); - httpd_send_error(hreq, 500, "Internal Server Error"); + httpd_send_error(hreq, HTTP_INTERNAL, "Internal Server Error"); return -1; } /* 204 No Content is the canonical reply */ - httpd_send_reply(hreq, HTTP_NOCONTENT, "No Content", hreq->out_body, HTTPD_SEND_NO_GZIP); + httpd_send_reply(hreq, HTTP_NOCONTENT, "No Content", HTTPD_SEND_NO_GZIP); return 0; } @@ -1667,7 +1639,7 @@ dacp_reply_previtem(struct httpd_request *hreq) { DPRINTF(E_LOG, L_DACP, "Player returned an error for previtem\n"); - httpd_send_error(hreq, 500, "Internal Server Error"); + httpd_send_error(hreq, HTTP_INTERNAL, "Internal Server Error"); return -1; } @@ -1676,12 +1648,12 @@ dacp_reply_previtem(struct httpd_request *hreq) { DPRINTF(E_LOG, L_DACP, "Player returned an error for start after previtem\n"); - httpd_send_error(hreq, 500, "Internal Server Error"); + httpd_send_error(hreq, HTTP_INTERNAL, "Internal Server Error"); return -1; } /* 204 No Content is the canonical reply */ - httpd_send_reply(hreq, HTTP_NOCONTENT, "No Content", hreq->out_body, HTTPD_SEND_NO_GZIP); + httpd_send_reply(hreq, HTTP_NOCONTENT, "No Content", HTTPD_SEND_NO_GZIP); return 0; } @@ -1698,7 +1670,7 @@ dacp_reply_beginff(struct httpd_request *hreq) /* TODO */ /* 204 No Content is the canonical reply */ - httpd_send_reply(hreq, HTTP_NOCONTENT, "No Content", hreq->out_body, HTTPD_SEND_NO_GZIP); + httpd_send_reply(hreq, HTTP_NOCONTENT, "No Content", HTTPD_SEND_NO_GZIP); return 0; } @@ -1715,7 +1687,7 @@ dacp_reply_beginrew(struct httpd_request *hreq) /* TODO */ /* 204 No Content is the canonical reply */ - httpd_send_reply(hreq, HTTP_NOCONTENT, "No Content", hreq->out_body, HTTPD_SEND_NO_GZIP); + httpd_send_reply(hreq, HTTP_NOCONTENT, "No Content", HTTPD_SEND_NO_GZIP); return 0; } @@ -1732,7 +1704,7 @@ dacp_reply_playresume(struct httpd_request *hreq) /* TODO */ /* 204 No Content is the canonical reply */ - httpd_send_reply(hreq, HTTP_NOCONTENT, "No Content", hreq->out_body, HTTPD_SEND_NO_GZIP); + httpd_send_reply(hreq, HTTP_NOCONTENT, "No Content", HTTPD_SEND_NO_GZIP); return 0; } @@ -1884,7 +1856,7 @@ dacp_reply_playqueuecontents(struct httpd_request *hreq) dmap_add_char(hreq->out_body, "apsm", status.shuffle); /* 9, daap.playlistshufflemode - not part of mlcl container */ dmap_add_char(hreq->out_body, "aprm", status.repeat); /* 9, daap.playlistrepeatmode - not part of mlcl container */ - httpd_send_reply(hreq, HTTP_OK, "OK", hreq->out_body, 0); + httpd_send_reply(hreq, HTTP_OK, "OK", 0); return 0; @@ -1922,7 +1894,7 @@ dacp_reply_playqueueedit_clear(struct httpd_request *hreq) dmap_add_int(hreq->out_body, "mstt", 200); /* 12 */ dmap_add_int(hreq->out_body, "miid", 0); /* 12 */ - httpd_send_reply(hreq, HTTP_OK, "OK", hreq->out_body, 0); + httpd_send_reply(hreq, HTTP_OK, "OK", 0); return 0; } @@ -2060,7 +2032,7 @@ dacp_reply_playqueueedit_add(struct httpd_request *hreq) } /* 204 No Content is the canonical reply */ - httpd_send_reply(hreq, HTTP_NOCONTENT, "No Content", hreq->out_body, HTTPD_SEND_NO_GZIP); + httpd_send_reply(hreq, HTTP_NOCONTENT, "No Content", HTTPD_SEND_NO_GZIP); return 0; } @@ -2108,7 +2080,7 @@ dacp_reply_playqueueedit_move(struct httpd_request *hreq) } /* 204 No Content is the canonical reply */ - httpd_send_reply(hreq, HTTP_NOCONTENT, "No Content", hreq->out_body, HTTPD_SEND_NO_GZIP); + httpd_send_reply(hreq, HTTP_NOCONTENT, "No Content", HTTPD_SEND_NO_GZIP); return 0; } @@ -2144,7 +2116,7 @@ dacp_reply_playqueueedit_remove(struct httpd_request *hreq) } /* 204 No Content is the canonical reply */ - httpd_send_reply(hreq, HTTP_NOCONTENT, "No Content", hreq->out_body, HTTPD_SEND_NO_GZIP); + httpd_send_reply(hreq, HTTP_NOCONTENT, "No Content", HTTPD_SEND_NO_GZIP); return 0; } @@ -2267,9 +2239,9 @@ dacp_reply_playstatusupdate(struct httpd_request *hreq) { ret = make_playstatusupdate(hreq->out_body, update_current_rev); if (ret < 0) - httpd_send_error(hreq, 500, "Internal Server Error"); + httpd_send_error(hreq, HTTP_INTERNAL, "Internal Server Error"); else - httpd_send_reply(hreq, HTTP_OK, "OK", hreq->out_body, 0); + httpd_send_reply(hreq, HTTP_OK, "OK", 0); return ret; } @@ -2289,10 +2261,9 @@ dacp_reply_playstatusupdate(struct httpd_request *hreq) update_requests = ur; pthread_mutex_unlock(&update_request_lck); - /* If the connection fails before we have an update to push out - * to the client, we need to know. - */ - httpd_request_closecb_set(hreq, update_fail_cb, ur); + // If the connection fails before we have an update to push out to the client, + // we need to know. + httpd_request_close_cb_set(hreq, update_fail_cb, ur); return 0; } @@ -2370,7 +2341,7 @@ dacp_reply_nowplayingartwork(struct httpd_request *hreq) snprintf(clen, sizeof(clen), "%ld", (long)len); httpd_header_add(hreq->out_headers, "Content-Length", clen); - httpd_send_reply(hreq, HTTP_OK, "OK", hreq->out_body, HTTPD_SEND_NO_GZIP); + httpd_send_reply(hreq, HTTP_OK, "OK", HTTPD_SEND_NO_GZIP); return 0; no_artwork: @@ -2471,7 +2442,7 @@ dacp_reply_getproperty(struct httpd_request *hreq) evbuffer_free(proplist); - httpd_send_reply(hreq, HTTP_OK, "OK", hreq->out_body, 0); + httpd_send_reply(hreq, HTTP_OK, "OK", 0); return 0; @@ -2526,7 +2497,7 @@ dacp_reply_setproperty(struct httpd_request *hreq) httpd_query_iterate(hreq->query, setproperty_cb, hreq); /* 204 No Content is the canonical reply */ - httpd_send_reply(hreq, HTTP_NOCONTENT, "No Content", hreq->out_body, HTTPD_SEND_NO_GZIP); + httpd_send_reply(hreq, HTTP_NOCONTENT, "No Content", HTTPD_SEND_NO_GZIP); return 0; } @@ -2554,7 +2525,7 @@ dacp_reply_getspeakers(struct httpd_request *hreq) evbuffer_free(spklist); - httpd_send_reply(hreq, HTTP_OK, "OK", hreq->out_body, 0); + httpd_send_reply(hreq, HTTP_OK, "OK", 0); return 0; } @@ -2638,13 +2609,13 @@ dacp_reply_setspeakers(struct httpd_request *hreq) if (ret == -2) httpd_send_error(hreq, 902, ""); else - httpd_send_error(hreq, 500, "Internal Server Error"); + httpd_send_error(hreq, HTTP_INTERNAL, "Internal Server Error"); return -1; } /* 204 No Content is the canonical reply */ - httpd_send_reply(hreq, HTTP_NOCONTENT, "No Content", hreq->out_body, HTTPD_SEND_NO_GZIP); + httpd_send_reply(hreq, HTTP_NOCONTENT, "No Content", HTTPD_SEND_NO_GZIP); return 0; } @@ -2670,7 +2641,7 @@ dacp_reply_volumeup(struct httpd_request *hreq) } /* 204 No Content is the canonical reply */ - httpd_send_reply(hreq, HTTP_NOCONTENT, "No Content", hreq->out_body, HTTPD_SEND_NO_GZIP); + httpd_send_reply(hreq, HTTP_NOCONTENT, "No Content", HTTPD_SEND_NO_GZIP); return 0; } @@ -2696,7 +2667,7 @@ dacp_reply_volumedown(struct httpd_request *hreq) } /* 204 No Content is the canonical reply */ - httpd_send_reply(hreq, HTTP_NOCONTENT, "No Content", hreq->out_body, HTTPD_SEND_NO_GZIP); + httpd_send_reply(hreq, HTTP_NOCONTENT, "No Content", HTTPD_SEND_NO_GZIP); return 0; } @@ -2718,12 +2689,12 @@ dacp_reply_mutetoggle(struct httpd_request *hreq) ret = speaker_info.selected ? player_speaker_disable(speaker_info.id) : player_speaker_enable(speaker_info.id); if (ret < 0) { - httpd_send_error(hreq, 500, "Internal Server Error"); + httpd_send_error(hreq, HTTP_INTERNAL, "Internal Server Error"); return -1; } /* 204 No Content is the canonical reply */ - httpd_send_reply(hreq, HTTP_NOCONTENT, "No Content", hreq->out_body, HTTPD_SEND_NO_GZIP); + httpd_send_reply(hreq, HTTP_NOCONTENT, "No Content", HTTPD_SEND_NO_GZIP); return 0; } @@ -2784,7 +2755,7 @@ static struct httpd_uri_map dacp_handlers[] = }, { .regexp = "^/ctrl-int/[[:digit:]]+/playstatusupdate$", - .handler = dacp_reply_playstatusupdate + .handler = dacp_reply_playstatusupdate, }, { .regexp = "^/ctrl-int/[[:digit:]]+/playqueue-contents$", @@ -2879,7 +2850,6 @@ static void dacp_deinit(void) { struct dacp_update_request *ur; - httpd_connection *conn; listener_remove(dacp_playstatus_update_handler); @@ -2887,10 +2857,7 @@ dacp_deinit(void) { update_requests = ur->next; - httpd_request_closecb_set(ur->hreq, NULL, NULL); - conn = httpd_request_connection_get(ur->hreq); - httpd_connection_free(conn); // TODO necessary? - + httpd_send_error(ur->hreq, HTTP_SERVUNAVAIL, "Service Unavailable"); update_request_free(ur); } } diff --git a/src/httpd_internal.h b/src/httpd_internal.h index 4d28fcef..9e57ce04 100644 --- a/src/httpd_internal.h +++ b/src/httpd_internal.h @@ -53,11 +53,11 @@ typedef struct evhttp_request httpd_backend; typedef struct evkeyvalq httpd_headers; typedef struct evkeyvalq httpd_query; typedef struct httpd_uri_parsed httpd_uri_parsed; -typedef void httpd_backend_data; // Not used for evhttp +typedef struct httpd_backend_data httpd_backend_data; typedef char *httpd_uri_path_parts[31]; typedef void (*httpd_request_cb)(struct httpd_request *hreq, void *arg); -typedef void (*httpd_connection_closecb)(httpd_connection *conn, void *arg); +typedef void (*httpd_close_cb)(struct httpd_request *hreq, void *arg); typedef void (*httpd_connection_chunkcb)(httpd_connection *conn, void *arg); typedef void (*httpd_query_iteratecb)(const char *key, const char *val, void *arg); @@ -74,6 +74,15 @@ enum httpd_methods HTTPD_METHOD_PATCH = 1 << 8, }; +#define HTTPD_F_REPLY_LAST (1 << 15) +enum httpd_reply_type +{ + HTTPD_REPLY_START = 1, + HTTPD_REPLY_CHUNK = 2, + HTTPD_REPLY_END = HTTPD_F_REPLY_LAST | 1, + HTTPD_REPLY_COMPLETE = HTTPD_F_REPLY_LAST | 2, +}; + enum httpd_send_flags { HTTPD_SEND_NO_GZIP = (1 << 0), @@ -94,6 +103,14 @@ enum httpd_modules MODULE_RSP, }; +enum httpd_handler_flags +{ + // Most requests are pushed to a worker thread, but some handlers deal with + // requests that must be answered quickly. Can only be used for nonblocking + // handlers. + HTTPD_HANDLER_REALTIME = (1 << 0), +}; + struct httpd_module { const char *name; @@ -122,6 +139,7 @@ struct httpd_uri_map char *regexp; int (*handler)(struct httpd_request *hreq); void *preg; + int flags; // See enum httpd_handler_flags }; @@ -174,6 +192,10 @@ struct httpd_request { struct httpd_module *module; // A pointer to the handler that will process the request int (*handler)(struct httpd_request *hreq); + // Is the processing defered to a worker thread + bool is_async; + // Handler thread's evbase in case the handler needs to scehdule an event + struct event_base *evbase; // A pointer to extra data that the module handling the request might need void *extra_data; }; @@ -206,17 +228,16 @@ httpd_response_not_cachable(struct httpd_request *hreq); * @in code HTTP code, e.g. 200 * @in reason A brief explanation of the error - if NULL the standard meaning of the error code will be used - * @in evbuf Data for the response body * @in flags See flags above */ void -httpd_send_reply(struct httpd_request *hreq, int code, const char *reason, struct evbuffer *evbuf, enum httpd_send_flags flags); +httpd_send_reply(struct httpd_request *hreq, int code, const char *reason, enum httpd_send_flags flags); void httpd_send_reply_start(struct httpd_request *hreq, int code, const char *reason); void -httpd_send_reply_chunk(struct httpd_request *hreq, struct evbuffer *evbuf, httpd_connection_chunkcb cb, void *arg); +httpd_send_reply_chunk(struct httpd_request *hreq, httpd_connection_chunkcb cb, void *arg); void httpd_send_reply_end(struct httpd_request *hreq); @@ -270,25 +291,13 @@ void httpd_headers_clear(httpd_headers *headers); void -httpd_connection_free(httpd_connection *conn); - -httpd_connection * -httpd_request_connection_get(struct httpd_request *hreq); - -void -httpd_request_backend_free(struct httpd_request *hreq); - -int -httpd_request_closecb_set(struct httpd_request *hreq, httpd_connection_closecb cb, void *arg); - -struct event_base * -httpd_request_evbase_get(struct httpd_request *hreq); +httpd_request_close_cb_set(struct httpd_request *hreq, httpd_close_cb cb, void *arg); void httpd_request_free(struct httpd_request *hreq); struct httpd_request * -httpd_request_new(httpd_backend *backend, const char *uri, const char *user_agent); +httpd_request_new(httpd_backend *backend, httpd_server *server, const char *uri, const char *user_agent); void httpd_server_free(httpd_server *server); @@ -303,28 +312,20 @@ httpd_server_allow_origin_set(httpd_server *server, bool allow); /*----------------- Only called by httpd.c to send raw replies ---------------*/ void -httpd_backend_reply_send(httpd_backend *backend, int code, const char *reason, struct evbuffer *evbuf); - -void -httpd_backend_reply_start_send(httpd_backend *backend, int code, const char *reason); - -void -httpd_backend_reply_chunk_send(httpd_backend *backend, struct evbuffer *evbuf, httpd_connection_chunkcb cb, void *arg); - -void -httpd_backend_reply_end_send(httpd_backend *backend); +httpd_send(struct httpd_request *hreq, enum httpd_reply_type type, int code, const char *reason, + httpd_connection_chunkcb cb, void *cbarg); /*---------- Only called by httpd.c to populate struct httpd_request ---------*/ httpd_backend_data * -httpd_backend_data_create(httpd_backend *backend); +httpd_backend_data_create(httpd_backend *backend, httpd_server *server); void httpd_backend_data_free(httpd_backend_data *backend_data); -httpd_connection * -httpd_backend_connection_get(httpd_backend *backend); +struct event_base * +httpd_backend_evbase_get(httpd_backend *backend); const char * httpd_backend_uri_get(httpd_backend *backend, httpd_backend_data *backend_data); diff --git a/src/httpd_jsonapi.c b/src/httpd_jsonapi.c index 7fd3afb4..cd7e319b 100644 --- a/src/httpd_jsonapi.c +++ b/src/httpd_jsonapi.c @@ -4727,13 +4727,13 @@ jsonapi_request(struct httpd_request *hreq) { case HTTP_OK: /* 200 OK */ httpd_header_add(hreq->out_headers, "Content-Type", "application/json"); - httpd_send_reply(hreq, status_code, "OK", hreq->out_body, HTTPD_SEND_NO_GZIP); + httpd_send_reply(hreq, status_code, "OK", HTTPD_SEND_NO_GZIP); break; case HTTP_NOCONTENT: /* 204 No Content */ - httpd_send_reply(hreq, status_code, "No Content", hreq->out_body, HTTPD_SEND_NO_GZIP); + httpd_send_reply(hreq, status_code, "No Content", HTTPD_SEND_NO_GZIP); break; case HTTP_NOTMODIFIED: /* 304 Not Modified */ - httpd_send_reply(hreq, HTTP_NOTMODIFIED, NULL, NULL, HTTPD_SEND_NO_GZIP); + httpd_send_reply(hreq, HTTP_NOTMODIFIED, NULL, HTTPD_SEND_NO_GZIP); break; case HTTP_BADREQUEST: /* 400 Bad Request */ httpd_send_error(hreq, status_code, "Bad Request"); diff --git a/src/httpd_libevhttp.c b/src/httpd_libevhttp.c index 52b577eb..ac5a3bc4 100644 --- a/src/httpd_libevhttp.c +++ b/src/httpd_libevhttp.c @@ -32,14 +32,16 @@ #include #include +#include + #include "misc.h" #include "logger.h" +#include "commands.h" #include "httpd_internal.h" -#define DEBUG_ALLOC 1 +// #define DEBUG_ALLOC 1 #ifdef DEBUG_ALLOC -#include static pthread_mutex_t debug_alloc_lck = PTHREAD_MUTEX_INITIALIZER; static int debug_alloc_count; #endif @@ -56,10 +58,41 @@ struct httpd_server { int fd; struct evhttp *evhttp; + struct commands_base *cmdbase; httpd_request_cb request_cb; void *request_cb_arg; }; +struct httpd_reply +{ + struct httpd_request *hreq; + enum httpd_reply_type type; + int code; + const char *reason; + httpd_connection_chunkcb chunkcb; + void *cbarg; +}; + +struct httpd_disconnect +{ + pthread_mutex_t lock; + struct event *ev; + httpd_close_cb cb; + void *cbarg; +}; + +struct httpd_backend_data +{ + // Pointer to server instance processing the request + struct httpd_server *server; + // If caller wants a callback on disconnect + struct httpd_disconnect disconnect; +}; + +// Forward +static void +closecb_worker(evutil_socket_t fd, short event, void *arg); + const char * httpd_query_value_find(httpd_query *query, const char *key) @@ -109,45 +142,27 @@ httpd_headers_clear(httpd_headers *headers) } void -httpd_connection_free(httpd_connection *conn) +httpd_request_close_cb_set(struct httpd_request *hreq, httpd_close_cb cb, void *arg) { - if (!conn) - return; + struct httpd_disconnect *disconnect = &hreq->backend_data->disconnect; - evhttp_connection_free(conn); -} + pthread_mutex_lock(&disconnect->lock); -httpd_connection * -httpd_request_connection_get(struct httpd_request *hreq) -{ - return httpd_backend_connection_get(hreq->backend); -} + disconnect->cb = cb; + disconnect->cbarg = arg; -void -httpd_request_backend_free(struct httpd_request *hreq) -{ - evhttp_request_free(hreq->backend); -} + if (hreq->is_async) + { + if (disconnect->ev) + event_free(disconnect->ev); -int -httpd_request_closecb_set(struct httpd_request *hreq, httpd_connection_closecb cb, void *arg) -{ - httpd_connection *conn = httpd_request_connection_get(hreq); - if (!conn) - return -1; + if (disconnect->cb) + disconnect->ev = event_new(hreq->evbase, -1, 0, closecb_worker, hreq); + else + disconnect->ev = NULL; + } - evhttp_connection_set_closecb(conn, cb, arg); - return 0; -} - -struct event_base * -httpd_request_evbase_get(struct httpd_request *hreq) -{ - httpd_connection *conn = httpd_request_connection_get(hreq); - if (!conn) - return NULL; - - return evhttp_connection_get_base(conn); + pthread_mutex_unlock(&disconnect->lock); } void @@ -172,7 +187,7 @@ httpd_request_free(struct httpd_request *hreq) } struct httpd_request * -httpd_request_new(httpd_backend *backend, const char *uri, const char *user_agent) +httpd_request_new(httpd_backend *backend, httpd_server *server, const char *uri, const char *user_agent) { struct httpd_request *hreq; httpd_backend_data *backend_data; @@ -190,7 +205,7 @@ httpd_request_new(httpd_backend *backend, const char *uri, const char *user_agen hreq->backend = backend; if (backend) { - backend_data = httpd_backend_data_create(backend); + backend_data = httpd_backend_data_create(backend, server); hreq->backend_data = backend_data; hreq->uri = httpd_backend_uri_get(backend, backend_data); @@ -233,6 +248,51 @@ httpd_request_new(httpd_backend *backend, const char *uri, const char *user_agen return NULL; } +static void +closecb_worker(evutil_socket_t fd, short event, void *arg) +{ + struct httpd_request *hreq = arg; + struct httpd_disconnect *disconnect = &hreq->backend_data->disconnect; + + pthread_mutex_lock(&disconnect->lock); + + if (disconnect->cb) + disconnect->cb(hreq, disconnect->cbarg); + + pthread_mutex_unlock(&disconnect->lock); + + httpd_send_reply_end(hreq); // hreq is now deallocated +} + +static void +closecb_httpd(httpd_connection *conn, void *arg) +{ + struct httpd_request *hreq = arg; + struct httpd_disconnect *disconnect = &hreq->backend_data->disconnect; + + DPRINTF(E_WARN, hreq->module->logdomain, "Connection to '%s' was closed\n", hreq->peer_address); + + // The disconnect event may occur while a worker thread is accessing hreq, or + // has an event scheduled that will do so, so we have to be careful to let it + // finish and cancel events. + pthread_mutex_lock(&disconnect->lock); + if (hreq->is_async) + { + if (disconnect->cb) + event_active(disconnect->ev, 0, 0); + + pthread_mutex_unlock(&disconnect->lock); + return; + } + pthread_mutex_unlock(&disconnect->lock); + + if (!disconnect->cb) + return; + + disconnect->cb(hreq, disconnect->cbarg); + httpd_send_reply_end(hreq); // hreq is now deallocated +} + static void gencb_httpd(httpd_backend *backend, void *arg) { @@ -253,13 +313,17 @@ gencb_httpd(httpd_backend *backend, void *arg) if (bufev) bufferevent_enable(bufev, EV_READ); - hreq = httpd_request_new(backend, NULL, NULL); + hreq = httpd_request_new(backend, server, NULL, NULL); if (!hreq) { evhttp_send_error(backend, HTTP_INTERNAL, "Internal error"); return; } + // We must hook connection close, so we can assure that conn close callbacks + // to handlers running in a worker are made in the same thread. + evhttp_connection_set_closecb(evhttp_request_get_connection(backend), closecb_httpd, hreq); + server->request_cb(hreq, server->request_cb_arg); } @@ -275,6 +339,7 @@ httpd_server_free(httpd_server *server) if (server->evhttp) evhttp_free(server->evhttp); + commands_base_free(server->cmdbase); free(server); } @@ -286,6 +351,7 @@ httpd_server_new(struct event_base *evbase, unsigned short port, httpd_request_c CHECK_NULL(L_HTTPD, server = calloc(1, sizeof(httpd_server))); CHECK_NULL(L_HTTPD, server->evhttp = evhttp_new(evbase)); + CHECK_NULL(L_HTTPD, server->cmdbase = commands_base_new(evbase, NULL)); server->request_cb = cb; server->request_cb_arg = arg; @@ -294,7 +360,7 @@ httpd_server_new(struct event_base *evbase, unsigned short port, httpd_request_c if (server->fd <= 0) goto error; - // Backlog of 128 is the same libevent uses + // Backlog of 128 is the same that libevent uses ret = listen(server->fd, 128); if (ret < 0) goto error; @@ -318,46 +384,109 @@ httpd_server_allow_origin_set(httpd_server *server, bool allow) evhttp_set_allowed_methods(server->evhttp, EVHTTP_REQ_GET | EVHTTP_REQ_POST | EVHTTP_REQ_PUT | EVHTTP_REQ_DELETE | EVHTTP_REQ_HEAD | EVHTTP_REQ_OPTIONS); } -void -httpd_backend_reply_send(httpd_backend *backend, int code, const char *reason, struct evbuffer *evbuf) +// No locking of hreq required here, we're in the httpd thread, and the worker +// thread is waiting at commands_exec_sync() +static void +send_reply_and_free(struct httpd_reply *reply) { - evhttp_send_reply(backend, code, reason, evbuf); + struct httpd_request *hreq = reply->hreq; + httpd_connection *conn; + +// DPRINTF(E_DBG, L_HTTPD, "Send from httpd thread, type %d, backend %p\n", reply->type, hreq->backend); + + if (reply->type & HTTPD_F_REPLY_LAST) + { + conn = evhttp_request_get_connection(hreq->backend); + if (conn) + evhttp_connection_set_closecb(conn, NULL, NULL); + } + + switch (reply->type) + { + case HTTPD_REPLY_COMPLETE: + evhttp_send_reply(hreq->backend, reply->code, reply->reason, hreq->out_body); + break; + case HTTPD_REPLY_START: + evhttp_send_reply_start(hreq->backend, reply->code, reply->reason); + break; + case HTTPD_REPLY_CHUNK: + evhttp_send_reply_chunk_with_cb(hreq->backend, hreq->out_body, reply->chunkcb, reply->cbarg); + break; + case HTTPD_REPLY_END: + evhttp_send_reply_end(hreq->backend); + break; + } +} + +static enum command_state +send_reply_and_free_cb(void *arg, int *retval) +{ + struct httpd_reply *reply = arg; + + send_reply_and_free(reply); + + return COMMAND_END; } void -httpd_backend_reply_start_send(httpd_backend *backend, int code, const char *reason) +httpd_send(struct httpd_request *hreq, enum httpd_reply_type type, int code, const char *reason, httpd_connection_chunkcb cb, void *cbarg) { - evhttp_send_reply_start(backend, code, reason); -} + struct httpd_server *server = hreq->backend_data->server; + struct httpd_reply reply = { + .hreq = hreq, + .type = type, + .code = code, + .chunkcb = cb, + .cbarg = cbarg, + .reason = reason, + }; -void -httpd_backend_reply_chunk_send(httpd_backend *backend, struct evbuffer *evbuf, httpd_connection_chunkcb cb, void *arg) -{ - evhttp_send_reply_chunk_with_cb(backend, evbuf, cb, arg); -} + if (type & HTTPD_F_REPLY_LAST) + httpd_request_close_cb_set(hreq, NULL, NULL); -void -httpd_backend_reply_end_send(httpd_backend *backend) -{ - evhttp_send_reply_end(backend); + // Sending async is not a option, because then the worker thread might touch + // hreq before we have completed sending the current chunk + if (hreq->is_async) + commands_exec_sync(server->cmdbase, send_reply_and_free_cb, NULL, &reply); + else + send_reply_and_free(&reply); + + if (type & HTTPD_F_REPLY_LAST) + httpd_request_free(hreq); } httpd_backend_data * -httpd_backend_data_create(httpd_backend *backend) +httpd_backend_data_create(httpd_backend *backend, httpd_server *server) { - return "dummy"; + httpd_backend_data *backend_data; + + CHECK_NULL(L_HTTPD, backend_data = calloc(1, sizeof(httpd_backend_data))); + CHECK_ERR(L_HTTPD, mutex_init(&backend_data->disconnect.lock)); + backend_data->server = server; + + return backend_data; } void httpd_backend_data_free(httpd_backend_data *backend_data) { - // Nothing to do + if (!backend_data) + return; + + if (backend_data->disconnect.ev) + event_free(backend_data->disconnect.ev); + + free(backend_data); } -httpd_connection * -httpd_backend_connection_get(httpd_backend *backend) +struct event_base * +httpd_backend_evbase_get(httpd_backend *backend) { - return evhttp_request_get_connection(backend); + httpd_connection *conn = evhttp_request_get_connection(backend); + if (!conn) + return NULL; + + return evhttp_connection_get_base(conn); } const char * @@ -393,7 +522,7 @@ httpd_backend_output_buffer_get(httpd_backend *backend) int httpd_backend_peer_get(const char **addr, uint16_t *port, httpd_backend *backend, httpd_backend_data *backend_data) { - httpd_connection *conn = httpd_backend_connection_get(backend); + httpd_connection *conn = evhttp_request_get_connection(backend); if (!conn) return -1; diff --git a/src/httpd_rsp.c b/src/httpd_rsp.c index 38702626..7bfce8f4 100644 --- a/src/httpd_rsp.c +++ b/src/httpd_rsp.c @@ -119,28 +119,17 @@ static const struct field_map rsp_fields[] = /* -------------------------------- HELPERS --------------------------------- */ -static struct evbuffer * -mxml_to_evbuf(mxml_node_t *tree) +static int +mxml_to_evbuf(struct evbuffer *evbuf, mxml_node_t *tree) { - struct evbuffer *evbuf; char *xml; int ret; - evbuf = evbuffer_new(); - if (!evbuf) - { - DPRINTF(E_LOG, L_RSP, "Could not create evbuffer for RSP reply\n"); - - return NULL; - } - xml = mxmlSaveAllocString(tree, MXML_NO_CALLBACK); if (!xml) { DPRINTF(E_LOG, L_RSP, "Could not finalize RSP reply\n"); - - evbuffer_free(evbuf); - return NULL; + return -1; } ret = evbuffer_add(evbuf, xml, strlen(xml)); @@ -148,21 +137,19 @@ mxml_to_evbuf(mxml_node_t *tree) if (ret < 0) { DPRINTF(E_LOG, L_RSP, "Could not load evbuffer for RSP reply\n"); - - evbuffer_free(evbuf); - return NULL; + return -1; } - return evbuf; + return 0; } static void rsp_send_error(struct httpd_request *hreq, char *errmsg) { - struct evbuffer *evbuf; mxml_node_t *reply; mxml_node_t *status; mxml_node_t *node; + int ret; /* We'd use mxmlNewXML(), but then we can't put any attributes * on the root node and we need some. @@ -185,22 +172,19 @@ rsp_send_error(struct httpd_request *hreq, char *errmsg) node = mxmlNewElement(status, "totalrecords"); mxmlNewText(node, 0, "0"); - evbuf = mxml_to_evbuf(reply); + ret = mxml_to_evbuf(hreq->out_body, reply); mxmlDelete(reply); - if (!evbuf) + if (ret < 0) { httpd_send_error(hreq, HTTP_SERVUNAVAIL, "Internal Server Error"); - return; } httpd_header_add(hreq->out_headers, "Content-Type", "text/xml; charset=utf-8"); httpd_header_add(hreq->out_headers, "Connection", "close"); - httpd_send_reply(hreq, HTTP_OK, "OK", evbuf, HTTPD_SEND_NO_GZIP); - - evbuffer_free(evbuf); + httpd_send_reply(hreq, HTTP_OK, "OK", HTTPD_SEND_NO_GZIP); } static int @@ -277,24 +261,21 @@ query_params_set(struct query_params *qp, struct httpd_request *hreq) static void rsp_send_reply(struct httpd_request *hreq, mxml_node_t *reply) { - struct evbuffer *evbuf; + int ret; - evbuf = mxml_to_evbuf(reply); + ret = mxml_to_evbuf(hreq->out_body, reply); mxmlDelete(reply); - if (!evbuf) + if (ret < 0) { rsp_send_error(hreq, "Could not finalize reply"); - return; } httpd_header_add(hreq->out_headers, "Content-Type", "text/xml; charset=utf-8"); httpd_header_add(hreq->out_headers, "Connection", "close"); - httpd_send_reply(hreq, HTTP_OK, "OK", evbuf, 0); - - evbuffer_free(evbuf); + httpd_send_reply(hreq, HTTP_OK, "OK", 0); } static int @@ -844,7 +825,7 @@ static struct httpd_uri_map rsp_handlers[] = }, { .regexp = "^/rsp/stream/[[:digit:]]+$", - .handler = rsp_stream + .handler = rsp_stream, }, { .regexp = NULL, diff --git a/src/httpd_streaming.c b/src/httpd_streaming.c index 001899d6..16bf329f 100644 --- a/src/httpd_streaming.c +++ b/src/httpd_streaming.c @@ -192,27 +192,15 @@ session_new(struct httpd_request *hreq, bool icy_is_requested) return session; } -static void -session_end(struct streaming_session *session) -{ - DPRINTF(E_INFO, L_STREAMING, "Stopping mp3 streaming to %s:%d\n", session->hreq->peer_address, (int)session->hreq->peer_port); - - // Valgrind says libevent doesn't free the request on disconnect (even though - // it owns it - libevent bug?), so we do it with a reply end. This also makes - // sure the hreq gets freed. - httpd_send_reply_end(session->hreq); - session_free(session); -} - /* ----------------------------- Event callbacks ---------------------------- */ static void -conn_close_cb(httpd_connection *conn, void *arg) +conn_close_cb(struct httpd_request *hreq, void *arg) { struct streaming_session *session = arg; - session_end(session); + session_free(session); } static void @@ -227,8 +215,10 @@ read_cb(evutil_socket_t fd, short event, void *arg) len = evbuffer_read(session->readbuf, fd, -1); if (len < 0 && errno != EAGAIN) { - httpd_request_closecb_set(hreq, NULL, NULL); - session_end(session); + DPRINTF(E_INFO, L_STREAMING, "Stopping mp3 streaming to %s:%d\n", session->hreq->peer_address, (int)session->hreq->peer_port); + + httpd_send_reply_end(session->hreq); + session_free(session); return; } @@ -237,7 +227,7 @@ read_cb(evutil_socket_t fd, short event, void *arg) else evbuffer_add_buffer(hreq->out_body, session->readbuf); - httpd_send_reply_chunk(hreq, hreq->out_body, NULL, NULL); + httpd_send_reply_chunk(hreq, NULL, NULL); session->bytes_sent += len; } @@ -249,7 +239,6 @@ static int streaming_mp3_handler(struct httpd_request *hreq) { struct streaming_session *session; - struct event_base *evbase; const char *name = cfg_getstr(cfg_getsec(cfg, "library"), "name"); const char *param; bool icy_is_requested; @@ -271,11 +260,10 @@ streaming_mp3_handler(struct httpd_request *hreq) // Ask streaming output module for a fd to read mp3 from session->fd = streaming_session_register(STREAMING_FORMAT_MP3, streaming_default_quality); - CHECK_NULL(L_STREAMING, evbase = httpd_request_evbase_get(hreq)); - CHECK_NULL(L_STREAMING, session->readev = event_new(evbase, session->fd, EV_READ | EV_PERSIST, read_cb, session)); + CHECK_NULL(L_STREAMING, session->readev = event_new(hreq->evbase, session->fd, EV_READ | EV_PERSIST, read_cb, session)); event_add(session->readev, NULL); - httpd_request_closecb_set(hreq, conn_close_cb, session); + httpd_request_close_cb_set(hreq, conn_close_cb, session); httpd_header_add(hreq->out_headers, "Content-Type", "audio/mpeg"); httpd_header_add(hreq->out_headers, "Server", PACKAGE_NAME "/" VERSION); @@ -292,7 +280,8 @@ static struct httpd_uri_map streaming_handlers[] = { { .regexp = "^/stream.mp3$", - .handler = streaming_mp3_handler + .handler = streaming_mp3_handler, + .flags = HTTPD_HANDLER_REALTIME, }, { .regexp = NULL, diff --git a/src/worker.c b/src/worker.c index 9244a8bf..ecbc9836 100644 --- a/src/worker.c +++ b/src/worker.c @@ -43,10 +43,10 @@ #include "evthr.h" #include "misc.h" -#define THREADPOOL_NTHREADS 2 +#define THREADPOOL_NTHREADS 4 static struct evthr_pool *worker_threadpool; - +static __thread struct evthr *worker_thr; /* ----------------------------- CALLBACK EXECUTION ------------------------- */ @@ -98,12 +98,16 @@ init_cb(struct evthr *thr, void *shared) { CHECK_ERR(L_MAIN, db_perthread_init()); + worker_thr = thr; + thread_setname(pthread_self(), "worker"); } static void exit_cb(struct evthr *thr, void *shared) { + worker_thr = NULL; + db_perthread_deinit(); } @@ -145,6 +149,12 @@ worker_execute(void (*cb)(void *), void *cb_arg, size_t arg_size, int delay) evthr_pool_defer(worker_threadpool, execute, cmdarg); } +struct event_base * +worker_evbase_get(void) +{ + return evthr_get_base(worker_thr); +} + int worker_init(void) { diff --git a/src/worker.h b/src/worker.h index 125e9730..32afea7c 100644 --- a/src/worker.h +++ b/src/worker.h @@ -2,8 +2,10 @@ #ifndef __WORKER_H__ #define __WORKER_H__ +#include + /* The worker thread is made for running asyncronous tasks from a real time - * thread, mainly the player thread. + * thread. * The worker_execute() function will trigger a callback from the worker thread. * Before returning the function will copy the argument given, so the caller @@ -19,6 +21,11 @@ void worker_execute(void (*cb)(void *), void *cb_arg, size_t arg_size, int delay); +/* Can be called within a callback to get the worker thread's event base + */ +struct event_base * +worker_evbase_get(void); + int worker_init(void);