]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
Hadoop: Now wraps all ceph calls in a CephFS ABC with a CephTalker implementing
authorGreg Farnum <gregf@hq.newdream.net>
Thu, 15 Oct 2009 18:35:38 +0000 (11:35 -0700)
committerGreg Farnum <gregf@hq.newdream.net>
Fri, 16 Oct 2009 19:21:25 +0000 (12:21 -0700)
src/client/hadoop/ceph/CephFS.java [new file with mode: 0644]
src/client/hadoop/ceph/CephFileSystem.java
src/client/hadoop/ceph/CephTalker.java [new file with mode: 0644]

diff --git a/src/client/hadoop/ceph/CephFS.java b/src/client/hadoop/ceph/CephFS.java
new file mode 100644 (file)
index 0000000..a95fdc8
--- /dev/null
@@ -0,0 +1,189 @@
+// -*- mode:Java; tab-width:2; c-basic-offset:2; indent-tabs-mode:t -*- 
+/**
+ *
+ * Licensed under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ *
+ * 
+ * Abstract base class for communicating with a Ceph filesystem and its
+ * C++ codebase from Java, or pretending to do so (for unit testing purposes).
+ * As only the Ceph package should be using this directly, all methods
+ * are protected.
+ */
+package org.apache.hadoop.fs.ceph;
+
+abstract class CephFS {
+
+       /*
+        * Performs any necessary setup to allow general use of the filesystem.
+        * Inputs:
+        *  String argsuments -- a command-line style input of Ceph config params
+        *  int block_size -- the size in bytes to use for blocks
+        * Returns: true on success, false otherwise
+        */
+       abstract protected boolean ceph_initializeClient(String arguments, int block_size);
+       
+       /*
+        * Returns the current working directory.(absolute) as a String
+        */
+       abstract protected String ceph_getcwd();
+       /*
+        * Changes the working directory.
+        * Inputs:
+        *  String path: The path (relative or absolute) to switch to
+        * Returns: true on success, false otherwise.
+        */
+       abstract protected boolean ceph_setcwd(String path);
+       /*
+        * Given a path to a directory, removes the directory.if empty.
+        * Inputs:
+        *  jstring j_path: The path (relative or absolute) to the directory
+        * Returns: true on successful delete; false otherwise
+        */
+  abstract protected boolean ceph_rmdir(String path);
+       /*
+        * Given a path, unlinks it.
+        * Inputs:
+        *  String path: The path (relative or absolute) to the file or empty dir
+        * Returns: true if the unlink occurred, false otherwise.
+        */
+  abstract protected boolean ceph_unlink(String path);
+       /*
+        * Changes a given path name to a new name.
+        * Inputs:
+        *  jstring j_from: The path whose name you want to change.
+        *  jstring j_to: The new name for the path.
+        * Returns: true if the rename occurred, false otherwise
+        */
+  abstract protected boolean ceph_rename(String old_path, String new_path);
+       /*
+        * Returns true if it the input path exists, false
+        * if it does not or there is an unexpected failure.
+        */
+  abstract protected boolean ceph_exists(String path);
+       /*
+        * Get the block size for a given path.
+        * Input:
+        *  String path: The path (relative or absolute) you want
+        *  the block size for.
+        * Returns: block size if the path exists, otherwise a negative number
+        *  corresponding to the standard C++ error codes (which are positive).
+        */
+  abstract protected long ceph_getblocksize(String path);
+       /*
+        * Returns true if the given path is a directory, false otherwise.
+        */
+  abstract protected boolean ceph_isdirectory(String path);
+       /*
+        * Returns true if the given path is a file; false otherwise.
+        */
+  abstract protected boolean ceph_isfile(String path);
+       /*
+        * Get the contents of a given directory.
+        * Inputs:
+        *  String path: The path (relative or absolute) to the directory.
+        * Returns: A Java String[] of the contents of the directory, or
+        *  NULL if there is an error (ie, path is not a dir). This listing
+        *  will not contain . or .. entries.
+        */
+  abstract protected String[] ceph_getdir(String path);
+       /*
+        * Create the specified directory and any required intermediate ones with the
+        * given mode.
+        */
+  abstract protected int ceph_mkdirs(String path, int mode);
+       /*
+        * Open a file to append. If the file does not exist, it will be created.
+        * Opening a dir is possible but may have bad results.
+        * Inputs:
+        *  String path: The path to open.
+        * Returns: an int filehandle, or a number<0 if an error occurs.
+        */
+  abstract protected int ceph_open_for_append(String path);
+       /*
+        * Open a file for reading.
+        * Opening a dir is possible but may have bad results.
+        * Inputs:
+        *  String path: The path to open.
+        * Returns: an int filehandle, or a number<0 if an error occurs.
+        */
+  abstract protected int ceph_open_for_read(String path);
+       /*
+        * Opens a file for overwriting; creates it if necessary.
+        * Opening a dir is possible but may have bad results.
+        * Inputs:
+        *  String path: The path to open.
+        *  int mode: The mode to open with.
+        * Returns: an int filehandle, or a number<0 if an error occurs.
+        */
+  abstract protected int ceph_open_for_overwrite(String path, int mode);
+       /*
+        * Closes a given filehandle.
+        */
+  abstract protected int ceph_close(int filehandle);
+       /*
+        * Change the mode on a path.
+        * Inputs:
+        *  String path: The path to change mode on.
+        *  int mode: The mode to apply.
+        * Returns: true if the mode is properly applied, false if there
+        *  is any error.
+        */
+  abstract protected boolean ceph_setPermission(String path, int mode);
+       /*
+        * Closes the Ceph client. This should be called before shutting down
+        * (multiple times is okay but redundant).
+        */
+  abstract protected boolean ceph_kill_client();
+       /*
+        * Get the statistics on a path returned in a custom format defined below.
+        * Inputs:
+        *  String path: The path to stat.
+        *  Stat fill: The stat object to fill.
+        * Returns: true if the stat is successful, false otherwise.
+        */
+  abstract protected boolean ceph_stat(String path, CephFileSystem.Stat fill);
+       /*
+        * Statfs a filesystem in a custom format defined in CephFileSystem.
+        * Inputs:
+        *  String path: A path on the filesystem that you wish to stat.
+        *  CephStat fill: The CephStat object to fill.
+        * Returns: true if successful and the CephStat is filled; false otherwise.
+        */
+  abstract protected int ceph_statfs(String Path, CephFileSystem.CephStat fill);
+       /*
+        * Check how many times a path should be replicated (if it is
+        * degraded it may not actually be replicated this often).
+        * Inputs:
+        *  String path: The path to check.
+        * Returns: an int containing the number of times replicated.
+        */
+  abstract protected int ceph_replication(String path);
+       /*
+        * Find the IP:port addresses of the primary OSD for a given file and offset.
+        * Inputs:
+        *  int fh: The filehandle for the file.
+        *  long offset: The offset to get the location of.
+        * Returns: a String of the location as IP, or NULL if there is an error.
+        */
+  abstract protected String ceph_hosts(int fh, long offset);
+       /*
+        * Set the mtime and atime for a given path.
+        * Inputs:
+        *  String path: The path to set the times for.
+        *  long mtime: The mtime to set, in millis since epoch (-1 to not set).
+        *  long atime: The atime to set, in millis since epoch (-1 to not set)
+        * Returns: 0 if successful, an error code otherwise.
+        */
+  abstract protected int ceph_setTimes(String path, long mtime, long atime);
+
+}
index d522167570163a0f5e8b96db8a3fb1ff9b62b20f..1cebf706daa2cc7df932a6f9df284e84b4f72863 100644 (file)
@@ -78,172 +78,10 @@ public class CephFileSystem extends FileSystem {
 
   private final Path root;
   private boolean initialized = false;
+       private CephFS ceph = null;
 
   private boolean debug = false;
   private String fs_default_name;
-  
-       /*
-        * Performs any necessary setup to allow general use of the filesystem.
-        * Inputs:
-        *  String argsuments -- a command-line style input of Ceph config params
-        *  int block_size -- the size in bytes to use for blocks
-        * Returns: true on success, false otherwise
-        */
-  private native boolean ceph_initializeClient(String arguments, int block_size);
-
-       /*
-        * Returns the current working directory.(absolute) as a String
-        */
-  private native String ceph_getcwd();
-       /*
-        * Changes the working directory.
-        * Inputs:
-        *  String path: The path (relative or absolute) to switch to
-        * Returns: true on success, false otherwise.
-        */
-  private native boolean ceph_setcwd(String path);
-       /*
-        * Given a path to a directory, removes the directory.if empty.
-        * Inputs:
-        *  jstring j_path: The path (relative or absolute) to the directory
-        * Returns: true on successful delete; false otherwise
-        */
-  private native boolean ceph_rmdir(String path);
-       /*
-        * Given a path, unlinks it.
-        * Inputs:
-        *  String path: The path (relative or absolute) to the file or empty dir
-        * Returns: true if the unlink occurred, false otherwise.
-        */
-  private native boolean ceph_unlink(String path);
-       /*
-        * Changes a given path name to a new name.
-        * Inputs:
-        *  jstring j_from: The path whose name you want to change.
-        *  jstring j_to: The new name for the path.
-        * Returns: true if the rename occurred, false otherwise
-        */
-  private native boolean ceph_rename(String old_path, String new_path);
-       /*
-        * Returns true if it the input path exists, false
-        * if it does not or there is an unexpected failure.
-        */
-  private native boolean ceph_exists(String path);
-       /*
-        * Get the block size for a given path.
-        * Input:
-        *  String path: The path (relative or absolute) you want
-        *  the block size for.
-        * Returns: block size if the path exists, otherwise a negative number
-        *  corresponding to the standard C++ error codes (which are positive).
-        */
-  private native long ceph_getblocksize(String path);
-       /*
-        * Returns true if the given path is a directory, false otherwise.
-        */
-  private native boolean ceph_isdirectory(String path);
-       /*
-        * Returns true if the given path is a file; false otherwise.
-        */
-  private native boolean ceph_isfile(String path);
-       /*
-        * Get the contents of a given directory.
-        * Inputs:
-        *  String path: The path (relative or absolute) to the directory.
-        * Returns: A Java String[] of the contents of the directory, or
-        *  NULL if there is an error (ie, path is not a dir). This listing
-        *  will not contain . or .. entries.
-        */
-  private native String[] ceph_getdir(String path);
-       /*
-        * Create the specified directory and any required intermediate ones with the
-        * given mode.
-        */
-  private native int ceph_mkdirs(String path, int mode);
-       /*
-        * Open a file to append. If the file does not exist, it will be created.
-        * Opening a dir is possible but may have bad results.
-        * Inputs:
-        *  String path: The path to open.
-        * Returns: an int filehandle, or a number<0 if an error occurs.
-        */
-  private native int ceph_open_for_append(String path);
-       /*
-        * Open a file for reading.
-        * Opening a dir is possible but may have bad results.
-        * Inputs:
-        *  String path: The path to open.
-        * Returns: an int filehandle, or a number<0 if an error occurs.
-        */
-  private native int ceph_open_for_read(String path);
-       /*
-        * Opens a file for overwriting; creates it if necessary.
-        * Opening a dir is possible but may have bad results.
-        * Inputs:
-        *  String path: The path to open.
-        *  int mode: The mode to open with.
-        * Returns: an int filehandle, or a number<0 if an error occurs.
-        */
-  private native int ceph_open_for_overwrite(String path, int mode);
-       /*
-        * Closes a given filehandle.
-        */
-  private native int ceph_close(int filehandle);
-       /*
-        * Change the mode on a path.
-        * Inputs:
-        *  String path: The path to change mode on.
-        *  int mode: The mode to apply.
-        * Returns: true if the mode is properly applied, false if there
-        *  is any error.
-        */
-  private native boolean ceph_setPermission(String path, int mode);
-       /*
-        * Closes the Ceph client. This should be called before shutting down
-        * (multiple times is okay but redundant).
-        */
-  private native boolean ceph_kill_client();
-       /*
-        * Get the statistics on a path returned in a custom format defined below.
-        * Inputs:
-        *  String path: The path to stat.
-        *  Stat fill: The stat object to fill.
-        * Returns: true if the stat is successful, false otherwise.
-        */
-  private native boolean ceph_stat(String path, Stat fill);
-       /*
-        * Statfs a filesystem in a custom format defined in CephFileSystem.
-        * Inputs:
-        *  String path: A path on the filesystem that you wish to stat.
-        *  CephStat fill: The CephStat object to fill.
-        * Returns: true if successful and the CephStat is filled; false otherwise.
-        */
-  private native int ceph_statfs(String Path, CephStat fill);
-       /*
-        * Check how many times a path should be replicated (if it is
-        * degraded it may not actually be replicated this often).
-        * Inputs:
-        *  String path: The path to check.
-        * Returns: an int containing the number of times replicated.
-        */
-  private native int ceph_replication(String path);
-       /*
-        * Find the IP:port addresses of the primary OSD for a given file and offset.
-        * Inputs:
-        *  int fh: The filehandle for the file.
-        *  long offset: The offset to get the location of.
-        * Returns: a String of the location as IP, or NULL if there is an error.
-        */
-  private native String ceph_hosts(int fh, long offset);
-       /*
-        * Set the mtime and atime for a given path.
-        * Inputs:
-        *  String path: The path to set the times for.
-        *  long mtime: The mtime to set, in millis since epoch (-1 to not set).
-        *  long atime: The atime to set, in millis since epoch (-1 to not set)
-        * Returns: 0 if successful, an error code otherwise.
-        */
-  private native int ceph_setTimes(String path, long mtime, long atime);
 
   /**
    * Create a new CephFileSystem.
@@ -276,12 +114,14 @@ public class CephFileSystem extends FileSystem {
   public void initialize(URI uri, Configuration conf) throws IOException {
     debug("initialize:enter", DEBUG);
     if (!initialized) {
-      System.load(conf.get("fs.ceph.libDir")+"/libhadoopcephfs.so");
-      System.load(conf.get("fs.ceph.libDir")+"/libceph.so");
       super.initialize(uri, conf);
       setConf(conf);
       this.uri = URI.create(uri.getScheme() + "://" + uri.getAuthority());  
-      statistics = getStatistics(uri.getScheme(), getClass());  
+      statistics = getStatistics(uri.getScheme(), getClass());
+
+                       if (ceph == null) {
+                               ceph = new CephTalker(conf);
+                       }
       
       fs_default_name = conf.get("fs.default.name");
       debug = ("true".equals(conf.get("fs.ceph.debug", "false")));
@@ -311,18 +151,29 @@ public class CephFileSystem extends FileSystem {
                                throw new IOException("You must specify a Ceph monitor address or config file!");
       }
       //  Initialize the client
-      if (!ceph_initializeClient(arguments,
+      if (!ceph.ceph_initializeClient(arguments,
                                                                                                                                 conf.getInt("fs.ceph.blockSize", 1<<26))) {
                                debug("initialize:Ceph initialization failed!", FATAL);
                                throw new IOException("Ceph initialization failed!");
       }
       initialized = true;
       debug("initialize:Ceph initialized client. Setting cwd to /", INFO);
-      ceph_setcwd("/");
+      ceph.ceph_setcwd("/");
     }
     debug("initialize:exit", DEBUG);
   }
 
+       /**
+        * Used for testing purposes, this version of initialize
+        * sets the given CephFS instead of defaulting to a
+        * CephTalker (with its assumed real Ceph instance to talk to).
+        */
+       protected void initialize(URI uri, Configuration conf, CephFS ceph_fs)
+               throws IOException{
+               ceph = ceph_fs;
+               initialize(uri, conf);
+       }
+
   /**
    * Close down the CephFileSystem. Runs the base-class close method
    * and then kills the Ceph client itself.
@@ -335,7 +186,7 @@ public class CephFileSystem extends FileSystem {
     debug("close:enter", DEBUG);
     super.close();//this method does stuff, make sure it's run!
                debug("close: Calling ceph_kill_client from Java", TRACE);
-    ceph_kill_client();
+    ceph.ceph_kill_client();
     debug("close:exit", DEBUG);
   }
 
@@ -356,7 +207,7 @@ public class CephFileSystem extends FileSystem {
     Path abs_path = makeAbsolute(file);
     if (progress!=null) progress.progress();
                debug("append: Entering ceph_open_for_append from Java", TRACE);
-    int fd = ceph_open_for_append(abs_path.toString());
+    int fd = ceph.ceph_open_for_append(abs_path.toString());
                debug("append: Returned to Java", TRACE);
     if (progress!=null) progress.progress();
     if( fd < 0 ) { //error in open
@@ -375,9 +226,9 @@ public class CephFileSystem extends FileSystem {
   public Path getWorkingDirectory() {
     if (!initialized) return null;
     debug("getWorkingDirectory:enter", DEBUG);
-               String cwd = ceph_getcwd();
+               String cwd = ceph.ceph_getcwd();
     debug("getWorkingDirectory:exit with path " + cwd, DEBUG);
-    return new Path(fs_default_name + ceph_getcwd());
+    return new Path(fs_default_name + ceph.ceph_getcwd());
   }
 
   /**
@@ -393,7 +244,7 @@ public class CephFileSystem extends FileSystem {
     debug("setWorkingDirecty:enter with new working dir " + dir, DEBUG);
     Path abs_path = makeAbsolute(dir);
     debug("setWorkingDirectory:calling ceph_setcwd from Java", TRACE);
-    if (!ceph_setcwd(abs_path.toString()))
+    if (!ceph.ceph_setcwd(abs_path.toString()))
       debug("setWorkingDirectory: WARNING! ceph_setcwd failed for some reason on path " + abs_path, ERROR);
     debug("setWorkingDirectory:exit", DEBUG);
   }
@@ -418,7 +269,7 @@ public class CephFileSystem extends FileSystem {
     else {
       debug("exists:Calling ceph_exists from Java on path "
                                                                                        + abs_path.toString(), TRACE);
-      result =  ceph_exists(abs_path.toString());
+      result =  ceph.ceph_exists(abs_path.toString());
       debug("exists:Returned from ceph_exists to Java", TRACE);
     }
     debug("exists:exit with value " + result, DEBUG);
@@ -439,7 +290,7 @@ public class CephFileSystem extends FileSystem {
     debug("mkdirs:enter with path " + path, DEBUG);
     Path abs_path = makeAbsolute(path);
     debug("mkdirs:calling ceph_mkdirs from Java", TRACE);
-    int result = ceph_mkdirs(abs_path.toString(), (int)perms.toShort());
+    int result = ceph.ceph_mkdirs(abs_path.toString(), (int)perms.toShort());
     debug("mkdirs:exit with result " + result, DEBUG);
     if (result != 0)
       return false;
@@ -465,7 +316,7 @@ public class CephFileSystem extends FileSystem {
     }
     else {
                        debug("isFile:entering ceph_isfile from Java", TRACE);
-      result = ceph_isfile(abs_path.toString());
+      result = ceph.ceph_isfile(abs_path.toString());
     }
     debug("isFile:exit with result " + result, DEBUG);
     return result;
@@ -490,7 +341,7 @@ public class CephFileSystem extends FileSystem {
     }
     else {
       debug("calling ceph_isdirectory from Java", TRACE);
-      result = ceph_isdirectory(abs_path.toString());
+      result = ceph.ceph_isdirectory(abs_path.toString());
       debug("Returned from ceph_isdirectory to Java", TRACE);
     }
     debug("isDirectory:exit with result " + result, DEBUG);
@@ -515,9 +366,9 @@ public class CephFileSystem extends FileSystem {
     FileStatus status;
     Stat lstat = new Stat();
                debug("getFileStatus: calling ceph_stat from Java", TRACE);
-    if(ceph_stat(abs_path.toString(), lstat)) {
+    if(ceph.ceph_stat(abs_path.toString(), lstat)) {
       status = new FileStatus(lstat.size, lstat.is_dir,
-                                                                                                                       ceph_replication(abs_path.toString()),
+                                                                                                                       ceph.ceph_replication(abs_path.toString()),
                                                                                                                        lstat.block_size,
                                                                                                                        lstat.mod_time,
                                                                                                                        lstat.access_time,
@@ -570,7 +421,7 @@ public class CephFileSystem extends FileSystem {
                                        + " and permissions " + permission, DEBUG);
     Path abs_path = makeAbsolute(p);
                debug("setPermission:calling ceph_setpermission from Java", TRACE);
-    ceph_setPermission(abs_path.toString(), permission.toShort());
+    ceph.ceph_setPermission(abs_path.toString(), permission.toShort());
                debug("setPermission:exit", DEBUG);
   }
 
@@ -588,7 +439,7 @@ public class CephFileSystem extends FileSystem {
                                        " atime:" + atime, DEBUG);
                Path abs_path = makeAbsolute(p);
                debug("setTimes:calling ceph_setTimes from Java", TRACE);
-               int r = ceph_setTimes(abs_path.toString(), mtime, atime);
+               int r = ceph.ceph_setTimes(abs_path.toString(), mtime, atime);
                if (r<0) throw new IOException ("Failed to set times on path "
                                                                                                                                                + abs_path.toString() + " Error code: " + r);
                debug("setTimes:exit", DEBUG);
@@ -650,7 +501,7 @@ public class CephFileSystem extends FileSystem {
     if (!exists) {
       Path parent = abs_path.getParent();
       if (parent != null) { // if parent is root, we're done
-                               int r = ceph_mkdirs(parent.toString(), permission.toShort());
+                               int r = ceph.ceph_mkdirs(parent.toString(), permission.toShort());
                                if (!(r==0 || r==-EEXIST))
                                        throw new IOException ("Error creating parent directory; code: " + r);
       }
@@ -658,7 +509,7 @@ public class CephFileSystem extends FileSystem {
     }
     // Step 3: open the file
     debug("calling ceph_open_for_overwrite from Java", TRACE);
-    int fh = ceph_open_for_overwrite(abs_path.toString(), (int)permission.toShort());
+    int fh = ceph.ceph_open_for_overwrite(abs_path.toString(), (int)permission.toShort());
     if (progress!=null) progress.progress();
     debug("Returned from ceph_open_for_overwrite to Java with fh " + fh, TRACE);
     if (fh < 0) {
@@ -686,7 +537,7 @@ public class CephFileSystem extends FileSystem {
     debug("open:enter with path " + path, DEBUG);
     Path abs_path = makeAbsolute(path);
     
-    int fh = ceph_open_for_read(abs_path.toString());
+    int fh = ceph.ceph_open_for_read(abs_path.toString());
     if (fh < 0) { //uh-oh, something's bad!
       if (fh == -ENOENT) //well that was a stupid open
                                throw new IOException("open:  absolute path \""  + abs_path.toString()
@@ -697,13 +548,13 @@ public class CephFileSystem extends FileSystem {
 
     if(isDirectory(abs_path)) { //yes, it is possible to open Ceph directories
       //but that doesn't mean you should in Hadoop!
-      ceph_close(fh);
+      ceph.ceph_close(fh);
       throw new IOException("open:  absolute path \""  + abs_path.toString()
                                                                                                                + "\" is a directory!");
     }
     Stat lstat = new Stat();
                debug("open:calling ceph_stat from Java", TRACE);
-    ceph_stat(abs_path.toString(), lstat);
+    ceph.ceph_stat(abs_path.toString(), lstat);
                debug("open:returned to Java", TRACE);
     long size = lstat.size;
     if (size < 0) {
@@ -730,7 +581,7 @@ public class CephFileSystem extends FileSystem {
     Path abs_src = makeAbsolute(src);
     Path abs_dst = makeAbsolute(dst);
     debug("calling ceph_rename from Java", TRACE);
-    boolean result = ceph_rename(abs_src.toString(), abs_dst.toString());
+    boolean result = ceph.ceph_rename(abs_src.toString(), abs_dst.toString());
     debug("rename:exit with result: " + result, DEBUG);
     return result;
   }
@@ -759,7 +610,7 @@ public class CephFileSystem extends FileSystem {
     //sanitize and get the filehandle
     Path abs_path = makeAbsolute(file.getPath());
                debug("getFileBlockLocations:call ceph_open_for_read from Java", TRACE);
-    int fh = ceph_open_for_read(abs_path.toString());
+    int fh = ceph.ceph_open_for_read(abs_path.toString());
                debug("getFileBlockLocations:return from ceph_open_for_read to Java with fh "
                                        + fh, TRACE);
     if (fh < 0) {
@@ -769,7 +620,7 @@ public class CephFileSystem extends FileSystem {
     }
     //get the block size
                debug("getFileBlockLocations:call ceph_getblocksize from Java", TRACE);
-    long blockSize = ceph_getblocksize(abs_path.toString());
+    long blockSize = ceph.ceph_getblocksize(abs_path.toString());
                debug("getFileBlockLocations:return from ceph_getblocksize", TRACE);
     BlockLocation[] locations =
       new BlockLocation[(int)Math.ceil(len/(float)blockSize)];
@@ -778,7 +629,7 @@ public class CephFileSystem extends FileSystem {
                        offset = start + i*blockSize;
                        debug("getFileBlockLocations:call ceph_hosts from Java on fh "
                                                + fh + " and offset " + offset, TRACE);
-      String host = ceph_hosts(fh, offset);
+      String host = ceph.ceph_hosts(fh, offset);
                        debug("getFileBlockLocations:return from ceph_hosts to Java with host "
                                                + host, TRACE);
       String[] hostArray = new String[1];
@@ -789,7 +640,7 @@ public class CephFileSystem extends FileSystem {
     }
                debug("getFileBlockLocations:call ceph_close from Java on fh "
                                        + fh, TRACE);
-    ceph_close(fh);
+    ceph.ceph_close(fh);
                debug("getFileBlockLocations:return with " + locations.length
                                        + " locations", DEBUG);
     return locations;
@@ -815,7 +666,7 @@ public class CephFileSystem extends FileSystem {
     //error-checking code.
     CephStat ceph_stat = new CephStat();
                debug("getStatus:calling ceph_statfs from Java", TRACE);
-    int result = ceph_statfs(abs_path.toString(), ceph_stat);
+    int result = ceph.ceph_statfs(abs_path.toString(), ceph_stat);
     if (result!=0) throw new IOException("Somehow failed to statfs the Ceph filesystem. Error code: " + result);
     debug("getStatus:exit successfully", DEBUG);
     return new FsStatus(ceph_stat.capacity,
@@ -849,7 +700,7 @@ public class CephFileSystem extends FileSystem {
     if (isFile(abs_path)) {
                        debug("delete:calling ceph_unlink from Java with path " + abs_path,
                                                TRACE);
-      boolean result = ceph_unlink(abs_path.toString());
+      boolean result = ceph.ceph_unlink(abs_path.toString());
       if(!result)
                                debug("delete: failed to delete file \"" +
                                                                                                abs_path.toString() + "\".", ERROR);
@@ -882,7 +733,7 @@ public class CephFileSystem extends FileSystem {
       }
     }
     //if we've come this far it's a now-empty directory, so delete it!
-    boolean result = ceph_rmdir(abs_path.toString());
+    boolean result = ceph.ceph_rmdir(abs_path.toString());
     if (!result)
       debug("delete: failed to delete \"" + abs_path.toString()
                                                + "\", BAILING", ERROR);
@@ -925,7 +776,7 @@ public class CephFileSystem extends FileSystem {
       debug("makeAbsolute:exit with path " + path, NOLOG);
       return path;
     }
-    Path new_path = new Path(ceph_getcwd(), path);
+    Path new_path = new Path(ceph.ceph_getcwd(), path);
     debug("makeAbsolute:exit with path " + new_path, NOLOG);
     return new_path;
   }
@@ -938,7 +789,7 @@ public class CephFileSystem extends FileSystem {
 
     // If it's a directory, get the listing. Otherwise, complain and give up.
     debug("calling ceph_getdir from Java with path " + abs_path, NOLOG);
-    dirlist = ceph_getdir(abs_path.toString());
+    dirlist = ceph.ceph_getdir(abs_path.toString());
     debug("returning from ceph_getdir to Java", NOLOG);
 
     if (dirlist == null) {
@@ -981,7 +832,7 @@ public class CephFileSystem extends FileSystem {
                }
   }
 
-  private static class Stat {
+  static class Stat {
     public long size;
     public boolean is_dir;
     public long block_size;
@@ -992,7 +843,7 @@ public class CephFileSystem extends FileSystem {
     public Stat(){}
   }
 
-  private static class CephStat {
+  static class CephStat {
     public long capacity;
     public long used;
     public long remaining;
diff --git a/src/client/hadoop/ceph/CephTalker.java b/src/client/hadoop/ceph/CephTalker.java
new file mode 100644 (file)
index 0000000..d9c3246
--- /dev/null
@@ -0,0 +1,53 @@
+// -*- mode:Java; tab-width:2; c-basic-offset:2; indent-tabs-mode:t -*- 
+/**
+ *
+ * Licensed under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ *
+ * 
+ * Wraps a number of native function calls to communicate with the Ceph
+ * filesystem.
+ */
+package org.apache.hadoop.fs.ceph;
+
+import org.apache.hadoop.conf.Configuration;
+
+class CephTalker extends CephFS {
+       //we write a constructor so we can load the libraries
+       public CephTalker(Configuration conf) {
+               System.load(conf.get("fs.ceph.libDir")+"/libhadoopcephfs.so");
+               System.load(conf.get("fs.ceph.libDir")+"/libceph.so");
+       }
+       protected native boolean ceph_initializeClient(String arguments, int block_size);
+       protected native String ceph_getcwd();
+       protected native boolean ceph_setcwd(String path);
+  protected native boolean ceph_rmdir(String path);
+  protected native boolean ceph_unlink(String path);
+  protected native boolean ceph_rename(String old_path, String new_path);
+  protected native boolean ceph_exists(String path);
+  protected native long ceph_getblocksize(String path);
+  protected native boolean ceph_isdirectory(String path);
+  protected native boolean ceph_isfile(String path);
+  protected native String[] ceph_getdir(String path);
+  protected native int ceph_mkdirs(String path, int mode);
+  protected native int ceph_open_for_append(String path);
+  protected native int ceph_open_for_read(String path);
+  protected native int ceph_open_for_overwrite(String path, int mode);
+  protected native int ceph_close(int filehandle);
+  protected native boolean ceph_setPermission(String path, int mode);
+  protected native boolean ceph_kill_client();
+  protected native boolean ceph_stat(String path, CephFileSystem.Stat fill);
+  protected native int ceph_statfs(String Path, CephFileSystem.CephStat fill);
+  protected native int ceph_replication(String path);
+  protected native String ceph_hosts(int fh, long offset);
+  protected native int ceph_setTimes(String path, long mtime, long atime);
+}