diff --git a/src/httpd_streaming.c b/src/httpd_streaming.c index 1306e390..6517b986 100644 --- a/src/httpd_streaming.c +++ b/src/httpd_streaming.c @@ -31,6 +31,7 @@ #include #include "httpd_internal.h" +#include "player.h" #include "outputs/streaming.h" #include "logger.h" #include "conffile.h" @@ -169,7 +170,7 @@ session_free(struct streaming_session *session) if (session->readev) { - streaming_session_deregister(session->fd); + player_streaming_deregister(session->fd); event_free(session->readev); } @@ -238,7 +239,7 @@ read_cb(evutil_socket_t fd, short event, void *arg) static int streaming_mp3_handler(struct httpd_request *hreq) { - struct streaming_session *session; + struct streaming_session *session = NULL; const char *name = cfg_getstr(cfg_getsec(cfg, "library"), "name"); const char *param; bool icy_is_requested; @@ -255,10 +256,16 @@ streaming_mp3_handler(struct httpd_request *hreq) session = session_new(hreq, icy_is_requested); if (!session) - return -1; + { + goto error; + } // Ask streaming output module for a fd to read mp3 from - session->fd = streaming_session_register(STREAMING_FORMAT_MP3, streaming_default_quality); + session->fd = player_streaming_register(STREAMING_FORMAT_MP3, streaming_default_quality); + if (session->fd < 0) + { + goto error; + } CHECK_NULL(L_STREAMING, session->readev = event_new(hreq->evbase, session->fd, EV_READ | EV_PERSIST, read_cb, session)); event_add(session->readev, NULL); @@ -274,6 +281,11 @@ streaming_mp3_handler(struct httpd_request *hreq) httpd_send_reply_start(hreq, HTTP_OK, "OK"); return 0; + + error: + session_free(session); + // Error message is sent by streaming_request() + return -1; } static struct httpd_uri_map streaming_handlers[] = diff --git a/src/outputs/streaming.c b/src/outputs/streaming.c index f0ab5b95..91a8c574 100644 --- a/src/outputs/streaming.c +++ b/src/outputs/streaming.c @@ -32,6 +32,7 @@ #include "outputs.h" #include "misc.h" #include "worker.h" +#include "player.h" #include "transcode.h" #include "logger.h" #include "db.h" @@ -195,8 +196,6 @@ wanted_free(struct streaming_wanted *w) for (int i = 0; i < WANTED_PIPES_MAX; i++) pipe_close(&w->pipes[i]); - outputs_quality_unsubscribe(&w->quality); // FIXME not thread safe - transcode_encode_cleanup(&w->xcode_ctx); evbuffer_free(w->audio_in); evbuffer_free(w->audio_out); @@ -213,8 +212,6 @@ wanted_new(enum streaming_format format, struct media_quality quality) CHECK_NULL(L_STREAMING, w->audio_in = evbuffer_new()); CHECK_NULL(L_STREAMING, w->audio_out = evbuffer_new()); - outputs_quality_subscribe(&quality); // FIXME not thread safe, should be called from player thread only - w->xcode_ctx = encoder_setup(format, &quality); if (!w->xcode_ctx) goto error; @@ -364,22 +361,6 @@ wanted_session_remove(struct streaming_wanted *w, int readfd) /* ----------------------------- Thread: Worker ----------------------------- */ -static void -encode_write(uint8_t *buf, size_t buflen, struct streaming_wanted *w, struct pipepair *p) -{ - int ret; - - if (p->writefd < 0) - return; - - ret = write(p->writefd, buf, buflen); - if (ret < 0) - { - DPRINTF(E_LOG, L_STREAMING, "Error writing to stream pipe %d (format %d): %s\n", p->writefd, w->format, strerror(errno)); - wanted_session_remove(w, p->readfd); - } -} - static int encode_buffer(struct streaming_wanted *w, uint8_t *buf, size_t bufsize) { @@ -438,70 +419,91 @@ encode_buffer(struct streaming_wanted *w, uint8_t *buf, size_t bufsize) } static void -encode_data_cb(void *arg) +encode_and_write(int *failed_pipe_readfd, struct streaming_wanted *w, struct output_buffer *obuf) { - struct encode_cmdarg *ctx = arg; - struct output_buffer *obuf = ctx->obuf; - struct streaming_wanted *w; - struct streaming_wanted *next; + struct pipepair *p; uint8_t *buf; size_t bufsize; size_t len; int ret; int i; + for (i = 0, buf = NULL, bufsize = 0; obuf && obuf->data[i].buffer; i++) + { + if (!quality_is_equal(&obuf->data[i].quality, &w->quality)) + continue; + + buf = obuf->data[i].buffer; + bufsize = obuf->data[i].bufsize; + } + + // If encoding fails we should kill the sessions, which for thread safety + // and to avoid deadlocks has to be done later with player_streaming_deregister() + ret = encode_buffer(w, buf, bufsize); + if (ret < 0) + { + for (i = 0; i < WANTED_PIPES_MAX; i++) + { + p = &w->pipes[i]; + if (p->writefd < 0) + continue; + *failed_pipe_readfd = p->readfd; + } + + return; + } + + len = evbuffer_get_length(w->audio_out); + if (len == 0) + { + return; + } + + buf = evbuffer_pullup(w->audio_out, -1); + for (i = 0; i < WANTED_PIPES_MAX; i++) + { + p = &w->pipes[i]; + if (p->writefd < 0) + continue; + + ret = write(p->writefd, buf, len); + if (ret < 0) + { + DPRINTF(E_LOG, L_STREAMING, "Error writing to stream pipe %d (format %d): %s\n", p->writefd, w->format, strerror(errno)); + *failed_pipe_readfd = p->readfd; + } + } + + evbuffer_drain(w->audio_out, -1); +} + +static void +encode_data_cb(void *arg) +{ + struct encode_cmdarg *ctx = arg; + struct output_buffer *obuf = ctx->obuf; + struct streaming_wanted *w; + int failed_pipe_readfd = -1; + pthread_mutex_lock(&streaming_wanted_lck); // To make sure we process the frames in order while (ctx->seqnum != streaming.seqnum_encode_next) pthread_cond_wait(&streaming_sequence_cond, &streaming_wanted_lck); - for (w = streaming.wanted; w; w = next) - { - next = w->next; - - for (i = 0, buf = NULL, bufsize = 0; obuf && obuf->data[i].buffer; i++) - { - if (!quality_is_equal(&obuf->data[i].quality, &w->quality)) - continue; - - buf = obuf->data[i].buffer; - bufsize = obuf->data[i].bufsize; - } - - ret = encode_buffer(w, buf, bufsize); - if (ret < 0) - { - wanted_remove(&streaming.wanted, w); // This will close all the fds, so readers get an error - continue; - } - - len = evbuffer_get_length(w->audio_out); - if (len == 0) - { - continue; - } - - buf = evbuffer_pullup(w->audio_out, -1); - - for (i = 0; i < WANTED_PIPES_MAX; i++) - { - encode_write(buf, len, w, &w->pipes[i]); - } - - evbuffer_drain(w->audio_out, -1); - - if (w->num_sessions == 0) - { - wanted_remove(&streaming.wanted, w); - } - } + for (w = streaming.wanted; w; w = w->next) + encode_and_write(&failed_pipe_readfd, w, obuf); streaming.seqnum_encode_next++; pthread_cond_broadcast(&streaming_sequence_cond); pthread_mutex_unlock(&streaming_wanted_lck); outputs_buffer_free(ctx->obuf); + + // We have to do this after letting go of the lock or we will deadlock. This + // unfortunate method means we can only fail one session (pipe) each pass. + if (failed_pipe_readfd >= 0) + player_streaming_deregister(failed_pipe_readfd); } static void * @@ -526,46 +528,6 @@ streaming_metadata_prepare(struct output_metadata *metadata) /* ----------------------------- Thread: httpd ------------------------------ */ -int -streaming_session_register(enum streaming_format format, struct media_quality quality) -{ - struct streaming_wanted *w; - struct pipepair pipe; - int ret; - - pthread_mutex_lock(&streaming_wanted_lck); - w = wanted_find_byformat(streaming.wanted, format, quality); - if (!w) - w = wanted_add(&streaming.wanted, format, quality); - - ret = wanted_session_add(&pipe, w); - if (ret < 0) - pipe.readfd = -1; - - pthread_mutex_unlock(&streaming_wanted_lck); - - return pipe.readfd; -} - -void -streaming_session_deregister(int readfd) -{ - struct streaming_wanted *w; - - pthread_mutex_lock(&streaming_wanted_lck); - w = wanted_find_byreadfd(streaming.wanted, readfd); - if (!w) - goto out; - - wanted_session_remove(w, readfd); - - if (w->num_sessions == 0) - wanted_remove(&streaming.wanted, w); - - out: - pthread_mutex_unlock(&streaming_wanted_lck); -} - // Not thread safe, but only called once during httpd init void streaming_metadatacb_register(streaming_metadatacb cb) @@ -618,6 +580,62 @@ streaming_metadata_send(struct output_metadata *metadata) outputs_metadata_free(metadata); } +// Since this is streaming and there is no actual device, we will be called with +// a dummy/ad hoc device that's not part in the speaker list. We don't need to +// make any callback so can ignore callback_id. +static int +streaming_start(struct output_device *device, int callback_id) +{ + struct streaming_wanted *w; + struct pipepair pipe; + int ret; + + pthread_mutex_lock(&streaming_wanted_lck); + w = wanted_find_byformat(streaming.wanted, device->format, device->quality); + if (!w) + w = wanted_add(&streaming.wanted, device->format, device->quality); + ret = wanted_session_add(&pipe, w); + if (ret < 0) + goto error; + pthread_mutex_unlock(&streaming_wanted_lck); + + outputs_quality_subscribe(&device->quality); + device->id = pipe.readfd; // Not super clean + return 0; + + error: + if (w->num_sessions == 0) + wanted_remove(&streaming.wanted, w); + pthread_mutex_unlock(&streaming_wanted_lck); + return -1; +} + +// Since this is streaming and there is no actual device, we will be called with +// a dummy/ad hoc device that's not part in the speaker list. We don't need to +// make any callback so can ignore callback_id. +static int +streaming_stop(struct output_device *device, int callback_id) +{ + struct streaming_wanted *w; + + pthread_mutex_lock(&streaming_wanted_lck); + w = wanted_find_byreadfd(streaming.wanted, device->id); + if (!w) + goto error; + device->quality = w->quality; + wanted_session_remove(w, device->id); + if (w->num_sessions == 0) + wanted_remove(&streaming.wanted, w); + pthread_mutex_unlock(&streaming_wanted_lck); + + outputs_quality_unsubscribe(&device->quality); + return 0; + + error: + pthread_mutex_unlock(&streaming_wanted_lck); + return -1; +} + static int streaming_init(void) { @@ -637,13 +655,16 @@ streaming_deinit(void) struct output_definition output_streaming = { - .name = "mp3 streaming", + .name = "streaming", .type = OUTPUT_TYPE_STREAMING, .priority = 0, .disabled = 0, .init = streaming_init, .deinit = streaming_deinit, .write = streaming_write, + .device_start = streaming_start, + .device_probe = streaming_start, + .device_stop = streaming_stop, .metadata_prepare = streaming_metadata_prepare, .metadata_send = streaming_metadata_send, }; diff --git a/src/outputs/streaming.h b/src/outputs/streaming.h index 34673345..49ba71d2 100644 --- a/src/outputs/streaming.h +++ b/src/outputs/streaming.h @@ -2,8 +2,6 @@ #ifndef __STREAMING_H__ #define __STREAMING_H__ -#include "misc.h" // struct media_quality - typedef void (*streaming_metadatacb)(char *metadata); enum streaming_format @@ -11,12 +9,6 @@ enum streaming_format STREAMING_FORMAT_MP3, }; -int -streaming_session_register(enum streaming_format format, struct media_quality quality); - -void -streaming_session_deregister(int readfd); - void streaming_metadatacb_register(streaming_metadatacb cb);