From e2fe5f1719897c78565e12c748ab08c57a327016 Mon Sep 17 00:00:00 2001
From: Venky Shankar
Date: Mon, 2 Jun 2025 05:08:01 +0000
Subject: [PATCH] test/libcephfs: validate asynchronous write and fsync
 executing concurrently

This synthetic reproducer does three things:

- sets up a client mount with a configuration that delays write
  operations, and initiates a write operation via a thread
- runs a thread that invokes asynchronous fsync
- runs a thread that invokes setxattr, giving the client early
  replies (unsafe requests) to track

Without the fix[0], the test reproduces the following crash:

```
/home/vshankar/ceph/src/client/Client.cc: In function 'void Client::put_request(MetaRequest*)' thread 7f7210ff9640 time 2025-06-03T09:34:45.634974+0000
/home/vshankar/ceph/src/client/Client.cc: 2290: FAILED ceph_assert(request->ref >= 1)
 ceph version 20.3.0-673-gdd152807f7e (dd152807f7e7f7a82df6cfc0159f5fc65f60ecd5) tentacle (dev - Debug)
 1: (ceph::__ceph_assert_fail(char const*, char const*, int, char const*)+0x119) [0x7f72222ebb98]
 2: (ceph::__ceph_assert_fail(ceph::assert_data const&)+0x17) [0x7f72222ebedc]
 3: /home/vshankar/ceph/build/lib/libcephfs.so.2(+0x6a075) [0x7f7222e6a075]
 4: /home/vshankar/ceph/build/lib/libcephfs.so.2(+0xb8289) [0x7f7222eb8289]
 5: /home/vshankar/ceph/build/lib/libcephfs.so.2(+0xee951) [0x7f7222eee951]
 6: /home/vshankar/ceph/build/lib/libcephfs.so.2(+0xf167c) [0x7f7222ef167c]
 7: (Context::complete(int)+0x9) [0x7f7222e5949d]
 8: /home/vshankar/ceph/build/lib/libcephfs.so.2(+0x16a853) [0x7f7222f6a853]
 9: /home/vshankar/ceph/build/lib/libcephfs.so.2(+0xa7cc5) [0x7f7222ea7cc5]
 10: /home/vshankar/ceph/build/lib/libcephfs.so.2(+0xf128d) [0x7f7222ef128d]
 11: /home/vshankar/ceph/build/lib/libcephfs.so.2(+0x16e09d) [0x7f7222f6e09d]
 12: (Context::complete(int)+0x9) [0x7f7222e5949d]
 13: /home/vshankar/ceph/build/lib/libcephfs.so.2(+0x6d108) [0x7f7222e6d108]
 14: (Context::complete(int)+0x9) [0x7f7222e5949d]
 15: (Finisher::finisher_thread_entry()+0x665) [0x7f722226fdc1]
 16: (Finisher::FinisherThread::entry()+0xd) [0x7f7222270ddf]
 17: (Thread::entry_wrapper()+0x2f) [0x7f72222b88f5]
 18: (Thread::_entry_func(void*)+0x9) [0x7f72222b8907]
 19: /lib64/libc.so.6(+0x89e92) [0x7f7221089e92]
 20: /lib64/libc.so.6(+0x10ef20) [0x7f722110ef20]
[1]    2162689 IOT instruction (core dumped)  ./bin/ceph_test_libcephfs --gtest_filter=LibCephFS.ConcurrentWriteAndFsync
```

[0]: https://github.com/ceph/ceph/pull/63619

Fixes: http://tracker.ceph.com/issues/71515
Signed-off-by: Venky Shankar
---
 src/test/libcephfs/test.cc | 138 +++++++++++++++++++++++++++++++++++++
 1 file changed, 138 insertions(+)

diff --git a/src/test/libcephfs/test.cc b/src/test/libcephfs/test.cc
index 48a20609716..38740c5d916 100644
--- a/src/test/libcephfs/test.cc
+++ b/src/test/libcephfs/test.cc
@@ -4202,6 +4202,7 @@ TEST(LibCephFS, InodeGetPut) {
 }
 
 static bool write_done = false;
+static bool fsync_done = false;
 static bool read_done = false;
 static std::mutex mtx;
 static std::condition_variable cond;
@@ -4391,3 +4392,140 @@ TEST(LibCephFS, UnmountHangAfterOpenatFilePath) {
 
   ASSERT_EQ(0, ceph_release(cmount));
 }
+
+void write_fsync_io_callback(struct ceph_ll_io_info *io_info) {
+  std::unique_lock lock(mtx);
+  if (io_info->write) {
+    std::cout << "written=" << io_info->result << std::endl;
+    write_done = true;
+  } else {
+    std::cout << "fsync" << std::endl;
+    fsync_done = true;
+  }
+  cond.notify_one();
+}
+
+static void writer_func(struct ceph_mount_info *cmount, Fh *fh) {
+  int iterations = 2;
+  uint8_t buf[131072];
+  struct ceph_ll_io_info io_info;
+  struct iovec iov;
+
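+  // The fields below describe a single nonblocking write. The registered
+  // callback fires on a client finisher thread once the (artificially
+  // delayed) write completes, setting write_done and waking the waiter.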
+  io_info.callback = write_fsync_io_callback;
+  io_info.iov = &iov;
+  io_info.iovcnt = 1;
+  io_info.off = 0;
+  io_info.fh = fh;
+  io_info.write = true;
+  io_info.fsync = false;
+
+  while (--iterations > 0) {
+    iov.iov_base = buf;
+    iov.iov_len = sizeof(buf);
+    io_info.result = 0;
+    write_done = false;
+    ASSERT_EQ(ceph_ll_nonblocking_readv_writev(cmount, &io_info), 0);
+    std::cout << ": waiting for write to finish" << std::endl;
+    {
+      std::unique_lock lock(mtx);
+      cond.wait(lock, []{
+        return write_done;
+      });
+    }
+    std::cout << ": write finished" << std::endl;
+    ASSERT_EQ(io_info.result, sizeof(buf));
+  }
+}
+
+static void fsync_func(struct ceph_mount_info *cmount, Inode *in) {
+  int iterations = 1000;
+  struct ceph_ll_io_info io_info;
+
+  io_info.callback = write_fsync_io_callback;
+  io_info.write = false;  // the shared callback keys off this flag
+  io_info.fsync = true;   // this request is an fsync, not a write
+
+  std::cout << ": fsync thread sleeping" << std::endl;
+  sleep(3);
+  std::cout << ": fsync thread woke up" << std::endl;
+
+  while (--iterations > 0) {
+    io_info.result = 0;
+    fsync_done = false;
+    ASSERT_EQ(ceph_ll_nonblocking_fsync(cmount, in, &io_info), 0);
+    std::cout << ": waiting for fsync to finish" << std::endl;
+    {
+      std::unique_lock lock(mtx);
+      cond.wait(lock, []{
+        return fsync_done;
+      });
+    }
+    std::cout << ": fsync finished" << std::endl;
+  }
+}
+
+static void do_unsafe_ops(struct ceph_mount_info *cmount, std::string path) {
+  int iterations = 200;
+
+  std::cout << ": setxattr thread sleeping" << std::endl;
+  sleep(2);
+  std::cout << ": setxattr thread woke up" << std::endl;
+
+  while (--iterations > 0) {
+    ASSERT_EQ(0, ceph_setxattr(cmount, path.c_str(), "user.key1", "value1", 6, 0));
+    sleep(1);
+  }
+}
+
+TEST(LibCephFS, ConcurrentWriteAndFsync) {
+  pid_t mypid = getpid();
+  struct ceph_mount_info *cmount;
+  UserPerm *perms = NULL;
+  Inode *parent, *inode = NULL;
+  struct ceph_statx stx = {0};
+  struct Fh *fh;
+  char filename[PATH_MAX];
+
+  // For now, use a single thread each for performing write and fsync.
+  // If operation concurrency needs to be increased in the future, adjust
+  // @nthreads as required.
+  const int nthreads = 2;
+  std::thread unsafe_ops;
+  std::thread threads[nthreads];
+
+  snprintf(filename, sizeof(filename), "/contest_%d", mypid);
+
+  ASSERT_EQ(ceph_create(&cmount, NULL), 0);
+  ASSERT_EQ(ceph_conf_read_file(cmount, NULL), 0);
+  ASSERT_EQ(0, ceph_conf_parse_env(cmount, NULL));
+  ASSERT_EQ(0, ceph_conf_set(cmount, "client_oc", "0"));
+  ASSERT_EQ(0, ceph_conf_set(cmount, "client_inject_write_delay_secs", "10"));
+  ASSERT_EQ(0, ceph_mount(cmount, NULL));
+
+  perms = ceph_mount_perms(cmount);
+  ASSERT_EQ(ceph_ll_lookup_root(cmount, &parent), 0);
+
+  ASSERT_EQ(ceph_ll_create(cmount, parent, filename, 0744,
+                           O_RDWR | O_CREAT | O_EXCL | O_NOFOLLOW,
+                           &inode, &fh, &stx, CEPH_STATX_INO, 0, perms), 0);
+
+  unsafe_ops = std::thread(do_unsafe_ops, cmount, std::string(filename));
+
+  for (int i = 0; i < nthreads/2; ++i) {
+    threads[i] = std::thread(writer_func, cmount, fh);
+  }
+  for (int i = nthreads/2; i < nthreads; ++i) {
+    threads[i] = std::thread(fsync_func, cmount, inode);
+  }
+
+  for (int i = 0; i < nthreads; ++i) {
+    threads[i].join();
+  }
+  unsafe_ops.join();
+
+  ASSERT_EQ(0, ceph_unmount(cmount));
+  ceph_shutdown(cmount);
+}
-- 
2.39.5
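
To run just this reproducer from a build tree (the same invocation that
appears in the crash log above):

  ./bin/ceph_test_libcephfs --gtest_filter=LibCephFS.ConcurrentWriteAndFsync

Without the fix in [0], the run aborts with the ceph_assert shown above.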