[streaming] Change how metadata is delivered to http streaming

This gets rid of player locks + the special header file outputs/streaming.h
This commit is contained in:
ejurgensen
2023-05-08 20:46:16 +02:00
parent 6364515fb7
commit f998b1f3dd
7 changed files with 217 additions and 177 deletions

View File

@@ -28,7 +28,6 @@
#include <uninorm.h>
#include <fcntl.h>
#include "streaming.h"
#include "outputs.h"
#include "misc.h"
#include "worker.h"
@@ -63,9 +62,10 @@ struct pipepair
struct streaming_wanted
{
int num_sessions; // for refcounting
struct pipepair pipes[WANTED_PIPES_MAX];
struct pipepair audio[WANTED_PIPES_MAX];
struct pipepair metadata[WANTED_PIPES_MAX];
enum streaming_format format;
enum player_format format;
struct media_quality quality;
struct evbuffer *audio_in;
@@ -86,12 +86,11 @@ struct streaming_ctx
struct timeval silencetv;
struct media_quality last_quality;
char title[4064]; // See STREAMING_ICY_METALEN_MAX in http_streaming.c
// seqnum may wrap around so must be unsigned
unsigned int seqnum;
unsigned int seqnum_encode_next;
// callback with new metadata, e.g. for ICY tags
void (*metadatacb)(char *metadata);
};
struct encode_cmdarg
@@ -114,7 +113,7 @@ extern struct event_base *evbase_player;
/* ------------------------------- Helpers ---------------------------------- */
static struct encode_ctx *
encoder_setup(enum streaming_format format, struct media_quality *quality)
encoder_setup(enum player_format format, struct media_quality *quality)
{
struct decode_ctx *decode_ctx = NULL;
struct encode_ctx *encode_ctx = NULL;
@@ -133,7 +132,7 @@ encoder_setup(enum streaming_format format, struct media_quality *quality)
goto out;
}
if (format == STREAMING_FORMAT_MP3)
if (format == PLAYER_FORMAT_MP3)
encode_ctx = transcode_encode_setup(XCODE_MP3, quality, decode_ctx, NULL, 0, 0);
if (!encode_ctx)
@@ -194,7 +193,9 @@ wanted_free(struct streaming_wanted *w)
return;
for (int i = 0; i < WANTED_PIPES_MAX; i++)
pipe_close(&w->pipes[i]);
pipe_close(&w->audio[i]);
for (int i = 0; i < WANTED_PIPES_MAX; i++)
pipe_close(&w->metadata[i]);
transcode_encode_cleanup(&w->xcode_ctx);
evbuffer_free(w->audio_in);
@@ -203,8 +204,20 @@ wanted_free(struct streaming_wanted *w)
free(w);
}
static int
pipe_index_find_byreadfd(struct pipepair *p, int readfd)
{
for (int i = 0; i < WANTED_PIPES_MAX; i++, p++)
{
if (p->readfd == readfd)
return i;
}
return -1;
}
static struct streaming_wanted *
wanted_new(enum streaming_format format, struct media_quality quality)
wanted_new(enum player_format format, struct media_quality quality)
{
struct streaming_wanted *w;
@@ -225,8 +238,10 @@ wanted_new(enum streaming_format format, struct media_quality quality)
for (int i = 0; i < WANTED_PIPES_MAX; i++)
{
w->pipes[i].writefd = -1;
w->pipes[i].readfd = -1;
w->audio[i].writefd = -1;
w->audio[i].readfd = -1;
w->metadata[i].writefd = -1;
w->metadata[i].readfd = -1;
}
return w;
@@ -262,7 +277,7 @@ wanted_remove(struct streaming_wanted **wanted, struct streaming_wanted *remove)
}
static struct streaming_wanted *
wanted_add(struct streaming_wanted **wanted, enum streaming_format format, struct media_quality quality)
wanted_add(struct streaming_wanted **wanted, enum player_format format, struct media_quality quality)
{
struct streaming_wanted *w;
@@ -274,7 +289,7 @@ wanted_add(struct streaming_wanted **wanted, enum streaming_format format, struc
}
static struct streaming_wanted *
wanted_find_byformat(struct streaming_wanted *wanted, enum streaming_format format, struct media_quality quality)
wanted_find_byformat(struct streaming_wanted *wanted, enum player_format format, struct media_quality quality)
{
struct streaming_wanted *w;
@@ -294,31 +309,36 @@ wanted_find_byreadfd(struct streaming_wanted *wanted, int readfd)
int i;
for (w = wanted; w; w = w->next)
for (i = 0; i < WANTED_PIPES_MAX; i++)
{
if (w->pipes[i].readfd == readfd)
return w;
}
{
i = pipe_index_find_byreadfd(w->audio, readfd);
if (i != -1)
return w;
}
return NULL;
}
static int
wanted_session_add(struct pipepair *p, struct streaming_wanted *w)
wanted_session_add(int *audiofd, int *metadatafd, struct streaming_wanted *w)
{
int ret;
int i;
for (i = 0; i < WANTED_PIPES_MAX; i++)
{
if (w->pipes[i].writefd != -1) // In use
if (w->audio[i].writefd != -1) // In use
continue;
ret = pipe_open(&w->pipes[i]);
ret = pipe_open(&w->audio[i]);
if (ret < 0)
return -1;
memcpy(p, &w->pipes[i], sizeof(struct pipepair));
ret = pipe_open(&w->metadata[i]);
if (ret < 0)
return -1;
*audiofd = w->audio[i].readfd;
*metadatafd = w->metadata[i].readfd;
break;
}
@@ -329,31 +349,25 @@ wanted_session_add(struct pipepair *p, struct streaming_wanted *w)
}
w->num_sessions++;
DPRINTF(E_DBG, L_STREAMING, "Session register readfd %d, wanted->num_sessions=%d\n", p->readfd, w->num_sessions);
DPRINTF(E_DBG, L_STREAMING, "Session register audiofd %d, metadatafd %d, wanted->num_sessions=%d\n", *audiofd, *metadatafd, w->num_sessions);
return 0;
}
static void
wanted_session_remove(struct streaming_wanted *w, int readfd)
{
int i;
for (i = 0; i < WANTED_PIPES_MAX; i++)
{
if (w->pipes[i].readfd != readfd)
continue;
pipe_close(&w->pipes[i]);
break;
}
if (i == WANTED_PIPES_MAX)
i = pipe_index_find_byreadfd(w->audio, readfd);
if (i < 0)
{
DPRINTF(E_LOG, L_STREAMING, "Cannot remove streaming session, readfd %d not found\n", readfd);
return;
}
pipe_close(&w->audio[i]);
pipe_close(&w->metadata[i]);
w->num_sessions--;
DPRINTF(E_DBG, L_STREAMING, "Session deregister readfd %d, wanted->num_sessions=%d\n", readfd, w->num_sessions);
}
@@ -421,7 +435,6 @@ encode_buffer(struct streaming_wanted *w, uint8_t *buf, size_t bufsize)
static void
encode_and_write(int *failed_pipe_readfd, struct streaming_wanted *w, struct output_buffer *obuf)
{
struct pipepair *p;
uint8_t *buf;
size_t bufsize;
size_t len;
@@ -444,10 +457,8 @@ encode_and_write(int *failed_pipe_readfd, struct streaming_wanted *w, struct out
{
for (i = 0; i < WANTED_PIPES_MAX; i++)
{
p = &w->pipes[i];
if (p->writefd < 0)
continue;
*failed_pipe_readfd = p->readfd;
if (w->audio[i].writefd != -1)
*failed_pipe_readfd = w->audio[i].readfd;
}
return;
@@ -462,15 +473,14 @@ encode_and_write(int *failed_pipe_readfd, struct streaming_wanted *w, struct out
buf = evbuffer_pullup(w->audio_out, -1);
for (i = 0; i < WANTED_PIPES_MAX; i++)
{
p = &w->pipes[i];
if (p->writefd < 0)
if (w->audio[i].writefd == -1)
continue;
ret = write(p->writefd, buf, len);
ret = write(w->audio[i].writefd, buf, len);
if (ret < 0)
{
DPRINTF(E_LOG, L_STREAMING, "Error writing to stream pipe %d (format %d): %s\n", p->writefd, w->format, strerror(errno));
*failed_pipe_readfd = p->readfd;
DPRINTF(E_LOG, L_STREAMING, "Error writing to stream pipe %d (format %d): %s\n", w->audio[i].writefd, w->format, strerror(errno));
*failed_pipe_readfd = w->audio[i].readfd;
}
}
@@ -506,11 +516,44 @@ encode_data_cb(void *arg)
player_streaming_deregister(failed_pipe_readfd);
}
static void
metadata_write(struct streaming_wanted *w, int readfd, const char *metadata)
{
size_t metadata_size;
int i;
int ret;
for (i = 0; i < WANTED_PIPES_MAX; i++)
{
if (w->metadata[i].writefd == -1)
continue;
if (readfd >= 0 && w->metadata[i].readfd != readfd)
continue;
metadata_size = strlen(metadata) + 1;
ret = write(w->metadata[i].writefd, metadata, metadata_size);
if (ret < 0)
DPRINTF(E_WARN, L_STREAMING, "Error writing metadata '%s' to fd %d\n", metadata, w->metadata[i].writefd);
}
}
static void
metadata_startup_cb(void *arg)
{
int *metadata_fd = arg;
struct streaming_wanted *w;
pthread_mutex_lock(&streaming_wanted_lck);
for (w = streaming.wanted; w; w = w->next)
metadata_write(w, *metadata_fd, streaming.title);
pthread_mutex_unlock(&streaming_wanted_lck);
}
static void *
streaming_metadata_prepare(struct output_metadata *metadata)
{
struct db_queue_item *queue_item;
char *title;
struct streaming_wanted *w;
queue_item = db_queue_fetch_byitemid(metadata->item_id);
if (!queue_item)
@@ -519,22 +562,19 @@ streaming_metadata_prepare(struct output_metadata *metadata)
return NULL;
}
title = safe_asprintf("%s - %s", queue_item->title, queue_item->artist);
pthread_mutex_lock(&streaming_wanted_lck);
// Save it here, we might need it later if a new session starts up
snprintf(streaming.title, sizeof(streaming.title), "%s - %s", queue_item->title, queue_item->artist);
for (w = streaming.wanted; w; w = w->next)
metadata_write(w, -1, streaming.title);
pthread_mutex_unlock(&streaming_wanted_lck);
free_queue_item(queue_item, 0);
return title;
return NULL;
}
/* ----------------------------- Thread: httpd ------------------------------ */
// Not thread safe, but only called once during httpd init
void
streaming_metadatacb_register(streaming_metadatacb cb)
{
streaming.metadatacb = cb;
}
/* ----------------------------- Thread: Player ----------------------------- */
static void
@@ -570,14 +610,7 @@ silenceev_cb(evutil_socket_t fd, short event, void *arg)
static void
streaming_metadata_send(struct output_metadata *metadata)
{
char *title = metadata->priv;
// Calls back to httpd_streaming to update the title
if (streaming.metadatacb)
streaming.metadatacb(title);
free(title);
outputs_metadata_free(metadata);
// Nothing to do, metadata_prepare() did all we needed in a worker thread
}
// Since this is streaming and there is no actual device, we will be called with
@@ -587,20 +620,22 @@ static int
streaming_start(struct output_device *device, int callback_id)
{
struct streaming_wanted *w;
struct pipepair pipe;
int ret;
pthread_mutex_lock(&streaming_wanted_lck);
w = wanted_find_byformat(streaming.wanted, device->format, device->quality);
if (!w)
w = wanted_add(&streaming.wanted, device->format, device->quality);
ret = wanted_session_add(&pipe, w);
ret = wanted_session_add(&device->audio_fd, &device->metadata_fd, w);
if (ret < 0)
goto error;
pthread_mutex_unlock(&streaming_wanted_lck);
worker_execute(metadata_startup_cb, &(device->metadata_fd), sizeof(device->metadata_fd), 0);
outputs_quality_subscribe(&device->quality);
device->id = pipe.readfd; // Not super clean
device->id = device->audio_fd;
return 0;
error:

View File

@@ -1,15 +0,0 @@
#ifndef __STREAMING_H__
#define __STREAMING_H__
typedef void (*streaming_metadatacb)(char *metadata);
enum streaming_format
{
STREAMING_FORMAT_MP3,
};
void
streaming_metadatacb_register(streaming_metadatacb cb);
#endif /* !__STREAMING_H__ */