From 13cb196ea7e862aa5554bb3d9b1d8f2fa8533b5f Mon Sep 17 00:00:00 2001 From: Noah Watkins Date: Fri, 11 Jan 2013 15:56:10 -0800 Subject: [PATCH] java: add fine grained synchronization Adds r/w lock to protect against some races. 1. Mutual exclusion for mount/unmount prevents races between the two in libcephfs, which isn't safe (access to ceph_mount_info state). 2. An extremely narrow race between unmount and ceph_* calls in libcephfs. ThreadA calls ceph_xxx, is_mounted test passes, then ThreadB calls unmount and destroys the client. ThreadA resumes with a bad client pointer. 3. Race between unmount and ceph_* calls in JNI. In JNI we hold the CephContext reference across ceph_* calls. If the ceph mount were to be released while a thread was returning from a ceph_* call then an attempt to write to the log (e.g. the return value) would reference bad context. Since ceph_release is only called by finalize() then no thread can be in JNI. So this is actually safe. Using r/w here provides trade-off between allowing concurrency into libcephfs, and not having to constantly update the Java bindings. The only assumption is that unmount/mount race with the rest of the interface. Signed-off-by: Noah Watkins --- src/java/java/com/ceph/fs/CephMount.java | 312 +++++++++++++++++++---- 1 file changed, 268 insertions(+), 44 deletions(-) diff --git a/src/java/java/com/ceph/fs/CephMount.java b/src/java/java/com/ceph/fs/CephMount.java index 85119c36882c3..11338585be81a 100644 --- a/src/java/java/com/ceph/fs/CephMount.java +++ b/src/java/java/com/ceph/fs/CephMount.java @@ -21,6 +21,8 @@ package com.ceph.fs; import java.io.IOException; import java.io.FileNotFoundException; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReentrantReadWriteLock; import java.util.Arrays; import java.lang.String; @@ -89,6 +91,13 @@ public class CephMount { */ static native void native_initialize(); + /* + * RW lock used for fine grained synchronization to native + */ + private final ReentrantReadWriteLock rwlock = new ReentrantReadWriteLock(); + private final Lock rlock = rwlock.readLock(); + private final Lock wlock = rwlock.writeLock(); + /* * Controls clean-up synchronization between the constructor and finalize(). * If native_ceph_create fails, then we want a call to finalize() to not @@ -138,7 +147,12 @@ public class CephMount { * @param root The path to use as the root (pass null for "/"). */ public void mount(String root) { - native_ceph_mount(instance_ptr, root); + wlock.lock(); + try { + native_ceph_mount(instance_ptr, root); + } finally { + wlock.unlock(); + } } private static native int native_ceph_mount(long mountp, String root); @@ -150,7 +164,12 @@ public class CephMount { * previously set are not reset. */ public void unmount() { - native_ceph_unmount(instance_ptr); + wlock.lock(); + try { + native_ceph_unmount(instance_ptr); + } finally { + wlock.unlock(); + } } private static native int native_ceph_unmount(long mountp); @@ -166,7 +185,12 @@ public class CephMount { * @param path The path to the configuration file. */ public void conf_read_file(String path) throws FileNotFoundException { - native_ceph_conf_read_file(instance_ptr, path); + rlock.lock(); + try { + native_ceph_conf_read_file(instance_ptr, path); + } finally { + rlock.unlock(); + } } private static native int native_ceph_conf_read_file(long mountp, String path); @@ -178,7 +202,12 @@ public class CephMount { * @param value The new value of the option. */ public void conf_set(String option, String value) { - native_ceph_conf_set(instance_ptr, option, value); + rlock.lock(); + try { + native_ceph_conf_set(instance_ptr, option, value); + } finally { + rlock.unlock(); + } } private static native int native_ceph_conf_set(long mountp, String option, String value); @@ -190,7 +219,12 @@ public class CephMount { * @return The value of the option or null if option not found */ public String conf_get(String option) { - return native_ceph_conf_get(instance_ptr, option); + rlock.lock(); + try { + return native_ceph_conf_get(instance_ptr, option); + } finally { + rlock.unlock(); + } } private static native String native_ceph_conf_get(long mountp, String option); @@ -202,7 +236,12 @@ public class CephMount { * @param statvfs CephStatVFS structure to hold status. */ public void statfs(String path, CephStatVFS statvfs) throws FileNotFoundException { - native_ceph_statfs(instance_ptr, path, statvfs); + rlock.lock(); + try { + native_ceph_statfs(instance_ptr, path, statvfs); + } finally { + rlock.unlock(); + } } private static native int native_ceph_statfs(long mountp, String path, CephStatVFS statvfs); @@ -213,7 +252,12 @@ public class CephMount { * @return The current working directory in Ceph. */ public String getcwd() { - return native_ceph_getcwd(instance_ptr); + rlock.lock(); + try { + return native_ceph_getcwd(instance_ptr); + } finally { + rlock.unlock(); + } } private static native String native_ceph_getcwd(long mountp); @@ -224,7 +268,12 @@ public class CephMount { * @param path The directory set as the cwd. */ public void chdir(String path) throws FileNotFoundException { - native_ceph_chdir(instance_ptr, path); + rlock.lock(); + try { + native_ceph_chdir(instance_ptr, path); + } finally { + rlock.unlock(); + } } private static native int native_ceph_chdir(long mountp, String cwd); @@ -236,7 +285,12 @@ public class CephMount { * @return List of files and directories excluding "." and "..". */ public String[] listdir(String dir) throws FileNotFoundException { - return native_ceph_listdir(instance_ptr, dir); + rlock.lock(); + try { + return native_ceph_listdir(instance_ptr, dir); + } finally { + rlock.unlock(); + } } private static native String[] native_ceph_listdir(long mountp, String path); @@ -248,7 +302,12 @@ public class CephMount { * @param newpath The name of the link. */ public void link(String oldpath, String newpath) throws FileNotFoundException { - native_ceph_link(instance_ptr, oldpath, newpath); + rlock.lock(); + try { + native_ceph_link(instance_ptr, oldpath, newpath); + } finally { + rlock.unlock(); + } } private static native int native_ceph_link(long mountp, String existing, String newname); @@ -259,7 +318,12 @@ public class CephMount { * @param path The name to unlink/delete. */ public void unlink(String path) throws FileNotFoundException { - native_ceph_unlink(instance_ptr, path); + rlock.lock(); + try { + native_ceph_unlink(instance_ptr, path); + } finally { + rlock.unlock(); + } } private static native int native_ceph_unlink(long mountp, String path); @@ -271,7 +335,12 @@ public class CephMount { * @param to The new path. */ public void rename(String from, String to) throws FileNotFoundException { - native_ceph_rename(instance_ptr, from, to); + rlock.lock(); + try { + native_ceph_rename(instance_ptr, from, to); + } finally { + rlock.unlock(); + } } private static native int native_ceph_rename(long mountp, String from, String to); @@ -283,7 +352,12 @@ public class CephMount { * @param mode The mode of the new directory. */ public void mkdir(String path, int mode) { - native_ceph_mkdir(instance_ptr, path, mode); + rlock.lock(); + try { + native_ceph_mkdir(instance_ptr, path, mode); + } finally { + rlock.unlock(); + } } private static native int native_ceph_mkdir(long mountp, String path, int mode); @@ -295,7 +369,12 @@ public class CephMount { * @param mode The mode of the new directory. */ public void mkdirs(String path, int mode) throws IOException { - native_ceph_mkdirs(instance_ptr, path, mode); + rlock.lock(); + try { + native_ceph_mkdirs(instance_ptr, path, mode); + } finally { + rlock.unlock(); + } } private static native int native_ceph_mkdirs(long mountp, String path, int mode); @@ -306,7 +385,12 @@ public class CephMount { * @param path The directory to delete. */ public void rmdir(String path) throws FileNotFoundException { - native_ceph_rmdir(instance_ptr, path); + rlock.lock(); + try { + native_ceph_rmdir(instance_ptr, path); + } finally { + rlock.unlock(); + } } private static native int native_ceph_rmdir(long mountp, String path); @@ -315,7 +399,12 @@ public class CephMount { * Read the value of a symbolic link. */ public String readlink(String path) throws FileNotFoundException { - return native_ceph_readlink(instance_ptr, path); + rlock.lock(); + try { + return native_ceph_readlink(instance_ptr, path); + } finally { + rlock.unlock(); + } } private static native String native_ceph_readlink(long mountp, String path); @@ -327,7 +416,12 @@ public class CephMount { * @param newpath Name of the link. */ public void symlink(String oldpath, String newpath) { - native_ceph_symlink(instance_ptr, oldpath, newpath); + rlock.lock(); + try { + native_ceph_symlink(instance_ptr, oldpath, newpath); + } finally { + rlock.unlock(); + } } private static native int native_ceph_symlink(long mountp, String existing, String newname); @@ -339,7 +433,12 @@ public class CephMount { * @param stat CephStat structure to hold file status. */ public void lstat(String path, CephStat stat) throws FileNotFoundException, CephNotDirectoryException { - native_ceph_lstat(instance_ptr, path, stat); + rlock.lock(); + try { + native_ceph_lstat(instance_ptr, path, stat); + } finally { + rlock.unlock(); + } } private static native int native_ceph_lstat(long mountp, String path, CephStat stat); @@ -352,7 +451,12 @@ public class CephMount { * @param mask Mask specifying which attributes to set. */ public void setattr(String path, CephStat stat, int mask) throws FileNotFoundException { - native_ceph_setattr(instance_ptr, path, stat, mask); + rlock.lock(); + try { + native_ceph_setattr(instance_ptr, path, stat, mask); + } finally { + rlock.unlock(); + } } private static native int native_ceph_setattr(long mountp, String relpath, CephStat stat, int mask); @@ -364,7 +468,12 @@ public class CephMount { * @param mode New mode bits. */ public void chmod(String path, int mode) throws FileNotFoundException { - native_ceph_chmod(instance_ptr, path, mode); + rlock.lock(); + try { + native_ceph_chmod(instance_ptr, path, mode); + } finally { + rlock.unlock(); + } } private static native int native_ceph_chmod(long mountp, String path, int mode); @@ -376,7 +485,12 @@ public class CephMount { * @param size New file length. */ public void truncate(String path, long size) throws FileNotFoundException { - native_ceph_truncate(instance_ptr, path, size); + rlock.lock(); + try { + native_ceph_truncate(instance_ptr, path, size); + } finally { + rlock.unlock(); + } } private static native int native_ceph_truncate(long mountp, String path, long size); @@ -390,7 +504,12 @@ public class CephMount { * @return File descriptor. */ public int open(String path, int flags, int mode) throws FileNotFoundException { - return native_ceph_open(instance_ptr, path, flags, mode); + rlock.lock(); + try { + return native_ceph_open(instance_ptr, path, flags, mode); + } finally { + rlock.unlock(); + } } private static native int native_ceph_open(long mountp, String path, int flags, int mode); @@ -409,8 +528,13 @@ public class CephMount { */ public int open(String path, int flags, int mode, int stripe_unit, int stripe_count, int object_size, String data_pool) throws FileNotFoundException { - return native_ceph_open_layout(instance_ptr, path, flags, mode, stripe_unit, - stripe_count, object_size, data_pool); + rlock.lock(); + try { + return native_ceph_open_layout(instance_ptr, path, flags, mode, stripe_unit, + stripe_count, object_size, data_pool); + } finally { + rlock.unlock(); + } } private static native int native_ceph_open_layout(long mountp, String path, @@ -422,7 +546,12 @@ public class CephMount { * @param fd The file descriptor. */ public void close(int fd) { - native_ceph_close(instance_ptr, fd); + rlock.lock(); + try { + native_ceph_close(instance_ptr, fd); + } finally { + rlock.unlock(); + } } private static native int native_ceph_close(long mountp, int fd); @@ -436,7 +565,12 @@ public class CephMount { * @return The new offset. */ public long lseek(int fd, long offset, int whence) { - return native_ceph_lseek(instance_ptr, fd, offset, whence); + rlock.lock(); + try { + return native_ceph_lseek(instance_ptr, fd, offset, whence); + } finally { + rlock.unlock(); + } } private static native long native_ceph_lseek(long mountp, int fd, long offset, int whence); @@ -451,7 +585,12 @@ public class CephMount { * @return The number of bytes read. */ public long read(int fd, byte[] buf, long size, long offset) { - return native_ceph_read(instance_ptr, fd, buf, size, offset); + rlock.lock(); + try { + return native_ceph_read(instance_ptr, fd, buf, size, offset); + } finally { + rlock.unlock(); + } } private static native long native_ceph_read(long mountp, int fd, byte[] buf, long size, long offset); @@ -466,7 +605,12 @@ public class CephMount { * @return The number of bytes written. */ public long write(int fd, byte[] buf, long size, long offset) { - return native_ceph_write(instance_ptr, fd, buf, size, offset); + rlock.lock(); + try { + return native_ceph_write(instance_ptr, fd, buf, size, offset); + } finally { + rlock.unlock(); + } } private static native long native_ceph_write(long mountp, int fd, byte[] buf, long size, long offset); @@ -478,7 +622,12 @@ public class CephMount { * @param size New file size. */ public void ftruncate(int fd, long size) { - native_ceph_ftruncate(instance_ptr, fd, size); + rlock.lock(); + try { + native_ceph_ftruncate(instance_ptr, fd, size); + } finally { + rlock.unlock(); + } } private static native int native_ceph_ftruncate(long mountp, int fd, long size); @@ -490,7 +639,12 @@ public class CephMount { * @param dataonly Synchronize only data. */ public void fsync(int fd, boolean dataonly) { - native_ceph_fsync(instance_ptr, fd, dataonly); + rlock.lock(); + try { + native_ceph_fsync(instance_ptr, fd, dataonly); + } finally { + rlock.unlock(); + } } private static native int native_ceph_fsync(long mountp, int fd, boolean dataonly); @@ -502,7 +656,12 @@ public class CephMount { * @param stat The object in which to store the status. */ public void fstat(int fd, CephStat stat) { - native_ceph_fstat(instance_ptr, fd, stat); + rlock.lock(); + try { + native_ceph_fstat(instance_ptr, fd, stat); + } finally { + rlock.unlock(); + } } private static native int native_ceph_fstat(long mountp, int fd, CephStat stat); @@ -511,7 +670,12 @@ public class CephMount { * Synchronize the client with the file system. */ public void sync_fs() { - native_ceph_sync_fs(instance_ptr); + rlock.lock(); + try { + native_ceph_sync_fs(instance_ptr); + } finally { + rlock.unlock(); + } } private static native int native_ceph_sync_fs(long mountp); @@ -529,7 +693,12 @@ public class CephMount { * details. */ public long getxattr(String path, String name, byte[] buf) throws FileNotFoundException { - return native_ceph_getxattr(instance_ptr, path, name, buf); + rlock.lock(); + try { + return native_ceph_getxattr(instance_ptr, path, name, buf); + } finally { + rlock.unlock(); + } } private static native long native_ceph_getxattr(long mountp, String path, String name, byte[] buf); @@ -547,7 +716,12 @@ public class CephMount { * details. */ public long lgetxattr(String path, String name, byte[] buf) throws FileNotFoundException { - return native_ceph_lgetxattr(instance_ptr, path, name, buf); + rlock.lock(); + try { + return native_ceph_lgetxattr(instance_ptr, path, name, buf); + } finally { + rlock.unlock(); + } } private static native long native_ceph_lgetxattr(long mountp, String path, String name, byte[] buf); @@ -559,7 +733,12 @@ public class CephMount { * @return List of attribute names. */ public String[] listxattr(String path) throws FileNotFoundException { - return native_ceph_listxattr(instance_ptr, path); + rlock.lock(); + try { + return native_ceph_listxattr(instance_ptr, path); + } finally { + rlock.unlock(); + } } private static native String[] native_ceph_listxattr(long mountp, String path); @@ -571,7 +750,12 @@ public class CephMount { * @return List of attribute names. */ public String[] llistxattr(String path) throws FileNotFoundException { - return native_ceph_llistxattr(instance_ptr, path); + rlock.lock(); + try { + return native_ceph_llistxattr(instance_ptr, path); + } finally { + rlock.unlock(); + } } private static native String[] native_ceph_llistxattr(long mountp, String path); @@ -583,7 +767,12 @@ public class CephMount { * @param name Name of attribute. */ public void removexattr(String path, String name) throws FileNotFoundException { - native_ceph_removexattr(instance_ptr, path, name); + rlock.lock(); + try { + native_ceph_removexattr(instance_ptr, path, name); + } finally { + rlock.unlock(); + } } private static native int native_ceph_removexattr(long mountp, String path, String name); @@ -595,7 +784,12 @@ public class CephMount { * @param name Name of attribute. */ public void lremovexattr(String path, String name) throws FileNotFoundException { - native_ceph_lremovexattr(instance_ptr, path, name); + rlock.lock(); + try { + native_ceph_lremovexattr(instance_ptr, path, name); + } finally { + rlock.unlock(); + } } private static native int native_ceph_lremovexattr(long mountp, String path, String name); @@ -610,7 +804,12 @@ public class CephMount { * @param flags Flag controlling behavior (XATTR_CREATE/REPLACE/NONE). */ public void setxattr(String path, String name, byte[] buf, long size, int flags) throws FileNotFoundException { - native_ceph_setxattr(instance_ptr, path, name, buf, size, flags); + rlock.lock(); + try { + native_ceph_setxattr(instance_ptr, path, name, buf, size, flags); + } finally { + rlock.unlock(); + } } private static native int native_ceph_setxattr(long mountp, String path, String name, byte[] buf, long size, int flags); @@ -625,7 +824,12 @@ public class CephMount { * @param flags Flag controlling behavior (XATTR_CREATE/REPLACE/NONE). */ public void lsetxattr(String path, String name, byte[] buf, long size, int flags) throws FileNotFoundException { - native_ceph_lsetxattr(instance_ptr, path, name, buf, size, flags); + rlock.lock(); + try { + native_ceph_lsetxattr(instance_ptr, path, name, buf, size, flags); + } finally { + rlock.unlock(); + } } private static native int native_ceph_lsetxattr(long mountp, String path, String name, byte[] buf, long size, int flags); @@ -637,7 +841,12 @@ public class CephMount { * @return The stripe unit. */ public int get_file_stripe_unit(int fd) { - return native_ceph_get_file_stripe_unit(instance_ptr, fd); + rlock.lock(); + try { + return native_ceph_get_file_stripe_unit(instance_ptr, fd); + } finally { + rlock.unlock(); + } } private static native int native_ceph_get_file_stripe_unit(long mountp, int fd); @@ -649,7 +858,12 @@ public class CephMount { * @return The file replication. */ public int get_file_replication(int fd) { - return native_ceph_get_file_replication(instance_ptr, fd); + rlock.lock(); + try { + return native_ceph_get_file_replication(instance_ptr, fd); + } finally { + rlock.unlock(); + } } private static native int native_ceph_get_file_replication(long mountp, int fd); @@ -660,7 +874,12 @@ public class CephMount { * @param state Enable or disable localized reads. */ public void localize_reads(boolean state) { - native_ceph_localize_reads(instance_ptr, state); + rlock.lock(); + try { + native_ceph_localize_reads(instance_ptr, state); + } finally { + rlock.unlock(); + } } private static native int native_ceph_localize_reads(long mountp, boolean on); @@ -671,7 +890,12 @@ public class CephMount { * @return Stripe unit granularity. */ public int get_stripe_unit_granularity() { - return native_ceph_get_stripe_unit_granularity(instance_ptr); + rlock.lock(); + try { + return native_ceph_get_stripe_unit_granularity(instance_ptr); + } finally { + rlock.unlock(); + } } private static native int native_ceph_get_stripe_unit_granularity(long mountp); -- 2.39.5