From 1f15fb19936eca1a2c702950d684850fcbcc948a Mon Sep 17 00:00:00 2001 From: ejurgensen Date: Wed, 8 Feb 2017 22:26:10 +0100 Subject: [PATCH] [pipe] Refactor pipe input, hopefully into a more thread safe version --- src/inputs/pipe.c | 601 ++++++++++++++++++++++++---------------------- 1 file changed, 320 insertions(+), 281 deletions(-) diff --git a/src/inputs/pipe.c b/src/inputs/pipe.c index e0d1b887..04d9fdd7 100644 --- a/src/inputs/pipe.c +++ b/src/inputs/pipe.c @@ -55,6 +55,7 @@ #include "listener.h" #include "player.h" #include "worker.h" +#include "commands.h" #include "mxml-compat.h" // Maximum number of pipes to watch for data @@ -64,15 +65,6 @@ // Max number of bytes to buffer from metadata pipes #define PIPE_METADATA_BUFLEN_MAX 262144 -extern struct event_base *evbase_worker; - -enum pipestate -{ - PIPE_NEW, - PIPE_DEL, - PIPE_OPEN, -}; - enum pipetype { PIPE_PCM, @@ -85,21 +77,25 @@ struct pipe int fd; // File descriptor bool is_autostarted; // We autostarted the pipe (and we will autostop) char *path; // Path - enum pipestate state; // Newly appeared, marked for deletion, open/ready enum pipetype type; // PCM (audio) or metadata event_callback_fn cb; // Callback when there is data to read struct event *ev; // Event for the callback - // TODO mutex struct pipe *next; }; +// The usual thread stuff +static pthread_t tid_pipe; +static struct event_base *evbase_pipe; +static struct commands_base *cmdbase; + // From config - should we watch library pipes for data or only start on request static int pipe_autostart; +// The mfi id of the pipe autostarted by the pipe thread +static int pipe_autostart_id; -// Global list of pipes we are watching. If watching/autostart is disabled this -// will just point to the currently playing pipe (if any). -static struct pipe *pipelist; +// Global list of pipes we are watching (if watching/autostart is enabled) +static struct pipe *pipe_watch_list; // Single pipe that we start watching for metadata after playback starts static struct pipe *pipe_metadata; @@ -112,18 +108,10 @@ static pthread_mutex_t pipe_metadata_lock; // True if there is new metadata to push to the player static bool pipe_metadata_is_new; -/* ------------------------------- FORWARDS ------------------------------- */ - -static void -pipe_watch_reset(struct pipe *pipe); - -static void -pipe_metadata_watch_del(void *arg); - /* -------------------------------- HELPERS ------------------------------- */ static struct pipe * -pipe_new(const char *path, int id, enum pipetype type, event_callback_fn cb) +pipe_create(const char *path, int id, enum pipetype type, event_callback_fn cb) { struct pipe *pipe; @@ -131,16 +119,9 @@ pipe_new(const char *path, int id, enum pipetype type, event_callback_fn cb) pipe->path = strdup(path); pipe->id = id; pipe->fd = -1; - pipe->state = PIPE_NEW; pipe->type = type; pipe->cb = cb; - if (type == PIPE_PCM) - { - pipe->next = pipelist; - pipelist = pipe; - } - return pipe; } @@ -151,39 +132,129 @@ pipe_free(struct pipe *pipe) free(pipe); } -static void -pipelist_prune(void) +static int +pipe_open(const char *path, bool silent) { - struct pipe *pipe; - struct pipe *next; + struct stat sb; + int fd; - for (pipe = pipelist; pipe; pipe = next) + DPRINTF(E_DBG, L_PLAYER, "(Re)opening pipe: '%s'\n", path); + + if (lstat(path, &sb) < 0) { - next = pipe->next; - - if (pipelist->state == PIPE_DEL) - { - pipe_free(pipelist); - pipelist = next; - } - else if (next && (next->state == PIPE_DEL)) - { - pipe->next = next->next; - pipe_free(next); - next = pipe->next; - } + if (!silent) + DPRINTF(E_LOG, L_PLAYER, "Could not lstat() '%s': %s\n", path, strerror(errno)); + return -1; } + + if (!S_ISFIFO(sb.st_mode)) + { + DPRINTF(E_LOG, L_PLAYER, "Source type is pipe, but path is not a fifo: %s\n", path); + return -1; + } + + fd = open(path, O_RDONLY | O_NONBLOCK); + if (fd < 0) + { + DPRINTF(E_LOG, L_PLAYER, "Could not open pipe for reading '%s': %s\n", path, strerror(errno)); + return -1; + } + + return fd; +} + +static void +pipe_close(int fd) +{ + if (fd >= 0) + close(fd); +} + +static int +watch_add(struct pipe *pipe) +{ + bool silent; + + silent = (pipe->type == PIPE_METADATA); + pipe->fd = pipe_open(pipe->path, silent); + if (pipe->fd < 0) + return -1; + + pipe->ev = event_new(evbase_pipe, pipe->fd, EV_READ, pipe->cb, pipe); + if (!pipe->ev) + { + DPRINTF(E_LOG, L_PLAYER, "Could not watch pipe for new data '%s'\n", pipe->path); + pipe_close(pipe->fd); + return -1; + } + + event_add(pipe->ev, NULL); + + return 0; +} + +static void +watch_del(struct pipe *pipe) +{ + if (pipe->ev) + event_free(pipe->ev); + + pipe_close(pipe->fd); + + pipe->fd = -1; +} + +// If a read on pipe returns 0 it is an EOF, and we must close it and reopen it +// for renewed watching. The event will be freed and reallocated by this. +static int +watch_reset(struct pipe *pipe) +{ + watch_del(pipe); + + return watch_add(pipe); +} + +static void +pipelist_add(struct pipe **list, struct pipe *pipe) +{ + pipe->next = *list; + *list = pipe; +} + +static void +pipelist_remove(struct pipe **list, struct pipe *pipe) +{ + struct pipe *prev = NULL; + struct pipe *p; + + for (p = *list; p; p = p->next) + { + if (p->id == pipe->id) + break; + + prev = p; + } + + if (!p) + return; + + if (!prev) + *list = pipe->next; + else + prev->next = pipe->next; + + pipe_free(pipe); } static struct pipe * -pipe_find(int id) +pipelist_find(struct pipe *list, int id) { - struct pipe *pipe; + struct pipe *p; - for (pipe = pipelist; pipe; pipe = pipe->next) + for (p = list; p; p = p->next) { - if (id == pipe->id) - return pipe; + if (id == p->id) + return p; } return NULL; @@ -358,8 +429,8 @@ pipe_metadata_parse(struct input_metadata *m, struct evbuffer *evbuf) } -/* ---------------------------- GENERAL PIPE I/O -------------------------- */ -/* Thread: worker */ +/* ----------------------------- PIPE WATCHING ---------------------------- */ +/* Thread: pipe */ // Some data arrived on a pipe we watch - let's autostart playback static void @@ -367,205 +438,119 @@ pipe_read_cb(evutil_socket_t fd, short event, void *arg) { struct pipe *pipe = arg; struct player_status status; - struct db_queue_item *queue_item; int ret; ret = player_get_status(&status); - if ((ret < 0) || (status.status == PLAY_PLAYING)) + if (status.id == pipe->id) { - DPRINTF(E_LOG, L_PLAYER, "Data arrived on pipe '%s', but player is busy\n", pipe->path); - pipe_watch_reset(pipe); + DPRINTF(E_DBG, L_PLAYER, "Pipe '%s' already playing\n", pipe->path); + return; // We are already playing the pipe + } + else if ((ret < 0) || (status.status == PLAY_PLAYING)) + { + DPRINTF(E_LOG, L_PLAYER, "Data arrived on pipe '%s' - ignoring, player is busy\n", pipe->path); + // FIXME What to do in this situation? Can't re-add the event, since it + // will trigger right away, but also not good to stop watching the pipe + // like we do right now. return; } DPRINTF(E_INFO, L_PLAYER, "Autostarting pipe '%s' (fd %d)\n", pipe->path, fd); - db_queue_clear(); + player_playback_stop(); - ret = db_queue_add_by_fileid(pipe->id, 0, 0); + ret = player_playback_start_byid(pipe->id); if (ret < 0) - return; + { + DPRINTF(E_LOG, L_PLAYER, "Autostarting pipe '%s' (fd %d) failed\n", pipe->path, fd); + return; + } - queue_item = db_queue_fetch_byfileid(pipe->id); - if (!queue_item) - return; - - player_playback_start_byitem(queue_item); - - pipe->is_autostarted = 1; - - free_queue_item(queue_item, 0); + pipe_autostart_id = pipe->id; } -// Updates pipelist with pipe items from the db. Pipes that are no longer in -// the db get marked with PIPE_DEL. Returns count of pipes that should be -// watched (may or may not equal length of pipelist) -static int -pipe_enum(void) +static enum command_state +pipe_watch_reset(void *arg, int *retval) { - struct query_params qp; - struct db_media_file_info dbmfi; + int *id = arg; struct pipe *pipe; - char filter[32]; + + pipe_autostart_id = 0; + + pipe = pipelist_find(pipe_watch_list, *id); + + *retval = watch_reset(pipe); + + return COMMAND_END; +} + +static enum command_state +pipe_watch_update(void *arg, int *retval) +{ + struct pipe *pipelist = arg; + struct pipe *pipe; + struct pipe *next; int count; - int id; - int ret; - - memset(&qp, 0, sizeof(struct query_params)); - qp.type = Q_ITEMS; - qp.filter = filter; - - snprintf(filter, sizeof(filter), "f.data_kind = %d", DATA_KIND_PIPE); - - ret = db_query_start(&qp); - if (ret < 0) - return -1; - - for (pipe = pipelist; pipe; pipe = pipe->next) - pipe->state = PIPE_DEL; count = 0; - while (((ret = db_query_fetch_file(&qp, &dbmfi)) == 0) && (dbmfi.id)) + for (pipe = pipe_watch_list; pipe; pipe = next) { - ret = safe_atoi32(dbmfi.id, &id); - if (ret < 0) - continue; + next = pipe->next; - count++; - - if ((pipe = pipe_find(id))) + if (!pipelist_find(pipelist, pipe->id)) { - pipe->state = PIPE_OPEN; + DPRINTF(E_DBG, L_PLAYER, "Pipe deleted: '%s'\n", pipe->path); + watch_del(pipe); + pipelist_remove(&pipe_watch_list, pipe); // Will free pipe continue; } - pipe_new(dbmfi.path, id, PIPE_PCM, pipe_read_cb); + count++; + pipelist_remove(&pipelist, pipe); } - db_query_end(&qp); - - return count; -} - -// Opens a pipe and starts watching it for data if autostart is configured -static int -pipe_open(const char *path, bool silent) -{ - struct stat sb; - int fd; - - DPRINTF(E_DBG, L_PLAYER, "(Re)opening pipe: '%s'\n", path); - - if (lstat(path, &sb) < 0) - { - if (!silent) - DPRINTF(E_LOG, L_PLAYER, "Could not lstat() '%s': %s\n", path, strerror(errno)); - return -1; - } - - if (!S_ISFIFO(sb.st_mode)) - { - DPRINTF(E_LOG, L_PLAYER, "Source type is pipe, but path is not a fifo: %s\n", path); - return -1; - } - - fd = open(path, O_RDONLY | O_NONBLOCK); - if (fd < 0) - { - DPRINTF(E_LOG, L_PLAYER, "Could not open pipe for reading '%s': %s\n", path, strerror(errno)); - return -1; - } - - return fd; -} - -static void -pipe_close(int fd) -{ - if (fd >= 0) - close(fd); -} - -static int -pipe_watch_add(struct pipe *pipe) -{ - bool silent; - - silent = (pipe->type == PIPE_METADATA); - pipe->fd = pipe_open(pipe->path, silent); - if (pipe->fd < 0) - return -1; - - pipe->ev = event_new(evbase_worker, pipe->fd, EV_READ, pipe->cb, pipe); - if (!pipe->ev) - { - DPRINTF(E_LOG, L_PLAYER, "Could not watch pipe for new data '%s'\n", pipe->path); - pipe_close(pipe->fd); - return -1; - } - - event_add(pipe->ev, NULL); - - pipe->state = PIPE_OPEN; - - return 0; -} - -static void -pipe_watch_del(struct pipe *pipe) -{ - if (pipe->ev) - event_free(pipe->ev); - - pipe_close(pipe->fd); - - pipe->fd = -1; -} - -// If a read on pipe returns 0 it is an EOF, and we must close it and reopen it -// for renewed watching. The event will be freed and reallocated by this. -static void -pipe_watch_reset(struct pipe *pipe) -{ - pipe_watch_del(pipe); - pipe_watch_add(pipe); -} - -static void -pipe_watch_update(void *arg) -{ - struct pipe *pipe; - int count; - - count = pipe_enum(); // Count does not include pipes with state PIPE_DEL - if (count < 0) - return; - for (pipe = pipelist; pipe; pipe = pipe->next) { - DPRINTF(E_DBG, L_PLAYER, "Processing pipe '%s', state is %d\n", pipe->path, pipe->state); + count++; + if (count > PIPE_MAX_WATCH) + { + DPRINTF(E_LOG, L_PLAYER, "Max open pipes reached, will not watch %s\n", pipe->path); + break; + } - if ((pipe->state == PIPE_NEW) && (count > PIPE_MAX_WATCH)) - DPRINTF(E_LOG, L_PLAYER, "Max open pipes reached, will not watch %s\n", pipe->path); - else if (pipe->state == PIPE_NEW) - pipe_watch_add(pipe); - else if (pipe->state == PIPE_DEL) - pipe_watch_del(pipe); + DPRINTF(E_DBG, L_PLAYER, "Pipe added: '%s'\n", pipe->path); + watch_add(pipe); + pipelist_add(&pipe_watch_list, pipe); } - pipelist_prune(); + *retval = 0; + return COMMAND_PENDING; // Stops commands.c from freeing any of the pipes } -// Thread: filescanner -static void -pipe_listener_cb(enum listener_event_type type) +static void * +pipe_thread_run(void *arg) { - worker_execute(pipe_watch_update, NULL, 0, 0); + event_base_dispatch(evbase_pipe); + + pthread_exit(NULL); } + /* -------------------------- METADATA PIPE HANDLING ---------------------- */ /* Thread: worker */ +static void +pipe_metadata_watch_del(void *arg) +{ + if (!pipe_metadata) + return; + + evbuffer_free(pipe_metadata_buf); + watch_del(pipe_metadata); + pipe_free(pipe_metadata); + pipe_metadata = NULL; +} + // Some metadata arrived on a pipe we watch static void pipe_metadata_read_cb(evutil_socket_t fd, short event, void *arg) @@ -581,7 +566,10 @@ pipe_metadata_read_cb(evutil_socket_t fd, short event, void *arg) } else if (ret == 0) { - pipe_watch_reset(pipe_metadata); + // Reset the pipe + ret = watch_reset(pipe_metadata); + if (ret < 0) + return; goto readd; } @@ -620,13 +608,13 @@ pipe_metadata_watch_add(void *arg) if ((ret < 0) || (ret > sizeof(path))) return; - pipe_metadata = pipe_new(path, 0, PIPE_METADATA, pipe_metadata_read_cb); + pipe_metadata = pipe_create(path, 0, PIPE_METADATA, pipe_metadata_read_cb); if (!pipe_metadata) return; pipe_metadata_buf = evbuffer_new(); - ret = pipe_watch_add(pipe_metadata); + ret = watch_add(pipe_metadata); if (ret < 0) { evbuffer_free(pipe_metadata_buf); @@ -636,16 +624,95 @@ pipe_metadata_watch_add(void *arg) } } + +/* ---------------------- PIPE WATCH THREAD START/STOP -------------------- */ +/* Thread: filescanner */ + static void -pipe_metadata_watch_del(void *arg) +pipe_thread_start(void) { - if (!pipe_metadata) + CHECK_NULL(L_PLAYER, evbase_pipe = event_base_new()); + CHECK_NULL(L_PLAYER, cmdbase = commands_base_new(evbase_pipe, NULL)); + CHECK_ERR(L_PLAYER, pthread_create(&tid_pipe, NULL, pipe_thread_run, NULL)); + +#if defined(HAVE_PTHREAD_SETNAME_NP) + pthread_setname_np(tid_pipe, "pipe"); +#elif defined(HAVE_PTHREAD_SET_NAME_NP) + pthread_set_name_np(tid_pipe, "pipe"); +#endif +} + +static void +pipe_thread_stop(void) +{ + if (!tid_pipe) return; - evbuffer_free(pipe_metadata_buf); - pipe_watch_del(pipe_metadata); - pipe_free(pipe_metadata); - pipe_metadata = NULL; + commands_exec_sync(cmdbase, pipe_watch_update, NULL, NULL); + + commands_base_destroy(cmdbase); + pthread_join(tid_pipe, NULL); + event_base_free(evbase_pipe); + tid_pipe = 0; +} + +// Makes a pipelist with pipe items from the db, returns NULL on no pipes +static struct pipe * +pipelist_create(void) +{ + struct query_params qp; + struct db_media_file_info dbmfi; + struct pipe *head; + struct pipe *pipe; + char filter[32]; + int id; + int ret; + + memset(&qp, 0, sizeof(struct query_params)); + qp.type = Q_ITEMS; + qp.filter = filter; + + snprintf(filter, sizeof(filter), "f.data_kind = %d", DATA_KIND_PIPE); + + ret = db_query_start(&qp); + if (ret < 0) + return NULL; + + head = NULL; + while (((ret = db_query_fetch_file(&qp, &dbmfi)) == 0) && (dbmfi.id)) + { + ret = safe_atoi32(dbmfi.id, &id); + if (ret < 0) + continue; + + pipe = pipe_create(dbmfi.path, id, PIPE_PCM, pipe_read_cb); + pipelist_add(&head, pipe); + } + + db_query_end(&qp); + + return head; +} + +// Queries the db to see if any pipes are present in the library. If so, starts +// the pipe thread to watch the pipes. If no pipes in library, it will shut down +// the pipe thread. +static void +pipe_listener_cb(enum listener_event_type type) +{ + struct pipe *pipelist; + + pipelist = pipelist_create(); + if (!pipelist) + { + pipe_thread_stop(); + return; + } + + if (!tid_pipe) + pipe_thread_start(); + + commands_exec_async(cmdbase, pipe_watch_update, pipelist); } @@ -656,28 +723,19 @@ static int setup(struct player_source *ps) { struct pipe *pipe; + int fd; - // If autostart is disabled then this is the first time we encounter the pipe - if (!pipe_autostart) - pipe_new(ps->path, ps->id, PIPE_PCM, NULL); + fd = pipe_open(ps->path, 0); + if (fd < 0) + return -1; - pipe = pipe_find(ps->id); - if (!pipe) - { - DPRINTF(E_LOG, L_PLAYER, "Unknown pipe '%s'\n", ps->path); - return -1; - } + CHECK_NULL(L_PLAYER, pipe = pipe_create(ps->path, ps->id, PIPE_PCM, NULL)); + pipe->fd = fd; + pipe->is_autostarted = (ps->id == pipe_autostart_id); - if (pipe->state != PIPE_OPEN) - { - pipe->fd = pipe_open(pipe->path, 0); - if (pipe->fd < 0) - return -1; - pipe->state = PIPE_OPEN; - } - - worker_execute(pipe_metadata_watch_add, pipe->path, strlen(pipe->path) + 1, 0); + worker_execute(pipe_metadata_watch_add, ps->path, strlen(ps->path) + 1, 0); + ps->input_ctx = pipe; ps->setup_done = 1; return 0; @@ -686,15 +744,11 @@ setup(struct player_source *ps) static int start(struct player_source *ps) { - struct pipe *pipe; + struct pipe *pipe = ps->input_ctx; struct evbuffer *evbuf; short flags; int ret; - pipe = pipe_find(ps->id); - if (!pipe) - return -1; - evbuf = evbuffer_new(); if (!evbuf) { @@ -718,7 +772,7 @@ start(struct player_source *ps) } else if (ret < 0) { - DPRINTF(E_LOG, L_PLAYER, "Could not read from pipe: %s\n", strerror(errno)); + DPRINTF(E_LOG, L_PLAYER, "Could not read from pipe '%s': %s\n", ps->path, strerror(errno)); break; } @@ -738,34 +792,26 @@ start(struct player_source *ps) static int stop(struct player_source *ps) { - struct pipe *pipe; + struct pipe *pipe = ps->input_ctx; + int *id; DPRINTF(E_DBG, L_PLAYER, "Stopping pipe\n"); - pipe = pipe_find(ps->id); - if (!pipe) - { - DPRINTF(E_LOG, L_PLAYER, "Unknown pipe '%s'\n", ps->path); - return -1; - } + pipe_close(pipe->fd); - if (!pipe_autostart) - { - // Since autostart is disabled we are now done with the pipe - pipe_close(pipe->fd); - pipe->state = PIPE_DEL; - pipelist_prune(); - } - else - { - // Reset the pipe and start watching it again for new data - pipe->is_autostarted = 0; - pipe_watch_reset(pipe); - } + // Reset the pipe and start watching it again for new data. Must be async or + // we will deadlock from the stop in pipe_read_cb(). + id = malloc(sizeof(int)); + *id = pipe->id; + if (pipe_autostart) + commands_exec_async(cmdbase, pipe_watch_reset, id); if (pipe_metadata) worker_execute(pipe_metadata_watch_del, NULL, 0, 0); + pipe_free(pipe); + + ps->input_ctx = NULL; ps->setup_done = 0; return 0; @@ -806,32 +852,25 @@ metadata_get(struct input_metadata *metadata, struct player_source *ps, uint64_t static int init(void) { - pipe_autostart = cfg_getbool(cfg_getsec(cfg, "library"), "pipe_autostart"); - CHECK_ERR(L_PLAYER, mutex_init(&pipe_metadata_lock)); + pipe_autostart = cfg_getbool(cfg_getsec(cfg, "library"), "pipe_autostart"); if (pipe_autostart) - return listener_add(pipe_listener_cb, LISTENER_DATABASE); - else - return 0; + CHECK_ERR(L_PLAYER, listener_add(pipe_listener_cb, LISTENER_DATABASE)); + + return 0; } static void deinit(void) { - struct pipe *pipe; - - for (pipe = pipelist; pipelist; pipe = pipelist) + if (pipe_autostart) { - pipelist = pipe->next; - pipe_watch_del(pipe); - pipe_free(pipe); + listener_remove(pipe_listener_cb); + pipe_thread_stop(); } CHECK_ERR(L_PLAYER, pthread_mutex_destroy(&pipe_metadata_lock)); - - if (pipe_autostart) - listener_remove(pipe_listener_cb); } struct input_definition input_pipe =