[player/outputs/raop] Get rid of outputs_playback_start() (still WIP)

outputs_playback_start() had the problem that was not consistently invoked: If
for instance local audio playback was running and a Airplay device was then
activated, the raop's playback_start would never be invoked (and vice versa,
of course).

Instead, the player now writes the presentation timestamp every time to the
output, so it doesn't need to keep track of it from the start.
This commit is contained in:
ejurgensen 2019-02-10 01:54:52 +01:00
parent a924a8dd66
commit 0cb3881621
6 changed files with 237 additions and 249 deletions

View File

@ -140,7 +140,7 @@ encoding_reset(struct media_quality *quality)
}
static void
buffer_fill(struct output_buffer *obuf, void *buf, size_t bufsize, struct media_quality *quality, int nsamples)
buffer_fill(struct output_buffer *obuf, void *buf, size_t bufsize, struct media_quality *quality, int nsamples, struct timespec *pts)
{
transcode_frame *frame;
int ret;
@ -148,6 +148,7 @@ buffer_fill(struct output_buffer *obuf, void *buf, size_t bufsize, struct media_
int n;
obuf->write_counter++;
obuf->pts = pts;
// The resampling/encoding (transcode) contexts work for a given input quality,
// so if the quality changes we need to reset the contexts. We also do that if
@ -217,6 +218,18 @@ outputs_device_start(struct output_device *device, output_status_cb cb, uint64_t
return -1;
}
int
outputs_device_start2(struct output_device *device, output_status_cb cb)
{
if (outputs[device->type]->disabled)
return -1;
if (outputs[device->type]->device_start2)
return outputs[device->type]->device_start2(device, cb);
else
return -1;
}
void
outputs_device_stop(struct output_session *session)
{
@ -298,36 +311,6 @@ outputs_device_quality_set(struct output_device *device, struct media_quality *q
return -1;
}
void
outputs_playback_start(uint64_t next_pkt, struct timespec *start_time)
{
int i;
for (i = 0; outputs[i]; i++)
{
if (outputs[i]->disabled)
continue;
if (outputs[i]->playback_start)
outputs[i]->playback_start(next_pkt, start_time);
}
}
void
outputs_playback_start2(struct timespec *start_time)
{
int i;
for (i = 0; outputs[i]; i++)
{
if (outputs[i]->disabled)
continue;
if (outputs[i]->playback_start2)
outputs[i]->playback_start2(start_time);
}
}
void
outputs_playback_stop(void)
{
@ -359,11 +342,11 @@ outputs_write(uint8_t *buf, uint64_t rtptime)
}
void
outputs_write2(void *buf, size_t bufsize, struct media_quality *quality, int nsamples)
outputs_write2(void *buf, size_t bufsize, struct media_quality *quality, int nsamples, struct timespec *pts)
{
int i;
buffer_fill(&output_buffer, buf, bufsize, quality, nsamples);
buffer_fill(&output_buffer, buf, bufsize, quality, nsamples, pts);
for (i = 0; outputs[i]; i++)
{
@ -396,6 +379,25 @@ outputs_flush(output_status_cb cb, uint64_t rtptime)
return ret;
}
int
outputs_flush2(output_status_cb cb)
{
int ret;
int i;
ret = 0;
for (i = 0; outputs[i]; i++)
{
if (outputs[i]->disabled)
continue;
if (outputs[i]->flush2)
ret += outputs[i]->flush2(cb);
}
return ret;
}
void
outputs_status_cb(struct output_session *session, output_status_cb cb)
{

View File

@ -172,6 +172,7 @@ struct output_frame
struct output_buffer
{
uint32_t write_counter; // REMOVE ME? not used for anything
struct timespec *pts;
struct output_frame frames[OUTPUTS_MAX_QUALITY_SUBSCRIPTIONS + 1];
} output_buffer;
@ -202,6 +203,7 @@ struct output_definition
// Prepare a playback session on device and call back
int (*device_start)(struct output_device *device, output_status_cb cb, uint64_t rtptime);
int (*device_start2)(struct output_device *device, output_status_cb cb);
// Close a session prepared by device_start
void (*device_stop)(struct output_session *session);
@ -223,7 +225,6 @@ struct output_definition
// Start/stop playback on devices that were started
void (*playback_start)(uint64_t next_pkt, struct timespec *ts);
void (*playback_start2)(struct timespec *start_time);
void (*playback_stop)(void);
// Write stream data to the output devices
@ -232,6 +233,7 @@ struct output_definition
// Flush all sessions, the return must be number of sessions pending the flush
int (*flush)(output_status_cb cb, uint64_t rtptime);
int (*flush2)(output_status_cb cb);
// Authorize an output with a pin-code (probably coming from the filescanner)
void (*authorize)(const char *pin);
@ -249,6 +251,9 @@ struct output_definition
int
outputs_device_start(struct output_device *device, output_status_cb cb, uint64_t rtptime);
int
outputs_device_start2(struct output_device *device, output_status_cb cb);
void
outputs_device_stop(struct output_session *session);
@ -268,12 +273,6 @@ outputs_device_volume_to_pct(struct output_device *device, const char *value);
int
outputs_device_quality_set(struct output_device *device, struct media_quality *quality);
void
outputs_playback_start(uint64_t next_pkt, struct timespec *start_time);
void
outputs_playback_start2(struct timespec *start_time);
void
outputs_playback_stop(void);
@ -281,11 +280,14 @@ void
outputs_write(uint8_t *buf, uint64_t rtptime);
void
outputs_write2(void *buf, size_t bufsize, struct media_quality *quality, int nsamples);
outputs_write2(void *buf, size_t bufsize, struct media_quality *quality, int nsamples, struct timespec *pts);
int
outputs_flush(output_status_cb cb, uint64_t rtptime);
int
outputs_flush2(output_status_cb cb);
void
outputs_status_cb(struct output_session *session, output_status_cb cb);

View File

@ -169,6 +169,7 @@ struct raop_extra
struct raop_master_session
{
struct evbuffer *evbuf;
int evbuf_samples;
struct rtp_session *rtp_session;
@ -177,6 +178,11 @@ struct raop_master_session
int samples_per_packet;
bool encrypt;
// Number of samples that we tell the output to buffer (this will mean that
// the position that we send in the sync packages are offset by this amount
// compared to the rtptimes of the corresponding RTP packages we are sending)
int output_buffer_samples;
struct raop_master_session *next;
};
@ -354,7 +360,7 @@ static struct media_quality raop_quality_default = { RAOP_QUALITY_SAMPLE_RATE_DE
// Forwards
static int
raop_device_start(struct output_device *rd, output_status_cb cb, uint64_t rtptime);
raop_device_start(struct output_device *rd, output_status_cb cb);
static void
raop_device_stop(struct output_session *session);
@ -1245,7 +1251,7 @@ raop_send_req_teardown(struct raop_session *rs, evrtsp_req_cb cb, const char *lo
}
static int
raop_send_req_flush(struct raop_session *rs, uint64_t rtptime, evrtsp_req_cb cb, const char *log_caller)
raop_send_req_flush(struct raop_session *rs, evrtsp_req_cb cb, const char *log_caller)
{
struct raop_master_session *rms = rs->master_session;
struct evrtsp_request *req;
@ -1269,8 +1275,8 @@ raop_send_req_flush(struct raop_session *rs, uint64_t rtptime, evrtsp_req_cb cb,
return -1;
}
/* Restart sequence: last sequence + 1 */
ret = snprintf(buf, sizeof(buf), "seq=%" PRIu16 ";rtptime=%u", rms->rtp_session->seqnum + 1, rms->rtp_session->pos);
/* Restart sequence */
ret = snprintf(buf, sizeof(buf), "seq=%" PRIu16 ";rtptime=%u", rms->rtp_session->seqnum, rms->rtp_session->pos);
if ((ret < 0) || (ret >= sizeof(buf)))
{
DPRINTF(E_LOG, L_RAOP, "RTP-Info too big for buffer in FLUSH request\n");
@ -1375,7 +1381,7 @@ raop_send_req_record(struct raop_session *rs, evrtsp_req_cb cb, const char *log_
evrtsp_add_header(req->output_headers, "Range", "npt=0-");
/* Start sequence: next sequence */
ret = snprintf(buf, sizeof(buf), "seq=%" PRIu16 ";rtptime=%u", rms->rtp_session->seqnum + 1, rms->rtp_session->pos);
ret = snprintf(buf, sizeof(buf), "seq=%" PRIu16 ";rtptime=%u", rms->rtp_session->seqnum, rms->rtp_session->pos);
if ((ret < 0) || (ret >= sizeof(buf)))
{
DPRINTF(E_LOG, L_RAOP, "RTP-Info too big for buffer in RECORD request\n");
@ -1385,6 +1391,8 @@ raop_send_req_record(struct raop_session *rs, evrtsp_req_cb cb, const char *log_
}
evrtsp_add_header(req->output_headers, "RTP-Info", buf);
DPRINTF(E_DBG, L_RAOP, "RTP-Info is %s\n", buf);
ret = evrtsp_make_request(rs->ctrl, req, EVRTSP_REQ_RECORD, rs->session_url);
if (ret < 0)
{
@ -1784,7 +1792,7 @@ master_session_make(struct media_quality *quality, bool encrypt)
CHECK_NULL(L_RAOP, rms = calloc(1, sizeof(struct raop_master_session)));
rms->rtp_session = rtp_session_new(quality, RAOP_PACKET_BUFFER_SIZE, 0, OUTPUTS_BUFFER_DURATION);
rms->rtp_session = rtp_session_new(quality, RAOP_PACKET_BUFFER_SIZE, 0);
if (!rms->rtp_session)
{
outputs_quality_unsubscribe(quality);
@ -1795,6 +1803,7 @@ master_session_make(struct media_quality *quality, bool encrypt)
rms->encrypt = encrypt;
rms->samples_per_packet = RAOP_SAMPLES_PER_PACKET;
rms->rawbuf_size = STOB(rms->samples_per_packet, quality->bits_per_sample, quality->channels);
rms->output_buffer_samples = OUTPUTS_BUFFER_DURATION * quality->sample_rate;
CHECK_NULL(L_RAOP, rms->rawbuf = malloc(rms->rawbuf_size));
CHECK_NULL(L_RAOP, rms->evbuf = evbuffer_new());
@ -1810,10 +1819,8 @@ master_session_free(struct raop_master_session *rms)
{
outputs_quality_unsubscribe(&rms->rtp_session->quality);
rtp_session_free(rms->rtp_session);
evbuffer_free(rms->evbuf);
free(rms->rawbuf);
free(rms);
}
@ -2875,6 +2882,7 @@ packet_prepare(struct rtp_packet *pkt, uint8_t *rawbuf, size_t rawbuf_size, bool
static int
packet_send(struct raop_session *rs, struct rtp_packet *pkt)
{
struct timeval tv;
int ret;
if (!rs)
@ -2885,7 +2893,12 @@ packet_send(struct raop_session *rs, struct rtp_packet *pkt)
{
DPRINTF(E_LOG, L_RAOP, "Send error for '%s': %s\n", rs->devname, strerror(errno));
session_failure(rs);
rs->state = RAOP_STATE_FAILED;
// Can't free it right away, it would make the ->next in the calling
// master_session and session loops invalid
evutil_timerclear(&tv);
evtimer_add(rs->deferredev, &tv);
return -1;
}
else if (ret != pkt->data_len)
@ -2894,6 +2907,13 @@ packet_send(struct raop_session *rs, struct rtp_packet *pkt)
return -1;
}
/* DPRINTF(E_DBG, L_PLAYER, "RTP PACKET seqnum %u, rtptime %u, payload 0x%x, pktbuf_s %zu\n",
rs->master_session->rtp_session->seqnum,
rs->master_session->rtp_session->pos,
pkt->header[1],
rs->master_session->rtp_session->pktbuf_len
);
*/
return 0;
}
@ -2923,6 +2943,8 @@ control_packet_send(struct raop_session *rs, struct rtp_packet *pkt)
ret = sendto(rs->control_svc->fd, pkt->data, pkt->data_len, 0, &rs->sa.sa, len);
if (ret < 0)
DPRINTF(E_LOG, L_RAOP, "Could not send playback sync to device '%s': %s\n", rs->devname, strerror(errno));
DPRINTF(E_DBG, L_PLAYER, "SYNC PACKET SENT\n");
}
static void
@ -2946,55 +2968,88 @@ packets_resend(struct raop_session *rs, uint16_t seqnum, uint16_t len)
}
static int
frame_send(struct raop_master_session *rms)
packets_send(struct raop_master_session *rms)
{
struct rtp_packet *pkt;
struct rtp_packet *sync_pkt;
struct raop_session *rs;
struct raop_session *next;
bool sync_send;
int ret;
while (evbuffer_get_length(rms->evbuf) >= rms->rawbuf_size)
pkt = rtp_packet_next(rms->rtp_session, ALAC_HEADER_LEN + rms->rawbuf_size, rms->samples_per_packet, 0x60);
ret = packet_prepare(pkt, rms->rawbuf, rms->rawbuf_size, rms->encrypt);
if (ret < 0)
return -1;
for (rs = raop_sessions; rs; rs = rs->next)
{
evbuffer_remove(rms->evbuf, rms->rawbuf, rms->rawbuf_size);
if (rs->master_session != rms)
continue;
pkt = rtp_packet_next(rms->rtp_session, ALAC_HEADER_LEN + rms->rawbuf_size, rms->samples_per_packet);
ret = packet_prepare(pkt, rms->rawbuf, rms->rawbuf_size, rms->encrypt);
if (ret < 0)
return -1;
sync_send = rtp_sync_check(rms->rtp_session, pkt);
if (sync_send)
sync_pkt = rtp_sync_packet_next(rms->rtp_session);
for (rs = raop_sessions; rs; rs = next)
// Device just joined
if (rs->state == RAOP_STATE_CONNECTED)
{
// packet_send() may free rs on failure, so save rs->next now
next = rs->next;
// A device could have joined after playback_start() was called, so we
// also update state here
if (rs->state == RAOP_STATE_CONNECTED)
rs->state = RAOP_STATE_STREAMING;
if (rs->master_session != rms || rs->state != RAOP_STATE_STREAMING)
continue;
if (sync_send)
control_packet_send(rs, sync_pkt);
pkt->header[1] = 0xe0;
packet_send(rs, pkt);
rs->state = RAOP_STATE_STREAMING;
}
else if (rs->state == RAOP_STATE_STREAMING)
{
pkt->header[1] = 0x60;
packet_send(rs, pkt);
}
// Commits packet to retransmit buffer, and prepares the session for the next packet
rtp_packet_commit(rms->rtp_session, pkt);
}
// Commits packet to retransmit buffer, and prepares the session for the next packet
rtp_packet_commit(rms->rtp_session, pkt);
return 0;
}
static void
packets_sync_send(struct raop_master_session *rms, struct timespec *pts)
{
struct rtp_packet *sync_pkt;
struct raop_session *rs;
struct rtcp_timestamp cur_stamp;
bool is_sync_time;
// Check if it is time send a sync packet to sessions that are already running
is_sync_time = rtp_sync_is_time(rms->rtp_session);
// The last write from the player had a timestamp which has been passed to
// this function as pts. This is the time the device should be playing the
// samples just written by the player, so it is a time which is
// OUTPUTS_BUFFER_DURATION secs into the future. However, in the sync packet
// we want to tell the device what it should be playing right now. So we give
// it a cur_time where we subtract this duration.
// TODO do we need this? could we just send the future timestamp?
cur_stamp.ts.tv_sec = pts->tv_sec - OUTPUTS_BUFFER_DURATION;
cur_stamp.ts.tv_nsec = pts->tv_nsec;
// The cur_pos will be the rtptime of the coming packet, minus
// OUTPUTS_BUFFER_DURATION in samples (output_buffer_samples). Because we
// might also have some data lined up in rms->evbuf, we also need to account
// for that.
cur_stamp.pos = rms->rtp_session->pos + rms->evbuf_samples - rms->output_buffer_samples;
for (rs = raop_sessions; rs; rs = rs->next)
{
if (rs->master_session != rms)
continue;
// A device has joined and should get an init sync packet
if (rs->state == RAOP_STATE_CONNECTED)
{
sync_pkt = rtp_sync_packet_next(rms->rtp_session, &cur_stamp, 0x90);
control_packet_send(rs, sync_pkt);
}
else if (is_sync_time && rs->state == RAOP_STATE_STREAMING)
{
sync_pkt = rtp_sync_packet_next(rms->rtp_session, &cur_stamp, 0x80);
control_packet_send(rs, sync_pkt);
}
}
}
/* ------------------------------ Time service ------------------------------ */
@ -3518,7 +3573,7 @@ raop_startup_cancel(struct raop_session *rs)
raop_send_req_teardown(rs, session_failure_cb, "startup_cancel");
// Try to start a new session
raop_device_start(rs->device, rs->status_cb, rs->start_rtptime);
raop_device_start(rs->device, rs->status_cb);
// Don't let the failed session make a negative status callback
rs->status_cb = NULL;
@ -4744,8 +4799,11 @@ raop_device_probe(struct output_device *rd, output_status_cb cb)
}
static int
raop_device_start(struct output_device *rd, output_status_cb cb, uint64_t rtptime)
raop_device_start(struct output_device *rd, output_status_cb cb)
{
event_del(flush_timer);
evtimer_add(keep_alive_timer, &keep_alive_tv);
return raop_device_start_generic(rd, cb, 0);
}
@ -4760,7 +4818,6 @@ raop_device_stop(struct output_session *session)
session_cleanup(rs);
}
static void
raop_device_free_extra(struct output_device *device)
{
@ -4769,35 +4826,6 @@ raop_device_free_extra(struct output_device *device)
free(re);
}
static void
raop_playback_start(struct timespec *start_time)
{
struct raop_master_session *rms;
struct raop_session *rs;
struct rtp_packet *sync_pkt;
event_del(flush_timer);
evtimer_add(keep_alive_timer, &keep_alive_tv);
for (rms = raop_master_sessions; rms; rms = rms->next)
{
rtp_session_restart(rms->rtp_session, start_time); // Resets sync counter
sync_pkt = rtp_sync_packet_next(rms->rtp_session);
for (rs = raop_sessions; rs; rs = rs->next)
{
if (rs->master_session != rms)
continue;
if (rs->state == RAOP_STATE_CONNECTED)
rs->state = RAOP_STATE_STREAMING;
if (sync_pkt && rs->state == RAOP_STATE_STREAMING)
control_packet_send(rs, sync_pkt);
}
}
}
static void
raop_playback_stop(void)
{
@ -4827,14 +4855,26 @@ raop_write(struct output_buffer *obuf)
if (!quality_is_equal(&obuf->frames[i].quality, &rms->rtp_session->quality))
continue;
// Sends sync packets to new sessions, and if it is sync time then also to old sessions
packets_sync_send(rms, obuf->pts);
evbuffer_add_buffer_reference(rms->evbuf, obuf->frames[i].evbuf);
frame_send(rms);
rms->evbuf_samples += obuf->frames[i].samples;
// Send as many packets as we have data for (one packet requires rawbuf_size bytes)
while (evbuffer_get_length(rms->evbuf) >= rms->rawbuf_size)
{
evbuffer_remove(rms->evbuf, rms->rawbuf, rms->rawbuf_size);
rms->evbuf_samples -= rms->samples_per_packet;
packets_send(rms);
}
}
}
}
static int
raop_flush(output_status_cb cb, uint64_t rtptime)
raop_flush(output_status_cb cb)
{
struct timeval tv;
struct raop_session *rs;
@ -4850,7 +4890,7 @@ raop_flush(output_status_cb cb, uint64_t rtptime)
if (rs->state != RAOP_STATE_STREAMING)
continue;
ret = raop_send_req_flush(rs, rtptime, raop_cb_flush, "flush");
ret = raop_send_req_flush(rs, raop_cb_flush, "flush");
if (ret < 0)
{
session_failure(rs);
@ -5047,16 +5087,15 @@ struct output_definition output_raop =
.disabled = 0,
.init = raop_init,
.deinit = raop_deinit,
.device_start = raop_device_start,
.device_start2 = raop_device_start,
.device_stop = raop_device_stop,
.device_probe = raop_device_probe,
.device_free_extra = raop_device_free_extra,
.device_volume_set = raop_set_volume_one,
.device_volume_to_pct = raop_volume_to_pct,
.playback_start2 = raop_playback_start,
.playback_stop = raop_playback_stop,
.write2 = raop_write,
.flush = raop_flush,
.flush2 = raop_flush,
.status_cb = raop_set_status_cb,
.metadata_prepare = raop_metadata_prepare,
.metadata_send = raop_metadata_send,

View File

@ -45,9 +45,7 @@
#include <gcrypt.h>
#include "logger.h"
#include "conffile.h"
#include "misc.h"
#include "player.h"
#include "rtp_common.h"
#define RTP_HEADER_LEN 12
@ -83,7 +81,7 @@ ntp_to_timespec(struct ntp_timestamp *ns, struct timespec *ts)
}
struct rtp_session *
rtp_session_new(struct media_quality *quality, int pktbuf_size, int sync_each_nsamples, int buffer_duration)
rtp_session_new(struct media_quality *quality, int pktbuf_size, int sync_each_nsamples)
{
struct rtp_session *session;
@ -104,10 +102,6 @@ rtp_session_new(struct media_quality *quality, int pktbuf_size, int sync_each_ns
else if (sync_each_nsamples == 0)
session->sync_each_nsamples = quality->sample_rate;
session->buffer_duration = buffer_duration;
session->is_virgin = true;
return session;
}
@ -125,10 +119,8 @@ rtp_session_free(struct rtp_session *session)
}
void
rtp_session_restart(struct rtp_session *session, struct timespec *ts)
rtp_session_flush(struct rtp_session *session)
{
session->is_virgin = true;
session->start_time = *ts;
session->pktbuf_len = 0;
session->sync_counter = 0;
}
@ -136,7 +128,7 @@ rtp_session_restart(struct rtp_session *session, struct timespec *ts)
// We don't want the caller to malloc payload for every packet, so instead we
// will get him a packet from the ring buffer, thus in most cases reusing memory
struct rtp_packet *
rtp_packet_next(struct rtp_session *session, size_t payload_len, int samples)
rtp_packet_next(struct rtp_session *session, size_t payload_len, int samples, char type)
{
struct rtp_packet *pkt;
uint16_t seq;
@ -166,7 +158,7 @@ rtp_packet_next(struct rtp_session *session, size_t payload_len, int samples)
// RTP Header
pkt->header[0] = 0x80; // Version = 2, P, X and CC are 0
pkt->header[1] = (session->is_virgin) ? 0xe0 : 0x60; // TODO allow other payloads
pkt->header[1] = type; // RTP payload type
seq = htobe16(session->seqnum);
memcpy(pkt->header + 2, &seq, 2);
@ -177,13 +169,6 @@ rtp_packet_next(struct rtp_session *session, size_t payload_len, int samples)
ssrc_id = htobe32(session->ssrc_id);
memcpy(pkt->header + 8, &ssrc_id, 4);
/* DPRINTF(E_DBG, L_PLAYER, "RTP PACKET seqnum %u, rtptime %u, payload 0x%x, pktbuf_s %zu\n",
session->seqnum,
session->pos,
pkt->header[1],
session->pktbuf_len
);
*/
return pkt;
}
@ -198,8 +183,7 @@ rtp_packet_commit(struct rtp_session *session, struct rtp_packet *pkt)
session->pktbuf_next = (session->pktbuf_next + 1) % session->pktbuf_size;
session->seqnum++;
session->pos += pkt->samples;
session->is_virgin = false;
session->sync_counter += pkt->samples;
}
struct rtp_packet *
@ -226,33 +210,23 @@ rtp_packet_get(struct rtp_session *session, uint16_t seqnum)
}
bool
rtp_sync_check(struct rtp_session *session, struct rtp_packet *pkt)
rtp_sync_is_time(struct rtp_session *session)
{
if (!session->sync_each_nsamples)
{
return false;
}
if (session->sync_counter > session->sync_each_nsamples)
if (session->sync_each_nsamples && session->sync_counter > session->sync_each_nsamples)
{
session->sync_counter = 0;
return true;
}
session->sync_counter += pkt->samples; // TODO Should this move to a sync_commit function?
return false;
}
struct rtp_packet *
rtp_sync_packet_next(struct rtp_session *session)
rtp_sync_packet_next(struct rtp_session *session, struct rtcp_timestamp *cur_stamp, char type)
{
struct timespec ts;
struct ntp_timestamp cur_stamp;
uint64_t elapsed_usec;
uint64_t elapsed_samples;
struct ntp_timestamp cur_ts;
uint32_t rtptime;
uint32_t cur_pos;
int ret;
if (!session->sync_packet_next.data)
{
@ -260,54 +234,32 @@ rtp_sync_packet_next(struct rtp_session *session)
session->sync_packet_next.data_len = RTCP_SYNC_PACKET_LEN;
}
memset(session->sync_packet_next.data, 0, session->sync_packet_next.data_len); // TODO remove this and just zero byte 3 instead?
session->sync_packet_next.data[0] = (session->is_virgin) ? 0x90 : 0x80;
session->sync_packet_next.data[0] = type;
session->sync_packet_next.data[1] = 0xd4;
session->sync_packet_next.data[2] = 0x00;
session->sync_packet_next.data[3] = 0x07;
if (session->is_virgin)
{
session->sync_last_check.pos = session->pos - session->buffer_duration * session->quality.sample_rate;
session->sync_last_check.ts = session->start_time;
timespec_to_ntp(&session->start_time, &cur_stamp);
}
else
{
ret = player_get_time(&ts);
if (ret < 0)
return NULL;
timespec_to_ntp(&cur_stamp->ts, &cur_ts);
elapsed_usec = (ts.tv_sec - session->sync_last_check.ts.tv_sec) * 1000000 + (ts.tv_nsec - session->sync_last_check.ts.tv_nsec) / 1000;
// How many samples should have been played since last check
elapsed_samples = (elapsed_usec * session->quality.sample_rate) / 1000000;
session->sync_last_check.pos += elapsed_samples; // TODO should updating sync_last_check move to a commit function?
session->sync_last_check.ts = ts;
timespec_to_ntp(&ts, &cur_stamp);
}
cur_pos = htobe32(session->sync_last_check.pos);
cur_pos = htobe32(cur_stamp->pos);
memcpy(session->sync_packet_next.data + 4, &cur_pos, 4);
cur_stamp.sec = htobe32(cur_stamp.sec);
cur_stamp.frac = htobe32(cur_stamp.frac);
memcpy(session->sync_packet_next.data + 8, &cur_stamp.sec, 4);
memcpy(session->sync_packet_next.data + 12, &cur_stamp.frac, 4);
cur_ts.sec = htobe32(cur_ts.sec);
cur_ts.frac = htobe32(cur_ts.frac);
memcpy(session->sync_packet_next.data + 8, &cur_ts.sec, 4);
memcpy(session->sync_packet_next.data + 12, &cur_ts.frac, 4);
rtptime = htobe32(session->pos);
memcpy(session->sync_packet_next.data + 16, &rtptime, 4);
/* DPRINTF(E_DBG, L_PLAYER, "SYNC PACKET ts:%ld.%ld, next_pkt:%u, cur_pos:%u, payload:0x%x, sync_counter:%d, init:%d\n",
ts.tv_sec, ts.tv_nsec,
DPRINTF(E_DBG, L_PLAYER, "SYNC PACKET cur_ts:%ld.%ld, next_pkt:%u, cur_pos:%u, type:0x%x, sync_counter:%d\n",
cur_stamp->ts.tv_sec, cur_stamp->ts.tv_nsec,
session->pos,
session->sync_last_check.pos,
cur_stamp->pos,
session->sync_packet_next.data[0],
session->sync_counter,
session->is_virgin
session->sync_counter
);
*/
return &session->sync_packet_next;
}

View File

@ -37,9 +37,6 @@ struct rtp_session
uint32_t pos;
uint16_t seqnum;
// True if we haven't started streaming yet
bool is_virgin;
struct media_quality quality;
// Packet buffer (ring buffer), used for retransmission
@ -48,35 +45,25 @@ struct rtp_session
size_t pktbuf_size;
size_t pktbuf_len;
// Time of playback start (given by player)
struct timespec start_time;
// Number of seconds that we tell the client to buffer (this will mean that
// the position that we send in the sync packages are offset by this amount
// compared to the rtptimes of the corresponding RTP packages we are sending)
int buffer_duration;
// Number of samples to elapse before sync'ing. If 0 we set it to the s/r, so
// we sync once a second. If negative we won't sync.
int sync_each_nsamples;
int sync_counter;
struct rtp_packet sync_packet_next;
struct rtcp_timestamp sync_last_check;
};
struct rtp_session *
rtp_session_new(struct media_quality *quality, int pktbuf_size, int sync_each_nsamples, int buffer_duration);
rtp_session_new(struct media_quality *quality, int pktbuf_size, int sync_each_nsamples);
void
rtp_session_free(struct rtp_session *session);
void
rtp_session_restart(struct rtp_session *session, struct timespec *ts);
rtp_session_flush(struct rtp_session *session);
struct rtp_packet *
rtp_packet_next(struct rtp_session *session, size_t payload_len, int samples);
rtp_packet_next(struct rtp_session *session, size_t payload_len, int samples, char type);
void
rtp_packet_commit(struct rtp_session *session, struct rtp_packet *pkt);
@ -85,9 +72,9 @@ struct rtp_packet *
rtp_packet_get(struct rtp_session *session, uint16_t seqnum);
bool
rtp_sync_check(struct rtp_session *session, struct rtp_packet *pkt);
rtp_sync_is_time(struct rtp_session *session);
struct rtp_packet *
rtp_sync_packet_next(struct rtp_session *session);
rtp_sync_packet_next(struct rtp_session *session, struct rtcp_timestamp *cur_stamp, char type);
#endif /* !__RTP_COMMON_H__ */

View File

@ -134,7 +134,7 @@
// (value is in milliseconds)
#define PLAYER_WRITE_BEHIND_MAX 1500
// TODO fix me
// TODO get rid of me
#define TEMP_NEXT_RTPTIME (last_rtptime + pb_session.samples_written + pb_session.samples_per_read)
struct volume_param {
@ -194,6 +194,14 @@ struct player_session
int samples_per_read;
struct media_quality quality;
// The time the playback session started
struct timespec start_ts;
// The time the first sample in the buffer should be played by the output.
// It will be equal to:
// pts = start_ts + OUTPUTS_BUFFER_DURATION + ticks_elapsed * player_tick_interval
struct timespec pts;
};
static struct player_session pb_session;
@ -225,15 +233,11 @@ static int pb_timer_fd;
timer_t pb_timer;
#endif
static struct event *pb_timer_ev;
//static struct timespec pb_timer_last;
//static struct timespec packet_timer_last;
// How often the playback timer triggers playback_cb()
static struct timespec tick_interval;
// Time between ticks, i.e. time between when playback_cb() is invoked
static struct timespec player_tick_interval;
// Timer resolution
static struct timespec timer_res;
// Time between two packets
//static struct timespec packet_time = { 0, AIRTUNES_V2_STREAM_PERIOD };
static struct timespec player_timer_res;
// How many writes we owe the output (when the input is underrunning)
static int pb_read_deficit;
@ -1009,7 +1013,7 @@ session_init(struct player_session *session, struct media_quality *quality)
{
session->samples_written = 0;
session->quality = *quality;
session->samples_per_read = (quality->sample_rate / 1000) * (tick_interval.tv_nsec / 1000000);
session->samples_per_read = (quality->sample_rate / 1000) * (player_tick_interval.tv_nsec / 1000000);
session->bufsize = STOB(session->samples_per_read, quality->bits_per_sample, quality->channels);
DPRINTF(E_DBG, L_PLAYER, "New session values (q=%d/%d/%d, spr=%d, bufsize=%zu)\n",
@ -1022,6 +1026,10 @@ session_init(struct player_session *session, struct media_quality *quality)
session->buffer = malloc(session->bufsize);
CHECK_NULL(L_PLAYER, session->buffer);
clock_gettime_with_res(CLOCK_MONOTONIC, &session->start_ts, &player_timer_res);
session->pts.tv_sec = session->start_ts.tv_sec + OUTPUTS_BUFFER_DURATION;
session->pts.tv_nsec = session->start_ts.tv_nsec;
}
static void
@ -1092,6 +1100,7 @@ source_read(uint8_t *buf, int len)
static void
playback_cb(int fd, short what, void *arg)
{
struct timespec ts;
uint64_t overrun;
int got;
int nsamples;
@ -1162,16 +1171,25 @@ playback_cb(int fd, short what, void *arg)
}
nsamples = BTOS(got, pb_session.quality.bits_per_sample, pb_session.quality.channels);
outputs_write2(pb_session.buffer, pb_session.bufsize, &pb_session.quality, nsamples);
outputs_write2(pb_session.buffer, pb_session.bufsize, &pb_session.quality, nsamples, &pb_session.pts);
pb_session.samples_written += nsamples;
if (got < pb_session.bufsize)
{
DPRINTF(E_DBG, L_PLAYER, "Incomplete read, wanted %zu, got %d\n", pb_session.bufsize, got);
// How much the number of samples we got corresponds to in time (nanoseconds)
ts.tv_sec = 0;
ts.tv_nsec = 1000000000L * nsamples / pb_session.quality.sample_rate;
pb_session.pts = timespec_add(pb_session.pts, ts);
pb_read_deficit++;
}
else if (pb_read_deficit > 0)
pb_read_deficit--;
else
{
// We got a full frame, so that means we can also advance the presentation timestamp by a full tick
pb_session.pts = timespec_add(pb_session.pts, player_tick_interval);
if (pb_read_deficit > 0)
pb_read_deficit--;
}
}
if (pb_read_deficit > pb_read_deficit_max)
@ -1568,7 +1586,6 @@ device_lost_cb(struct output_device *device, struct output_session *session, enu
static void
device_activate_cb(struct output_device *device, struct output_session *session, enum output_device_state status)
{
struct timespec ts;
int retval;
int ret;
@ -1610,15 +1627,6 @@ device_activate_cb(struct output_device *device, struct output_session *session,
output_sessions++;
if ((player_state == PLAY_PLAYING) && (output_sessions == 1))
{
ret = clock_gettime_with_res(CLOCK_MONOTONIC, &ts, &timer_res);
if (ret < 0)
DPRINTF(E_LOG, L_PLAYER, "Could not get current time: %s\n", strerror(errno));
else
outputs_playback_start2(&ts);
}
outputs_status_cb(session, device_streaming_cb);
out:
@ -1742,8 +1750,8 @@ playback_timer_start(void)
return -1;
}
tick.it_interval = tick_interval;
tick.it_value = tick_interval;
tick.it_interval = player_tick_interval;
tick.it_value = player_tick_interval;
#ifdef HAVE_TIMERFD
ret = timerfd_settime(pb_timer_fd, 0, &tick, NULL);
@ -1807,7 +1815,7 @@ playback_abort(void)
static void
playback_suspend(void)
{
player_flush_pending = outputs_flush(device_command_cb, TEMP_NEXT_RTPTIME);
player_flush_pending = outputs_flush2(device_command_cb);
playback_timer_stop();
@ -1941,7 +1949,7 @@ playback_stop(void *arg, int *retval)
// We may be restarting very soon, so we don't bring the devices to a full
// stop just yet; this saves time when restarting, which is nicer for the user
*retval = outputs_flush(device_command_cb, TEMP_NEXT_RTPTIME);
*retval = outputs_flush2(device_command_cb);
playback_timer_stop();
@ -1981,7 +1989,7 @@ playback_start_bh(void *arg, int *retval)
// pb_buffer_offset = 0;
pb_read_deficit = 0;
ret = clock_gettime_with_res(CLOCK_MONOTONIC, &ts, &timer_res);
ret = clock_gettime_with_res(CLOCK_MONOTONIC, &ts, &player_timer_res);
if (ret < 0)
goto out_fail;
@ -1989,8 +1997,6 @@ playback_start_bh(void *arg, int *retval)
if (ret < 0)
goto out_fail;
outputs_playback_start2(&ts);
status_update(PLAY_PLAYING);
*retval = 0;
@ -2092,7 +2098,7 @@ playback_start_item(void *arg, int *retval)
{
if (device->selected && !device->session)
{
ret = outputs_device_start(device, device_restart_cb, TEMP_NEXT_RTPTIME);
ret = outputs_device_start2(device, device_restart_cb);
if (ret < 0)
{
DPRINTF(E_LOG, L_PLAYER, "Could not start selected %s device '%s'\n", device->type_name, device->name);
@ -2414,7 +2420,7 @@ playback_pause(void *arg, int *retval)
return COMMAND_END;
}
*retval = outputs_flush(device_command_cb, TEMP_NEXT_RTPTIME);
*retval = outputs_flush2(device_command_cb);
playback_timer_stop();
@ -2524,7 +2530,7 @@ speaker_activate(struct output_device *device)
{
DPRINTF(E_DBG, L_PLAYER, "Activating %s device '%s'\n", device->type_name, device->name);
ret = outputs_device_start(device, device_activate_cb, TEMP_NEXT_RTPTIME);
ret = outputs_device_start2(device, device_activate_cb);
if (ret < 0)
{
DPRINTF(E_LOG, L_PLAYER, "Could not start %s device '%s'\n", device->type_name, device->name);
@ -2996,7 +3002,7 @@ player_get_current_pos(uint64_t *pos, struct timespec *ts, int commit)
uint64_t delta;
int ret;
ret = clock_gettime_with_res(CLOCK_MONOTONIC, ts, &timer_res);
ret = clock_gettime_with_res(CLOCK_MONOTONIC, ts, &player_timer_res);
if (ret < 0)
{
DPRINTF(E_LOG, L_PLAYER, "Couldn't get clock: %s\n", strerror(errno));
@ -3038,7 +3044,7 @@ player_get_time(struct timespec *ts)
{
int ret;
ret = clock_gettime_with_res(CLOCK_MONOTONIC, ts, &timer_res);
ret = clock_gettime_with_res(CLOCK_MONOTONIC, ts, &player_timer_res);
if (ret < 0)
{
DPRINTF(E_LOG, L_PLAYER, "Couldn't get clock: %s\n", strerror(errno));
@ -3503,7 +3509,7 @@ player_init(void)
// Determine if the resolution of the system timer is > or < the size
// of an audio packet. NOTE: this assumes the system clock resolution
// is less than one second.
if (clock_getres(CLOCK_MONOTONIC, &timer_res) < 0)
if (clock_getres(CLOCK_MONOTONIC, &player_timer_res) < 0)
{
DPRINTF(E_LOG, L_PLAYER, "Could not get the system timer resolution.\n");
@ -3512,14 +3518,14 @@ player_init(void)
if (!cfg_getbool(cfg_getsec(cfg, "general"), "high_resolution_clock"))
{
DPRINTF(E_INFO, L_PLAYER, "High resolution clock not enabled on this system (res is %ld)\n", timer_res.tv_nsec);
DPRINTF(E_INFO, L_PLAYER, "High resolution clock not enabled on this system (res is %ld)\n", player_timer_res.tv_nsec);
timer_res.tv_nsec = 10 * PLAYER_TICK_INTERVAL * 1000000;
player_timer_res.tv_nsec = 10 * PLAYER_TICK_INTERVAL * 1000000;
}
// Set the tick interval for the playback timer
interval = MAX(timer_res.tv_nsec, PLAYER_TICK_INTERVAL * 1000000);
tick_interval.tv_nsec = interval;
interval = MAX(player_timer_res.tv_nsec, PLAYER_TICK_INTERVAL * 1000000);
player_tick_interval.tv_nsec = interval;
pb_write_deficit_max = (PLAYER_WRITE_BEHIND_MAX * 1000000 / interval);
pb_read_deficit_max = (PLAYER_READ_BEHIND_MAX * 1000000 / interval);