[streaming] Fix mp3 streaming not working with ffmpeg 6

ffmpeg 6 requires fixed size frames for the encoder, so this commit refactors
the mp3 streaming to support that. It also uses outputs_quality_subscribe, which
potentially has better performance (e.g. a 48000 sample rate source will only
be resampled once if both airplay and mp3 streaming want 44100) and makes it
easier to create fixed size frames.

Resolves 
This commit is contained in:
ejurgensen 2023-05-01 23:35:34 +02:00
parent 701bad466e
commit 537012440b
3 changed files with 191 additions and 160 deletions

@ -598,18 +598,6 @@ vol_adjust(void)
/* ----------------------------------- API ---------------------------------- */ /* ----------------------------------- API ---------------------------------- */
struct output_buffer *
outputs_buffer_copy(struct output_buffer *buffer)
{
return buffer_copy(buffer);
}
void
outputs_buffer_free(struct output_buffer *buffer)
{
buffer_free(buffer);
}
struct output_device * struct output_device *
outputs_device_get(uint64_t device_id) outputs_device_get(uint64_t device_id)
{ {
@ -754,6 +742,18 @@ outputs_metadata_free(struct output_metadata *metadata)
metadata_free(metadata); metadata_free(metadata);
} }
struct output_buffer *
outputs_buffer_copy(struct output_buffer *buffer)
{
return buffer_copy(buffer);
}
void
outputs_buffer_free(struct output_buffer *buffer)
{
buffer_free(buffer);
}
/* ---------------------------- Called by player ---------------------------- */ /* ---------------------------- Called by player ---------------------------- */
struct output_device * struct output_device *

@ -264,12 +264,6 @@ struct output_definition
/* ------------------------------- General use ------------------------------ */ /* ------------------------------- General use ------------------------------ */
struct output_buffer *
outputs_buffer_copy(struct output_buffer *buffer);
void
outputs_buffer_free(struct output_buffer *buffer);
struct output_device * struct output_device *
outputs_device_get(uint64_t device_id); outputs_device_get(uint64_t device_id);
@ -293,6 +287,12 @@ outputs_cb(int callback_id, uint64_t device_id, enum output_device_state);
void void
outputs_metadata_free(struct output_metadata *metadata); outputs_metadata_free(struct output_metadata *metadata);
struct output_buffer *
outputs_buffer_copy(struct output_buffer *buffer);
void
outputs_buffer_free(struct output_buffer *buffer);
/* ---------------------------- Called by player ---------------------------- */ /* ---------------------------- Called by player ---------------------------- */
// Ownership of *add is transferred, so don't address after calling. Instead you // Ownership of *add is transferred, so don't address after calling. Instead you

@ -44,15 +44,10 @@
* player, but there are clients, it instead writes silence to the fd. * player, but there are clients, it instead writes silence to the fd.
*/ */
// Seconds between sending silence when player is idle // Seconds between sending a frame of silence when player is idle
// (to prevent client from hanging up) // (to prevent client from hanging up)
#define STREAMING_SILENCE_INTERVAL 1 #define STREAMING_SILENCE_INTERVAL 1
// How many bytes of silence we encode with the above interval. There is no
// particular reason for using this size, just that it seems to have worked for
// a while.
#define SILENCE_BUF_SIZE STOB(352, 16, 2)
// The wanted structure represents a particular format and quality that should // The wanted structure represents a particular format and quality that should
// be produced for one or more sessions. A pipe pair is created for each session // be produced for one or more sessions. A pipe pair is created for each session
// for the i/o. // for the i/o.
@ -66,15 +61,19 @@ struct pipepair
struct streaming_wanted struct streaming_wanted
{ {
int refcount; int num_sessions; // for refcounting
struct pipepair pipes[WANTED_PIPES_MAX]; struct pipepair pipes[WANTED_PIPES_MAX];
enum streaming_format format; enum streaming_format format;
struct media_quality quality_in; struct media_quality quality;
struct media_quality quality_out;
struct evbuffer *audio_in;
struct evbuffer *audio_out;
struct encode_ctx *xcode_ctx; struct encode_ctx *xcode_ctx;
struct evbuffer *encoded_data;
int nb_samples;
uint8_t *frame_data;
size_t frame_size;
struct streaming_wanted *next; struct streaming_wanted *next;
}; };
@ -96,11 +95,8 @@ struct streaming_ctx
struct encode_cmdarg struct encode_cmdarg
{ {
uint8_t *buf; struct output_buffer *obuf;
size_t bufsize;
int samples;
unsigned int seqnum; unsigned int seqnum;
struct media_quality quality;
}; };
static pthread_mutex_t streaming_wanted_lck; static pthread_mutex_t streaming_wanted_lck;
@ -116,6 +112,41 @@ extern struct event_base *evbase_player;
/* ------------------------------- Helpers ---------------------------------- */ /* ------------------------------- Helpers ---------------------------------- */
/*
 * Builds an encoder that turns raw PCM of the given quality into the requested
 * streaming format.
 *
 * A raw PCM decode context matching quality->bits_per_sample (16, 24 or 32) is
 * created first and handed to transcode_encode_setup(). The decode context is
 * released again before returning, whether or not the setup succeeded.
 *
 * @param format   streaming format to produce; only STREAMING_FORMAT_MP3
 *                 results in an encoder being created
 * @param quality  sample rate, bit depth and channel count of the PCM input
 * @return the new encode context, or NULL (after logging) if the bit depth is
 *         not one of the handled values, the format is not handled, or either
 *         setup call fails
 */
static struct encode_ctx *
encoder_setup(enum streaming_format format, struct media_quality *quality)
{
  struct decode_ctx *pcm_ctx = NULL;
  struct encode_ctx *result = NULL;

  // Pick the raw PCM profile that matches the bit depth of the input
  switch (quality->bits_per_sample)
    {
      case 16:
        pcm_ctx = transcode_decode_setup_raw(XCODE_PCM16, quality);
        break;
      case 24:
        pcm_ctx = transcode_decode_setup_raw(XCODE_PCM24, quality);
        break;
      case 32:
        pcm_ctx = transcode_decode_setup_raw(XCODE_PCM32, quality);
        break;
      default:
        break;
    }

  if (!pcm_ctx)
    {
      DPRINTF(E_LOG, L_STREAMING, "Error setting up decoder for quality sr %d, bps %d, ch %d, cannot encode\n",
        quality->sample_rate, quality->bits_per_sample, quality->channels);
    }
  else
    {
      if (format == STREAMING_FORMAT_MP3)
        {
          result = transcode_encode_setup(XCODE_MP3, quality, pcm_ctx, NULL, 0, 0);
        }

      if (!result)
        {
          DPRINTF(E_LOG, L_STREAMING, "Error setting up encoder for quality sr %d, bps %d, ch %d, cannot encode\n",
            quality->sample_rate, quality->bits_per_sample, quality->channels);
        }
    }

  // Always runs, also when pcm_ctx is still NULL
  transcode_decode_cleanup(&pcm_ctx);

  return result;
}
static int static int
pipe_open(struct pipepair *p) pipe_open(struct pipepair *p)
{ {
@ -164,8 +195,12 @@ wanted_free(struct streaming_wanted *w)
for (int i = 0; i < WANTED_PIPES_MAX; i++) for (int i = 0; i < WANTED_PIPES_MAX; i++)
pipe_close(&w->pipes[i]); pipe_close(&w->pipes[i]);
outputs_quality_unsubscribe(&w->quality); // FIXME not thread safe
transcode_encode_cleanup(&w->xcode_ctx); transcode_encode_cleanup(&w->xcode_ctx);
evbuffer_free(w->encoded_data); evbuffer_free(w->audio_in);
evbuffer_free(w->audio_out);
free(w->frame_data);
free(w); free(w);
} }
@ -175,10 +210,21 @@ wanted_new(enum streaming_format format, struct media_quality quality)
struct streaming_wanted *w; struct streaming_wanted *w;
CHECK_NULL(L_STREAMING, w = calloc(1, sizeof(struct streaming_wanted))); CHECK_NULL(L_STREAMING, w = calloc(1, sizeof(struct streaming_wanted)));
CHECK_NULL(L_STREAMING, w->encoded_data = evbuffer_new()); CHECK_NULL(L_STREAMING, w->audio_in = evbuffer_new());
CHECK_NULL(L_STREAMING, w->audio_out = evbuffer_new());
outputs_quality_subscribe(&quality); // FIXME not thread safe, should be called from player thread only
w->xcode_ctx = encoder_setup(format, &quality);
if (!w->xcode_ctx)
goto error;
w->quality_out = quality;
w->format = format; w->format = format;
w->quality = quality;
w->nb_samples = transcode_encode_query(w->xcode_ctx, "samples_per_frame"); // 1152 for mp3
w->frame_size = STOB(w->nb_samples, quality.bits_per_sample, quality.channels);
CHECK_NULL(L_STREAMING, w->frame_data = malloc(w->frame_size));
for (int i = 0; i < WANTED_PIPES_MAX; i++) for (int i = 0; i < WANTED_PIPES_MAX; i++)
{ {
@ -187,6 +233,10 @@ wanted_new(enum streaming_format format, struct media_quality quality)
} }
return w; return w;
error:
wanted_free(w);
return NULL;
} }
static void static void
@ -233,7 +283,7 @@ wanted_find_byformat(struct streaming_wanted *wanted, enum streaming_format form
for (w = wanted; w; w = w->next) for (w = wanted; w; w = w->next)
{ {
if (w->format == format && quality_is_equal(&w->quality_out, &quality)) if (w->format == format && quality_is_equal(&w->quality, &quality))
return w; return w;
} }
@ -281,8 +331,8 @@ wanted_session_add(struct pipepair *p, struct streaming_wanted *w)
return -1; return -1;
} }
w->refcount++; w->num_sessions++;
DPRINTF(E_DBG, L_STREAMING, "Session register readfd %d, wanted->refcount=%d\n", p->readfd, w->refcount); DPRINTF(E_DBG, L_STREAMING, "Session register readfd %d, wanted->num_sessions=%d\n", p->readfd, w->num_sessions);
return 0; return 0;
} }
@ -307,73 +357,13 @@ wanted_session_remove(struct streaming_wanted *w, int readfd)
return; return;
} }
w->refcount--; w->num_sessions--;
DPRINTF(E_DBG, L_STREAMING, "Session deregister readfd %d, wanted->refcount=%d\n", readfd, w->refcount); DPRINTF(E_DBG, L_STREAMING, "Session deregister readfd %d, wanted->num_sessions=%d\n", readfd, w->num_sessions);
} }
/* ----------------------------- Thread: Worker ----------------------------- */ /* ----------------------------- Thread: Worker ----------------------------- */
static int
encode_reset(struct streaming_wanted *w, struct media_quality quality_in)
{
struct media_quality quality_out = w->quality_out;
struct decode_ctx *decode_ctx = NULL;
transcode_encode_cleanup(&w->xcode_ctx);
if (quality_in.bits_per_sample == 16)
decode_ctx = transcode_decode_setup_raw(XCODE_PCM16, &quality_in);
else if (quality_in.bits_per_sample == 24)
decode_ctx = transcode_decode_setup_raw(XCODE_PCM24, &quality_in);
else if (quality_in.bits_per_sample == 32)
decode_ctx = transcode_decode_setup_raw(XCODE_PCM32, &quality_in);
if (!decode_ctx)
{
DPRINTF(E_LOG, L_STREAMING, "Error setting up decoder for input quality sr %d, bps %d, ch %d, cannot MP3 encode\n",
quality_in.sample_rate, quality_in.bits_per_sample, quality_in.channels);
goto error;
}
w->quality_in = quality_in;
w->xcode_ctx = transcode_encode_setup(XCODE_MP3, &quality_out, decode_ctx, NULL, 0, 0);
if (!w->xcode_ctx)
{
DPRINTF(E_LOG, L_STREAMING, "Error setting up encoder for output quality sr %d, bps %d, ch %d, cannot MP3 encode\n",
quality_out.sample_rate, quality_out.bits_per_sample, quality_out.channels);
goto error;
}
transcode_decode_cleanup(&decode_ctx);
return 0;
error:
transcode_decode_cleanup(&decode_ctx);
return -1;
}
static int
encode_frame(struct streaming_wanted *w, struct media_quality quality_in, transcode_frame *frame)
{
int ret;
if (!w->xcode_ctx || !quality_is_equal(&quality_in, &w->quality_in))
{
DPRINTF(E_DBG, L_STREAMING, "Resetting transcode context\n");
if (encode_reset(w, quality_in) < 0)
return -1;
}
ret = transcode_encode(w->encoded_data, w->xcode_ctx, frame, 0);
if (ret < 0)
{
return -1;
}
return 0;
}
static void static void
encode_write(uint8_t *buf, size_t buflen, struct streaming_wanted *w, struct pipepair *p) encode_write(uint8_t *buf, size_t buflen, struct streaming_wanted *w, struct pipepair *p)
{ {
@ -390,25 +380,76 @@ encode_write(uint8_t *buf, size_t buflen, struct streaming_wanted *w, struct pip
} }
} }
/*
 * Appends raw PCM to the stream's input buffer and encodes every complete
 * frame that has accumulated there.
 *
 * The encoder is fed fixed size frames of w->frame_size bytes (w->nb_samples
 * samples). Input that does not fill a whole frame stays in w->audio_in until
 * a later call supplies the rest. Encoded output is appended to w->audio_out.
 *
 * @param w        stream state (input/output buffers, encoder, frame scratch)
 * @param buf      PCM in w->quality to append, or NULL to append one frame of
 *                 silence instead
 * @param bufsize  number of bytes in buf; not used when buf is NULL
 * @return 0 on success, -1 if the frame size is zero or a frame could not be
 *         read, created or encoded
 */
static int
encode_buffer(struct streaming_wanted *w, uint8_t *buf, size_t bufsize)
{
  size_t remaining_bytes;
  transcode_frame *frame = NULL;
  int ret;

  // A zero frame size would make the loop below spin without consuming input
  if (w->frame_size == 0)
    {
      DPRINTF(E_LOG, L_STREAMING, "Bug! Frame size is zero (format %d)\n", w->format);
      return -1;
    }

  if (buf)
    {
      evbuffer_add(w->audio_in, buf, bufsize);
    }
  else
    {
      // buf being null is either a silence timeout or that we couldn't find the
      // subscribed quality. In both cases we encode silence.
      memset(w->frame_data, 0, w->frame_size);
      evbuffer_add(w->audio_in, w->frame_data, w->frame_size);
    }

  remaining_bytes = evbuffer_get_length(w->audio_in);

  // Read and encode from 'audio_in' in chunks of 'frame_size' bytes. Uses >=
  // so that an exactly full frame (e.g. a single frame of silence) is encoded
  // right away instead of waiting for more input to arrive.
  while (remaining_bytes >= w->frame_size)
    {
      ret = evbuffer_remove(w->audio_in, w->frame_data, w->frame_size);
      if (ret < 0 || (size_t)ret != w->frame_size)
        {
          DPRINTF(E_LOG, L_STREAMING, "Bug! Couldn't read a frame of %zu bytes (format %d)\n", w->frame_size, w->format);
          goto error;
        }

      remaining_bytes -= w->frame_size;

      frame = transcode_frame_new(w->frame_data, w->frame_size, w->nb_samples, &w->quality);
      if (!frame)
        {
          DPRINTF(E_LOG, L_STREAMING, "Could not convert raw PCM to frame (format %d)\n", w->format);
          goto error;
        }

      ret = transcode_encode(w->audio_out, w->xcode_ctx, frame, 0);
      if (ret < 0)
        {
          DPRINTF(E_LOG, L_STREAMING, "Encoding error (format %d)\n", w->format);
          goto error;
        }

      transcode_frame_free(frame);
      // Reset so the error path cannot free this frame a second time if a
      // later iteration fails before a new frame has been created
      frame = NULL;
    }

  return 0;

 error:
  transcode_frame_free(frame);
  return -1;
}
static void static void
encode_data_cb(void *arg) encode_data_cb(void *arg)
{ {
struct encode_cmdarg *ctx = arg; struct encode_cmdarg *ctx = arg;
transcode_frame *frame; struct output_buffer *obuf = ctx->obuf;
struct streaming_wanted *w; struct streaming_wanted *w;
struct streaming_wanted *next; struct streaming_wanted *next;
uint8_t *buf; uint8_t *buf;
size_t bufsize;
size_t len; size_t len;
int ret; int ret;
int i; int i;
frame = transcode_frame_new(ctx->buf, ctx->bufsize, ctx->samples, &ctx->quality);
if (!frame)
{
DPRINTF(E_LOG, L_STREAMING, "Could not convert raw PCM to frame\n");
goto out;
}
pthread_mutex_lock(&streaming_wanted_lck); pthread_mutex_lock(&streaming_wanted_lck);
// To make sure we process the frames in order // To make sure we process the frames in order
@ -419,32 +460,48 @@ encode_data_cb(void *arg)
{ {
next = w->next; next = w->next;
ret = encode_frame(w, ctx->quality, frame); for (i = 0, buf = NULL, bufsize = 0; obuf && obuf->data[i].buffer; i++)
if (ret < 0) {
wanted_remove(&streaming.wanted, w); // This will close all the fds, so readers get an error if (!quality_is_equal(&obuf->data[i].quality, &w->quality))
len = evbuffer_get_length(w->encoded_data);
if (len == 0)
continue; continue;
buf = evbuffer_pullup(w->encoded_data, -1); buf = obuf->data[i].buffer;
bufsize = obuf->data[i].bufsize;
}
ret = encode_buffer(w, buf, bufsize);
if (ret < 0)
{
wanted_remove(&streaming.wanted, w); // This will close all the fds, so readers get an error
continue;
}
len = evbuffer_get_length(w->audio_out);
if (len == 0)
{
continue;
}
buf = evbuffer_pullup(w->audio_out, -1);
for (i = 0; i < WANTED_PIPES_MAX; i++) for (i = 0; i < WANTED_PIPES_MAX; i++)
{
encode_write(buf, len, w, &w->pipes[i]); encode_write(buf, len, w, &w->pipes[i]);
}
evbuffer_drain(w->encoded_data, -1); evbuffer_drain(w->audio_out, -1);
if (w->refcount == 0) if (w->num_sessions == 0)
{
wanted_remove(&streaming.wanted, w); wanted_remove(&streaming.wanted, w);
} }
}
streaming.seqnum_encode_next++; streaming.seqnum_encode_next++;
pthread_cond_broadcast(&streaming_sequence_cond); pthread_cond_broadcast(&streaming_sequence_cond);
pthread_mutex_unlock(&streaming_wanted_lck); pthread_mutex_unlock(&streaming_wanted_lck);
out: outputs_buffer_free(ctx->obuf);
transcode_frame_free(frame);
free(ctx->buf);
} }
static void * static void *
@ -502,7 +559,7 @@ streaming_session_deregister(int readfd)
wanted_session_remove(w, readfd); wanted_session_remove(w, readfd);
if (w->refcount == 0) if (w->num_sessions == 0)
wanted_remove(&streaming.wanted, w); wanted_remove(&streaming.wanted, w);
out: out:
@ -519,59 +576,33 @@ streaming_metadatacb_register(streaming_metadatacb cb)
/* ----------------------------- Thread: Player ----------------------------- */ /* ----------------------------- Thread: Player ----------------------------- */
static void static void
encode_worker_invoke(uint8_t *buf, size_t bufsize, int samples, struct media_quality quality) streaming_write(struct output_buffer *obuf)
{ {
struct encode_cmdarg ctx; struct encode_cmdarg ctx;
if (quality.channels == 0) // No lock since this is just an early exit, it doesn't need to be accurate
{ if (!streaming.wanted)
DPRINTF(E_LOG, L_STREAMING, "Streaming quality is zero (%d/%d/%d)\n",
quality.sample_rate, quality.bits_per_sample, quality.channels);
return; return;
}
CHECK_NULL(L_STREAMING, ctx.buf = malloc(bufsize)); // We don't want to block the player, so we can't lock to access
memcpy(ctx.buf, buf, bufsize); // streaming.wanted and find which qualities we need. So we just copy it all
ctx.bufsize = bufsize; // and pass it to a worker thread that can lock and check what is wanted, and
ctx.samples = samples; // also can encode without holding the player.
ctx.quality = quality; ctx.obuf = outputs_buffer_copy(obuf);
ctx.seqnum = streaming.seqnum; ctx.seqnum = streaming.seqnum;
streaming.seqnum++; streaming.seqnum++;
worker_execute(encode_data_cb, &ctx, sizeof(struct encode_cmdarg), 0); worker_execute(encode_data_cb, &ctx, sizeof(struct encode_cmdarg), 0);
// In case this is the last player write() we want to start streaming silence
evtimer_add(streaming.silenceev, &streaming.silencetv);
} }
static void static void
silenceev_cb(evutil_socket_t fd, short event, void *arg) silenceev_cb(evutil_socket_t fd, short event, void *arg)
{ {
uint8_t silence[SILENCE_BUF_SIZE] = { 0 }; streaming_write(NULL);
int samples;
// No lock since this is just an early exit, it doesn't need to be accurate
if (!streaming.wanted)
return;
samples = BTOS(SILENCE_BUF_SIZE, streaming.last_quality.bits_per_sample, streaming.last_quality.channels);
encode_worker_invoke(silence, SILENCE_BUF_SIZE, samples, streaming.last_quality);
evtimer_add(streaming.silenceev, &streaming.silencetv);
}
static void
streaming_write(struct output_buffer *obuf)
{
// No lock since this is just an early exit, it doesn't need to be accurate
if (!streaming.wanted)
return;
encode_worker_invoke(obuf->data[0].buffer, obuf->data[0].bufsize, obuf->data[0].samples, obuf->data[0].quality);
streaming.last_quality = obuf->data[0].quality;
// In case this is the last player write() we want to start streaming silence
evtimer_add(streaming.silenceev, &streaming.silencetv);
} }
static void static void