diff --git a/CHANGELOG.md b/CHANGELOG.md index ded524b..d2581b9 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -16,8 +16,13 @@ Each release is tagged in Git and on the Docker repository * fix [#157](https://github.com/scottlamb/moonfire-nvr/issues/157): broken live view when using multi-view and selecting the first listed camera then selecting another camera for the upper left grid square. -* support `--rtsp-transport=udp`, which improves compatibility with Reolink - cameras. +* support `--rtsp-transport=udp`, which may work better with cameras that + use old versions of the live555 library, including many Reolink models. +* send RTSP `TEARDOWN` requests on UDP or with old live555 versions; wait out + stale sessions before reconnecting to the same camera. This may improve + reliability with old live555 versions when using TCP also. +* improve compatibility with cameras that send non-compliant SDP, including + models from Geovision and Anpviz. ## `v0.6.5` (2021-08-13) diff --git a/server/Cargo.lock b/server/Cargo.lock index 3b87fa3..6b2a2c7 100644 --- a/server/Cargo.lock +++ b/server/Cargo.lock @@ -1868,9 +1868,9 @@ dependencies = [ [[package]] name = "retina" -version = "0.3.0" +version = "0.3.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "446070f3caf291e982d240c7f921837f6da0cbffbc26f1d785b0a8d214f2cadd" +checksum = "819b5befff8af9b03e21256d577d101754527eb981dcd2804cb8725ce1dca978" dependencies = [ "base64", "bitreader", @@ -1886,7 +1886,7 @@ dependencies = [ "rand", "rtp-rs", "rtsp-types", - "sdp", + "sdp-types", "smallvec", "thiserror", "time", @@ -2024,14 +2024,13 @@ dependencies = [ ] [[package]] -name = "sdp" -version = "0.1.5" +name = "sdp-types" +version = "0.1.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "db73ce8329b973830407fb1ba0e51bc32716392281f7757a92f372a1420bb8ec" +checksum = "ae499f6886cff026ebd8355c8f67a1881cd15f23ce89de4aab13588cf52142dd" dependencies = [ - "rand", - "thiserror", - "url", + "bstr", + "fallible-iterator", ] [[package]] @@ -2364,9 +2363,9 @@ checksum = "cda74da7e1a664f795bb1f8a87ec406fb89a02522cf6e50620d016add6dbbf5c" [[package]] name = "tokio" -version = "1.10.1" +version = "1.11.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "92036be488bb6594459f2e03b60e42df6f937fe6ca5c5ffdcb539c6b84dc40f5" +checksum = "b4efe6fc2395938c8155973d7be49fe8d03a843726e285e100a8a383cc0154ce" dependencies = [ "autocfg", "bytes", diff --git a/server/Cargo.toml b/server/Cargo.toml index 473dbf0..6283e24 100644 --- a/server/Cargo.toml +++ b/server/Cargo.toml @@ -46,7 +46,7 @@ nom = "7.0.0" parking_lot = { version = "0.11.1", features = [] } protobuf = { git = "https://github.com/stepancheg/rust-protobuf" } reffers = "0.6.0" -retina = "0.3.0" +retina = "0.3.1" ring = "0.16.2" rusqlite = "0.25.3" serde = { version = "1.0", features = ["derive"] } diff --git a/server/src/cmds/config/cameras.rs b/server/src/cmds/config/cameras.rs index f511189..be02eb7 100644 --- a/server/src/cmds/config/cameras.rs +++ b/server/src/cmds/config/cameras.rs @@ -158,6 +158,7 @@ fn press_test_inner( username, password, transport: retina::client::Transport::Tcp, + session_group: Default::default(), }, )?; Ok(format!( diff --git a/server/src/cmds/run.rs b/server/src/cmds/run.rs index bf89ce3..4f71ea0 100644 --- a/server/src/cmds/run.rs +++ b/server/src/cmds/run.rs @@ -10,6 +10,7 @@ use failure::{bail, Error, ResultExt}; use fnv::FnvHashMap; use futures::future::FutureExt; use hyper::service::{make_service_fn, service_fn}; +use log::error; use log::{info, warn}; use std::path::PathBuf; use std::sync::atomic::{AtomicBool, Ordering}; @@ -216,6 +217,8 @@ async fn async_run(args: &Args) -> Result { // Start a streamer for each stream. let shutdown_streamers = Arc::new(AtomicBool::new(false)); let mut streamers = Vec::new(); + let mut session_groups_by_camera: FnvHashMap> = + FnvHashMap::default(); let syncers = if !args.read_only { let l = db.lock(); let mut dirs = FnvHashMap::with_capacity_and_hasher( @@ -271,6 +274,10 @@ async fn async_run(args: &Args) -> Result { }; let rotate_offset_sec = streamer::ROTATE_INTERVAL_SEC * i as i64 / streams as i64; let syncer = syncers.get(&sample_file_dir_id).unwrap(); + let session_group = session_groups_by_camera + .entry(camera.id) + .or_default() + .clone(); let mut streamer = streamer::Streamer::new( &env, syncer.dir.clone(), @@ -278,6 +285,7 @@ async fn async_run(args: &Args) -> Result { *id, camera, stream, + session_group, rotate_offset_sec, streamer::ROTATE_INTERVAL_SEC, )?; @@ -344,6 +352,14 @@ async fn async_run(args: &Args) -> Result { info!("Waiting for HTTP requests to finish."); server_handle.await??; + + info!("Waiting for TEARDOWN requests to complete."); + for g in session_groups_by_camera.values() { + if let Err(e) = g.await_teardown().await { + error!("{}", e); + } + } + info!("Exiting."); Ok(0) } diff --git a/server/src/stream.rs b/server/src/stream.rs index 2c5bc37..60762ad 100644 --- a/server/src/stream.rs +++ b/server/src/stream.rs @@ -15,6 +15,7 @@ use std::convert::TryFrom; use std::ffi::CString; use std::pin::Pin; use std::result::Result; +use std::sync::Arc; use url::Url; static START_FFMPEG: parking_lot::Once = parking_lot::Once::new(); @@ -62,6 +63,7 @@ pub enum Source<'a> { username: Option, password: Option, transport: Transport, + session_group: Arc, }, } @@ -73,6 +75,7 @@ pub enum Source { username: Option, password: Option, transport: Transport, + session_group: Arc, }, } @@ -141,6 +144,7 @@ impl Opener for Ffmpeg { username, password, transport, + .. } => { let mut open_options = ffmpeg::avutil::Dictionary::new(); open_options @@ -301,6 +305,7 @@ impl Opener for RetinaOpener { username, password, transport, + session_group, } => ( url, retina::client::SessionOptions::default() @@ -313,6 +318,7 @@ impl Opener for RetinaOpener { _ => bail!("must supply username when supplying password"), }) .transport(transport) + .session_group(session_group) .user_agent(format!("Moonfire NVR {}", env!("CARGO_PKG_VERSION"))), ), }; diff --git a/server/src/streamer.rs b/server/src/streamer.rs index 7becc86..a0215ca 100644 --- a/server/src/streamer.rs +++ b/server/src/streamer.rs @@ -42,6 +42,7 @@ where opener: &'a dyn stream::Opener, transport: retina::client::Transport, stream_id: i32, + session_group: Arc, short_name: String, url: Url, username: Option, @@ -59,6 +60,7 @@ where stream_id: i32, c: &Camera, s: &Stream, + session_group: Arc, rotate_offset_sec: i64, rotate_interval_sec: i64, ) -> Result { @@ -76,6 +78,7 @@ where opener: env.opener, transport: env.transport, stream_id, + session_group, short_name: format!("{}-{}", c.short_name, s.type_.as_str()), url, username: c.username.clone(), @@ -110,6 +113,28 @@ where info!("{}: Opening input: {}", self.short_name, self.url.as_str()); let clocks = self.db.clocks(); + let mut waited = false; + loop { + let status = self.session_group.stale_sessions(); + if let Some(max_expires) = status.max_expires { + if let Some(d) = max_expires.checked_duration_since(tokio::time::Instant::now()) { + log::info!( + "{}: Waiting {:?} for {} stale sessions to expire", + &self.short_name, + d, + status.num_sessions + ); + std::thread::sleep(d); + waited = true; + } + } else { + if waited { + log::info!("{}: Done waiting", &self.short_name); + } + break; + } + } + let (extra_data, mut stream) = { let _t = TimerGuard::new(&clocks, || format!("opening {}", self.url.as_str())); self.opener.open( @@ -119,6 +144,7 @@ where username: self.username.clone(), password: self.password.clone(), transport: self.transport, + session_group: self.session_group.clone(), }, )? }; @@ -383,6 +409,7 @@ mod tests { testutil::TEST_STREAM_ID, camera, s, + Arc::new(retina::client::SessionGroup::default()), 0, 3, )