[commands] Add generic functionality for inter-thread communication
using libevent and pipes
This commit is contained in:
parent
524d060b7a
commit
9856536fd7
|
@ -109,7 +109,8 @@ forked_daapd_SOURCES = main.c \
|
||||||
$(SPOTIFY_SRC) \
|
$(SPOTIFY_SRC) \
|
||||||
$(LASTFM_SRC) \
|
$(LASTFM_SRC) \
|
||||||
$(MPD_SRC) \
|
$(MPD_SRC) \
|
||||||
listener.c listener.h
|
listener.c listener.h \
|
||||||
|
commands.c commands.h
|
||||||
|
|
||||||
nodist_forked_daapd_SOURCES = \
|
nodist_forked_daapd_SOURCES = \
|
||||||
$(ANTLR_SOURCES)
|
$(ANTLR_SOURCES)
|
||||||
|
|
|
@ -0,0 +1,274 @@
|
||||||
|
/*
|
||||||
|
* Copyright (C) 2016 Christian Meffert <christian.meffert@googlemail.com>
|
||||||
|
*
|
||||||
|
* This program is free software; you can redistribute it and/or modify
|
||||||
|
* it under the terms of the GNU General Public License as published by
|
||||||
|
* the Free Software Foundation; either version 2 of the License, or
|
||||||
|
* (at your option) any later version.
|
||||||
|
*
|
||||||
|
* This program is distributed in the hope that it will be useful,
|
||||||
|
* but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||||
|
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||||
|
* GNU General Public License for more details.
|
||||||
|
*
|
||||||
|
* You should have received a copy of the GNU General Public License
|
||||||
|
* along with this program; if not, write to the Free Software
|
||||||
|
* Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
|
||||||
|
*/
|
||||||
|
|
||||||
|
#include "commands.h"
|
||||||
|
|
||||||
|
#include <errno.h>
|
||||||
|
#include <fcntl.h>
|
||||||
|
#include <pthread.h>
|
||||||
|
#include <stdio.h>
|
||||||
|
#include <stdlib.h>
|
||||||
|
#include <string.h>
|
||||||
|
#include <unistd.h>
|
||||||
|
|
||||||
|
#include <event2/event.h>
|
||||||
|
|
||||||
|
#include "logger.h"
|
||||||
|
|
||||||
|
|
||||||
|
struct command
|
||||||
|
{
|
||||||
|
pthread_mutex_t lck;
|
||||||
|
pthread_cond_t cond;
|
||||||
|
|
||||||
|
command_function func;
|
||||||
|
command_function func_bh;
|
||||||
|
void *arg;
|
||||||
|
int nonblock;
|
||||||
|
int ret;
|
||||||
|
int pending;
|
||||||
|
};
|
||||||
|
|
||||||
|
struct commands_base
|
||||||
|
{
|
||||||
|
int command_pipe[2];
|
||||||
|
struct event *command_event;
|
||||||
|
struct command *current_cmd;
|
||||||
|
};
|
||||||
|
|
||||||
|
static void
|
||||||
|
command_cb(int fd, short what, void *arg)
|
||||||
|
{
|
||||||
|
struct commands_base *cmdbase;
|
||||||
|
struct command *cmd;
|
||||||
|
enum command_state cmdstate;
|
||||||
|
int ret;
|
||||||
|
|
||||||
|
cmdbase = (struct commands_base *) arg;
|
||||||
|
|
||||||
|
ret = read(cmdbase->command_pipe[0], &cmd, sizeof(cmd));
|
||||||
|
if (ret != sizeof(cmd))
|
||||||
|
{
|
||||||
|
goto readd;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (cmd->nonblock)
|
||||||
|
{
|
||||||
|
// Command is executed asynchronously
|
||||||
|
cmdstate = cmd->func(cmd->arg, &cmd->ret);
|
||||||
|
|
||||||
|
if (cmdstate == COMMAND_END && cmd->arg)
|
||||||
|
free(cmd->arg);
|
||||||
|
free(cmd);
|
||||||
|
goto readd;
|
||||||
|
}
|
||||||
|
|
||||||
|
// Command is executed synchronously, caller is waiting until signaled that the execution finished
|
||||||
|
pthread_mutex_lock(&cmd->lck);
|
||||||
|
|
||||||
|
cmdstate = cmd->func(cmd->arg, &cmd->ret);
|
||||||
|
if (cmdstate == COMMAND_END)
|
||||||
|
{
|
||||||
|
// Command execution finished, execute the bottom half function
|
||||||
|
if (cmd->ret == 0 && cmd->func_bh)
|
||||||
|
{
|
||||||
|
cmdstate = cmd->func_bh(cmd->arg, &cmd->ret);
|
||||||
|
}
|
||||||
|
|
||||||
|
pthread_cond_signal(&cmd->cond);
|
||||||
|
pthread_mutex_unlock(&cmd->lck);
|
||||||
|
}
|
||||||
|
else
|
||||||
|
{
|
||||||
|
// Command execution waiting for pending events before returning to the caller
|
||||||
|
cmdbase->current_cmd = cmd;
|
||||||
|
cmd->pending = cmd->ret;
|
||||||
|
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
readd:
|
||||||
|
event_add(cmdbase->command_event, NULL);
|
||||||
|
}
|
||||||
|
|
||||||
|
static int
|
||||||
|
send_command(struct commands_base *cmdbase, struct command *cmd)
|
||||||
|
{
|
||||||
|
int ret;
|
||||||
|
|
||||||
|
if (!cmd->func)
|
||||||
|
{
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
|
||||||
|
ret = write(cmdbase->command_pipe[1], &cmd, sizeof(cmd));
|
||||||
|
if (ret != sizeof(cmd))
|
||||||
|
{
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
struct commands_base *
|
||||||
|
commands_base_new(struct event_base *evbase)
|
||||||
|
{
|
||||||
|
struct commands_base *cmdbase;
|
||||||
|
int ret;
|
||||||
|
|
||||||
|
cmdbase = (struct commands_base*) calloc(1, sizeof(struct commands_base));
|
||||||
|
if (!cmdbase)
|
||||||
|
{
|
||||||
|
DPRINTF(E_LOG, L_MAIN, "Out of memory for cmdbase\n");
|
||||||
|
return NULL;
|
||||||
|
}
|
||||||
|
|
||||||
|
# if defined(__linux__)
|
||||||
|
ret = pipe2(cmdbase->command_pipe, O_CLOEXEC);
|
||||||
|
# else
|
||||||
|
ret = pipe(cmdbase->command_pipe);
|
||||||
|
# endif
|
||||||
|
if (ret < 0)
|
||||||
|
{
|
||||||
|
DPRINTF(E_LOG, L_MAIN, "Could not create command pipe: %s\n", strerror(errno));
|
||||||
|
free(cmdbase);
|
||||||
|
return NULL;
|
||||||
|
}
|
||||||
|
|
||||||
|
cmdbase->command_event = event_new(evbase, cmdbase->command_pipe[0], EV_READ, command_cb, cmdbase);
|
||||||
|
if (!cmdbase->command_event)
|
||||||
|
{
|
||||||
|
DPRINTF(E_LOG, L_MAIN, "Could not create cmd event\n");
|
||||||
|
close(cmdbase->command_pipe[0]);
|
||||||
|
close(cmdbase->command_pipe[1]);
|
||||||
|
free(cmdbase);
|
||||||
|
return NULL;
|
||||||
|
}
|
||||||
|
|
||||||
|
ret = event_add(cmdbase->command_event, NULL);
|
||||||
|
if (ret != 0)
|
||||||
|
{
|
||||||
|
DPRINTF(E_LOG, L_MAIN, "Could not add cmd event\n");
|
||||||
|
close(cmdbase->command_pipe[0]);
|
||||||
|
close(cmdbase->command_pipe[1]);
|
||||||
|
free(cmdbase);
|
||||||
|
return NULL;
|
||||||
|
}
|
||||||
|
|
||||||
|
return cmdbase;
|
||||||
|
}
|
||||||
|
|
||||||
|
int
|
||||||
|
commands_base_free(struct commands_base *cmdbase)
|
||||||
|
{
|
||||||
|
close(cmdbase->command_pipe[0]);
|
||||||
|
close(cmdbase->command_pipe[1]);
|
||||||
|
free(cmdbase);
|
||||||
|
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
int
|
||||||
|
commands_exec_returnvalue(struct commands_base *cmdbase)
|
||||||
|
{
|
||||||
|
if (cmdbase->current_cmd == NULL)
|
||||||
|
return 0;
|
||||||
|
|
||||||
|
return cmdbase->current_cmd->ret;
|
||||||
|
}
|
||||||
|
|
||||||
|
void
|
||||||
|
commands_exec_end(struct commands_base *cmdbase, int retvalue)
|
||||||
|
{
|
||||||
|
if (cmdbase->current_cmd == NULL)
|
||||||
|
return;
|
||||||
|
|
||||||
|
// A pending event finished, decrease the number of pending events and update the return value
|
||||||
|
cmdbase->current_cmd->pending--;
|
||||||
|
cmdbase->current_cmd->ret = retvalue;
|
||||||
|
|
||||||
|
DPRINTF(E_DBG, L_MAIN, "Command has %d pending events\n", cmdbase->current_cmd->pending);
|
||||||
|
|
||||||
|
// If there are still pending events return
|
||||||
|
if (cmdbase->current_cmd->pending > 0)
|
||||||
|
return;
|
||||||
|
|
||||||
|
// All pending events have finished, execute the bottom half and signal the caller that the command finished execution
|
||||||
|
if (cmdbase->current_cmd->func_bh)
|
||||||
|
{
|
||||||
|
cmdbase->current_cmd->func_bh(cmdbase->current_cmd->arg, &cmdbase->current_cmd->ret);
|
||||||
|
}
|
||||||
|
pthread_cond_signal(&cmdbase->current_cmd->cond);
|
||||||
|
pthread_mutex_unlock(&cmdbase->current_cmd->lck);
|
||||||
|
|
||||||
|
cmdbase->current_cmd = NULL;
|
||||||
|
|
||||||
|
/* Process commands again */
|
||||||
|
event_add(cmdbase->command_event, NULL);
|
||||||
|
}
|
||||||
|
|
||||||
|
int
|
||||||
|
commands_exec_sync(struct commands_base *cmdbase, command_function func, command_function func_bh, void *arg)
|
||||||
|
{
|
||||||
|
struct command *cmd;
|
||||||
|
int ret;
|
||||||
|
|
||||||
|
cmd = (struct command*) calloc(1, sizeof(struct command));
|
||||||
|
cmd->func = func;
|
||||||
|
cmd->func_bh = func_bh;
|
||||||
|
cmd->arg = arg;
|
||||||
|
cmd->nonblock = 0;
|
||||||
|
|
||||||
|
pthread_mutex_lock(&cmd->lck);
|
||||||
|
|
||||||
|
ret = send_command(cmdbase, cmd);
|
||||||
|
if (ret < 0)
|
||||||
|
{
|
||||||
|
DPRINTF(E_LOG, L_MAIN, "Error sending command\n");
|
||||||
|
pthread_mutex_unlock(&cmd->lck);
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
|
||||||
|
pthread_cond_wait(&cmd->cond, &cmd->lck);
|
||||||
|
pthread_mutex_unlock(&cmd->lck);
|
||||||
|
|
||||||
|
ret = cmd->ret;
|
||||||
|
free(cmd);
|
||||||
|
|
||||||
|
return ret;
|
||||||
|
}
|
||||||
|
|
||||||
|
int
|
||||||
|
commands_exec_async(struct commands_base *cmdbase, command_function func, void *arg)
|
||||||
|
{
|
||||||
|
struct command *cmd;
|
||||||
|
int ret;
|
||||||
|
|
||||||
|
cmd = (struct command*) calloc(1, sizeof(struct command));
|
||||||
|
cmd->func = func;
|
||||||
|
cmd->func_bh = NULL;
|
||||||
|
cmd->arg = arg;
|
||||||
|
cmd->nonblock = 1;
|
||||||
|
|
||||||
|
ret = send_command(cmdbase, cmd);
|
||||||
|
if (ret < 0)
|
||||||
|
return -1;
|
||||||
|
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
|
@ -0,0 +1,111 @@
|
||||||
|
/*
|
||||||
|
* Copyright (C) 2016 Christian Meffert <christian.meffert@googlemail.com>
|
||||||
|
*
|
||||||
|
* This program is free software; you can redistribute it and/or modify
|
||||||
|
* it under the terms of the GNU General Public License as published by
|
||||||
|
* the Free Software Foundation; either version 2 of the License, or
|
||||||
|
* (at your option) any later version.
|
||||||
|
*
|
||||||
|
* This program is distributed in the hope that it will be useful,
|
||||||
|
* but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||||
|
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||||
|
* GNU General Public License for more details.
|
||||||
|
*
|
||||||
|
* You should have received a copy of the GNU General Public License
|
||||||
|
* along with this program; if not, write to the Free Software
|
||||||
|
* Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
|
||||||
|
*/
|
||||||
|
#ifndef SRC_COMMANDS_H_
|
||||||
|
#define SRC_COMMANDS_H_
|
||||||
|
|
||||||
|
#include <event2/event.h>
|
||||||
|
#include <time.h>
|
||||||
|
|
||||||
|
|
||||||
|
enum command_state {
|
||||||
|
COMMAND_END = 0,
|
||||||
|
COMMAND_PENDING = 1,
|
||||||
|
};
|
||||||
|
|
||||||
|
/*
|
||||||
|
* Function that will be executed in the event loop thread.
|
||||||
|
*
|
||||||
|
* If the function has pending events to complete, it needs to return COMMAND_PENDING with 'ret' set to
|
||||||
|
* the number of pending events to wait for.
|
||||||
|
*
|
||||||
|
* @param arg Opaque pointer passed by command_exec_sync or command_exec_async
|
||||||
|
* @param ret Pointer to the return value for the caller of the command
|
||||||
|
* @return COMMAND_END if there are no pending events (function execution is complete) or COMMAND_PENDING if there are pending events
|
||||||
|
*/
|
||||||
|
typedef enum command_state (*command_function)(void *arg, int *ret);
|
||||||
|
|
||||||
|
struct commands_base;
|
||||||
|
|
||||||
|
|
||||||
|
/*
|
||||||
|
* Creates a new command base, needs to be freed by commands_base_free.
|
||||||
|
*/
|
||||||
|
struct commands_base *
|
||||||
|
commands_base_new(struct event_base *evbase);
|
||||||
|
|
||||||
|
/*
|
||||||
|
* Frees the command base and closes the (internally used) pipes
|
||||||
|
*/
|
||||||
|
int
|
||||||
|
commands_base_free(struct commands_base *cmdbase);
|
||||||
|
|
||||||
|
/*
|
||||||
|
* Gets the current return value for the current pending command.
|
||||||
|
*
|
||||||
|
* @param cmdbase The command base
|
||||||
|
* @return The current return value
|
||||||
|
*/
|
||||||
|
int
|
||||||
|
commands_exec_returnvalue(struct commands_base *cmdbase);
|
||||||
|
|
||||||
|
/*
|
||||||
|
* If a command function returned COMMAND_PENDING, each event triggered by this command needs to
|
||||||
|
* call command_exec_end, passing it the return value of the event execution.
|
||||||
|
*
|
||||||
|
* If a command function is waiting for multiple events the, each event needs to call command_exec_end.
|
||||||
|
* The command base keeps track of the number of still pending events and only returns to the caller
|
||||||
|
* if there are no pending events left.
|
||||||
|
*
|
||||||
|
* @param cmdbase The command base (holds the current pending command)
|
||||||
|
* @param retvalue The return value for the calling thread
|
||||||
|
*/
|
||||||
|
void
|
||||||
|
commands_exec_end(struct commands_base *cmdbase, int retvalue);
|
||||||
|
|
||||||
|
/*
|
||||||
|
* Execute the function 'func' with the given argument 'arg' in the event loop thread.
|
||||||
|
* Blocks the caller (thread) until the function returned.
|
||||||
|
*
|
||||||
|
* If a function 'func_bh' ("bottom half") is given, it is executed after 'func' has successfully
|
||||||
|
* finished.
|
||||||
|
*
|
||||||
|
* @param cmdbase The command base
|
||||||
|
* @param func The function to be executed
|
||||||
|
* @param func_bh The bottom half function to be executed after all pending events from func are processed
|
||||||
|
* @param arg Argument passed to func (and func_bh)
|
||||||
|
* @return Return value of func (or func_bh if func_bh is not NULL)
|
||||||
|
*/
|
||||||
|
int
|
||||||
|
commands_exec_sync(struct commands_base *cmdbase, command_function func, command_function func_bh, void *arg);
|
||||||
|
|
||||||
|
/*
|
||||||
|
* Execute the function 'func' with the given argument 'arg' in the event loop thread.
|
||||||
|
* Triggers the function execution and immediately returns (does not wait for func to finish).
|
||||||
|
*
|
||||||
|
* The pointer passed as argument is freed in the event loop thread after func returned.
|
||||||
|
*
|
||||||
|
* @param cmdbase The command base
|
||||||
|
* @param func The function to be executed
|
||||||
|
* @param arg Argument passed to func
|
||||||
|
* @return 0 if triggering the function execution succeeded, -1 on failure.
|
||||||
|
*/
|
||||||
|
int
|
||||||
|
commands_exec_async(struct commands_base *cmdbase, command_function func, void *arg);
|
||||||
|
|
||||||
|
|
||||||
|
#endif /* SRC_COMMANDS_H_ */
|
Loading…
Reference in New Issue