mirror of
https://github.com/owntone/owntone-server.git
synced 2025-01-26 14:13:18 -05:00
[pipe] Refactor pipe input, hopefully into a more thread safe version
This commit is contained in:
parent
e6793b4779
commit
1f15fb1993
@ -55,6 +55,7 @@
|
||||
#include "listener.h"
|
||||
#include "player.h"
|
||||
#include "worker.h"
|
||||
#include "commands.h"
|
||||
#include "mxml-compat.h"
|
||||
|
||||
// Maximum number of pipes to watch for data
|
||||
@ -64,15 +65,6 @@
|
||||
// Max number of bytes to buffer from metadata pipes
|
||||
#define PIPE_METADATA_BUFLEN_MAX 262144
|
||||
|
||||
extern struct event_base *evbase_worker;
|
||||
|
||||
enum pipestate
|
||||
{
|
||||
PIPE_NEW,
|
||||
PIPE_DEL,
|
||||
PIPE_OPEN,
|
||||
};
|
||||
|
||||
enum pipetype
|
||||
{
|
||||
PIPE_PCM,
|
||||
@ -85,21 +77,25 @@ struct pipe
|
||||
int fd; // File descriptor
|
||||
bool is_autostarted; // We autostarted the pipe (and we will autostop)
|
||||
char *path; // Path
|
||||
enum pipestate state; // Newly appeared, marked for deletion, open/ready
|
||||
enum pipetype type; // PCM (audio) or metadata
|
||||
event_callback_fn cb; // Callback when there is data to read
|
||||
struct event *ev; // Event for the callback
|
||||
// TODO mutex
|
||||
|
||||
struct pipe *next;
|
||||
};
|
||||
|
||||
// The usual thread stuff
|
||||
static pthread_t tid_pipe;
|
||||
static struct event_base *evbase_pipe;
|
||||
static struct commands_base *cmdbase;
|
||||
|
||||
// From config - should we watch library pipes for data or only start on request
|
||||
static int pipe_autostart;
|
||||
// The mfi id of the pipe autostarted by the pipe thread
|
||||
static int pipe_autostart_id;
|
||||
|
||||
// Global list of pipes we are watching. If watching/autostart is disabled this
|
||||
// will just point to the currently playing pipe (if any).
|
||||
static struct pipe *pipelist;
|
||||
// Global list of pipes we are watching (if watching/autostart is enabled)
|
||||
static struct pipe *pipe_watch_list;
|
||||
|
||||
// Single pipe that we start watching for metadata after playback starts
|
||||
static struct pipe *pipe_metadata;
|
||||
@ -112,18 +108,10 @@ static pthread_mutex_t pipe_metadata_lock;
|
||||
// True if there is new metadata to push to the player
|
||||
static bool pipe_metadata_is_new;
|
||||
|
||||
/* ------------------------------- FORWARDS ------------------------------- */
|
||||
|
||||
static void
|
||||
pipe_watch_reset(struct pipe *pipe);
|
||||
|
||||
static void
|
||||
pipe_metadata_watch_del(void *arg);
|
||||
|
||||
/* -------------------------------- HELPERS ------------------------------- */
|
||||
|
||||
static struct pipe *
|
||||
pipe_new(const char *path, int id, enum pipetype type, event_callback_fn cb)
|
||||
pipe_create(const char *path, int id, enum pipetype type, event_callback_fn cb)
|
||||
{
|
||||
struct pipe *pipe;
|
||||
|
||||
@ -131,16 +119,9 @@ pipe_new(const char *path, int id, enum pipetype type, event_callback_fn cb)
|
||||
pipe->path = strdup(path);
|
||||
pipe->id = id;
|
||||
pipe->fd = -1;
|
||||
pipe->state = PIPE_NEW;
|
||||
pipe->type = type;
|
||||
pipe->cb = cb;
|
||||
|
||||
if (type == PIPE_PCM)
|
||||
{
|
||||
pipe->next = pipelist;
|
||||
pipelist = pipe;
|
||||
}
|
||||
|
||||
return pipe;
|
||||
}
|
||||
|
||||
@ -151,39 +132,129 @@ pipe_free(struct pipe *pipe)
|
||||
free(pipe);
|
||||
}
|
||||
|
||||
static void
|
||||
pipelist_prune(void)
|
||||
static int
|
||||
pipe_open(const char *path, bool silent)
|
||||
{
|
||||
struct pipe *pipe;
|
||||
struct pipe *next;
|
||||
struct stat sb;
|
||||
int fd;
|
||||
|
||||
for (pipe = pipelist; pipe; pipe = next)
|
||||
DPRINTF(E_DBG, L_PLAYER, "(Re)opening pipe: '%s'\n", path);
|
||||
|
||||
if (lstat(path, &sb) < 0)
|
||||
{
|
||||
next = pipe->next;
|
||||
|
||||
if (pipelist->state == PIPE_DEL)
|
||||
{
|
||||
pipe_free(pipelist);
|
||||
pipelist = next;
|
||||
}
|
||||
else if (next && (next->state == PIPE_DEL))
|
||||
{
|
||||
pipe->next = next->next;
|
||||
pipe_free(next);
|
||||
next = pipe->next;
|
||||
}
|
||||
if (!silent)
|
||||
DPRINTF(E_LOG, L_PLAYER, "Could not lstat() '%s': %s\n", path, strerror(errno));
|
||||
return -1;
|
||||
}
|
||||
|
||||
if (!S_ISFIFO(sb.st_mode))
|
||||
{
|
||||
DPRINTF(E_LOG, L_PLAYER, "Source type is pipe, but path is not a fifo: %s\n", path);
|
||||
return -1;
|
||||
}
|
||||
|
||||
fd = open(path, O_RDONLY | O_NONBLOCK);
|
||||
if (fd < 0)
|
||||
{
|
||||
DPRINTF(E_LOG, L_PLAYER, "Could not open pipe for reading '%s': %s\n", path, strerror(errno));
|
||||
return -1;
|
||||
}
|
||||
|
||||
return fd;
|
||||
}
|
||||
|
||||
static void
|
||||
pipe_close(int fd)
|
||||
{
|
||||
if (fd >= 0)
|
||||
close(fd);
|
||||
}
|
||||
|
||||
static int
|
||||
watch_add(struct pipe *pipe)
|
||||
{
|
||||
bool silent;
|
||||
|
||||
silent = (pipe->type == PIPE_METADATA);
|
||||
pipe->fd = pipe_open(pipe->path, silent);
|
||||
if (pipe->fd < 0)
|
||||
return -1;
|
||||
|
||||
pipe->ev = event_new(evbase_pipe, pipe->fd, EV_READ, pipe->cb, pipe);
|
||||
if (!pipe->ev)
|
||||
{
|
||||
DPRINTF(E_LOG, L_PLAYER, "Could not watch pipe for new data '%s'\n", pipe->path);
|
||||
pipe_close(pipe->fd);
|
||||
return -1;
|
||||
}
|
||||
|
||||
event_add(pipe->ev, NULL);
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
||||
static void
|
||||
watch_del(struct pipe *pipe)
|
||||
{
|
||||
if (pipe->ev)
|
||||
event_free(pipe->ev);
|
||||
|
||||
pipe_close(pipe->fd);
|
||||
|
||||
pipe->fd = -1;
|
||||
}
|
||||
|
||||
// If a read on pipe returns 0 it is an EOF, and we must close it and reopen it
|
||||
// for renewed watching. The event will be freed and reallocated by this.
|
||||
static int
|
||||
watch_reset(struct pipe *pipe)
|
||||
{
|
||||
watch_del(pipe);
|
||||
|
||||
return watch_add(pipe);
|
||||
}
|
||||
|
||||
static void
|
||||
pipelist_add(struct pipe **list, struct pipe *pipe)
|
||||
{
|
||||
pipe->next = *list;
|
||||
*list = pipe;
|
||||
}
|
||||
|
||||
static void
|
||||
pipelist_remove(struct pipe **list, struct pipe *pipe)
|
||||
{
|
||||
struct pipe *prev = NULL;
|
||||
struct pipe *p;
|
||||
|
||||
for (p = *list; p; p = p->next)
|
||||
{
|
||||
if (p->id == pipe->id)
|
||||
break;
|
||||
|
||||
prev = p;
|
||||
}
|
||||
|
||||
if (!p)
|
||||
return;
|
||||
|
||||
if (!prev)
|
||||
*list = pipe->next;
|
||||
else
|
||||
prev->next = pipe->next;
|
||||
|
||||
pipe_free(pipe);
|
||||
}
|
||||
|
||||
static struct pipe *
|
||||
pipe_find(int id)
|
||||
pipelist_find(struct pipe *list, int id)
|
||||
{
|
||||
struct pipe *pipe;
|
||||
struct pipe *p;
|
||||
|
||||
for (pipe = pipelist; pipe; pipe = pipe->next)
|
||||
for (p = list; p; p = p->next)
|
||||
{
|
||||
if (id == pipe->id)
|
||||
return pipe;
|
||||
if (id == p->id)
|
||||
return p;
|
||||
}
|
||||
|
||||
return NULL;
|
||||
@ -358,8 +429,8 @@ pipe_metadata_parse(struct input_metadata *m, struct evbuffer *evbuf)
|
||||
}
|
||||
|
||||
|
||||
/* ---------------------------- GENERAL PIPE I/O -------------------------- */
|
||||
/* Thread: worker */
|
||||
/* ----------------------------- PIPE WATCHING ---------------------------- */
|
||||
/* Thread: pipe */
|
||||
|
||||
// Some data arrived on a pipe we watch - let's autostart playback
|
||||
static void
|
||||
@ -367,205 +438,119 @@ pipe_read_cb(evutil_socket_t fd, short event, void *arg)
|
||||
{
|
||||
struct pipe *pipe = arg;
|
||||
struct player_status status;
|
||||
struct db_queue_item *queue_item;
|
||||
int ret;
|
||||
|
||||
ret = player_get_status(&status);
|
||||
if ((ret < 0) || (status.status == PLAY_PLAYING))
|
||||
if (status.id == pipe->id)
|
||||
{
|
||||
DPRINTF(E_LOG, L_PLAYER, "Data arrived on pipe '%s', but player is busy\n", pipe->path);
|
||||
pipe_watch_reset(pipe);
|
||||
DPRINTF(E_DBG, L_PLAYER, "Pipe '%s' already playing\n", pipe->path);
|
||||
return; // We are already playing the pipe
|
||||
}
|
||||
else if ((ret < 0) || (status.status == PLAY_PLAYING))
|
||||
{
|
||||
DPRINTF(E_LOG, L_PLAYER, "Data arrived on pipe '%s' - ignoring, player is busy\n", pipe->path);
|
||||
// FIXME What to do in this situation? Can't re-add the event, since it
|
||||
// will trigger right away, but also not good to stop watching the pipe
|
||||
// like we do right now.
|
||||
return;
|
||||
}
|
||||
|
||||
DPRINTF(E_INFO, L_PLAYER, "Autostarting pipe '%s' (fd %d)\n", pipe->path, fd);
|
||||
|
||||
db_queue_clear();
|
||||
player_playback_stop();
|
||||
|
||||
ret = db_queue_add_by_fileid(pipe->id, 0, 0);
|
||||
ret = player_playback_start_byid(pipe->id);
|
||||
if (ret < 0)
|
||||
return;
|
||||
{
|
||||
DPRINTF(E_LOG, L_PLAYER, "Autostarting pipe '%s' (fd %d) failed\n", pipe->path, fd);
|
||||
return;
|
||||
}
|
||||
|
||||
queue_item = db_queue_fetch_byfileid(pipe->id);
|
||||
if (!queue_item)
|
||||
return;
|
||||
|
||||
player_playback_start_byitem(queue_item);
|
||||
|
||||
pipe->is_autostarted = 1;
|
||||
|
||||
free_queue_item(queue_item, 0);
|
||||
pipe_autostart_id = pipe->id;
|
||||
}
|
||||
|
||||
// Updates pipelist with pipe items from the db. Pipes that are no longer in
|
||||
// the db get marked with PIPE_DEL. Returns count of pipes that should be
|
||||
// watched (may or may not equal length of pipelist)
|
||||
static int
|
||||
pipe_enum(void)
|
||||
static enum command_state
|
||||
pipe_watch_reset(void *arg, int *retval)
|
||||
{
|
||||
struct query_params qp;
|
||||
struct db_media_file_info dbmfi;
|
||||
int *id = arg;
|
||||
struct pipe *pipe;
|
||||
char filter[32];
|
||||
|
||||
pipe_autostart_id = 0;
|
||||
|
||||
pipe = pipelist_find(pipe_watch_list, *id);
|
||||
|
||||
*retval = watch_reset(pipe);
|
||||
|
||||
return COMMAND_END;
|
||||
}
|
||||
|
||||
static enum command_state
|
||||
pipe_watch_update(void *arg, int *retval)
|
||||
{
|
||||
struct pipe *pipelist = arg;
|
||||
struct pipe *pipe;
|
||||
struct pipe *next;
|
||||
int count;
|
||||
int id;
|
||||
int ret;
|
||||
|
||||
memset(&qp, 0, sizeof(struct query_params));
|
||||
qp.type = Q_ITEMS;
|
||||
qp.filter = filter;
|
||||
|
||||
snprintf(filter, sizeof(filter), "f.data_kind = %d", DATA_KIND_PIPE);
|
||||
|
||||
ret = db_query_start(&qp);
|
||||
if (ret < 0)
|
||||
return -1;
|
||||
|
||||
for (pipe = pipelist; pipe; pipe = pipe->next)
|
||||
pipe->state = PIPE_DEL;
|
||||
|
||||
count = 0;
|
||||
while (((ret = db_query_fetch_file(&qp, &dbmfi)) == 0) && (dbmfi.id))
|
||||
for (pipe = pipe_watch_list; pipe; pipe = next)
|
||||
{
|
||||
ret = safe_atoi32(dbmfi.id, &id);
|
||||
if (ret < 0)
|
||||
continue;
|
||||
next = pipe->next;
|
||||
|
||||
count++;
|
||||
|
||||
if ((pipe = pipe_find(id)))
|
||||
if (!pipelist_find(pipelist, pipe->id))
|
||||
{
|
||||
pipe->state = PIPE_OPEN;
|
||||
DPRINTF(E_DBG, L_PLAYER, "Pipe deleted: '%s'\n", pipe->path);
|
||||
watch_del(pipe);
|
||||
pipelist_remove(&pipe_watch_list, pipe); // Will free pipe
|
||||
continue;
|
||||
}
|
||||
|
||||
pipe_new(dbmfi.path, id, PIPE_PCM, pipe_read_cb);
|
||||
count++;
|
||||
pipelist_remove(&pipelist, pipe);
|
||||
}
|
||||
|
||||
db_query_end(&qp);
|
||||
|
||||
return count;
|
||||
}
|
||||
|
||||
// Opens a pipe and starts watching it for data if autostart is configured
|
||||
static int
|
||||
pipe_open(const char *path, bool silent)
|
||||
{
|
||||
struct stat sb;
|
||||
int fd;
|
||||
|
||||
DPRINTF(E_DBG, L_PLAYER, "(Re)opening pipe: '%s'\n", path);
|
||||
|
||||
if (lstat(path, &sb) < 0)
|
||||
{
|
||||
if (!silent)
|
||||
DPRINTF(E_LOG, L_PLAYER, "Could not lstat() '%s': %s\n", path, strerror(errno));
|
||||
return -1;
|
||||
}
|
||||
|
||||
if (!S_ISFIFO(sb.st_mode))
|
||||
{
|
||||
DPRINTF(E_LOG, L_PLAYER, "Source type is pipe, but path is not a fifo: %s\n", path);
|
||||
return -1;
|
||||
}
|
||||
|
||||
fd = open(path, O_RDONLY | O_NONBLOCK);
|
||||
if (fd < 0)
|
||||
{
|
||||
DPRINTF(E_LOG, L_PLAYER, "Could not open pipe for reading '%s': %s\n", path, strerror(errno));
|
||||
return -1;
|
||||
}
|
||||
|
||||
return fd;
|
||||
}
|
||||
|
||||
static void
|
||||
pipe_close(int fd)
|
||||
{
|
||||
if (fd >= 0)
|
||||
close(fd);
|
||||
}
|
||||
|
||||
static int
|
||||
pipe_watch_add(struct pipe *pipe)
|
||||
{
|
||||
bool silent;
|
||||
|
||||
silent = (pipe->type == PIPE_METADATA);
|
||||
pipe->fd = pipe_open(pipe->path, silent);
|
||||
if (pipe->fd < 0)
|
||||
return -1;
|
||||
|
||||
pipe->ev = event_new(evbase_worker, pipe->fd, EV_READ, pipe->cb, pipe);
|
||||
if (!pipe->ev)
|
||||
{
|
||||
DPRINTF(E_LOG, L_PLAYER, "Could not watch pipe for new data '%s'\n", pipe->path);
|
||||
pipe_close(pipe->fd);
|
||||
return -1;
|
||||
}
|
||||
|
||||
event_add(pipe->ev, NULL);
|
||||
|
||||
pipe->state = PIPE_OPEN;
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
||||
static void
|
||||
pipe_watch_del(struct pipe *pipe)
|
||||
{
|
||||
if (pipe->ev)
|
||||
event_free(pipe->ev);
|
||||
|
||||
pipe_close(pipe->fd);
|
||||
|
||||
pipe->fd = -1;
|
||||
}
|
||||
|
||||
// If a read on pipe returns 0 it is an EOF, and we must close it and reopen it
|
||||
// for renewed watching. The event will be freed and reallocated by this.
|
||||
static void
|
||||
pipe_watch_reset(struct pipe *pipe)
|
||||
{
|
||||
pipe_watch_del(pipe);
|
||||
pipe_watch_add(pipe);
|
||||
}
|
||||
|
||||
static void
|
||||
pipe_watch_update(void *arg)
|
||||
{
|
||||
struct pipe *pipe;
|
||||
int count;
|
||||
|
||||
count = pipe_enum(); // Count does not include pipes with state PIPE_DEL
|
||||
if (count < 0)
|
||||
return;
|
||||
|
||||
for (pipe = pipelist; pipe; pipe = pipe->next)
|
||||
{
|
||||
DPRINTF(E_DBG, L_PLAYER, "Processing pipe '%s', state is %d\n", pipe->path, pipe->state);
|
||||
count++;
|
||||
if (count > PIPE_MAX_WATCH)
|
||||
{
|
||||
DPRINTF(E_LOG, L_PLAYER, "Max open pipes reached, will not watch %s\n", pipe->path);
|
||||
break;
|
||||
}
|
||||
|
||||
if ((pipe->state == PIPE_NEW) && (count > PIPE_MAX_WATCH))
|
||||
DPRINTF(E_LOG, L_PLAYER, "Max open pipes reached, will not watch %s\n", pipe->path);
|
||||
else if (pipe->state == PIPE_NEW)
|
||||
pipe_watch_add(pipe);
|
||||
else if (pipe->state == PIPE_DEL)
|
||||
pipe_watch_del(pipe);
|
||||
DPRINTF(E_DBG, L_PLAYER, "Pipe added: '%s'\n", pipe->path);
|
||||
watch_add(pipe);
|
||||
pipelist_add(&pipe_watch_list, pipe);
|
||||
}
|
||||
|
||||
pipelist_prune();
|
||||
*retval = 0;
|
||||
return COMMAND_PENDING; // Stops commands.c from freeing any of the pipes
|
||||
}
|
||||
|
||||
// Thread: filescanner
|
||||
static void
|
||||
pipe_listener_cb(enum listener_event_type type)
|
||||
static void *
|
||||
pipe_thread_run(void *arg)
|
||||
{
|
||||
worker_execute(pipe_watch_update, NULL, 0, 0);
|
||||
event_base_dispatch(evbase_pipe);
|
||||
|
||||
pthread_exit(NULL);
|
||||
}
|
||||
|
||||
|
||||
/* -------------------------- METADATA PIPE HANDLING ---------------------- */
|
||||
/* Thread: worker */
|
||||
|
||||
static void
|
||||
pipe_metadata_watch_del(void *arg)
|
||||
{
|
||||
if (!pipe_metadata)
|
||||
return;
|
||||
|
||||
evbuffer_free(pipe_metadata_buf);
|
||||
watch_del(pipe_metadata);
|
||||
pipe_free(pipe_metadata);
|
||||
pipe_metadata = NULL;
|
||||
}
|
||||
|
||||
// Some metadata arrived on a pipe we watch
|
||||
static void
|
||||
pipe_metadata_read_cb(evutil_socket_t fd, short event, void *arg)
|
||||
@ -581,7 +566,10 @@ pipe_metadata_read_cb(evutil_socket_t fd, short event, void *arg)
|
||||
}
|
||||
else if (ret == 0)
|
||||
{
|
||||
pipe_watch_reset(pipe_metadata);
|
||||
// Reset the pipe
|
||||
ret = watch_reset(pipe_metadata);
|
||||
if (ret < 0)
|
||||
return;
|
||||
goto readd;
|
||||
}
|
||||
|
||||
@ -620,13 +608,13 @@ pipe_metadata_watch_add(void *arg)
|
||||
if ((ret < 0) || (ret > sizeof(path)))
|
||||
return;
|
||||
|
||||
pipe_metadata = pipe_new(path, 0, PIPE_METADATA, pipe_metadata_read_cb);
|
||||
pipe_metadata = pipe_create(path, 0, PIPE_METADATA, pipe_metadata_read_cb);
|
||||
if (!pipe_metadata)
|
||||
return;
|
||||
|
||||
pipe_metadata_buf = evbuffer_new();
|
||||
|
||||
ret = pipe_watch_add(pipe_metadata);
|
||||
ret = watch_add(pipe_metadata);
|
||||
if (ret < 0)
|
||||
{
|
||||
evbuffer_free(pipe_metadata_buf);
|
||||
@ -636,16 +624,95 @@ pipe_metadata_watch_add(void *arg)
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
/* ---------------------- PIPE WATCH THREAD START/STOP -------------------- */
|
||||
/* Thread: filescanner */
|
||||
|
||||
static void
|
||||
pipe_metadata_watch_del(void *arg)
|
||||
pipe_thread_start(void)
|
||||
{
|
||||
if (!pipe_metadata)
|
||||
CHECK_NULL(L_PLAYER, evbase_pipe = event_base_new());
|
||||
CHECK_NULL(L_PLAYER, cmdbase = commands_base_new(evbase_pipe, NULL));
|
||||
CHECK_ERR(L_PLAYER, pthread_create(&tid_pipe, NULL, pipe_thread_run, NULL));
|
||||
|
||||
#if defined(HAVE_PTHREAD_SETNAME_NP)
|
||||
pthread_setname_np(tid_pipe, "pipe");
|
||||
#elif defined(HAVE_PTHREAD_SET_NAME_NP)
|
||||
pthread_set_name_np(tid_pipe, "pipe");
|
||||
#endif
|
||||
}
|
||||
|
||||
static void
|
||||
pipe_thread_stop(void)
|
||||
{
|
||||
if (!tid_pipe)
|
||||
return;
|
||||
|
||||
evbuffer_free(pipe_metadata_buf);
|
||||
pipe_watch_del(pipe_metadata);
|
||||
pipe_free(pipe_metadata);
|
||||
pipe_metadata = NULL;
|
||||
commands_exec_sync(cmdbase, pipe_watch_update, NULL, NULL);
|
||||
|
||||
commands_base_destroy(cmdbase);
|
||||
pthread_join(tid_pipe, NULL);
|
||||
event_base_free(evbase_pipe);
|
||||
tid_pipe = 0;
|
||||
}
|
||||
|
||||
// Makes a pipelist with pipe items from the db, returns NULL on no pipes
|
||||
static struct pipe *
|
||||
pipelist_create(void)
|
||||
{
|
||||
struct query_params qp;
|
||||
struct db_media_file_info dbmfi;
|
||||
struct pipe *head;
|
||||
struct pipe *pipe;
|
||||
char filter[32];
|
||||
int id;
|
||||
int ret;
|
||||
|
||||
memset(&qp, 0, sizeof(struct query_params));
|
||||
qp.type = Q_ITEMS;
|
||||
qp.filter = filter;
|
||||
|
||||
snprintf(filter, sizeof(filter), "f.data_kind = %d", DATA_KIND_PIPE);
|
||||
|
||||
ret = db_query_start(&qp);
|
||||
if (ret < 0)
|
||||
return NULL;
|
||||
|
||||
head = NULL;
|
||||
while (((ret = db_query_fetch_file(&qp, &dbmfi)) == 0) && (dbmfi.id))
|
||||
{
|
||||
ret = safe_atoi32(dbmfi.id, &id);
|
||||
if (ret < 0)
|
||||
continue;
|
||||
|
||||
pipe = pipe_create(dbmfi.path, id, PIPE_PCM, pipe_read_cb);
|
||||
pipelist_add(&head, pipe);
|
||||
}
|
||||
|
||||
db_query_end(&qp);
|
||||
|
||||
return head;
|
||||
}
|
||||
|
||||
// Queries the db to see if any pipes are present in the library. If so, starts
|
||||
// the pipe thread to watch the pipes. If no pipes in library, it will shut down
|
||||
// the pipe thread.
|
||||
static void
|
||||
pipe_listener_cb(enum listener_event_type type)
|
||||
{
|
||||
struct pipe *pipelist;
|
||||
|
||||
pipelist = pipelist_create();
|
||||
if (!pipelist)
|
||||
{
|
||||
pipe_thread_stop();
|
||||
return;
|
||||
}
|
||||
|
||||
if (!tid_pipe)
|
||||
pipe_thread_start();
|
||||
|
||||
commands_exec_async(cmdbase, pipe_watch_update, pipelist);
|
||||
}
|
||||
|
||||
|
||||
@ -656,28 +723,19 @@ static int
|
||||
setup(struct player_source *ps)
|
||||
{
|
||||
struct pipe *pipe;
|
||||
int fd;
|
||||
|
||||
// If autostart is disabled then this is the first time we encounter the pipe
|
||||
if (!pipe_autostart)
|
||||
pipe_new(ps->path, ps->id, PIPE_PCM, NULL);
|
||||
fd = pipe_open(ps->path, 0);
|
||||
if (fd < 0)
|
||||
return -1;
|
||||
|
||||
pipe = pipe_find(ps->id);
|
||||
if (!pipe)
|
||||
{
|
||||
DPRINTF(E_LOG, L_PLAYER, "Unknown pipe '%s'\n", ps->path);
|
||||
return -1;
|
||||
}
|
||||
CHECK_NULL(L_PLAYER, pipe = pipe_create(ps->path, ps->id, PIPE_PCM, NULL));
|
||||
pipe->fd = fd;
|
||||
pipe->is_autostarted = (ps->id == pipe_autostart_id);
|
||||
|
||||
if (pipe->state != PIPE_OPEN)
|
||||
{
|
||||
pipe->fd = pipe_open(pipe->path, 0);
|
||||
if (pipe->fd < 0)
|
||||
return -1;
|
||||
pipe->state = PIPE_OPEN;
|
||||
}
|
||||
|
||||
worker_execute(pipe_metadata_watch_add, pipe->path, strlen(pipe->path) + 1, 0);
|
||||
worker_execute(pipe_metadata_watch_add, ps->path, strlen(ps->path) + 1, 0);
|
||||
|
||||
ps->input_ctx = pipe;
|
||||
ps->setup_done = 1;
|
||||
|
||||
return 0;
|
||||
@ -686,15 +744,11 @@ setup(struct player_source *ps)
|
||||
static int
|
||||
start(struct player_source *ps)
|
||||
{
|
||||
struct pipe *pipe;
|
||||
struct pipe *pipe = ps->input_ctx;
|
||||
struct evbuffer *evbuf;
|
||||
short flags;
|
||||
int ret;
|
||||
|
||||
pipe = pipe_find(ps->id);
|
||||
if (!pipe)
|
||||
return -1;
|
||||
|
||||
evbuf = evbuffer_new();
|
||||
if (!evbuf)
|
||||
{
|
||||
@ -718,7 +772,7 @@ start(struct player_source *ps)
|
||||
}
|
||||
else if (ret < 0)
|
||||
{
|
||||
DPRINTF(E_LOG, L_PLAYER, "Could not read from pipe: %s\n", strerror(errno));
|
||||
DPRINTF(E_LOG, L_PLAYER, "Could not read from pipe '%s': %s\n", ps->path, strerror(errno));
|
||||
break;
|
||||
}
|
||||
|
||||
@ -738,34 +792,26 @@ start(struct player_source *ps)
|
||||
static int
|
||||
stop(struct player_source *ps)
|
||||
{
|
||||
struct pipe *pipe;
|
||||
struct pipe *pipe = ps->input_ctx;
|
||||
int *id;
|
||||
|
||||
DPRINTF(E_DBG, L_PLAYER, "Stopping pipe\n");
|
||||
|
||||
pipe = pipe_find(ps->id);
|
||||
if (!pipe)
|
||||
{
|
||||
DPRINTF(E_LOG, L_PLAYER, "Unknown pipe '%s'\n", ps->path);
|
||||
return -1;
|
||||
}
|
||||
pipe_close(pipe->fd);
|
||||
|
||||
if (!pipe_autostart)
|
||||
{
|
||||
// Since autostart is disabled we are now done with the pipe
|
||||
pipe_close(pipe->fd);
|
||||
pipe->state = PIPE_DEL;
|
||||
pipelist_prune();
|
||||
}
|
||||
else
|
||||
{
|
||||
// Reset the pipe and start watching it again for new data
|
||||
pipe->is_autostarted = 0;
|
||||
pipe_watch_reset(pipe);
|
||||
}
|
||||
// Reset the pipe and start watching it again for new data. Must be async or
|
||||
// we will deadlock from the stop in pipe_read_cb().
|
||||
id = malloc(sizeof(int));
|
||||
*id = pipe->id;
|
||||
if (pipe_autostart)
|
||||
commands_exec_async(cmdbase, pipe_watch_reset, id);
|
||||
|
||||
if (pipe_metadata)
|
||||
worker_execute(pipe_metadata_watch_del, NULL, 0, 0);
|
||||
|
||||
pipe_free(pipe);
|
||||
|
||||
ps->input_ctx = NULL;
|
||||
ps->setup_done = 0;
|
||||
|
||||
return 0;
|
||||
@ -806,32 +852,25 @@ metadata_get(struct input_metadata *metadata, struct player_source *ps, uint64_t
|
||||
static int
|
||||
init(void)
|
||||
{
|
||||
pipe_autostart = cfg_getbool(cfg_getsec(cfg, "library"), "pipe_autostart");
|
||||
|
||||
CHECK_ERR(L_PLAYER, mutex_init(&pipe_metadata_lock));
|
||||
|
||||
pipe_autostart = cfg_getbool(cfg_getsec(cfg, "library"), "pipe_autostart");
|
||||
if (pipe_autostart)
|
||||
return listener_add(pipe_listener_cb, LISTENER_DATABASE);
|
||||
else
|
||||
return 0;
|
||||
CHECK_ERR(L_PLAYER, listener_add(pipe_listener_cb, LISTENER_DATABASE));
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
||||
static void
|
||||
deinit(void)
|
||||
{
|
||||
struct pipe *pipe;
|
||||
|
||||
for (pipe = pipelist; pipelist; pipe = pipelist)
|
||||
if (pipe_autostart)
|
||||
{
|
||||
pipelist = pipe->next;
|
||||
pipe_watch_del(pipe);
|
||||
pipe_free(pipe);
|
||||
listener_remove(pipe_listener_cb);
|
||||
pipe_thread_stop();
|
||||
}
|
||||
|
||||
CHECK_ERR(L_PLAYER, pthread_mutex_destroy(&pipe_metadata_lock));
|
||||
|
||||
if (pipe_autostart)
|
||||
listener_remove(pipe_listener_cb);
|
||||
}
|
||||
|
||||
struct input_definition input_pipe =
|
||||
|
Loading…
x
Reference in New Issue
Block a user