}
static bool write_done = false;
+static bool fsync_done = false;
static bool read_done = false;
static std::mutex mtx;
static std::condition_variable cond;
ASSERT_EQ(0, ceph_release(cmount));
}
+
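+// Completion callback shared by the write and fsync helpers below;
+// io_info->write tells which of the two operations finished.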
+static void write_fsync_io_callback(struct ceph_ll_io_info *io_info) {
+ std::unique_lock lock(mtx);
+ if (io_info->write) {
+ std::cout << "written=" << io_info->result << std::endl;
+ write_done = true;
+ } else {
+ std::cout << "fsync" << std::endl;
+ fsync_done = true;
+ }
+ // notify_all(): the writer and fsync threads wait on the same condition
+ // variable for different predicates, so notify_one() could wake the wrong
+ // thread and leave the other blocked forever.
+ cond.notify_all();
+}
+
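+// Issue non-blocking writes and block on the condition variable until the
+// completion callback reports each one done.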
+static void writer_func(struct ceph_mount_info *cmount, Fh *fh) {
+ int iterations = 2;
+ uint8_t buf[131072];
+ // zero-initialize so any fields not set explicitly below are well defined
+ struct ceph_ll_io_info io_info = {};
+ struct iovec iov;
+
+ io_info.callback = write_fsync_io_callback;
+ io_info.iov = &iov;
+ io_info.iovcnt = 1;
+ io_info.off = 0;
+ io_info.fh = fh;
+ io_info.write = true;
+ io_info.fsync = false;
+
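+ // Submit the write, then wait until the completion callback signals it.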
+ while (--iterations > 0) {
+ iov.iov_base = buf;
+ iov.iov_len = sizeof(buf);
+ io_info.result = 0;
+ write_done = false;
+ ASSERT_EQ(ceph_ll_nonblocking_readv_writev(cmount, &io_info), 0);
+ std::cout << ": waiting for write to finish" << std::endl;
+ {
+ std::unique_lock lock(mtx);
+ cond.wait(lock, []{
+ return write_done;
+ });
+ }
+ std::cout << ": write finished" << std::endl;
+ ASSERT_EQ(io_info.result, sizeof(buf));
+ }
+}
+
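+// Repeatedly issue non-blocking fsyncs against the inode while the writer
+// thread still has a write in flight.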
+static void fsync_func(struct ceph_mount_info *cmount, Inode *in) {
+ int iterations = 1000;
+ struct ceph_ll_io_info io_info = {};
+
+ io_info.callback = write_fsync_io_callback;
+
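+ // Give the writer a head start so the fsyncs overlap an in-flight write.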
+ std::cout << ": fsync thread sleeping" << std::endl;
+ sleep(3);
+ std::cout << ": fsync thread wokeup" << std::endl;
+
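+ // Issue the fsync, then wait until its completion callback fires.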
+ while (--iterations > 0) {
+ io_info.result = 0;
+ fsync_done = false;
+ ASSERT_EQ(ceph_ll_nonblocking_fsync(cmount, in, &io_info), 0);
+ std::cout << ": waiting for fsync to finish" << std::endl;
+ {
+ std::unique_lock lock(mtx);
+ cond.wait(lock, []{
+ return fsync_done;
+ });
+ }
+ std::cout << ": fsync finished" << std::endl;
+ }
+}
+
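+// Issue setxattr calls against the same file so that unsafe metadata
+// requests are in flight alongside the writes and fsyncs.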
+static void do_unsafe_ops(struct ceph_mount_info *cmount, std::string path) {
+ int iterations = 200;
+
+ std::cout << ": setxattr thread sleeping" << std::endl;
+ sleep(2);
+ std::cout << ": setxattr thread wokeup" << std::endl;
+
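+ // Pace the metadata ops at roughly one per second.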
+ while (--iterations > 0) {
+ ASSERT_EQ(0, ceph_setxattr(cmount, path.c_str(), "user.key1", "value1", 6, 0));
+ sleep(1);
+ }
+}
+
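+// Drive concurrent non-blocking writes, non-blocking fsyncs and metadata ops
+// on the same file; the injected write delay widens the window in which an
+// fsync has to wait on an in-flight write.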
+TEST(LibCephFS, ConcurrentWriteAndFsync) {
+ pid_t mypid = getpid();
+ struct ceph_mount_info *cmount;
+ UserPerm *perms = NULL;
+ Inode *parent, *inode = NULL;
+ struct ceph_statx stx = {0};
+ struct Fh *fh;
+ char filename[PATH_MAX];
+
+ // For now use a single thread each for performing writes and fsyncs.
+ // If operation concurrency needs to be increased in the future, adjust
+ // @nthreads as required: the first half of @threads runs writer_func and
+ // the second half runs fsync_func.
+ const int nthreads = 2;
+ std::thread unsafe_ops;
+ std::thread threads[nthreads];
+
+ sprintf(filename, "/contest_%d", mypid);
+
+ ASSERT_EQ(ceph_create(&cmount, NULL), 0);
+ ASSERT_EQ(ceph_conf_read_file(cmount, NULL), 0);
+ ASSERT_EQ(0, ceph_conf_parse_env(cmount, NULL));
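+ // client_oc=0 disables the client object cacher; the injected write delay
+ // keeps writes in flight long enough for fsync to have to wait on them.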
+ ASSERT_EQ(0, ceph_conf_set(cmount, "client_oc", "0"));
+ ASSERT_EQ(0, ceph_conf_set(cmount, "client_inject_write_delay_secs", "10"));
+ ASSERT_EQ(0, ceph_mount(cmount, NULL));
+
+ perms = ceph_mount_perms(cmount);
+ ASSERT_EQ(ceph_ll_lookup_root(cmount, &parent), 0);
+
+ ASSERT_EQ(ceph_ll_create(cmount, parent, filename, 0744,
+ O_RDWR | O_CREAT | O_EXCL | O_NOFOLLOW,
+ &inode, &fh, &stx, CEPH_STATX_INO, 0, perms), 0);
+
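+ // The writer thread gets the open file handle, the fsync thread gets the
+ // inode, and the xattr thread operates on the path.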
+ unsafe_ops = std::thread(do_unsafe_ops, cmount, std::string(filename));
+
+ for (int i = 0; i < nthreads/2; ++i) {
+ threads[i] = std::thread(writer_func, cmount, fh);
+ }
+ for (int i = nthreads/2; i < nthreads; ++i) {
+ threads[i] = std::thread(fsync_func, cmount, inode);
+ }
+
+ for (int i = 0; i < nthreads; ++i) {
+ threads[i].join();
+ }
+ unsafe_ops.join();
+
+ ASSERT_EQ(0, ceph_unmount(cmount));
+ ceph_shutdown(cmount);
+}