diff --git a/Cargo.lock b/Cargo.lock index 310835479..8181e97e9 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -921,20 +921,6 @@ dependencies = [ "itertools", ] -[[package]] -name = "crossbeam" -version = "0.8.2" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "2801af0d36612ae591caa9568261fddce32ce6e08a7275ea334a06a4ad021a2c" -dependencies = [ - "cfg-if", - "crossbeam-channel", - "crossbeam-deque", - "crossbeam-epoch", - "crossbeam-queue", - "crossbeam-utils", -] - [[package]] name = "crossbeam-channel" version = "0.5.7" @@ -969,16 +955,6 @@ dependencies = [ "scopeguard", ] -[[package]] -name = "crossbeam-queue" -version = "0.3.8" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d1cfb3ea8a53f37c40dea2c7bedcbd88bdfae54f5e2175d6ecaff1c988353add" -dependencies = [ - "cfg-if", - "crossbeam-utils", -] - [[package]] name = "crossbeam-utils" version = "0.8.15" @@ -1146,14 +1122,11 @@ checksum = "e6d5a32815ae3f33302d95fdcb2ce17862f8c65363dcfd29360480ba1001fc9c" [[package]] name = "fuser" -version = "0.11.0" +version = "0.12.0" dependencies = [ - "async-trait", "bincode", "clap", - "crossbeam", "env_logger", - "futures", "libc", "log", "memchr", @@ -1800,7 +1773,6 @@ dependencies = [ "assert_fs", "async-channel", "async-lock", - "async-trait", "aws-config", "aws-sdk-s3", "base16ct", diff --git a/mountpoint-s3/Cargo.toml b/mountpoint-s3/Cargo.toml index 2ca46ccfc..44d8f6bfd 100644 --- a/mountpoint-s3/Cargo.toml +++ b/mountpoint-s3/Cargo.toml @@ -13,7 +13,6 @@ mountpoint-s3-crt = { path = "../mountpoint-s3-crt" } anyhow = { version = "1.0.64", features = ["backtrace"] } async-channel = "1.8.0" async-lock = "2.6.0" -async-trait = "0.1.57" bytes = "1.2.1" clap = { version = "3.2.12", features = ["derive"] } ctrlc = "3.2.3" diff --git a/mountpoint-s3/src/fuse.rs b/mountpoint-s3/src/fuse.rs index 6dcbae79e..ce11e63dc 100644 --- a/mountpoint-s3/src/fuse.rs +++ b/mountpoint-s3/src/fuse.rs @@ -1,8 +1,8 @@ -use async_trait::async_trait; +use futures::executor::block_on; use futures::task::Spawn; use std::ffi::OsStr; use std::time::Duration; -use tracing::instrument; +use tracing::{instrument, Instrument}; use crate::fs::{DirectoryReplier, InodeNo, ReadReplier, S3Filesystem, S3FilesystemConfig}; use fuser::{ @@ -28,43 +28,42 @@ where } } -#[async_trait] impl Filesystem for S3FuseFilesystem where Client: ObjectClient + Send + Sync + 'static, Runtime: Spawn + Send + Sync, { #[instrument(level = "debug", skip_all)] - async fn init(&self, _req: &Request<'_>, config: &mut KernelConfig) -> Result<(), libc::c_int> { - self.fs.init(config).await + fn init(&self, _req: &Request<'_>, config: &mut KernelConfig) -> Result<(), libc::c_int> { + block_on(self.fs.init(config).in_current_span()) } #[instrument(level="debug", skip_all, fields(req=_req.unique(), ino=parent, name=?name))] - async fn lookup(&self, _req: &Request<'_>, parent: InodeNo, name: &OsStr, reply: ReplyEntry) { - match self.fs.lookup(parent, name).await { + fn lookup(&self, _req: &Request<'_>, parent: InodeNo, name: &OsStr, reply: ReplyEntry) { + match block_on(self.fs.lookup(parent, name).in_current_span()) { Ok(entry) => reply.entry(&entry.ttl, &entry.attr, entry.generation), Err(e) => reply.error(e), } } #[instrument(level="debug", skip_all, fields(req=_req.unique(), ino=ino))] - async fn getattr(&self, _req: &Request<'_>, ino: InodeNo, reply: ReplyAttr) { - match self.fs.getattr(ino).await { + fn getattr(&self, _req: &Request<'_>, ino: InodeNo, reply: ReplyAttr) { + match block_on(self.fs.getattr(ino).in_current_span()) { Ok(attr) => reply.attr(&attr.ttl, &attr.attr), Err(e) => reply.error(e), } } #[instrument(level="debug", skip_all, fields(req=_req.unique(), ino=ino))] - async fn open(&self, _req: &Request<'_>, ino: InodeNo, flags: i32, reply: ReplyOpen) { - match self.fs.open(ino, flags).await { + fn open(&self, _req: &Request<'_>, ino: InodeNo, flags: i32, reply: ReplyOpen) { + match block_on(self.fs.open(ino, flags).in_current_span()) { Ok(opened) => reply.opened(opened.fh, opened.flags), Err(e) => reply.error(e), } } #[instrument(level="debug", skip_all, fields(req=_req.unique(), ino=ino, fh=fh, offset=offset, size=size))] - async fn read( + fn read( &self, _req: &Request<'_>, ino: InodeNo, @@ -103,29 +102,26 @@ where inner: reply, bytes_sent: &mut bytes_sent, }; - self.fs.read(ino, fh, offset, size, flags, lock, replier).await; + block_on( + self.fs + .read(ino, fh, offset, size, flags, lock, replier) + .in_current_span(), + ); // return value of read is proof a reply was sent metrics::counter!("fuse.bytes_read", bytes_sent as u64); } #[instrument(level="debug", skip_all, fields(req=_req.unique(), ino=parent))] - async fn opendir(&self, _req: &Request<'_>, parent: InodeNo, flags: i32, reply: ReplyOpen) { - match self.fs.opendir(parent, flags).await { + fn opendir(&self, _req: &Request<'_>, parent: InodeNo, flags: i32, reply: ReplyOpen) { + match block_on(self.fs.opendir(parent, flags).in_current_span()) { Ok(opened) => reply.opened(opened.fh, opened.flags), Err(e) => reply.error(e), } } #[instrument(level="debug", skip_all, fields(req=_req.unique(), ino=parent, fh=fh, offset=offset))] - async fn readdir( - &self, - _req: &Request<'_>, - parent: InodeNo, - fh: u64, - offset: i64, - mut reply: fuser::ReplyDirectory, - ) { + fn readdir(&self, _req: &Request<'_>, parent: InodeNo, fh: u64, offset: i64, mut reply: fuser::ReplyDirectory) { struct ReplyDirectory<'a> { inner: &'a mut fuser::ReplyDirectory, } @@ -146,14 +142,14 @@ where let replier = ReplyDirectory { inner: &mut reply }; - match self.fs.readdir(parent, fh, offset, replier).await { + match block_on(self.fs.readdir(parent, fh, offset, replier).in_current_span()) { Ok(_) => reply.ok(), Err(e) => reply.error(e), } } #[instrument(level="debug", skip_all, fields(req=_req.unique(), ino=parent, fh=fh, offset=offset))] - async fn readdirplus( + fn readdirplus( &self, _req: &Request<'_>, parent: InodeNo, @@ -181,14 +177,14 @@ where let replier = ReplyDirectoryPlus { inner: &mut reply }; - match self.fs.readdir(parent, fh, offset, replier).await { + match block_on(self.fs.readdir(parent, fh, offset, replier).in_current_span()) { Ok(_) => reply.ok(), Err(e) => reply.error(e), } } #[instrument(level="debug", skip_all, fields(req=_req.unique(), ino=ino, fh=fh))] - async fn release( + fn release( &self, _req: &Request<'_>, ino: InodeNo, @@ -198,14 +194,14 @@ where flush: bool, reply: ReplyEmpty, ) { - match self.fs.release(ino, fh, flags, lock_owner, flush).await { + match block_on(self.fs.release(ino, fh, flags, lock_owner, flush).in_current_span()) { Ok(()) => reply.ok(), Err(e) => reply.error(e), } } #[instrument(level="debug", skip_all, fields(req=_req.unique(), parent=parent, name=?name))] - async fn mknod( + fn mknod( &self, _req: &Request<'_>, parent: InodeNo, @@ -218,14 +214,14 @@ where // mode_t is u32 on Linux but u16 on macOS, so cast it here let mode = mode as libc::mode_t; - match self.fs.mknod(parent, name, mode, umask, rdev).await { + match block_on(self.fs.mknod(parent, name, mode, umask, rdev).in_current_span()) { Ok(entry) => reply.entry(&entry.ttl, &entry.attr, entry.generation), Err(e) => reply.error(e), } } #[instrument(level="debug", skip_all, fields(req=_req.unique(), ino=ino, fh=fh, offset=offset, length=data.len()))] - async fn write( + fn write( &self, _req: &Request<'_>, ino: InodeNo, @@ -237,11 +233,11 @@ where lock_owner: Option, reply: ReplyWrite, ) { - match self - .fs - .write(ino, fh, offset, data, write_flags, flags, lock_owner) - .await - { + match block_on( + self.fs + .write(ino, fh, offset, data, write_flags, flags, lock_owner) + .in_current_span(), + ) { Ok(bytes_written) => reply.written(bytes_written), Err(e) => reply.error(e), } diff --git a/mountpoint-s3/src/main.rs b/mountpoint-s3/src/main.rs index 20ad6546c..a5cbdf267 100644 --- a/mountpoint-s3/src/main.rs +++ b/mountpoint-s3/src/main.rs @@ -355,12 +355,8 @@ fn mount(args: CliArgs) -> anyhow::Result { let session = Session::new(fs, &args.mount_point, &options).context("Failed to create FUSE session")?; - let session = if let Some(thread_count) = args.thread_count { - BackgroundSession::new_multi_thread(session, thread_count as usize) - } else { - BackgroundSession::new(session) - }; - let session = session.context("Failed to start FUSE session")?; + // TODO correctly handle multi-threading and unmounting + let session = BackgroundSession::new(session).context("Failed to start FUSE session")?; tracing::info!("successfully mounted {:?}", args.mount_point); diff --git a/vendor-fuser.sh b/vendor-fuser.sh index a46f537c9..0090a71e9 100755 --- a/vendor-fuser.sh +++ b/vendor-fuser.sh @@ -13,11 +13,11 @@ fi rm -rf $FUSER_FULL_PATH -git clone --branch fuser/async ssh://git@github.com/awslabs/mountpoint-s3.git $FUSER_FULL_PATH +git clone --branch fuser/fork ssh://git@github.com/awslabs/mountpoint-s3.git $FUSER_FULL_PATH COMMIT=$(git -C $FUSER_FULL_PATH rev-parse --short HEAD) rm -rf $FUSER_FULL_PATH/.git git add $FUSER_FULL_PATH -git commit -m "Update vendored fuser to $COMMIT" \ No newline at end of file +git commit -m "Update vendored fuser to $COMMIT" -s