From 938e197fa45cd20d6b889fac8632c44990f2d7f0 Mon Sep 17 00:00:00 2001 From: ejurgensen Date: Sat, 14 Jan 2017 00:43:03 +0100 Subject: [PATCH] [player] Refactor read/write - remove read skip which is obsolete when input has own thread and cannot block - simplify code - fix while loop that could loop infinitely --- src/input.c | 9 +- src/input.h | 14 ++- src/player.c | 322 +++++++++++++++++---------------------------------- 3 files changed, 122 insertions(+), 223 deletions(-) diff --git a/src/input.c b/src/input.c index f037ccce..41a2e5d3 100644 --- a/src/input.c +++ b/src/input.c @@ -250,7 +250,7 @@ input_write(struct evbuffer *evbuf, short flags) /* Thread: player */ int -input_read(struct evbuffer *evbuf, size_t want, short *flags) +input_read(void *data, size_t size, short *flags) { int len; @@ -265,7 +265,7 @@ input_read(struct evbuffer *evbuf, size_t want, short *flags) pthread_mutex_lock(&input_buffer.mutex); #ifdef DEBUG - debug_elapsed += want; + debug_elapsed += size; if (debug_elapsed > STOB(441000)) // 10 sec { DPRINTF(E_DBG, L_PLAYER, "Input buffer has %zu bytes\n", evbuffer_get_length(input_buffer.evbuf)); @@ -273,7 +273,7 @@ input_read(struct evbuffer *evbuf, size_t want, short *flags) } #endif - len = evbuffer_remove_buffer(input_buffer.evbuf, evbuf, want); + len = evbuffer_remove(input_buffer.evbuf, data, size); if (len < 0) { DPRINTF(E_LOG, L_PLAYER, "Error reading stream data from input buffer\n"); @@ -492,6 +492,9 @@ input_deinit(void) inputs[i]->deinit(); } + pthread_cond_destroy(&input_buffer.cond); + pthread_mutex_destroy(&input_buffer.mutex); + evbuffer_free(input_buffer.evbuf); } diff --git a/src/input.h b/src/input.h index 3190d864..b63f7596 100644 --- a/src/input.h +++ b/src/input.h @@ -131,13 +131,13 @@ input_wait(void); * Move a chunk of stream data from the player's input buffer to an output * buffer. Should only be called by the player thread. Will not block. * - * @in evbuf Output buffer - * @in want How much data to move to the output buffer + * @in data Output buffer + * @in size How much data to move to the output buffer * @out flags Flags INPUT_FLAG_EOF or INPUT_FLAG_METADATA - * @return Number of bytes moved + * @return Number of bytes moved, -1 on error */ int -input_read(struct evbuffer *evbuf, size_t want, short *flags); +input_read(void *data, size_t size, short *flags); /* * Initializes the given player source for playback @@ -180,9 +180,15 @@ input_seek(struct player_source *ps, int seek_ms); void input_flush(short *flags); +/* + * Called by player_init (so will run in main thread) + */ int input_init(void); +/* + * Called by player_deinit (so will run in main thread) + */ void input_deinit(void); diff --git a/src/player.c b/src/player.c index c8827fdb..11ad681e 100644 --- a/src/player.c +++ b/src/player.c @@ -107,8 +107,10 @@ // Default volume (must be from 0 - 100) #define PLAYER_DEFAULT_VOLUME 50 -// Used to keep the player from getting ahead of a rate limited source (see below) -#define PLAYER_TICKS_MAX_OVERRUN 2 +// For every tick, we will read a packet from the input buffer and write it to +// the outputs. If the input is empty, we will try to catch up next tick. We +// will continue trying that up to this max, after which we abort playback. +#define PLAYER_WRITES_PENDING_MAX 126 struct volume_param { int volume; @@ -192,8 +194,9 @@ static struct timespec tick_interval; static struct timespec timer_res; // Time between two packets static struct timespec packet_time = { 0, AIRTUNES_V2_STREAM_PERIOD }; -// Will be positive if we need to skip some source reads (see below) -static int ticks_skip; + +// How many writes we owe the output (when the input is underrunning) +static int pb_writes_pending; /* Sync values */ static struct timespec pb_pos_stamp; @@ -217,9 +220,9 @@ static struct player_source *cur_streaming; static uint32_t cur_plid; static uint32_t cur_plversion; -static struct evbuffer *audio_buf; -static uint8_t rawbuf[STOB(AIRTUNES_V2_PACKET_SAMPLES)]; - +/* Player buffer (holds one packet) */ +static uint8_t pb_buffer[STOB(AIRTUNES_V2_PACKET_SAMPLES)]; +static size_t pb_buffer_offset; /* Play history */ static struct player_history *history; @@ -589,59 +592,6 @@ history_add(uint32_t id, uint32_t item_id) /* Audio sources */ -/* - * Read up to "len" data from the given player source and returns - * the actual amount of data read. - */ -/*static int -stream_read(struct player_source *ps, int len) -{ - int icy_timer; - int ret; - - if (!ps) - { - DPRINTF(E_LOG, L_PLAYER, "Stream read called with no active streaming player source\n"); - return -1; - } - - if (!ps->setup_done) - { - DPRINTF(E_LOG, L_PLAYER, "Given player source not setup for reading data\n"); - return -1; - } - - // Read up to len data depending on data kind - switch (ps->data_kind) - { - case DATA_KIND_HTTP: - ret = transcode(audio_buf, len, ps->xcode, &icy_timer); - - if (icy_timer) - metadata_check_icy(); - break; - - case DATA_KIND_FILE: - ret = transcode(audio_buf, len, ps->xcode, &icy_timer); - break; - -#ifdef HAVE_SPOTIFY_H - case DATA_KIND_SPOTIFY: - ret = spotify_audio_get(audio_buf, len); - break; -#endif - - case DATA_KIND_PIPE: - ret = pipe_audio_get(audio_buf, len); - break; - - default: - ret = -1; - } - - return ret; -}*/ - static struct player_source * source_now_playing() { @@ -829,9 +779,6 @@ source_play() ret = input_start(cur_streaming); - ticks_skip = 0; - memset(rawbuf, 0, sizeof(rawbuf)); - return ret; } @@ -1076,91 +1023,80 @@ source_prev() } static int -source_read(uint8_t *buf, int len, uint64_t rtptime) +source_switch(int nbytes) { - int ret; - int nbytes; - short flags; - char *silence_buf; struct player_source *ps; + int ret; + + DPRINTF(E_DBG, L_PLAYER, "Switching track\n"); + + source_close(last_rtptime + AIRTUNES_V2_PACKET_SAMPLES + BTOS(nbytes) - 1); + + ps = source_next(); + if (!ps) + { + cur_streaming = NULL; + return 0; // End of queue + } + + ret = source_open(ps, cur_streaming->end + 1, 0); + if (ret < 0) + return -1; + + ret = source_play(); + if (ret < 0) + return -1; + + metadata_trigger(0); + + return 0; +} + +// Returns -1 on error (caller should abort playback), or bytes read (possibly 0) +static int +source_read(uint8_t *buf, int len) +{ + int nbytes; + int ret; + short flags; if (!cur_streaming) - return 0; + return -1; - nbytes = 0; - while (nbytes < len) + nbytes = input_read(buf, len, &flags); + if (nbytes < 0) { - if (evbuffer_get_length(audio_buf) == 0) - { - flags = 0; - if (cur_streaming) - { - ret = input_read(audio_buf, len - nbytes, &flags); - } - else if (cur_playing) - { - // Reached end of playlist (cur_playing is NULL) send silence and source_check will abort playback if the last item was played - DPRINTF(E_SPAM, L_PLAYER, "End of playlist reached, stream silence until playback of last item ends\n"); - silence_buf = (char *)calloc((len - nbytes), sizeof(char)); - evbuffer_add(audio_buf, silence_buf, (len - nbytes)); - free(silence_buf); - ret = len - nbytes; - } - else - { - // If cur_streaming and cur_playing are NULL, source_read for all queue items failed. Playback will be aborted in the calling function - return -1; - } + DPRINTF(E_LOG, L_PLAYER, "Error reading source %d\n", cur_streaming->id); - if (ret == 0) - sleep(1); // TODO Underrun -> proper pause - else if ((ret < 0) || (flags & INPUT_FLAG_EOF)) - { - source_close(rtptime + BTOS(nbytes) - 1); + nbytes = 0; + ret = source_switch(0); + db_queue_delete_byitemid(cur_streaming->item_id); + if (ret < 0) + return -1; + } + else if (flags & INPUT_FLAG_EOF) + { + ret = source_switch(nbytes); + if (ret < 0) + return -1; + } - DPRINTF(E_DBG, L_PLAYER, "New file\n"); - - ps = source_next(); - - if (ret < 0) - { - DPRINTF(E_LOG, L_PLAYER, "Error reading source %d\n", cur_streaming->id); - db_queue_delete_byitemid(cur_streaming->item_id); - } - - if (ps) - { - ret = source_open(ps, cur_streaming->end + 1, 0); - if (ret < 0) - { - source_free(ps); - return -1; - } - - ret = source_play(); - if (ret < 0) - return -1; - - metadata_trigger(0); - } - else - { - cur_streaming = NULL; - } - continue; - } - } - - nbytes += evbuffer_remove(audio_buf, buf + nbytes, len - nbytes); + // We pad the output buffer with silence if we don't have enough data for a + // full packet and there is no more data coming up (no more tracks in queue) + if ((nbytes < len) && (!cur_streaming)) + { + memset(buf + nbytes, 0, len - nbytes); + nbytes = len; } return nbytes; } static void -playback_write(int read_skip) +playback_write(void) { - int ret; + int want; + int got; source_check(); @@ -1168,23 +1104,31 @@ playback_write(int read_skip) if (player_state == PLAY_STOPPED) return; - last_rtptime += AIRTUNES_V2_PACKET_SAMPLES; - - if (!read_skip) + pb_writes_pending++; + while (pb_writes_pending) { - ret = source_read(rawbuf, sizeof(rawbuf), last_rtptime); - if (ret < 0) + want = sizeof(pb_buffer) - pb_buffer_offset; + got = source_read(pb_buffer + pb_buffer_offset, want); + if (got == want) { - DPRINTF(E_DBG, L_PLAYER, "Error reading from source, aborting playback\n"); - + last_rtptime += AIRTUNES_V2_PACKET_SAMPLES; + outputs_write(pb_buffer, last_rtptime); + pb_buffer_offset = 0; + pb_writes_pending--; + } + else if ((got < 0) || (pb_writes_pending > PLAYER_WRITES_PENDING_MAX)) + { + DPRINTF(E_LOG, L_PLAYER, "Error reading from source, aborting playback (%d)\n", pb_writes_pending); playback_abort(); return; } + else + { + DPRINTF(E_DBG, L_PLAYER, "Partial read (offset=%zu, pending=%d)\n", pb_buffer_offset, pb_writes_pending); + pb_buffer_offset += got; + return; + } } - else - DPRINTF(E_SPAM, L_PLAYER, "Skipping read\n"); - - outputs_write(rawbuf, last_rtptime); } static void @@ -1193,8 +1137,6 @@ player_playback_cb(int fd, short what, void *arg) struct timespec next_tick; uint64_t overrun; int ret; - int skip; - int skip_first; // Check if we missed any timer expirations overrun = 0; @@ -1212,55 +1154,16 @@ player_playback_cb(int fd, short what, void *arg) overrun = ret; #endif /* HAVE_TIMERFD */ - // The reason we get behind the playback timer may be that we are playing a - // network stream OR that the source is slow to open OR some interruption. - // For streams, we might be consuming faster than the stream delivers, so - // when ffmpeg's buffer empties (might take a few hours) our av_read_frame() - // in transcode.c will begin to block, because ffmpeg has to wait for new data - // from the stream server. - // - // Our strategy to catch up with the timer depends on the source: - // - streams: We will skip reading data every second until we have countered - // the overrun by skipping reads for a number of ticks that is - // 3 times the overrun. That should make the source catch up. To - // keep the output happy we resend the previous rawbuf when we - // have skipped a read. - // - files: Just read and write like crazy until we have caught up. - - skip_first = 0; - if (overrun > PLAYER_TICKS_MAX_OVERRUN) - { - DPRINTF(E_WARN, L_PLAYER, "Behind the playback timer with %" PRIu64 " ticks\n", overrun); - - if (cur_streaming && (cur_streaming->data_kind == DATA_KIND_HTTP || cur_streaming->data_kind == DATA_KIND_PIPE)) - { - ticks_skip = 3 * overrun; - - DPRINTF(E_WARN, L_PLAYER, "Will skip reading for a total of %d ticks to catch up\n", ticks_skip); - - // We always skip after a timer overrun, since another read will - // probably just give another time overrun - skip_first = 1; - } - else - ticks_skip = 0; - } - - // Decide how many packets to send + // If there was an overrun, we will try to read/write a corresponding number + // of times so we catch up. The read from the input is non-blocking, so it + // should not bring us further behind, even if there is no data. next_tick = timespec_add(pb_timer_last, tick_interval); for (; overrun > 0; overrun--) next_tick = timespec_add(next_tick, tick_interval); do { - skip = skip_first || ((ticks_skip > 0) && ((last_rtptime / AIRTUNES_V2_PACKET_SAMPLES) % 126 == 0)); - - playback_write(skip); - - skip_first = 0; - if (skip) - ticks_skip--; - + playback_write(); packet_timer_last = timespec_add(packet_timer_last, packet_time); } while ((timespec_cmp(packet_timer_last, next_tick) < 0) && (player_state == PLAY_PLAYING)); @@ -1789,8 +1692,6 @@ playback_abort(void) source_stop(); - evbuffer_drain(audio_buf, evbuffer_get_length(audio_buf)); - if (!clear_queue_on_stop_disabled) db_queue_clear(); @@ -1963,8 +1864,6 @@ playback_stop(void *arg, int *retval) source_stop(); - evbuffer_drain(audio_buf, evbuffer_get_length(audio_buf)); - status_update(PLAY_STOPPED); metadata_purge(); @@ -2009,6 +1908,9 @@ playback_start_bh(void *arg, int *retval) pb_timer_last.tv_sec = pb_pos_stamp.tv_sec; pb_timer_last.tv_nsec = pb_pos_stamp.tv_nsec; + pb_buffer_offset = 0; + pb_writes_pending = 0; + ret = pb_timer_start(); if (ret < 0) goto out_fail; @@ -2406,8 +2308,6 @@ playback_pause(void *arg, int *retval) source_pause(pos); - evbuffer_drain(audio_buf, evbuffer_get_length(audio_buf)); - metadata_purge(); /* We're async if we need to flush devices */ @@ -3244,14 +3144,6 @@ player_init(void) gcry_randomize(&rnd, sizeof(rnd), GCRY_STRONG_RANDOM); last_rtptime = ((uint64_t)1 << 32) | rnd; - audio_buf = evbuffer_new(); - if (!audio_buf) - { - DPRINTF(E_LOG, L_PLAYER, "Could not allocate evbuffer for audio buffer\n"); - - goto audio_fail; - } - evbase_player = event_base_new(); if (!evbase_player) { @@ -3310,8 +3202,6 @@ player_init(void) evnew_fail: event_base_free(evbase_player); evbase_fail: - evbuffer_free(audio_buf); - audio_fail: #ifdef HAVE_TIMERFD close(pb_timer_fd); #else @@ -3327,6 +3217,14 @@ player_deinit(void) { int ret; + player_playback_stop(); + +#ifdef HAVE_TIMERFD + close(pb_timer_fd); +#else + timer_delete(pb_timer); +#endif + player_exit = 1; commands_base_destroy(cmdbase); @@ -3338,19 +3236,11 @@ player_deinit(void) return; } - free(history); - - pb_timer_stop(); -#ifdef HAVE_TIMERFD - close(pb_timer_fd); -#else - timer_delete(pb_timer); -#endif - - evbuffer_free(audio_buf); - input_deinit(); + outputs_deinit(); + free(history); + event_base_free(evbase_player); }