From cd62070fdbf174150dcb6673d51e9239dce817eb Mon Sep 17 00:00:00 2001 From: ejurgensen Date: Mon, 1 May 2023 23:32:06 +0200 Subject: [PATCH 1/6] [xcode] Add "samples_per_frame" query parameter --- src/transcode.c | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/src/transcode.c b/src/transcode.c index 660a7125..ff9cd76b 100644 --- a/src/transcode.c +++ b/src/transcode.c @@ -2202,6 +2202,11 @@ transcode_encode_query(struct encode_ctx *ctx, const char *query) return ctx->audio_stream.stream->codecpar->channels; #endif } + else if (strcmp(query, "samples_per_frame") == 0) + { + if (ctx->audio_stream.stream) + return ctx->audio_stream.stream->codecpar->frame_size; + } return -1; } From 701bad466eb36cec158c31cad863070b62ae9b9a Mon Sep 17 00:00:00 2001 From: ejurgensen Date: Mon, 1 May 2023 23:32:57 +0200 Subject: [PATCH 2/6] [outputs] Add helpers for output buffer copy and free --- src/outputs.c | 49 +++++++++++++++++++++++++++++++++++++++++++++++++ src/outputs.h | 6 ++++++ 2 files changed, 55 insertions(+) diff --git a/src/outputs.c b/src/outputs.c index c4e5ba4b..67541512 100644 --- a/src/outputs.c +++ b/src/outputs.c @@ -389,6 +389,43 @@ buffer_drain(struct output_buffer *obuf) } } +static struct output_buffer * +buffer_copy(struct output_buffer *obuf) +{ + struct output_buffer *copy; + int i; + + if (!obuf) + return NULL; + + CHECK_NULL(L_PLAYER, copy = malloc(sizeof(struct output_buffer))); + + memcpy(copy, obuf, sizeof(struct output_buffer)); + + for (i = 0; obuf->data[i].buffer; i++) + { + CHECK_NULL(L_PLAYER, copy->data[i].evbuf = evbuffer_new()); + evbuffer_add(copy->data[i].evbuf, obuf->data[i].buffer, obuf->data[i].bufsize); + copy->data[i].buffer = evbuffer_pullup(copy->data[i].evbuf, -1); + } + + return copy; +} + +static void +buffer_free(struct output_buffer *obuf) +{ + int i; + + if (!obuf) + return; + + for (i = 0; obuf->data[i].buffer; i++) + evbuffer_free(obuf->data[i].evbuf); + + free(obuf); +} + static void device_list_sort(void) { @@ -561,6 +598,18 @@ vol_adjust(void) /* ----------------------------------- API ---------------------------------- */ +struct output_buffer * +outputs_buffer_copy(struct output_buffer *buffer) +{ + return buffer_copy(buffer); +} + +void +outputs_buffer_free(struct output_buffer *buffer) +{ + buffer_free(buffer); +} + struct output_device * outputs_device_get(uint64_t device_id) { diff --git a/src/outputs.h b/src/outputs.h index 434d88ad..50647dbd 100644 --- a/src/outputs.h +++ b/src/outputs.h @@ -264,6 +264,12 @@ struct output_definition /* ------------------------------- General use ------------------------------ */ +struct output_buffer * +outputs_buffer_copy(struct output_buffer *buffer); + +void +outputs_buffer_free(struct output_buffer *buffer); + struct output_device * outputs_device_get(uint64_t device_id); From 537012440b341c4cc62dedece7c7ed4f4fb0831b Mon Sep 17 00:00:00 2001 From: ejurgensen Date: Mon, 1 May 2023 23:35:34 +0200 Subject: [PATCH 3/6] [streaming] Fix mp3 streaming not working with ffmpeg 6 ffmpeg 6 requires fixed size frames for the encoder, so this commit refactors the mp3 streaming to do support that. Also uses outputs_quality_subscribe which potentially has better performance (e.g. a 48000 sample rate source will only be resampled once if both airplay and mp3 streaming want 44100), and makes it easier to create fixed size frames. Resolves #1601 --- src/outputs.c | 24 +-- src/outputs.h | 12 +- src/outputs/streaming.c | 315 ++++++++++++++++++++++------------------ 3 files changed, 191 insertions(+), 160 deletions(-) diff --git a/src/outputs.c b/src/outputs.c index 67541512..b5d568f5 100644 --- a/src/outputs.c +++ b/src/outputs.c @@ -598,18 +598,6 @@ vol_adjust(void) /* ----------------------------------- API ---------------------------------- */ -struct output_buffer * -outputs_buffer_copy(struct output_buffer *buffer) -{ - return buffer_copy(buffer); -} - -void -outputs_buffer_free(struct output_buffer *buffer) -{ - buffer_free(buffer); -} - struct output_device * outputs_device_get(uint64_t device_id) { @@ -754,6 +742,18 @@ outputs_metadata_free(struct output_metadata *metadata) metadata_free(metadata); } +struct output_buffer * +outputs_buffer_copy(struct output_buffer *buffer) +{ + return buffer_copy(buffer); +} + +void +outputs_buffer_free(struct output_buffer *buffer) +{ + buffer_free(buffer); +} + /* ---------------------------- Called by player ---------------------------- */ struct output_device * diff --git a/src/outputs.h b/src/outputs.h index 50647dbd..eeda8357 100644 --- a/src/outputs.h +++ b/src/outputs.h @@ -264,12 +264,6 @@ struct output_definition /* ------------------------------- General use ------------------------------ */ -struct output_buffer * -outputs_buffer_copy(struct output_buffer *buffer); - -void -outputs_buffer_free(struct output_buffer *buffer); - struct output_device * outputs_device_get(uint64_t device_id); @@ -293,6 +287,12 @@ outputs_cb(int callback_id, uint64_t device_id, enum output_device_state); void outputs_metadata_free(struct output_metadata *metadata); +struct output_buffer * +outputs_buffer_copy(struct output_buffer *buffer); + +void +outputs_buffer_free(struct output_buffer *buffer); + /* ---------------------------- Called by player ---------------------------- */ // Ownership of *add is transferred, so don't address after calling. Instead you diff --git a/src/outputs/streaming.c b/src/outputs/streaming.c index d1b34d00..f0ab5b95 100644 --- a/src/outputs/streaming.c +++ b/src/outputs/streaming.c @@ -44,15 +44,10 @@ * player, but there are clients, it instead writes silence to the fd. */ -// Seconds between sending silence when player is idle +// Seconds between sending a frame of silence when player is idle // (to prevent client from hanging up) #define STREAMING_SILENCE_INTERVAL 1 -// How many bytes of silence we encode with the above interval. There is no -// particular reason for using this size, just that it seems to have worked for -// a while. -#define SILENCE_BUF_SIZE STOB(352, 16, 2) - // The wanted structure represents a particular format and quality that should // be produced for one or more sessions. A pipe pair is created for each session // for the i/o. @@ -66,15 +61,19 @@ struct pipepair struct streaming_wanted { - int refcount; + int num_sessions; // for refcounting struct pipepair pipes[WANTED_PIPES_MAX]; enum streaming_format format; - struct media_quality quality_in; - struct media_quality quality_out; + struct media_quality quality; + struct evbuffer *audio_in; + struct evbuffer *audio_out; struct encode_ctx *xcode_ctx; - struct evbuffer *encoded_data; + + int nb_samples; + uint8_t *frame_data; + size_t frame_size; struct streaming_wanted *next; }; @@ -96,11 +95,8 @@ struct streaming_ctx struct encode_cmdarg { - uint8_t *buf; - size_t bufsize; - int samples; + struct output_buffer *obuf; unsigned int seqnum; - struct media_quality quality; }; static pthread_mutex_t streaming_wanted_lck; @@ -116,6 +112,41 @@ extern struct event_base *evbase_player; /* ------------------------------- Helpers ---------------------------------- */ +static struct encode_ctx * +encoder_setup(enum streaming_format format, struct media_quality *quality) +{ + struct decode_ctx *decode_ctx = NULL; + struct encode_ctx *encode_ctx = NULL; + + if (quality->bits_per_sample == 16) + decode_ctx = transcode_decode_setup_raw(XCODE_PCM16, quality); + else if (quality->bits_per_sample == 24) + decode_ctx = transcode_decode_setup_raw(XCODE_PCM24, quality); + else if (quality->bits_per_sample == 32) + decode_ctx = transcode_decode_setup_raw(XCODE_PCM32, quality); + + if (!decode_ctx) + { + DPRINTF(E_LOG, L_STREAMING, "Error setting up decoder for quality sr %d, bps %d, ch %d, cannot encode\n", + quality->sample_rate, quality->bits_per_sample, quality->channels); + goto out; + } + + if (format == STREAMING_FORMAT_MP3) + encode_ctx = transcode_encode_setup(XCODE_MP3, quality, decode_ctx, NULL, 0, 0); + + if (!encode_ctx) + { + DPRINTF(E_LOG, L_STREAMING, "Error setting up encoder for quality sr %d, bps %d, ch %d, cannot encode\n", + quality->sample_rate, quality->bits_per_sample, quality->channels); + goto out; + } + + out: + transcode_decode_cleanup(&decode_ctx); + return encode_ctx; +} + static int pipe_open(struct pipepair *p) { @@ -164,8 +195,12 @@ wanted_free(struct streaming_wanted *w) for (int i = 0; i < WANTED_PIPES_MAX; i++) pipe_close(&w->pipes[i]); + outputs_quality_unsubscribe(&w->quality); // FIXME not thread safe + transcode_encode_cleanup(&w->xcode_ctx); - evbuffer_free(w->encoded_data); + evbuffer_free(w->audio_in); + evbuffer_free(w->audio_out); + free(w->frame_data); free(w); } @@ -175,10 +210,21 @@ wanted_new(enum streaming_format format, struct media_quality quality) struct streaming_wanted *w; CHECK_NULL(L_STREAMING, w = calloc(1, sizeof(struct streaming_wanted))); - CHECK_NULL(L_STREAMING, w->encoded_data = evbuffer_new()); + CHECK_NULL(L_STREAMING, w->audio_in = evbuffer_new()); + CHECK_NULL(L_STREAMING, w->audio_out = evbuffer_new()); + + outputs_quality_subscribe(&quality); // FIXME not thread safe, should be called from player thread only + + w->xcode_ctx = encoder_setup(format, &quality); + if (!w->xcode_ctx) + goto error; - w->quality_out = quality; w->format = format; + w->quality = quality; + w->nb_samples = transcode_encode_query(w->xcode_ctx, "samples_per_frame"); // 1152 for mp3 + w->frame_size = STOB(w->nb_samples, quality.bits_per_sample, quality.channels); + + CHECK_NULL(L_STREAMING, w->frame_data = malloc(w->frame_size)); for (int i = 0; i < WANTED_PIPES_MAX; i++) { @@ -187,6 +233,10 @@ wanted_new(enum streaming_format format, struct media_quality quality) } return w; + + error: + wanted_free(w); + return NULL; } static void @@ -233,7 +283,7 @@ wanted_find_byformat(struct streaming_wanted *wanted, enum streaming_format form for (w = wanted; w; w = w->next) { - if (w->format == format && quality_is_equal(&w->quality_out, &quality)) + if (w->format == format && quality_is_equal(&w->quality, &quality)) return w; } @@ -281,8 +331,8 @@ wanted_session_add(struct pipepair *p, struct streaming_wanted *w) return -1; } - w->refcount++; - DPRINTF(E_DBG, L_STREAMING, "Session register readfd %d, wanted->refcount=%d\n", p->readfd, w->refcount); + w->num_sessions++; + DPRINTF(E_DBG, L_STREAMING, "Session register readfd %d, wanted->num_sessions=%d\n", p->readfd, w->num_sessions); return 0; } @@ -307,73 +357,13 @@ wanted_session_remove(struct streaming_wanted *w, int readfd) return; } - w->refcount--; - DPRINTF(E_DBG, L_STREAMING, "Session deregister readfd %d, wanted->refcount=%d\n", readfd, w->refcount); + w->num_sessions--; + DPRINTF(E_DBG, L_STREAMING, "Session deregister readfd %d, wanted->num_sessions=%d\n", readfd, w->num_sessions); } /* ----------------------------- Thread: Worker ----------------------------- */ -static int -encode_reset(struct streaming_wanted *w, struct media_quality quality_in) -{ - struct media_quality quality_out = w->quality_out; - struct decode_ctx *decode_ctx = NULL; - - transcode_encode_cleanup(&w->xcode_ctx); - - if (quality_in.bits_per_sample == 16) - decode_ctx = transcode_decode_setup_raw(XCODE_PCM16, &quality_in); - else if (quality_in.bits_per_sample == 24) - decode_ctx = transcode_decode_setup_raw(XCODE_PCM24, &quality_in); - else if (quality_in.bits_per_sample == 32) - decode_ctx = transcode_decode_setup_raw(XCODE_PCM32, &quality_in); - - if (!decode_ctx) - { - DPRINTF(E_LOG, L_STREAMING, "Error setting up decoder for input quality sr %d, bps %d, ch %d, cannot MP3 encode\n", - quality_in.sample_rate, quality_in.bits_per_sample, quality_in.channels); - goto error; - } - - w->quality_in = quality_in; - w->xcode_ctx = transcode_encode_setup(XCODE_MP3, &quality_out, decode_ctx, NULL, 0, 0); - if (!w->xcode_ctx) - { - DPRINTF(E_LOG, L_STREAMING, "Error setting up encoder for output quality sr %d, bps %d, ch %d, cannot MP3 encode\n", - quality_out.sample_rate, quality_out.bits_per_sample, quality_out.channels); - goto error; - } - - transcode_decode_cleanup(&decode_ctx); - return 0; - - error: - transcode_decode_cleanup(&decode_ctx); - return -1; -} - -static int -encode_frame(struct streaming_wanted *w, struct media_quality quality_in, transcode_frame *frame) -{ - int ret; - - if (!w->xcode_ctx || !quality_is_equal(&quality_in, &w->quality_in)) - { - DPRINTF(E_DBG, L_STREAMING, "Resetting transcode context\n"); - if (encode_reset(w, quality_in) < 0) - return -1; - } - - ret = transcode_encode(w->encoded_data, w->xcode_ctx, frame, 0); - if (ret < 0) - { - return -1; - } - - return 0; -} - static void encode_write(uint8_t *buf, size_t buflen, struct streaming_wanted *w, struct pipepair *p) { @@ -390,25 +380,76 @@ encode_write(uint8_t *buf, size_t buflen, struct streaming_wanted *w, struct pip } } +static int +encode_buffer(struct streaming_wanted *w, uint8_t *buf, size_t bufsize) +{ + ssize_t remaining_bytes; + transcode_frame *frame = NULL; + int ret; + + if (buf) + { + evbuffer_add(w->audio_in, buf, bufsize); + } + else + { + // buf being null is either a silence timeout or that we could't find the + // subscripted quality. In both cases we encode silence. + memset(w->frame_data, 0, w->frame_size); + evbuffer_add(w->audio_in, w->frame_data, w->frame_size); + } + + remaining_bytes = evbuffer_get_length(w->audio_in); + + // Read and encode from 'audio_in' in chunks of 'frame_size' bytes + while (remaining_bytes > w->frame_size) + { + ret = evbuffer_remove(w->audio_in, w->frame_data, w->frame_size); + if (ret != w->frame_size) + { + DPRINTF(E_LOG, L_STREAMING, "Bug! Couldn't read a frame of %zu bytes (format %d)\n", w->frame_size, w->format); + goto error; + } + + remaining_bytes -= w->frame_size; + + frame = transcode_frame_new(w->frame_data, w->frame_size, w->nb_samples, &w->quality); + if (!frame) + { + DPRINTF(E_LOG, L_STREAMING, "Could not convert raw PCM to frame (format %d)\n", w->format); + goto error; + } + + ret = transcode_encode(w->audio_out, w->xcode_ctx, frame, 0); + if (ret < 0) + { + DPRINTF(E_LOG, L_STREAMING, "Encoding error (format %d)\n", w->format); + goto error; + } + + transcode_frame_free(frame); + } + + return 0; + + error: + transcode_frame_free(frame); + return -1; +} + static void encode_data_cb(void *arg) { struct encode_cmdarg *ctx = arg; - transcode_frame *frame; + struct output_buffer *obuf = ctx->obuf; struct streaming_wanted *w; struct streaming_wanted *next; uint8_t *buf; + size_t bufsize; size_t len; int ret; int i; - frame = transcode_frame_new(ctx->buf, ctx->bufsize, ctx->samples, &ctx->quality); - if (!frame) - { - DPRINTF(E_LOG, L_STREAMING, "Could not convert raw PCM to frame\n"); - goto out; - } - pthread_mutex_lock(&streaming_wanted_lck); // To make sure we process the frames in order @@ -419,32 +460,48 @@ encode_data_cb(void *arg) { next = w->next; - ret = encode_frame(w, ctx->quality, frame); + for (i = 0, buf = NULL, bufsize = 0; obuf && obuf->data[i].buffer; i++) + { + if (!quality_is_equal(&obuf->data[i].quality, &w->quality)) + continue; + + buf = obuf->data[i].buffer; + bufsize = obuf->data[i].bufsize; + } + + ret = encode_buffer(w, buf, bufsize); if (ret < 0) - wanted_remove(&streaming.wanted, w); // This will close all the fds, so readers get an error + { + wanted_remove(&streaming.wanted, w); // This will close all the fds, so readers get an error + continue; + } - len = evbuffer_get_length(w->encoded_data); + len = evbuffer_get_length(w->audio_out); if (len == 0) - continue; + { + continue; + } - buf = evbuffer_pullup(w->encoded_data, -1); + buf = evbuffer_pullup(w->audio_out, -1); for (i = 0; i < WANTED_PIPES_MAX; i++) - encode_write(buf, len, w, &w->pipes[i]); + { + encode_write(buf, len, w, &w->pipes[i]); + } - evbuffer_drain(w->encoded_data, -1); + evbuffer_drain(w->audio_out, -1); - if (w->refcount == 0) - wanted_remove(&streaming.wanted, w); + if (w->num_sessions == 0) + { + wanted_remove(&streaming.wanted, w); + } } streaming.seqnum_encode_next++; pthread_cond_broadcast(&streaming_sequence_cond); pthread_mutex_unlock(&streaming_wanted_lck); - out: - transcode_frame_free(frame); - free(ctx->buf); + outputs_buffer_free(ctx->obuf); } static void * @@ -502,7 +559,7 @@ streaming_session_deregister(int readfd) wanted_session_remove(w, readfd); - if (w->refcount == 0) + if (w->num_sessions == 0) wanted_remove(&streaming.wanted, w); out: @@ -519,59 +576,33 @@ streaming_metadatacb_register(streaming_metadatacb cb) /* ----------------------------- Thread: Player ----------------------------- */ static void -encode_worker_invoke(uint8_t *buf, size_t bufsize, int samples, struct media_quality quality) +streaming_write(struct output_buffer *obuf) { struct encode_cmdarg ctx; - if (quality.channels == 0) - { - DPRINTF(E_LOG, L_STREAMING, "Streaming quality is zero (%d/%d/%d)\n", - quality.sample_rate, quality.bits_per_sample, quality.channels); - return; - } + // No lock since this is just an early exit, it doesn't need to be accurate + if (!streaming.wanted) + return; - CHECK_NULL(L_STREAMING, ctx.buf = malloc(bufsize)); - memcpy(ctx.buf, buf, bufsize); - ctx.bufsize = bufsize; - ctx.samples = samples; - ctx.quality = quality; + // We don't want to block the player, so we can't lock to access + // streaming.wanted and find which qualities we need. So we just copy it all + // and pass it to a worker thread that can lock and check what is wanted, and + // also can encode without holding the player. + ctx.obuf = outputs_buffer_copy(obuf); ctx.seqnum = streaming.seqnum; streaming.seqnum++; worker_execute(encode_data_cb, &ctx, sizeof(struct encode_cmdarg), 0); + + // In case this is the last player write() we want to start streaming silence + evtimer_add(streaming.silenceev, &streaming.silencetv); } static void silenceev_cb(evutil_socket_t fd, short event, void *arg) { - uint8_t silence[SILENCE_BUF_SIZE] = { 0 }; - int samples; - - // No lock since this is just an early exit, it doesn't need to be accurate - if (!streaming.wanted) - return; - - samples = BTOS(SILENCE_BUF_SIZE, streaming.last_quality.bits_per_sample, streaming.last_quality.channels); - - encode_worker_invoke(silence, SILENCE_BUF_SIZE, samples, streaming.last_quality); - - evtimer_add(streaming.silenceev, &streaming.silencetv); -} - -static void -streaming_write(struct output_buffer *obuf) -{ - // No lock since this is just an early exit, it doesn't need to be accurate - if (!streaming.wanted) - return; - - encode_worker_invoke(obuf->data[0].buffer, obuf->data[0].bufsize, obuf->data[0].samples, obuf->data[0].quality); - - streaming.last_quality = obuf->data[0].quality; - - // In case this is the last player write() we want to start streaming silence - evtimer_add(streaming.silenceev, &streaming.silencetv); + streaming_write(NULL); } static void From dedd4a95c2a5088d2209563272c655d1219041ee Mon Sep 17 00:00:00 2001 From: ejurgensen Date: Sat, 6 May 2023 01:00:22 +0200 Subject: [PATCH 4/6] [player] Add player_streaming_register() and _deregister() --- src/outputs.h | 1 + src/player.c | 74 ++++++++++++++++++++++++++++++++++++++++++++++++++- src/player.h | 7 +++++ 3 files changed, 81 insertions(+), 1 deletion(-) diff --git a/src/outputs.h b/src/outputs.h index eeda8357..578bad60 100644 --- a/src/outputs.h +++ b/src/outputs.h @@ -134,6 +134,7 @@ struct output_device // Quality of audio output struct media_quality quality; + int format; // Address char *v4_address; diff --git a/src/player.c b/src/player.c index f485b8c3..0a08d48f 100644 --- a/src/player.c +++ b/src/player.c @@ -151,6 +151,9 @@ struct speaker_attr_param bool prevent_playback; bool busy; + struct media_quality quality; + int format; + const char *pin; }; @@ -2883,6 +2886,48 @@ speaker_start_all(void *arg, int *retval) return COMMAND_END; } +// This is borderline misuse of the outputs_device interface, but the purpose is +// to register streaming session info with outputs/streaming.c via the player +// thread. It must be the player thread because session setup requires that +// outputs_quality_subscribe() is called, and by design it isn't thread safe. +static enum command_state +streaming_register(void *arg, int *retval) +{ + struct speaker_attr_param *param = arg; + struct output_device device = + { + .type = OUTPUT_TYPE_STREAMING, + .type_name = "streaming", + .name = "streaming", + .quality = param->quality, + .format = param->format, + }; + + *retval = outputs_device_start(&device, NULL, false); + if (*retval < 0) + return COMMAND_END; + + *retval = device.id; // Actually the fd that the called needs + return COMMAND_END; +} + +static enum command_state +streaming_deregister(void *arg, int *retval) +{ + struct speaker_attr_param *param = arg; + struct output_device device = + { + .type = OUTPUT_TYPE_STREAMING, + .type_name = "streaming", + .name = "streaming", + .id = param->spk_id, + .session = "dummy", // to pass check in outputs_device_stop() + }; + + *retval = outputs_device_stop(&device, NULL); + return COMMAND_END; +} + static enum command_state volume_set(void *arg, int *retval) { @@ -3138,7 +3183,7 @@ player_get_status(struct player_status *status) } -/* --------------------------- Thread: httpd (DACP) ------------------------- */ +/* ------------------------------ Thread: httpd ----------------------------- */ /* * Stores the now playing media item dbmfi-id in the given id pointer. @@ -3424,6 +3469,33 @@ player_speaker_authorize(uint64_t id, const char *pin) return ret; } +int +player_streaming_register(int format, struct media_quality quality) +{ + struct speaker_attr_param param; + int ret; + + param.format = format; + param.quality = quality; + + ret = commands_exec_sync(cmdbase, streaming_register, NULL, ¶m); + + return ret; +} + +int +player_streaming_deregister(int id) +{ + struct speaker_attr_param param; + int ret; + + param.spk_id = id; + + ret = commands_exec_sync(cmdbase, streaming_deregister, NULL, ¶m); + + return ret; +} + int player_volume_set(int vol) { diff --git a/src/player.h b/src/player.h index 974794f4..3246023c 100644 --- a/src/player.h +++ b/src/player.h @@ -6,6 +6,7 @@ #include #include "db.h" +#include "misc.h" // for struct media_quality // Maximum number of previously played songs that are remembered #define MAX_HISTORY_COUNT 20 @@ -117,6 +118,12 @@ player_speaker_resurrect(void *arg); int player_speaker_authorize(uint64_t id, const char *pin); +int +player_streaming_register(int format, struct media_quality quality); + +int +player_streaming_deregister(int id); + int player_playback_start(void); From 6364515fb720829f3381a1084f700afc2d7068f9 Mon Sep 17 00:00:00 2001 From: ejurgensen Date: Sat, 6 May 2023 01:02:48 +0200 Subject: [PATCH 5/6] [streaming] Use player_streaming_register() for thread safety Also means we might be able to drop the special header file for outputs/streaming.c making it a bit more like a regular output module. --- src/httpd_streaming.c | 20 +++- src/outputs/streaming.c | 233 ++++++++++++++++++++++------------------ src/outputs/streaming.h | 8 -- 3 files changed, 143 insertions(+), 118 deletions(-) diff --git a/src/httpd_streaming.c b/src/httpd_streaming.c index 1306e390..6517b986 100644 --- a/src/httpd_streaming.c +++ b/src/httpd_streaming.c @@ -31,6 +31,7 @@ #include #include "httpd_internal.h" +#include "player.h" #include "outputs/streaming.h" #include "logger.h" #include "conffile.h" @@ -169,7 +170,7 @@ session_free(struct streaming_session *session) if (session->readev) { - streaming_session_deregister(session->fd); + player_streaming_deregister(session->fd); event_free(session->readev); } @@ -238,7 +239,7 @@ read_cb(evutil_socket_t fd, short event, void *arg) static int streaming_mp3_handler(struct httpd_request *hreq) { - struct streaming_session *session; + struct streaming_session *session = NULL; const char *name = cfg_getstr(cfg_getsec(cfg, "library"), "name"); const char *param; bool icy_is_requested; @@ -255,10 +256,16 @@ streaming_mp3_handler(struct httpd_request *hreq) session = session_new(hreq, icy_is_requested); if (!session) - return -1; + { + goto error; + } // Ask streaming output module for a fd to read mp3 from - session->fd = streaming_session_register(STREAMING_FORMAT_MP3, streaming_default_quality); + session->fd = player_streaming_register(STREAMING_FORMAT_MP3, streaming_default_quality); + if (session->fd < 0) + { + goto error; + } CHECK_NULL(L_STREAMING, session->readev = event_new(hreq->evbase, session->fd, EV_READ | EV_PERSIST, read_cb, session)); event_add(session->readev, NULL); @@ -274,6 +281,11 @@ streaming_mp3_handler(struct httpd_request *hreq) httpd_send_reply_start(hreq, HTTP_OK, "OK"); return 0; + + error: + session_free(session); + // Error message is sent by streaming_request() + return -1; } static struct httpd_uri_map streaming_handlers[] = diff --git a/src/outputs/streaming.c b/src/outputs/streaming.c index f0ab5b95..91a8c574 100644 --- a/src/outputs/streaming.c +++ b/src/outputs/streaming.c @@ -32,6 +32,7 @@ #include "outputs.h" #include "misc.h" #include "worker.h" +#include "player.h" #include "transcode.h" #include "logger.h" #include "db.h" @@ -195,8 +196,6 @@ wanted_free(struct streaming_wanted *w) for (int i = 0; i < WANTED_PIPES_MAX; i++) pipe_close(&w->pipes[i]); - outputs_quality_unsubscribe(&w->quality); // FIXME not thread safe - transcode_encode_cleanup(&w->xcode_ctx); evbuffer_free(w->audio_in); evbuffer_free(w->audio_out); @@ -213,8 +212,6 @@ wanted_new(enum streaming_format format, struct media_quality quality) CHECK_NULL(L_STREAMING, w->audio_in = evbuffer_new()); CHECK_NULL(L_STREAMING, w->audio_out = evbuffer_new()); - outputs_quality_subscribe(&quality); // FIXME not thread safe, should be called from player thread only - w->xcode_ctx = encoder_setup(format, &quality); if (!w->xcode_ctx) goto error; @@ -364,22 +361,6 @@ wanted_session_remove(struct streaming_wanted *w, int readfd) /* ----------------------------- Thread: Worker ----------------------------- */ -static void -encode_write(uint8_t *buf, size_t buflen, struct streaming_wanted *w, struct pipepair *p) -{ - int ret; - - if (p->writefd < 0) - return; - - ret = write(p->writefd, buf, buflen); - if (ret < 0) - { - DPRINTF(E_LOG, L_STREAMING, "Error writing to stream pipe %d (format %d): %s\n", p->writefd, w->format, strerror(errno)); - wanted_session_remove(w, p->readfd); - } -} - static int encode_buffer(struct streaming_wanted *w, uint8_t *buf, size_t bufsize) { @@ -438,70 +419,91 @@ encode_buffer(struct streaming_wanted *w, uint8_t *buf, size_t bufsize) } static void -encode_data_cb(void *arg) +encode_and_write(int *failed_pipe_readfd, struct streaming_wanted *w, struct output_buffer *obuf) { - struct encode_cmdarg *ctx = arg; - struct output_buffer *obuf = ctx->obuf; - struct streaming_wanted *w; - struct streaming_wanted *next; + struct pipepair *p; uint8_t *buf; size_t bufsize; size_t len; int ret; int i; + for (i = 0, buf = NULL, bufsize = 0; obuf && obuf->data[i].buffer; i++) + { + if (!quality_is_equal(&obuf->data[i].quality, &w->quality)) + continue; + + buf = obuf->data[i].buffer; + bufsize = obuf->data[i].bufsize; + } + + // If encoding fails we should kill the sessions, which for thread safety + // and to avoid deadlocks has to be done later with player_streaming_deregister() + ret = encode_buffer(w, buf, bufsize); + if (ret < 0) + { + for (i = 0; i < WANTED_PIPES_MAX; i++) + { + p = &w->pipes[i]; + if (p->writefd < 0) + continue; + *failed_pipe_readfd = p->readfd; + } + + return; + } + + len = evbuffer_get_length(w->audio_out); + if (len == 0) + { + return; + } + + buf = evbuffer_pullup(w->audio_out, -1); + for (i = 0; i < WANTED_PIPES_MAX; i++) + { + p = &w->pipes[i]; + if (p->writefd < 0) + continue; + + ret = write(p->writefd, buf, len); + if (ret < 0) + { + DPRINTF(E_LOG, L_STREAMING, "Error writing to stream pipe %d (format %d): %s\n", p->writefd, w->format, strerror(errno)); + *failed_pipe_readfd = p->readfd; + } + } + + evbuffer_drain(w->audio_out, -1); +} + +static void +encode_data_cb(void *arg) +{ + struct encode_cmdarg *ctx = arg; + struct output_buffer *obuf = ctx->obuf; + struct streaming_wanted *w; + int failed_pipe_readfd = -1; + pthread_mutex_lock(&streaming_wanted_lck); // To make sure we process the frames in order while (ctx->seqnum != streaming.seqnum_encode_next) pthread_cond_wait(&streaming_sequence_cond, &streaming_wanted_lck); - for (w = streaming.wanted; w; w = next) - { - next = w->next; - - for (i = 0, buf = NULL, bufsize = 0; obuf && obuf->data[i].buffer; i++) - { - if (!quality_is_equal(&obuf->data[i].quality, &w->quality)) - continue; - - buf = obuf->data[i].buffer; - bufsize = obuf->data[i].bufsize; - } - - ret = encode_buffer(w, buf, bufsize); - if (ret < 0) - { - wanted_remove(&streaming.wanted, w); // This will close all the fds, so readers get an error - continue; - } - - len = evbuffer_get_length(w->audio_out); - if (len == 0) - { - continue; - } - - buf = evbuffer_pullup(w->audio_out, -1); - - for (i = 0; i < WANTED_PIPES_MAX; i++) - { - encode_write(buf, len, w, &w->pipes[i]); - } - - evbuffer_drain(w->audio_out, -1); - - if (w->num_sessions == 0) - { - wanted_remove(&streaming.wanted, w); - } - } + for (w = streaming.wanted; w; w = w->next) + encode_and_write(&failed_pipe_readfd, w, obuf); streaming.seqnum_encode_next++; pthread_cond_broadcast(&streaming_sequence_cond); pthread_mutex_unlock(&streaming_wanted_lck); outputs_buffer_free(ctx->obuf); + + // We have to do this after letting go of the lock or we will deadlock. This + // unfortunate method means we can only fail one session (pipe) each pass. + if (failed_pipe_readfd >= 0) + player_streaming_deregister(failed_pipe_readfd); } static void * @@ -526,46 +528,6 @@ streaming_metadata_prepare(struct output_metadata *metadata) /* ----------------------------- Thread: httpd ------------------------------ */ -int -streaming_session_register(enum streaming_format format, struct media_quality quality) -{ - struct streaming_wanted *w; - struct pipepair pipe; - int ret; - - pthread_mutex_lock(&streaming_wanted_lck); - w = wanted_find_byformat(streaming.wanted, format, quality); - if (!w) - w = wanted_add(&streaming.wanted, format, quality); - - ret = wanted_session_add(&pipe, w); - if (ret < 0) - pipe.readfd = -1; - - pthread_mutex_unlock(&streaming_wanted_lck); - - return pipe.readfd; -} - -void -streaming_session_deregister(int readfd) -{ - struct streaming_wanted *w; - - pthread_mutex_lock(&streaming_wanted_lck); - w = wanted_find_byreadfd(streaming.wanted, readfd); - if (!w) - goto out; - - wanted_session_remove(w, readfd); - - if (w->num_sessions == 0) - wanted_remove(&streaming.wanted, w); - - out: - pthread_mutex_unlock(&streaming_wanted_lck); -} - // Not thread safe, but only called once during httpd init void streaming_metadatacb_register(streaming_metadatacb cb) @@ -618,6 +580,62 @@ streaming_metadata_send(struct output_metadata *metadata) outputs_metadata_free(metadata); } +// Since this is streaming and there is no actual device, we will be called with +// a dummy/ad hoc device that's not part in the speaker list. We don't need to +// make any callback so can ignore callback_id. +static int +streaming_start(struct output_device *device, int callback_id) +{ + struct streaming_wanted *w; + struct pipepair pipe; + int ret; + + pthread_mutex_lock(&streaming_wanted_lck); + w = wanted_find_byformat(streaming.wanted, device->format, device->quality); + if (!w) + w = wanted_add(&streaming.wanted, device->format, device->quality); + ret = wanted_session_add(&pipe, w); + if (ret < 0) + goto error; + pthread_mutex_unlock(&streaming_wanted_lck); + + outputs_quality_subscribe(&device->quality); + device->id = pipe.readfd; // Not super clean + return 0; + + error: + if (w->num_sessions == 0) + wanted_remove(&streaming.wanted, w); + pthread_mutex_unlock(&streaming_wanted_lck); + return -1; +} + +// Since this is streaming and there is no actual device, we will be called with +// a dummy/ad hoc device that's not part in the speaker list. We don't need to +// make any callback so can ignore callback_id. +static int +streaming_stop(struct output_device *device, int callback_id) +{ + struct streaming_wanted *w; + + pthread_mutex_lock(&streaming_wanted_lck); + w = wanted_find_byreadfd(streaming.wanted, device->id); + if (!w) + goto error; + device->quality = w->quality; + wanted_session_remove(w, device->id); + if (w->num_sessions == 0) + wanted_remove(&streaming.wanted, w); + pthread_mutex_unlock(&streaming_wanted_lck); + + outputs_quality_unsubscribe(&device->quality); + return 0; + + error: + pthread_mutex_unlock(&streaming_wanted_lck); + return -1; +} + static int streaming_init(void) { @@ -637,13 +655,16 @@ streaming_deinit(void) struct output_definition output_streaming = { - .name = "mp3 streaming", + .name = "streaming", .type = OUTPUT_TYPE_STREAMING, .priority = 0, .disabled = 0, .init = streaming_init, .deinit = streaming_deinit, .write = streaming_write, + .device_start = streaming_start, + .device_probe = streaming_start, + .device_stop = streaming_stop, .metadata_prepare = streaming_metadata_prepare, .metadata_send = streaming_metadata_send, }; diff --git a/src/outputs/streaming.h b/src/outputs/streaming.h index 34673345..49ba71d2 100644 --- a/src/outputs/streaming.h +++ b/src/outputs/streaming.h @@ -2,8 +2,6 @@ #ifndef __STREAMING_H__ #define __STREAMING_H__ -#include "misc.h" // struct media_quality - typedef void (*streaming_metadatacb)(char *metadata); enum streaming_format @@ -11,12 +9,6 @@ enum streaming_format STREAMING_FORMAT_MP3, }; -int -streaming_session_register(enum streaming_format format, struct media_quality quality); - -void -streaming_session_deregister(int readfd); - void streaming_metadatacb_register(streaming_metadatacb cb); From f998b1f3dd491dfaddf85241f47a0f4ac5a9ea1b Mon Sep 17 00:00:00 2001 From: ejurgensen Date: Mon, 8 May 2023 20:46:16 +0200 Subject: [PATCH 6/6] [streaming] Change how metadata is delivered to http streaming This gets rid of player locks + the special header file outputs/streaming.h --- src/Makefile.am | 2 +- src/httpd_streaming.c | 175 +++++++++++++++++++++------------------- src/outputs.h | 4 + src/outputs/streaming.c | 173 +++++++++++++++++++++++---------------- src/outputs/streaming.h | 15 ---- src/player.c | 19 +++-- src/player.h | 6 +- 7 files changed, 217 insertions(+), 177 deletions(-) delete mode 100644 src/outputs/streaming.h diff --git a/src/Makefile.am b/src/Makefile.am index a64b5299..085d6968 100644 --- a/src/Makefile.am +++ b/src/Makefile.am @@ -118,7 +118,7 @@ owntone_SOURCES = main.c \ outputs/rtp_common.h outputs/rtp_common.c \ outputs/raop.c outputs/airplay.c $(PAIR_AP_SRC) \ outputs/airplay_events.c outputs/airplay_events.h \ - outputs/streaming.c outputs/streaming.h \ + outputs/streaming.c \ outputs/dummy.c outputs/fifo.c outputs/rcp.c \ $(ALSA_SRC) $(PULSEAUDIO_SRC) $(CHROMECAST_SRC) \ evrtsp/rtsp.c evrtsp/evrtsp.h evrtsp/rtsp-internal.h evrtsp/log.h \ diff --git a/src/httpd_streaming.c b/src/httpd_streaming.c index 6517b986..7590ca25 100644 --- a/src/httpd_streaming.c +++ b/src/httpd_streaming.c @@ -32,20 +32,24 @@ #include "httpd_internal.h" #include "player.h" -#include "outputs/streaming.h" #include "logger.h" #include "conffile.h" +#define STREAMING_ICY_METALEN_MAX 4080 // 255*16 incl header/footer (16bytes) +#define STREAMING_ICY_METATITLELEN_MAX 4064 // STREAMING_ICY_METALEN_MAX -16 (not incl header/footer) + struct streaming_session { struct httpd_request *hreq; - int fd; - struct event *readev; - struct evbuffer *readbuf; + int id; + struct event *audioev; + struct event *metadataev; + struct evbuffer *audiobuf; size_t bytes_sent; bool icy_is_requested; size_t icy_remaining; + char icy_title[STREAMING_ICY_METATITLELEN_MAX]; }; static struct media_quality streaming_default_quality = { @@ -55,25 +59,20 @@ static struct media_quality streaming_default_quality = { .bit_rate = 128000, }; +static void +session_free(struct streaming_session *session); /* ------------------------------ ICY metadata -------------------------------*/ // To test mp3 and ICY tagm it is good to use: // mpv --display-tags=* http://localhost:3689/stream.mp3 -#define STREAMING_ICY_METALEN_MAX 4080 // 255*16 incl header/footer (16bytes) -#define STREAMING_ICY_METATITLELEN_MAX 4064 // STREAMING_ICY_METALEN_MAX -16 (not incl header/footer) - // As streaming quality goes up, we send more data to the remote client. With a // smaller ICY_METAINT value we have to splice metadata more frequently - on // some devices with small input buffers, a higher quality stream and low // ICY_METAINT can lead to stuttering as observed on a Roku Soundbridge static unsigned short streaming_icy_metaint = 16384; -static pthread_mutex_t streaming_metadata_lck; -static char streaming_icy_title[STREAMING_ICY_METATITLELEN_MAX]; - - // We know that the icymeta is limited to 1+255*16 (ie 4081) bytes so caller must // provide a buf of this size to avoid needless mallocs // @@ -125,7 +124,7 @@ icy_meta_create(uint8_t buf[STREAMING_ICY_METALEN_MAX+1], unsigned *buflen, cons } static void -icy_meta_splice(struct evbuffer *out, struct evbuffer *in, size_t *icy_remaining) +icy_meta_splice(struct evbuffer *out, struct evbuffer *in, size_t *icy_remaining, char *title) { uint8_t meta[STREAMING_ICY_METALEN_MAX + 1]; unsigned metalen; @@ -139,9 +138,7 @@ icy_meta_splice(struct evbuffer *out, struct evbuffer *in, size_t *icy_remaining *icy_remaining -= consume; if (*icy_remaining == 0) { - pthread_mutex_lock(&streaming_metadata_lck); - icy_meta_create(meta, &metalen, streaming_icy_title); - pthread_mutex_unlock(&streaming_metadata_lck); + icy_meta_create(meta, &metalen, title); evbuffer_add(out, meta, metalen); *icy_remaining = streaming_icy_metaint; @@ -149,50 +146,6 @@ icy_meta_splice(struct evbuffer *out, struct evbuffer *in, size_t *icy_remaining } } -// Thread: player. TODO Would be nice to avoid the lock. Consider moving all the -// ICY tag stuff to streaming.c and make a STREAMING_FORMAT_MP3_ICY? -static void -icy_metadata_cb(char *metadata) -{ - pthread_mutex_lock(&streaming_metadata_lck); - snprintf(streaming_icy_title, sizeof(streaming_icy_title), "%s", metadata); - pthread_mutex_unlock(&streaming_metadata_lck); -} - - -/* ----------------------------- Session helpers ---------------------------- */ - -static void -session_free(struct streaming_session *session) -{ - if (!session) - return; - - if (session->readev) - { - player_streaming_deregister(session->fd); - event_free(session->readev); - } - - evbuffer_free(session->readbuf); - free(session); -} - -static struct streaming_session * -session_new(struct httpd_request *hreq, bool icy_is_requested) -{ - struct streaming_session *session; - - CHECK_NULL(L_STREAMING, session = calloc(1, sizeof(struct streaming_session))); - CHECK_NULL(L_STREAMING, session->readbuf = evbuffer_new()); - - session->hreq = hreq; - session->icy_is_requested = icy_is_requested; - session->icy_remaining = streaming_icy_metaint; - - return session; -} - /* ----------------------------- Event callbacks ---------------------------- */ @@ -205,7 +158,7 @@ conn_close_cb(void *arg) } static void -read_cb(evutil_socket_t fd, short event, void *arg) +audio_cb(evutil_socket_t fd, short event, void *arg) { struct streaming_session *session = arg; struct httpd_request *hreq; @@ -213,7 +166,7 @@ read_cb(evutil_socket_t fd, short event, void *arg) CHECK_NULL(L_STREAMING, hreq = session->hreq); - len = evbuffer_read(session->readbuf, fd, -1); + len = evbuffer_read(session->audiobuf, fd, -1); if (len < 0 && errno != EAGAIN) { DPRINTF(E_INFO, L_STREAMING, "Stopping mp3 streaming to %s:%d\n", session->hreq->peer_address, (int)session->hreq->peer_port); @@ -224,15 +177,87 @@ read_cb(evutil_socket_t fd, short event, void *arg) } if (session->icy_is_requested) - icy_meta_splice(hreq->out_body, session->readbuf, &session->icy_remaining); + icy_meta_splice(hreq->out_body, session->audiobuf, &session->icy_remaining, session->icy_title); else - evbuffer_add_buffer(hreq->out_body, session->readbuf); + evbuffer_add_buffer(hreq->out_body, session->audiobuf); httpd_send_reply_chunk(hreq, NULL, NULL); session->bytes_sent += len; } +static void +metadata_cb(evutil_socket_t fd, short event, void *arg) +{ + struct streaming_session *session = arg; + struct evbuffer *evbuf; + int len; + + CHECK_NULL(L_STREAMING, evbuf = evbuffer_new()); + + len = evbuffer_read(evbuf, fd, -1); + if (len < 0) + goto out; + + len = sizeof(session->icy_title); + evbuffer_remove(evbuf, session->icy_title, len); + session->icy_title[len - 1] = '\0'; + + out: + evbuffer_free(evbuf); +} + + +/* ----------------------------- Session helpers ---------------------------- */ + +static void +session_free(struct streaming_session *session) +{ + if (!session) + return; + + player_streaming_deregister(session->id); + + if (session->audioev) + event_free(session->audioev); + if (session->metadataev) + event_free(session->metadataev); + + evbuffer_free(session->audiobuf); + free(session); +} + +static struct streaming_session * +session_new(struct httpd_request *hreq, bool icy_is_requested, enum player_format format, struct media_quality quality) +{ + struct streaming_session *session; + int audio_fd; + int metadata_fd; + + CHECK_NULL(L_STREAMING, session = calloc(1, sizeof(struct streaming_session))); + CHECK_NULL(L_STREAMING, session->audiobuf = evbuffer_new()); + + session->hreq = hreq; + session->icy_is_requested = icy_is_requested; + session->icy_remaining = streaming_icy_metaint; + + // Ask streaming output module for a fd to read mp3 from + session->id = player_streaming_register(&audio_fd, &metadata_fd, format, quality); + if (session->id < 0) + goto error; + + CHECK_NULL(L_STREAMING, session->audioev = event_new(hreq->evbase, audio_fd, EV_READ | EV_PERSIST, audio_cb, session)); + event_add(session->audioev, NULL); + CHECK_NULL(L_STREAMING, session->metadataev = event_new(hreq->evbase, metadata_fd, EV_READ | EV_PERSIST, metadata_cb, session)); + event_add(session->metadataev, NULL); + + return session; + + error: + session_free(session); + return NULL; +} + /* -------------------------- Module implementation ------------------------- */ @@ -254,21 +279,9 @@ streaming_mp3_handler(struct httpd_request *hreq) httpd_header_add(hreq->out_headers, "icy-metaint", buf); } - session = session_new(hreq, icy_is_requested); + session = session_new(hreq, icy_is_requested, PLAYER_FORMAT_MP3, streaming_default_quality); if (!session) - { - goto error; - } - - // Ask streaming output module for a fd to read mp3 from - session->fd = player_streaming_register(STREAMING_FORMAT_MP3, streaming_default_quality); - if (session->fd < 0) - { - goto error; - } - - CHECK_NULL(L_STREAMING, session->readev = event_new(hreq->evbase, session->fd, EV_READ | EV_PERSIST, read_cb, session)); - event_add(session->readev, NULL); + return -1; // Error sent by caller httpd_request_close_cb_set(hreq, conn_close_cb, session); @@ -281,11 +294,6 @@ streaming_mp3_handler(struct httpd_request *hreq) httpd_send_reply_start(hreq, HTTP_OK, "OK"); return 0; - - error: - session_free(session); - // Error message is sent by streaming_request() - return -1; } static struct httpd_uri_map streaming_handlers[] = @@ -357,9 +365,6 @@ streaming_init(void) else DPRINTF(E_INFO, L_STREAMING, "Unsupported icy_metaint=%d, supported range: 4096..131072, defaulting to %d\n", val, streaming_icy_metaint); - CHECK_ERR(L_STREAMING, mutex_init(&streaming_metadata_lck)); - streaming_metadatacb_register(icy_metadata_cb); - return 0; } diff --git a/src/outputs.h b/src/outputs.h index 578bad60..61d380c1 100644 --- a/src/outputs.h +++ b/src/outputs.h @@ -142,6 +142,10 @@ struct output_device short v4_port; short v6_port; + // Only used for streaming + int audio_fd; + int metadata_fd; + struct event *stop_timer; // Opaque pointers to device and session data diff --git a/src/outputs/streaming.c b/src/outputs/streaming.c index 91a8c574..e89dcc3d 100644 --- a/src/outputs/streaming.c +++ b/src/outputs/streaming.c @@ -28,7 +28,6 @@ #include #include -#include "streaming.h" #include "outputs.h" #include "misc.h" #include "worker.h" @@ -63,9 +62,10 @@ struct pipepair struct streaming_wanted { int num_sessions; // for refcounting - struct pipepair pipes[WANTED_PIPES_MAX]; + struct pipepair audio[WANTED_PIPES_MAX]; + struct pipepair metadata[WANTED_PIPES_MAX]; - enum streaming_format format; + enum player_format format; struct media_quality quality; struct evbuffer *audio_in; @@ -86,12 +86,11 @@ struct streaming_ctx struct timeval silencetv; struct media_quality last_quality; + char title[4064]; // See STREAMING_ICY_METALEN_MAX in http_streaming.c + // seqnum may wrap around so must be unsigned unsigned int seqnum; unsigned int seqnum_encode_next; - - // callback with new metadata, e.g. for ICY tags - void (*metadatacb)(char *metadata); }; struct encode_cmdarg @@ -114,7 +113,7 @@ extern struct event_base *evbase_player; /* ------------------------------- Helpers ---------------------------------- */ static struct encode_ctx * -encoder_setup(enum streaming_format format, struct media_quality *quality) +encoder_setup(enum player_format format, struct media_quality *quality) { struct decode_ctx *decode_ctx = NULL; struct encode_ctx *encode_ctx = NULL; @@ -133,7 +132,7 @@ encoder_setup(enum streaming_format format, struct media_quality *quality) goto out; } - if (format == STREAMING_FORMAT_MP3) + if (format == PLAYER_FORMAT_MP3) encode_ctx = transcode_encode_setup(XCODE_MP3, quality, decode_ctx, NULL, 0, 0); if (!encode_ctx) @@ -194,7 +193,9 @@ wanted_free(struct streaming_wanted *w) return; for (int i = 0; i < WANTED_PIPES_MAX; i++) - pipe_close(&w->pipes[i]); + pipe_close(&w->audio[i]); + for (int i = 0; i < WANTED_PIPES_MAX; i++) + pipe_close(&w->metadata[i]); transcode_encode_cleanup(&w->xcode_ctx); evbuffer_free(w->audio_in); @@ -203,8 +204,20 @@ wanted_free(struct streaming_wanted *w) free(w); } +static int +pipe_index_find_byreadfd(struct pipepair *p, int readfd) +{ + for (int i = 0; i < WANTED_PIPES_MAX; i++, p++) + { + if (p->readfd == readfd) + return i; + } + + return -1; +} + static struct streaming_wanted * -wanted_new(enum streaming_format format, struct media_quality quality) +wanted_new(enum player_format format, struct media_quality quality) { struct streaming_wanted *w; @@ -225,8 +238,10 @@ wanted_new(enum streaming_format format, struct media_quality quality) for (int i = 0; i < WANTED_PIPES_MAX; i++) { - w->pipes[i].writefd = -1; - w->pipes[i].readfd = -1; + w->audio[i].writefd = -1; + w->audio[i].readfd = -1; + w->metadata[i].writefd = -1; + w->metadata[i].readfd = -1; } return w; @@ -262,7 +277,7 @@ wanted_remove(struct streaming_wanted **wanted, struct streaming_wanted *remove) } static struct streaming_wanted * -wanted_add(struct streaming_wanted **wanted, enum streaming_format format, struct media_quality quality) +wanted_add(struct streaming_wanted **wanted, enum player_format format, struct media_quality quality) { struct streaming_wanted *w; @@ -274,7 +289,7 @@ wanted_add(struct streaming_wanted **wanted, enum streaming_format format, struc } static struct streaming_wanted * -wanted_find_byformat(struct streaming_wanted *wanted, enum streaming_format format, struct media_quality quality) +wanted_find_byformat(struct streaming_wanted *wanted, enum player_format format, struct media_quality quality) { struct streaming_wanted *w; @@ -294,31 +309,36 @@ wanted_find_byreadfd(struct streaming_wanted *wanted, int readfd) int i; for (w = wanted; w; w = w->next) - for (i = 0; i < WANTED_PIPES_MAX; i++) - { - if (w->pipes[i].readfd == readfd) - return w; - } + { + i = pipe_index_find_byreadfd(w->audio, readfd); + if (i != -1) + return w; + } return NULL; } static int -wanted_session_add(struct pipepair *p, struct streaming_wanted *w) +wanted_session_add(int *audiofd, int *metadatafd, struct streaming_wanted *w) { int ret; int i; for (i = 0; i < WANTED_PIPES_MAX; i++) { - if (w->pipes[i].writefd != -1) // In use + if (w->audio[i].writefd != -1) // In use continue; - ret = pipe_open(&w->pipes[i]); + ret = pipe_open(&w->audio[i]); if (ret < 0) return -1; - memcpy(p, &w->pipes[i], sizeof(struct pipepair)); + ret = pipe_open(&w->metadata[i]); + if (ret < 0) + return -1; + + *audiofd = w->audio[i].readfd; + *metadatafd = w->metadata[i].readfd; break; } @@ -329,31 +349,25 @@ wanted_session_add(struct pipepair *p, struct streaming_wanted *w) } w->num_sessions++; - DPRINTF(E_DBG, L_STREAMING, "Session register readfd %d, wanted->num_sessions=%d\n", p->readfd, w->num_sessions); + DPRINTF(E_DBG, L_STREAMING, "Session register audiofd %d, metadatafd %d, wanted->num_sessions=%d\n", *audiofd, *metadatafd, w->num_sessions); return 0; } - static void wanted_session_remove(struct streaming_wanted *w, int readfd) { int i; - for (i = 0; i < WANTED_PIPES_MAX; i++) - { - if (w->pipes[i].readfd != readfd) - continue; - - pipe_close(&w->pipes[i]); - break; - } - - if (i == WANTED_PIPES_MAX) + i = pipe_index_find_byreadfd(w->audio, readfd); + if (i < 0) { DPRINTF(E_LOG, L_STREAMING, "Cannot remove streaming session, readfd %d not found\n", readfd); return; } + pipe_close(&w->audio[i]); + pipe_close(&w->metadata[i]); + w->num_sessions--; DPRINTF(E_DBG, L_STREAMING, "Session deregister readfd %d, wanted->num_sessions=%d\n", readfd, w->num_sessions); } @@ -421,7 +435,6 @@ encode_buffer(struct streaming_wanted *w, uint8_t *buf, size_t bufsize) static void encode_and_write(int *failed_pipe_readfd, struct streaming_wanted *w, struct output_buffer *obuf) { - struct pipepair *p; uint8_t *buf; size_t bufsize; size_t len; @@ -444,10 +457,8 @@ encode_and_write(int *failed_pipe_readfd, struct streaming_wanted *w, struct out { for (i = 0; i < WANTED_PIPES_MAX; i++) { - p = &w->pipes[i]; - if (p->writefd < 0) - continue; - *failed_pipe_readfd = p->readfd; + if (w->audio[i].writefd != -1) + *failed_pipe_readfd = w->audio[i].readfd; } return; @@ -462,15 +473,14 @@ encode_and_write(int *failed_pipe_readfd, struct streaming_wanted *w, struct out buf = evbuffer_pullup(w->audio_out, -1); for (i = 0; i < WANTED_PIPES_MAX; i++) { - p = &w->pipes[i]; - if (p->writefd < 0) + if (w->audio[i].writefd == -1) continue; - ret = write(p->writefd, buf, len); + ret = write(w->audio[i].writefd, buf, len); if (ret < 0) { - DPRINTF(E_LOG, L_STREAMING, "Error writing to stream pipe %d (format %d): %s\n", p->writefd, w->format, strerror(errno)); - *failed_pipe_readfd = p->readfd; + DPRINTF(E_LOG, L_STREAMING, "Error writing to stream pipe %d (format %d): %s\n", w->audio[i].writefd, w->format, strerror(errno)); + *failed_pipe_readfd = w->audio[i].readfd; } } @@ -506,11 +516,44 @@ encode_data_cb(void *arg) player_streaming_deregister(failed_pipe_readfd); } +static void +metadata_write(struct streaming_wanted *w, int readfd, const char *metadata) +{ + size_t metadata_size; + int i; + int ret; + + for (i = 0; i < WANTED_PIPES_MAX; i++) + { + if (w->metadata[i].writefd == -1) + continue; + if (readfd >= 0 && w->metadata[i].readfd != readfd) + continue; + + metadata_size = strlen(metadata) + 1; + ret = write(w->metadata[i].writefd, metadata, metadata_size); + if (ret < 0) + DPRINTF(E_WARN, L_STREAMING, "Error writing metadata '%s' to fd %d\n", metadata, w->metadata[i].writefd); + } +} + +static void +metadata_startup_cb(void *arg) +{ + int *metadata_fd = arg; + struct streaming_wanted *w; + + pthread_mutex_lock(&streaming_wanted_lck); + for (w = streaming.wanted; w; w = w->next) + metadata_write(w, *metadata_fd, streaming.title); + pthread_mutex_unlock(&streaming_wanted_lck); +} + static void * streaming_metadata_prepare(struct output_metadata *metadata) { struct db_queue_item *queue_item; - char *title; + struct streaming_wanted *w; queue_item = db_queue_fetch_byitemid(metadata->item_id); if (!queue_item) @@ -519,22 +562,19 @@ streaming_metadata_prepare(struct output_metadata *metadata) return NULL; } - title = safe_asprintf("%s - %s", queue_item->title, queue_item->artist); + pthread_mutex_lock(&streaming_wanted_lck); + // Save it here, we might need it later if a new session starts up + snprintf(streaming.title, sizeof(streaming.title), "%s - %s", queue_item->title, queue_item->artist); + + for (w = streaming.wanted; w; w = w->next) + metadata_write(w, -1, streaming.title); + pthread_mutex_unlock(&streaming_wanted_lck); + free_queue_item(queue_item, 0); - - return title; + return NULL; } -/* ----------------------------- Thread: httpd ------------------------------ */ - -// Not thread safe, but only called once during httpd init -void -streaming_metadatacb_register(streaming_metadatacb cb) -{ - streaming.metadatacb = cb; -} - /* ----------------------------- Thread: Player ----------------------------- */ static void @@ -570,14 +610,7 @@ silenceev_cb(evutil_socket_t fd, short event, void *arg) static void streaming_metadata_send(struct output_metadata *metadata) { - char *title = metadata->priv; - - // Calls back to httpd_streaming to update the title - if (streaming.metadatacb) - streaming.metadatacb(title); - - free(title); - outputs_metadata_free(metadata); + // Nothing to do, metadata_prepare() did all we needed in a worker thread } // Since this is streaming and there is no actual device, we will be called with @@ -587,20 +620,22 @@ static int streaming_start(struct output_device *device, int callback_id) { struct streaming_wanted *w; - struct pipepair pipe; int ret; pthread_mutex_lock(&streaming_wanted_lck); w = wanted_find_byformat(streaming.wanted, device->format, device->quality); if (!w) w = wanted_add(&streaming.wanted, device->format, device->quality); - ret = wanted_session_add(&pipe, w); + ret = wanted_session_add(&device->audio_fd, &device->metadata_fd, w); if (ret < 0) goto error; pthread_mutex_unlock(&streaming_wanted_lck); + worker_execute(metadata_startup_cb, &(device->metadata_fd), sizeof(device->metadata_fd), 0); + outputs_quality_subscribe(&device->quality); - device->id = pipe.readfd; // Not super clean + + device->id = device->audio_fd; return 0; error: diff --git a/src/outputs/streaming.h b/src/outputs/streaming.h deleted file mode 100644 index 49ba71d2..00000000 --- a/src/outputs/streaming.h +++ /dev/null @@ -1,15 +0,0 @@ - -#ifndef __STREAMING_H__ -#define __STREAMING_H__ - -typedef void (*streaming_metadatacb)(char *metadata); - -enum streaming_format -{ - STREAMING_FORMAT_MP3, -}; - -void -streaming_metadatacb_register(streaming_metadatacb cb); - -#endif /* !__STREAMING_H__ */ diff --git a/src/player.c b/src/player.c index 0a08d48f..5b7e3c6c 100644 --- a/src/player.c +++ b/src/player.c @@ -152,7 +152,10 @@ struct speaker_attr_param bool busy; struct media_quality quality; - int format; + enum player_format format; + + int audio_fd; + int metadata_fd; const char *pin; }; @@ -2904,10 +2907,10 @@ streaming_register(void *arg, int *retval) }; *retval = outputs_device_start(&device, NULL, false); - if (*retval < 0) - return COMMAND_END; - *retval = device.id; // Actually the fd that the called needs + param->spk_id = device.id; + param->audio_fd = device.audio_fd; + param->metadata_fd = device.metadata_fd; return COMMAND_END; } @@ -3470,7 +3473,7 @@ player_speaker_authorize(uint64_t id, const char *pin) } int -player_streaming_register(int format, struct media_quality quality) +player_streaming_register(int *audio_fd, int *metadata_fd, enum player_format format, struct media_quality quality) { struct speaker_attr_param param; int ret; @@ -3479,8 +3482,12 @@ player_streaming_register(int format, struct media_quality quality) param.quality = quality; ret = commands_exec_sync(cmdbase, streaming_register, NULL, ¶m); + if (ret < 0) + return ret; - return ret; + *audio_fd = param.audio_fd; + *metadata_fd = param.metadata_fd; + return param.spk_id; } int diff --git a/src/player.h b/src/player.h index 3246023c..602a13e0 100644 --- a/src/player.h +++ b/src/player.h @@ -28,6 +28,10 @@ enum player_seek_mode { PLAYER_SEEK_RELATIVE = 2, }; +enum player_format { + PLAYER_FORMAT_MP3, +}; + struct player_speaker_info { uint64_t id; uint32_t active_remote; @@ -119,7 +123,7 @@ int player_speaker_authorize(uint64_t id, const char *pin); int -player_streaming_register(int format, struct media_quality quality); +player_streaming_register(int *audio_fd, int *metadata_fd, enum player_format format, struct media_quality quality); int player_streaming_deregister(int id);