ASSERT_EQ(0, ceph_unmount(cmount));
ceph_shutdown(cmount);
}
+
+static bool write_done = false;
+static bool read_done = false;
+static std::mutex mtx;
+static std::condition_variable cond;
+
+void io_callback(struct ceph_ll_io_info *io_info) {
+ std::unique_lock lock(mtx);
+ if (io_info->write) {
+ std::cout << "written=" << io_info->result << std::endl;
+ write_done = true;
+ } else {
+ std::cout << "read=" << io_info->result << std::endl;
+ read_done = true;
+ }
+ cond.notify_one();
+}
+
+TEST(LibCephFS, AsyncReadAndWriteMultiClient) {
+ pid_t mypid = getpid();
+ struct ceph_mount_info *w_cmount, *r_cmount;
+ UserPerm *w_perms, *r_perms = NULL;
+ Inode *w_parent, *w_inode, *r_parent, *r_inode = NULL;
+ struct ceph_statx stx = {0};
+ struct ceph_ll_io_info io_info;
+ struct iovec iov;
+ struct Fh *w_fh, *r_fh;
+ uint8_t buf[131072];
+ char filename[PATH_MAX];
+
+ sprintf(filename, "/nonblock_test_%d", mypid);
+
+ ASSERT_EQ(ceph_create(&w_cmount, NULL), 0);
+ ASSERT_EQ(ceph_conf_read_file(w_cmount, NULL), 0);
+ ASSERT_EQ(0, ceph_conf_parse_env(w_cmount, NULL));
+ ASSERT_EQ(0, ceph_mount(w_cmount, NULL));
+
+ ASSERT_EQ(ceph_create(&r_cmount, NULL), 0);
+ ASSERT_EQ(ceph_conf_read_file(r_cmount, NULL), 0);
+ ASSERT_EQ(0, ceph_conf_parse_env(r_cmount, NULL));
+ ASSERT_EQ(0, ceph_mount(r_cmount, NULL));
+
+ w_perms = ceph_mount_perms(w_cmount);
+ ASSERT_EQ(ceph_ll_lookup_root(w_cmount, &w_parent), 0);
+
+ r_perms = ceph_mount_perms(r_cmount);
+ ASSERT_EQ(ceph_ll_lookup_root(r_cmount, &r_parent), 0);
+
+ ASSERT_EQ(ceph_ll_create(w_cmount, w_parent, filename, 0744,
+ O_RDWR | O_CREAT | O_EXCL | O_NOFOLLOW,
+ &w_inode, &w_fh, &stx, CEPH_STATX_INO, 0, w_perms), 0);
+
+ ASSERT_EQ(ceph_ll_lookup(r_cmount, r_parent, filename, &r_inode,
+ &stx, CEPH_STATX_INO, 0,r_perms), 0);
+
+ ASSERT_EQ(ceph_ll_open(r_cmount, r_inode, O_RDONLY | O_NOFOLLOW,
+ &r_fh, r_perms), 0);
+
+ iov.iov_base = buf;
+ iov.iov_len = sizeof(buf);
+
+ io_info.callback = io_callback;
+ io_info.iov = &iov;
+ io_info.iovcnt = 1;
+ io_info.off = 0;
+ io_info.result = 0;
+
+ io_info.fh = w_fh;
+ io_info.write = true;
+
+ ASSERT_EQ(ceph_ll_nonblocking_readv_writev(w_cmount, &io_info), 0);
+
+ std::cout << ": waiting for write to finish" << std::endl;
+ {
+ std::unique_lock lock(mtx);
+ cond.wait(lock, []{
+ return write_done;
+ });
+ }
+ std::cout << ": write finished" << std::endl;
+ ASSERT_EQ(io_info.result, sizeof(buf));
+
+ io_info.fh = r_fh;
+ io_info.write = false;
+ io_info.result = 0;
+
+ ASSERT_EQ(ceph_ll_nonblocking_readv_writev(r_cmount, &io_info), 0);
+
+ std::cout << ": waiting for read to finish" << std::endl;
+ {
+ std::unique_lock lock(mtx);
+ cond.wait(lock, []{
+ return read_done;
+ });
+ }
+ std::cout << ": read finished" << std::endl;
+ ASSERT_EQ(io_info.result, sizeof(buf));
+
+ ASSERT_EQ(0, ceph_unmount(w_cmount));
+ ASSERT_EQ(0, ceph_unmount(r_cmount));
+ ceph_shutdown(w_cmount);
+ ceph_shutdown(r_cmount);
+}