[streaming] Use player_streaming_register() for thread safety

Also means we might be able to drop the special header file for
outputs/streaming.c making it a bit more like a regular output module.
This commit is contained in:
ejurgensen 2023-05-06 01:02:48 +02:00
parent dedd4a95c2
commit 6364515fb7
3 changed files with 143 additions and 118 deletions

View File

@ -31,6 +31,7 @@
#include <event2/buffer.h> #include <event2/buffer.h>
#include "httpd_internal.h" #include "httpd_internal.h"
#include "player.h"
#include "outputs/streaming.h" #include "outputs/streaming.h"
#include "logger.h" #include "logger.h"
#include "conffile.h" #include "conffile.h"
@ -169,7 +170,7 @@ session_free(struct streaming_session *session)
if (session->readev) if (session->readev)
{ {
streaming_session_deregister(session->fd); player_streaming_deregister(session->fd);
event_free(session->readev); event_free(session->readev);
} }
@ -238,7 +239,7 @@ read_cb(evutil_socket_t fd, short event, void *arg)
static int static int
streaming_mp3_handler(struct httpd_request *hreq) streaming_mp3_handler(struct httpd_request *hreq)
{ {
struct streaming_session *session; struct streaming_session *session = NULL;
const char *name = cfg_getstr(cfg_getsec(cfg, "library"), "name"); const char *name = cfg_getstr(cfg_getsec(cfg, "library"), "name");
const char *param; const char *param;
bool icy_is_requested; bool icy_is_requested;
@ -255,10 +256,16 @@ streaming_mp3_handler(struct httpd_request *hreq)
session = session_new(hreq, icy_is_requested); session = session_new(hreq, icy_is_requested);
if (!session) if (!session)
return -1; {
goto error;
}
// Ask streaming output module for a fd to read mp3 from // Ask streaming output module for a fd to read mp3 from
session->fd = streaming_session_register(STREAMING_FORMAT_MP3, streaming_default_quality); session->fd = player_streaming_register(STREAMING_FORMAT_MP3, streaming_default_quality);
if (session->fd < 0)
{
goto error;
}
CHECK_NULL(L_STREAMING, session->readev = event_new(hreq->evbase, session->fd, EV_READ | EV_PERSIST, read_cb, session)); CHECK_NULL(L_STREAMING, session->readev = event_new(hreq->evbase, session->fd, EV_READ | EV_PERSIST, read_cb, session));
event_add(session->readev, NULL); event_add(session->readev, NULL);
@ -274,6 +281,11 @@ streaming_mp3_handler(struct httpd_request *hreq)
httpd_send_reply_start(hreq, HTTP_OK, "OK"); httpd_send_reply_start(hreq, HTTP_OK, "OK");
return 0; return 0;
error:
session_free(session);
// Error message is sent by streaming_request()
return -1;
} }
static struct httpd_uri_map streaming_handlers[] = static struct httpd_uri_map streaming_handlers[] =

View File

@ -32,6 +32,7 @@
#include "outputs.h" #include "outputs.h"
#include "misc.h" #include "misc.h"
#include "worker.h" #include "worker.h"
#include "player.h"
#include "transcode.h" #include "transcode.h"
#include "logger.h" #include "logger.h"
#include "db.h" #include "db.h"
@ -195,8 +196,6 @@ wanted_free(struct streaming_wanted *w)
for (int i = 0; i < WANTED_PIPES_MAX; i++) for (int i = 0; i < WANTED_PIPES_MAX; i++)
pipe_close(&w->pipes[i]); pipe_close(&w->pipes[i]);
outputs_quality_unsubscribe(&w->quality); // FIXME not thread safe
transcode_encode_cleanup(&w->xcode_ctx); transcode_encode_cleanup(&w->xcode_ctx);
evbuffer_free(w->audio_in); evbuffer_free(w->audio_in);
evbuffer_free(w->audio_out); evbuffer_free(w->audio_out);
@ -213,8 +212,6 @@ wanted_new(enum streaming_format format, struct media_quality quality)
CHECK_NULL(L_STREAMING, w->audio_in = evbuffer_new()); CHECK_NULL(L_STREAMING, w->audio_in = evbuffer_new());
CHECK_NULL(L_STREAMING, w->audio_out = evbuffer_new()); CHECK_NULL(L_STREAMING, w->audio_out = evbuffer_new());
outputs_quality_subscribe(&quality); // FIXME not thread safe, should be called from player thread only
w->xcode_ctx = encoder_setup(format, &quality); w->xcode_ctx = encoder_setup(format, &quality);
if (!w->xcode_ctx) if (!w->xcode_ctx)
goto error; goto error;
@ -364,22 +361,6 @@ wanted_session_remove(struct streaming_wanted *w, int readfd)
/* ----------------------------- Thread: Worker ----------------------------- */ /* ----------------------------- Thread: Worker ----------------------------- */
static void
encode_write(uint8_t *buf, size_t buflen, struct streaming_wanted *w, struct pipepair *p)
{
int ret;
if (p->writefd < 0)
return;
ret = write(p->writefd, buf, buflen);
if (ret < 0)
{
DPRINTF(E_LOG, L_STREAMING, "Error writing to stream pipe %d (format %d): %s\n", p->writefd, w->format, strerror(errno));
wanted_session_remove(w, p->readfd);
}
}
static int static int
encode_buffer(struct streaming_wanted *w, uint8_t *buf, size_t bufsize) encode_buffer(struct streaming_wanted *w, uint8_t *buf, size_t bufsize)
{ {
@ -438,70 +419,91 @@ encode_buffer(struct streaming_wanted *w, uint8_t *buf, size_t bufsize)
} }
static void static void
encode_data_cb(void *arg) encode_and_write(int *failed_pipe_readfd, struct streaming_wanted *w, struct output_buffer *obuf)
{ {
struct encode_cmdarg *ctx = arg; struct pipepair *p;
struct output_buffer *obuf = ctx->obuf;
struct streaming_wanted *w;
struct streaming_wanted *next;
uint8_t *buf; uint8_t *buf;
size_t bufsize; size_t bufsize;
size_t len; size_t len;
int ret; int ret;
int i; int i;
for (i = 0, buf = NULL, bufsize = 0; obuf && obuf->data[i].buffer; i++)
{
if (!quality_is_equal(&obuf->data[i].quality, &w->quality))
continue;
buf = obuf->data[i].buffer;
bufsize = obuf->data[i].bufsize;
}
// If encoding fails we should kill the sessions, which for thread safety
// and to avoid deadlocks has to be done later with player_streaming_deregister()
ret = encode_buffer(w, buf, bufsize);
if (ret < 0)
{
for (i = 0; i < WANTED_PIPES_MAX; i++)
{
p = &w->pipes[i];
if (p->writefd < 0)
continue;
*failed_pipe_readfd = p->readfd;
}
return;
}
len = evbuffer_get_length(w->audio_out);
if (len == 0)
{
return;
}
buf = evbuffer_pullup(w->audio_out, -1);
for (i = 0; i < WANTED_PIPES_MAX; i++)
{
p = &w->pipes[i];
if (p->writefd < 0)
continue;
ret = write(p->writefd, buf, len);
if (ret < 0)
{
DPRINTF(E_LOG, L_STREAMING, "Error writing to stream pipe %d (format %d): %s\n", p->writefd, w->format, strerror(errno));
*failed_pipe_readfd = p->readfd;
}
}
evbuffer_drain(w->audio_out, -1);
}
static void
encode_data_cb(void *arg)
{
struct encode_cmdarg *ctx = arg;
struct output_buffer *obuf = ctx->obuf;
struct streaming_wanted *w;
int failed_pipe_readfd = -1;
pthread_mutex_lock(&streaming_wanted_lck); pthread_mutex_lock(&streaming_wanted_lck);
// To make sure we process the frames in order // To make sure we process the frames in order
while (ctx->seqnum != streaming.seqnum_encode_next) while (ctx->seqnum != streaming.seqnum_encode_next)
pthread_cond_wait(&streaming_sequence_cond, &streaming_wanted_lck); pthread_cond_wait(&streaming_sequence_cond, &streaming_wanted_lck);
for (w = streaming.wanted; w; w = next) for (w = streaming.wanted; w; w = w->next)
{ encode_and_write(&failed_pipe_readfd, w, obuf);
next = w->next;
for (i = 0, buf = NULL, bufsize = 0; obuf && obuf->data[i].buffer; i++)
{
if (!quality_is_equal(&obuf->data[i].quality, &w->quality))
continue;
buf = obuf->data[i].buffer;
bufsize = obuf->data[i].bufsize;
}
ret = encode_buffer(w, buf, bufsize);
if (ret < 0)
{
wanted_remove(&streaming.wanted, w); // This will close all the fds, so readers get an error
continue;
}
len = evbuffer_get_length(w->audio_out);
if (len == 0)
{
continue;
}
buf = evbuffer_pullup(w->audio_out, -1);
for (i = 0; i < WANTED_PIPES_MAX; i++)
{
encode_write(buf, len, w, &w->pipes[i]);
}
evbuffer_drain(w->audio_out, -1);
if (w->num_sessions == 0)
{
wanted_remove(&streaming.wanted, w);
}
}
streaming.seqnum_encode_next++; streaming.seqnum_encode_next++;
pthread_cond_broadcast(&streaming_sequence_cond); pthread_cond_broadcast(&streaming_sequence_cond);
pthread_mutex_unlock(&streaming_wanted_lck); pthread_mutex_unlock(&streaming_wanted_lck);
outputs_buffer_free(ctx->obuf); outputs_buffer_free(ctx->obuf);
// We have to do this after letting go of the lock or we will deadlock. This
// unfortunate method means we can only fail one session (pipe) each pass.
if (failed_pipe_readfd >= 0)
player_streaming_deregister(failed_pipe_readfd);
} }
static void * static void *
@ -526,46 +528,6 @@ streaming_metadata_prepare(struct output_metadata *metadata)
/* ----------------------------- Thread: httpd ------------------------------ */ /* ----------------------------- Thread: httpd ------------------------------ */
int
streaming_session_register(enum streaming_format format, struct media_quality quality)
{
struct streaming_wanted *w;
struct pipepair pipe;
int ret;
pthread_mutex_lock(&streaming_wanted_lck);
w = wanted_find_byformat(streaming.wanted, format, quality);
if (!w)
w = wanted_add(&streaming.wanted, format, quality);
ret = wanted_session_add(&pipe, w);
if (ret < 0)
pipe.readfd = -1;
pthread_mutex_unlock(&streaming_wanted_lck);
return pipe.readfd;
}
void
streaming_session_deregister(int readfd)
{
struct streaming_wanted *w;
pthread_mutex_lock(&streaming_wanted_lck);
w = wanted_find_byreadfd(streaming.wanted, readfd);
if (!w)
goto out;
wanted_session_remove(w, readfd);
if (w->num_sessions == 0)
wanted_remove(&streaming.wanted, w);
out:
pthread_mutex_unlock(&streaming_wanted_lck);
}
// Not thread safe, but only called once during httpd init // Not thread safe, but only called once during httpd init
void void
streaming_metadatacb_register(streaming_metadatacb cb) streaming_metadatacb_register(streaming_metadatacb cb)
@ -618,6 +580,62 @@ streaming_metadata_send(struct output_metadata *metadata)
outputs_metadata_free(metadata); outputs_metadata_free(metadata);
} }
// Since this is streaming and there is no actual device, we will be called with
// a dummy/ad hoc device that's not part in the speaker list. We don't need to
// make any callback so can ignore callback_id.
static int
streaming_start(struct output_device *device, int callback_id)
{
struct streaming_wanted *w;
struct pipepair pipe;
int ret;
pthread_mutex_lock(&streaming_wanted_lck);
w = wanted_find_byformat(streaming.wanted, device->format, device->quality);
if (!w)
w = wanted_add(&streaming.wanted, device->format, device->quality);
ret = wanted_session_add(&pipe, w);
if (ret < 0)
goto error;
pthread_mutex_unlock(&streaming_wanted_lck);
outputs_quality_subscribe(&device->quality);
device->id = pipe.readfd; // Not super clean
return 0;
error:
if (w->num_sessions == 0)
wanted_remove(&streaming.wanted, w);
pthread_mutex_unlock(&streaming_wanted_lck);
return -1;
}
// Since this is streaming and there is no actual device, we will be called with
// a dummy/ad hoc device that's not part in the speaker list. We don't need to
// make any callback so can ignore callback_id.
static int
streaming_stop(struct output_device *device, int callback_id)
{
struct streaming_wanted *w;
pthread_mutex_lock(&streaming_wanted_lck);
w = wanted_find_byreadfd(streaming.wanted, device->id);
if (!w)
goto error;
device->quality = w->quality;
wanted_session_remove(w, device->id);
if (w->num_sessions == 0)
wanted_remove(&streaming.wanted, w);
pthread_mutex_unlock(&streaming_wanted_lck);
outputs_quality_unsubscribe(&device->quality);
return 0;
error:
pthread_mutex_unlock(&streaming_wanted_lck);
return -1;
}
static int static int
streaming_init(void) streaming_init(void)
{ {
@ -637,13 +655,16 @@ streaming_deinit(void)
struct output_definition output_streaming = struct output_definition output_streaming =
{ {
.name = "mp3 streaming", .name = "streaming",
.type = OUTPUT_TYPE_STREAMING, .type = OUTPUT_TYPE_STREAMING,
.priority = 0, .priority = 0,
.disabled = 0, .disabled = 0,
.init = streaming_init, .init = streaming_init,
.deinit = streaming_deinit, .deinit = streaming_deinit,
.write = streaming_write, .write = streaming_write,
.device_start = streaming_start,
.device_probe = streaming_start,
.device_stop = streaming_stop,
.metadata_prepare = streaming_metadata_prepare, .metadata_prepare = streaming_metadata_prepare,
.metadata_send = streaming_metadata_send, .metadata_send = streaming_metadata_send,
}; };

View File

@ -2,8 +2,6 @@
#ifndef __STREAMING_H__ #ifndef __STREAMING_H__
#define __STREAMING_H__ #define __STREAMING_H__
#include "misc.h" // struct media_quality
typedef void (*streaming_metadatacb)(char *metadata); typedef void (*streaming_metadatacb)(char *metadata);
enum streaming_format enum streaming_format
@ -11,12 +9,6 @@ enum streaming_format
STREAMING_FORMAT_MP3, STREAMING_FORMAT_MP3,
}; };
int
streaming_session_register(enum streaming_format format, struct media_quality quality);
void
streaming_session_deregister(int readfd);
void void
streaming_metadatacb_register(streaming_metadatacb cb); streaming_metadatacb_register(streaming_metadatacb cb);