Merge pull request #441 from chme/mpdidle

[mpd] Notify MPD clients of library changes and fix idle command
This commit is contained in:
ejurgensen 2017-11-18 20:51:02 +01:00 committed by GitHub
commit e4568e5640
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
10 changed files with 411 additions and 293 deletions

View File

@ -873,7 +873,7 @@ cache_daap_update(void *arg, int *retval)
/* Callback from filescanner thread */
static void
cache_daap_listener_cb(enum listener_event_type type)
cache_daap_listener_cb(short event_mask)
{
commands_exec_async(cmdbase, cache_daap_update, NULL);
}

105
src/db.c
View File

@ -1499,6 +1499,7 @@ static int
db_query_run(char *query, int free, int library_update)
{
char *errmsg;
int changes = 0;
int ret;
if (!query)
@ -1516,6 +1517,8 @@ db_query_run(char *query, int free, int library_update)
ret = db_exec(query, &errmsg);
if (ret != SQLITE_OK)
DPRINTF(E_LOG, L_DB, "Error '%s' while runnning '%s'\n", errmsg, query);
else
changes = sqlite3_changes(hdl);
sqlite3_free(errmsg);
@ -1524,7 +1527,7 @@ db_query_run(char *query, int free, int library_update)
cache_daap_resume();
if (library_update)
if (library_update && changes > 0)
library_update_trigger();
return ((ret != SQLITE_OK) ? -1 : 0);
@ -3678,7 +3681,7 @@ db_pairing_fetch_byguid(struct pairing_info *pi)
void
db_spotify_purge(void)
{
#define Q_TMPL "UPDATE directories SET disabled = %" PRIi64 " WHERE virtual_path = '/spotify:';"
#define Q_TMPL "UPDATE directories SET disabled = %" PRIi64 " WHERE virtual_path = '/spotify:' AND disabled <> %" PRIi64 ";"
char *queries[4] =
{
"DELETE FROM files WHERE path LIKE 'spotify:%%';",
@ -3699,7 +3702,7 @@ db_spotify_purge(void)
}
// Disable the spotify directory by setting 'disabled' to INOTIFY_FAKE_COOKIE value
query = sqlite3_mprintf(Q_TMPL, INOTIFY_FAKE_COOKIE);
query = sqlite3_mprintf(Q_TMPL, INOTIFY_FAKE_COOKIE, INOTIFY_FAKE_COOKIE);
if (!query)
{
DPRINTF(E_LOG, L_DB, "Out of memory for query string\n");
@ -3768,13 +3771,28 @@ db_admin_set(const char *key, const char *value)
#undef Q_TMPL
}
char *
db_admin_get(const char *key)
int
db_admin_setint64(const char *key, int64_t value)
{
#define Q_TMPL "INSERT OR REPLACE INTO admin (key, value) VALUES ('%q', '%" PRIi64 "');"
char *query;
query = sqlite3_mprintf(Q_TMPL, key, value);
return db_query_run(query, 1, 0);
#undef Q_TMPL
}
static int
admin_get(const char *key, short type, void *value)
{
#define Q_TMPL "SELECT value FROM admin a WHERE a.key = '%q';"
char *query;
sqlite3_stmt *stmt;
char *res;
char *cval;
int32_t *ival;
int64_t *i64val;
char **strval;
int ret;
query = sqlite3_mprintf(Q_TMPL, key);
@ -3782,7 +3800,7 @@ db_admin_get(const char *key)
{
DPRINTF(E_LOG, L_DB, "Out of memory for query string\n");
return NULL;
return -1;
}
DPRINTF(E_DBG, L_DB, "Running query '%s'\n", query);
@ -3793,7 +3811,7 @@ db_admin_get(const char *key)
DPRINTF(E_WARN, L_DB, "Could not prepare statement: %s\n", sqlite3_errmsg(hdl));
sqlite3_free(query);
return NULL;
return -1;
}
ret = db_blocking_step(stmt);
@ -3806,12 +3824,41 @@ db_admin_get(const char *key)
sqlite3_finalize(stmt);
sqlite3_free(query);
return NULL;
return -1;
}
res = (char *)sqlite3_column_text(stmt, 0);
if (res)
res = strdup(res);
switch (type)
{
case DB_TYPE_CHAR:
cval = (char *) value;
*cval = sqlite3_column_int(stmt, 0);
break;
case DB_TYPE_INT:
ival = (int32_t *) value;
*ival = sqlite3_column_int(stmt, 0);
break;
case DB_TYPE_INT64:
i64val = (int64_t *) value;
*i64val = sqlite3_column_int64(stmt, 0);
break;
case DB_TYPE_STRING:
strval = (char **) value;
cval = (char *)sqlite3_column_text(stmt, 0);
if (cval)
*strval = strdup(cval);
break;
default:
DPRINTF(E_LOG, L_DB, "BUG: Unknown type %d in admin_set\n", type);
ret = -2;
}
#ifdef DB_PROFILE
while (db_blocking_step(stmt) == SQLITE_ROW)
@ -3821,11 +3868,43 @@ db_admin_get(const char *key)
sqlite3_finalize(stmt);
sqlite3_free(query);
return res;
return ret;
#undef Q_TMPL
}
char *
db_admin_get(const char *key)
{
char *value = NULL;
int ret;
ret = admin_get(key, DB_TYPE_STRING, &value);
if (ret < 0)
{
DPRINTF(E_LOG, L_DB, "Error reading admin value: %s\n", key);
return NULL;
}
return value;
}
int64_t
db_admin_getint64(const char *key)
{
int64_t value = 0;
int ret;
ret = admin_get(key, DB_TYPE_INT64, &value);
if (ret < 0)
{
DPRINTF(E_LOG, L_DB, "Error reading admin value: %s\n", key);
return 0;
}
return value;
}
int
db_admin_delete(const char *key)
{

View File

@ -680,9 +680,15 @@ db_spotify_files_delete(void);
int
db_admin_set(const char *key, const char *value);
int
db_admin_setint64(const char *key, int64_t value);
char *
db_admin_get(const char *key);
int64_t
db_admin_getint64(const char *key);
int
db_admin_delete(const char *key);

View File

@ -719,12 +719,12 @@ playstatusupdate_cb(int fd, short what, void *arg)
/* Thread: player */
static void
dacp_playstatus_update_handler(enum listener_event_type type)
dacp_playstatus_update_handler(short event_mask)
{
int ret;
// Only send status update on player change events
if (type != LISTENER_PLAYER)
if (!(event_mask & LISTENER_PLAYER))
return;
#ifdef HAVE_EVENTFD

View File

@ -185,7 +185,7 @@ streaming_send_cb(evutil_socket_t fd, short event, void *arg)
// Thread: player (not fully thread safe, but hey...)
static void
player_change_cb(enum listener_event_type type)
player_change_cb(short event_mask)
{
streaming_player_changed = 1;
}

View File

@ -764,7 +764,7 @@ pipelist_create(void)
// the pipe thread to watch the pipes. If no pipes in library, it will shut down
// the pipe thread.
static void
pipe_listener_cb(enum listener_event_type type)
pipe_listener_cb(short event_mask)
{
union pipe_arg *cmdarg;

View File

@ -22,6 +22,7 @@
#endif
#include <errno.h>
#include <sys/stat.h>
#include <fcntl.h>
#include <limits.h>
#include <pthread.h>
@ -77,13 +78,37 @@ static bool scan_exit;
/* Flag for scan in progress */
static bool scanning;
// After being told by db that the library was updated through update_trigger(),
// wait 60 seconds before notifying listeners of LISTENER_DATABASE. This is to
// avoid bombarding the listeners while there are many db updates, and to make
// sure they only get a single update (useful for the cache).
static struct timeval library_update_wait = { 60, 0 };
// After being told by db that the library was updated through
// library_update_trigger(), wait 5 seconds before notifying listeners
// of LISTENER_DATABASE. This is to catch bulk updates like automated
// tag editing, music file imports/renames. This way multiple updates
// are collected for a single update notification (useful to avoid
// repeated library reads from clients).
//
// Note: this update delay does not apply to library scans. The scans
// use the flag `scanning` for deferring update notifcations.
static struct timeval library_update_wait = { 5, 0 };
static struct event *updateev;
// Counts the number of changes made to the database between to DATABASE
// event notifications
static unsigned int deferred_update_notifications = 0;
static bool
handle_deferred_update_notifications(void)
{
bool ret = (deferred_update_notifications > 0);
if (ret)
{
DPRINTF(E_DBG, L_LIB, "Database changed (%d changes)\n", deferred_update_notifications);
deferred_update_notifications = 0;
db_admin_setint64("db_update", (int64_t) time(NULL));
}
return ret;
}
static void
sort_tag_create(char **sort_tag, char *src_tag)
@ -568,9 +593,13 @@ rescan(void *arg, int *ret)
purge_cruft(starttime);
endtime = time(NULL);
DPRINTF(E_LOG, L_LIB, "Library rescan completed in %.f sec\n", difftime(endtime, starttime));
DPRINTF(E_LOG, L_LIB, "Library rescan completed in %.f sec (%d changes)\n", difftime(endtime, starttime), deferred_update_notifications);
scanning = false;
listener_notify(LISTENER_UPDATE);
if (handle_deferred_update_notifications())
listener_notify(LISTENER_UPDATE | LISTENER_DATABASE);
else
listener_notify(LISTENER_UPDATE);
*ret = 0;
return COMMAND_END;
@ -605,24 +634,40 @@ fullrescan(void *arg, int *ret)
}
endtime = time(NULL);
DPRINTF(E_LOG, L_LIB, "Library full-rescan completed in %.f sec\n", difftime(endtime, starttime));
DPRINTF(E_LOG, L_LIB, "Library full-rescan completed in %.f sec (%d changes)\n", difftime(endtime, starttime), deferred_update_notifications);
scanning = false;
listener_notify(LISTENER_UPDATE);
if (handle_deferred_update_notifications())
listener_notify(LISTENER_UPDATE | LISTENER_DATABASE);
else
listener_notify(LISTENER_UPDATE);
*ret = 0;
return COMMAND_END;
}
/*
* Callback to notify listeners of database changes
*/
static void
update_trigger_cb(int fd, short what, void *arg)
{
listener_notify(LISTENER_DATABASE);
if (handle_deferred_update_notifications())
{
listener_notify(LISTENER_DATABASE);
}
}
static enum command_state
update_trigger(void *arg, int *retval)
{
evtimer_add(updateev, &library_update_wait);
++deferred_update_notifications;
// Only add the timer event if the update occurred outside a (init-/re-/fullre-) scan.
// The scanning functions take care of notifying clients of database changes directly
// after the scan finished.
if (!scanning)
evtimer_add(updateev, &library_update_wait);
*retval = 0;
return COMMAND_END;
@ -691,11 +736,14 @@ initscan()
}
endtime = time(NULL);
DPRINTF(E_LOG, L_LIB, "Library init scan completed in %.f sec\n", difftime(endtime, starttime));
DPRINTF(E_LOG, L_LIB, "Library init scan completed in %.f sec (%d changes)\n", difftime(endtime, starttime), deferred_update_notifications);
scanning = false;
listener_notify(LISTENER_UPDATE);
listener_notify(LISTENER_DATABASE);
if (handle_deferred_update_notifications())
listener_notify(LISTENER_UPDATE | LISTENER_DATABASE);
else
listener_notify(LISTENER_UPDATE);
}
/*
@ -725,13 +773,27 @@ library_is_exiting()
return scan_exit;
}
/*
* Trigger for sending the DATABASE event
*
* Needs to be called, if an update to the database (library tables) occurred. The DATABASE event
* is emitted with the delay 'library_update_wait'. It is safe to call this function from any thread.
*/
void
library_update_trigger(void)
{
if (scanning)
return;
int ret;
commands_exec_async(cmdbase, update_trigger, NULL);
pthread_t current_thread = pthread_self();
if (pthread_equal(current_thread, tid_library))
{
// We are already running in the library thread, it is safe to directly call update_trigger
update_trigger(NULL, &ret);
}
else
{
commands_exec_async(cmdbase, update_trigger, NULL);
}
}
static enum command_state

View File

@ -28,18 +28,18 @@ enum listener_event_type
LISTENER_LASTFM = (1 << 10),
};
typedef void (*notify)(enum listener_event_type type);
typedef void (*notify)(short event_mask);
/*
* Registers the given callback function to the given event types.
* This function is not thread safe. Listeners must be added once at startup.
*
* @param notify_cb Callback function
* @param events Event mask, one or more of LISTENER_*
* @param event_mask Event mask, one or more of LISTENER_*
* @return 0 on success, -1 on failure
*/
int
listener_add(notify notify_cb, short events);
listener_add(notify notify_cb, short event_mask);
/*
* Removes the given callback function

477
src/mpd.c

File diff suppressed because it is too large Load Diff

View File

@ -50,10 +50,10 @@ static short write_events;
/* Thread: library (the thread the event occurred) */
static void
listener_cb(enum listener_event_type type)
listener_cb(short event_mask)
{
// Add event to the event mask, clients will be notified at the next break of the libwebsockets service loop
events |= type;
events |= event_mask;
}
/*