[pulse] Convert Pulseaudio to new interface, incl support for native quality

First draft, probably has a few bugs
This commit is contained in:
ejurgensen 2019-02-26 23:04:16 +01:00
parent 3c2ff294a1
commit 63a2e750c7
1 changed files with 174 additions and 104 deletions

View File

@ -38,8 +38,6 @@
#include "outputs.h"
#include "commands.h"
// From Airplay
#define PULSE_SAMPLES_PER_PACKET 352
#define PULSE_MAX_DEVICES 64
#define PULSE_LOG_MAX 10
@ -60,19 +58,21 @@ struct pulse
struct pulse_session
{
uint64_t device_id;
int callback_id;
char *devname;
pa_stream_state_t state;
pa_stream *stream;
pa_buffer_attr attr;
pa_volume_t volume;
struct media_quality quality;
int logcount;
char *devname;
struct output_device *device;
output_status_cb status_cb;
struct pulse_session *next;
};
@ -85,6 +85,9 @@ static struct pulse_session *sessions;
// Internal list with indeces of the Pulseaudio devices (sinks) we have registered
static uint32_t pulse_known_devices[PULSE_MAX_DEVICES];
static struct media_quality pulse_last_quality;
static struct media_quality pulse_fallback_quality = { 44100, 16, 2 };
// Converts from 0 - 100 to Pulseaudio's scale
static inline pa_volume_t
pulse_from_device_volume(int device_volume)
@ -113,10 +116,9 @@ pulse_session_free(struct pulse_session *ps)
pa_threaded_mainloop_unlock(pulse.mainloop);
}
if (ps->devname)
free(ps->devname);
outputs_quality_unsubscribe(&pulse_fallback_quality);
free(ps->output_session);
free(ps->devname);
free(ps);
}
@ -139,28 +141,36 @@ pulse_session_cleanup(struct pulse_session *ps)
p->next = ps->next;
}
ps->device->session = NULL;
outputs_device_session_remove(ps->device_id);
pulse_session_free(ps);
}
static struct pulse_session *
pulse_session_make(struct output_device *device, output_status_cb cb)
pulse_session_make(struct output_device *device, int callback_id)
{
struct pulse_session *ps;
int ret;
ret = outputs_quality_subscribe(&pulse_fallback_quality);
if (ret < 0)
{
DPRINTF(E_LOG, L_LAUDIO, "Could not subscribe to fallback audio quality\n");
return NULL;
}
CHECK_NULL(L_LAUDIO, ps = calloc(1, sizeof(struct pulse_session)));
ps->state = PA_STREAM_UNCONNECTED;
ps->device = device;
ps->status_cb = cb;
ps->device_id = device->id;
ps->callback_id = callback_id;
ps->volume = pulse_from_device_volume(device->volume);
ps->devname = strdup(device->extra_device_info);
ps->next = sessions;
sessions = ps;
outputs_device_session_add(device, ps);
outputs_device_session_add(device->id, ps);
return ps;
}
@ -173,7 +183,6 @@ static enum command_state
send_status(void *arg, int *ptr)
{
struct pulse_session *ps = arg;
output_status_cb status_cb;
enum output_device_state state;
switch (ps->state)
@ -196,10 +205,8 @@ send_status(void *arg, int *ptr)
state = OUTPUT_STATE_FAILED;
}
status_cb = ps->status_cb;
ps->status_cb = NULL;
if (status_cb)
status_cb(ps->device, ps->output_session, state);
outputs_cb(ps->callback_id, ps->device_id, state);
ps->callback_id = -1;
return COMMAND_PENDING; // Don't want the command module to clean up ps
}
@ -557,25 +564,33 @@ pulse_free(void)
}
static int
stream_open(struct pulse_session *ps, pa_stream_notify_cb_t cb)
stream_open(struct pulse_session *ps, struct media_quality *quality, pa_stream_notify_cb_t cb)
{
pa_stream_flags_t flags;
pa_sample_spec ss;
pa_cvolume cvol;
int offset;
int offset_ms;
int ret;
DPRINTF(E_DBG, L_LAUDIO, "Opening Pulseaudio stream to '%s'\n", ps->devname);
if (quality->bits_per_sample == 16)
ss.format = PA_SAMPLE_S16LE;
ss.channels = 2;
ss.rate = 44100;
else if (quality->bits_per_sample == 24)
ss.format = PA_SAMPLE_S24LE;
else if (quality->bits_per_sample == 32)
ss.format = PA_SAMPLE_S32LE;
else
ss.format = 0;
offset = cfg_getint(cfg_getsec(cfg, "audio"), "offset");
if (abs(offset) > 44100)
ss.channels = quality->channels;
ss.rate = quality->sample_rate;
offset_ms = cfg_getint(cfg_getsec(cfg, "audio"), "offset_ms");
if (abs(offset_ms) > 1000)
{
DPRINTF(E_LOG, L_LAUDIO, "The audio offset (%d) set in the configuration is out of bounds\n", offset);
offset = 44100 * (offset/abs(offset));
DPRINTF(E_LOG, L_LAUDIO, "The audio offset (%d) set in the configuration is out of bounds\n", offset_ms);
offset_ms = 1000 * (offset_ms/abs(offset_ms));
}
pa_threaded_mainloop_lock(pulse.mainloop);
@ -587,7 +602,7 @@ stream_open(struct pulse_session *ps, pa_stream_notify_cb_t cb)
flags = PA_STREAM_INTERPOLATE_TIMING | PA_STREAM_AUTO_TIMING_UPDATE;
ps->attr.tlength = STOB(2 * ss.rate + PULSE_SAMPLES_PER_PACKET - offset, 16, 2); // 2 second latency
ps->attr.tlength = STOB((OUTPUTS_BUFFER_DURATION * 1000 + offset_ms) * ss.rate / 1000, quality->bits_per_sample, quality->channels);
ps->attr.maxlength = 2 * ps->attr.tlength;
ps->attr.prebuf = (uint32_t)-1;
ps->attr.minreq = (uint32_t)-1;
@ -610,7 +625,8 @@ stream_open(struct pulse_session *ps, pa_stream_notify_cb_t cb)
unlock_and_fail:
ret = pa_context_errno(pulse.context);
DPRINTF(E_LOG, L_LAUDIO, "Pulseaudio could not start '%s': %s\n", ps->devname, pa_strerror(ret));
DPRINTF(E_LOG, L_LAUDIO, "Pulseaudio could not start '%s' using quality %d/%d/%d: %s\n",
ps->devname, quality->sample_rate, quality->bits_per_sample, quality->channels, pa_strerror(ret));
pa_threaded_mainloop_unlock(pulse.mainloop);
@ -620,11 +636,15 @@ stream_open(struct pulse_session *ps, pa_stream_notify_cb_t cb)
static void
stream_close(struct pulse_session *ps, pa_stream_notify_cb_t cb)
{
if (!ps->stream)
return;
pa_threaded_mainloop_lock(pulse.mainloop);
pa_stream_set_underflow_callback(ps->stream, NULL, NULL);
pa_stream_set_overflow_callback(ps->stream, NULL, NULL);
pa_stream_set_state_callback(ps->stream, cb, ps);
pa_stream_disconnect(ps->stream);
pa_stream_unref(ps->stream);
@ -634,39 +654,121 @@ stream_close(struct pulse_session *ps, pa_stream_notify_cb_t cb)
pa_threaded_mainloop_unlock(pulse.mainloop);
}
static void
playback_restart(struct pulse_session *ps, struct output_buffer *obuf)
{
int ret;
stream_close(ps, NULL);
// Negotiate quality (sample rate) with device - first we try to use the source quality
ps->quality = obuf->data[0].quality;
ret = stream_open(ps, &ps->quality, start_cb);
if (ret < 0)
{
DPRINTF(E_INFO, L_LAUDIO, "Input quality (%d/%d/%d) not supported, falling back to default\n",
ps->quality.sample_rate, ps->quality.bits_per_sample, ps->quality.channels);
ps->quality = pulse_fallback_quality;
ret = stream_open(ps, &ps->quality, start_cb);
if (ret < 0)
{
DPRINTF(E_LOG, L_LAUDIO, "Pulseaudio device failed setting fallback quality\n");
ps->state = PA_STREAM_FAILED;
pulse_session_shutdown(ps);
return;
}
}
}
static void
playback_write(struct pulse_session *ps, struct output_buffer *obuf)
{
int i;
int ret;
// Find the quality we want
for (i = 0; obuf->data[i].buffer; i++)
{
if (quality_is_equal(&ps->quality, &obuf->data[i].quality))
break;
}
if (!obuf->data[i].buffer)
{
DPRINTF(E_LOG, L_LAUDIO, "Output not delivering required data quality, aborting\n");
ps->state = PA_STREAM_FAILED;
pulse_session_shutdown(ps);
return;
}
pa_threaded_mainloop_lock(pulse.mainloop);
ret = pa_stream_write(ps->stream, obuf->data[i].buffer, obuf->data[i].bufsize, NULL, 0LL, PA_SEEK_RELATIVE);
if (ret < 0)
{
ret = pa_context_errno(pulse.context);
DPRINTF(E_LOG, L_LAUDIO, "Error writing Pulseaudio stream data to '%s': %s\n", ps->devname, pa_strerror(ret));
ps->state = PA_STREAM_FAILED;
pulse_session_shutdown(ps);
goto unlock;
}
unlock:
pa_threaded_mainloop_unlock(pulse.mainloop);
}
static void
playback_resume(struct pulse_session *ps)
{
pa_operation* o;
pa_threaded_mainloop_lock(pulse.mainloop);
o = pa_stream_cork(ps->stream, 0, NULL, NULL);
if (!o)
{
DPRINTF(E_LOG, L_LAUDIO, "Pulseaudio could not resume '%s': %s\n", ps->devname, pa_strerror(pa_context_errno(pulse.context)));
goto unlock;
}
pa_operation_unref(o);
unlock:
pa_threaded_mainloop_unlock(pulse.mainloop);
}
/* ------------------ INTERFACE FUNCTIONS CALLED BY OUTPUTS.C --------------- */
static int
pulse_device_start(struct output_device *device, output_status_cb cb, uint64_t rtptime)
pulse_device_start(struct output_device *device, int callback_id)
{
struct pulse_session *ps;
int ret;
DPRINTF(E_DBG, L_LAUDIO, "Pulseaudio starting '%s'\n", device->name);
ps = pulse_session_make(device, cb);
ps = pulse_session_make(device, callback_id);
if (!ps)
return -1;
ret = stream_open(ps, start_cb);
if (ret < 0)
{
pulse_session_cleanup(ps);
return -1;
}
pulse_status(ps);
return 0;
}
static void
pulse_device_stop(struct output_session *session)
static int
pulse_device_stop(struct output_device *device, int callback_id)
{
struct pulse_session *ps = session->session;
struct pulse_session *ps = device->session;
DPRINTF(E_DBG, L_LAUDIO, "Pulseaudio stopping '%s'\n", ps->devname);
ps->callback_id = callback_id;
stream_close(ps, close_cb);
return 0;
}
@ -704,18 +806,18 @@ pulse_device_flush(struct output_device *device, int callback_id)
}
static int
pulse_device_probe(struct output_device *device, output_status_cb cb)
pulse_device_probe(struct output_device *device, int callback_id)
{
struct pulse_session *ps;
int ret;
DPRINTF(E_DBG, L_LAUDIO, "Pulseaudio probing '%s'\n", device->name);
ps = pulse_session_make(device, cb);
ps = pulse_session_make(device, callback_id);
if (!ps)
return -1;
ret = stream_open(ps, probe_cb);
ret = stream_open(ps, &pulse_fallback_quality, probe_cb);
if (ret < 0)
{
pulse_session_cleanup(ps);
@ -731,18 +833,25 @@ pulse_device_free_extra(struct output_device *device)
free(device->extra_device_info);
}
static int
pulse_device_volume_set(struct output_device *device, output_status_cb cb)
static void
pulse_device_cb_set(struct output_device *device, int callback_id)
{
struct pulse_session *ps;
struct pulse_session *ps = device->session;
ps->callback_id = callback_id;
}
static int
pulse_device_volume_set(struct output_device *device, int callback_id)
{
struct pulse_session *ps = device->session;
uint32_t idx;
pa_operation* o;
pa_cvolume cvol;
if (!sessions || !device->session || !device->session->session)
if (!ps)
return 0;
ps = device->session->session;
idx = pa_stream_get_index(ps->stream);
ps->volume = pulse_from_device_volume(device->volume);
@ -752,7 +861,7 @@ pulse_device_volume_set(struct output_device *device, output_status_cb cb)
pa_threaded_mainloop_lock(pulse.mainloop);
ps->status_cb = cb;
ps->callback_id = callback_id;
o = pa_context_set_sink_input_volume(pulse.context, idx, &cvol, volume_cb, ps);
if (!o)
@ -769,63 +878,33 @@ pulse_device_volume_set(struct output_device *device, output_status_cb cb)
}
static void
pulse_write(uint8_t *buf, uint64_t rtptime)
pulse_write(struct output_buffer *obuf)
{
struct pulse_session *ps;
struct pulse_session *next;
size_t length;
int ret;
if (!sessions)
return;
length = STOB(PULSE_SAMPLES_PER_PACKET, 16, 2);
pa_threaded_mainloop_lock(pulse.mainloop);
for (ps = sessions; ps; ps = next)
{
next = ps->next;
if (ps->state != PA_STREAM_READY)
// We have not set up a stream OR the quality changed, so we need to set it up again
if (ps->state == PA_STREAM_UNCONNECTED || !quality_is_equal(&obuf->data[0].quality, &pulse_last_quality))
{
playback_restart(ps, obuf);
pulse_last_quality = obuf->data[0].quality;
continue; // Async, so the device won't be ready for writing just now
}
else if (ps->state != PA_STREAM_READY)
continue;
ret = pa_stream_write(ps->stream, buf, length, NULL, 0LL, PA_SEEK_RELATIVE);
if (ret < 0)
{
ret = pa_context_errno(pulse.context);
DPRINTF(E_LOG, L_LAUDIO, "Error writing Pulseaudio stream data to '%s': %s\n", ps->devname, pa_strerror(ret));
if (ps->stream && pa_stream_is_corked(ps->stream))
playback_resume(ps);
ps->state = PA_STREAM_FAILED;
pulse_session_shutdown(ps);
continue;
playback_write(ps, obuf);
}
}
pa_threaded_mainloop_unlock(pulse.mainloop);
}
static void
pulse_playback_start(uint64_t next_pkt, struct timespec *ts)
{
struct pulse_session *ps;
pa_operation* o;
pa_threaded_mainloop_lock(pulse.mainloop);
for (ps = sessions; ps; ps = ps->next)
{
o = pa_stream_cork(ps->stream, 0, NULL, NULL);
if (!o)
{
DPRINTF(E_LOG, L_LAUDIO, "Pulseaudio could not resume '%s': %s\n", ps->devname, pa_strerror(pa_context_errno(pulse.context)));
continue;
}
pa_operation_unref(o);
}
pa_threaded_mainloop_unlock(pulse.mainloop);
}
static void
@ -858,14 +937,6 @@ pulse_playback_stop(void)
pa_threaded_mainloop_unlock(pulse.mainloop);
}
static void
pulse_set_status_cb(struct output_session *session, output_status_cb cb)
{
struct pulse_session *ps = session->session;
ps->status_cb = cb;
}
static int
pulse_init(void)
{
@ -959,10 +1030,9 @@ struct output_definition output_pulse =
.device_flush = pulse_device_flush,
.device_probe = pulse_device_probe,
.device_free_extra = pulse_device_free_extra,
.device_cb_set = pulse_device_cb_set,
.device_volume_set = pulse_device_volume_set,
.playback_start = pulse_playback_start,
.playback_stop = pulse_playback_stop,
.write = pulse_write,
.status_cb = pulse_set_status_cb,
};