From ee32b9cb7085400dfebbf5058e81cc21c93db16f Mon Sep 17 00:00:00 2001 From: ejurgensen Date: Wed, 18 Jan 2017 23:42:52 +0100 Subject: [PATCH] [pipe] Reset pipes when required + prepare for metadata pipes --- src/inputs/pipe.c | 292 +++++++++++++++++++++++++++++++--------------- 1 file changed, 198 insertions(+), 94 deletions(-) diff --git a/src/inputs/pipe.c b/src/inputs/pipe.c index e99a6112..e636f9f1 100644 --- a/src/inputs/pipe.c +++ b/src/inputs/pipe.c @@ -14,10 +14,25 @@ * You should have received a copy of the GNU General Public License * along with this program; if not, write to the Free Software * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA + * + * + * About pipe.c + * -------------- + * This module will read a PCM16 stream from a named pipe and write it to the + * input buffer. The user may start/stop playback from a pipe by selecting it + * through a client. If the user has configured pipe_autostart, then pipes in + * the library will also be watched for data, and playback will start/stop + * automatically. + * + * The module will also look for pipes with a .metadata suffix, and if found, + * the metadata will be parsed and fed to the player. The metadata must be in + * the format Shairport uses for this purpose. + * */ #include #include +#include #include #include #include @@ -49,18 +64,27 @@ extern struct event_base *evbase_worker; enum pipestate { - PIPE_NEW = (1 << 0), - PIPE_DEL = (1 << 1), - PIPE_OPEN = (1 << 2), + PIPE_NEW, + PIPE_DEL, + PIPE_OPEN, +}; + +enum pipetype +{ + PIPE_PCM, + PIPE_METADATA, }; struct pipe { - int id; - int fd; - char *path; - enum pipestate state; - struct event *ev; + int id; // The mfi id of the pipe + int fd; // File descriptor + bool is_autostarted; // We autostarted the pipe (and we will autostop) + char *path; // Path + enum pipestate state; // Newly appeared, marked for deletion, open/ready + enum pipetype type; // PCM (audio) or metadata + event_callback_fn cb; // Callback when there is data to read + struct event *ev; // Event for the callback // TODO mutex struct pipe *next; @@ -73,11 +97,19 @@ static int pipe_autostart; // will just point to the currently playing pipe (if any). static struct pipe *pipelist; +// Single pipe that we start watching for metadata after playback starts +static struct pipe *pipe_metadata; + +/* ------------------------------- FORWARDS ------------------------------- */ + +static void +pipe_watch_reset(struct pipe *pipe); + /* -------------------------------- HELPERS ------------------------------- */ static struct pipe * -pipe_new(const char *path, int id) +pipe_new(const char *path, int id, enum pipetype type, event_callback_fn cb) { struct pipe *pipe; @@ -86,8 +118,14 @@ pipe_new(const char *path, int id) pipe->id = id; pipe->fd = -1; pipe->state = PIPE_NEW; - pipe->next = pipelist; - pipelist = pipe; + pipe->type = type; + pipe->cb = cb; + + if (type == PIPE_PCM) + { + pipe->next = pipelist; + pipelist = pipe; + } return pipe; } @@ -138,9 +176,45 @@ pipe_find(int id) } -/* -------------------------------- PIPE I/O ------------------------------ */ +/* ---------------------------- GENERAL PIPE I/O -------------------------- */ /* Thread: worker */ +// Some data arrived on a pipe we watch - let's autostart playback +static void +pipe_read_cb(evutil_socket_t fd, short event, void *arg) +{ + struct pipe *pipe = arg; + struct player_status status; + struct db_queue_item *queue_item; + int ret; + + ret = player_get_status(&status); + if ((ret < 0) || (status.status == PLAY_PLAYING)) + { + DPRINTF(E_LOG, L_PLAYER, "Data arrived on pipe '%s', but player is busy\n", pipe->path); + pipe_watch_reset(pipe); + return; + } + + DPRINTF(E_INFO, L_PLAYER, "Autostarting pipe '%s' (fd %d)\n", pipe->path, fd); + + db_queue_clear(); + + ret = db_queue_add_by_fileid(pipe->id, 0, 0); + if (ret < 0) + return; + + queue_item = db_queue_fetch_byfileid(pipe->id); + if (!queue_item) + return; + + player_playback_start_byitem(queue_item); + + pipe->is_autostarted = 1; + + free_queue_item(queue_item, 0); +} + // Updates pipelist with pipe items from the db. Pipes that are no longer in // the db get marked with PIPE_DEL. Returns count of pipes that should be // watched (may or may not equal length of pipelist) @@ -183,7 +257,7 @@ pipe_enum(void) continue; } - pipe = pipe_new(dbmfi.path, id); + pipe = pipe_new(dbmfi.path, id, PIPE_PCM, pipe_read_cb); } db_query_end(&qp); @@ -193,7 +267,7 @@ pipe_enum(void) // Opens a pipe and starts watching it for data if autostart is configured static int -pipe_open(const char *path) +pipe_open(const char *path, bool silent) { struct stat sb; int fd; @@ -202,7 +276,8 @@ pipe_open(const char *path) if (lstat(path, &sb) < 0) { - DPRINTF(E_LOG, L_PLAYER, "Could not lstat() '%s': %s\n", path, strerror(errno)); + if (!silent) + DPRINTF(E_LOG, L_PLAYER, "Could not lstat() '%s': %s\n", path, strerror(errno)); return -1; } @@ -225,61 +300,32 @@ pipe_open(const char *path) static void pipe_close(int fd) { - close(fd); -} - -// Some data arrived on a pipe we watch - let's autostart playback -static void -pipe_read_cb(evutil_socket_t fd, short event, void *arg) -{ - struct pipe *pipe = arg; - struct player_status status; - struct db_queue_item *queue_item; - int ret; - - ret = player_get_status(&status); - if ((ret < 0) || (status.status == PLAY_PLAYING)) - { - DPRINTF(E_LOG, L_PLAYER, "Data arrived on pipe '%s', but we are busy\n", pipe->path); - // FIXME Now the event won't activate any more - even if we are not busy - return; - } - - DPRINTF(E_INFO, L_PLAYER, "Autostarting pipe '%s' (fd %d)\n", pipe->path, fd); - - db_queue_clear(); - - ret = db_queue_add_by_fileid(pipe->id, 0, 0); - if (ret < 0) - return; - - queue_item = db_queue_fetch_byfileid(pipe->id); - if (!queue_item) - return; - - player_playback_start_byitem(queue_item); - - free_queue_item(queue_item, 0); + if (fd >= 0) + close(fd); } static int pipe_watch_add(struct pipe *pipe) { - pipe->fd = pipe_open(pipe->path); + bool silent; + + silent = (pipe->type == PIPE_METADATA); + pipe->fd = pipe_open(pipe->path, silent); if (pipe->fd < 0) return -1; - pipe->state = PIPE_OPEN; - - pipe->ev = event_new(evbase_worker, pipe->fd, EV_READ, pipe_read_cb, pipe); + pipe->ev = event_new(evbase_worker, pipe->fd, EV_READ, pipe->cb, pipe); if (!pipe->ev) { DPRINTF(E_LOG, L_PLAYER, "Could not watch pipe for new data '%s'\n", pipe->path); + pipe_close(pipe->fd); return -1; } event_add(pipe->ev, NULL); + pipe->state = PIPE_OPEN; + return 0; } @@ -294,36 +340,17 @@ pipe_watch_del(struct pipe *pipe) pipe->fd = -1; } -/* +// If a read on pipe returns 0 it is an EOF, and we must close it and reopen it +// for renewed watching. The event will be freed and reallocated by this. static void -pipe_metadata_watch_add(void *arg) +pipe_watch_reset(struct pipe *pipe) { - struct pipe *pipe = arg; - struct pipe *md_pipe; - char md_pipe_path[PATH_MAX]; - int ret; - - ret = snprintf(md_pipe_path, sizeof(md_pipe_path), "%s.metadata", pipe->path); - if ((ret < 0) || (ret > sizeof(md_pipe_path))) - return -1; - - md_pipe = pipe_new(md_pipe_path, -1); -xxxx; - - md_pipe->fd = pipe_open(md_pipe_path); - if (md_pipe->fd < 0) - return -1; - - return 0; + pipe_watch_del(pipe); + pipe_watch_add(pipe); } static void -pipe_metadata_watch_del(struct pipe *pipe) -{ -}*/ - -static void -pipe_update(void *arg) +pipe_watch_update(void *arg) { struct pipe *pipe; int count; @@ -351,7 +378,75 @@ pipe_update(void *arg) static void pipe_listener_cb(enum listener_event_type type) { - worker_execute(pipe_update, NULL, 0, 0); + worker_execute(pipe_watch_update, NULL, 0, 0); +} + +/* -------------------------- METADATA PIPE HANDLING ---------------------- */ +/* Thread: worker */ + +// Some metadata arrived on a pipe we watch +static void +pipe_metadata_read_cb(evutil_socket_t fd, short event, void *arg) +{ + struct evbuffer *evbuf; + int ret; + + DPRINTF(E_DBG, L_PLAYER, "BANG\n"); + + evbuf = evbuffer_new(); + ret = evbuffer_read(evbuf, pipe_metadata->fd, PIPE_READ_MAX); + evbuffer_free(evbuf); + + event_add(pipe_metadata->ev, NULL); +/* + else if (ret < 0) + give up + goto out; + if (ret == 0) + pipe_reset(pipe_metadata); + evbuffer_add_printf(evbuf, "\n"); + DPRINTF(E_DBG, L_PLAYER, "Got some pipe metadata\n%s", (char *)evbuffer_pullup(evbuf, -1)); + + event_add(pipe_metadata->ev, NULL); + + out: + evbuffer_free(evbuf); + pipe_reset(pipe_metadata);*/ +} + +static void +pipe_metadata_watch_add(void *arg) +{ + char *base_path = arg; + char path[PATH_MAX]; + int ret; + + ret = snprintf(path, sizeof(path), "%s.metadata", base_path); + if ((ret < 0) || (ret > sizeof(path))) + return; + + pipe_metadata = pipe_new(path, 0, PIPE_METADATA, pipe_metadata_read_cb); + if (!pipe_metadata) + return; + + ret = pipe_watch_add(pipe_metadata); + if (ret < 0) + { + pipe_free(pipe_metadata); + pipe_metadata = NULL; + return; + } +} + +static void +pipe_metadata_watch_del(void *arg) +{ + if (!pipe_metadata) + return; + + pipe_watch_del(pipe_metadata); + pipe_free(pipe_metadata); + pipe_metadata = NULL; } @@ -363,8 +458,9 @@ setup(struct player_source *ps) { struct pipe *pipe; + // If autostart is disabled then this is the first time we encounter the pipe if (!pipe_autostart) - pipelist = pipe_new(ps->path, ps->id); + pipe_new(ps->path, ps->id, PIPE_PCM, NULL); pipe = pipe_find(ps->id); if (!pipe) @@ -372,20 +468,20 @@ setup(struct player_source *ps) DPRINTF(E_LOG, L_PLAYER, "Unknown pipe '%s'\n", ps->path); return -1; } - +// TODO pipe mutex here if (pipe->state != PIPE_OPEN) { - pipe->fd = pipe_open(pipe->path); + pipe->fd = pipe_open(pipe->path, 0); if (pipe->fd < 0) return -1; pipe->state = PIPE_OPEN; } -// worker_execute(pipe_metadata_watch_add, pipe, sizeof(struct pipe), 0); - if (pipe->ev) event_del(pipe->ev); // Avoids autostarting pipe if manually started by user + worker_execute(pipe_metadata_watch_add, pipe->path, strlen(pipe->path) + 1, 0); + ps->setup_done = 1; return 0; @@ -413,7 +509,12 @@ start(struct player_source *ps) while (!input_loop_break) { ret = evbuffer_read(evbuf, pipe->fd, PIPE_READ_MAX); - if ( (ret == 0) || ((ret < 0) && (errno == EAGAIN)) ) + if ((ret == 0) && (pipe->is_autostarted)) + { + input_write(evbuf, INPUT_FLAG_EOF); // Autostop + break; + } + else if ((ret == 0) || ((ret < 0) && (errno == EAGAIN))) { input_wait(); continue; @@ -448,19 +549,22 @@ stop(struct player_source *ps) return -1; } - if (pipe_autostart) - { - // Reopen pipe since if I just readd the event it instantly makes the - // callback (probably something I have missed...) - pipe_watch_del(pipe); - pipe_watch_add(pipe); - } - else + if (!pipe_autostart) { + // Since autostart is disabled we are now done with the pipe pipe_close(pipe->fd); pipe->state = PIPE_DEL; pipelist_prune(); } + else + { + // Reset the pipe and start watching it again for new data + pipe->is_autostarted = 0; + pipe_watch_reset(pipe); + } + + if (pipe_metadata) + worker_execute(pipe_metadata_watch_del, NULL, 0, 0); ps->setup_done = 0;