From b41a6c43da44650d3646fb70afc364bc1fa50db8 Mon Sep 17 00:00:00 2001
From: Scott Lamb
Date: Thu, 23 Sep 2021 15:55:53 -0700
Subject: [PATCH] shutdown better

After a frustrating search for a suitable channel to use for shutdown
(tokio::sync::watch::Receiver and futures::future::Shared didn't look quite
right), in which I rethought my life decisions, I finally just made my own
(server/base/shutdown.rs). We can easily poll it or wait for it in async or
sync contexts. Most importantly, it's convenient; not that it really matters
here, but it's also efficient.

We now do a slightly better job of propagating a "graceful" shutdown signal,
and this channel will give us tools to improve it over time.

* Shut down even when writer or syncer operations are stuck. Fixes #117
* Not done yet: streamers should instantly shut down without waiting for a
  connection attempt or frame or something. I'll probably implement that
  when removing --rtsp-library=ffmpeg. The code should be cleaner then.
* Not done yet: fix a couple of places that sleep for up to a second when
  they could shut down immediately. I just need to do the plumbing for mock
  clocks to work.

I also implemented an immediate shutdown mode, activated by a second signal.
I think this will mitigate the streamer wait situation.
---
 CHANGELOG.md                   |   7 ++
 guide/troubleshooting.md       |   7 +-
 server/Cargo.lock              |   2 +
 server/Cargo.toml              |   2 +-
 server/base/Cargo.toml         |   2 +
 server/base/clock.rs           |  11 +-
 server/base/lib.rs             |   3 +-
 server/base/shutdown.rs        | 211 +++++++++++++++++++++++++++++++++
 server/db/testutil.rs          |   7 +-
 server/db/writer.rs            | 198 ++++++++++++++++++------
 server/src/cmds/check.rs       |   2 +-
 server/src/cmds/config/mod.rs  |   2 +-
 server/src/cmds/init.rs        |   2 +-
 server/src/cmds/login.rs       |   2 +-
 server/src/cmds/run.rs         |  95 ++++++++++-----
 server/src/cmds/sql.rs         |   2 +-
 server/src/cmds/ts.rs          |   2 +-
 server/src/cmds/upgrade/mod.rs |   2 +-
 server/src/main.rs             |  18 +--
 server/src/mp4.rs              |  30 +++--
 server/src/streamer.rs         |  31 +++--
 21 files changed, 487 insertions(+), 151 deletions(-)
 create mode 100644 server/base/shutdown.rs

diff --git a/CHANGELOG.md b/CHANGELOG.md
index d2581b9..0f573c9 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -23,6 +23,13 @@ Each release is tagged in Git and on the Docker repository
   reliability with old live555 versions when using TCP also.
 * improve compatibility with cameras that send non-compliant SDP, including
   models from Geovision and Anpviz.
+* fix [#117](https://github.com/scottlamb/moonfire-nvr/issues/117): honor
+  shutdown requests when out of disk space, instead of retrying forever.
+* shut down immediately on a second `SIGINT` or `SIGTERM`. The normal
+  "graceful" shutdown will still be slow in some cases, e.g. when waiting for
+  an RTSP UDP session to time out after a `TEARDOWN` failure. This allows the
+  impatient to get fast results with ctrl-C when running interactively,
+  rather than having to use `SIGKILL` from another terminal.

 ## `v0.6.5` (2021-08-13)

diff --git a/guide/troubleshooting.md b/guide/troubleshooting.md
index 9e7ba61..4285506 100644
--- a/guide/troubleshooting.md
+++ b/guide/troubleshooting.md
@@ -272,13 +272,10 @@ clean up the excess files. Moonfire NVR will start working again immediately.

 If Moonfire NVR's own files are too large, follow this procedure:

-1. Shut it down via `SIGKILL`:
+1. Shut it down.
    ```console
-    $ sudo killall -KILL moonfire-nvr
+    $ sudo killall moonfire-nvr
    ```
-   (Be sure to use `-KILL`. It won't shut down properly on `SIGTERM` or `SIGINT`
-   when out of disk space due to [issue
-   #117](https://github.com/scottlamb/moonfire-nvr/issues/117).)
 2. Reconfigure it to use less disk space. See [Completing configuration
    through the UI](install.md#completing-configuration-through-the-ui) in the
    installation guide. Pay attention to the note about slack space.
diff --git a/server/Cargo.lock b/server/Cargo.lock
index 6b2a2c7..4380c3a 100644
--- a/server/Cargo.lock
+++ b/server/Cargo.lock
@@ -1196,6 +1196,7 @@ name = "moonfire-base"
 version = "0.0.1"
 dependencies = [
  "failure",
+ "futures",
  "lazy_static",
  "libc",
  "log",
@@ -1203,6 +1204,7 @@ dependencies = [
  "parking_lot",
  "serde",
  "serde_json",
+ "slab",
  "time",
 ]

diff --git a/server/Cargo.toml b/server/Cargo.toml
index 6283e24..76374dc 100644
--- a/server/Cargo.toml
+++ b/server/Cargo.toml
@@ -55,7 +55,7 @@ smallvec = "1.0"
 structopt = { version = "0.3.13", default-features = false }
 sync_wrapper = "0.1.0"
 time = "0.1"
-tokio = { version = "1.0", features = ["macros", "parking_lot", "rt-multi-thread", "signal", "time"] }
+tokio = { version = "1.0", features = ["macros", "parking_lot", "rt-multi-thread", "signal", "sync", "time"] }
 tokio-stream = "0.1.5"
 tokio-tungstenite = "0.15.0"
 tracing = { version = "0.1", features = ["log"] }
diff --git a/server/base/Cargo.toml b/server/base/Cargo.toml
index 26032e1..7712b27 100644
--- a/server/base/Cargo.toml
+++ b/server/base/Cargo.toml
@@ -14,6 +14,7 @@ path = "lib.rs"

 [dependencies]
 failure = "0.1.1"
+futures = "0.3"
 lazy_static = "1.0"
 libc = "0.2"
 log = "0.4"
@@ -21,4 +22,5 @@ parking_lot = { version = "0.11.1", features = [] }
 nom = "7.0.0"
 serde = { version = "1.0", features = ["derive"] }
 serde_json = "1.0"
+slab = "0.4"
 time = "0.1"
diff --git a/server/base/clock.rs b/server/base/clock.rs
index bc3fdaa..0354928 100644
--- a/server/base/clock.rs
+++ b/server/base/clock.rs
@@ -13,6 +13,8 @@ use std::thread;
 use std::time::Duration as StdDuration;
 use time::{Duration, Timespec};

+use crate::shutdown::ShutdownError;
+
 /// Abstract interface to the system clocks. This is for testability.
 pub trait Clocks: Send + Sync + 'static {
     /// Gets the current time from `CLOCK_REALTIME`.
@@ -35,16 +37,21 @@ pub trait Clocks: Send + Sync + 'static {
     ) -> Result<T, mpsc::RecvTimeoutError>;
 }

-pub fn retry_forever<C, T, E>(clocks: &C, f: &mut dyn FnMut() -> Result<T, E>) -> T
+pub fn retry<C, T, E>(
+    clocks: &C,
+    shutdown_rx: &crate::shutdown::Receiver,
+    f: &mut dyn FnMut() -> Result<T, E>,
+) -> Result<T, ShutdownError>
 where
     C: Clocks,
     E: Into<Error>,
 {
     loop {
         let e = match f() {
-            Ok(t) => return t,
+            Ok(t) => return Ok(t),
             Err(e) => e.into(),
         };
+        shutdown_rx.check()?;
         let sleep_time = Duration::seconds(1);
         warn!(
             "sleeping for {} after error: {}",
diff --git a/server/base/lib.rs b/server/base/lib.rs
index a4e9d3f..40ab9b2 100644
--- a/server/base/lib.rs
+++ b/server/base/lib.rs
@@ -1,9 +1,10 @@
 // This file is part of Moonfire NVR, a security camera network video recorder.
-// Copyright (C) 2018 The Moonfire NVR Authors; see AUTHORS and LICENSE.txt.
+// Copyright (C) 2021 The Moonfire NVR Authors; see AUTHORS and LICENSE.txt.
 // SPDX-License-Identifier: GPL-v3.0-or-later WITH GPL-3.0-linking-exception.

 pub mod clock;
 mod error;
+pub mod shutdown;
 pub mod strutil;
 pub mod time;

diff --git a/server/base/shutdown.rs b/server/base/shutdown.rs
new file mode 100644
index 0000000..618d4c0
--- /dev/null
+++ b/server/base/shutdown.rs
@@ -0,0 +1,211 @@
+// This file is part of Moonfire NVR, a security camera network video recorder.
+// Copyright (C) 2021 The Moonfire NVR Authors; see AUTHORS and LICENSE.txt.
+// SPDX-License-Identifier: GPL-v3.0-or-later WITH GPL-3.0-linking-exception.
+
+//! Tools for propagating a graceful shutdown signal through the program.
+//!
+//! The receiver can be cloned, checked, and used as a future in async code.
+//! Also, for convenience, it can be blocked on in synchronous code without
+//! going through the runtime.
+//!
+//! Surprisingly, I couldn't find any simple existing mechanism for anything
+//! close to this in `futures::channels` or `tokio::sync`.
+
+use std::pin::Pin;
+use std::sync::Arc;
+use std::task::{Context, Poll, Waker};
+
+use futures::Future;
+use parking_lot::{Condvar, Mutex};
+use slab::Slab;
+
+#[derive(Debug)]
+pub struct ShutdownError;
+
+impl std::fmt::Display for ShutdownError {
+    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+        f.write_str("shutdown requested")
+    }
+}
+
+impl std::error::Error for ShutdownError {}
+
+struct Inner {
+    /// `None` iff shutdown has already happened.
+    wakers: Mutex<Option<Slab<Waker>>>,
+
+    condvar: Condvar,
+}
+
+pub struct Sender(Arc<Inner>);
+
+impl Drop for Sender {
+    fn drop(&mut self) {
+        // Note sequencing: modify the lock state, then notify async/sync waiters.
+        // The opposite order would create a race in which something might never wake.
+        let mut wakers = self
+            .0
+            .wakers
+            .lock()
+            .take()
+            .expect("only the single Sender takes the slab");
+        for w in wakers.drain() {
+            w.wake();
+        }
+        self.0.condvar.notify_all();
+    }
+}
+
+#[derive(Clone)]
+pub struct Receiver(Arc<Inner>);
+
+pub struct ReceiverRefFuture<'receiver> {
+    receiver: &'receiver Receiver,
+    waker_i: usize,
+}
+
+pub struct ReceiverFuture {
+    receiver: Arc<Inner>,
+    waker_i: usize,
+}
+
+/// `waker_i` value to indicate no slot has been assigned.
+///
+/// There can't be `usize::MAX` items in the slab because there are other things
+/// in the address space (and because `Waker` uses more than one byte anyway).
+const NO_WAKER: usize = usize::MAX;
+
+impl Receiver {
+    pub fn check(&self) -> Result<(), ShutdownError> {
+        if self.0.wakers.lock().is_none() {
+            Err(ShutdownError)
+        } else {
+            Ok(())
+        }
+    }
+
+    pub fn as_future(&self) -> ReceiverRefFuture {
+        ReceiverRefFuture {
+            receiver: self,
+            waker_i: NO_WAKER,
+        }
+    }
+
+    pub fn future(&self) -> ReceiverFuture {
+        ReceiverFuture {
+            receiver: self.0.clone(),
+            waker_i: NO_WAKER,
+        }
+    }
+
+    pub fn into_future(self) -> ReceiverFuture {
+        ReceiverFuture {
+            receiver: self.0,
+            waker_i: NO_WAKER,
+        }
+    }
+
+    pub fn wait_for(&self, timeout: std::time::Duration) -> Result<(), ShutdownError> {
+        let mut l = self.0.wakers.lock();
+        if l.is_none() {
+            return Err(ShutdownError);
+        }
+        if self.0.condvar.wait_for(&mut l, timeout).timed_out() {
+            Ok(())
+        } else {
+            // parking_lot guarantees no spurious wakeups.
+            debug_assert!(l.is_none());
+            Err(ShutdownError)
+        }
+    }
+}
+
+fn poll_impl(inner: &Inner, waker_i: &mut usize, cx: &mut Context<'_>) -> Poll<()> {
+    let mut l = inner.wakers.lock();
+    let wakers = match &mut *l {
+        None => return Poll::Ready(()),
+        Some(w) => w,
+    };
+    let new_waker = cx.waker();
+    if *waker_i == NO_WAKER {
+        *waker_i = wakers.insert(new_waker.clone());
+    } else {
+        let existing_waker = &mut wakers[*waker_i];
+        if !new_waker.will_wake(existing_waker) {
+            *existing_waker = new_waker.clone();
+        }
+    }
+    Poll::Pending
+}
+
+impl<'receiver> Future for ReceiverRefFuture<'receiver> {
+    type Output = ();
+
+    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
+        poll_impl(&self.receiver.0, &mut self.waker_i, cx)
+    }
+}
+
+impl Future for ReceiverFuture {
+    type Output = ();
+
+    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
+        let this = Pin::into_inner(self);
+        poll_impl(&this.receiver, &mut this.waker_i, cx)
+    }
+}
+
+/// Returns a sender and receiver for graceful shutdown.
+///
+/// Dropping the sender will request shutdown.
+///
+/// The receiver can be used as a future or just polled when convenient.
+pub fn channel() -> (Sender, Receiver) {
+    let inner = Arc::new(Inner {
+        wakers: Mutex::new(Some(Slab::new())),
+        condvar: Condvar::new(),
+    });
+    (Sender(inner.clone()), Receiver(inner))
+}
+
+#[cfg(test)]
+mod tests {
+    use futures::Future;
+    use std::task::{Context, Poll};
+
+    #[test]
+    fn simple_check() {
+        let (tx, rx) = super::channel();
+        rx.check().unwrap();
+        drop(tx);
+        rx.check().unwrap_err();
+    }
+
+    #[test]
+    fn blocking() {
+        let (tx, rx) = super::channel();
+        rx.wait_for(std::time::Duration::from_secs(0)).unwrap();
+        let h = std::thread::spawn(move || {
+            rx.wait_for(std::time::Duration::from_secs(1000))
+                .unwrap_err()
+        });
+
+        // Make it likely that rx has done its initial check and is waiting on the Condvar.
+        std::thread::sleep(std::time::Duration::from_millis(10));
+
+        drop(tx);
+        h.join().unwrap();
+    }
+
+    #[test]
+    fn future() {
+        let (tx, rx) = super::channel();
+        let waker = futures::task::noop_waker_ref();
+        let mut cx = Context::from_waker(waker);
+        let mut f = rx.as_future();
+        assert_eq!(std::pin::Pin::new(&mut f).poll(&mut cx), Poll::Pending);
+        drop(tx);
+        assert_eq!(std::pin::Pin::new(&mut f).poll(&mut cx), Poll::Ready(()));
+        // TODO: this doesn't actually check that the waker is even used.
+ } +} diff --git a/server/db/testutil.rs b/server/db/testutil.rs index cb59f5c..523085f 100644 --- a/server/db/testutil.rs +++ b/server/db/testutil.rs @@ -51,6 +51,8 @@ pub fn init() { pub struct TestDb { pub db: Arc>, pub dirs_by_stream_id: Arc>>, + pub shutdown_tx: base::shutdown::Sender, + pub shutdown_rx: base::shutdown::Receiver, pub syncer_channel: writer::SyncerChannel<::std::fs::File>, pub syncer_join: thread::JoinHandle<()>, pub tmpdir: TempDir, @@ -114,11 +116,14 @@ impl TestDb { } let mut dirs_by_stream_id = FnvHashMap::default(); dirs_by_stream_id.insert(TEST_STREAM_ID, dir); + let (shutdown_tx, shutdown_rx) = base::shutdown::channel(); let (syncer_channel, syncer_join) = - writer::start_syncer(db.clone(), sample_file_dir_id).unwrap(); + writer::start_syncer(db.clone(), shutdown_rx.clone(), sample_file_dir_id).unwrap(); TestDb { db, dirs_by_stream_id: Arc::new(dirs_by_stream_id), + shutdown_tx, + shutdown_rx, syncer_channel, syncer_join, tmpdir, diff --git a/server/db/writer.rs b/server/db/writer.rs index 0df7e43..edc7df7 100644 --- a/server/db/writer.rs +++ b/server/db/writer.rs @@ -8,6 +8,7 @@ use crate::db::{self, CompositeId}; use crate::dir; use crate::recording::{self, MAX_RECORDING_WALL_DURATION}; use base::clock::{self, Clocks}; +use base::shutdown::ShutdownError; use failure::{bail, format_err, Error}; use fnv::FnvHashMap; use log::{debug, trace, warn}; @@ -95,6 +96,7 @@ struct Syncer { dir: D, db: Arc>, planned_flushes: std::collections::BinaryHeap, + shutdown_rx: base::shutdown::Receiver, } /// A plan to flush at a given instant due to a recently-saved recording's `flush_if_sec` parameter. @@ -155,13 +157,14 @@ impl Eq for PlannedFlush {} /// TODO: add a join wrapper which arranges for the on flush hook to be removed automatically. pub fn start_syncer( db: Arc>, + shutdown_rx: base::shutdown::Receiver, dir_id: i32, ) -> Result<(SyncerChannel<::std::fs::File>, thread::JoinHandle<()>), Error> where C: Clocks + Clone, { let db2 = db.clone(); - let (mut syncer, path) = Syncer::new(&db.lock(), db2, dir_id)?; + let (mut syncer, path) = Syncer::new(&db.lock(), shutdown_rx, db2, dir_id)?; syncer.initial_rotation()?; let (snd, rcv) = mpsc::channel(); db.lock().on_flush(Box::new({ @@ -199,7 +202,8 @@ pub fn lower_retention( limits: &[NewLimit], ) -> Result<(), Error> { let db2 = db.clone(); - let (mut syncer, _) = Syncer::new(&db.lock(), db2, dir_id)?; + let (_tx, rx) = base::shutdown::channel(); + let (mut syncer, _) = Syncer::new(&db.lock(), rx, db2, dir_id)?; syncer.do_rotation(|db| { for l in limits { let (fs_bytes_before, extra); @@ -305,6 +309,7 @@ fn list_files_to_abandon( impl Syncer> { fn new( l: &db::LockedDatabase, + shutdown_rx: base::shutdown::Receiver, db: Arc>, dir_id: i32, ) -> Result<(Self, String), Error> { @@ -346,6 +351,7 @@ impl Syncer> { Ok(( Syncer { dir_id, + shutdown_rx, dir, db, planned_flushes: std::collections::BinaryHeap::new(), @@ -438,8 +444,16 @@ impl Syncer { // Have a command; handle it. match cmd { - SyncerCommand::AsyncSaveRecording(id, wall_dur, f) => self.save(id, wall_dur, f), - SyncerCommand::DatabaseFlushed => self.collect_garbage(), + SyncerCommand::AsyncSaveRecording(id, wall_dur, f) => { + if self.save(id, wall_dur, f).is_err() { + return false; + } + } + SyncerCommand::DatabaseFlushed => { + if self.collect_garbage().is_err() { + return false; + } + } SyncerCommand::Flush(flush) => { // The sender is waiting for the supplied writer to be dropped. 
If there's no // timeout, do so immediately; otherwise wait for that timeout then drop it. @@ -453,7 +467,7 @@ impl Syncer { } /// Collects garbage (without forcing a sync). Called from worker thread. - fn collect_garbage(&mut self) { + fn collect_garbage(&mut self) -> Result<(), ShutdownError> { trace!("Collecting garbage"); let mut garbage: Vec<_> = { let l = self.db.lock(); @@ -461,11 +475,11 @@ impl Syncer { d.garbage_needs_unlink.iter().copied().collect() }; if garbage.is_empty() { - return; + return Ok(()); } let c = &self.db.clocks(); for &id in &garbage { - clock::retry_forever(c, &mut || { + clock::retry(c, &self.shutdown_rx, &mut || { if let Err(e) = self.dir.unlink_file(id) { if e == nix::Error::ENOENT { warn!("dir: recording {} already deleted!", id); @@ -474,25 +488,33 @@ impl Syncer { return Err(e); } Ok(()) - }); + })?; } - clock::retry_forever(c, &mut || self.dir.sync()); - clock::retry_forever(c, &mut || { + clock::retry(c, &self.shutdown_rx, &mut || self.dir.sync())?; + clock::retry(c, &self.shutdown_rx, &mut || { self.db.lock().delete_garbage(self.dir_id, &mut garbage) - }); + })?; + Ok(()) } /// Saves the given recording and prompts rotation. Called from worker thread. /// Note that this doesn't flush immediately; SQLite transactions are batched to lower SSD /// wear. On the next flush, the old recordings will actually be marked as garbage in the /// database, and shortly afterward actually deleted from disk. - fn save(&mut self, id: CompositeId, wall_duration: recording::Duration, f: D::File) { + fn save( + &mut self, + id: CompositeId, + wall_duration: recording::Duration, + f: D::File, + ) -> Result<(), ShutdownError> { trace!("Processing save for {}", id); let stream_id = id.stream(); // Free up a like number of bytes. - clock::retry_forever(&self.db.clocks(), &mut || f.sync_all()); - clock::retry_forever(&self.db.clocks(), &mut || self.dir.sync()); + clock::retry(&self.db.clocks(), &self.shutdown_rx, &mut || f.sync_all())?; + clock::retry(&self.db.clocks(), &self.shutdown_rx, &mut || { + self.dir.sync() + })?; let mut db = self.db.lock(); db.mark_synced(id).unwrap(); delete_recordings(&mut db, stream_id, 0).unwrap(); @@ -518,6 +540,7 @@ impl Syncer { recording: id, senders: Vec::new(), }); + Ok(()) } /// Flushes the database if necessary to honor `flush_if_sec` for some recording. @@ -612,8 +635,8 @@ struct InnerWriter { hasher: blake3::Hasher, - /// The start time of this segment, based solely on examining the local clock after frames in - /// this segment were received. Frames can suffer from various kinds of delay (initial + /// The start time of this recording, based solely on examining the local clock after frames in + /// this recording were received. Frames can suffer from various kinds of delay (initial /// buffering, encoding, and network transmission), so this time is set to far in the future on /// construction, given a real value on the first packet, and decreased as less-delayed packets /// are discovered. See design/time.md for details. @@ -625,7 +648,8 @@ struct InnerWriter { /// the writer is closed cleanly (the caller supplies the next pts), or when the writer is /// closed uncleanly (with a zero duration, which the `.mp4` format allows only at the end). /// - /// Invariant: this should always be `Some` (briefly violated during `write` call only). + /// `unindexed_sample` should always be `Some`, except when a `write` call has aborted on + /// shutdown. In that case, the close will be unable to write the full segment. 
unindexed_sample: Option, } @@ -670,7 +694,7 @@ impl<'a, C: Clocks + Clone, D: DirWriter> Writer<'a, C, D> { /// On successful return, `self.state` will be `WriterState::Open(w)` with `w` violating the /// invariant that `unindexed_sample` is `Some`. The caller (`write`) is responsible for /// correcting this. - fn open(&mut self) -> Result<(), Error> { + fn open(&mut self, shutdown_rx: &mut base::shutdown::Receiver) -> Result<(), Error> { let prev = match self.state { WriterState::Unopened => None, WriterState::Open(_) => return Ok(()), @@ -688,7 +712,9 @@ impl<'a, C: Clocks + Clone, D: DirWriter> Writer<'a, C, D> { ..Default::default() }, )?; - let f = clock::retry_forever(&self.db.clocks(), &mut || self.dir.create_file(id)); + let f = clock::retry(&self.db.clocks(), shutdown_rx, &mut || { + self.dir.create_file(id) + })?; self.state = WriterState::Open(InnerWriter { f, @@ -710,16 +736,17 @@ impl<'a, C: Clocks + Clone, D: DirWriter> Writer<'a, C, D> { }) } - /// Writes a new frame to this segment. + /// Writes a new frame to this recording. /// `local_time` should be the local clock's time as of when this packet was received. pub fn write( &mut self, + shutdown_rx: &mut base::shutdown::Receiver, pkt: &[u8], local_time: recording::Time, pts_90k: i64, is_key: bool, ) -> Result<(), Error> { - self.open()?; + self.open(shutdown_rx)?; let w = match self.state { WriterState::Open(ref mut w) => w, _ => unreachable!(), @@ -763,7 +790,18 @@ impl<'a, C: Clocks + Clone, D: DirWriter> Writer<'a, C, D> { } let mut remaining = pkt; while !remaining.is_empty() { - let written = clock::retry_forever(&self.db.clocks(), &mut || w.f.write(remaining)); + let written = + match clock::retry(&self.db.clocks(), shutdown_rx, &mut || w.f.write(remaining)) { + Ok(w) => w, + Err(e) => { + // close() will do nothing because unindexed_sample will be None. + log::warn!( + "Abandoning incompletely written recording {} on shutdown", + w.id + ); + return Err(e.into()); + } + }; remaining = &remaining[written..]; } w.unindexed_sample = Some(UnindexedSample { @@ -855,10 +893,12 @@ impl InnerWriter { db: &db::Database, stream_id: i32, ) -> Result { - let unindexed = self - .unindexed_sample - .take() - .expect("should always be an unindexed sample"); + let unindexed = self.unindexed_sample.take().ok_or_else(|| { + format_err!( + "Unable to add recording {} to database due to aborted write", + self.id + ) + })?; let (last_sample_duration, flags) = match next_pts { None => (0, db::RecordingFlags::TrailingZero as i32), Some(p) => (i32::try_from(p - unindexed.pts_90k)?, 0), @@ -1050,8 +1090,10 @@ mod tests { _tmpdir: ::tempfile::TempDir, dir: MockDir, channel: super::SyncerChannel, + _shutdown_tx: base::shutdown::Sender, + shutdown_rx: base::shutdown::Receiver, syncer: super::Syncer, - syncer_rcv: mpsc::Receiver>, + syncer_rx: mpsc::Receiver>, } fn new_harness(flush_if_sec: i64) -> Harness { @@ -1072,6 +1114,7 @@ mod tests { // Start a mock syncer. 
let dir = MockDir::new(); + let (shutdown_tx, shutdown_rx) = base::shutdown::channel(); let syncer = super::Syncer { dir_id: *tdb .db @@ -1083,10 +1126,11 @@ mod tests { dir: dir.clone(), db: tdb.db.clone(), planned_flushes: std::collections::BinaryHeap::new(), + shutdown_rx: shutdown_rx.clone(), }; - let (syncer_snd, syncer_rcv) = mpsc::channel(); + let (syncer_tx, syncer_rx) = mpsc::channel(); tdb.db.lock().on_flush(Box::new({ - let snd = syncer_snd.clone(); + let snd = syncer_tx.clone(); move || { if let Err(e) = snd.send(super::SyncerCommand::DatabaseFlushed) { warn!("Unable to notify syncer for dir {} of flush: {}", dir_id, e); @@ -1098,9 +1142,11 @@ mod tests { dir, db: tdb.db, _tmpdir: tdb.tmpdir, - channel: super::SyncerChannel(syncer_snd), + channel: super::SyncerChannel(syncer_tx), + _shutdown_tx: shutdown_tx, + shutdown_rx, syncer, - syncer_rcv, + syncer_rx, } } @@ -1144,19 +1190,26 @@ mod tests { )); f.expect(MockFileAction::Write(Box::new(|_| Ok(1)))); f.expect(MockFileAction::SyncAll(Box::new(|| Ok(())))); - w.write(b"1", recording::Time(1), 0, true).unwrap(); + w.write(&mut h.shutdown_rx, b"1", recording::Time(1), 0, true) + .unwrap(); let e = w - .write(b"2", recording::Time(2), i32::max_value() as i64 + 1, true) + .write( + &mut h.shutdown_rx, + b"2", + recording::Time(2), + i32::max_value() as i64 + 1, + true, + ) .unwrap_err(); assert!(e.to_string().contains("excessive pts jump")); h.dir.expect(MockDirAction::Sync(Box::new(|| Ok(())))); drop(w); - assert!(h.syncer.iter(&h.syncer_rcv)); // AsyncSave + assert!(h.syncer.iter(&h.syncer_rx)); // AsyncSave assert_eq!(h.syncer.planned_flushes.len(), 1); - assert!(h.syncer.iter(&h.syncer_rcv)); // planned flush + assert!(h.syncer.iter(&h.syncer_rx)); // planned flush assert_eq!(h.syncer.planned_flushes.len(), 0); - assert!(h.syncer.iter(&h.syncer_rcv)); // DatabaseFlushed + assert!(h.syncer.iter(&h.syncer_rx)); // DatabaseFlushed f.ensure_done(); h.dir.ensure_done(); } @@ -1206,14 +1259,15 @@ mod tests { Ok(3) }))); f.expect(MockFileAction::SyncAll(Box::new(|| Ok(())))); - w.write(b"123", recording::Time(2), 0, true).unwrap(); + w.write(&mut h.shutdown_rx, b"123", recording::Time(2), 0, true) + .unwrap(); h.dir.expect(MockDirAction::Sync(Box::new(|| Ok(())))); w.close(Some(1)).unwrap(); - assert!(h.syncer.iter(&h.syncer_rcv)); // AsyncSave + assert!(h.syncer.iter(&h.syncer_rx)); // AsyncSave assert_eq!(h.syncer.planned_flushes.len(), 1); - assert!(h.syncer.iter(&h.syncer_rcv)); // planned flush + assert!(h.syncer.iter(&h.syncer_rx)); // planned flush assert_eq!(h.syncer.planned_flushes.len(), 0); - assert!(h.syncer.iter(&h.syncer_rcv)); // DatabaseFlushed + assert!(h.syncer.iter(&h.syncer_rx)); // DatabaseFlushed f.ensure_done(); h.dir.ensure_done(); @@ -1231,7 +1285,8 @@ mod tests { Ok(1) }))); f.expect(MockFileAction::SyncAll(Box::new(|| Ok(())))); - w.write(b"4", recording::Time(3), 1, true).unwrap(); + w.write(&mut h.shutdown_rx, b"4", recording::Time(3), 1, true) + .unwrap(); h.dir.expect(MockDirAction::Sync(Box::new(|| Ok(())))); h.dir.expect(MockDirAction::Unlink( CompositeId::new(1, 0), @@ -1252,15 +1307,15 @@ mod tests { drop(w); trace!("expecting AsyncSave"); - assert!(h.syncer.iter(&h.syncer_rcv)); // AsyncSave + assert!(h.syncer.iter(&h.syncer_rx)); // AsyncSave assert_eq!(h.syncer.planned_flushes.len(), 1); trace!("expecting planned flush"); - assert!(h.syncer.iter(&h.syncer_rcv)); // planned flush + assert!(h.syncer.iter(&h.syncer_rx)); // planned flush assert_eq!(h.syncer.planned_flushes.len(), 0); 
trace!("expecting DatabaseFlushed"); - assert!(h.syncer.iter(&h.syncer_rcv)); // DatabaseFlushed + assert!(h.syncer.iter(&h.syncer_rx)); // DatabaseFlushed trace!("expecting DatabaseFlushed again"); - assert!(h.syncer.iter(&h.syncer_rcv)); // DatabaseFlushed again + assert!(h.syncer.iter(&h.syncer_rx)); // DatabaseFlushed again f.ensure_done(); h.dir.ensure_done(); @@ -1277,13 +1332,13 @@ mod tests { } assert_eq!(h.syncer.planned_flushes.len(), 0); - assert!(h.syncer.iter(&h.syncer_rcv)); // DatabaseFlushed + assert!(h.syncer.iter(&h.syncer_rx)); // DatabaseFlushed // The syncer should shut down cleanly. drop(h.channel); h.db.lock().clear_on_flush(); assert_eq!( - h.syncer_rcv.try_recv().err(), + h.syncer_rx.try_recv().err(), Some(std::sync::mpsc::TryRecvError::Disconnected) ); assert!(h.syncer.planned_flushes.is_empty()); @@ -1341,16 +1396,17 @@ mod tests { }))); f.expect(MockFileAction::SyncAll(Box::new(|| Err(eio())))); f.expect(MockFileAction::SyncAll(Box::new(|| Ok(())))); - w.write(b"1234", recording::Time(1), 0, true).unwrap(); + w.write(&mut h.shutdown_rx, b"1234", recording::Time(1), 0, true) + .unwrap(); h.dir .expect(MockDirAction::Sync(Box::new(|| Err(nix::Error::EIO)))); h.dir.expect(MockDirAction::Sync(Box::new(|| Ok(())))); drop(w); - assert!(h.syncer.iter(&h.syncer_rcv)); // AsyncSave + assert!(h.syncer.iter(&h.syncer_rx)); // AsyncSave assert_eq!(h.syncer.planned_flushes.len(), 1); - assert!(h.syncer.iter(&h.syncer_rcv)); // planned flush + assert!(h.syncer.iter(&h.syncer_rx)); // planned flush assert_eq!(h.syncer.planned_flushes.len(), 0); - assert!(h.syncer.iter(&h.syncer_rcv)); // DatabaseFlushed + assert!(h.syncer.iter(&h.syncer_rx)); // DatabaseFlushed f.ensure_done(); h.dir.ensure_done(); @@ -1365,7 +1421,7 @@ mod tests { drop(h.channel); h.db.lock().clear_on_flush(); assert_eq!( - h.syncer_rcv.try_recv().err(), + h.syncer_rx.try_recv().err(), Some(std::sync::mpsc::TryRecvError::Disconnected) ); assert!(h.syncer.planned_flushes.is_empty()); @@ -1415,15 +1471,16 @@ mod tests { Ok(3) }))); f.expect(MockFileAction::SyncAll(Box::new(|| Ok(())))); - w.write(b"123", recording::Time(2), 0, true).unwrap(); + w.write(&mut h.shutdown_rx, b"123", recording::Time(2), 0, true) + .unwrap(); h.dir.expect(MockDirAction::Sync(Box::new(|| Ok(())))); w.close(Some(1)).unwrap(); - assert!(h.syncer.iter(&h.syncer_rcv)); // AsyncSave + assert!(h.syncer.iter(&h.syncer_rx)); // AsyncSave assert_eq!(h.syncer.planned_flushes.len(), 1); - assert!(h.syncer.iter(&h.syncer_rcv)); // planned flush + assert!(h.syncer.iter(&h.syncer_rx)); // planned flush assert_eq!(h.syncer.planned_flushes.len(), 0); - assert!(h.syncer.iter(&h.syncer_rcv)); // DatabaseFlushed + assert!(h.syncer.iter(&h.syncer_rx)); // DatabaseFlushed f.ensure_done(); h.dir.ensure_done(); @@ -1441,7 +1498,8 @@ mod tests { Ok(1) }))); f.expect(MockFileAction::SyncAll(Box::new(|| Ok(())))); - w.write(b"4", recording::Time(3), 1, true).unwrap(); + w.write(&mut h.shutdown_rx, b"4", recording::Time(3), 1, true) + .unwrap(); h.dir.expect(MockDirAction::Sync(Box::new(|| Ok(())))); h.dir.expect(MockDirAction::Unlink( CompositeId::new(1, 0), @@ -1473,11 +1531,11 @@ mod tests { drop(w); - assert!(h.syncer.iter(&h.syncer_rcv)); // AsyncSave + assert!(h.syncer.iter(&h.syncer_rx)); // AsyncSave assert_eq!(h.syncer.planned_flushes.len(), 1); - assert!(h.syncer.iter(&h.syncer_rcv)); // planned flush + assert!(h.syncer.iter(&h.syncer_rx)); // planned flush assert_eq!(h.syncer.planned_flushes.len(), 0); - assert!(h.syncer.iter(&h.syncer_rcv)); // 
DatabaseFlushed + assert!(h.syncer.iter(&h.syncer_rx)); // DatabaseFlushed f.ensure_done(); h.dir.ensure_done(); @@ -1493,13 +1551,13 @@ mod tests { assert!(dir.garbage_unlinked.is_empty()); } - assert!(h.syncer.iter(&h.syncer_rcv)); // DatabaseFlushed + assert!(h.syncer.iter(&h.syncer_rx)); // DatabaseFlushed // The syncer should shut down cleanly. drop(h.channel); h.db.lock().clear_on_flush(); assert_eq!( - h.syncer_rcv.try_recv().err(), + h.syncer_rx.try_recv().err(), Some(std::sync::mpsc::TryRecvError::Disconnected) ); assert!(h.syncer.planned_flushes.is_empty()); @@ -1546,6 +1604,7 @@ mod tests { }))); f1.expect(MockFileAction::SyncAll(Box::new(|| Ok(())))); w.write( + &mut h.shutdown_rx, b"123", recording::Time(recording::TIME_UNITS_PER_SEC), 0, @@ -1555,12 +1614,12 @@ mod tests { h.dir.expect(MockDirAction::Sync(Box::new(|| Ok(())))); drop(w); - assert!(h.syncer.iter(&h.syncer_rcv)); // AsyncSave + assert!(h.syncer.iter(&h.syncer_rx)); // AsyncSave assert_eq!(h.syncer.planned_flushes.len(), 1); // Flush and let 30 seconds go by. h.db.lock().flush("forced").unwrap(); - assert!(h.syncer.iter(&h.syncer_rcv)); // DatabaseFlushed + assert!(h.syncer.iter(&h.syncer_rx)); // DatabaseFlushed assert_eq!(h.syncer.planned_flushes.len(), 1); h.db.clocks().sleep(time::Duration::seconds(30)); @@ -1586,6 +1645,7 @@ mod tests { }))); f2.expect(MockFileAction::SyncAll(Box::new(|| Ok(())))); w.write( + &mut h.shutdown_rx, b"4", recording::Time(31 * recording::TIME_UNITS_PER_SEC), 1, @@ -1596,21 +1656,21 @@ mod tests { drop(w); - assert!(h.syncer.iter(&h.syncer_rcv)); // AsyncSave + assert!(h.syncer.iter(&h.syncer_rx)); // AsyncSave assert_eq!(h.syncer.planned_flushes.len(), 2); assert_eq!(h.syncer.planned_flushes.len(), 2); let db_flush_count_before = h.db.lock().flushes(); assert_eq!(h.db.clocks().monotonic(), time::Timespec::new(31, 0)); - assert!(h.syncer.iter(&h.syncer_rcv)); // planned flush (no-op) + assert!(h.syncer.iter(&h.syncer_rx)); // planned flush (no-op) assert_eq!(h.db.clocks().monotonic(), time::Timespec::new(61, 0)); assert_eq!(h.db.lock().flushes(), db_flush_count_before); assert_eq!(h.syncer.planned_flushes.len(), 1); - assert!(h.syncer.iter(&h.syncer_rcv)); // planned flush + assert!(h.syncer.iter(&h.syncer_rx)); // planned flush assert_eq!(h.db.clocks().monotonic(), time::Timespec::new(91, 0)); assert_eq!(h.db.lock().flushes(), db_flush_count_before + 1); assert_eq!(h.syncer.planned_flushes.len(), 0); - assert!(h.syncer.iter(&h.syncer_rcv)); // DatabaseFlushed + assert!(h.syncer.iter(&h.syncer_rx)); // DatabaseFlushed f1.ensure_done(); f2.ensure_done(); @@ -1620,7 +1680,7 @@ mod tests { drop(h.channel); h.db.lock().clear_on_flush(); assert_eq!( - h.syncer_rcv.try_recv().err(), + h.syncer_rx.try_recv().err(), Some(std::sync::mpsc::TryRecvError::Disconnected) ); assert!(h.syncer.planned_flushes.is_empty()); diff --git a/server/src/cmds/check.rs b/server/src/cmds/check.rs index e77b71b..ebf9498 100644 --- a/server/src/cmds/check.rs +++ b/server/src/cmds/check.rs @@ -46,7 +46,7 @@ pub struct Args { trash_corrupt_rows: bool, } -pub fn run(args: &Args) -> Result { +pub fn run(args: Args) -> Result { let (_db_dir, mut conn) = super::open_conn(&args.db_dir, super::OpenMode::ReadWrite)?; check::run( &mut conn, diff --git a/server/src/cmds/config/mod.rs b/server/src/cmds/config/mod.rs index 7dba29b..c66676a 100644 --- a/server/src/cmds/config/mod.rs +++ b/server/src/cmds/config/mod.rs @@ -31,7 +31,7 @@ pub struct Args { db_dir: PathBuf, } -pub fn run(args: &Args) -> Result { +pub fn 
run(args: Args) -> Result { let (_db_dir, conn) = super::open_conn(&args.db_dir, super::OpenMode::ReadWrite)?; let clocks = clock::RealClocks {}; let db = Arc::new(db::Database::new(clocks, conn, true)?); diff --git a/server/src/cmds/init.rs b/server/src/cmds/init.rs index b9f9df1..6e97f7d 100644 --- a/server/src/cmds/init.rs +++ b/server/src/cmds/init.rs @@ -19,7 +19,7 @@ pub struct Args { db_dir: PathBuf, } -pub fn run(args: &Args) -> Result { +pub fn run(args: Args) -> Result { let (_db_dir, mut conn) = super::open_conn(&args.db_dir, super::OpenMode::Create)?; // Check if the database has already been initialized. diff --git a/server/src/cmds/login.rs b/server/src/cmds/login.rs index 9b23005..a30cf94 100644 --- a/server/src/cmds/login.rs +++ b/server/src/cmds/login.rs @@ -53,7 +53,7 @@ pub struct Args { username: String, } -pub fn run(args: &Args) -> Result { +pub fn run(args: Args) -> Result { let clocks = clock::RealClocks {}; let (_db_dir, conn) = super::open_conn(&args.db_dir, super::OpenMode::ReadWrite)?; let db = std::sync::Arc::new(db::Database::new(clocks, conn, true).unwrap()); diff --git a/server/src/cmds/run.rs b/server/src/cmds/run.rs index 4f71ea0..217fda4 100644 --- a/server/src/cmds/run.rs +++ b/server/src/cmds/run.rs @@ -8,12 +8,10 @@ use base::clock; use db::{dir, writer}; use failure::{bail, Error, ResultExt}; use fnv::FnvHashMap; -use futures::future::FutureExt; use hyper::service::{make_service_fn, service_fn}; use log::error; use log::{info, warn}; use std::path::PathBuf; -use std::sync::atomic::{AtomicBool, Ordering}; use std::sync::Arc; use std::thread; use structopt::StructOpt; @@ -171,16 +169,55 @@ struct Syncer { join: thread::JoinHandle<()>, } -pub fn run(args: &Args) -> Result { +pub fn run(args: Args) -> Result { let mut builder = tokio::runtime::Builder::new_multi_thread(); builder.enable_all(); if let Some(worker_threads) = args.worker_threads { builder.worker_threads(worker_threads); } - builder.build().unwrap().block_on(async_run(args)) + let rt = builder.build()?; + let r = rt.block_on(async_run(args)); + + // tokio normally waits for all spawned tasks to complete, but: + // * in the graceful shutdown path, we wait for specific tasks with logging. + // * in the immediate shutdown path, we don't want to wait. + rt.shutdown_background(); + + r } -async fn async_run(args: &Args) -> Result { +async fn async_run(args: Args) -> Result { + let (shutdown_tx, shutdown_rx) = base::shutdown::channel(); + let mut shutdown_tx = Some(shutdown_tx); + + tokio::pin! { + let int = signal(SignalKind::interrupt())?; + let term = signal(SignalKind::terminate())?; + let inner = inner(args, shutdown_rx); + } + + tokio::select! { + _ = int.recv() => { + info!("Received SIGINT; shutting down gracefully. \ + Send another SIGINT or SIGTERM to shut down immediately."); + shutdown_tx.take(); + }, + _ = term.recv() => { + info!("Received SIGTERM; shutting down gracefully. \ + Send another SIGINT or SIGTERM to shut down immediately."); + shutdown_tx.take(); + }, + result = &mut inner => return result, + } + + tokio::select! 
{ + _ = int.recv() => bail!("immediate shutdown due to second signal (SIGINT)"), + _ = term.recv() => bail!("immediate shutdown due to second singal (SIGTERM)"), + result = &mut inner => result, + } +} + +async fn inner(args: Args, shutdown_rx: base::shutdown::Receiver) -> Result { let clocks = clock::RealClocks {}; let (_db_dir, conn) = super::open_conn( &args.db_dir, @@ -215,7 +252,6 @@ async fn async_run(args: &Args) -> Result { })?); // Start a streamer for each stream. - let shutdown_streamers = Arc::new(AtomicBool::new(false)); let mut streamers = Vec::new(); let mut session_groups_by_camera: FnvHashMap> = FnvHashMap::default(); @@ -230,7 +266,7 @@ async fn async_run(args: &Args) -> Result { db: &db, opener: args.rtsp_library.opener(), transport: args.rtsp_transport, - shutdown: &shutdown_streamers, + shutdown_rx: &shutdown_rx, }; // Get the directories that need syncers. @@ -248,7 +284,7 @@ async fn async_run(args: &Args) -> Result { drop(l); let mut syncers = FnvHashMap::with_capacity_and_hasher(dirs.len(), Default::default()); for (id, dir) in dirs.drain() { - let (channel, join) = writer::start_syncer(db.clone(), id)?; + let (channel, join) = writer::start_syncer(db.clone(), shutdown_rx.clone(), id)?; syncers.insert(id, Syncer { dir, channel, join }); } @@ -319,34 +355,31 @@ async fn async_run(args: &Args) -> Result { .with_context(|_| format!("unable to bind --http-addr={}", &args.http_addr))? .tcp_nodelay(true) .serve(make_svc); - - let mut int = signal(SignalKind::interrupt())?; - let mut term = signal(SignalKind::terminate())?; - let shutdown = futures::future::select(Box::pin(int.recv()), Box::pin(term.recv())); - - let (shutdown_tx, shutdown_rx) = futures::channel::oneshot::channel(); - let server = server.with_graceful_shutdown(shutdown_rx.map(|_| ())); + let server = server.with_graceful_shutdown(shutdown_rx.future()); let server_handle = tokio::spawn(server); info!("Ready to serve HTTP requests"); - shutdown.await; - shutdown_tx.send(()).unwrap(); + let _ = shutdown_rx.as_future().await; - info!("Shutting down streamers."); - shutdown_streamers.store(true, Ordering::SeqCst); - for streamer in streamers.drain(..) { - streamer.join().unwrap(); - } - - if let Some(mut ss) = syncers { - // The syncers shut down when all channels to them have been dropped. - // The database maintains one; and `ss` holds one. Drop both. - db.lock().clear_on_flush(); - for (_, s) in ss.drain() { - drop(s.channel); - s.join.join().unwrap(); + info!("Shutting down streamers and syncers."); + tokio::task::spawn_blocking({ + let db = db.clone(); + move || { + for streamer in streamers.drain(..) { + streamer.join().unwrap(); + } + if let Some(mut ss) = syncers { + // The syncers shut down when all channels to them have been dropped. + // The database maintains one; and `ss` holds one. Drop both. 
+ db.lock().clear_on_flush(); + for (_, s) in ss.drain() { + drop(s.channel); + s.join.join().unwrap(); + } + } } - } + }) + .await?; db.lock().clear_watches(); diff --git a/server/src/cmds/sql.rs b/server/src/cmds/sql.rs index 6c16d17..3242c38 100644 --- a/server/src/cmds/sql.rs +++ b/server/src/cmds/sql.rs @@ -37,7 +37,7 @@ pub struct Args { arg: Vec, } -pub fn run(args: &Args) -> Result { +pub fn run(args: Args) -> Result { let mode = if args.read_only { OpenMode::ReadOnly } else { diff --git a/server/src/cmds/ts.rs b/server/src/cmds/ts.rs index 32ed7de..0feecdb 100644 --- a/server/src/cmds/ts.rs +++ b/server/src/cmds/ts.rs @@ -17,7 +17,7 @@ pub struct Args { timestamps: Vec, } -pub fn run(args: &Args) -> Result { +pub fn run(args: Args) -> Result { for timestamp in &args.timestamps { let t = db::recording::Time::parse(timestamp)?; println!("{} == {}", t, t.0); diff --git a/server/src/cmds/upgrade/mod.rs b/server/src/cmds/upgrade/mod.rs index 642c3a6..7bff52a 100644 --- a/server/src/cmds/upgrade/mod.rs +++ b/server/src/cmds/upgrade/mod.rs @@ -40,7 +40,7 @@ pub struct Args { no_vacuum: bool, } -pub fn run(args: &Args) -> Result { +pub fn run(args: Args) -> Result { let (_db_dir, mut conn) = super::open_conn(&args.db_dir, super::OpenMode::ReadWrite)?; db::upgrade::run( diff --git a/server/src/main.rs b/server/src/main.rs index c14b5a4..24a0db8 100644 --- a/server/src/main.rs +++ b/server/src/main.rs @@ -59,16 +59,16 @@ enum Args { } impl Args { - fn run(&self) -> Result { + fn run(self) -> Result { match self { - Args::Check(ref a) => cmds::check::run(a), - Args::Config(ref a) => cmds::config::run(a), - Args::Init(ref a) => cmds::init::run(a), - Args::Login(ref a) => cmds::login::run(a), - Args::Run(ref a) => cmds::run::run(a), - Args::Sql(ref a) => cmds::sql::run(a), - Args::Ts(ref a) => cmds::ts::run(a), - Args::Upgrade(ref a) => cmds::upgrade::run(a), + Args::Check(a) => cmds::check::run(a), + Args::Config(a) => cmds::config::run(a), + Args::Init(a) => cmds::init::run(a), + Args::Login(a) => cmds::login::run(a), + Args::Run(a) => cmds::run::run(a), + Args::Sql(a) => cmds::sql::run(a), + Args::Ts(a) => cmds::ts::run(a), + Args::Upgrade(a) => cmds::upgrade::run(a), } } } diff --git a/server/src/mp4.rs b/server/src/mp4.rs index 939a2ca..6935f6a 100644 --- a/server/src/mp4.rs +++ b/server/src/mp4.rs @@ -2277,7 +2277,7 @@ mod tests { } } - fn copy_mp4_to_db(db: &TestDb) { + fn copy_mp4_to_db(db: &mut TestDb) { let (extra_data, mut input) = stream::FFMPEG .open( "test".to_owned(), @@ -2322,7 +2322,13 @@ mod tests { }; frame_time += recording::Duration(i64::from(pkt.duration)); output - .write(pkt.data, frame_time, pkt.pts, pkt.is_key) + .write( + &mut db.shutdown_rx, + pkt.data, + frame_time, + pkt.pts, + pkt.is_key, + ) .unwrap(); end_pts = Some(pkt.pts + i64::from(pkt.duration)); } @@ -2811,8 +2817,8 @@ mod tests { #[tokio::test] async fn test_round_trip() { testutil::init(); - let db = TestDb::new(RealClocks {}); - copy_mp4_to_db(&db); + let mut db = TestDb::new(RealClocks {}); + copy_mp4_to_db(&mut db); let mp4 = create_mp4_from_db(&db, 0, 0, false); traverse(mp4.clone()).await; let new_filename = write_mp4(&mp4, db.tmpdir.path()).await; @@ -2840,8 +2846,8 @@ mod tests { #[tokio::test] async fn test_round_trip_with_subtitles() { testutil::init(); - let db = TestDb::new(RealClocks {}); - copy_mp4_to_db(&db); + let mut db = TestDb::new(RealClocks {}); + copy_mp4_to_db(&mut db); let mp4 = create_mp4_from_db(&db, 0, 0, true); traverse(mp4.clone()).await; let new_filename = write_mp4(&mp4, 
db.tmpdir.path()).await; @@ -2869,8 +2875,8 @@ mod tests { #[tokio::test] async fn test_round_trip_with_edit_list() { testutil::init(); - let db = TestDb::new(RealClocks {}); - copy_mp4_to_db(&db); + let mut db = TestDb::new(RealClocks {}); + copy_mp4_to_db(&mut db); let mp4 = create_mp4_from_db(&db, 1, 0, false); traverse(mp4.clone()).await; let new_filename = write_mp4(&mp4, db.tmpdir.path()).await; @@ -2898,8 +2904,8 @@ mod tests { #[tokio::test] async fn test_round_trip_with_edit_list_and_subtitles() { testutil::init(); - let db = TestDb::new(RealClocks {}); - copy_mp4_to_db(&db); + let mut db = TestDb::new(RealClocks {}); + copy_mp4_to_db(&mut db); let off = 2 * TIME_UNITS_PER_SEC; let mp4 = create_mp4_from_db(&db, i32::try_from(off).unwrap(), 0, true); traverse(mp4.clone()).await; @@ -2928,8 +2934,8 @@ mod tests { #[tokio::test] async fn test_round_trip_with_shorten() { testutil::init(); - let db = TestDb::new(RealClocks {}); - copy_mp4_to_db(&db); + let mut db = TestDb::new(RealClocks {}); + copy_mp4_to_db(&mut db); let mp4 = create_mp4_from_db(&db, 0, 1, false); traverse(mp4.clone()).await; let new_filename = write_mp4(&mp4, db.tmpdir.path()).await; diff --git a/server/src/streamer.rs b/server/src/streamer.rs index a0215ca..71ff7ba 100644 --- a/server/src/streamer.rs +++ b/server/src/streamer.rs @@ -8,7 +8,6 @@ use db::{dir, recording, writer, Camera, Database, Stream}; use failure::{bail, Error}; use log::{debug, info, trace, warn}; use std::result::Result; -use std::sync::atomic::{AtomicBool, Ordering}; use std::sync::Arc; use url::Url; @@ -22,7 +21,7 @@ where pub opener: &'a dyn stream::Opener, pub transport: retina::client::Transport, pub db: &'tmp Arc>, - pub shutdown: &'tmp Arc, + pub shutdown_rx: &'tmp base::shutdown::Receiver, } /// Connects to a given RTSP stream and writes recordings to the database via [`writer::Writer`]. @@ -31,7 +30,7 @@ pub struct Streamer<'a, C> where C: Clocks + Clone, { - shutdown: Arc, + shutdown_rx: base::shutdown::Receiver, // State below is only used by the thread in Run. rotate_offset_sec: i64, @@ -69,7 +68,7 @@ where bail!("RTSP URL shouldn't include credentials"); } Ok(Streamer { - shutdown: env.shutdown.clone(), + shutdown_rx: env.shutdown_rx.clone(), rotate_offset_sec, rotate_interval_sec, db: env.db.clone(), @@ -94,7 +93,7 @@ where /// Note that when using Retina as the RTSP library, this must be called /// within a tokio runtime context; see [tokio::runtime::Handle]. pub fn run(&mut self) { - while !self.shutdown.load(Ordering::SeqCst) { + while self.shutdown_rx.check().is_ok() { if let Err(e) = self.run_once() { let sleep_time = time::Duration::seconds(1); warn!( @@ -124,7 +123,7 @@ where d, status.num_sessions ); - std::thread::sleep(d); + self.shutdown_rx.wait_for(d)?; waited = true; } } else { @@ -164,7 +163,7 @@ where self.stream_id, video_sample_entry_id, ); - while !self.shutdown.load(Ordering::SeqCst) { + while self.shutdown_rx.check().is_ok() { let pkt = { let _t = TimerGuard::new(&clocks, || "getting next packet"); stream.next()? 
@@ -214,7 +213,13 @@ where } }; let _t = TimerGuard::new(&clocks, || format!("writing {} bytes", pkt.data.len())); - w.write(pkt.data, local_time, pkt.pts, pkt.is_key)?; + w.write( + &mut self.shutdown_rx, + pkt.data, + local_time, + pkt.pts, + pkt.is_key, + )?; rotate = Some(r); } if rotate.is_some() { @@ -236,7 +241,6 @@ mod tests { use parking_lot::Mutex; use std::cmp; use std::convert::TryFrom; - use std::sync::atomic::{AtomicBool, Ordering}; use std::sync::Arc; use time; @@ -312,7 +316,7 @@ mod tests { struct MockOpener { expected_url: url::Url, streams: Mutex)>>, - shutdown: Arc, + shutdown_tx: Mutex>, } impl stream::Opener for MockOpener { @@ -333,7 +337,7 @@ mod tests { } None => { trace!("MockOpener shutting down"); - self.shutdown.store(true, Ordering::SeqCst); + self.shutdown_tx.lock().take(); bail!("done") } } @@ -380,16 +384,17 @@ mod tests { stream.ts_offset = 123456; // starting pts of the input should be irrelevant stream.ts_offset_pkts_left = u32::max_value(); stream.pkts_left = u32::max_value(); + let (shutdown_tx, shutdown_rx) = base::shutdown::channel(); let opener = MockOpener { expected_url: url::Url::parse("rtsp://test-camera/main").unwrap(), streams: Mutex::new(vec![(extra_data, Box::new(stream))]), - shutdown: Arc::new(AtomicBool::new(false)), + shutdown_tx: Mutex::new(Some(shutdown_tx)), }; let db = testutil::TestDb::new(clocks.clone()); let env = super::Environment { opener: &opener, db: &db.db, - shutdown: &opener.shutdown, + shutdown_rx: &shutdown_rx, transport: retina::client::Transport::Tcp, }; let mut stream;