[streaming] ICY handling using output metadata events/callbacks

This commit is contained in:
ejurgensen 2023-01-22 17:23:32 +01:00
parent 4d0c297901
commit e77cb3f94e
6 changed files with 293 additions and 82 deletions

View File

@ -23,12 +23,11 @@
#include <stdlib.h> #include <stdlib.h>
#include <string.h> #include <string.h>
#include <unistd.h> #include <unistd.h>
#include <sys/queue.h> #include <sys/queue.h> // TAILQ_FOREACH
#include <sys/socket.h> // listen() #include <sys/socket.h> // listen()
#include <event2/http.h> #include <event2/http.h>
#include <event2/http_struct.h> #include <event2/http_struct.h> // flags in struct evhttp
#include <event2/http_compat.h>
#include <event2/keyvalq_struct.h> #include <event2/keyvalq_struct.h>
#include <event2/buffer.h> #include <event2/buffer.h>
#include <event2/bufferevent.h> #include <event2/bufferevent.h>
@ -37,6 +36,13 @@
#include "logger.h" #include "logger.h"
#include "httpd_internal.h" #include "httpd_internal.h"
#define DEBUG_ALLOC 1
#ifdef DEBUG_ALLOC
#include <pthread.h>
static pthread_mutex_t debug_alloc_lck = PTHREAD_MUTEX_INITIALIZER;
static int debug_alloc_count;
#endif
struct httpd_uri_parsed struct httpd_uri_parsed
{ {
@ -144,13 +150,15 @@ httpd_request_evbase_get(struct httpd_request *hreq)
return evhttp_connection_get_base(conn); return evhttp_connection_get_base(conn);
} }
int alloc_count;
void void
httpd_request_free(struct httpd_request *hreq) httpd_request_free(struct httpd_request *hreq)
{ {
alloc_count--; #ifdef DEBUG_ALLOC
DPRINTF(E_LOG, L_HTTPD, "DEALLOC - COUNT %d\n", alloc_count); pthread_mutex_lock(&debug_alloc_lck);
debug_alloc_count--;
pthread_mutex_unlock(&debug_alloc_lck);
DPRINTF(E_DBG, L_HTTPD, "DEALLOC hreq - count is %d\n", debug_alloc_count);
#endif
if (!hreq) if (!hreq)
return; return;
@ -171,8 +179,12 @@ httpd_request_new(httpd_backend *backend, const char *uri, const char *user_agen
CHECK_NULL(L_HTTPD, hreq = calloc(1, sizeof(struct httpd_request))); CHECK_NULL(L_HTTPD, hreq = calloc(1, sizeof(struct httpd_request)));
alloc_count++; #ifdef DEBUG_ALLOC
DPRINTF(E_LOG, L_HTTPD, "ALLOC - COUNT %d\n", alloc_count); pthread_mutex_lock(&debug_alloc_lck);
debug_alloc_count++;
pthread_mutex_unlock(&debug_alloc_lck);
DPRINTF(E_DBG, L_HTTPD, "ALLOC hreq - count is %d\n", debug_alloc_count);
#endif
// Populate hreq by getting values from the backend (or from the caller) // Populate hreq by getting values from the backend (or from the caller)
hreq->backend = backend; hreq->backend = backend;

View File

@ -40,8 +40,11 @@ struct streaming_session {
int fd; int fd;
struct event *readev; struct event *readev;
bool require_icy; struct evbuffer *readbuf;
size_t bytes_sent; size_t bytes_sent;
bool icy_is_requested;
size_t icy_remaining;
}; };
static struct media_quality streaming_default_quality = { static struct media_quality streaming_default_quality = {
@ -51,13 +54,110 @@ static struct media_quality streaming_default_quality = {
.bit_rate = 128000, .bit_rate = 128000,
}; };
/* As streaming quality goes up, we send more data to the remote client. With a
* smaller ICY_METAINT value we have to splice metadata more frequently - on /* ------------------------------ ICY metadata -------------------------------*/
* some devices with small input buffers, a higher quality stream and low
* ICY_METAINT can lead to stuttering as observed on a Roku Soundbridge // To test mp3 and ICY tagm it is good to use:
*/ // mpv --display-tags=* http://localhost:3689/stream.mp3
#define STREAMING_ICY_METALEN_MAX 4080 // 255*16 incl header/footer (16bytes)
#define STREAMING_ICY_METATITLELEN_MAX 4064 // STREAMING_ICY_METALEN_MAX -16 (not incl header/footer)
// As streaming quality goes up, we send more data to the remote client. With a
// smaller ICY_METAINT value we have to splice metadata more frequently - on
// some devices with small input buffers, a higher quality stream and low
// ICY_METAINT can lead to stuttering as observed on a Roku Soundbridge
static unsigned short streaming_icy_metaint = 16384; static unsigned short streaming_icy_metaint = 16384;
static pthread_mutex_t streaming_metadata_lck;
static char streaming_icy_title[STREAMING_ICY_METATITLELEN_MAX];
// We know that the icymeta is limited to 1+255*16 (ie 4081) bytes so caller must
// provide a buf of this size to avoid needless mallocs
//
// The icy meta block is defined by a single byte indicating how many double byte
// words used for the actual meta. Unused bytes are null padded
//
// https://stackoverflow.com/questions/4911062/pulling-track-info-from-an-audio-stream-using-php/4914538#4914538
// http://www.smackfu.com/stuff/programming/shoutcast.html
static uint8_t *
icy_meta_create(uint8_t buf[STREAMING_ICY_METALEN_MAX+1], unsigned *buflen, const char *title)
{
unsigned titlelen;
unsigned metalen;
uint8_t no16s;
*buflen = 0;
if (title == NULL)
{
no16s = 0;
memcpy(buf, &no16s, 1);
*buflen = 1;
}
else
{
titlelen = strlen(title);
if (titlelen > STREAMING_ICY_METATITLELEN_MAX)
titlelen = STREAMING_ICY_METATITLELEN_MAX; // dont worry about the null byte
// [0] 1x byte N, indicate the total number of 16 bytes words required
// to represent the meta data
// [1..N] meta data book ended by "StreamTitle='" and "';"
//
// The '15' is strlen of StreamTitle=' + ';
no16s = (15 + titlelen)/16 +1;
metalen = 1 + no16s*16;
memset(buf, 0, metalen);
memcpy(buf, &no16s, 1);
memcpy(buf+1, (const uint8_t*)"StreamTitle='", 13);
memcpy(buf+14, title, titlelen);
memcpy(buf+14+titlelen, (const uint8_t*)"';", 2);
*buflen = metalen;
}
return buf;
}
static void
icy_meta_splice(struct evbuffer *out, struct evbuffer *in, size_t *icy_remaining)
{
uint8_t meta[STREAMING_ICY_METALEN_MAX + 1];
unsigned metalen;
size_t buf_remaining;
size_t consume;
for (buf_remaining = evbuffer_get_length(in); buf_remaining > 0; buf_remaining -= consume)
{
consume = MIN(*icy_remaining, buf_remaining);
evbuffer_remove_buffer(in, out, consume);
*icy_remaining -= consume;
if (*icy_remaining == 0)
{
pthread_mutex_lock(&streaming_metadata_lck);
icy_meta_create(meta, &metalen, streaming_icy_title);
pthread_mutex_unlock(&streaming_metadata_lck);
evbuffer_add(out, meta, metalen);
*icy_remaining = streaming_icy_metaint;
}
}
}
// Thread: player. TODO Would be nice to avoid the lock. Consider moving all the
// ICY tag stuff to streaming.c and make a STREAMING_FORMAT_MP3_ICY?
static void
icy_metadata_cb(char *metadata)
{
pthread_mutex_lock(&streaming_metadata_lck);
snprintf(streaming_icy_title, sizeof(streaming_icy_title), "%s", metadata);
pthread_mutex_unlock(&streaming_metadata_lck);
}
/* ----------------------------- Session helpers ---------------------------- */ /* ----------------------------- Session helpers ---------------------------- */
@ -73,18 +173,21 @@ session_free(struct streaming_session *session)
event_free(session->readev); event_free(session->readev);
} }
evbuffer_free(session->readbuf);
free(session); free(session);
} }
static struct streaming_session * static struct streaming_session *
session_new(struct httpd_request *hreq, bool require_icy) session_new(struct httpd_request *hreq, bool icy_is_requested)
{ {
struct streaming_session *session; struct streaming_session *session;
CHECK_NULL(L_STREAMING, session = calloc(1, sizeof(struct streaming_session))); CHECK_NULL(L_STREAMING, session = calloc(1, sizeof(struct streaming_session)));
CHECK_NULL(L_STREAMING, session->readbuf = evbuffer_new());
session->hreq = hreq; session->hreq = hreq;
session->require_icy = require_icy; session->icy_is_requested = icy_is_requested;
session->icy_remaining = streaming_icy_metaint;
return session; return session;
} }
@ -121,7 +224,7 @@ read_cb(evutil_socket_t fd, short event, void *arg)
CHECK_NULL(L_STREAMING, hreq = session->hreq); CHECK_NULL(L_STREAMING, hreq = session->hreq);
len = evbuffer_read(hreq->out_body, fd, -1); len = evbuffer_read(session->readbuf, fd, -1);
if (len < 0 && errno != EAGAIN) if (len < 0 && errno != EAGAIN)
{ {
httpd_request_closecb_set(hreq, NULL, NULL); httpd_request_closecb_set(hreq, NULL, NULL);
@ -129,6 +232,11 @@ read_cb(evutil_socket_t fd, short event, void *arg)
return; return;
} }
if (session->icy_is_requested)
icy_meta_splice(hreq->out_body, session->readbuf, &session->icy_remaining);
else
evbuffer_add_buffer(hreq->out_body, session->readbuf);
httpd_send_reply_chunk(hreq, hreq->out_body, NULL, NULL); httpd_send_reply_chunk(hreq, hreq->out_body, NULL, NULL);
session->bytes_sent += len; session->bytes_sent += len;
@ -144,19 +252,19 @@ streaming_mp3_handler(struct httpd_request *hreq)
struct event_base *evbase; struct event_base *evbase;
const char *name = cfg_getstr(cfg_getsec(cfg, "library"), "name"); const char *name = cfg_getstr(cfg_getsec(cfg, "library"), "name");
const char *param; const char *param;
bool require_icy; bool icy_is_requested;
char buf[9]; char buf[9];
param = httpd_header_find(hreq->in_headers, "Icy-MetaData"); param = httpd_header_find(hreq->in_headers, "Icy-MetaData");
require_icy = (param && strcmp(param, "1") == 0); icy_is_requested = (param && strcmp(param, "1") == 0);
if (require_icy) if (icy_is_requested)
{ {
httpd_header_add(hreq->out_headers, "icy-name", name); httpd_header_add(hreq->out_headers, "icy-name", name);
snprintf(buf, sizeof(buf)-1, "%d", streaming_icy_metaint); snprintf(buf, sizeof(buf), "%d", streaming_icy_metaint);
httpd_header_add(hreq->out_headers, "icy-metaint", buf); httpd_header_add(hreq->out_headers, "icy-metaint", buf);
} }
session = session_new(hreq, require_icy); session = session_new(hreq, icy_is_requested);
if (!session) if (!session)
return -1; return -1;
@ -248,6 +356,9 @@ streaming_init(void)
else else
DPRINTF(E_INFO, L_STREAMING, "Unsupported icy_metaint=%d, supported range: 4096..131072, defaulting to %d\n", val, streaming_icy_metaint); DPRINTF(E_INFO, L_STREAMING, "Unsupported icy_metaint=%d, supported range: 4096..131072, defaulting to %d\n", val, streaming_icy_metaint);
CHECK_ERR(L_STREAMING, mutex_init(&streaming_metadata_lck));
streaming_metadatacb_register(icy_metadata_cb);
return 0; return 0;
} }

View File

@ -467,6 +467,16 @@ metadata_cb_prepare(void *arg)
event_active(metadata->ev, 0, 0); event_active(metadata->ev, 0, 0);
} }
static void
metadata_free(struct output_metadata *metadata)
{
if (!metadata)
return;
if (metadata->ev)
event_free(metadata->ev);
free(metadata);
}
static void static void
metadata_send(enum output_types type, uint32_t item_id, bool startup, output_metadata_finalize_cb cb) metadata_send(enum output_types type, uint32_t item_id, bool startup, output_metadata_finalize_cb cb)
{ {
@ -689,6 +699,11 @@ outputs_cb(int callback_id, uint64_t device_id, enum output_device_state state)
event_active(outputs_deferredev, 0, 0); event_active(outputs_deferredev, 0, 0);
} }
void
outputs_metadata_free(struct output_metadata *metadata)
{
metadata_free(metadata);
}
/* ---------------------------- Called by player ---------------------------- */ /* ---------------------------- Called by player ---------------------------- */

View File

@ -252,7 +252,7 @@ struct output_definition
// Called from worker thread for async preparation of metadata (e.g. getting // Called from worker thread for async preparation of metadata (e.g. getting
// artwork, which might involce downloading image data). The prepared data is // artwork, which might involce downloading image data). The prepared data is
// saved to metadata->data, which metadata_send() can use. // saved to metadata->priv, which metadata_send() can use.
void *(*metadata_prepare)(struct output_metadata *metadata); void *(*metadata_prepare)(struct output_metadata *metadata);
// Send metadata to outputs. Ownership of *metadata is transferred. // Send metadata to outputs. Ownership of *metadata is transferred.
@ -284,6 +284,9 @@ outputs_quality_unsubscribe(struct media_quality *quality);
void void
outputs_cb(int callback_id, uint64_t device_id, enum output_device_state); outputs_cb(int callback_id, uint64_t device_id, enum output_device_state);
void
outputs_metadata_free(struct output_metadata *metadata);
/* ---------------------------- Called by player ---------------------------- */ /* ---------------------------- Called by player ---------------------------- */
// Ownership of *add is transferred, so don't address after calling. Instead you // Ownership of *add is transferred, so don't address after calling. Instead you

View File

@ -34,6 +34,7 @@
#include "worker.h" #include "worker.h"
#include "transcode.h" #include "transcode.h"
#include "logger.h" #include "logger.h"
#include "db.h"
/* About /* About
* *
@ -43,9 +44,14 @@
* player, but there are clients, it instead writes silence to the fd. * player, but there are clients, it instead writes silence to the fd.
*/ */
// How many times per second we send silence when player is idle (to prevent // Seconds between sending silence when player is idle
// client from hanging up). This value matches the player tick interval. // (to prevent client from hanging up)
#define SILENCE_TICKS_PER_SEC 100 #define STREAMING_SILENCE_INTERVAL 1
// How many bytes of silence we encode with the above interval. There is no
// particular reason for using this size, just that it seems to have worked for
// a while.
#define SILENCE_BUF_SIZE STOB(352, 16, 2)
// The wanted structure represents a particular format and quality that should // The wanted structure represents a particular format and quality that should
// be produced for one or more sessions. A pipe pair is created for each session // be produced for one or more sessions. A pipe pair is created for each session
@ -79,6 +85,13 @@ struct streaming_ctx
struct event *silenceev; struct event *silenceev;
struct timeval silencetv; struct timeval silencetv;
struct media_quality last_quality; struct media_quality last_quality;
// seqnum may wrap around so must be unsigned
unsigned int seqnum;
unsigned int seqnum_encode_next;
// callback with new metadata, e.g. for ICY tags
void (*metadatacb)(char *metadata);
}; };
struct encode_cmdarg struct encode_cmdarg
@ -86,13 +99,16 @@ struct encode_cmdarg
uint8_t *buf; uint8_t *buf;
size_t bufsize; size_t bufsize;
int samples; int samples;
unsigned int seqnum;
struct media_quality quality; struct media_quality quality;
}; };
static pthread_mutex_t streaming_wanted_lck; static pthread_mutex_t streaming_wanted_lck;
static pthread_cond_t streaming_sequence_cond;
static struct streaming_ctx streaming = static struct streaming_ctx streaming =
{ {
.silencetv = { 0, (1000000 / SILENCE_TICKS_PER_SEC) }, .silencetv = { STREAMING_SILENCE_INTERVAL, 0 },
}; };
extern struct event_base *evbase_player; extern struct event_base *evbase_player;
@ -394,9 +410,15 @@ encode_data_cb(void *arg)
} }
pthread_mutex_lock(&streaming_wanted_lck); pthread_mutex_lock(&streaming_wanted_lck);
// To make sure we process the frames in order
while (ctx->seqnum != streaming.seqnum_encode_next)
pthread_cond_wait(&streaming_sequence_cond, &streaming_wanted_lck);
for (w = streaming.wanted; w; w = next) for (w = streaming.wanted; w; w = next)
{ {
next = w->next; next = w->next;
ret = encode_frame(w, ctx->quality, frame); ret = encode_frame(w, ctx->quality, frame);
if (ret < 0) if (ret < 0)
wanted_remove(&streaming.wanted, w); // This will close all the fds, so readers get an error wanted_remove(&streaming.wanted, w); // This will close all the fds, so readers get an error
@ -415,6 +437,9 @@ encode_data_cb(void *arg)
if (w->refcount == 0) if (w->refcount == 0)
wanted_remove(&streaming.wanted, w); wanted_remove(&streaming.wanted, w);
} }
streaming.seqnum_encode_next++;
pthread_cond_broadcast(&streaming_sequence_cond);
pthread_mutex_unlock(&streaming_wanted_lck); pthread_mutex_unlock(&streaming_wanted_lck);
out: out:
@ -422,67 +447,25 @@ encode_data_cb(void *arg)
free(ctx->buf); free(ctx->buf);
} }
static void *
/* ----------------------------- Thread: Player ----------------------------- */ streaming_metadata_prepare(struct output_metadata *metadata)
static void
encode_worker_invoke(uint8_t *buf, size_t bufsize, int samples, struct media_quality quality)
{ {
struct encode_cmdarg ctx; struct db_queue_item *queue_item;
char *title;
if (quality.channels == 0) queue_item = db_queue_fetch_byitemid(metadata->item_id);
if (!queue_item)
{ {
DPRINTF(E_LOG, L_STREAMING, "Streaming quality is zero (%d/%d/%d)\n", DPRINTF(E_LOG, L_STREAMING, "Could not fetch queue item id %d for new metadata\n", metadata->item_id);
quality.sample_rate, quality.bits_per_sample, quality.channels); return NULL;
return;
} }
ctx.buf = buf; title = safe_asprintf("%s - %s", queue_item->title, queue_item->artist);
ctx.bufsize = bufsize; free_queue_item(queue_item, 0);
ctx.samples = samples;
ctx.quality = quality;
worker_execute(encode_data_cb, &ctx, sizeof(struct encode_cmdarg), 0); return title;
} }
static void
streaming_write(struct output_buffer *obuf)
{
uint8_t *rawbuf;
if (!streaming.wanted)
return;
// Need to make a copy since it will be passed of to the async worker
CHECK_NULL(L_STREAMING, rawbuf = malloc(obuf->data[0].bufsize));
memcpy(rawbuf, obuf->data[0].buffer, obuf->data[0].bufsize);
encode_worker_invoke(rawbuf, obuf->data[0].bufsize, obuf->data[0].samples, obuf->data[0].quality);
streaming.last_quality = obuf->data[0].quality;
// In case this is the last player write() we want to start streaming silence
evtimer_add(streaming.silenceev, &streaming.silencetv);
}
static void
silenceev_cb(evutil_socket_t fd, short event, void *arg)
{
uint8_t *rawbuf;
size_t bufsize;
int samples;
// TODO what if everyone has disconnected? Check for streaming.wanted?
samples = streaming.last_quality.sample_rate / SILENCE_TICKS_PER_SEC;
bufsize = STOB(samples, streaming.last_quality.bits_per_sample, streaming.last_quality.channels);
CHECK_NULL(L_STREAMING, rawbuf = calloc(1, bufsize));
encode_worker_invoke(rawbuf, bufsize, samples, streaming.last_quality);
evtimer_add(streaming.silenceev, &streaming.silencetv);
}
/* ----------------------------- Thread: httpd ------------------------------ */ /* ----------------------------- Thread: httpd ------------------------------ */
@ -526,11 +509,90 @@ streaming_session_deregister(int readfd)
pthread_mutex_unlock(&streaming_wanted_lck); pthread_mutex_unlock(&streaming_wanted_lck);
} }
// Not thread safe, but only called once during httpd init
void
streaming_metadatacb_register(streaming_metadatacb cb)
{
streaming.metadatacb = cb;
}
/* ----------------------------- Thread: Player ----------------------------- */
static void
encode_worker_invoke(uint8_t *buf, size_t bufsize, int samples, struct media_quality quality)
{
struct encode_cmdarg ctx;
if (quality.channels == 0)
{
DPRINTF(E_LOG, L_STREAMING, "Streaming quality is zero (%d/%d/%d)\n",
quality.sample_rate, quality.bits_per_sample, quality.channels);
return;
}
CHECK_NULL(L_STREAMING, ctx.buf = malloc(bufsize));
memcpy(ctx.buf, buf, bufsize);
ctx.bufsize = bufsize;
ctx.samples = samples;
ctx.quality = quality;
ctx.seqnum = streaming.seqnum;
streaming.seqnum++;
worker_execute(encode_data_cb, &ctx, sizeof(struct encode_cmdarg), 0);
}
static void
silenceev_cb(evutil_socket_t fd, short event, void *arg)
{
uint8_t silence[SILENCE_BUF_SIZE] = { 0 };
int samples;
// No lock since this is just an early exit, it doesn't need to be accurate
if (!streaming.wanted)
return;
samples = BTOS(SILENCE_BUF_SIZE, streaming.last_quality.bits_per_sample, streaming.last_quality.channels);
encode_worker_invoke(silence, SILENCE_BUF_SIZE, samples, streaming.last_quality);
evtimer_add(streaming.silenceev, &streaming.silencetv);
}
static void
streaming_write(struct output_buffer *obuf)
{
// No lock since this is just an early exit, it doesn't need to be accurate
if (!streaming.wanted)
return;
encode_worker_invoke(obuf->data[0].buffer, obuf->data[0].bufsize, obuf->data[0].samples, obuf->data[0].quality);
streaming.last_quality = obuf->data[0].quality;
// In case this is the last player write() we want to start streaming silence
evtimer_add(streaming.silenceev, &streaming.silencetv);
}
static void
streaming_metadata_send(struct output_metadata *metadata)
{
char *title = metadata->priv;
// Calls back to httpd_streaming to update the title
if (streaming.metadatacb)
streaming.metadatacb(title);
free(title);
outputs_metadata_free(metadata);
}
static int static int
streaming_init(void) streaming_init(void)
{ {
CHECK_NULL(L_STREAMING, streaming.silenceev = event_new(evbase_player, -1, 0, silenceev_cb, NULL)); CHECK_NULL(L_STREAMING, streaming.silenceev = event_new(evbase_player, -1, 0, silenceev_cb, NULL));
CHECK_ERR(L_STREAMING, mutex_init(&streaming_wanted_lck)); CHECK_ERR(L_STREAMING, mutex_init(&streaming_wanted_lck));
CHECK_ERR(L_STREAMING, pthread_cond_init(&streaming_sequence_cond, NULL));
return 0; return 0;
} }
@ -541,6 +603,7 @@ streaming_deinit(void)
event_free(streaming.silenceev); event_free(streaming.silenceev);
} }
struct output_definition output_streaming = struct output_definition output_streaming =
{ {
.name = "mp3 streaming", .name = "mp3 streaming",
@ -550,4 +613,6 @@ struct output_definition output_streaming =
.init = streaming_init, .init = streaming_init,
.deinit = streaming_deinit, .deinit = streaming_deinit,
.write = streaming_write, .write = streaming_write,
.metadata_prepare = streaming_metadata_prepare,
.metadata_send = streaming_metadata_send,
}; };

View File

@ -4,6 +4,8 @@
#include "misc.h" // struct media_quality #include "misc.h" // struct media_quality
typedef void (*streaming_metadatacb)(char *metadata);
enum streaming_format enum streaming_format
{ {
STREAMING_FORMAT_MP3, STREAMING_FORMAT_MP3,
@ -15,4 +17,7 @@ streaming_session_register(enum streaming_format format, struct media_quality qu
void void
streaming_session_deregister(int readfd); streaming_session_deregister(int readfd);
void
streaming_metadatacb_register(streaming_metadatacb cb);
#endif /* !__STREAMING_H__ */ #endif /* !__STREAMING_H__ */