From 7b0a489541c183deadcb03bbe221602d1f5f47e5 Mon Sep 17 00:00:00 2001 From: Scott Lamb Date: Wed, 13 Apr 2022 11:39:38 -0700 Subject: [PATCH] rework stream threading model Fixes #206. 307a388 switched to creating a single-threaded runtime for each stream, then destroying prior to waiting for TEARDOWN on shutdown. This meant that the shutdown process could panic with this error: ``` panic at '/home/slamb/git/retina/src/client/mod.rs:219:22': teardown Sender shouldn't be dropped: RecvError(()) ``` Let's switch back to expecting a multithreaded runtime context. Create one for the config subcommand, too. Don't go all the way back to the old code with its channels, though. That had the downside that the underlying retina::Session might outlive the caller, so there could still be an active session when we start the next one. I haven't seen this cause problems in practice but it still doesn't seem right. --- guide/troubleshooting.md | 9 ++++----- server/src/cmds/config/cameras.rs | 14 +++++++------- server/src/cmds/config/mod.rs | 7 +++++++ server/src/cmds/run/mod.rs | 7 ++++++- server/src/stream.rs | 30 +++++++++++++----------------- server/src/streamer.rs | 22 +++++++++------------- 6 files changed, 46 insertions(+), 43 deletions(-) diff --git a/guide/troubleshooting.md b/guide/troubleshooting.md index 77e75ed..6b9177e 100644 --- a/guide/troubleshooting.md +++ b/guide/troubleshooting.md @@ -86,17 +86,16 @@ Moonfire NVR names a few important thread types as follows: * `main`: during `moonfire-nvr run`, the main thread does initial setup then just waits for the other threads. In other subcommands, it does everything. -* `s-CAMERA-TYPE` (one per stream, where `TYPE` is `main` or `sub`): these - threads receive video from the cameras (via RTSP) and write it to disk. +* `s-CAMERA-TYPE` (one per stream, where `TYPE` is `main`, `sub`, or `ext`): + these threads write video to disk. * `sync-PATH` (one per sample file directory): These threads call `fsync` to * commit sample files to disk, delete old sample files, and flush the database. * `r-PATH` (one per sample file directory): These threads read sample files from disk for serving `.mp4` files. * `tokio-runtime-worker` (one per core, unless overridden with - `--worker-threads`): these threads handle HTTP requests. - When using `--rtsp-library=retina`, they also read video data from cameras - via RTSP. + `--worker-threads`): these threads handle HTTP requests and read video + data from cameras via RTSP. * `logger`: this thread writes the log buffer to `stderr`. Logging is asynchronous; other threads don't wait for log messages to be written unless the log buffer is full. diff --git a/server/src/cmds/config/cameras.rs b/server/src/cmds/config/cameras.rs index c875b76..55c192e 100644 --- a/server/src/cmds/config/cameras.rs +++ b/server/src/cmds/config/cameras.rs @@ -204,18 +204,14 @@ fn press_edit(siv: &mut Cursive, db: &Arc, id: Option) { } fn press_test_inner( + handle: tokio::runtime::Handle, url: Url, username: String, password: String, transport: retina::client::Transport, ) -> Result { - let rt = tokio::runtime::Builder::new_current_thread() - .enable_time() - .enable_io() - .build()?; - let _guard = rt.enter(); + let _enter = handle.enter(); let (extra_data, stream) = stream::OPENER.open( - &rt, "test stream".to_owned(), url, retina::client::SessionOptions::default() @@ -262,8 +258,12 @@ fn press_test(siv: &mut Cursive, t: db::StreamType) { // siv.cb_sink doesn't actually wake up the event loop. Tell siv to poll, as a workaround. siv.set_fps(5); let sink = siv.cb_sink().clone(); + + // Note: this expects to be called within a tokio runtime. Currently this + // is set up by the config subcommand's run(). + let handle = tokio::runtime::Handle::current(); ::std::thread::spawn(move || { - let r = press_test_inner(url.clone(), username, password, transport); + let r = press_test_inner(handle, url.clone(), username, password, transport); sink.send(Box::new(move |siv: &mut Cursive| { // Polling is no longer necessary. siv.set_fps(0); diff --git a/server/src/cmds/config/mod.rs b/server/src/cmds/config/mod.rs index c66676a..d68892d 100644 --- a/server/src/cmds/config/mod.rs +++ b/server/src/cmds/config/mod.rs @@ -36,6 +36,13 @@ pub fn run(args: Args) -> Result { let clocks = clock::RealClocks {}; let db = Arc::new(db::Database::new(clocks, conn, true)?); + // This runtime is needed by the "Test" button in the camera config. + let rt = tokio::runtime::Builder::new_multi_thread() + .enable_io() + .enable_time() + .build()?; + let _enter = rt.enter(); + let mut siv = cursive::default(); //siv.add_global_callback('q', |s| s.quit()); diff --git a/server/src/cmds/run/mod.rs b/server/src/cmds/run/mod.rs index 8ad6c8d..c5d0eb4 100644 --- a/server/src/cmds/run/mod.rs +++ b/server/src/cmds/run/mod.rs @@ -303,6 +303,7 @@ async fn inner( } // Then start up streams. + let handle = tokio::runtime::Handle::current(); let l = db.lock(); for (i, (id, stream)) in l.streams_by_id().iter().enumerate() { if stream.config.mode != db::json::STREAM_MODE_RECORD { @@ -342,10 +343,14 @@ async fn inner( )?; info!("Starting streamer for {}", streamer.short_name()); let name = format!("s-{}", streamer.short_name()); + let handle = handle.clone(); streamers.push( thread::Builder::new() .name(name) - .spawn(move || streamer.run()) + .spawn(move || { + let _enter = handle.enter(); + streamer.run(); + }) .expect("can't create thread"), ); } diff --git a/server/src/stream.rs b/server/src/stream.rs index 4f72006..bfb82c2 100644 --- a/server/src/stream.rs +++ b/server/src/stream.rs @@ -20,17 +20,13 @@ pub trait Opener: Send + Sync { /// Opens the given RTSP URL. /// /// Note: despite the blocking interface, this expects to be called from - /// a tokio runtime with IO and time enabled. Takes the - /// [`tokio::runtime::Runtime`] rather than using - /// `tokio::runtime::Handle::current()` because `Runtime::block_on` can - /// drive IO and timers while `Handle::block_on` can not. - fn open<'a>( + /// the context of a multithreaded tokio runtime with IO and time enabled. + fn open( &self, - rt: &'a tokio::runtime::Runtime, label: String, url: Url, options: retina::client::SessionOptions, - ) -> Result<(db::VideoSampleEntryToInsert, Box), Error>; + ) -> Result<(db::VideoSampleEntryToInsert, Box), Error>; } pub struct VideoFrame { @@ -54,33 +50,33 @@ pub struct RealOpener; pub const OPENER: RealOpener = RealOpener; impl Opener for RealOpener { - fn open<'a>( + fn open( &self, - rt: &'a tokio::runtime::Runtime, label: String, url: Url, options: retina::client::SessionOptions, - ) -> Result<(db::VideoSampleEntryToInsert, Box), Error> { + ) -> Result<(db::VideoSampleEntryToInsert, Box), Error> { let options = options.user_agent(format!("Moonfire NVR {}", env!("CARGO_PKG_VERSION"))); - let (session, video_params, first_frame) = rt.block_on(tokio::time::timeout( + let rt_handle = tokio::runtime::Handle::current(); + let (session, video_params, first_frame) = rt_handle.block_on(tokio::time::timeout( RETINA_TIMEOUT, RetinaStream::play(&label, url, options), ))??; let extra_data = h264::parse_extra_data(video_params.extra_data())?; let stream = Box::new(RetinaStream { - rt, label, session, + rt_handle, first_frame: Some(first_frame), }); Ok((extra_data, stream)) } } -struct RetinaStream<'a> { - rt: &'a tokio::runtime::Runtime, +struct RetinaStream { label: String, session: Pin>, + rt_handle: tokio::runtime::Handle, /// The first frame, if not yet returned from `next`. /// @@ -89,7 +85,7 @@ struct RetinaStream<'a> { first_frame: Option, } -impl<'a> RetinaStream<'a> { +impl RetinaStream { /// Plays to first frame. No timeout; that's the caller's responsibility. async fn play( label: &str, @@ -179,14 +175,14 @@ impl<'a> RetinaStream<'a> { } } -impl<'a> Stream for RetinaStream<'a> { +impl Stream for RetinaStream { fn tool(&self) -> Option<&retina::client::Tool> { Pin::into_inner(self.session.as_ref()).tool() } fn next(&mut self) -> Result { let frame = self.first_frame.take().map(Ok).unwrap_or_else(|| { - self.rt + self.rt_handle .block_on(tokio::time::timeout( RETINA_TIMEOUT, RetinaStream::fetch_next_frame(&self.label, self.session.as_mut()), diff --git a/server/src/streamer.rs b/server/src/streamer.rs index c4f617b..a2b7264 100644 --- a/server/src/streamer.rs +++ b/server/src/streamer.rs @@ -110,15 +110,12 @@ where } /// Runs the streamer; blocks. + /// + /// Note: despite the blocking interface, this expects to be called from + /// the context of a multithreaded tokio runtime with IO and time enabled. pub fn run(&mut self) { - let rt = tokio::runtime::Builder::new_current_thread() - .enable_io() - .enable_time() - .build() - .unwrap(); - let _guard = rt.enter(); while self.shutdown_rx.check().is_ok() { - if let Err(e) = self.run_once(&rt) { + if let Err(e) = self.run_once() { let sleep_time = time::Duration::seconds(1); warn!( "{}: sleeping for {} after error: {}", @@ -132,10 +129,11 @@ where info!("{}: shutting down", self.short_name); } - fn run_once(&mut self, rt: &tokio::runtime::Runtime) -> Result<(), Error> { + fn run_once(&mut self) -> Result<(), Error> { info!("{}: Opening input: {}", self.short_name, self.url.as_str()); let clocks = self.db.clocks(); + let handle = tokio::runtime::Handle::current(); let mut waited = false; loop { let status = self.session_group.stale_sessions(); @@ -146,7 +144,7 @@ where max_expires.saturating_duration_since(tokio::time::Instant::now()), status.num_sessions ); - rt.block_on(async { + handle.block_on(async { tokio::select! { _ = self.session_group.await_stale_sessions(&status) => Ok(()), _ = self.shutdown_rx.as_future() => Err(base::shutdown::ShutdownError), @@ -164,7 +162,6 @@ where let (extra_data, mut stream) = { let _t = TimerGuard::new(&clocks, || format!("opening {}", self.url.as_str())); self.opener.open( - rt, self.short_name.clone(), self.url.clone(), retina::client::SessionOptions::default() @@ -365,7 +362,6 @@ mod tests { impl stream::Opener for MockOpener { fn open( &self, - _rt: &tokio::runtime::Runtime, _label: String, url: url::Url, _options: retina::client::SessionOptions, @@ -409,8 +405,8 @@ mod tests { .unwrap() } - #[test] - fn basic() { + #[tokio::test] + async fn basic() { testutil::init(); // 2015-04-25 00:00:00 UTC let clocks = clock::SimulatedClocks::new(time::Timespec::new(1429920000, 0));