[cache] Multitreaded header encoding

This commit is contained in:
ejurgensen 2024-01-18 23:14:58 +01:00
parent 088c393dd6
commit 7dd34792ea
1 changed files with 169 additions and 107 deletions

View File

@ -93,6 +93,19 @@ struct cache_artwork_stash
uint8_t *data;
};
struct cache_xcode_job
{
const char *format;
char *file_path;
int file_id;
struct event *ev;
bool is_encoding;
struct evbuffer *header;
};
/* --------------------------------- GLOBALS -------------------------------- */
// cache thread
@ -188,12 +201,13 @@ static struct cache_db_def cache_artwork_db_def[] = {
// Transcoding cache
#define CACHE_XCODE_VERSION 1
#define CACHE_XCODE_NTHREADS 4
#define CACHE_XCODE_FORMAT_MP4 "mp4"
static sqlite3 *cache_xcode_hdl;
static struct event *cache_xcode_updateev;
static struct event *cache_xcode_prepareev;
static struct cache_xcode_job cache_xcode_jobs[CACHE_XCODE_NTHREADS];
static bool cache_xcode_is_enabled;
static bool cache_xcode_prepare_is_running;
static int cache_xcode_last_file;
static struct cache_db_def cache_xcode_db_def[] = {
DB_DEF_ADMIN,
{
@ -212,7 +226,8 @@ static struct cache_db_def cache_xcode_db_def[] = {
" timestamp INTEGER DEFAULT 0,"
" file_id INTEGER DEFAULT 0,"
" format VARCHAR(255) NOT NULL,"
" header BLOB"
" header BLOB,"
" UNIQUE(file_id, format) ON CONFLICT REPLACE"
");",
"DROP TABLE IF EXISTS data;",
},
@ -856,6 +871,24 @@ cache_daap_update_cb(int fd, short what, void *arg)
DPRINTF(E_INFO, L_CACHE, "DAAP cache updated\n");
}
/* ----------------------- Caching of transcoded data ----------------------- */
static void
xcode_job_clear(struct cache_xcode_job *job)
{
free(job->file_path);
if (job->header)
evbuffer_free(job->header);
// Can't just memset to zero, because *ev is persistent
job->format = NULL;
job->file_path = NULL;
job->file_id = 0;
job->header = NULL;
job->is_encoding = false;
}
static enum command_state
xcode_header_get(void *arg, int *retval)
{
@ -934,7 +967,7 @@ xcode_add_entry(sqlite3 *hdl, uint32_t id, uint32_t ts, const char *path)
char *errmsg;
int ret;
DPRINTF(E_LOG, L_CACHE, "Adding xcode file id %d, path '%s'\n", id, path);
DPRINTF(E_SPAM, L_CACHE, "Adding xcode file id %d, path '%s'\n", id, path);
query = sqlite3_mprintf(Q_TMPL, id, ts, path);
@ -960,7 +993,7 @@ xcode_del_entry(sqlite3 *hdl, uint32_t id)
char *errmsg;
int ret;
DPRINTF(E_LOG, L_CACHE, "Deleting xcode file id %d\n", id);
DPRINTF(E_SPAM, L_CACHE, "Deleting xcode file id %d\n", id);
sqlite3_snprintf(sizeof(query), query, Q_TMPL_FILES, (int)id);
ret = sqlite3_exec(hdl, query, NULL, NULL, &errmsg);
@ -1006,8 +1039,11 @@ xcode_sync_with_files(sqlite3 *hdl)
size_t cachelist_len = 0;
struct query_params qp = { .type = Q_ITEMS, .filter = "f.data_kind = 0", .order = "f.id" };
struct db_media_file_info dbmfi;
struct db_media_file_info *rowA;
struct cachelist *rowB;
uint32_t id;
uint32_t ts;
int cmp;
int i;
int ret;
@ -1033,42 +1069,47 @@ xcode_sync_with_files(sqlite3 *hdl)
if (ret < 0)
goto error;
// Loop while either list has remaining items
i = 0;
while (1)
// Loop while either list ("A" files list, "B" cache list) has remaining items
for(i = 0, cmp = 0;;)
{
ret = db_query_fetch_file(&dbmfi, &qp);
if (ret != 0) // At end of files table (or error occured)
{
for (; i < cachelist_len; i++)
xcode_del_entry(hdl, cachelist[i].id);
if (cmp <= 0)
rowA = (db_query_fetch_file(&dbmfi, &qp) == 0) ? &dbmfi : NULL;;
if (cmp >= 0)
rowB = (i < cachelist_len) ? &cachelist[i++] : NULL;
if (!rowA && !rowB)
break; // Done with both lists
break;
#if 0
if (rowA)
DPRINTF(E_DBG, L_CACHE, "cmp %d, rowA->id %s\n", cmp, rowA->id);
if (rowB)
DPRINTF(E_DBG, L_CACHE, "cmp %d, rowB->id %u, i %d, cachelist_len %zu\n", cmp, rowB->id, i, cachelist_len);
#endif
if (rowA)
{
safe_atou32(rowA->id, &id);
safe_atou32(rowA->time_modified, &ts);
}
safe_atou32(dbmfi.id, &id);
safe_atou32(dbmfi.time_modified, &ts);
if (i == cachelist_len || cachelist[i].id > id) // At end of cache table or new file
cmp = 0; // In both lists - unless:
if (!rowB || (rowA && rowB->id > id)) // A had an item not in B
{
xcode_add_entry(hdl, id, ts, dbmfi.path);
xcode_add_entry(hdl, id, ts, rowA->path);
cmp = -1;
}
else if (cachelist[i].id < id) // Removed file
else if (!rowA || (rowB && rowB->id < id)) // B had an item not in A
{
xcode_del_entry(hdl, cachelist[i].id);
i++;
xcode_del_entry(hdl, rowB->id);
cmp = 1;
}
else if (cachelist[i].id == id && cachelist[i].ts < ts) // Modified file
else if (rowB->id == id && rowB->ts < ts) // Item in B is too old
{
xcode_del_entry(hdl, cachelist[i].id);
xcode_add_entry(hdl, id, ts, dbmfi.path);
i++;
}
else // Found in both tables and timestamp in cache table is adequate
{
i++;
xcode_del_entry(hdl, rowB->id);
xcode_add_entry(hdl, id, ts, rowA->path);
}
}
db_query_end(&qp);
free(cachelist);
@ -1081,135 +1122,157 @@ xcode_sync_with_files(sqlite3 *hdl)
}
static int
xcode_prepare_header(sqlite3 *hdl, const char *format, int id, const char *path)
xcode_header_save(sqlite3 *hdl, int file_id, const char *format, uint8_t *data, size_t datalen)
{
#define Q_TMPL "INSERT INTO data (timestamp, file_id, format, header) VALUES (?, ?, ?, ?);"
struct evbuffer *header = NULL;
sqlite3_stmt *stmt = NULL;
unsigned char *data = NULL;
size_t datalen = 0;
sqlite3_stmt *stmt;
int ret;
DPRINTF(E_DBG, L_CACHE, "Preparing %s header for '%s' (file id %d)\n", format, path, id);
#if 1
if (strcmp(format, "mp4") == 0)
ret = transcode_prepare_header(&header, XCODE_MP4_ALAC, path);
else
ret = -1;
// Proceed even if error, we also cache that
if (ret == 0)
{
datalen = evbuffer_get_length(header);
data = evbuffer_pullup(header, -1);
}
#elif
data = (unsigned char*)"dummy";
datalen = 6;
#endif
ret = sqlite3_prepare_v2(hdl, Q_TMPL, -1, &stmt, 0);
if (ret != SQLITE_OK)
{
DPRINTF(E_LOG, L_CACHE, "Error preparing xcode_data for cache update: %s\n", sqlite3_errmsg(hdl));
goto error;
DPRINTF(E_LOG, L_CACHE, "Error preparing xcode data for cache update: %s\n", sqlite3_errmsg(hdl));
return -1;
}
sqlite3_bind_int(stmt, 1, (uint64_t)time(NULL));
sqlite3_bind_int(stmt, 2, id);
sqlite3_bind_int(stmt, 2, file_id);
sqlite3_bind_text(stmt, 3, format, -1, SQLITE_STATIC);
sqlite3_bind_blob(stmt, 4, data, datalen, SQLITE_STATIC);
ret = sqlite3_step(stmt);
if (ret != SQLITE_DONE)
{
DPRINTF(E_LOG, L_CACHE, "Error stepping xcode_data for cache update: %s\n", sqlite3_errmsg(hdl));
goto error;
DPRINTF(E_LOG, L_CACHE, "Error stepping xcode data for cache update: %s\n", sqlite3_errmsg(hdl));
return -1;
}
sqlite3_finalize(stmt);
if (header)
evbuffer_free(header);
return 0;
error:
if (stmt)
sqlite3_finalize(stmt);
if (header)
evbuffer_free(header);
return -1;
#undef Q_TMPL
}
static int
xcode_prepare_next_header_impl(sqlite3 *hdl, const char *format)
xcode_file_next(int *file_id, char **file_path, sqlite3 *hdl, const char *format)
{
#define Q_TMPL "SELECT f.id, f.filepath, d.id FROM files f LEFT JOIN data d ON f.id = d.file_id AND d.format = '%q' WHERE d.id IS NULL LIMIT 1;"
sqlite3_stmt *stmt;
char *query;
const char *file_path;
int file_id;
char query[256];
int ret;
query = sqlite3_mprintf(Q_TMPL, format);
sqlite3_snprintf(sizeof(query), query, Q_TMPL, format);
ret = sqlite3_prepare_v2(hdl, query, -1, &stmt, 0);
if (ret != SQLITE_OK)
goto error;
{
DPRINTF(E_LOG, L_CACHE, "Error occured while finding next file to prepare header for\n");
return -1;
}
ret = sqlite3_step(stmt);
if (ret != SQLITE_ROW)
{
sqlite3_finalize(stmt);
sqlite3_free(query);
return -1; // All done
}
file_id = sqlite3_column_int(stmt, 0);
file_path = (const char *)sqlite3_column_text(stmt, 1);
xcode_prepare_header(hdl, format, file_id, file_path);
*file_id = sqlite3_column_int(stmt, 0);
*file_path = strdup((char *)sqlite3_column_text(stmt, 1));
sqlite3_finalize(stmt);
sqlite3_free(query);
return file_id;
error:
DPRINTF(E_LOG, L_CACHE, "Error occured while preparing headers\n");
sqlite3_free(query);
return -1;
// Save an empty header so next call to this function will return a new file
return xcode_header_save(hdl, *file_id, format, NULL, 0);
#undef Q_TMPL
}
// Thread: worker
static void
xcode_prepare_next_header(void *arg)
xcode_worker(void *arg)
{
struct cache_xcode_job *job = *(struct cache_xcode_job **)arg;
int ret;
// Preparing headers can take very long, so we take one at a time, letting the
// event loop run in between
ret = xcode_prepare_next_header_impl(cache_xcode_hdl, "mp4");
if (ret < 0 || ret == cache_xcode_last_file)
DPRINTF(E_DBG, L_CACHE, "Preparing %s header for '%s' (file id %d)\n", job->format, job->file_path, job->file_id);
if (strcmp(job->format, CACHE_XCODE_FORMAT_MP4) == 0)
{
DPRINTF(E_LOG, L_CACHE, "Header generation completed\n");
cache_xcode_prepare_is_running = false;
return;
ret = transcode_prepare_header(&job->header, XCODE_MP4_ALAC, job->file_path);
if (ret < 0)
DPRINTF(E_LOG, L_CACHE, "Error preparing %s header for '%s' (file id %d)\n", job->format, job->file_path, job->file_id);
}
// Used as failsafe to protect against infinite looping
cache_xcode_last_file = ret;
// Tell the cache thread that we are done. Only the cache thread can save the
// result to the DB.
event_active(job->ev, 0, 0);
}
static void
cache_xcode_job_complete_cb(int fd, short what, void *arg)
{
struct cache_xcode_job *job = arg;
uint8_t *data;
size_t datalen;
if (job->header)
{
#if 1
datalen = evbuffer_get_length(job->header);
data = evbuffer_pullup(job->header, -1);
#else
data = (unsigned char*)"dummy";
datalen = 6;
#endif
xcode_header_save(cache_xcode_hdl, job->file_id, job->format, data, datalen);
}
xcode_job_clear(job); // Makes the job available again
event_active(cache_xcode_prepareev, 0, 0);
}
// Preparing headers can take very long, so we use worker threads. However, all
// DB access must be from the cache thread. So this function will find the next
// file from the db and then dispatch a thread for the encoding.
static void
cache_xcode_prepare_cb(int fd, short what, void *arg)
{
struct cache_xcode_job *job = NULL;
bool is_encoding = false;
int ret;
int i;
if (!cache_is_initialized)
return;
event_active(cache_xcode_prepareev, 0, 0);
}
for (i = 0; i < ARRAY_SIZE(cache_xcode_jobs); i++)
{
if (cache_xcode_jobs[i].is_encoding)
is_encoding = true;
else if (!job)
job = &cache_xcode_jobs[i];
}
static void
cache_xcode_prepare_cb(int fd, short what, void *arg)
{
worker_execute(xcode_prepare_next_header, NULL, 0, 0);
if (!job)
return; // No available thread right now, wait for cache_xcode_job_complete_cb()
ret = xcode_file_next(&job->file_id, &job->file_path, cache_xcode_hdl, CACHE_XCODE_FORMAT_MP4);
if (ret < 0)
{
if (!is_encoding)
DPRINTF(E_LOG, L_CACHE, "Header generation completed\n");
return;
}
else if (!is_encoding)
DPRINTF(E_LOG, L_CACHE, "Kicking off header generation\n");
job->is_encoding = true;
job->format = CACHE_XCODE_FORMAT_MP4;
worker_execute(xcode_worker, &job, sizeof(struct cache_xcode_job *), 0);
// Set off more threads
event_active(cache_xcode_prepareev, 0, 0);
}
static void
@ -1218,12 +1281,6 @@ cache_xcode_update_cb(int fd, short what, void *arg)
if (xcode_sync_with_files(cache_xcode_hdl) < 0)
return;
if (!cache_is_initialized || cache_xcode_prepare_is_running)
return;
DPRINTF(E_LOG, L_CACHE, "Kicking off header generation\n");
cache_xcode_prepare_is_running = true;
event_active(cache_xcode_prepareev, 0, 0);
}
@ -1628,6 +1685,7 @@ static void *
cache(void *arg)
{
int ret;
int i;
ret = cache_open();
if (ret < 0)
@ -1651,6 +1709,8 @@ cache(void *arg)
CHECK_NULL(L_CACHE, cache_xcode_updateev = evtimer_new(evbase_cache, cache_xcode_update_cb, NULL));
CHECK_NULL(L_CACHE, cache_xcode_prepareev = evtimer_new(evbase_cache, cache_xcode_prepare_cb, NULL));
CHECK_ERR(L_CACHE, event_priority_set(cache_xcode_prepareev, 0));
for (i = 0; i < ARRAY_SIZE(cache_xcode_jobs); i++)
CHECK_NULL(L_CACHE, cache_xcode_jobs[i].ev = evtimer_new(evbase_cache, cache_xcode_job_complete_cb, &cache_xcode_jobs[i]));
CHECK_ERR(L_CACHE, listener_add(cache_daap_listener_cb, LISTENER_DATABASE));
@ -1666,6 +1726,8 @@ cache(void *arg)
listener_remove(cache_daap_listener_cb);
for (i = 0; i < ARRAY_SIZE(cache_xcode_jobs); i++)
event_free(cache_xcode_jobs[i].ev);
event_free(cache_xcode_prepareev);
event_free(cache_xcode_updateev);
event_free(cache_daap_updateev);