From a2462bbaea13f7a3f3eb65e7430b30618bc203b8 Mon Sep 17 00:00:00 2001 From: Jesse Luehrs Date: Fri, 25 Feb 2022 17:32:58 -0500 Subject: move to tokio --- Cargo.lock | 640 +++++++++++++---------------------------- Cargo.toml | 20 +- src/main.rs | 9 +- src/mutex.rs | 10 +- src/parse/ast.rs | 35 ++- src/prelude.rs | 8 +- src/runner/builtins/command.rs | 66 +++-- src/runner/builtins/mod.rs | 16 +- src/runner/command.rs | 10 +- src/runner/mod.rs | 101 ++++--- src/shell/event.rs | 38 +-- src/shell/history/entry.rs | 20 +- src/shell/history/mod.rs | 152 +++++----- src/shell/history/pty.rs | 118 ++++---- src/shell/mod.rs | 115 ++++---- 15 files changed, 551 insertions(+), 807 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 428f50f..685e62f 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -13,9 +13,9 @@ dependencies = [ [[package]] name = "anyhow" -version = "1.0.53" +version = "1.0.55" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "94a45b455c14666b85fc40a019e8ab9eb75e3a124e05494f5397122bc9eb06e0" +checksum = "159bb86af3a200e19a068f4224eae4c8bb2d0fa054c7e5d1cacd5cef95e684cd" [[package]] name = "arrayvec" @@ -23,140 +23,6 @@ version = "0.5.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "23b62fc65de8e4e7f52534fb52b0f3ed04746ae267519eef2a83941e8085068b" -[[package]] -name = "async-channel" -version = "1.6.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "2114d64672151c0c5eaa5e131ec84a74f06e1e559830dabba01ca30605d66319" -dependencies = [ - "concurrent-queue", - "event-listener", - "futures-core", -] - -[[package]] -name = "async-executor" -version = "1.4.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "871f9bb5e0a22eeb7e8cf16641feb87c9dc67032ccf8ff49e772eb9941d3a965" -dependencies = [ - "async-task", - "concurrent-queue", - "fastrand", - "futures-lite", - "once_cell", - "slab", -] - -[[package]] -name = "async-global-executor" -version = "2.0.2" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9586ec52317f36de58453159d48351bc244bc24ced3effc1fce22f3d48664af6" -dependencies = [ - "async-channel", - "async-executor", - "async-io", - "async-mutex", - "blocking", - "futures-lite", - "num_cpus", - "once_cell", -] - -[[package]] -name = "async-io" -version = "1.6.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a811e6a479f2439f0c04038796b5cfb3d2ad56c230e0f2d3f7b04d68cfee607b" -dependencies = [ - "concurrent-queue", - "futures-lite", - "libc", - "log", - "once_cell", - "parking", - "polling", - "slab", - "socket2", - "waker-fn", - "winapi 0.3.9", -] - -[[package]] -name = "async-lock" -version = "2.4.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e6a8ea61bf9947a1007c5cada31e647dbc77b103c679858150003ba697ea798b" -dependencies = [ - "event-listener", -] - -[[package]] -name = "async-mutex" -version = "1.4.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "479db852db25d9dbf6204e6cb6253698f175c15726470f78af0d918e99d6156e" -dependencies = [ - "event-listener", -] - -[[package]] -name = "async-process" -version = "1.3.0" -source = "git+https://github.com/doy/async-process#5e25598d6fcf3865f2b9e106ba049a26a490a884" -dependencies = [ - "async-io", - "blocking", - "cfg-if 1.0.0", - "event-listener", - "futures-lite", - "libc", - "once_cell", - "signal-hook", - "winapi 0.3.9", -] - -[[package]] -name = "async-std" -version = "1.10.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f8056f1455169ab86dd47b47391e4ab0cbd25410a70e9fe675544f49bafaf952" -dependencies = [ - "async-channel", - "async-global-executor", - "async-io", - "async-lock", - "async-process", - "crossbeam-utils", - "futures-channel", - "futures-core", - "futures-io", - "futures-lite", - "gloo-timers", - "kv-log-macro", - "log", - "memchr", - "num_cpus", - "once_cell", - "pin-project-lite", - "pin-utils", - "slab", - "wasm-bindgen-futures", -] - -[[package]] -name = "async-task" -version = "4.1.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "677d306121baf53310a3fd342d88dc0824f6bbeace68347593658525565abee8" - -[[package]] -name = "atomic-waker" -version = "1.0.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "065374052e7df7ee4047b1160cca5e1467a12351a40b3da123c870ba0b8eda2a" - [[package]] name = "atty" version = "0.2.14" @@ -210,26 +76,6 @@ dependencies = [ "byte-tools", ] -[[package]] -name = "blocking" -version = "1.1.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "046e47d4b2d391b1f6f8b407b1deb8dee56c1852ccd868becf2710f601b5f427" -dependencies = [ - "async-channel", - "async-task", - "atomic-waker", - "fastrand", - "futures-lite", - "once_cell", -] - -[[package]] -name = "bumpalo" -version = "3.9.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a4a45a46ab1f2412e53d3a0ade76ffad2025804294569aae387231a0cd6e0899" - [[package]] name = "byte-tools" version = "0.3.1" @@ -243,16 +89,16 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "14c189c53d098945499cdfa7ecc63567cf3886b3332b312a5b4585d8d3a6a610" [[package]] -name = "cache-padded" -version = "1.2.0" +name = "bytes" +version = "1.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c1db59621ec70f09c5e9b597b220c7a2b43611f4710dc03ceb8748637775692c" +checksum = "c4872d67bab6358e59559027aa3b9157c53d9358c51423c17554809a8858e0f8" [[package]] name = "cc" -version = "1.0.72" +version = "1.0.73" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "22a9137b95ea06864e018375b72adfb7db6e6f68cfc8df5a04d00288050485ee" +checksum = "2fff2a6927b3bb87f9595d67196a70493f627687a71d87a0d692242c33f58c11" dependencies = [ "jobserver", ] @@ -285,35 +131,6 @@ dependencies = [ "vec_map", ] -[[package]] -name = "concurrent-queue" -version = "1.2.2" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "30ed07550be01594c6026cff2a1d7fe9c8f683caa798e12b68694ac9e88286a3" -dependencies = [ - "cache-padded", -] - -[[package]] -name = "crossbeam-utils" -version = "0.8.7" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b5e5bed1f1c269533fa816a0a5492b3545209a205ca1a54842be180eb63a16a6" -dependencies = [ - "cfg-if 1.0.0", - "lazy_static", -] - -[[package]] -name = "ctor" -version = "0.1.21" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ccc0a48a9b826acdf4028595adc9db92caea352f7af011a3034acd172a52a0aa" -dependencies = [ - "quote", - "syn", -] - [[package]] name = "digest" version = "0.8.1" @@ -323,27 +140,12 @@ dependencies = [ "generic-array", ] -[[package]] -name = "event-listener" -version = "2.5.2" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "77f3309417938f28bf8228fcff79a4a37103981e3e186d2ccd19c74b38f4eb71" - [[package]] name = "fake-simd" version = "0.1.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "e88a8acf291dafb59c2d96e8f59828f3838bb1a70398823ade51a84de6a6deed" -[[package]] -name = "fastrand" -version = "1.7.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c3fcf0cee53519c866c09b5de1f6c56ff9d647101f81c1964fa632e148896cdf" -dependencies = [ - "instant", -] - [[package]] name = "filetime" version = "0.2.15" @@ -401,42 +203,12 @@ version = "0.3.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "3dcaa9ae7725d12cdb85b3ad99a434db70b468c09ded17e012d86b5c1010f7a7" -[[package]] -name = "futures-channel" -version = "0.3.21" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c3083ce4b914124575708913bca19bfe887522d6e2e6d0952943f5eac4a74010" -dependencies = [ - "futures-core", -] - [[package]] name = "futures-core" version = "0.3.21" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "0c09fd04b7e4073ac7156a9539b57a484a8ea920f79c7c675d05d289ab6110d3" -[[package]] -name = "futures-io" -version = "0.3.21" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "fc4045962a5a5e935ee2fdedaa4e08284547402885ab326734432bed5d12966b" - -[[package]] -name = "futures-lite" -version = "1.12.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "7694489acd39452c77daa48516b894c153f192c3578d5a839b62c58099fcbf48" -dependencies = [ - "fastrand", - "futures-core", - "futures-io", - "memchr", - "parking", - "pin-project-lite", - "waker-fn", -] - [[package]] name = "futures-macro" version = "0.3.21" @@ -448,6 +220,12 @@ dependencies = [ "syn", ] +[[package]] +name = "futures-sink" +version = "0.3.21" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "21163e139fa306126e6eedaf49ecdb4588f939600f0b1e770f4205ee4b7fa868" + [[package]] name = "futures-task" version = "0.3.21" @@ -479,9 +257,9 @@ dependencies = [ [[package]] name = "git2" -version = "0.13.25" +version = "0.14.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f29229cc1b24c0e6062f6e742aa3e256492a5323365e5ed3413599f8a5eff7d6" +checksum = "94781080dd1a6b55dea7c46540d5bac87742a22f6dc2d84e54a5071ad6f0e387" dependencies = [ "bitflags", "libc", @@ -498,18 +276,6 @@ version = "0.3.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "9b919933a397b79c37e33b77bb2aa3dc8eb6e165ad809e58ff75bc7db2e34574" -[[package]] -name = "gloo-timers" -version = "0.2.3" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "4d12a7f4e95cfe710f1d624fb1210b7d961a5fb05c4fd942f4feab06e61f590e" -dependencies = [ - "futures-channel", - "futures-core", - "js-sys", - "wasm-bindgen", -] - [[package]] name = "heck" version = "0.3.3" @@ -570,15 +336,6 @@ dependencies = [ "libc", ] -[[package]] -name = "instant" -version = "0.1.12" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "7a5bbe824c507c5da5956355e86a746d82e0e1464f65d862cc5e71da70e94b2c" -dependencies = [ - "cfg-if 1.0.0", -] - [[package]] name = "iovec" version = "0.1.4" @@ -603,15 +360,6 @@ dependencies = [ "libc", ] -[[package]] -name = "js-sys" -version = "0.3.56" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a38fc24e30fd564ce974c02bf1d337caddff65be6cc4735a1f7eab22a7440f04" -dependencies = [ - "wasm-bindgen", -] - [[package]] name = "kernel32-sys" version = "0.2.2" @@ -622,15 +370,6 @@ dependencies = [ "winapi-build", ] -[[package]] -name = "kv-log-macro" -version = "1.0.7" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "0de8b303297635ad57c9f5059fd9cee7a47f8e8daa09df0fcd07dd39fb22977f" -dependencies = [ - "log", -] - [[package]] name = "lazy_static" version = "1.4.0" @@ -645,15 +384,15 @@ checksum = "830d08ce1d1d941e6b30645f1a0eb5643013d835ce3779a5fc208261dbe10f55" [[package]] name = "libc" -version = "0.2.117" +version = "0.2.119" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e74d72e0f9b65b5b4ca49a346af3976df0f9c61d550727f349ecd559f251a26c" +checksum = "1bf2e165bb3457c8e098ea76f3e3bc9db55f87aa90d52d0e6be741470916aaa4" [[package]] name = "libgit2-sys" -version = "0.12.26+1.3.0" +version = "0.13.0+1.4.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "19e1c899248e606fbfe68dcb31d8b0176ebab833b103824af31bddf4b7457494" +checksum = "864e22fc06cae62860398cd854c93d5867f11c02ec916aa1417b440f170df23a" dependencies = [ "cc", "libc", @@ -689,6 +428,15 @@ dependencies = [ "vcpkg", ] +[[package]] +name = "lock_api" +version = "0.4.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "88943dd7ef4a2e5a4bfa2753aaab3013e34ce2533d1996fb18ef591e315e2b3b" +dependencies = [ + "scopeguard", +] + [[package]] name = "log" version = "0.4.14" @@ -696,7 +444,6 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "51b9bbe6c47d51fc3e1a9b945965946b4c44142ab8792c50835a980d362c2710" dependencies = [ "cfg-if 1.0.0", - "value-bag", ] [[package]] @@ -745,12 +492,25 @@ dependencies = [ "kernel32-sys", "libc", "log", - "miow", + "miow 0.2.2", "net2", "slab", "winapi 0.2.8", ] +[[package]] +name = "mio" +version = "0.8.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ba272f85fa0b41fc91872be579b3bbe0f56b792aa361a380eb669469f68dafb2" +dependencies = [ + "libc", + "log", + "miow 0.3.7", + "ntapi", + "winapi 0.3.9", +] + [[package]] name = "mio-extras" version = "2.0.6" @@ -759,7 +519,7 @@ checksum = "52403fe290012ce777c4626790c8951324a2b9e3316b3143779c72b029742f19" dependencies = [ "lazycell", "log", - "mio", + "mio 0.6.23", "slab", ] @@ -775,15 +535,22 @@ dependencies = [ "ws2_32-sys", ] +[[package]] +name = "miow" +version = "0.3.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b9f1c5b025cda876f66ef43a113f91ebc9f4ccef34843000e0adf6ebbab84e21" +dependencies = [ + "winapi 0.3.9", +] + [[package]] name = "nbsh" version = "0.1.0" dependencies = [ "anyhow", - "async-std", "bincode", - "blocking", - "futures-lite", + "bytes", "futures-util", "git2", "glob", @@ -797,11 +564,13 @@ dependencies = [ "pest_derive", "pty-process", "serde", - "signal-hook-async-std", "structopt", "terminal_size", "textmode", "time", + "tokio", + "tokio-stream", + "tokio-util", "unicode-width", "users", "vt100", @@ -821,11 +590,9 @@ dependencies = [ [[package]] name = "nix" version = "0.23.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9f866317acbd3a240710c63f065ffb1e4fd466259045ccb504130b7f668f35c6" +source = "git+https://github.com/nix-rust/nix#6123083a4702002e1958229735a4fd70cb326257" dependencies = [ "bitflags", - "cc", "cfg-if 1.0.0", "libc", "memoffset", @@ -843,12 +610,21 @@ dependencies = [ "fsevent-sys", "inotify", "libc", - "mio", + "mio 0.6.23", "mio-extras", "walkdir", "winapi 0.3.9", ] +[[package]] +name = "ntapi" +version = "0.3.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c28774a7fd2fbb4f0babd8237ce554b73af68021b5f695a3cebd6c59bac0980f" +dependencies = [ + "winapi 0.3.9", +] + [[package]] name = "num_cpus" version = "1.13.1" @@ -900,10 +676,27 @@ dependencies = [ ] [[package]] -name = "parking" -version = "2.0.0" +name = "parking_lot" +version = "0.12.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "427c3892f9e783d91cc128285287e70a59e206ca452770ece88a76f7a3eddd72" +checksum = "87f5ec2493a61ac0506c0f4199f99070cbe83857b0337006a30f3e6719b8ef58" +dependencies = [ + "lock_api", + "parking_lot_core", +] + +[[package]] +name = "parking_lot_core" +version = "0.9.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "28141e0cc4143da2443301914478dc976a61ffdb3f043058310c70df2fed8954" +dependencies = [ + "cfg-if 1.0.0", + "libc", + "redox_syscall", + "smallvec", + "windows-sys", +] [[package]] name = "paw" @@ -999,19 +792,6 @@ version = "0.3.24" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "58893f751c9b0412871a09abd62ecd2a00298c6c83befa223ef98c52aef40cbe" -[[package]] -name = "polling" -version = "2.2.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "685404d509889fade3e86fe3a5803bca2ec09b0c0778d5ada6ec8bf7a8de5259" -dependencies = [ - "cfg-if 1.0.0", - "libc", - "log", - "wepoll-ffi", - "winapi 0.3.9", -] - [[package]] name = "proc-macro-error" version = "1.0.4" @@ -1048,13 +828,11 @@ dependencies = [ [[package]] name = "pty-process" version = "0.2.0" -source = "git+https://github.com/doy/pty-process#ebcf5f15081f6a84c861eb2aecbf962396a88695" +source = "git+https://github.com/doy/pty-process#aca49b2f55df04142d4b2fcfcb361361e76d0820" dependencies = [ - "async-io", - "async-process", - "futures-io", "libc", "nix", + "tokio", ] [[package]] @@ -1084,6 +862,12 @@ dependencies = [ "winapi-util", ] +[[package]] +name = "scopeguard" +version = "1.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d29ab0c6d3fc0ee92fe66e2d99f700eab17a8d57d1c1d3b748380fb20baa78cd" + [[package]] name = "serde" version = "1.0.136" @@ -1116,28 +900,6 @@ dependencies = [ "opaque-debug", ] -[[package]] -name = "signal-hook" -version = "0.3.13" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "647c97df271007dcea485bb74ffdb57f2e683f1306c854f468a0c244badabf2d" -dependencies = [ - "libc", - "signal-hook-registry", -] - -[[package]] -name = "signal-hook-async-std" -version = "0.2.2" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "0c4aa94397e2023af5b7cff5b8d4785e935cfb77f0e4aab0cae3b26258ace556" -dependencies = [ - "async-io", - "futures-lite", - "libc", - "signal-hook", -] - [[package]] name = "signal-hook-registry" version = "1.4.0" @@ -1153,6 +915,12 @@ version = "0.4.5" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "9def91fd1e018fe007022791f865d0ccc9b3a0d5001e01aabb8b40e46000afb5" +[[package]] +name = "smallvec" +version = "1.8.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f2dd574626839106c320a323308629dcb1acfc96e32a8cba364ddc61ac23ee83" + [[package]] name = "socket2" version = "0.4.4" @@ -1228,14 +996,12 @@ dependencies = [ [[package]] name = "textmode" version = "0.3.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "caf9ecdb23aae465624a900ae8c795e17f46e081b8454f8ea5b3b5c27a9e7884" +source = "git+https://github.com/doy/textmode#193e1963afc4e9e78122573cd5b9831f9a847345" dependencies = [ - "blocking", - "futures-lite", "itoa", "nix", "terminal_size", + "tokio", "vt100", ] @@ -1275,6 +1041,62 @@ version = "0.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "cda74da7e1a664f795bb1f8a87ec406fb89a02522cf6e50620d016add6dbbf5c" +[[package]] +name = "tokio" +version = "1.17.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2af73ac49756f3f7c01172e34a23e5d0216f6c32333757c2c61feb2bbff5a5ee" +dependencies = [ + "bytes", + "libc", + "memchr", + "mio 0.8.0", + "num_cpus", + "once_cell", + "parking_lot", + "pin-project-lite", + "signal-hook-registry", + "socket2", + "tokio-macros", + "winapi 0.3.9", +] + +[[package]] +name = "tokio-macros" +version = "1.7.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b557f72f448c511a979e2564e55d74e6c4432fc96ff4f6241bc6bded342643b7" +dependencies = [ + "proc-macro2", + "quote", + "syn", +] + +[[package]] +name = "tokio-stream" +version = "0.1.8" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "50145484efff8818b5ccd256697f36863f587da82cf8b409c53adf1e840798e3" +dependencies = [ + "futures-core", + "pin-project-lite", + "tokio", +] + +[[package]] +name = "tokio-util" +version = "0.7.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "64910e1b9c1901aaf5375561e35b9c057d95ff41a44ede043a03e09279eabaf1" +dependencies = [ + "bytes", + "futures-core", + "futures-sink", + "log", + "pin-project-lite", + "tokio", +] + [[package]] name = "typenum" version = "1.15.0" @@ -1348,16 +1170,6 @@ version = "0.2.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "936e4b492acfd135421d8dca4b1aa80a7bfc26e702ef3af710e0752684df5372" -[[package]] -name = "value-bag" -version = "1.0.0-alpha.8" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "79923f7731dc61ebfba3633098bf3ac533bbd35ccd8c57e7088d9a5eebe0263f" -dependencies = [ - "ctor", - "version_check", -] - [[package]] name = "vcpkg" version = "0.2.15" @@ -1409,12 +1221,6 @@ dependencies = [ "quote", ] -[[package]] -name = "waker-fn" -version = "1.1.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9d5b2c62b4012a3e1eca5a7e077d13b3bf498c4073e33ccd58626607748ceeca" - [[package]] name = "walkdir" version = "2.3.2" @@ -1426,91 +1232,6 @@ dependencies = [ "winapi-util", ] -[[package]] -name = "wasm-bindgen" -version = "0.2.79" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "25f1af7423d8588a3d840681122e72e6a24ddbcb3f0ec385cac0d12d24256c06" -dependencies = [ - "cfg-if 1.0.0", - "wasm-bindgen-macro", -] - -[[package]] -name = "wasm-bindgen-backend" -version = "0.2.79" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8b21c0df030f5a177f3cba22e9bc4322695ec43e7257d865302900290bcdedca" -dependencies = [ - "bumpalo", - "lazy_static", - "log", - "proc-macro2", - "quote", - "syn", - "wasm-bindgen-shared", -] - -[[package]] -name = "wasm-bindgen-futures" -version = "0.4.29" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "2eb6ec270a31b1d3c7e266b999739109abce8b6c87e4b31fcfcd788b65267395" -dependencies = [ - "cfg-if 1.0.0", - "js-sys", - "wasm-bindgen", - "web-sys", -] - -[[package]] -name = "wasm-bindgen-macro" -version = "0.2.79" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "2f4203d69e40a52ee523b2529a773d5ffc1dc0071801c87b3d270b471b80ed01" -dependencies = [ - "quote", - "wasm-bindgen-macro-support", -] - -[[package]] -name = "wasm-bindgen-macro-support" -version = "0.2.79" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "bfa8a30d46208db204854cadbb5d4baf5fcf8071ba5bf48190c3e59937962ebc" -dependencies = [ - "proc-macro2", - "quote", - "syn", - "wasm-bindgen-backend", - "wasm-bindgen-shared", -] - -[[package]] -name = "wasm-bindgen-shared" -version = "0.2.79" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "3d958d035c4438e28c70e4321a2911302f10135ce78a9c7834c0cab4123d06a2" - -[[package]] -name = "web-sys" -version = "0.3.56" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c060b319f29dd25724f09a2ba1418f142f539b2be99fbf4d2d5a8f7330afb8eb" -dependencies = [ - "js-sys", - "wasm-bindgen", -] - -[[package]] -name = "wepoll-ffi" -version = "0.1.2" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d743fdedc5c64377b5fc2bc036b01c7fd642205a0d96356034ae3404d49eb7fb" -dependencies = [ - "cc", -] - [[package]] name = "winapi" version = "0.2.8" @@ -1554,6 +1275,49 @@ version = "0.4.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "712e227841d057c1ee1cd2fb22fa7e5a5461ae8e48fa2ca79ec42cfc1931183f" +[[package]] +name = "windows-sys" +version = "0.32.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3df6e476185f92a12c072be4a189a0210dcdcf512a1891d6dff9edb874deadc6" +dependencies = [ + "windows_aarch64_msvc", + "windows_i686_gnu", + "windows_i686_msvc", + "windows_x86_64_gnu", + "windows_x86_64_msvc", +] + +[[package]] +name = "windows_aarch64_msvc" +version = "0.32.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d8e92753b1c443191654ec532f14c199742964a061be25d77d7a96f09db20bf5" + +[[package]] +name = "windows_i686_gnu" +version = "0.32.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6a711c68811799e017b6038e0922cb27a5e2f43a2ddb609fe0b6f3eeda9de615" + +[[package]] +name = "windows_i686_msvc" +version = "0.32.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "146c11bb1a02615db74680b32a68e2d61f553cc24c4eb5b4ca10311740e44172" + +[[package]] +name = "windows_x86_64_gnu" +version = "0.32.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c912b12f7454c6620635bbff3450962753834be2a594819bd5e945af18ec64bc" + +[[package]] +name = "windows_x86_64_msvc" +version = "0.32.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "504a2476202769977a040c6364301a3f65d0cc9e3fb08600b2bda150a0488316" + [[package]] name = "ws2_32-sys" version = "0.2.1" diff --git a/Cargo.toml b/Cargo.toml index 03430c4..f27b4ba 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -6,16 +6,14 @@ edition = "2021" license = "MIT" [dependencies] -anyhow = "1.0.53" -async-std = { version = "1.10.0", features = ["unstable"] } +anyhow = "1.0.55" bincode = "1.3.3" -blocking = "1.1.0" -futures-lite = "1.12.0" +bytes = "1.1.0" futures-util = "0.3.21" -git2 = "0.13.25" +git2 = "0.14.0" glob = "0.3.0" hostname = "0.3.1" -libc = "0.2.117" +libc = "0.2.119" nix = "0.23.1" notify = "4.0.17" once_cell = "1.9.0" @@ -24,16 +22,20 @@ pest = "2.1.3" pest_derive = "2.1.0" pty-process = { version = "0.2.0", features = ["async"] } serde = { version = "1.0.136", features = ["derive"] } -signal-hook-async-std = "0.2.2" structopt = { version = "0.3.26", features = ["paw", "wrap_help"] } terminal_size = "0.1.17" textmode = { version = "0.3.0", features = ["async"] } time = { version = "0.3.7", features = ["formatting", "parsing"] } +tokio = { version = "1.17.0", features = ["full"] } +tokio-stream = "0.1.8" +tokio-util = { version = "0.7.0", features = ["io"] } unicode-width = "0.1.9" users = "0.11.0" vt100 = "0.15.1" [patch.crates-io] -# https://github.com/smol-rs/async-process/pull/19 -async-process = { git = "https://github.com/doy/async-process" } +nix = { git = "https://github.com/nix-rust/nix" } pty-process = { git = "https://github.com/doy/pty-process" } +textmode = { git = "https://github.com/doy/textmode" } + +[features] diff --git a/src/main.rs b/src/main.rs index a7d3f4b..e9c420d 100644 --- a/src/main.rs +++ b/src/main.rs @@ -36,17 +36,18 @@ struct Opt { status_fd: Option, } +#[tokio::main] async fn async_main(opt: Opt) -> anyhow::Result { if let Some(command) = opt.command { - let shell_write = opt.status_fd.and_then(|fd| { + let mut shell_write = opt.status_fd.and_then(|fd| { nix::sys::stat::fstat(fd).ok().map(|_| { // Safety: we don't create File instances for or read/write // data on this fd anywhere else - unsafe { async_std::fs::File::from_raw_fd(fd) } + unsafe { tokio::fs::File::from_raw_fd(fd) } }) }); - return runner::run(&command, shell_write.as_ref()).await; + return runner::run(&command, &mut shell_write).await; } shell::main().await @@ -54,7 +55,7 @@ async fn async_main(opt: Opt) -> anyhow::Result { #[paw::main] fn main(opt: Opt) { - match async_std::task::block_on(async_main(opt)) { + match async_main(opt) { Ok(code) => { std::process::exit(code); } diff --git a/src/mutex.rs b/src/mutex.rs index df28ffc..e49df9c 100644 --- a/src/mutex.rs +++ b/src/mutex.rs @@ -1,10 +1,10 @@ -pub type Mutex = async_std::sync::Arc>; -pub type Guard = async_std::sync::MutexGuardArc; +pub type Mutex = std::sync::Arc>; +pub type Guard = tokio::sync::OwnedMutexGuard; -pub fn new(t: T) -> async_std::sync::Arc> { - async_std::sync::Arc::new(async_std::sync::Mutex::new(t)) +pub fn new(t: T) -> Mutex { + std::sync::Arc::new(tokio::sync::Mutex::new(t)) } pub fn clone(m: &Mutex) -> Mutex { - async_std::sync::Arc::clone(m) + std::sync::Arc::clone(m) } diff --git a/src/parse/ast.rs b/src/parse/ast.rs index e2d5840..46aa63a 100644 --- a/src/parse/ast.rs +++ b/src/parse/ast.rs @@ -97,7 +97,7 @@ impl Pipeline { .into_iter() .map(|exe| exe.eval(env)) .collect::>() - .collect::>() + .try_collect() .await?, }) } @@ -137,7 +137,7 @@ impl Exe { arg.eval(env).await.map(IntoIterator::into_iter) }) .collect::>() - .collect::, _>>() + .try_collect::>() .await? .into_iter() .flatten() @@ -147,7 +147,7 @@ impl Exe { .into_iter() .map(|arg| arg.eval(env)) .collect::>() - .collect::>() + .try_collect() .await?, }) } @@ -330,12 +330,12 @@ impl WordPart { match self { Self::Alternation(_) => unreachable!(), Self::Substitution(commands) => { - let mut cmd = async_std::process::Command::new( + let mut cmd = tokio::process::Command::new( std::env::current_exe().unwrap(), ); cmd.args(&["-c", &commands]); - cmd.stdin(async_std::process::Stdio::inherit()); - cmd.stderr(async_std::process::Stdio::inherit()); + cmd.stdin(std::process::Stdio::inherit()); + cmd.stderr(std::process::Stdio::inherit()); let mut out = String::from_utf8(cmd.output().await.unwrap().stdout) .unwrap(); @@ -408,15 +408,20 @@ impl Redirect { let mut iter = pair.into_inner(); let prefix = iter.next().unwrap().as_str(); - let (from, dir) = if let Some(from) = prefix.strip_suffix(">>") { - (from, super::Direction::Append) - } else if let Some(from) = prefix.strip_suffix('>') { - (from, super::Direction::Out) - } else if let Some(from) = prefix.strip_suffix('<') { - (from, super::Direction::In) - } else { - unreachable!() - }; + let (from, dir) = prefix.strip_suffix(">>").map_or_else( + || { + prefix.strip_suffix('>').map_or_else( + || { + ( + prefix.strip_suffix('<').unwrap(), + super::Direction::In, + ) + }, + |from| (from, super::Direction::Out), + ) + }, + |from| (from, super::Direction::Append), + ); let from = if from.is_empty() { match dir { super::Direction::In => 0, diff --git a/src/prelude.rs b/src/prelude.rs index c647591..9c14a4b 100644 --- a/src/prelude.rs +++ b/src/prelude.rs @@ -1,10 +1,10 @@ pub use crate::env::Env; -pub use async_std::io::{ReadExt as _, WriteExt as _}; -pub use async_std::stream::StreamExt as _; -pub use futures_lite::future::FutureExt as _; +pub use futures_util::future::FutureExt as _; +pub use futures_util::stream::StreamExt as _; +pub use futures_util::stream::TryStreamExt as _; +pub use tokio::io::{AsyncReadExt as _, AsyncWriteExt as _}; -pub use async_std::os::unix::process::CommandExt as _; pub use std::os::unix::ffi::{OsStrExt as _, OsStringExt as _}; pub use std::os::unix::io::{AsRawFd as _, FromRawFd as _, IntoRawFd as _}; pub use std::os::unix::process::ExitStatusExt as _; diff --git a/src/runner/builtins/command.rs b/src/runner/builtins/command.rs index c0d3a84..e0e1853 100644 --- a/src/runner/builtins/command.rs +++ b/src/runner/builtins/command.rs @@ -97,7 +97,7 @@ impl Cfg { pub struct Io { fds: std::collections::HashMap< std::os::unix::io::RawFd, - std::sync::Arc, + std::sync::Arc>, >, } @@ -108,7 +108,7 @@ impl Io { } } - fn stdin(&self) -> Option> { + fn stdin(&self) -> Option>> { self.fds.get(&0).map(std::sync::Arc::clone) } @@ -120,11 +120,13 @@ impl Io { 0, // Safety: we just acquired stdin via into_raw_fd, which acquires // ownership of the fd, so we are now the sole owner - std::sync::Arc::new(unsafe { File::input(stdin.into_raw_fd()) }), + std::sync::Arc::new(tokio::sync::Mutex::new(unsafe { + File::input(stdin.into_raw_fd()) + })), ); } - fn stdout(&self) -> Option> { + fn stdout(&self) -> Option>> { self.fds.get(&1).map(std::sync::Arc::clone) } @@ -136,13 +138,13 @@ impl Io { 1, // Safety: we just acquired stdout via into_raw_fd, which acquires // ownership of the fd, so we are now the sole owner - std::sync::Arc::new(unsafe { + std::sync::Arc::new(tokio::sync::Mutex::new(unsafe { File::output(stdout.into_raw_fd()) - }), + })), ); } - fn stderr(&self) -> Option> { + fn stderr(&self) -> Option>> { self.fds.get(&2).map(std::sync::Arc::clone) } @@ -154,9 +156,9 @@ impl Io { 2, // Safety: we just acquired stderr via into_raw_fd, which acquires // ownership of the fd, so we are now the sole owner - std::sync::Arc::new(unsafe { + std::sync::Arc::new(tokio::sync::Mutex::new(unsafe { File::output(stderr.into_raw_fd()) - }), + })), ); } @@ -172,13 +174,17 @@ impl Io { crate::parse::Direction::In => { // Safety: we just opened fd, and nothing else has // or can use it - std::sync::Arc::new(unsafe { File::input(fd) }) + std::sync::Arc::new(tokio::sync::Mutex::new( + unsafe { File::input(fd) }, + )) } crate::parse::Direction::Out | crate::parse::Direction::Append => { // Safety: we just opened fd, and nothing else has // or can use it - std::sync::Arc::new(unsafe { File::output(fd) }) + std::sync::Arc::new(tokio::sync::Mutex::new( + unsafe { File::output(fd) }, + )) } } } @@ -190,7 +196,7 @@ impl Io { pub async fn read_line_stdin(&self) -> anyhow::Result<(String, bool)> { let mut buf = vec![]; if let Some(fh) = self.stdin() { - if let File::In(fh) = &*fh { + if let File::In(fh) = &mut *fh.clone().lock_owned().await { // we have to read only a single character at a time here // because stdin needs to be shared across all commands in the // command list, some of which may be builtins and others of @@ -199,9 +205,7 @@ impl Io { // no longer be available to the next command, since we have // them buffered in memory rather than them being on the stdin // pipe. - let mut bytes = fh.bytes(); - while let Some(byte) = bytes.next().await { - let byte = byte?; + while let Ok(byte) = fh.read_u8().await { buf.push(byte); if byte == b'\n' { break; @@ -219,8 +223,8 @@ impl Io { pub async fn write_stdout(&self, buf: &[u8]) -> anyhow::Result<()> { if let Some(fh) = self.stdout() { - if let File::Out(fh) = &*fh { - Ok((&*fh).write_all(buf).await.map(|_| ())?) + if let File::Out(fh) = &mut *fh.clone().lock_owned().await { + Ok(fh.write_all(buf).await.map(|_| ())?) } else { Ok(()) } @@ -231,8 +235,8 @@ impl Io { pub async fn write_stderr(&self, buf: &[u8]) -> anyhow::Result<()> { if let Some(fh) = self.stderr() { - if let File::Out(fh) = &*fh { - Ok((&*fh).write_all(buf).await.map(|_| ())?) + if let File::Out(fh) = &mut *fh.clone().lock_owned().await { + Ok(fh.write_all(buf).await.map(|_| ())?) } else { Ok(()) } @@ -244,7 +248,7 @@ impl Io { pub fn setup_command(mut self, cmd: &mut crate::runner::Command) { if let Some(stdin) = self.fds.remove(&0) { if let Ok(stdin) = std::sync::Arc::try_unwrap(stdin) { - let stdin = stdin.into_raw_fd(); + let stdin = stdin.into_inner().into_raw_fd(); if stdin != 0 { // Safety: we just acquired stdin via into_raw_fd, which // acquires ownership of the fd, so we are now the sole @@ -256,7 +260,7 @@ impl Io { } if let Some(stdout) = self.fds.remove(&1) { if let Ok(stdout) = std::sync::Arc::try_unwrap(stdout) { - let stdout = stdout.into_raw_fd(); + let stdout = stdout.into_inner().into_raw_fd(); if stdout != 1 { // Safety: we just acquired stdout via into_raw_fd, which // acquires ownership of the fd, so we are now the sole @@ -268,7 +272,7 @@ impl Io { } if let Some(stderr) = self.fds.remove(&2) { if let Ok(stderr) = std::sync::Arc::try_unwrap(stderr) { - let stderr = stderr.into_raw_fd(); + let stderr = stderr.into_inner().into_raw_fd(); if stderr != 2 { // Safety: we just acquired stderr via into_raw_fd, which // acquires ownership of the fd, so we are now the sole @@ -291,23 +295,24 @@ impl Drop for Io { #[derive(Debug)] pub enum File { - In(async_std::fs::File), - Out(async_std::fs::File), + In(tokio::fs::File), + Out(tokio::fs::File), } impl File { // Safety: fd must not be owned by any other File object pub unsafe fn input(fd: std::os::unix::io::RawFd) -> Self { - Self::In(async_std::fs::File::from_raw_fd(fd)) + Self::In(tokio::fs::File::from_raw_fd(fd)) } // Safety: fd must not be owned by any other File object pub unsafe fn output(fd: std::os::unix::io::RawFd) -> Self { - Self::Out(async_std::fs::File::from_raw_fd(fd)) + Self::Out(tokio::fs::File::from_raw_fd(fd)) } - fn maybe_drop(file: std::sync::Arc) { + fn maybe_drop(file: std::sync::Arc>) { if let Ok(file) = std::sync::Arc::try_unwrap(file) { + let file = file.into_inner(); if file.as_raw_fd() <= 2 { let _ = file.into_raw_fd(); } @@ -326,7 +331,10 @@ impl std::os::unix::io::AsRawFd for File { impl std::os::unix::io::IntoRawFd for File { fn into_raw_fd(self) -> std::os::unix::io::RawFd { match self { - Self::In(fh) | Self::Out(fh) => fh.into_raw_fd(), + Self::In(fh) | Self::Out(fh) => { + // XXX + fh.try_into_std().unwrap().into_raw_fd() + } } } } @@ -373,7 +381,7 @@ impl<'a> Child<'a> { ) -> std::pin::Pin< Box< dyn std::future::Future< - Output = anyhow::Result, + Output = anyhow::Result, > + Send + Sync + 'a, diff --git a/src/runner/builtins/mod.rs b/src/runner/builtins/mod.rs index 5205856..87b5ae7 100644 --- a/src/runner/builtins/mod.rs +++ b/src/runner/builtins/mod.rs @@ -88,7 +88,7 @@ fn cd( dir.display() ); } - async_std::process::ExitStatus::from_raw(0) + std::process::ExitStatus::from_raw(0) } Ok(command::Child::new_fut(async move { @@ -119,7 +119,7 @@ fn set( }; std::env::set_var(k, v); - async_std::process::ExitStatus::from_raw(0) + std::process::ExitStatus::from_raw(0) } Ok(command::Child::new_fut(async move { @@ -145,7 +145,7 @@ fn unset( }; std::env::remove_var(k); - async_std::process::ExitStatus::from_raw(0) + std::process::ExitStatus::from_raw(0) } Ok(command::Child::new_fut(async move { @@ -174,7 +174,7 @@ fn echo( .write_stderr(format!("echo: {}", e).as_bytes()) .await .unwrap(); - return async_std::process::ExitStatus::from_raw(1 << 8); + return std::process::ExitStatus::from_raw(1 << 8); } }; } @@ -188,7 +188,7 @@ fn echo( } } - async_std::process::ExitStatus::from_raw(0) + std::process::ExitStatus::from_raw(0) } Ok(command::Child::new_fut(async move { @@ -221,11 +221,7 @@ fn read( }; std::env::set_var(var, val); - async_std::process::ExitStatus::from_raw(if done { - 1 << 8 - } else { - 0 - }) + std::process::ExitStatus::from_raw(if done { 1 << 8 } else { 0 }) } Ok(command::Child::new_fut(async move { diff --git a/src/runner/command.rs b/src/runner/command.rs index 5d4c11e..c7224e6 100644 --- a/src/runner/command.rs +++ b/src/runner/command.rs @@ -27,7 +27,7 @@ impl Command { pub fn new_binary(exe: crate::parse::Exe) -> Self { let exe_path = exe.exe().to_path_buf(); let redirects = exe.redirects().to_vec(); - let mut cmd = async_std::process::Command::new(exe.exe()); + let mut cmd = tokio::process::Command::new(exe.exe()); cmd.args(exe.args()); Self { inner: Inner::Binary(cmd), @@ -146,19 +146,19 @@ impl Command { } pub enum Inner { - Binary(async_std::process::Command), + Binary(tokio::process::Command), Builtin(super::builtins::Command), } pub enum Child<'a> { - Binary(async_std::process::Child), + Binary(tokio::process::Child), Builtin(super::builtins::Child<'a>), } impl<'a> Child<'a> { pub fn id(&self) -> Option { match self { - Self::Binary(child) => Some(child.id()), + Self::Binary(child) => child.id(), Self::Builtin(child) => child.id(), } } @@ -176,7 +176,7 @@ impl<'a> Child<'a> { > { Box::pin(async move { match self { - Self::Binary(child) => Ok(child.status_no_drop().await?), + Self::Binary(mut child) => Ok(child.wait().await?), Self::Builtin(child) => Ok(child.status().await?), } }) diff --git a/src/runner/mod.rs b/src/runner/mod.rs index 1a5003f..d06b332 100644 --- a/src/runner/mod.rs +++ b/src/runner/mod.rs @@ -70,7 +70,7 @@ enum Frame { pub async fn run( commands: &str, - shell_write: Option<&async_std::fs::File>, + shell_write: &mut Option, ) -> anyhow::Result { let mut env = Env::new_from_env()?; run_commands(commands, &mut env, shell_write).await?; @@ -86,7 +86,7 @@ pub async fn run( async fn run_commands( commands: &str, env: &mut Env, - shell_write: Option<&async_std::fs::File>, + shell_write: &mut Option, ) -> anyhow::Result<()> { let commands = crate::parse::ast::Commands::parse(commands)?; let commands = commands.commands(); @@ -152,7 +152,7 @@ async fn run_commands( .map(IntoIterator::into_iter) }) .collect::>() - .collect::, _>>().await? + .try_collect::>().await? .into_iter() .flatten() .collect() @@ -231,7 +231,7 @@ async fn run_commands( async fn run_pipeline( pipeline: crate::parse::ast::Pipeline, env: &mut Env, - shell_write: Option<&async_std::fs::File>, + shell_write: &mut Option, ) -> anyhow::Result<()> { write_event(shell_write, Event::RunPipeline(env.idx(), pipeline.span())) .await?; @@ -240,9 +240,9 @@ async fn run_pipeline( // level would not be safe, because in the case of a command line like // "echo foo; ls", we would pass the stdout fd to the ls process while it // is still open here, and may still have data buffered. - let stdin = unsafe { async_std::fs::File::from_raw_fd(0) }; - let stdout = unsafe { async_std::fs::File::from_raw_fd(1) }; - let stderr = unsafe { async_std::fs::File::from_raw_fd(2) }; + let stdin = unsafe { std::fs::File::from_raw_fd(0) }; + let stdout = unsafe { std::fs::File::from_raw_fd(1) }; + let stderr = unsafe { std::fs::File::from_raw_fd(2) }; let mut io = builtins::Io::new(); io.set_stdin(stdin); io.set_stdout(stdout); @@ -265,10 +265,10 @@ async fn run_pipeline( } async fn write_event( - fh: Option<&async_std::fs::File>, + fh: &mut Option, event: Event, ) -> anyhow::Result<()> { - if let Some(mut fh) = fh { + if let Some(fh) = fh { fh.write_all(&bincode::serialize(&event)?).await?; fh.flush().await?; } @@ -322,11 +322,11 @@ async fn wait_children( pg: Option, env: &Env, io: &builtins::Io, - shell_write: Option<&async_std::fs::File>, + shell_write: &mut Option, ) -> std::process::ExitStatus { enum Res { Child(nix::Result), - Builtin(Option<(anyhow::Result, bool)>), + Builtin((anyhow::Result, bool)), } macro_rules! bail { @@ -353,7 +353,8 @@ async fn wait_children( (sys::id_to_pid(child.id().unwrap()), (child, i == count - 1)) }) .collect(); - let mut builtins: futures_util::stream::FuturesUnordered<_> = + let mut builtin_count = builtins.len(); + let builtins: futures_util::stream::FuturesUnordered<_> = builtins .into_iter() .map(|(i, child)| async move { @@ -361,47 +362,40 @@ async fn wait_children( }) .collect(); - let (wait_w, wait_r) = async_std::channel::unbounded(); - let new_wait = move || { - if let Some(pg) = pg { - let wait_w = wait_w.clone(); - async_std::task::spawn(async move { - let res = blocking::unblock(move || { - nix::sys::wait::waitpid( - sys::neg_pid(pg), - Some(nix::sys::wait::WaitPidFlag::WUNTRACED), - ) - }) - .await; - if wait_w.is_closed() { - // we shouldn't be able to drop real process terminations + let (wait_w, wait_r) = tokio::sync::mpsc::unbounded_channel(); + if let Some(pg) = pg { + tokio::task::spawn_blocking(move || loop { + let res = nix::sys::wait::waitpid( + sys::neg_pid(pg), + Some(nix::sys::wait::WaitPidFlag::WUNTRACED), + ); + match wait_w.send(res) { + Ok(_) => {} + Err(tokio::sync::mpsc::error::SendError(res)) => { + // we should never drop wait_r while there are still valid + // things to read assert!(res.is_err()); - } else { - wait_w.send(res).await.unwrap(); + break; } - }); - } - }; - - new_wait(); - loop { - if children.is_empty() && builtins.is_empty() { - break; - } + } + }); + } - let child = async { Res::Child(wait_r.recv().await.unwrap()) }; - let builtin = async { - Res::Builtin(if builtins.is_empty() { - std::future::pending().await - } else { - builtins.next().await - }) - }; - match child.race(builtin).await { + let mut stream: futures_util::stream::SelectAll<_> = [ + tokio_stream::wrappers::UnboundedReceiverStream::new(wait_r) + .map(Res::Child) + .boxed(), + builtins.map(Res::Builtin).boxed(), + ] + .into_iter() + .collect(); + while let Some(res) = stream.next().await { + match res { Res::Child(Ok(status)) => { match status { - // we can't call child.status() here to unify these branches - // because our waitpid call already collected the status + // we can't call child.status() here to unify these + // branches because our waitpid call already collected the + // status nix::sys::wait::WaitStatus::Exited(pid, code) => { let (_, last) = children.remove(&pid).unwrap(); if last { @@ -449,12 +443,11 @@ async fn wait_children( } _ => {} } - new_wait(); } Res::Child(Err(e)) => { bail!(e); } - Res::Builtin(Some((Ok(status), last))) => { + Res::Builtin((Ok(status), last)) => { // this conversion is safe because the Signal enum is // repr(i32) #[allow(clippy::as_conversions)] @@ -470,11 +463,15 @@ async fn wait_children( if last { final_status = Some(status); } + builtin_count -= 1; } - Res::Builtin(Some((Err(e), _))) => { + Res::Builtin((Err(e), _)) => { bail!(e); } - Res::Builtin(None) => {} + } + + if children.is_empty() && builtin_count == 0 { + break; } } diff --git a/src/shell/event.rs b/src/shell/event.rs index 025f3c4..ad14705 100644 --- a/src/shell/event.rs +++ b/src/shell/event.rs @@ -11,22 +11,23 @@ pub enum Event { } pub struct Reader { - pending: async_std::sync::Mutex, - cvar: async_std::sync::Condvar, + pending: tokio::sync::Mutex, + cvar: tokio::sync::Notify, } impl Reader { pub fn new( - input: async_std::channel::Receiver, - ) -> async_std::sync::Arc { - let this = async_std::sync::Arc::new(Self { - pending: async_std::sync::Mutex::new(Pending::new()), - cvar: async_std::sync::Condvar::new(), - }); + mut input: tokio::sync::mpsc::UnboundedReceiver, + ) -> std::sync::Arc { + let this = Self { + pending: tokio::sync::Mutex::new(Pending::new()), + cvar: tokio::sync::Notify::new(), + }; + let this = std::sync::Arc::new(this); { - let this = async_std::sync::Arc::clone(&this); - async_std::task::spawn(async move { - while let Ok(event) = input.recv().await { + let this = this.clone(); + tokio::task::spawn(async move { + while let Some(event) = input.recv().await { this.new_event(Some(event)).await; } this.new_event(None).await; @@ -36,13 +37,14 @@ impl Reader { } pub async fn recv(&self) -> Option { - let mut pending = self - .cvar - .wait_until(self.pending.lock().await, |pending| { - pending.has_event() - }) - .await; - pending.get_event() + loop { + let mut pending = self.pending.lock().await; + if pending.has_event() { + return pending.get_event(); + } + drop(pending); + self.cvar.notified().await; + } } async fn new_event(&self, event: Option) { diff --git a/src/shell/history/entry.rs b/src/shell/history/entry.rs index a45d99d..97e8a7b 100644 --- a/src/shell/history/entry.rs +++ b/src/shell/history/entry.rs @@ -16,8 +16,8 @@ pub struct Entry { visual_bell: bool, real_bell_pending: bool, fullscreen: Option, - input: async_std::channel::Sender>, - resize: async_std::channel::Sender<(u16, u16)>, + input: tokio::sync::mpsc::UnboundedSender>, + resize: tokio::sync::mpsc::UnboundedSender<(u16, u16)>, start_time: time::OffsetDateTime, start_instant: std::time::Instant, } @@ -27,8 +27,8 @@ impl Entry { cmdline: String, env: Env, size: (u16, u16), - input: async_std::channel::Sender>, - resize: async_std::channel::Sender<(u16, u16)>, + input: tokio::sync::mpsc::UnboundedSender>, + resize: tokio::sync::mpsc::UnboundedSender<(u16, u16)>, ) -> Self { let span = (0, cmdline.len()); Self { @@ -229,13 +229,13 @@ impl Entry { pub async fn send_input(&self, bytes: Vec) { if self.running() { - self.input.send(bytes).await.unwrap(); + self.input.send(bytes).unwrap(); } } pub async fn resize(&mut self, size: (u16, u16)) { if self.running() { - self.resize.send(size).await.unwrap(); + self.resize.send(size).unwrap(); self.vt.set_size(size.0, size.1); } } @@ -341,11 +341,11 @@ impl Entry { pub async fn finish( &mut self, env: Env, - event_w: async_std::channel::Sender, + event_w: tokio::sync::mpsc::UnboundedSender, ) { self.state = State::Exited(ExitInfo::new(env.latest_status())); self.env = env; - event_w.send(Event::PtyClose).await.unwrap(); + event_w.send(Event::PtyClose).unwrap(); } fn exit_info(&self) -> Option<&ExitInfo> { @@ -369,12 +369,12 @@ impl Entry { } struct ExitInfo { - status: async_std::process::ExitStatus, + status: std::process::ExitStatus, instant: std::time::Instant, } impl ExitInfo { - fn new(status: async_std::process::ExitStatus) -> Self { + fn new(status: std::process::ExitStatus) -> Self { Self { status, instant: std::time::Instant::now(), diff --git a/src/shell/history/mod.rs b/src/shell/history/mod.rs index 1bc4e62..2eeab0b 100644 --- a/src/shell/history/mod.rs +++ b/src/shell/history/mod.rs @@ -67,7 +67,7 @@ impl History { out: &mut impl textmode::Textmode, idx: usize, ) { - let mut entry = self.entries[idx].lock_arc().await; + let mut entry = self.entries[idx].clone().lock_owned().await; entry.render_fullscreen(out); } @@ -78,7 +78,7 @@ impl History { pub async fn resize(&mut self, size: (u16, u16)) { self.size = size; for entry in &self.entries { - entry.lock_arc().await.resize(size).await; + entry.clone().lock_owned().await.resize(size).await; } } @@ -86,10 +86,10 @@ impl History { &mut self, cmdline: &str, env: &Env, - event_w: async_std::channel::Sender, + event_w: tokio::sync::mpsc::UnboundedSender, ) -> anyhow::Result { - let (input_w, input_r) = async_std::channel::unbounded(); - let (resize_w, resize_r) = async_std::channel::unbounded(); + let (input_w, input_r) = tokio::sync::mpsc::unbounded_channel(); + let (resize_w, resize_r) = tokio::sync::mpsc::unbounded_channel(); let entry = crate::mutex::new(Entry::new( cmdline.to_string(), @@ -112,7 +112,7 @@ impl History { } pub async fn entry(&self, idx: usize) -> crate::mutex::Guard { - self.entries[idx].lock_arc().await + self.entries[idx].clone().lock_owned().await } pub fn entry_count(&self) -> usize { @@ -173,7 +173,7 @@ impl History { for (idx, entry) in self.entries.iter().enumerate().rev().skip(self.scroll_pos) { - let entry = entry.lock_arc().await; + let entry = entry.clone().lock_owned().await; let focused = focus.map_or(false, |focus| idx == focus); used_lines += entry.lines(self.entry_count(), focused && !scrolling); @@ -221,13 +221,13 @@ fn run_commands( cmdline: String, entry: crate::mutex::Mutex, mut env: Env, - input_r: async_std::channel::Receiver>, - resize_r: async_std::channel::Receiver<(u16, u16)>, - event_w: async_std::channel::Sender, + input_r: tokio::sync::mpsc::UnboundedReceiver>, + resize_r: tokio::sync::mpsc::UnboundedReceiver<(u16, u16)>, + event_w: tokio::sync::mpsc::UnboundedSender, ) { - async_std::task::spawn(async move { + tokio::task::spawn(async move { let pty = match pty::Pty::new( - entry.lock_arc().await.size(), + entry.clone().lock_owned().await.size(), &entry, input_r, resize_r, @@ -235,14 +235,12 @@ fn run_commands( ) { Ok(pty) => pty, Err(e) => { - let mut entry = entry.lock_arc().await; + let mut entry = entry.clone().lock_owned().await; entry.process( format!("nbsh: failed to allocate pty: {}\r\n", e) .as_bytes(), ); - env.set_status(async_std::process::ExitStatus::from_raw( - 1 << 8, - )); + env.set_status(std::process::ExitStatus::from_raw(1 << 8)); entry.finish(env, event_w).await; return; } @@ -254,7 +252,7 @@ fn run_commands( { Ok(status) => status, Err(e) => { - let mut entry = entry.lock_arc().await; + let mut entry = entry.clone().lock_owned().await; entry.process( format!( "nbsh: failed to spawn {}: {}\r\n", @@ -262,7 +260,7 @@ fn run_commands( ) .as_bytes(), ); - env.set_status(async_std::process::ExitStatus::from_raw( + env.set_status(std::process::ExitStatus::from_raw( 1 << 8, )); entry.finish(env, event_w).await; @@ -271,7 +269,7 @@ fn run_commands( }; env.set_status(status); - entry.lock_arc().await.finish(env, event_w).await; + entry.clone().lock_owned().await.finish(env, event_w).await; pty.close().await; }); } @@ -280,12 +278,19 @@ async fn spawn_commands( cmdline: &str, pty: &pty::Pty, env: &mut Env, - event_w: async_std::channel::Sender, -) -> anyhow::Result { + event_w: tokio::sync::mpsc::UnboundedSender, +) -> anyhow::Result { + enum Res { + Read(crate::runner::Event), + Exit(std::io::Result), + } + let mut cmd = pty_process::Command::new(std::env::current_exe()?); cmd.args(&["-c", cmdline, "--status-fd", "3"]); env.apply(&mut cmd); let (from_r, from_w) = nix::unistd::pipe2(nix::fcntl::OFlag::O_CLOEXEC)?; + // Safety: from_r was just opened above and is not used anywhere else + let fh = unsafe { std::fs::File::from_raw_fd(from_r) }; // Safety: dup2 is an async-signal-safe function unsafe { cmd.pre_exec(move || { @@ -293,90 +298,63 @@ async fn spawn_commands( Ok(()) }); } - let child = pty.spawn(cmd)?; + let mut child = pty.spawn(cmd)?; nix::unistd::close(from_w)?; - let (read_w, read_r) = async_std::channel::unbounded(); - let new_read = move || { - let read_w = read_w.clone(); - async_std::task::spawn(async move { - let event = blocking::unblock(move || { - // Safety: from_r was just opened above and is only - // referenced in this closure, which takes ownership of it - // at the start and returns ownership of it at the end - let fh = unsafe { std::fs::File::from_raw_fd(from_r) }; - let event = bincode::deserialize_from(&fh); - let _ = fh.into_raw_fd(); - event - }) - .await; - if read_w.is_closed() { - // we should never drop read_r while there are still valid - // things to read - assert!(event.is_err()); - } else { - read_w.send(event).await.unwrap(); + let (read_w, read_r) = tokio::sync::mpsc::unbounded_channel(); + tokio::task::spawn_blocking(move || loop { + let event = bincode::deserialize_from(&fh); + match event { + Ok(event) => { + read_w.send(event).unwrap(); + } + Err(e) => { + match &*e { + bincode::ErrorKind::Io(io_e) => { + assert!( + io_e.kind() == std::io::ErrorKind::UnexpectedEof + ); + } + e => { + panic!("{}", e); + } + } + break; } - }); - }; - - new_read(); - let mut read_done = false; - let mut exit_done = None; - loop { - enum Res { - Read(bincode::Result), - Exit(std::io::Result), } + }); - let read_r = read_r.clone(); - let read = async move { Res::Read(read_r.recv().await.unwrap()) }; - let exit = async { - Res::Exit(if exit_done.is_none() { - child.status_no_drop().await - } else { - std::future::pending().await - }) - }; - match read.or(exit).await { - Res::Read(Ok(event)) => match event { + let mut stream: futures_util::stream::SelectAll<_> = [ + tokio_stream::wrappers::UnboundedReceiverStream::new(read_r) + .map(Res::Read) + .boxed(), + futures_util::stream::once(child.wait()) + .map(Res::Exit) + .boxed(), + ] + .into_iter() + .collect(); + let mut exit_status = None; + while let Some(res) = stream.next().await { + match res { + Res::Read(event) => match event { crate::runner::Event::RunPipeline(idx, span) => { - event_w - .send(Event::ChildRunPipeline(idx, span)) - .await - .unwrap(); - new_read(); + event_w.send(Event::ChildRunPipeline(idx, span)).unwrap(); } crate::runner::Event::Suspend(idx) => { - event_w.send(Event::ChildSuspend(idx)).await.unwrap(); - new_read(); + event_w.send(Event::ChildSuspend(idx)).unwrap(); } crate::runner::Event::Exit(new_env) => { *env = new_env; - read_done = true; } }, - Res::Read(Err(e)) => { - if let bincode::ErrorKind::Io(io_e) = &*e { - if io_e.kind() == std::io::ErrorKind::UnexpectedEof { - read_done = true; - } else { - anyhow::bail!(e); - } - } else { - anyhow::bail!(e); - } - } Res::Exit(Ok(status)) => { - exit_done = Some(status); + exit_status = Some(status); } Res::Exit(Err(e)) => { anyhow::bail!(e); } } - if let (true, Some(status)) = (read_done, exit_done) { - nix::unistd::close(from_r)?; - return Ok(status); - } } + Ok(exit_status.unwrap()) } diff --git a/src/shell/history/pty.rs b/src/shell/history/pty.rs index 5a51e73..acfe500 100644 --- a/src/shell/history/pty.rs +++ b/src/shell/history/pty.rs @@ -1,26 +1,26 @@ use crate::shell::prelude::*; pub struct Pty { - pty: async_std::sync::Arc, - close_w: async_std::channel::Sender<()>, + pts: pty_process::Pts, + close_w: tokio::sync::mpsc::UnboundedSender<()>, } impl Pty { pub fn new( size: (u16, u16), entry: &crate::mutex::Mutex, - input_r: async_std::channel::Receiver>, - resize_r: async_std::channel::Receiver<(u16, u16)>, - event_w: async_std::channel::Sender, + input_r: tokio::sync::mpsc::UnboundedReceiver>, + resize_r: tokio::sync::mpsc::UnboundedReceiver<(u16, u16)>, + event_w: tokio::sync::mpsc::UnboundedSender, ) -> anyhow::Result { - let (close_w, close_r) = async_std::channel::unbounded(); + let (close_w, close_r) = tokio::sync::mpsc::unbounded_channel(); let pty = pty_process::Pty::new()?; pty.resize(pty_process::Size::new(size.0, size.1))?; - let pty = async_std::sync::Arc::new(pty); + let pts = pty.pts()?; - async_std::task::spawn(pty_task( - async_std::sync::Arc::clone(&pty), + tokio::task::spawn(pty_task( + pty, crate::mutex::clone(entry), input_r, resize_r, @@ -28,80 +28,74 @@ impl Pty { event_w, )); - Ok(Self { pty, close_w }) + Ok(Self { pts, close_w }) } pub fn spawn( &self, mut cmd: pty_process::Command, - ) -> anyhow::Result { - Ok(cmd.spawn(&self.pty)?) + ) -> anyhow::Result { + Ok(cmd.spawn(&self.pts)?) } pub async fn close(&self) { - self.close_w.send(()).await.unwrap(); + self.close_w.send(()).unwrap(); } } async fn pty_task( - pty: async_std::sync::Arc, + pty: pty_process::Pty, entry: crate::mutex::Mutex, - input_r: async_std::channel::Receiver>, - resize_r: async_std::channel::Receiver<(u16, u16)>, - close_r: async_std::channel::Receiver<()>, - event_w: async_std::channel::Sender, + input_r: tokio::sync::mpsc::UnboundedReceiver>, + resize_r: tokio::sync::mpsc::UnboundedReceiver<(u16, u16)>, + close_r: tokio::sync::mpsc::UnboundedReceiver<()>, + event_w: tokio::sync::mpsc::UnboundedSender, ) { - loop { - enum Res { - Read(Result), - Write(Result, async_std::channel::RecvError>), - Resize(Result<(u16, u16), async_std::channel::RecvError>), - Close(Result<(), async_std::channel::RecvError>), - } - let mut buf = [0_u8; 4096]; - let read = async { Res::Read((&*pty).read(&mut buf).await) }; - let write = async { Res::Write(input_r.recv().await) }; - let resize = async { Res::Resize(resize_r.recv().await) }; - let close = async { Res::Close(close_r.recv().await) }; - match read.race(write).race(resize).or(close).await { + enum Res { + Read(Result), + Write(Vec), + Resize((u16, u16)), + Close(()), + } + + let (pty_r, mut pty_w) = pty.into_split(); + let mut stream: futures_util::stream::SelectAll<_> = [ + tokio_util::io::ReaderStream::new(pty_r) + .map(Res::Read) + .boxed(), + tokio_stream::wrappers::UnboundedReceiverStream::new(input_r) + .map(Res::Write) + .boxed(), + tokio_stream::wrappers::UnboundedReceiverStream::new(resize_r) + .map(Res::Resize) + .boxed(), + tokio_stream::wrappers::UnboundedReceiverStream::new(close_r) + .map(Res::Close) + .boxed(), + ] + .into_iter() + .collect(); + while let Some(res) = stream.next().await { + match res { Res::Read(res) => match res { Ok(bytes) => { - entry.lock_arc().await.process(&buf[..bytes]); - event_w.send(Event::PtyOutput).await.unwrap(); + entry.clone().lock_owned().await.process(&bytes); + event_w.send(Event::PtyOutput).unwrap(); } Err(e) => { - if e.raw_os_error() == Some(libc::EIO) { - continue; - } panic!("pty read failed: {:?}", e); } }, - Res::Write(res) => match res { - Ok(bytes) => { - (&*pty).write(&bytes).await.unwrap(); - } - Err(e) => { - panic!("failed to read from input channel: {}", e); - } - }, - Res::Resize(res) => match res { - Ok(size) => { - pty.resize(pty_process::Size::new(size.0, size.1)) - .unwrap(); - } - Err(e) => { - panic!("failed to read from resize channel: {}", e); - } - }, - Res::Close(res) => match res { - Ok(()) => { - event_w.send(Event::PtyClose).await.unwrap(); - return; - } - Err(e) => { - panic!("failed to read from close channel: {}", e); - } - }, + Res::Write(bytes) => { + pty_w.write(&bytes).await.unwrap(); + } + Res::Resize(size) => pty_w + .resize(pty_process::Size::new(size.0, size.1)) + .unwrap(), + Res::Close(()) => { + event_w.send(Event::PtyClose).unwrap(); + return; + } } } } diff --git a/src/shell/mod.rs b/src/shell/mod.rs index 9c4002b..82d2021 100644 --- a/src/shell/mod.rs +++ b/src/shell/mod.rs @@ -10,7 +10,7 @@ mod prelude; mod readline; pub async fn main() -> anyhow::Result { - let mut input = textmode::Input::new().await?; + let mut input = textmode::blocking::Input::new()?; let mut output = textmode::Output::new().await?; // avoid the guards getting stuck in a task that doesn't run to @@ -18,23 +18,23 @@ pub async fn main() -> anyhow::Result { let _input_guard = input.take_raw_guard(); let _output_guard = output.take_screen_guard(); - let (event_w, event_r) = async_std::channel::unbounded(); + let (event_w, event_r) = tokio::sync::mpsc::unbounded_channel(); { - // nix::sys::signal::Signal is repr(i32) - #[allow(clippy::as_conversions)] - let signals = signal_hook_async_std::Signals::new(&[ - nix::sys::signal::Signal::SIGWINCH as i32, - ])?; + let mut signals = tokio::signal::unix::signal( + tokio::signal::unix::SignalKind::window_change(), + )?; let event_w = event_w.clone(); - async_std::task::spawn(async move { - // nix::sys::signal::Signal is repr(i32) - #[allow(clippy::as_conversions)] - let mut signals = async_std::stream::once( - nix::sys::signal::Signal::SIGWINCH as i32, - ) - .chain(signals); - while signals.next().await.is_some() { + tokio::task::spawn(async move { + event_w + .send(Event::Resize(terminal_size::terminal_size().map_or( + (24, 80), + |(terminal_size::Width(w), terminal_size::Height(h))| { + (h, w) + }, + ))) + .unwrap(); + while signals.recv().await.is_some() { event_w .send(Event::Resize( terminal_size::terminal_size().map_or( @@ -45,7 +45,6 @@ pub async fn main() -> anyhow::Result { )| { (h, w) }, ), )) - .await .unwrap(); } }); @@ -53,9 +52,9 @@ pub async fn main() -> anyhow::Result { { let event_w = event_w.clone(); - async_std::task::spawn(async move { - while let Some(key) = input.read_key().await.unwrap() { - event_w.send(Event::Key(key)).await.unwrap(); + std::thread::spawn(move || { + while let Some(key) = input.read_key().unwrap() { + event_w.send(Event::Key(key)).unwrap(); } }); } @@ -63,33 +62,35 @@ pub async fn main() -> anyhow::Result { // redraw the clock every second { let event_w = event_w.clone(); - async_std::task::spawn(async move { - let first_sleep = 1_000_000_000_u64.saturating_sub( - time::OffsetDateTime::now_utc().nanosecond().into(), - ); - async_std::task::sleep(std::time::Duration::from_nanos( - first_sleep, - )) - .await; - let mut interval = async_std::stream::interval( + tokio::task::spawn(async move { + let now_clock = time::OffsetDateTime::now_utc(); + let now_instant = tokio::time::Instant::now(); + let mut interval = tokio::time::interval_at( + now_instant + + std::time::Duration::from_nanos( + 1_000_000_000_u64 + .saturating_sub(now_clock.nanosecond().into()), + ), std::time::Duration::from_secs(1), ); - event_w.send(Event::ClockTimer).await.unwrap(); - while interval.next().await.is_some() { - event_w.send(Event::ClockTimer).await.unwrap(); + loop { + interval.tick().await; + event_w.send(Event::ClockTimer).unwrap(); } }); } - let (git_w, git_r): (async_std::channel::Sender, _) = - async_std::channel::unbounded(); + let (git_w, mut git_r): ( + tokio::sync::mpsc::UnboundedSender, + _, + ) = tokio::sync::mpsc::unbounded_channel(); { let event_w = event_w.clone(); // clippy can't tell that we assign to this later #[allow(clippy::no_effect_underscore_binding)] let mut _active_watcher = None; - async_std::task::spawn(async move { - while let Ok(mut dir) = git_r.recv().await { + tokio::task::spawn(async move { + while let Some(mut dir) = git_r.recv().await { while let Ok(newer_dir) = git_r.try_recv() { dir = newer_dir; } @@ -97,7 +98,8 @@ pub async fn main() -> anyhow::Result { if repo.is_some() { let (sync_watch_w, sync_watch_r) = std::sync::mpsc::channel(); - let (watch_w, watch_r) = async_std::channel::unbounded(); + let (watch_w, mut watch_r) = + tokio::sync::mpsc::unbounded_channel(); let mut watcher = notify::RecommendedWatcher::new( sync_watch_w, std::time::Duration::from_millis(100), @@ -106,31 +108,25 @@ pub async fn main() -> anyhow::Result { watcher .watch(&dir, notify::RecursiveMode::Recursive) .unwrap(); - async_std::task::spawn(blocking::unblock(move || { + tokio::task::spawn_blocking(move || { while let Ok(event) = sync_watch_r.recv() { let watch_w = watch_w.clone(); - let send_failed = - async_std::task::block_on(async move { - watch_w.send(event).await.is_err() - }); + let send_failed = watch_w.send(event).is_err(); if send_failed { break; } } - })); + }); let event_w = event_w.clone(); - async_std::task::spawn(async move { - while watch_r.recv().await.is_ok() { + tokio::task::spawn(async move { + while watch_r.recv().await.is_some() { let repo = git2::Repository::discover(&dir).ok(); - let info = blocking::unblock(|| { + let info = tokio::task::spawn_blocking(|| { repo.map(|repo| git::Info::new(&repo)) }) - .await; - if event_w - .send(Event::GitInfo(info)) - .await - .is_err() - { + .await + .unwrap(); + if event_w.send(Event::GitInfo(info)).is_err() { break; } } @@ -139,24 +135,25 @@ pub async fn main() -> anyhow::Result { } else { _active_watcher = None; } - let info = blocking::unblock(|| { + let info = tokio::task::spawn_blocking(|| { repo.map(|repo| git::Info::new(&repo)) }) - .await; - event_w.send(Event::GitInfo(info)).await.unwrap(); + .await + .unwrap(); + event_w.send(Event::GitInfo(info)).unwrap(); } }); } let mut shell = Shell::new(crate::info::get_offset())?; let mut prev_dir = shell.env.pwd().to_path_buf(); - git_w.send(prev_dir.clone()).await.unwrap(); + git_w.send(prev_dir.clone()).unwrap(); let event_reader = event::Reader::new(event_r); while let Some(event) = event_reader.recv().await { let dir = shell.env().pwd(); if dir != prev_dir { prev_dir = dir.to_path_buf(); - git_w.send(dir.to_path_buf()).await.unwrap(); + git_w.send(dir.to_path_buf()).unwrap(); } match shell.handle_event(event, &event_w).await { Some(Action::Refresh) => { @@ -322,7 +319,7 @@ impl Shell { pub async fn handle_event( &mut self, event: Event, - event_w: &async_std::channel::Sender, + event_w: &tokio::sync::mpsc::UnboundedSender, ) -> Option { match event { Event::Key(key) => { @@ -405,7 +402,7 @@ impl Shell { async fn handle_key_escape( &mut self, key: textmode::Key, - event_w: async_std::channel::Sender, + event_w: tokio::sync::mpsc::UnboundedSender, ) -> Option { match key { textmode::Key::Ctrl(b'd') => { @@ -514,7 +511,7 @@ impl Shell { async fn handle_key_readline( &mut self, key: textmode::Key, - event_w: async_std::channel::Sender, + event_w: tokio::sync::mpsc::UnboundedSender, ) -> Option { match key { textmode::Key::Char(c) => { -- cgit v1.2.3-54-g00ecf