rework stream threading model

Fixes #206. 307a388 switched to creating a single-threaded runtime for
each stream, then destroying that runtime before waiting for the RTSP
TEARDOWN to complete on shutdown. This meant the shutdown process could
panic with this error:

```
panic at '/home/slamb/git/retina/src/client/mod.rs:219:22': teardown Sender shouldn't be dropped: RecvError(())
```

Let's switch back to expecting a multithreaded runtime context.
Create one for the config subcommand, too.

Don't go all the way back to the old code with its channels, though.
That had the downside that the underlying retina::Session might outlive
the caller, so there could still be an active session when we start
the next one. I haven't seen this cause problems in practice but it
still doesn't seem right.
Scott Lamb, 2022-04-13 11:39:38 -07:00
parent 5e7d558f99, commit 7b0a489541
6 changed files with 46 additions and 43 deletions
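In rough terms, the model this commit moves to looks like the sketch below:
one multithreaded runtime shared by the whole process, with each blocking
per-stream thread entering a cloned `Handle` instead of owning a short-lived
runtime. This is a minimal illustration of the pattern (the thread name
`s-example-main` is made up here), not Moonfire NVR's actual code:

```rust
use std::thread;
use std::time::Duration;

fn main() {
    // One multithreaded runtime for the whole process. Its worker threads
    // keep driving IO and timers even while other threads block in `block_on`.
    let rt = tokio::runtime::Builder::new_multi_thread()
        .enable_io()
        .enable_time()
        .build()
        .unwrap();
    let handle = rt.handle().clone();

    // A per-stream thread gets a cheap clone of the runtime handle instead of
    // building (and later dropping) its own single-threaded runtime.
    let streamer = thread::Builder::new()
        .name("s-example-main".to_owned())
        .spawn(move || {
            let _enter = handle.enter(); // makes Handle::current() work here
            tokio::runtime::Handle::current()
                .block_on(tokio::time::sleep(Duration::from_millis(10)));
        })
        .expect("can't create thread");
    streamer.join().unwrap();
}
```

Because the shared runtime outlives every such thread, futures still pending
at shutdown (like retina's TEARDOWN) keep live worker threads to run on,
which is exactly what the per-stream runtimes were dropping too early.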


```diff
@@ -86,17 +86,16 @@ Moonfire NVR names a few important thread types as follows:
 * `main`: during `moonfire-nvr run`, the main thread does initial setup then
   just waits for the other threads. In other subcommands, it does everything.
-* `s-CAMERA-TYPE` (one per stream, where `TYPE` is `main` or `sub`): these
-  threads receive video from the cameras (via RTSP) and write it to disk.
+* `s-CAMERA-TYPE` (one per stream, where `TYPE` is `main`, `sub`, or `ext`):
+  these threads write video to disk.
 * `sync-PATH` (one per sample file directory): These threads call `fsync` to
   commit sample files to disk, delete old sample files, and flush the
   database.
 * `r-PATH` (one per sample file directory): These threads read sample files
   from disk for serving `.mp4` files.
 * `tokio-runtime-worker` (one per core, unless overridden with
-  `--worker-threads`): these threads handle HTTP requests.
-  When using `--rtsp-library=retina`, they also read video data from cameras
-  via RTSP.
+  `--worker-threads`): these threads handle HTTP requests and read video
+  data from cameras via RTSP.
 * `logger`: this thread writes the log buffer to `stderr`. Logging is
   asynchronous; other threads don't wait for log messages to be written
   unless the log buffer is full.
```
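For reference, the one-worker-per-core default and its `--worker-threads`
override correspond to a knob on tokio's runtime builder. A hypothetical
wiring (the `build_runtime` helper and its parameter are illustrative, not
Moonfire NVR code):

```rust
// Hypothetical helper: wire a --worker-threads flag value into tokio's
// builder; with no override, tokio defaults to one worker per core.
fn build_runtime(worker_threads: Option<usize>) -> std::io::Result<tokio::runtime::Runtime> {
    let mut builder = tokio::runtime::Builder::new_multi_thread();
    if let Some(n) = worker_threads {
        builder.worker_threads(n);
    }
    builder.enable_io().enable_time().build()
}
```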


```diff
@@ -204,18 +204,14 @@ fn press_edit(siv: &mut Cursive, db: &Arc<db::Database>, id: Option<i32>) {
 }
 
 fn press_test_inner(
+    handle: tokio::runtime::Handle,
     url: Url,
     username: String,
     password: String,
     transport: retina::client::Transport,
 ) -> Result<String, Error> {
-    let rt = tokio::runtime::Builder::new_current_thread()
-        .enable_time()
-        .enable_io()
-        .build()?;
-    let _guard = rt.enter();
+    let _enter = handle.enter();
     let (extra_data, stream) = stream::OPENER.open(
-        &rt,
        "test stream".to_owned(),
        url,
        retina::client::SessionOptions::default()
@@ -262,8 +258,12 @@ fn press_test(siv: &mut Cursive, t: db::StreamType) {
     // siv.cb_sink doesn't actually wake up the event loop. Tell siv to poll, as a workaround.
     siv.set_fps(5);
     let sink = siv.cb_sink().clone();
+
+    // Note: this expects to be called within a tokio runtime. Currently this
+    // is set up by the config subcommand's run().
+    let handle = tokio::runtime::Handle::current();
     ::std::thread::spawn(move || {
-        let r = press_test_inner(url.clone(), username, password, transport);
+        let r = press_test_inner(handle, url.clone(), username, password, transport);
         sink.send(Box::new(move |siv: &mut Cursive| {
             // Polling is no longer necessary.
             siv.set_fps(0);
```

```diff
@@ -36,6 +36,13 @@ pub fn run(args: Args) -> Result<i32, Error> {
     let clocks = clock::RealClocks {};
     let db = Arc::new(db::Database::new(clocks, conn, true)?);
 
+    // This runtime is needed by the "Test" button in the camera config.
+    let rt = tokio::runtime::Builder::new_multi_thread()
+        .enable_io()
+        .enable_time()
+        .build()?;
+    let _enter = rt.enter();
+
     let mut siv = cursive::default();
     //siv.add_global_callback('q', |s| s.quit());
```
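The `rt.enter()` guard above is what later lets `press_test` call
`tokio::runtime::Handle::current()` successfully. A small sketch of how such
a guard scopes the ambient runtime, assuming a plain tokio 1.x setup:

```rust
fn main() {
    let rt = tokio::runtime::Builder::new_multi_thread()
        .enable_io()
        .enable_time()
        .build()
        .unwrap();

    {
        // While the guard lives, this thread has an ambient runtime context.
        let _enter = rt.enter();
        let handle = tokio::runtime::Handle::current();
        let _join = handle.spawn(async { /* runs on the runtime's workers */ });
    }

    // With the guard dropped, Handle::current() here would panic again.
}
```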


```diff
@@ -303,6 +303,7 @@ async fn inner(
     }
 
     // Then start up streams.
+    let handle = tokio::runtime::Handle::current();
     let l = db.lock();
     for (i, (id, stream)) in l.streams_by_id().iter().enumerate() {
         if stream.config.mode != db::json::STREAM_MODE_RECORD {
@@ -342,10 +343,14 @@ async fn inner(
         )?;
         info!("Starting streamer for {}", streamer.short_name());
         let name = format!("s-{}", streamer.short_name());
+        let handle = handle.clone();
         streamers.push(
             thread::Builder::new()
                 .name(name)
-                .spawn(move || streamer.run())
+                .spawn(move || {
+                    let _enter = handle.enter();
+                    streamer.run();
+                })
                 .expect("can't create thread"),
         );
     }
```


```diff
@@ -20,17 +20,13 @@ pub trait Opener: Send + Sync {
     /// Opens the given RTSP URL.
     ///
     /// Note: despite the blocking interface, this expects to be called from
-    /// a tokio runtime with IO and time enabled. Takes the
-    /// [`tokio::runtime::Runtime`] rather than using
-    /// `tokio::runtime::Handle::current()` because `Runtime::block_on` can
-    /// drive IO and timers while `Handle::block_on` can not.
-    fn open<'a>(
+    /// the context of a multithreaded tokio runtime with IO and time enabled.
+    fn open(
         &self,
-        rt: &'a tokio::runtime::Runtime,
         label: String,
         url: Url,
         options: retina::client::SessionOptions,
-    ) -> Result<(db::VideoSampleEntryToInsert, Box<dyn Stream + 'a>), Error>;
+    ) -> Result<(db::VideoSampleEntryToInsert, Box<dyn Stream>), Error>;
 }
 
 pub struct VideoFrame {
@@ -54,33 +50,33 @@ pub struct RealOpener;
 
 pub const OPENER: RealOpener = RealOpener;
 
 impl Opener for RealOpener {
-    fn open<'a>(
+    fn open(
         &self,
-        rt: &'a tokio::runtime::Runtime,
         label: String,
         url: Url,
         options: retina::client::SessionOptions,
-    ) -> Result<(db::VideoSampleEntryToInsert, Box<dyn Stream + 'a>), Error> {
+    ) -> Result<(db::VideoSampleEntryToInsert, Box<dyn Stream>), Error> {
         let options = options.user_agent(format!("Moonfire NVR {}", env!("CARGO_PKG_VERSION")));
-        let (session, video_params, first_frame) = rt.block_on(tokio::time::timeout(
+        let rt_handle = tokio::runtime::Handle::current();
+        let (session, video_params, first_frame) = rt_handle.block_on(tokio::time::timeout(
             RETINA_TIMEOUT,
             RetinaStream::play(&label, url, options),
         ))??;
         let extra_data = h264::parse_extra_data(video_params.extra_data())?;
         let stream = Box::new(RetinaStream {
-            rt,
             label,
             session,
+            rt_handle,
             first_frame: Some(first_frame),
         });
         Ok((extra_data, stream))
     }
 }
 
-struct RetinaStream<'a> {
-    rt: &'a tokio::runtime::Runtime,
+struct RetinaStream {
     label: String,
     session: Pin<Box<Demuxed>>,
+    rt_handle: tokio::runtime::Handle,
 
     /// The first frame, if not yet returned from `next`.
     ///
@@ -89,7 +85,7 @@ struct RetinaStream<'a> {
     first_frame: Option<retina::codec::VideoFrame>,
 }
 
-impl<'a> RetinaStream<'a> {
+impl RetinaStream {
     /// Plays to first frame. No timeout; that's the caller's responsibility.
     async fn play(
         label: &str,
@@ -179,14 +175,14 @@ impl<'a> RetinaStream<'a> {
     }
 }
 
-impl<'a> Stream for RetinaStream<'a> {
+impl Stream for RetinaStream {
     fn tool(&self) -> Option<&retina::client::Tool> {
         Pin::into_inner(self.session.as_ref()).tool()
     }
 
     fn next(&mut self) -> Result<VideoFrame, Error> {
         let frame = self.first_frame.take().map(Ok).unwrap_or_else(|| {
-            self.rt
+            self.rt_handle
                 .block_on(tokio::time::timeout(
                     RETINA_TIMEOUT,
                     RetinaStream::fetch_next_frame(&self.label, self.session.as_mut()),
```
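The deleted doc comment above states the crux of the whole change:
`Runtime::block_on` can drive IO and timers itself, while `Handle::block_on`
cannot and instead relies on the runtime's worker threads, which only exist
on the multithreaded flavor. A minimal sketch of the distinction:

```rust
use std::time::Duration;

fn main() {
    let rt = tokio::runtime::Builder::new_multi_thread()
        .enable_time()
        .build()
        .unwrap();

    // Runtime::block_on drives the timer itself; it works on any flavor.
    rt.block_on(tokio::time::sleep(Duration::from_millis(1)));

    // Handle::block_on does not drive IO or timers; it only completes because
    // this multithreaded runtime's workers do. Built with
    // `new_current_thread()` instead, this call would hang forever.
    rt.handle()
        .block_on(tokio::time::sleep(Duration::from_millis(1)));
}
```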


```diff
@@ -110,15 +110,12 @@ where
     }
 
     /// Runs the streamer; blocks.
+    ///
+    /// Note: despite the blocking interface, this expects to be called from
+    /// the context of a multithreaded tokio runtime with IO and time enabled.
     pub fn run(&mut self) {
-        let rt = tokio::runtime::Builder::new_current_thread()
-            .enable_io()
-            .enable_time()
-            .build()
-            .unwrap();
-        let _guard = rt.enter();
         while self.shutdown_rx.check().is_ok() {
-            if let Err(e) = self.run_once(&rt) {
+            if let Err(e) = self.run_once() {
                 let sleep_time = time::Duration::seconds(1);
                 warn!(
                     "{}: sleeping for {} after error: {}",
@@ -132,10 +129,11 @@ where
         info!("{}: shutting down", self.short_name);
     }
 
-    fn run_once(&mut self, rt: &tokio::runtime::Runtime) -> Result<(), Error> {
+    fn run_once(&mut self) -> Result<(), Error> {
         info!("{}: Opening input: {}", self.short_name, self.url.as_str());
         let clocks = self.db.clocks();
+        let handle = tokio::runtime::Handle::current();
 
         let mut waited = false;
         loop {
             let status = self.session_group.stale_sessions();
@@ -146,7 +144,7 @@ where
                     max_expires.saturating_duration_since(tokio::time::Instant::now()),
                     status.num_sessions
                 );
-                rt.block_on(async {
+                handle.block_on(async {
                     tokio::select! {
                         _ = self.session_group.await_stale_sessions(&status) => Ok(()),
                         _ = self.shutdown_rx.as_future() => Err(base::shutdown::ShutdownError),
@@ -164,7 +162,6 @@ where
         let (extra_data, mut stream) = {
             let _t = TimerGuard::new(&clocks, || format!("opening {}", self.url.as_str()));
             self.opener.open(
-                rt,
                 self.short_name.clone(),
                 self.url.clone(),
                 retina::client::SessionOptions::default()
@@ -365,7 +362,6 @@ mod tests {
     impl stream::Opener for MockOpener {
         fn open(
             &self,
-            _rt: &tokio::runtime::Runtime,
             _label: String,
             url: url::Url,
             _options: retina::client::SessionOptions,
@@ -409,8 +405,8 @@ mod tests {
             .unwrap()
     }
 
-    #[test]
-    fn basic() {
+    #[tokio::test]
+    async fn basic() {
         testutil::init();
         // 2015-04-25 00:00:00 UTC
         let clocks = clock::SimulatedClocks::new(time::Timespec::new(1429920000, 0));
```
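Presumably the test needed this change because the streamer now calls
`tokio::runtime::Handle::current()`, which panics when no runtime has been
entered; `#[tokio::test]` enters one before the test body runs. A small
illustration of that requirement (these test names are made up):

```rust
#[test]
fn handle_current_panics_outside_runtime() {
    // Without an ambient runtime, Handle::current() panics.
    let result = std::panic::catch_unwind(|| tokio::runtime::Handle::current());
    assert!(result.is_err());
}

#[tokio::test]
async fn handle_current_works_inside_runtime() {
    // #[tokio::test] enters a runtime before the body runs.
    let _handle = tokio::runtime::Handle::current();
}
```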