}
#endif
+
+TEST(LibCephFS, CreateParallel)
+{
+ char fname[4096] = "dir/file_parallel";
+ std::mutex lock; // for memory barrier
+ std::array<int, 256> fds;
+ std::array<std::thread, fds.size()> threads;
+ std::fill(fds.begin(), fds.end(), -1);
+
+ struct ceph_mount_info *cmount;
+ ASSERT_EQ(0, ceph_create(&cmount, NULL));
+ ASSERT_EQ(0, ceph_conf_read_file(cmount, NULL));
+ ASSERT_EQ(0, ceph_conf_parse_env(cmount, NULL));
+ ASSERT_EQ(0, ceph_mount(cmount, "/"));
+
+ ASSERT_EQ(0, ceph_mkdirs(cmount, "dir/dummy", 0777));
+
+ ASSERT_EQ(0, ceph_unmount(cmount));
+ ASSERT_EQ(0, ceph_mount(cmount, "/"));
+
+ {
+ struct stat buf;
+ ASSERT_EQ(0, ceph_stat(cmount, "dir", &buf));
+ }
+
+ for (size_t i = 0; i < fds.size(); ++i) {
+ auto l = [cmount,&fname,&fds,&lock](int i) {
+ int fd = ceph_open(cmount, fname, O_CREAT|O_WRONLY, 0777);
+ std::lock_guard locker(lock);
+ fds[i] = fd;
+ };
+ threads[i] = std::thread(std::move(l), i);
+ }
+
+ for (size_t i = 0; i < fds.size(); ++i) {
+ threads[i].join();
+ }
+ std::lock_guard locker(lock);
+ for (size_t i = 0; i < fds.size(); ++i) {
+ ASSERT_GE(fds[i], 0);
+ }
+
+ ASSERT_EQ(0, ceph_unlink(cmount, fname));
+ ASSERT_EQ(0, ceph_rmdir(cmount, "dir/dummy"));
+ ASSERT_EQ(0, ceph_rmdir(cmount, "dir"));
+
+ ceph_shutdown(cmount);
+}
+
+TEST(LibCephFS, CreateExclParallel)
+{
+ char fname[4096] = "dir/file_parallel";
+ std::mutex lock; // for memory barrier
+ std::array<int, 256> fds;
+ std::array<std::thread, fds.size()> threads;
+ std::fill(fds.begin(), fds.end(), -1);
+
+ struct ceph_mount_info *cmount;
+ ASSERT_EQ(0, ceph_create(&cmount, NULL));
+ ASSERT_EQ(0, ceph_conf_read_file(cmount, NULL));
+ ASSERT_EQ(0, ceph_conf_parse_env(cmount, NULL));
+ ASSERT_EQ(0, ceph_mount(cmount, "/"));
+
+ ASSERT_EQ(0, ceph_mkdirs(cmount, "dir/dummy", 0777));
+
+ ASSERT_EQ(0, ceph_unmount(cmount));
+ ASSERT_EQ(0, ceph_mount(cmount, "/"));
+
+ {
+ struct stat buf;
+ ASSERT_EQ(0, ceph_stat(cmount, "dir", &buf));
+ }
+
+ for (size_t i = 0; i < fds.size(); ++i) {
+ auto l = [cmount,&fname,&fds,&lock](int i) {
+ int fd = ceph_open(cmount, fname, O_CREAT|O_EXCL|O_WRONLY, 0777);
+ std::lock_guard locker(lock);
+ fds[i] = fd;
+ };
+ threads[i] = std::thread(std::move(l), i);
+ }
+
+ for (size_t i = 0; i < fds.size(); ++i) {
+ threads[i].join();
+ }
+ std::lock_guard locker(lock);
+ int found = -1;
+ for (size_t i = 0; i < fds.size(); ++i) {
+ if (fds[i] >= 0) {
+ ASSERT_EQ(found, -1);
+ found = i;
+ } else {
+ ASSERT_EQ(fds[i], -EEXIST);
+ }
+ }
+
+ ASSERT_EQ(0, ceph_unlink(cmount, fname));
+ ASSERT_EQ(0, ceph_rmdir(cmount, "dir/dummy"));
+ ASSERT_EQ(0, ceph_rmdir(cmount, "dir"));
+
+ ceph_shutdown(cmount);
+}
+
static void get_current_time_utimbuf(struct utimbuf *utb)
{
utime_t t = ceph_clock_now();