[pipe] Make Shairport metadata parser work with incomplete reads from pipe

This commit is contained in:
ejurgensen 2017-01-28 21:47:30 +01:00
parent 41c5ef1474
commit 12567d8e93

View File

@ -107,6 +107,8 @@ static struct pipe *pipe_metadata;
static struct evbuffer *pipe_metadata_buf; static struct evbuffer *pipe_metadata_buf;
// Parsed metadata goes here // Parsed metadata goes here
static struct input_metadata pipe_metadata_parsed; static struct input_metadata pipe_metadata_parsed;
// Mutex to share the parsed metadata
static pthread_mutex_t pipe_metadata_lock;
// True if there is new metadata to push to the player // True if there is new metadata to push to the player
static bool pipe_metadata_is_new; static bool pipe_metadata_is_new;
@ -223,31 +225,48 @@ parse_progress(struct input_metadata *m, char *progress)
m->song_length = (end - start) * 10 / 441; // Convert to ms based on 44100 m->song_length = (end - start) * 10 / 441; // Convert to ms based on 44100
} }
// returns 0 on metadata found, otherwise -1 // returns 1 on metadata found, 0 on nothing, -1 on error
static int static int
parse_item(struct input_metadata *m, mxml_node_t *item) parse_item(struct input_metadata *m, const char *item)
{ {
mxml_node_t *xml;
mxml_node_t *haystack;
mxml_node_t *needle; mxml_node_t *needle;
const char *s; const char *s;
uint32_t type; uint32_t type;
uint32_t code; uint32_t code;
char *progress; char *progress;
char **data; char **data;
int ret;
ret = 0;
xml = mxmlNewXML("1.0");
if (!xml)
return -1;
// DPRINTF(E_DBG, L_PLAYER, "Parsing %s\n", item);
haystack = mxmlLoadString(xml, item, MXML_NO_CALLBACK);
if (!haystack)
{
DPRINTF(E_LOG, L_PLAYER, "Could not parse pipe metadata\n");
goto out_error;
}
type = 0; type = 0;
if ( (needle = mxmlFindElement(item, item, "type", NULL, NULL, MXML_DESCEND)) && if ( (needle = mxmlFindElement(haystack, haystack, "type", NULL, NULL, MXML_DESCEND)) &&
(s = mxmlGetText(needle, NULL)) ) (s = mxmlGetText(needle, NULL)) )
sscanf(s, "%8x", &type); sscanf(s, "%8x", &type);
code = 0; code = 0;
if ( (needle = mxmlFindElement(item, item, "code", NULL, NULL, MXML_DESCEND)) && if ( (needle = mxmlFindElement(haystack, haystack, "code", NULL, NULL, MXML_DESCEND)) &&
(s = mxmlGetText(needle, NULL)) ) (s = mxmlGetText(needle, NULL)) )
sscanf(s, "%8x", &code); sscanf(s, "%8x", &code);
if (!type || !code) if (!type || !code)
{ {
DPRINTF(E_WARN, L_PLAYER, "No type (%d) or code (%d) in pipe metadata\n", type, code); DPRINTF(E_LOG, L_PLAYER, "No type (%d) or code (%d) in pipe metadata, aborting\n", type, code);
return -1; goto out_error;
} }
if (code == dmapval("asal")) if (code == dmapval("asal"))
@ -261,11 +280,13 @@ parse_item(struct input_metadata *m, mxml_node_t *item)
else if (code == dmapval("prgr")) else if (code == dmapval("prgr"))
data = &progress; data = &progress;
else else
return -1; goto out_nothing;
if ( (needle = mxmlFindElement(item, item, "data", NULL, NULL, MXML_DESCEND)) && if ( (needle = mxmlFindElement(haystack, haystack, "data", NULL, NULL, MXML_DESCEND)) &&
(s = mxmlGetText(needle, NULL)) ) (s = mxmlGetText(needle, NULL)) )
{ {
pthread_mutex_lock(&pipe_metadata_lock);
if (data != &progress) if (data != &progress)
free(*data); free(*data);
@ -279,53 +300,64 @@ parse_item(struct input_metadata *m, mxml_node_t *item)
free(*data); free(*data);
} }
return 0; pthread_mutex_unlock(&pipe_metadata_lock);
ret = 1;
} }
out_nothing:
mxmlDelete(xml);
return ret;
out_error:
mxmlDelete(xml);
return -1; return -1;
} }
static char *
extract_item(struct evbuffer *evbuf)
{
struct evbuffer_ptr evptr;
size_t size;
char *item;
evptr = evbuffer_search(evbuf, "</item>", strlen("</item>"), NULL);
if (evptr.pos < 0)
return NULL;
size = evptr.pos + strlen("</item>") + 1;
item = malloc(size);
if (!item)
return NULL;
evbuffer_remove(evbuf, item, size - 1);
item[size - 1] = '\0';
return item;
}
static int static int
pipe_metadata_parse(struct input_metadata *m, struct evbuffer *evbuf) pipe_metadata_parse(struct input_metadata *m, struct evbuffer *evbuf)
{ {
mxml_node_t *xml; char *item;
mxml_node_t *haystack;
mxml_node_t *item;
const char *s;
int found; int found;
int ret;
xml = mxmlNewXML("1.0");
if (!xml)
return -1;
s = (char *)evbuffer_pullup(evbuf, -1);
haystack = mxmlLoadString(xml, s, MXML_NO_CALLBACK);
if (!haystack)
{
DPRINTF(E_LOG, L_PLAYER, "Could not parse pipe metadata\n");
mxmlDelete(xml);
evbuffer_drain(evbuf, evbuffer_get_length(evbuf));
return -1;
}
evbuffer_drain(evbuf, evbuffer_get_length(evbuf));
// DPRINTF(E_DBG, L_PLAYER, "Parsing %s\n", s);
found = 0; found = 0;
for (item = mxmlGetFirstChild(haystack); item; item = mxmlWalkNext(item, haystack, MXML_NO_DESCEND)) while ((item = extract_item(evbuf)))
{ {
if (mxmlGetType(item) != 0) ret = parse_item(m, item);
continue; free(item);
if (ret < 0)
if (parse_item(m, item) == 0) return -1;
if (ret > 0)
found = 1; found = 1;
} }
mxmlDelete(xml);
return found; return found;
} }
/* ---------------------------- GENERAL PIPE I/O -------------------------- */ /* ---------------------------- GENERAL PIPE I/O -------------------------- */
/* Thread: worker */ /* Thread: worker */
@ -538,7 +570,6 @@ pipe_listener_cb(enum listener_event_type type)
static void static void
pipe_metadata_read_cb(evutil_socket_t fd, short event, void *arg) pipe_metadata_read_cb(evutil_socket_t fd, short event, void *arg)
{ {
struct evbuffer_ptr evptr;
int ret; int ret;
ret = evbuffer_read(pipe_metadata_buf, pipe_metadata->fd, PIPE_READ_MAX); ret = evbuffer_read(pipe_metadata_buf, pipe_metadata->fd, PIPE_READ_MAX);
@ -561,16 +592,6 @@ pipe_metadata_read_cb(evutil_socket_t fd, short event, void *arg)
return; return;
} }
// Did we get the end tag? If not return to wait for more data
evptr = evbuffer_search(pipe_metadata_buf, "</item>", strlen("</item>"), NULL);
if (evptr.pos < 0)
{
goto readd;
}
// NULL-terminate the buffer
evbuffer_add(pipe_metadata_buf, "", 1);
ret = pipe_metadata_parse(&pipe_metadata_parsed, pipe_metadata_buf); ret = pipe_metadata_parse(&pipe_metadata_parsed, pipe_metadata_buf);
if (ret < 0) if (ret < 0)
{ {
@ -753,10 +774,11 @@ stop(struct player_source *ps)
return 0; return 0;
} }
// FIXME Thread safety of pipe_metadata_parsed
static int static int
metadata_get(struct input_metadata *metadata, struct player_source *ps, uint64_t rtptime) metadata_get(struct input_metadata *metadata, struct player_source *ps, uint64_t rtptime)
{ {
pthread_mutex_lock(&pipe_metadata_lock);
if (pipe_metadata_parsed.artist) if (pipe_metadata_parsed.artist)
swap_pointers(&metadata->artist, &pipe_metadata_parsed.artist); swap_pointers(&metadata->artist, &pipe_metadata_parsed.artist);
if (pipe_metadata_parsed.title) if (pipe_metadata_parsed.title)
@ -778,6 +800,8 @@ metadata_get(struct input_metadata *metadata, struct player_source *ps, uint64_t
input_metadata_free(&pipe_metadata_parsed, 1); input_metadata_free(&pipe_metadata_parsed, 1);
pthread_mutex_unlock(&pipe_metadata_lock);
return 0; return 0;
} }
@ -787,6 +811,8 @@ init(void)
{ {
pipe_autostart = cfg_getbool(cfg_getsec(cfg, "library"), "pipe_autostart"); pipe_autostart = cfg_getbool(cfg_getsec(cfg, "library"), "pipe_autostart");
CHECK_ERR(L_PLAYER, mutex_init(&pipe_metadata_lock));
if (pipe_autostart) if (pipe_autostart)
return listener_add(pipe_listener_cb, LISTENER_DATABASE); return listener_add(pipe_listener_cb, LISTENER_DATABASE);
else else
@ -805,6 +831,8 @@ deinit(void)
pipe_free(pipe); pipe_free(pipe);
} }
CHECK_ERR(L_PLAYER, pthread_mutex_destroy(&pipe_metadata_lock));
if (pipe_autostart) if (pipe_autostart)
listener_remove(pipe_listener_cb); listener_remove(pipe_listener_cb);
} }