diff --git a/src/outputs.c b/src/outputs.c index 873bb3fc..38e0230c 100644 --- a/src/outputs.c +++ b/src/outputs.c @@ -140,7 +140,7 @@ encoding_reset(struct media_quality *quality) } static void -buffer_fill(struct output_buffer *obuf, void *buf, size_t bufsize, struct media_quality *quality, int nsamples) +buffer_fill(struct output_buffer *obuf, void *buf, size_t bufsize, struct media_quality *quality, int nsamples, struct timespec *pts) { transcode_frame *frame; int ret; @@ -148,6 +148,7 @@ buffer_fill(struct output_buffer *obuf, void *buf, size_t bufsize, struct media_ int n; obuf->write_counter++; + obuf->pts = pts; // The resampling/encoding (transcode) contexts work for a given input quality, // so if the quality changes we need to reset the contexts. We also do that if @@ -217,6 +218,18 @@ outputs_device_start(struct output_device *device, output_status_cb cb, uint64_t return -1; } +int +outputs_device_start2(struct output_device *device, output_status_cb cb) +{ + if (outputs[device->type]->disabled) + return -1; + + if (outputs[device->type]->device_start2) + return outputs[device->type]->device_start2(device, cb); + else + return -1; +} + void outputs_device_stop(struct output_session *session) { @@ -298,36 +311,6 @@ outputs_device_quality_set(struct output_device *device, struct media_quality *q return -1; } -void -outputs_playback_start(uint64_t next_pkt, struct timespec *start_time) -{ - int i; - - for (i = 0; outputs[i]; i++) - { - if (outputs[i]->disabled) - continue; - - if (outputs[i]->playback_start) - outputs[i]->playback_start(next_pkt, start_time); - } -} - -void -outputs_playback_start2(struct timespec *start_time) -{ - int i; - - for (i = 0; outputs[i]; i++) - { - if (outputs[i]->disabled) - continue; - - if (outputs[i]->playback_start2) - outputs[i]->playback_start2(start_time); - } -} - void outputs_playback_stop(void) { @@ -359,11 +342,11 @@ outputs_write(uint8_t *buf, uint64_t rtptime) } void -outputs_write2(void *buf, size_t bufsize, struct media_quality *quality, int nsamples) +outputs_write2(void *buf, size_t bufsize, struct media_quality *quality, int nsamples, struct timespec *pts) { int i; - buffer_fill(&output_buffer, buf, bufsize, quality, nsamples); + buffer_fill(&output_buffer, buf, bufsize, quality, nsamples, pts); for (i = 0; outputs[i]; i++) { @@ -396,6 +379,25 @@ outputs_flush(output_status_cb cb, uint64_t rtptime) return ret; } +int +outputs_flush2(output_status_cb cb) +{ + int ret; + int i; + + ret = 0; + for (i = 0; outputs[i]; i++) + { + if (outputs[i]->disabled) + continue; + + if (outputs[i]->flush2) + ret += outputs[i]->flush2(cb); + } + + return ret; +} + void outputs_status_cb(struct output_session *session, output_status_cb cb) { diff --git a/src/outputs.h b/src/outputs.h index 07b11810..d77547e1 100644 --- a/src/outputs.h +++ b/src/outputs.h @@ -172,6 +172,7 @@ struct output_frame struct output_buffer { uint32_t write_counter; // REMOVE ME? not used for anything + struct timespec *pts; struct output_frame frames[OUTPUTS_MAX_QUALITY_SUBSCRIPTIONS + 1]; } output_buffer; @@ -202,6 +203,7 @@ struct output_definition // Prepare a playback session on device and call back int (*device_start)(struct output_device *device, output_status_cb cb, uint64_t rtptime); + int (*device_start2)(struct output_device *device, output_status_cb cb); // Close a session prepared by device_start void (*device_stop)(struct output_session *session); @@ -223,7 +225,6 @@ struct output_definition // Start/stop playback on devices that were started void (*playback_start)(uint64_t next_pkt, struct timespec *ts); - void (*playback_start2)(struct timespec *start_time); void (*playback_stop)(void); // Write stream data to the output devices @@ -232,6 +233,7 @@ struct output_definition // Flush all sessions, the return must be number of sessions pending the flush int (*flush)(output_status_cb cb, uint64_t rtptime); + int (*flush2)(output_status_cb cb); // Authorize an output with a pin-code (probably coming from the filescanner) void (*authorize)(const char *pin); @@ -249,6 +251,9 @@ struct output_definition int outputs_device_start(struct output_device *device, output_status_cb cb, uint64_t rtptime); +int +outputs_device_start2(struct output_device *device, output_status_cb cb); + void outputs_device_stop(struct output_session *session); @@ -268,12 +273,6 @@ outputs_device_volume_to_pct(struct output_device *device, const char *value); int outputs_device_quality_set(struct output_device *device, struct media_quality *quality); -void -outputs_playback_start(uint64_t next_pkt, struct timespec *start_time); - -void -outputs_playback_start2(struct timespec *start_time); - void outputs_playback_stop(void); @@ -281,11 +280,14 @@ void outputs_write(uint8_t *buf, uint64_t rtptime); void -outputs_write2(void *buf, size_t bufsize, struct media_quality *quality, int nsamples); +outputs_write2(void *buf, size_t bufsize, struct media_quality *quality, int nsamples, struct timespec *pts); int outputs_flush(output_status_cb cb, uint64_t rtptime); +int +outputs_flush2(output_status_cb cb); + void outputs_status_cb(struct output_session *session, output_status_cb cb); diff --git a/src/outputs/raop.c b/src/outputs/raop.c index dc94e5f8..4f228980 100644 --- a/src/outputs/raop.c +++ b/src/outputs/raop.c @@ -169,6 +169,7 @@ struct raop_extra struct raop_master_session { struct evbuffer *evbuf; + int evbuf_samples; struct rtp_session *rtp_session; @@ -177,6 +178,11 @@ struct raop_master_session int samples_per_packet; bool encrypt; + // Number of samples that we tell the output to buffer (this will mean that + // the position that we send in the sync packages are offset by this amount + // compared to the rtptimes of the corresponding RTP packages we are sending) + int output_buffer_samples; + struct raop_master_session *next; }; @@ -354,7 +360,7 @@ static struct media_quality raop_quality_default = { RAOP_QUALITY_SAMPLE_RATE_DE // Forwards static int -raop_device_start(struct output_device *rd, output_status_cb cb, uint64_t rtptime); +raop_device_start(struct output_device *rd, output_status_cb cb); static void raop_device_stop(struct output_session *session); @@ -1245,7 +1251,7 @@ raop_send_req_teardown(struct raop_session *rs, evrtsp_req_cb cb, const char *lo } static int -raop_send_req_flush(struct raop_session *rs, uint64_t rtptime, evrtsp_req_cb cb, const char *log_caller) +raop_send_req_flush(struct raop_session *rs, evrtsp_req_cb cb, const char *log_caller) { struct raop_master_session *rms = rs->master_session; struct evrtsp_request *req; @@ -1269,8 +1275,8 @@ raop_send_req_flush(struct raop_session *rs, uint64_t rtptime, evrtsp_req_cb cb, return -1; } - /* Restart sequence: last sequence + 1 */ - ret = snprintf(buf, sizeof(buf), "seq=%" PRIu16 ";rtptime=%u", rms->rtp_session->seqnum + 1, rms->rtp_session->pos); + /* Restart sequence */ + ret = snprintf(buf, sizeof(buf), "seq=%" PRIu16 ";rtptime=%u", rms->rtp_session->seqnum, rms->rtp_session->pos); if ((ret < 0) || (ret >= sizeof(buf))) { DPRINTF(E_LOG, L_RAOP, "RTP-Info too big for buffer in FLUSH request\n"); @@ -1375,7 +1381,7 @@ raop_send_req_record(struct raop_session *rs, evrtsp_req_cb cb, const char *log_ evrtsp_add_header(req->output_headers, "Range", "npt=0-"); /* Start sequence: next sequence */ - ret = snprintf(buf, sizeof(buf), "seq=%" PRIu16 ";rtptime=%u", rms->rtp_session->seqnum + 1, rms->rtp_session->pos); + ret = snprintf(buf, sizeof(buf), "seq=%" PRIu16 ";rtptime=%u", rms->rtp_session->seqnum, rms->rtp_session->pos); if ((ret < 0) || (ret >= sizeof(buf))) { DPRINTF(E_LOG, L_RAOP, "RTP-Info too big for buffer in RECORD request\n"); @@ -1385,6 +1391,8 @@ raop_send_req_record(struct raop_session *rs, evrtsp_req_cb cb, const char *log_ } evrtsp_add_header(req->output_headers, "RTP-Info", buf); + DPRINTF(E_DBG, L_RAOP, "RTP-Info is %s\n", buf); + ret = evrtsp_make_request(rs->ctrl, req, EVRTSP_REQ_RECORD, rs->session_url); if (ret < 0) { @@ -1784,7 +1792,7 @@ master_session_make(struct media_quality *quality, bool encrypt) CHECK_NULL(L_RAOP, rms = calloc(1, sizeof(struct raop_master_session))); - rms->rtp_session = rtp_session_new(quality, RAOP_PACKET_BUFFER_SIZE, 0, OUTPUTS_BUFFER_DURATION); + rms->rtp_session = rtp_session_new(quality, RAOP_PACKET_BUFFER_SIZE, 0); if (!rms->rtp_session) { outputs_quality_unsubscribe(quality); @@ -1795,6 +1803,7 @@ master_session_make(struct media_quality *quality, bool encrypt) rms->encrypt = encrypt; rms->samples_per_packet = RAOP_SAMPLES_PER_PACKET; rms->rawbuf_size = STOB(rms->samples_per_packet, quality->bits_per_sample, quality->channels); + rms->output_buffer_samples = OUTPUTS_BUFFER_DURATION * quality->sample_rate; CHECK_NULL(L_RAOP, rms->rawbuf = malloc(rms->rawbuf_size)); CHECK_NULL(L_RAOP, rms->evbuf = evbuffer_new()); @@ -1810,10 +1819,8 @@ master_session_free(struct raop_master_session *rms) { outputs_quality_unsubscribe(&rms->rtp_session->quality); rtp_session_free(rms->rtp_session); - evbuffer_free(rms->evbuf); free(rms->rawbuf); - free(rms); } @@ -2875,6 +2882,7 @@ packet_prepare(struct rtp_packet *pkt, uint8_t *rawbuf, size_t rawbuf_size, bool static int packet_send(struct raop_session *rs, struct rtp_packet *pkt) { + struct timeval tv; int ret; if (!rs) @@ -2885,7 +2893,12 @@ packet_send(struct raop_session *rs, struct rtp_packet *pkt) { DPRINTF(E_LOG, L_RAOP, "Send error for '%s': %s\n", rs->devname, strerror(errno)); - session_failure(rs); + rs->state = RAOP_STATE_FAILED; + + // Can't free it right away, it would make the ->next in the calling + // master_session and session loops invalid + evutil_timerclear(&tv); + evtimer_add(rs->deferredev, &tv); return -1; } else if (ret != pkt->data_len) @@ -2894,6 +2907,13 @@ packet_send(struct raop_session *rs, struct rtp_packet *pkt) return -1; } +/* DPRINTF(E_DBG, L_PLAYER, "RTP PACKET seqnum %u, rtptime %u, payload 0x%x, pktbuf_s %zu\n", + rs->master_session->rtp_session->seqnum, + rs->master_session->rtp_session->pos, + pkt->header[1], + rs->master_session->rtp_session->pktbuf_len + ); +*/ return 0; } @@ -2923,6 +2943,8 @@ control_packet_send(struct raop_session *rs, struct rtp_packet *pkt) ret = sendto(rs->control_svc->fd, pkt->data, pkt->data_len, 0, &rs->sa.sa, len); if (ret < 0) DPRINTF(E_LOG, L_RAOP, "Could not send playback sync to device '%s': %s\n", rs->devname, strerror(errno)); + + DPRINTF(E_DBG, L_PLAYER, "SYNC PACKET SENT\n"); } static void @@ -2946,55 +2968,88 @@ packets_resend(struct raop_session *rs, uint16_t seqnum, uint16_t len) } static int -frame_send(struct raop_master_session *rms) +packets_send(struct raop_master_session *rms) { struct rtp_packet *pkt; - struct rtp_packet *sync_pkt; struct raop_session *rs; - struct raop_session *next; - bool sync_send; int ret; - while (evbuffer_get_length(rms->evbuf) >= rms->rawbuf_size) + pkt = rtp_packet_next(rms->rtp_session, ALAC_HEADER_LEN + rms->rawbuf_size, rms->samples_per_packet, 0x60); + + ret = packet_prepare(pkt, rms->rawbuf, rms->rawbuf_size, rms->encrypt); + if (ret < 0) + return -1; + + for (rs = raop_sessions; rs; rs = rs->next) { - evbuffer_remove(rms->evbuf, rms->rawbuf, rms->rawbuf_size); + if (rs->master_session != rms) + continue; - pkt = rtp_packet_next(rms->rtp_session, ALAC_HEADER_LEN + rms->rawbuf_size, rms->samples_per_packet); - - ret = packet_prepare(pkt, rms->rawbuf, rms->rawbuf_size, rms->encrypt); - if (ret < 0) - return -1; - - sync_send = rtp_sync_check(rms->rtp_session, pkt); - if (sync_send) - sync_pkt = rtp_sync_packet_next(rms->rtp_session); - - for (rs = raop_sessions; rs; rs = next) + // Device just joined + if (rs->state == RAOP_STATE_CONNECTED) { - // packet_send() may free rs on failure, so save rs->next now - next = rs->next; - - // A device could have joined after playback_start() was called, so we - // also update state here - if (rs->state == RAOP_STATE_CONNECTED) - rs->state = RAOP_STATE_STREAMING; - - if (rs->master_session != rms || rs->state != RAOP_STATE_STREAMING) - continue; - - if (sync_send) - control_packet_send(rs, sync_pkt); - + pkt->header[1] = 0xe0; + packet_send(rs, pkt); + rs->state = RAOP_STATE_STREAMING; + } + else if (rs->state == RAOP_STATE_STREAMING) + { + pkt->header[1] = 0x60; packet_send(rs, pkt); } - - // Commits packet to retransmit buffer, and prepares the session for the next packet - rtp_packet_commit(rms->rtp_session, pkt); } + // Commits packet to retransmit buffer, and prepares the session for the next packet + rtp_packet_commit(rms->rtp_session, pkt); + return 0; } +static void +packets_sync_send(struct raop_master_session *rms, struct timespec *pts) +{ + struct rtp_packet *sync_pkt; + struct raop_session *rs; + struct rtcp_timestamp cur_stamp; + bool is_sync_time; + + // Check if it is time send a sync packet to sessions that are already running + is_sync_time = rtp_sync_is_time(rms->rtp_session); + + // The last write from the player had a timestamp which has been passed to + // this function as pts. This is the time the device should be playing the + // samples just written by the player, so it is a time which is + // OUTPUTS_BUFFER_DURATION secs into the future. However, in the sync packet + // we want to tell the device what it should be playing right now. So we give + // it a cur_time where we subtract this duration. + // TODO do we need this? could we just send the future timestamp? + cur_stamp.ts.tv_sec = pts->tv_sec - OUTPUTS_BUFFER_DURATION; + cur_stamp.ts.tv_nsec = pts->tv_nsec; + // The cur_pos will be the rtptime of the coming packet, minus + // OUTPUTS_BUFFER_DURATION in samples (output_buffer_samples). Because we + // might also have some data lined up in rms->evbuf, we also need to account + // for that. + cur_stamp.pos = rms->rtp_session->pos + rms->evbuf_samples - rms->output_buffer_samples; + + for (rs = raop_sessions; rs; rs = rs->next) + { + if (rs->master_session != rms) + continue; + + // A device has joined and should get an init sync packet + if (rs->state == RAOP_STATE_CONNECTED) + { + sync_pkt = rtp_sync_packet_next(rms->rtp_session, &cur_stamp, 0x90); + control_packet_send(rs, sync_pkt); + } + else if (is_sync_time && rs->state == RAOP_STATE_STREAMING) + { + sync_pkt = rtp_sync_packet_next(rms->rtp_session, &cur_stamp, 0x80); + control_packet_send(rs, sync_pkt); + } + } +} + /* ------------------------------ Time service ------------------------------ */ @@ -3518,7 +3573,7 @@ raop_startup_cancel(struct raop_session *rs) raop_send_req_teardown(rs, session_failure_cb, "startup_cancel"); // Try to start a new session - raop_device_start(rs->device, rs->status_cb, rs->start_rtptime); + raop_device_start(rs->device, rs->status_cb); // Don't let the failed session make a negative status callback rs->status_cb = NULL; @@ -4744,8 +4799,11 @@ raop_device_probe(struct output_device *rd, output_status_cb cb) } static int -raop_device_start(struct output_device *rd, output_status_cb cb, uint64_t rtptime) +raop_device_start(struct output_device *rd, output_status_cb cb) { + event_del(flush_timer); + evtimer_add(keep_alive_timer, &keep_alive_tv); + return raop_device_start_generic(rd, cb, 0); } @@ -4760,7 +4818,6 @@ raop_device_stop(struct output_session *session) session_cleanup(rs); } - static void raop_device_free_extra(struct output_device *device) { @@ -4769,35 +4826,6 @@ raop_device_free_extra(struct output_device *device) free(re); } -static void -raop_playback_start(struct timespec *start_time) -{ - struct raop_master_session *rms; - struct raop_session *rs; - struct rtp_packet *sync_pkt; - - event_del(flush_timer); - evtimer_add(keep_alive_timer, &keep_alive_tv); - - for (rms = raop_master_sessions; rms; rms = rms->next) - { - rtp_session_restart(rms->rtp_session, start_time); // Resets sync counter - sync_pkt = rtp_sync_packet_next(rms->rtp_session); - - for (rs = raop_sessions; rs; rs = rs->next) - { - if (rs->master_session != rms) - continue; - - if (rs->state == RAOP_STATE_CONNECTED) - rs->state = RAOP_STATE_STREAMING; - - if (sync_pkt && rs->state == RAOP_STATE_STREAMING) - control_packet_send(rs, sync_pkt); - } - } -} - static void raop_playback_stop(void) { @@ -4827,14 +4855,26 @@ raop_write(struct output_buffer *obuf) if (!quality_is_equal(&obuf->frames[i].quality, &rms->rtp_session->quality)) continue; + // Sends sync packets to new sessions, and if it is sync time then also to old sessions + packets_sync_send(rms, obuf->pts); + evbuffer_add_buffer_reference(rms->evbuf, obuf->frames[i].evbuf); - frame_send(rms); + rms->evbuf_samples += obuf->frames[i].samples; + + // Send as many packets as we have data for (one packet requires rawbuf_size bytes) + while (evbuffer_get_length(rms->evbuf) >= rms->rawbuf_size) + { + evbuffer_remove(rms->evbuf, rms->rawbuf, rms->rawbuf_size); + rms->evbuf_samples -= rms->samples_per_packet; + + packets_send(rms); + } } } } static int -raop_flush(output_status_cb cb, uint64_t rtptime) +raop_flush(output_status_cb cb) { struct timeval tv; struct raop_session *rs; @@ -4850,7 +4890,7 @@ raop_flush(output_status_cb cb, uint64_t rtptime) if (rs->state != RAOP_STATE_STREAMING) continue; - ret = raop_send_req_flush(rs, rtptime, raop_cb_flush, "flush"); + ret = raop_send_req_flush(rs, raop_cb_flush, "flush"); if (ret < 0) { session_failure(rs); @@ -5047,16 +5087,15 @@ struct output_definition output_raop = .disabled = 0, .init = raop_init, .deinit = raop_deinit, - .device_start = raop_device_start, + .device_start2 = raop_device_start, .device_stop = raop_device_stop, .device_probe = raop_device_probe, .device_free_extra = raop_device_free_extra, .device_volume_set = raop_set_volume_one, .device_volume_to_pct = raop_volume_to_pct, - .playback_start2 = raop_playback_start, .playback_stop = raop_playback_stop, .write2 = raop_write, - .flush = raop_flush, + .flush2 = raop_flush, .status_cb = raop_set_status_cb, .metadata_prepare = raop_metadata_prepare, .metadata_send = raop_metadata_send, diff --git a/src/outputs/rtp_common.c b/src/outputs/rtp_common.c index 0e8a8014..ecfa3146 100644 --- a/src/outputs/rtp_common.c +++ b/src/outputs/rtp_common.c @@ -45,9 +45,7 @@ #include #include "logger.h" -#include "conffile.h" #include "misc.h" -#include "player.h" #include "rtp_common.h" #define RTP_HEADER_LEN 12 @@ -83,7 +81,7 @@ ntp_to_timespec(struct ntp_timestamp *ns, struct timespec *ts) } struct rtp_session * -rtp_session_new(struct media_quality *quality, int pktbuf_size, int sync_each_nsamples, int buffer_duration) +rtp_session_new(struct media_quality *quality, int pktbuf_size, int sync_each_nsamples) { struct rtp_session *session; @@ -104,10 +102,6 @@ rtp_session_new(struct media_quality *quality, int pktbuf_size, int sync_each_ns else if (sync_each_nsamples == 0) session->sync_each_nsamples = quality->sample_rate; - session->buffer_duration = buffer_duration; - - session->is_virgin = true; - return session; } @@ -125,10 +119,8 @@ rtp_session_free(struct rtp_session *session) } void -rtp_session_restart(struct rtp_session *session, struct timespec *ts) +rtp_session_flush(struct rtp_session *session) { - session->is_virgin = true; - session->start_time = *ts; session->pktbuf_len = 0; session->sync_counter = 0; } @@ -136,7 +128,7 @@ rtp_session_restart(struct rtp_session *session, struct timespec *ts) // We don't want the caller to malloc payload for every packet, so instead we // will get him a packet from the ring buffer, thus in most cases reusing memory struct rtp_packet * -rtp_packet_next(struct rtp_session *session, size_t payload_len, int samples) +rtp_packet_next(struct rtp_session *session, size_t payload_len, int samples, char type) { struct rtp_packet *pkt; uint16_t seq; @@ -166,7 +158,7 @@ rtp_packet_next(struct rtp_session *session, size_t payload_len, int samples) // RTP Header pkt->header[0] = 0x80; // Version = 2, P, X and CC are 0 - pkt->header[1] = (session->is_virgin) ? 0xe0 : 0x60; // TODO allow other payloads + pkt->header[1] = type; // RTP payload type seq = htobe16(session->seqnum); memcpy(pkt->header + 2, &seq, 2); @@ -177,13 +169,6 @@ rtp_packet_next(struct rtp_session *session, size_t payload_len, int samples) ssrc_id = htobe32(session->ssrc_id); memcpy(pkt->header + 8, &ssrc_id, 4); -/* DPRINTF(E_DBG, L_PLAYER, "RTP PACKET seqnum %u, rtptime %u, payload 0x%x, pktbuf_s %zu\n", - session->seqnum, - session->pos, - pkt->header[1], - session->pktbuf_len - ); -*/ return pkt; } @@ -198,8 +183,7 @@ rtp_packet_commit(struct rtp_session *session, struct rtp_packet *pkt) session->pktbuf_next = (session->pktbuf_next + 1) % session->pktbuf_size; session->seqnum++; session->pos += pkt->samples; - - session->is_virgin = false; + session->sync_counter += pkt->samples; } struct rtp_packet * @@ -226,33 +210,23 @@ rtp_packet_get(struct rtp_session *session, uint16_t seqnum) } bool -rtp_sync_check(struct rtp_session *session, struct rtp_packet *pkt) +rtp_sync_is_time(struct rtp_session *session) { - if (!session->sync_each_nsamples) - { - return false; - } - - if (session->sync_counter > session->sync_each_nsamples) + if (session->sync_each_nsamples && session->sync_counter > session->sync_each_nsamples) { session->sync_counter = 0; return true; } - session->sync_counter += pkt->samples; // TODO Should this move to a sync_commit function? return false; } struct rtp_packet * -rtp_sync_packet_next(struct rtp_session *session) +rtp_sync_packet_next(struct rtp_session *session, struct rtcp_timestamp *cur_stamp, char type) { - struct timespec ts; - struct ntp_timestamp cur_stamp; - uint64_t elapsed_usec; - uint64_t elapsed_samples; + struct ntp_timestamp cur_ts; uint32_t rtptime; uint32_t cur_pos; - int ret; if (!session->sync_packet_next.data) { @@ -260,54 +234,32 @@ rtp_sync_packet_next(struct rtp_session *session) session->sync_packet_next.data_len = RTCP_SYNC_PACKET_LEN; } - memset(session->sync_packet_next.data, 0, session->sync_packet_next.data_len); // TODO remove this and just zero byte 3 instead? - - session->sync_packet_next.data[0] = (session->is_virgin) ? 0x90 : 0x80; + session->sync_packet_next.data[0] = type; session->sync_packet_next.data[1] = 0xd4; + session->sync_packet_next.data[2] = 0x00; session->sync_packet_next.data[3] = 0x07; - if (session->is_virgin) - { - session->sync_last_check.pos = session->pos - session->buffer_duration * session->quality.sample_rate; - session->sync_last_check.ts = session->start_time; - timespec_to_ntp(&session->start_time, &cur_stamp); - } - else - { - ret = player_get_time(&ts); - if (ret < 0) - return NULL; + timespec_to_ntp(&cur_stamp->ts, &cur_ts); - elapsed_usec = (ts.tv_sec - session->sync_last_check.ts.tv_sec) * 1000000 + (ts.tv_nsec - session->sync_last_check.ts.tv_nsec) / 1000; - - // How many samples should have been played since last check - elapsed_samples = (elapsed_usec * session->quality.sample_rate) / 1000000; - - session->sync_last_check.pos += elapsed_samples; // TODO should updating sync_last_check move to a commit function? - session->sync_last_check.ts = ts; - timespec_to_ntp(&ts, &cur_stamp); - } - - cur_pos = htobe32(session->sync_last_check.pos); + cur_pos = htobe32(cur_stamp->pos); memcpy(session->sync_packet_next.data + 4, &cur_pos, 4); - cur_stamp.sec = htobe32(cur_stamp.sec); - cur_stamp.frac = htobe32(cur_stamp.frac); - memcpy(session->sync_packet_next.data + 8, &cur_stamp.sec, 4); - memcpy(session->sync_packet_next.data + 12, &cur_stamp.frac, 4); + cur_ts.sec = htobe32(cur_ts.sec); + cur_ts.frac = htobe32(cur_ts.frac); + memcpy(session->sync_packet_next.data + 8, &cur_ts.sec, 4); + memcpy(session->sync_packet_next.data + 12, &cur_ts.frac, 4); rtptime = htobe32(session->pos); memcpy(session->sync_packet_next.data + 16, &rtptime, 4); -/* DPRINTF(E_DBG, L_PLAYER, "SYNC PACKET ts:%ld.%ld, next_pkt:%u, cur_pos:%u, payload:0x%x, sync_counter:%d, init:%d\n", - ts.tv_sec, ts.tv_nsec, + DPRINTF(E_DBG, L_PLAYER, "SYNC PACKET cur_ts:%ld.%ld, next_pkt:%u, cur_pos:%u, type:0x%x, sync_counter:%d\n", + cur_stamp->ts.tv_sec, cur_stamp->ts.tv_nsec, session->pos, - session->sync_last_check.pos, + cur_stamp->pos, session->sync_packet_next.data[0], - session->sync_counter, - session->is_virgin + session->sync_counter ); -*/ + return &session->sync_packet_next; } diff --git a/src/outputs/rtp_common.h b/src/outputs/rtp_common.h index 977412a3..217353e4 100644 --- a/src/outputs/rtp_common.h +++ b/src/outputs/rtp_common.h @@ -37,9 +37,6 @@ struct rtp_session uint32_t pos; uint16_t seqnum; - // True if we haven't started streaming yet - bool is_virgin; - struct media_quality quality; // Packet buffer (ring buffer), used for retransmission @@ -48,35 +45,25 @@ struct rtp_session size_t pktbuf_size; size_t pktbuf_len; - // Time of playback start (given by player) - struct timespec start_time; - - // Number of seconds that we tell the client to buffer (this will mean that - // the position that we send in the sync packages are offset by this amount - // compared to the rtptimes of the corresponding RTP packages we are sending) - int buffer_duration; - // Number of samples to elapse before sync'ing. If 0 we set it to the s/r, so // we sync once a second. If negative we won't sync. int sync_each_nsamples; int sync_counter; struct rtp_packet sync_packet_next; - - struct rtcp_timestamp sync_last_check; }; struct rtp_session * -rtp_session_new(struct media_quality *quality, int pktbuf_size, int sync_each_nsamples, int buffer_duration); +rtp_session_new(struct media_quality *quality, int pktbuf_size, int sync_each_nsamples); void rtp_session_free(struct rtp_session *session); void -rtp_session_restart(struct rtp_session *session, struct timespec *ts); +rtp_session_flush(struct rtp_session *session); struct rtp_packet * -rtp_packet_next(struct rtp_session *session, size_t payload_len, int samples); +rtp_packet_next(struct rtp_session *session, size_t payload_len, int samples, char type); void rtp_packet_commit(struct rtp_session *session, struct rtp_packet *pkt); @@ -85,9 +72,9 @@ struct rtp_packet * rtp_packet_get(struct rtp_session *session, uint16_t seqnum); bool -rtp_sync_check(struct rtp_session *session, struct rtp_packet *pkt); +rtp_sync_is_time(struct rtp_session *session); struct rtp_packet * -rtp_sync_packet_next(struct rtp_session *session); +rtp_sync_packet_next(struct rtp_session *session, struct rtcp_timestamp *cur_stamp, char type); #endif /* !__RTP_COMMON_H__ */ diff --git a/src/player.c b/src/player.c index f13222dc..295514d3 100644 --- a/src/player.c +++ b/src/player.c @@ -134,7 +134,7 @@ // (value is in milliseconds) #define PLAYER_WRITE_BEHIND_MAX 1500 -// TODO fix me +// TODO get rid of me #define TEMP_NEXT_RTPTIME (last_rtptime + pb_session.samples_written + pb_session.samples_per_read) struct volume_param { @@ -194,6 +194,14 @@ struct player_session int samples_per_read; struct media_quality quality; + + // The time the playback session started + struct timespec start_ts; + + // The time the first sample in the buffer should be played by the output. + // It will be equal to: + // pts = start_ts + OUTPUTS_BUFFER_DURATION + ticks_elapsed * player_tick_interval + struct timespec pts; }; static struct player_session pb_session; @@ -225,15 +233,11 @@ static int pb_timer_fd; timer_t pb_timer; #endif static struct event *pb_timer_ev; -//static struct timespec pb_timer_last; -//static struct timespec packet_timer_last; -// How often the playback timer triggers playback_cb() -static struct timespec tick_interval; +// Time between ticks, i.e. time between when playback_cb() is invoked +static struct timespec player_tick_interval; // Timer resolution -static struct timespec timer_res; -// Time between two packets -//static struct timespec packet_time = { 0, AIRTUNES_V2_STREAM_PERIOD }; +static struct timespec player_timer_res; // How many writes we owe the output (when the input is underrunning) static int pb_read_deficit; @@ -1009,7 +1013,7 @@ session_init(struct player_session *session, struct media_quality *quality) { session->samples_written = 0; session->quality = *quality; - session->samples_per_read = (quality->sample_rate / 1000) * (tick_interval.tv_nsec / 1000000); + session->samples_per_read = (quality->sample_rate / 1000) * (player_tick_interval.tv_nsec / 1000000); session->bufsize = STOB(session->samples_per_read, quality->bits_per_sample, quality->channels); DPRINTF(E_DBG, L_PLAYER, "New session values (q=%d/%d/%d, spr=%d, bufsize=%zu)\n", @@ -1022,6 +1026,10 @@ session_init(struct player_session *session, struct media_quality *quality) session->buffer = malloc(session->bufsize); CHECK_NULL(L_PLAYER, session->buffer); + + clock_gettime_with_res(CLOCK_MONOTONIC, &session->start_ts, &player_timer_res); + session->pts.tv_sec = session->start_ts.tv_sec + OUTPUTS_BUFFER_DURATION; + session->pts.tv_nsec = session->start_ts.tv_nsec; } static void @@ -1092,6 +1100,7 @@ source_read(uint8_t *buf, int len) static void playback_cb(int fd, short what, void *arg) { + struct timespec ts; uint64_t overrun; int got; int nsamples; @@ -1162,16 +1171,25 @@ playback_cb(int fd, short what, void *arg) } nsamples = BTOS(got, pb_session.quality.bits_per_sample, pb_session.quality.channels); - outputs_write2(pb_session.buffer, pb_session.bufsize, &pb_session.quality, nsamples); + outputs_write2(pb_session.buffer, pb_session.bufsize, &pb_session.quality, nsamples, &pb_session.pts); pb_session.samples_written += nsamples; if (got < pb_session.bufsize) { DPRINTF(E_DBG, L_PLAYER, "Incomplete read, wanted %zu, got %d\n", pb_session.bufsize, got); + // How much the number of samples we got corresponds to in time (nanoseconds) + ts.tv_sec = 0; + ts.tv_nsec = 1000000000L * nsamples / pb_session.quality.sample_rate; + pb_session.pts = timespec_add(pb_session.pts, ts); pb_read_deficit++; } - else if (pb_read_deficit > 0) - pb_read_deficit--; + else + { + // We got a full frame, so that means we can also advance the presentation timestamp by a full tick + pb_session.pts = timespec_add(pb_session.pts, player_tick_interval); + if (pb_read_deficit > 0) + pb_read_deficit--; + } } if (pb_read_deficit > pb_read_deficit_max) @@ -1568,7 +1586,6 @@ device_lost_cb(struct output_device *device, struct output_session *session, enu static void device_activate_cb(struct output_device *device, struct output_session *session, enum output_device_state status) { - struct timespec ts; int retval; int ret; @@ -1610,15 +1627,6 @@ device_activate_cb(struct output_device *device, struct output_session *session, output_sessions++; - if ((player_state == PLAY_PLAYING) && (output_sessions == 1)) - { - ret = clock_gettime_with_res(CLOCK_MONOTONIC, &ts, &timer_res); - if (ret < 0) - DPRINTF(E_LOG, L_PLAYER, "Could not get current time: %s\n", strerror(errno)); - else - outputs_playback_start2(&ts); - } - outputs_status_cb(session, device_streaming_cb); out: @@ -1742,8 +1750,8 @@ playback_timer_start(void) return -1; } - tick.it_interval = tick_interval; - tick.it_value = tick_interval; + tick.it_interval = player_tick_interval; + tick.it_value = player_tick_interval; #ifdef HAVE_TIMERFD ret = timerfd_settime(pb_timer_fd, 0, &tick, NULL); @@ -1807,7 +1815,7 @@ playback_abort(void) static void playback_suspend(void) { - player_flush_pending = outputs_flush(device_command_cb, TEMP_NEXT_RTPTIME); + player_flush_pending = outputs_flush2(device_command_cb); playback_timer_stop(); @@ -1941,7 +1949,7 @@ playback_stop(void *arg, int *retval) // We may be restarting very soon, so we don't bring the devices to a full // stop just yet; this saves time when restarting, which is nicer for the user - *retval = outputs_flush(device_command_cb, TEMP_NEXT_RTPTIME); + *retval = outputs_flush2(device_command_cb); playback_timer_stop(); @@ -1981,7 +1989,7 @@ playback_start_bh(void *arg, int *retval) // pb_buffer_offset = 0; pb_read_deficit = 0; - ret = clock_gettime_with_res(CLOCK_MONOTONIC, &ts, &timer_res); + ret = clock_gettime_with_res(CLOCK_MONOTONIC, &ts, &player_timer_res); if (ret < 0) goto out_fail; @@ -1989,8 +1997,6 @@ playback_start_bh(void *arg, int *retval) if (ret < 0) goto out_fail; - outputs_playback_start2(&ts); - status_update(PLAY_PLAYING); *retval = 0; @@ -2092,7 +2098,7 @@ playback_start_item(void *arg, int *retval) { if (device->selected && !device->session) { - ret = outputs_device_start(device, device_restart_cb, TEMP_NEXT_RTPTIME); + ret = outputs_device_start2(device, device_restart_cb); if (ret < 0) { DPRINTF(E_LOG, L_PLAYER, "Could not start selected %s device '%s'\n", device->type_name, device->name); @@ -2414,7 +2420,7 @@ playback_pause(void *arg, int *retval) return COMMAND_END; } - *retval = outputs_flush(device_command_cb, TEMP_NEXT_RTPTIME); + *retval = outputs_flush2(device_command_cb); playback_timer_stop(); @@ -2524,7 +2530,7 @@ speaker_activate(struct output_device *device) { DPRINTF(E_DBG, L_PLAYER, "Activating %s device '%s'\n", device->type_name, device->name); - ret = outputs_device_start(device, device_activate_cb, TEMP_NEXT_RTPTIME); + ret = outputs_device_start2(device, device_activate_cb); if (ret < 0) { DPRINTF(E_LOG, L_PLAYER, "Could not start %s device '%s'\n", device->type_name, device->name); @@ -2996,7 +3002,7 @@ player_get_current_pos(uint64_t *pos, struct timespec *ts, int commit) uint64_t delta; int ret; - ret = clock_gettime_with_res(CLOCK_MONOTONIC, ts, &timer_res); + ret = clock_gettime_with_res(CLOCK_MONOTONIC, ts, &player_timer_res); if (ret < 0) { DPRINTF(E_LOG, L_PLAYER, "Couldn't get clock: %s\n", strerror(errno)); @@ -3038,7 +3044,7 @@ player_get_time(struct timespec *ts) { int ret; - ret = clock_gettime_with_res(CLOCK_MONOTONIC, ts, &timer_res); + ret = clock_gettime_with_res(CLOCK_MONOTONIC, ts, &player_timer_res); if (ret < 0) { DPRINTF(E_LOG, L_PLAYER, "Couldn't get clock: %s\n", strerror(errno)); @@ -3503,7 +3509,7 @@ player_init(void) // Determine if the resolution of the system timer is > or < the size // of an audio packet. NOTE: this assumes the system clock resolution // is less than one second. - if (clock_getres(CLOCK_MONOTONIC, &timer_res) < 0) + if (clock_getres(CLOCK_MONOTONIC, &player_timer_res) < 0) { DPRINTF(E_LOG, L_PLAYER, "Could not get the system timer resolution.\n"); @@ -3512,14 +3518,14 @@ player_init(void) if (!cfg_getbool(cfg_getsec(cfg, "general"), "high_resolution_clock")) { - DPRINTF(E_INFO, L_PLAYER, "High resolution clock not enabled on this system (res is %ld)\n", timer_res.tv_nsec); + DPRINTF(E_INFO, L_PLAYER, "High resolution clock not enabled on this system (res is %ld)\n", player_timer_res.tv_nsec); - timer_res.tv_nsec = 10 * PLAYER_TICK_INTERVAL * 1000000; + player_timer_res.tv_nsec = 10 * PLAYER_TICK_INTERVAL * 1000000; } // Set the tick interval for the playback timer - interval = MAX(timer_res.tv_nsec, PLAYER_TICK_INTERVAL * 1000000); - tick_interval.tv_nsec = interval; + interval = MAX(player_timer_res.tv_nsec, PLAYER_TICK_INTERVAL * 1000000); + player_tick_interval.tv_nsec = interval; pb_write_deficit_max = (PLAYER_WRITE_BEHIND_MAX * 1000000 / interval); pb_read_deficit_max = (PLAYER_READ_BEHIND_MAX * 1000000 / interval);