Improve the websocket to send events to clients immediately.

This commit is contained in:
martin 2021-05-13 22:54:11 +02:00
parent 520a1251ec
commit a15edfcae9
3 changed files with 107 additions and 77 deletions

View File

@ -269,7 +269,7 @@ AM_CONDITIONAL([COND_PULSEAUDIO], [[test "x$with_pulseaudio" = "xyes"]])
dnl Build with libwebsockets dnl Build with libwebsockets
OWNTONE_ARG_WITH_CHECK([OWNTONE_OPTS], [libwebsockets support], [libwebsockets], [LIBWEBSOCKETS], OWNTONE_ARG_WITH_CHECK([OWNTONE_OPTS], [libwebsockets support], [libwebsockets], [LIBWEBSOCKETS],
[libwebsockets >= 2.0.2]) [libwebsockets >= 3.0.0])
AM_CONDITIONAL([COND_LIBWEBSOCKETS], [[test "x$with_libwebsockets" = "xyes"]]) AM_CONDITIONAL([COND_LIBWEBSOCKETS], [[test "x$with_libwebsockets" = "xyes"]])
dnl Build with libevent_pthreads dnl Build with libevent_pthreads

View File

@ -43,12 +43,10 @@ static const char *websocket_interface;
static int websocket_port; static int websocket_port;
static bool websocket_exit = false; static bool websocket_exit = false;
// Event mask of events to notify websocket clients // Lock the event mask of events processed by the writeable callback
static short websocket_events; static pthread_mutex_t websocket_write_event_lock;
// Event mask of events processed by the writeable callback // Event mask of events processed by the writeable callback
static short websocket_write_events; static short websocket_write_events;
// Counter for events to keep track of when to write
static unsigned short websocket_write_events_counter;
@ -56,8 +54,11 @@ static unsigned short websocket_write_events_counter;
static void static void
listener_cb(short event_mask) listener_cb(short event_mask)
{ {
// Add event to the event mask, clients will be notified at the next break of the libwebsockets service loop DPRINTF(E_INFO, L_WEB, "listener_cb: %d\n", event_mask);
websocket_events |= event_mask; pthread_mutex_lock(&websocket_write_event_lock);
websocket_write_events |= event_mask;
pthread_mutex_unlock(&websocket_write_event_lock);
lws_cancel_service(context);
} }
/* /*
@ -77,10 +78,16 @@ callback_http(struct lws *wsi, enum lws_callback_reasons reason, void *user, voi
* The client sends the events it wants to be notified of and the event mask is * The client sends the events it wants to be notified of and the event mask is
* set accordingly translating them to the LISTENER enum (see listener.h) * set accordingly translating them to the LISTENER enum (see listener.h)
*/ */
struct ws_session_data_notify struct per_session_data {
{ struct per_session_data *pss_list;
short events; struct lws *wsi;
unsigned short counter; // to keep track of whether this user has already written short requested_events;
short write_events;
};
/* one of these is created for each vhost our protocol is used with */
struct per_vhost_data {
struct per_session_data *pss_list; /* linked-list of live pss*/
}; };
/* /*
@ -93,7 +100,7 @@ struct ws_session_data_notify
* } * }
*/ */
static int static int
process_notify_request(struct ws_session_data_notify *session_data, void *in, size_t len) process_notify_request(short *requested_events, void *in, size_t len)
{ {
json_tokener *tokener; json_tokener *tokener;
json_object *request; json_object *request;
@ -103,7 +110,7 @@ process_notify_request(struct ws_session_data_notify *session_data, void *in, si
json_object *needle; json_object *needle;
const char *event_type; const char *event_type;
memset(session_data, 0, sizeof(struct ws_session_data_notify)); *requested_events = 0;
tokener = json_tokener_new(); tokener = json_tokener_new();
request = json_tokener_parse_ex(tokener, in, len); request = json_tokener_parse_ex(tokener, in, len);
@ -132,43 +139,43 @@ process_notify_request(struct ws_session_data_notify *session_data, void *in, si
if (0 == strcmp(event_type, "update")) if (0 == strcmp(event_type, "update"))
{ {
session_data->events |= LISTENER_UPDATE; *requested_events |= LISTENER_UPDATE;
} }
else if (0 == strcmp(event_type, "database")) else if (0 == strcmp(event_type, "database"))
{ {
session_data->events |= LISTENER_DATABASE; *requested_events |= LISTENER_DATABASE;
} }
else if (0 == strcmp(event_type, "pairing")) else if (0 == strcmp(event_type, "pairing"))
{ {
session_data->events |= LISTENER_PAIRING; *requested_events |= LISTENER_PAIRING;
} }
else if (0 == strcmp(event_type, "spotify")) else if (0 == strcmp(event_type, "spotify"))
{ {
session_data->events |= LISTENER_SPOTIFY; *requested_events |= LISTENER_SPOTIFY;
} }
else if (0 == strcmp(event_type, "lastfm")) else if (0 == strcmp(event_type, "lastfm"))
{ {
session_data->events |= LISTENER_LASTFM; *requested_events |= LISTENER_LASTFM;
} }
else if (0 == strcmp(event_type, "ouputs")) else if (0 == strcmp(event_type, "ouputs"))
{ {
session_data->events |= LISTENER_SPEAKER; *requested_events |= LISTENER_SPEAKER;
} }
else if (0 == strcmp(event_type, "player")) else if (0 == strcmp(event_type, "player"))
{ {
session_data->events |= LISTENER_PLAYER; *requested_events |= LISTENER_PLAYER;
} }
else if (0 == strcmp(event_type, "options")) else if (0 == strcmp(event_type, "options"))
{ {
session_data->events |= LISTENER_OPTIONS; *requested_events |= LISTENER_OPTIONS;
} }
else if (0 == strcmp(event_type, "volume")) else if (0 == strcmp(event_type, "volume"))
{ {
session_data->events |= LISTENER_VOLUME; *requested_events |= LISTENER_VOLUME;
} }
else if (0 == strcmp(event_type, "queue")) else if (0 == strcmp(event_type, "queue"))
{ {
session_data->events |= LISTENER_QUEUE; *requested_events |= LISTENER_QUEUE;
} }
} }
} }
@ -197,6 +204,8 @@ send_notify_reply(short events, struct lws* wsi)
json_object* reply; json_object* reply;
json_object* notify; json_object* notify;
DPRINTF(E_DBG, L_WEB, "notify callback reply: %d\n", events);
notify = json_object_new_array(); notify = json_object_new_array();
if (events & LISTENER_UPDATE) if (events & LISTENER_UPDATE)
{ {
@ -258,46 +267,72 @@ send_notify_reply(short events, struct lws* wsi)
static int static int
callback_notify(struct lws *wsi, enum lws_callback_reasons reason, void *user, void *in, size_t len) callback_notify(struct lws *wsi, enum lws_callback_reasons reason, void *user, void *in, size_t len)
{ {
struct ws_session_data_notify *session_data = user; struct per_session_data *pss = user;
struct per_vhost_data *vhd = lws_protocol_vh_priv_get(lws_get_vhost(wsi), lws_get_protocol(wsi));
short events = 0;
int ret = 0; int ret = 0;
DPRINTF(E_DBG, L_WEB, "notify callback reason: %d\n", reason); DPRINTF(E_DBG, L_WEB, "notify callback reason: %d\n", reason);
switch (reason) switch (reason)
{ {
case LWS_CALLBACK_ESTABLISHED: case LWS_CALLBACK_PROTOCOL_INIT:
// Initialize session data for new connections vhd = lws_protocol_vh_priv_zalloc(lws_get_vhost(wsi),
memset(session_data, 0, sizeof(struct ws_session_data_notify)); lws_get_protocol(wsi),
session_data->counter = websocket_write_events_counter; sizeof(struct per_vhost_data));
break; if (!vhd)
{
DPRINTF(E_LOG, L_WEB, "Failed to allocate websocket per-vhoststorage\n");
return 1;
}
break;
case LWS_CALLBACK_RECEIVE: case LWS_CALLBACK_ESTABLISHED:
ret = process_notify_request(session_data, in, len); /* add ourselves to the list of live pss held in the vhd */
break; lws_ll_fwd_insert(pss, pss_list, vhd->pss_list);
pss->wsi = wsi;
break;
case LWS_CALLBACK_SERVER_WRITEABLE: case LWS_CALLBACK_CLOSED:
if (websocket_write_events && (websocket_write_events_counter != session_data->counter)) /* remove our closing pss from the list of live pss */
{ lws_ll_fwd_remove(struct per_session_data, pss_list, pss, vhd->pss_list);
send_notify_reply(websocket_write_events, wsi); break;
session_data->counter = websocket_write_events_counter;
}
break;
default: case LWS_CALLBACK_SERVER_WRITEABLE:
break; if (pss->requested_events & pss->write_events)
} {
events = pss->requested_events & pss->write_events;
send_notify_reply(events, wsi);
pss->write_events = 0;
}
break;
case LWS_CALLBACK_RECEIVE:
ret = process_notify_request(&pss->requested_events, in, len);
break;
case LWS_CALLBACK_EVENT_WAIT_CANCELLED:
if (vhd)
{
pthread_mutex_lock(&websocket_write_event_lock);
events = websocket_write_events;
websocket_write_events = 0;
pthread_mutex_unlock(&websocket_write_event_lock);
lws_start_foreach_llp(struct per_session_data **, ppss, vhd->pss_list)
{
(*ppss)->write_events = events;
lws_callback_on_writable((*ppss)->wsi);
} lws_end_foreach_llp(ppss, pss_list);
}
break;
default:
break;
}
return ret; return ret;
} }
/*
* Supported protocols of the websocket, needs to be in line with the protocols array
*/
enum ws_protocols
{
WS_PROTOCOL_HTTP = 0,
WS_PROTOCOL_NOTIFY,
};
static struct lws_protocols protocols[] = static struct lws_protocols protocols[] =
{ {
// The first protocol must always be the HTTP handler // The first protocol must always be the HTTP handler
@ -310,7 +345,7 @@ static struct lws_protocols protocols[] =
{ {
"notify", "notify",
callback_notify, callback_notify,
sizeof(struct ws_session_data_notify), sizeof(struct per_session_data),
0, 0,
}, },
{ NULL, NULL, 0, 0 } // terminator { NULL, NULL, 0, 0 } // terminator
@ -322,19 +357,13 @@ static void *
websocket(void *arg) websocket(void *arg)
{ {
listener_add(listener_cb, LISTENER_UPDATE | LISTENER_DATABASE | LISTENER_PAIRING | LISTENER_SPOTIFY | LISTENER_LASTFM | LISTENER_SPEAKER listener_add(listener_cb, LISTENER_UPDATE | LISTENER_DATABASE | LISTENER_PAIRING | LISTENER_SPOTIFY | LISTENER_LASTFM | LISTENER_SPEAKER
| LISTENER_PLAYER | LISTENER_OPTIONS | LISTENER_VOLUME | LISTENER_QUEUE); | LISTENER_PLAYER | LISTENER_OPTIONS | LISTENER_VOLUME | LISTENER_QUEUE);
while(!websocket_exit) while(!websocket_exit)
{ {
lws_service(context, 1000); if (lws_service(context, 0))
if (websocket_events) websocket_exit = true;
{ }
websocket_write_events = websocket_events;
websocket_write_events_counter++;
websocket_events = 0;
lws_callback_on_writable_all_protocol(context, &protocols[WS_PROTOCOL_NOTIFY]);
}
}
lws_context_destroy(context); lws_context_destroy(context);
pthread_exit(NULL); pthread_exit(NULL);
@ -348,25 +377,25 @@ logger_libwebsockets(int level, const char *line)
switch (level) switch (level)
{ {
case LLL_ERR: case LLL_ERR:
severity = E_LOG; severity = E_LOG;
break; break;
case LLL_WARN: case LLL_WARN:
severity = E_WARN; severity = E_WARN;
break; break;
case LLL_NOTICE: case LLL_NOTICE:
severity = E_DBG; severity = E_DBG;
break; break;
case LLL_INFO: case LLL_INFO:
case LLL_DEBUG: case LLL_DEBUG:
severity = E_SPAM; severity = E_SPAM;
break; break;
default: default:
severity = E_LOG; severity = E_LOG;
break; break;
} }
DPRINTF(severity, L_WEB, "LWS %s", line); DPRINTF(severity, L_WEB, "LWS %s", line);
@ -401,7 +430,7 @@ websocket_init(void)
// Log levels below NOTICE are only emmited if libwebsockets was built with DEBUG defined // Log levels below NOTICE are only emmited if libwebsockets was built with DEBUG defined
lws_set_log_level(LLL_ERR | LLL_WARN | LLL_NOTICE | LLL_INFO | LLL_DEBUG, lws_set_log_level(LLL_ERR | LLL_WARN | LLL_NOTICE | LLL_INFO | LLL_DEBUG,
logger_libwebsockets); logger_libwebsockets);
context = lws_create_context(&info); context = lws_create_context(&info);
if (context == NULL) if (context == NULL)
@ -410,7 +439,8 @@ websocket_init(void)
return -1; return -1;
} }
websocket_write_events_counter = 0; pthread_mutex_init(&websocket_write_event_lock, NULL);
websocket_write_events = 0;
ret = pthread_create(&tid_websocket, NULL, websocket, NULL); ret = pthread_create(&tid_websocket, NULL, websocket, NULL);
if (ret < 0) if (ret < 0)
{ {

View File

@ -127,7 +127,7 @@ export default {
socket.onopen = function () { socket.onopen = function () {
vm.$store.dispatch('add_notification', { text: 'Connection to server established', type: 'primary', topic: 'connection', timeout: 2000 }) vm.$store.dispatch('add_notification', { text: 'Connection to server established', type: 'primary', topic: 'connection', timeout: 2000 })
vm.reconnect_attempts = 0 vm.reconnect_attempts = 0
socket.send(JSON.stringify({ notify: ['update', 'database', 'player', 'options', 'outputs', 'volume', 'spotify', 'lastfm', 'pairing'] })) socket.send(JSON.stringify({ notify: ['update', 'database', 'player', 'options', 'outputs', 'volume', 'queue', 'spotify', 'lastfm', 'pairing'] }))
vm.update_outputs() vm.update_outputs()
vm.update_player_status() vm.update_player_status()