From 7a601d7daaff28e6b2ccd09b5b8cfaf99cfc4b1d Mon Sep 17 00:00:00 2001 From: Julien BLACHE Date: Fri, 1 May 2009 15:33:48 +0200 Subject: [PATCH] Enable streaming of transcoded content --- src/httpd.c | 255 ++++++++++++++++++++++++++++++++++------------- src/httpd_daap.c | 3 +- src/httpd_rsp.c | 3 +- 3 files changed, 190 insertions(+), 71 deletions(-) diff --git a/src/httpd.c b/src/httpd.c index 823b81e0..fde4fae9 100644 --- a/src/httpd.c +++ b/src/httpd.c @@ -43,6 +43,7 @@ #include "httpd.h" #include "httpd_rsp.h" #include "httpd_daap.h" +#include "transcode.h" /* @@ -73,9 +74,11 @@ struct stream_chunk { struct evbuffer *evbuf; int id; int fd; - off_t size; - off_t offset; + size_t size; + size_t offset; + size_t start_offset; int marked; + struct transcode_ctx *xcode; }; @@ -101,7 +104,84 @@ static pthread_t tid_httpd; static void -stream_chunk_cb(int fd, short event, void *arg) +stream_chunk_xcode_cb(int fd, short event, void *arg) +{ + struct stream_chunk *st; + struct timeval tv; + int xcoded; + int ret; + + st = (struct stream_chunk *)arg; + + xcoded = transcode(st->xcode, st->evbuf, STREAM_CHUNK_SIZE); + if (xcoded <= 0) + { + if (xcoded == 0) + DPRINTF(E_LOG, L_HTTPD, "Done streaming transcoded file id %d\n", st->id); + else + DPRINTF(E_LOG, L_HTTPD, "Transcoding error, file id %d\n", st->id); + + goto end_stream; + } + + DPRINTF(E_DBG, L_HTTPD, "Got %d bytes from transcode; streaming file id %d\n", xcoded, st->id); + + /* Consume transcoded data until we meet start_offset */ + if (st->start_offset > st->offset) + { + ret = st->start_offset - st->offset; + + if (ret < xcoded) + { + evbuffer_drain(st->evbuf, ret); + st->offset += ret; + + ret = xcoded - ret; + } + else + { + evbuffer_drain(st->evbuf, xcoded); + st->offset += xcoded; + + goto continue_stream; + } + } + else + ret = xcoded; + + if (ret > 0) + evhttp_send_reply_chunk(st->req, st->evbuf); + + st->offset += ret; + + if (!st->marked && (st->offset > ((st->size * 80) / 100))) + { + st->marked = 1; + db_playcount_increment(NULL, st->id); + } + + continue_stream: + evutil_timerclear(&tv); + ret = event_base_once(evbase_httpd, -1, EV_TIMEOUT, stream_chunk_xcode_cb, st, &tv); + if (ret < 0) + { + DPRINTF(E_LOG, L_HTTPD, "Could not re-add one-shot event for streaming\n"); + + goto end_stream; + } + + return; + + end_stream: + evhttp_send_reply_end(st->req); + + evbuffer_free(st->evbuf); + transcode_cleanup(st->xcode); + free(st); +} + +static void +stream_chunk_raw_cb(int fd, short event, void *arg) { struct stream_chunk *st; struct timeval tv; @@ -110,11 +190,13 @@ stream_chunk_cb(int fd, short event, void *arg) st = (struct stream_chunk *)arg; ret = evbuffer_read(st->evbuf, st->fd, STREAM_CHUNK_SIZE); - if (ret < 0) + if (ret <= 0) { - DPRINTF(E_LOG, L_HTTPD, "Streaming error, file id %d\n", st->id); + if (ret == 0) + DPRINTF(E_LOG, L_HTTPD, "Done streaming file id %d\n", st->id); + else + DPRINTF(E_LOG, L_HTTPD, "Streaming error, file id %d\n", st->id); - evbuffer_free(st->evbuf); goto end_stream; } @@ -131,15 +213,8 @@ stream_chunk_cb(int fd, short event, void *arg) db_playcount_increment(NULL, st->id); } - if (ret == 0) - { - DPRINTF(E_LOG, L_HTTPD, "Done streaming file id %d\n", st->id); - - goto end_stream; - } - evutil_timerclear(&tv); - ret = event_base_once(evbase_httpd, -1, EV_TIMEOUT, stream_chunk_cb, st, &tv); + ret = event_base_once(evbase_httpd, -1, EV_TIMEOUT, stream_chunk_raw_cb, st, &tv); if (ret < 0) { DPRINTF(E_LOG, L_HTTPD, "Could not re-add one-shot event for streaming\n"); @@ -163,6 +238,7 @@ httpd_stream_file(struct evhttp_request *req, int id) { struct media_file_info *mfi; struct stream_chunk *st; + void (*stream_cb)(int fd, short event, void *arg); struct stat sb; struct timeval tv; const char *param; @@ -171,8 +247,6 @@ httpd_stream_file(struct evhttp_request *req, int id) int transcode; int ret; - transcode = 0; /* FIXME: not supported yet */ - offset = 0; param = evhttp_find_header(req->input_headers, "Range"); if (param) @@ -204,19 +278,6 @@ httpd_stream_file(struct evhttp_request *req, int id) return; } - if (transcode) - { - /* Not supported yet */ - - evhttp_send_error(req, HTTP_SERVUNAVAIL, "Transcoding not supported"); - - db_dispose_item(mfi); - return; - } - - /* Stream the raw file */ - DPRINTF(E_INF, L_HTTPD, "Preparing to stream %s\n", mfi->path); - st = (struct stream_chunk *)malloc(sizeof(struct stream_chunk)); if (!st) { @@ -227,54 +288,87 @@ httpd_stream_file(struct evhttp_request *req, int id) db_dispose_item(mfi); return; } - memset(st, 0, sizeof(struct stream_chunk)); - st->fd = open(mfi->path, O_RDONLY); - if (st->fd < 0) + transcode = transcode_needed(req->input_headers, mfi->codectype); + + if (transcode) { - DPRINTF(E_LOG, L_HTTPD, "Could not open %s: %s\n", mfi->path, strerror(errno)); + DPRINTF(E_INF, L_HTTPD, "Preparing to transcode %s\n", mfi->path); - evhttp_send_error(req, HTTP_NOTFOUND, "Not Found"); + stream_cb = stream_chunk_xcode_cb; + st->fd = -1; - free(st); - db_dispose_item(mfi); - return; + st->xcode = transcode_setup(mfi, &st->size); + if (!st->xcode) + { + DPRINTF(E_WARN, L_HTTPD, "Transcoding setup failed, aborting streaming\n"); + + evhttp_send_error(req, HTTP_SERVUNAVAIL, "Internal Server Error"); + + free(st); + db_dispose_item(mfi); + return; + } + + if (!evhttp_find_header(req->output_headers, "Content-Type")) + evhttp_add_header(req->output_headers, "Content-Type", "audio/wav"); } - - ret = stat(mfi->path, &sb); - if (ret < 0) + else { - DPRINTF(E_LOG, L_HTTPD, "Could not stat() %s: %s\n", mfi->path, strerror(errno)); + /* Stream the raw file */ + DPRINTF(E_INF, L_HTTPD, "Preparing to stream %s\n", mfi->path); - evhttp_send_error(req, HTTP_NOTFOUND, "Not Found"); + stream_cb = stream_chunk_raw_cb; - close(st->fd); - free(st); - db_dispose_item(mfi); - return; - } + st->fd = open(mfi->path, O_RDONLY); + if (st->fd < 0) + { + DPRINTF(E_LOG, L_HTTPD, "Could not open %s: %s\n", mfi->path, strerror(errno)); - ret = lseek(st->fd, offset, SEEK_SET); - if (ret < 0) - { - DPRINTF(E_LOG, L_HTTPD, "Could not seek into %s: %s\n", mfi->path, strerror(errno)); + evhttp_send_error(req, HTTP_NOTFOUND, "Not Found"); - evhttp_send_error(req, HTTP_BADREQUEST, "Bad Request"); + free(st); + db_dispose_item(mfi); + return; + } - close(st->fd); - free(st); - db_dispose_item(mfi); - return; - } + ret = stat(mfi->path, &sb); + if (ret < 0) + { + DPRINTF(E_LOG, L_HTTPD, "Could not stat() %s: %s\n", mfi->path, strerror(errno)); - if (!evhttp_find_header(req->output_headers, "Content-Type") && mfi->type) - { - ret = snprintf(buf, sizeof(buf), "audio/%s", mfi->type); - if ((ret < 0) || (ret >= sizeof(buf))) - DPRINTF(E_LOG, L_HTTPD, "Content-Type too large for buffer, dropping\n"); - else - evhttp_add_header(req->output_headers, "Content-Type", buf); + evhttp_send_error(req, HTTP_NOTFOUND, "Not Found"); + + close(st->fd); + free(st); + db_dispose_item(mfi); + return; + } + st->size = sb.st_size; + + ret = lseek(st->fd, offset, SEEK_SET); + if (ret < 0) + { + DPRINTF(E_LOG, L_HTTPD, "Could not seek into %s: %s\n", mfi->path, strerror(errno)); + + evhttp_send_error(req, HTTP_BADREQUEST, "Bad Request"); + + close(st->fd); + free(st); + db_dispose_item(mfi); + return; + } + st->offset = offset; + + if (!evhttp_find_header(req->output_headers, "Content-Type") && mfi->type) + { + ret = snprintf(buf, sizeof(buf), "audio/%s", mfi->type); + if ((ret < 0) || (ret >= sizeof(buf))) + DPRINTF(E_LOG, L_HTTPD, "Content-Type too large for buffer, dropping\n"); + else + evhttp_add_header(req->output_headers, "Content-Type", buf); + } } st->evbuf = evbuffer_new(); @@ -285,14 +379,35 @@ httpd_stream_file(struct evhttp_request *req, int id) evhttp_clear_headers(req->output_headers); evhttp_send_error(req, HTTP_SERVUNAVAIL, "Internal Server Error"); - close(st->fd); + if (transcode) + transcode_cleanup(st->xcode); + else + close(st->fd); + free(st); + db_dispose_item(mfi); + return; + } + + ret = evbuffer_expand(st->evbuf, STREAM_CHUNK_SIZE); + if (ret != 0) + { + DPRINTF(E_LOG, L_HTTPD, "Could not expand evbuffer for streaming\n"); + + evhttp_clear_headers(req->output_headers); + evhttp_send_error(req, HTTP_SERVUNAVAIL, "Internal Server Error"); + + if (transcode) + transcode_cleanup(st->xcode); + else + close(st->fd); + evbuffer_free(st->evbuf); free(st); db_dispose_item(mfi); return; } evutil_timerclear(&tv); - ret = event_base_once(evbase_httpd, -1, EV_TIMEOUT, stream_chunk_cb, st, &tv); + ret = event_base_once(evbase_httpd, -1, EV_TIMEOUT, stream_cb, st, &tv); if (ret < 0) { DPRINTF(E_LOG, L_HTTPD, "Could not add one-shot event for streaming\n"); @@ -300,7 +415,10 @@ httpd_stream_file(struct evhttp_request *req, int id) evhttp_clear_headers(req->output_headers); evhttp_send_error(req, HTTP_SERVUNAVAIL, "Internal Server Error"); - close(st->fd); + if (transcode) + transcode_cleanup(st->xcode); + else + close(st->fd); evbuffer_free(st->evbuf); free(st); db_dispose_item(mfi); @@ -308,8 +426,7 @@ httpd_stream_file(struct evhttp_request *req, int id) } st->id = mfi->id; - st->size = sb.st_size; - st->offset = offset; + st->start_offset = offset; st->req = req; if (offset == 0) diff --git a/src/httpd_daap.c b/src/httpd_daap.c index 5ccf4fa3..a4d844d0 100644 --- a/src/httpd_daap.c +++ b/src/httpd_daap.c @@ -45,6 +45,7 @@ #include "conffile.h" #include "misc.h" #include "httpd.h" +#include "transcode.h" #include "httpd_daap.h" @@ -1111,7 +1112,7 @@ daap_reply_songlist_generic(struct evhttp_request *req, struct evbuffer *evbuf, { nsongs++; - transcode = 0; /* FIXME: No transcode support here yet */ + transcode = transcode_needed(req->input_headers, dbmfi->codectype); i = -1; while (1) diff --git a/src/httpd_rsp.c b/src/httpd_rsp.c index bde7cddd..35552942 100644 --- a/src/httpd_rsp.c +++ b/src/httpd_rsp.c @@ -44,6 +44,7 @@ #include "conffile.h" #include "misc.h" #include "httpd.h" +#include "transcode.h" #include "httpd_rsp.h" @@ -563,7 +564,7 @@ rsp_reply_playlist(struct evhttp_request *req, char **uri, struct evkeyvalq *que /* Items block (all items) */ while (((ret = db_enum_fetch_row(&db_errmsg, &dbmfi, &qi)) == DB_E_SUCCESS) && (dbmfi)) { - transcode = 0; /* FIXME: No transcode support here yet */ + transcode = transcode_needed(req->input_headers, dbmfi->codectype); /* Item block (one item) */ item = mxmlNewElement(items, "item");