]> git.apps.os.sepia.ceph.com Git - ceph-ci.git/commitdiff
test/libcephfs: validate asynchronous write and fsync executing concurrently
authorVenky Shankar <vshankar@redhat.com>
Mon, 2 Jun 2025 05:08:01 +0000 (05:08 +0000)
committerVenky Shankar <vshankar@redhat.com>
Tue, 9 Sep 2025 04:38:59 +0000 (04:38 +0000)
This synthetic reproducer does three things:

- setup a client mount with a configuration to delay write operations and
  initiates a write operation via a thread.
- a thread that invokes asynchronous fsync
- a thread that invokes setxattr for the client to track early replies

Without the fix[0], the test reproduces the following crash:

```
/home/vshankar/ceph/src/client/Client.cc: In function 'void Client::put_request(MetaRequest*)' thread 7f7210ff9640 time 2025-06-03T09:34:45.634974+0000
/home/vshankar/ceph/src/client/Client.cc: 2290: FAILED ceph_assert(request->ref >= 1)
 ceph version 20.3.0-673-gdd152807f7e (dd152807f7e7f7a82df6cfc0159f5fc65f60ecd5) tentacle (dev - Debug)
 1: (ceph::__ceph_assert_fail(char const*, char const*, int, char const*)+0x119) [0x7f72222ebb98]
 2: (ceph::__ceph_assert_fail(ceph::assert_data const&)+0x17) [0x7f72222ebedc]
 3: /home/vshankar/ceph/build/lib/libcephfs.so.2(+0x6a075) [0x7f7222e6a075]
 4: /home/vshankar/ceph/build/lib/libcephfs.so.2(+0xb8289) [0x7f7222eb8289]
 5: /home/vshankar/ceph/build/lib/libcephfs.so.2(+0xee951) [0x7f7222eee951]
 6: /home/vshankar/ceph/build/lib/libcephfs.so.2(+0xf167c) [0x7f7222ef167c]
 7: (Context::complete(int)+0x9) [0x7f7222e5949d]
 8: /home/vshankar/ceph/build/lib/libcephfs.so.2(+0x16a853) [0x7f7222f6a853]
 9: /home/vshankar/ceph/build/lib/libcephfs.so.2(+0xa7cc5) [0x7f7222ea7cc5]
 10: /home/vshankar/ceph/build/lib/libcephfs.so.2(+0xf128d) [0x7f7222ef128d]
 11: /home/vshankar/ceph/build/lib/libcephfs.so.2(+0x16e09d) [0x7f7222f6e09d]
 12: (Context::complete(int)+0x9) [0x7f7222e5949d]
 13: /home/vshankar/ceph/build/lib/libcephfs.so.2(+0x6d108) [0x7f7222e6d108]
 14: (Context::complete(int)+0x9) [0x7f7222e5949d]
 15: (Finisher::finisher_thread_entry()+0x665) [0x7f722226fdc1]
 16: (Finisher::FinisherThread::entry()+0xd) [0x7f7222270ddf]
 17: (Thread::entry_wrapper()+0x2f) [0x7f72222b88f5]
 18: (Thread::_entry_func(void*)+0x9) [0x7f72222b8907]
 19: /lib64/libc.so.6(+0x89e92) [0x7f7221089e92]
 20: /lib64/libc.so.6(+0x10ef20) [0x7f722110ef20]
[1]    2162689 IOT instruction (core dumped)  ./bin/ceph_test_libcephfs --gtest_filter=LibCephFS.ConcurrentWriteAndFsync
```

[0]: https://github.com/ceph/ceph/pull/63619

Fixes: http://tracker.ceph.com/issues/71515
Signed-off-by: Venky Shankar <vshankar@redhat.com>
src/test/libcephfs/test.cc

index 48a2060971648099a06561a07c704cec75515f77..38740c5d916c7b7c0d389ee29ebc9bea9a257355 100644 (file)
@@ -4202,6 +4202,7 @@ TEST(LibCephFS, InodeGetPut) {
 }
 
 static bool write_done = false;
+static bool fsync_done = false;
 static bool read_done = false;
 static std::mutex mtx;
 static std::condition_variable cond;
@@ -4391,3 +4392,135 @@ TEST(LibCephFS, UnmountHangAfterOpenatFilePath) {
 
   ASSERT_EQ(0, ceph_release(cmount));
 }
+
+void write_fsync_io_callback(struct ceph_ll_io_info *io_info) {
+  std::unique_lock lock(mtx);
+  if (io_info->write) {
+    std::cout << "written=" << io_info->result << std::endl;
+    write_done = true;
+  } else {
+    std::cout << "fsync" << std::endl;
+    fsync_done = true;
+  }
+  cond.notify_one();
+}
+
+static void writer_func(struct ceph_mount_info *cmount, Fh *fh) {
+  int iterations = 2;
+  uint8_t buf[131072];
+  struct ceph_ll_io_info io_info;
+  struct iovec iov;
+
+  io_info.callback = write_fsync_io_callback;
+  io_info.iov = &iov;
+  io_info.iovcnt = 1;
+  io_info.off = 0;
+  io_info.fh = fh;
+  io_info.write = true;
+  io_info.fsync = false;
+
+  while (--iterations > 0) {
+    iov.iov_base = buf;
+    iov.iov_len = sizeof(buf);
+    io_info.result = 0;
+    write_done = false;
+    ASSERT_EQ(ceph_ll_nonblocking_readv_writev(cmount, &io_info), 0);
+    std::cout << ": waiting for write to finish" << std::endl;
+    {
+      std::unique_lock lock(mtx);
+      cond.wait(lock, []{
+        return write_done;
+      });
+    }
+    std::cout << ": write finished" << std::endl;
+    ASSERT_EQ(io_info.result, sizeof(buf));
+  }
+}
+
+static void fsync_func(struct ceph_mount_info *cmount, Inode *in) {
+  int iterations = 1000;
+  struct ceph_ll_io_info io_info;
+
+  io_info.callback = write_fsync_io_callback;
+
+  std::cout << ": fsync thread sleeping" << std::endl;
+  sleep(3);
+  std::cout << ": fsync thread wokeup" << std::endl;
+
+  while (--iterations > 0) {
+    io_info.result = 0;
+    fsync_done = false;
+    ASSERT_EQ(ceph_ll_nonblocking_fsync(cmount, in, &io_info), 0);
+    std::cout << ": waiting for fsync to finish" << std::endl;
+    {
+      std::unique_lock lock(mtx);
+      cond.wait(lock, []{
+        return fsync_done;
+      });
+    }
+    std::cout << ": fsync finished" << std::endl;
+  }
+}
+
+static void do_unsafe_ops(struct ceph_mount_info *cmount, std::string path) {
+  int iterations = 200;
+
+  std::cout << ": setxattr thread sleeping" << std::endl;
+  sleep(2);
+  std::cout << ": setxattr thread wokeup" << std::endl;
+
+  while (--iterations > 0) {
+    ASSERT_EQ(0, ceph_setxattr(cmount, path.c_str(), "user.key1", "value1", 6, 0));
+    sleep(1);
+  }
+}
+
+TEST(LibCephFS, ConcurrentWriteAndFsync) {
+  pid_t mypid = getpid();
+  struct ceph_mount_info *cmount;
+  UserPerm *perms = NULL;
+  Inode *parent, *inode = NULL;
+  struct ceph_statx stx = {0};
+  struct Fh *fh;
+  char filename[PATH_MAX];
+
+  // for now use a single thread for performing write and fsync (each).
+  // in the future if we need to increase operation concurrency adjust
+  // @nthreads as required.
+  const int nthreads = 2;
+  std::thread unsafe_ops;
+  std::thread threads[nthreads];
+
+  sprintf(filename, "/contest_%d", mypid);
+
+  ASSERT_EQ(ceph_create(&cmount, NULL), 0);
+  ASSERT_EQ(ceph_conf_read_file(cmount, NULL), 0);
+  ASSERT_EQ(0, ceph_conf_parse_env(cmount, NULL));
+  ASSERT_EQ(0, ceph_conf_set(cmount, "client_oc", "0"));
+  ASSERT_EQ(0, ceph_conf_set(cmount, "client_inject_write_delay_secs", "10"));
+  ASSERT_EQ(0, ceph_mount(cmount, NULL));
+
+  perms = ceph_mount_perms(cmount);
+  ASSERT_EQ(ceph_ll_lookup_root(cmount, &parent), 0);
+
+  ASSERT_EQ(ceph_ll_create(cmount, parent, filename, 0744,
+                           O_RDWR | O_CREAT | O_EXCL | O_NOFOLLOW,
+                           &inode, &fh, &stx, CEPH_STATX_INO, 0, perms), 0);
+
+  unsafe_ops = std::thread(do_unsafe_ops, cmount, std::string(filename));
+
+  for (int i = 0; i < nthreads/2; ++i) {
+    threads[i] = std::thread(writer_func, cmount, fh);
+  }
+  for (int i = 1; i < nthreads; ++i) {
+    threads[i] = std::thread(fsync_func, cmount, inode);
+  }
+
+  for (int i = 0; i < nthreads; ++i) {
+    threads[i].join();
+  }
+  unsafe_ops.join();
+
+  ASSERT_EQ(0, ceph_unmount(cmount));
+  ceph_shutdown(cmount);
+}