[input] Adapt input_buffer so it can handle dynamic quality (sample rates etc)

Still WIP, player and outputs cannot handle this yet
This commit is contained in:
ejurgensen
2019-01-14 00:17:02 +01:00
parent 9182597605
commit 1696fc3384
5 changed files with 161 additions and 86 deletions

View File

@@ -40,6 +40,7 @@
#include "input.h"
// Disallow further writes to the buffer when its size is larger than this threshold
// TODO untie from 44100
#define INPUT_BUFFER_THRESHOLD STOB(88200)
// How long (in sec) to wait for player read before looping in playback thread
#define INPUT_LOOP_TIMEOUT 1
@@ -64,21 +65,36 @@ static struct input_definition *inputs[] = {
NULL
};
struct marker
{
uint64_t pos; // Position of marker measured in bytes
struct input_quality quality;
enum input_flags flags;
// Reverse linked list, yay!
struct marker *prev;
};
struct input_buffer
{
// Raw pcm stream data
struct evbuffer *evbuf;
// If non-zero, remaining length of buffer until EOF
size_t eof;
// If non-zero, remaining length of buffer until read error occurred
size_t error;
// If non-zero, remaining length of buffer until (possible) new metadata
size_t metadata;
// If an input makes a write with a flag or a changed sample rate etc, we add
// a marker to head, and when we read we check from the tail to see if there
// are updates to the player.
struct marker *marker_tail;
// Optional callback to player if buffer is full
input_cb full_cb;
// Quality of write/read data
struct input_quality cur_write_quality;
struct input_quality cur_read_quality;
size_t bytes_written;
size_t bytes_read;
// Locks for sharing the buffer between input and player thread
pthread_mutex_t mutex;
pthread_cond_t cond;
@@ -101,47 +117,6 @@ static size_t debug_elapsed;
/* ------------------------------ MISC HELPERS ---------------------------- */
static short
flags_set(size_t len)
{
short flags = 0;
if (input_buffer.error)
{
if (len >= input_buffer.error)
{
flags |= INPUT_FLAG_ERROR;
input_buffer.error = 0;
}
else
input_buffer.error -= len;
}
if (input_buffer.eof)
{
if (len >= input_buffer.eof)
{
flags |= INPUT_FLAG_EOF;
input_buffer.eof = 0;
}
else
input_buffer.eof -= len;
}
if (input_buffer.metadata)
{
if (len >= input_buffer.metadata)
{
flags |= INPUT_FLAG_METADATA;
input_buffer.metadata = 0;
}
else
input_buffer.metadata -= len;
}
return flags;
}
static int
map_data_kind(int data_kind)
{
@@ -166,6 +141,27 @@ map_data_kind(int data_kind)
}
}
static void
marker_add(short flags)
{
struct marker *head;
struct marker *marker;
CHECK_NULL(L_PLAYER, marker = calloc(1, sizeof(struct marker)));
marker->pos = input_buffer.bytes_written;
marker->quality = input_buffer.cur_write_quality;
marker->flags = flags;
for (head = input_buffer.marker_tail; head && head->prev; head = head->prev)
; // Fast forward to the head
if (!head)
input_buffer.marker_tail = marker;
else
head->prev = marker;
}
static int
source_check_and_map(struct player_source *ps, const char *action, char check_setup)
{
@@ -215,7 +211,7 @@ playback(void *arg)
// Loops until input_loop_break is set or no more input, e.g. EOF
ret = inputs[type]->start(ps);
if (ret < 0)
input_write(NULL, 0, 0, INPUT_FLAG_ERROR);
input_write(NULL, NULL, INPUT_FLAG_ERROR);
#ifdef DEBUG
DPRINTF(E_DBG, L_PLAYER, "Playback loop stopped (break is %d, ret %d)\n", input_loop_break, ret);
@@ -240,7 +236,7 @@ input_wait(void)
// Called by input modules from within the playback loop
int
input_write(struct evbuffer *evbuf, int sample_rate, int bits_per_sample, short flags)
input_write(struct evbuffer *evbuf, struct input_quality *quality, short flags)
{
struct timespec ts;
int ret;
@@ -271,20 +267,31 @@ input_write(struct evbuffer *evbuf, int sample_rate, int bits_per_sample, short
return 0;
}
// Change of quality. Note, the marker is placed at the last position of the
// last byte we wrote, even though that of course doesn't have the new quality
// yet. Not intuitive, but input_read() will understand.
if (quality && memcmp(quality, &input_buffer.cur_write_quality, sizeof(struct input_quality)) != 0)
{
input_buffer.cur_write_quality = *quality;
marker_add(INPUT_FLAG_QUALITY);
}
ret = 0;
if (evbuf)
ret = evbuffer_add_buffer(input_buffer.evbuf, evbuf);
else
ret = 0;
{
input_buffer.bytes_written += evbuffer_get_length(evbuf);
ret = evbuffer_add_buffer(input_buffer.evbuf, evbuf);
if (ret < 0)
{
DPRINTF(E_LOG, L_PLAYER, "Error adding stream data to input buffer\n");
flags |= INPUT_FLAG_ERROR;
}
}
if (ret < 0)
DPRINTF(E_LOG, L_PLAYER, "Error adding stream data to input buffer\n");
if (!input_buffer.error && (flags & INPUT_FLAG_ERROR))
input_buffer.error = evbuffer_get_length(input_buffer.evbuf);
if (!input_buffer.eof && (flags & INPUT_FLAG_EOF))
input_buffer.eof = evbuffer_get_length(input_buffer.evbuf);
if (!input_buffer.metadata && (flags & INPUT_FLAG_METADATA))
input_buffer.metadata = evbuffer_get_length(input_buffer.evbuf);
// Note this marker is added at the post-write position, since EOF and ERROR
// belong there. We never want to add a marker for the NONBLOCK flag.
if (flags & ~INPUT_FLAG_NONBLOCK)
marker_add(flags);
pthread_mutex_unlock(&input_buffer.mutex);
@@ -298,6 +305,7 @@ input_write(struct evbuffer *evbuf, int sample_rate, int bits_per_sample, short
int
input_read(void *data, size_t size, short *flags)
{
struct marker *marker;
int len;
*flags = 0;
@@ -310,23 +318,50 @@ input_read(void *data, size_t size, short *flags)
pthread_mutex_lock(&input_buffer.mutex);
#ifdef DEBUG
debug_elapsed += size;
if (debug_elapsed > STOB(441000)) // 10 sec
// First we check if there is a marker in the requested samples. If there is,
// we only return data up until that marker. That way we don't have to deal
// with multiple markers, and we don't return data that contains mixed sample
// rates, bits per sample or an EOF in the middle.
marker = input_buffer.marker_tail;
if (marker && marker->pos < input_buffer.bytes_read + size)
{
DPRINTF(E_DBG, L_PLAYER, "Input buffer has %zu bytes\n", evbuffer_get_length(input_buffer.evbuf));
debug_elapsed = 0;
*flags = marker->flags;
if (*flags & INPUT_FLAG_QUALITY)
input_buffer.cur_read_quality = marker->quality;
size = marker->pos - input_buffer.bytes_read;
input_buffer.marker_tail = marker->prev;
free(marker);
}
#endif
len = evbuffer_remove(input_buffer.evbuf, data, size);
if (len < 0)
{
DPRINTF(E_LOG, L_PLAYER, "Error reading stream data from input buffer\n");
*flags |= INPUT_FLAG_ERROR;
goto out_unlock;
}
*flags = flags_set(len);
input_buffer.bytes_read += len;
#ifdef DEBUG
// Logs if flags present or each 10 seconds
debug_elapsed += len;
if (*flags || (debug_elapsed / STOB(input_buffer.cur_read_quality.sample_rate) > 10))
{
debug_elapsed = 0;
DPRINTF(E_SPAM, L_PLAYER, "READ %zu bytes (%d/%d), WROTE %zu bytes (%d/%d), SIZE %zu (=%zu), FLAGS %04x\n",
input_buffer.bytes_read,
input_buffer.cur_read_quality.sample_rate,
input_buffer.cur_read_quality.bits_per_sample,
input_buffer.bytes_written,
input_buffer.cur_write_quality.sample_rate,
input_buffer.cur_write_quality.bits_per_sample,
evbuffer_get_length(input_buffer.evbuf),
input_buffer.bytes_written - input_buffer.bytes_read,
*flags);
}
#endif
out_unlock:
pthread_cond_signal(&input_buffer.cond);
@@ -465,19 +500,30 @@ input_seek(struct player_source *ps, int seek_ms)
void
input_flush(short *flags)
{
struct marker *marker;
size_t len;
pthread_mutex_lock(&input_buffer.mutex);
// We will return an OR of all the unread marker flags
*flags = 0;
for (marker = input_buffer.marker_tail; marker; marker = input_buffer.marker_tail)
{
*flags |= marker->flags;
input_buffer.marker_tail = marker->prev;
free(marker);
}
len = evbuffer_get_length(input_buffer.evbuf);
evbuffer_drain(input_buffer.evbuf, len);
*flags = flags_set(len);
memset(&input_buffer.cur_read_quality, 0, sizeof(struct input_quality));
memset(&input_buffer.cur_write_quality, 0, sizeof(struct input_quality));
input_buffer.bytes_read = 0;
input_buffer.bytes_written = 0;
input_buffer.error = 0;
input_buffer.eof = 0;
input_buffer.metadata = 0;
input_buffer.full_cb = NULL;
pthread_mutex_unlock(&input_buffer.mutex);
@@ -487,6 +533,14 @@ input_flush(short *flags)
#endif
}
int
input_quality_get(struct input_quality *quality)
{
// No mutex, other threads should not be able to affect cur_read_quality
*quality = input_buffer.cur_read_quality;
return 0;
}
int
input_metadata_get(struct input_metadata *metadata, struct player_source *ps, int startup, uint64_t rtptime)
{