Fixes for sqlite3 from jlbz on the forums, walk out the thread pooling

This commit is contained in:
Ron Pedde 2006-03-24 05:10:29 +00:00
parent 786ccfecda
commit be8eef8384

View File

@ -1,6 +1,6 @@
/*
* $Id$
* sqlite3-specific db implementation
* sqlite2-specific db implementation
*
* Copyright (C) 2005 Ron Pedde (ron@pedde.com)
*
@ -56,171 +56,36 @@
# define FALSE 0
#endif
#define DB_SQLITE3_JOB_DONE 0
#define DB_SQLITE3_JOB_NOP 1
#define DB_SQLITE3_JOB_OPEN 2
#define DB_SQLITE3_JOB_CLOSE 3
#define DB_SQLITE3_JOB_EXEC 4
#define DB_SQLITE3_JOB_CHANGES 5
#define DB_SQLITE3_JOB_EBEGIN 6
#define DB_SQLITE3_JOB_EFETCH 7
#define DB_SQLITE3_JOB_ESTEP 8
#define DB_SQLITE3_JOB_FINALIZE 9
#define DB_SQLITE3_JOB_ROWID 10
#define DB_SQLITE3_JOB_QUIT 99
/* Globals */
static pthread_mutex_t _db_sqlite3_mutex = PTHREAD_MUTEX_INITIALIZER; /**< sqlite not reentrant */
static sqlite3_stmt *_db_sqlite3_stmt;
static sqlite3 *db_sqlite3_songs; /**< Database that holds the mp3 info */
static pthread_mutex_t db_sqlite3_mutex = PTHREAD_MUTEX_INITIALIZER; /**< sqlite not reentrant */
static sqlite3_stmt *db_sqlite3_stmt;
static int db_sqlite3_reload=0;
static char *_db_sqlite3_enum_query=NULL;
static char **_db_sqlite3_row = NULL;
static char *db_sqlite3_enum_query=NULL;
static char **db_sqlite3_row = NULL;
static char db_sqlite3_path[PATH_MAX + 1];
static pthread_t _db_sqlite3_tid;
static pthread_cond_t _db_sqlite3_start = PTHREAD_COND_INITIALIZER;
static pthread_cond_t _db_sqlite3_done = PTHREAD_COND_INITIALIZER;
static pthread_mutex_t _db_sqlite3_mutex_job = PTHREAD_MUTEX_INITIALIZER;
/* Job info */
static volatile int _db_sqlite3_job = DB_SQLITE3_JOB_DONE;
static sqlite3 *_db_sqlite3_songs; /**< Database that holds the mp3 info */
static int _db_sqlite3_err;
static char *_db_sqlite3_perr=NULL;
static char *_db_sqlite3_query;
#define DB_SQLITE3_VERSION 9
/* Forwards */
void _db_sqlite3_lock(void);
void _db_sqlite3_unlock(void);
void db_sqlite3_lock(void);
void db_sqlite3_unlock(void);
extern char *db_sqlite3_initial1;
extern char *db_sqlite3_initial2;
int _db_sqlite3_enum_begin_helper(char **pe);
int db_sqlite3_enum_begin_helper(char **pe);
/**
* throw a job at the worker thread.
*
* @param pe error buffer
* @param job job (DB_SQLITE3_JOB_*)
* @returns DB_E_SUCCESS on success, DB_E_* otherwise
*/
int _db_sqlite3_start_job(int job) {
int err;
DPRINTF(E_SPAM,L_DB,"About to submit job (%d).. waiting for mutex\n",job);
if((err=pthread_mutex_lock(&_db_sqlite3_mutex_job))) {
DPRINTF(E_FATAL,L_DB,"cannot lock sqlite job lock: %s\n",strerror(err));
}
/* we'll assume that all the other stuff is set correctly */
_db_sqlite3_job = job;
pthread_cond_signal(&_db_sqlite3_start);
DPRINTF(E_SPAM,L_DB,"Submitting sqlite job type: %d\n",job);
/* now wait for the job to be done */
while(_db_sqlite3_job != DB_SQLITE3_JOB_DONE) {
pthread_cond_wait(&_db_sqlite3_done,&_db_sqlite3_mutex_job);
}
DPRINTF(E_SPAM,L_DB,"Job done: status %d, unlocking mutex\n",_db_sqlite3_err);
pthread_mutex_unlock(&_db_sqlite3_mutex_job);
return _db_sqlite3_err;
}
/**
* worker thread main loop. since sqlite3 is picky about only
* using handles from the thread that opened, I'm going to make
* a single worker thread pool to handle all db stuff from one
* thread.
*/
void *_db_sqlite3_threadproc(void *arg) {
int err;
char *perr;
const char *ptail;
int cols;
int idx;
static int done=0;
/* we'll just sit on the "start" cond */
DPRINTF(E_INF,L_DB,"sqlite3 worker: starting\n");
if((err=pthread_mutex_lock(&_db_sqlite3_mutex_job))) {
DPRINTF(E_FATAL,L_DB,"cannot lock sqlite job lock: %s\n",strerror(err));
}
while(!done) {
while(_db_sqlite3_job == DB_SQLITE3_JOB_DONE) {
DPRINTF(E_SPAM,L_DB,"sqlite3 worker: about to cond_wait...\n");
pthread_cond_wait(&_db_sqlite3_start, &_db_sqlite3_mutex_job);
}
DPRINTF(E_SPAM,L_DB,"sqlite3 worker: Found job type %d\n",_db_sqlite3_job);
_db_sqlite3_err = SQLITE_OK;
/* case takes up too much horizontal space */
if(_db_sqlite3_job == DB_SQLITE3_JOB_OPEN) {
err=sqlite3_open(db_sqlite3_path,&_db_sqlite3_songs);
if(err == SQLITE_OK) {
sqlite3_busy_timeout(_db_sqlite3_songs,30000);
}
} else if(_db_sqlite3_job == DB_SQLITE3_JOB_CLOSE) {
err=sqlite3_close(_db_sqlite3_songs);
} else if(_db_sqlite3_job == DB_SQLITE3_JOB_EXEC) {
err = sqlite3_exec(_db_sqlite3_songs,_db_sqlite3_query,NULL,NULL,&perr);
} else if(_db_sqlite3_job == DB_SQLITE3_JOB_CHANGES) {
err = sqlite3_changes(_db_sqlite3_songs);
} else if(_db_sqlite3_job == DB_SQLITE3_JOB_EBEGIN) {
err=sqlite3_prepare(_db_sqlite3_songs,_db_sqlite3_enum_query,0,
&_db_sqlite3_stmt,&ptail);
} else if(_db_sqlite3_job == DB_SQLITE3_JOB_ESTEP) {
err=sqlite3_step(_db_sqlite3_stmt);
} else if(_db_sqlite3_job == DB_SQLITE3_JOB_EFETCH) {
cols = sqlite3_column_count(_db_sqlite3_stmt);
if(!_db_sqlite3_row) {
/* alloc space */
_db_sqlite3_row = (char**)malloc((sizeof(char*)) * cols);
if(!_db_sqlite3_row)
DPRINTF(E_FATAL,L_DB,"Malloc error\n");
}
for(idx=0; idx < cols; idx++) {
_db_sqlite3_row[idx] = (char*)sqlite3_column_text(_db_sqlite3_stmt,idx);
DPRINTF(E_SPAM,L_DB,"Fetched %s\n",_db_sqlite3_row[idx]);
}
err = SQLITE_OK;
} else if(_db_sqlite3_job == DB_SQLITE3_JOB_FINALIZE) {
err = sqlite3_finalize(_db_sqlite3_stmt);
} else if(_db_sqlite3_job == DB_SQLITE3_JOB_ROWID) {
err = (int)sqlite3_last_insert_rowid(_db_sqlite3_songs);
} else if(_db_sqlite3_job == DB_SQLITE3_JOB_QUIT) {
done = 1;
}
_db_sqlite3_err = err;
DPRINTF(E_SPAM,L_DB,"sqlite3 worker: finished job with %d\n",err);
/* hand it back to the client */
_db_sqlite3_job = DB_SQLITE3_JOB_DONE;
pthread_cond_signal(&_db_sqlite3_done);
}
pthread_mutex_unlock(&_db_sqlite3_mutex_job);
DPRINTF(E_INF,L_DB,"sqlite3 worker exiting\n");
return NULL;
}
/**
* lock the db_mutex
*/
void _db_sqlite3_lock(void) {
void db_sqlite3_lock(void) {
int err;
if((err=pthread_mutex_lock(&_db_sqlite3_mutex))) {
if((err=pthread_mutex_lock(&db_sqlite3_mutex))) {
DPRINTF(E_FATAL,L_DB,"cannot lock sqlite lock: %s\n",strerror(err));
}
}
@ -228,10 +93,10 @@ void _db_sqlite3_lock(void) {
/**
* unlock the db_mutex
*/
void _db_sqlite3_unlock(void) {
void db_sqlite3_unlock(void) {
int err;
if((err=pthread_mutex_unlock(&_db_sqlite3_mutex))) {
if((err=pthread_mutex_unlock(&db_sqlite3_mutex))) {
DPRINTF(E_FATAL,L_DB,"cannot unlock sqlite3 lock: %s\n",strerror(err));
}
}
@ -262,28 +127,20 @@ void db_sqlite3_vmfree(char *query) {
int db_sqlite3_open(char **pe, char *dsn) {
int ver;
int err;
char *perr;
snprintf(db_sqlite3_path,sizeof(db_sqlite3_path),"%s/songs3.db",dsn);
_db_sqlite3_lock();
if((err=pthread_create(&_db_sqlite3_tid,NULL,
_db_sqlite3_threadproc,NULL))) {
DPRINTF(E_LOG,L_DB,"Could not spawn thread: %s\n",strerror(err));
return DB_E_PROC;
}
if(_db_sqlite3_start_job(DB_SQLITE3_JOB_OPEN) != SQLITE_OK) {
perr = _db_sqlite3_perr;
db_get_error(pe,DB_E_SQL_ERROR,perr);
DPRINTF(E_LOG,L_DB,"db_sqlite3_open: %s (%s)\n",*pe,
db_sqlite3_lock();
if(sqlite3_open(db_sqlite3_path,&db_sqlite3_songs) != SQLITE_OK) {
db_get_error(pe,DB_E_SQL_ERROR,sqlite3_errmsg(db_sqlite3_songs));
DPRINTF(E_LOG,L_DB,"db_sqlite3_open: %s (%s)\n",pe ? *pe : "Unknown",
db_sqlite3_path);
_db_sqlite3_unlock();
sqlite3_free(perr);
db_sqlite3_unlock();
return DB_E_SQL_ERROR;
}
_db_sqlite3_unlock();
sqlite3_busy_timeout(db_sqlite3_songs,30000); /* 30 seconds */
db_sqlite3_unlock();
err = db_sql_fetch_int(pe,&ver,"select value from config where "
"term='version'");
@ -306,11 +163,9 @@ int db_sqlite3_open(char **pe, char *dsn) {
* close the database
*/
int db_sqlite3_close(void) {
_db_sqlite3_lock();
_db_sqlite3_start_job(DB_SQLITE3_JOB_CLOSE);
_db_sqlite3_start_job(DB_SQLITE3_JOB_QUIT);
pthread_join(_db_sqlite3_tid,NULL);
_db_sqlite3_unlock();
db_sqlite3_lock();
sqlite3_close(db_sqlite3_songs);
db_sqlite3_unlock();
return DB_E_SUCCESS;
}
@ -326,33 +181,32 @@ int db_sqlite3_close(void) {
*/
int db_sqlite3_exec(char **pe, int loglevel, char *fmt, ...) {
va_list ap;
char *query;
int err;
char *perr;
_db_sqlite3_lock();
db_sqlite3_lock();
va_start(ap,fmt);
_db_sqlite3_query=sqlite3_vmprintf(fmt,ap);
query=sqlite3_vmprintf(fmt,ap);
va_end(ap);
DPRINTF(E_DBG,L_DB,"Executing: %s\n",_db_sqlite3_query);
DPRINTF(E_DBG,L_DB,"Executing: %s\n",query);
err=_db_sqlite3_start_job(DB_SQLITE3_JOB_EXEC);
perr = _db_sqlite3_perr;
err=sqlite3_exec(db_sqlite3_songs,query,NULL,NULL,&perr);
if(err != SQLITE_OK) {
db_get_error(pe,DB_E_SQL_ERROR,perr);
DPRINTF(loglevel == E_FATAL ? E_LOG : loglevel,L_DB,"Query: %s\n",
_db_sqlite3_query);
query);
DPRINTF(loglevel,L_DB,"Error: %s\n",perr);
sqlite3_free(perr);
} else {
DPRINTF(E_DBG,L_DB,"Rows: %d\n",_db_sqlite3_start_job(DB_SQLITE3_JOB_CHANGES));
DPRINTF(E_DBG,L_DB,"Rows: %d\n",sqlite3_changes(db_sqlite3_songs));
}
sqlite3_free(_db_sqlite3_query);
sqlite3_free(query);
_db_sqlite3_unlock();
db_sqlite3_unlock();
if(err != SQLITE_OK)
return DB_E_SQL_ERROR;
@ -365,39 +219,38 @@ int db_sqlite3_exec(char **pe, int loglevel, char *fmt, ...) {
int db_sqlite3_enum_begin(char **pe, char *fmt, ...) {
va_list ap;
_db_sqlite3_lock();
db_sqlite3_lock();
va_start(ap, fmt);
_db_sqlite3_enum_query = sqlite3_vmprintf(fmt,ap);
db_sqlite3_enum_query = sqlite3_vmprintf(fmt,ap);
va_end(ap);
DPRINTF(E_SPAM,L_DB,"Starting enum_begin: %s\n",_db_sqlite3_enum_query);
return _db_sqlite3_enum_begin_helper(pe);
return db_sqlite3_enum_begin_helper(pe);
}
int _db_sqlite3_enum_begin_helper(char **pe) {
int db_sqlite3_enum_begin_helper(char **pe) {
int err;
const char *ptail;
if(!_db_sqlite3_enum_query)
if(!db_sqlite3_enum_query)
*((int*)NULL) = 1;
DPRINTF(E_DBG,L_DB,"Executing: %s\n",_db_sqlite3_enum_query);
err=_db_sqlite3_start_job(DB_SQLITE3_JOB_EBEGIN);
DPRINTF(E_DBG,L_DB,"Executing: %s\n",db_sqlite3_enum_query);
err=sqlite3_prepare(db_sqlite3_songs,db_sqlite3_enum_query,-1,
&db_sqlite3_stmt,&ptail);
if(err != SQLITE_OK) {
DPRINTF(E_SPAM,L_DB,"Error: %s, enum exiting\n",_db_sqlite3_perr);
db_get_error(pe,DB_E_SQL_ERROR,_db_sqlite3_perr);
sqlite3_free(_db_sqlite3_perr);
sqlite3_free(_db_sqlite3_enum_query);
_db_sqlite3_enum_query = NULL;
_db_sqlite3_unlock();
db_get_error(pe,DB_E_SQL_ERROR,sqlite3_errmsg(db_sqlite3_songs));
db_sqlite3_unlock();
sqlite3_free(db_sqlite3_enum_query);
db_sqlite3_enum_query=NULL;
return DB_E_SQL_ERROR;
}
/* otherwise, we leave the db locked while we walk through the enums */
if(_db_sqlite3_row)
free(_db_sqlite3_row);
_db_sqlite3_row=NULL;
if(db_sqlite3_row)
free(db_sqlite3_row);
db_sqlite3_row=NULL;
return DB_E_SUCCESS;
@ -417,15 +270,15 @@ int _db_sqlite3_enum_begin_helper(char **pe) {
*/
int db_sqlite3_enum_fetch(char **pe, SQL_ROW *pr) {
int err;
int cols;
int idx;
int counter=10;
DPRINTF(E_SPAM,L_DB,"Fetching row for %s\n",_db_sqlite3_enum_query);
if(!_db_sqlite3_enum_query)
if(!db_sqlite3_enum_query)
*((int*)NULL) = 1;
while(counter--) {
err=_db_sqlite3_start_job(DB_SQLITE3_JOB_ESTEP);
err=sqlite3_step(db_sqlite3_stmt);
if(err != SQLITE_BUSY)
break;
usleep(100);
@ -433,25 +286,36 @@ int db_sqlite3_enum_fetch(char **pe, SQL_ROW *pr) {
if(err == SQLITE_DONE) {
*pr = NULL;
if(_db_sqlite3_row)
free(_db_sqlite3_row);
_db_sqlite3_row = NULL;
if(db_sqlite3_row)
free(db_sqlite3_row);
db_sqlite3_row = NULL;
return DB_E_SUCCESS;
}
if(err == SQLITE_ROW) {
err = _db_sqlite3_start_job(DB_SQLITE3_JOB_EFETCH);
*pr = _db_sqlite3_row;
cols = sqlite3_column_count(db_sqlite3_stmt);
if(!db_sqlite3_row) {
/* gotta alloc space */
db_sqlite3_row = (char**)malloc((sizeof(char*)) * cols);
if(!db_sqlite3_row)
DPRINTF(E_FATAL,L_DB,"Malloc error\n");
}
for(idx=0; idx < cols; idx++) {
db_sqlite3_row[idx] = (char*) sqlite3_column_text(db_sqlite3_stmt,idx);
}
*pr = db_sqlite3_row;
return DB_E_SUCCESS;
}
if(_db_sqlite3_row)
free(_db_sqlite3_row);
_db_sqlite3_row = NULL;
if(db_sqlite3_row)
free(db_sqlite3_row);
db_sqlite3_row = NULL;
db_get_error(pe,DB_E_SQL_ERROR,_db_sqlite3_perr);
sqlite3_free(_db_sqlite3_perr);
_db_sqlite3_start_job(DB_SQLITE3_JOB_FINALIZE);
db_get_error(pe,DB_E_SQL_ERROR,sqlite3_errmsg(db_sqlite3_songs));
sqlite3_finalize(db_sqlite3_stmt);
return DB_E_SQL_ERROR;
}
@ -461,30 +325,24 @@ int db_sqlite3_enum_fetch(char **pe, SQL_ROW *pr) {
*/
int db_sqlite3_enum_end(char **pe) {
int err;
char *perr;
DPRINTF(E_SPAM,L_DB,"Finishing enum for %s\n",_db_sqlite3_enum_query);
if(!_db_sqlite3_enum_query)
if(!db_sqlite3_enum_query)
*((int*)NULL) = 1;
if(_db_sqlite3_row)
free(_db_sqlite3_row);
_db_sqlite3_row = NULL;
sqlite3_free(_db_sqlite3_enum_query);
_db_sqlite3_enum_query = NULL;
if(db_sqlite3_row)
free(db_sqlite3_row);
db_sqlite3_row = NULL;
sqlite3_free(db_sqlite3_enum_query);
db_sqlite3_enum_query = NULL;
err = _db_sqlite3_start_job(DB_SQLITE3_JOB_FINALIZE);
err = sqlite3_finalize(db_sqlite3_stmt);
if(err != SQLITE_OK) {
perr = _db_sqlite3_perr;
db_get_error(pe,DB_E_SQL_ERROR,perr);
DPRINTF(E_LOG,L_DB,"Error in enum_end: %s\n",perr);
sqlite3_free(perr);
_db_sqlite3_unlock();
db_get_error(pe,DB_E_SQL_ERROR,sqlite3_errmsg(db_sqlite3_songs));
db_sqlite3_unlock();
return DB_E_SQL_ERROR;
}
_db_sqlite3_unlock();
db_sqlite3_unlock();
return DB_E_SUCCESS;
}
@ -492,7 +350,7 @@ int db_sqlite3_enum_end(char **pe) {
* restart the enumeration
*/
int db_sqlite3_enum_restart(char **pe) {
return _db_sqlite3_enum_begin_helper(pe);
return db_sqlite3_enum_begin_helper(pe);
}
@ -587,9 +445,9 @@ int db_sqlite3_event(int event_type) {
int db_sqlite3_insert_id(void) {
int result;
_db_sqlite3_lock();
result=_db_sqlite3_start_job(DB_SQLITE3_JOB_ROWID);
_db_sqlite3_unlock();
db_sqlite3_lock();
result = (int)sqlite3_last_insert_rowid(db_sqlite3_songs);
db_sqlite3_unlock();
return result;
}