]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
test/libcephfs/flock.cc: fix synchronization points
authorYan, Zheng <zyan@redhat.com>
Thu, 19 Mar 2015 09:34:47 +0000 (17:34 +0800)
committerYan, Zheng <zyan@redhat.com>
Fri, 20 Mar 2015 08:00:59 +0000 (16:00 +0800)
In the threesome test cases, the main thread/process call PING_WORKER()
twice at each synchronization point. It's possible that one worker gets
both notifications and takes two steps forward, another worker gets no
notification.

The fix is use seperate semaphores for odd/even number synchronization
points.

Signed-off-by: Yan, Zheng <zyan@redhat.com>
src/test/libcephfs/flock.cc

index b1b95464daa4d0a05821d819f725034bb9ca76d0..d92a8429dd2e8a7129bf531687cb409844ec4cdf 100644 (file)
@@ -12,6 +12,7 @@
  *
  */
 
+#include <pthread.h>
 #include "gtest/gtest.h"
 #ifndef GTEST_IS_THREADSAFE
 #error "!GTEST_IS_THREADSAFE"
@@ -27,7 +28,6 @@
 #include <sys/xattr.h>
 
 #include <stdlib.h>
-#include <pthread.h>
 #include <semaphore.h>
 #include <time.h>
 #include <sys/mman.h>
@@ -125,24 +125,36 @@ TEST(LibCephFS, BasicLocking) {
 struct str_ConcurrentLocking {
   const char *file;
   struct ceph_mount_info *cmount;  // !NULL if shared
-  sem_t sem;
-  sem_t semReply;
+  sem_t sem[2];
+  sem_t semReply[2];
+  void sem_init(int pshared) {
+    ASSERT_EQ(0, ::sem_init(&sem[0], pshared, 0));
+    ASSERT_EQ(0, ::sem_init(&sem[1], pshared, 0));
+    ASSERT_EQ(0, ::sem_init(&semReply[0], pshared, 0));
+    ASSERT_EQ(0, ::sem_init(&semReply[1], pshared, 0));
+  }
+  void sem_destroy() {
+    ASSERT_EQ(0, ::sem_destroy(&sem[0]));
+    ASSERT_EQ(0, ::sem_destroy(&sem[1]));
+    ASSERT_EQ(0, ::sem_destroy(&semReply[0]));
+    ASSERT_EQ(0, ::sem_destroy(&semReply[1]));
+  }
 };
 
 // Wakeup main (for (N) steps)
-#define PING_MAIN() ASSERT_EQ(0, sem_post(&s.sem))
+#define PING_MAIN(n) ASSERT_EQ(0, sem_post(&s.sem[n%2]))
 // Wait for main to wake us up (for (RN) steps)
-#define WAIT_MAIN() \
-  ASSERT_EQ(0, sem_timedwait(&s.semReply, abstime(ts, waitSlowMs)))
+#define WAIT_MAIN(n) \
+  ASSERT_EQ(0, sem_timedwait(&s.semReply[n%2], abstime(ts, waitSlowMs)))
 
 // Wakeup worker (for (RN) steps)
-#define PING_WORKER() ASSERT_EQ(0, sem_post(&s.semReply))
+#define PING_WORKER(n) ASSERT_EQ(0, sem_post(&s.semReply[n%2]))
 // Wait for worker to wake us up (for (N) steps)
-#define WAIT_WORKER() \
-  ASSERT_EQ(0, sem_timedwait(&s.sem, abstime(ts, waitSlowMs)))
+#define WAIT_WORKER(n) \
+  ASSERT_EQ(0, sem_timedwait(&s.sem[n%2], abstime(ts, waitSlowMs)))
 // Worker shall not wake us up (for (N) steps)
-#define NOT_WAIT_WORKER() \
-  ASSERT_EQ(-1, sem_timedwait(&s.sem, abstime(ts, waitMs)))
+#define NOT_WAIT_WORKER(n) \
+  ASSERT_EQ(-1, sem_timedwait(&s.sem[n%2], abstime(ts, waitMs)))
 
 // Do twice an operation
 #define TWICE(EXPR) do {                       \
@@ -159,30 +171,30 @@ static void thread_ConcurrentLocking(str_ConcurrentLocking& s) {
 
   const int fd = ceph_open(cmount, s.file, O_RDWR | O_CREAT, fileMode);
   ASSERT_GE(fd, 0); 
-  PING_MAIN(); // (1)
+  PING_MAIN(1); // (1)
 
   ASSERT_EQ(-EWOULDBLOCK,
            ceph_flock(cmount, fd, LOCK_EX | LOCK_NB, pthread_self()));
   ASSERT_EQ(0, ceph_flock(cmount, fd, LOCK_EX, pthread_self()));
-  PING_MAIN(); // (2)
+  PING_MAIN(2); // (2)
 
   ASSERT_EQ(0, ceph_flock(cmount, fd, LOCK_UN, pthread_self()));
-  PING_MAIN(); // (3)
+  PING_MAIN(3); // (3)
 
   ASSERT_EQ(0, ceph_flock(cmount, fd, LOCK_SH, pthread_self()));
-  PING_MAIN(); // (4)
+  PING_MAIN(4); // (4)
 
-  WAIT_MAIN(); // (R1)
+  WAIT_MAIN(1); // (R1)
   ASSERT_EQ(0, ceph_flock(cmount, fd, LOCK_UN, pthread_self()));
-  PING_MAIN(); // (5)
+  PING_MAIN(5); // (5)
 
-  WAIT_MAIN(); // (R2)
+  WAIT_MAIN(2); // (R2)
   ASSERT_EQ(0, ceph_flock(cmount, fd, LOCK_EX, pthread_self()));
-  PING_MAIN(); // (6)
+  PING_MAIN(6); // (6)
 
-  WAIT_MAIN(); // (R3)
+  WAIT_MAIN(3); // (R3)
   ASSERT_EQ(0, ceph_flock(cmount, fd, LOCK_UN, pthread_self()));
-  PING_MAIN(); // (7)
+  PING_MAIN(7); // (7)
 }
 
 // Used by ConcurrentLocking test
@@ -210,48 +222,47 @@ TEST(LibCephFS, ConcurrentLocking) {
   pthread_t thread;
   struct timespec ts;
   str_ConcurrentLocking s = { c_file, cmount };
-  ASSERT_EQ(0, sem_init(&s.sem, 0, 0));
-  ASSERT_EQ(0, sem_init(&s.semReply, 0, 0));
+  s.sem_init(0);
   ASSERT_EQ(0, pthread_create(&thread, NULL, thread_ConcurrentLocking_, &s));
   // Synchronization point with thread (failure: thread is dead)
-  WAIT_WORKER(); // (1)
+  WAIT_WORKER(1); // (1)
 
   // Shall not have lock immediately
-  NOT_WAIT_WORKER(); // (2)
+  NOT_WAIT_WORKER(2); // (2)
 
   // Unlock
   ASSERT_EQ(0, ceph_flock(cmount, fd, LOCK_UN, pthread_self()));
 
   // Shall have lock
   // Synchronization point with thread (failure: thread is dead)
-  WAIT_WORKER(); // (2)
+  WAIT_WORKER(2); // (2)
 
   // Synchronization point with thread (failure: thread is dead)
-  WAIT_WORKER(); // (3)
+  WAIT_WORKER(3); // (3)
 
   // Wait for thread to share lock
-  WAIT_WORKER(); // (4)
+  WAIT_WORKER(4); // (4)
   ASSERT_EQ(-EWOULDBLOCK,
            ceph_flock(cmount, fd, LOCK_EX | LOCK_NB, pthread_self()));
   ASSERT_EQ(0, ceph_flock(cmount, fd, LOCK_SH | LOCK_NB, pthread_self()));
 
   // Wake up thread to unlock shared lock
-  PING_WORKER(); // (R1)
-  WAIT_WORKER(); // (5)
+  PING_WORKER(1); // (R1)
+  WAIT_WORKER(5); // (5)
 
   // Now we can lock exclusively
   // Upgrade to exclusive lock (as per POSIX)
   ASSERT_EQ(0, ceph_flock(cmount, fd, LOCK_EX, pthread_self()));
 
   // Wake up thread to lock shared lock
-  PING_WORKER(); // (R2)
+  PING_WORKER(2); // (R2)
 
   // Shall not have lock immediately
-  NOT_WAIT_WORKER(); // (6)
+  NOT_WAIT_WORKER(6); // (6)
 
   // Release lock ; thread will get it
   ASSERT_EQ(0, ceph_flock(cmount, fd, LOCK_UN, pthread_self()));
-  WAIT_WORKER(); // (6)
+  WAIT_WORKER(6); // (6)
 
   // We no longer have the lock
   ASSERT_EQ(-EWOULDBLOCK,
@@ -260,8 +271,8 @@ TEST(LibCephFS, ConcurrentLocking) {
            ceph_flock(cmount, fd, LOCK_SH | LOCK_NB, pthread_self()));
 
   // Wake up thread to unlock exclusive lock
-  PING_WORKER(); // (R3)
-  WAIT_WORKER(); // (7)
+  PING_WORKER(3); // (R3)
+  WAIT_WORKER(7); // (7)
 
   // We can lock it again
   ASSERT_EQ(0, ceph_flock(cmount, fd, LOCK_EX | LOCK_NB, pthread_self()));
@@ -271,8 +282,7 @@ TEST(LibCephFS, ConcurrentLocking) {
   void *retval = (void*) (uintptr_t) -1;
   ASSERT_EQ(0, pthread_join(thread, &retval));
   ASSERT_EQ(NULL, retval);
-  ASSERT_EQ(0, sem_destroy(&s.sem));
-  ASSERT_EQ(0, sem_destroy(&s.semReply));
+  s.sem_destroy();
   ASSERT_EQ(0, ceph_close(cmount, fd));
   ASSERT_EQ(0, ceph_unlink(cmount, c_file));
   CLEANUP_CEPH();
@@ -295,49 +305,48 @@ TEST(LibCephFS, ThreesomeLocking) {
   pthread_t thread[2];
   struct timespec ts;
   str_ConcurrentLocking s = { c_file, cmount };
-  ASSERT_EQ(0, sem_init(&s.sem, 0, 0));
-  ASSERT_EQ(0, sem_init(&s.semReply, 0, 0));
+  s.sem_init(0);
   ASSERT_EQ(0, pthread_create(&thread[0], NULL, thread_ConcurrentLocking_, &s));
   ASSERT_EQ(0, pthread_create(&thread[1], NULL, thread_ConcurrentLocking_, &s));
   // Synchronization point with thread (failure: thread is dead)
-  TWICE(WAIT_WORKER()); // (1)
+  TWICE(WAIT_WORKER(1)); // (1)
 
   // Shall not have lock immediately
-  NOT_WAIT_WORKER(); // (2)
+  NOT_WAIT_WORKER(2); // (2)
 
   // Unlock
   ASSERT_EQ(0, ceph_flock(cmount, fd, LOCK_UN, pthread_self()));
 
   // Shall have lock
   TWICE(// Synchronization point with thread (failure: thread is dead)
-       WAIT_WORKER(); // (2)
+       WAIT_WORKER(2); // (2)
        
        // Synchronization point with thread (failure: thread is dead)
-       WAIT_WORKER()); // (3)
+       WAIT_WORKER(3)); // (3)
   
   // Wait for thread to share lock
-  TWICE(WAIT_WORKER()); // (4)
+  TWICE(WAIT_WORKER(4)); // (4)
   ASSERT_EQ(-EWOULDBLOCK,
            ceph_flock(cmount, fd, LOCK_EX | LOCK_NB, pthread_self()));
   ASSERT_EQ(0, ceph_flock(cmount, fd, LOCK_SH | LOCK_NB, pthread_self()));
 
   // Wake up thread to unlock shared lock
-  TWICE(PING_WORKER(); // (R1)
-       WAIT_WORKER()); // (5)
+  TWICE(PING_WORKER(1); // (R1)
+       WAIT_WORKER(5)); // (5)
 
   // Now we can lock exclusively
   // Upgrade to exclusive lock (as per POSIX)
   ASSERT_EQ(0, ceph_flock(cmount, fd, LOCK_EX, pthread_self()));
 
   TWICE(  // Wake up thread to lock shared lock
-       PING_WORKER(); // (R2)
+       PING_WORKER(2); // (R2)
        
        // Shall not have lock immediately
-       NOT_WAIT_WORKER()); // (6)
+       NOT_WAIT_WORKER(6)); // (6)
   
   // Release lock ; thread will get it
   ASSERT_EQ(0, ceph_flock(cmount, fd, LOCK_UN, pthread_self()));
-  TWICE(WAIT_WORKER(); // (6)
+  TWICE(WAIT_WORKER(6); // (6)
        
        // We no longer have the lock
        ASSERT_EQ(-EWOULDBLOCK,
@@ -346,8 +355,8 @@ TEST(LibCephFS, ThreesomeLocking) {
                  ceph_flock(cmount, fd, LOCK_SH | LOCK_NB, pthread_self()));
        
        // Wake up thread to unlock exclusive lock
-       PING_WORKER(); // (R3)
-       WAIT_WORKER(); // (7)
+       PING_WORKER(3); // (R3)
+       WAIT_WORKER(7); // (7)
        );
   
   // We can lock it again
@@ -360,8 +369,7 @@ TEST(LibCephFS, ThreesomeLocking) {
   ASSERT_EQ(NULL, retval);
   ASSERT_EQ(0, pthread_join(thread[1], &retval));
   ASSERT_EQ(NULL, retval);
-  ASSERT_EQ(0, sem_destroy(&s.sem));
-  ASSERT_EQ(0, sem_destroy(&s.semReply));
+  s.sem_destroy();
   ASSERT_EQ(0, ceph_close(cmount, fd));
   ASSERT_EQ(0, ceph_unlink(cmount, c_file));
   CLEANUP_CEPH();
@@ -378,7 +386,7 @@ static void process_ConcurrentLocking(str_ConcurrentLocking& s) {
   const pid_t mypid = getpid();
   PROCESS_SLOW_MS();
 
-  PING_MAIN(); // (1)
+  PING_MAIN(1); // (1)
 
   struct ceph_mount_info *cmount = NULL;
   struct timespec ts;
@@ -388,35 +396,34 @@ static void process_ConcurrentLocking(str_ConcurrentLocking& s) {
 
   const int fd = ceph_open(cmount, s.file, O_RDWR | O_CREAT, fileMode);
   ASSERT_GE(fd, 0); 
-  WAIT_MAIN(); // (R0)
+  WAIT_MAIN(1); // (R1)
 
   ASSERT_EQ(-EWOULDBLOCK,
            ceph_flock(cmount, fd, LOCK_EX | LOCK_NB, mypid));
   ASSERT_EQ(0, ceph_flock(cmount, fd, LOCK_EX, mypid));
-  PING_MAIN(); // (2)
+  PING_MAIN(2); // (2)
 
   ASSERT_EQ(0, ceph_flock(cmount, fd, LOCK_UN, mypid));
-  PING_MAIN(); // (3)
+  PING_MAIN(3); // (3)
 
   ASSERT_EQ(0, ceph_flock(cmount, fd, LOCK_SH, mypid));
-  PING_MAIN(); // (4)
+  PING_MAIN(4); // (4)
 
-  WAIT_MAIN(); // (R1)
+  WAIT_MAIN(2); // (R2)
   ASSERT_EQ(0, ceph_flock(cmount, fd, LOCK_UN, mypid));
-  PING_MAIN(); // (5)
+  PING_MAIN(5); // (5)
 
-  WAIT_MAIN(); // (R2)
+  WAIT_MAIN(3); // (R3)
   ASSERT_EQ(0, ceph_flock(cmount, fd, LOCK_EX, mypid));
-  PING_MAIN(); // (6)
+  PING_MAIN(6); // (6)
 
-  WAIT_MAIN(); // (R3)
+  WAIT_MAIN(4); // (R4)
   ASSERT_EQ(0, ceph_flock(cmount, fd, LOCK_UN, mypid));
-  PING_MAIN(); // (7)
+  PING_MAIN(7); // (7)
 
   CLEANUP_CEPH();
 
-  ASSERT_EQ(0, sem_destroy(&s.sem));
-  ASSERT_EQ(0, sem_destroy(&s.semReply));
+  s.sem_destroy();
   exit(EXIT_SUCCESS);
 }
 
@@ -434,8 +441,7 @@ TEST(LibCephFS, InterProcessLocking) {
          -1, 0));
   str_ConcurrentLocking &s = *shs;
   s.file = c_file;
-  ASSERT_EQ(0, sem_init(&s.sem, 1, 0));
-  ASSERT_EQ(0, sem_init(&s.semReply, 1, 0));
+  s.sem_init(1);
 
   // Start locker process
   const pid_t pid = fork();
@@ -456,52 +462,52 @@ TEST(LibCephFS, InterProcessLocking) {
   ASSERT_EQ(0, ceph_flock(cmount, fd, LOCK_EX, mypid));
 
   // Synchronization point with process (failure: process is dead)
-  WAIT_WORKER(); // (1)
-  PING_WORKER(); // (R0)
+  WAIT_WORKER(1); // (1)
+  PING_WORKER(1); // (R1)
 
   // Shall not have lock immediately
-  NOT_WAIT_WORKER(); // (2)
+  NOT_WAIT_WORKER(2); // (2)
 
   // Unlock
   ASSERT_EQ(0, ceph_flock(cmount, fd, LOCK_UN, mypid));
 
   // Shall have lock
   // Synchronization point with process (failure: process is dead)
-  WAIT_WORKER(); // (2)
+  WAIT_WORKER(2); // (2)
 
   // Synchronization point with process (failure: process is dead)
-  WAIT_WORKER(); // (3)
+  WAIT_WORKER(3); // (3)
 
   // Wait for process to share lock
-  WAIT_WORKER(); // (4)
+  WAIT_WORKER(4); // (4)
   ASSERT_EQ(-EWOULDBLOCK, ceph_flock(cmount, fd, LOCK_EX | LOCK_NB, mypid));
   ASSERT_EQ(0, ceph_flock(cmount, fd, LOCK_SH | LOCK_NB, mypid));
 
   // Wake up process to unlock shared lock
-  PING_WORKER(); // (R1)
-  WAIT_WORKER(); // (5)
+  PING_WORKER(2); // (R2)
+  WAIT_WORKER(5); // (5)
 
   // Now we can lock exclusively
   // Upgrade to exclusive lock (as per POSIX)
   ASSERT_EQ(0, ceph_flock(cmount, fd, LOCK_EX, mypid));
 
   // Wake up process to lock shared lock
-  PING_WORKER(); // (R2)
+  PING_WORKER(3); // (R3)
 
   // Shall not have lock immediately
-  NOT_WAIT_WORKER(); // (6)
+  NOT_WAIT_WORKER(6); // (6)
 
   // Release lock ; process will get it
   ASSERT_EQ(0, ceph_flock(cmount, fd, LOCK_UN, mypid));
-  WAIT_WORKER(); // (6)
+  WAIT_WORKER(6); // (6)
 
   // We no longer have the lock
   ASSERT_EQ(-EWOULDBLOCK, ceph_flock(cmount, fd, LOCK_EX | LOCK_NB, mypid));
   ASSERT_EQ(-EWOULDBLOCK, ceph_flock(cmount, fd, LOCK_SH | LOCK_NB, mypid));
 
   // Wake up process to unlock exclusive lock
-  PING_WORKER(); // (R3)
-  WAIT_WORKER(); // (7)
+  PING_WORKER(4); // (R4)
+  WAIT_WORKER(7); // (7)
 
   // We can lock it again
   ASSERT_EQ(0, ceph_flock(cmount, fd, LOCK_EX | LOCK_NB, mypid));
@@ -513,8 +519,7 @@ TEST(LibCephFS, InterProcessLocking) {
   ASSERT_EQ(EXIT_SUCCESS, status);
 
   // Cleanup
-  ASSERT_EQ(0, sem_destroy(&s.sem));
-  ASSERT_EQ(0, sem_destroy(&s.semReply));
+  s.sem_destroy();
   ASSERT_EQ(0, munmap(shs, sizeof(*shs)));
   ASSERT_EQ(0, ceph_close(cmount, fd));
   ASSERT_EQ(0, ceph_unlink(cmount, c_file));
@@ -535,8 +540,7 @@ TEST(LibCephFS, ThreesomeInterProcessLocking) {
          -1, 0));
   str_ConcurrentLocking &s = *shs;
   s.file = c_file;
-  ASSERT_EQ(0, sem_init(&s.sem, 1, 0));
-  ASSERT_EQ(0, sem_init(&s.semReply, 1, 0));
+  s.sem_init(1);
 
   // Start locker processes
   pid_t pid[2];
@@ -564,45 +568,45 @@ TEST(LibCephFS, ThreesomeInterProcessLocking) {
   ASSERT_EQ(0, ceph_flock(cmount, fd, LOCK_EX, mypid));
 
   // Synchronization point with process (failure: process is dead)
-  TWICE(WAIT_WORKER()); // (1)
-  TWICE(PING_WORKER()); // (R0)
+  TWICE(WAIT_WORKER(1)); // (1)
+  TWICE(PING_WORKER(1)); // (R1)
 
   // Shall not have lock immediately
-  NOT_WAIT_WORKER(); // (2)
+  NOT_WAIT_WORKER(2); // (2)
 
   // Unlock
   ASSERT_EQ(0, ceph_flock(cmount, fd, LOCK_UN, mypid));
 
   // Shall have lock
   TWICE(// Synchronization point with process (failure: process is dead)
-       WAIT_WORKER(); // (2)
+       WAIT_WORKER(2); // (2)
        
        // Synchronization point with process (failure: process is dead)
-       WAIT_WORKER()); // (3)
+       WAIT_WORKER(3)); // (3)
   
   // Wait for process to share lock
-  TWICE(WAIT_WORKER()); // (4)
+  TWICE(WAIT_WORKER(4)); // (4)
   ASSERT_EQ(-EWOULDBLOCK,
            ceph_flock(cmount, fd, LOCK_EX | LOCK_NB, mypid));
   ASSERT_EQ(0, ceph_flock(cmount, fd, LOCK_SH | LOCK_NB, mypid));
 
   // Wake up process to unlock shared lock
-  TWICE(PING_WORKER(); // (R1)
-       WAIT_WORKER()); // (5)
+  TWICE(PING_WORKER(2); // (R2)
+       WAIT_WORKER(5)); // (5)
 
   // Now we can lock exclusively
   // Upgrade to exclusive lock (as per POSIX)
   ASSERT_EQ(0, ceph_flock(cmount, fd, LOCK_EX, mypid));
 
   TWICE(  // Wake up process to lock shared lock
-       PING_WORKER(); // (R2)
+       PING_WORKER(3); // (R3)
        
        // Shall not have lock immediately
-       NOT_WAIT_WORKER()); // (6)
+       NOT_WAIT_WORKER(6)); // (6)
   
   // Release lock ; process will get it
   ASSERT_EQ(0, ceph_flock(cmount, fd, LOCK_UN, mypid));
-  TWICE(WAIT_WORKER(); // (6)
+  TWICE(WAIT_WORKER(6); // (6)
        
        // We no longer have the lock
        ASSERT_EQ(-EWOULDBLOCK,
@@ -611,8 +615,8 @@ TEST(LibCephFS, ThreesomeInterProcessLocking) {
                  ceph_flock(cmount, fd, LOCK_SH | LOCK_NB, mypid));
        
        // Wake up process to unlock exclusive lock
-       PING_WORKER(); // (R3)
-       WAIT_WORKER(); // (7)
+       PING_WORKER(4); // (R4)
+       WAIT_WORKER(7); // (7)
        );
   
   // We can lock it again
@@ -627,8 +631,7 @@ TEST(LibCephFS, ThreesomeInterProcessLocking) {
   ASSERT_EQ(EXIT_SUCCESS, status);
 
   // Cleanup
-  ASSERT_EQ(0, sem_destroy(&s.sem));
-  ASSERT_EQ(0, sem_destroy(&s.semReply));
+  s.sem_destroy();
   ASSERT_EQ(0, munmap(shs, sizeof(*shs)));
   ASSERT_EQ(0, ceph_close(cmount, fd));
   ASSERT_EQ(0, ceph_unlink(cmount, c_file));