diff --git a/src/outputs/pulse.c b/src/outputs/pulse.c index a51ab8ba..28bc0bfd 100644 --- a/src/outputs/pulse.c +++ b/src/outputs/pulse.c @@ -1,8 +1,6 @@ /* * Copyright (C) 2016 Espen Jürgensen * - * Adapted from pulseaudio's simple.c - * * This program is free software; you can redistribute it and/or modify * it under the terms of the GNU General Public License as published by * the Free Software Foundation; either version 2 of the License, or @@ -45,6 +43,7 @@ /* TODO for Pulseaudio - Get volume from Pulseaudio on startup and on callbacks - Add sync with AirPlay with pa_buffer_attr + - Underrun, suspend and overrun monitoring */ struct pulse @@ -55,7 +54,7 @@ struct pulse struct commands_base *cmdbase; int operation_success; -}; +} pulse; struct pulse_session { @@ -65,9 +64,6 @@ struct pulse_session char *devname; int volume; - struct event *deferredev; - output_status_cb defer_cb; - /* Do not dereference - only passed to the status cb */ struct output_device *device; struct output_session *output_session; @@ -80,27 +76,25 @@ struct pulse_session extern struct event_base *evbase_player; // Globals -static struct pulse pulse; static struct pulse_session *sessions; // Internal list with indeces of the Pulseaudio devices (sinks) we have registered static uint32_t pulse_known_devices[PULSE_MAX_DEVICES]; -/* Forwards */ -static void -defer_cb(int fd, short what, void *arg); - /* ---------------------------- SESSION HANDLING ---------------------------- */ static void pulse_session_free(struct pulse_session *ps) { - event_free(ps->deferredev); - if (ps->stream) { + pa_threaded_mainloop_lock(pulse.mainloop); + + pa_stream_set_state_callback(ps->stream, NULL, NULL); pa_stream_disconnect(ps->stream); pa_stream_unref(ps->stream); + + pa_threaded_mainloop_unlock(pulse.mainloop); } if (ps->devname) @@ -146,15 +140,6 @@ pulse_session_make(struct output_device *device, output_status_cb cb) return NULL; } - ps->deferredev = evtimer_new(evbase_player, defer_cb, ps); - if (!ps->deferredev) - { - DPRINTF(E_LOG, L_LAUDIO, "Out of memory for Pulseaudio deferred event\n"); - free(os); - free(ps); - return NULL; - } - os->session = ps; os->type = device->type; @@ -171,15 +156,15 @@ pulse_session_make(struct output_device *device, output_status_cb cb) return ps; } -/* ---------------------------- STATUS HANDLERS ----------------------------- */ +/* ---------------------------- COMMAND HANDLERS ---------------------------- */ // Maps our internal state to the generic output state and then makes a callback -// to the player to tell that state. Note: Will free the session if the state is -// stopped or failed. -static void -defer_cb(int fd, short what, void *arg) +// to the player to tell that state. Should always be called deferred. +static enum command_state +send_status(void *arg, int *ptr) { struct pulse_session *ps = arg; + output_status_cb status_cb; enum output_device_state state; switch (ps->state) @@ -198,77 +183,138 @@ defer_cb(int fd, short what, void *arg) state = OUTPUT_STATE_STARTUP; break; default: - DPRINTF(E_LOG, L_LAUDIO, "Bug! Unhandled state in pulse_status()\n"); + DPRINTF(E_LOG, L_LAUDIO, "Bug! Unhandled state in send_status()\n"); state = OUTPUT_STATE_FAILED; } - if (ps->defer_cb) - ps->defer_cb(ps->device, ps->output_session, state); + status_cb = ps->status_cb; + ps->status_cb = NULL; + if (status_cb) + status_cb(ps->device, ps->output_session, state); - if (!(state > OUTPUT_STATE_STOPPED)) - pulse_session_cleanup(ps); + return COMMAND_PENDING; // Don't want the command module to clean up ps } +static enum command_state +session_shutdown(void *arg, int *ptr) +{ + struct pulse_session *ps = arg; + + send_status(ps, ptr); + pulse_session_cleanup(ps); + + return COMMAND_PENDING; // Don't want the command module to clean up ps +} + +/* ---------------------- EXECUTED IN PULSEAUDIO THREAD --------------------- */ + static void pulse_status(struct pulse_session *ps) { - ps->defer_cb = ps->status_cb; - event_active(ps->deferredev, 0, 0); - ps->status_cb = NULL; + // async to avoid risk of deadlock if the player should make calls back to Pulseaudio + commands_exec_async(pulse.cmdbase, send_status, ps); } +static void +pulse_session_shutdown(struct pulse_session *ps) +{ + // async to avoid risk of deadlock if the player should make calls back to Pulseaudio + commands_exec_async(pulse.cmdbase, session_shutdown, ps); +} /* --------------------- CALLBACKS FROM PULSEAUDIO THREAD ------------------- */ + +// This will be called if something happens to the stream after it was opened static void -stream_state_cb(pa_stream *s, void * userdata) +stream_state_cb(pa_stream *s, void *userdata) { - struct pulse *p = &pulse; struct pulse_session *ps = userdata; - DPRINTF(E_DBG, L_LAUDIO, "Pulseaudio stream state CB\n"); + DPRINTF(E_DBG, L_LAUDIO, "Pulseaudio stream to '%s' changed state (%d)\n", ps->devname, ps->state); ps->state = pa_stream_get_state(s); - - switch (ps->state) + if (!PA_STREAM_IS_GOOD(ps->state)) { - case PA_STREAM_READY: - case PA_STREAM_FAILED: - case PA_STREAM_TERMINATED: - pa_threaded_mainloop_signal(p->mainloop, 0); - break; + if (ps->state == PA_STREAM_FAILED) + { + errno = pa_context_errno(pulse.context); + DPRINTF(E_LOG, L_LAUDIO, "Pulseaudio stream to '%s' failed with error: %s\n", ps->devname, pa_strerror(errno)); + } + else + DPRINTF(E_LOG, L_LAUDIO, "Pulseaudio stream to '%s' aborted (%d)\n", ps->devname, ps->state); - case PA_STREAM_UNCONNECTED: - case PA_STREAM_CREATING: - break; + pulse_session_shutdown(ps); + return; } } +// This will be called our request to open the stream has completed static void -stream_request_cb(pa_stream *s, size_t length, void *userdata) +start_cb(pa_stream *s, void *userdata) { - struct pulse *p = &pulse; + struct pulse_session *ps = userdata; - pa_threaded_mainloop_signal(p->mainloop, 0); + ps->state = pa_stream_get_state(s); + if (ps->state == PA_STREAM_CREATING) + return; + + if (ps->state != PA_STREAM_READY) + { + DPRINTF(E_LOG, L_LAUDIO, "Error starting Pulseaudio stream to '%s' (%d)\n", ps->devname, ps->state); + pulse_session_shutdown(ps); + return; + } + + pa_stream_set_state_callback(ps->stream, stream_state_cb, ps); + + pulse_status(ps); } static void -stream_latency_update_cb(pa_stream *s, void *userdata) +close_cb(pa_stream *s, void *userdata) { - struct pulse *p = &pulse; + struct pulse_session *ps = userdata; - pa_threaded_mainloop_signal(p->mainloop, 0); + pulse_session_shutdown(ps); } -/*static void -success_cb(pa_stream *s, int success, void *userdata) +// This will be called our request to probe the stream has completed +static void +probe_cb(pa_stream *s, void *userdata) { - struct pulse *p = userdata; + struct pulse_session *ps = userdata; - p->operation_success = success; - pa_threaded_mainloop_signal(p->mainloop, 0); + ps->state = pa_stream_get_state(s); + if (ps->state == PA_STREAM_CREATING) + return; + + if (ps->state != PA_STREAM_READY) + { + DPRINTF(E_LOG, L_LAUDIO, "Error probing Pulseaudio stream to '%s' (%d)\n", ps->devname, ps->state); + pulse_session_shutdown(ps); + return; + } + + // This will callback to the player with succes and then remove the session + pulse_session_shutdown(ps); +} + +static void +flush_cb(pa_stream *s, int success, void *userdata) +{ + struct pulse_session *ps = userdata; + + pulse_status(ps); +} + +static void +volume_cb(pa_context *c, int success, void *userdata) +{ + struct pulse_session *ps = userdata; + + pulse_status(ps); } -*/ static void sinklist_cb(pa_context *ctx, const pa_sink_info *info, int eol, void *userdata) @@ -381,12 +427,14 @@ subscribe_cb(pa_context *c, pa_subscription_event_type_t t, uint32_t index, void static void context_state_cb(pa_context *c, void *userdata) { - struct pulse *p = userdata; + pa_context_state_t state; pa_operation *o; - DPRINTF(E_DBG, L_LAUDIO, "Pulseaudio context state CB\n"); + state = pa_context_get_state(c); - switch (pa_context_get_state(c)) + DPRINTF(E_DBG, L_LAUDIO, "Pulseaudio context state changed to %d (ready is %d)\n", state, PA_CONTEXT_READY); + + switch (state) { case PA_CONTEXT_READY: o = pa_context_get_sink_info_list(c, sinklist_cb, NULL); @@ -406,12 +454,12 @@ context_state_cb(pa_context *c, void *userdata) } pa_operation_unref(o); - pa_threaded_mainloop_signal(p->mainloop, 0); + pa_threaded_mainloop_signal(pulse.mainloop, 0); break; case PA_CONTEXT_TERMINATED: case PA_CONTEXT_FAILED: - pa_threaded_mainloop_signal(p->mainloop, 0); + pa_threaded_mainloop_signal(pulse.mainloop, 0); break; case PA_CONTEXT_UNCONNECTED: @@ -429,20 +477,20 @@ context_state_cb(pa_context *c, void *userdata) static void pulse_free(struct pulse *p) { - if (p->mainloop) - pa_threaded_mainloop_stop(p->mainloop); + if (pulse.mainloop) + pa_threaded_mainloop_stop(pulse.mainloop); - if (p->context) + if (pulse.context) { - pa_context_disconnect(p->context); - pa_context_unref(p->context); + pa_context_disconnect(pulse.context); + pa_context_unref(pulse.context); } if (p->cmdbase) commands_base_free(p->cmdbase); - if (p->mainloop) - pa_threaded_mainloop_free(p->mainloop); + if (pulse.mainloop) + pa_threaded_mainloop_free(pulse.mainloop); } static int @@ -469,26 +517,24 @@ context_check(pa_context *context) } static int -stream_open(struct pulse *p, struct pulse_session *ps) +stream_open(struct pulse_session *ps, pa_stream_notify_cb_t cb) { pa_stream_flags_t flags; pa_sample_spec ss; int ret; - DPRINTF(E_DBG, L_LAUDIO, "Opening Pulseaudio stream\n"); + DPRINTF(E_DBG, L_LAUDIO, "Opening Pulseaudio stream to '%s'\n", ps->devname); ss.format = PA_SAMPLE_S16LE; ss.channels = 2; ss.rate = 44100; - pa_threaded_mainloop_lock(p->mainloop); + pa_threaded_mainloop_lock(pulse.mainloop); - if (!(ps->stream = pa_stream_new(p->context, "forked-daapd audio", &ss, NULL))) + if (!(ps->stream = pa_stream_new(pulse.context, "forked-daapd audio", &ss, NULL))) goto unlock_and_fail; - pa_stream_set_state_callback(ps->stream, stream_state_cb, ps); - pa_stream_set_write_callback(ps->stream, stream_request_cb, ps); - pa_stream_set_latency_update_callback(ps->stream, stream_latency_update_cb, ps); + pa_stream_set_state_callback(ps->stream, cb, ps); // TODO should we use PA_STREAM_ADJUST_LATENCY? flags = PA_STREAM_INTERPOLATE_TIMING | PA_STREAM_ADJUST_LATENCY | PA_STREAM_AUTO_TIMING_UPDATE; @@ -497,76 +543,37 @@ stream_open(struct pulse *p, struct pulse_session *ps) if (ret < 0) goto unlock_and_fail; - for (;;) - { - ps->state = pa_stream_get_state(ps->stream); + ps->state = pa_stream_get_state(ps->stream); + if (!PA_STREAM_IS_GOOD(ps->state)) + goto unlock_and_fail; - if (ps->state == PA_STREAM_READY) - break; - - if (!PA_STREAM_IS_GOOD(ps->state)) - goto unlock_and_fail; - - /* Wait until the stream is ready */ - pa_threaded_mainloop_wait(p->mainloop); - } - - pa_threaded_mainloop_unlock(p->mainloop); + pa_threaded_mainloop_unlock(pulse.mainloop); return 0; unlock_and_fail: - ret = pa_context_errno(p->context); + ret = pa_context_errno(pulse.context); DPRINTF(E_LOG, L_LAUDIO, "Pulseaudio could not start '%s': %s\n", ps->devname, pa_strerror(ret)); - pa_threaded_mainloop_unlock(p->mainloop); + pa_threaded_mainloop_unlock(pulse.mainloop); return -1; } static void -stream_close(struct pulse *p, struct pulse_session *ps) +stream_close(struct pulse_session *ps, pa_stream_notify_cb_t cb) { - pa_threaded_mainloop_lock(p->mainloop); + pa_threaded_mainloop_lock(pulse.mainloop); + pa_stream_set_state_callback(ps->stream, cb, ps); pa_stream_disconnect(ps->stream); + pa_stream_unref(ps->stream); - for (;;) - { - ps->state = pa_stream_get_state(ps->stream); + ps->state = PA_STREAM_TERMINATED; + ps->stream = NULL; - if (ps->state != PA_STREAM_READY) - break; - - /* Wait until the stream is closed */ - pa_threaded_mainloop_wait(p->mainloop); - } - - pa_threaded_mainloop_unlock(p->mainloop); -} - -static int -stream_check(struct pulse *p, struct pulse_session *ps) -{ - pa_stream_state_t state; - int errno; - - state = pa_stream_get_state(ps->stream); - if (!PA_STREAM_IS_GOOD(state)) - { - if (state == PA_STREAM_FAILED) - { - errno = pa_context_errno(p->context); - DPRINTF(E_LOG, L_LAUDIO, "Pulseaudio stream failed with error: %s\n", pa_strerror(errno)); - } - else - DPRINTF(E_LOG, L_LAUDIO, "Pulseaudio stream invalid state\n"); - - return -1; - } - - return 0; + pa_threaded_mainloop_unlock(pulse.mainloop); } @@ -578,17 +585,18 @@ pulse_device_start(struct output_device *device, output_status_cb cb, uint64_t r struct pulse_session *ps; int ret; - DPRINTF(E_DBG, L_LAUDIO, "Pulseaudio start\n"); + DPRINTF(E_DBG, L_LAUDIO, "Pulseaudio starting '%s'\n", device->name); ps = pulse_session_make(device, cb); if (!ps) return -1; - ret = stream_open(&pulse, ps); + ret = stream_open(ps, start_cb); if (ret < 0) - return -1; - - pulse_status(ps); + { + pulse_session_cleanup(ps); + return -1; + } return 0; } @@ -598,11 +606,9 @@ pulse_device_stop(struct output_session *session) { struct pulse_session *ps = session->session; - DPRINTF(E_DBG, L_LAUDIO, "Pulseaudio stop\n"); + DPRINTF(E_DBG, L_LAUDIO, "Pulseaudio stopping '%s'\n", ps->devname); - stream_close(&pulse, ps); - - pulse_status(ps); + stream_close(ps, close_cb); } static int @@ -611,23 +617,19 @@ pulse_device_probe(struct output_device *device, output_status_cb cb) struct pulse_session *ps; int ret; - DPRINTF(E_DBG, L_LAUDIO, "Pulseaudio probe\n"); + DPRINTF(E_DBG, L_LAUDIO, "Pulseaudio probing '%s'\n", device->name); ps = pulse_session_make(device, cb); if (!ps) return -1; - ret = stream_open(&pulse, ps); + ret = stream_open(ps, probe_cb); if (ret < 0) { pulse_session_cleanup(ps); return -1; } - stream_close(&pulse, ps); - - pulse_status(ps); - return 0; } @@ -640,7 +642,6 @@ pulse_device_free_extra(struct output_device *device) static int pulse_device_volume_set(struct output_device *device, output_status_cb cb) { - struct pulse *p = &pulse; struct pulse_session *ps; uint32_t idx; pa_operation* o; @@ -651,32 +652,27 @@ pulse_device_volume_set(struct output_device *device, output_status_cb cb) return 0; ps = device->session->session; - - if ((context_check(p->context) < 0) || (stream_check(p, ps) < 0)) - return 0; + idx = pa_stream_get_index(ps->stream); vol = PA_VOLUME_MUTED + (device->volume * (PA_VOLUME_NORM - PA_VOLUME_MUTED)) / 100; pa_cvolume_set(&cvol, 2, vol); - idx = pa_stream_get_index(ps->stream); - DPRINTF(E_DBG, L_LAUDIO, "Setting Pulseaudio volume for stream %" PRIu32 " to %d (%d)\n", idx, (int)vol, device->volume); - pa_threaded_mainloop_lock(p->mainloop); + pa_threaded_mainloop_lock(pulse.mainloop); - o = pa_context_set_sink_input_volume(p->context, idx, &cvol, NULL, NULL); + ps->status_cb = cb; + + o = pa_context_set_sink_input_volume(pulse.context, idx, &cvol, volume_cb, ps); if (!o) { - DPRINTF(E_LOG, L_LAUDIO, "Pulseaudio could not set volume: %s\n", pa_strerror(pa_context_errno(p->context))); - pa_threaded_mainloop_unlock(p->mainloop); + DPRINTF(E_LOG, L_LAUDIO, "Pulseaudio could not set volume: %s\n", pa_strerror(pa_context_errno(pulse.context))); + pa_threaded_mainloop_unlock(pulse.mainloop); return 0; } pa_operation_unref(o); - pa_threaded_mainloop_unlock(p->mainloop); - - ps->status_cb = cb; - pulse_status(ps); + pa_threaded_mainloop_unlock(pulse.mainloop); return 1; } @@ -684,11 +680,9 @@ pulse_device_volume_set(struct output_device *device, output_status_cb cb) static void pulse_write(uint8_t *buf, uint64_t rtptime) { - struct pulse *p = &pulse; struct pulse_session *ps; struct pulse_session *next; size_t length; - int invalid_context; int ret; if (!sessions) @@ -696,72 +690,55 @@ pulse_write(uint8_t *buf, uint64_t rtptime) length = STOB(AIRTUNES_V2_PACKET_SAMPLES); - pa_threaded_mainloop_lock(p->mainloop); - - invalid_context = (context_check(p->context) < 0); + pa_threaded_mainloop_lock(pulse.mainloop); for (ps = sessions; ps; ps = next) { next = ps->next; - if (invalid_context || (stream_check(p, ps) < 0)) - { - pulse_status(ps); // Note: This will nuke the session (deferred) - continue; - } - - ret = pa_stream_writable_size(ps->stream); - if (ret < 0) - { - ret = pa_context_errno(p->context); - DPRINTF(E_LOG, L_LAUDIO, "Pulseaudio device '%s', returned error instead of writable size: %s\n", ps->devname, pa_strerror(ret)); - continue; - } - else if (ret < length) - { - DPRINTF(E_WARN, L_LAUDIO, "Pulseaudio device '%s' not writable or overrun (%d/%zu), skipping packet\n", ps->devname, ret, length); - continue; - } + if (ps->state != PA_STREAM_READY) + continue; ret = pa_stream_write(ps->stream, buf, length, NULL, 0LL, PA_SEEK_RELATIVE); if (ret < 0) { - ret = pa_context_errno(p->context); + ret = pa_context_errno(pulse.context); DPRINTF(E_LOG, L_LAUDIO, "Error writing Pulseaudio stream data to '%s': %s\n", ps->devname, pa_strerror(ret)); continue; } } - pa_threaded_mainloop_unlock(p->mainloop); - return; + pa_threaded_mainloop_unlock(pulse.mainloop); } static int pulse_flush(output_status_cb cb, uint64_t rtptime) { - struct pulse *p = &pulse; struct pulse_session *ps; pa_operation* o; int i; DPRINTF(E_DBG, L_LAUDIO, "Pulseaudio flush\n"); - pa_threaded_mainloop_lock(p->mainloop); + pa_threaded_mainloop_lock(pulse.mainloop); i = 0; for (ps = sessions; ps; ps = ps->next) { i++; - o = pa_stream_flush(ps->stream, NULL, NULL); - if (o) + ps->status_cb = cb; + o = pa_stream_flush(ps->stream, flush_cb, ps); + if (!o) { - ps->status_cb = cb; - pulse_status(ps); + DPRINTF(E_LOG, L_LAUDIO, "Pulseaudio could not flush '%s': %s\n", ps->devname, pa_strerror(pa_context_errno(pulse.context))); + continue; } + + pa_operation_unref(o); } - pa_threaded_mainloop_unlock(p->mainloop); + pa_threaded_mainloop_unlock(pulse.mainloop); return i; } @@ -788,55 +765,55 @@ pulse_init(void) ret = 0; - if (!(p->mainloop = pa_threaded_mainloop_new())) + if (!(pulse.mainloop = pa_threaded_mainloop_new())) goto fail; if (!(p->cmdbase = commands_base_new(evbase_player, NULL))) goto fail; #ifdef HAVE_PULSE_MAINLOOP_SET_NAME - pa_threaded_mainloop_set_name(p->mainloop, "pulseaudio"); + pa_threaded_mainloop_set_name(pulse.mainloop, "pulseaudio"); #endif - if (!(p->context = pa_context_new(pa_threaded_mainloop_get_api(p->mainloop), "forked-daapd"))) + if (!(pulse.context = pa_context_new(pa_threaded_mainloop_get_api(pulse.mainloop), "forked-daapd"))) goto fail; - pa_context_set_state_callback(p->context, context_state_cb, p); + pa_context_set_state_callback(pulse.context, context_state_cb, p); - if (pa_context_connect(p->context, NULL, 0, NULL) < 0) + if (pa_context_connect(pulse.context, NULL, 0, NULL) < 0) { - ret = pa_context_errno(p->context); + ret = pa_context_errno(pulse.context); goto fail; } - pa_threaded_mainloop_lock(p->mainloop); + pa_threaded_mainloop_lock(pulse.mainloop); - if (pa_threaded_mainloop_start(p->mainloop) < 0) + if (pa_threaded_mainloop_start(pulse.mainloop) < 0) goto unlock_and_fail; for (;;) { - state = pa_context_get_state(p->context); + state = pa_context_get_state(pulse.context); if (state == PA_CONTEXT_READY) break; if (!PA_CONTEXT_IS_GOOD(state)) { - ret = pa_context_errno(p->context); + ret = pa_context_errno(pulse.context); goto unlock_and_fail; } /* Wait until the context is ready */ - pa_threaded_mainloop_wait(p->mainloop); + pa_threaded_mainloop_wait(pulse.mainloop); } - pa_threaded_mainloop_unlock(p->mainloop); + pa_threaded_mainloop_unlock(pulse.mainloop); return 0; unlock_and_fail: - pa_threaded_mainloop_unlock(p->mainloop); + pa_threaded_mainloop_unlock(pulse.mainloop); fail: if (ret)