[player] Refactor read/write

- remove read skip which is obsolete when input has own thread and cannot block
- simplify code
- fix while loop that could loop infinitely
This commit is contained in:
ejurgensen 2017-01-14 00:43:03 +01:00
parent 1258481202
commit 938e197fa4
3 changed files with 122 additions and 223 deletions

View File

@ -250,7 +250,7 @@ input_write(struct evbuffer *evbuf, short flags)
/* Thread: player */ /* Thread: player */
int int
input_read(struct evbuffer *evbuf, size_t want, short *flags) input_read(void *data, size_t size, short *flags)
{ {
int len; int len;
@ -265,7 +265,7 @@ input_read(struct evbuffer *evbuf, size_t want, short *flags)
pthread_mutex_lock(&input_buffer.mutex); pthread_mutex_lock(&input_buffer.mutex);
#ifdef DEBUG #ifdef DEBUG
debug_elapsed += want; debug_elapsed += size;
if (debug_elapsed > STOB(441000)) // 10 sec if (debug_elapsed > STOB(441000)) // 10 sec
{ {
DPRINTF(E_DBG, L_PLAYER, "Input buffer has %zu bytes\n", evbuffer_get_length(input_buffer.evbuf)); DPRINTF(E_DBG, L_PLAYER, "Input buffer has %zu bytes\n", evbuffer_get_length(input_buffer.evbuf));
@ -273,7 +273,7 @@ input_read(struct evbuffer *evbuf, size_t want, short *flags)
} }
#endif #endif
len = evbuffer_remove_buffer(input_buffer.evbuf, evbuf, want); len = evbuffer_remove(input_buffer.evbuf, data, size);
if (len < 0) if (len < 0)
{ {
DPRINTF(E_LOG, L_PLAYER, "Error reading stream data from input buffer\n"); DPRINTF(E_LOG, L_PLAYER, "Error reading stream data from input buffer\n");
@ -492,6 +492,9 @@ input_deinit(void)
inputs[i]->deinit(); inputs[i]->deinit();
} }
pthread_cond_destroy(&input_buffer.cond);
pthread_mutex_destroy(&input_buffer.mutex);
evbuffer_free(input_buffer.evbuf); evbuffer_free(input_buffer.evbuf);
} }

View File

@ -131,13 +131,13 @@ input_wait(void);
* Move a chunk of stream data from the player's input buffer to an output * Move a chunk of stream data from the player's input buffer to an output
* buffer. Should only be called by the player thread. Will not block. * buffer. Should only be called by the player thread. Will not block.
* *
* @in evbuf Output buffer * @in data Output buffer
* @in want How much data to move to the output buffer * @in size How much data to move to the output buffer
* @out flags Flags INPUT_FLAG_EOF or INPUT_FLAG_METADATA * @out flags Flags INPUT_FLAG_EOF or INPUT_FLAG_METADATA
* @return Number of bytes moved * @return Number of bytes moved, -1 on error
*/ */
int int
input_read(struct evbuffer *evbuf, size_t want, short *flags); input_read(void *data, size_t size, short *flags);
/* /*
* Initializes the given player source for playback * Initializes the given player source for playback
@ -180,9 +180,15 @@ input_seek(struct player_source *ps, int seek_ms);
void void
input_flush(short *flags); input_flush(short *flags);
/*
* Called by player_init (so will run in main thread)
*/
int int
input_init(void); input_init(void);
/*
* Called by player_deinit (so will run in main thread)
*/
void void
input_deinit(void); input_deinit(void);

View File

@ -107,8 +107,10 @@
// Default volume (must be from 0 - 100) // Default volume (must be from 0 - 100)
#define PLAYER_DEFAULT_VOLUME 50 #define PLAYER_DEFAULT_VOLUME 50
// Used to keep the player from getting ahead of a rate limited source (see below) // For every tick, we will read a packet from the input buffer and write it to
#define PLAYER_TICKS_MAX_OVERRUN 2 // the outputs. If the input is empty, we will try to catch up next tick. We
// will continue trying that up to this max, after which we abort playback.
#define PLAYER_WRITES_PENDING_MAX 126
struct volume_param { struct volume_param {
int volume; int volume;
@ -192,8 +194,9 @@ static struct timespec tick_interval;
static struct timespec timer_res; static struct timespec timer_res;
// Time between two packets // Time between two packets
static struct timespec packet_time = { 0, AIRTUNES_V2_STREAM_PERIOD }; static struct timespec packet_time = { 0, AIRTUNES_V2_STREAM_PERIOD };
// Will be positive if we need to skip some source reads (see below)
static int ticks_skip; // How many writes we owe the output (when the input is underrunning)
static int pb_writes_pending;
/* Sync values */ /* Sync values */
static struct timespec pb_pos_stamp; static struct timespec pb_pos_stamp;
@ -217,9 +220,9 @@ static struct player_source *cur_streaming;
static uint32_t cur_plid; static uint32_t cur_plid;
static uint32_t cur_plversion; static uint32_t cur_plversion;
static struct evbuffer *audio_buf; /* Player buffer (holds one packet) */
static uint8_t rawbuf[STOB(AIRTUNES_V2_PACKET_SAMPLES)]; static uint8_t pb_buffer[STOB(AIRTUNES_V2_PACKET_SAMPLES)];
static size_t pb_buffer_offset;
/* Play history */ /* Play history */
static struct player_history *history; static struct player_history *history;
@ -589,59 +592,6 @@ history_add(uint32_t id, uint32_t item_id)
/* Audio sources */ /* Audio sources */
/*
* Read up to "len" data from the given player source and returns
* the actual amount of data read.
*/
/*static int
stream_read(struct player_source *ps, int len)
{
int icy_timer;
int ret;
if (!ps)
{
DPRINTF(E_LOG, L_PLAYER, "Stream read called with no active streaming player source\n");
return -1;
}
if (!ps->setup_done)
{
DPRINTF(E_LOG, L_PLAYER, "Given player source not setup for reading data\n");
return -1;
}
// Read up to len data depending on data kind
switch (ps->data_kind)
{
case DATA_KIND_HTTP:
ret = transcode(audio_buf, len, ps->xcode, &icy_timer);
if (icy_timer)
metadata_check_icy();
break;
case DATA_KIND_FILE:
ret = transcode(audio_buf, len, ps->xcode, &icy_timer);
break;
#ifdef HAVE_SPOTIFY_H
case DATA_KIND_SPOTIFY:
ret = spotify_audio_get(audio_buf, len);
break;
#endif
case DATA_KIND_PIPE:
ret = pipe_audio_get(audio_buf, len);
break;
default:
ret = -1;
}
return ret;
}*/
static struct player_source * static struct player_source *
source_now_playing() source_now_playing()
{ {
@ -829,9 +779,6 @@ source_play()
ret = input_start(cur_streaming); ret = input_start(cur_streaming);
ticks_skip = 0;
memset(rawbuf, 0, sizeof(rawbuf));
return ret; return ret;
} }
@ -1076,91 +1023,80 @@ source_prev()
} }
static int static int
source_read(uint8_t *buf, int len, uint64_t rtptime) source_switch(int nbytes)
{ {
int ret;
int nbytes;
short flags;
char *silence_buf;
struct player_source *ps; struct player_source *ps;
int ret;
if (!cur_streaming) DPRINTF(E_DBG, L_PLAYER, "Switching track\n");
return 0;
nbytes = 0; source_close(last_rtptime + AIRTUNES_V2_PACKET_SAMPLES + BTOS(nbytes) - 1);
while (nbytes < len)
{
if (evbuffer_get_length(audio_buf) == 0)
{
flags = 0;
if (cur_streaming)
{
ret = input_read(audio_buf, len - nbytes, &flags);
}
else if (cur_playing)
{
// Reached end of playlist (cur_playing is NULL) send silence and source_check will abort playback if the last item was played
DPRINTF(E_SPAM, L_PLAYER, "End of playlist reached, stream silence until playback of last item ends\n");
silence_buf = (char *)calloc((len - nbytes), sizeof(char));
evbuffer_add(audio_buf, silence_buf, (len - nbytes));
free(silence_buf);
ret = len - nbytes;
}
else
{
// If cur_streaming and cur_playing are NULL, source_read for all queue items failed. Playback will be aborted in the calling function
return -1;
}
if (ret == 0)
sleep(1); // TODO Underrun -> proper pause
else if ((ret < 0) || (flags & INPUT_FLAG_EOF))
{
source_close(rtptime + BTOS(nbytes) - 1);
DPRINTF(E_DBG, L_PLAYER, "New file\n");
ps = source_next(); ps = source_next();
if (!ps)
if (ret < 0)
{ {
DPRINTF(E_LOG, L_PLAYER, "Error reading source %d\n", cur_streaming->id); cur_streaming = NULL;
db_queue_delete_byitemid(cur_streaming->item_id); return 0; // End of queue
} }
if (ps)
{
ret = source_open(ps, cur_streaming->end + 1, 0); ret = source_open(ps, cur_streaming->end + 1, 0);
if (ret < 0) if (ret < 0)
{
source_free(ps);
return -1; return -1;
}
ret = source_play(); ret = source_play();
if (ret < 0) if (ret < 0)
return -1; return -1;
metadata_trigger(0); metadata_trigger(0);
}
else return 0;
{
cur_streaming = NULL;
}
continue;
}
} }
nbytes += evbuffer_remove(audio_buf, buf + nbytes, len - nbytes); // Returns -1 on error (caller should abort playback), or bytes read (possibly 0)
static int
source_read(uint8_t *buf, int len)
{
int nbytes;
int ret;
short flags;
if (!cur_streaming)
return -1;
nbytes = input_read(buf, len, &flags);
if (nbytes < 0)
{
DPRINTF(E_LOG, L_PLAYER, "Error reading source %d\n", cur_streaming->id);
nbytes = 0;
ret = source_switch(0);
db_queue_delete_byitemid(cur_streaming->item_id);
if (ret < 0)
return -1;
}
else if (flags & INPUT_FLAG_EOF)
{
ret = source_switch(nbytes);
if (ret < 0)
return -1;
}
// We pad the output buffer with silence if we don't have enough data for a
// full packet and there is no more data coming up (no more tracks in queue)
if ((nbytes < len) && (!cur_streaming))
{
memset(buf + nbytes, 0, len - nbytes);
nbytes = len;
} }
return nbytes; return nbytes;
} }
static void static void
playback_write(int read_skip) playback_write(void)
{ {
int ret; int want;
int got;
source_check(); source_check();
@ -1168,23 +1104,31 @@ playback_write(int read_skip)
if (player_state == PLAY_STOPPED) if (player_state == PLAY_STOPPED)
return; return;
pb_writes_pending++;
while (pb_writes_pending)
{
want = sizeof(pb_buffer) - pb_buffer_offset;
got = source_read(pb_buffer + pb_buffer_offset, want);
if (got == want)
{
last_rtptime += AIRTUNES_V2_PACKET_SAMPLES; last_rtptime += AIRTUNES_V2_PACKET_SAMPLES;
outputs_write(pb_buffer, last_rtptime);
if (!read_skip) pb_buffer_offset = 0;
pb_writes_pending--;
}
else if ((got < 0) || (pb_writes_pending > PLAYER_WRITES_PENDING_MAX))
{ {
ret = source_read(rawbuf, sizeof(rawbuf), last_rtptime); DPRINTF(E_LOG, L_PLAYER, "Error reading from source, aborting playback (%d)\n", pb_writes_pending);
if (ret < 0)
{
DPRINTF(E_DBG, L_PLAYER, "Error reading from source, aborting playback\n");
playback_abort(); playback_abort();
return; return;
} }
}
else else
DPRINTF(E_SPAM, L_PLAYER, "Skipping read\n"); {
DPRINTF(E_DBG, L_PLAYER, "Partial read (offset=%zu, pending=%d)\n", pb_buffer_offset, pb_writes_pending);
outputs_write(rawbuf, last_rtptime); pb_buffer_offset += got;
return;
}
}
} }
static void static void
@ -1193,8 +1137,6 @@ player_playback_cb(int fd, short what, void *arg)
struct timespec next_tick; struct timespec next_tick;
uint64_t overrun; uint64_t overrun;
int ret; int ret;
int skip;
int skip_first;
// Check if we missed any timer expirations // Check if we missed any timer expirations
overrun = 0; overrun = 0;
@ -1212,55 +1154,16 @@ player_playback_cb(int fd, short what, void *arg)
overrun = ret; overrun = ret;
#endif /* HAVE_TIMERFD */ #endif /* HAVE_TIMERFD */
// The reason we get behind the playback timer may be that we are playing a // If there was an overrun, we will try to read/write a corresponding number
// network stream OR that the source is slow to open OR some interruption. // of times so we catch up. The read from the input is non-blocking, so it
// For streams, we might be consuming faster than the stream delivers, so // should not bring us further behind, even if there is no data.
// when ffmpeg's buffer empties (might take a few hours) our av_read_frame()
// in transcode.c will begin to block, because ffmpeg has to wait for new data
// from the stream server.
//
// Our strategy to catch up with the timer depends on the source:
// - streams: We will skip reading data every second until we have countered
// the overrun by skipping reads for a number of ticks that is
// 3 times the overrun. That should make the source catch up. To
// keep the output happy we resend the previous rawbuf when we
// have skipped a read.
// - files: Just read and write like crazy until we have caught up.
skip_first = 0;
if (overrun > PLAYER_TICKS_MAX_OVERRUN)
{
DPRINTF(E_WARN, L_PLAYER, "Behind the playback timer with %" PRIu64 " ticks\n", overrun);
if (cur_streaming && (cur_streaming->data_kind == DATA_KIND_HTTP || cur_streaming->data_kind == DATA_KIND_PIPE))
{
ticks_skip = 3 * overrun;
DPRINTF(E_WARN, L_PLAYER, "Will skip reading for a total of %d ticks to catch up\n", ticks_skip);
// We always skip after a timer overrun, since another read will
// probably just give another time overrun
skip_first = 1;
}
else
ticks_skip = 0;
}
// Decide how many packets to send
next_tick = timespec_add(pb_timer_last, tick_interval); next_tick = timespec_add(pb_timer_last, tick_interval);
for (; overrun > 0; overrun--) for (; overrun > 0; overrun--)
next_tick = timespec_add(next_tick, tick_interval); next_tick = timespec_add(next_tick, tick_interval);
do do
{ {
skip = skip_first || ((ticks_skip > 0) && ((last_rtptime / AIRTUNES_V2_PACKET_SAMPLES) % 126 == 0)); playback_write();
playback_write(skip);
skip_first = 0;
if (skip)
ticks_skip--;
packet_timer_last = timespec_add(packet_timer_last, packet_time); packet_timer_last = timespec_add(packet_timer_last, packet_time);
} }
while ((timespec_cmp(packet_timer_last, next_tick) < 0) && (player_state == PLAY_PLAYING)); while ((timespec_cmp(packet_timer_last, next_tick) < 0) && (player_state == PLAY_PLAYING));
@ -1789,8 +1692,6 @@ playback_abort(void)
source_stop(); source_stop();
evbuffer_drain(audio_buf, evbuffer_get_length(audio_buf));
if (!clear_queue_on_stop_disabled) if (!clear_queue_on_stop_disabled)
db_queue_clear(); db_queue_clear();
@ -1963,8 +1864,6 @@ playback_stop(void *arg, int *retval)
source_stop(); source_stop();
evbuffer_drain(audio_buf, evbuffer_get_length(audio_buf));
status_update(PLAY_STOPPED); status_update(PLAY_STOPPED);
metadata_purge(); metadata_purge();
@ -2009,6 +1908,9 @@ playback_start_bh(void *arg, int *retval)
pb_timer_last.tv_sec = pb_pos_stamp.tv_sec; pb_timer_last.tv_sec = pb_pos_stamp.tv_sec;
pb_timer_last.tv_nsec = pb_pos_stamp.tv_nsec; pb_timer_last.tv_nsec = pb_pos_stamp.tv_nsec;
pb_buffer_offset = 0;
pb_writes_pending = 0;
ret = pb_timer_start(); ret = pb_timer_start();
if (ret < 0) if (ret < 0)
goto out_fail; goto out_fail;
@ -2406,8 +2308,6 @@ playback_pause(void *arg, int *retval)
source_pause(pos); source_pause(pos);
evbuffer_drain(audio_buf, evbuffer_get_length(audio_buf));
metadata_purge(); metadata_purge();
/* We're async if we need to flush devices */ /* We're async if we need to flush devices */
@ -3244,14 +3144,6 @@ player_init(void)
gcry_randomize(&rnd, sizeof(rnd), GCRY_STRONG_RANDOM); gcry_randomize(&rnd, sizeof(rnd), GCRY_STRONG_RANDOM);
last_rtptime = ((uint64_t)1 << 32) | rnd; last_rtptime = ((uint64_t)1 << 32) | rnd;
audio_buf = evbuffer_new();
if (!audio_buf)
{
DPRINTF(E_LOG, L_PLAYER, "Could not allocate evbuffer for audio buffer\n");
goto audio_fail;
}
evbase_player = event_base_new(); evbase_player = event_base_new();
if (!evbase_player) if (!evbase_player)
{ {
@ -3310,8 +3202,6 @@ player_init(void)
evnew_fail: evnew_fail:
event_base_free(evbase_player); event_base_free(evbase_player);
evbase_fail: evbase_fail:
evbuffer_free(audio_buf);
audio_fail:
#ifdef HAVE_TIMERFD #ifdef HAVE_TIMERFD
close(pb_timer_fd); close(pb_timer_fd);
#else #else
@ -3327,6 +3217,14 @@ player_deinit(void)
{ {
int ret; int ret;
player_playback_stop();
#ifdef HAVE_TIMERFD
close(pb_timer_fd);
#else
timer_delete(pb_timer);
#endif
player_exit = 1; player_exit = 1;
commands_base_destroy(cmdbase); commands_base_destroy(cmdbase);
@ -3338,19 +3236,11 @@ player_deinit(void)
return; return;
} }
free(history);
pb_timer_stop();
#ifdef HAVE_TIMERFD
close(pb_timer_fd);
#else
timer_delete(pb_timer);
#endif
evbuffer_free(audio_buf);
input_deinit(); input_deinit();
outputs_deinit(); outputs_deinit();
free(history);
event_base_free(evbase_player); event_base_free(evbase_player);
} }