From ab0a6055b93c4611d74e1cf04f82f28139341f48 Mon Sep 17 00:00:00 2001 From: ejurgensen Date: Mon, 18 Mar 2019 22:57:18 +0100 Subject: [PATCH] [input] Refactor metadata handling + add playback wait timeout Previously input_metadata_get() would retrieve artwork from the source being read currently, which might not be the one that triggered the FLAG_METADATA event. So to fix this the metadata is now read by the input module itself when the METADATA event happens, and the result is stored with the marker. The commit also includes a timer so that the input thread does loop forever if the player never starts reading. Also some refactoring of metadata + abolish input_metadata_get and input_quality_get. The latter in an attempt to treat the two in the same way. --- src/input.c | 272 +++++++++++++++++++++++++++++----------------- src/input.h | 34 +++--- src/inputs/pipe.c | 29 +---- 3 files changed, 191 insertions(+), 144 deletions(-) diff --git a/src/input.c b/src/input.c index b418d049..0b98e336 100644 --- a/src/input.c +++ b/src/input.c @@ -44,8 +44,10 @@ // Disallow further writes to the buffer when its size exceeds this threshold. // The below gives us room to buffer 2 seconds of 48000/16/2 audio. #define INPUT_BUFFER_THRESHOLD STOB(96000, 16, 2) -// How long (in sec) to wait for player read before looping in playback thread +// How long (in sec) to wait for player read before looping #define INPUT_LOOP_TIMEOUT 1 +// How long (in sec) to keep an input open without the player reading from it +#define INPUT_OPEN_TIMEOUT 600 #define DEBUG 1 //TODO disable @@ -69,9 +71,14 @@ static struct input_definition *inputs[] = { struct marker { - uint64_t pos; // Position of marker measured in bytes - struct media_quality quality; - enum input_flags flags; + // Position of marker measured in bytes + uint64_t pos; + + // Type of marker + enum input_flags flag; + + // Data associated with the marker, e.g. quality or metadata struct + void *data; // Reverse linked list, yay! struct marker *prev; @@ -116,7 +123,7 @@ static pthread_t tid_input; // Event base, cmdbase and event we use to iterate in the playback loop static struct event_base *evbase_input; static struct commands_base *cmdbase; -static struct event *inputev; +static struct event *input_ev; static bool input_initialized; // The source we are reading now @@ -128,6 +135,10 @@ static struct input_buffer input_buffer; // Timeout waiting in playback loop static struct timespec input_loop_timeout = { INPUT_LOOP_TIMEOUT, 0 }; +// Timeout waiting for player read +static struct timeval input_open_timeout = { INPUT_OPEN_TIMEOUT, 0 }; +static struct event *input_open_timeout_ev; + #ifdef DEBUG static size_t debug_elapsed; #endif @@ -160,7 +171,92 @@ map_data_kind(int data_kind) } static void -marker_add(size_t pos, short flags) +metadata_free(struct input_metadata *metadata, int content_only) +{ + free(metadata->artist); + free(metadata->title); + free(metadata->album); + free(metadata->genre); + free(metadata->artwork_url); + + if (!content_only) + free(metadata); + else + memset(metadata, 0, sizeof(struct input_metadata)); +} + +static struct input_metadata * +metadata_get(struct input_source *source) +{ + struct input_metadata *metadata; + struct db_queue_item *queue_item; + int ret; + + if (!inputs[source->type]->metadata_get) + return NULL; + + metadata = calloc(1, sizeof(struct input_metadata)); + + ret = inputs[source->type]->metadata_get(metadata, source); + if (ret < 0) + goto out_free_metadata; + + queue_item = db_queue_fetch_byitemid(source->item_id); + if (!queue_item) + { + DPRINTF(E_LOG, L_PLAYER, "Bug! Input source item_id does not match anything in queue\n"); + goto out_free_metadata; + } + + // Update queue item if metadata changed + if (metadata->artist || metadata->title || metadata->album || metadata->genre || metadata->artwork_url || metadata->len_ms) + { + // Since we won't be using the metadata struct values for anything else + // than this we just swap pointers + if (metadata->artist) + swap_pointers(&queue_item->artist, &metadata->artist); + if (metadata->title) + swap_pointers(&queue_item->title, &metadata->title); + if (metadata->album) + swap_pointers(&queue_item->album, &metadata->album); + if (metadata->genre) + swap_pointers(&queue_item->genre, &metadata->genre); + if (metadata->artwork_url) + swap_pointers(&queue_item->artwork_url, &metadata->artwork_url); + if (metadata->len_ms) + queue_item->song_length = metadata->len_ms; + + ret = db_queue_update_item(queue_item); + if (ret < 0) + DPRINTF(E_LOG, L_PLAYER, "Database error while updating queue with new metadata\n"); + } + + free_queue_item(queue_item, 0); + + return metadata; + + out_free_metadata: + metadata_free(metadata, 0); + return NULL; +} + +static void +marker_free(struct marker *marker) +{ + if (!marker) + return; + + if (marker->flag == INPUT_FLAG_METADATA && marker->data) + metadata_free(marker->data, 0); + + if (marker->flag == INPUT_FLAG_QUALITY && marker->data) + free(marker->data); + + free(marker); +} + +static void +marker_add(size_t pos, short flag, void *flagdata) { struct marker *head; struct marker *marker; @@ -168,8 +264,8 @@ marker_add(size_t pos, short flags) CHECK_NULL(L_PLAYER, marker = calloc(1, sizeof(struct marker))); marker->pos = pos; - marker->quality = input_buffer.cur_write_quality; - marker->flags = flags; + marker->flag = flag; + marker->data = flagdata; for (head = input_buffer.marker_tail; head && head->prev; head = head->prev) ; // Fast forward to the head @@ -180,6 +276,41 @@ marker_add(size_t pos, short flags) head->prev = marker; } +static void +markers_set(short flags, size_t write_size) +{ + struct media_quality *quality; + struct input_metadata *metadata; + + if (flags & INPUT_FLAG_QUALITY) + { + quality = malloc(sizeof(struct media_quality)); + *quality = input_buffer.cur_write_quality; + marker_add(input_buffer.bytes_written - write_size, INPUT_FLAG_QUALITY, quality); + } + + if (flags & (INPUT_FLAG_EOF | INPUT_FLAG_ERROR)) + { + // This controls when the player will open the next track in the queue + if (input_buffer.bytes_read + INPUT_BUFFER_THRESHOLD < input_buffer.bytes_written) + // The player's read is behind, tell it to open when it reaches where + // we are minus the buffer size + marker_add(input_buffer.bytes_written - INPUT_BUFFER_THRESHOLD, INPUT_FLAG_START_NEXT, NULL); + else + // The player's read is close to our write, so open right away + marker_add(input_buffer.bytes_read, INPUT_FLAG_START_NEXT, NULL); + + marker_add(input_buffer.bytes_written, flags & (INPUT_FLAG_EOF | INPUT_FLAG_ERROR), NULL); + } + + if (flags & INPUT_FLAG_METADATA) + { + metadata = metadata_get(&input_now_reading); + if (metadata) + marker_add(input_buffer.bytes_written, INPUT_FLAG_METADATA, metadata); + } +} + /* ------------------------- INPUT SOURCE HANDLING -------------------------- */ @@ -202,9 +333,9 @@ flush(short *flags) *flags = 0; for (marker = input_buffer.marker_tail; marker; marker = input_buffer.marker_tail) { - *flags |= marker->flags; + *flags |= marker->flag; input_buffer.marker_tail = marker->prev; - free(marker); + marker_free(marker); } len = evbuffer_get_length(input_buffer.evbuf); @@ -232,7 +363,8 @@ stop(void) short flags; int type; - event_del(inputev); + event_del(input_open_timeout_ev); + event_del(input_ev); type = input_now_reading.type; @@ -339,7 +471,8 @@ start(void *arg, int *retval) DPRINTF(E_DBG, L_PLAYER, "Starting input read loop for item '%s' (item id %" PRIu32 "), seek %d\n", input_now_reading.path, input_now_reading.item_id, cmdarg->seek_ms); - event_active(inputev, 0, 0); + event_add(input_open_timeout_ev, &input_open_timeout); + event_active(input_ev, 0, 0); *retval = ret; // Return is the seek result return COMMAND_END; @@ -360,35 +493,12 @@ stop_cmd(void *arg, int *retval) return COMMAND_END; } -static enum command_state -metadata_get(void *arg, int *retval) +static void +timeout_cb(int fd, short what, void *arg) { - struct input_arg *cmdarg = arg; - int type; - - if (!input_now_reading.open) - { - DPRINTF(E_WARN, L_PLAYER, "Source is no longer available for input_metadata_get()\n"); - goto error; - } - - type = input_now_reading.type; - if ((type < 0) || (inputs[type]->disabled)) - goto error; - - if (inputs[type]->metadata_get) - *retval = inputs[type]->metadata_get(cmdarg->metadata, &input_now_reading); - else - *retval = 0; - - return COMMAND_END; - - error: - *retval = -1; - return COMMAND_END; + stop(); } - /* ---------------------- Interface towards input backends ------------------ */ /* Thread: input and spotify */ @@ -397,11 +507,14 @@ int input_write(struct evbuffer *evbuf, struct media_quality *quality, short flags) { bool read_end; + size_t len; int ret; pthread_mutex_lock(&input_buffer.mutex); read_end = (flags & (INPUT_FLAG_EOF | INPUT_FLAG_ERROR)); + if (read_end) + input_now_reading.open = false; if ((evbuffer_get_length(input_buffer.evbuf) > INPUT_BUFFER_THRESHOLD) && evbuf) { @@ -423,13 +536,15 @@ input_write(struct evbuffer *evbuf, struct media_quality *quality, short flags) if (quality && !quality_is_equal(quality, &input_buffer.cur_write_quality)) { input_buffer.cur_write_quality = *quality; - marker_add(input_buffer.bytes_written, INPUT_FLAG_QUALITY); + flags |= INPUT_FLAG_QUALITY; } ret = 0; + len = 0; if (evbuf) { - input_buffer.bytes_written += evbuffer_get_length(evbuf); + len = evbuffer_get_length(evbuf); + input_buffer.bytes_written += len; ret = evbuffer_add_buffer(input_buffer.evbuf, evbuf); if (ret < 0) { @@ -440,24 +555,7 @@ input_write(struct evbuffer *evbuf, struct media_quality *quality, short flags) } if (flags) - { - if (read_end) - { - input_now_reading.open = false; - // This controls when the player will open the next track in the queue - if (input_buffer.bytes_read + INPUT_BUFFER_THRESHOLD < input_buffer.bytes_written) - // The player's read is behind, tell it to open when it reaches where - // we are minus the buffer size - marker_add(input_buffer.bytes_written - INPUT_BUFFER_THRESHOLD, INPUT_FLAG_START_NEXT); - else - // The player's read is close to our write, so open right away - marker_add(input_buffer.bytes_read, INPUT_FLAG_START_NEXT); - } - - // Note this marker is added at the post-write position, since EOF, error - // and metadata belong there. - marker_add(input_buffer.bytes_written, flags); - } + markers_set(flags, len); pthread_mutex_unlock(&input_buffer.mutex); @@ -537,7 +635,7 @@ play(evutil_socket_t fd, short flags, void *arg) int ret; // Spotify runs in its own thread, so no reading is done by the input thread, - // thus there is no reason to activate inputev + // thus there is no reason to activate input_ev if (!inputs[input_now_reading.type]->play) return; @@ -550,7 +648,7 @@ play(evutil_socket_t fd, short flags, void *arg) return; // Error or EOF, so don't come back } - event_add(inputev, &tv); + event_add(input_ev, &tv); } @@ -558,12 +656,12 @@ play(evutil_socket_t fd, short flags, void *arg) /* Thread: player */ int -input_read(void *data, size_t size, short *flags) +input_read(void *data, size_t size, short *flag, void **flagdata) { struct marker *marker; int len; - *flags = 0; + *flag = 0; pthread_mutex_lock(&input_buffer.mutex); @@ -574,9 +672,8 @@ input_read(void *data, size_t size, short *flags) marker = input_buffer.marker_tail; if (marker && marker->pos <= input_buffer.bytes_read + size) { - *flags = marker->flags; - if (*flags & INPUT_FLAG_QUALITY) - input_buffer.cur_read_quality = marker->quality; + *flag = marker->flag; + *flagdata = marker->data; size = marker->pos - input_buffer.bytes_read; input_buffer.marker_tail = marker->prev; @@ -587,7 +684,7 @@ input_read(void *data, size_t size, short *flags) if (len < 0) { DPRINTF(E_LOG, L_PLAYER, "Error reading stream data from input buffer\n"); - *flags |= INPUT_FLAG_ERROR; + *flag = INPUT_FLAG_ERROR; goto out_unlock; } @@ -595,9 +692,13 @@ input_read(void *data, size_t size, short *flags) #ifdef DEBUG // Logs if flags present or each 10 seconds + + if (*flag & INPUT_FLAG_QUALITY) + input_buffer.cur_read_quality = *((struct media_quality *)marker->data); + size_t one_sec_size = STOB(input_buffer.cur_read_quality.sample_rate, input_buffer.cur_read_quality.bits_per_sample, input_buffer.cur_read_quality.channels); debug_elapsed += len; - if (*flags || (debug_elapsed > 10 * one_sec_size)) + if (*flag || (debug_elapsed > 10 * one_sec_size)) { debug_elapsed = 0; DPRINTF(E_DBG, L_PLAYER, "READ %zu bytes (%d/%d/%d), WROTE %zu bytes (%d/%d/%d), SIZE %zu (=%zu), FLAGS %04x\n", @@ -611,7 +712,7 @@ input_read(void *data, size_t size, short *flags) input_buffer.cur_write_quality.channels, evbuffer_get_length(input_buffer.evbuf), input_buffer.bytes_written - input_buffer.bytes_read, - *flags); + *flag); } #endif @@ -668,37 +769,11 @@ input_flush(short *flags) flush(flags); } -int -input_quality_get(struct media_quality *quality) -{ - // No mutex, other threads should not be able to affect cur_read_quality - *quality = input_buffer.cur_read_quality; - return 0; -} - -int -input_metadata_get(struct input_metadata *metadata) -{ - struct input_arg cmdarg; - - cmdarg.metadata = metadata; - - return commands_exec_sync(cmdbase, metadata_get, NULL, &cmdarg); -} - +// Not currently used, perhaps remove? void input_metadata_free(struct input_metadata *metadata, int content_only) { - free(metadata->artist); - free(metadata->title); - free(metadata->album); - free(metadata->genre); - free(metadata->artwork_url); - - if (!content_only) - free(metadata); - else - memset(metadata, 0, sizeof(struct input_metadata)); + metadata_free(metadata, content_only); } int @@ -714,7 +789,8 @@ input_init(void) CHECK_NULL(L_PLAYER, evbase_input = event_base_new()); CHECK_NULL(L_PLAYER, input_buffer.evbuf = evbuffer_new()); - CHECK_NULL(L_PLAYER, inputev = event_new(evbase_input, -1, EV_PERSIST, play, NULL)); + CHECK_NULL(L_PLAYER, input_ev = event_new(evbase_input, -1, EV_PERSIST, play, NULL)); + CHECK_NULL(L_PLAYER, input_open_timeout_ev = evtimer_new(evbase_input, timeout_cb, NULL)); no_input = 1; for (i = 0; inputs[i]; i++) @@ -761,7 +837,6 @@ input_init(void) thread_fail: commands_base_free(cmdbase); input_fail: - event_free(inputev); evbuffer_free(input_buffer.evbuf); event_base_free(evbase_input); return -1; @@ -798,7 +873,6 @@ input_deinit(void) pthread_cond_destroy(&input_buffer.cond); pthread_mutex_destroy(&input_buffer.mutex); - event_free(inputev); evbuffer_free(input_buffer.evbuf); event_base_free(evbase_input); } diff --git a/src/input.h b/src/input.h index e3797366..18407f6d 100644 --- a/src/input.h +++ b/src/input.h @@ -71,22 +71,25 @@ typedef int (*input_cb)(void); struct input_metadata { + // queue_item id uint32_t item_id; - int startup; + // Input can override the default player progress by setting this + // FIXME only implemented for Airplay speakers currently + uint32_t pos_ms; - uint64_t start; - uint64_t rtptime; - uint64_t offset; - - // The player will update queue_item with the below - uint32_t song_length; + // Sets new song length (input will also update queue_item) + uint32_t len_ms; + // Input can update queue_item with the below char *artist; char *title; char *album; char *genre; char *artwork_url; + + // Indicates whether we are starting playback. Just passed on to output. + int startup; }; struct input_definition @@ -164,11 +167,12 @@ input_wait(void); * * @in data Output buffer * @in size How much data to move to the output buffer - * @out flags Flags INPUT_FLAG_* + * @out flag Flag INPUT_FLAG_* + * @out flagdata Data associated with the flag, e.g. quality or metadata struct * @return Number of bytes moved, -1 on error */ int -input_read(void *data, size_t size, short *flags); +input_read(void *data, size_t size, short *flag, void **flagdata); /* * Player can set this to get a callback from the input when the input buffer @@ -212,18 +216,6 @@ input_stop(void); void input_flush(short *flags); -/* - * Returns the current quality of data returned by intput_read(). - */ -int -input_quality_get(struct media_quality *quality); - -/* - * Gets metadata from the input, returns 0 if metadata is set, otherwise -1 - */ -int -input_metadata_get(struct input_metadata *metadata); - /* * Free the entire struct */ diff --git a/src/inputs/pipe.c b/src/inputs/pipe.c index edd14357..85caaf55 100644 --- a/src/inputs/pipe.c +++ b/src/inputs/pipe.c @@ -308,9 +308,10 @@ parse_progress(struct input_metadata *m, char *progress) if (!start || !pos || !end) return; - m->rtptime = start; // Not actually used - we have our own rtptime - m->offset = (pos > start) ? (pos - start) : 0; - m->song_length = (end - start) * 1000 / pipe_sample_rate; + if (pos > start) + m->pos_ms = (pos - start) * 1000 / pipe_sample_rate; + if (end > start) + m->len_ms = (end - start) * 1000 / pipe_sample_rate; } static void @@ -907,27 +908,7 @@ metadata_get(struct input_metadata *metadata, struct input_source *source) { pthread_mutex_lock(&pipe_metadata_lock); - if (pipe_metadata_parsed.artist) - swap_pointers(&metadata->artist, &pipe_metadata_parsed.artist); - if (pipe_metadata_parsed.title) - swap_pointers(&metadata->title, &pipe_metadata_parsed.title); - if (pipe_metadata_parsed.album) - swap_pointers(&metadata->album, &pipe_metadata_parsed.album); - if (pipe_metadata_parsed.genre) - swap_pointers(&metadata->genre, &pipe_metadata_parsed.genre); - if (pipe_metadata_parsed.artwork_url) - swap_pointers(&metadata->artwork_url, &pipe_metadata_parsed.artwork_url); - - if (pipe_metadata_parsed.song_length) - { -// TODO this is probably broken - if (metadata->rtptime > metadata->start) - metadata->rtptime -= pipe_metadata_parsed.offset; - metadata->offset = pipe_metadata_parsed.offset; - metadata->song_length = pipe_metadata_parsed.song_length; - } - - input_metadata_free(&pipe_metadata_parsed, 1); + *metadata = pipe_metadata_parsed; pthread_mutex_unlock(&pipe_metadata_lock);