[outputs] Make outputs_device_flush (per device flushing)

Works better with the callback mechanism
This commit is contained in:
ejurgensen 2019-02-17 00:19:13 +01:00
parent 87ca6363ae
commit d008e241cf
7 changed files with 171 additions and 212 deletions

View File

@ -124,22 +124,6 @@ callback_remove(struct output_device *device)
}
}
static void
callback_remove_all(enum output_types type)
{
struct output_device *device;
for (device = output_device_list; device; device = device->next)
{
if (type != device->type)
continue;
outputs_device_cb_set(device, NULL);
callback_remove(device);
}
}
static int
callback_add(struct output_device *device, output_status_cb cb)
{
@ -168,7 +152,7 @@ callback_add(struct output_device *device, output_status_cb cb)
outputs_cb_queue[callback_id].cb = cb;
outputs_cb_queue[callback_id].device = device; // Don't dereference this later, it might become invalid!
DPRINTF(E_DBG, L_PLAYER, "Registered callback to %s with id %d\n", player_pmap(cb), callback_id);
DPRINTF(E_DBG, L_PLAYER, "Registered callback to %s with id %d (device %p, %s)\n", player_pmap(cb), callback_id, device, device->name);
int active = 0;
for (int i = 0; i < ARRAY_SIZE(outputs_cb_queue); i++)
@ -694,6 +678,18 @@ outputs_device_stop(struct output_device *device, output_status_cb cb)
return outputs[device->type]->device_stop(device, callback_add(device, cb));
}
int
outputs_device_flush(struct output_device *device, output_status_cb cb)
{
if (outputs[device->type]->disabled || !outputs[device->type]->device_flush)
return -1;
if (!device->session)
return -1;
return outputs[device->type]->device_flush(device, callback_add(device, cb));
}
int
outputs_device_probe(struct output_device *device, output_status_cb cb)
{
@ -786,6 +782,25 @@ outputs_playback_stop(void)
}
}
int
outputs_flush(output_status_cb cb)
{
struct output_device *device;
int count = 0;
int ret;
for (device = output_device_list; device; device = device->next)
{
ret = outputs_device_flush(device, cb);
if (ret < 0)
continue;
count++;
}
return count;
}
void
outputs_write(void *buf, size_t bufsize, struct media_quality *quality, int nsamples, struct timespec *pts)
{
@ -805,27 +820,6 @@ outputs_write(void *buf, size_t bufsize, struct media_quality *quality, int nsam
buffer_drain(&output_buffer);
}
int
outputs_flush(output_status_cb cb)
{
int ret;
int i;
ret = 0;
for (i = 0; outputs[i]; i++)
{
if (outputs[i]->disabled || !outputs[i]->flush)
continue;
// Clear callback register for all devices belonging to outputs[i]
callback_remove_all(outputs[i]->type);
ret += outputs[i]->flush(callback_add(NULL, cb));
}
return ret;
}
struct output_metadata *
outputs_metadata_prepare(int id)
{

View File

@ -199,6 +199,9 @@ struct output_definition
// Close a session prepared by device_start and call back
int (*device_stop)(struct output_device *device, int callback_id);
// Flush device session and call back
int (*device_flush)(struct output_device *device, int callback_id);
// Test the connection to a device and call back
int (*device_probe)(struct output_device *device, int callback_id);
@ -223,9 +226,6 @@ struct output_definition
// Write stream data to the output devices
void (*write)(struct output_buffer *buffer);
// Flush all sessions, the return must be number of sessions pending the flush
int (*flush)(int callback_id);
// Authorize an output with a pin-code (probably coming from the filescanner)
void (*authorize)(const char *pin);
@ -278,6 +278,9 @@ outputs_device_start(struct output_device *device, output_status_cb cb);
int
outputs_device_stop(struct output_device *device, output_status_cb cb);
int
outputs_device_flush(struct output_device *device, output_status_cb cb);
int
outputs_device_probe(struct output_device *device, output_status_cb cb);
@ -299,12 +302,12 @@ outputs_device_free(struct output_device *device);
void
outputs_playback_stop(void);
void
outputs_write(void *buf, size_t bufsize, struct media_quality *quality, int nsamples, struct timespec *pts);
int
outputs_flush(output_status_cb cb);
void
outputs_write(void *buf, size_t bufsize, struct media_quality *quality, int nsamples, struct timespec *pts);
struct output_metadata *
outputs_metadata_prepare(int id);

View File

@ -829,6 +829,24 @@ alsa_device_stop(struct output_session *session)
alsa_status(as);
}
static int
alsa_device_flush(struct output_device *device, int callback_id)
{
struct alsa_session *as = device->session;
int i;
// TODO close device?
snd_pcm_drop(hdl);
prebuf_free(as);
as->callback_id = callback_id;
as->state = ALSA_STATE_STARTED;
alsa_status(as);
return 0;
}
static int
alsa_device_probe(struct output_device *device, output_status_cb cb)
{
@ -951,28 +969,6 @@ alsa_write(uint8_t *buf, uint64_t rtptime)
}
}
static int
alsa_flush(output_status_cb cb, uint64_t rtptime)
{
struct alsa_session *as;
int i;
i = 0;
for (as = sessions; as; as = as->next)
{
i++;
snd_pcm_drop(hdl);
prebuf_free(as);
as->status_cb = cb;
as->state = ALSA_STATE_STARTED;
alsa_status(as);
}
return i;
}
static void
alsa_set_status_cb(struct output_session *session, output_status_cb cb)
{
@ -1059,11 +1055,11 @@ struct output_definition output_alsa =
.deinit = alsa_deinit,
.device_start = alsa_device_start,
.device_stop = alsa_device_stop,
.device_flush = alsa_device_flush,
.device_probe = alsa_device_probe,
.device_volume_set = alsa_device_volume_set,
.playback_start = alsa_playback_start,
.playback_stop = alsa_playback_stop,
.write = alsa_write,
.flush = alsa_flush,
.status_cb = alsa_set_status_cb,
};

View File

@ -131,6 +131,19 @@ dummy_device_stop(struct output_device *device, int callback_id)
return 0;
}
static int
dummy_device_flush(struct output_device *device, int callback_id)
{
struct dummy_session *ds = device->session;
ds->callback_id = callback_id;
ds->state = OUTPUT_STATE_STOPPED;
dummy_status(ds);
return 0;
}
static int
dummy_device_probe(struct output_device *device, int callback_id)
{
@ -228,6 +241,7 @@ struct output_definition output_dummy =
.deinit = dummy_deinit,
.device_start = dummy_device_start,
.device_stop = dummy_device_stop,
.device_flush = dummy_device_flush,
.device_probe = dummy_device_probe,
.device_volume_set = dummy_device_volume_set,
.device_cb_set = dummy_device_cb_set,

View File

@ -323,6 +323,21 @@ fifo_device_stop(struct output_device *device, int callback_id)
return 0;
}
static int
fifo_device_flush(struct output_device *device, int callback_id)
{
struct fifo_session *fifo_session = device->session;
fifo_empty(fifo_session);
free_buffer();
fifo_session->callback_id = callback_id;
fifo_session->state = OUTPUT_STATE_CONNECTED;
fifo_status(fifo_session);
return 0;
}
static int
fifo_device_probe(struct output_device *device, int callback_id)
{
@ -386,23 +401,6 @@ fifo_playback_stop(void)
fifo_status(fifo_session);
}
static int
fifo_flush(int callback_id)
{
struct fifo_session *fifo_session = sessions;
if (!fifo_session)
return 0;
fifo_empty(fifo_session);
free_buffer();
fifo_session->callback_id = callback_id;
fifo_session->state = OUTPUT_STATE_CONNECTED;
fifo_status(fifo_session);
return 1;
}
static void
fifo_write(struct output_buffer *obuf)
{
@ -530,10 +528,10 @@ struct output_definition output_fifo =
.deinit = fifo_deinit,
.device_start = fifo_device_start,
.device_stop = fifo_device_stop,
.device_flush = fifo_device_flush,
.device_probe = fifo_device_probe,
.device_volume_set = fifo_device_volume_set,
.device_cb_set = fifo_device_cb_set,
.playback_stop = fifo_playback_stop,
.write = fifo_write,
.flush = fifo_flush,
};

View File

@ -669,6 +669,40 @@ pulse_device_stop(struct output_session *session)
stream_close(ps, close_cb);
}
static int
pulse_device_flush(struct output_device *device, int callback_id)
{
struct pulse_session *ps = device->session;
pa_operation* o;
DPRINTF(E_DBG, L_LAUDIO, "Pulseaudio flush\n");
pa_threaded_mainloop_lock(pulse.mainloop);
ps->callback_id = callback_id;
o = pa_stream_cork(ps->stream, 1, NULL, NULL);
if (!o)
{
DPRINTF(E_LOG, L_LAUDIO, "Pulseaudio could not pause '%s': %s\n", ps->devname, pa_strerror(pa_context_errno(pulse.context)));
return -1;
}
pa_operation_unref(o);
o = pa_stream_flush(ps->stream, flush_cb, ps);
if (!o)
{
DPRINTF(E_LOG, L_LAUDIO, "Pulseaudio could not flush '%s': %s\n", ps->devname, pa_strerror(pa_context_errno(pulse.context)));
return -1;
}
pa_operation_unref(o);
pa_threaded_mainloop_unlock(pulse.mainloop);
return 0;
}
static int
pulse_device_probe(struct output_device *device, output_status_cb cb)
{
@ -824,46 +858,6 @@ pulse_playback_stop(void)
pa_threaded_mainloop_unlock(pulse.mainloop);
}
static int
pulse_flush(output_status_cb cb, uint64_t rtptime)
{
struct pulse_session *ps;
pa_operation* o;
int i;
DPRINTF(E_DBG, L_LAUDIO, "Pulseaudio flush\n");
pa_threaded_mainloop_lock(pulse.mainloop);
i = 0;
for (ps = sessions; ps; ps = ps->next)
{
i++;
ps->status_cb = cb;
o = pa_stream_cork(ps->stream, 1, NULL, NULL);
if (!o)
{
DPRINTF(E_LOG, L_LAUDIO, "Pulseaudio could not pause '%s': %s\n", ps->devname, pa_strerror(pa_context_errno(pulse.context)));
continue;
}
pa_operation_unref(o);
o = pa_stream_flush(ps->stream, flush_cb, ps);
if (!o)
{
DPRINTF(E_LOG, L_LAUDIO, "Pulseaudio could not flush '%s': %s\n", ps->devname, pa_strerror(pa_context_errno(pulse.context)));
continue;
}
pa_operation_unref(o);
}
pa_threaded_mainloop_unlock(pulse.mainloop);
return i;
}
static void
pulse_set_status_cb(struct output_session *session, output_status_cb cb)
{
@ -962,13 +956,13 @@ struct output_definition output_pulse =
.deinit = pulse_deinit,
.device_start = pulse_device_start,
.device_stop = pulse_device_stop,
.device_flush = pulse_device_flush,
.device_probe = pulse_device_probe,
.device_free_extra = pulse_device_free_extra,
.device_volume_set = pulse_device_volume_set,
.playback_start = pulse_playback_start,
.playback_stop = pulse_playback_stop,
.write = pulse_write,
.flush = pulse_flush,
.status_cb = pulse_set_status_cb,
};

View File

@ -342,9 +342,6 @@ static struct raop_service control_6svc;
static struct raop_metadata *metadata_head;
static struct raop_metadata *metadata_tail;
/* FLUSH timer */
static struct event *flush_timer;
/* Keep-alive timer - hack for ATV's with tvOS 10 */
static struct event *keep_alive_timer;
static struct timeval keep_alive_tv = { 30, 0 };
@ -1911,16 +1908,6 @@ session_failure(struct raop_session *rs)
session_cleanup(rs);
}
static void
deferredev_cb(int fd, short what, void *arg)
{
struct raop_session *rs = arg;
DPRINTF(E_DBG, L_RAOP, "Cleaning up failed session (deferred) on device '%s'\n", rs->devname);
session_failure(rs);
}
static void
deferred_session_failure(struct raop_session *rs)
{
@ -1974,6 +1961,23 @@ session_teardown(struct raop_session *rs, const char *log_caller)
return ret;
}
static void
deferredev_cb(int fd, short what, void *arg)
{
struct raop_session *rs = arg;
if (rs->state == RAOP_STATE_FAILED)
{
DPRINTF(E_DBG, L_RAOP, "Cleaning up failed session (deferred) on device '%s'\n", rs->devname);
session_failure(rs);
}
else
{
DPRINTF(E_DBG, L_RAOP, "Flush timer expired; tearing down RAOP session on '%s'\n", rs->devname);
session_teardown(rs, "deferredev_cb");
}
}
static struct raop_session *
session_make(struct output_device *rd, int family, int callback_id, bool only_probe)
{
@ -2010,6 +2014,7 @@ session_make(struct output_device *rd, int family, int callback_id, bool only_pr
}
CHECK_NULL(L_PLAYER, rs = calloc(1, sizeof(struct raop_session)));
CHECK_NULL(L_RAOP, rs->deferredev = evtimer_new(evbase_player, deferredev_cb, rs));
rs->state = RAOP_STATE_STOPPED;
rs->only_probe = only_probe;
@ -2059,14 +2064,6 @@ session_make(struct output_device *rd, int family, int callback_id, bool only_pr
break;
}
rs->deferredev = evtimer_new(evbase_player, deferredev_cb, rs);
if (!rs->deferredev)
{
DPRINTF(E_LOG, L_RAOP, "Out of memory for deferred error handling!\n");
goto out_free_rs;
}
rs->ctrl = evrtsp_connection_new(address, port);
if (!rs->ctrl)
{
@ -2153,7 +2150,6 @@ session_make(struct output_device *rd, int family, int callback_id, bool only_pr
evrtsp_connection_free(rs->ctrl);
out_free_event:
event_free(rs->deferredev);
out_free_rs:
free(rs);
return NULL;
@ -2812,17 +2808,6 @@ raop_cb_keep_alive(struct evrtsp_request *req, void *arg)
return;
}
static void
raop_flush_timer_cb(int fd, short what, void *arg)
{
struct raop_session *rs;
DPRINTF(E_DBG, L_RAOP, "Flush timer expired; tearing down RAOP sessions\n");
for (rs = raop_sessions; rs; rs = rs->next)
session_teardown(rs, "raop_flush_timer_cb");
}
static void
raop_keep_alive_timer_cb(int fd, short what, void *arg)
{
@ -4824,6 +4809,29 @@ raop_device_stop(struct output_device *device, int callback_id)
return session_teardown(rs, "device_stop");
}
static int
raop_device_flush(struct output_device *device, int callback_id)
{
struct raop_session *rs = device->session;
struct timeval tv;
int ret;
if (rs->state != RAOP_STATE_STREAMING)
return -1;
ret = raop_send_req_flush(rs, raop_cb_flush, "flush");
if (ret < 0)
return -1;
rs->callback_id = callback_id;
evutil_timerclear(&tv);
tv.tv_sec = 10;
evtimer_add(rs->deferredev, &tv);
return 0;
}
static void
raop_device_cb_set(struct output_device *device, int callback_id)
{
@ -4889,51 +4897,13 @@ raop_write(struct output_buffer *obuf)
if (rs->state != RAOP_STATE_CONNECTED)
continue;
event_del(flush_timer); // In case playback was stopped but then restarted again
event_del(rs->deferredev); // Kills flush timer in case playback was stopped but then restarted again
rs->state = RAOP_STATE_STREAMING;
// Make a cb?
}
}
static int
raop_flush(int callback_id)
{
struct timeval tv;
struct raop_session *rs;
struct raop_session *next;
int pending;
int ret;
pending = 0;
for (rs = raop_sessions; rs; rs = next)
{
next = rs->next;
if (rs->state != RAOP_STATE_STREAMING)
continue;
ret = raop_send_req_flush(rs, raop_cb_flush, "flush");
if (ret < 0)
{
session_failure(rs);
continue;
}
rs->callback_id = callback_id;
pending++;
}
if (pending > 0)
{
evutil_timerclear(&tv);
tv.tv_sec = 10;
evtimer_add(flush_timer, &tv);
}
return pending;
}
static int
raop_init(void)
{
@ -5006,14 +4976,7 @@ raop_init(void)
if (ptr)
*ptr = '\0';
flush_timer = evtimer_new(evbase_player, raop_flush_timer_cb, NULL);
keep_alive_timer = evtimer_new(evbase_player, raop_keep_alive_timer_cb, NULL);
if (!flush_timer || !keep_alive_timer)
{
DPRINTF(E_LOG, L_RAOP, "Out of memory for flush timer or keep alive timer\n");
goto out_free_b64_iv;
}
CHECK_NULL(L_RAOP, keep_alive_timer = evtimer_new(evbase_player, raop_keep_alive_timer_cb, NULL));
v6enabled = cfg_getbool(cfg_getsec(cfg, "general"), "ipv6");
@ -5022,7 +4985,7 @@ raop_init(void)
{
DPRINTF(E_LOG, L_RAOP, "AirPlay time synchronization failed to start\n");
goto out_free_timers;
goto out_free_timer;
}
ret = raop_v2_control_start(v6enabled);
@ -5055,10 +5018,8 @@ raop_init(void)
raop_v2_control_stop();
out_stop_timing:
raop_v2_timing_stop();
out_free_timers:
event_free(flush_timer);
out_free_timer:
event_free(keep_alive_timer);
out_free_b64_iv:
free(raop_aes_iv_b64);
out_free_b64_key:
free(raop_aes_key_b64);
@ -5083,7 +5044,6 @@ raop_deinit(void)
raop_v2_control_stop();
raop_v2_timing_stop();
event_free(flush_timer);
event_free(keep_alive_timer);
gcry_cipher_close(raop_aes_ctx);
@ -5102,6 +5062,7 @@ struct output_definition output_raop =
.deinit = raop_deinit,
.device_start = raop_device_start,
.device_stop = raop_device_stop,
.device_flush = raop_device_flush,
.device_probe = raop_device_probe,
.device_cb_set = raop_device_cb_set,
.device_free_extra = raop_device_free_extra,
@ -5109,7 +5070,6 @@ struct output_definition output_raop =
.device_volume_to_pct = raop_volume_to_pct,
.playback_stop = raop_playback_stop,
.write = raop_write,
.flush = raop_flush,
.metadata_prepare = raop_metadata_prepare,
.metadata_send = raop_metadata_send,
.metadata_purge = raop_metadata_purge,