diff --git a/db/db.rs b/db/db.rs index f3a4477..3363890 100644 --- a/db/db.rs +++ b/db/db.rs @@ -372,9 +372,13 @@ pub struct Stream { /// On flush, delete the following recordings. Note they must be the oldest recordings. to_delete: Vec, - /// The total bytes to delete. + /// The total bytes to delete with the next flush. pub bytes_to_delete: i64, + /// The total bytes to add with the next flush. (`mark_synced` has already been called on these + /// recordings.) + pub bytes_to_add: i64, + /// The total duration of recorded data. This may not be `range.end - range.start` due to /// gaps and overlap. pub duration: recording::Duration, @@ -394,6 +398,9 @@ pub struct Stream { /// /// TODO: alter the serving path to show these just as if they were already committed. uncommitted: VecDeque>>, + + /// The number of recordings in `uncommitted` which are synced and ready to commit. + synced_recordings: usize, } impl Stream { @@ -402,11 +409,8 @@ impl Stream { /// before it are unsynced. pub(crate) fn unflushed(&self) -> recording::Duration { let mut dur = recording::Duration(0); - for u in &self.uncommitted { - let l = u.lock(); - if !l.synced { - break; - } + for i in 0 .. self.synced_recordings { + let l = self.uncommitted[i].lock(); if let Some(ref r) = l.recording { dur += r.time.end - r.time.start; } @@ -417,11 +421,7 @@ impl Stream { #[derive(Debug)] pub(crate) struct UncommittedRecording { - /// If this recording has been synced to disk and thus is ready to commit to the database. - /// `recording` should not be modified after this is set to true. - pub(crate) synced: bool, - - /// The recording to add. Absent iff the recording has been abandoned. + /// The recording to add. Absent if not yet ready. /// TODO: modify `SampleIndexEncoder` to update this record as it goes. pub(crate) recording: Option, } @@ -591,13 +591,6 @@ pub(crate) struct Open { pub(crate) uuid: Uuid, } -/// A modification to be done to a `Stream`, used within `LockedDatabase::flush`. -#[derive(Default)] -struct StreamModification { - num_recordings_to_commit: i32, - range: Option>, -} - #[derive(Copy, Clone, Debug, Eq, Hash, PartialEq)] pub struct CompositeId(pub i64); @@ -723,11 +716,13 @@ impl StreamStateChanger { sample_file_bytes: 0, to_delete: Vec::new(), bytes_to_delete: 0, + bytes_to_add: 0, duration: recording::Duration(0), days: BTreeMap::new(), record: sc.record, next_recording_id: 1, uncommitted: VecDeque::new(), + synced_recordings: 0, }))); } } @@ -767,6 +762,11 @@ impl LockedDatabase { &self.sample_file_dirs_by_id } + /// Adds a placeholder for an uncommitted recording. + /// The caller should write and sync the file and populate the returned `UncommittedRecording` + /// (noting that the database lock must not be acquired while holding the + /// `UncommittedRecording`'s lock) then call `mark_synced`. The data will be written to the + /// database on the next `flush`. pub(crate) fn add_recording(&mut self, stream_id: i32) -> Result<(CompositeId, Arc>), Error> { let stream = match self.streams_by_id.get_mut(&stream_id) { @@ -776,13 +776,37 @@ impl LockedDatabase { let id = CompositeId::new(stream_id, stream.next_recording_id + (stream.uncommitted.len() as i32)); let recording = Arc::new(Mutex::new(UncommittedRecording { - synced: false, recording: None, })); stream.uncommitted.push_back(Arc::clone(&recording)); Ok((id, recording)) } + /// Marks the given uncomitted recording as synced and ready to flush. + /// This must be the next unsynced recording. + pub(crate) fn mark_synced(&mut self, id: CompositeId) -> Result<(), Error> { + let stream = match self.streams_by_id.get_mut(&id.stream()) { + None => bail!("no stream for recording {}", id), + Some(s) => s, + }; + let next_unsynced = stream.next_recording_id + (stream.synced_recordings as i32); + if id.recording() != next_unsynced { + bail!("can't sync {} when next unsynced recording is {} (next unflushed is {})", + id, next_unsynced, stream.next_recording_id); + } + if stream.synced_recordings == stream.uncommitted.len() { + bail!("can't sync un-added recording {}", id); + } + let l = stream.uncommitted[stream.synced_recordings].lock(); + let r = match l.recording.as_ref() { + None => bail!("can't sync unfinished recording {}", id), + Some(r) => r, + }; + stream.bytes_to_add += r.sample_file_bytes as i64; + stream.synced_recordings += 1; + Ok(()) + } + pub(crate) fn delete_garbage(&mut self, dir_id: i32, ids: &mut Vec) -> Result<(), Error> { let dir = match self.sample_file_dirs_by_id.get_mut(&dir_id) { @@ -808,35 +832,29 @@ impl LockedDatabase { Some(o) => o, }; let tx = self.conn.transaction()?; - let mut mods = FnvHashMap::with_capacity_and_hasher(self.streams_by_id.len(), - Default::default()); + let mut new_ranges = FnvHashMap::with_capacity_and_hasher(self.streams_by_id.len(), + Default::default()); { let mut stmt = tx.prepare_cached(UPDATE_NEXT_RECORDING_ID_SQL)?; for (&stream_id, s) in &self.streams_by_id { // Process additions. - let mut i = 0; - for recording in &s.uncommitted { - let l = recording.lock(); - if !l.synced { break; } - if let Some(ref r) = l.recording { - raw::insert_recording( - &tx, o, CompositeId::new(stream_id, s.next_recording_id + i), &r)?; - } - i += 1; + for i in 0..s.synced_recordings { + let l = s.uncommitted[i].lock(); + let r = l.recording.as_ref().unwrap(); + raw::insert_recording( + &tx, o, CompositeId::new(stream_id, s.next_recording_id + i as i32), &r)?; } - if i > 0 { - let m = mods.entry(stream_id).or_insert_with(StreamModification::default); - m.num_recordings_to_commit = i; + if s.synced_recordings > 0 { + new_ranges.entry(stream_id).or_insert(None); stmt.execute_named(&[ (":stream_id", &stream_id), - (":next_recording_id", &(s.next_recording_id + i)), + (":next_recording_id", &(s.next_recording_id + s.synced_recordings as i32)), ])?; } // Process deletions. if let Some(l) = s.to_delete.last() { - // Add a placeholder for recomputing the range. - mods.entry(stream_id).or_insert_with(StreamModification::default); + new_ranges.entry(stream_id).or_insert(None); let dir = match s.sample_file_dir_id { None => bail!("stream {} has no directory!", stream_id), Some(d) => d, @@ -854,8 +872,8 @@ impl LockedDatabase { for dir in self.sample_file_dirs_by_id.values() { raw::mark_sample_files_deleted(&tx, &dir.to_gc)?; } - for (&stream_id, m) in &mut mods { - m.range = raw::get_range(&tx, stream_id)?; + for (&stream_id, mut r) in &mut new_ranges { + *r = raw::get_range(&tx, stream_id)?; } tx.commit()?; @@ -870,7 +888,7 @@ impl LockedDatabase { let mut added = 0; let mut deleted = 0; - for (stream_id, m) in mods.drain() { + for (stream_id, new_range) in new_ranges.drain() { let s = self.streams_by_id.get_mut(&stream_id).unwrap(); let d = self.sample_file_dirs_by_id.get_mut(&s.sample_file_dir_id.unwrap()).unwrap(); @@ -886,18 +904,20 @@ impl LockedDatabase { } // Process add_recordings. - s.next_recording_id += m.num_recordings_to_commit; - added += m.num_recordings_to_commit; - for _ in 0..m.num_recordings_to_commit { + s.next_recording_id += s.synced_recordings as i32; + added += s.synced_recordings; + s.bytes_to_add = 0; + for _ in 0..s.synced_recordings { let u = s.uncommitted.pop_front().unwrap(); let l = u.lock(); if let Some(ref r) = l.recording { s.add_recording(r.time.clone(), r.sample_file_bytes); } } + s.synced_recordings = 0; // Fix the range. - s.range = m.range; + s.range = new_range; } info!("Flush due to {}: added {} recordings, deleted {}, marked {} files GCed.", reason, added, deleted, gced); @@ -1335,11 +1355,13 @@ impl LockedDatabase { sample_file_bytes: 0, to_delete: Vec::new(), bytes_to_delete: 0, + bytes_to_add: 0, duration: recording::Duration(0), days: BTreeMap::new(), next_recording_id: row.get_checked(7)?, record: row.get_checked(8)?, uncommitted: VecDeque::new(), + synced_recordings: 0, }); c.streams[type_.index()] = Some(id); } @@ -2065,7 +2087,7 @@ mod tests { let mut db = db.lock(); let (id, u) = db.add_recording(main_stream_id).unwrap(); u.lock().recording = Some(recording.clone()); - u.lock().synced = true; + db.mark_synced(id).unwrap(); db.flush("add test").unwrap(); id }; diff --git a/db/dir.rs b/db/dir.rs index 6027120..15d43ad 100644 --- a/db/dir.rs +++ b/db/dir.rs @@ -295,7 +295,7 @@ impl SampleFileDir { /// A command sent to the syncer. These correspond to methods in the `SyncerChannel` struct. enum SyncerCommand { - AsyncSaveRecording(CompositeId, Arc>, fs::File), + AsyncSaveRecording(CompositeId, fs::File), DatabaseFlushed, Flush(mpsc::SyncSender<()>), } @@ -365,14 +365,15 @@ pub fn lower_retention(db: Arc, dir_id: i32, limits: &[NewLimit]) { let stream = db.streams_by_id().get(&l.stream_id) .ok_or_else(|| format_err!("no such stream {}", l.stream_id))?; - bytes_before = stream.sample_file_bytes - stream.bytes_to_delete; + bytes_before = stream.sample_file_bytes + stream.bytes_to_add - + stream.bytes_to_delete; extra = stream.retain_bytes - l.limit; } if l.limit >= bytes_before { continue } delete_recordings(db, l.stream_id, extra)?; let stream = db.streams_by_id().get(&l.stream_id).unwrap(); info!("stream {}, deleting: {}->{}", l.stream_id, bytes_before, - stream.sample_file_bytes - stream.bytes_to_delete); + stream.sample_file_bytes + stream.bytes_to_add - stream.bytes_to_delete); } Ok(()) }) @@ -386,7 +387,7 @@ fn delete_recordings(db: &mut db::LockedDatabase, stream_id: i32, None => bail!("no stream {}", stream_id), Some(s) => s, }; - stream.sample_file_bytes - stream.bytes_to_delete + extra_bytes_needed + stream.sample_file_bytes + stream.bytes_to_add - stream.bytes_to_delete + extra_bytes_needed - stream.retain_bytes }; let mut bytes_to_delete = 0; @@ -412,9 +413,8 @@ fn delete_recordings(db: &mut db::LockedDatabase, stream_id: i32, impl SyncerChannel { /// Asynchronously syncs the given writer, closes it, records it into the database, and /// starts rotation. - fn async_save_recording(&self, id: CompositeId, recording: Arc>, - f: fs::File) { - self.0.send(SyncerCommand::AsyncSaveRecording(id, recording, f)).unwrap(); + fn async_save_recording(&self, id: CompositeId, f: fs::File) { + self.0.send(SyncerCommand::AsyncSaveRecording(id, f)).unwrap(); } /// For testing: flushes the syncer, waiting for all currently-queued commands to complete. @@ -495,7 +495,7 @@ impl Syncer { loop { match cmds.recv() { Err(_) => return, // all senders have closed the channel; shutdown - Ok(SyncerCommand::AsyncSaveRecording(id, r, f)) => self.save(id, r, f), + Ok(SyncerCommand::AsyncSaveRecording(id, f)) => self.save(id, f), Ok(SyncerCommand::DatabaseFlushed) => { retry_forever(&mut || self.collect_garbage(true)) }, @@ -577,16 +577,15 @@ impl Syncer { /// so that there can be only one dir sync and database transaction per save. /// Internal helper for `save`. This is separated out so that the question-mark operator /// can be used in the many error paths. - fn save(&mut self, id: CompositeId, recording: Arc>, - f: fs::File) { + fn save(&mut self, id: CompositeId, f: fs::File) { let stream_id = id.stream(); // Free up a like number of bytes. retry_forever(&mut || delete_recordings(&mut self.db.lock(), stream_id, 0)); retry_forever(&mut || f.sync_all()); retry_forever(&mut || self.dir.sync()); - recording.lock().synced = true; let mut db = self.db.lock(); + db.mark_synced(id).unwrap(); let reason = { let s = db.streams_by_id().get(&stream_id).unwrap(); let c = db.cameras_by_id().get(&s.camera_id).unwrap(); @@ -880,7 +879,7 @@ impl InnerWriter { flags: flags, }; self.r.lock().recording = Some(recording); - channel.async_save_recording(self.id, self.r, self.f); + channel.async_save_recording(self.id, self.f); PreviousWriter { end_time: end, local_time_delta: local_start_delta, diff --git a/db/testutil.rs b/db/testutil.rs index c4fd97e..7c5e4f2 100644 --- a/db/testutil.rs +++ b/db/testutil.rs @@ -145,7 +145,7 @@ impl TestDb { run_offset: 0, flags: db::RecordingFlags::TrailingZero as i32, }); - u.lock().synced = true; + db.mark_synced(id).unwrap(); db.flush("create_recording_from_encoder").unwrap(); let mut row = None; db.list_recordings_by_id(TEST_STREAM_ID, id.recording() .. id.recording()+1, @@ -177,12 +177,12 @@ pub fn add_dummy_recordings_to_db(db: &db::Database, num: usize) { run_offset: 0, }; for _ in 0..num { - let (_, u) = db.add_recording(TEST_STREAM_ID).unwrap(); + let (id, u) = db.add_recording(TEST_STREAM_ID).unwrap(); u.lock().recording = Some(recording.clone()); - u.lock().synced = true; recording.time.start += DURATION; recording.time.end += DURATION; recording.run_offset += 1; + db.mark_synced(id).unwrap(); } db.flush("add_dummy_recordings_to_db").unwrap(); }