[pipe] Pipe input interface (wip)

This commit is contained in:
ejurgensen 2017-01-15 23:26:11 +01:00
parent acc67338a1
commit 7f7207bb87

View File

@ -23,6 +23,7 @@
#include <fcntl.h> #include <fcntl.h>
#include <string.h> #include <string.h>
#include <errno.h> #include <errno.h>
#include <limits.h>
#include <sys/types.h> #include <sys/types.h>
#include <sys/stat.h> #include <sys/stat.h>
#include <sys/ioctl.h> #include <sys/ioctl.h>
@ -39,7 +40,7 @@
#include "input.h" #include "input.h"
// Maximum number of pipes to watch for data // Maximum number of pipes to watch for data
#define PIPE_MAX_OPEN 4 #define PIPE_MAX_WATCH 4
// Max number of bytes to read from a pipe at a time ( // Max number of bytes to read from a pipe at a time (
#define PIPE_READ_MAX 65536 #define PIPE_READ_MAX 65536
@ -99,6 +100,30 @@ pipe_free(struct pipe *pipe)
free(pipe); free(pipe);
} }
static void
pipelist_prune(void)
{
struct pipe *pipe;
struct pipe *next;
for (pipe = pipelist; pipe; pipe = next)
{
next = pipe->next;
if (pipelist->state == PIPE_DEL)
{
pipe_free(pipelist);
pipelist = next;
}
else if (next && (next->state == PIPE_DEL))
{
pipe->next = next->next;
pipe_free(next);
next = pipe->next;
}
}
}
static struct pipe * static struct pipe *
pipe_find(int id) pipe_find(int id)
{ {
@ -114,7 +139,7 @@ pipe_find(int id)
} }
/* ----------------------------- PIPE WATCHING ---------------------------- */ /* -------------------------------- PIPE I/O ------------------------------ */
/* Thread: filescanner */ /* Thread: filescanner */
// Updates pipelist with pipe items from the db. Pipes that are no longer in // Updates pipelist with pipe items from the db. Pipes that are no longer in
@ -197,38 +222,52 @@ pipe_read_cb(evutil_socket_t fd, short event, void *arg)
free_queue_item(queue_item, 0); free_queue_item(queue_item, 0);
} }
/* Opens a pipe and starts watching it for data */ // Opens a pipe and starts watching it for data if autostart is configured
static int static int
pipe_open(struct pipe *pipe) pipe_open(const char *path)
{ {
struct stat sb; struct stat sb;
int fd;
DPRINTF(E_DBG, L_PLAYER, "(Re)opening pipe: '%s'\n", pipe->path); DPRINTF(E_DBG, L_PLAYER, "(Re)opening pipe: '%s'\n", path);
if (lstat(pipe->path, &sb) < 0) if (lstat(path, &sb) < 0)
{ {
DPRINTF(E_LOG, L_PLAYER, "Could not lstat() '%s': %s\n", pipe->path, strerror(errno)); DPRINTF(E_LOG, L_PLAYER, "Could not lstat() '%s': %s\n", path, strerror(errno));
return -1; return -1;
} }
if (!S_ISFIFO(sb.st_mode)) if (!S_ISFIFO(sb.st_mode))
{ {
DPRINTF(E_LOG, L_PLAYER, "Source type is pipe, but path is not a fifo: %s\n", pipe->path); DPRINTF(E_LOG, L_PLAYER, "Source type is pipe, but path is not a fifo: %s\n", path);
return -1; return -1;
} }
pipe->fd = open(pipe->path, O_RDONLY | O_NONBLOCK); fd = open(path, O_RDONLY | O_NONBLOCK);
if (pipe->fd < 0) if (fd < 0)
{ {
DPRINTF(E_LOG, L_PLAYER, "Could not open pipe for reading '%s': %s\n", pipe->path, strerror(errno)); DPRINTF(E_LOG, L_PLAYER, "Could not open pipe for reading '%s': %s\n", path, strerror(errno));
return -1; return -1;
} }
return fd;
}
static void
pipe_close(int fd)
{
close(fd);
}
static int
pipe_watch_add(struct pipe *pipe)
{
pipe->fd = pipe_open(pipe->path);
if (pipe->fd < 0)
return -1;
pipe->state = PIPE_OPEN; pipe->state = PIPE_OPEN;
if (!pipe_autostart)
return 0; // All done
pipe->ev = event_new(evbase_scan, pipe->fd, EV_READ, pipe_read_cb, pipe); pipe->ev = event_new(evbase_scan, pipe->fd, EV_READ, pipe_read_cb, pipe);
if (!pipe->ev) if (!pipe->ev)
{ {
@ -242,113 +281,96 @@ pipe_open(struct pipe *pipe)
} }
static void static void
pipe_close(struct pipe *pipe) pipe_watch_del(struct pipe *pipe)
{ {
if (pipe->fd < 0)
return;
if (pipe->ev) if (pipe->ev)
event_free(pipe->ev); event_free(pipe->ev);
close(pipe->fd); pipe_close(pipe->fd);
pipe->fd = -1; pipe->fd = -1;
} }
/*
static int
pipe_metadata_watch_add(struct pipe *pipe)
{
struct pipe *md_pipe;
char md_pipe_path[PATH_MAX];
int ret;
ret = snprintf(md_pipe_path, sizeof(md_pipe_path), "%s.metadata", pipe->path);
if ((ret < 0) || (ret > sizeof(md_pipe_path)))
return -1;
md_pipe = pipe_new(md_pipe_path, -1);
xxxx;
md_pipe->fd = pipe_open(md_pipe_path);
if (md_pipe->fd < 0)
return -1;
return 0;
}
static void static void
pipe_remove(struct pipe *pipe) pipe_metadata_watch_del(struct pipe *pipe)
{ {
struct pipe *p; }*/
pipe_close(pipe);
if (pipe == pipelist)
pipelist = pipe->next;
else
{
for (p = pipelist; p && (p->next != pipe); p = p->next)
; /* EMPTY */
if (!p)
{
DPRINTF(E_LOG, L_REMOTE, "WARNING: pipe not found in list; BUG!\n");
return;
}
p->next = pipe->next;
}
pipe_free(pipe);
}
static void static void
pipe_listener_cb(enum listener_event_type type) pipe_listener_cb(enum listener_event_type type)
{ {
struct pipe *pipe; struct pipe *pipe;
struct pipe *next;
int count; int count;
count = pipe_enum(); // Count does not include pipes with state PIPE_DEL count = pipe_enum(); // Count does not include pipes with state PIPE_DEL
if (count < 0) if (count < 0)
return; return;
for (pipe = pipelist; pipe; pipe = next) for (pipe = pipelist; pipe; pipe = pipe->next)
{ {
next = pipe->next;
DPRINTF(E_DBG, L_PLAYER, "Processing pipe '%s', state is %d\n", pipe->path, pipe->state); DPRINTF(E_DBG, L_PLAYER, "Processing pipe '%s', state is %d\n", pipe->path, pipe->state);
if ((pipe->state == PIPE_NEW) && (count > PIPE_MAX_OPEN)) if ((pipe->state == PIPE_NEW) && (count > PIPE_MAX_WATCH))
DPRINTF(E_LOG, L_PLAYER, "Max open pipes reached, will not watch %s\n", pipe->path); DPRINTF(E_LOG, L_PLAYER, "Max open pipes reached, will not watch %s\n", pipe->path);
else if (pipe->state == PIPE_NEW) else if (pipe->state == PIPE_NEW)
pipe_open(pipe); pipe_watch_add(pipe);
else if (pipe->state == PIPE_DEL) else if (pipe->state == PIPE_DEL)
pipe_remove(pipe); // Note: Will free pipe pipe_watch_del(pipe);
} }
pipelist_prune();
} }
/* -------------------------- PIPE INPUT INTERFACE ------------------------ */ /* -------------------------- PIPE INPUT INTERFACE ------------------------ */
/* Thread: player/input */ /* Thread: player/input */
static int
pipe_metadata_open(struct pipe *pipe)
{
// char md_pipe_path[PATH_MAX]; //TODO PATH_MAX - limits.h?
/* snprintf(md_pipe_path, sizeof(md_pipe_path), "%s.metadata", pipe->path);
fd = pipe_open(md_pipe_path); // TODO avoid lstat error
if (fd < 0)
*/
return 0;
}
static int static int
setup(struct player_source *ps) setup(struct player_source *ps)
{ {
struct pipe *pipe; struct pipe *pipe;
int ret;
if (pipe_autostart) if (!pipe_autostart)
{ pipelist = pipe_new(ps->path, ps->id);
pipe = pipe_find(ps->id); pipe = pipe_find(ps->id);
if (!pipe) if (!pipe)
{ {
DPRINTF(E_LOG, L_PLAYER, "Unknown pipe '%s'\n", ps->path); DPRINTF(E_LOG, L_PLAYER, "Unknown pipe '%s'\n", ps->path);
return -1; return -1;
} }
}
else
pipe = pipe_new(ps->path, ps->id);
if (pipe->state != PIPE_OPEN) if (pipe->state != PIPE_OPEN)
{ {
ret = pipe_open(pipe); pipe->fd = pipe_open(pipe->path);
if (ret < 0) if (pipe->fd < 0)
return -1; return -1;
pipe->state = PIPE_OPEN;
} }
pipe_metadata_open(pipe); // pipe_metadata_open(pipe);
if (pipe->ev) if (pipe->ev)
event_del(pipe->ev); // Avoids autostarting pipe if manually started by user event_del(pipe->ev); // Avoids autostarting pipe if manually started by user
@ -409,16 +431,24 @@ stop(struct player_source *ps)
DPRINTF(E_DBG, L_PLAYER, "Stopping pipe\n"); DPRINTF(E_DBG, L_PLAYER, "Stopping pipe\n");
pipe = pipe_find(ps->id); pipe = pipe_find(ps->id);
if (!pipe)
{
DPRINTF(E_LOG, L_PLAYER, "Unknown pipe '%s'\n", ps->path);
return -1;
}
if (pipe_autostart) if (pipe_autostart)
{ {
// Reopen pipe since if I just readd the event it instantly makes the // Reopen pipe since if I just readd the event it instantly makes the
// callback (probably something I have missed...) // callback (probably something I have missed...)
pipe_close(pipe); pipe_watch_del(pipe);
pipe_open(pipe); pipe_watch_add(pipe);
} }
else else
{ {
pipe_remove(pipe); pipe_close(pipe->fd);
pipe->state = PIPE_DEL;
pipelist_prune();
} }
ps->setup_done = 0; ps->setup_done = 0;
@ -446,7 +476,8 @@ deinit(void)
for (pipe = pipelist; pipelist; pipe = pipelist) for (pipe = pipelist; pipelist; pipe = pipelist)
{ {
pipelist = pipe->next; pipelist = pipe->next;
pipe_remove(pipe); pipe_watch_del(pipe);
pipe_free(pipe);
} }
if (pipe_autostart) if (pipe_autostart)