diff --git a/src/httpd_libevhttp.c b/src/httpd_libevhttp.c index c4402b1f..52b577eb 100644 --- a/src/httpd_libevhttp.c +++ b/src/httpd_libevhttp.c @@ -23,12 +23,11 @@ #include #include #include -#include +#include // TAILQ_FOREACH #include // listen() #include -#include -#include +#include // flags in struct evhttp #include #include #include @@ -37,6 +36,13 @@ #include "logger.h" #include "httpd_internal.h" +#define DEBUG_ALLOC 1 + +#ifdef DEBUG_ALLOC +#include +static pthread_mutex_t debug_alloc_lck = PTHREAD_MUTEX_INITIALIZER; +static int debug_alloc_count; +#endif struct httpd_uri_parsed { @@ -144,13 +150,15 @@ httpd_request_evbase_get(struct httpd_request *hreq) return evhttp_connection_get_base(conn); } -int alloc_count; - void httpd_request_free(struct httpd_request *hreq) { - alloc_count--; - DPRINTF(E_LOG, L_HTTPD, "DEALLOC - COUNT %d\n", alloc_count); +#ifdef DEBUG_ALLOC + pthread_mutex_lock(&debug_alloc_lck); + debug_alloc_count--; + pthread_mutex_unlock(&debug_alloc_lck); + DPRINTF(E_DBG, L_HTTPD, "DEALLOC hreq - count is %d\n", debug_alloc_count); +#endif if (!hreq) return; @@ -171,8 +179,12 @@ httpd_request_new(httpd_backend *backend, const char *uri, const char *user_agen CHECK_NULL(L_HTTPD, hreq = calloc(1, sizeof(struct httpd_request))); - alloc_count++; - DPRINTF(E_LOG, L_HTTPD, "ALLOC - COUNT %d\n", alloc_count); +#ifdef DEBUG_ALLOC + pthread_mutex_lock(&debug_alloc_lck); + debug_alloc_count++; + pthread_mutex_unlock(&debug_alloc_lck); + DPRINTF(E_DBG, L_HTTPD, "ALLOC hreq - count is %d\n", debug_alloc_count); +#endif // Populate hreq by getting values from the backend (or from the caller) hreq->backend = backend; diff --git a/src/httpd_streaming.c b/src/httpd_streaming.c index 28fccc13..001899d6 100644 --- a/src/httpd_streaming.c +++ b/src/httpd_streaming.c @@ -40,8 +40,11 @@ struct streaming_session { int fd; struct event *readev; - bool require_icy; + struct evbuffer *readbuf; size_t bytes_sent; + + bool icy_is_requested; + size_t icy_remaining; }; static struct media_quality streaming_default_quality = { @@ -51,13 +54,110 @@ static struct media_quality streaming_default_quality = { .bit_rate = 128000, }; -/* As streaming quality goes up, we send more data to the remote client. With a - * smaller ICY_METAINT value we have to splice metadata more frequently - on - * some devices with small input buffers, a higher quality stream and low - * ICY_METAINT can lead to stuttering as observed on a Roku Soundbridge - */ + +/* ------------------------------ ICY metadata -------------------------------*/ + +// To test mp3 and ICY tagm it is good to use: +// mpv --display-tags=* http://localhost:3689/stream.mp3 + +#define STREAMING_ICY_METALEN_MAX 4080 // 255*16 incl header/footer (16bytes) +#define STREAMING_ICY_METATITLELEN_MAX 4064 // STREAMING_ICY_METALEN_MAX -16 (not incl header/footer) + +// As streaming quality goes up, we send more data to the remote client. With a +// smaller ICY_METAINT value we have to splice metadata more frequently - on +// some devices with small input buffers, a higher quality stream and low +// ICY_METAINT can lead to stuttering as observed on a Roku Soundbridge static unsigned short streaming_icy_metaint = 16384; +static pthread_mutex_t streaming_metadata_lck; +static char streaming_icy_title[STREAMING_ICY_METATITLELEN_MAX]; + + +// We know that the icymeta is limited to 1+255*16 (ie 4081) bytes so caller must +// provide a buf of this size to avoid needless mallocs +// +// The icy meta block is defined by a single byte indicating how many double byte +// words used for the actual meta. Unused bytes are null padded +// +// https://stackoverflow.com/questions/4911062/pulling-track-info-from-an-audio-stream-using-php/4914538#4914538 +// http://www.smackfu.com/stuff/programming/shoutcast.html +static uint8_t * +icy_meta_create(uint8_t buf[STREAMING_ICY_METALEN_MAX+1], unsigned *buflen, const char *title) +{ + unsigned titlelen; + unsigned metalen; + uint8_t no16s; + + *buflen = 0; + + if (title == NULL) + { + no16s = 0; + memcpy(buf, &no16s, 1); + + *buflen = 1; + } + else + { + titlelen = strlen(title); + if (titlelen > STREAMING_ICY_METATITLELEN_MAX) + titlelen = STREAMING_ICY_METATITLELEN_MAX; // dont worry about the null byte + + // [0] 1x byte N, indicate the total number of 16 bytes words required + // to represent the meta data + // [1..N] meta data book ended by "StreamTitle='" and "';" + // + // The '15' is strlen of StreamTitle=' + '; + no16s = (15 + titlelen)/16 +1; + metalen = 1 + no16s*16; + memset(buf, 0, metalen); + + memcpy(buf, &no16s, 1); + memcpy(buf+1, (const uint8_t*)"StreamTitle='", 13); + memcpy(buf+14, title, titlelen); + memcpy(buf+14+titlelen, (const uint8_t*)"';", 2); + + *buflen = metalen; + } + + return buf; +} + +static void +icy_meta_splice(struct evbuffer *out, struct evbuffer *in, size_t *icy_remaining) +{ + uint8_t meta[STREAMING_ICY_METALEN_MAX + 1]; + unsigned metalen; + size_t buf_remaining; + size_t consume; + + for (buf_remaining = evbuffer_get_length(in); buf_remaining > 0; buf_remaining -= consume) + { + consume = MIN(*icy_remaining, buf_remaining); + evbuffer_remove_buffer(in, out, consume); + *icy_remaining -= consume; + if (*icy_remaining == 0) + { + pthread_mutex_lock(&streaming_metadata_lck); + icy_meta_create(meta, &metalen, streaming_icy_title); + pthread_mutex_unlock(&streaming_metadata_lck); + + evbuffer_add(out, meta, metalen); + *icy_remaining = streaming_icy_metaint; + } + } +} + +// Thread: player. TODO Would be nice to avoid the lock. Consider moving all the +// ICY tag stuff to streaming.c and make a STREAMING_FORMAT_MP3_ICY? +static void +icy_metadata_cb(char *metadata) +{ + pthread_mutex_lock(&streaming_metadata_lck); + snprintf(streaming_icy_title, sizeof(streaming_icy_title), "%s", metadata); + pthread_mutex_unlock(&streaming_metadata_lck); +} + /* ----------------------------- Session helpers ---------------------------- */ @@ -73,18 +173,21 @@ session_free(struct streaming_session *session) event_free(session->readev); } + evbuffer_free(session->readbuf); free(session); } static struct streaming_session * -session_new(struct httpd_request *hreq, bool require_icy) +session_new(struct httpd_request *hreq, bool icy_is_requested) { struct streaming_session *session; CHECK_NULL(L_STREAMING, session = calloc(1, sizeof(struct streaming_session))); + CHECK_NULL(L_STREAMING, session->readbuf = evbuffer_new()); session->hreq = hreq; - session->require_icy = require_icy; + session->icy_is_requested = icy_is_requested; + session->icy_remaining = streaming_icy_metaint; return session; } @@ -121,7 +224,7 @@ read_cb(evutil_socket_t fd, short event, void *arg) CHECK_NULL(L_STREAMING, hreq = session->hreq); - len = evbuffer_read(hreq->out_body, fd, -1); + len = evbuffer_read(session->readbuf, fd, -1); if (len < 0 && errno != EAGAIN) { httpd_request_closecb_set(hreq, NULL, NULL); @@ -129,6 +232,11 @@ read_cb(evutil_socket_t fd, short event, void *arg) return; } + if (session->icy_is_requested) + icy_meta_splice(hreq->out_body, session->readbuf, &session->icy_remaining); + else + evbuffer_add_buffer(hreq->out_body, session->readbuf); + httpd_send_reply_chunk(hreq, hreq->out_body, NULL, NULL); session->bytes_sent += len; @@ -144,19 +252,19 @@ streaming_mp3_handler(struct httpd_request *hreq) struct event_base *evbase; const char *name = cfg_getstr(cfg_getsec(cfg, "library"), "name"); const char *param; - bool require_icy; + bool icy_is_requested; char buf[9]; param = httpd_header_find(hreq->in_headers, "Icy-MetaData"); - require_icy = (param && strcmp(param, "1") == 0); - if (require_icy) + icy_is_requested = (param && strcmp(param, "1") == 0); + if (icy_is_requested) { httpd_header_add(hreq->out_headers, "icy-name", name); - snprintf(buf, sizeof(buf)-1, "%d", streaming_icy_metaint); + snprintf(buf, sizeof(buf), "%d", streaming_icy_metaint); httpd_header_add(hreq->out_headers, "icy-metaint", buf); } - session = session_new(hreq, require_icy); + session = session_new(hreq, icy_is_requested); if (!session) return -1; @@ -248,6 +356,9 @@ streaming_init(void) else DPRINTF(E_INFO, L_STREAMING, "Unsupported icy_metaint=%d, supported range: 4096..131072, defaulting to %d\n", val, streaming_icy_metaint); + CHECK_ERR(L_STREAMING, mutex_init(&streaming_metadata_lck)); + streaming_metadatacb_register(icy_metadata_cb); + return 0; } diff --git a/src/outputs.c b/src/outputs.c index b3786e01..c4e5ba4b 100644 --- a/src/outputs.c +++ b/src/outputs.c @@ -467,6 +467,16 @@ metadata_cb_prepare(void *arg) event_active(metadata->ev, 0, 0); } +static void +metadata_free(struct output_metadata *metadata) +{ + if (!metadata) + return; + if (metadata->ev) + event_free(metadata->ev); + free(metadata); +} + static void metadata_send(enum output_types type, uint32_t item_id, bool startup, output_metadata_finalize_cb cb) { @@ -689,6 +699,11 @@ outputs_cb(int callback_id, uint64_t device_id, enum output_device_state state) event_active(outputs_deferredev, 0, 0); } +void +outputs_metadata_free(struct output_metadata *metadata) +{ + metadata_free(metadata); +} /* ---------------------------- Called by player ---------------------------- */ diff --git a/src/outputs.h b/src/outputs.h index 7997a4e7..434d88ad 100644 --- a/src/outputs.h +++ b/src/outputs.h @@ -252,7 +252,7 @@ struct output_definition // Called from worker thread for async preparation of metadata (e.g. getting // artwork, which might involce downloading image data). The prepared data is - // saved to metadata->data, which metadata_send() can use. + // saved to metadata->priv, which metadata_send() can use. void *(*metadata_prepare)(struct output_metadata *metadata); // Send metadata to outputs. Ownership of *metadata is transferred. @@ -284,6 +284,9 @@ outputs_quality_unsubscribe(struct media_quality *quality); void outputs_cb(int callback_id, uint64_t device_id, enum output_device_state); +void +outputs_metadata_free(struct output_metadata *metadata); + /* ---------------------------- Called by player ---------------------------- */ // Ownership of *add is transferred, so don't address after calling. Instead you diff --git a/src/outputs/streaming.c b/src/outputs/streaming.c index 9929e7ca..0449734f 100644 --- a/src/outputs/streaming.c +++ b/src/outputs/streaming.c @@ -34,6 +34,7 @@ #include "worker.h" #include "transcode.h" #include "logger.h" +#include "db.h" /* About * @@ -43,9 +44,14 @@ * player, but there are clients, it instead writes silence to the fd. */ -// How many times per second we send silence when player is idle (to prevent -// client from hanging up). This value matches the player tick interval. -#define SILENCE_TICKS_PER_SEC 100 +// Seconds between sending silence when player is idle +// (to prevent client from hanging up) +#define STREAMING_SILENCE_INTERVAL 1 + +// How many bytes of silence we encode with the above interval. There is no +// particular reason for using this size, just that it seems to have worked for +// a while. +#define SILENCE_BUF_SIZE STOB(352, 16, 2) // The wanted structure represents a particular format and quality that should // be produced for one or more sessions. A pipe pair is created for each session @@ -79,6 +85,13 @@ struct streaming_ctx struct event *silenceev; struct timeval silencetv; struct media_quality last_quality; + + // seqnum may wrap around so must be unsigned + unsigned int seqnum; + unsigned int seqnum_encode_next; + + // callback with new metadata, e.g. for ICY tags + void (*metadatacb)(char *metadata); }; struct encode_cmdarg @@ -86,13 +99,16 @@ struct encode_cmdarg uint8_t *buf; size_t bufsize; int samples; + unsigned int seqnum; struct media_quality quality; }; static pthread_mutex_t streaming_wanted_lck; +static pthread_cond_t streaming_sequence_cond; + static struct streaming_ctx streaming = { - .silencetv = { 0, (1000000 / SILENCE_TICKS_PER_SEC) }, + .silencetv = { STREAMING_SILENCE_INTERVAL, 0 }, }; extern struct event_base *evbase_player; @@ -394,9 +410,15 @@ encode_data_cb(void *arg) } pthread_mutex_lock(&streaming_wanted_lck); + + // To make sure we process the frames in order + while (ctx->seqnum != streaming.seqnum_encode_next) + pthread_cond_wait(&streaming_sequence_cond, &streaming_wanted_lck); + for (w = streaming.wanted; w; w = next) { next = w->next; + ret = encode_frame(w, ctx->quality, frame); if (ret < 0) wanted_remove(&streaming.wanted, w); // This will close all the fds, so readers get an error @@ -415,6 +437,9 @@ encode_data_cb(void *arg) if (w->refcount == 0) wanted_remove(&streaming.wanted, w); } + + streaming.seqnum_encode_next++; + pthread_cond_broadcast(&streaming_sequence_cond); pthread_mutex_unlock(&streaming_wanted_lck); out: @@ -422,67 +447,25 @@ encode_data_cb(void *arg) free(ctx->buf); } - -/* ----------------------------- Thread: Player ----------------------------- */ - -static void -encode_worker_invoke(uint8_t *buf, size_t bufsize, int samples, struct media_quality quality) +static void * +streaming_metadata_prepare(struct output_metadata *metadata) { - struct encode_cmdarg ctx; + struct db_queue_item *queue_item; + char *title; - if (quality.channels == 0) + queue_item = db_queue_fetch_byitemid(metadata->item_id); + if (!queue_item) { - DPRINTF(E_LOG, L_STREAMING, "Streaming quality is zero (%d/%d/%d)\n", - quality.sample_rate, quality.bits_per_sample, quality.channels); - return; + DPRINTF(E_LOG, L_STREAMING, "Could not fetch queue item id %d for new metadata\n", metadata->item_id); + return NULL; } - ctx.buf = buf; - ctx.bufsize = bufsize; - ctx.samples = samples; - ctx.quality = quality; + title = safe_asprintf("%s - %s", queue_item->title, queue_item->artist); + free_queue_item(queue_item, 0); - worker_execute(encode_data_cb, &ctx, sizeof(struct encode_cmdarg), 0); + return title; } -static void -streaming_write(struct output_buffer *obuf) -{ - uint8_t *rawbuf; - - if (!streaming.wanted) - return; - - // Need to make a copy since it will be passed of to the async worker - CHECK_NULL(L_STREAMING, rawbuf = malloc(obuf->data[0].bufsize)); - memcpy(rawbuf, obuf->data[0].buffer, obuf->data[0].bufsize); - - encode_worker_invoke(rawbuf, obuf->data[0].bufsize, obuf->data[0].samples, obuf->data[0].quality); - - streaming.last_quality = obuf->data[0].quality; - - // In case this is the last player write() we want to start streaming silence - evtimer_add(streaming.silenceev, &streaming.silencetv); -} - -static void -silenceev_cb(evutil_socket_t fd, short event, void *arg) -{ - uint8_t *rawbuf; - size_t bufsize; - int samples; - - // TODO what if everyone has disconnected? Check for streaming.wanted? - - samples = streaming.last_quality.sample_rate / SILENCE_TICKS_PER_SEC; - bufsize = STOB(samples, streaming.last_quality.bits_per_sample, streaming.last_quality.channels); - - CHECK_NULL(L_STREAMING, rawbuf = calloc(1, bufsize)); - - encode_worker_invoke(rawbuf, bufsize, samples, streaming.last_quality); - - evtimer_add(streaming.silenceev, &streaming.silencetv); -} /* ----------------------------- Thread: httpd ------------------------------ */ @@ -526,11 +509,90 @@ streaming_session_deregister(int readfd) pthread_mutex_unlock(&streaming_wanted_lck); } +// Not thread safe, but only called once during httpd init +void +streaming_metadatacb_register(streaming_metadatacb cb) +{ + streaming.metadatacb = cb; +} + +/* ----------------------------- Thread: Player ----------------------------- */ + +static void +encode_worker_invoke(uint8_t *buf, size_t bufsize, int samples, struct media_quality quality) +{ + struct encode_cmdarg ctx; + + if (quality.channels == 0) + { + DPRINTF(E_LOG, L_STREAMING, "Streaming quality is zero (%d/%d/%d)\n", + quality.sample_rate, quality.bits_per_sample, quality.channels); + return; + } + + CHECK_NULL(L_STREAMING, ctx.buf = malloc(bufsize)); + memcpy(ctx.buf, buf, bufsize); + ctx.bufsize = bufsize; + ctx.samples = samples; + ctx.quality = quality; + ctx.seqnum = streaming.seqnum; + + streaming.seqnum++; + + worker_execute(encode_data_cb, &ctx, sizeof(struct encode_cmdarg), 0); +} + +static void +silenceev_cb(evutil_socket_t fd, short event, void *arg) +{ + uint8_t silence[SILENCE_BUF_SIZE] = { 0 }; + int samples; + + // No lock since this is just an early exit, it doesn't need to be accurate + if (!streaming.wanted) + return; + + samples = BTOS(SILENCE_BUF_SIZE, streaming.last_quality.bits_per_sample, streaming.last_quality.channels); + + encode_worker_invoke(silence, SILENCE_BUF_SIZE, samples, streaming.last_quality); + + evtimer_add(streaming.silenceev, &streaming.silencetv); +} + +static void +streaming_write(struct output_buffer *obuf) +{ + // No lock since this is just an early exit, it doesn't need to be accurate + if (!streaming.wanted) + return; + + encode_worker_invoke(obuf->data[0].buffer, obuf->data[0].bufsize, obuf->data[0].samples, obuf->data[0].quality); + + streaming.last_quality = obuf->data[0].quality; + + // In case this is the last player write() we want to start streaming silence + evtimer_add(streaming.silenceev, &streaming.silencetv); +} + +static void +streaming_metadata_send(struct output_metadata *metadata) +{ + char *title = metadata->priv; + + // Calls back to httpd_streaming to update the title + if (streaming.metadatacb) + streaming.metadatacb(title); + + free(title); + outputs_metadata_free(metadata); +} + static int streaming_init(void) { CHECK_NULL(L_STREAMING, streaming.silenceev = event_new(evbase_player, -1, 0, silenceev_cb, NULL)); - CHECK_ERR(L_STREAMING, mutex_init(&streaming_wanted_lck)); + CHECK_ERR(L_STREAMING, mutex_init(&streaming_wanted_lck)); + CHECK_ERR(L_STREAMING, pthread_cond_init(&streaming_sequence_cond, NULL)); return 0; } @@ -541,6 +603,7 @@ streaming_deinit(void) event_free(streaming.silenceev); } + struct output_definition output_streaming = { .name = "mp3 streaming", @@ -550,4 +613,6 @@ struct output_definition output_streaming = .init = streaming_init, .deinit = streaming_deinit, .write = streaming_write, + .metadata_prepare = streaming_metadata_prepare, + .metadata_send = streaming_metadata_send, }; diff --git a/src/outputs/streaming.h b/src/outputs/streaming.h index 901112ae..34673345 100644 --- a/src/outputs/streaming.h +++ b/src/outputs/streaming.h @@ -4,6 +4,8 @@ #include "misc.h" // struct media_quality +typedef void (*streaming_metadatacb)(char *metadata); + enum streaming_format { STREAMING_FORMAT_MP3, @@ -15,4 +17,7 @@ streaming_session_register(enum streaming_format format, struct media_quality qu void streaming_session_deregister(int readfd); +void +streaming_metadatacb_register(streaming_metadatacb cb); + #endif /* !__STREAMING_H__ */