[cast] Add a simple delay to stream to a have little bit of sync

Since it is unknown how to do real sync on Chromecast, this commit instead adds
a primitive delay to the stream, so that it is at least somewhat closer to
Airplay/local audio.

Also some cleanup of unused stuff.
This commit is contained in:
ejurgensen 2019-08-23 19:22:33 +02:00
parent a5987addd4
commit 78373af201
2 changed files with 140 additions and 148 deletions

View File

@ -118,7 +118,7 @@ static cfg_opt_t sec_audio[] =
CFG_STR("mixer", NULL, CFGF_NONE), CFG_STR("mixer", NULL, CFGF_NONE),
CFG_STR("mixer_device", NULL, CFGF_NONE), CFG_STR("mixer_device", NULL, CFGF_NONE),
CFG_BOOL("sync_disable", cfg_false, CFGF_NONE), CFG_BOOL("sync_disable", cfg_false, CFGF_NONE),
CFG_INT("offset", 0, CFGF_NONE), // deprecated CFG_INT("offset", 0, CFGF_DEPRECATED),
CFG_INT("offset_ms", 0, CFGF_NONE), CFG_INT("offset_ms", 0, CFGF_NONE),
CFG_INT("adjust_period_seconds", 100, CFGF_NONE), CFG_INT("adjust_period_seconds", 100, CFGF_NONE),
CFG_END() CFG_END()
@ -138,6 +138,7 @@ static cfg_opt_t sec_airplay[] =
static cfg_opt_t sec_chromecast[] = static cfg_opt_t sec_chromecast[] =
{ {
CFG_BOOL("exclude", cfg_false, CFGF_NONE), CFG_BOOL("exclude", cfg_false, CFGF_NONE),
CFG_INT("offset_ms", 0, CFGF_NONE),
CFG_END() CFG_END()
}; };

View File

@ -96,6 +96,20 @@
#define CAST_QUALITY_BITS_PER_SAMPLE_DEFAULT 16 #define CAST_QUALITY_BITS_PER_SAMPLE_DEFAULT 16
#define CAST_QUALITY_CHANNELS_DEFAULT 2 #define CAST_QUALITY_CHANNELS_DEFAULT 2
// This makes the rtp session buffer 4 seconds of audio (4 sec * 50 pkts/sec),
// which can be used for delayed transmission (and retransmission)
#define CAST_PACKET_BUFFER_SIZE 200
// Max (absolute) value the user is allowed to set offset_ms in the config file
#define CAST_OFFSET_MAX 1000
// This is just my measurement of how much extra delay is required to start at
// the same time as Airplay. The value was found experimentally.
#define CAST_DEVICE_START_DELAY_MS 100
// See cast_packet_header_make()
#define CAST_HEADER_SIZE 11
/* Notes /* Notes
* OFFER/ANSWER <-webrtc * OFFER/ANSWER <-webrtc
* RTCP/RTP * RTCP/RTP
@ -117,10 +131,6 @@ union sockaddr_all
struct cast_session; struct cast_session;
struct cast_msg_payload; struct cast_msg_payload;
// See cast_packet_header_make()
#define CAST_HEADER_SIZE 11
#define CAST_PACKET_BUFFER_SIZE 1000
static struct encode_ctx *cast_encode_ctx; static struct encode_ctx *cast_encode_ctx;
static struct evbuffer *cast_encoded_data; static struct evbuffer *cast_encoded_data;
@ -128,12 +138,10 @@ typedef void (*cast_reply_cb)(struct cast_session *cs, struct cast_msg_payload *
// Session is starting up // Session is starting up
#define CAST_STATE_F_STARTUP (1 << 13) #define CAST_STATE_F_STARTUP (1 << 13)
// The default receiver app is ready // The receiver app is ready
#define CAST_STATE_F_MEDIA_CONNECTED (1 << 14) #define CAST_STATE_F_MEDIA_CONNECTED (1 << 14)
// Media is loaded in the receiver app
#define CAST_STATE_F_MEDIA_LOADED (1 << 15)
// Media is playing in the receiver app // Media is playing in the receiver app
#define CAST_STATE_F_MEDIA_STREAMING (1 << 16) #define CAST_STATE_F_MEDIA_STREAMING (1 << 15)
// Beware, the order of this enum has meaning // Beware, the order of this enum has meaning
enum cast_state enum cast_state
@ -150,7 +158,9 @@ enum cast_state
CAST_STATE_MEDIA_LAUNCHED = CAST_STATE_F_STARTUP | 0x03, CAST_STATE_MEDIA_LAUNCHED = CAST_STATE_F_STARTUP | 0x03,
// CONNECT, GET_STATUS and OFFER made to receiver app // CONNECT, GET_STATUS and OFFER made to receiver app
CAST_STATE_MEDIA_CONNECTED = CAST_STATE_F_MEDIA_CONNECTED, CAST_STATE_MEDIA_CONNECTED = CAST_STATE_F_MEDIA_CONNECTED,
// After OFFER // Buffering packets (playback not started yet)
CAST_STATE_MEDIA_BUFFERING = CAST_STATE_F_MEDIA_CONNECTED | 0x01,
// Streaming (playback started)
CAST_STATE_MEDIA_STREAMING = CAST_STATE_F_MEDIA_CONNECTED | CAST_STATE_F_MEDIA_STREAMING, CAST_STATE_MEDIA_STREAMING = CAST_STATE_F_MEDIA_CONNECTED | CAST_STATE_F_MEDIA_STREAMING,
}; };
@ -166,11 +176,6 @@ struct cast_master_session
uint8_t *rawbuf; uint8_t *rawbuf;
size_t rawbuf_size; size_t rawbuf_size;
int samples_per_packet; int samples_per_packet;
// Number of samples that we tell the output to buffer (this will mean that
// the position that we send in the sync packages are offset by this amount
// compared to the rtptimes of the corresponding RTP packages we are sending)
int output_buffer_samples;
}; };
struct cast_session struct cast_session
@ -201,8 +206,10 @@ struct cast_session
uint32_t ssrc_id; uint32_t ssrc_id;
// IP address URL of forked-daapd's mp3 stream // For initial buffering (delay playback to achieve some sort of sync).
char stream_url[128]; struct timespec start_pts;
struct timespec offset_ts;
uint16_t seqnum_next;
// Outgoing request which have the USE_REQUEST_ID flag get a new id, and a // Outgoing request which have the USE_REQUEST_ID flag get a new id, and a
// callback is registered. The callback is called when an incoming message // callback is registered. The callback is called when an incoming message
@ -515,78 +522,6 @@ cast_disconnect(int fd)
close(fd); close(fd);
} }
static int
stream_url_make(char *out, size_t len, const char *peer_addr, int family)
{
struct ifaddrs *ifap;
struct ifaddrs *ifa;
union sockaddr_all haddr;
union sockaddr_all hmask;
union sockaddr_all paddr;
char host_addr[64];
unsigned short port;
int found;
int ret;
if (family == AF_INET)
ret = inet_pton(AF_INET, peer_addr, &paddr.sin.sin_addr);
else
ret = inet_pton(AF_INET6, peer_addr, &paddr.sin6.sin6_addr);
if (ret != 1)
return -1;
found = 0;
ret = getifaddrs(&ifap);
if (ret < 0)
{
DPRINTF(E_LOG, L_CAST, "Could not get interface address: %s\n", strerror(errno));
return -1;
}
for (ifa = ifap; !found && ifa; ifa = ifa->ifa_next)
{
if (!ifa->ifa_addr)
{
DPRINTF(E_LOG, L_CAST, "Skipping null address from getifaddrs()\n");
continue;
}
if (ifa->ifa_addr->sa_family != family)
continue;
if (family == AF_INET)
{
memcpy(&haddr.sin, ifa->ifa_addr, sizeof(struct sockaddr_in));
memcpy(&hmask.sin, ifa->ifa_netmask, sizeof(struct sockaddr_in));
found = ((haddr.sin.sin_addr.s_addr & hmask.sin.sin_addr.s_addr) ==
(paddr.sin.sin_addr.s_addr & hmask.sin.sin_addr.s_addr));
if (found)
inet_ntop(family, &haddr.sin.sin_addr, host_addr, sizeof(host_addr));
}
else if (family == AF_INET6)
{
memcpy(&haddr.sin6, ifa->ifa_addr, sizeof(struct sockaddr_in6));
found = (memcmp(&haddr.sin6.sin6_addr.s6_addr, &paddr.sin6.sin6_addr.s6_addr, 8) == 0);
if (found)
inet_ntop(family, &haddr.sin6.sin6_addr, host_addr, sizeof(host_addr));
}
}
freeifaddrs(ifap);
if (!found)
return -1;
port = cfg_getint(cfg_getsec(cfg, "library"), "port");
if (family == AF_INET)
snprintf(out, len, "http://%s:%d/stream.mp3", host_addr, port);
else
snprintf(out, len, "http://[%s]:%d/stream.mp3", host_addr, port);
return 0;
}
static char * static char *
squote_to_dquote(char *buf) squote_to_dquote(char *buf)
{ {
@ -740,10 +675,6 @@ cast_msg_send(struct cast_session *cs, enum cast_msg_types type, cast_reply_cb r
snprintf(msg_buf, sizeof(msg_buf), cast_msg[type].payload, cs->request_id, cs->ssrc_id); snprintf(msg_buf, sizeof(msg_buf), cast_msg[type].payload, cs->request_id, cs->ssrc_id);
else if (type == PRESENTATION) else if (type == PRESENTATION)
snprintf(msg_buf, sizeof(msg_buf), cast_msg[type].payload, cs->session_id, cs->request_id); snprintf(msg_buf, sizeof(msg_buf), cast_msg[type].payload, cs->session_id, cs->request_id);
else if (type == MEDIA_LOAD)
snprintf(msg_buf, sizeof(msg_buf), cast_msg[type].payload, cs->stream_url, cs->session_id, cs->request_id);
else if ((type == MEDIA_PLAY) || (type == MEDIA_PAUSE) || (type == MEDIA_STOP))
snprintf(msg_buf, sizeof(msg_buf), cast_msg[type].payload, cs->media_session_id, cs->session_id, cs->request_id);
else if (type == SET_VOLUME) else if (type == SET_VOLUME)
snprintf(msg_buf, sizeof(msg_buf), cast_msg[type].payload, cs->volume, cs->request_id); snprintf(msg_buf, sizeof(msg_buf), cast_msg[type].payload, cs->volume, cs->request_id);
else else
@ -1005,7 +936,7 @@ cast_status(struct cast_session *cs)
case CAST_STATE_DISCONNECTED ... CAST_STATE_MEDIA_LAUNCHED: case CAST_STATE_DISCONNECTED ... CAST_STATE_MEDIA_LAUNCHED:
state = OUTPUT_STATE_STARTUP; state = OUTPUT_STATE_STARTUP;
break; break;
case CAST_STATE_MEDIA_CONNECTED: case CAST_STATE_MEDIA_CONNECTED ... CAST_STATE_MEDIA_BUFFERING:
state = OUTPUT_STATE_CONNECTED; state = OUTPUT_STATE_CONNECTED;
break; break;
case CAST_STATE_MEDIA_STREAMING: case CAST_STATE_MEDIA_STREAMING:
@ -1513,7 +1444,6 @@ master_session_make(struct media_quality *quality)
cms->quality = *quality; cms->quality = *quality;
cms->samples_per_packet = CAST_SAMPLES_PER_PACKET; cms->samples_per_packet = CAST_SAMPLES_PER_PACKET;
cms->rawbuf_size = STOB(cms->samples_per_packet, quality->bits_per_sample, quality->channels); cms->rawbuf_size = STOB(cms->samples_per_packet, quality->bits_per_sample, quality->channels);
cms->output_buffer_samples = OUTPUTS_BUFFER_DURATION * quality->sample_rate;
CHECK_NULL(L_CAST, cms->rawbuf = malloc(cms->rawbuf_size)); CHECK_NULL(L_CAST, cms->rawbuf = malloc(cms->rawbuf_size));
CHECK_NULL(L_CAST, cms->evbuf = evbuffer_new()); CHECK_NULL(L_CAST, cms->evbuf = evbuffer_new());
@ -1527,10 +1457,12 @@ static struct cast_session *
cast_session_make(struct output_device *device, int family, int callback_id) cast_session_make(struct output_device *device, int family, int callback_id)
{ {
struct cast_session *cs; struct cast_session *cs;
cfg_t *chromecast;
const char *proto; const char *proto;
const char *err; const char *err;
char *address; char *address;
unsigned short port; unsigned short port;
uint64_t offset_ms;
int flags; int flags;
int ret; int ret;
@ -1588,13 +1520,22 @@ cast_session_make(struct output_device *device, int family, int callback_id)
goto out_deinit_gnutls; goto out_deinit_gnutls;
} }
ret = stream_url_make(cs->stream_url, sizeof(cs->stream_url), address, family); chromecast = cfg_gettsec(cfg, "chromecast", device->name);
if (ret < 0)
offset_ms = chromecast ? cfg_getint(chromecast, "offset_ms") : 0;
if (abs(offset_ms) > CAST_OFFSET_MAX)
{ {
DPRINTF(E_LOG, L_CAST, "Bug! Could find a network interface on same subnet as %s\n", device->name); DPRINTF(E_LOG, L_CAST, "Ignoring invalid configuration of Chromecast offset (%" PRIu64 " ms)\n", offset_ms);
goto out_close_connection; offset_ms = 0;
} }
offset_ms += OUTPUTS_BUFFER_DURATION * 1000 + CAST_DEVICE_START_DELAY_MS;
cs->offset_ts.tv_sec = (offset_ms / 1000);
cs->offset_ts.tv_nsec = (offset_ms % 1000) * 1000000UL;
DPRINTF(E_LOG, L_CAST, "Offset is set to %lu:%lu\n", cs->offset_ts.tv_sec, cs->offset_ts.tv_nsec);
cs->ev = event_new(evbase_player, cs->server_fd, EV_READ | EV_PERSIST, cast_listen_cb, cs); cs->ev = event_new(evbase_player, cs->server_fd, EV_READ | EV_PERSIST, cast_listen_cb, cs);
if (!cs->ev) if (!cs->ev)
{ {
@ -1686,6 +1627,7 @@ cast_session_shutdown(struct cast_session *cs, enum cast_state wanted_state)
pending = 1; pending = 1;
break; break;
case CAST_STATE_MEDIA_BUFFERING:
case CAST_STATE_MEDIA_CONNECTED: case CAST_STATE_MEDIA_CONNECTED:
cast_disconnect(cs->udp_fd); cast_disconnect(cs->udp_fd);
cs->udp_fd = -1; cs->udp_fd = -1;
@ -1834,7 +1776,7 @@ packet_send(struct cast_session *cs, struct rtp_packet *pkt)
return 0; return 0;
} }
/* DPRINTF(E_DBG, L_PLAYER, "RTP PACKET seqnum %u, rtptime %u, payload 0x%x, pktbuf_s %zu\n", /* DPRINTF(E_DBG, L_CAST, "RTP PACKET seqnum %u, rtptime %u, payload 0x%x, pktbuf_s %zu\n",
cs->master_session->rtp_session->seqnum, cs->master_session->rtp_session->seqnum,
cs->master_session->rtp_session->pos, cs->master_session->rtp_session->pos,
pkt->header[1], pkt->header[1],
@ -1844,12 +1786,34 @@ packet_send(struct cast_session *cs, struct rtp_packet *pkt)
return 0; return 0;
} }
static int static inline int
packets_send(struct cast_master_session *cms) packets_send(struct cast_session *cs, struct rtp_session *rtp_session)
{
struct rtp_packet *pkt;
int ret;
// Note that the loop must work even though seqnum wraps around, so we use !=, not <
for (; cs->seqnum_next != rtp_session->seqnum; cs->seqnum_next++)
{
pkt = rtp_packet_get(rtp_session, cs->seqnum_next);
if (!pkt)
{
DPRINTF(E_WARN, L_CAST, "Packet to '%s' is missing in our buffer\n", cs->devname);
return 0; // Don't fail session over a missing packet (or should we?)
}
ret = packet_send(cs, pkt);
if (ret < 0)
return -1;
}
return 0;
}
static int
packet_make(struct cast_master_session *cms)
{ {
struct rtp_packet *pkt; struct rtp_packet *pkt;
struct cast_session *cs;
struct cast_session *next;
int len; int len;
int ret; int ret;
@ -1866,28 +1830,38 @@ packets_send(struct cast_master_session *cms)
if (ret < 0) if (ret < 0)
return -1; return -1;
for (cs = cast_sessions; cs; cs = next)
{
next = cs->next;
if (cs->master_session != cms || !(cs->state & CAST_STATE_F_MEDIA_CONNECTED))
continue;
ret = packet_send(cs, pkt);
if (ret < 0)
{
// Downgrade state immediately to avoid further write attempts
cs->state = CAST_STATE_MEDIA_LAUNCHED;
cast_session_shutdown(cs, CAST_STATE_FAILED);
}
}
// Commits packet to retransmit buffer, and prepares the session for the next packet // Commits packet to retransmit buffer, and prepares the session for the next packet
rtp_packet_commit(cms->rtp_session, pkt); rtp_packet_commit(cms->rtp_session, pkt);
return 0; return 0;
} }
static inline int
packets_make(struct cast_master_session *cms, struct output_data *odata)
{
int ret;
int npkts;
// TODO avoid this copy
evbuffer_add(cms->evbuf, odata->buffer, odata->bufsize);
cms->evbuf_samples += odata->samples;
// Make as many packets as we have data for (one packet requires rawbuf_size bytes)
npkts = 0;
while (evbuffer_get_length(cms->evbuf) >= cms->rawbuf_size)
{
evbuffer_remove(cms->evbuf, cms->rawbuf, cms->rawbuf_size);
cms->evbuf_samples -= cms->samples_per_packet;
ret = packet_make(cms);
if (ret == 0)
npkts++;
}
return npkts;
}
/* TODO This does not currently work - need to investigate what sync the devices support /* TODO This does not currently work - need to investigate what sync the devices support
static void static void
packets_sync_send(struct cast_master_session *cms, struct timespec pts) packets_sync_send(struct cast_master_session *cms, struct timespec pts)
@ -1920,7 +1894,7 @@ packets_sync_send(struct cast_master_session *cms, struct timespec pts)
sync_pkt = rtp_sync_packet_next(cms->rtp_session, &cur_stamp, 0x80); sync_pkt = rtp_sync_packet_next(cms->rtp_session, &cur_stamp, 0x80);
packet_send(cs, sync_pkt); packet_send(cs, sync_pkt);
DPRINTF(E_DBG, L_PLAYER, "Start sync packet sent to '%s': cur_pos=%" PRIu32 ", cur_ts=%lu:%lu, now=%lu:%lu, rtptime=%" PRIu32 ",\n", DPRINTF(E_DBG, L_CAST, "Start sync packet sent to '%s': cur_pos=%" PRIu32 ", cur_ts=%lu:%lu, now=%lu:%lu, rtptime=%" PRIu32 ",\n",
cs->devname, cur_stamp.pos, cur_stamp.ts.tv_sec, cur_stamp.ts.tv_nsec, ts.tv_sec, ts.tv_nsec, cms->rtp_session->pos); cs->devname, cur_stamp.pos, cur_stamp.ts.tv_sec, cur_stamp.ts.tv_nsec, ts.tv_sec, ts.tv_nsec, cms->rtp_session->pos);
} }
else if (is_sync_time && cs->state == CAST_STATE_MEDIA_STREAMING) else if (is_sync_time && cs->state == CAST_STATE_MEDIA_STREAMING)
@ -2045,46 +2019,63 @@ cast_device_volume_set(struct output_device *device, int callback_id)
static void static void
cast_write(struct output_buffer *obuf) cast_write(struct output_buffer *obuf)
{ {
struct cast_master_session *cms;
struct cast_session *cs; struct cast_session *cs;
struct cast_session *next;
struct timespec ts;
int i; int i;
int ret;
if (!cast_sessions) if (!cast_sessions)
return; return;
cms = cast_master_session;
for (i = 0; obuf->data[i].buffer; i++) for (i = 0; obuf->data[i].buffer; i++)
{ {
if (!quality_is_equal(&obuf->data[i].quality, &cast_quality_default)) if (quality_is_equal(&obuf->data[i].quality, &cast_quality_default))
continue; break;
// Sends sync packets to new sessions, and if it is sync time then also to old sessions
// packets_sync_send(cms, obuf->pts);
// TODO avoid this copy
evbuffer_add(cms->evbuf, obuf->data[i].buffer, obuf->data[i].bufsize);
cms->evbuf_samples += obuf->data[i].samples;
// Send as many packets as we have data for (one packet requires rawbuf_size bytes)
while (evbuffer_get_length(cms->evbuf) >= cms->rawbuf_size)
{
evbuffer_remove(cms->evbuf, cms->rawbuf, cms->rawbuf_size);
cms->evbuf_samples -= cms->samples_per_packet;
packets_send(cms);
}
} }
// Check for devices that have joined since last write (we have already sent them if (!obuf->data[i].buffer)
// initialization sync and rtp packets via packets_sync_send and packets_send)
for (cs = cast_sessions; cs; cs = cs->next)
{ {
if (cs->state != CAST_STATE_MEDIA_CONNECTED) DPRINTF(E_LOG, L_CAST, "Bug! Output not delivering required data quality\n");
return;
}
// Converts the raw audio in the output_buffer to Chromecast packets
packets_make(cast_master_session, &obuf->data[i]);
for (cs = cast_sessions; cs; cs = next)
{
next = cs->next;
if (!(cs->state & CAST_STATE_F_MEDIA_CONNECTED))
continue; continue;
cs->state = CAST_STATE_MEDIA_STREAMING; if (cs->state == CAST_STATE_MEDIA_CONNECTED)
// Make a cb? {
// Sets that playback will start at time = start_pts with the packet that comes after seqnum_last
cs->start_pts = timespec_add(obuf->pts, cs->offset_ts);
cs->seqnum_next = cast_master_session->rtp_session->seqnum;
cs->state = CAST_STATE_MEDIA_BUFFERING;
clock_gettime(CLOCK_MONOTONIC, &ts);
DPRINTF(E_DBG, L_CAST, "Start time is %lu:%lu, current time is %lu:%lu\n", cs->start_pts.tv_sec, cs->start_pts.tv_nsec, ts.tv_sec, ts.tv_nsec);
}
if (cs->state == CAST_STATE_MEDIA_BUFFERING)
{
clock_gettime(CLOCK_MONOTONIC, &ts);
if (timespec_cmp(cs->start_pts, ts) > 0)
continue; // Keep buffering
cs->state = CAST_STATE_MEDIA_STREAMING;
}
ret = packets_send(cs, cast_master_session->rtp_session);
if (ret < 0)
{
// Downgrade state immediately to avoid further write attempts (session shutdown is async)
cs->state = CAST_STATE_MEDIA_LAUNCHED;
cast_session_shutdown(cs, CAST_STATE_FAILED);
}
} }
} }