[spotify] Update librespot-c for better connection handling

- New attempt at fixing #1317: Don't prevent reconnection if a request is queued
- Protection against flooding Spotify with reconnection attempts
- Don't reconnect when getting a stop request during blocked download
This commit is contained in:
ejurgensen 2021-11-20 15:30:47 +01:00
parent 434fc4652c
commit b71c353fc9
6 changed files with 160 additions and 54 deletions

View File

@ -54,7 +54,7 @@ channel_get(uint32_t channel_id, struct sp_session *session)
if (channel_id > sizeof(session->channels)/sizeof(session->channels)[0]) if (channel_id > sizeof(session->channels)/sizeof(session->channels)[0])
return NULL; return NULL;
if (!session->channels[channel_id].is_allocated) if (session->channels[channel_id].state == SP_CHANNEL_STATE_UNALLOCATED)
return NULL; return NULL;
return &session->channels[channel_id]; return &session->channels[channel_id];
@ -63,7 +63,7 @@ channel_get(uint32_t channel_id, struct sp_session *session)
void void
channel_free(struct sp_channel *channel) channel_free(struct sp_channel *channel)
{ {
if (!channel || !channel->is_allocated) if (!channel || channel->state == SP_CHANNEL_STATE_UNALLOCATED)
return; return;
if (channel->audio_buf) if (channel->audio_buf)
@ -108,7 +108,7 @@ channel_new(struct sp_channel **new_channel, struct sp_session *session, const c
channel_free(channel); channel_free(channel);
channel->id = i; channel->id = i;
channel->is_allocated = true; channel->state = SP_CHANNEL_STATE_OPENED;
channel->file.path = strdup(path); channel->file.path = strdup(path);
path_to_media_id_and_type(&channel->file); path_to_media_id_and_type(&channel->file);
@ -141,15 +141,18 @@ channel_new(struct sp_channel **new_channel, struct sp_session *session, const c
return -1; return -1;
} }
// Set the fd to non-blocking in case the caller changed that, and then read
// until empty
static int static int
channel_flush(int fd) channel_flush(struct sp_channel *channel)
{ {
uint8_t buf[4096]; uint8_t buf[4096];
int fd = channel->audio_fd[0];
int flags; int flags;
int got; int got;
evbuffer_drain(channel->audio_buf, -1);
// Note that we flush the read side. We set the fd to non-blocking in case
// the caller changed that, and then read until empty
flags = fcntl(fd, F_GETFL, 0); flags = fcntl(fd, F_GETFL, 0);
if (flags == -1) if (flags == -1)
return -1; return -1;
@ -167,13 +170,13 @@ channel_flush(int fd)
void void
channel_play(struct sp_channel *channel) channel_play(struct sp_channel *channel)
{ {
channel->is_writing = true; channel->state = SP_CHANNEL_STATE_PLAYING;
} }
void void
channel_stop(struct sp_channel *channel) channel_stop(struct sp_channel *channel)
{ {
channel->is_writing = false; channel->state = SP_CHANNEL_STATE_STOPPED;
// This will tell the reader that there is no more to read. He should then // This will tell the reader that there is no more to read. He should then
// call librespotc_close(), which will clean up the rest of the channel via // call librespotc_close(), which will clean up the rest of the channel via
@ -182,16 +185,20 @@ channel_stop(struct sp_channel *channel)
channel->audio_fd[1] = -1; channel->audio_fd[1] = -1;
} }
int static int
channel_seek(struct sp_channel *channel, size_t pos) channel_seek_internal(struct sp_channel *channel, size_t pos, bool do_flush)
{ {
uint32_t seek_words; uint32_t seek_words;
int ret; int ret;
ret = channel_flush(channel->audio_fd[0]); if (do_flush)
{
ret = channel_flush(channel);
if (ret < 0) if (ret < 0)
RETURN_ERROR(SP_ERR_INVALID, "Could not flush read fd before seeking"); RETURN_ERROR(SP_ERR_INVALID, "Could not flush read fd before seeking");
}
channel->seek_pos = pos; channel->seek_pos = pos;
// If seek + header isn't word aligned we will get up to 3 bytes before the // If seek + header isn't word aligned we will get up to 3 bytes before the
@ -214,12 +221,38 @@ channel_seek(struct sp_channel *channel, size_t pos)
return ret; return ret;
} }
int
channel_seek(struct sp_channel *channel, size_t pos)
{
channel_seek_internal(channel, pos, true); // true -> request flush
}
void void
channel_pause(struct sp_channel *channel) channel_pause(struct sp_channel *channel)
{ {
channel_flush(channel->audio_fd[0]); channel_flush(channel);
channel->is_writing = false; channel->state = SP_CHANNEL_STATE_PAUSED;
}
// After a disconnect we connect to another one and try to resume. To make that
// work some data elements need to be reset.
void
channel_retry(struct sp_channel *channel)
{
size_t pos;
if (!channel)
return;
channel->is_data_mode = false;
memset(&channel->header, 0, sizeof(struct sp_channel_header));
memset(&channel->body, 0, sizeof(struct sp_channel_body));
pos = 4 * channel->file.received_words - SP_OGG_HEADER_LEN;
channel_seek_internal(channel, pos, false); // false => don't flush
} }
// Always returns number of byte read so caller can advance read pointer. If // Always returns number of byte read so caller can advance read pointer. If
@ -380,6 +413,9 @@ channel_data_write(struct sp_channel *channel)
ssize_t wrote; ssize_t wrote;
int ret; int ret;
if (channel->state == SP_CHANNEL_STATE_PAUSED || channel->state == SP_CHANNEL_STATE_STOPPED)
return SP_OK_DONE;
wrote = evbuffer_write(channel->audio_buf, channel->audio_fd[1]); wrote = evbuffer_write(channel->audio_buf, channel->audio_fd[1]);
if (wrote < 0 && (errno == EAGAIN || errno == EWOULDBLOCK)) if (wrote < 0 && (errno == EAGAIN || errno == EWOULDBLOCK))
return SP_OK_WAIT; return SP_OK_WAIT;

View File

@ -25,5 +25,8 @@ channel_seek(struct sp_channel *channel, size_t pos);
void void
channel_pause(struct sp_channel *channel); channel_pause(struct sp_channel *channel);
void
channel_retry(struct sp_channel *channel);
int int
channel_msg_read(uint16_t *channel_id, uint8_t *msg, size_t msg_len, struct sp_session *session); channel_msg_read(uint16_t *channel_id, uint8_t *msg, size_t msg_len, struct sp_session *session);

View File

@ -240,14 +240,6 @@ ap_resolve(char **address, unsigned short *port)
return ret; return ret;
} }
static bool
is_handshake(enum sp_msg_type type)
{
return ( type == MSG_TYPE_CLIENT_HELLO ||
type == MSG_TYPE_CLIENT_RESPONSE_PLAINTEXT ||
type == MSG_TYPE_CLIENT_RESPONSE_ENCRYPTED );
}
static void static void
connection_clear(struct sp_connection *conn) connection_clear(struct sp_connection *conn)
{ {
@ -293,7 +285,7 @@ connection_idle_cb(int fd, short what, void *arg)
} }
static int static int
connection_make(struct sp_connection *conn, struct sp_conn_callbacks *cb, struct sp_session *session) connection_make(struct sp_connection *conn, struct sp_conn_callbacks *cb, void *response_cb_arg)
{ {
int response_fd; int response_fd;
int ret; int ret;
@ -315,7 +307,7 @@ connection_make(struct sp_connection *conn, struct sp_conn_callbacks *cb, struct
#endif #endif
conn->response_fd = response_fd; conn->response_fd = response_fd;
conn->response_ev = event_new(cb->evbase, response_fd, EV_READ | EV_PERSIST, cb->response_cb, session); conn->response_ev = event_new(cb->evbase, response_fd, EV_READ | EV_PERSIST, cb->response_cb, response_cb_arg);
conn->timeout_ev = evtimer_new(cb->evbase, cb->timeout_cb, conn); conn->timeout_ev = evtimer_new(cb->evbase, cb->timeout_cb, conn);
conn->idle_ev = evtimer_new(cb->evbase, connection_idle_cb, conn); conn->idle_ev = evtimer_new(cb->evbase, connection_idle_cb, conn);
@ -338,24 +330,36 @@ connection_make(struct sp_connection *conn, struct sp_conn_callbacks *cb, struct
} }
enum sp_error enum sp_error
ap_connect(enum sp_msg_type type, struct sp_conn_callbacks *cb, struct sp_session *session) ap_connect(struct sp_connection *conn, enum sp_msg_type type, time_t *cooldown_ts, struct sp_conn_callbacks *cb, void *cb_arg)
{ {
int ret; int ret;
time_t now;
if (!session->conn.is_connected) if (!conn->is_connected)
{ {
ret = connection_make(&session->conn, cb, session); // Protection against flooding the access points with reconnection attempts
// Note that cooldown_ts can't be part of the connection struct because
// the struct is reset between connection attempts.
now = time(NULL);
if (now > *cooldown_ts + SP_AP_COOLDOWN_SECS) // Last attempt was a long time ago
*cooldown_ts = now;
else if (now >= *cooldown_ts) // Last attempt was recent, so disallow more attempts for a while
*cooldown_ts = now + SP_AP_COOLDOWN_SECS;
else
RETURN_ERROR(SP_ERR_NOCONNECTION, "Cannot connect to access point, cooldown after disconnect is in effect");
ret = connection_make(conn, cb, cb_arg);
if (ret < 0) if (ret < 0)
RETURN_ERROR(ret, sp_errmsg); RETURN_ERROR(ret, sp_errmsg);
} }
if (is_handshake(type) || session->conn.handshake_completed) if (msg_is_handshake(type) || conn->handshake_completed)
return SP_OK_DONE; // Proceed right away return SP_OK_DONE; // Proceed right away
return SP_OK_WAIT; // Caller must login again return SP_OK_WAIT; // Caller must login again
error: error:
ap_disconnect(&session->conn); ap_disconnect(conn);
return ret; return ret;
} }
@ -590,6 +594,9 @@ response_apwelcome(uint8_t *payload, size_t payload_len, struct sp_session *sess
session->credentials.stored_cred_len = apwelcome->reusable_auth_credentials.len; session->credentials.stored_cred_len = apwelcome->reusable_auth_credentials.len;
memcpy(session->credentials.stored_cred, apwelcome->reusable_auth_credentials.data, session->credentials.stored_cred_len); memcpy(session->credentials.stored_cred, apwelcome->reusable_auth_credentials.data, session->credentials.stored_cred_len);
// No need for this any more
memset(session->credentials.password, 0, sizeof(session->credentials.password));
} }
apwelcome__free_unpacked(apwelcome, NULL); apwelcome__free_unpacked(apwelcome, NULL);
@ -1259,6 +1266,14 @@ msg_make_chunk_request(uint8_t *out, size_t out_len, struct sp_session *session)
return required_len; return required_len;
} }
bool
msg_is_handshake(enum sp_msg_type type)
{
return ( type == MSG_TYPE_CLIENT_HELLO ||
type == MSG_TYPE_CLIENT_RESPONSE_PLAINTEXT ||
type == MSG_TYPE_CLIENT_RESPONSE_ENCRYPTED );
}
int int
msg_make(struct sp_message *msg, enum sp_msg_type type, struct sp_session *session) msg_make(struct sp_message *msg, enum sp_msg_type type, struct sp_session *session)
{ {

View File

@ -2,11 +2,14 @@ void
ap_disconnect(struct sp_connection *conn); ap_disconnect(struct sp_connection *conn);
enum sp_error enum sp_error
ap_connect(enum sp_msg_type type, struct sp_conn_callbacks *cb, struct sp_session *session); ap_connect(struct sp_connection *conn, enum sp_msg_type type, time_t *cooldown_ts, struct sp_conn_callbacks *cb, void *cb_arg);
enum sp_error enum sp_error
response_read(struct sp_session *session); response_read(struct sp_session *session);
bool
msg_is_handshake(enum sp_msg_type type);
int int
msg_make(struct sp_message *msg, enum sp_msg_type type, struct sp_session *session); msg_make(struct sp_message *msg, enum sp_msg_type type, struct sp_session *session);

View File

@ -8,6 +8,7 @@
#include <stddef.h> #include <stddef.h>
#include <stdbool.h> #include <stdbool.h>
#include <unistd.h> #include <unistd.h>
#include <time.h>
#include <event2/event.h> #include <event2/event.h>
#include <event2/buffer.h> #include <event2/buffer.h>
@ -43,6 +44,10 @@
// Max wait for AP to respond // Max wait for AP to respond
#define SP_AP_TIMEOUT_SECS 10 #define SP_AP_TIMEOUT_SECS 10
// After a disconnect we try to reconnect, but if we are disconnected yet again
// we get the hint and won't try reconnecting again until after this cooldown
#define SP_AP_COOLDOWN_SECS 30
// If client hasn't requested anything in particular // If client hasn't requested anything in particular
#define SP_BITRATE_DEFAULT SP_BITRATE_320 #define SP_BITRATE_DEFAULT SP_BITRATE_320
@ -106,6 +111,15 @@ enum sp_media_type
SP_MEDIA_EPISODE, SP_MEDIA_EPISODE,
}; };
enum sp_channel_state
{
SP_CHANNEL_STATE_UNALLOCATED = 0,
SP_CHANNEL_STATE_OPENED,
SP_CHANNEL_STATE_PLAYING,
SP_CHANNEL_STATE_PAUSED,
SP_CHANNEL_STATE_STOPPED,
};
// From librespot-golang // From librespot-golang
enum sp_cmd_type enum sp_cmd_type
{ {
@ -275,8 +289,8 @@ struct sp_channel
{ {
int id; int id;
bool is_allocated; enum sp_channel_state state;
bool is_writing;
bool is_data_mode; bool is_data_mode;
bool is_spotify_header_received; bool is_spotify_header_received;
size_t seek_pos; size_t seek_pos;
@ -306,6 +320,7 @@ struct sp_channel
struct sp_session struct sp_session
{ {
struct sp_connection conn; struct sp_connection conn;
time_t cooldown_ts;
bool is_logged_in; bool is_logged_in;
struct sp_credentials credentials; struct sp_credentials credentials;
@ -322,9 +337,10 @@ struct sp_session
// Go to next step in a request sequence // Go to next step in a request sequence
struct event *continue_ev; struct event *continue_ev;
// Current (or last) message being processed // Current, next and subsequent message being processed
enum sp_msg_type msg_type_queued; enum sp_msg_type msg_type_last;
enum sp_msg_type msg_type_next; enum sp_msg_type msg_type_next;
enum sp_msg_type msg_type_queued;
int (*response_handler)(uint8_t *, size_t, struct sp_session *); int (*response_handler)(uint8_t *, size_t, struct sp_session *);
struct sp_session *next; struct sp_session *next;

View File

@ -54,11 +54,7 @@ events for proceeding are activated directly.
#include "connection.h" #include "connection.h"
#include "channel.h" #include "channel.h"
/* TODO list // #define DEBUG_DISCONNECT 1
- protect against DOS
*/
/* -------------------------------- Globals --------------------------------- */ /* -------------------------------- Globals --------------------------------- */
@ -77,6 +73,9 @@ static struct commands_base *sp_cmdbase;
static struct timeval sp_response_timeout_tv = { SP_AP_TIMEOUT_SECS, 0 }; static struct timeval sp_response_timeout_tv = { SP_AP_TIMEOUT_SECS, 0 };
#ifdef DEBUG_DISCONNECT
static int debug_disconnect_counter;
#endif
// Forwards // Forwards
static int static int
@ -212,7 +211,7 @@ session_return(struct sp_session *session, enum sp_error err)
{ {
// track_write() completed, close the write end which means reader will // track_write() completed, close the write end which means reader will
// get an EOF // get an EOF
if (channel && channel->is_writing && err == SP_OK_DONE) if (channel && channel->state == SP_CHANNEL_STATE_PLAYING && err == SP_OK_DONE)
channel_stop(channel); channel_stop(channel);
return; return;
} }
@ -241,6 +240,29 @@ session_error(struct sp_session *session, enum sp_error err)
ap_disconnect(&session->conn); ap_disconnect(&session->conn);
} }
// Called if an access point disconnects. Will clear current connection and
// start a flow where the same request will be made to another access point.
static void
session_retry(struct sp_session *session)
{
struct sp_channel *channel = session->now_streaming_channel;
enum sp_msg_type type = session->msg_type_last;
int ret;
sp_cb.logmsg("Retrying after disconnect (occurred at msg %d)\n", type);
channel_retry(channel);
ap_disconnect(&session->conn);
// If we were in the middle of a handshake when disconnected we must restart
if (msg_is_handshake(type))
type = MSG_TYPE_CLIENT_HELLO;
ret = request_make(type, session);
if (ret < 0)
session_error(session, ret);
}
/* ------------------------ Main sequence control --------------------------- */ /* ------------------------ Main sequence control --------------------------- */
@ -334,6 +356,15 @@ response_cb(int fd, short what, void *arg)
if (what == EV_READ) if (what == EV_READ)
{ {
ret = evbuffer_read(conn->incoming, fd, -1); ret = evbuffer_read(conn->incoming, fd, -1);
#ifdef DEBUG_DISCONNECT
debug_disconnect_counter++;
if (debug_disconnect_counter == 1000)
{
sp_cb.logmsg("Simulating a disconnection from the access point (last request type was %d)\n", session->msg_type_last);
ret = 0;
}
#endif
if (ret == 0) if (ret == 0)
RETURN_ERROR(SP_ERR_NOCONNECTION, "The access point disconnected"); RETURN_ERROR(SP_ERR_NOCONNECTION, "The access point disconnected");
else if (ret < 0) else if (ret < 0)
@ -348,7 +379,7 @@ response_cb(int fd, short what, void *arg)
case SP_OK_WAIT: // Incomplete, wait for more data case SP_OK_WAIT: // Incomplete, wait for more data
break; break;
case SP_OK_DATA: case SP_OK_DATA:
if (channel->is_writing && !channel->file.end_of_file) if (channel->state == SP_CHANNEL_STATE_PLAYING && !channel->file.end_of_file)
session->msg_type_next = MSG_TYPE_CHUNK_REQUEST; session->msg_type_next = MSG_TYPE_CHUNK_REQUEST;
if (channel->progress_cb) if (channel->progress_cb)
channel->progress_cb(channel->audio_fd[0], channel->cb_arg, 4 * channel->file.received_words - SP_OGG_HEADER_LEN, 4 * channel->file.len_words - SP_OGG_HEADER_LEN); channel->progress_cb(channel->audio_fd[0], channel->cb_arg, 4 * channel->file.received_words - SP_OGG_HEADER_LEN, 4 * channel->file.len_words - SP_OGG_HEADER_LEN);
@ -375,6 +406,9 @@ response_cb(int fd, short what, void *arg)
return; return;
error: error:
if (ret == SP_ERR_NOCONNECTION)
session_retry(session);
else
session_error(session, ret); session_error(session, ret);
} }
@ -383,9 +417,6 @@ relogin(enum sp_msg_type type, struct sp_session *session)
{ {
int ret; int ret;
if (session->msg_type_queued != MSG_TYPE_NONE)
RETURN_ERROR(SP_ERR_NOCONNECTION, "Cannot send message, another request is waiting for handshake");
ret = request_make(MSG_TYPE_CLIENT_HELLO, session); ret = request_make(MSG_TYPE_CLIENT_HELLO, session);
if (ret < 0) if (ret < 0)
RETURN_ERROR(ret, sp_errmsg); RETURN_ERROR(ret, sp_errmsg);
@ -408,16 +439,16 @@ request_make(enum sp_msg_type type, struct sp_session *session)
struct sp_conn_callbacks cb = { sp_evbase, response_cb, timeout_cb }; struct sp_conn_callbacks cb = { sp_evbase, response_cb, timeout_cb };
int ret; int ret;
// sp_cb.logmsg("Making request %d\n", type);
// Make sure the connection is in a state suitable for sending this message // Make sure the connection is in a state suitable for sending this message
ret = ap_connect(type, &cb, session); ret = ap_connect(&session->conn, type, &session->cooldown_ts, &cb, session);
if (ret == SP_OK_WAIT) if (ret == SP_OK_WAIT)
return relogin(type, session); // Can't proceed right now, the handshake needs to complete first return relogin(type, session); // Can't proceed right now, the handshake needs to complete first
else if (ret < 0) else if (ret < 0)
RETURN_ERROR(ret, sp_errmsg); RETURN_ERROR(ret, sp_errmsg);
ret = msg_make(&msg, type, session); ret = msg_make(&msg, type, session);
if (type == MSG_TYPE_CLIENT_RESPONSE_ENCRYPTED)
memset(session->credentials.password, 0, sizeof(session->credentials.password));
if (ret < 0) if (ret < 0)
RETURN_ERROR(SP_ERR_INVALID, "Error constructing message to Spotify"); RETURN_ERROR(SP_ERR_INVALID, "Error constructing message to Spotify");
@ -435,6 +466,7 @@ request_make(enum sp_msg_type type, struct sp_session *session)
else else
event_active(session->continue_ev, 0, 0); event_active(session->continue_ev, 0, 0);
session->msg_type_last = type;
session->msg_type_next = msg.type_next; session->msg_type_next = msg.type_next;
session->response_handler = msg.response_handler; session->response_handler = msg.response_handler;
@ -463,7 +495,7 @@ track_write(void *arg, int *retval)
RETURN_ERROR(SP_ERR_NOSESSION, "Cannot play track, no valid session found"); RETURN_ERROR(SP_ERR_NOSESSION, "Cannot play track, no valid session found");
channel = session->now_streaming_channel; channel = session->now_streaming_channel;
if (!channel || !channel->is_allocated) if (!channel || channel->state == SP_CHANNEL_STATE_UNALLOCATED)
RETURN_ERROR(SP_ERR_INVALID, "No active channel to play, has track been opened?"); RETURN_ERROR(SP_ERR_INVALID, "No active channel to play, has track been opened?");
channel_play(channel); channel_play(channel);
@ -496,19 +528,20 @@ track_pause(void *arg, int *retval)
RETURN_ERROR(SP_ERR_NOSESSION, "Cannot pause track, no valid session found"); RETURN_ERROR(SP_ERR_NOSESSION, "Cannot pause track, no valid session found");
channel = session->now_streaming_channel; channel = session->now_streaming_channel;
if (!channel || !channel->is_allocated) if (!channel || channel->state == SP_CHANNEL_STATE_UNALLOCATED)
RETURN_ERROR(SP_ERR_INVALID, "No active channel to pause, has track been opened?"); RETURN_ERROR(SP_ERR_INVALID, "No active channel to pause, has track been opened?");
// If we are playing we are in the process of downloading a chunk, and in that // If we are playing we are in the process of downloading a chunk, and in that
// case we need that to complete before doing anything else with the channel, // case we need that to complete before doing anything else with the channel,
// e.g. reset it as track_close() does. // e.g. reset it as track_close() does.
if (!channel->is_writing) if (channel->state != SP_CHANNEL_STATE_PLAYING)
{ {
*retval = 0; *retval = 0;
return COMMAND_END; return COMMAND_END;
} }
channel_pause(channel); channel_pause(channel);
session->msg_type_next = MSG_TYPE_NONE;
*retval = 1; *retval = 1;
return COMMAND_PENDING; return COMMAND_PENDING;
@ -531,9 +564,9 @@ track_seek(void *arg, int *retval)
RETURN_ERROR(SP_ERR_NOSESSION, "Cannot seek, no valid session found"); RETURN_ERROR(SP_ERR_NOSESSION, "Cannot seek, no valid session found");
channel = session->now_streaming_channel; channel = session->now_streaming_channel;
if (!channel || !channel->is_allocated) if (!channel)
RETURN_ERROR(SP_ERR_INVALID, "No active channel to seek, has track been opened?"); RETURN_ERROR(SP_ERR_INVALID, "No active channel to seek, has track been opened?");
else if (channel->is_writing) else if (channel->state != SP_CHANNEL_STATE_OPENED)
RETURN_ERROR(SP_ERR_INVALID, "Seeking during playback not currently supported"); RETURN_ERROR(SP_ERR_INVALID, "Seeking during playback not currently supported");
// This operation is not safe during chunk downloading because it changes the // This operation is not safe during chunk downloading because it changes the