From 8e5688dc45f5a76d145d03a859ad0160a12e43c8 Mon Sep 17 00:00:00 2001 From: James Bornholt Date: Fri, 27 Oct 2023 10:02:35 -0500 Subject: [PATCH] Allow repeated readdir offsets (#581) * Allow repeated readdir offsets POSIX allows seeking an open directory handle, which in FUSE means the `offset` can be any offset we've previously returned. This is pretty annoying for us to implement since we're streaming directory entries from S3 with ListObjects, which can't resume from an arbitrary index, and can't fit its continuation tokens into a 64-bit offset anyway. So we're probably never going to truly support seeking a directory handle. But there's a special case we've seen come up a couple of times (#477, #520): some applications read one page of directory entries and then seek back to 0 and do it again. I don't fully understand _why_ they do this, but it's common enough that it's worth special casing. This change makes open directory handles remember their most recent response so that they can repeat it if asked for the same offset again. It's not too complicated other than needing to make sure we do readdirplus correctly (managing the lookup counts for entries that are being returned a second time). I've tested this by running the PHP example from #477, which now works. Signed-off-by: James Bornholt * PR feedback Signed-off-by: James Bornholt * Changelog and docs Signed-off-by: James Bornholt --------- Signed-off-by: James Bornholt --- doc/SEMANTICS.md | 2 +- mountpoint-s3/CHANGELOG.md | 6 ++ mountpoint-s3/src/fs.rs | 128 +++++++++++++++++++++++------- mountpoint-s3/src/fuse.rs | 41 ++++------ mountpoint-s3/src/inode.rs | 3 +- mountpoint-s3/tests/common/mod.rs | 29 +------ mountpoint-s3/tests/fs.rs | 93 +++++++++++++++++++++- 7 files changed, 219 insertions(+), 83 deletions(-) diff --git a/doc/SEMANTICS.md b/doc/SEMANTICS.md index 490e72b6c..6a2e7d583 100644 --- a/doc/SEMANTICS.md +++ b/doc/SEMANTICS.md @@ -168,7 +168,7 @@ the following behavior: ### Directory operations -Basic read-only directory operations (`opendir`, `readdir`, `closedir`) are supported. +Basic read-only directory operations (`opendir`, `readdir`, `closedir`) are supported. However, seeking (`lseek`) on directory handles is not supported. Creating directories (`mkdir`) is supported, with the following behavior: diff --git a/mountpoint-s3/CHANGELOG.md b/mountpoint-s3/CHANGELOG.md index b338e0d8f..91b7e663f 100644 --- a/mountpoint-s3/CHANGELOG.md +++ b/mountpoint-s3/CHANGELOG.md @@ -1,5 +1,11 @@ ## Unreleased +### Breaking changes +* No breaking changes. + +### Other changes +* Some applications that read directory entries out of order (for example, [PHP](https://github.com/awslabs/mountpoint-s3/issues/477)) will now work correctly. ([#581](https://github.com/awslabs/mountpoint-s3/pull/581)) + ## v1.1.0 (October 23, 2023) ### Breaking changes diff --git a/mountpoint-s3/src/fs.rs b/mountpoint-s3/src/fs.rs index f2ac0da5b..0cfbea909 100644 --- a/mountpoint-s3/src/fs.rs +++ b/mountpoint-s3/src/fs.rs @@ -3,7 +3,7 @@ use futures::task::Spawn; use nix::unistd::{getgid, getuid}; use std::collections::HashMap; -use std::ffi::OsStr; +use std::ffi::{OsStr, OsString}; use std::str::FromStr; use std::time::{Duration, UNIX_EPOCH}; use time::OffsetDateTime; @@ -35,6 +35,7 @@ struct DirHandle { ino: InodeNo, handle: ReaddirHandle, offset: AtomicI64, + last_response: AsyncMutex)>>, } impl DirHandle { @@ -402,15 +403,18 @@ pub struct Opened { pub trait DirectoryReplier { /// Add a new dentry to the reply. Returns true if the buffer was full and so the entry was not /// added. - fn add>( - &mut self, - ino: u64, - offset: i64, - name: T, - attr: FileAttr, - generation: u64, - ttl: Duration, - ) -> bool; + fn add(&mut self, entry: DirectoryEntry) -> bool; +} + +#[derive(Debug, Clone)] +pub struct DirectoryEntry { + pub ino: u64, + pub offset: i64, + pub name: OsString, + pub attr: FileAttr, + pub generation: u64, + pub ttl: Duration, + lookup: LookedUp, } /// Reply to a `read` call. This is funky because we want the reply to happen with only a borrow of @@ -717,6 +721,7 @@ where ino: parent, handle: inode_handle, offset: AtomicI64::new(0), + last_response: AsyncMutex::new(None), }; let mut dir_handles = self.dir_handles.write().await; @@ -764,19 +769,77 @@ where }; if offset != dir_handle.offset() { + // POSIX allows seeking an open directory. That's a pain for us since we are streaming + // the directory entries and don't want to keep them all in memory. But one common case + // we've seen (https://github.com/awslabs/mountpoint-s3/issues/477) is applications that + // request offset 0 twice in a row. So we remember the last response and, if repeated, + // we return it again. + let last_response = dir_handle.last_response.lock().await; + if let Some((last_offset, entries)) = last_response.as_ref() { + if offset == *last_offset { + trace!(offset, "repeating readdir response"); + for entry in entries { + if reply.add(entry.clone()) { + break; + } + // We are returning this result a second time, so the contract is that we + // must remember it again, except that readdirplus specifies that . and .. + // are never incremented. + if is_readdirplus && entry.name != "." && entry.name != ".." { + dir_handle.handle.remember(&entry.lookup); + } + } + return Ok(reply); + } + } return Err(err!( libc::EINVAL, - "offset mismatch, expected={}, actual={}", + "out-of-order readdir, expected={}, actual={}", dir_handle.offset(), offset )); } + /// Wrap a replier to duplicate the entries and store them in `dir_handle.last_response` so + /// we can re-use them if the directory handle rewinds + struct Reply { + reply: R, + entries: Vec, + } + + impl Reply { + async fn finish(self, offset: i64, dir_handle: &DirHandle) -> R { + *dir_handle.last_response.lock().await = Some((offset, self.entries)); + self.reply + } + } + + impl DirectoryReplier for Reply { + fn add(&mut self, entry: DirectoryEntry) -> bool { + let result = self.reply.add(entry.clone()); + if !result { + self.entries.push(entry); + } + result + } + } + + let mut reply = Reply { reply, entries: vec![] }; + if dir_handle.offset() < 1 { let lookup = self.superblock.getattr(&self.client, parent, false).await?; let attr = self.make_attr(&lookup); - if reply.add(parent, dir_handle.offset() + 1, ".", attr, 0u64, lookup.validity()) { - return Ok(reply); + let entry = DirectoryEntry { + ino: parent, + offset: dir_handle.offset() + 1, + name: ".".into(), + attr, + generation: 0, + ttl: lookup.validity(), + lookup, + }; + if reply.add(entry) { + return Ok(reply.finish(offset, &dir_handle).await); } dir_handle.next_offset(); } @@ -786,36 +849,41 @@ where .getattr(&self.client, dir_handle.handle.parent(), false) .await?; let attr = self.make_attr(&lookup); - if reply.add( - dir_handle.handle.parent(), - dir_handle.offset() + 1, - "..", + let entry = DirectoryEntry { + ino: dir_handle.handle.parent(), + offset: dir_handle.offset() + 1, + name: "..".into(), attr, - 0u64, - lookup.validity(), - ) { - return Ok(reply); + generation: 0, + ttl: lookup.validity(), + lookup, + }; + if reply.add(entry) { + return Ok(reply.finish(offset, &dir_handle).await); } dir_handle.next_offset(); } loop { let next = match dir_handle.handle.next(&self.client).await? { - None => return Ok(reply), + None => return Ok(reply.finish(offset, &dir_handle).await), Some(next) => next, }; let attr = self.make_attr(&next); - if reply.add( - attr.ino, - dir_handle.offset() + 1, - next.inode.name(), + let entry = DirectoryEntry { + ino: attr.ino, + offset: dir_handle.offset() + 1, + name: next.inode.name().into(), attr, - 0u64, - next.validity(), - ) { + generation: 0, + ttl: next.validity(), + lookup: next.clone(), + }; + + if reply.add(entry) { dir_handle.handle.readd(next); - return Ok(reply); + return Ok(reply.finish(offset, &dir_handle).await); } if is_readdirplus { dir_handle.handle.remember(&next); diff --git a/mountpoint-s3/src/fuse.rs b/mountpoint-s3/src/fuse.rs index 9d21482a4..f65fa6ab5 100644 --- a/mountpoint-s3/src/fuse.rs +++ b/mountpoint-s3/src/fuse.rs @@ -4,17 +4,19 @@ use futures::executor::block_on; use futures::task::Spawn; use std::ffi::OsStr; use std::path::Path; -use std::time::{Duration, SystemTime}; +use std::time::SystemTime; use time::OffsetDateTime; use tracing::{instrument, Instrument}; -use crate::fs::{self, DirectoryReplier, InodeNo, ReadReplier, S3Filesystem, S3FilesystemConfig, ToErrno}; +use crate::fs::{ + self, DirectoryEntry, DirectoryReplier, InodeNo, ReadReplier, S3Filesystem, S3FilesystemConfig, ToErrno, +}; use crate::prefix::Prefix; #[cfg(target_os = "macos")] use fuser::ReplyXTimes; use fuser::{ - FileAttr, Filesystem, KernelConfig, ReplyAttr, ReplyBmap, ReplyCreate, ReplyData, ReplyEmpty, ReplyEntry, - ReplyIoctl, ReplyLock, ReplyLseek, ReplyOpen, ReplyWrite, ReplyXattr, Request, TimeOrNow, + Filesystem, KernelConfig, ReplyAttr, ReplyBmap, ReplyCreate, ReplyData, ReplyEmpty, ReplyEntry, ReplyIoctl, + ReplyLock, ReplyLseek, ReplyOpen, ReplyWrite, ReplyXattr, Request, TimeOrNow, }; use mountpoint_s3_client::ObjectClient; @@ -168,16 +170,8 @@ where } impl<'a> DirectoryReplier for ReplyDirectory<'a> { - fn add>( - &mut self, - ino: InodeNo, - offset: i64, - name: T, - attr: FileAttr, - _generation: u64, - _ttl: Duration, - ) -> bool { - let result = self.inner.add(ino, offset, attr.kind, name); + fn add(&mut self, entry: DirectoryEntry) -> bool { + let result = self.inner.add(entry.ino, entry.offset, entry.attr.kind, entry.name); if !result { *self.count += 1; } @@ -215,16 +209,15 @@ where } impl<'a> DirectoryReplier for ReplyDirectoryPlus<'a> { - fn add>( - &mut self, - ino: u64, - offset: i64, - name: T, - attr: FileAttr, - generation: u64, - ttl: Duration, - ) -> bool { - let result = self.inner.add(ino, offset, name, &ttl, &attr, generation); + fn add(&mut self, entry: DirectoryEntry) -> bool { + let result = self.inner.add( + entry.ino, + entry.offset, + entry.name, + &entry.ttl, + &entry.attr, + entry.generation, + ); if !result { *self.count += 1; } diff --git a/mountpoint-s3/src/inode.rs b/mountpoint-s3/src/inode.rs index 53a381388..ae8d7e745 100644 --- a/mountpoint-s3/src/inode.rs +++ b/mountpoint-s3/src/inode.rs @@ -1167,7 +1167,8 @@ impl Inode { pub fn dec_lookup_count(&self, n: u64) -> u64 { let mut state = self.inner.sync.write().unwrap(); let lookup_count = &mut state.lookup_count; - *lookup_count -= n; + debug_assert!(n <= *lookup_count, "lookup count cannot go negative"); + *lookup_count = lookup_count.saturating_sub(n); trace!(new_lookup_count = lookup_count, "decremented lookup count"); *lookup_count } diff --git a/mountpoint-s3/tests/common/mod.rs b/mountpoint-s3/tests/common/mod.rs index 72b98fcd2..611bddfb3 100644 --- a/mountpoint-s3/tests/common/mod.rs +++ b/mountpoint-s3/tests/common/mod.rs @@ -2,7 +2,7 @@ use aws_sdk_s3::config::Region; use aws_sdk_s3::primitives::ByteStream; use fuser::{FileAttr, FileType}; use futures::executor::ThreadPool; -use mountpoint_s3::fs::{self, DirectoryReplier, ReadReplier, ToErrno}; +use mountpoint_s3::fs::{self, DirectoryEntry, DirectoryReplier, ReadReplier, ToErrno}; use mountpoint_s3::prefix::Prefix; use mountpoint_s3::{S3Filesystem, S3FilesystemConfig}; use mountpoint_s3_client::mock_client::{MockClient, MockClientConfig}; @@ -11,10 +11,8 @@ use mountpoint_s3_crt::common::rust_log_adapter::RustLogAdapter; use rand::rngs::OsRng; use rand::RngCore; use std::collections::VecDeque; -use std::ffi::{OsStr, OsString}; use std::future::Future; use std::sync::Arc; -use std::time::Duration; pub fn make_test_filesystem( bucket: &str, @@ -105,14 +103,6 @@ pub fn assert_attr(attr: FileAttr, ftype: FileType, size: u64, uid: u32, gid: u3 assert_eq!(attr.perm, perm); } -#[derive(Debug, Clone)] -pub struct DirectoryEntry { - pub ino: u64, - pub offset: i64, - pub attr: FileAttr, - pub name: OsString, -} - #[derive(Debug, Default)] pub struct DirectoryReply { readdir_limit: usize, @@ -120,24 +110,11 @@ pub struct DirectoryReply { } impl DirectoryReplier for &mut DirectoryReply { - fn add>( - &mut self, - ino: u64, - offset: i64, - name: T, - attr: FileAttr, - _generation: u64, - _ttl: Duration, - ) -> bool { + fn add(&mut self, entry: DirectoryEntry) -> bool { if self.readdir_limit > 0 && !self.entries.is_empty() && self.entries.len() % self.readdir_limit == 0 { true } else { - self.entries.push_back(DirectoryEntry { - ino, - offset, - attr, - name: name.as_ref().to_os_string(), - }); + self.entries.push_back(entry); false } } diff --git a/mountpoint-s3/tests/fs.rs b/mountpoint-s3/tests/fs.rs index 307157dbf..8345d756e 100644 --- a/mountpoint-s3/tests/fs.rs +++ b/mountpoint-s3/tests/fs.rs @@ -21,7 +21,7 @@ use std::time::{Duration, SystemTime}; use test_case::test_case; mod common; -use common::{assert_attr, make_test_filesystem, make_test_filesystem_with_client, ReadReply}; +use common::{assert_attr, make_test_filesystem, make_test_filesystem_with_client, DirectoryReply, ReadReply}; #[test_case(""; "unprefixed")] #[test_case("test_prefix/"; "prefixed")] @@ -1232,3 +1232,94 @@ async fn test_flexible_retrieval_objects() { } } } + +#[tokio::test] +async fn test_readdir_rewind() { + let (client, fs) = make_test_filesystem("test_readdir_rewind", &Default::default(), Default::default()); + + for i in 0..10 { + client.add_object(&format!("foo{i}"), b"foo".into()); + } + + let dir_handle = fs.opendir(FUSE_ROOT_INODE, 0).await.unwrap().fh; + + let mut reply = DirectoryReply::new(5); + let _ = fs + .readdirplus(FUSE_ROOT_INODE, dir_handle, 0, &mut reply) + .await + .unwrap(); + let entries = reply + .entries + .iter() + .map(|e| (e.ino, e.name.clone())) + .collect::>(); + assert_eq!(entries.len(), 5); + + // Trying to read out of order should fail (only the previous or next offsets are valid) + assert!(reply.entries.back().unwrap().offset > 1); + fs.readdirplus(FUSE_ROOT_INODE, dir_handle, 1, &mut Default::default()) + .await + .expect_err("out of order"); + + // Requesting the same buffer size should work fine + let mut new_reply = DirectoryReply::new(5); + let _ = fs + .readdirplus(FUSE_ROOT_INODE, dir_handle, 0, &mut new_reply) + .await + .unwrap(); + let new_entries = new_reply + .entries + .iter() + .map(|e| (e.ino, e.name.clone())) + .collect::>(); + assert_eq!(entries, new_entries); + + // Requesting a smaller buffer works fine and returns a prefix + let mut new_reply = DirectoryReply::new(3); + let _ = fs + .readdirplus(FUSE_ROOT_INODE, dir_handle, 0, &mut new_reply) + .await + .unwrap(); + let new_entries = new_reply + .entries + .iter() + .map(|e| (e.ino, e.name.clone())) + .collect::>(); + assert_eq!(&entries[..3], new_entries); + + // Requesting a larger buffer works fine, but only partially fills (which is allowed) + let mut new_reply = DirectoryReply::new(10); + let _ = fs + .readdirplus(FUSE_ROOT_INODE, dir_handle, 0, &mut new_reply) + .await + .unwrap(); + let new_entries = new_reply + .entries + .iter() + .map(|e| (e.ino, e.name.clone())) + .collect::>(); + assert_eq!(entries, new_entries); + + // And we can resume the stream from the end of the first request + let mut next_page = DirectoryReply::new(0); + let _ = fs + .readdirplus( + FUSE_ROOT_INODE, + dir_handle, + reply.entries.back().unwrap().offset, + &mut next_page, + ) + .await + .unwrap(); + assert_eq!(next_page.entries.len(), 7); // 10 directory entries + . + .. = 12, minus the 5 we already saw + assert_eq!(next_page.entries.front().unwrap().name, "foo3"); + + for entry in reply.entries { + // We know we're in the root dir, so the . and .. entries will both be FUSE_ROOT_INODE + if entry.ino != FUSE_ROOT_INODE { + // Each inode in this list should be remembered twice since we did two `readdirplus`es. + // Forget will panic if this makes the lookup count underflow. + fs.forget(entry.ino, 2).await; + } + } +}