From 537012440b341c4cc62dedece7c7ed4f4fb0831b Mon Sep 17 00:00:00 2001 From: ejurgensen Date: Mon, 1 May 2023 23:35:34 +0200 Subject: [PATCH] [streaming] Fix mp3 streaming not working with ffmpeg 6 ffmpeg 6 requires fixed size frames for the encoder, so this commit refactors the mp3 streaming to support that. Also uses outputs_quality_subscribe which potentially has better performance (e.g. a 48000 sample rate source will only be resampled once if both airplay and mp3 streaming want 44100), and makes it easier to create fixed size frames. Resolves #1601 --- src/outputs.c | 24 +-- src/outputs.h | 12 +- src/outputs/streaming.c | 315 ++++++++++++++++++++++------------------ 3 files changed, 191 insertions(+), 160 deletions(-) diff --git a/src/outputs.c b/src/outputs.c index 67541512..b5d568f5 100644 --- a/src/outputs.c +++ b/src/outputs.c @@ -598,18 +598,6 @@ vol_adjust(void) /* ----------------------------------- API ---------------------------------- */ -struct output_buffer * -outputs_buffer_copy(struct output_buffer *buffer) -{ - return buffer_copy(buffer); -} - -void -outputs_buffer_free(struct output_buffer *buffer) -{ - buffer_free(buffer); -} - struct output_device * outputs_device_get(uint64_t device_id) { @@ -754,6 +742,18 @@ outputs_metadata_free(struct output_metadata *metadata) metadata_free(metadata); } +struct output_buffer * +outputs_buffer_copy(struct output_buffer *buffer) +{ + return buffer_copy(buffer); +} + +void +outputs_buffer_free(struct output_buffer *buffer) +{ + buffer_free(buffer); +} + /* ---------------------------- Called by player ---------------------------- */ struct output_device * diff --git a/src/outputs.h b/src/outputs.h index 50647dbd..eeda8357 100644 --- a/src/outputs.h +++ b/src/outputs.h @@ -264,12 +264,6 @@ struct output_definition /* ------------------------------- General use ------------------------------ */ -struct output_buffer * -outputs_buffer_copy(struct output_buffer *buffer); - -void -outputs_buffer_free(struct
output_buffer *buffer); - struct output_device * outputs_device_get(uint64_t device_id); @@ -293,6 +287,12 @@ outputs_cb(int callback_id, uint64_t device_id, enum output_device_state); void outputs_metadata_free(struct output_metadata *metadata); +struct output_buffer * +outputs_buffer_copy(struct output_buffer *buffer); + +void +outputs_buffer_free(struct output_buffer *buffer); + /* ---------------------------- Called by player ---------------------------- */ // Ownership of *add is transferred, so don't address after calling. Instead you diff --git a/src/outputs/streaming.c b/src/outputs/streaming.c index d1b34d00..f0ab5b95 100644 --- a/src/outputs/streaming.c +++ b/src/outputs/streaming.c @@ -44,15 +44,10 @@ * player, but there are clients, it instead writes silence to the fd. */ -// Seconds between sending silence when player is idle +// Seconds between sending a frame of silence when player is idle // (to prevent client from hanging up) #define STREAMING_SILENCE_INTERVAL 1 -// How many bytes of silence we encode with the above interval. There is no -// particular reason for using this size, just that it seems to have worked for -// a while. -#define SILENCE_BUF_SIZE STOB(352, 16, 2) - // The wanted structure represents a particular format and quality that should // be produced for one or more sessions. A pipe pair is created for each session // for the i/o. 
@@ -66,15 +61,19 @@ struct pipepair struct streaming_wanted { - int refcount; + int num_sessions; // for refcounting struct pipepair pipes[WANTED_PIPES_MAX]; enum streaming_format format; - struct media_quality quality_in; - struct media_quality quality_out; + struct media_quality quality; + struct evbuffer *audio_in; + struct evbuffer *audio_out; struct encode_ctx *xcode_ctx; - struct evbuffer *encoded_data; + + int nb_samples; + uint8_t *frame_data; + size_t frame_size; struct streaming_wanted *next; }; @@ -96,11 +95,8 @@ struct streaming_ctx struct encode_cmdarg { - uint8_t *buf; - size_t bufsize; - int samples; + struct output_buffer *obuf; unsigned int seqnum; - struct media_quality quality; }; static pthread_mutex_t streaming_wanted_lck; @@ -116,6 +112,41 @@ extern struct event_base *evbase_player; /* ------------------------------- Helpers ---------------------------------- */ +static struct encode_ctx * +encoder_setup(enum streaming_format format, struct media_quality *quality) +{ + struct decode_ctx *decode_ctx = NULL; + struct encode_ctx *encode_ctx = NULL; + + if (quality->bits_per_sample == 16) + decode_ctx = transcode_decode_setup_raw(XCODE_PCM16, quality); + else if (quality->bits_per_sample == 24) + decode_ctx = transcode_decode_setup_raw(XCODE_PCM24, quality); + else if (quality->bits_per_sample == 32) + decode_ctx = transcode_decode_setup_raw(XCODE_PCM32, quality); + + if (!decode_ctx) + { + DPRINTF(E_LOG, L_STREAMING, "Error setting up decoder for quality sr %d, bps %d, ch %d, cannot encode\n", + quality->sample_rate, quality->bits_per_sample, quality->channels); + goto out; + } + + if (format == STREAMING_FORMAT_MP3) + encode_ctx = transcode_encode_setup(XCODE_MP3, quality, decode_ctx, NULL, 0, 0); + + if (!encode_ctx) + { + DPRINTF(E_LOG, L_STREAMING, "Error setting up encoder for quality sr %d, bps %d, ch %d, cannot encode\n", + quality->sample_rate, quality->bits_per_sample, quality->channels); + goto out; + } + + out: + 
transcode_decode_cleanup(&decode_ctx); + return encode_ctx; +} + static int pipe_open(struct pipepair *p) { @@ -164,8 +195,12 @@ wanted_free(struct streaming_wanted *w) for (int i = 0; i < WANTED_PIPES_MAX; i++) pipe_close(&w->pipes[i]); + outputs_quality_unsubscribe(&w->quality); // FIXME not thread safe + transcode_encode_cleanup(&w->xcode_ctx); - evbuffer_free(w->encoded_data); + evbuffer_free(w->audio_in); + evbuffer_free(w->audio_out); + free(w->frame_data); free(w); } @@ -175,10 +210,21 @@ wanted_new(enum streaming_format format, struct media_quality quality) struct streaming_wanted *w; CHECK_NULL(L_STREAMING, w = calloc(1, sizeof(struct streaming_wanted))); - CHECK_NULL(L_STREAMING, w->encoded_data = evbuffer_new()); + CHECK_NULL(L_STREAMING, w->audio_in = evbuffer_new()); + CHECK_NULL(L_STREAMING, w->audio_out = evbuffer_new()); + + outputs_quality_subscribe(&quality); // FIXME not thread safe, should be called from player thread only + + w->xcode_ctx = encoder_setup(format, &quality); + if (!w->xcode_ctx) + goto error; - w->quality_out = quality; w->format = format; + w->quality = quality; + w->nb_samples = transcode_encode_query(w->xcode_ctx, "samples_per_frame"); // 1152 for mp3 + w->frame_size = STOB(w->nb_samples, quality.bits_per_sample, quality.channels); + + CHECK_NULL(L_STREAMING, w->frame_data = malloc(w->frame_size)); for (int i = 0; i < WANTED_PIPES_MAX; i++) { @@ -187,6 +233,10 @@ wanted_new(enum streaming_format format, struct media_quality quality) } return w; + + error: + wanted_free(w); + return NULL; } static void @@ -233,7 +283,7 @@ wanted_find_byformat(struct streaming_wanted *wanted, enum streaming_format form for (w = wanted; w; w = w->next) { - if (w->format == format && quality_is_equal(&w->quality_out, &quality)) + if (w->format == format && quality_is_equal(&w->quality, &quality)) return w; } @@ -281,8 +331,8 @@ wanted_session_add(struct pipepair *p, struct streaming_wanted *w) return -1; } - w->refcount++; - DPRINTF(E_DBG, 
L_STREAMING, "Session register readfd %d, wanted->refcount=%d\n", p->readfd, w->refcount); + w->num_sessions++; + DPRINTF(E_DBG, L_STREAMING, "Session register readfd %d, wanted->num_sessions=%d\n", p->readfd, w->num_sessions); return 0; } @@ -307,73 +357,13 @@ wanted_session_remove(struct streaming_wanted *w, int readfd) return; } - w->refcount--; - DPRINTF(E_DBG, L_STREAMING, "Session deregister readfd %d, wanted->refcount=%d\n", readfd, w->refcount); + w->num_sessions--; + DPRINTF(E_DBG, L_STREAMING, "Session deregister readfd %d, wanted->num_sessions=%d\n", readfd, w->num_sessions); } /* ----------------------------- Thread: Worker ----------------------------- */ -static int -encode_reset(struct streaming_wanted *w, struct media_quality quality_in) -{ - struct media_quality quality_out = w->quality_out; - struct decode_ctx *decode_ctx = NULL; - - transcode_encode_cleanup(&w->xcode_ctx); - - if (quality_in.bits_per_sample == 16) - decode_ctx = transcode_decode_setup_raw(XCODE_PCM16, &quality_in); - else if (quality_in.bits_per_sample == 24) - decode_ctx = transcode_decode_setup_raw(XCODE_PCM24, &quality_in); - else if (quality_in.bits_per_sample == 32) - decode_ctx = transcode_decode_setup_raw(XCODE_PCM32, &quality_in); - - if (!decode_ctx) - { - DPRINTF(E_LOG, L_STREAMING, "Error setting up decoder for input quality sr %d, bps %d, ch %d, cannot MP3 encode\n", - quality_in.sample_rate, quality_in.bits_per_sample, quality_in.channels); - goto error; - } - - w->quality_in = quality_in; - w->xcode_ctx = transcode_encode_setup(XCODE_MP3, &quality_out, decode_ctx, NULL, 0, 0); - if (!w->xcode_ctx) - { - DPRINTF(E_LOG, L_STREAMING, "Error setting up encoder for output quality sr %d, bps %d, ch %d, cannot MP3 encode\n", - quality_out.sample_rate, quality_out.bits_per_sample, quality_out.channels); - goto error; - } - - transcode_decode_cleanup(&decode_ctx); - return 0; - - error: - transcode_decode_cleanup(&decode_ctx); - return -1; -} - -static int 
-encode_frame(struct streaming_wanted *w, struct media_quality quality_in, transcode_frame *frame) -{ - int ret; - - if (!w->xcode_ctx || !quality_is_equal(&quality_in, &w->quality_in)) - { - DPRINTF(E_DBG, L_STREAMING, "Resetting transcode context\n"); - if (encode_reset(w, quality_in) < 0) - return -1; - } - - ret = transcode_encode(w->encoded_data, w->xcode_ctx, frame, 0); - if (ret < 0) - { - return -1; - } - - return 0; -} - static void encode_write(uint8_t *buf, size_t buflen, struct streaming_wanted *w, struct pipepair *p) { @@ -390,25 +380,76 @@ encode_write(uint8_t *buf, size_t buflen, struct streaming_wanted *w, struct pip } } +static int +encode_buffer(struct streaming_wanted *w, uint8_t *buf, size_t bufsize) +{ + ssize_t remaining_bytes; + transcode_frame *frame = NULL; + int ret; + + if (buf) + { + evbuffer_add(w->audio_in, buf, bufsize); + } + else + { + // buf being null is either a silence timeout or that we couldn't find the + // subscribed quality. In both cases we encode silence. + memset(w->frame_data, 0, w->frame_size); + evbuffer_add(w->audio_in, w->frame_data, w->frame_size); + } + + remaining_bytes = evbuffer_get_length(w->audio_in); + + // Read and encode from 'audio_in' in chunks of 'frame_size' bytes + while (remaining_bytes > w->frame_size) + { + ret = evbuffer_remove(w->audio_in, w->frame_data, w->frame_size); + if (ret != w->frame_size) + { + DPRINTF(E_LOG, L_STREAMING, "Bug!
Couldn't read a frame of %zu bytes (format %d)\n", w->frame_size, w->format); + goto error; + } + + remaining_bytes -= w->frame_size; + + frame = transcode_frame_new(w->frame_data, w->frame_size, w->nb_samples, &w->quality); + if (!frame) + { + DPRINTF(E_LOG, L_STREAMING, "Could not convert raw PCM to frame (format %d)\n", w->format); + goto error; + } + + ret = transcode_encode(w->audio_out, w->xcode_ctx, frame, 0); + if (ret < 0) + { + DPRINTF(E_LOG, L_STREAMING, "Encoding error (format %d)\n", w->format); + goto error; + } + + transcode_frame_free(frame); + } + + return 0; + + error: + transcode_frame_free(frame); + return -1; +} + static void encode_data_cb(void *arg) { struct encode_cmdarg *ctx = arg; - transcode_frame *frame; + struct output_buffer *obuf = ctx->obuf; struct streaming_wanted *w; struct streaming_wanted *next; uint8_t *buf; + size_t bufsize; size_t len; int ret; int i; - frame = transcode_frame_new(ctx->buf, ctx->bufsize, ctx->samples, &ctx->quality); - if (!frame) - { - DPRINTF(E_LOG, L_STREAMING, "Could not convert raw PCM to frame\n"); - goto out; - } - pthread_mutex_lock(&streaming_wanted_lck); // To make sure we process the frames in order @@ -419,32 +460,48 @@ encode_data_cb(void *arg) { next = w->next; - ret = encode_frame(w, ctx->quality, frame); + for (i = 0, buf = NULL, bufsize = 0; obuf && obuf->data[i].buffer; i++) + { + if (!quality_is_equal(&obuf->data[i].quality, &w->quality)) + continue; + + buf = obuf->data[i].buffer; + bufsize = obuf->data[i].bufsize; + } + + ret = encode_buffer(w, buf, bufsize); if (ret < 0) - wanted_remove(&streaming.wanted, w); // This will close all the fds, so readers get an error + { + wanted_remove(&streaming.wanted, w); // This will close all the fds, so readers get an error + continue; + } - len = evbuffer_get_length(w->encoded_data); + len = evbuffer_get_length(w->audio_out); if (len == 0) - continue; + { + continue; + } - buf = evbuffer_pullup(w->encoded_data, -1); + buf = 
evbuffer_pullup(w->audio_out, -1); for (i = 0; i < WANTED_PIPES_MAX; i++) - encode_write(buf, len, w, &w->pipes[i]); + { + encode_write(buf, len, w, &w->pipes[i]); + } - evbuffer_drain(w->encoded_data, -1); + evbuffer_drain(w->audio_out, -1); - if (w->refcount == 0) - wanted_remove(&streaming.wanted, w); + if (w->num_sessions == 0) + { + wanted_remove(&streaming.wanted, w); + } } streaming.seqnum_encode_next++; pthread_cond_broadcast(&streaming_sequence_cond); pthread_mutex_unlock(&streaming_wanted_lck); - out: - transcode_frame_free(frame); - free(ctx->buf); + outputs_buffer_free(ctx->obuf); } static void * @@ -502,7 +559,7 @@ streaming_session_deregister(int readfd) wanted_session_remove(w, readfd); - if (w->refcount == 0) + if (w->num_sessions == 0) wanted_remove(&streaming.wanted, w); out: @@ -519,59 +576,33 @@ streaming_metadatacb_register(streaming_metadatacb cb) /* ----------------------------- Thread: Player ----------------------------- */ static void -encode_worker_invoke(uint8_t *buf, size_t bufsize, int samples, struct media_quality quality) +streaming_write(struct output_buffer *obuf) { struct encode_cmdarg ctx; - if (quality.channels == 0) - { - DPRINTF(E_LOG, L_STREAMING, "Streaming quality is zero (%d/%d/%d)\n", - quality.sample_rate, quality.bits_per_sample, quality.channels); - return; - } + // No lock since this is just an early exit, it doesn't need to be accurate + if (!streaming.wanted) + return; - CHECK_NULL(L_STREAMING, ctx.buf = malloc(bufsize)); - memcpy(ctx.buf, buf, bufsize); - ctx.bufsize = bufsize; - ctx.samples = samples; - ctx.quality = quality; + // We don't want to block the player, so we can't lock to access + // streaming.wanted and find which qualities we need. So we just copy it all + // and pass it to a worker thread that can lock and check what is wanted, and + // also can encode without holding the player. 
+ ctx.obuf = outputs_buffer_copy(obuf); ctx.seqnum = streaming.seqnum; streaming.seqnum++; worker_execute(encode_data_cb, &ctx, sizeof(struct encode_cmdarg), 0); + + // In case this is the last player write() we want to start streaming silence + evtimer_add(streaming.silenceev, &streaming.silencetv); } static void silenceev_cb(evutil_socket_t fd, short event, void *arg) { - uint8_t silence[SILENCE_BUF_SIZE] = { 0 }; - int samples; - - // No lock since this is just an early exit, it doesn't need to be accurate - if (!streaming.wanted) - return; - - samples = BTOS(SILENCE_BUF_SIZE, streaming.last_quality.bits_per_sample, streaming.last_quality.channels); - - encode_worker_invoke(silence, SILENCE_BUF_SIZE, samples, streaming.last_quality); - - evtimer_add(streaming.silenceev, &streaming.silencetv); -} - -static void -streaming_write(struct output_buffer *obuf) -{ - // No lock since this is just an early exit, it doesn't need to be accurate - if (!streaming.wanted) - return; - - encode_worker_invoke(obuf->data[0].buffer, obuf->data[0].bufsize, obuf->data[0].samples, obuf->data[0].quality); - - streaming.last_quality = obuf->data[0].quality; - - // In case this is the last player write() we want to start streaming silence - evtimer_add(streaming.silenceev, &streaming.silencetv); + streaming_write(NULL); } static void