[input] Refactor metadata handling + add playback wait timeout

Previously input_metadata_get() would retrieve artwork from the source being
read currently, which might not be the one that triggered the FLAG_METADATA
event. So to fix this the metadata is now read by the input module itself when
the METADATA event happens, and the result is stored with the marker.

The commit also includes a timer so that the input thread does loop forever
if the player never starts reading.

Also some refactoring of metadata + abolish input_metadata_get and
input_quality_get. The latter in an attempt to treat the two in the same way.
This commit is contained in:
ejurgensen
2019-03-18 22:57:18 +01:00
parent 992ab90876
commit ab0a6055b9
3 changed files with 191 additions and 144 deletions

View File

@@ -44,8 +44,10 @@
// Disallow further writes to the buffer when its size exceeds this threshold.
// The below gives us room to buffer 2 seconds of 48000/16/2 audio.
#define INPUT_BUFFER_THRESHOLD STOB(96000, 16, 2)
// How long (in sec) to wait for player read before looping in playback thread
// How long (in sec) to wait for player read before looping
#define INPUT_LOOP_TIMEOUT 1
// How long (in sec) to keep an input open without the player reading from it
#define INPUT_OPEN_TIMEOUT 600
#define DEBUG 1 //TODO disable
@@ -69,9 +71,14 @@ static struct input_definition *inputs[] = {
struct marker
{
uint64_t pos; // Position of marker measured in bytes
struct media_quality quality;
enum input_flags flags;
// Position of marker measured in bytes
uint64_t pos;
// Type of marker
enum input_flags flag;
// Data associated with the marker, e.g. quality or metadata struct
void *data;
// Reverse linked list, yay!
struct marker *prev;
@@ -116,7 +123,7 @@ static pthread_t tid_input;
// Event base, cmdbase and event we use to iterate in the playback loop
static struct event_base *evbase_input;
static struct commands_base *cmdbase;
static struct event *inputev;
static struct event *input_ev;
static bool input_initialized;
// The source we are reading now
@@ -128,6 +135,10 @@ static struct input_buffer input_buffer;
// Timeout waiting in playback loop
static struct timespec input_loop_timeout = { INPUT_LOOP_TIMEOUT, 0 };
// Timeout waiting for player read
static struct timeval input_open_timeout = { INPUT_OPEN_TIMEOUT, 0 };
static struct event *input_open_timeout_ev;
#ifdef DEBUG
static size_t debug_elapsed;
#endif
@@ -160,7 +171,92 @@ map_data_kind(int data_kind)
}
static void
marker_add(size_t pos, short flags)
metadata_free(struct input_metadata *metadata, int content_only)
{
free(metadata->artist);
free(metadata->title);
free(metadata->album);
free(metadata->genre);
free(metadata->artwork_url);
if (!content_only)
free(metadata);
else
memset(metadata, 0, sizeof(struct input_metadata));
}
static struct input_metadata *
metadata_get(struct input_source *source)
{
struct input_metadata *metadata;
struct db_queue_item *queue_item;
int ret;
if (!inputs[source->type]->metadata_get)
return NULL;
metadata = calloc(1, sizeof(struct input_metadata));
ret = inputs[source->type]->metadata_get(metadata, source);
if (ret < 0)
goto out_free_metadata;
queue_item = db_queue_fetch_byitemid(source->item_id);
if (!queue_item)
{
DPRINTF(E_LOG, L_PLAYER, "Bug! Input source item_id does not match anything in queue\n");
goto out_free_metadata;
}
// Update queue item if metadata changed
if (metadata->artist || metadata->title || metadata->album || metadata->genre || metadata->artwork_url || metadata->len_ms)
{
// Since we won't be using the metadata struct values for anything else
// than this we just swap pointers
if (metadata->artist)
swap_pointers(&queue_item->artist, &metadata->artist);
if (metadata->title)
swap_pointers(&queue_item->title, &metadata->title);
if (metadata->album)
swap_pointers(&queue_item->album, &metadata->album);
if (metadata->genre)
swap_pointers(&queue_item->genre, &metadata->genre);
if (metadata->artwork_url)
swap_pointers(&queue_item->artwork_url, &metadata->artwork_url);
if (metadata->len_ms)
queue_item->song_length = metadata->len_ms;
ret = db_queue_update_item(queue_item);
if (ret < 0)
DPRINTF(E_LOG, L_PLAYER, "Database error while updating queue with new metadata\n");
}
free_queue_item(queue_item, 0);
return metadata;
out_free_metadata:
metadata_free(metadata, 0);
return NULL;
}
static void
marker_free(struct marker *marker)
{
if (!marker)
return;
if (marker->flag == INPUT_FLAG_METADATA && marker->data)
metadata_free(marker->data, 0);
if (marker->flag == INPUT_FLAG_QUALITY && marker->data)
free(marker->data);
free(marker);
}
static void
marker_add(size_t pos, short flag, void *flagdata)
{
struct marker *head;
struct marker *marker;
@@ -168,8 +264,8 @@ marker_add(size_t pos, short flags)
CHECK_NULL(L_PLAYER, marker = calloc(1, sizeof(struct marker)));
marker->pos = pos;
marker->quality = input_buffer.cur_write_quality;
marker->flags = flags;
marker->flag = flag;
marker->data = flagdata;
for (head = input_buffer.marker_tail; head && head->prev; head = head->prev)
; // Fast forward to the head
@@ -180,6 +276,41 @@ marker_add(size_t pos, short flags)
head->prev = marker;
}
static void
markers_set(short flags, size_t write_size)
{
struct media_quality *quality;
struct input_metadata *metadata;
if (flags & INPUT_FLAG_QUALITY)
{
quality = malloc(sizeof(struct media_quality));
*quality = input_buffer.cur_write_quality;
marker_add(input_buffer.bytes_written - write_size, INPUT_FLAG_QUALITY, quality);
}
if (flags & (INPUT_FLAG_EOF | INPUT_FLAG_ERROR))
{
// This controls when the player will open the next track in the queue
if (input_buffer.bytes_read + INPUT_BUFFER_THRESHOLD < input_buffer.bytes_written)
// The player's read is behind, tell it to open when it reaches where
// we are minus the buffer size
marker_add(input_buffer.bytes_written - INPUT_BUFFER_THRESHOLD, INPUT_FLAG_START_NEXT, NULL);
else
// The player's read is close to our write, so open right away
marker_add(input_buffer.bytes_read, INPUT_FLAG_START_NEXT, NULL);
marker_add(input_buffer.bytes_written, flags & (INPUT_FLAG_EOF | INPUT_FLAG_ERROR), NULL);
}
if (flags & INPUT_FLAG_METADATA)
{
metadata = metadata_get(&input_now_reading);
if (metadata)
marker_add(input_buffer.bytes_written, INPUT_FLAG_METADATA, metadata);
}
}
/* ------------------------- INPUT SOURCE HANDLING -------------------------- */
@@ -202,9 +333,9 @@ flush(short *flags)
*flags = 0;
for (marker = input_buffer.marker_tail; marker; marker = input_buffer.marker_tail)
{
*flags |= marker->flags;
*flags |= marker->flag;
input_buffer.marker_tail = marker->prev;
free(marker);
marker_free(marker);
}
len = evbuffer_get_length(input_buffer.evbuf);
@@ -232,7 +363,8 @@ stop(void)
short flags;
int type;
event_del(inputev);
event_del(input_open_timeout_ev);
event_del(input_ev);
type = input_now_reading.type;
@@ -339,7 +471,8 @@ start(void *arg, int *retval)
DPRINTF(E_DBG, L_PLAYER, "Starting input read loop for item '%s' (item id %" PRIu32 "), seek %d\n",
input_now_reading.path, input_now_reading.item_id, cmdarg->seek_ms);
event_active(inputev, 0, 0);
event_add(input_open_timeout_ev, &input_open_timeout);
event_active(input_ev, 0, 0);
*retval = ret; // Return is the seek result
return COMMAND_END;
@@ -360,35 +493,12 @@ stop_cmd(void *arg, int *retval)
return COMMAND_END;
}
static enum command_state
metadata_get(void *arg, int *retval)
static void
timeout_cb(int fd, short what, void *arg)
{
struct input_arg *cmdarg = arg;
int type;
if (!input_now_reading.open)
{
DPRINTF(E_WARN, L_PLAYER, "Source is no longer available for input_metadata_get()\n");
goto error;
}
type = input_now_reading.type;
if ((type < 0) || (inputs[type]->disabled))
goto error;
if (inputs[type]->metadata_get)
*retval = inputs[type]->metadata_get(cmdarg->metadata, &input_now_reading);
else
*retval = 0;
return COMMAND_END;
error:
*retval = -1;
return COMMAND_END;
stop();
}
/* ---------------------- Interface towards input backends ------------------ */
/* Thread: input and spotify */
@@ -397,11 +507,14 @@ int
input_write(struct evbuffer *evbuf, struct media_quality *quality, short flags)
{
bool read_end;
size_t len;
int ret;
pthread_mutex_lock(&input_buffer.mutex);
read_end = (flags & (INPUT_FLAG_EOF | INPUT_FLAG_ERROR));
if (read_end)
input_now_reading.open = false;
if ((evbuffer_get_length(input_buffer.evbuf) > INPUT_BUFFER_THRESHOLD) && evbuf)
{
@@ -423,13 +536,15 @@ input_write(struct evbuffer *evbuf, struct media_quality *quality, short flags)
if (quality && !quality_is_equal(quality, &input_buffer.cur_write_quality))
{
input_buffer.cur_write_quality = *quality;
marker_add(input_buffer.bytes_written, INPUT_FLAG_QUALITY);
flags |= INPUT_FLAG_QUALITY;
}
ret = 0;
len = 0;
if (evbuf)
{
input_buffer.bytes_written += evbuffer_get_length(evbuf);
len = evbuffer_get_length(evbuf);
input_buffer.bytes_written += len;
ret = evbuffer_add_buffer(input_buffer.evbuf, evbuf);
if (ret < 0)
{
@@ -440,24 +555,7 @@ input_write(struct evbuffer *evbuf, struct media_quality *quality, short flags)
}
if (flags)
{
if (read_end)
{
input_now_reading.open = false;
// This controls when the player will open the next track in the queue
if (input_buffer.bytes_read + INPUT_BUFFER_THRESHOLD < input_buffer.bytes_written)
// The player's read is behind, tell it to open when it reaches where
// we are minus the buffer size
marker_add(input_buffer.bytes_written - INPUT_BUFFER_THRESHOLD, INPUT_FLAG_START_NEXT);
else
// The player's read is close to our write, so open right away
marker_add(input_buffer.bytes_read, INPUT_FLAG_START_NEXT);
}
// Note this marker is added at the post-write position, since EOF, error
// and metadata belong there.
marker_add(input_buffer.bytes_written, flags);
}
markers_set(flags, len);
pthread_mutex_unlock(&input_buffer.mutex);
@@ -537,7 +635,7 @@ play(evutil_socket_t fd, short flags, void *arg)
int ret;
// Spotify runs in its own thread, so no reading is done by the input thread,
// thus there is no reason to activate inputev
// thus there is no reason to activate input_ev
if (!inputs[input_now_reading.type]->play)
return;
@@ -550,7 +648,7 @@ play(evutil_socket_t fd, short flags, void *arg)
return; // Error or EOF, so don't come back
}
event_add(inputev, &tv);
event_add(input_ev, &tv);
}
@@ -558,12 +656,12 @@ play(evutil_socket_t fd, short flags, void *arg)
/* Thread: player */
int
input_read(void *data, size_t size, short *flags)
input_read(void *data, size_t size, short *flag, void **flagdata)
{
struct marker *marker;
int len;
*flags = 0;
*flag = 0;
pthread_mutex_lock(&input_buffer.mutex);
@@ -574,9 +672,8 @@ input_read(void *data, size_t size, short *flags)
marker = input_buffer.marker_tail;
if (marker && marker->pos <= input_buffer.bytes_read + size)
{
*flags = marker->flags;
if (*flags & INPUT_FLAG_QUALITY)
input_buffer.cur_read_quality = marker->quality;
*flag = marker->flag;
*flagdata = marker->data;
size = marker->pos - input_buffer.bytes_read;
input_buffer.marker_tail = marker->prev;
@@ -587,7 +684,7 @@ input_read(void *data, size_t size, short *flags)
if (len < 0)
{
DPRINTF(E_LOG, L_PLAYER, "Error reading stream data from input buffer\n");
*flags |= INPUT_FLAG_ERROR;
*flag = INPUT_FLAG_ERROR;
goto out_unlock;
}
@@ -595,9 +692,13 @@ input_read(void *data, size_t size, short *flags)
#ifdef DEBUG
// Logs if flags present or each 10 seconds
if (*flag & INPUT_FLAG_QUALITY)
input_buffer.cur_read_quality = *((struct media_quality *)marker->data);
size_t one_sec_size = STOB(input_buffer.cur_read_quality.sample_rate, input_buffer.cur_read_quality.bits_per_sample, input_buffer.cur_read_quality.channels);
debug_elapsed += len;
if (*flags || (debug_elapsed > 10 * one_sec_size))
if (*flag || (debug_elapsed > 10 * one_sec_size))
{
debug_elapsed = 0;
DPRINTF(E_DBG, L_PLAYER, "READ %zu bytes (%d/%d/%d), WROTE %zu bytes (%d/%d/%d), SIZE %zu (=%zu), FLAGS %04x\n",
@@ -611,7 +712,7 @@ input_read(void *data, size_t size, short *flags)
input_buffer.cur_write_quality.channels,
evbuffer_get_length(input_buffer.evbuf),
input_buffer.bytes_written - input_buffer.bytes_read,
*flags);
*flag);
}
#endif
@@ -668,37 +769,11 @@ input_flush(short *flags)
flush(flags);
}
int
input_quality_get(struct media_quality *quality)
{
// No mutex, other threads should not be able to affect cur_read_quality
*quality = input_buffer.cur_read_quality;
return 0;
}
int
input_metadata_get(struct input_metadata *metadata)
{
struct input_arg cmdarg;
cmdarg.metadata = metadata;
return commands_exec_sync(cmdbase, metadata_get, NULL, &cmdarg);
}
// Not currently used, perhaps remove?
void
input_metadata_free(struct input_metadata *metadata, int content_only)
{
free(metadata->artist);
free(metadata->title);
free(metadata->album);
free(metadata->genre);
free(metadata->artwork_url);
if (!content_only)
free(metadata);
else
memset(metadata, 0, sizeof(struct input_metadata));
metadata_free(metadata, content_only);
}
int
@@ -714,7 +789,8 @@ input_init(void)
CHECK_NULL(L_PLAYER, evbase_input = event_base_new());
CHECK_NULL(L_PLAYER, input_buffer.evbuf = evbuffer_new());
CHECK_NULL(L_PLAYER, inputev = event_new(evbase_input, -1, EV_PERSIST, play, NULL));
CHECK_NULL(L_PLAYER, input_ev = event_new(evbase_input, -1, EV_PERSIST, play, NULL));
CHECK_NULL(L_PLAYER, input_open_timeout_ev = evtimer_new(evbase_input, timeout_cb, NULL));
no_input = 1;
for (i = 0; inputs[i]; i++)
@@ -761,7 +837,6 @@ input_init(void)
thread_fail:
commands_base_free(cmdbase);
input_fail:
event_free(inputev);
evbuffer_free(input_buffer.evbuf);
event_base_free(evbase_input);
return -1;
@@ -798,7 +873,6 @@ input_deinit(void)
pthread_cond_destroy(&input_buffer.cond);
pthread_mutex_destroy(&input_buffer.mutex);
event_free(inputev);
evbuffer_free(input_buffer.evbuf);
event_base_free(evbase_input);
}