From b71c353fc92bc0072d9e7776c2d679f8f9c21243 Mon Sep 17 00:00:00 2001 From: ejurgensen Date: Sat, 20 Nov 2021 15:30:47 +0100 Subject: [PATCH] [spotify] Update librespot-c for better connection handling - New attempt at fixing #1317: Don't prevent reconnection if a request is queued - Protection against flooding Spotify with reconnection attempts - Don't reconnect when getting a stop request during blocked download --- src/inputs/librespot-c/src/channel.c | 66 +++++++++++++---- src/inputs/librespot-c/src/channel.h | 3 + src/inputs/librespot-c/src/connection.c | 45 ++++++++---- src/inputs/librespot-c/src/connection.h | 5 +- .../librespot-c/src/librespot-c-internal.h | 24 +++++-- src/inputs/librespot-c/src/librespot-c.c | 71 ++++++++++++++----- 6 files changed, 160 insertions(+), 54 deletions(-) diff --git a/src/inputs/librespot-c/src/channel.c b/src/inputs/librespot-c/src/channel.c index b97e65a0..46e40d2d 100644 --- a/src/inputs/librespot-c/src/channel.c +++ b/src/inputs/librespot-c/src/channel.c @@ -54,7 +54,7 @@ channel_get(uint32_t channel_id, struct sp_session *session) if (channel_id > sizeof(session->channels)/sizeof(session->channels)[0]) return NULL; - if (!session->channels[channel_id].is_allocated) + if (session->channels[channel_id].state == SP_CHANNEL_STATE_UNALLOCATED) return NULL; return &session->channels[channel_id]; @@ -63,7 +63,7 @@ channel_get(uint32_t channel_id, struct sp_session *session) void channel_free(struct sp_channel *channel) { - if (!channel || !channel->is_allocated) + if (!channel || channel->state == SP_CHANNEL_STATE_UNALLOCATED) return; if (channel->audio_buf) @@ -108,7 +108,7 @@ channel_new(struct sp_channel **new_channel, struct sp_session *session, const c channel_free(channel); channel->id = i; - channel->is_allocated = true; + channel->state = SP_CHANNEL_STATE_OPENED; channel->file.path = strdup(path); path_to_media_id_and_type(&channel->file); @@ -141,15 +141,18 @@ channel_new(struct sp_channel **new_channel, struct sp_session *session, const c return -1; } -// Set the fd to non-blocking in case the caller changed that, and then read -// until empty static int -channel_flush(int fd) +channel_flush(struct sp_channel *channel) { uint8_t buf[4096]; + int fd = channel->audio_fd[0]; int flags; int got; + evbuffer_drain(channel->audio_buf, -1); + + // Note that we flush the read side. We set the fd to non-blocking in case + // the caller changed that, and then read until empty flags = fcntl(fd, F_GETFL, 0); if (flags == -1) return -1; @@ -167,13 +170,13 @@ channel_flush(int fd) void channel_play(struct sp_channel *channel) { - channel->is_writing = true; + channel->state = SP_CHANNEL_STATE_PLAYING; } void channel_stop(struct sp_channel *channel) { - channel->is_writing = false; + channel->state = SP_CHANNEL_STATE_STOPPED; // This will tell the reader that there is no more to read. He should then // call librespotc_close(), which will clean up the rest of the channel via @@ -182,15 +185,19 @@ channel_stop(struct sp_channel *channel) channel->audio_fd[1] = -1; } -int -channel_seek(struct sp_channel *channel, size_t pos) +static int +channel_seek_internal(struct sp_channel *channel, size_t pos, bool do_flush) { uint32_t seek_words; int ret; - ret = channel_flush(channel->audio_fd[0]); - if (ret < 0) - RETURN_ERROR(SP_ERR_INVALID, "Could not flush read fd before seeking"); + if (do_flush) + { + ret = channel_flush(channel); + if (ret < 0) + RETURN_ERROR(SP_ERR_INVALID, "Could not flush read fd before seeking"); + + } channel->seek_pos = pos; @@ -214,12 +221,38 @@ channel_seek(struct sp_channel *channel, size_t pos) return ret; } +int +channel_seek(struct sp_channel *channel, size_t pos) +{ + channel_seek_internal(channel, pos, true); // true -> request flush +} + void channel_pause(struct sp_channel *channel) { - channel_flush(channel->audio_fd[0]); + channel_flush(channel); - channel->is_writing = false; + channel->state = SP_CHANNEL_STATE_PAUSED; +} + +// After a disconnect we connect to another one and try to resume. To make that +// work some data elements need to be reset. +void +channel_retry(struct sp_channel *channel) +{ + size_t pos; + + if (!channel) + return; + + channel->is_data_mode = false; + + memset(&channel->header, 0, sizeof(struct sp_channel_header)); + memset(&channel->body, 0, sizeof(struct sp_channel_body)); + + pos = 4 * channel->file.received_words - SP_OGG_HEADER_LEN; + + channel_seek_internal(channel, pos, false); // false => don't flush } // Always returns number of byte read so caller can advance read pointer. If @@ -380,6 +413,9 @@ channel_data_write(struct sp_channel *channel) ssize_t wrote; int ret; + if (channel->state == SP_CHANNEL_STATE_PAUSED || channel->state == SP_CHANNEL_STATE_STOPPED) + return SP_OK_DONE; + wrote = evbuffer_write(channel->audio_buf, channel->audio_fd[1]); if (wrote < 0 && (errno == EAGAIN || errno == EWOULDBLOCK)) return SP_OK_WAIT; diff --git a/src/inputs/librespot-c/src/channel.h b/src/inputs/librespot-c/src/channel.h index d010adc0..db664e66 100644 --- a/src/inputs/librespot-c/src/channel.h +++ b/src/inputs/librespot-c/src/channel.h @@ -25,5 +25,8 @@ channel_seek(struct sp_channel *channel, size_t pos); void channel_pause(struct sp_channel *channel); +void +channel_retry(struct sp_channel *channel); + int channel_msg_read(uint16_t *channel_id, uint8_t *msg, size_t msg_len, struct sp_session *session); diff --git a/src/inputs/librespot-c/src/connection.c b/src/inputs/librespot-c/src/connection.c index 68178cb8..abb6370d 100644 --- a/src/inputs/librespot-c/src/connection.c +++ b/src/inputs/librespot-c/src/connection.c @@ -240,14 +240,6 @@ ap_resolve(char **address, unsigned short *port) return ret; } -static bool -is_handshake(enum sp_msg_type type) -{ - return ( type == MSG_TYPE_CLIENT_HELLO || - type == MSG_TYPE_CLIENT_RESPONSE_PLAINTEXT || - type == MSG_TYPE_CLIENT_RESPONSE_ENCRYPTED ); -} - static void connection_clear(struct sp_connection *conn) { @@ -293,7 +285,7 @@ connection_idle_cb(int fd, short what, void *arg) } static int -connection_make(struct sp_connection *conn, struct sp_conn_callbacks *cb, struct sp_session *session) +connection_make(struct sp_connection *conn, struct sp_conn_callbacks *cb, void *response_cb_arg) { int response_fd; int ret; @@ -315,7 +307,7 @@ connection_make(struct sp_connection *conn, struct sp_conn_callbacks *cb, struct #endif conn->response_fd = response_fd; - conn->response_ev = event_new(cb->evbase, response_fd, EV_READ | EV_PERSIST, cb->response_cb, session); + conn->response_ev = event_new(cb->evbase, response_fd, EV_READ | EV_PERSIST, cb->response_cb, response_cb_arg); conn->timeout_ev = evtimer_new(cb->evbase, cb->timeout_cb, conn); conn->idle_ev = evtimer_new(cb->evbase, connection_idle_cb, conn); @@ -338,24 +330,36 @@ connection_make(struct sp_connection *conn, struct sp_conn_callbacks *cb, struct } enum sp_error -ap_connect(enum sp_msg_type type, struct sp_conn_callbacks *cb, struct sp_session *session) +ap_connect(struct sp_connection *conn, enum sp_msg_type type, time_t *cooldown_ts, struct sp_conn_callbacks *cb, void *cb_arg) { int ret; + time_t now; - if (!session->conn.is_connected) + if (!conn->is_connected) { - ret = connection_make(&session->conn, cb, session); + // Protection against flooding the access points with reconnection attempts + // Note that cooldown_ts can't be part of the connection struct because + // the struct is reset between connection attempts. + now = time(NULL); + if (now > *cooldown_ts + SP_AP_COOLDOWN_SECS) // Last attempt was a long time ago + *cooldown_ts = now; + else if (now >= *cooldown_ts) // Last attempt was recent, so disallow more attempts for a while + *cooldown_ts = now + SP_AP_COOLDOWN_SECS; + else + RETURN_ERROR(SP_ERR_NOCONNECTION, "Cannot connect to access point, cooldown after disconnect is in effect"); + + ret = connection_make(conn, cb, cb_arg); if (ret < 0) RETURN_ERROR(ret, sp_errmsg); } - if (is_handshake(type) || session->conn.handshake_completed) + if (msg_is_handshake(type) || conn->handshake_completed) return SP_OK_DONE; // Proceed right away return SP_OK_WAIT; // Caller must login again error: - ap_disconnect(&session->conn); + ap_disconnect(conn); return ret; } @@ -590,6 +594,9 @@ response_apwelcome(uint8_t *payload, size_t payload_len, struct sp_session *sess session->credentials.stored_cred_len = apwelcome->reusable_auth_credentials.len; memcpy(session->credentials.stored_cred, apwelcome->reusable_auth_credentials.data, session->credentials.stored_cred_len); + + // No need for this any more + memset(session->credentials.password, 0, sizeof(session->credentials.password)); } apwelcome__free_unpacked(apwelcome, NULL); @@ -1259,6 +1266,14 @@ msg_make_chunk_request(uint8_t *out, size_t out_len, struct sp_session *session) return required_len; } +bool +msg_is_handshake(enum sp_msg_type type) +{ + return ( type == MSG_TYPE_CLIENT_HELLO || + type == MSG_TYPE_CLIENT_RESPONSE_PLAINTEXT || + type == MSG_TYPE_CLIENT_RESPONSE_ENCRYPTED ); +} + int msg_make(struct sp_message *msg, enum sp_msg_type type, struct sp_session *session) { diff --git a/src/inputs/librespot-c/src/connection.h b/src/inputs/librespot-c/src/connection.h index 5bc3c005..77877e0e 100644 --- a/src/inputs/librespot-c/src/connection.h +++ b/src/inputs/librespot-c/src/connection.h @@ -2,11 +2,14 @@ void ap_disconnect(struct sp_connection *conn); enum sp_error -ap_connect(enum sp_msg_type type, struct sp_conn_callbacks *cb, struct sp_session *session); +ap_connect(struct sp_connection *conn, enum sp_msg_type type, time_t *cooldown_ts, struct sp_conn_callbacks *cb, void *cb_arg); enum sp_error response_read(struct sp_session *session); +bool +msg_is_handshake(enum sp_msg_type type); + int msg_make(struct sp_message *msg, enum sp_msg_type type, struct sp_session *session); diff --git a/src/inputs/librespot-c/src/librespot-c-internal.h b/src/inputs/librespot-c/src/librespot-c-internal.h index a9ed6f30..bc9d964a 100644 --- a/src/inputs/librespot-c/src/librespot-c-internal.h +++ b/src/inputs/librespot-c/src/librespot-c-internal.h @@ -8,6 +8,7 @@ #include #include #include +#include #include #include @@ -43,6 +44,10 @@ // Max wait for AP to respond #define SP_AP_TIMEOUT_SECS 10 +// After a disconnect we try to reconnect, but if we are disconnected yet again +// we get the hint and won't try reconnecting again until after this cooldown +#define SP_AP_COOLDOWN_SECS 30 + // If client hasn't requested anything in particular #define SP_BITRATE_DEFAULT SP_BITRATE_320 @@ -106,6 +111,15 @@ enum sp_media_type SP_MEDIA_EPISODE, }; +enum sp_channel_state +{ + SP_CHANNEL_STATE_UNALLOCATED = 0, + SP_CHANNEL_STATE_OPENED, + SP_CHANNEL_STATE_PLAYING, + SP_CHANNEL_STATE_PAUSED, + SP_CHANNEL_STATE_STOPPED, +}; + // From librespot-golang enum sp_cmd_type { @@ -275,8 +289,8 @@ struct sp_channel { int id; - bool is_allocated; - bool is_writing; + enum sp_channel_state state; + bool is_data_mode; bool is_spotify_header_received; size_t seek_pos; @@ -306,6 +320,7 @@ struct sp_channel struct sp_session { struct sp_connection conn; + time_t cooldown_ts; bool is_logged_in; struct sp_credentials credentials; @@ -322,9 +337,10 @@ struct sp_session // Go to next step in a request sequence struct event *continue_ev; - // Current (or last) message being processed - enum sp_msg_type msg_type_queued; + // Current, next and subsequent message being processed + enum sp_msg_type msg_type_last; enum sp_msg_type msg_type_next; + enum sp_msg_type msg_type_queued; int (*response_handler)(uint8_t *, size_t, struct sp_session *); struct sp_session *next; diff --git a/src/inputs/librespot-c/src/librespot-c.c b/src/inputs/librespot-c/src/librespot-c.c index fa913189..f5ba80e1 100644 --- a/src/inputs/librespot-c/src/librespot-c.c +++ b/src/inputs/librespot-c/src/librespot-c.c @@ -54,11 +54,7 @@ events for proceeding are activated directly. #include "connection.h" #include "channel.h" -/* TODO list - - - protect against DOS -*/ - +// #define DEBUG_DISCONNECT 1 /* -------------------------------- Globals --------------------------------- */ @@ -77,6 +73,9 @@ static struct commands_base *sp_cmdbase; static struct timeval sp_response_timeout_tv = { SP_AP_TIMEOUT_SECS, 0 }; +#ifdef DEBUG_DISCONNECT +static int debug_disconnect_counter; +#endif // Forwards static int @@ -212,7 +211,7 @@ session_return(struct sp_session *session, enum sp_error err) { // track_write() completed, close the write end which means reader will // get an EOF - if (channel && channel->is_writing && err == SP_OK_DONE) + if (channel && channel->state == SP_CHANNEL_STATE_PLAYING && err == SP_OK_DONE) channel_stop(channel); return; } @@ -241,6 +240,29 @@ session_error(struct sp_session *session, enum sp_error err) ap_disconnect(&session->conn); } +// Called if an access point disconnects. Will clear current connection and +// start a flow where the same request will be made to another access point. +static void +session_retry(struct sp_session *session) +{ + struct sp_channel *channel = session->now_streaming_channel; + enum sp_msg_type type = session->msg_type_last; + int ret; + + sp_cb.logmsg("Retrying after disconnect (occurred at msg %d)\n", type); + + channel_retry(channel); + + ap_disconnect(&session->conn); + + // If we were in the middle of a handshake when disconnected we must restart + if (msg_is_handshake(type)) + type = MSG_TYPE_CLIENT_HELLO; + + ret = request_make(type, session); + if (ret < 0) + session_error(session, ret); +} /* ------------------------ Main sequence control --------------------------- */ @@ -334,6 +356,15 @@ response_cb(int fd, short what, void *arg) if (what == EV_READ) { ret = evbuffer_read(conn->incoming, fd, -1); +#ifdef DEBUG_DISCONNECT + debug_disconnect_counter++; + if (debug_disconnect_counter == 1000) + { + sp_cb.logmsg("Simulating a disconnection from the access point (last request type was %d)\n", session->msg_type_last); + ret = 0; + } +#endif + if (ret == 0) RETURN_ERROR(SP_ERR_NOCONNECTION, "The access point disconnected"); else if (ret < 0) @@ -348,7 +379,7 @@ response_cb(int fd, short what, void *arg) case SP_OK_WAIT: // Incomplete, wait for more data break; case SP_OK_DATA: - if (channel->is_writing && !channel->file.end_of_file) + if (channel->state == SP_CHANNEL_STATE_PLAYING && !channel->file.end_of_file) session->msg_type_next = MSG_TYPE_CHUNK_REQUEST; if (channel->progress_cb) channel->progress_cb(channel->audio_fd[0], channel->cb_arg, 4 * channel->file.received_words - SP_OGG_HEADER_LEN, 4 * channel->file.len_words - SP_OGG_HEADER_LEN); @@ -375,7 +406,10 @@ response_cb(int fd, short what, void *arg) return; error: - session_error(session, ret); + if (ret == SP_ERR_NOCONNECTION) + session_retry(session); + else + session_error(session, ret); } static int @@ -383,9 +417,6 @@ relogin(enum sp_msg_type type, struct sp_session *session) { int ret; - if (session->msg_type_queued != MSG_TYPE_NONE) - RETURN_ERROR(SP_ERR_NOCONNECTION, "Cannot send message, another request is waiting for handshake"); - ret = request_make(MSG_TYPE_CLIENT_HELLO, session); if (ret < 0) RETURN_ERROR(ret, sp_errmsg); @@ -408,16 +439,16 @@ request_make(enum sp_msg_type type, struct sp_session *session) struct sp_conn_callbacks cb = { sp_evbase, response_cb, timeout_cb }; int ret; +// sp_cb.logmsg("Making request %d\n", type); + // Make sure the connection is in a state suitable for sending this message - ret = ap_connect(type, &cb, session); + ret = ap_connect(&session->conn, type, &session->cooldown_ts, &cb, session); if (ret == SP_OK_WAIT) return relogin(type, session); // Can't proceed right now, the handshake needs to complete first else if (ret < 0) RETURN_ERROR(ret, sp_errmsg); ret = msg_make(&msg, type, session); - if (type == MSG_TYPE_CLIENT_RESPONSE_ENCRYPTED) - memset(session->credentials.password, 0, sizeof(session->credentials.password)); if (ret < 0) RETURN_ERROR(SP_ERR_INVALID, "Error constructing message to Spotify"); @@ -435,6 +466,7 @@ request_make(enum sp_msg_type type, struct sp_session *session) else event_active(session->continue_ev, 0, 0); + session->msg_type_last = type; session->msg_type_next = msg.type_next; session->response_handler = msg.response_handler; @@ -463,7 +495,7 @@ track_write(void *arg, int *retval) RETURN_ERROR(SP_ERR_NOSESSION, "Cannot play track, no valid session found"); channel = session->now_streaming_channel; - if (!channel || !channel->is_allocated) + if (!channel || channel->state == SP_CHANNEL_STATE_UNALLOCATED) RETURN_ERROR(SP_ERR_INVALID, "No active channel to play, has track been opened?"); channel_play(channel); @@ -496,19 +528,20 @@ track_pause(void *arg, int *retval) RETURN_ERROR(SP_ERR_NOSESSION, "Cannot pause track, no valid session found"); channel = session->now_streaming_channel; - if (!channel || !channel->is_allocated) + if (!channel || channel->state == SP_CHANNEL_STATE_UNALLOCATED) RETURN_ERROR(SP_ERR_INVALID, "No active channel to pause, has track been opened?"); // If we are playing we are in the process of downloading a chunk, and in that // case we need that to complete before doing anything else with the channel, // e.g. reset it as track_close() does. - if (!channel->is_writing) + if (channel->state != SP_CHANNEL_STATE_PLAYING) { *retval = 0; return COMMAND_END; } channel_pause(channel); + session->msg_type_next = MSG_TYPE_NONE; *retval = 1; return COMMAND_PENDING; @@ -531,9 +564,9 @@ track_seek(void *arg, int *retval) RETURN_ERROR(SP_ERR_NOSESSION, "Cannot seek, no valid session found"); channel = session->now_streaming_channel; - if (!channel || !channel->is_allocated) + if (!channel) RETURN_ERROR(SP_ERR_INVALID, "No active channel to seek, has track been opened?"); - else if (channel->is_writing) + else if (channel->state != SP_CHANNEL_STATE_OPENED) RETURN_ERROR(SP_ERR_INVALID, "Seeking during playback not currently supported"); // This operation is not safe during chunk downloading because it changes the