Merge pull request #1818 from chme/feat/libevent-2.2-ws

[web] Support libevent as WS server instead of libwebsockets
This commit is contained in:
Christian Meffert 2025-01-18 07:12:12 +01:00 committed by GitHub
commit 8c2b44fc6c
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
12 changed files with 385 additions and 50 deletions

View File

@ -143,7 +143,13 @@ OWNTONE_MODULES_CHECK([COMMON], [SQLITE3], [sqlite3 >= 3.5.0],
]) ])
OWNTONE_MODULES_CHECK([OWNTONE], [LIBEVENT], [libevent >= 2.1.4], OWNTONE_MODULES_CHECK([OWNTONE], [LIBEVENT], [libevent >= 2.1.4],
[event_base_new], [event2/event.h]) [event_base_new], [event2/event.h],
[dnl check for version 2.2 (with websocket server support)
PKG_CHECK_EXISTS([libevent >= 2.2.1],
[AC_DEFINE([HAVE_LIBEVENT22], 1,
[Define to 1 if you have libevent > 2.2])],
[])
])
OWNTONE_MODULES_CHECK([OWNTONE], [LIBEVENT_PTHREADS], [libevent_pthreads], OWNTONE_MODULES_CHECK([OWNTONE], [LIBEVENT_PTHREADS], [libevent_pthreads],
[evthread_use_pthreads], [event2/thread.h]) [evthread_use_pthreads], [event2/thread.h])

View File

@ -1307,7 +1307,7 @@ cache_database_update(void *arg, int *retval)
/* Callback from filescanner thread */ /* Callback from filescanner thread */
static void static void
cache_daap_listener_cb(short event_mask) cache_daap_listener_cb(short event_mask, void *ctx)
{ {
commands_exec_async(cmdbase, cache_database_update, NULL); commands_exec_async(cmdbase, cache_database_update, NULL);
} }
@ -1715,7 +1715,7 @@ cache(void *arg)
for (i = 0; i < ARRAY_SIZE(cache_xcode_jobs); i++) for (i = 0; i < ARRAY_SIZE(cache_xcode_jobs); i++)
CHECK_NULL(L_CACHE, cache_xcode_jobs[i].ev = evtimer_new(evbase_cache, cache_xcode_job_complete_cb, &cache_xcode_jobs[i])); CHECK_NULL(L_CACHE, cache_xcode_jobs[i].ev = evtimer_new(evbase_cache, cache_xcode_job_complete_cb, &cache_xcode_jobs[i]));
CHECK_ERR(L_CACHE, listener_add(cache_daap_listener_cb, LISTENER_DATABASE)); CHECK_ERR(L_CACHE, listener_add(cache_daap_listener_cb, LISTENER_DATABASE, NULL));
cache_is_initialized = 1; cache_is_initialized = 1;

View File

@ -971,7 +971,7 @@ speaker_update_handler_cb(void *arg)
// Thread: player (must not block) // Thread: player (must not block)
static void static void
httpd_speaker_update_handler(short event_mask) httpd_speaker_update_handler(short event_mask, void *ctx)
{ {
worker_execute(speaker_update_handler_cb, NULL, 0, 0); worker_execute(speaker_update_handler_cb, NULL, 0, 0);
} }
@ -1636,7 +1636,7 @@ httpd_init(const char *webroot)
// We need to know about speaker format changes so we can ask the cache to // We need to know about speaker format changes so we can ask the cache to
// start preparing headers for mp4/alac if selected // start preparing headers for mp4/alac if selected
listener_add(httpd_speaker_update_handler, LISTENER_SPEAKER); listener_add(httpd_speaker_update_handler, LISTENER_SPEAKER, NULL);
return 0; return 0;

View File

@ -787,7 +787,7 @@ update_fail_cb(void *arg)
/* Thread: player */ /* Thread: player */
static void static void
dacp_playstatus_update_handler(short event_mask) dacp_playstatus_update_handler(short event_mask, void *ctx)
{ {
struct dacp_update_request *ur; struct dacp_update_request *ur;
@ -2818,7 +2818,7 @@ dacp_init(void)
CHECK_ERR(L_DACP, mutex_init(&update_request_lck)); CHECK_ERR(L_DACP, mutex_init(&update_request_lck));
update_current_rev = 2; update_current_rev = 2;
listener_add(dacp_playstatus_update_handler, LISTENER_PLAYER | LISTENER_VOLUME | LISTENER_QUEUE); listener_add(dacp_playstatus_update_handler, LISTENER_PLAYER | LISTENER_VOLUME | LISTENER_QUEUE, NULL);
return 0; return 0;
} }

View File

@ -20,6 +20,7 @@
# include <config.h> # include <config.h>
#endif #endif
#include <json.h>
#include <stdlib.h> #include <stdlib.h>
#include <string.h> #include <string.h>
#include <unistd.h> #include <unistd.h>
@ -31,10 +32,15 @@
#include <event2/keyvalq_struct.h> #include <event2/keyvalq_struct.h>
#include <event2/buffer.h> #include <event2/buffer.h>
#include <event2/bufferevent.h> #include <event2/bufferevent.h>
#ifdef HAVE_LIBEVENT22
#include <event2/ws.h>
#endif
#include <pthread.h> #include <pthread.h>
#include "conffile.h"
#include "misc.h" #include "misc.h"
#include "listener.h"
#include "logger.h" #include "logger.h"
#include "commands.h" #include "commands.h"
#include "httpd_internal.h" #include "httpd_internal.h"
@ -94,6 +100,294 @@ static void
closecb_worker(evutil_socket_t fd, short event, void *arg); closecb_worker(evutil_socket_t fd, short event, void *arg);
#ifdef HAVE_LIBEVENT22
/*
* Each session of the "notify" protocol holds this event mask
*
* The client sends the events it wants to be notified of and the event mask is
* set accordingly translating them to the LISTENER enum (see listener.h)
*/
struct ws_client
{
struct evws_connection *evws;
char name[INET6_ADDRSTRLEN];
short requested_events;
struct ws_client *next;
};
static struct ws_client *ws_clients = NULL;
/*
* Notify clients of the notify-protocol about occurred events
*
* Sends a JSON message of the form:
*
* {
* "notify": [ "update" ]
* }
*/
static char *
ws_create_notify_reply(short events, short *requested_events)
{
char *json_response;
json_object *reply;
json_object *notify;
DPRINTF(E_DBG, L_WEB, "notify callback reply: %d\n", events);
notify = json_object_new_array();
if (events & LISTENER_UPDATE)
{
json_object_array_add(notify, json_object_new_string("update"));
}
if (events & LISTENER_DATABASE)
{
json_object_array_add(notify, json_object_new_string("database"));
}
if (events & LISTENER_PAIRING)
{
json_object_array_add(notify, json_object_new_string("pairing"));
}
if (events & LISTENER_SPOTIFY)
{
json_object_array_add(notify, json_object_new_string("spotify"));
}
if (events & LISTENER_LASTFM)
{
json_object_array_add(notify, json_object_new_string("lastfm"));
}
if (events & LISTENER_SPEAKER)
{
json_object_array_add(notify, json_object_new_string("outputs"));
}
if (events & LISTENER_PLAYER)
{
json_object_array_add(notify, json_object_new_string("player"));
}
if (events & LISTENER_OPTIONS)
{
json_object_array_add(notify, json_object_new_string("options"));
}
if (events & LISTENER_VOLUME)
{
json_object_array_add(notify, json_object_new_string("volume"));
}
if (events & LISTENER_QUEUE)
{
json_object_array_add(notify, json_object_new_string("queue"));
}
reply = json_object_new_object();
json_object_object_add(reply, "notify", notify);
json_response = strdup(json_object_to_json_string(reply));
json_object_put(reply);
return json_response;
}
/* Thread: library, player, etc. (the thread the event occurred) */
static enum command_state
ws_listener_cb(void *arg, int *ret)
{
struct ws_client *client = NULL;
char *reply = NULL;
short *event_mask = arg;
for (client = ws_clients; client; client = client->next)
{
reply = ws_create_notify_reply(*event_mask, &client->requested_events);
evws_send_text(client->evws, reply);
free(reply);
}
return COMMAND_END;
}
static void
listener_cb(short event_mask, void *ctx)
{
httpd_server *server = ctx;
commands_exec_sync(server->cmdbase, ws_listener_cb, NULL, &event_mask);
}
/*
* Processes client requests to the notify-protocol
*
* Expects the message in "in" to be a JSON string of the form:
*
* {
* "notify": [ "update" ]
* }
*/
static int
ws_process_notify_request(short *requested_events, const char *in, size_t len)
{
json_tokener *tokener;
json_object *request;
json_object *item;
int count, i;
enum json_tokener_error jerr;
json_object *needle;
const char *event_type;
*requested_events = 0;
tokener = json_tokener_new();
request = json_tokener_parse_ex(tokener, in, len);
jerr = json_tokener_get_error(tokener);
if (jerr != json_tokener_success)
{
DPRINTF(E_LOG, L_WEB, "Failed to parse incoming request: %s\n", json_tokener_error_desc(jerr));
json_tokener_free(tokener);
return -1;
}
DPRINTF(E_DBG, L_WEB, "notify callback request: %s\n", json_object_to_json_string(request));
if (json_object_object_get_ex(request, "notify", &needle) && json_object_get_type(needle) == json_type_array)
{
count = json_object_array_length(needle);
for (i = 0; i < count; i++)
{
item = json_object_array_get_idx(needle, i);
if (json_object_get_type(item) == json_type_string)
{
event_type = json_object_get_string(item);
DPRINTF(E_SPAM, L_WEB, "notify callback event received: %s\n", event_type);
if (0 == strcmp(event_type, "update"))
{
*requested_events |= LISTENER_UPDATE;
}
else if (0 == strcmp(event_type, "database"))
{
*requested_events |= LISTENER_DATABASE;
}
else if (0 == strcmp(event_type, "pairing"))
{
*requested_events |= LISTENER_PAIRING;
}
else if (0 == strcmp(event_type, "spotify"))
{
*requested_events |= LISTENER_SPOTIFY;
}
else if (0 == strcmp(event_type, "lastfm"))
{
*requested_events |= LISTENER_LASTFM;
}
else if (0 == strcmp(event_type, "outputs"))
{
*requested_events |= LISTENER_SPEAKER;
}
else if (0 == strcmp(event_type, "player"))
{
*requested_events |= LISTENER_PLAYER;
}
else if (0 == strcmp(event_type, "options"))
{
*requested_events |= LISTENER_OPTIONS;
}
else if (0 == strcmp(event_type, "volume"))
{
*requested_events |= LISTENER_VOLUME;
}
else if (0 == strcmp(event_type, "queue"))
{
*requested_events |= LISTENER_QUEUE;
}
}
}
}
json_tokener_free(tokener);
json_object_put(request);
return 0;
}
static void
ws_client_msg_cb(struct evws_connection *evws, int type, const unsigned char *data, size_t len, void *arg)
{
struct ws_client *self = arg;
const char *msg = (const char *)data;
ws_process_notify_request(&self->requested_events, msg, len);
}
static void
ws_client_close_cb(struct evws_connection *evws, void *arg)
{
struct ws_client *client = NULL;
struct ws_client *prev = NULL;
for (client = ws_clients; client && client != arg; client = ws_clients->next)
{
prev = client;
}
if (client)
{
if (prev)
prev->next = client->next;
else
ws_clients = client->next;
free(client);
}
}
static void
ws_gencb(struct evhttp_request *req, void *arg)
{
struct ws_client *client;
client = calloc(1, sizeof(*client));
client->evws = evws_new_session(req, ws_client_msg_cb, client, 0);
if (!client->evws)
{
free(client);
return;
}
evws_connection_set_closecb(client->evws, ws_client_close_cb, client);
client->next = ws_clients;
ws_clients = client;
}
static int
ws_init(httpd_server *server)
{
int websocket_port = cfg_getint(cfg_getsec(cfg, "general"), "websocket_port");
if (websocket_port > 0)
{
DPRINTF(E_DBG, L_WEB,
"Libevent websocket disabled, using libwebsockets instead. Set "
"websocket_port to 0 to enable it.\n");
return 0;
}
evhttp_set_cb(server->evhttp, "/ws", ws_gencb, NULL);
listener_add(listener_cb, LISTENER_UPDATE | LISTENER_DATABASE | LISTENER_PAIRING | LISTENER_SPOTIFY | LISTENER_LASTFM
| LISTENER_SPEAKER | LISTENER_PLAYER | LISTENER_OPTIONS | LISTENER_VOLUME
| LISTENER_QUEUE, server);
return 0;
}
static void
ws_deinit(void)
{
listener_remove(listener_cb);
}
#endif
const char * const char *
httpd_query_value_find(httpd_query *query, const char *key) httpd_query_value_find(httpd_query *query, const char *key)
{ {
@ -304,9 +598,11 @@ gencb_httpd(httpd_backend *backend, void *arg)
struct httpd_request *hreq; struct httpd_request *hreq;
struct bufferevent *bufev; struct bufferevent *bufev;
#ifndef HAVE_LIBEVENT22
// Clear the proxy request flag set by evhttp if the request URI was absolute. // Clear the proxy request flag set by evhttp if the request URI was absolute.
// It has side-effects on Connection: keep-alive // It has side-effects on Connection: keep-alive
backend->flags &= ~EVHTTP_PROXY_REQUEST; backend->flags &= ~EVHTTP_PROXY_REQUEST;
#endif
// This is a workaround for some versions of libevent (2.0 and 2.1) that don't // This is a workaround for some versions of libevent (2.0 and 2.1) that don't
// detect if the client hangs up, and thus don't clean up and never call the // detect if the client hangs up, and thus don't clean up and never call the
@ -341,7 +637,12 @@ httpd_server_free(httpd_server *server)
close(server->fd); close(server->fd);
if (server->evhttp) if (server->evhttp)
{
#ifdef HAVE_LIBEVENT22
ws_deinit();
#endif
evhttp_free(server->evhttp); evhttp_free(server->evhttp);
}
commands_base_free(server->cmdbase); commands_base_free(server->cmdbase);
free(server); free(server);
@ -374,6 +675,9 @@ httpd_server_new(struct event_base *evbase, unsigned short port, httpd_request_c
goto error; goto error;
evhttp_set_gencb(server->evhttp, gencb_httpd, server); evhttp_set_gencb(server->evhttp, gencb_httpd, server);
#ifdef HAVE_LIBEVENT22
ws_init(server);
#endif
return server; return server;

View File

@ -999,7 +999,7 @@ pipelist_create(void)
// the pipe thread to watch the pipes. If no pipes in library, it will shut down // the pipe thread to watch the pipes. If no pipes in library, it will shut down
// the pipe thread. // the pipe thread.
static void static void
pipe_listener_cb(short event_mask) pipe_listener_cb(short event_mask, void *ctx)
{ {
union pipe_arg *cmdarg; union pipe_arg *cmdarg;
@ -1146,8 +1146,8 @@ init(void)
pipe_autostart = cfg_getbool(cfg_getsec(cfg, "library"), "pipe_autostart"); pipe_autostart = cfg_getbool(cfg_getsec(cfg, "library"), "pipe_autostart");
if (pipe_autostart) if (pipe_autostart)
{ {
pipe_listener_cb(0); pipe_listener_cb(0, NULL);
CHECK_ERR(L_PLAYER, listener_add(pipe_listener_cb, LISTENER_DATABASE)); CHECK_ERR(L_PLAYER, listener_add(pipe_listener_cb, LISTENER_DATABASE, NULL));
} }
pipe_sample_rate = cfg_getint(cfg_getsec(cfg, "library"), "pipe_sample_rate"); pipe_sample_rate = cfg_getint(cfg_getsec(cfg, "library"), "pipe_sample_rate");

View File

@ -27,13 +27,14 @@ struct listener
{ {
notify notify_cb; notify notify_cb;
short events; short events;
void *ctx;
struct listener *next; struct listener *next;
}; };
struct listener *listener_list = NULL; struct listener *listener_list = NULL;
int int
listener_add(notify notify_cb, short events) listener_add(notify notify_cb, short events, void *ctx)
{ {
struct listener *listener; struct listener *listener;
@ -44,6 +45,7 @@ listener_add(notify notify_cb, short events)
} }
listener->notify_cb = notify_cb; listener->notify_cb = notify_cb;
listener->events = events; listener->events = events;
listener->ctx = ctx;
listener->next = listener_list; listener->next = listener_list;
listener_list = listener; listener_list = listener;
@ -88,7 +90,7 @@ listener_notify(short event_mask)
while (listener) while (listener)
{ {
if (event_mask & listener->events) if (event_mask & listener->events)
listener->notify_cb(event_mask & listener->events); listener->notify_cb(event_mask & listener->events, listener->ctx);
listener = listener->next; listener = listener->next;
} }
} }

View File

@ -30,7 +30,7 @@ enum listener_event_type
LISTENER_RATING = (1 << 11), LISTENER_RATING = (1 << 11),
}; };
typedef void (*notify)(short event_mask); typedef void (*notify)(short event_mask, void *ctx);
/* /*
* Registers the given callback function to the given event types. * Registers the given callback function to the given event types.
@ -39,10 +39,11 @@ typedef void (*notify)(short event_mask);
* @param notify_cb Callback function (should be a non-blocking function, * @param notify_cb Callback function (should be a non-blocking function,
* especially when the event is from the player) * especially when the event is from the player)
* @param event_mask Event mask, one or more of LISTENER_* * @param event_mask Event mask, one or more of LISTENER_*
* @param ctx Context will be passed to the notify callback
* @return 0 on success, -1 on failure * @return 0 on success, -1 on failure
*/ */
int int
listener_add(notify notify_cb, short event_mask); listener_add(notify notify_cb, short event_mask, void *ctx);
/* /*
* Removes the given callback function * Removes the given callback function

View File

@ -4117,7 +4117,7 @@ mpd_notify_idle(void *arg, int *retval)
} }
static void static void
mpd_listener_cb(short event_mask) mpd_listener_cb(short event_mask, void *ctx)
{ {
short *ptr; short *ptr;
@ -4381,7 +4381,7 @@ mpd_init(void)
thread_setname(tid_mpd, "mpd"); thread_setname(tid_mpd, "mpd");
mpd_clients = NULL; mpd_clients = NULL;
listener_add(mpd_listener_cb, MPD_ALL_IDLE_LISTENER_EVENTS); listener_add(mpd_listener_cb, MPD_ALL_IDLE_LISTENER_EVENTS, NULL);
return 0; return 0;

View File

@ -48,9 +48,9 @@ static pthread_mutex_t websocket_write_event_lock;
static short websocket_write_events; static short websocket_write_events;
/* Thread: library (the thread the event occurred) */ /* Thread: library, player, etc. (the thread the event occurred) */
static void static void
listener_cb(short event_mask) listener_cb(short event_mask, void *ctx)
{ {
pthread_mutex_lock(&websocket_write_event_lock); pthread_mutex_lock(&websocket_write_event_lock);
websocket_write_events |= event_mask; websocket_write_events |= event_mask;
@ -133,7 +133,7 @@ process_notify_request(short *requested_events, void *in, size_t len)
if (json_object_get_type(item) == json_type_string) if (json_object_get_type(item) == json_type_string)
{ {
event_type = json_object_get_string(item); event_type = json_object_get_string(item);
DPRINTF(E_DBG, L_WEB, "notify callback event received: %s\n", event_type); DPRINTF(E_SPAM, L_WEB, "notify callback event received: %s\n", event_type);
if (0 == strcmp(event_type, "update")) if (0 == strcmp(event_type, "update"))
{ {
@ -279,7 +279,7 @@ callback_notify(struct lws *wsi, enum lws_callback_reasons reason, void *user, v
short events = 0; short events = 0;
int ret = 0; int ret = 0;
DPRINTF(E_DBG, L_WEB, "notify callback reason: %d\n", reason); DPRINTF(E_SPAM, L_WEB, "notify callback reason: %d\n", reason);
switch (reason) switch (reason)
{ {
@ -416,7 +416,7 @@ static void *
websocket(void *arg) websocket(void *arg)
{ {
listener_add(listener_cb, LISTENER_UPDATE | LISTENER_DATABASE | LISTENER_PAIRING | LISTENER_SPOTIFY | LISTENER_LASTFM | LISTENER_SPEAKER listener_add(listener_cb, LISTENER_UPDATE | LISTENER_DATABASE | LISTENER_PAIRING | LISTENER_SPOTIFY | LISTENER_LASTFM | LISTENER_SPEAKER
| LISTENER_PLAYER | LISTENER_OPTIONS | LISTENER_VOLUME | LISTENER_QUEUE); | LISTENER_PLAYER | LISTENER_OPTIONS | LISTENER_VOLUME | LISTENER_QUEUE, NULL);
while(!websocket_exit) while(!websocket_exit)
{ {
@ -478,7 +478,11 @@ websocket_init(void)
if (websocket_port <= 0) if (websocket_port <= 0)
{ {
#ifdef HAVE_LIBEVENT22
DPRINTF(E_DBG, L_WEB, "Libwebsocket disabled, using libevent websocket instead. To enable it, set websocket_port in config to a valid port number.\n");
#else
DPRINTF(E_LOG, L_WEB, "Websocket disabled. To enable it, set websocket_port in config to a valid port number.\n"); DPRINTF(E_LOG, L_WEB, "Websocket disabled. To enable it, set websocket_port in config to a valid port number.\n");
#endif
return 0; return 0;
} }

View File

@ -156,34 +156,8 @@ export default {
}) })
}, },
open_ws() { open_ws() {
if (this.configurationStore.websocket_port <= 0) { const wsPort = this.configurationStore.websocket_port
this.notificationsStore.add({ const socket = wsPort <= 0 ? this.create_websocket() : this.create_websocket_with_port(wsPort)
text: this.$t('server.missing-port'),
type: 'danger'
})
return
}
let protocol = 'ws://'
if (window.location.protocol === 'https:') {
protocol = 'wss://'
}
let wsUrl = `${protocol}${window.location.hostname}:${this.configurationStore.websocket_port}`
if (import.meta.env.DEV && import.meta.env.VITE_OWNTONE_URL) {
/*
* If we are running in development mode, construct the websocket
* url from the host of the environment variable VITE_OWNTONE_URL
*/
const url = new URL(import.meta.env.VITE_OWNTONE_URL)
wsUrl = `${protocol}${url.hostname}:${this.configurationStore.websocket_port}`
}
const socket = new ReconnectingWebSocket(wsUrl, 'notify', {
maxReconnectInterval: 2000,
reconnectInterval: 1000
})
const vm = this const vm = this
socket.onopen = () => { socket.onopen = () => {
@ -283,6 +257,49 @@ export default {
} }
} }
}, },
create_websocket() {
// Create websocket connection on the same http connection
const wsHost = window.location.hostname
const wsPath = '/ws'
const wsPort = window.location.port
let wsProtocol = 'ws://'
if (window.location.protocol === 'https:') {
wsProtocol = 'wss://'
}
const wsUrl = `${wsProtocol}${wsHost}:${wsPort}${wsPath}`
const socket = new ReconnectingWebSocket(wsUrl, 'notify', {
maxReconnectInterval: 2000,
reconnectInterval: 1000
})
return socket
},
create_websocket_with_port(port) {
// Create websocket connection on a separate http port
let protocol = 'ws://'
if (window.location.protocol === 'https:') {
protocol = 'wss://'
}
let wsUrl = `${protocol}${window.location.hostname}:${port}`
if (import.meta.env.DEV && import.meta.env.VITE_OWNTONE_URL) {
/*
* If we are running in development mode, construct the websocket
* url from the host of the environment variable VITE_OWNTONE_URL
*/
const url = new URL(import.meta.env.VITE_OWNTONE_URL)
wsUrl = `${protocol}${url.hostname}:${port}`
}
const socket = new ReconnectingWebSocket(wsUrl, 'notify', {
maxReconnectInterval: 2000,
reconnectInterval: 1000
})
return socket
},
update_is_clipped() { update_is_clipped() {
if (this.show_burger_menu || this.show_player_menu) { if (this.show_burger_menu || this.show_player_menu) {
document.querySelector('html').classList.add('is-clipped') document.querySelector('html').classList.add('is-clipped')

View File

@ -32,6 +32,7 @@ export default defineConfig({
server: { server: {
proxy: { proxy: {
'/api': { target }, '/api': { target },
'/ws': { target, ws: true },
'/artwork': { target }, '/artwork': { target },
'/stream.mp3': { target } '/stream.mp3': { target }
} }