From f05a55e7fcb80b62286fefa082c39f2fb4e217c9 Mon Sep 17 00:00:00 2001 From: Julien BLACHE Date: Sat, 25 Apr 2009 10:40:47 +0200 Subject: [PATCH] Implement HTTP chunked streaming Transcoding is not supported yet. In the current state of affairs, we will crash if the client closes the connection before the tranfser is done. This is a limitation of evhttp, it is not possible to detect this condition. This will need to be fixed in evhttp. --- src/httpd.c | 278 ++++++++++++++++++++++++++++++++++++++++++++++++++++ src/httpd.h | 7 ++ 2 files changed, 285 insertions(+) diff --git a/src/httpd.c b/src/httpd.c index 630ec841..492559e3 100644 --- a/src/httpd.c +++ b/src/httpd.c @@ -28,17 +28,22 @@ #include #include #include +#include +#include #include #include #include "daapd.h" #include "err.h" +#include "ff-dbstruct.h" +#include "db-generic.h" #include "conffile.h" #include "httpd.h" #include "httpd_rsp.h" +#define STREAM_CHUNK_SIZE (512 * 1024) #define WEBFACE_ROOT "/usr/share/mt-daapd/admin-root/" struct content_type_map { @@ -46,6 +51,16 @@ struct content_type_map { char *ctype; }; +struct stream_chunk { + struct evhttp_request *req; + struct evbuffer *evbuf; + int id; + int fd; + off_t size; + off_t offset; + int marked; +}; + static struct content_type_map ext2ctype[] = { @@ -68,6 +83,269 @@ static struct evhttp *evhttpd; static pthread_t tid_httpd; +static int +safe_atol(const char *str, long *val) +{ + char *end; + long intval; + + errno = 0; + intval = strtol(str, &end, 10); + + if (((errno == ERANGE) && ((intval == LONG_MAX) || (intval == LONG_MIN))) + || ((errno != 0) && (intval == 0))) + { + DPRINTF(E_WARN, L_RSP, "Invalid integer in string (%s): %s\n", str, strerror(errno)); + + return -1; + } + + if (end == str) + { + DPRINTF(E_WARN, L_RSP, "No integer found in string (%s)\n", str); + + return -1; + } + + *val = intval; + + return 0; +} + + +static void +stream_chunk_cb(int fd, short event, void *arg) +{ + struct stream_chunk *st; + struct timeval tv; + int ret; + + st = (struct stream_chunk *)arg; + + ret = evbuffer_read(st->evbuf, st->fd, STREAM_CHUNK_SIZE); + if (ret < 0) + { + DPRINTF(E_LOG, L_HTTPD, "Streaming error, file id %d\n", st->id); + + evbuffer_free(st->evbuf); + goto end_stream; + } + + DPRINTF(E_DBG, L_HTTPD, "Read %d bytes; streaming file id %d\n", ret, st->id); + + if (ret > 0) + evhttp_send_reply_chunk(st->req, st->evbuf); + + st->offset += ret; + + if (!st->marked && (st->offset > ((st->size * 80) / 100))) + { + st->marked = 1; + db_playcount_increment(NULL, st->id); + } + + if (ret == 0) + { + DPRINTF(E_LOG, L_HTTPD, "Done streaming file id %d\n", st->id); + + goto end_stream; + } + + evutil_timerclear(&tv); + ret = event_base_once(evbase_httpd, -1, EV_TIMEOUT, stream_chunk_cb, st, &tv); + if (ret < 0) + { + DPRINTF(E_LOG, L_HTTPD, "Could not re-add one-shot event for streaming\n"); + + goto end_stream; + } + + return; + + end_stream: + evhttp_send_reply_end(st->req); + + close(st->fd); + evbuffer_free(st->evbuf); + free(st); +} + +/* Thread: httpd */ +void +httpd_stream_file(struct evhttp_request *req, int id) +{ + struct media_file_info *mfi; + struct stream_chunk *st; + struct stat sb; + struct timeval tv; + const char *param; + char buf[64]; + long offset; + int transcode; + int ret; + + transcode = 0; /* FIXME: not supported yet */ + + offset = 0; + param = evhttp_find_header(req->input_headers, "Range"); + if (param) + { + DPRINTF(E_DBG, L_HTTPD, "Found Range header: %s\n", param); + + ret = safe_atol(param + strlen("bytes="), &offset); + if (ret < 0) + { + DPRINTF(E_LOG, L_HTTPD, "Invalid offset, starting from 0 (%s)\n", param); + offset = 0; + } + } + + mfi = db_fetch_item(NULL, id); + if (!mfi) + { + DPRINTF(E_LOG, L_HTTPD, "Item %d not found\n", id); + + evhttp_send_error(req, HTTP_NOTFOUND, "Not Found"); + return; + } + + if (mfi->data_kind != 0) + { + evhttp_send_error(req, 500, "Cannot stream radio station"); + + db_dispose_item(mfi); + return; + } + + if (transcode) + { + /* Not supported yet */ + + evhttp_send_error(req, HTTP_SERVUNAVAIL, "Transcoding not supported"); + + db_dispose_item(mfi); + return; + } + + /* Stream the raw file */ + DPRINTF(E_INF, L_HTTPD, "Preparing to stream %s\n", mfi->path); + + st = (struct stream_chunk *)malloc(sizeof(struct stream_chunk)); + if (!st) + { + DPRINTF(E_LOG, L_HTTPD, "Out of memory for struct stream_chunk\n"); + + evhttp_send_error(req, HTTP_SERVUNAVAIL, "Internal Server Error"); + + db_dispose_item(mfi); + return; + } + + memset(st, 0, sizeof(struct stream_chunk)); + + st->fd = open(mfi->path, O_RDONLY); + if (st->fd < 0) + { + DPRINTF(E_LOG, L_HTTPD, "Could not open %s: %s\n", mfi->path, strerror(errno)); + + evhttp_send_error(req, HTTP_NOTFOUND, "Not Found"); + + free(st); + db_dispose_item(mfi); + return; + } + + ret = stat(mfi->path, &sb); + if (ret < 0) + { + DPRINTF(E_LOG, L_HTTPD, "Could not stat() %s: %s\n", mfi->path, strerror(errno)); + + evhttp_send_error(req, HTTP_NOTFOUND, "Not Found"); + + close(st->fd); + free(st); + db_dispose_item(mfi); + return; + } + + ret = lseek(st->fd, offset, SEEK_SET); + if (ret < 0) + { + DPRINTF(E_LOG, L_HTTPD, "Could not seek into %s: %s\n", mfi->path, strerror(errno)); + + evhttp_send_error(req, HTTP_BADREQUEST, "Bad Request"); + + close(st->fd); + free(st); + db_dispose_item(mfi); + return; + } + + if (!evhttp_find_header(req->output_headers, "Content-Type") && mfi->type) + { + ret = snprintf(buf, sizeof(buf), "audio/%s", mfi->type); + if ((ret < 0) || (ret >= sizeof(buf))) + DPRINTF(E_LOG, L_HTTPD, "Content-Type too large for buffer, dropping\n"); + else + evhttp_add_header(req->output_headers, "Content-Type", buf); + } + + st->evbuf = evbuffer_new(); + if (!st->evbuf) + { + DPRINTF(E_LOG, L_HTTPD, "Could not allocate an evbuffer for streaming\n"); + + evhttp_clear_headers(req->output_headers); + evhttp_send_error(req, HTTP_SERVUNAVAIL, "Internal Server Error"); + + close(st->fd); + free(st); + db_dispose_item(mfi); + return; + } + + evutil_timerclear(&tv); + ret = event_base_once(evbase_httpd, -1, EV_TIMEOUT, stream_chunk_cb, st, &tv); + if (ret < 0) + { + DPRINTF(E_LOG, L_HTTPD, "Could not add one-shot event for streaming\n"); + + evhttp_clear_headers(req->output_headers); + evhttp_send_error(req, HTTP_SERVUNAVAIL, "Internal Server Error"); + + close(st->fd); + evbuffer_free(st->evbuf); + free(st); + db_dispose_item(mfi); + return; + } + + st->id = mfi->id; + st->size = sb.st_size; + st->offset = offset; + st->req = req; + + if (offset == 0) + evhttp_send_reply_start(req, HTTP_OK, "OK"); + else + { + DPRINTF(E_DBG, L_HTTPD, "Stream request with offset %ld\n", offset); + + ret = snprintf(buf, sizeof(buf), "bytes %ld-%ld/%ld", + offset, (long)sb.st_size, (long)sb.st_size + 1); + if ((ret < 0) || (ret >= sizeof(buf))) + DPRINTF(E_LOG, L_HTTPD, "Content-Range too large for buffer, dropping\n"); + else + evhttp_add_header(req->output_headers, "Content-Range", buf); + + evhttp_send_reply_start(req, 206, "Partial Content"); + } + + DPRINTF(E_INF, L_HTTPD, "Kicking off streaming for %s\n", mfi->path); + + db_dispose_item(mfi); +} + /* Thread: httpd */ static int path_is_legal(char *path) diff --git a/src/httpd.h b/src/httpd.h index 03dbe6ad..01afce25 100644 --- a/src/httpd.h +++ b/src/httpd.h @@ -2,6 +2,13 @@ #ifndef __HTTPD_H__ #define __HTTPD_H__ +#include +#include + + +void +httpd_stream_file(struct evhttp_request *req, int id); + int httpd_init(void);