[pipe] Use worker thread instead of filescanner for watching pipes

This commit is contained in:
ejurgensen 2017-01-16 22:08:57 +01:00
parent 061beaf272
commit 9fb62441d2
1 changed files with 50 additions and 39 deletions

View File

@ -37,6 +37,7 @@
#include "conffile.h"
#include "listener.h"
#include "player.h"
#include "worker.h"
#include "input.h"
// Maximum number of pipes to watch for data
@ -44,9 +45,7 @@
// Max number of bytes to read from a pipe at a time (
#define PIPE_READ_MAX 65536
// filescanner event base, from filescanner.c
// TODO don't use filescanner thread/base
extern struct event_base *evbase_scan;
extern struct event_base *evbase_worker;
enum pipestate
{
@ -140,7 +139,7 @@ pipe_find(int id)
/* -------------------------------- PIPE I/O ------------------------------ */
/* Thread: filescanner */
/* Thread: worker */
// Updates pipelist with pipe items from the db. Pipes that are no longer in
// the db get marked with PIPE_DEL. Returns count of pipes that should be
@ -192,36 +191,6 @@ pipe_enum(void)
return count;
}
// Some data arrived on a pipe we watch - let's autostart playback
static void
pipe_read_cb(evutil_socket_t fd, short event, void *arg)
{
struct pipe *pipe = arg;
struct player_status status;
struct db_queue_item *queue_item;
int ret;
DPRINTF(E_DBG, L_PLAYER, "Autostarting pipe %d, %d\n", (int) fd, (int)event);
ret = player_get_status(&status);
if ((ret < 0) || (status.status == PLAY_PLAYING))
return;
db_queue_clear();
ret = db_queue_add_by_fileid(pipe->id, 0, 0);
if (ret < 0)
return;
queue_item = db_queue_fetch_byfileid(pipe->id);
if (!queue_item)
return;
player_playback_start_byitem(queue_item);
free_queue_item(queue_item, 0);
}
// Opens a pipe and starts watching it for data if autostart is configured
static int
pipe_open(const char *path)
@ -259,6 +228,40 @@ pipe_close(int fd)
close(fd);
}
// Some data arrived on a pipe we watch - let's autostart playback
static void
pipe_read_cb(evutil_socket_t fd, short event, void *arg)
{
struct pipe *pipe = arg;
struct player_status status;
struct db_queue_item *queue_item;
int ret;
ret = player_get_status(&status);
if ((ret < 0) || (status.status == PLAY_PLAYING))
{
DPRINTF(E_LOG, L_PLAYER, "Data arrived on pipe '%s', but we are busy\n", pipe->path);
// FIXME Now the event won't activate any more - even if we are not busy
return;
}
DPRINTF(E_INFO, L_PLAYER, "Autostarting pipe '%s' (fd %d)\n", pipe->path, fd);
db_queue_clear();
ret = db_queue_add_by_fileid(pipe->id, 0, 0);
if (ret < 0)
return;
queue_item = db_queue_fetch_byfileid(pipe->id);
if (!queue_item)
return;
player_playback_start_byitem(queue_item);
free_queue_item(queue_item, 0);
}
static int
pipe_watch_add(struct pipe *pipe)
{
@ -268,7 +271,7 @@ pipe_watch_add(struct pipe *pipe)
pipe->state = PIPE_OPEN;
pipe->ev = event_new(evbase_scan, pipe->fd, EV_READ, pipe_read_cb, pipe);
pipe->ev = event_new(evbase_worker, pipe->fd, EV_READ, pipe_read_cb, pipe);
if (!pipe->ev)
{
DPRINTF(E_LOG, L_PLAYER, "Could not watch pipe for new data '%s'\n", pipe->path);
@ -292,9 +295,10 @@ pipe_watch_del(struct pipe *pipe)
}
/*
static int
pipe_metadata_watch_add(struct pipe *pipe)
static void
pipe_metadata_watch_add(void *arg)
{
struct pipe *pipe = arg;
struct pipe *md_pipe;
char md_pipe_path[PATH_MAX];
int ret;
@ -319,7 +323,7 @@ pipe_metadata_watch_del(struct pipe *pipe)
}*/
static void
pipe_listener_cb(enum listener_event_type type)
pipe_update(void *arg)
{
struct pipe *pipe;
int count;
@ -343,6 +347,13 @@ pipe_listener_cb(enum listener_event_type type)
pipelist_prune();
}
// Thread: filescanner
static void
pipe_listener_cb(enum listener_event_type type)
{
worker_execute(pipe_update, NULL, 0, 0);
}
/* -------------------------- PIPE INPUT INTERFACE ------------------------ */
/* Thread: player/input */
@ -370,7 +381,7 @@ setup(struct player_source *ps)
pipe->state = PIPE_OPEN;
}
// pipe_metadata_open(pipe);
// worker_execute(pipe_metadata_watch_add, pipe, sizeof(struct pipe), 0);
if (pipe->ev)
event_del(pipe->ev); // Avoids autostarting pipe if manually started by user