Implement HTTP chunked streaming

Transcoding is not supported yet.

In the current state of affairs, we will crash if the client closes the
connection before the tranfser is done. This is a limitation of evhttp,
it is not possible to detect this condition. This will need to be fixed
in evhttp.
This commit is contained in:
Julien BLACHE 2009-04-25 10:40:47 +02:00
parent 5e8275a998
commit f05a55e7fc
2 changed files with 285 additions and 0 deletions

View File

@ -28,17 +28,22 @@
#include <limits.h> #include <limits.h>
#include <errno.h> #include <errno.h>
#include <pthread.h> #include <pthread.h>
#include <time.h>
#include <sys/queue.h>
#include <event.h> #include <event.h>
#include <evhttp.h> #include <evhttp.h>
#include "daapd.h" #include "daapd.h"
#include "err.h" #include "err.h"
#include "ff-dbstruct.h"
#include "db-generic.h"
#include "conffile.h" #include "conffile.h"
#include "httpd.h" #include "httpd.h"
#include "httpd_rsp.h" #include "httpd_rsp.h"
#define STREAM_CHUNK_SIZE (512 * 1024)
#define WEBFACE_ROOT "/usr/share/mt-daapd/admin-root/" #define WEBFACE_ROOT "/usr/share/mt-daapd/admin-root/"
struct content_type_map { struct content_type_map {
@ -46,6 +51,16 @@ struct content_type_map {
char *ctype; char *ctype;
}; };
struct stream_chunk {
struct evhttp_request *req;
struct evbuffer *evbuf;
int id;
int fd;
off_t size;
off_t offset;
int marked;
};
static struct content_type_map ext2ctype[] = static struct content_type_map ext2ctype[] =
{ {
@ -68,6 +83,269 @@ static struct evhttp *evhttpd;
static pthread_t tid_httpd; static pthread_t tid_httpd;
static int
safe_atol(const char *str, long *val)
{
char *end;
long intval;
errno = 0;
intval = strtol(str, &end, 10);
if (((errno == ERANGE) && ((intval == LONG_MAX) || (intval == LONG_MIN)))
|| ((errno != 0) && (intval == 0)))
{
DPRINTF(E_WARN, L_RSP, "Invalid integer in string (%s): %s\n", str, strerror(errno));
return -1;
}
if (end == str)
{
DPRINTF(E_WARN, L_RSP, "No integer found in string (%s)\n", str);
return -1;
}
*val = intval;
return 0;
}
static void
stream_chunk_cb(int fd, short event, void *arg)
{
struct stream_chunk *st;
struct timeval tv;
int ret;
st = (struct stream_chunk *)arg;
ret = evbuffer_read(st->evbuf, st->fd, STREAM_CHUNK_SIZE);
if (ret < 0)
{
DPRINTF(E_LOG, L_HTTPD, "Streaming error, file id %d\n", st->id);
evbuffer_free(st->evbuf);
goto end_stream;
}
DPRINTF(E_DBG, L_HTTPD, "Read %d bytes; streaming file id %d\n", ret, st->id);
if (ret > 0)
evhttp_send_reply_chunk(st->req, st->evbuf);
st->offset += ret;
if (!st->marked && (st->offset > ((st->size * 80) / 100)))
{
st->marked = 1;
db_playcount_increment(NULL, st->id);
}
if (ret == 0)
{
DPRINTF(E_LOG, L_HTTPD, "Done streaming file id %d\n", st->id);
goto end_stream;
}
evutil_timerclear(&tv);
ret = event_base_once(evbase_httpd, -1, EV_TIMEOUT, stream_chunk_cb, st, &tv);
if (ret < 0)
{
DPRINTF(E_LOG, L_HTTPD, "Could not re-add one-shot event for streaming\n");
goto end_stream;
}
return;
end_stream:
evhttp_send_reply_end(st->req);
close(st->fd);
evbuffer_free(st->evbuf);
free(st);
}
/* Thread: httpd */
void
httpd_stream_file(struct evhttp_request *req, int id)
{
struct media_file_info *mfi;
struct stream_chunk *st;
struct stat sb;
struct timeval tv;
const char *param;
char buf[64];
long offset;
int transcode;
int ret;
transcode = 0; /* FIXME: not supported yet */
offset = 0;
param = evhttp_find_header(req->input_headers, "Range");
if (param)
{
DPRINTF(E_DBG, L_HTTPD, "Found Range header: %s\n", param);
ret = safe_atol(param + strlen("bytes="), &offset);
if (ret < 0)
{
DPRINTF(E_LOG, L_HTTPD, "Invalid offset, starting from 0 (%s)\n", param);
offset = 0;
}
}
mfi = db_fetch_item(NULL, id);
if (!mfi)
{
DPRINTF(E_LOG, L_HTTPD, "Item %d not found\n", id);
evhttp_send_error(req, HTTP_NOTFOUND, "Not Found");
return;
}
if (mfi->data_kind != 0)
{
evhttp_send_error(req, 500, "Cannot stream radio station");
db_dispose_item(mfi);
return;
}
if (transcode)
{
/* Not supported yet */
evhttp_send_error(req, HTTP_SERVUNAVAIL, "Transcoding not supported");
db_dispose_item(mfi);
return;
}
/* Stream the raw file */
DPRINTF(E_INF, L_HTTPD, "Preparing to stream %s\n", mfi->path);
st = (struct stream_chunk *)malloc(sizeof(struct stream_chunk));
if (!st)
{
DPRINTF(E_LOG, L_HTTPD, "Out of memory for struct stream_chunk\n");
evhttp_send_error(req, HTTP_SERVUNAVAIL, "Internal Server Error");
db_dispose_item(mfi);
return;
}
memset(st, 0, sizeof(struct stream_chunk));
st->fd = open(mfi->path, O_RDONLY);
if (st->fd < 0)
{
DPRINTF(E_LOG, L_HTTPD, "Could not open %s: %s\n", mfi->path, strerror(errno));
evhttp_send_error(req, HTTP_NOTFOUND, "Not Found");
free(st);
db_dispose_item(mfi);
return;
}
ret = stat(mfi->path, &sb);
if (ret < 0)
{
DPRINTF(E_LOG, L_HTTPD, "Could not stat() %s: %s\n", mfi->path, strerror(errno));
evhttp_send_error(req, HTTP_NOTFOUND, "Not Found");
close(st->fd);
free(st);
db_dispose_item(mfi);
return;
}
ret = lseek(st->fd, offset, SEEK_SET);
if (ret < 0)
{
DPRINTF(E_LOG, L_HTTPD, "Could not seek into %s: %s\n", mfi->path, strerror(errno));
evhttp_send_error(req, HTTP_BADREQUEST, "Bad Request");
close(st->fd);
free(st);
db_dispose_item(mfi);
return;
}
if (!evhttp_find_header(req->output_headers, "Content-Type") && mfi->type)
{
ret = snprintf(buf, sizeof(buf), "audio/%s", mfi->type);
if ((ret < 0) || (ret >= sizeof(buf)))
DPRINTF(E_LOG, L_HTTPD, "Content-Type too large for buffer, dropping\n");
else
evhttp_add_header(req->output_headers, "Content-Type", buf);
}
st->evbuf = evbuffer_new();
if (!st->evbuf)
{
DPRINTF(E_LOG, L_HTTPD, "Could not allocate an evbuffer for streaming\n");
evhttp_clear_headers(req->output_headers);
evhttp_send_error(req, HTTP_SERVUNAVAIL, "Internal Server Error");
close(st->fd);
free(st);
db_dispose_item(mfi);
return;
}
evutil_timerclear(&tv);
ret = event_base_once(evbase_httpd, -1, EV_TIMEOUT, stream_chunk_cb, st, &tv);
if (ret < 0)
{
DPRINTF(E_LOG, L_HTTPD, "Could not add one-shot event for streaming\n");
evhttp_clear_headers(req->output_headers);
evhttp_send_error(req, HTTP_SERVUNAVAIL, "Internal Server Error");
close(st->fd);
evbuffer_free(st->evbuf);
free(st);
db_dispose_item(mfi);
return;
}
st->id = mfi->id;
st->size = sb.st_size;
st->offset = offset;
st->req = req;
if (offset == 0)
evhttp_send_reply_start(req, HTTP_OK, "OK");
else
{
DPRINTF(E_DBG, L_HTTPD, "Stream request with offset %ld\n", offset);
ret = snprintf(buf, sizeof(buf), "bytes %ld-%ld/%ld",
offset, (long)sb.st_size, (long)sb.st_size + 1);
if ((ret < 0) || (ret >= sizeof(buf)))
DPRINTF(E_LOG, L_HTTPD, "Content-Range too large for buffer, dropping\n");
else
evhttp_add_header(req->output_headers, "Content-Range", buf);
evhttp_send_reply_start(req, 206, "Partial Content");
}
DPRINTF(E_INF, L_HTTPD, "Kicking off streaming for %s\n", mfi->path);
db_dispose_item(mfi);
}
/* Thread: httpd */ /* Thread: httpd */
static int static int
path_is_legal(char *path) path_is_legal(char *path)

View File

@ -2,6 +2,13 @@
#ifndef __HTTPD_H__ #ifndef __HTTPD_H__
#define __HTTPD_H__ #define __HTTPD_H__
#include <event.h>
#include <evhttp.h>
void
httpd_stream_file(struct evhttp_request *req, int id);
int int
httpd_init(void); httpd_init(void);