]> git-server-git.apps.pok.os.sepia.ceph.com Git - ceph.git/commitdiff
java: add fine grained synchronization
authorNoah Watkins <noahwatkins@gmail.com>
Fri, 11 Jan 2013 23:56:10 +0000 (15:56 -0800)
committerNoah Watkins <noahwatkins@gmail.com>
Mon, 14 Jan 2013 21:11:09 +0000 (13:11 -0800)
Adds r/w lock to protect against some races.

1. Mutual exclusion for mount/unmount prevents races between the two in
libcephfs, which isn't safe (access to ceph_mount_info state).

2. An extremely narrow race between unmount and ceph_* calls in
libcephfs. ThreadA calls ceph_xxx, is_mounted test passes, then ThreadB
calls unmount and destroys the client. ThreadA resumes with a bad client
pointer.

3. Race between unmount and ceph_* calls in JNI. In JNI we hold the
CephContext reference across ceph_* calls. If the ceph mount were to be
released while a thread was returning from a ceph_* call then an attempt
to write to the log (e.g. the return value) would reference bad context.
Since ceph_release is only called by finalize() then no thread can be in
JNI.  So this is actually safe.

Using r/w here provides trade-off between allowing concurrency into
libcephfs, and not having to constantly update the Java bindings. The
only assumption is that unmount/mount race with the rest of the
interface.

Signed-off-by: Noah Watkins <noahwatkins@gmail.com>
src/java/java/com/ceph/fs/CephMount.java

index 85119c36882c305616c522eb56fcae1227d437e3..11338585be81a7b5a2f0ecc562ca0cbb1bd02a73 100644 (file)
@@ -21,6 +21,8 @@ package com.ceph.fs;
 
 import java.io.IOException;
 import java.io.FileNotFoundException;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
 import java.util.Arrays;
 import java.lang.String;
 
@@ -89,6 +91,13 @@ public class CephMount {
    */
   static native void native_initialize();
 
+  /*
+   * RW lock used for fine grained synchronization to native
+   */
+  private final ReentrantReadWriteLock rwlock = new ReentrantReadWriteLock();
+  private final Lock rlock = rwlock.readLock();
+  private final Lock wlock = rwlock.writeLock();
+
   /*
    * Controls clean-up synchronization between the constructor and finalize().
    * If native_ceph_create fails, then we want a call to finalize() to not
@@ -138,7 +147,12 @@ public class CephMount {
    * @param root The path to use as the root (pass null for "/").
    */
   public void mount(String root) {
-    native_ceph_mount(instance_ptr, root);
+    wlock.lock();
+    try {
+      native_ceph_mount(instance_ptr, root);
+    } finally {
+      wlock.unlock();
+    }
   }
 
   private static native int native_ceph_mount(long mountp, String root);
@@ -150,7 +164,12 @@ public class CephMount {
    * previously set are not reset.
    */
   public void unmount() {
-    native_ceph_unmount(instance_ptr);
+    wlock.lock();
+    try {
+      native_ceph_unmount(instance_ptr);
+    } finally {
+      wlock.unlock();
+    }
   }
 
   private static native int native_ceph_unmount(long mountp);
@@ -166,7 +185,12 @@ public class CephMount {
    * @param path The path to the configuration file.
    */
   public void conf_read_file(String path) throws FileNotFoundException {
-    native_ceph_conf_read_file(instance_ptr, path);
+    rlock.lock();
+    try {
+      native_ceph_conf_read_file(instance_ptr, path);
+    } finally {
+      rlock.unlock();
+    }
   }
 
   private static native int native_ceph_conf_read_file(long mountp, String path);
@@ -178,7 +202,12 @@ public class CephMount {
    * @param value The new value of the option.
    */
   public void conf_set(String option, String value) {
-    native_ceph_conf_set(instance_ptr, option, value);
+    rlock.lock();
+    try {
+      native_ceph_conf_set(instance_ptr, option, value);
+    } finally {
+      rlock.unlock();
+    }
   }
 
   private static native int native_ceph_conf_set(long mountp, String option, String value);
@@ -190,7 +219,12 @@ public class CephMount {
    * @return The value of the option or null if option not found
    */
   public String conf_get(String option) {
-    return native_ceph_conf_get(instance_ptr, option);
+    rlock.lock();
+    try {
+      return native_ceph_conf_get(instance_ptr, option);
+    } finally {
+      rlock.unlock();
+    }
   }
 
   private static native String native_ceph_conf_get(long mountp, String option);
@@ -202,7 +236,12 @@ public class CephMount {
    * @param statvfs CephStatVFS structure to hold status.
    */
   public void statfs(String path, CephStatVFS statvfs) throws FileNotFoundException {
-    native_ceph_statfs(instance_ptr, path, statvfs);
+    rlock.lock();
+    try {
+      native_ceph_statfs(instance_ptr, path, statvfs);
+    } finally {
+      rlock.unlock();
+    }
   }
 
   private static native int native_ceph_statfs(long mountp, String path, CephStatVFS statvfs);
@@ -213,7 +252,12 @@ public class CephMount {
    * @return The current working directory in Ceph.
    */
   public String getcwd() {
-    return native_ceph_getcwd(instance_ptr);
+    rlock.lock();
+    try {
+      return native_ceph_getcwd(instance_ptr);
+    } finally {
+      rlock.unlock();
+    }
   }
 
   private static native String native_ceph_getcwd(long mountp);
@@ -224,7 +268,12 @@ public class CephMount {
    * @param path The directory set as the cwd.
    */
   public void chdir(String path) throws FileNotFoundException {
-    native_ceph_chdir(instance_ptr, path);
+    rlock.lock();
+    try {
+      native_ceph_chdir(instance_ptr, path);
+    } finally {
+      rlock.unlock();
+    }
   }
 
   private static native int native_ceph_chdir(long mountp, String cwd);
@@ -236,7 +285,12 @@ public class CephMount {
    * @return List of files and directories excluding "." and "..".
    */
   public String[] listdir(String dir) throws FileNotFoundException {
-    return native_ceph_listdir(instance_ptr, dir);
+    rlock.lock();
+    try {
+      return native_ceph_listdir(instance_ptr, dir);
+    } finally {
+      rlock.unlock();
+    }
   }
 
   private static native String[] native_ceph_listdir(long mountp, String path);
@@ -248,7 +302,12 @@ public class CephMount {
    * @param newpath The name of the link.
    */
   public void link(String oldpath, String newpath) throws FileNotFoundException {
-    native_ceph_link(instance_ptr, oldpath, newpath);
+    rlock.lock();
+    try {
+      native_ceph_link(instance_ptr, oldpath, newpath);
+    } finally {
+      rlock.unlock();
+    }
   }
 
   private static native int native_ceph_link(long mountp, String existing, String newname);
@@ -259,7 +318,12 @@ public class CephMount {
    * @param path The name to unlink/delete.
    */
   public void unlink(String path) throws FileNotFoundException {
-    native_ceph_unlink(instance_ptr, path);
+    rlock.lock();
+    try {
+      native_ceph_unlink(instance_ptr, path);
+    } finally {
+      rlock.unlock();
+    }
   }
 
   private static native int native_ceph_unlink(long mountp, String path);
@@ -271,7 +335,12 @@ public class CephMount {
    * @param to The new path.
    */
   public void rename(String from, String to) throws FileNotFoundException {
-    native_ceph_rename(instance_ptr, from, to);
+    rlock.lock();
+    try {
+      native_ceph_rename(instance_ptr, from, to);
+    } finally {
+      rlock.unlock();
+    }
   }
 
   private static native int native_ceph_rename(long mountp, String from, String to);
@@ -283,7 +352,12 @@ public class CephMount {
    * @param mode The mode of the new directory.
    */
   public void mkdir(String path, int mode) {
-    native_ceph_mkdir(instance_ptr, path, mode);
+    rlock.lock();
+    try {
+      native_ceph_mkdir(instance_ptr, path, mode);
+    } finally {
+      rlock.unlock();
+    }
   }
 
   private static native int native_ceph_mkdir(long mountp, String path, int mode);
@@ -295,7 +369,12 @@ public class CephMount {
    * @param mode The mode of the new directory.
    */
   public void mkdirs(String path, int mode) throws IOException {
-    native_ceph_mkdirs(instance_ptr, path, mode);
+    rlock.lock();
+    try {
+      native_ceph_mkdirs(instance_ptr, path, mode);
+    } finally {
+      rlock.unlock();
+    }
   }
 
   private static native int native_ceph_mkdirs(long mountp, String path, int mode);
@@ -306,7 +385,12 @@ public class CephMount {
    * @param path The directory to delete.
    */
   public void rmdir(String path) throws FileNotFoundException {
-    native_ceph_rmdir(instance_ptr, path);
+    rlock.lock();
+    try {
+      native_ceph_rmdir(instance_ptr, path);
+    } finally {
+      rlock.unlock();
+    }
   }
 
   private static native int native_ceph_rmdir(long mountp, String path);
@@ -315,7 +399,12 @@ public class CephMount {
    * Read the value of a symbolic link.
    */
   public String readlink(String path) throws FileNotFoundException {
-    return native_ceph_readlink(instance_ptr, path);
+    rlock.lock();
+    try {
+      return native_ceph_readlink(instance_ptr, path);
+    } finally {
+      rlock.unlock();
+    }
   }
 
   private static native String native_ceph_readlink(long mountp, String path);
@@ -327,7 +416,12 @@ public class CephMount {
    * @param newpath Name of the link.
    */
   public void symlink(String oldpath, String newpath) {
-    native_ceph_symlink(instance_ptr, oldpath, newpath);
+    rlock.lock();
+    try {
+      native_ceph_symlink(instance_ptr, oldpath, newpath);
+    } finally {
+      rlock.unlock();
+    }
   }
 
   private static native int native_ceph_symlink(long mountp, String existing, String newname);
@@ -339,7 +433,12 @@ public class CephMount {
    * @param stat CephStat structure to hold file status.
    */
   public void lstat(String path, CephStat stat) throws FileNotFoundException, CephNotDirectoryException {
-    native_ceph_lstat(instance_ptr, path, stat);
+    rlock.lock();
+    try {
+      native_ceph_lstat(instance_ptr, path, stat);
+    } finally {
+      rlock.unlock();
+    }
   }
 
   private static native int native_ceph_lstat(long mountp, String path, CephStat stat);
@@ -352,7 +451,12 @@ public class CephMount {
    * @param mask Mask specifying which attributes to set.
    */
   public void setattr(String path, CephStat stat, int mask) throws FileNotFoundException {
-    native_ceph_setattr(instance_ptr, path, stat, mask);
+    rlock.lock();
+    try {
+      native_ceph_setattr(instance_ptr, path, stat, mask);
+    } finally {
+      rlock.unlock();
+    }
   }
 
   private static native int native_ceph_setattr(long mountp, String relpath, CephStat stat, int mask);
@@ -364,7 +468,12 @@ public class CephMount {
    * @param mode New mode bits.
    */
   public void chmod(String path, int mode) throws FileNotFoundException {
-    native_ceph_chmod(instance_ptr, path, mode);
+    rlock.lock();
+    try {
+      native_ceph_chmod(instance_ptr, path, mode);
+    } finally {
+      rlock.unlock();
+    }
   }
 
   private static native int native_ceph_chmod(long mountp, String path, int mode);
@@ -376,7 +485,12 @@ public class CephMount {
    * @param size New file length.
    */
   public void truncate(String path, long size) throws FileNotFoundException {
-    native_ceph_truncate(instance_ptr, path, size);
+    rlock.lock();
+    try {
+      native_ceph_truncate(instance_ptr, path, size);
+    } finally {
+      rlock.unlock();
+    }
   }
 
   private static native int native_ceph_truncate(long mountp, String path, long size);
@@ -390,7 +504,12 @@ public class CephMount {
    * @return File descriptor.
    */
   public int open(String path, int flags, int mode) throws FileNotFoundException {
-    return native_ceph_open(instance_ptr, path, flags, mode);
+    rlock.lock();
+    try {
+      return native_ceph_open(instance_ptr, path, flags, mode);
+    } finally {
+      rlock.unlock();
+    }
   }
 
   private static native int native_ceph_open(long mountp, String path, int flags, int mode);
@@ -409,8 +528,13 @@ public class CephMount {
    */
   public int open(String path, int flags, int mode, int stripe_unit, int stripe_count,
       int object_size, String data_pool) throws FileNotFoundException {
-    return native_ceph_open_layout(instance_ptr, path, flags, mode, stripe_unit,
-        stripe_count, object_size, data_pool);
+    rlock.lock();
+    try {
+      return native_ceph_open_layout(instance_ptr, path, flags, mode, stripe_unit,
+          stripe_count, object_size, data_pool);
+    } finally {
+      rlock.unlock();
+    }
   }
 
   private static native int native_ceph_open_layout(long mountp, String path,
@@ -422,7 +546,12 @@ public class CephMount {
    * @param fd The file descriptor.
    */
   public void close(int fd) {
-    native_ceph_close(instance_ptr, fd);
+    rlock.lock();
+    try {
+      native_ceph_close(instance_ptr, fd);
+    } finally {
+      rlock.unlock();
+    }
   }
 
   private static native int native_ceph_close(long mountp, int fd);
@@ -436,7 +565,12 @@ public class CephMount {
    * @return The new offset.
    */
   public long lseek(int fd, long offset, int whence) {
-    return native_ceph_lseek(instance_ptr, fd, offset, whence);
+    rlock.lock();
+    try {
+      return native_ceph_lseek(instance_ptr, fd, offset, whence);
+    } finally {
+      rlock.unlock();
+    }
   }
 
   private static native long native_ceph_lseek(long mountp, int fd, long offset, int whence);
@@ -451,7 +585,12 @@ public class CephMount {
    * @return The number of bytes read.
    */
   public long read(int fd, byte[] buf, long size, long offset) {
-    return native_ceph_read(instance_ptr, fd, buf, size, offset);
+    rlock.lock();
+    try {
+      return native_ceph_read(instance_ptr, fd, buf, size, offset);
+    } finally {
+      rlock.unlock();
+    }
   }
 
   private static native long native_ceph_read(long mountp, int fd, byte[] buf, long size, long offset);
@@ -466,7 +605,12 @@ public class CephMount {
    * @return The number of bytes written.
    */
   public long write(int fd, byte[] buf, long size, long offset) {
-    return native_ceph_write(instance_ptr, fd, buf, size, offset);
+    rlock.lock();
+    try {
+      return native_ceph_write(instance_ptr, fd, buf, size, offset);
+    } finally {
+      rlock.unlock();
+    }
   }
 
   private static native long native_ceph_write(long mountp, int fd, byte[] buf, long size, long offset);
@@ -478,7 +622,12 @@ public class CephMount {
    * @param size New file size.
    */
   public void ftruncate(int fd, long size) {
-    native_ceph_ftruncate(instance_ptr, fd, size);
+    rlock.lock();
+    try {
+      native_ceph_ftruncate(instance_ptr, fd, size);
+    } finally {
+      rlock.unlock();
+    }
   }
 
   private static native int native_ceph_ftruncate(long mountp, int fd, long size);
@@ -490,7 +639,12 @@ public class CephMount {
    * @param dataonly Synchronize only data.
    */
   public void fsync(int fd, boolean dataonly) {
-    native_ceph_fsync(instance_ptr, fd, dataonly);
+    rlock.lock();
+    try {
+      native_ceph_fsync(instance_ptr, fd, dataonly);
+    } finally {
+      rlock.unlock();
+    }
   }
 
   private static native int native_ceph_fsync(long mountp, int fd, boolean dataonly);
@@ -502,7 +656,12 @@ public class CephMount {
    * @param stat The object in which to store the status.
    */
   public void fstat(int fd, CephStat stat) {
-    native_ceph_fstat(instance_ptr, fd, stat);
+    rlock.lock();
+    try {
+      native_ceph_fstat(instance_ptr, fd, stat);
+    } finally {
+      rlock.unlock();
+    }
   }
 
   private static native int native_ceph_fstat(long mountp, int fd, CephStat stat);
@@ -511,7 +670,12 @@ public class CephMount {
    * Synchronize the client with the file system.
    */
   public void sync_fs() {
-    native_ceph_sync_fs(instance_ptr);
+    rlock.lock();
+    try {
+      native_ceph_sync_fs(instance_ptr);
+    } finally {
+      rlock.unlock();
+    }
   }
 
   private static native int native_ceph_sync_fs(long mountp);
@@ -529,7 +693,12 @@ public class CephMount {
    * details.
    */
   public long getxattr(String path, String name, byte[] buf) throws FileNotFoundException {
-    return native_ceph_getxattr(instance_ptr, path, name, buf);
+    rlock.lock();
+    try {
+      return native_ceph_getxattr(instance_ptr, path, name, buf);
+    } finally {
+      rlock.unlock();
+    }
   }
 
   private static native long native_ceph_getxattr(long mountp, String path, String name, byte[] buf);
@@ -547,7 +716,12 @@ public class CephMount {
    * details.
    */
   public long lgetxattr(String path, String name, byte[] buf) throws FileNotFoundException {
-    return native_ceph_lgetxattr(instance_ptr, path, name, buf);
+    rlock.lock();
+    try {
+      return native_ceph_lgetxattr(instance_ptr, path, name, buf);
+    } finally {
+      rlock.unlock();
+    }
   }
 
   private static native long native_ceph_lgetxattr(long mountp, String path, String name, byte[] buf);
@@ -559,7 +733,12 @@ public class CephMount {
    * @return List of attribute names.
    */
   public String[] listxattr(String path) throws FileNotFoundException {
-    return native_ceph_listxattr(instance_ptr, path);
+    rlock.lock();
+    try {
+      return native_ceph_listxattr(instance_ptr, path);
+    } finally {
+      rlock.unlock();
+    }
   }
 
   private static native String[] native_ceph_listxattr(long mountp, String path);
@@ -571,7 +750,12 @@ public class CephMount {
    * @return List of attribute names.
    */
   public String[] llistxattr(String path) throws FileNotFoundException {
-    return native_ceph_llistxattr(instance_ptr, path);
+    rlock.lock();
+    try {
+      return native_ceph_llistxattr(instance_ptr, path);
+    } finally {
+      rlock.unlock();
+    }
   }
 
   private static native String[] native_ceph_llistxattr(long mountp, String path);
@@ -583,7 +767,12 @@ public class CephMount {
    * @param name Name of attribute.
    */
   public void removexattr(String path, String name) throws FileNotFoundException {
-    native_ceph_removexattr(instance_ptr, path, name);
+    rlock.lock();
+    try {
+      native_ceph_removexattr(instance_ptr, path, name);
+    } finally {
+      rlock.unlock();
+    }
   }
 
   private static native int native_ceph_removexattr(long mountp, String path, String name);
@@ -595,7 +784,12 @@ public class CephMount {
    * @param name Name of attribute.
    */
   public void lremovexattr(String path, String name) throws FileNotFoundException {
-    native_ceph_lremovexattr(instance_ptr, path, name);
+    rlock.lock();
+    try {
+      native_ceph_lremovexattr(instance_ptr, path, name);
+    } finally {
+      rlock.unlock();
+    }
   }
 
   private static native int native_ceph_lremovexattr(long mountp, String path, String name);
@@ -610,7 +804,12 @@ public class CephMount {
    * @param flags Flag controlling behavior (XATTR_CREATE/REPLACE/NONE).
    */
   public void setxattr(String path, String name, byte[] buf, long size, int flags) throws FileNotFoundException {
-    native_ceph_setxattr(instance_ptr, path, name, buf, size, flags);
+    rlock.lock();
+    try {
+      native_ceph_setxattr(instance_ptr, path, name, buf, size, flags);
+    } finally {
+      rlock.unlock();
+    }
   }
 
   private static native int native_ceph_setxattr(long mountp, String path, String name, byte[] buf, long size, int flags);
@@ -625,7 +824,12 @@ public class CephMount {
    * @param flags Flag controlling behavior (XATTR_CREATE/REPLACE/NONE).
    */
   public void lsetxattr(String path, String name, byte[] buf, long size, int flags) throws FileNotFoundException {
-    native_ceph_lsetxattr(instance_ptr, path, name, buf, size, flags);
+    rlock.lock();
+    try {
+      native_ceph_lsetxattr(instance_ptr, path, name, buf, size, flags);
+    } finally {
+      rlock.unlock();
+    }
   }
 
   private static native int native_ceph_lsetxattr(long mountp, String path, String name, byte[] buf, long size, int flags);
@@ -637,7 +841,12 @@ public class CephMount {
    * @return The stripe unit.
    */
   public int get_file_stripe_unit(int fd) {
-    return native_ceph_get_file_stripe_unit(instance_ptr, fd);
+    rlock.lock();
+    try {
+      return native_ceph_get_file_stripe_unit(instance_ptr, fd);
+    } finally {
+      rlock.unlock();
+    }
   }
 
   private static native int native_ceph_get_file_stripe_unit(long mountp, int fd);
@@ -649,7 +858,12 @@ public class CephMount {
    * @return The file replication.
    */
   public int get_file_replication(int fd) {
-    return native_ceph_get_file_replication(instance_ptr, fd);
+    rlock.lock();
+    try {
+      return native_ceph_get_file_replication(instance_ptr, fd);
+    } finally {
+      rlock.unlock();
+    }
   }
 
   private static native int native_ceph_get_file_replication(long mountp, int fd);
@@ -660,7 +874,12 @@ public class CephMount {
    * @param state Enable or disable localized reads.
    */
   public void localize_reads(boolean state) {
-    native_ceph_localize_reads(instance_ptr, state);
+    rlock.lock();
+    try {
+      native_ceph_localize_reads(instance_ptr, state);
+    } finally {
+      rlock.unlock();
+    }
   }
 
   private static native int native_ceph_localize_reads(long mountp, boolean on);
@@ -671,7 +890,12 @@ public class CephMount {
    * @return Stripe unit granularity.
    */
   public int get_stripe_unit_granularity() {
-    return native_ceph_get_stripe_unit_granularity(instance_ptr);
+    rlock.lock();
+    try {
+      return native_ceph_get_stripe_unit_granularity(instance_ptr);
+    } finally {
+      rlock.unlock();
+    }
   }
 
   private static native int native_ceph_get_stripe_unit_granularity(long mountp);