[player/input] Refactor - WIP

* Open input sources earlier
* Gapless playback
* Remove fixed 44100/16 from player
* Complete restructure player internals
This commit is contained in:
ejurgensen
2019-02-16 19:34:36 +01:00
parent e97ad7d970
commit 87ca6363ae
12 changed files with 1315 additions and 1315 deletions

View File

@@ -22,6 +22,7 @@
#include <stdio.h>
#include <stdlib.h>
#include <stdbool.h>
#include <unistd.h>
#include <string.h>
#include <errno.h>
@@ -37,6 +38,7 @@
#include "misc.h"
#include "logger.h"
#include "commands.h"
#include "input.h"
// Disallow further writes to the buffer when its size is larger than this threshold
@@ -100,10 +102,26 @@ struct input_buffer
pthread_cond_t cond;
};
struct input_arg
{
uint32_t item_id;
int seek_ms;
struct input_metadata *metadata;
};
/* --- Globals --- */
// Input thread
static pthread_t tid_input;
// Event base, cmdbase and event we use to iterate in the playback loop
static struct event_base *evbase_input;
static struct commands_base *cmdbase;
static struct event *inputev;
static bool input_initialized;
// The source we are reading now
static struct input_source input_now_reading;
// Input buffer
static struct input_buffer input_buffer;
@@ -115,7 +133,7 @@ static size_t debug_elapsed;
#endif
/* ------------------------------ MISC HELPERS ---------------------------- */
/* ------------------------------- MISC HELPERS ----------------------------- */
static int
map_data_kind(int data_kind)
@@ -142,14 +160,14 @@ map_data_kind(int data_kind)
}
static void
marker_add(short flags)
marker_add(size_t pos, short flags)
{
struct marker *head;
struct marker *marker;
CHECK_NULL(L_PLAYER, marker = calloc(1, sizeof(struct marker)));
marker->pos = input_buffer.bytes_written;
marker->pos = pos;
marker->quality = input_buffer.cur_write_quality;
marker->flags = flags;
@@ -162,88 +180,280 @@ marker_add(short flags)
head->prev = marker;
}
static int
source_check_and_map(struct player_source *ps, const char *action, char check_setup)
/* ------------------------- INPUT SOURCE HANDLING -------------------------- */
static void
clear(struct input_source *source)
{
int type;
#ifdef DEBUG
DPRINTF(E_DBG, L_PLAYER, "Action is %s\n", action);
#endif
if (!ps)
{
DPRINTF(E_LOG, L_PLAYER, "Stream %s called with invalid player source\n", action);
return -1;
}
if (check_setup && !ps->setup_done)
{
DPRINTF(E_LOG, L_PLAYER, "Given player source not setup, %s not possible\n", action);
return -1;
}
type = map_data_kind(ps->data_kind);
if (type < 0)
{
DPRINTF(E_LOG, L_PLAYER, "Unsupported input type, %s not possible\n", action);
return -1;
}
return type;
free(source->path);
memset(source, 0, sizeof(struct input_source));
}
/* ----------------------------- PLAYBACK LOOP ---------------------------- */
/* Thread: input */
// TODO Thread safety of ps?
static void *
playback(void *arg)
static void
flush(short *flags)
{
struct player_source *ps = arg;
int type;
int ret;
type = source_check_and_map(ps, "start", 1);
if ((type < 0) || (inputs[type]->disabled))
goto thread_exit;
// Loops until input_loop_break is set or no more input, e.g. EOF
ret = inputs[type]->start(ps);
if (ret < 0)
input_write(NULL, NULL, INPUT_FLAG_ERROR);
#ifdef DEBUG
DPRINTF(E_DBG, L_PLAYER, "Playback loop stopped (break is %d, ret %d)\n", input_loop_break, ret);
#endif
thread_exit:
pthread_exit(NULL);
}
void
input_wait(void)
{
struct timespec ts;
struct marker *marker;
size_t len;
pthread_mutex_lock(&input_buffer.mutex);
ts = timespec_reltoabs(input_loop_timeout);
pthread_cond_timedwait(&input_buffer.cond, &input_buffer.mutex, &ts);
// We will return an OR of all the unread marker flags
*flags = 0;
for (marker = input_buffer.marker_tail; marker; marker = input_buffer.marker_tail)
{
*flags |= marker->flags;
input_buffer.marker_tail = marker->prev;
free(marker);
}
len = evbuffer_get_length(input_buffer.evbuf);
evbuffer_drain(input_buffer.evbuf, len);
memset(&input_buffer.cur_read_quality, 0, sizeof(struct media_quality));
memset(&input_buffer.cur_write_quality, 0, sizeof(struct media_quality));
input_buffer.bytes_read = 0;
input_buffer.bytes_written = 0;
input_buffer.full_cb = NULL;
pthread_mutex_unlock(&input_buffer.mutex);
#ifdef DEBUG
DPRINTF(E_DBG, L_PLAYER, "Flushing %zu bytes with flags %d\n", len, *flags);
#endif
}
static enum command_state
stop(void *arg, int *retval)
{
short flags;
int type;
event_del(inputev);
type = input_now_reading.type;
if (inputs[type]->stop)
inputs[type]->stop(&input_now_reading);
flush(&flags);
clear(&input_now_reading);
*retval = 0;
return COMMAND_END;
}
static int
seek(struct input_source *source, int seek_ms)
{
if (seek_ms > 0 && inputs[source->type]->seek)
return inputs[source->type]->seek(source, seek_ms);
else
return 0;
}
// On error returns -1, on success + seek given + seekable returns the position
// that the seek gave us, otherwise returns 0.
static int
setup(struct input_source *source, struct db_queue_item *queue_item, int seek_ms)
{
int type;
int ret;
type = map_data_kind(queue_item->data_kind);
if ((type < 0) || (inputs[type]->disabled))
goto setup_error;
source->type = type;
source->data_kind = queue_item->data_kind;
source->media_kind = queue_item->media_kind;
source->item_id = queue_item->id;
source->id = queue_item->file_id;
source->len_ms = queue_item->song_length;
source->path = safe_strdup(queue_item->path);
DPRINTF(E_DBG, L_PLAYER, "Setting up input item '%s' (item id %" PRIu32 ")\n", source->path, source->item_id);
if (inputs[type]->setup)
{
ret = inputs[type]->setup(source);
if (ret < 0)
goto setup_error;
}
source->open = true;
ret = seek(source, seek_ms);
if (ret < 0)
goto seek_error;
return ret;
seek_error:
stop(NULL, NULL);
setup_error:
clear(source);
return -1;
}
static enum command_state
start(void *arg, int *retval)
{
struct input_arg *cmdarg = arg;
struct db_queue_item *queue_item;
short flags;
int ret;
// If we are asked to start the item that is currently open we can just seek
if (input_now_reading.open && cmdarg->item_id == input_now_reading.item_id)
{
flush(&flags);
ret = seek(&input_now_reading, cmdarg->seek_ms);
if (ret < 0)
DPRINTF(E_WARN, L_PLAYER, "Ignoring failed seek to %d ms in '%s'\n", cmdarg->seek_ms, input_now_reading.path);
}
else
{
if (input_now_reading.open)
stop(NULL, NULL);
// Get the queue_item from the db
queue_item = db_queue_fetch_byitemid(cmdarg->item_id);
if (!queue_item)
{
DPRINTF(E_LOG, L_PLAYER, "Input start was called with an item id that has disappeared (id=%d)\n", cmdarg->item_id);
goto error;
}
ret = setup(&input_now_reading, queue_item, cmdarg->seek_ms);
free_queue_item(queue_item, 0);
if (ret < 0)
goto error;
}
DPRINTF(E_DBG, L_PLAYER, "Starting input read loop for item '%s' (item id %" PRIu32 "), seek %d\n",
input_now_reading.path, input_now_reading.item_id, cmdarg->seek_ms);
event_active(inputev, 0, 0);
*retval = ret; // Return is the seek result
return COMMAND_END;
error:
input_write(NULL, NULL, INPUT_FLAG_ERROR);
clear(&input_now_reading);
*retval = -1;
return COMMAND_END;
}
/*
static enum command_state
next(void *arg, int *retval)
{
struct player_status status;
struct db_queue_item *queue_item;
uint32_t item_id;
int type;
int ret;
// We may have finished reading source way before end of playback, and we
// don't want to proceed prematurely. So we wait until the input buffer is
// below the write threshold.
ret = input_wait();
if (ret < 0)
{
input_next(); // Async call to ourselves
return;
}
item_id = input_now_reading.item_id;
// Cleans up the source that has ended/failed and clears input_now_reading
stop(NULL, NULL);
player_get_status(&status);
// TODO what about repeat/repeat_all? Maybe move next() to player that can
// just call input_start()
// Get the next queue_item from the db
queue_item = db_queue_fetch_next(item_id, status.shuffle);
if (!queue_item)
{
DPRINTF(E_DBG, L_PLAYER, "Reached end of playback queue\n");
*retval = 0;
return COMMAND_END;
}
ret = setup(&input_now_reading, queue_item, 0);
free_queue_item(queue_item, 0);
if (ret < 0)
goto error;
DPRINTF(E_DBG, L_PLAYER, "Continuing input read loop for item '%s' (item id %" PRIu32 ")\n", input_now_reading.path, input_now_reading.item_id);
event_active(inputev, 0, 0);
*retval = 0;
return COMMAND_END;
error:
input_write(NULL, NULL, INPUT_FLAG_ERROR);
clear(&input_now_reading);
*retval = -1;
return COMMAND_END;
}
*/
static enum command_state
metadata_get(void *arg, int *retval)
{
struct input_arg *cmdarg = arg;
int type;
if (!input_now_reading.open)
{
DPRINTF(E_WARN, L_PLAYER, "Source is no longer available for input_metadata_get()\n");
goto error;
}
type = input_now_reading.type;
if ((type < 0) || (inputs[type]->disabled))
goto error;
if (inputs[type]->metadata_get)
*retval = inputs[type]->metadata_get(cmdarg->metadata, &input_now_reading);
else
*retval = 0;
return COMMAND_END;
error:
*retval = -1;
return COMMAND_END;
}
/* ---------------------- Interface towards input backends ------------------ */
/* Thread: input and spotify */
// Called by input modules from within the playback loop
int
input_write(struct evbuffer *evbuf, struct media_quality *quality, short flags)
{
struct timespec ts;
bool read_end;
int ret;
pthread_mutex_lock(&input_buffer.mutex);
while ( (!input_loop_break) && (evbuffer_get_length(input_buffer.evbuf) > INPUT_BUFFER_THRESHOLD) && evbuf )
read_end = (flags & (INPUT_FLAG_EOF | INPUT_FLAG_ERROR));
if ((evbuffer_get_length(input_buffer.evbuf) > INPUT_BUFFER_THRESHOLD) && evbuf)
{
if (input_buffer.full_cb)
{
@@ -251,26 +461,19 @@ input_write(struct evbuffer *evbuf, struct media_quality *quality, short flags)
input_buffer.full_cb = NULL;
}
if (flags & INPUT_FLAG_NONBLOCK)
// In case of EOF or error the input is always allowed to write, even if the
// buffer is full. There is no point in holding back the input in that case.
if (!read_end)
{
pthread_mutex_unlock(&input_buffer.mutex);
return EAGAIN;
}
ts = timespec_reltoabs(input_loop_timeout);
pthread_cond_timedwait(&input_buffer.cond, &input_buffer.mutex, &ts);
}
if (input_loop_break)
{
pthread_mutex_unlock(&input_buffer.mutex);
return 0;
}
if (quality && !quality_is_equal(quality, &input_buffer.cur_write_quality))
{
input_buffer.cur_write_quality = *quality;
marker_add(INPUT_FLAG_QUALITY);
marker_add(input_buffer.bytes_written, INPUT_FLAG_QUALITY);
}
ret = 0;
@@ -280,24 +483,109 @@ input_write(struct evbuffer *evbuf, struct media_quality *quality, short flags)
ret = evbuffer_add_buffer(input_buffer.evbuf, evbuf);
if (ret < 0)
{
DPRINTF(E_LOG, L_PLAYER, "Error adding stream data to input buffer\n");
DPRINTF(E_LOG, L_PLAYER, "Error adding stream data to input buffer, stopping\n");
input_stop();
flags |= INPUT_FLAG_ERROR;
}
}
// Note this marker is added at the post-write position, since EOF and ERROR
// belong there. We never want to add a marker for the NONBLOCK flag.
if (flags & ~INPUT_FLAG_NONBLOCK)
marker_add(flags);
if (flags)
{
if (input_buffer.bytes_written > INPUT_BUFFER_THRESHOLD)
marker_add(input_buffer.bytes_written - INPUT_BUFFER_THRESHOLD, INPUT_FLAG_START_NEXT);
else
marker_add(input_buffer.bytes_written, INPUT_FLAG_START_NEXT);
// Note this marker is added at the post-write position, since EOF, error
// and metadata belong there.
marker_add(input_buffer.bytes_written, flags);
}
pthread_mutex_unlock(&input_buffer.mutex);
return ret;
}
int
input_wait(void)
{
struct timespec ts;
/* -------------------- Interface towards player thread ------------------- */
/* Thread: player */
pthread_mutex_lock(&input_buffer.mutex);
ts = timespec_reltoabs(input_loop_timeout);
pthread_cond_timedwait(&input_buffer.cond, &input_buffer.mutex, &ts);
// Is the buffer full?
if (evbuffer_get_length(input_buffer.evbuf) > INPUT_BUFFER_THRESHOLD)
{
pthread_mutex_unlock(&input_buffer.mutex);
return -1;
}
pthread_mutex_unlock(&input_buffer.mutex);
return 0;
}
/*void
input_next(void)
{
commands_exec_async(cmdbase, next, NULL);
}*/
/* ---------------------------------- MAIN ---------------------------------- */
/* Thread: input */
static void *
input(void *arg)
{
int ret;
ret = db_perthread_init();
if (ret < 0)
{
DPRINTF(E_LOG, L_MAIN, "Error: DB init failed (input thread)\n");
pthread_exit(NULL);
}
input_initialized = true;
event_base_dispatch(evbase_input);
if (input_initialized)
{
DPRINTF(E_LOG, L_MAIN, "Input event loop terminated ahead of time!\n");
input_initialized = false;
}
db_perthread_deinit();
pthread_exit(NULL);
}
static void
play(evutil_socket_t fd, short flags, void *arg)
{
struct timeval tv = { 0, 0 };
int ret;
// Spotify runs in its own thread, so no reading is done by the input thread,
// thus there is no reason to activate inputev
if (!inputs[input_now_reading.type]->play)
return;
// Return will be negative if there is an error or EOF. Here, we just don't
// loop any more. input_write() will pass the message to the player.
ret = inputs[input_now_reading.type]->play(&input_now_reading);
if (ret < 0)
return; // Error or EOF, so don't come back
event_add(inputev, &tv);
}
/* ---------------------- Interface towards player thread ------------------- */
/* Thread: player */
int
input_read(void *data, size_t size, short *flags)
@@ -307,12 +595,6 @@ input_read(void *data, size_t size, short *flags)
*flags = 0;
if (!tid_input)
{
DPRINTF(E_LOG, L_PLAYER, "Bug! Read called, but playback not running\n");
return -1;
}
pthread_mutex_lock(&input_buffer.mutex);
// First we check if there is a marker in the requested samples. If there is,
@@ -380,157 +662,40 @@ input_buffer_full_cb(input_cb cb)
}
int
input_setup(struct player_source *ps)
input_seek(uint32_t item_id, int seek_ms)
{
int type;
struct input_arg cmdarg;
type = source_check_and_map(ps, "setup", 0);
if ((type < 0) || (inputs[type]->disabled))
return -1;
cmdarg.item_id = item_id;
cmdarg.seek_ms = seek_ms;
if (!inputs[type]->setup)
return 0;
return inputs[type]->setup(ps);
return commands_exec_sync(cmdbase, start, NULL, &cmdarg);
}
int
input_start(struct player_source *ps)
void
input_start(uint32_t item_id)
{
int ret;
struct input_arg *cmdarg;
if (tid_input)
{
DPRINTF(E_WARN, L_PLAYER, "Input start called, but playback already running\n");
return 0;
}
CHECK_NULL(L_PLAYER, cmdarg = malloc(sizeof(struct input_arg)));
input_loop_break = 0;
cmdarg->item_id = item_id;
cmdarg->seek_ms = 0;
ret = pthread_create(&tid_input, NULL, playback, ps);
if (ret < 0)
{
DPRINTF(E_LOG, L_PLAYER, "Could not spawn input thread: %s\n", strerror(errno));
return -1;
}
#if defined(HAVE_PTHREAD_SETNAME_NP)
pthread_setname_np(tid_input, "input");
#elif defined(HAVE_PTHREAD_SET_NAME_NP)
pthread_set_name_np(tid_input, "input");
#endif
return 0;
commands_exec_async(cmdbase, start, cmdarg);
}
int
input_pause(struct player_source *ps)
void
input_stop(void)
{
short flags;
int ret;
#ifdef DEBUG
DPRINTF(E_DBG, L_PLAYER, "Pause called, stopping playback loop\n");
#endif
if (!tid_input)
return -1;
pthread_mutex_lock(&input_buffer.mutex);
input_loop_break = 1;
pthread_cond_signal(&input_buffer.cond);
pthread_mutex_unlock(&input_buffer.mutex);
// TODO What if input thread is hanging waiting for source? Kill thread?
ret = pthread_join(tid_input, NULL);
if (ret != 0)
{
DPRINTF(E_LOG, L_PLAYER, "Could not join input thread: %s\n", strerror(errno));
return -1;
}
tid_input = 0;
input_flush(&flags);
return 0;
}
int
input_stop(struct player_source *ps)
{
int type;
if (tid_input)
input_pause(ps);
if (!ps)
return 0;
type = source_check_and_map(ps, "stop", 1);
if ((type < 0) || (inputs[type]->disabled))
return -1;
if (!inputs[type]->stop)
return 0;
return inputs[type]->stop(ps);
}
int
input_seek(struct player_source *ps, int seek_ms)
{
int type;
type = source_check_and_map(ps, "seek", 1);
if ((type < 0) || (inputs[type]->disabled))
return -1;
if (!inputs[type]->seek)
return 0;
if (tid_input)
input_pause(ps);
return inputs[type]->seek(ps, seek_ms);
commands_exec_async(cmdbase, stop, NULL);
}
void
input_flush(short *flags)
{
struct marker *marker;
size_t len;
pthread_mutex_lock(&input_buffer.mutex);
// We will return an OR of all the unread marker flags
*flags = 0;
for (marker = input_buffer.marker_tail; marker; marker = input_buffer.marker_tail)
{
*flags |= marker->flags;
input_buffer.marker_tail = marker->prev;
free(marker);
}
len = evbuffer_get_length(input_buffer.evbuf);
evbuffer_drain(input_buffer.evbuf, len);
memset(&input_buffer.cur_read_quality, 0, sizeof(struct media_quality));
memset(&input_buffer.cur_write_quality, 0, sizeof(struct media_quality));
input_buffer.bytes_read = 0;
input_buffer.bytes_written = 0;
input_buffer.full_cb = NULL;
pthread_mutex_unlock(&input_buffer.mutex);
#ifdef DEBUG
DPRINTF(E_DBG, L_PLAYER, "Flushing %zu bytes with flags %d\n", len, *flags);
#endif
// Flush should be thread safe
flush(flags);
}
int
@@ -542,33 +707,13 @@ input_quality_get(struct media_quality *quality)
}
int
input_metadata_get(struct input_metadata *metadata, struct player_source *ps, int startup, uint64_t rtptime)
input_metadata_get(struct input_metadata *metadata)
{
int type;
struct input_arg cmdarg;
if (!metadata || !ps || !ps->stream_start || !ps->output_start)
{
DPRINTF(E_LOG, L_PLAYER, "Bug! Unhandled case in input_metadata_get()\n");
return -1;
}
cmdarg.metadata = metadata;
memset(metadata, 0, sizeof(struct input_metadata));
metadata->item_id = ps->item_id;
metadata->startup = startup;
metadata->offset = ps->output_start - ps->stream_start;
metadata->rtptime = ps->stream_start;
// Note that the source may overwrite the above progress metadata
type = source_check_and_map(ps, "metadata_get", 1);
if ((type < 0) || (inputs[type]->disabled))
return -1;
if (!inputs[type]->metadata_get)
return 0;
return inputs[type]->metadata_get(metadata, ps, rtptime);
return commands_exec_sync(cmdbase, metadata_get, NULL, &cmdarg);
}
void
@@ -597,12 +742,9 @@ input_init(void)
pthread_mutex_init(&input_buffer.mutex, NULL);
pthread_cond_init(&input_buffer.cond, NULL);
input_buffer.evbuf = evbuffer_new();
if (!input_buffer.evbuf)
{
DPRINTF(E_LOG, L_PLAYER, "Out of memory for input buffer\n");
return -1;
}
CHECK_NULL(L_PLAYER, evbase_input = event_base_new());
CHECK_NULL(L_PLAYER, input_buffer.evbuf = evbuffer_new());
CHECK_NULL(L_PLAYER, inputev = event_new(evbase_input, -1, EV_PERSIST, play, NULL));
no_input = 1;
for (i = 0; inputs[i]; i++)
@@ -610,7 +752,7 @@ input_init(void)
if (inputs[i]->type != i)
{
DPRINTF(E_FATAL, L_PLAYER, "BUG! Input definitions are misaligned with input enum\n");
return -1;
goto input_fail;
}
if (!inputs[i]->init)
@@ -627,17 +769,42 @@ input_init(void)
}
if (no_input)
return -1;
goto input_fail;
cmdbase = commands_base_new(evbase_input, NULL);
ret = pthread_create(&tid_input, NULL, input, NULL);
if (ret < 0)
{
DPRINTF(E_LOG, L_MAIN, "Could not spawn input thread: %s\n", strerror(errno));
goto thread_fail;
}
#if defined(HAVE_PTHREAD_SETNAME_NP)
pthread_setname_np(tid_input, "input");
#elif defined(HAVE_PTHREAD_SET_NAME_NP)
pthread_set_name_np(tid_input, "input");
#endif
return 0;
thread_fail:
commands_base_free(cmdbase);
input_fail:
event_free(inputev);
evbuffer_free(input_buffer.evbuf);
event_base_free(evbase_input);
return -1;
}
void
input_deinit(void)
{
int i;
int ret;
input_stop(NULL);
// TODO ok to do from here?
input_stop();
for (i = 0; inputs[i]; i++)
{
@@ -648,9 +815,21 @@ input_deinit(void)
inputs[i]->deinit();
}
input_initialized = false;
commands_base_destroy(cmdbase);
ret = pthread_join(tid_input, NULL);
if (ret != 0)
{
DPRINTF(E_FATAL, L_MAIN, "Could not join input thread: %s\n", strerror(errno));
return;
}
pthread_cond_destroy(&input_buffer.cond);
pthread_mutex_destroy(&input_buffer.mutex);
event_free(inputev);
evbuffer_free(input_buffer.evbuf);
event_base_free(evbase_input);
}