diff --git a/CHANGELOG.md b/CHANGELOG.md index a7b3f1e..cdffb2e 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -10,13 +10,16 @@ even on minor releases, e.g. `v0.7.5` -> `v0.7.6`. ## unreleased -* upgrade to Retina 0.4.9, adding support for recording MJPEG video. Note +* upgrade to Retina 0.4.10, adding support for recording MJPEG video. Note browser playback is unlikely to work. * bump minimum Rust version to 1.79. * in UI's list view, add a tooltip on the end time which shows why the recording ended. * fix [#121](https://github.com/scottlamb/moonfire-nvr/issues/121): iPhone live view. +* update to hyper and http version 1.0. In the process, no longer wait for + pending HTTP requests on shutdown. This just extended the time Moonfire was + running without streaming. ## v0.7.16 (2024-05-30) diff --git a/server/Cargo.lock b/server/Cargo.lock index 6f7a945..0b5b422 100644 --- a/server/Cargo.lock +++ b/server/Cargo.lock @@ -101,9 +101,9 @@ dependencies = [ [[package]] name = "base64" -version = "0.21.7" +version = "0.22.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9d297deb1925b89f2ccc13d7635fa0714f12c87adce1c75356b39ca9b7178567" +checksum = "72b3254f16251a8381aa12e40e3c4d2f0199f8c6508fbecb9d91f575e0fbb8c6" [[package]] name = "base64ct" @@ -235,7 +235,7 @@ dependencies = [ "js-sys", "num-traits", "wasm-bindgen", - "windows-targets 0.52.6", + "windows-targets", ] [[package]] @@ -282,16 +282,6 @@ dependencies = [ "futures", ] -[[package]] -name = "core-foundation" -version = "0.9.4" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "91e195e091a93c46f7102ec7818a2aa394e1e1771c3ab4825963fa03e45afb8f" -dependencies = [ - "core-foundation-sys", - "libc", -] - [[package]] name = "core-foundation-sys" version = "0.8.7" @@ -474,15 +464,6 @@ version = "1.13.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "60b1af1c220855b6ceac025d3f6ecdd2b7c4894bfe9cd9bda4fbb4bc7c0d4cf0" -[[package]] -name = "encoding_rs" -version = "0.8.34" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b45de904aa0b010bce2ab45264d0631681847fa7b6f2eaa7dab7619943bc4f59" -dependencies = [ - "cfg-if", -] - [[package]] name = "enum-map" version = "2.7.3" @@ -727,25 +708,6 @@ dependencies = [ "syn", ] -[[package]] -name = "h2" -version = "0.3.26" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "81fe527a889e1532da5c525686d96d4c2e74cdd345badf8dfef9f6b39dd5f5e8" -dependencies = [ - "bytes", - "fnv", - "futures-core", - "futures-sink", - "futures-util", - "http", - "indexmap", - "slab", - "tokio", - "tokio-util", - "tracing", -] - [[package]] name = "h264-reader" version = "0.7.0" @@ -815,9 +777,9 @@ dependencies = [ [[package]] name = "http" -version = "0.2.12" +version = "1.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "601cbb57e577e2f5ef5be8e7b83f0f63994f25aa94d673e54a92d5c516d101f1" +checksum = "21b9ddb458710bc376481b842f5da65cdf31522de232c1ca8146abce2a358258" dependencies = [ "bytes", "fnv", @@ -826,9 +788,9 @@ dependencies = [ [[package]] name = "http-auth" -version = "0.1.9" +version = "0.1.10" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "643c9bbf6a4ea8a656d6b4cd53d34f79e3f841ad5203c1a55fb7d761923bc255" +checksum = "150fa4a9462ef926824cf4519c84ed652ca8f4fbae34cb8af045b5cbcaf98822" dependencies = [ "base64", "digest", @@ -841,24 +803,35 @@ dependencies = [ [[package]] name = "http-body" -version = "0.4.6" +version = "1.0.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "7ceab25649e9960c0311ea418d17bee82c0dcec1bd053b5f9a66e265a693bed2" +checksum = "1efedce1fb8e6913f23e0c92de8e62cd5b772a67e7b3946df930a62566c93184" dependencies = [ "bytes", "http", +] + +[[package]] +name = "http-body-util" +version = "0.1.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "793429d76616a256bcb62c2a2ec2bed781c8307e797e2598c50010f2bee2544f" +dependencies = [ + "bytes", + "futures-util", + "http", + "http-body", "pin-project-lite", ] [[package]] name = "http-serve" -version = "0.3.6" +version = "0.4.0-rc.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "496ab5f39096e4c915f167c276aea19521ed862beb50f7d2bc530578535689d7" +checksum = "4e30e587eb43944a00ad6b239f691694564fc050c81b86e39006082037188084" dependencies = [ "bytes", "flate2", - "futures-channel", "futures-core", "futures-util", "http", @@ -869,6 +842,7 @@ dependencies = [ "mime", "pin-project", "smallvec", + "sync_wrapper", "tokio", "winapi", ] @@ -887,26 +861,42 @@ checksum = "df3b46402a9d5adb4c86a0cf463f42e19994e3ee891101b1841f30a545cb49a9" [[package]] name = "hyper" -version = "0.14.30" +version = "1.4.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a152ddd61dfaec7273fe8419ab357f33aee0d914c5f4efbf0d96fa749eea5ec9" +checksum = "50dfd22e0e76d0f662d429a5f80fcaf3855009297eab6a0a9f8543834744ba05" dependencies = [ "bytes", "futures-channel", - "futures-core", "futures-util", - "h2", "http", "http-body", "httparse", "httpdate", "itoa", "pin-project-lite", + "smallvec", + "tokio", + "want", +] + +[[package]] +name = "hyper-util" +version = "0.1.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "cde7055719c54e36e95e8719f95883f22072a48ede39db7fc17a4e1d5281e9b9" +dependencies = [ + "bytes", + "futures-channel", + "futures-util", + "http", + "http-body", + "hyper", + "pin-project-lite", "socket2", "tokio", + "tower", "tower-service", "tracing", - "want", ] [[package]] @@ -1227,8 +1217,11 @@ dependencies = [ "git-version", "h264-reader", "http", + "http-body", + "http-body-util", "http-serve", "hyper", + "hyper-util", "itertools", "libc", "libsystemd", @@ -1251,7 +1244,6 @@ dependencies = [ "serde", "serde_json", "smallvec", - "sync_wrapper", "tempfile", "time 0.1.45", "tokio", @@ -1481,7 +1473,7 @@ dependencies = [ "libc", "redox_syscall 0.5.3", "smallvec", - "windows-targets 0.52.6", + "windows-targets", ] [[package]] @@ -1748,19 +1740,19 @@ checksum = "7a66a03ae7c801facd77a29370b4faec201768915ac14a721ba36f20bc9c209b" [[package]] name = "reqwest" -version = "0.11.27" +version = "0.12.7" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "dd67538700a17451e7cba03ac727fb961abb7607553461627b97de0b89cf4a62" +checksum = "f8f4955649ef5c38cc7f9e8aa41761d48fb9677197daea9984dc54f56aad5e63" dependencies = [ "base64", "bytes", - "encoding_rs", "futures-core", "futures-util", - "h2", "http", "http-body", + "http-body-util", "hyper", + "hyper-util", "ipnet", "js-sys", "log", @@ -1772,21 +1764,20 @@ dependencies = [ "serde_json", "serde_urlencoded", "sync_wrapper", - "system-configuration", "tokio", "tower-service", "url", "wasm-bindgen", "wasm-bindgen-futures", "web-sys", - "winreg", + "windows-registry", ] [[package]] name = "retina" -version = "0.4.9" +version = "0.4.10" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ef9828fb04b8b2bd763887cf4be07aa85aecaa7fce3ee3c7f57bf61e804e9e5c" +checksum = "cd5652758580215edaf590a03298ff72ff5f965c2dea8d6e3f058ef728fbf773" dependencies = [ "base64", "bitstream-io", @@ -2119,29 +2110,11 @@ dependencies = [ [[package]] name = "sync_wrapper" -version = "0.1.2" +version = "1.0.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "2047c6ded9c721764247e62cd3b03c09ffc529b2ba5b10ec482ae507a4a70160" - -[[package]] -name = "system-configuration" -version = "0.5.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ba3a3adc5c275d719af8cb4272ea1c4a6d668a777f37e115f6d11ddbc1c8e0e7" +checksum = "a7065abeca94b6a8a577f9bd45aa0867a2238b74e8eb67cf10d492bc39351394" dependencies = [ - "bitflags 1.3.2", - "core-foundation", - "system-configuration-sys", -] - -[[package]] -name = "system-configuration-sys" -version = "0.5.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a75fb188eb626b924683e3b95e3a48e63551fcfb51949de2f06a9d91dbee93c9" -dependencies = [ - "core-foundation-sys", - "libc", + "futures-core", ] [[package]] @@ -2288,9 +2261,9 @@ dependencies = [ [[package]] name = "tokio-tungstenite" -version = "0.20.1" +version = "0.23.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "212d5dcb2a1ce06d81107c3d0ffa3121fe974b73f068c8282cb1c32328113b6c" +checksum = "c6989540ced10490aaf14e6bad2e3d33728a2813310a0c71d1574304c49631cd" dependencies = [ "futures-util", "log", @@ -2345,6 +2318,27 @@ dependencies = [ "winnow", ] +[[package]] +name = "tower" +version = "0.4.13" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b8fa9be0de6cf49e536ce1851f987bd21a43b771b09473c3549a6c853db37c1c" +dependencies = [ + "futures-core", + "futures-util", + "pin-project", + "pin-project-lite", + "tokio", + "tower-layer", + "tower-service", +] + +[[package]] +name = "tower-layer" +version = "0.3.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "121c2a6cda46980bb0fcd1647ffaf6cd3fc79a013de288782836f6df9c48780e" + [[package]] name = "tower-service" version = "0.3.3" @@ -2461,9 +2455,9 @@ checksum = "e421abadd41a4225275504ea4d6566923418b7f05506fbc9c0fe86ba7396114b" [[package]] name = "tungstenite" -version = "0.20.1" +version = "0.23.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9e3dac10fd62eaf6617d3a904ae222845979aec67c615d1c842b4002c7666fb9" +checksum = "6e2e2ce1e47ed2994fd43b04c8f618008d4cabdd5ee34027cf14f9d918edd9c8" dependencies = [ "byteorder", "bytes", @@ -2474,7 +2468,6 @@ dependencies = [ "rand", "sha1", "thiserror", - "url", "utf-8", ] @@ -2753,16 +2746,37 @@ version = "0.52.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "33ab640c8d7e35bf8ba19b884ba838ceb4fba93a4e8c65a9059d08afcfc683d9" dependencies = [ - "windows-targets 0.52.6", + "windows-targets", ] [[package]] -name = "windows-sys" -version = "0.48.0" +name = "windows-registry" +version = "0.2.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "677d2418bec65e3338edb076e806bc1ec15693c5d0104683f2efe857f61056a9" +checksum = "e400001bb720a623c1c69032f8e3e4cf09984deec740f007dd2b03ec864804b0" dependencies = [ - "windows-targets 0.48.5", + "windows-result", + "windows-strings", + "windows-targets", +] + +[[package]] +name = "windows-result" +version = "0.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1d1043d8214f791817bab27572aaa8af63732e11bf84aa21a45a78d6c317ae0e" +dependencies = [ + "windows-targets", +] + +[[package]] +name = "windows-strings" +version = "0.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4cd9b125c486025df0eabcb585e62173c6c9eddcec5d117d3b6e8c30e2ee4d10" +dependencies = [ + "windows-result", + "windows-targets", ] [[package]] @@ -2771,7 +2785,7 @@ version = "0.52.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "282be5f36a8ce781fad8c8ae18fa3f9beff57ec1b52cb3de0789201425d9a33d" dependencies = [ - "windows-targets 0.52.6", + "windows-targets", ] [[package]] @@ -2780,22 +2794,7 @@ version = "0.59.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "1e38bc4d79ed67fd075bcc251a1c39b32a1776bbe92e5bef1f0bf1f8c531853b" dependencies = [ - "windows-targets 0.52.6", -] - -[[package]] -name = "windows-targets" -version = "0.48.5" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9a2fa6e2155d7247be68c096456083145c183cbbbc2764150dda45a87197940c" -dependencies = [ - "windows_aarch64_gnullvm 0.48.5", - "windows_aarch64_msvc 0.48.5", - "windows_i686_gnu 0.48.5", - "windows_i686_msvc 0.48.5", - "windows_x86_64_gnu 0.48.5", - "windows_x86_64_gnullvm 0.48.5", - "windows_x86_64_msvc 0.48.5", + "windows-targets", ] [[package]] @@ -2804,46 +2803,28 @@ version = "0.52.6" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "9b724f72796e036ab90c1021d4780d4d3d648aca59e491e6b98e725b84e99973" dependencies = [ - "windows_aarch64_gnullvm 0.52.6", - "windows_aarch64_msvc 0.52.6", - "windows_i686_gnu 0.52.6", + "windows_aarch64_gnullvm", + "windows_aarch64_msvc", + "windows_i686_gnu", "windows_i686_gnullvm", - "windows_i686_msvc 0.52.6", - "windows_x86_64_gnu 0.52.6", - "windows_x86_64_gnullvm 0.52.6", - "windows_x86_64_msvc 0.52.6", + "windows_i686_msvc", + "windows_x86_64_gnu", + "windows_x86_64_gnullvm", + "windows_x86_64_msvc", ] -[[package]] -name = "windows_aarch64_gnullvm" -version = "0.48.5" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "2b38e32f0abccf9987a4e3079dfb67dcd799fb61361e53e2882c3cbaf0d905d8" - [[package]] name = "windows_aarch64_gnullvm" version = "0.52.6" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "32a4622180e7a0ec044bb555404c800bc9fd9ec262ec147edd5989ccd0c02cd3" -[[package]] -name = "windows_aarch64_msvc" -version = "0.48.5" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "dc35310971f3b2dbbf3f0690a219f40e2d9afcf64f9ab7cc1be722937c26b4bc" - [[package]] name = "windows_aarch64_msvc" version = "0.52.6" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "09ec2a7bb152e2252b53fa7803150007879548bc709c039df7627cabbd05d469" -[[package]] -name = "windows_i686_gnu" -version = "0.48.5" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a75915e7def60c94dcef72200b9a8e58e5091744960da64ec734a6c6e9b3743e" - [[package]] name = "windows_i686_gnu" version = "0.52.6" @@ -2856,48 +2837,24 @@ version = "0.52.6" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "0eee52d38c090b3caa76c563b86c3a4bd71ef1a819287c19d586d7334ae8ed66" -[[package]] -name = "windows_i686_msvc" -version = "0.48.5" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8f55c233f70c4b27f66c523580f78f1004e8b5a8b659e05a4eb49d4166cca406" - [[package]] name = "windows_i686_msvc" version = "0.52.6" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "240948bc05c5e7c6dabba28bf89d89ffce3e303022809e73deaefe4f6ec56c66" -[[package]] -name = "windows_x86_64_gnu" -version = "0.48.5" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "53d40abd2583d23e4718fddf1ebec84dbff8381c07cae67ff7768bbf19c6718e" - [[package]] name = "windows_x86_64_gnu" version = "0.52.6" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "147a5c80aabfbf0c7d901cb5895d1de30ef2907eb21fbbab29ca94c5b08b1a78" -[[package]] -name = "windows_x86_64_gnullvm" -version = "0.48.5" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "0b7b52767868a23d5bab768e390dc5f5c55825b6d30b86c844ff2dc7414044cc" - [[package]] name = "windows_x86_64_gnullvm" version = "0.52.6" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "24d5b23dc417412679681396f2b49f3de8c1473deb516bd34410872eff51ed0d" -[[package]] -name = "windows_x86_64_msvc" -version = "0.48.5" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ed94fce61571a4006852b7389a063ab983c02eb1bb37b47f8272ce92d06d9538" - [[package]] name = "windows_x86_64_msvc" version = "0.52.6" @@ -2913,16 +2870,6 @@ dependencies = [ "memchr", ] -[[package]] -name = "winreg" -version = "0.50.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "524e57b2c537c0f9b1e69f1965311ec12182b4122e45035b1508cd24d2adadb1" -dependencies = [ - "cfg-if", - "windows-sys 0.48.0", -] - [[package]] name = "xi-unicode" version = "0.3.0" diff --git a/server/Cargo.toml b/server/Cargo.toml index 6775a8b..d5dbb34 100644 --- a/server/Cargo.toml +++ b/server/Cargo.toml @@ -23,7 +23,7 @@ bundled-ui = [] members = ["base", "db"] [workspace.dependencies] -base64 = "0.21.0" +base64 = "0.22.0" h264-reader = "0.7.0" itertools = "0.12.0" nix = "0.27.0" @@ -47,9 +47,9 @@ cursive = { version = "0.21.1", default-features = false, features = ["termion-b db = { package = "moonfire-db", path = "db" } futures = "0.3" h264-reader = { workspace = true } -http = "0.2.3" -http-serve = { version = "0.3.1", features = ["dir"] } -hyper = { version = "0.14.2", features = ["http1", "server", "stream", "tcp"] } +http = "1.1.0" +http-serve = { version = "0.4.0-rc.1", features = ["dir"] } +hyper = { version = "1.4.1", features = ["http1", "server"] } itertools = { workspace = true } libc = "0.2" log = { version = "0.4" } @@ -66,10 +66,9 @@ rusqlite = { workspace = true } serde = { version = "1.0", features = ["derive"] } serde_json = "1.0" smallvec = { version = "1.7", features = ["union"] } -sync_wrapper = "0.1.0" time = "0.1" tokio = { version = "1.24", features = ["macros", "rt-multi-thread", "signal", "sync", "time"] } -tokio-tungstenite = "0.20.0" +tokio-tungstenite = "0.23.1" toml = "0.8" tracing = { workspace = true, features = ["log"] } tracing-subscriber = { version = "0.3.16", features = ["env-filter", "json"] } @@ -81,6 +80,9 @@ url = "2.1.1" uuid = { version = "1.1.2", features = ["serde", "std", "v4"] } flate2 = "1.0.26" git-version = "0.3.5" +hyper-util = { version = "0.1.7", features = ["server-graceful", "tokio"] } +http-body = "1.0.1" +http-body-util = "0.1.2" [target.'cfg(target_os = "linux")'.dependencies] libsystemd = "0.7.0" @@ -93,7 +95,7 @@ walkdir = "2.3.3" [dev-dependencies] mp4 = { git = "https://github.com/scottlamb/mp4-rust", branch = "moonfire" } num-rational = { version = "0.4.0", default-features = false, features = ["std"] } -reqwest = { version = "0.11.0", default-features = false, features = ["json"] } +reqwest = { version = "0.12.0", default-features = false, features = ["json"] } tempfile = "3.2.0" tracing-test = "0.2.4" @@ -122,4 +124,4 @@ debug = 1 tracing = { git = "https://github.com/scottlamb/tracing", rev = "861b443d7b2da400ca7b09111957f33c80135908" } tracing-core = { git = "https://github.com/scottlamb/tracing", rev = "861b443d7b2da400ca7b09111957f33c80135908" } tracing-log = { git = "https://github.com/scottlamb/tracing", rev = "861b443d7b2da400ca7b09111957f33c80135908" } -tracing-subscriber = { git = "https://github.com/scottlamb/tracing", rev = "861b443d7b2da400ca7b09111957f33c80135908" } \ No newline at end of file +tracing-subscriber = { git = "https://github.com/scottlamb/tracing", rev = "861b443d7b2da400ca7b09111957f33c80135908" } diff --git a/server/base/shutdown.rs b/server/base/shutdown.rs index 1ab157f..fd6d88c 100644 --- a/server/base/shutdown.rs +++ b/server/base/shutdown.rs @@ -147,6 +147,18 @@ impl<'receiver> Future for ReceiverRefFuture<'receiver> { } } +impl Drop for ReceiverRefFuture<'_> { + fn drop(&mut self) { + if self.waker_i == NO_WAKER { + return; + } + let mut l = self.receiver.0.wakers.lock().unwrap(); + if let Some(wakers) = &mut *l { + wakers.remove(self.waker_i); + } + } +} + impl Future for ReceiverFuture { type Output = (); @@ -156,6 +168,18 @@ impl Future for ReceiverFuture { } } +impl Drop for ReceiverFuture { + fn drop(&mut self) { + if self.waker_i == NO_WAKER { + return; + } + let mut l = self.receiver.wakers.lock().unwrap(); + if let Some(wakers) = &mut *l { + wakers.remove(self.waker_i); + } + } +} + /// Returns a sender and receiver for graceful shutdown. /// /// Dropping the sender will request shutdown. diff --git a/server/src/body.rs b/server/src/body.rs index c929ed2..5287e84 100644 --- a/server/src/body.rs +++ b/server/src/body.rs @@ -11,20 +11,15 @@ //! although this is a pretty small optimization. //! //! Some day I expect [bytes::Bytes] will expose its vtable (see link above), -//! allowing us to minimize reference-counting while using the standard -//! [hyper::Body]. +//! allowing us to minimize reference-counting without a custom chunk type. use base::Error; -use futures::{stream, Stream}; use reffers::ARefss; use std::error::Error as StdError; -use std::pin::Pin; -use sync_wrapper::SyncWrapper; pub struct Chunk(ARefss<'static, [u8]>); pub type BoxedError = Box; -pub type BodyStream = Box> + Send>; pub fn wrap_error(e: Error) -> BoxedError { Box::new(e) @@ -72,55 +67,4 @@ impl hyper::body::Buf for Chunk { } } -// This SyncWrapper stuff is blindly copied from hyper's body type. -// See , matched by -// . -pub struct Body(SyncWrapper>); - -impl hyper::body::HttpBody for Body { - type Data = Chunk; - type Error = BoxedError; - - fn poll_data( - self: Pin<&mut Self>, - cx: &mut std::task::Context, - ) -> std::task::Poll>> { - // This is safe because the pin is not structural. - // https://doc.rust-lang.org/std/pin/#pinning-is-not-structural-for-field - // (The field _holds_ a pin, but isn't itself pinned.) - unsafe { self.get_unchecked_mut() } - .0 - .get_mut() - .as_mut() - .poll_next(cx) - } - - fn poll_trailers( - self: Pin<&mut Self>, - _cx: &mut std::task::Context, - ) -> std::task::Poll, Self::Error>> { - std::task::Poll::Ready(Ok(None)) - } -} - -impl From for Body { - fn from(b: BodyStream) -> Self { - Body(SyncWrapper::new(Pin::from(b))) - } -} - -impl> From for Body { - fn from(c: C) -> Self { - Body(SyncWrapper::new(Box::pin(stream::once( - futures::future::ok(c.into()), - )))) - } -} - -impl From for Body { - fn from(e: Error) -> Self { - Body(SyncWrapper::new(Box::pin(stream::once( - futures::future::err(wrap_error(e)), - )))) - } -} +pub type Body = http_serve::Body; diff --git a/server/src/bundled_ui.rs b/server/src/bundled_ui.rs index 6cd66ba..e9b83eb 100644 --- a/server/src/bundled_ui.rs +++ b/server/src/bundled_ui.rs @@ -6,8 +6,8 @@ use base::FastHashMap; use http::{header, HeaderMap, HeaderValue}; -use std::io::Read; use std::sync::OnceLock; +use std::{io::Read, pin::Pin}; use crate::body::{BoxedError, Chunk}; @@ -150,9 +150,9 @@ impl http_serve::Entity for Entity { fn get_range( &self, range: std::ops::Range, - ) -> Box> + Send + Sync> { + ) -> Pin> + Send + Sync>> { let file = self.file; - Box::new(futures::stream::once(async move { + Box::pin(futures::stream::once(async move { let r = usize::try_from(range.start)?..usize::try_from(range.end)?; let Some(data) = file.data.get(r) else { let len = file.data.len(); diff --git a/server/src/cmds/run/config.rs b/server/src/cmds/run/config.rs index da45faf..8592ca6 100644 --- a/server/src/cmds/run/config.rs +++ b/server/src/cmds/run/config.rs @@ -99,7 +99,7 @@ pub struct BindConfig { pub own_uid_is_privileged: bool, } -#[derive(Debug, Deserialize)] +#[derive(Clone, Debug, Deserialize)] #[serde(deny_unknown_fields)] #[serde(rename_all = "camelCase")] pub enum AddressConfig { @@ -118,3 +118,14 @@ pub enum AddressConfig { /// page](https://www.freedesktop.org/software/systemd/man/systemd.socket.html). Systemd(#[cfg_attr(not(target_os = "linux"), allow(unused))] String), } + +impl std::fmt::Display for AddressConfig { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + match self { + AddressConfig::Ipv4(addr) => write!(f, "ipv4:{}", addr), + AddressConfig::Ipv6(addr) => write!(f, "ipv6:{}", addr), + AddressConfig::Unix(path) => write!(f, "unix:{}", path.display()), + AddressConfig::Systemd(name) => write!(f, "systemd:{name}"), + } + } +} diff --git a/server/src/cmds/run/mod.rs b/server/src/cmds/run/mod.rs index 0a566fd..f977e8f 100644 --- a/server/src/cmds/run/mod.rs +++ b/server/src/cmds/run/mod.rs @@ -11,7 +11,7 @@ use base::FastHashMap; use base::{bail, Error}; use bpaf::Bpaf; use db::{dir, writer}; -use hyper::service::{make_service_fn, service_fn}; +use hyper::service::service_fn; use itertools::Itertools; use retina::client::SessionGroup; use std::net::SocketAddr; @@ -448,35 +448,43 @@ async fn inner( // Start the web interface(s). let own_euid = nix::unistd::Uid::effective(); let mut preopened = get_preopened_sockets()?; - let web_handles: Result, Error> = config - .binds - .iter() - .map(|b| { - let svc = Arc::new(web::Service::new(web::Config { - db: db.clone(), - ui_dir: Some(&config.ui_dir), - allow_unauthenticated_permissions: b - .allow_unauthenticated_permissions - .clone() - .map(db::Permissions::from), - trust_forward_hdrs: b.trust_forward_headers, - time_zone_name: time_zone_name.clone(), - privileged_unix_uid: b.own_uid_is_privileged.then_some(own_euid), - })?); - let make_svc = make_service_fn(move |conn: &crate::web::accept::Conn| { + for bind in &config.binds { + let svc = Arc::new(web::Service::new(web::Config { + db: db.clone(), + ui_dir: Some(&config.ui_dir), + allow_unauthenticated_permissions: bind + .allow_unauthenticated_permissions + .clone() + .map(db::Permissions::from), + trust_forward_hdrs: bind.trust_forward_headers, + time_zone_name: time_zone_name.clone(), + privileged_unix_uid: bind.own_uid_is_privileged.then_some(own_euid), + })?); + let mut listener = make_listener(&bind.address, &mut preopened)?; + let addr = bind.address.clone(); + tokio::spawn(async move { + loop { + let conn = match listener.accept().await { + Ok(c) => c, + Err(e) => { + error!(err = %e, listener = %addr, "accept failed; will retry in 1 sec"); + tokio::time::sleep(std::time::Duration::from_secs(1)).await; + continue; + } + }; + let svc = Arc::clone(&svc); let conn_data = *conn.data(); - futures::future::ok::<_, std::convert::Infallible>(service_fn({ - let svc = Arc::clone(&svc); - move |req| Arc::clone(&svc).serve(req, conn_data) - })) - }); - let listener = make_listener(&b.address, &mut preopened)?; - let server = ::hyper::Server::builder(listener).serve(make_svc); - let server = server.with_graceful_shutdown(shutdown_rx.future()); - Ok(tokio::spawn(server)) - }) - .collect(); - let web_handles = web_handles?; + let io = hyper_util::rt::TokioIo::new(conn); + let svc = Arc::clone(&svc); + let svc_fn = service_fn(move |req| Arc::clone(&svc).serve(req, conn_data)); + tokio::spawn( + hyper::server::conn::http1::Builder::new() + .serve_connection(io, svc_fn) + .with_upgrades(), + ); + } + }); + } if !preopened.is_empty() { warn!( "ignoring systemd sockets not referenced in config: {}", @@ -524,13 +532,6 @@ async fn inner( .await .map_err(|e| err!(Unknown, source(e)))?; - info!("Waiting for HTTP requests to finish."); - for h in web_handles { - h.await - .map_err(|e| err!(Unknown, source(e)))? - .map_err(|e| err!(Unknown, source(e)))?; - } - info!("Waiting for TEARDOWN requests to complete."); for g in session_groups_by_camera.values() { if let Err(err) = g.await_teardown().await { diff --git a/server/src/mp4.rs b/server/src/mp4.rs index 03557e9..bd14d11 100644 --- a/server/src/mp4.rs +++ b/server/src/mp4.rs @@ -74,6 +74,7 @@ use std::fmt; use std::io; use std::mem; use std::ops::Range; +use std::pin::Pin; use std::sync::Arc; use std::sync::Once; use std::time::SystemTime; @@ -1971,7 +1972,7 @@ impl http_serve::Entity for File { fn get_range( &self, range: Range, - ) -> Box> + Send + Sync> { + ) -> Pin> + Send + Sync>> { self.0.slices.get_range(self, range) } } diff --git a/server/src/slices.rs b/server/src/slices.rs index 7653aa4..e380813 100644 --- a/server/src/slices.rs +++ b/server/src/slices.rs @@ -132,10 +132,10 @@ where &self, ctx: &S::Ctx, range: Range, - ) -> Box> + Sync + Send> { + ) -> Pin> + Sync + Send>> { #[allow(clippy::suspicious_operation_groupings)] if range.start > range.end || range.end > self.len { - return Box::new(stream::once(futures::future::err(wrap_error(err!( + return Box::pin(stream::once(futures::future::err(wrap_error(err!( Internal, msg("bad range {:?} for slice of length {}", range, self.len), ))))); @@ -173,7 +173,7 @@ where futures::future::ready(Some((Pin::from(body), (c, i + 1, 0, min_end)))) }, ); - Box::new(bodies.flatten().in_current_span()) + Box::pin(bodies.flatten().in_current_span()) } } diff --git a/server/src/web/accept.rs b/server/src/web/accept.rs index 5959c20..0c33594 100644 --- a/server/src/web/accept.rs +++ b/server/src/web/accept.rs @@ -2,51 +2,40 @@ // Copyright (C) 2021 The Moonfire NVR Authors; see AUTHORS and LICENSE.txt. // SPDX-License-Identifier: GPL-v3.0-or-later WITH GPL-3.0-linking-exception. -//! Unified [`hyper::server::accept::Accept`] impl for TCP and Unix sockets. +//! Unified connection handling for TCP and Unix sockets. use std::pin::Pin; -use hyper::server::accept::Accept; - pub enum Listener { Tcp(tokio::net::TcpListener), Unix(tokio::net::UnixListener), } -impl Accept for Listener { - type Conn = Conn; - type Error = std::io::Error; - - fn poll_accept( - self: std::pin::Pin<&mut Self>, - cx: &mut std::task::Context<'_>, - ) -> std::task::Poll>> { - match Pin::into_inner(self) { - Listener::Tcp(l) => Pin::new(l).poll_accept(cx)?.map(|(s, a)| { - if let Err(e) = s.set_nodelay(true) { - return Some(Err(e)); - } - Some(Ok(Conn { +impl Listener { + pub async fn accept(&mut self) -> std::io::Result { + match self { + Listener::Tcp(l) => { + let (s, a) = l.accept().await?; + s.set_nodelay(true)?; + Ok(Conn { stream: Stream::Tcp(s), data: ConnData { client_unix_uid: None, client_addr: Some(a), }, - })) - }), - Listener::Unix(l) => Pin::new(l).poll_accept(cx)?.map(|(s, _a)| { - let ucred = match s.peer_cred() { - Err(e) => return Some(Err(e)), - Ok(ucred) => ucred, - }; - Some(Ok(Conn { + }) + } + Listener::Unix(l) => { + let (s, _a) = l.accept().await?; + let ucred = s.peer_cred()?; + Ok(Conn { stream: Stream::Unix(s), data: ConnData { client_unix_uid: Some(nix::unistd::Uid::from_raw(ucred.uid())), client_addr: None, }, - })) - }), + }) + } } } } diff --git a/server/src/web/live.rs b/server/src/web/live.rs index 3a579e6..5043978 100644 --- a/server/src/web/live.rs +++ b/server/src/web/live.rs @@ -10,12 +10,12 @@ use base::{bail, err, Error}; use futures::SinkExt; use http::header; use tokio::sync::broadcast::error::RecvError; -use tokio_tungstenite::{tungstenite, WebSocketStream}; +use tokio_tungstenite::tungstenite; use uuid::Uuid; use crate::mp4; -use super::{Caller, Service}; +use super::{websocket::WebSocketStream, Caller, Service}; /// Interval at which to send keepalives if there are no frames. /// @@ -27,7 +27,7 @@ const KEEPALIVE_AFTER_IDLE: tokio::time::Duration = tokio::time::Duration::from_ impl Service { pub(super) async fn stream_live_m4s( self: Arc, - ws: &mut WebSocketStream, + ws: &mut WebSocketStream, caller: Result, uuid: Uuid, stream_type: db::StreamType, @@ -111,7 +111,7 @@ impl Service { &self, open_id: u32, stream_id: i32, - ws: &mut tokio_tungstenite::WebSocketStream, + ws: &mut WebSocketStream, live: db::LiveFrame, start_at_key: bool, ) -> Result { diff --git a/server/src/web/mod.rs b/server/src/web/mod.rs index 5dff1db..aa3205e 100644 --- a/server/src/web/mod.rs +++ b/server/src/web/mod.rs @@ -66,7 +66,10 @@ struct Caller { type ResponseResult = Result, base::Error>; -fn serve_json(req: &Request, out: &T) -> ResponseResult { +fn serve_json( + req: &R, + out: &T, +) -> ResponseResult { let (mut resp, writer) = http_serve::streaming_body(req).build(); resp.headers_mut().insert( header::CONTENT_TYPE, @@ -84,15 +87,14 @@ fn csrf_matches(csrf: &str, session: auth::SessionHash) -> bool { ::ring::constant_time::verify_slices_are_equal(&b64[..], csrf.as_bytes()).is_ok() } -/// Extracts `s` cookie from the HTTP request. Does not authenticate. -fn extract_sid(req: &Request) -> Option { - for hdr in req.headers().get_all(header::COOKIE) { +/// Extracts `s` cookie from the HTTP request headers. Does not authenticate. +fn extract_sid(req_hdrs: &http::HeaderMap) -> Option { + for hdr in req_hdrs.get_all(header::COOKIE) { for mut cookie in hdr.as_bytes().split(|&b| b == b';') { if cookie.starts_with(b" ") { cookie = &cookie[1..]; } - if cookie.starts_with(b"s=") { - let s = &cookie[2..]; + if let Some(s) = cookie.strip_prefix(b"s=") { if let Ok(s) = auth::RawSessionId::decode_base64(s) { return Some(s); } @@ -107,7 +109,9 @@ fn extract_sid(req: &Request) -> Option { /// This returns the request body as bytes rather than performing /// deserialization. Keeping the bytes allows the caller to use a `Deserialize` /// that borrows from the bytes. -async fn extract_json_body(req: &mut Request) -> Result { +async fn into_json_body( + req: Request, +) -> Result<(http::request::Parts, Bytes), base::Error> { let correct_mime_type = match req.headers().get(header::CONTENT_TYPE) { Some(t) if t == "application/json" => true, Some(t) if t == "application/json; charset=UTF-8" => true, @@ -119,10 +123,12 @@ async fn extract_json_body(req: &mut Request) -> Result>(body: &'a [u8]) -> Result { @@ -207,7 +213,7 @@ impl Service { /// as well as returning it to the HTTP client. async fn serve_inner( self: Arc, - req: Request<::hyper::Body>, + req: Request<::hyper::body::Incoming>, authreq: auth::Request, conn_data: ConnData, ) -> ResponseResult { @@ -310,7 +316,7 @@ impl Service { /// them to hyper as `Ok` results. pub async fn serve( self: Arc, - req: Request<::hyper::Body>, + req: Request<::hyper::body::Incoming>, conn_data: ConnData, ) -> Result, std::convert::Infallible> { let request_id = ulid::Ulid::new(); @@ -378,7 +384,7 @@ impl Service { Ok(response) } - fn top_level(&self, req: &Request<::hyper::Body>, caller: Caller) -> ResponseResult { + fn top_level(&self, req: &Request<::hyper::body::Incoming>, caller: Caller) -> ResponseResult { let mut days = false; let mut camera_configs = false; if let Some(q) = req.uri().query() { @@ -411,7 +417,7 @@ impl Service { ) } - fn camera(&self, req: &Request<::hyper::Body>, uuid: Uuid) -> ResponseResult { + fn camera(&self, req: &Request<::hyper::body::Incoming>, uuid: Uuid) -> ResponseResult { let db = self.db.lock(); let camera = db .get_camera(uuid) @@ -424,7 +430,7 @@ impl Service { fn stream_recordings( &self, - req: &Request<::hyper::Body>, + req: &Request<::hyper::body::Incoming>, uuid: Uuid, type_: db::StreamType, ) -> ResponseResult { @@ -501,7 +507,12 @@ impl Service { serve_json(req, &out) } - fn init_segment(&self, id: i32, debug: bool, req: &Request<::hyper::Body>) -> ResponseResult { + fn init_segment( + &self, + id: i32, + debug: bool, + req: &Request<::hyper::body::Incoming>, + ) -> ResponseResult { let mut builder = mp4::FileBuilder::new(mp4::Type::InitSegment); let db = self.db.lock(); let Some(ent) = db.video_sample_entries_by_id().get(&id) else { @@ -520,7 +531,7 @@ impl Service { fn request( &self, - req: &Request<::hyper::Body>, + req: &Request<::hyper::body::Incoming>, authreq: &auth::Request, caller: Caller, ) -> ResponseResult { @@ -551,7 +562,7 @@ impl Service { host.as_deref(), &authreq.addr, agent.as_deref(), - self.is_secure(req), + self.is_secure(req.headers()), &caller, ), )) @@ -561,10 +572,9 @@ impl Service { /// Moonfire NVR currently doesn't directly serve `https`, but it supports /// proxies which set the `X-Forwarded-Proto` header. See `guide/secure.md` /// for more information. - fn is_secure(&self, req: &Request<::hyper::Body>) -> bool { + fn is_secure(&self, hdrs: &http::HeaderMap) -> bool { self.trust_forward_hdrs - && req - .headers() + && hdrs .get("X-Forwarded-Proto") .map(|v| v.as_bytes() == b"https") .unwrap_or(false) @@ -586,12 +596,12 @@ impl Service { /// performing. fn authenticate( &self, - req: &Request, + req: &Request, authreq: &auth::Request, conn_data: &ConnData, unauth_path: bool, ) -> Result { - if let Some(sid) = extract_sid(req) { + if let Some(sid) = extract_sid(req.headers()) { match self .db .lock() @@ -652,8 +662,9 @@ impl Service { #[cfg(test)] mod tests { use db::testutil::{self, TestDb}; - use futures::future::FutureExt; - use http::{header, Request}; + // use futures::future::FutureExt; + // use http::{header, Request}; + use http::header; use std::sync::Arc; pub(super) struct Server { @@ -679,36 +690,43 @@ mod tests { }) .unwrap(), ); - let make_svc = hyper::service::make_service_fn(move |_conn| { - futures::future::ok::<_, std::convert::Infallible>(hyper::service::service_fn({ - let s = Arc::clone(&service); - move |req| { - Arc::clone(&s).serve( - req, - super::accept::ConnData { - client_unix_uid: None, - client_addr: None, - }, - ) - } - })) - }); - let (tx, rx) = std::sync::mpsc::channel(); + let (addr_tx, addr_rx) = std::sync::mpsc::channel(); let handle = ::std::thread::spawn(move || { - let addr = ([127, 0, 0, 1], 0).into(); let rt = tokio::runtime::Runtime::new().unwrap(); - let srv = { - let _guard = rt.enter(); - hyper::server::Server::bind(&addr) - .tcp_nodelay(true) - .serve(make_svc) - }; - let addr = srv.local_addr(); // resolve port 0 to a real ephemeral port number. - tx.send(addr).unwrap(); - rt.block_on(srv.with_graceful_shutdown(shutdown_rx.map(|_| ()))) - .unwrap(); + let service = Arc::clone(&service); + rt.block_on(async move { + let addr = std::net::SocketAddr::from((std::net::Ipv4Addr::LOCALHOST, 0)); + let listener = tokio::net::TcpListener::bind(&addr).await.unwrap(); + let addr = listener.local_addr().unwrap(); + let mut shutdown_rx = std::pin::pin!(shutdown_rx); + addr_tx.send(addr).unwrap(); + loop { + let (tcp, _) = tokio::select! { + r = listener.accept() => r.unwrap(), + _ = shutdown_rx.as_mut() => return, + }; + tcp.set_nodelay(true).unwrap(); + let io = hyper_util::rt::TokioIo::new(tcp); + let service = Arc::clone(&service); + let serve = move |req| { + Arc::clone(&service).serve( + req, + super::accept::ConnData { + client_unix_uid: None, + client_addr: None, + }, + ) + }; + tokio::task::spawn(async move { + hyper::server::conn::http1::Builder::new() + .serve_connection(io, hyper::service::service_fn(serve)) + .await + .unwrap(); + }); + } + }); }); - let addr = rx.recv().unwrap(); + let addr = addr_rx.recv().unwrap(); // Create a user. let mut c = db::UserChange::add_user("slamb".to_owned()); @@ -726,7 +744,7 @@ mod tests { impl Drop for Server { fn drop(&mut self) { - self.shutdown_tx.take().unwrap().send(()).unwrap(); + let _ = self.shutdown_tx.take().unwrap().send(()); self.handle.take().unwrap().join().unwrap() } } @@ -746,15 +764,15 @@ mod tests { #[test] fn test_extract_sid() { - let req = Request::builder() - .header(header::COOKIE, "foo=asdf; bar=asdf") - .header( - header::COOKIE, - "s=OsL6Cg4ikLw6UIXOT28tI+vPez3qWACovI+nLHWyjsW1ERX83qRrOR3guKedc8IP", - ) - .body(hyper::Body::empty()) - .unwrap(); - let sid = super::extract_sid(&req).unwrap(); + let mut hdrs = http::HeaderMap::new(); + hdrs.append(header::COOKIE, "foo=asdf; bar=asdf".parse().unwrap()); + hdrs.append( + header::COOKIE, + "s=OsL6Cg4ikLw6UIXOT28tI+vPez3qWACovI+nLHWyjsW1ERX83qRrOR3guKedc8IP" + .parse() + .unwrap(), + ); + let sid = super::extract_sid(&hdrs).unwrap(); assert_eq!(sid.as_ref(), &b":\xc2\xfa\n\x0e\"\x90\xbc:P\x85\xceOo-#\xeb\xcf{=\xeaX\x00\xa8\xbc\x8f\xa7,u\xb2\x8e\xc5\xb5\x11\x15\xfc\xde\xa4k9\x1d\xe0\xb8\xa7\x9ds\xc2\x0f"[..]); } } diff --git a/server/src/web/session.rs b/server/src/web/session.rs index 00b400e..5a8de79 100644 --- a/server/src/web/session.rs +++ b/server/src/web/session.rs @@ -13,15 +13,13 @@ use tracing::{info, warn}; use crate::{json, web::parse_json_body}; -use super::{ - csrf_matches, extract_json_body, extract_sid, plain_response, ResponseResult, Service, -}; +use super::{csrf_matches, extract_sid, into_json_body, plain_response, ResponseResult, Service}; use std::convert::TryFrom; impl Service { pub(super) async fn login( &self, - mut req: Request<::hyper::Body>, + req: Request<::hyper::body::Incoming>, authreq: auth::Request, ) -> ResponseResult { if *req.method() != Method::POST { @@ -30,9 +28,9 @@ impl Service { "POST expected", )); } - let r = extract_json_body(&mut req).await?; - let r: json::LoginRequest = parse_json_body(&r)?; - let Some(host) = req.headers().get(header::HOST) else { + let (parts, b) = into_json_body(req).await?; + let r: json::LoginRequest = parse_json_body(&b)?; + let Some(host) = parts.headers.get(header::HOST) else { bail!(InvalidArgument, msg("missing Host header")); }; let host = host.as_bytes(); @@ -45,7 +43,7 @@ impl Service { // If the request came in over https, tell the browser to only send the cookie on https // requests also. - let is_secure = self.is_secure(&req); + let is_secure = self.is_secure(&parts.headers); // Use SameSite=Lax rather than SameSite=Strict. Safari apparently doesn't send // SameSite=Strict cookies on WebSocket upgrade requests. There's no real security @@ -76,7 +74,7 @@ impl Service { pub(super) async fn logout( &self, - mut req: Request, + req: Request, authreq: auth::Request, ) -> ResponseResult { if *req.method() != Method::POST { @@ -85,11 +83,11 @@ impl Service { "POST expected", )); } - let r = extract_json_body(&mut req).await?; - let r: json::LogoutRequest = parse_json_body(&r)?; + let (parts, b) = into_json_body(req).await?; + let r: json::LogoutRequest = parse_json_body(&b)?; let mut res = Response::new(b""[..].into()); - if let Some(sid) = extract_sid(&req) { + if let Some(sid) = extract_sid(&parts.headers) { let mut l = self.db.lock(); let hash = sid.hash(); match l.authenticate_session(authreq.clone(), &hash) { diff --git a/server/src/web/signals.rs b/server/src/web/signals.rs index 4ce366c..cf42dcf 100644 --- a/server/src/web/signals.rs +++ b/server/src/web/signals.rs @@ -12,8 +12,8 @@ use url::form_urlencoded; use crate::json; use super::{ - extract_json_body, parse_json_body, plain_response, require_csrf_if_session, serve_json, - Caller, ResponseResult, Service, + into_json_body, parse_json_body, plain_response, require_csrf_if_session, serve_json, Caller, + ResponseResult, Service, }; use std::borrow::Borrow; @@ -21,7 +21,7 @@ use std::borrow::Borrow; impl Service { pub(super) async fn signals( &self, - req: Request, + req: Request, caller: Caller, ) -> ResponseResult { match *req.method() { @@ -34,12 +34,16 @@ impl Service { } } - async fn post_signals(&self, mut req: Request, caller: Caller) -> ResponseResult { + async fn post_signals( + &self, + req: Request, + caller: Caller, + ) -> ResponseResult { if !caller.permissions.update_signals { bail!(PermissionDenied, msg("update_signals required")); } - let r = extract_json_body(&mut req).await?; - let r: json::PostSignalsRequest = parse_json_body(&r)?; + let (parts, b) = into_json_body(req).await?; + let r: json::PostSignalsRequest = parse_json_body(&b)?; require_csrf_if_session(&caller, r.csrf)?; let now = recording::Time::new(self.db.clocks().realtime()); let mut l = self.db.lock(); @@ -52,10 +56,10 @@ impl Service { json::PostSignalsTimeBase::Now(d) => now + d, }; l.update_signals(start..end, &r.signal_ids, &r.states)?; - serve_json(&req, &json::PostSignalsResponse { time_90k: now }) + serve_json(&parts, &json::PostSignalsResponse { time_90k: now }) } - fn get_signals(&self, req: &Request) -> ResponseResult { + fn get_signals(&self, req: &Request) -> ResponseResult { let mut time = recording::Time::MIN..recording::Time::MAX; if let Some(q) = req.uri().query() { for (key, value) in form_urlencoded::parse(q.as_bytes()) { diff --git a/server/src/web/static_file.rs b/server/src/web/static_file.rs index 00b9f68..e402964 100644 --- a/server/src/web/static_file.rs +++ b/server/src/web/static_file.rs @@ -49,7 +49,7 @@ impl Ui { async fn serve( &self, path: &str, - req: &Request, + req: &Request, cache_control: &'static str, content_type: &'static str, ) -> ResponseResult { @@ -89,7 +89,7 @@ impl Ui { impl Service { /// Serves a static file if possible. - pub(super) async fn static_file(&self, req: Request) -> ResponseResult { + pub(super) async fn static_file(&self, req: Request) -> ResponseResult { let Some(static_req) = StaticFileRequest::parse(req.uri().path()) else { bail!(NotFound, msg("static file not found")); }; diff --git a/server/src/web/users.rs b/server/src/web/users.rs index 79b30d8..3c05709 100644 --- a/server/src/web/users.rs +++ b/server/src/web/users.rs @@ -10,12 +10,16 @@ use http::{Method, Request, StatusCode}; use crate::json::{self, PutUsersResponse, UserSubset, UserWithId}; use super::{ - extract_json_body, parse_json_body, plain_response, require_csrf_if_session, serve_json, - Caller, ResponseResult, Service, + into_json_body, parse_json_body, plain_response, require_csrf_if_session, serve_json, Caller, + ResponseResult, Service, }; impl Service { - pub(super) async fn users(&self, req: Request, caller: Caller) -> ResponseResult { + pub(super) async fn users( + &self, + req: Request, + caller: Caller, + ) -> ResponseResult { match *req.method() { Method::GET | Method::HEAD => self.get_users(req, caller).await, Method::POST => self.post_users(req, caller).await, @@ -26,7 +30,11 @@ impl Service { } } - async fn get_users(&self, req: Request, caller: Caller) -> ResponseResult { + async fn get_users( + &self, + req: Request, + caller: Caller, + ) -> ResponseResult { if !caller.permissions.admin_users { bail!(Unauthenticated, msg("must have admin_users permission")); } @@ -42,12 +50,16 @@ impl Service { serve_json(&req, &json::GetUsersResponse { users }) } - async fn post_users(&self, mut req: Request, caller: Caller) -> ResponseResult { + async fn post_users( + &self, + req: Request, + caller: Caller, + ) -> ResponseResult { if !caller.permissions.admin_users { bail!(Unauthenticated, msg("must have admin_users permission")); } - let r = extract_json_body(&mut req).await?; - let mut r: json::PutUsers = parse_json_body(&r)?; + let (parts, b) = into_json_body(req).await?; + let mut r: json::PutUsers = parse_json_body(&b)?; require_csrf_if_session(&caller, r.csrf)?; let username = r .user @@ -69,12 +81,12 @@ impl Service { } let mut l = self.db.lock(); let user = l.apply_user_change(change)?; - serve_json(&req, &PutUsersResponse { id: user.id }) + serve_json(&parts, &PutUsersResponse { id: user.id }) } pub(super) async fn user( &self, - req: Request, + req: Request, caller: Caller, id: i32, ) -> ResponseResult { @@ -89,7 +101,12 @@ impl Service { } } - async fn get_user(&self, req: Request, caller: Caller, id: i32) -> ResponseResult { + async fn get_user( + &self, + req: Request, + caller: Caller, + id: i32, + ) -> ResponseResult { require_same_or_admin(&caller, id)?; let db = self.db.lock(); let user = db @@ -101,15 +118,15 @@ impl Service { async fn delete_user( &self, - mut req: Request, + req: Request, caller: Caller, id: i32, ) -> ResponseResult { if !caller.permissions.admin_users { bail!(Unauthenticated, msg("must have admin_users permission")); } - let r = extract_json_body(&mut req).await?; - let r: json::DeleteUser = parse_json_body(&r)?; + let (_parts, b) = into_json_body(req).await?; + let r: json::DeleteUser = parse_json_body(&b)?; require_csrf_if_session(&caller, r.csrf)?; let mut l = self.db.lock(); l.delete_user(id)?; @@ -118,13 +135,13 @@ impl Service { async fn patch_user( &self, - mut req: Request, + req: Request, caller: Caller, id: i32, ) -> ResponseResult { require_same_or_admin(&caller, id)?; - let r = extract_json_body(&mut req).await?; - let r: json::PostUser = parse_json_body(&r)?; + let (_parts, b) = into_json_body(req).await?; + let r: json::PostUser = parse_json_body(&b)?; let mut db = self.db.lock(); let user = db .get_user_by_id_mut(id) diff --git a/server/src/web/view.rs b/server/src/web/view.rs index ff327da..5de6b4f 100644 --- a/server/src/web/view.rs +++ b/server/src/web/view.rs @@ -28,7 +28,7 @@ use super::{Caller, ResponseResult, Service}; impl Service { pub(super) fn stream_view_mp4( &self, - req: &Request<::hyper::Body>, + req: &Request<::hyper::body::Incoming>, caller: Caller, uuid: Uuid, stream_type: db::StreamType, diff --git a/server/src/web/websocket.rs b/server/src/web/websocket.rs index 4e2b7b3..50424ca 100644 --- a/server/src/web/websocket.rs +++ b/server/src/web/websocket.rs @@ -11,20 +11,23 @@ use crate::body::Body; use base::{bail, err}; use futures::{Future, SinkExt}; use http::{header, Request, Response}; -use tokio_tungstenite::{tungstenite, WebSocketStream}; +use tokio_tungstenite::tungstenite; use tracing::Instrument; +pub type WebSocketStream = + tokio_tungstenite::WebSocketStream>; + /// Upgrades to WebSocket and runs the supplied stream handler in a separate tokio task. /// /// Fails on `Origin` mismatch with an HTTP-level error. If the handler returns /// an error, tries to send it to the client before dropping the stream. pub(super) fn upgrade( - req: Request<::hyper::Body>, + req: Request<::hyper::body::Incoming>, handler: H, ) -> Result, base::Error> where for<'a> H: FnOnce( - &'a mut WebSocketStream, + &'a mut WebSocketStream, ) -> Pin> + Send + 'a>> + Send + 'static, @@ -35,10 +38,8 @@ where check_origin(req.headers())?; // Otherwise, upgrade and handle the rest in a separate task. - let response = - tungstenite::handshake::server::create_response_with_body(&req, hyper::Body::empty) - .map_err(|e| err!(InvalidArgument, source(e)))?; - let (parts, _) = response.into_parts(); + let response = tungstenite::handshake::server::create_response_with_body(&req, Body::empty) + .map_err(|e| err!(InvalidArgument, source(e)))?; let span = tracing::info_span!("websocket"); tokio::spawn( async move { @@ -49,7 +50,9 @@ where return; } }; - let mut ws = tokio_tungstenite::WebSocketStream::from_raw_socket( + let upgraded = hyper_util::rt::TokioIo::new(upgraded); + + let mut ws = WebSocketStream::from_raw_socket( upgraded, tungstenite::protocol::Role::Server, None, @@ -66,7 +69,7 @@ where } .instrument(span), ); - Ok(Response::from_parts(parts, Body::from(""))) + Ok(response) } /// Checks the `Host` and `Origin` headers match, if the latter is supplied.