From a15edfcae9536e7ee5899198b8f16c7a0ac1dec4 Mon Sep 17 00:00:00 2001 From: martin Date: Thu, 13 May 2021 22:54:11 +0200 Subject: [PATCH] Improve the websocket to send events to clients immediately. --- configure.ac | 2 +- src/websocket.c | 180 ++++++++++++++++++++++++++------------------ web-src/src/App.vue | 2 +- 3 files changed, 107 insertions(+), 77 deletions(-) diff --git a/configure.ac b/configure.ac index c97d745d..cc63ead2 100644 --- a/configure.ac +++ b/configure.ac @@ -269,7 +269,7 @@ AM_CONDITIONAL([COND_PULSEAUDIO], [[test "x$with_pulseaudio" = "xyes"]]) dnl Build with libwebsockets OWNTONE_ARG_WITH_CHECK([OWNTONE_OPTS], [libwebsockets support], [libwebsockets], [LIBWEBSOCKETS], - [libwebsockets >= 2.0.2]) + [libwebsockets >= 3.0.0]) AM_CONDITIONAL([COND_LIBWEBSOCKETS], [[test "x$with_libwebsockets" = "xyes"]]) dnl Build with libevent_pthreads diff --git a/src/websocket.c b/src/websocket.c index 0d531d9d..df2626c4 100644 --- a/src/websocket.c +++ b/src/websocket.c @@ -43,12 +43,10 @@ static const char *websocket_interface; static int websocket_port; static bool websocket_exit = false; -// Event mask of events to notify websocket clients -static short websocket_events; +// Lock the event mask of events processed by the writeable callback +static pthread_mutex_t websocket_write_event_lock; // Event mask of events processed by the writeable callback static short websocket_write_events; -// Counter for events to keep track of when to write -static unsigned short websocket_write_events_counter; @@ -56,8 +54,11 @@ static unsigned short websocket_write_events_counter; static void listener_cb(short event_mask) { - // Add event to the event mask, clients will be notified at the next break of the libwebsockets service loop - websocket_events |= event_mask; + DPRINTF(E_INFO, L_WEB, "listener_cb: %d\n", event_mask); + pthread_mutex_lock(&websocket_write_event_lock); + websocket_write_events |= event_mask; + pthread_mutex_unlock(&websocket_write_event_lock); + lws_cancel_service(context); } /* @@ -77,10 +78,16 @@ callback_http(struct lws *wsi, enum lws_callback_reasons reason, void *user, voi * The client sends the events it wants to be notified of and the event mask is * set accordingly translating them to the LISTENER enum (see listener.h) */ -struct ws_session_data_notify -{ - short events; - unsigned short counter; // to keep track of whether this user has already written +struct per_session_data { + struct per_session_data *pss_list; + struct lws *wsi; + short requested_events; + short write_events; +}; + +/* one of these is created for each vhost our protocol is used with */ +struct per_vhost_data { + struct per_session_data *pss_list; /* linked-list of live pss*/ }; /* @@ -93,7 +100,7 @@ struct ws_session_data_notify * } */ static int -process_notify_request(struct ws_session_data_notify *session_data, void *in, size_t len) +process_notify_request(short *requested_events, void *in, size_t len) { json_tokener *tokener; json_object *request; @@ -103,7 +110,7 @@ process_notify_request(struct ws_session_data_notify *session_data, void *in, si json_object *needle; const char *event_type; - memset(session_data, 0, sizeof(struct ws_session_data_notify)); + *requested_events = 0; tokener = json_tokener_new(); request = json_tokener_parse_ex(tokener, in, len); @@ -132,43 +139,43 @@ process_notify_request(struct ws_session_data_notify *session_data, void *in, si if (0 == strcmp(event_type, "update")) { - session_data->events |= LISTENER_UPDATE; + *requested_events |= LISTENER_UPDATE; } else if (0 == strcmp(event_type, "database")) { - session_data->events |= LISTENER_DATABASE; + *requested_events |= LISTENER_DATABASE; } else if (0 == strcmp(event_type, "pairing")) { - session_data->events |= LISTENER_PAIRING; + *requested_events |= LISTENER_PAIRING; } else if (0 == strcmp(event_type, "spotify")) { - session_data->events |= LISTENER_SPOTIFY; + *requested_events |= LISTENER_SPOTIFY; } else if (0 == strcmp(event_type, "lastfm")) { - session_data->events |= LISTENER_LASTFM; + *requested_events |= LISTENER_LASTFM; } else if (0 == strcmp(event_type, "ouputs")) { - session_data->events |= LISTENER_SPEAKER; + *requested_events |= LISTENER_SPEAKER; } else if (0 == strcmp(event_type, "player")) { - session_data->events |= LISTENER_PLAYER; + *requested_events |= LISTENER_PLAYER; } else if (0 == strcmp(event_type, "options")) { - session_data->events |= LISTENER_OPTIONS; + *requested_events |= LISTENER_OPTIONS; } else if (0 == strcmp(event_type, "volume")) { - session_data->events |= LISTENER_VOLUME; + *requested_events |= LISTENER_VOLUME; } else if (0 == strcmp(event_type, "queue")) { - session_data->events |= LISTENER_QUEUE; + *requested_events |= LISTENER_QUEUE; } } } @@ -197,6 +204,8 @@ send_notify_reply(short events, struct lws* wsi) json_object* reply; json_object* notify; + DPRINTF(E_DBG, L_WEB, "notify callback reply: %d\n", events); + notify = json_object_new_array(); if (events & LISTENER_UPDATE) { @@ -258,46 +267,72 @@ send_notify_reply(short events, struct lws* wsi) static int callback_notify(struct lws *wsi, enum lws_callback_reasons reason, void *user, void *in, size_t len) { - struct ws_session_data_notify *session_data = user; + struct per_session_data *pss = user; + struct per_vhost_data *vhd = lws_protocol_vh_priv_get(lws_get_vhost(wsi), lws_get_protocol(wsi)); + short events = 0; int ret = 0; DPRINTF(E_DBG, L_WEB, "notify callback reason: %d\n", reason); + switch (reason) - { - case LWS_CALLBACK_ESTABLISHED: - // Initialize session data for new connections - memset(session_data, 0, sizeof(struct ws_session_data_notify)); - session_data->counter = websocket_write_events_counter; - break; + { + case LWS_CALLBACK_PROTOCOL_INIT: + vhd = lws_protocol_vh_priv_zalloc(lws_get_vhost(wsi), + lws_get_protocol(wsi), + sizeof(struct per_vhost_data)); + if (!vhd) + { + DPRINTF(E_LOG, L_WEB, "Failed to allocate websocket per-vhoststorage\n"); + return 1; + } + break; - case LWS_CALLBACK_RECEIVE: - ret = process_notify_request(session_data, in, len); - break; + case LWS_CALLBACK_ESTABLISHED: + /* add ourselves to the list of live pss held in the vhd */ + lws_ll_fwd_insert(pss, pss_list, vhd->pss_list); + pss->wsi = wsi; + break; - case LWS_CALLBACK_SERVER_WRITEABLE: - if (websocket_write_events && (websocket_write_events_counter != session_data->counter)) - { - send_notify_reply(websocket_write_events, wsi); - session_data->counter = websocket_write_events_counter; - } - break; + case LWS_CALLBACK_CLOSED: + /* remove our closing pss from the list of live pss */ + lws_ll_fwd_remove(struct per_session_data, pss_list, pss, vhd->pss_list); + break; - default: - break; - } + case LWS_CALLBACK_SERVER_WRITEABLE: + if (pss->requested_events & pss->write_events) + { + events = pss->requested_events & pss->write_events; + send_notify_reply(events, wsi); + pss->write_events = 0; + } + break; + + case LWS_CALLBACK_RECEIVE: + ret = process_notify_request(&pss->requested_events, in, len); + break; + + case LWS_CALLBACK_EVENT_WAIT_CANCELLED: + if (vhd) + { + pthread_mutex_lock(&websocket_write_event_lock); + events = websocket_write_events; + websocket_write_events = 0; + pthread_mutex_unlock(&websocket_write_event_lock); + lws_start_foreach_llp(struct per_session_data **, ppss, vhd->pss_list) + { + (*ppss)->write_events = events; + lws_callback_on_writable((*ppss)->wsi); + } lws_end_foreach_llp(ppss, pss_list); + } + break; + + default: + break; + } return ret; } -/* - * Supported protocols of the websocket, needs to be in line with the protocols array - */ -enum ws_protocols -{ - WS_PROTOCOL_HTTP = 0, - WS_PROTOCOL_NOTIFY, -}; - static struct lws_protocols protocols[] = { // The first protocol must always be the HTTP handler @@ -310,7 +345,7 @@ static struct lws_protocols protocols[] = { "notify", callback_notify, - sizeof(struct ws_session_data_notify), + sizeof(struct per_session_data), 0, }, { NULL, NULL, 0, 0 } // terminator @@ -322,19 +357,13 @@ static void * websocket(void *arg) { listener_add(listener_cb, LISTENER_UPDATE | LISTENER_DATABASE | LISTENER_PAIRING | LISTENER_SPOTIFY | LISTENER_LASTFM | LISTENER_SPEAKER - | LISTENER_PLAYER | LISTENER_OPTIONS | LISTENER_VOLUME | LISTENER_QUEUE); + | LISTENER_PLAYER | LISTENER_OPTIONS | LISTENER_VOLUME | LISTENER_QUEUE); while(!websocket_exit) - { - lws_service(context, 1000); - if (websocket_events) - { - websocket_write_events = websocket_events; - websocket_write_events_counter++; - websocket_events = 0; - lws_callback_on_writable_all_protocol(context, &protocols[WS_PROTOCOL_NOTIFY]); - } - } + { + if (lws_service(context, 0)) + websocket_exit = true; + } lws_context_destroy(context); pthread_exit(NULL); @@ -348,25 +377,25 @@ logger_libwebsockets(int level, const char *line) switch (level) { case LLL_ERR: - severity = E_LOG; - break; + severity = E_LOG; + break; case LLL_WARN: - severity = E_WARN; - break; + severity = E_WARN; + break; case LLL_NOTICE: - severity = E_DBG; - break; + severity = E_DBG; + break; case LLL_INFO: case LLL_DEBUG: - severity = E_SPAM; - break; + severity = E_SPAM; + break; default: - severity = E_LOG; - break; + severity = E_LOG; + break; } DPRINTF(severity, L_WEB, "LWS %s", line); @@ -401,7 +430,7 @@ websocket_init(void) // Log levels below NOTICE are only emmited if libwebsockets was built with DEBUG defined lws_set_log_level(LLL_ERR | LLL_WARN | LLL_NOTICE | LLL_INFO | LLL_DEBUG, - logger_libwebsockets); + logger_libwebsockets); context = lws_create_context(&info); if (context == NULL) @@ -410,7 +439,8 @@ websocket_init(void) return -1; } - websocket_write_events_counter = 0; + pthread_mutex_init(&websocket_write_event_lock, NULL); + websocket_write_events = 0; ret = pthread_create(&tid_websocket, NULL, websocket, NULL); if (ret < 0) { diff --git a/web-src/src/App.vue b/web-src/src/App.vue index 33486be6..eca3056c 100644 --- a/web-src/src/App.vue +++ b/web-src/src/App.vue @@ -127,7 +127,7 @@ export default { socket.onopen = function () { vm.$store.dispatch('add_notification', { text: 'Connection to server established', type: 'primary', topic: 'connection', timeout: 2000 }) vm.reconnect_attempts = 0 - socket.send(JSON.stringify({ notify: ['update', 'database', 'player', 'options', 'outputs', 'volume', 'spotify', 'lastfm', 'pairing'] })) + socket.send(JSON.stringify({ notify: ['update', 'database', 'player', 'options', 'outputs', 'volume', 'queue', 'spotify', 'lastfm', 'pairing'] })) vm.update_outputs() vm.update_player_status()