From 955a0a8c155f17be2d56c48adc86358c59ceb97a Mon Sep 17 00:00:00 2001 From: Scott Lamb Date: Wed, 29 Aug 2018 22:26:19 -0700 Subject: [PATCH] upgrade to hyper 0.12.x Just one (intentional) functional change---now the streamers start shutting down while the webserver shuts down gracefully. --- Cargo.lock | 188 ++++++++++++++++++++++++++++++------------------ Cargo.toml | 14 ++-- src/body.rs | 100 ++++++++++++++++++++++++++ src/cmds/run.rs | 31 ++++---- src/main.rs | 4 +- src/mp4.rs | 131 +++++++++++++++++---------------- src/slices.rs | 18 +++-- src/web.rs | 110 ++++++++++++++-------------- 8 files changed, 376 insertions(+), 220 deletions(-) create mode 100644 src/body.rs diff --git a/Cargo.lock b/Cargo.lock index 7176695..f8837a1 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -330,6 +330,23 @@ dependencies = [ "num_cpus 1.8.0 (registry+https://github.com/rust-lang/crates.io-index)", ] +[[package]] +name = "h2" +version = "0.1.12" +source = "registry+https://github.com/rust-lang/crates.io-index" +dependencies = [ + "byteorder 1.2.4 (registry+https://github.com/rust-lang/crates.io-index)", + "bytes 0.4.9 (registry+https://github.com/rust-lang/crates.io-index)", + "fnv 1.0.6 (registry+https://github.com/rust-lang/crates.io-index)", + "futures 0.1.23 (registry+https://github.com/rust-lang/crates.io-index)", + "http 0.1.10 (registry+https://github.com/rust-lang/crates.io-index)", + "indexmap 1.0.1 (registry+https://github.com/rust-lang/crates.io-index)", + "log 0.4.4 (registry+https://github.com/rust-lang/crates.io-index)", + "slab 0.4.1 (registry+https://github.com/rust-lang/crates.io-index)", + "string 0.1.1 (registry+https://github.com/rust-lang/crates.io-index)", + "tokio-io 0.1.8 (registry+https://github.com/rust-lang/crates.io-index)", +] + [[package]] name = "http" version = "0.1.10" @@ -342,14 +359,16 @@ dependencies = [ [[package]] name = "http-serve" -version = "0.0.1" -source = "git+https://github.com/scottlamb/http-serve?rev=b361ee#b361ee6dd5afc6a30eaf1d54be072310a1f04778" +version = "0.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" dependencies = [ + "bytes 0.4.9 (registry+https://github.com/rust-lang/crates.io-index)", "flate2 1.0.2 (registry+https://github.com/rust-lang/crates.io-index)", "futures 0.1.23 (registry+https://github.com/rust-lang/crates.io-index)", "futures-cpupool 0.1.8 (registry+https://github.com/rust-lang/crates.io-index)", "http 0.1.10 (registry+https://github.com/rust-lang/crates.io-index)", - "hyper 0.11.27 (registry+https://github.com/rust-lang/crates.io-index)", + "httpdate 0.3.2 (registry+https://github.com/rust-lang/crates.io-index)", + "hyper 0.12.9 (registry+https://github.com/rust-lang/crates.io-index)", "mime 0.3.9 (registry+https://github.com/rust-lang/crates.io-index)", "smallvec 0.6.5 (registry+https://github.com/rust-lang/crates.io-index)", "time 0.1.40 (registry+https://github.com/rust-lang/crates.io-index)", @@ -361,6 +380,11 @@ name = "httparse" version = "1.3.2" source = "registry+https://github.com/rust-lang/crates.io-index" +[[package]] +name = "httpdate" +version = "0.3.2" +source = "registry+https://github.com/rust-lang/crates.io-index" + [[package]] name = "hyper" version = "0.11.27" @@ -370,7 +394,6 @@ dependencies = [ "bytes 0.4.9 (registry+https://github.com/rust-lang/crates.io-index)", "futures 0.1.23 (registry+https://github.com/rust-lang/crates.io-index)", "futures-cpupool 0.1.8 (registry+https://github.com/rust-lang/crates.io-index)", - "http 0.1.10 (registry+https://github.com/rust-lang/crates.io-index)", "httparse 1.3.2 (registry+https://github.com/rust-lang/crates.io-index)", "iovec 0.1.2 (registry+https://github.com/rust-lang/crates.io-index)", "language-tags 0.2.2 (registry+https://github.com/rust-lang/crates.io-index)", @@ -382,12 +405,37 @@ dependencies = [ "time 0.1.40 (registry+https://github.com/rust-lang/crates.io-index)", "tokio-core 0.1.17 (registry+https://github.com/rust-lang/crates.io-index)", "tokio-io 0.1.8 (registry+https://github.com/rust-lang/crates.io-index)", - "tokio-proto 0.1.1 (registry+https://github.com/rust-lang/crates.io-index)", "tokio-service 0.1.0 (registry+https://github.com/rust-lang/crates.io-index)", "unicase 2.1.0 (registry+https://github.com/rust-lang/crates.io-index)", "want 0.0.4 (registry+https://github.com/rust-lang/crates.io-index)", ] +[[package]] +name = "hyper" +version = "0.12.9" +source = "registry+https://github.com/rust-lang/crates.io-index" +dependencies = [ + "bytes 0.4.9 (registry+https://github.com/rust-lang/crates.io-index)", + "futures 0.1.23 (registry+https://github.com/rust-lang/crates.io-index)", + "futures-cpupool 0.1.8 (registry+https://github.com/rust-lang/crates.io-index)", + "h2 0.1.12 (registry+https://github.com/rust-lang/crates.io-index)", + "http 0.1.10 (registry+https://github.com/rust-lang/crates.io-index)", + "httparse 1.3.2 (registry+https://github.com/rust-lang/crates.io-index)", + "iovec 0.1.2 (registry+https://github.com/rust-lang/crates.io-index)", + "itoa 0.4.2 (registry+https://github.com/rust-lang/crates.io-index)", + "log 0.4.4 (registry+https://github.com/rust-lang/crates.io-index)", + "net2 0.2.33 (registry+https://github.com/rust-lang/crates.io-index)", + "rustc_version 0.2.3 (registry+https://github.com/rust-lang/crates.io-index)", + "time 0.1.40 (registry+https://github.com/rust-lang/crates.io-index)", + "tokio 0.1.8 (registry+https://github.com/rust-lang/crates.io-index)", + "tokio-executor 0.1.4 (registry+https://github.com/rust-lang/crates.io-index)", + "tokio-io 0.1.8 (registry+https://github.com/rust-lang/crates.io-index)", + "tokio-reactor 0.1.4 (registry+https://github.com/rust-lang/crates.io-index)", + "tokio-tcp 0.1.1 (registry+https://github.com/rust-lang/crates.io-index)", + "tokio-timer 0.2.6 (registry+https://github.com/rust-lang/crates.io-index)", + "want 0.0.6 (registry+https://github.com/rust-lang/crates.io-index)", +] + [[package]] name = "hyper-tls" version = "0.1.4" @@ -412,6 +460,11 @@ dependencies = [ "unicode-normalization 0.1.7 (registry+https://github.com/rust-lang/crates.io-index)", ] +[[package]] +name = "indexmap" +version = "1.0.1" +source = "registry+https://github.com/rust-lang/crates.io-index" + [[package]] name = "iovec" version = "0.1.2" @@ -497,14 +550,6 @@ dependencies = [ "scopeguard 0.3.3 (registry+https://github.com/rust-lang/crates.io-index)", ] -[[package]] -name = "log" -version = "0.3.9" -source = "registry+https://github.com/rust-lang/crates.io-index" -dependencies = [ - "log 0.4.4 (registry+https://github.com/rust-lang/crates.io-index)", -] - [[package]] name = "log" version = "0.4.4" @@ -676,13 +721,12 @@ dependencies = [ "futures 0.1.23 (registry+https://github.com/rust-lang/crates.io-index)", "futures-cpupool 0.1.8 (registry+https://github.com/rust-lang/crates.io-index)", "http 0.1.10 (registry+https://github.com/rust-lang/crates.io-index)", - "http-serve 0.0.1 (git+https://github.com/scottlamb/http-serve?rev=b361ee)", - "hyper 0.11.27 (registry+https://github.com/rust-lang/crates.io-index)", + "http-serve 0.1.0 (registry+https://github.com/rust-lang/crates.io-index)", + "hyper 0.12.9 (registry+https://github.com/rust-lang/crates.io-index)", "lazy_static 1.1.0 (registry+https://github.com/rust-lang/crates.io-index)", "libc 0.2.43 (registry+https://github.com/rust-lang/crates.io-index)", "log 0.4.4 (registry+https://github.com/rust-lang/crates.io-index)", "memmap 0.6.2 (registry+https://github.com/rust-lang/crates.io-index)", - "mime 0.3.9 (registry+https://github.com/rust-lang/crates.io-index)", "moonfire-base 0.0.1", "moonfire-db 0.0.1", "moonfire-ffmpeg 0.0.1", @@ -699,8 +743,8 @@ dependencies = [ "smallvec 0.6.5 (registry+https://github.com/rust-lang/crates.io-index)", "tempdir 0.3.7 (registry+https://github.com/rust-lang/crates.io-index)", "time 0.1.40 (registry+https://github.com/rust-lang/crates.io-index)", - "tokio-core 0.1.17 (registry+https://github.com/rust-lang/crates.io-index)", - "tokio-signal 0.1.5 (registry+https://github.com/rust-lang/crates.io-index)", + "tokio 0.1.8 (registry+https://github.com/rust-lang/crates.io-index)", + "tokio-signal 0.2.4 (registry+https://github.com/rust-lang/crates.io-index)", "url 1.7.1 (registry+https://github.com/rust-lang/crates.io-index)", "uuid 0.6.5 (registry+https://github.com/rust-lang/crates.io-index)", ] @@ -953,16 +997,6 @@ dependencies = [ "proc-macro2 0.4.13 (registry+https://github.com/rust-lang/crates.io-index)", ] -[[package]] -name = "rand" -version = "0.3.22" -source = "registry+https://github.com/rust-lang/crates.io-index" -dependencies = [ - "fuchsia-zircon 0.3.3 (registry+https://github.com/rust-lang/crates.io-index)", - "libc 0.2.43 (registry+https://github.com/rust-lang/crates.io-index)", - "rand 0.4.3 (registry+https://github.com/rust-lang/crates.io-index)", -] - [[package]] name = "rand" version = "0.4.3" @@ -1081,6 +1115,14 @@ name = "rustc-demangle" version = "0.1.9" source = "registry+https://github.com/rust-lang/crates.io-index" +[[package]] +name = "rustc_version" +version = "0.2.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +dependencies = [ + "semver 0.9.0 (registry+https://github.com/rust-lang/crates.io-index)", +] + [[package]] name = "ryu" version = "0.2.5" @@ -1130,6 +1172,19 @@ dependencies = [ "libc 0.2.43 (registry+https://github.com/rust-lang/crates.io-index)", ] +[[package]] +name = "semver" +version = "0.9.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +dependencies = [ + "semver-parser 0.7.0 (registry+https://github.com/rust-lang/crates.io-index)", +] + +[[package]] +name = "semver-parser" +version = "0.7.0" +source = "registry+https://github.com/rust-lang/crates.io-index" + [[package]] name = "serde" version = "1.0.75" @@ -1180,21 +1235,11 @@ name = "siphasher" version = "0.2.3" source = "registry+https://github.com/rust-lang/crates.io-index" -[[package]] -name = "slab" -version = "0.3.0" -source = "registry+https://github.com/rust-lang/crates.io-index" - [[package]] name = "slab" version = "0.4.1" source = "registry+https://github.com/rust-lang/crates.io-index" -[[package]] -name = "smallvec" -version = "0.2.1" -source = "registry+https://github.com/rust-lang/crates.io-index" - [[package]] name = "smallvec" version = "0.6.5" @@ -1208,6 +1253,11 @@ name = "stable_deref_trait" version = "1.1.1" source = "registry+https://github.com/rust-lang/crates.io-index" +[[package]] +name = "string" +version = "0.1.1" +source = "registry+https://github.com/rust-lang/crates.io-index" + [[package]] name = "strsim" version = "0.7.0" @@ -1234,11 +1284,6 @@ dependencies = [ "unicode-xid 0.1.0 (registry+https://github.com/rust-lang/crates.io-index)", ] -[[package]] -name = "take" -version = "0.1.0" -source = "registry+https://github.com/rust-lang/crates.io-index" - [[package]] name = "tempdir" version = "0.3.7" @@ -1361,23 +1406,6 @@ dependencies = [ "log 0.4.4 (registry+https://github.com/rust-lang/crates.io-index)", ] -[[package]] -name = "tokio-proto" -version = "0.1.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -dependencies = [ - "futures 0.1.23 (registry+https://github.com/rust-lang/crates.io-index)", - "log 0.3.9 (registry+https://github.com/rust-lang/crates.io-index)", - "net2 0.2.33 (registry+https://github.com/rust-lang/crates.io-index)", - "rand 0.3.22 (registry+https://github.com/rust-lang/crates.io-index)", - "slab 0.3.0 (registry+https://github.com/rust-lang/crates.io-index)", - "smallvec 0.2.1 (registry+https://github.com/rust-lang/crates.io-index)", - "take 0.1.0 (registry+https://github.com/rust-lang/crates.io-index)", - "tokio-core 0.1.17 (registry+https://github.com/rust-lang/crates.io-index)", - "tokio-io 0.1.8 (registry+https://github.com/rust-lang/crates.io-index)", - "tokio-service 0.1.0 (registry+https://github.com/rust-lang/crates.io-index)", -] - [[package]] name = "tokio-reactor" version = "0.1.4" @@ -1405,15 +1433,16 @@ dependencies = [ [[package]] name = "tokio-signal" -version = "0.1.5" +version = "0.2.4" source = "registry+https://github.com/rust-lang/crates.io-index" dependencies = [ "futures 0.1.23 (registry+https://github.com/rust-lang/crates.io-index)", "libc 0.2.43 (registry+https://github.com/rust-lang/crates.io-index)", "mio 0.6.15 (registry+https://github.com/rust-lang/crates.io-index)", "mio-uds 0.6.6 (registry+https://github.com/rust-lang/crates.io-index)", - "tokio-core 0.1.17 (registry+https://github.com/rust-lang/crates.io-index)", + "tokio-executor 0.1.4 (registry+https://github.com/rust-lang/crates.io-index)", "tokio-io 0.1.8 (registry+https://github.com/rust-lang/crates.io-index)", + "tokio-reactor 0.1.4 (registry+https://github.com/rust-lang/crates.io-index)", "winapi 0.3.5 (registry+https://github.com/rust-lang/crates.io-index)", ] @@ -1509,6 +1538,11 @@ name = "try-lock" version = "0.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" +[[package]] +name = "try-lock" +version = "0.2.2" +source = "registry+https://github.com/rust-lang/crates.io-index" + [[package]] name = "ucd-util" version = "0.1.1" @@ -1616,6 +1650,16 @@ dependencies = [ "try-lock 0.1.0 (registry+https://github.com/rust-lang/crates.io-index)", ] +[[package]] +name = "want" +version = "0.0.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +dependencies = [ + "futures 0.1.23 (registry+https://github.com/rust-lang/crates.io-index)", + "log 0.4.4 (registry+https://github.com/rust-lang/crates.io-index)", + "try-lock 0.2.2 (registry+https://github.com/rust-lang/crates.io-index)", +] + [[package]] name = "winapi" version = "0.2.8" @@ -1701,12 +1745,16 @@ source = "registry+https://github.com/rust-lang/crates.io-index" "checksum fuchsia-zircon-sys 0.3.3 (registry+https://github.com/rust-lang/crates.io-index)" = "3dcaa9ae7725d12cdb85b3ad99a434db70b468c09ded17e012d86b5c1010f7a7" "checksum futures 0.1.23 (registry+https://github.com/rust-lang/crates.io-index)" = "884dbe32a6ae4cd7da5c6db9b78114449df9953b8d490c9d7e1b51720b922c62" "checksum futures-cpupool 0.1.8 (registry+https://github.com/rust-lang/crates.io-index)" = "ab90cde24b3319636588d0c35fe03b1333857621051837ed769faefb4c2162e4" +"checksum h2 0.1.12 (registry+https://github.com/rust-lang/crates.io-index)" = "a27e7ed946e8335bdf9a191bc1b9b14a03ba822d013d2f58437f4fabcbd7fc2c" "checksum http 0.1.10 (registry+https://github.com/rust-lang/crates.io-index)" = "dca621d0fa606a5ff2850b6e337b57ad6137ee4d67e940449643ff45af6874c6" -"checksum http-serve 0.0.1 (git+https://github.com/scottlamb/http-serve?rev=b361ee)" = "" +"checksum http-serve 0.1.0 (registry+https://github.com/rust-lang/crates.io-index)" = "393ee8e71920168e84fa80efcd300bf90435118eb037f674aae92e28f82ec5ef" "checksum httparse 1.3.2 (registry+https://github.com/rust-lang/crates.io-index)" = "7b6288d7db100340ca12873fd4d08ad1b8f206a9457798dfb17c018a33fee540" +"checksum httpdate 0.3.2 (registry+https://github.com/rust-lang/crates.io-index)" = "494b4d60369511e7dea41cf646832512a94e542f68bb9c49e54518e0f468eb47" "checksum hyper 0.11.27 (registry+https://github.com/rust-lang/crates.io-index)" = "34a590ca09d341e94cddf8e5af0bbccde205d5fbc2fa3c09dd67c7f85cea59d7" +"checksum hyper 0.12.9 (registry+https://github.com/rust-lang/crates.io-index)" = "081289d17dce471c8cbc0e69a3dd073b627e08338561d1167ab620b754d9fe90" "checksum hyper-tls 0.1.4 (registry+https://github.com/rust-lang/crates.io-index)" = "ffb1bd5e518d3065840ab315dbbf44e4420e5f7d80e2cb93fa6ffffc50522378" "checksum idna 0.1.5 (registry+https://github.com/rust-lang/crates.io-index)" = "38f09e0f0b1fb55fdee1f17470ad800da77af5186a1a76c026b679358b7e844e" +"checksum indexmap 1.0.1 (registry+https://github.com/rust-lang/crates.io-index)" = "08173ba1e906efb6538785a8844dd496f5d34f0a2d88038e95195172fc667220" "checksum iovec 0.1.2 (registry+https://github.com/rust-lang/crates.io-index)" = "dbe6e417e7d0975db6512b90796e8ce223145ac4e33c377e4a42882a0e88bb08" "checksum itoa 0.4.2 (registry+https://github.com/rust-lang/crates.io-index)" = "5adb58558dcd1d786b5f0bd15f3226ee23486e24b7b58304b60f64dc68e62606" "checksum kernel32-sys 0.2.2 (registry+https://github.com/rust-lang/crates.io-index)" = "7507624b29483431c0ba2d82aece8ca6cdba9382bff4ddd0f7490560c056098d" @@ -1719,7 +1767,6 @@ source = "registry+https://github.com/rust-lang/crates.io-index" "checksum libsqlite3-sys 0.9.3 (registry+https://github.com/rust-lang/crates.io-index)" = "d3711dfd91a1081d2458ad2d06ea30a8755256e74038be2ad927d94e1c955ca8" "checksum linked-hash-map 0.4.2 (registry+https://github.com/rust-lang/crates.io-index)" = "7860ec297f7008ff7a1e3382d7f7e1dcd69efc94751a2284bafc3d013c2aa939" "checksum lock_api 0.1.3 (registry+https://github.com/rust-lang/crates.io-index)" = "949826a5ccf18c1b3a7c3d57692778d21768b79e46eb9dd07bfc4c2160036c54" -"checksum log 0.3.9 (registry+https://github.com/rust-lang/crates.io-index)" = "e19e8d5c34a3e0e2223db8e060f9e8264aeeb5c5fc64a4ee9965c062211c024b" "checksum log 0.4.4 (registry+https://github.com/rust-lang/crates.io-index)" = "cba860f648db8e6f269df990180c2217f333472b4a6e901e97446858487971e2" "checksum lru-cache 0.1.1 (registry+https://github.com/rust-lang/crates.io-index)" = "4d06ff7ff06f729ce5f4e227876cb88d10bc59cd4ae1e09fbb2bde15c850dc21" "checksum maplit 1.0.1 (registry+https://github.com/rust-lang/crates.io-index)" = "08cbb6b4fef96b6d77bfc40ec491b1690c779e77b05cd9f07f787ed376fd4c43" @@ -1761,7 +1808,6 @@ source = "registry+https://github.com/rust-lang/crates.io-index" "checksum proc-macro2 0.4.13 (registry+https://github.com/rust-lang/crates.io-index)" = "ee5697238f0d893c7f0ecc59c0999f18d2af85e424de441178bcacc9f9e6cf67" "checksum protobuf 2.0.4 (registry+https://github.com/rust-lang/crates.io-index)" = "128a4f37a2df739a567a8685b17f54aa19b9c3c4a6a5b8731e97a419b3db451c" "checksum quote 0.6.8 (registry+https://github.com/rust-lang/crates.io-index)" = "dd636425967c33af890042c483632d33fa7a18f19ad1d7ea72e8998c6ef8dea5" -"checksum rand 0.3.22 (registry+https://github.com/rust-lang/crates.io-index)" = "15a732abf9d20f0ad8eeb6f909bf6868722d9a06e1e50802b6a70351f40b4eb1" "checksum rand 0.4.3 (registry+https://github.com/rust-lang/crates.io-index)" = "8356f47b32624fef5b3301c1be97e5944ecdd595409cc5da11d05f211db6cfbd" "checksum rand 0.5.5 (registry+https://github.com/rust-lang/crates.io-index)" = "e464cd887e869cddcae8792a4ee31d23c7edd516700695608f5b98c67ee0131c" "checksum rand_core 0.2.1 (registry+https://github.com/rust-lang/crates.io-index)" = "edecf0f94da5551fc9b492093e30b041a891657db7940ee221f9d2f66e82eef2" @@ -1775,6 +1821,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" "checksum reqwest 0.8.8 (registry+https://github.com/rust-lang/crates.io-index)" = "738769ec83daf6c1929dc9dae7d69ed3779b55ae5c356e989dcd3aa677d8486e" "checksum rusqlite 0.14.0 (registry+https://github.com/rust-lang/crates.io-index)" = "c9d9118f1ce84d8d0b67f9779936432fb42bb620cef2122409d786892cce9a3c" "checksum rustc-demangle 0.1.9 (registry+https://github.com/rust-lang/crates.io-index)" = "bcfe5b13211b4d78e5c2cadfebd7769197d95c639c35a50057eb4c05de811395" +"checksum rustc_version 0.2.3 (registry+https://github.com/rust-lang/crates.io-index)" = "138e3e0acb6c9fb258b19b67cb8abd63c00679d2851805ea151465464fe9030a" "checksum ryu 0.2.5 (registry+https://github.com/rust-lang/crates.io-index)" = "e7c066b8e2923f05d4718a06d2622f189ff362bc642bfade6c6629f0440f3827" "checksum safemem 0.2.0 (registry+https://github.com/rust-lang/crates.io-index)" = "e27a8b19b835f7aea908818e871f5cc3a5a186550c30773be987e155e8163d8f" "checksum schannel 0.1.13 (registry+https://github.com/rust-lang/crates.io-index)" = "dc1fabf2a7b6483a141426e1afd09ad543520a77ac49bd03c286e7696ccfd77f" @@ -1782,21 +1829,21 @@ source = "registry+https://github.com/rust-lang/crates.io-index" "checksum scopeguard 0.3.3 (registry+https://github.com/rust-lang/crates.io-index)" = "94258f53601af11e6a49f722422f6e3425c52b06245a5cf9bc09908b174f5e27" "checksum security-framework 0.1.16 (registry+https://github.com/rust-lang/crates.io-index)" = "dfa44ee9c54ce5eecc9de7d5acbad112ee58755239381f687e564004ba4a2332" "checksum security-framework-sys 0.1.16 (registry+https://github.com/rust-lang/crates.io-index)" = "5421621e836278a0b139268f36eee0dc7e389b784dc3f79d8f11aabadf41bead" +"checksum semver 0.9.0 (registry+https://github.com/rust-lang/crates.io-index)" = "1d7eb9ef2c18661902cc47e535f9bc51b78acd254da71d375c2f6720d9a40403" +"checksum semver-parser 0.7.0 (registry+https://github.com/rust-lang/crates.io-index)" = "388a1df253eca08550bef6c72392cfe7c30914bf41df5269b68cbd6ff8f570a3" "checksum serde 1.0.75 (registry+https://github.com/rust-lang/crates.io-index)" = "22d340507cea0b7e6632900a176101fea959c7065d93ba555072da90aaaafc87" "checksum serde_derive 1.0.75 (registry+https://github.com/rust-lang/crates.io-index)" = "234fc8b737737b148ccd625175fc6390f5e4dacfdaa543cb93a3430d984a9119" "checksum serde_json 1.0.26 (registry+https://github.com/rust-lang/crates.io-index)" = "44dd2cfde475037451fa99b7e5df77aa3cfd1536575fa8e7a538ab36dcde49ae" "checksum serde_urlencoded 0.5.3 (registry+https://github.com/rust-lang/crates.io-index)" = "aaed41d9fb1e2f587201b863356590c90c1157495d811430a0c0325fe8169650" "checksum signal-hook 0.1.4 (registry+https://github.com/rust-lang/crates.io-index)" = "bfc9a251608ddaef559cd5b08b539cbda8c36d0efe0506f9a864765d75dbd665" "checksum siphasher 0.2.3 (registry+https://github.com/rust-lang/crates.io-index)" = "0b8de496cf83d4ed58b6be86c3a275b8602f6ffe98d3024a869e124147a9a3ac" -"checksum slab 0.3.0 (registry+https://github.com/rust-lang/crates.io-index)" = "17b4fcaed89ab08ef143da37bc52adbcc04d4a69014f4c1208d6b51f0c47bc23" "checksum slab 0.4.1 (registry+https://github.com/rust-lang/crates.io-index)" = "5f9776d6b986f77b35c6cf846c11ad986ff128fe0b2b63a3628e3755e8d3102d" -"checksum smallvec 0.2.1 (registry+https://github.com/rust-lang/crates.io-index)" = "4c8cbcd6df1e117c2210e13ab5109635ad68a929fcbb8964dc965b76cb5ee013" "checksum smallvec 0.6.5 (registry+https://github.com/rust-lang/crates.io-index)" = "153ffa32fd170e9944f7e0838edf824a754ec4c1fc64746fcc9fe1f8fa602e5d" "checksum stable_deref_trait 1.1.1 (registry+https://github.com/rust-lang/crates.io-index)" = "dba1a27d3efae4351c8051072d619e3ade2820635c3958d826bfea39d59b54c8" +"checksum string 0.1.1 (registry+https://github.com/rust-lang/crates.io-index)" = "00caf261d6f90f588f8450b8e1230fa0d5be49ee6140fdfbcb55335aff350970" "checksum strsim 0.7.0 (registry+https://github.com/rust-lang/crates.io-index)" = "bb4f380125926a99e52bc279241539c018323fab05ad6368b56f93d9369ff550" "checksum syn 0.14.9 (registry+https://github.com/rust-lang/crates.io-index)" = "261ae9ecaa397c42b960649561949d69311f08eeaea86a65696e6e46517cf741" "checksum synstructure 0.9.0 (registry+https://github.com/rust-lang/crates.io-index)" = "85bb9b7550d063ea184027c9b8c20ac167cd36d3e06b3a40bceb9d746dc1a7b7" -"checksum take 0.1.0 (registry+https://github.com/rust-lang/crates.io-index)" = "b157868d8ac1f56b64604539990685fa7611d8fa9e5476cf0c02cf34d32917c5" "checksum tempdir 0.3.7 (registry+https://github.com/rust-lang/crates.io-index)" = "15f2b5fb00ccdf689e0149d1b1b3c03fead81c2b37735d812fa8bddbbf41b6d8" "checksum term_size 0.3.1 (registry+https://github.com/rust-lang/crates.io-index)" = "9e5b9a66db815dcfd2da92db471106457082577c3c278d4138ab3e3b4e189327" "checksum thread_local 0.3.6 (registry+https://github.com/rust-lang/crates.io-index)" = "c6b53e329000edc2b34dbe8545fd20e55a333362d0a321909685a19bd28c3f1b" @@ -1808,10 +1855,9 @@ source = "registry+https://github.com/rust-lang/crates.io-index" "checksum tokio-executor 0.1.4 (registry+https://github.com/rust-lang/crates.io-index)" = "84823b932d566bc3c6aa644df4ca36cb38593c50b7db06011fd4e12e31e4047e" "checksum tokio-fs 0.1.3 (registry+https://github.com/rust-lang/crates.io-index)" = "b5cbe4ca6e71cb0b62a66e4e6f53a8c06a6eefe46cc5f665ad6f274c9906f135" "checksum tokio-io 0.1.8 (registry+https://github.com/rust-lang/crates.io-index)" = "8d6cc2de7725863c86ac71b0b9068476fec50834f055a243558ef1655bbd34cb" -"checksum tokio-proto 0.1.1 (registry+https://github.com/rust-lang/crates.io-index)" = "8fbb47ae81353c63c487030659494b295f6cb6576242f907f203473b191b0389" "checksum tokio-reactor 0.1.4 (registry+https://github.com/rust-lang/crates.io-index)" = "df6a7ea7d65e0fc1398de28959de8be96909986a7d2e01d4f86d3433dfb91aed" "checksum tokio-service 0.1.0 (registry+https://github.com/rust-lang/crates.io-index)" = "24da22d077e0f15f55162bdbdc661228c1581892f52074fb242678d015b45162" -"checksum tokio-signal 0.1.5 (registry+https://github.com/rust-lang/crates.io-index)" = "e8f46863230f9a05cf52d173721ec391b9c5782a2465f593029922b8782b9ffe" +"checksum tokio-signal 0.2.4 (registry+https://github.com/rust-lang/crates.io-index)" = "527342552ec4a6049f787ccc9e2d6e0eab77bfe6cb7ec7a05c0391e370f466c3" "checksum tokio-tcp 0.1.1 (registry+https://github.com/rust-lang/crates.io-index)" = "5b4c329b47f071eb8a746040465fa751bd95e4716e98daef6a9b4e434c17d565" "checksum tokio-threadpool 0.1.6 (registry+https://github.com/rust-lang/crates.io-index)" = "a5758cecb6e0633cea5d563ac07c975e04961690b946b04fd84e7d6445a8f6af" "checksum tokio-timer 0.2.6 (registry+https://github.com/rust-lang/crates.io-index)" = "d03fa701f9578a01b7014f106b47f0a363b4727a7f3f75d666e312ab7acbbf1c" @@ -1820,6 +1866,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" "checksum tokio-uds 0.2.1 (registry+https://github.com/rust-lang/crates.io-index)" = "424c1ed15a0132251813ccea50640b224c809d6ceafb88154c1a8775873a0e89" "checksum toml 0.4.6 (registry+https://github.com/rust-lang/crates.io-index)" = "a0263c6c02c4db6c8f7681f9fd35e90de799ebd4cfdeab77a38f4ff6b3d8c0d9" "checksum try-lock 0.1.0 (registry+https://github.com/rust-lang/crates.io-index)" = "ee2aa4715743892880f70885373966c83d73ef1b0838a664ef0c76fffd35e7c2" +"checksum try-lock 0.2.2 (registry+https://github.com/rust-lang/crates.io-index)" = "e604eb7b43c06650e854be16a2a03155743d3752dd1c943f6829e26b7a36e382" "checksum ucd-util 0.1.1 (registry+https://github.com/rust-lang/crates.io-index)" = "fd2be2d6639d0f8fe6cdda291ad456e23629558d466e2789d2c3e9892bda285d" "checksum unicase 1.4.2 (registry+https://github.com/rust-lang/crates.io-index)" = "7f4765f83163b74f957c797ad9253caf97f103fb064d3999aea9568d09fc8a33" "checksum unicase 2.1.0 (registry+https://github.com/rust-lang/crates.io-index)" = "284b6d3db520d67fbe88fd778c21510d1b0ba4a551e5d0fbb023d33405f6de8a" @@ -1836,6 +1883,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" "checksum version_check 0.1.4 (registry+https://github.com/rust-lang/crates.io-index)" = "7716c242968ee87e5542f8021178248f267f295a5c4803beae8b8b7fd9bc6051" "checksum void 1.0.2 (registry+https://github.com/rust-lang/crates.io-index)" = "6a02e4885ed3bc0f2de90ea6dd45ebcbb66dacffe03547fadbb0eeae2770887d" "checksum want 0.0.4 (registry+https://github.com/rust-lang/crates.io-index)" = "a05d9d966753fa4b5c8db73fcab5eed4549cfe0e1e4e66911e5564a0085c35d1" +"checksum want 0.0.6 (registry+https://github.com/rust-lang/crates.io-index)" = "797464475f30ddb8830cc529aaaae648d581f99e2036a928877dfde027ddf6b3" "checksum winapi 0.2.8 (registry+https://github.com/rust-lang/crates.io-index)" = "167dc9d6949a9b857f3451275e911c3f44255842c1f7a76f33c55103a909087a" "checksum winapi 0.3.5 (registry+https://github.com/rust-lang/crates.io-index)" = "773ef9dcc5f24b7d850d0ff101e542ff24c3b090a9768e03ff889fdef41f00fd" "checksum winapi-build 0.1.1 (registry+https://github.com/rust-lang/crates.io-index)" = "2d315eee3b34aca4797b2da6b13ed88266e6d612562a0c46390af8299fc699bc" diff --git a/Cargo.toml b/Cargo.toml index 4b77085..3a32a91 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -25,13 +25,12 @@ futures = "0.1" futures-cpupool = "0.1" fnv = "1.0" http = "0.1.5" -http-serve = { git = "https://github.com/scottlamb/http-serve", rev = "b361ee" } -hyper = "0.11.25" +http-serve = "0.1.0" +hyper = "0.12.9" lazy_static = "1.0" libc = "0.2" log = { version = "0.4", features = ["release_max_level_info"] } memmap = "0.6" -mime = "0.3" moonfire-base = { path = "base" } moonfire-db = { path = "db" } moonfire-ffmpeg = { path = "ffmpeg" } @@ -46,8 +45,8 @@ serde_derive = "1.0" serde_json = "1.0" smallvec = "0.6" time = "0.1" -tokio-core = "0.1" -tokio-signal = "0.1" +tokio = "0.1.8" +tokio-signal = "0.2" url = "1.4" uuid = { version = "0.6", features = ["serde", "std", "v4"] } @@ -65,8 +64,3 @@ debug = true [profile.bench] debug = true - -[replace] - -# This hyper fork has a patch to disable Nagle's algorithm. -"hyper:0.11.16" = { git = "https://github.com/scottlamb/hyper", branch = "moonfire-on-0.11.x" } diff --git a/src/body.rs b/src/body.rs new file mode 100644 index 0000000..d623c1c --- /dev/null +++ b/src/body.rs @@ -0,0 +1,100 @@ +// This file is part of Moonfire NVR, a security camera network video recorder. +// Copyright (C) 2018 Scott Lamb +// +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// In addition, as a special exception, the copyright holders give +// permission to link the code of portions of this program with the +// OpenSSL library under certain conditions as described in each +// individual source file, and distribute linked combinations including +// the two. +// +// You must obey the GNU General Public License in all respects for all +// of the code used other than OpenSSL. If you modify file(s) with this +// exception, you may extend this exception to your version of the +// file(s), but you are not obligated to do so. If you do not wish to do +// so, delete this exception statement from your version. If you delete +// this exception statement from all source files in the program, then +// also delete it here. +// +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. +// +// You should have received a copy of the GNU General Public License +// along with this program. If not, see . + +//! Tools for implementing a `http_serve::Entity` body composed from many "slices". + +use failure::Error; +use futures::{Stream, stream}; +use hyper::body::Payload; +use reffers::ARefs; +use std::error::Error as StdError; + +pub struct Chunk(ARefs<'static, [u8]>); + +//pub type CompatError = ::failure::Compat; +pub type BoxedError = Box; +pub type BodyStream = Box + Send + 'static>; + +pub fn wrap_error(e: Error) -> BoxedError { + Box::new(e.compat()) +} + +impl From> for Chunk { + fn from(r: ARefs<'static, [u8]>) -> Self { Chunk(r) } +} + +impl From<&'static [u8]> for Chunk { + fn from(r: &'static [u8]) -> Self { Chunk(ARefs::new(r)) } +} + +impl From> for Chunk { + fn from(r: Vec) -> Self { Chunk(ARefs::new(r).map(|v| &v[..])) } +} + +impl ::bytes::Buf for Chunk { + fn remaining(&self) -> usize { self.0.len() } + fn bytes(&self) -> &[u8] { &*self.0 } + fn advance(&mut self, cnt: usize) { + self.0 = ::std::mem::replace(&mut self.0, ARefs::new(&[][..])).map(|b| &b[cnt..]); + } +} + +pub struct Body(BodyStream); + +impl Payload for Body { + type Data = Chunk; + type Error = BoxedError; + + fn poll_data(&mut self) -> ::futures::Poll, Self::Error> { + self.0.poll() + } +} + +impl From for Body { + fn from(b: BodyStream) -> Self { Body(b) } +} + +impl From<&'static [u8]> for Body { + fn from(c: &'static [u8]) -> Self { + Body(Box::new(stream::once(Ok(c.into())))) + } +} + +impl From for Body { + fn from(e: Error) -> Self { + Body(Box::new(stream::once(Err(wrap_error(e))))) + } +} + +//impl> From for Body { +// fn from(c: C) -> Self { +// Body(Box::new(stream::once(Ok(c.into())))) +// } +//} diff --git a/src/cmds/run.rs b/src/cmds/run.rs index c1183b8..d9ca10d 100644 --- a/src/cmds/run.rs +++ b/src/cmds/run.rs @@ -33,12 +33,13 @@ use db::{self, dir, writer}; use failure::Error; use fnv::FnvHashMap; use futures::{Future, Stream}; +use std::error::Error as StdError; use std::sync::Arc; use std::sync::atomic::{AtomicBool, Ordering}; use std::thread; use stream; use streamer; -use tokio_core::reactor; +use tokio; use tokio_signal::unix::{Signal, SIGINT, SIGTERM}; use web; @@ -78,12 +79,12 @@ struct Args { flag_allow_origin: Option, } -fn setup_shutdown_future(h: &reactor::Handle) -> Box> { - let int = Signal::new(SIGINT, h).flatten_stream().into_future(); - let term = Signal::new(SIGTERM, h).flatten_stream().into_future(); - Box::new(int.select(term) - .map(|_| ()) - .map_err(|_| ())) +fn setup_shutdown() -> impl Future + Send { + let int = Signal::new(SIGINT).flatten_stream().into_future(); + let term = Signal::new(SIGTERM).flatten_stream().into_future(); + int.select(term) + .map(|_| ()) + .map_err(|_| ()) } fn resolve_zone() -> String { @@ -193,14 +194,18 @@ pub fn run() -> Result<(), Error> { // Start the web interface. let addr = args.flag_http_addr.parse().unwrap(); - let server = ::hyper::server::Http::new() - .bind(&addr, move || Ok(s.clone())) - .unwrap(); + let server = ::hyper::server::Server::bind(&addr).tcp_nodelay(true).serve( + move || Ok::<_, Box>(s.clone())); - let shutdown = setup_shutdown_future(&server.handle()); + let shutdown = setup_shutdown().shared(); info!("Ready to serve HTTP requests"); - server.run_until(shutdown).unwrap(); + let reactor = ::std::thread::spawn({ + let shutdown = shutdown.clone(); + || tokio::run(server.with_graceful_shutdown(shutdown.map(|_| ())) + .map_err(|e| error!("hyper error: {}", e))) + }); + shutdown.wait().unwrap(); info!("Shutting down streamers."); shutdown_streamers.store(true, Ordering::SeqCst); @@ -218,6 +223,8 @@ pub fn run() -> Result<(), Error> { } } + info!("Waiting for HTTP requests to finish."); + reactor.join().unwrap(); info!("Exiting."); Ok(()) } diff --git a/src/main.rs b/src/main.rs index 182fc4a..6854831 100644 --- a/src/main.rs +++ b/src/main.rs @@ -47,7 +47,6 @@ extern crate libc; extern crate reffers; extern crate rusqlite; extern crate memmap; -extern crate mime; extern crate moonfire_base as base; extern crate moonfire_db as db; extern crate moonfire_ffmpeg; @@ -60,13 +59,14 @@ extern crate serde; extern crate serde_json; extern crate smallvec; extern crate time; -extern crate tokio_core; +extern crate tokio; extern crate tokio_signal; extern crate url; extern crate uuid; use base::clock as clock; +mod body; mod cmds; mod h264; mod json; diff --git a/src/mp4.rs b/src/mp4.rs index f84c023..f0b9fce 100644 --- a/src/mp4.rs +++ b/src/mp4.rs @@ -79,20 +79,22 @@ extern crate time; use base::strutil; -use bytes::BytesMut; +use bytes::{Buf, BytesMut}; use byteorder::{BigEndian, ByteOrder, WriteBytesExt}; +use body::{Chunk, BoxedError, wrap_error}; use db::recording::{self, TIME_UNITS_PER_SEC}; use db::{self, dir}; use failure::Error; +use futures::Stream; use futures::stream; use http; +use http::header::HeaderValue; use http_serve; -use hyper::header; use memmap; use openssl::hash; use parking_lot::{Once, ONCE_INIT}; use reffers::ARefs; -use slices::{self, Body, Chunk, Slices}; +use slices::{self, Slices}; use smallvec::SmallVec; use std::cell::UnsafeCell; use std::cmp; @@ -101,6 +103,7 @@ use std::io; use std::ops::Range; use std::mem; use std::sync::Arc; +use std::time::SystemTime; /// This value should be incremented any time a change is made to this file that causes different /// bytes to be output for a particular set of `Mp4Builder` options. Incrementing this value will @@ -615,7 +618,7 @@ impl Slice { let mp4 = ARefs::new(mp4.0.clone()); let r = r.start as usize .. r.end as usize; let p = self.p(); - mp4.try_map(|mp4| Ok(&mp4.segments[p].get_index(&mp4.db, f)?[r])) + Ok(mp4.try_map(|mp4| Ok::<_, Error>(&mp4.segments[p].get_index(&mp4.db, f)?[r]))?.into()) } fn wrap_truns(&self, mp4: &File, r: Range, len: usize) -> Result { @@ -629,16 +632,17 @@ impl Slice { mp4.0.db.lock() .with_recording_playback(s.s.id, &mut |playback| s.truns(playback, pos, len))?; let truns = ARefs::new(truns); - Ok(truns.map(|t| &t[r.start as usize .. r.end as usize])) + Ok(truns.map(|t| &t[r.start as usize .. r.end as usize]).into()) } } impl slices::Slice for Slice { type Ctx = File; - type Chunk = slices::Chunk; + type Chunk = Chunk; fn end(&self) -> u64 { return self.0 & 0xFF_FF_FF_FF_FF } - fn get_range(&self, f: &File, range: Range, len: u64) -> Body { + fn get_range(&self, f: &File, range: Range, len: u64) + -> Box + Send> { trace!("getting mp4 slice {:?}'s range {:?} / {}", self, range, len); let p = self.p(); let res = match self.t() { @@ -649,12 +653,12 @@ impl slices::Slice for Slice { }, SliceType::Buf => { let r = ARefs::new(f.0.clone()); - Ok(r.map(|f| &f.buf[p+range.start as usize .. p+range.end as usize])) + Ok(r.map(|f| &f.buf[p+range.start as usize .. p+range.end as usize]).into()) }, SliceType::VideoSampleEntry => { let r = ARefs::new(f.0.clone()); Ok(r.map(|f| &f.video_sample_entries[p] - .data[range.start as usize .. range.end as usize])) + .data[range.start as usize .. range.end as usize]).into()) }, SliceType::Stts => self.wrap_index(f, range.clone(), &Segment::stts), SliceType::Stsz => self.wrap_index(f, range.clone(), &Segment::stsz), @@ -665,15 +669,12 @@ impl slices::Slice for Slice { SliceType::Truns => self.wrap_truns(f, range.clone(), len as usize), }; Box::new(stream::once(res - .map_err(|e| { - error!("Error producing {:?}: {:?}", self, e); - ::hyper::Error::Incomplete - }) + .map_err(|e| wrap_error(e)) .and_then(move |c| { - if c.len() != (range.end - range.start) as usize { - error!("Error producing {:?}: range {:?} produced incorrect len {}.", - self, range, c.len()); - return Err(::hyper::Error::Incomplete); + if c.remaining() != (range.end - range.start) as usize { + return Err(wrap_error(format_err!( + "Error producing {:?}: range {:?} produced incorrect len {}.", + self, range, c.remaining()))); } Ok(c) }))) @@ -868,8 +869,8 @@ impl FileBuilder { } debug!("segments: {:#?}", self.segments); debug!("slices: {:?}", self.body.slices); - let mtime = ::std::time::UNIX_EPOCH + - ::std::time::Duration::from_secs(max_end as u64); + let last_modified = ::std::time::UNIX_EPOCH + + ::std::time::Duration::from_secs(max_end as u64); Ok(File(Arc::new(FileInner { db, dirs_by_stream_id, @@ -878,8 +879,9 @@ impl FileBuilder { buf: self.body.buf, video_sample_entries: self.video_sample_entries, initial_sample_byte_pos, - last_modified: mtime.into(), - etag: header::EntityTag::strong(strutil::hex(&etag.finish()?)), + last_modified, + etag: HeaderValue::from_str(&format!("\"{}\"", &strutil::hex(&etag.finish()?))) + .expect("hex string should be valid UTF-8"), }))) } @@ -1420,8 +1422,8 @@ struct FileInner { buf: Vec, video_sample_entries: SmallVec<[Arc; 1]>, initial_sample_byte_pos: u64, - last_modified: header::HttpDate, - etag: header::EntityTag, + last_modified: SystemTime, + etag: HeaderValue, } impl FileInner { @@ -1433,7 +1435,7 @@ impl FileInner { let r = s.s.sample_file_range(); pos += r.end - r.start; } - Ok(ARefs::new(v).map(|v| &v[r.start as usize .. r.end as usize])) + Ok(ARefs::new(v).map(|v| &v[r.start as usize .. r.end as usize]).into()) } /// Gets a `Chunk` of video sample data from disk. @@ -1459,7 +1461,7 @@ impl FileInner { .map(&f)? }); use core::ops::Deref; - Ok(ARefs::new(mmap).map(|m| m.deref())) + Ok(ARefs::new(mmap).map(|m| m.deref()).into()) } fn get_subtitle_sample_data(&self, i: usize, r: Range, l: u64) -> Result { @@ -1475,7 +1477,7 @@ impl FileInner { use std::io::Write; write!(v, "{}", tm.strftime(SUBTITLE_TEMPLATE)?)?; } - Ok(ARefs::new(v).map(|v| &v[r.start as usize .. r.end as usize])) + Ok(ARefs::new(v).map(|v| &v[r.start as usize .. r.end as usize]).into()) } } @@ -1483,8 +1485,8 @@ impl FileInner { pub struct File(Arc); impl http_serve::Entity for File { - type Chunk = slices::Chunk; - type Body = slices::Body; + type Data = Chunk; + type Error = BoxedError; fn add_headers(&self, hdrs: &mut http::header::HeaderMap) { let mut mime = BytesMut::with_capacity(64); @@ -1502,10 +1504,13 @@ impl http_serve::Entity for File { hdrs.insert(http::header::CONTENT_TYPE, http::header::HeaderValue::from_shared(mime.freeze()).unwrap()); } - fn last_modified(&self) -> Option { Some(self.0.last_modified) } - fn etag(&self) -> Option { Some(self.0.etag.clone()) } + fn last_modified(&self) -> Option { Some(self.0.last_modified) } + fn etag(&self) -> Option { Some(self.0.etag.clone()) } fn len(&self) -> u64 { self.0.slices.len() } - fn get_range(&self, range: Range) -> Body { self.0.slices.get_range(self, range) } + fn get_range(&self, range: Range) + -> Box + Send> { + self.0.slices.get_range(self, range) + } } /// Tests. There are two general strategies used to validate the resulting files: @@ -1521,6 +1526,7 @@ impl http_serve::Entity for File { #[cfg(test)] mod tests { use base::strutil; + use bytes::Buf; use byteorder::{BigEndian, ByteOrder}; use clock::RealClocks; use db::recording::{self, TIME_UNITS_PER_SEC}; @@ -1528,7 +1534,6 @@ mod tests { use db::writer; use futures::Future; use futures::Stream as FuturesStream; - use hyper::header; use openssl::hash; use http_serve::{self, Entity}; use std::fs; @@ -1538,26 +1543,28 @@ mod tests { use super::*; use stream::{self, Opener, Stream}; - fn fill_slice(slice: &mut [u8], e: &E, start: u64) { + fn fill_slice(slice: &mut [u8], e: &E, start: u64) + where E::Error : ::std::fmt::Debug { let mut p = 0; e.get_range(start .. start + slice.len() as u64) .for_each(|chunk| { - let c: &[u8] = chunk.as_ref(); + let c: &[u8] = chunk.bytes(); slice[p .. p + c.len()].copy_from_slice(c); p += c.len(); - Ok::<_, ::hyper::Error>(()) + Ok::<_, E::Error>(()) }) .wait() .unwrap(); } /// Returns the SHA-1 digest of the given `Entity`. - fn digest(e: &E) -> hash::DigestBytes { + fn digest(e: &E) -> hash::DigestBytes + where E::Error : ::std::fmt::Debug { e.get_range(0 .. e.len()) .fold(hash::Hasher::new(hash::MessageDigest::sha1()).unwrap(), |mut sha1, chunk| { - let c: &[u8] = chunk.as_ref(); + let c: &[u8] = chunk.bytes(); sha1.update(c).unwrap(); - Ok::<_, ::hyper::Error>(sha1) + Ok::<_, E::Error>(sha1) }) .wait() .unwrap() @@ -1812,7 +1819,7 @@ mod tests { use ::std::io::Write; mp4.get_range(0 .. mp4.len()) .for_each(|chunk| { - out.write_all(&chunk)?; + out.write_all(chunk.bytes())?; Ok(()) }) .wait() @@ -2116,8 +2123,8 @@ mod tests { // combine ranges from the new format with ranges from the old format. let sha1 = digest(&mp4); assert_eq!("1e5331e8371bd97ac3158b3a86494abc87cdc70e", strutil::hex(&sha1[..])); - const EXPECTED_ETAG: &'static str = "04298efb2df0cc45a6cea65dfdf2e817a3b42ca8"; - assert_eq!(Some(header::EntityTag::strong(EXPECTED_ETAG.to_owned())), mp4.etag()); + const EXPECTED_ETAG: &'static str = "\"04298efb2df0cc45a6cea65dfdf2e817a3b42ca8\""; + assert_eq!(Some(HeaderValue::from_str(EXPECTED_ETAG).unwrap()), mp4.etag()); drop(db.syncer_channel); db.db.lock().clear_on_flush(); db.syncer_join.join().unwrap(); @@ -2137,8 +2144,8 @@ mod tests { // combine ranges from the new format with ranges from the old format. let sha1 = digest(&mp4); assert_eq!("de382684a471f178e4e3a163762711b0653bfd83", strutil::hex(&sha1[..])); - const EXPECTED_ETAG: &'static str = "16a4f6348560c3de0d149675dccba21ef7906be3"; - assert_eq!(Some(header::EntityTag::strong(EXPECTED_ETAG.to_owned())), mp4.etag()); + const EXPECTED_ETAG: &'static str = "\"16a4f6348560c3de0d149675dccba21ef7906be3\""; + assert_eq!(Some(HeaderValue::from_str(EXPECTED_ETAG).unwrap()), mp4.etag()); drop(db.syncer_channel); db.db.lock().clear_on_flush(); db.syncer_join.join().unwrap(); @@ -2158,8 +2165,8 @@ mod tests { // combine ranges from the new format with ranges from the old format. let sha1 = digest(&mp4); assert_eq!("d655945f94e18e6ed88a2322d27522aff6f76403", strutil::hex(&sha1[..])); - const EXPECTED_ETAG: &'static str = "80e418b029e81aa195f90aa6b806015a5030e5be"; - assert_eq!(Some(header::EntityTag::strong(EXPECTED_ETAG.to_owned())), mp4.etag()); + const EXPECTED_ETAG: &'static str = "\"80e418b029e81aa195f90aa6b806015a5030e5be\""; + assert_eq!(Some(HeaderValue::from_str(EXPECTED_ETAG).unwrap()), mp4.etag()); drop(db.syncer_channel); db.db.lock().clear_on_flush(); db.syncer_join.join().unwrap(); @@ -2179,8 +2186,8 @@ mod tests { // combine ranges from the new format with ranges from the old format. let sha1 = digest(&mp4); assert_eq!("e0d28ddf08e24575a82657b1ce0b2da73f32fd88", strutil::hex(&sha1[..])); - const EXPECTED_ETAG: &'static str = "5bfea0f20108a7c5b77ef1e21d82ef2abc29540f"; - assert_eq!(Some(header::EntityTag::strong(EXPECTED_ETAG.to_owned())), mp4.etag()); + const EXPECTED_ETAG: &'static str = "\"5bfea0f20108a7c5b77ef1e21d82ef2abc29540f\""; + assert_eq!(Some(HeaderValue::from_str(EXPECTED_ETAG).unwrap()), mp4.etag()); drop(db.syncer_channel); db.db.lock().clear_on_flush(); db.syncer_join.join().unwrap(); @@ -2195,12 +2202,11 @@ mod bench { use base::clock::RealClocks; use db::recording; use db::testutil::{self, TestDb}; - use futures::Stream; - use futures::future; + use futures::{Future, future}; use hyper; use http_serve; - use reffers::ARefs; use self::test::Bencher; + use std::error::Error as StdError; use super::tests::create_mp4_from_db; use url::Url; @@ -2225,14 +2231,14 @@ mod bench { let (tx, rx) = ::std::sync::mpsc::channel(); ::std::thread::spawn(move || { let addr = "127.0.0.1:0".parse().unwrap(); - let server = hyper::server::Http::new() - .bind(&addr, move || Ok(MyService(mp4.clone()))) - .unwrap(); - tx.send(server.local_addr().unwrap()).unwrap(); - server.run().unwrap(); + let server = hyper::server::Server::bind(&addr) + .tcp_nodelay(true) + .serve(move || Ok::<_, Box>(MyService(mp4.clone()))); + tx.send(server.local_addr()).unwrap(); + ::tokio::run(server.map_err(|e| panic!(e))); }); let addr = rx.recv().unwrap(); - BenchServer{ + BenchServer { url: Url::parse(&format!("http://{}:{}/", addr.ip(), addr.port())).unwrap(), generated_len: p, } @@ -2241,14 +2247,13 @@ mod bench { struct MyService(super::File); - impl hyper::server::Service for MyService { - type Request = hyper::server::Request; - type Response = hyper::server::Response< - Box, Error = hyper::Error> + Send>>; - type Error = hyper::Error; - type Future = future::FutureResult; + impl hyper::service::Service for MyService { + type ReqBody = ::hyper::Body; + type ResBody = ::body::Body; + type Error = ::body::BoxedError; + type Future = future::FutureResult<::http::Response, Self::Error>; - fn call(&self, req: hyper::server::Request) -> Self::Future { + fn call(&mut self, req: ::http::Request) -> Self::Future { future::ok(http_serve::serve(self.0.clone(), &req)) } } diff --git a/src/slices.rs b/src/slices.rs index be09527..5dda76e 100644 --- a/src/slices.rs +++ b/src/slices.rs @@ -30,16 +30,13 @@ //! Tools for implementing a `http_serve::Entity` body composed from many "slices". +use body::{BoxedError, wrap_error}; use failure::Error; use futures::stream; use futures::Stream; -use reffers::ARefs; use std::fmt; use std::ops::Range; -pub type Chunk = ARefs<'static, [u8]>; -pub type Body = Box + Send>; - /// Writes a byte range to the given `io::Write` given a context argument; meant for use with /// `Slices`. pub trait Slice : fmt::Debug + Sized + Sync + 'static { @@ -54,7 +51,7 @@ pub trait Slice : fmt::Debug + Sized + Sync + 'static { /// The additional argument `ctx` is as supplied to the `Slices`. /// The additional argument `l` is the length of this slice, as determined by the `Slices`. fn get_range(&self, ctx: &Self::Ctx, r: Range, len: u64) - -> Box + Send>; + -> Box + Send>; fn get_slices(ctx: &Self::Ctx) -> &Slices; } @@ -113,10 +110,10 @@ impl Slices where S: Slice { /// Writes `range` to `out`. /// This interface mirrors `http_serve::Entity::write_to`, with the additional `ctx` argument. pub fn get_range(&self, ctx: &S::Ctx, range: Range) - -> Box + Send> { + -> Box + Send> { if range.start > range.end || range.end > self.len { - error!("Bad range {:?} for slice of length {}", range, self.len); - return Box::new(stream::once(Err(::hyper::Error::Incomplete))); + return Box::new(stream::once(Err(wrap_error(format_err!( + "Bad range {:?} for slice of length {}", range, self.len))))); } // Binary search for the first slice of the range to write, determining its index and @@ -143,7 +140,7 @@ impl Slices where S: Slice { let l = s_end - slice_start; body = s.get_range(&c, start_pos .. min_end - slice_start, l); }; - Some(Ok::<_, ::hyper::Error>((body, (c, i+1, 0, min_end)))) + Some(Ok::<_, BoxedError>((body, (c, i+1, 0, min_end)))) }); Box::new(bodies.flatten()) } @@ -151,6 +148,7 @@ impl Slices where S: Slice { #[cfg(test)] mod tests { + use body::BoxedError; use db::testutil; use futures::{Future, Stream}; use futures::stream; @@ -176,7 +174,7 @@ mod tests { fn end(&self) -> u64 { self.end } fn get_range(&self, _ctx: &&'static Slices, r: Range, _l: u64) - -> Box + Send> { + -> Box + Send> { Box::new(stream::once(Ok(FakeChunk{slice: self.name, range: r}))) } diff --git a/src/web.rs b/src/web.rs index db2d8fa..bba5b38 100644 --- a/src/web.rs +++ b/src/web.rs @@ -31,25 +31,22 @@ extern crate hyper; use base::strutil; +use body::{Body, BoxedError, wrap_error}; use core::borrow::Borrow; use core::str::FromStr; use db::{self, recording}; use db::dir::SampleFileDir; use failure::Error; use fnv::FnvHashMap; -use futures::{future, stream}; +use futures::future; use futures_cpupool; use json; -use http; +use http::{self, Request, Response, status::StatusCode}; use http_serve; -use hyper::header::{self, Header}; -use hyper::server::{self, Request, Response}; -use mime; +use http::header::{self, HeaderValue}; use mp4; -use reffers::ARefs; use regex::Regex; use serde_json; -use slices; use std::collections::HashMap; use std::cmp; use std::fs; @@ -185,7 +182,7 @@ impl Segments { /// The files themselves are opened on every request so they can be changed during development. #[derive(Debug)] struct UiFile { - mime: http::header::HeaderValue, + mime: HeaderValue, path: PathBuf, } @@ -193,21 +190,21 @@ struct ServiceInner { db: Arc, dirs_by_stream_id: Arc>>, ui_files: HashMap, - allow_origin: Option, + allow_origin: Option, pool: futures_cpupool::CpuPool, time_zone_name: String, } impl ServiceInner { - fn not_found(&self) -> Result, Error> { - let body: slices::Body = Box::new(stream::once(Ok(ARefs::new(&b"not found"[..])))); - Ok(Response::new() - .with_status(hyper::StatusCode::NotFound) - .with_header(header::ContentType(mime::TEXT_PLAIN)) - .with_body(body)) + fn not_found(&self) -> Result, Error> { + let body: Body = (&b"not found"[..]).into(); + Ok(Response::builder() + .status(StatusCode::NOT_FOUND) + .header(header::CONTENT_TYPE, HeaderValue::from_static("text/plain")) + .body(body)?) } - fn top_level(&self, req: &Request) -> Result, Error> { + fn top_level(&self, req: &Request<::hyper::Body>) -> Result, Error> { let mut days = false; if let Some(q) = req.uri().query() { for (key, value) in form_urlencoded::parse(q.as_bytes()) { @@ -219,8 +216,10 @@ impl ServiceInner { } } - let mut resp = Response::new().with_header(header::ContentType(mime::APPLICATION_JSON)); - if let Some(mut w) = http_serve::streaming_body(&req, &mut resp).build() { + let (mut resp, writer) = http_serve::streaming_body(&req).build(); + resp.headers_mut().insert(header::CONTENT_TYPE, + HeaderValue::from_static("application/json")); + if let Some(mut w) = writer { let db = self.db.lock(); serde_json::to_writer(&mut w, &json::TopLevel { time_zone_name: &self.time_zone_name, @@ -230,9 +229,11 @@ impl ServiceInner { Ok(resp) } - fn camera(&self, req: &Request, uuid: Uuid) -> Result, Error> { - let mut resp = Response::new().with_header(header::ContentType(mime::APPLICATION_JSON)); - if let Some(mut w) = http_serve::streaming_body(&req, &mut resp).build() { + fn camera(&self, req: &Request<::hyper::Body>, uuid: Uuid) -> Result, Error> { + let (mut resp, writer) = http_serve::streaming_body(&req).build(); + resp.headers_mut().insert(header::CONTENT_TYPE, + HeaderValue::from_static("application/json")); + if let Some(mut w) = writer { let db = self.db.lock(); let camera = db.get_camera(uuid) .ok_or_else(|| format_err!("no such camera {}", uuid))?; @@ -241,8 +242,8 @@ impl ServiceInner { Ok(resp) } - fn stream_recordings(&self, req: &Request, uuid: Uuid, type_: db::StreamType) - -> Result, Error> { + fn stream_recordings(&self, req: &Request<::hyper::Body>, uuid: Uuid, type_: db::StreamType) + -> Result, Error> { let (r, split) = { let mut time = recording::Time(i64::min_value()) .. recording::Time(i64::max_value()); let mut split = recording::Duration(i64::max_value()); @@ -286,14 +287,17 @@ impl ServiceInner { Ok(()) })?; } - let mut resp = Response::new().with_header(header::ContentType(mime::APPLICATION_JSON)); - if let Some(mut w) = http_serve::streaming_body(&req, &mut resp).build() { + let (mut resp, writer) = http_serve::streaming_body(&req).build(); + resp.headers_mut().insert(header::CONTENT_TYPE, + HeaderValue::from_static("application/json")); + if let Some(mut w) = writer { serde_json::to_writer(&mut w, &out)? }; Ok(resp) } - fn init_segment(&self, sha1: [u8; 20], req: &Request) -> Result, Error> { + fn init_segment(&self, sha1: [u8; 20], req: &Request<::hyper::Body>) + -> Result, Error> { let mut builder = mp4::FileBuilder::new(mp4::Type::InitSegment); let db = self.db.lock(); for ent in db.video_sample_entries_by_id().values() { @@ -306,8 +310,9 @@ impl ServiceInner { self.not_found() } - fn stream_view_mp4(&self, req: &Request, uuid: Uuid, stream_type_: db::StreamType, - mp4_type_: mp4::Type) -> Result, Error> { + fn stream_view_mp4(&self, req: &Request<::hyper::Body>, uuid: Uuid, + stream_type_: db::StreamType, mp4_type_: mp4::Type) + -> Result, Error> { let stream_id = { let db = self.db.lock(); let camera = db.get_camera(uuid) @@ -403,14 +408,14 @@ impl ServiceInner { Ok(http_serve::serve(mp4, req)) } - fn static_file(&self, req: &Request) -> Result, Error> { + fn static_file(&self, req: &Request<::hyper::Body>) -> Result, Error> { let s = match self.ui_files.get(req.uri().path()) { None => { return self.not_found() }, Some(s) => s, }; let f = fs::File::open(&s.path)?; let mut hdrs = http::HeaderMap::new(); - hdrs.insert(http::header::CONTENT_TYPE, s.mime.clone()); + hdrs.insert(header::CONTENT_TYPE, s.mime.clone()); let e = http_serve::ChunkedReadFile::new(f, Some(self.pool.clone()), hdrs)?; Ok(http_serve::serve(e, &req)) } @@ -445,7 +450,7 @@ impl Service { }; let allow_origin = match allow_origin { None => None, - Some(o) => Some(header::AccessControlAllowOrigin::parse_header(&header::Raw::from(o))?), + Some(o) => Some(HeaderValue::from_str(&o)?), }; Ok(Service(Arc::new(ServiceInner { db, @@ -492,22 +497,22 @@ impl Service { }, }; files.insert(p, UiFile { - mime: http::header::HeaderValue::from_static(mime), + mime: HeaderValue::from_static(mime), path: e.path(), }); } } } -impl server::Service for Service { - type Request = Request; - type Response = Response; - type Error = hyper::Error; - type Future = future::FutureResult; +impl ::hyper::service::Service for Service { + type ReqBody = ::hyper::Body; + type ResBody = Body; + type Error = BoxedError; + type Future = future::FutureResult, Self::Error>; - fn call(&self, req: Request) -> Self::Future { + fn call(&mut self, req: Request<::hyper::Body>) -> Self::Future { debug!("request on: {}", req.uri()); - let res = match decode_path(req.uri().path()) { + let mut res = match decode_path(req.uri().path()) { Path::InitSegment(sha1) => self.0.init_segment(sha1, &req), Path::TopLevel => self.0.top_level(&req), Path::Camera(uuid) => self.0.camera(&req, uuid), @@ -521,15 +526,12 @@ impl server::Service for Service { Path::NotFound => self.0.not_found(), Path::Static => self.0.static_file(&req), }; - let res = if let Some(ref o) = self.0.allow_origin { - res.map(|resp| resp.with_header(o.clone())) - } else { - res - }; - future::result(res.map_err(|e| { - error!("error: {}", e); - hyper::Error::Incomplete - })) + if let Ok(ref mut resp) = res { + if let Some(ref o) = self.0.allow_origin { + resp.headers_mut().insert(header::ACCESS_CONTROL_ALLOW_ORIGIN, o.clone()); + } + } + future::result(res.map_err(|e| wrap_error(e))) } } @@ -570,8 +572,10 @@ mod bench { extern crate test; use db::testutil::{self, TestDb}; + use futures::Future; use hyper; use self::test::Bencher; + use std::error::Error as StdError; use uuid::Uuid; struct Server { @@ -589,11 +593,11 @@ mod bench { let addr = "127.0.0.1:0".parse().unwrap(); let service = super::Service::new(db.db.clone(), None, None, "".to_owned()).unwrap(); - let server = hyper::server::Http::new() - .bind(&addr, move || Ok(service.clone())) - .unwrap(); - tx.send(server.local_addr().unwrap()).unwrap(); - server.run().unwrap(); + let server = hyper::server::Server::bind(&addr) + .tcp_nodelay(true) + .serve(move || Ok::<_, Box>(service.clone())); + tx.send(server.local_addr()).unwrap(); + ::tokio::run(server.map_err(|e| panic!(e))); }); let addr = rx.recv().unwrap(); Server {