[pipe] Prepare reading and parsing Shairport metadata pipes

This commit is contained in:
ejurgensen 2017-01-22 23:24:47 +01:00
parent 8b5cac0538
commit ae1f2d75d3

View File

@ -57,8 +57,10 @@
// Maximum number of pipes to watch for data // Maximum number of pipes to watch for data
#define PIPE_MAX_WATCH 4 #define PIPE_MAX_WATCH 4
// Max number of bytes to read from a pipe at a time ( // Max number of bytes to read from a pipe at a time
#define PIPE_READ_MAX 65536 #define PIPE_READ_MAX 65536
// Max number of bytes to buffer from metadata pipes
#define PIPE_METADATA_BUFLEN_MAX 262144
extern struct event_base *evbase_worker; extern struct event_base *evbase_worker;
@ -99,12 +101,18 @@ static struct pipe *pipelist;
// Single pipe that we start watching for metadata after playback starts // Single pipe that we start watching for metadata after playback starts
static struct pipe *pipe_metadata; static struct pipe *pipe_metadata;
// We read metadata into this evbuffer
static struct evbuffer *pipe_metadata_buf;
// True if there is new metadata to push to the player
static bool pipe_metadata_is_new;
/* ------------------------------- FORWARDS ------------------------------- */ /* ------------------------------- FORWARDS ------------------------------- */
static void static void
pipe_watch_reset(struct pipe *pipe); pipe_watch_reset(struct pipe *pipe);
static void
pipe_metadata_watch_del(void *arg);
/* -------------------------------- HELPERS ------------------------------- */ /* -------------------------------- HELPERS ------------------------------- */
@ -175,6 +183,23 @@ pipe_find(int id)
return NULL; return NULL;
} }
static int
pipe_metadata_parse(struct evbuffer *evbuf)
{
char *line;
size_t size;
while ( (line = evbuffer_readln(evbuf, &size, EVBUFFER_EOL_CRLF)) )
{
DPRINTF(E_DBG, L_PLAYER, "Parsing %s\n", line);
free(line);
}
evbuffer_drain(evbuf, evbuffer_get_length(evbuf));
return 0;
}
/* ---------------------------- GENERAL PIPE I/O -------------------------- */ /* ---------------------------- GENERAL PIPE I/O -------------------------- */
/* Thread: worker */ /* Thread: worker */
@ -388,30 +413,55 @@ pipe_listener_cb(enum listener_event_type type)
static void static void
pipe_metadata_read_cb(evutil_socket_t fd, short event, void *arg) pipe_metadata_read_cb(evutil_socket_t fd, short event, void *arg)
{ {
struct evbuffer *evbuf; struct evbuffer_ptr evptr;
int ret; int ret;
DPRINTF(E_DBG, L_PLAYER, "BANG\n"); DPRINTF(E_DBG, L_PLAYER, "BANG\n");
evbuf = evbuffer_new(); ret = evbuffer_read(pipe_metadata_buf, pipe_metadata->fd, PIPE_READ_MAX);
ret = evbuffer_read(evbuf, pipe_metadata->fd, PIPE_READ_MAX); if (ret < 0)
evbuffer_free(evbuf); {
if (errno != EAGAIN)
pipe_metadata_watch_del(NULL);
return;
}
else if (ret == 0)
{
pipe_watch_reset(pipe_metadata);
goto readd;
}
event_add(pipe_metadata->ev, NULL); if (evbuffer_get_length(pipe_metadata_buf) > PIPE_METADATA_BUFLEN_MAX)
/* {
else if (ret < 0) DPRINTF(E_LOG, L_PLAYER, "Can't process data from metadata pipe, reading will stop\n");
give up pipe_metadata_watch_del(NULL);
goto out; return;
if (ret == 0) }
pipe_reset(pipe_metadata);
evbuffer_add_printf(evbuf, "\n");
DPRINTF(E_DBG, L_PLAYER, "Got some pipe metadata\n%s", (char *)evbuffer_pullup(evbuf, -1));
event_add(pipe_metadata->ev, NULL); // Did we get the end tag? If not return to wait for more data
evptr = evbuffer_search(pipe_metadata_buf, "</item>", strlen("</item>"), NULL);
if (evptr.pos < 0)
{
DPRINTF(E_DBG, L_PLAYER, "Incomplete\n");
goto readd;
}
out: // NULL-terminate the buffer
evbuffer_free(evbuf); evbuffer_add(pipe_metadata_buf, "", 1);
pipe_reset(pipe_metadata);*/
ret = pipe_metadata_parse(pipe_metadata_buf);
if (ret < 0)
{
pipe_metadata_watch_del(NULL);
return;
}
// Trigger notification in playback loop
pipe_metadata_is_new = 1;
readd:
if (pipe_metadata && pipe_metadata->ev)
event_add(pipe_metadata->ev, NULL);
} }
static void static void
@ -429,9 +479,12 @@ pipe_metadata_watch_add(void *arg)
if (!pipe_metadata) if (!pipe_metadata)
return; return;
pipe_metadata_buf = evbuffer_new();
ret = pipe_watch_add(pipe_metadata); ret = pipe_watch_add(pipe_metadata);
if (ret < 0) if (ret < 0)
{ {
evbuffer_free(pipe_metadata_buf);
pipe_free(pipe_metadata); pipe_free(pipe_metadata);
pipe_metadata = NULL; pipe_metadata = NULL;
return; return;
@ -444,6 +497,7 @@ pipe_metadata_watch_del(void *arg)
if (!pipe_metadata) if (!pipe_metadata)
return; return;
evbuffer_free(pipe_metadata_buf);
pipe_watch_del(pipe_metadata); pipe_watch_del(pipe_metadata);
pipe_free(pipe_metadata); pipe_free(pipe_metadata);
pipe_metadata = NULL; pipe_metadata = NULL;
@ -492,6 +546,7 @@ start(struct player_source *ps)
{ {
struct pipe *pipe; struct pipe *pipe;
struct evbuffer *evbuf; struct evbuffer *evbuf;
short flags;
int ret; int ret;
pipe = pipe_find(ps->id); pipe = pipe_find(ps->id);
@ -525,7 +580,10 @@ start(struct player_source *ps)
break; break;
} }
ret = input_write(evbuf, 0); flags = (pipe_metadata_is_new ? INPUT_FLAG_METADATA : 0);
pipe_metadata_is_new = 0;
ret = input_write(evbuf, flags);
if (ret < 0) if (ret < 0)
break; break;
} }