From 12567d8e93c292c8448c797b3282a17feb8164fc Mon Sep 17 00:00:00 2001 From: ejurgensen Date: Sat, 28 Jan 2017 21:47:30 +0100 Subject: [PATCH] [pipe] Make Shairport metadata parser work with incomplete reads from pipe --- src/inputs/pipe.c | 126 ++++++++++++++++++++++++++++------------------ 1 file changed, 77 insertions(+), 49 deletions(-) diff --git a/src/inputs/pipe.c b/src/inputs/pipe.c index f180fe11..52e636a0 100644 --- a/src/inputs/pipe.c +++ b/src/inputs/pipe.c @@ -107,6 +107,8 @@ static struct pipe *pipe_metadata; static struct evbuffer *pipe_metadata_buf; // Parsed metadata goes here static struct input_metadata pipe_metadata_parsed; +// Mutex to share the parsed metadata +static pthread_mutex_t pipe_metadata_lock; // True if there is new metadata to push to the player static bool pipe_metadata_is_new; @@ -223,31 +225,48 @@ parse_progress(struct input_metadata *m, char *progress) m->song_length = (end - start) * 10 / 441; // Convert to ms based on 44100 } -// returns 0 on metadata found, otherwise -1 +// returns 1 on metadata found, 0 on nothing, -1 on error static int -parse_item(struct input_metadata *m, mxml_node_t *item) +parse_item(struct input_metadata *m, const char *item) { + mxml_node_t *xml; + mxml_node_t *haystack; mxml_node_t *needle; const char *s; uint32_t type; uint32_t code; char *progress; char **data; + int ret; + + ret = 0; + xml = mxmlNewXML("1.0"); + if (!xml) + return -1; + +// DPRINTF(E_DBG, L_PLAYER, "Parsing %s\n", item); + + haystack = mxmlLoadString(xml, item, MXML_NO_CALLBACK); + if (!haystack) + { + DPRINTF(E_LOG, L_PLAYER, "Could not parse pipe metadata\n"); + goto out_error; + } type = 0; - if ( (needle = mxmlFindElement(item, item, "type", NULL, NULL, MXML_DESCEND)) && + if ( (needle = mxmlFindElement(haystack, haystack, "type", NULL, NULL, MXML_DESCEND)) && (s = mxmlGetText(needle, NULL)) ) sscanf(s, "%8x", &type); code = 0; - if ( (needle = mxmlFindElement(item, item, "code", NULL, NULL, MXML_DESCEND)) && + if ( (needle = mxmlFindElement(haystack, haystack, "code", NULL, NULL, MXML_DESCEND)) && (s = mxmlGetText(needle, NULL)) ) sscanf(s, "%8x", &code); if (!type || !code) { - DPRINTF(E_WARN, L_PLAYER, "No type (%d) or code (%d) in pipe metadata\n", type, code); - return -1; + DPRINTF(E_LOG, L_PLAYER, "No type (%d) or code (%d) in pipe metadata, aborting\n", type, code); + goto out_error; } if (code == dmapval("asal")) @@ -261,11 +280,13 @@ parse_item(struct input_metadata *m, mxml_node_t *item) else if (code == dmapval("prgr")) data = &progress; else - return -1; + goto out_nothing; - if ( (needle = mxmlFindElement(item, item, "data", NULL, NULL, MXML_DESCEND)) && + if ( (needle = mxmlFindElement(haystack, haystack, "data", NULL, NULL, MXML_DESCEND)) && (s = mxmlGetText(needle, NULL)) ) { + pthread_mutex_lock(&pipe_metadata_lock); + if (data != &progress) free(*data); @@ -279,53 +300,64 @@ parse_item(struct input_metadata *m, mxml_node_t *item) free(*data); } - return 0; + pthread_mutex_unlock(&pipe_metadata_lock); + + ret = 1; } + out_nothing: + mxmlDelete(xml); + return ret; + + out_error: + mxmlDelete(xml); return -1; } +static char * +extract_item(struct evbuffer *evbuf) +{ + struct evbuffer_ptr evptr; + size_t size; + char *item; + + evptr = evbuffer_search(evbuf, "", strlen(""), NULL); + if (evptr.pos < 0) + return NULL; + + size = evptr.pos + strlen("") + 1; + item = malloc(size); + if (!item) + return NULL; + + evbuffer_remove(evbuf, item, size - 1); + item[size - 1] = '\0'; + + return item; +} + static int pipe_metadata_parse(struct input_metadata *m, struct evbuffer *evbuf) { - mxml_node_t *xml; - mxml_node_t *haystack; - mxml_node_t *item; - const char *s; + char *item; int found; - - xml = mxmlNewXML("1.0"); - if (!xml) - return -1; - - s = (char *)evbuffer_pullup(evbuf, -1); - haystack = mxmlLoadString(xml, s, MXML_NO_CALLBACK); - if (!haystack) - { - DPRINTF(E_LOG, L_PLAYER, "Could not parse pipe metadata\n"); - mxmlDelete(xml); - evbuffer_drain(evbuf, evbuffer_get_length(evbuf)); - return -1; - } - - evbuffer_drain(evbuf, evbuffer_get_length(evbuf)); - -// DPRINTF(E_DBG, L_PLAYER, "Parsing %s\n", s); + int ret; found = 0; - for (item = mxmlGetFirstChild(haystack); item; item = mxmlWalkNext(item, haystack, MXML_NO_DESCEND)) + while ((item = extract_item(evbuf))) { - if (mxmlGetType(item) != 0) - continue; - - if (parse_item(m, item) == 0) + ret = parse_item(m, item); + free(item); + if (ret < 0) + return -1; + if (ret > 0) found = 1; } - mxmlDelete(xml); return found; } + /* ---------------------------- GENERAL PIPE I/O -------------------------- */ /* Thread: worker */ @@ -538,7 +570,6 @@ pipe_listener_cb(enum listener_event_type type) static void pipe_metadata_read_cb(evutil_socket_t fd, short event, void *arg) { - struct evbuffer_ptr evptr; int ret; ret = evbuffer_read(pipe_metadata_buf, pipe_metadata->fd, PIPE_READ_MAX); @@ -561,16 +592,6 @@ pipe_metadata_read_cb(evutil_socket_t fd, short event, void *arg) return; } - // Did we get the end tag? If not return to wait for more data - evptr = evbuffer_search(pipe_metadata_buf, "", strlen(""), NULL); - if (evptr.pos < 0) - { - goto readd; - } - - // NULL-terminate the buffer - evbuffer_add(pipe_metadata_buf, "", 1); - ret = pipe_metadata_parse(&pipe_metadata_parsed, pipe_metadata_buf); if (ret < 0) { @@ -753,10 +774,11 @@ stop(struct player_source *ps) return 0; } -// FIXME Thread safety of pipe_metadata_parsed static int metadata_get(struct input_metadata *metadata, struct player_source *ps, uint64_t rtptime) { + pthread_mutex_lock(&pipe_metadata_lock); + if (pipe_metadata_parsed.artist) swap_pointers(&metadata->artist, &pipe_metadata_parsed.artist); if (pipe_metadata_parsed.title) @@ -778,6 +800,8 @@ metadata_get(struct input_metadata *metadata, struct player_source *ps, uint64_t input_metadata_free(&pipe_metadata_parsed, 1); + pthread_mutex_unlock(&pipe_metadata_lock); + return 0; } @@ -787,6 +811,8 @@ init(void) { pipe_autostart = cfg_getbool(cfg_getsec(cfg, "library"), "pipe_autostart"); + CHECK_ERR(L_PLAYER, mutex_init(&pipe_metadata_lock)); + if (pipe_autostart) return listener_add(pipe_listener_cb, LISTENER_DATABASE); else @@ -805,6 +831,8 @@ deinit(void) pipe_free(pipe); } + CHECK_ERR(L_PLAYER, pthread_mutex_destroy(&pipe_metadata_lock)); + if (pipe_autostart) listener_remove(pipe_listener_cb); }