handle stale RTSP sessions

* upgrade to Retina 0.3.1 which automatically tears down sessions
* wait out stale sessions before reconnecting
* wait for teardown to complete before shutting down

This adds some pressure on #117: it will keep waiting for the stale
session to expire even if the user has requested shutdown. I'll try
to address that next.
This commit is contained in:
Scott Lamb 2021-09-09 21:14:01 -07:00
parent 981a91c425
commit 92f594ef58
7 changed files with 68 additions and 14 deletions

View File

@ -16,8 +16,13 @@ Each release is tagged in Git and on the Docker repository
* fix [#157](https://github.com/scottlamb/moonfire-nvr/issues/157): broken * fix [#157](https://github.com/scottlamb/moonfire-nvr/issues/157): broken
live view when using multi-view and selecting the first listed camera live view when using multi-view and selecting the first listed camera
then selecting another camera for the upper left grid square. then selecting another camera for the upper left grid square.
* support `--rtsp-transport=udp`, which improves compatibility with Reolink * support `--rtsp-transport=udp`, which may work better with cameras that
cameras. use old versions of the live555 library, including many Reolink models.
* send RTSP `TEARDOWN` requests on UDP or with old live555 versions; wait out
stale sessions before reconnecting to the same camera. This may improve
reliability with old live555 versions when using TCP also.
* improve compatibility with cameras that send non-compliant SDP, including
models from Geovision and Anpviz.
## `v0.6.5` (2021-08-13) ## `v0.6.5` (2021-08-13)

21
server/Cargo.lock generated
View File

@ -1868,9 +1868,9 @@ dependencies = [
[[package]] [[package]]
name = "retina" name = "retina"
version = "0.3.0" version = "0.3.1"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "446070f3caf291e982d240c7f921837f6da0cbffbc26f1d785b0a8d214f2cadd" checksum = "819b5befff8af9b03e21256d577d101754527eb981dcd2804cb8725ce1dca978"
dependencies = [ dependencies = [
"base64", "base64",
"bitreader", "bitreader",
@ -1886,7 +1886,7 @@ dependencies = [
"rand", "rand",
"rtp-rs", "rtp-rs",
"rtsp-types", "rtsp-types",
"sdp", "sdp-types",
"smallvec", "smallvec",
"thiserror", "thiserror",
"time", "time",
@ -2024,14 +2024,13 @@ dependencies = [
] ]
[[package]] [[package]]
name = "sdp" name = "sdp-types"
version = "0.1.5" version = "0.1.3"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "db73ce8329b973830407fb1ba0e51bc32716392281f7757a92f372a1420bb8ec" checksum = "ae499f6886cff026ebd8355c8f67a1881cd15f23ce89de4aab13588cf52142dd"
dependencies = [ dependencies = [
"rand", "bstr",
"thiserror", "fallible-iterator",
"url",
] ]
[[package]] [[package]]
@ -2364,9 +2363,9 @@ checksum = "cda74da7e1a664f795bb1f8a87ec406fb89a02522cf6e50620d016add6dbbf5c"
[[package]] [[package]]
name = "tokio" name = "tokio"
version = "1.10.1" version = "1.11.0"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "92036be488bb6594459f2e03b60e42df6f937fe6ca5c5ffdcb539c6b84dc40f5" checksum = "b4efe6fc2395938c8155973d7be49fe8d03a843726e285e100a8a383cc0154ce"
dependencies = [ dependencies = [
"autocfg", "autocfg",
"bytes", "bytes",

View File

@ -46,7 +46,7 @@ nom = "7.0.0"
parking_lot = { version = "0.11.1", features = [] } parking_lot = { version = "0.11.1", features = [] }
protobuf = { git = "https://github.com/stepancheg/rust-protobuf" } protobuf = { git = "https://github.com/stepancheg/rust-protobuf" }
reffers = "0.6.0" reffers = "0.6.0"
retina = "0.3.0" retina = "0.3.1"
ring = "0.16.2" ring = "0.16.2"
rusqlite = "0.25.3" rusqlite = "0.25.3"
serde = { version = "1.0", features = ["derive"] } serde = { version = "1.0", features = ["derive"] }

View File

@ -158,6 +158,7 @@ fn press_test_inner(
username, username,
password, password,
transport: retina::client::Transport::Tcp, transport: retina::client::Transport::Tcp,
session_group: Default::default(),
}, },
)?; )?;
Ok(format!( Ok(format!(

View File

@ -10,6 +10,7 @@ use failure::{bail, Error, ResultExt};
use fnv::FnvHashMap; use fnv::FnvHashMap;
use futures::future::FutureExt; use futures::future::FutureExt;
use hyper::service::{make_service_fn, service_fn}; use hyper::service::{make_service_fn, service_fn};
use log::error;
use log::{info, warn}; use log::{info, warn};
use std::path::PathBuf; use std::path::PathBuf;
use std::sync::atomic::{AtomicBool, Ordering}; use std::sync::atomic::{AtomicBool, Ordering};
@ -216,6 +217,8 @@ async fn async_run(args: &Args) -> Result<i32, Error> {
// Start a streamer for each stream. // Start a streamer for each stream.
let shutdown_streamers = Arc::new(AtomicBool::new(false)); let shutdown_streamers = Arc::new(AtomicBool::new(false));
let mut streamers = Vec::new(); let mut streamers = Vec::new();
let mut session_groups_by_camera: FnvHashMap<i32, Arc<retina::client::SessionGroup>> =
FnvHashMap::default();
let syncers = if !args.read_only { let syncers = if !args.read_only {
let l = db.lock(); let l = db.lock();
let mut dirs = FnvHashMap::with_capacity_and_hasher( let mut dirs = FnvHashMap::with_capacity_and_hasher(
@ -271,6 +274,10 @@ async fn async_run(args: &Args) -> Result<i32, Error> {
}; };
let rotate_offset_sec = streamer::ROTATE_INTERVAL_SEC * i as i64 / streams as i64; let rotate_offset_sec = streamer::ROTATE_INTERVAL_SEC * i as i64 / streams as i64;
let syncer = syncers.get(&sample_file_dir_id).unwrap(); let syncer = syncers.get(&sample_file_dir_id).unwrap();
let session_group = session_groups_by_camera
.entry(camera.id)
.or_default()
.clone();
let mut streamer = streamer::Streamer::new( let mut streamer = streamer::Streamer::new(
&env, &env,
syncer.dir.clone(), syncer.dir.clone(),
@ -278,6 +285,7 @@ async fn async_run(args: &Args) -> Result<i32, Error> {
*id, *id,
camera, camera,
stream, stream,
session_group,
rotate_offset_sec, rotate_offset_sec,
streamer::ROTATE_INTERVAL_SEC, streamer::ROTATE_INTERVAL_SEC,
)?; )?;
@ -344,6 +352,14 @@ async fn async_run(args: &Args) -> Result<i32, Error> {
info!("Waiting for HTTP requests to finish."); info!("Waiting for HTTP requests to finish.");
server_handle.await??; server_handle.await??;
info!("Waiting for TEARDOWN requests to complete.");
for g in session_groups_by_camera.values() {
if let Err(e) = g.await_teardown().await {
error!("{}", e);
}
}
info!("Exiting."); info!("Exiting.");
Ok(0) Ok(0)
} }

View File

@ -15,6 +15,7 @@ use std::convert::TryFrom;
use std::ffi::CString; use std::ffi::CString;
use std::pin::Pin; use std::pin::Pin;
use std::result::Result; use std::result::Result;
use std::sync::Arc;
use url::Url; use url::Url;
static START_FFMPEG: parking_lot::Once = parking_lot::Once::new(); static START_FFMPEG: parking_lot::Once = parking_lot::Once::new();
@ -62,6 +63,7 @@ pub enum Source<'a> {
username: Option<String>, username: Option<String>,
password: Option<String>, password: Option<String>,
transport: Transport, transport: Transport,
session_group: Arc<retina::client::SessionGroup>,
}, },
} }
@ -73,6 +75,7 @@ pub enum Source {
username: Option<String>, username: Option<String>,
password: Option<String>, password: Option<String>,
transport: Transport, transport: Transport,
session_group: Arc<retina::client::SessionGroup>,
}, },
} }
@ -141,6 +144,7 @@ impl Opener for Ffmpeg {
username, username,
password, password,
transport, transport,
..
} => { } => {
let mut open_options = ffmpeg::avutil::Dictionary::new(); let mut open_options = ffmpeg::avutil::Dictionary::new();
open_options open_options
@ -301,6 +305,7 @@ impl Opener for RetinaOpener {
username, username,
password, password,
transport, transport,
session_group,
} => ( } => (
url, url,
retina::client::SessionOptions::default() retina::client::SessionOptions::default()
@ -313,6 +318,7 @@ impl Opener for RetinaOpener {
_ => bail!("must supply username when supplying password"), _ => bail!("must supply username when supplying password"),
}) })
.transport(transport) .transport(transport)
.session_group(session_group)
.user_agent(format!("Moonfire NVR {}", env!("CARGO_PKG_VERSION"))), .user_agent(format!("Moonfire NVR {}", env!("CARGO_PKG_VERSION"))),
), ),
}; };

View File

@ -42,6 +42,7 @@ where
opener: &'a dyn stream::Opener, opener: &'a dyn stream::Opener,
transport: retina::client::Transport, transport: retina::client::Transport,
stream_id: i32, stream_id: i32,
session_group: Arc<retina::client::SessionGroup>,
short_name: String, short_name: String,
url: Url, url: Url,
username: Option<String>, username: Option<String>,
@ -59,6 +60,7 @@ where
stream_id: i32, stream_id: i32,
c: &Camera, c: &Camera,
s: &Stream, s: &Stream,
session_group: Arc<retina::client::SessionGroup>,
rotate_offset_sec: i64, rotate_offset_sec: i64,
rotate_interval_sec: i64, rotate_interval_sec: i64,
) -> Result<Self, Error> { ) -> Result<Self, Error> {
@ -76,6 +78,7 @@ where
opener: env.opener, opener: env.opener,
transport: env.transport, transport: env.transport,
stream_id, stream_id,
session_group,
short_name: format!("{}-{}", c.short_name, s.type_.as_str()), short_name: format!("{}-{}", c.short_name, s.type_.as_str()),
url, url,
username: c.username.clone(), username: c.username.clone(),
@ -110,6 +113,28 @@ where
info!("{}: Opening input: {}", self.short_name, self.url.as_str()); info!("{}: Opening input: {}", self.short_name, self.url.as_str());
let clocks = self.db.clocks(); let clocks = self.db.clocks();
let mut waited = false;
loop {
let status = self.session_group.stale_sessions();
if let Some(max_expires) = status.max_expires {
if let Some(d) = max_expires.checked_duration_since(tokio::time::Instant::now()) {
log::info!(
"{}: Waiting {:?} for {} stale sessions to expire",
&self.short_name,
d,
status.num_sessions
);
std::thread::sleep(d);
waited = true;
}
} else {
if waited {
log::info!("{}: Done waiting", &self.short_name);
}
break;
}
}
let (extra_data, mut stream) = { let (extra_data, mut stream) = {
let _t = TimerGuard::new(&clocks, || format!("opening {}", self.url.as_str())); let _t = TimerGuard::new(&clocks, || format!("opening {}", self.url.as_str()));
self.opener.open( self.opener.open(
@ -119,6 +144,7 @@ where
username: self.username.clone(), username: self.username.clone(),
password: self.password.clone(), password: self.password.clone(),
transport: self.transport, transport: self.transport,
session_group: self.session_group.clone(),
}, },
)? )?
}; };
@ -383,6 +409,7 @@ mod tests {
testutil::TEST_STREAM_ID, testutil::TEST_STREAM_ID,
camera, camera,
s, s,
Arc::new(retina::client::SessionGroup::default()),
0, 0,
3, 3,
) )