Merge pull request #259 from chme/commands2

Refactor inter thread communication
This commit is contained in:
ejurgensen 2016-06-02 22:26:53 +02:00
commit 05572dcac0
9 changed files with 1425 additions and 2119 deletions

View File

@ -109,7 +109,8 @@ forked_daapd_SOURCES = main.c \
$(SPOTIFY_SRC) \
$(LASTFM_SRC) \
$(MPD_SRC) \
listener.c listener.h
listener.c listener.h \
commands.c commands.h
nodist_forked_daapd_SOURCES = \
$(ANTLR_SOURCES)

File diff suppressed because it is too large Load Diff

361
src/commands.c Normal file
View File

@ -0,0 +1,361 @@
/*
* Copyright (C) 2016 Christian Meffert <christian.meffert@googlemail.com>
*
* This program is free software; you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation; either version 2 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
*/
#include "commands.h"
#include <errno.h>
#include <fcntl.h>
#include <pthread.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>
#include <event2/event.h>
#include "logger.h"
struct command
{
pthread_mutex_t lck;
pthread_cond_t cond;
command_function func;
command_function func_bh;
void *arg;
int nonblock;
int ret;
int pending;
};
struct commands_base
{
int command_pipe[2];
struct event *command_event;
struct command *current_cmd;
};
/*
* Asynchronous execution of the command function
*/
static void
command_cb_async(struct commands_base *cmdbase, struct command *cmd)
{
enum command_state cmdstate;
// Command is executed asynchronously
cmdstate = cmd->func(cmd->arg, &cmd->ret);
// Only free arg if there are no pending events (used in worker.c)
if (cmdstate == COMMAND_END && cmd->arg)
free(cmd->arg);
free(cmd);
event_add(cmdbase->command_event, NULL);
}
/*
* Synchronous execution of the command function
*/
static void
command_cb_sync(struct commands_base *cmdbase, struct command *cmd)
{
enum command_state cmdstate;
pthread_mutex_lock(&cmd->lck);
cmdstate = cmd->func(cmd->arg, &cmd->ret);
if (cmdstate == COMMAND_END)
{
// Command execution finished, execute the bottom half function
if (cmd->ret == 0 && cmd->func_bh)
{
cmdstate = cmd->func_bh(cmd->arg, &cmd->ret);
}
// Signal the calling thread that the command execution finished
pthread_cond_signal(&cmd->cond);
pthread_mutex_unlock(&cmd->lck);
event_add(cmdbase->command_event, NULL);
}
else
{
// Command execution is waiting for pending events before returning to the caller
cmdbase->current_cmd = cmd;
cmd->pending = cmd->ret;
}
}
/*
* Event callback function
*
* Function is triggered by libevent if there is data to read on the command pipe (writing to the command pipe happens through
* the send_command function).
*/
static void
command_cb(int fd, short what, void *arg)
{
struct commands_base *cmdbase;
struct command *cmd;
int ret;
cmdbase = arg;
// Get the command to execute from the pipe
ret = read(cmdbase->command_pipe[0], &cmd, sizeof(cmd));
if (ret != sizeof(cmd))
{
DPRINTF(E_LOG, L_MAIN, "Error reading command from command pipe: expected %zu bytes, read %d bytes\n", sizeof(cmd), ret);
event_add(cmdbase->command_event, NULL);
return;
}
// Execute the command function
if (cmd->nonblock)
{
// Command is executed asynchronously
command_cb_async(cmdbase, cmd);
}
else
{
// Command is executed synchronously, caller is waiting until signaled that the execution finished
command_cb_sync(cmdbase, cmd);
}
}
/*
* Writes the given command to the command pipe
*/
static int
send_command(struct commands_base *cmdbase, struct command *cmd)
{
int ret;
if (!cmd->func)
{
DPRINTF(E_LOG, L_MAIN, "Programming error: send_command called with command->func NULL!\n");
return -1;
}
ret = write(cmdbase->command_pipe[1], &cmd, sizeof(cmd));
if (ret != sizeof(cmd))
{
return -1;
}
return 0;
}
/*
* Creates a new command base, needs to be freed by commands_base_free.
*/
struct commands_base *
commands_base_new(struct event_base *evbase)
{
struct commands_base *cmdbase;
int ret;
cmdbase = calloc(1, sizeof(struct commands_base));
if (!cmdbase)
{
DPRINTF(E_LOG, L_MAIN, "Out of memory for cmdbase\n");
return NULL;
}
# if defined(__linux__)
ret = pipe2(cmdbase->command_pipe, O_CLOEXEC);
# else
ret = pipe(cmdbase->command_pipe);
# endif
if (ret < 0)
{
DPRINTF(E_LOG, L_MAIN, "Could not create command pipe: %s\n", strerror(errno));
free(cmdbase);
return NULL;
}
cmdbase->command_event = event_new(evbase, cmdbase->command_pipe[0], EV_READ, command_cb, cmdbase);
if (!cmdbase->command_event)
{
DPRINTF(E_LOG, L_MAIN, "Could not create cmd event\n");
close(cmdbase->command_pipe[0]);
close(cmdbase->command_pipe[1]);
free(cmdbase);
return NULL;
}
ret = event_add(cmdbase->command_event, NULL);
if (ret != 0)
{
DPRINTF(E_LOG, L_MAIN, "Could not add cmd event\n");
close(cmdbase->command_pipe[0]);
close(cmdbase->command_pipe[1]);
free(cmdbase);
return NULL;
}
return cmdbase;
}
/*
* Frees the command base and closes the (internally used) pipes
*/
int
commands_base_free(struct commands_base *cmdbase)
{
close(cmdbase->command_pipe[0]);
close(cmdbase->command_pipe[1]);
free(cmdbase);
return 0;
}
/*
* Gets the current return value for the current pending command.
*
* If a command has more than one pending event, each event can access the previous set return value
* if it depends on it.
*
* @param cmdbase The command base
* @return The current return value
*/
int
commands_exec_returnvalue(struct commands_base *cmdbase)
{
if (cmdbase->current_cmd == NULL)
return 0;
return cmdbase->current_cmd->ret;
}
/*
* If a command function returned COMMAND_PENDING, each event triggered by this command needs to
* call command_exec_end, passing it the return value of the event execution.
*
* If a command function is waiting for multiple events, each event needs to call command_exec_end.
* The command base keeps track of the number of still pending events and only returns to the caller
* if there are no pending events left.
*
* @param cmdbase The command base (holds the current pending command)
* @param retvalue The return value for the calling thread
*/
void
commands_exec_end(struct commands_base *cmdbase, int retvalue)
{
if (cmdbase->current_cmd == NULL)
return;
// A pending event finished, decrease the number of pending events and update the return value
cmdbase->current_cmd->pending--;
cmdbase->current_cmd->ret = retvalue;
DPRINTF(E_DBG, L_MAIN, "Command has %d pending events\n", cmdbase->current_cmd->pending);
// If there are still pending events return
if (cmdbase->current_cmd->pending > 0)
return;
// All pending events have finished, execute the bottom half and signal the caller that the command execution finished
if (cmdbase->current_cmd->func_bh)
{
cmdbase->current_cmd->func_bh(cmdbase->current_cmd->arg, &cmdbase->current_cmd->ret);
}
pthread_cond_signal(&cmdbase->current_cmd->cond);
pthread_mutex_unlock(&cmdbase->current_cmd->lck);
cmdbase->current_cmd = NULL;
/* Process commands again */
event_add(cmdbase->command_event, NULL);
}
/*
* Execute the function 'func' with the given argument 'arg' in the event loop thread.
* Blocks the caller (thread) until the function returned.
*
* If a function 'func_bh' ("bottom half") is given, it is executed after 'func' has successfully
* finished.
*
* @param cmdbase The command base
* @param func The function to be executed
* @param func_bh The bottom half function to be executed after all pending events from func are processed
* @param arg Argument passed to func (and func_bh)
* @return Return value of func (or func_bh if func_bh is not NULL)
*/
int
commands_exec_sync(struct commands_base *cmdbase, command_function func, command_function func_bh, void *arg)
{
struct command cmd;
int ret;
memset(&cmd, 0, sizeof(struct command));
cmd.func = func;
cmd.func_bh = func_bh;
cmd.arg = arg;
cmd.nonblock = 0;
pthread_mutex_lock(&cmd.lck);
ret = send_command(cmdbase, &cmd);
if (ret < 0)
{
DPRINTF(E_LOG, L_MAIN, "Error sending command\n");
pthread_mutex_unlock(&cmd.lck);
return -1;
}
pthread_cond_wait(&cmd.cond, &cmd.lck);
pthread_mutex_unlock(&cmd.lck);
return cmd.ret;
}
/*
* Execute the function 'func' with the given argument 'arg' in the event loop thread.
* Triggers the function execution and immediately returns (does not wait for func to finish).
*
* The pointer passed as argument is freed in the event loop thread after func returned.
*
* @param cmdbase The command base
* @param func The function to be executed
* @param arg Argument passed to func
* @return 0 if triggering the function execution succeeded, -1 on failure.
*/
int
commands_exec_async(struct commands_base *cmdbase, command_function func, void *arg)
{
struct command *cmd;
int ret;
cmd = calloc(1, sizeof(struct command));
cmd->func = func;
cmd->func_bh = NULL;
cmd->arg = arg;
cmd->nonblock = 1;
ret = send_command(cmdbase, cmd);
if (ret < 0)
return -1;
return 0;
}

64
src/commands.h Normal file
View File

@ -0,0 +1,64 @@
/*
* Copyright (C) 2016 Christian Meffert <christian.meffert@googlemail.com>
*
* This program is free software; you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation; either version 2 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
*/
#ifndef SRC_COMMANDS_H_
#define SRC_COMMANDS_H_
#include <event2/event.h>
#include <time.h>
enum command_state {
COMMAND_END = 0,
COMMAND_PENDING = 1,
};
/*
* Function that will be executed in the event loop thread.
*
* If the function has pending events to complete, it needs to return COMMAND_PENDING with 'ret' set to
* the number of pending events to wait for.
*
* @param arg Opaque pointer passed by command_exec_sync or command_exec_async
* @param ret Pointer to the return value for the caller of the command
* @return COMMAND_END if there are no pending events (function execution is complete) or COMMAND_PENDING if there are pending events
*/
typedef enum command_state (*command_function)(void *arg, int *ret);
struct commands_base;
struct commands_base *
commands_base_new(struct event_base *evbase);
int
commands_base_free(struct commands_base *cmdbase);
int
commands_exec_returnvalue(struct commands_base *cmdbase);
void
commands_exec_end(struct commands_base *cmdbase, int retvalue);
int
commands_exec_sync(struct commands_base *cmdbase, command_function func, command_function func_bh, void *arg);
int
commands_exec_async(struct commands_base *cmdbase, command_function func, void *arg);
#endif /* SRC_COMMANDS_H_ */

View File

@ -61,6 +61,7 @@
#include "player.h"
#include "cache.h"
#include "artwork.h"
#include "commands.h"
#ifdef LASTFM
# include "lastfm.h"
@ -69,21 +70,6 @@
# include "spotify.h"
#endif
struct filescanner_command;
typedef int (*cmd_func)(struct filescanner_command *cmd);
struct filescanner_command
{
pthread_mutex_t lck;
pthread_cond_t cond;
cmd_func func;
int nonblock;
int ret;
};
#define F_SCAN_BULK (1 << 0)
#define F_SCAN_RESCAN (1 << 1)
@ -118,17 +104,16 @@ struct stacked_dir {
struct stacked_dir *next;
};
static int cmd_pipe[2];
static int exit_pipe[2];
static int scan_exit;
static int inofd;
static struct event_base *evbase_scan;
static struct event *inoev;
static struct event *exitev;
static struct event *cmdev;
static pthread_t tid_scan;
static struct deferred_pl *playlists;
static struct stacked_dir *dirstack;
static struct commands_base *cmdbase;
#if defined(__FreeBSD__) || defined(__FreeBSD_kernel__)
struct deferred_file
@ -167,47 +152,12 @@ static int
inofd_event_set(void);
static void
inofd_event_unset(void);
static int
filescanner_initscan(struct filescanner_command *cmd);
static int
filescanner_fullrescan(struct filescanner_command *cmd);
static enum command_state
filescanner_initscan(void *arg, int *retval);
static enum command_state
filescanner_fullrescan(void *arg, int *retval);
/* ---------------------------- COMMAND EXECUTION -------------------------- */
static int
send_command(struct filescanner_command *cmd)
{
int ret;
if (!cmd->func)
{
DPRINTF(E_LOG, L_SCAN, "BUG: cmd->func is NULL!\n");
return -1;
}
ret = write(cmd_pipe[1], &cmd, sizeof(cmd));
if (ret != sizeof(cmd))
{
DPRINTF(E_LOG, L_SCAN, "Could not send command: %s\n", strerror(errno));
return -1;
}
return 0;
}
static int
nonblock_command(struct filescanner_command *cmd)
{
int ret;
ret = send_command(cmd);
if (ret < 0)
return -1;
return 0;
}
static int
push_dir(struct stacked_dir **s, char *path, int parent_id)
{
@ -855,6 +805,7 @@ static void
process_file(char *file, time_t mtime, off_t size, int type, int flags, int dir_id)
{
int is_bulkscan;
int ret;
is_bulkscan = (flags & F_SCAN_BULK);
@ -924,7 +875,7 @@ process_file(char *file, time_t mtime, off_t size, int type, int flags, int dir_
DPRINTF(E_LOG, L_SCAN, "Startup rescan triggered, found init-rescan file: %s\n", file);
filescanner_initscan(NULL);
filescanner_initscan(NULL, &ret);
break;
case FILE_CTRL_FULLSCAN:
@ -933,7 +884,7 @@ process_file(char *file, time_t mtime, off_t size, int type, int flags, int dir_
DPRINTF(E_LOG, L_SCAN, "Full rescan triggered, found full-rescan file: %s\n", file);
filescanner_fullrescan(NULL);
filescanner_fullrescan(NULL, &ret);
break;
default:
@ -1968,41 +1919,8 @@ exit_cb(int fd, short event, void *arg)
scan_exit = 1;
}
static void
command_cb(int fd, short what, void *arg)
{
struct filescanner_command *cmd;
int ret;
ret = read(cmd_pipe[0], &cmd, sizeof(cmd));
if (ret != sizeof(cmd))
{
DPRINTF(E_LOG, L_SCAN, "Could not read command! (read %d): %s\n", ret, (ret < 0) ? strerror(errno) : "-no error-");
goto readd;
}
if (cmd->nonblock)
{
cmd->func(cmd);
free(cmd);
goto readd;
}
pthread_mutex_lock(&cmd->lck);
ret = cmd->func(cmd);
cmd->ret = ret;
pthread_cond_signal(&cmd->cond);
pthread_mutex_unlock(&cmd->lck);
readd:
event_add(cmdev, NULL);
}
static int
filescanner_initscan(struct filescanner_command *cmd)
static enum command_state
filescanner_initscan(void *arg, int *retval)
{
DPRINTF(E_LOG, L_SCAN, "Startup rescan triggered\n");
@ -2012,11 +1930,12 @@ filescanner_initscan(struct filescanner_command *cmd)
inofd_event_set();
bulk_scan(F_SCAN_BULK | F_SCAN_RESCAN);
return 0;
*retval = 0;
return COMMAND_END;
}
static int
filescanner_fullrescan(struct filescanner_command *cmd)
static enum command_state
filescanner_fullrescan(void *arg, int *retval)
{
DPRINTF(E_LOG, L_SCAN, "Full rescan triggered\n");
@ -2028,62 +1947,32 @@ filescanner_fullrescan(struct filescanner_command *cmd)
inofd_event_set();
bulk_scan(F_SCAN_BULK);
return 0;
*retval = 0;
return COMMAND_END;
}
void
filescanner_trigger_initscan(void)
{
struct filescanner_command *cmd;
if (scanning)
{
DPRINTF(E_INFO, L_SCAN, "Scan already running, ignoring request to trigger a new init scan\n");
return;
}
cmd = (struct filescanner_command *)malloc(sizeof(struct filescanner_command));
if (!cmd)
{
DPRINTF(E_LOG, L_SCAN, "Could not allocate cache_command\n");
return;
}
memset(cmd, 0, sizeof(struct filescanner_command));
cmd->nonblock = 1;
cmd->func = filescanner_initscan;
nonblock_command(cmd);
commands_exec_async(cmdbase, filescanner_initscan, NULL);
}
void
filescanner_trigger_fullrescan(void)
{
struct filescanner_command *cmd;
if (scanning)
{
DPRINTF(E_INFO, L_SCAN, "Scan already running, ignoring request to trigger a new init scan\n");
return;
}
cmd = (struct filescanner_command *)malloc(sizeof(struct filescanner_command));
if (!cmd)
{
DPRINTF(E_LOG, L_SCAN, "Could not allocate cache_command\n");
return;
}
memset(cmd, 0, sizeof(struct filescanner_command));
cmd->nonblock = 1;
cmd->func = filescanner_fullrescan;
nonblock_command(cmd);
commands_exec_async(cmdbase, filescanner_fullrescan, NULL);
}
/*
@ -2138,23 +2027,7 @@ filescanner_init(void)
goto ino_fail;
}
#ifdef HAVE_PIPE2
ret = pipe2(cmd_pipe, O_CLOEXEC);
#else
ret = pipe(cmd_pipe);
#endif
if (ret < 0)
{
DPRINTF(E_LOG, L_SCAN, "Could not create command pipe: %s\n", strerror(errno));
goto cmd_fail;
}
cmdev = event_new(evbase_scan, cmd_pipe[0], EV_READ, command_cb, NULL);
if (!cmdev || (event_add(cmdev, NULL) < 0))
{
DPRINTF(E_LOG, L_SCAN, "Could not create/add command event\n");
goto cmd_fail;
}
cmdbase = commands_base_new(evbase_scan);
ret = pthread_create(&tid_scan, NULL, filescanner, NULL);
if (ret != 0)
@ -2173,9 +2046,7 @@ filescanner_init(void)
return 0;
thread_fail:
cmd_fail:
close(cmd_pipe[0]);
close(cmd_pipe[1]);
commands_base_free(cmdbase);
close(inofd);
exitev_fail:
ino_fail:
@ -2214,9 +2085,8 @@ filescanner_deinit(void)
inofd_event_unset();
event_base_free(evbase_scan);
commands_base_free(cmdbase);
close(exit_pipe[0]);
close(exit_pipe[1]);
close(cmd_pipe[0]);
close(cmd_pipe[1]);
event_base_free(evbase_scan);
}

162
src/mpd.c
View File

@ -62,6 +62,7 @@
#include "player.h"
#include "queue.h"
#include "filescanner.h"
#include "commands.h"
static pthread_t tid_mpd;
@ -70,29 +71,12 @@ static struct event_base *evbase_mpd;
static int g_exit_pipe[2];
static struct event *g_exitev;
static int g_cmd_pipe[2];
static struct event *g_cmdev;
static struct commands_base *cmdbase;
static struct evhttp *evhttpd;
struct evconnlistener *listener;
struct mpd_command;
typedef int (*cmd_func)(struct mpd_command *cmd);
struct mpd_command
{
pthread_mutex_t lck;
pthread_cond_t cond;
cmd_func func;
enum listener_event_type arg_evtype;
int nonblock;
int ret;
};
#define COMMAND_ARGV_MAX 37
@ -207,40 +191,6 @@ struct idle_client
struct idle_client *idle_clients;
/* ---------------------------- COMMAND EXECUTION -------------------------- */
static int
send_command(struct mpd_command *cmd)
{
int ret;
if (!cmd->func)
{
DPRINTF(E_LOG, L_MPD, "BUG: cmd->func is NULL!\n");
return -1;
}
ret = write(g_cmd_pipe[1], &cmd, sizeof(cmd));
if (ret != sizeof(cmd))
{
DPRINTF(E_LOG, L_MPD, "Could not send command: %s\n", strerror(errno));
return -1;
}
return 0;
}
static int
nonblock_command(struct mpd_command *cmd)
{
int ret;
ret = send_command(cmd);
if (ret < 0)
return -1;
return 0;
}
static void
thread_exit(void)
@ -3699,7 +3649,7 @@ mpd_command_decoders(struct evbuffer *evbuf, int argc, char **argv, char **errms
return 0;
}
struct command
struct mpd_command
{
/* The command name */
const char *mpdcommand;
@ -3716,7 +3666,7 @@ struct command
int (*handler)(struct evbuffer *evbuf, int argc, char **argv, char **errmsg);
};
static struct command mpd_handlers[] =
static struct mpd_command mpd_handlers[] =
{
/*
* Commands for querying status
@ -4192,7 +4142,7 @@ static struct command mpd_handlers[] =
* @param name the name of the command
* @return the command or NULL if it is an unknown/unsupported command
*/
static struct command*
static struct mpd_command*
mpd_find_command(const char *name)
{
int i;
@ -4240,7 +4190,7 @@ mpd_read_cb(struct bufferevent *bev, void *ctx)
int ncmd;
char *line;
char *errmsg;
struct command *command;
struct mpd_command *command;
enum command_list_type listtype;
int idle_cmd;
int close_cmd;
@ -4525,16 +4475,18 @@ mpd_notify_idle_client(struct idle_client *client, enum listener_event_type type
return 0;
}
static int
mpd_notify_idle(struct mpd_command *cmd)
static enum command_state
mpd_notify_idle(void *arg, int *retval)
{
enum listener_event_type type;
struct idle_client *client;
struct idle_client *prev;
struct idle_client *next;
int i;
int ret;
DPRINTF(E_DBG, L_MPD, "Notify clients waiting for idle results: %d\n", cmd->arg_evtype);
type = *(enum listener_event_type *)arg;
DPRINTF(E_DBG, L_MPD, "Notify clients waiting for idle results: %d\n", type);
prev = NULL;
next = NULL;
@ -4546,7 +4498,7 @@ mpd_notify_idle(struct mpd_command *cmd)
next = client->next;
ret = mpd_notify_idle_client(client, cmd->arg_evtype);
ret = mpd_notify_idle_client(client, type);
if (ret == 0)
{
@ -4566,63 +4518,20 @@ mpd_notify_idle(struct mpd_command *cmd)
i++;
}
return 0;
*retval = 0;
return COMMAND_END;
}
static void
mpd_listener_cb(enum listener_event_type type)
{
enum listener_event_type *ptr;
ptr = (enum listener_event_type *)malloc(sizeof(enum listener_event_type));
*ptr = type;
DPRINTF(E_DBG, L_MPD, "Listener callback called with event type %d.\n", type);
struct mpd_command *cmd;
cmd = (struct mpd_command *)malloc(sizeof(struct mpd_command));
if (!cmd)
{
DPRINTF(E_LOG, L_MPD, "Could not allocate cache_command\n");
return;
}
memset(cmd, 0, sizeof(struct mpd_command));
cmd->nonblock = 1;
cmd->func = mpd_notify_idle;
cmd->arg_evtype = type;
nonblock_command(cmd);
}
static void
command_cb(int fd, short what, void *arg)
{
struct mpd_command *cmd;
int ret;
ret = read(g_cmd_pipe[0], &cmd, sizeof(cmd));
if (ret != sizeof(cmd))
{
DPRINTF(E_LOG, L_MPD, "Could not read command! (read %d): %s\n", ret, (ret < 0) ? strerror(errno) : "-no error-");
goto readd;
}
if (cmd->nonblock)
{
cmd->func(cmd);
free(cmd);
goto readd;
}
pthread_mutex_lock(&cmd->lck);
ret = cmd->func(cmd);
cmd->ret = ret;
pthread_cond_signal(&cmd->cond);
pthread_mutex_unlock(&cmd->lck);
readd:
event_add(g_cmdev, NULL);
commands_exec_async(cmdbase, mpd_notify_idle, ptr);
}
/*
@ -4772,17 +4681,6 @@ int mpd_init(void)
goto exit_fail;
}
#ifdef HAVE_PIPE2
ret = pipe2(g_cmd_pipe, O_CLOEXEC);
#else
ret = pipe(g_cmd_pipe);
#endif
if (ret < 0)
{
DPRINTF(E_LOG, L_MPD, "Could not create command pipe: %s\n", strerror(errno));
goto cmd_fail;
}
evbase_mpd = event_base_new();
if (!evbase_mpd)
{
@ -4799,15 +4697,7 @@ int mpd_init(void)
event_add(g_exitev, NULL);
g_cmdev = event_new(evbase_mpd, g_cmd_pipe[0], EV_READ, command_cb, NULL);
if (!g_cmdev)
{
DPRINTF(E_LOG, L_MPD, "Could not create cmd event\n");
goto evnew_fail;
}
event_add(g_cmdev, NULL);
cmdbase = commands_base_new(evbase_mpd);
if (v6enabled)
{
@ -4917,15 +4807,12 @@ int mpd_init(void)
evhttp_fail:
evconnlistener_free(listener);
connew_fail:
commands_base_free(cmdbase);
evnew_fail:
event_base_free(evbase_mpd);
evbase_mpd = NULL;
evbase_fail:
close(g_cmd_pipe[0]);
close(g_cmd_pipe[1]);
cmd_fail:
close(g_exit_pipe[0]);
close(g_exit_pipe[1]);
@ -4975,9 +4862,8 @@ void mpd_deinit(void)
// Free event base (should free events too)
event_base_free(evbase_mpd);
// Close pipes
// Close pipes and free command base
commands_base_free(cmdbase);
close(g_exit_pipe[0]);
close(g_exit_pipe[1]);
close(g_cmd_pipe[0]);
close(g_cmd_pipe[1]);
}

File diff suppressed because it is too large Load Diff

View File

@ -49,6 +49,7 @@
#include "conffile.h"
#include "filescanner.h"
#include "cache.h"
#include "commands.h"
/* How long to wait for audio (in sec) before giving up */
@ -102,30 +103,6 @@ struct artwork_get_param
int is_loaded;
};
struct spotify_command;
typedef int (*cmd_func)(struct spotify_command *cmd);
struct spotify_command
{
pthread_mutex_t lck;
pthread_cond_t cond;
cmd_func func;
cmd_func func_bh;
int nonblock;
union {
void *noarg;
sp_link *link;
int seek_ms;
struct audio_get_param audio;
struct artwork_get_param artwork;
} arg;
int ret;
};
/* --- Globals --- */
// Spotify thread
@ -138,20 +115,18 @@ static pthread_cond_t login_cond;
// Event base, pipes and events
struct event_base *evbase_spotify;
static int g_exit_pipe[2];
static int g_cmd_pipe[2];
static int g_notify_pipe[2];
static struct event *g_exitev;
static struct event *g_cmdev;
static struct event *g_notifyev;
static struct commands_base *cmdbase;
// The global session handle
static sp_session *g_sess;
// The global library handle
static void *g_libhandle;
// The global state telling us what the thread is currently doing
static enum spotify_state g_state;
/* (not used) Tells which commmand is currently being processed */
static struct spotify_command *g_cmd;
// The global base playlist id (parent of all Spotify playlists in the db)
static int g_base_plid;
@ -411,77 +386,6 @@ fptr_assign_all()
/* ---------------------------- COMMAND EXECUTION -------------------------- */
static void
command_init(struct spotify_command *cmd)
{
memset(cmd, 0, sizeof(struct spotify_command));
pthread_mutex_init(&cmd->lck, NULL);
pthread_cond_init(&cmd->cond, NULL);
}
static void
command_deinit(struct spotify_command *cmd)
{
pthread_cond_destroy(&cmd->cond);
pthread_mutex_destroy(&cmd->lck);
}
static int
send_command(struct spotify_command *cmd)
{
int ret;
if (!cmd->func)
{
DPRINTF(E_LOG, L_SPOTIFY, "BUG: cmd->func is NULL!\n");
return -1;
}
ret = write(g_cmd_pipe[1], &cmd, sizeof(cmd));
if (ret != sizeof(cmd))
{
DPRINTF(E_LOG, L_SPOTIFY, "Could not send command: %s\n", strerror(errno));
return -1;
}
return 0;
}
static int
sync_command(struct spotify_command *cmd)
{
int ret;
pthread_mutex_lock(&cmd->lck);
ret = send_command(cmd);
if (ret < 0)
{
pthread_mutex_unlock(&cmd->lck);
return -1;
}
pthread_cond_wait(&cmd->cond, &cmd->lck);
pthread_mutex_unlock(&cmd->lck);
ret = cmd->ret;
return ret;
}
static int
nonblock_command(struct spotify_command *cmd)
{
int ret;
ret = send_command(cmd);
if (ret < 0)
return -1;
return 0;
}
/* Thread: main and filescanner */
static void
thread_exit(void)
@ -1033,47 +937,55 @@ audio_fifo_flush(void)
pthread_mutex_unlock(&g_audio_fifo->mutex);
}
static int
playback_setup(struct spotify_command *cmd)
static enum command_state
playback_setup(void *arg, int *retval)
{
sp_link *link;
sp_track *track;
sp_error err;
DPRINTF(E_DBG, L_SPOTIFY, "Setting up for playback\n");
link = (sp_link *) arg;
if (SP_CONNECTION_STATE_LOGGED_IN != fptr_sp_session_connectionstate(g_sess))
{
DPRINTF(E_LOG, L_SPOTIFY, "Can't play music, not connected and logged in to Spotify\n");
return -1;
*retval = -1;
return COMMAND_END;
}
if (!cmd->arg.link)
if (!link)
{
DPRINTF(E_LOG, L_SPOTIFY, "Playback setup failed, no Spotify link\n");
return -1;
*retval = -1;
return COMMAND_END;
}
track = fptr_sp_link_as_track(cmd->arg.link);
track = fptr_sp_link_as_track(link);
if (!track)
{
DPRINTF(E_LOG, L_SPOTIFY, "Playback setup failed, invalid Spotify track\n");
return -1;
*retval = -1;
return COMMAND_END;
}
err = fptr_sp_session_player_load(g_sess, track);
if (SP_ERROR_OK != err)
{
DPRINTF(E_LOG, L_SPOTIFY, "Playback setup failed: %s\n", fptr_sp_error_message(err));
return -1;
*retval = -1;
return COMMAND_END;
}
audio_fifo_flush();
return 0;
*retval = 0;
return COMMAND_END;
}
static int
playback_play(struct spotify_command *cmd)
static enum command_state
playback_play(void *arg, int *retval)
{
sp_error err;
@ -1083,16 +995,18 @@ playback_play(struct spotify_command *cmd)
if (SP_ERROR_OK != err)
{
DPRINTF(E_LOG, L_SPOTIFY, "Playback failed: %s\n", fptr_sp_error_message(err));
return -1;
*retval = -1;
return COMMAND_END;
}
g_state = SPOTIFY_STATE_PLAYING;
return 0;
*retval = 0;
return COMMAND_END;
}
static int
playback_pause(struct spotify_command *cmd)
static enum command_state
playback_pause(void *arg, int *retval)
{
sp_error err;
@ -1104,16 +1018,18 @@ playback_pause(struct spotify_command *cmd)
if (SP_ERROR_OK != err)
{
DPRINTF(E_LOG, L_SPOTIFY, "Playback pause failed: %s\n", fptr_sp_error_message(err));
return -1;
*retval = -1;
return COMMAND_END;
}
g_state = SPOTIFY_STATE_PAUSED;
return 0;
*retval = 0;
return COMMAND_END;
}
static int
playback_stop(struct spotify_command *cmd)
static enum command_state
playback_stop(void *arg, int *retval)
{
sp_error err;
@ -1123,35 +1039,43 @@ playback_stop(struct spotify_command *cmd)
if (SP_ERROR_OK != err)
{
DPRINTF(E_LOG, L_SPOTIFY, "Playback stop failed: %s\n", fptr_sp_error_message(err));
return -1;
*retval = -1;
return COMMAND_END;
}
g_state = SPOTIFY_STATE_STOPPED;
return 0;
*retval = 0;
return COMMAND_END;
}
static int
playback_seek(struct spotify_command *cmd)
static enum command_state
playback_seek(void *arg, int *retval)
{
int seek_ms;
sp_error err;
DPRINTF(E_DBG, L_SPOTIFY, "Playback seek\n");
err = fptr_sp_session_player_seek(g_sess, cmd->arg.seek_ms);
seek_ms = *((int *) arg);
err = fptr_sp_session_player_seek(g_sess, seek_ms);
if (SP_ERROR_OK != err)
{
DPRINTF(E_LOG, L_SPOTIFY, "Could not seek: %s\n", fptr_sp_error_message(err));
return -1;
*retval = -1;
return COMMAND_END;
}
audio_fifo_flush();
return 0;
*retval = 0;
return COMMAND_END;
}
static int
playback_eot(struct spotify_command *cmd)
static enum command_state
playback_eot(void *arg, int *retval)
{
sp_error err;
@ -1166,12 +1090,14 @@ playback_eot(struct spotify_command *cmd)
g_state = SPOTIFY_STATE_STOPPING;
return 0;
*retval = 0;
return COMMAND_END;
}
static int
audio_get(struct spotify_command *cmd)
static enum command_state
audio_get(void *arg, int *retval)
{
struct audio_get_param *audio;
struct timespec ts;
audio_fifo_data_t *afd;
int processed;
@ -1179,16 +1105,17 @@ audio_get(struct spotify_command *cmd)
int ret;
int s;
audio = (struct audio_get_param *) arg;
afd = NULL;
processed = 0;
// If spotify was paused begin by resuming playback
if (g_state == SPOTIFY_STATE_PAUSED)
playback_play(NULL);
playback_play(NULL, retval);
pthread_mutex_lock(&g_audio_fifo->mutex);
while ((processed < cmd->arg.audio.wanted) && (g_state != SPOTIFY_STATE_STOPPED))
while ((processed < audio->wanted) && (g_state != SPOTIFY_STATE_STOPPED))
{
// If track has ended and buffer is empty
if ((g_state == SPOTIFY_STATE_STOPPING) && (g_audio_fifo->qlen <= 0))
@ -1227,14 +1154,15 @@ audio_get(struct spotify_command *cmd)
s = afd->nsamples * sizeof(int16_t) * 2;
ret = evbuffer_add(cmd->arg.audio.evbuf, afd->samples, s);
ret = evbuffer_add(audio->evbuf, afd->samples, s);
free(afd);
afd = NULL;
if (ret < 0)
{
DPRINTF(E_LOG, L_SPOTIFY, "Out of memory for evbuffer (tried to add %d bytes)\n", s);
pthread_mutex_unlock(&g_audio_fifo->mutex);
return -1;
*retval = -1;
return COMMAND_END;
}
processed += s;
@ -1242,41 +1170,39 @@ audio_get(struct spotify_command *cmd)
pthread_mutex_unlock(&g_audio_fifo->mutex);
return processed;
*retval = processed;
return COMMAND_END;
}
static void
artwork_loaded_cb(sp_image *image, void *userdata)
{
struct spotify_command *cmd = userdata;
struct artwork_get_param *artwork;
artwork = userdata;
pthread_mutex_lock(&artwork->mutex);
pthread_mutex_lock(&cmd->arg.artwork.mutex);
artwork->is_loaded = 1;
cmd->arg.artwork.is_loaded = 1;
pthread_cond_signal(&cmd->arg.artwork.cond);
pthread_mutex_unlock(&cmd->arg.artwork.mutex);
pthread_cond_signal(&artwork->cond);
pthread_mutex_unlock(&artwork->mutex);
}
static int
artwork_get_bh(struct spotify_command *cmd)
static enum command_state
artwork_get_bh(void *arg, int *retval)
{
struct artwork_get_param *artwork;
sp_imageformat imageformat;
sp_error err;
const void *data;
size_t data_size;
int ret;
sp_image *image = cmd->arg.artwork.image;
char *path = cmd->arg.artwork.path;
if (!cmd->arg.artwork.is_loaded)
{
DPRINTF(E_DBG, L_SPOTIFY, "Request for artwork timed out: %s\n", path);
fptr_sp_image_remove_load_callback(image, artwork_loaded_cb, cmd);
goto fail;
}
artwork = arg;
sp_image *image = artwork->image;
char *path = artwork->path;
err = fptr_sp_image_error(image);
if (err != SP_ERROR_OK)
@ -1305,14 +1231,14 @@ artwork_get_bh(struct spotify_command *cmd)
goto fail;
}
ret = evbuffer_expand(cmd->arg.artwork.evbuf, data_size);
ret = evbuffer_expand(artwork->evbuf, data_size);
if (ret < 0)
{
DPRINTF(E_LOG, L_SPOTIFY, "Out of memory for artwork\n");
goto fail;
}
ret = evbuffer_add(cmd->arg.artwork.evbuf, data, data_size);
ret = evbuffer_add(artwork->evbuf, data, data_size);
if (ret < 0)
{
DPRINTF(E_LOG, L_SPOTIFY, "Could not add Spotify image to event buffer\n");
@ -1323,17 +1249,20 @@ artwork_get_bh(struct spotify_command *cmd)
fptr_sp_image_release(image);
return data_size;
*retval = 0;
return COMMAND_END;
fail:
fptr_sp_image_release(image);
return -1;
*retval = -1;
return COMMAND_END;
}
static int
artwork_get(struct spotify_command *cmd)
static enum command_state
artwork_get(void *arg, int *retval)
{
struct artwork_get_param *artwork;
char *path;
sp_link *link;
sp_track *track;
@ -1343,7 +1272,8 @@ artwork_get(struct spotify_command *cmd)
sp_image_size image_size;
sp_error err;
path = cmd->arg.artwork.path;
artwork = arg;
path = artwork->path;
// Now begins: path -> link -> track -> album -> image_id -> image -> format -> data
link = fptr_sp_link_create_from_string(path);
@ -1369,9 +1299,9 @@ artwork_get(struct spotify_command *cmd)
// Get an image at least the same size as requested
image_size = SP_IMAGE_SIZE_SMALL; // 64x64
if ((cmd->arg.artwork.max_w > 64) || (cmd->arg.artwork.max_h > 64))
if ((artwork->max_w > 64) || (artwork->max_h > 64))
image_size = SP_IMAGE_SIZE_NORMAL; // 300x300
if ((cmd->arg.artwork.max_w > 300) || (cmd->arg.artwork.max_h > 300))
if ((artwork->max_w > 300) || (artwork->max_h > 300))
image_size = SP_IMAGE_SIZE_LARGE; // 640x640
image_id = fptr_sp_album_cover(album, image_size);
@ -1390,31 +1320,35 @@ artwork_get(struct spotify_command *cmd)
fptr_sp_link_release(link);
cmd->arg.artwork.image = image;
artwork->image = image;
artwork->is_loaded = fptr_sp_image_is_loaded(image);
/* If the image is ready we can return it straight away, otherwise we will
* let the calling thread wait, since the Spotify thread should not wait
*/
if ( (cmd->arg.artwork.is_loaded = fptr_sp_image_is_loaded(image)) )
return artwork_get_bh(cmd);
if (artwork->is_loaded)
return artwork_get_bh(artwork, retval);
DPRINTF(E_SPAM, L_SPOTIFY, "Will wait for Spotify to call artwork_loaded_cb\n");
/* Async - we will return to spotify_artwork_get which will wait for callback */
err = fptr_sp_image_add_load_callback(image, artwork_loaded_cb, cmd);
err = fptr_sp_image_add_load_callback(image, artwork_loaded_cb, artwork);
if (err != SP_ERROR_OK)
{
DPRINTF(E_WARN, L_SPOTIFY, "Adding artwork cb failed, Spotify error: %s\n", fptr_sp_error_message(err));
return -1;
*retval = -1;
return COMMAND_END;
}
return 0;
*retval = 0;
return COMMAND_END;
level2_exit:
fptr_sp_link_release(link);
level1_exit:
return -1;
*retval = -1;
return COMMAND_END;
}
@ -1615,24 +1549,9 @@ static void connectionstate_updated(sp_session *session)
*/
static void end_of_track(sp_session *sess)
{
struct spotify_command *cmd;
DPRINTF(E_DBG, L_SPOTIFY, "End of track\n");
cmd = (struct spotify_command *)malloc(sizeof(struct spotify_command));
if (!cmd)
{
DPRINTF(E_LOG, L_SPOTIFY, "Could not allocate spotify_command\n");
return;
}
memset(cmd, 0, sizeof(struct spotify_command));
cmd->nonblock = 1;
cmd->func = playback_eot;
cmd->arg.noarg = NULL;
nonblock_command(cmd);
commands_exec_async(cmdbase, playback_eot, NULL);
}
/**
@ -1719,41 +1638,6 @@ exit_cb(int fd, short what, void *arg)
event_add(g_exitev, NULL);
}
static void
command_cb(int fd, short what, void *arg)
{
struct spotify_command *cmd;
int ret;
ret = read(g_cmd_pipe[0], &cmd, sizeof(cmd));
if (ret != sizeof(cmd))
{
DPRINTF(E_LOG, L_SPOTIFY, "Could not read command! (read %d): %s\n", ret, (ret < 0) ? strerror(errno) : "-no error-");
goto readd;
}
if (cmd->nonblock)
{
cmd->func(cmd);
free(cmd);
goto readd;
}
pthread_mutex_lock(&cmd->lck);
g_cmd = cmd;
ret = cmd->func(cmd);
cmd->ret = ret;
g_cmd = NULL;
pthread_cond_signal(&cmd->cond);
pthread_mutex_unlock(&cmd->lck);
readd:
event_add(g_cmdev, NULL);
}
/* Process events when timeout expires or triggered by libspotify's notify_main_thread */
static void
notify_cb(int fd, short what, void *arg)
@ -1789,9 +1673,7 @@ notify_cb(int fd, short what, void *arg)
int
spotify_playback_setup(struct media_file_info *mfi)
{
struct spotify_command cmd;
sp_link *link;
int ret;
DPRINTF(E_DBG, L_SPOTIFY, "Playback setup request\n");
@ -1802,144 +1684,59 @@ spotify_playback_setup(struct media_file_info *mfi)
return -1;
}
command_init(&cmd);
cmd.func = playback_setup;
cmd.arg.link = link;
ret = sync_command(&cmd);
command_deinit(&cmd);
return ret;
return commands_exec_sync(cmdbase, playback_setup, NULL, link);
}
int
spotify_playback_play()
{
struct spotify_command cmd;
int ret;
DPRINTF(E_DBG, L_SPOTIFY, "Playback request\n");
command_init(&cmd);
cmd.func = playback_play;
cmd.arg.noarg = NULL;
ret = sync_command(&cmd);
command_deinit(&cmd);
return ret;
return commands_exec_sync(cmdbase, playback_play, NULL, NULL);
}
int
spotify_playback_pause()
{
struct spotify_command cmd;
int ret;
DPRINTF(E_DBG, L_SPOTIFY, "Pause request\n");
command_init(&cmd);
cmd.func = playback_pause;
cmd.arg.noarg = NULL;
ret = sync_command(&cmd);
command_deinit(&cmd);
return ret;
return commands_exec_sync(cmdbase, playback_pause, NULL, NULL);
}
/* Thread: libspotify */
void
spotify_playback_pause_nonblock(void)
{
struct spotify_command *cmd;
DPRINTF(E_DBG, L_SPOTIFY, "Nonblock pause request\n");
cmd = (struct spotify_command *)malloc(sizeof(struct spotify_command));
if (!cmd)
{
DPRINTF(E_LOG, L_SPOTIFY, "Could not allocate spotify_command\n");
return;
}
memset(cmd, 0, sizeof(struct spotify_command));
cmd->nonblock = 1;
cmd->func = playback_pause;
cmd->arg.noarg = NULL;
nonblock_command(cmd);
commands_exec_async(cmdbase, playback_pause, NULL);
}
/* Thread: player and libspotify */
int
spotify_playback_stop(void)
{
struct spotify_command cmd;
int ret;
DPRINTF(E_DBG, L_SPOTIFY, "Stop request\n");
command_init(&cmd);
cmd.func = playback_stop;
cmd.arg.noarg = NULL;
ret = sync_command(&cmd);
command_deinit(&cmd);
return ret;
return commands_exec_sync(cmdbase, playback_stop, NULL, NULL);
}
/* Thread: player and libspotify */
void
spotify_playback_stop_nonblock(void)
{
struct spotify_command *cmd;
DPRINTF(E_DBG, L_SPOTIFY, "Nonblock stop request\n");
cmd = (struct spotify_command *)malloc(sizeof(struct spotify_command));
if (!cmd)
{
DPRINTF(E_LOG, L_SPOTIFY, "Could not allocate spotify_command\n");
return;
}
memset(cmd, 0, sizeof(struct spotify_command));
cmd->nonblock = 1;
cmd->func = playback_stop;
cmd->arg.noarg = NULL;
nonblock_command(cmd);
commands_exec_async(cmdbase, playback_stop, NULL);
}
/* Thread: player */
int
spotify_playback_seek(int ms)
{
struct spotify_command cmd;
int ret;
command_init(&cmd);
cmd.func = playback_seek;
cmd.arg.seek_ms = ms;
ret = sync_command(&cmd);
command_deinit(&cmd);
ret = commands_exec_sync(cmdbase, playback_seek, NULL, &ms);
if (ret == 0)
return ms;
@ -1951,58 +1748,44 @@ spotify_playback_seek(int ms)
int
spotify_audio_get(struct evbuffer *evbuf, int wanted)
{
struct spotify_command cmd;
int ret;
struct audio_get_param audio;
command_init(&cmd);
audio.evbuf = evbuf;
audio.wanted = wanted;
cmd.func = audio_get;
cmd.arg.audio.evbuf = evbuf;
cmd.arg.audio.wanted = wanted;
ret = sync_command(&cmd);
command_deinit(&cmd);
return ret;
return commands_exec_sync(cmdbase, audio_get, NULL, &audio);
}
/* Thread: httpd (artwork) and worker */
int
spotify_artwork_get(struct evbuffer *evbuf, char *path, int max_w, int max_h)
{
struct spotify_command cmd;
struct artwork_get_param artwork;
struct timespec ts;
int ret;
command_init(&cmd);
artwork.evbuf = evbuf;
artwork.path = path;
artwork.max_w = max_w;
artwork.max_h = max_h;
cmd.func = artwork_get;
cmd.arg.artwork.evbuf = evbuf;
cmd.arg.artwork.path = path;
cmd.arg.artwork.max_w = max_w;
cmd.arg.artwork.max_h = max_h;
pthread_mutex_init(&cmd.arg.artwork.mutex, NULL);
pthread_cond_init(&cmd.arg.artwork.cond, NULL);
ret = sync_command(&cmd);
pthread_mutex_init(&artwork.mutex, NULL);
pthread_cond_init(&artwork.cond, NULL);
ret = commands_exec_sync(cmdbase, artwork_get, NULL, &artwork);
// Artwork was not ready, wait for callback from libspotify
if (ret == 0)
{
pthread_mutex_lock(&cmd.arg.artwork.mutex);
pthread_mutex_lock(&artwork.mutex);
mk_reltime(&ts, SPOTIFY_ARTWORK_TIMEOUT);
if (!cmd.arg.artwork.is_loaded)
pthread_cond_timedwait(&cmd.arg.artwork.cond, &cmd.arg.artwork.mutex, &ts);
pthread_mutex_unlock(&cmd.arg.artwork.mutex);
if (!artwork.is_loaded)
pthread_cond_timedwait(&artwork.cond, &artwork.mutex, &ts);
pthread_mutex_unlock(&artwork.mutex);
cmd.func = artwork_get_bh;
ret = sync_command(&cmd);
ret = commands_exec_sync(cmdbase, artwork_get_bh, NULL, &artwork);
}
command_deinit(&cmd);
return ret;
}
@ -2204,17 +1987,6 @@ spotify_init(void)
goto exit_fail;
}
#ifdef HAVE_PIPE2
ret = pipe2(g_cmd_pipe, O_CLOEXEC);
#else
ret = pipe(g_cmd_pipe);
#endif
if (ret < 0)
{
DPRINTF(E_LOG, L_SPOTIFY, "Could not create command pipe: %s\n", strerror(errno));
goto cmd_fail;
}
#ifdef HAVE_PIPE2
ret = pipe2(g_notify_pipe, O_CLOEXEC);
#else
@ -2240,13 +2012,6 @@ spotify_init(void)
goto evnew_fail;
}
g_cmdev = event_new(evbase_spotify, g_cmd_pipe[0], EV_READ, command_cb, NULL);
if (!g_cmdev)
{
DPRINTF(E_LOG, L_SPOTIFY, "Could not create cmd event\n");
goto evnew_fail;
}
g_notifyev = event_new(evbase_spotify, g_notify_pipe[0], EV_READ | EV_TIMEOUT, notify_cb, NULL);
if (!g_notifyev)
{
@ -2255,9 +2020,16 @@ spotify_init(void)
}
event_add(g_exitev, NULL);
event_add(g_cmdev, NULL);
event_add(g_notifyev, NULL);
cmdbase = commands_base_new(evbase_spotify);
if (!cmdbase)
{
DPRINTF(E_LOG, L_SPOTIFY, "Could not create command base\n");
goto cmd_fail;
}
DPRINTF(E_INFO, L_SPOTIFY, "Spotify session init\n");
spotify_cfg = cfg_getsec(cfg, "spotify");
@ -2333,7 +2105,9 @@ spotify_init(void)
g_sess = NULL;
session_fail:
cmd_fail:
evnew_fail:
commands_base_free(cmdbase);
event_base_free(evbase_spotify);
evbase_spotify = NULL;
@ -2342,10 +2116,6 @@ spotify_init(void)
close(g_notify_pipe[1]);
notify_fail:
close(g_cmd_pipe[0]);
close(g_cmd_pipe[1]);
cmd_fail:
close(g_exit_pipe[0]);
close(g_exit_pipe[1]);
@ -2385,11 +2155,10 @@ spotify_deinit(void)
/* Free event base (should free events too) */
event_base_free(evbase_spotify);
/* Close pipes */
/* Close pipes and free command base */
commands_base_free(cmdbase);
close(g_notify_pipe[0]);
close(g_notify_pipe[1]);
close(g_cmd_pipe[0]);
close(g_cmd_pipe[1]);
close(g_exit_pipe[0]);
close(g_exit_pipe[1]);

View File

@ -39,29 +39,15 @@
#include "db.h"
#include "logger.h"
#include "worker.h"
#include "commands.h"
struct worker_command;
typedef int (*cmd_func)(struct worker_command *cmd);
struct worker_command
struct worker_arg
{
pthread_mutex_t lck;
pthread_cond_t cond;
cmd_func func;
int nonblock;
struct {
void (*cb)(void *);
void *cb_arg;
int delay;
struct event *timer;
} arg;
int ret;
void (*cb)(void *);
void *cb_arg;
int delay;
struct event *timer;
};
@ -73,9 +59,9 @@ static pthread_t tid_worker;
struct event_base *evbase_worker;
static int g_initialized;
static int g_exit_pipe[2];
static int g_cmd_pipe[2];
static struct event *g_exitev;
static struct event *g_cmdev;
static struct commands_base *cmdbase;
/* ---------------------------- CALLBACK EXECUTION ------------------------- */
/* Thread: worker */
@ -83,71 +69,39 @@ static struct event *g_cmdev;
static void
execute_cb(int fd, short what, void *arg)
{
struct worker_command *cmd = arg;
struct worker_arg *cmdarg = arg;
cmd->arg.cb(cmd->arg.cb_arg);
cmdarg->cb(cmdarg->cb_arg);
event_free(cmd->arg.timer);
free(cmd->arg.cb_arg);
free(cmd);
event_free(cmdarg->timer);
free(cmdarg->cb_arg);
free(cmdarg);
}
static int
execute(struct worker_command *cmd)
static enum command_state
execute(void *arg, int *retval)
{
struct timeval tv = { cmd->arg.delay, 0 };
struct worker_arg *cmdarg = arg;
struct timeval tv = { cmdarg->delay, 0 };
if (cmd->arg.delay)
if (cmdarg->delay)
{
cmd->arg.timer = evtimer_new(evbase_worker, execute_cb, cmd);
evtimer_add(cmd->arg.timer, &tv);
cmdarg->timer = evtimer_new(evbase_worker, execute_cb, cmdarg);
evtimer_add(cmdarg->timer, &tv);
return 1; // Not done yet, ask caller not to free cmd
*retval = 0;
return COMMAND_PENDING; // Not done yet, ask caller not to free cmd
}
cmd->arg.cb(cmd->arg.cb_arg);
free(cmd->arg.cb_arg);
cmdarg->cb(cmdarg->cb_arg);
free(cmdarg->cb_arg);
return 0;
*retval = 0;
return COMMAND_END;
}
/* ---------------------------- COMMAND EXECUTION -------------------------- */
static int
send_command(struct worker_command *cmd)
{
int ret;
if (!cmd->func)
{
DPRINTF(E_LOG, L_MAIN, "BUG: cmd->func is NULL!\n");
return -1;
}
ret = write(g_cmd_pipe[1], &cmd, sizeof(cmd));
if (ret != sizeof(cmd))
{
DPRINTF(E_LOG, L_MAIN, "Could not send command: %s\n", strerror(errno));
return -1;
}
return 0;
}
static int
nonblock_command(struct worker_command *cmd)
{
int ret;
ret = send_command(cmd);
if (ret < 0)
return -1;
return 0;
}
/* Thread: main */
static void
thread_exit(void)
@ -209,40 +163,6 @@ exit_cb(int fd, short what, void *arg)
event_add(g_exitev, NULL);
}
static void
command_cb(int fd, short what, void *arg)
{
struct worker_command *cmd;
int ret;
ret = read(g_cmd_pipe[0], &cmd, sizeof(cmd));
if (ret != sizeof(cmd))
{
DPRINTF(E_LOG, L_MAIN, "Could not read command! (read %d): %s\n", ret, (ret < 0) ? strerror(errno) : "-no error-");
goto readd;
}
if (cmd->nonblock)
{
ret = cmd->func(cmd);
if (ret == 0)
free(cmd);
goto readd;
}
pthread_mutex_lock(&cmd->lck);
ret = cmd->func(cmd);
cmd->ret = ret;
pthread_cond_signal(&cmd->cond);
pthread_mutex_unlock(&cmd->lck);
readd:
event_add(g_cmdev, NULL);
}
/* ---------------------------- Our worker API --------------------------- */
@ -250,19 +170,19 @@ command_cb(int fd, short what, void *arg)
void
worker_execute(void (*cb)(void *), void *cb_arg, size_t arg_size, int delay)
{
struct worker_command *cmd;
struct worker_arg *cmdarg;
void *argcpy;
DPRINTF(E_DBG, L_MAIN, "Got worker execute request\n");
cmd = (struct worker_command *)malloc(sizeof(struct worker_command));
if (!cmd)
cmdarg = (struct worker_arg *)malloc(sizeof(struct worker_arg));
if (!cmdarg)
{
DPRINTF(E_LOG, L_MAIN, "Could not allocate worker_command\n");
DPRINTF(E_LOG, L_MAIN, "Could not allocate worker_arg\n");
return;
}
memset(cmd, 0, sizeof(struct worker_command));
memset(cmdarg, 0, sizeof(struct worker_arg));
argcpy = malloc(arg_size);
if (!argcpy)
@ -273,15 +193,11 @@ worker_execute(void (*cb)(void *), void *cb_arg, size_t arg_size, int delay)
memcpy(argcpy, cb_arg, arg_size);
cmd->nonblock = 1;
cmd->func = execute;
cmd->arg.cb = cb;
cmd->arg.cb_arg = argcpy;
cmd->arg.delay = delay;
cmdarg->cb = cb;
cmdarg->cb_arg = argcpy;
cmdarg->delay = delay;
nonblock_command(cmd);
return;
commands_exec_async(cmdbase, execute, cmdarg);
}
int
@ -300,17 +216,6 @@ worker_init(void)
goto exit_fail;
}
#ifdef HAVE_PIPE2
ret = pipe2(g_cmd_pipe, O_CLOEXEC);
#else
ret = pipe(g_cmd_pipe);
#endif
if (ret < 0)
{
DPRINTF(E_LOG, L_MAIN, "Could not create command pipe: %s\n", strerror(errno));
goto cmd_fail;
}
evbase_worker = event_base_new();
if (!evbase_worker)
{
@ -325,15 +230,9 @@ worker_init(void)
goto evnew_fail;
}
g_cmdev = event_new(evbase_worker, g_cmd_pipe[0], EV_READ, command_cb, NULL);
if (!g_cmdev)
{
DPRINTF(E_LOG, L_MAIN, "Could not create cmd event\n");
goto evnew_fail;
}
cmdbase = commands_base_new(evbase_worker);
event_add(g_exitev, NULL);
event_add(g_cmdev, NULL);
ret = pthread_create(&tid_worker, NULL, worker, NULL);
if (ret < 0)
@ -352,15 +251,12 @@ worker_init(void)
return 0;
thread_fail:
commands_base_free(cmdbase);
evnew_fail:
event_base_free(evbase_worker);
evbase_worker = NULL;
evbase_fail:
close(g_cmd_pipe[0]);
close(g_cmd_pipe[1]);
cmd_fail:
close(g_exit_pipe[0]);
close(g_exit_pipe[1]);
@ -385,9 +281,8 @@ worker_deinit(void)
// Free event base (should free events too)
event_base_free(evbase_worker);
// Close pipes
close(g_cmd_pipe[0]);
close(g_cmd_pipe[1]);
// Close pipes and free command base
commands_base_free(cmdbase);
close(g_exit_pipe[0]);
close(g_exit_pipe[1]);
}