From ce244ca940e0bee24d0eb52affb3d4bc9054a854 Mon Sep 17 00:00:00 2001 From: "Yan, Zheng" Date: Thu, 19 Mar 2015 17:34:47 +0800 Subject: [PATCH] test/libcephfs/flock.cc: fix synchronization points In the threesome test cases, the main thread/process call PING_WORKER() twice at each synchronization point. It's possible that one worker gets both notifications and takes two steps forward, another worker gets no notification. The fix is use seperate semaphores for odd/even number synchronization points. Signed-off-by: Yan, Zheng --- src/test/libcephfs/flock.cc | 203 ++++++++++++++++++------------------ 1 file changed, 103 insertions(+), 100 deletions(-) diff --git a/src/test/libcephfs/flock.cc b/src/test/libcephfs/flock.cc index b1b95464daa4d..d92a8429dd2e8 100644 --- a/src/test/libcephfs/flock.cc +++ b/src/test/libcephfs/flock.cc @@ -12,6 +12,7 @@ * */ +#include #include "gtest/gtest.h" #ifndef GTEST_IS_THREADSAFE #error "!GTEST_IS_THREADSAFE" @@ -27,7 +28,6 @@ #include #include -#include #include #include #include @@ -125,24 +125,36 @@ TEST(LibCephFS, BasicLocking) { struct str_ConcurrentLocking { const char *file; struct ceph_mount_info *cmount; // !NULL if shared - sem_t sem; - sem_t semReply; + sem_t sem[2]; + sem_t semReply[2]; + void sem_init(int pshared) { + ASSERT_EQ(0, ::sem_init(&sem[0], pshared, 0)); + ASSERT_EQ(0, ::sem_init(&sem[1], pshared, 0)); + ASSERT_EQ(0, ::sem_init(&semReply[0], pshared, 0)); + ASSERT_EQ(0, ::sem_init(&semReply[1], pshared, 0)); + } + void sem_destroy() { + ASSERT_EQ(0, ::sem_destroy(&sem[0])); + ASSERT_EQ(0, ::sem_destroy(&sem[1])); + ASSERT_EQ(0, ::sem_destroy(&semReply[0])); + ASSERT_EQ(0, ::sem_destroy(&semReply[1])); + } }; // Wakeup main (for (N) steps) -#define PING_MAIN() ASSERT_EQ(0, sem_post(&s.sem)) +#define PING_MAIN(n) ASSERT_EQ(0, sem_post(&s.sem[n%2])) // Wait for main to wake us up (for (RN) steps) -#define WAIT_MAIN() \ - ASSERT_EQ(0, sem_timedwait(&s.semReply, abstime(ts, waitSlowMs))) +#define WAIT_MAIN(n) \ + ASSERT_EQ(0, sem_timedwait(&s.semReply[n%2], abstime(ts, waitSlowMs))) // Wakeup worker (for (RN) steps) -#define PING_WORKER() ASSERT_EQ(0, sem_post(&s.semReply)) +#define PING_WORKER(n) ASSERT_EQ(0, sem_post(&s.semReply[n%2])) // Wait for worker to wake us up (for (N) steps) -#define WAIT_WORKER() \ - ASSERT_EQ(0, sem_timedwait(&s.sem, abstime(ts, waitSlowMs))) +#define WAIT_WORKER(n) \ + ASSERT_EQ(0, sem_timedwait(&s.sem[n%2], abstime(ts, waitSlowMs))) // Worker shall not wake us up (for (N) steps) -#define NOT_WAIT_WORKER() \ - ASSERT_EQ(-1, sem_timedwait(&s.sem, abstime(ts, waitMs))) +#define NOT_WAIT_WORKER(n) \ + ASSERT_EQ(-1, sem_timedwait(&s.sem[n%2], abstime(ts, waitMs))) // Do twice an operation #define TWICE(EXPR) do { \ @@ -159,30 +171,30 @@ static void thread_ConcurrentLocking(str_ConcurrentLocking& s) { const int fd = ceph_open(cmount, s.file, O_RDWR | O_CREAT, fileMode); ASSERT_GE(fd, 0); - PING_MAIN(); // (1) + PING_MAIN(1); // (1) ASSERT_EQ(-EWOULDBLOCK, ceph_flock(cmount, fd, LOCK_EX | LOCK_NB, pthread_self())); ASSERT_EQ(0, ceph_flock(cmount, fd, LOCK_EX, pthread_self())); - PING_MAIN(); // (2) + PING_MAIN(2); // (2) ASSERT_EQ(0, ceph_flock(cmount, fd, LOCK_UN, pthread_self())); - PING_MAIN(); // (3) + PING_MAIN(3); // (3) ASSERT_EQ(0, ceph_flock(cmount, fd, LOCK_SH, pthread_self())); - PING_MAIN(); // (4) + PING_MAIN(4); // (4) - WAIT_MAIN(); // (R1) + WAIT_MAIN(1); // (R1) ASSERT_EQ(0, ceph_flock(cmount, fd, LOCK_UN, pthread_self())); - PING_MAIN(); // (5) + PING_MAIN(5); // (5) - WAIT_MAIN(); // (R2) + WAIT_MAIN(2); // (R2) ASSERT_EQ(0, ceph_flock(cmount, fd, LOCK_EX, pthread_self())); - PING_MAIN(); // (6) + PING_MAIN(6); // (6) - WAIT_MAIN(); // (R3) + WAIT_MAIN(3); // (R3) ASSERT_EQ(0, ceph_flock(cmount, fd, LOCK_UN, pthread_self())); - PING_MAIN(); // (7) + PING_MAIN(7); // (7) } // Used by ConcurrentLocking test @@ -210,48 +222,47 @@ TEST(LibCephFS, ConcurrentLocking) { pthread_t thread; struct timespec ts; str_ConcurrentLocking s = { c_file, cmount }; - ASSERT_EQ(0, sem_init(&s.sem, 0, 0)); - ASSERT_EQ(0, sem_init(&s.semReply, 0, 0)); + s.sem_init(0); ASSERT_EQ(0, pthread_create(&thread, NULL, thread_ConcurrentLocking_, &s)); // Synchronization point with thread (failure: thread is dead) - WAIT_WORKER(); // (1) + WAIT_WORKER(1); // (1) // Shall not have lock immediately - NOT_WAIT_WORKER(); // (2) + NOT_WAIT_WORKER(2); // (2) // Unlock ASSERT_EQ(0, ceph_flock(cmount, fd, LOCK_UN, pthread_self())); // Shall have lock // Synchronization point with thread (failure: thread is dead) - WAIT_WORKER(); // (2) + WAIT_WORKER(2); // (2) // Synchronization point with thread (failure: thread is dead) - WAIT_WORKER(); // (3) + WAIT_WORKER(3); // (3) // Wait for thread to share lock - WAIT_WORKER(); // (4) + WAIT_WORKER(4); // (4) ASSERT_EQ(-EWOULDBLOCK, ceph_flock(cmount, fd, LOCK_EX | LOCK_NB, pthread_self())); ASSERT_EQ(0, ceph_flock(cmount, fd, LOCK_SH | LOCK_NB, pthread_self())); // Wake up thread to unlock shared lock - PING_WORKER(); // (R1) - WAIT_WORKER(); // (5) + PING_WORKER(1); // (R1) + WAIT_WORKER(5); // (5) // Now we can lock exclusively // Upgrade to exclusive lock (as per POSIX) ASSERT_EQ(0, ceph_flock(cmount, fd, LOCK_EX, pthread_self())); // Wake up thread to lock shared lock - PING_WORKER(); // (R2) + PING_WORKER(2); // (R2) // Shall not have lock immediately - NOT_WAIT_WORKER(); // (6) + NOT_WAIT_WORKER(6); // (6) // Release lock ; thread will get it ASSERT_EQ(0, ceph_flock(cmount, fd, LOCK_UN, pthread_self())); - WAIT_WORKER(); // (6) + WAIT_WORKER(6); // (6) // We no longer have the lock ASSERT_EQ(-EWOULDBLOCK, @@ -260,8 +271,8 @@ TEST(LibCephFS, ConcurrentLocking) { ceph_flock(cmount, fd, LOCK_SH | LOCK_NB, pthread_self())); // Wake up thread to unlock exclusive lock - PING_WORKER(); // (R3) - WAIT_WORKER(); // (7) + PING_WORKER(3); // (R3) + WAIT_WORKER(7); // (7) // We can lock it again ASSERT_EQ(0, ceph_flock(cmount, fd, LOCK_EX | LOCK_NB, pthread_self())); @@ -271,8 +282,7 @@ TEST(LibCephFS, ConcurrentLocking) { void *retval = (void*) (uintptr_t) -1; ASSERT_EQ(0, pthread_join(thread, &retval)); ASSERT_EQ(NULL, retval); - ASSERT_EQ(0, sem_destroy(&s.sem)); - ASSERT_EQ(0, sem_destroy(&s.semReply)); + s.sem_destroy(); ASSERT_EQ(0, ceph_close(cmount, fd)); ASSERT_EQ(0, ceph_unlink(cmount, c_file)); CLEANUP_CEPH(); @@ -295,49 +305,48 @@ TEST(LibCephFS, ThreesomeLocking) { pthread_t thread[2]; struct timespec ts; str_ConcurrentLocking s = { c_file, cmount }; - ASSERT_EQ(0, sem_init(&s.sem, 0, 0)); - ASSERT_EQ(0, sem_init(&s.semReply, 0, 0)); + s.sem_init(0); ASSERT_EQ(0, pthread_create(&thread[0], NULL, thread_ConcurrentLocking_, &s)); ASSERT_EQ(0, pthread_create(&thread[1], NULL, thread_ConcurrentLocking_, &s)); // Synchronization point with thread (failure: thread is dead) - TWICE(WAIT_WORKER()); // (1) + TWICE(WAIT_WORKER(1)); // (1) // Shall not have lock immediately - NOT_WAIT_WORKER(); // (2) + NOT_WAIT_WORKER(2); // (2) // Unlock ASSERT_EQ(0, ceph_flock(cmount, fd, LOCK_UN, pthread_self())); // Shall have lock TWICE(// Synchronization point with thread (failure: thread is dead) - WAIT_WORKER(); // (2) + WAIT_WORKER(2); // (2) // Synchronization point with thread (failure: thread is dead) - WAIT_WORKER()); // (3) + WAIT_WORKER(3)); // (3) // Wait for thread to share lock - TWICE(WAIT_WORKER()); // (4) + TWICE(WAIT_WORKER(4)); // (4) ASSERT_EQ(-EWOULDBLOCK, ceph_flock(cmount, fd, LOCK_EX | LOCK_NB, pthread_self())); ASSERT_EQ(0, ceph_flock(cmount, fd, LOCK_SH | LOCK_NB, pthread_self())); // Wake up thread to unlock shared lock - TWICE(PING_WORKER(); // (R1) - WAIT_WORKER()); // (5) + TWICE(PING_WORKER(1); // (R1) + WAIT_WORKER(5)); // (5) // Now we can lock exclusively // Upgrade to exclusive lock (as per POSIX) ASSERT_EQ(0, ceph_flock(cmount, fd, LOCK_EX, pthread_self())); TWICE( // Wake up thread to lock shared lock - PING_WORKER(); // (R2) + PING_WORKER(2); // (R2) // Shall not have lock immediately - NOT_WAIT_WORKER()); // (6) + NOT_WAIT_WORKER(6)); // (6) // Release lock ; thread will get it ASSERT_EQ(0, ceph_flock(cmount, fd, LOCK_UN, pthread_self())); - TWICE(WAIT_WORKER(); // (6) + TWICE(WAIT_WORKER(6); // (6) // We no longer have the lock ASSERT_EQ(-EWOULDBLOCK, @@ -346,8 +355,8 @@ TEST(LibCephFS, ThreesomeLocking) { ceph_flock(cmount, fd, LOCK_SH | LOCK_NB, pthread_self())); // Wake up thread to unlock exclusive lock - PING_WORKER(); // (R3) - WAIT_WORKER(); // (7) + PING_WORKER(3); // (R3) + WAIT_WORKER(7); // (7) ); // We can lock it again @@ -360,8 +369,7 @@ TEST(LibCephFS, ThreesomeLocking) { ASSERT_EQ(NULL, retval); ASSERT_EQ(0, pthread_join(thread[1], &retval)); ASSERT_EQ(NULL, retval); - ASSERT_EQ(0, sem_destroy(&s.sem)); - ASSERT_EQ(0, sem_destroy(&s.semReply)); + s.sem_destroy(); ASSERT_EQ(0, ceph_close(cmount, fd)); ASSERT_EQ(0, ceph_unlink(cmount, c_file)); CLEANUP_CEPH(); @@ -378,7 +386,7 @@ static void process_ConcurrentLocking(str_ConcurrentLocking& s) { const pid_t mypid = getpid(); PROCESS_SLOW_MS(); - PING_MAIN(); // (1) + PING_MAIN(1); // (1) struct ceph_mount_info *cmount = NULL; struct timespec ts; @@ -388,35 +396,34 @@ static void process_ConcurrentLocking(str_ConcurrentLocking& s) { const int fd = ceph_open(cmount, s.file, O_RDWR | O_CREAT, fileMode); ASSERT_GE(fd, 0); - WAIT_MAIN(); // (R0) + WAIT_MAIN(1); // (R1) ASSERT_EQ(-EWOULDBLOCK, ceph_flock(cmount, fd, LOCK_EX | LOCK_NB, mypid)); ASSERT_EQ(0, ceph_flock(cmount, fd, LOCK_EX, mypid)); - PING_MAIN(); // (2) + PING_MAIN(2); // (2) ASSERT_EQ(0, ceph_flock(cmount, fd, LOCK_UN, mypid)); - PING_MAIN(); // (3) + PING_MAIN(3); // (3) ASSERT_EQ(0, ceph_flock(cmount, fd, LOCK_SH, mypid)); - PING_MAIN(); // (4) + PING_MAIN(4); // (4) - WAIT_MAIN(); // (R1) + WAIT_MAIN(2); // (R2) ASSERT_EQ(0, ceph_flock(cmount, fd, LOCK_UN, mypid)); - PING_MAIN(); // (5) + PING_MAIN(5); // (5) - WAIT_MAIN(); // (R2) + WAIT_MAIN(3); // (R3) ASSERT_EQ(0, ceph_flock(cmount, fd, LOCK_EX, mypid)); - PING_MAIN(); // (6) + PING_MAIN(6); // (6) - WAIT_MAIN(); // (R3) + WAIT_MAIN(4); // (R4) ASSERT_EQ(0, ceph_flock(cmount, fd, LOCK_UN, mypid)); - PING_MAIN(); // (7) + PING_MAIN(7); // (7) CLEANUP_CEPH(); - ASSERT_EQ(0, sem_destroy(&s.sem)); - ASSERT_EQ(0, sem_destroy(&s.semReply)); + s.sem_destroy(); exit(EXIT_SUCCESS); } @@ -434,8 +441,7 @@ TEST(LibCephFS, InterProcessLocking) { -1, 0)); str_ConcurrentLocking &s = *shs; s.file = c_file; - ASSERT_EQ(0, sem_init(&s.sem, 1, 0)); - ASSERT_EQ(0, sem_init(&s.semReply, 1, 0)); + s.sem_init(1); // Start locker process const pid_t pid = fork(); @@ -456,52 +462,52 @@ TEST(LibCephFS, InterProcessLocking) { ASSERT_EQ(0, ceph_flock(cmount, fd, LOCK_EX, mypid)); // Synchronization point with process (failure: process is dead) - WAIT_WORKER(); // (1) - PING_WORKER(); // (R0) + WAIT_WORKER(1); // (1) + PING_WORKER(1); // (R1) // Shall not have lock immediately - NOT_WAIT_WORKER(); // (2) + NOT_WAIT_WORKER(2); // (2) // Unlock ASSERT_EQ(0, ceph_flock(cmount, fd, LOCK_UN, mypid)); // Shall have lock // Synchronization point with process (failure: process is dead) - WAIT_WORKER(); // (2) + WAIT_WORKER(2); // (2) // Synchronization point with process (failure: process is dead) - WAIT_WORKER(); // (3) + WAIT_WORKER(3); // (3) // Wait for process to share lock - WAIT_WORKER(); // (4) + WAIT_WORKER(4); // (4) ASSERT_EQ(-EWOULDBLOCK, ceph_flock(cmount, fd, LOCK_EX | LOCK_NB, mypid)); ASSERT_EQ(0, ceph_flock(cmount, fd, LOCK_SH | LOCK_NB, mypid)); // Wake up process to unlock shared lock - PING_WORKER(); // (R1) - WAIT_WORKER(); // (5) + PING_WORKER(2); // (R2) + WAIT_WORKER(5); // (5) // Now we can lock exclusively // Upgrade to exclusive lock (as per POSIX) ASSERT_EQ(0, ceph_flock(cmount, fd, LOCK_EX, mypid)); // Wake up process to lock shared lock - PING_WORKER(); // (R2) + PING_WORKER(3); // (R3) // Shall not have lock immediately - NOT_WAIT_WORKER(); // (6) + NOT_WAIT_WORKER(6); // (6) // Release lock ; process will get it ASSERT_EQ(0, ceph_flock(cmount, fd, LOCK_UN, mypid)); - WAIT_WORKER(); // (6) + WAIT_WORKER(6); // (6) // We no longer have the lock ASSERT_EQ(-EWOULDBLOCK, ceph_flock(cmount, fd, LOCK_EX | LOCK_NB, mypid)); ASSERT_EQ(-EWOULDBLOCK, ceph_flock(cmount, fd, LOCK_SH | LOCK_NB, mypid)); // Wake up process to unlock exclusive lock - PING_WORKER(); // (R3) - WAIT_WORKER(); // (7) + PING_WORKER(4); // (R4) + WAIT_WORKER(7); // (7) // We can lock it again ASSERT_EQ(0, ceph_flock(cmount, fd, LOCK_EX | LOCK_NB, mypid)); @@ -513,8 +519,7 @@ TEST(LibCephFS, InterProcessLocking) { ASSERT_EQ(EXIT_SUCCESS, status); // Cleanup - ASSERT_EQ(0, sem_destroy(&s.sem)); - ASSERT_EQ(0, sem_destroy(&s.semReply)); + s.sem_destroy(); ASSERT_EQ(0, munmap(shs, sizeof(*shs))); ASSERT_EQ(0, ceph_close(cmount, fd)); ASSERT_EQ(0, ceph_unlink(cmount, c_file)); @@ -535,8 +540,7 @@ TEST(LibCephFS, ThreesomeInterProcessLocking) { -1, 0)); str_ConcurrentLocking &s = *shs; s.file = c_file; - ASSERT_EQ(0, sem_init(&s.sem, 1, 0)); - ASSERT_EQ(0, sem_init(&s.semReply, 1, 0)); + s.sem_init(1); // Start locker processes pid_t pid[2]; @@ -564,45 +568,45 @@ TEST(LibCephFS, ThreesomeInterProcessLocking) { ASSERT_EQ(0, ceph_flock(cmount, fd, LOCK_EX, mypid)); // Synchronization point with process (failure: process is dead) - TWICE(WAIT_WORKER()); // (1) - TWICE(PING_WORKER()); // (R0) + TWICE(WAIT_WORKER(1)); // (1) + TWICE(PING_WORKER(1)); // (R1) // Shall not have lock immediately - NOT_WAIT_WORKER(); // (2) + NOT_WAIT_WORKER(2); // (2) // Unlock ASSERT_EQ(0, ceph_flock(cmount, fd, LOCK_UN, mypid)); // Shall have lock TWICE(// Synchronization point with process (failure: process is dead) - WAIT_WORKER(); // (2) + WAIT_WORKER(2); // (2) // Synchronization point with process (failure: process is dead) - WAIT_WORKER()); // (3) + WAIT_WORKER(3)); // (3) // Wait for process to share lock - TWICE(WAIT_WORKER()); // (4) + TWICE(WAIT_WORKER(4)); // (4) ASSERT_EQ(-EWOULDBLOCK, ceph_flock(cmount, fd, LOCK_EX | LOCK_NB, mypid)); ASSERT_EQ(0, ceph_flock(cmount, fd, LOCK_SH | LOCK_NB, mypid)); // Wake up process to unlock shared lock - TWICE(PING_WORKER(); // (R1) - WAIT_WORKER()); // (5) + TWICE(PING_WORKER(2); // (R2) + WAIT_WORKER(5)); // (5) // Now we can lock exclusively // Upgrade to exclusive lock (as per POSIX) ASSERT_EQ(0, ceph_flock(cmount, fd, LOCK_EX, mypid)); TWICE( // Wake up process to lock shared lock - PING_WORKER(); // (R2) + PING_WORKER(3); // (R3) // Shall not have lock immediately - NOT_WAIT_WORKER()); // (6) + NOT_WAIT_WORKER(6)); // (6) // Release lock ; process will get it ASSERT_EQ(0, ceph_flock(cmount, fd, LOCK_UN, mypid)); - TWICE(WAIT_WORKER(); // (6) + TWICE(WAIT_WORKER(6); // (6) // We no longer have the lock ASSERT_EQ(-EWOULDBLOCK, @@ -611,8 +615,8 @@ TEST(LibCephFS, ThreesomeInterProcessLocking) { ceph_flock(cmount, fd, LOCK_SH | LOCK_NB, mypid)); // Wake up process to unlock exclusive lock - PING_WORKER(); // (R3) - WAIT_WORKER(); // (7) + PING_WORKER(4); // (R4) + WAIT_WORKER(7); // (7) ); // We can lock it again @@ -627,8 +631,7 @@ TEST(LibCephFS, ThreesomeInterProcessLocking) { ASSERT_EQ(EXIT_SUCCESS, status); // Cleanup - ASSERT_EQ(0, sem_destroy(&s.sem)); - ASSERT_EQ(0, sem_destroy(&s.semReply)); + s.sem_destroy(); ASSERT_EQ(0, munmap(shs, sizeof(*shs))); ASSERT_EQ(0, ceph_close(cmount, fd)); ASSERT_EQ(0, ceph_unlink(cmount, c_file)); -- 2.39.5