]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
hadoop: whitespace fixes
authorColin Patrick McCabe <cmccabe@alumni.cmu.edu>
Thu, 21 Apr 2011 00:29:23 +0000 (17:29 -0700)
committerColin Patrick McCabe <cmccabe@alumni.cmu.edu>
Mon, 25 Apr 2011 18:05:43 +0000 (11:05 -0700)
Signed-off-by: Colin McCabe <colin.mccabe@dreamhost.com>
src/client/hadoop/ceph/CephFS.java
src/client/hadoop/ceph/CephFaker.java
src/client/hadoop/ceph/CephFileSystem.java
src/client/hadoop/ceph/CephInputStream.java
src/client/hadoop/ceph/CephOutputStream.java
src/client/hadoop/ceph/CephTalker.java
src/client/hadoop/ceph/TestCeph.java

index ac09dc5da1f9aaf93b62e5a9ad80e0d55514bb6c..7c92fce76fcb8908fcffa253dc829134a8f8f84a 100644 (file)
@@ -1,4 +1,5 @@
 // -*- mode:Java; tab-width:2; c-basic-offset:2; indent-tabs-mode:t -*- 
+
 /**
  *
  * Licensed under the Apache License, Version 2.0
  */
 package org.apache.hadoop.fs.ceph;
 
+
 import org.apache.hadoop.conf.Configuration;
 import org.apache.commons.logging.Log;
 
+
 abstract class CephFS {
 
-       protected static final int FATAL = 0;
-       protected static final int ERROR = 1;
-       protected static final int WARN = 2;
-       protected static final int INFO = 3;
-       protected static final int DEBUG = 4;
-       protected static final int TRACE = 5;
-       protected static final int NOLOG = 6;
+  protected static final int FATAL = 0;
+  protected static final int ERROR = 1;
+  protected static final int WARN = 2;
+  protected static final int INFO = 3;
+  protected static final int DEBUG = 4;
+  protected static final int TRACE = 5;
+  protected static final int NOLOG = 6;
 
-       protected static final int ENOTDIR = 20;
+  protected static final int ENOTDIR = 20;
   protected static final int EEXIST = 17;
   protected static final int ENOENT = 2;
 
-       private boolean debug = false;
-       private Log LOG;
-
-       public CephFS(Configuration conf, Log log) {
-      debug = ("true".equals(conf.get("fs.ceph.debug", "false")));
-                       LOG = log;
-       }
-
-       /*
-        * Performs any necessary setup to allow general use of the filesystem.
-        * Inputs:
-        *  String argsuments -- a command-line style input of Ceph config params
-        *  int block_size -- the size in bytes to use for blocks
-        * Returns: true on success, false otherwise
-        */
-       abstract protected boolean ceph_initializeClient(String arguments, int block_size);
+  private boolean debug = false;
+  private Log LOG;
+
+  public CephFS(Configuration conf, Log log) {
+    debug = ("true".equals(conf.get("fs.ceph.debug", "false")));
+    LOG = log;
+  }
+
+  /*
+   * Performs any necessary setup to allow general use of the filesystem.
+   * Inputs:
+   *  String argsuments -- a command-line style input of Ceph config params
+   *  int block_size -- the size in bytes to use for blocks
+   * Returns: true on success, false otherwise
+   */
+  abstract protected boolean ceph_initializeClient(String arguments, int block_size);
        
-       /*
-        * Returns the current working directory (absolute) as a String
-        */
-       abstract protected String ceph_getcwd();
-       /*
-        * Changes the working directory.
-        * Inputs:
-        *  String path: The path (relative or absolute) to switch to
-        * Returns: true on success, false otherwise.
-        */
-       abstract protected boolean ceph_setcwd(String path);
-       /*
-        * Given a path to a directory, removes the directory if empty.
-        * Inputs:
-        *  jstring j_path: The path (relative or absolute) to the directory
-        * Returns: true on successful delete; false otherwise
-        */
+  /*
+   * Returns the current working directory (absolute) as a String
+   */
+  abstract protected String ceph_getcwd();
+
+  /*
+   * Changes the working directory.
+   * Inputs:
+   *  String path: The path (relative or absolute) to switch to
+   * Returns: true on success, false otherwise.
+   */
+  abstract protected boolean ceph_setcwd(String path);
+
+  /*
+   * Given a path to a directory, removes the directory if empty.
+   * Inputs:
+   *  jstring j_path: The path (relative or absolute) to the directory
+   * Returns: true on successful delete; false otherwise
+   */
   abstract protected boolean ceph_rmdir(String path);
-       /*
-        * Given a path, unlinks it.
-        * Inputs:
-        *  String path: The path (relative or absolute) to the file or empty dir
-        * Returns: true if the unlink occurred, false otherwise.
-        */
+
+  /*
+   * Given a path, unlinks it.
+   * Inputs:
+   *  String path: The path (relative or absolute) to the file or empty dir
+   * Returns: true if the unlink occurred, false otherwise.
+   */
   abstract protected boolean ceph_unlink(String path);
-       /*
-        * Changes a given path name to a new name, assuming new_path doesn't exist.
-        * Inputs:
-        *  jstring j_from: The path whose name you want to change.
-        *  jstring j_to: The new name for the path.
-        * Returns: true if the rename occurred, false otherwise
-        */
+
+  /*
+   * Changes a given path name to a new name, assuming new_path doesn't exist.
+   * Inputs:
+   *  jstring j_from: The path whose name you want to change.
+   *  jstring j_to: The new name for the path.
+   * Returns: true if the rename occurred, false otherwise
+   */
   abstract protected boolean ceph_rename(String old_path, String new_path);
-       /*
-        * Returns true if it the input path exists, false
-        * if it does not or there is an unexpected failure.
-        */
+
+  /*
+   * Returns true if it the input path exists, false
+   * if it does not or there is an unexpected failure.
+   */
   abstract protected boolean ceph_exists(String path);
-       /*
-        * Get the block size for a given path.
-        * Input:
-        *  String path: The path (relative or absolute) you want
-        *  the block size for.
-        * Returns: block size if the path exists, otherwise a negative number
-        *  corresponding to the standard C++ error codes (which are positive).
-        */
+
+  /*
+   * Get the block size for a given path.
+   * Input:
+   *  String path: The path (relative or absolute) you want
+   *  the block size for.
+   * Returns: block size if the path exists, otherwise a negative number
+   *  corresponding to the standard C++ error codes (which are positive).
+   */
   abstract protected long ceph_getblocksize(String path);
-       /*
-        * Returns true if the given path is a directory, false otherwise.
-        */
+
+  /*
+   * Returns true if the given path is a directory, false otherwise.
+   */
   abstract protected boolean ceph_isdirectory(String path);
-       /*
-        * Returns true if the given path is a file; false otherwise.
-        */
+
+  /*
+   * Returns true if the given path is a file; false otherwise.
+   */
   abstract protected boolean ceph_isfile(String path);
-       /*
-        * Get the contents of a given directory.
-        * Inputs:
-        *  String path: The path (relative or absolute) to the directory.
-        * Returns: A Java String[] of the contents of the directory, or
-        *  NULL if there is an error (ie, path is not a dir). This listing
-        *  will not contain . or .. entries.
-        */
+
+  /*
+   * Get the contents of a given directory.
+   * Inputs:
+   *  String path: The path (relative or absolute) to the directory.
+   * Returns: A Java String[] of the contents of the directory, or
+   *  NULL if there is an error (ie, path is not a dir). This listing
+   *  will not contain . or .. entries.
+   */
   abstract protected String[] ceph_getdir(String path);
-       /*
-        * Create the specified directory and any required intermediate ones with the
-        * given mode.
-        */
+
+  /*
+   * Create the specified directory and any required intermediate ones with the
+   * given mode.
+   */
   abstract protected int ceph_mkdirs(String path, int mode);
-       /*
-        * Open a file to append. If the file does not exist, it will be created.
-        * Opening a dir is possible but may have bad results.
-        * Inputs:
-        *  String path: The path to open.
-        * Returns: an int filehandle, or a number<0 if an error occurs.
-        */
+
+  /*
+   * Open a file to append. If the file does not exist, it will be created.
+   * Opening a dir is possible but may have bad results.
+   * Inputs:
+   *  String path: The path to open.
+   * Returns: an int filehandle, or a number<0 if an error occurs.
+   */
   abstract protected int ceph_open_for_append(String path);
-       /*
-        * Open a file for reading.
-        * Opening a dir is possible but may have bad results.
-        * Inputs:
-        *  String path: The path to open.
-        * Returns: an int filehandle, or a number<0 if an error occurs.
-        */
+
+  /*
+   * Open a file for reading.
+   * Opening a dir is possible but may have bad results.
+   * Inputs:
+   *  String path: The path to open.
+   * Returns: an int filehandle, or a number<0 if an error occurs.
+   */
   abstract protected int ceph_open_for_read(String path);
-       /*
-        * Opens a file for overwriting; creates it if necessary.
-        * Opening a dir is possible but may have bad results.
-        * Inputs:
-        *  String path: The path to open.
-        *  int mode: The mode to open with.
-        * Returns: an int filehandle, or a number<0 if an error occurs.
-        */
+
+  /*
+   * Opens a file for overwriting; creates it if necessary.
+   * Opening a dir is possible but may have bad results.
+   * Inputs:
+   *  String path: The path to open.
+   *  int mode: The mode to open with.
+   * Returns: an int filehandle, or a number<0 if an error occurs.
+   */
   abstract protected int ceph_open_for_overwrite(String path, int mode);
-       /*
-        * Closes the given file. Returns 0 on success, or a negative
-        * error code otherwise.
-        */
+
+  /*
+   * Closes the given file. Returns 0 on success, or a negative
+   * error code otherwise.
+   */
   abstract protected int ceph_close(int filehandle);
-       /*
-        * Change the mode on a path.
-        * Inputs:
-        *  String path: The path to change mode on.
-        *  int mode: The mode to apply.
-        * Returns: true if the mode is properly applied, false if there
-        *  is any error.
-        */
+
+  /*
+   * Change the mode on a path.
+   * Inputs:
+   *  String path: The path to change mode on.
+   *  int mode: The mode to apply.
+   * Returns: true if the mode is properly applied, false if there
+   *  is any error.
+   */
   abstract protected boolean ceph_setPermission(String path, int mode);
-       /*
-        * Closes the Ceph client. This should be called before shutting down
-        * (multiple times is okay but redundant).
-        */
+
+  /*
+   * Closes the Ceph client. This should be called before shutting down
+   * (multiple times is okay but redundant).
+   */
   abstract protected boolean ceph_kill_client();
-       /*
-        * Get the statistics on a path returned in a custom format defined
-        * in CephFileSystem.
-        * Inputs:
-        *  String path: The path to stat.
-        *  Stat fill: The stat object to fill.
-        * Returns: true if the stat is successful, false otherwise.
-        */
+
+  /*
+   * Get the statistics on a path returned in a custom format defined
+   * in CephFileSystem.
+   * Inputs:
+   *  String path: The path to stat.
+   *  Stat fill: The stat object to fill.
+   * Returns: true if the stat is successful, false otherwise.
+   */
   abstract protected boolean ceph_stat(String path, CephFileSystem.Stat fill);
-       /*
-        * Statfs a filesystem in a custom format defined in CephFileSystem.
-        * Inputs:
-        *  String path: A path on the filesystem that you wish to stat.
-        *  CephStat fill: The CephStat object to fill.
-        * Returns: 0 if successful and the CephStat is filled; a negative
-        *  error code otherwise.
-        */
+
+  /*
+   * Statfs a filesystem in a custom format defined in CephFileSystem.
+   * Inputs:
+   *  String path: A path on the filesystem that you wish to stat.
+   *  CephStat fill: The CephStat object to fill.
+   * Returns: 0 if successful and the CephStat is filled; a negative
+   *  error code otherwise.
+   */
   abstract protected int ceph_statfs(String path, CephFileSystem.CephStat fill);
-       /*
-        * Check how many times a path should be replicated (if it is
-        * degraded it may not actually be replicated this often).
-        * Inputs:
-        *  String path: The path to check.
-        * Returns: an int containing the number of times replicated.
-        */
+
+  /*
+   * Check how many times a path should be replicated (if it is
+   * degraded it may not actually be replicated this often).
+   * Inputs:
+   *  String path: The path to check.
+   * Returns: an int containing the number of times replicated.
+   */
   abstract protected int ceph_replication(String path);
-       /*
-        * Find the IP address of the primary OSD for a given file and offset.
-        * Inputs:
-        *  int fh: The filehandle for the file.
-        *  long offset: The offset to get the location of.
-        * Returns: a String of the location as IP, or NULL if there is an error.
-        */
+
+  /*
+   * Find the IP address of the primary OSD for a given file and offset.
+   * Inputs:
+   *  int fh: The filehandle for the file.
+   *  long offset: The offset to get the location of.
+   * Returns: a String of the location as IP, or NULL if there is an error.
+   */
   abstract protected String ceph_hosts(int fh, long offset);
-       /*
-        * Set the mtime and atime for a given path.
-        * Inputs:
-        *  String path: The path to set the times for.
-        *  long mtime: The mtime to set, in millis since epoch (-1 to not set).
-        *  long atime: The atime to set, in millis since epoch (-1 to not set)
-        * Returns: 0 if successful, an error code otherwise.
-        */
+
+  /*
+   * Set the mtime and atime for a given path.
+   * Inputs:
+   *  String path: The path to set the times for.
+   *  long mtime: The mtime to set, in millis since epoch (-1 to not set).
+   *  long atime: The atime to set, in millis since epoch (-1 to not set)
+   * Returns: 0 if successful, an error code otherwise.
+   */
   abstract protected int ceph_setTimes(String path, long mtime, long atime);
-       /*
-        * Get the current position in a file (as a long) of a given filehandle.
-        * Returns: (long) current file position on success, or a
-        *  negative error code on failure.
-        */
+
+  /*
+   * Get the current position in a file (as a long) of a given filehandle.
+   * Returns: (long) current file position on success, or a
+   *  negative error code on failure.
+   */
   abstract protected long ceph_getpos(int fh);
-       /*
-        * Write the given buffer contents to the given filehandle.
-        * Inputs:
-        *  int fh: The filehandle to write to.
-        *  byte[] buffer: The buffer to write from
-        *  int buffer_offset: The position in the buffer to write from
-        *  int length: The number of (sequential) bytes to write.
-        * Returns: int, on success the number of bytes written, on failure
-        *  a negative error code.
-        */
+
+  /*
+   * Write the given buffer contents to the given filehandle.
+   * Inputs:
+   *  int fh: The filehandle to write to.
+   *  byte[] buffer: The buffer to write from
+   *  int buffer_offset: The position in the buffer to write from
+   *  int length: The number of (sequential) bytes to write.
+   * Returns: int, on success the number of bytes written, on failure
+   *  a negative error code.
+   */
   abstract protected int ceph_write(int fh, byte[] buffer, int buffer_offset, int length);
 
-       /*
-        * Reads into the given byte array from the current position.
-        * Inputs:
-        *  int fh: the filehandle to read from
-        *  byte[] buffer: the byte array to read into
-        *  int buffer_offset: where in the buffer to start writing
-        *  int length: how much to read.
-        * There'd better be enough space in the buffer to write all
-        * the data from the given offset!
-        * Returns: the number of bytes read on success (as an int),
-        *  or an error code otherwise.  */
+  /*
+   * Reads into the given byte array from the current position.
+   * Inputs:
+   *  int fh: the filehandle to read from
+   *  byte[] buffer: the byte array to read into
+   *  int buffer_offset: where in the buffer to start writing
+   *  int length: how much to read.
+   * There'd better be enough space in the buffer to write all
+   * the data from the given offset!
+   * Returns: the number of bytes read on success (as an int),
+   *  or an error code otherwise.       */
   abstract protected int ceph_read(int fh, byte[] buffer, int buffer_offset, int length);
-       /*
-        * Seeks to the given position in the given file.
-        * Inputs:
-        *  int fh: The filehandle to seek in.
-        *  long pos: The position to seek to.
-        * Returns: the new position (as a long) of the filehandle on success,
-        *  or a negative error code on failure.         */
+
+  /*
+   * Seeks to the given position in the given file.
+   * Inputs:
+   *  int fh: The filehandle to seek in.
+   *  long pos: The position to seek to.
+   * Returns: the new position (as a long) of the filehandle on success,
+   *  or a negative error code on failure.      */
   abstract protected long ceph_seek_from_start(int fh, long pos);
     
-       protected void debug(String statement, int priority) {
-    if (debug) System.err.println(statement);
-               switch(priority) {
-               case FATAL: LOG.fatal(statement);
-                       break;
-               case ERROR: LOG.error(statement);
-                       break;
-               case WARN: LOG.warn(statement);
-                       break;
-               case INFO: LOG.info(statement);
-                       break;
-               case DEBUG: LOG.debug(statement);
-                       break;
-               case TRACE: LOG.trace(statement);
-                       break;
-               case NOLOG: break;
-               default: break;
-               }
+  protected void debug(String statement, int priority) {
+    if (debug) {
+      System.err.println(statement);
+    }
+    switch (priority) {
+    case FATAL:
+      LOG.fatal(statement);
+      break;
+
+    case ERROR:
+      LOG.error(statement);
+      break;
+
+    case WARN:
+      LOG.warn(statement);
+      break;
+
+    case INFO:
+      LOG.info(statement);
+      break;
+
+    case DEBUG:
+      LOG.debug(statement);
+      break;
+
+    case TRACE:
+      LOG.trace(statement);
+      break;
+
+    case NOLOG:
+      break;
+
+    default:
+      break;
+    }
   }
 }
index 4257320ddd08ac32a17eab681189642cb78de282..5e638035d87d5d8439988a1d52db660dffb95e50 100644 (file)
@@ -1,4 +1,5 @@
 // -*- mode:Java; tab-width:2; c-basic-offset:2; indent-tabs-mode:t -*- 
+
 /**
  *
  * Licensed under the Apache License, Version 2.0
@@ -20,6 +21,7 @@
 
 package org.apache.hadoop.fs.ceph;
 
+
 import java.net.URI;
 import java.util.Hashtable;
 import java.io.Closeable;
@@ -38,443 +40,462 @@ import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.permission.FsPermission;
 
+
 class CephFaker extends CephFS {
        
-       FileSystem localFS;
-       String localPrefix;
-       int blockSize;
-       Configuration conf;
-       Hashtable<Integer, Object> files;
-       Hashtable<Integer, String> filenames;
-       int fileCount = 0;
-       boolean initialized = false;
+  FileSystem localFS;
+  String localPrefix;
+  int blockSize;
+  Configuration conf;
+  Hashtable<Integer, Object> files;
+  Hashtable<Integer, String> filenames;
+  int fileCount = 0;
+  boolean initialized = false;
        
-       public CephFaker(Configuration con, Log log) {
-               super(con, log);
-               conf = con;
-               files = new Hashtable<Integer, Object>();
-               filenames = new Hashtable<Integer, String>();
-       }
+  public CephFaker(Configuration con, Log log) {
+    super(con, log);
+    conf = con;
+    files = new Hashtable<Integer, Object>();
+    filenames = new Hashtable<Integer, String>();
+  }
        
-       protected boolean ceph_initializeClient(String args, int block_size) {
-               if (!initialized) {
-                       //let's remember the default block_size
-                       blockSize = block_size;
-                       /* for a real Ceph deployment, this starts up the client, 
-                        * sets debugging levels, etc. We just need to get the
-                        * local FileSystem to use, and we'll ignore any
-                        * command-line arguments. */
-                       try {
-                               localFS = FileSystem.getLocal(conf);
-                               localFS.initialize(URI.create("file://localhost"), conf);
-                               localFS.setVerifyChecksum(false);
-                               String testDir = conf.get("hadoop.tmp.dir");
-                               localPrefix = localFS.getWorkingDirectory().toString();
-                               int testDirLoc = localPrefix.indexOf(testDir) - 1;
-                               if (-2 == testDirLoc)
-                                       testDirLoc = localPrefix.length();
-                               localPrefix = localPrefix.substring(0, testDirLoc)
-                                       + "/" + conf.get("hadoop.tmp.dir");
-
-                               localFS.setWorkingDirectory(new Path(localPrefix
-                                                                                                                                                                                +"/user/"
-                                                                                                                                                                                + System.getProperty("user.name")));
-                               //I don't know why, but the unit tests expect the default
-                               //working dir to be /user/username, so satisfy them!
-                               //debug("localPrefix is " + localPrefix, INFO);
-                       }
-                       catch (IOException e) {
-                               return false;
-                       }
-                       initialized = true;
-               }
-               return true;
-       }
-
-       protected String ceph_getcwd() {
-               return sanitize_path(localFS.getWorkingDirectory().toString());
-       }
-
-       protected boolean ceph_setcwd(String path) {
-               localFS.setWorkingDirectory(new Path(prepare_path(path)));
-               return true;
-       }
-
-       //the caller is responsible for ensuring empty dirs
-       protected boolean ceph_rmdir(String pth) {
-               Path path = new Path(prepare_path(pth));
-               boolean ret = false;
-               try {
-                       if (localFS.listStatus(path).length <= 1) {
-                               ret = localFS.delete(path, true);
-                       }
-               }
-               catch (IOException e){ }
-               return ret;
-       }
-
-       //this needs to work on (empty) directories too
-       protected boolean ceph_unlink(String path) {
-               path = prepare_path(path);
-               boolean ret = false;
-               if (ceph_isdirectory(path)) {
-                       ret = ceph_rmdir(path);
-               }
-               else {
-                       try {
-                               ret = localFS.delete(new Path(path), false);
-                       }
-                       catch (IOException e){ }
-               }
-               return ret;
-       }
-
-       protected boolean ceph_rename(String oldName, String newName) {
-               oldName = prepare_path(oldName);
-               newName = prepare_path(newName);
-               try {
-                       Path parent = new Path(newName).getParent();
-                       Path newPath = new Path(newName);
-                       if (localFS.exists(parent) && !localFS.exists(newPath))
-                               return localFS.rename(new Path(oldName), newPath);
-                       return false;
-               }
-               catch (IOException e) { return false; }
-       }
-
-       protected boolean ceph_exists(String path) {
-               path = prepare_path(path);
-               boolean ret = false;
-               try {
-                       ret = localFS.exists(new Path(path));
-               }
-               catch (IOException e){ }
-               return ret;
-       }
-
-       protected long ceph_getblocksize(String path) {
-               path = prepare_path(path);
-               try {
-                       FileStatus status = localFS.getFileStatus(new Path(path));
-                       return status.getBlockSize();
-               }
-               catch (FileNotFoundException e) {
-                       return -CephFS.ENOENT;
-               }
-               catch (IOException e) {
-                       return -1; //just fail generically
-               }
-       }
-
-       protected boolean ceph_isdirectory(String path) {
-               path = prepare_path(path);
-               try {
-                       FileStatus status = localFS.getFileStatus(new Path(path));
-                       return status.isDir();
-               }
-               catch (IOException e) { return false; }
-       }
-
-       protected boolean ceph_isfile(String path) {
-               path = prepare_path(path);
-               boolean ret = false;
-               try {
-                       FileStatus status = localFS.getFileStatus(new Path(path));
-                       ret = !status.isDir();
-               }
-               catch (Exception e) {}
-               return ret;
-       }
-
-       protected String[] ceph_getdir(String path) {
-               path = prepare_path(path);
-               if (!ceph_isdirectory(path)) {
-                       return null;
-               }
-               try {
-                       FileStatus[] stats = localFS.listStatus(new Path(path));
-                       String[] names = new String[stats.length];
-                       String name;
-                       for (int i=0; i<stats.length; ++i) {
-                               name = stats[i].getPath().toString();
-                               names[i] = name.substring(name.lastIndexOf(Path.SEPARATOR)+1);
-                       }
-                       return names;
-               }
-               catch (IOException e) {}
-               return null;
-       }
-
-       protected int ceph_mkdirs(String path, int mode) {
-               path = prepare_path(path);
-               //debug("ceph_mkdirs on " + path, INFO);
-               try {
-                       if(localFS.mkdirs(new Path(path), new FsPermission((short)mode)))
-                               return 0;
-               }
-               catch (FileAlreadyExistsException fe) { return ENOTDIR; }
-               catch (IOException e) {}
-               if (ceph_isdirectory(path))
-                       return -EEXIST; //apparently it already existed
-               return -1;
-       }
-
-       /* 
-        * Unlike a real Ceph deployment, you can't do opens on a directory.
-        * Since that has unpredictable behavior and you shouldn't do it anyway,
-        * it's okay.
-        */
-       protected int ceph_open_for_append(String path) {
-               path = prepare_path(path);
-               FSDataOutputStream stream;
-               try {
-                       stream = localFS.append(new Path(path));
-                       files.put(new Integer(fileCount), stream);
-                       filenames.put(new Integer(fileCount), path);
-                       return fileCount++;
-               }
-               catch (IOException e) { }
-               return -1; // failure
-       }
-
-       protected int ceph_open_for_read(String path) {
-               path = prepare_path(path);
-               FSDataInputStream stream;
-               try {
-                       stream = localFS.open(new Path(path));
-                       files.put(new Integer(fileCount), stream);
-                       filenames.put(new Integer(fileCount), path);
-                       debug("ceph_open_for_read fh:" + fileCount
-                                               + ", pathname:" + path, INFO);
-                       return fileCount++;
-               }
-               catch (IOException e) { }
-               return -1; //failure
-       }
-
-       protected int ceph_open_for_overwrite(String path, int mode) {
-               path = prepare_path(path);
-               FSDataOutputStream stream;
-               try {
-                       stream = localFS.create(new Path(path));
-                       files.put(new Integer(fileCount), stream);
-                       filenames.put(new Integer(fileCount), path);
-                       debug("ceph_open_for_overwrite fh:" + fileCount
-                                               + ", pathname:" + path, INFO);
-                       return fileCount++;
-               }
-               catch (IOException e) { }
-               return -1; //failure
-       }
-
-       protected int ceph_close(int filehandle) {
-               debug("ceph_close(filehandle " + filehandle + ")", INFO);
-               try {
-                       ((Closeable)files.get(new Integer(filehandle))).close();
-                       if(null == files.get(new Integer(filehandle))) {
-                               return -ENOENT; //this isn't quite the right error code,
-                               // but the important part is it's negative
-                       }
-                       return 0; //hurray, success
-               }
-               catch (NullPointerException ne) {
-                       debug("ceph_close caught NullPointerException!" + ne, WARN);
-               } //err, how?
-               catch (IOException ie) {
-                       debug("ceph_close caught IOException!" + ie, WARN);
-               }
-               return -1; //failure
-       }
-
-       protected boolean ceph_setPermission(String pth, int mode) {
-               pth = prepare_path(pth);
-               Path path = new Path(pth);
-               boolean ret = false;
-               try {
-                       localFS.setPermission(path, new FsPermission((short)mode));
-                       ret = true;
-               }
-               catch (IOException e) { }
-               return ret;
-       }
-
-       //rather than try and match a Ceph deployment's behavior exactly,
-       //just make bad things happen if they try and call methods after this
-       protected boolean ceph_kill_client() {
-               //debug("ceph_kill_client", INFO);
-               localFS.setWorkingDirectory(new Path(localPrefix));
-               //debug("working dir is now " + localFS.getWorkingDirectory(), INFO);
-               try{
-                       localFS.close(); }
-               catch (Exception e) {}
-               localFS = null;
-               files = null;
-               filenames = null;
-               return true;
-       }
-
-       protected boolean ceph_stat(String pth, CephFileSystem.Stat fill) {
-               pth = prepare_path(pth);
-               Path path = new Path(pth);
-               boolean ret = false;
-               try {
-                       FileStatus status = localFS.getFileStatus(path);
-                       fill.size = status.getLen();
-                       fill.is_dir = status.isDir();
-                       fill.block_size = status.getBlockSize();
-                       fill.mod_time = status.getModificationTime();
-                       fill.access_time = status.getAccessTime();
-                       fill.mode = status.getPermission().toShort();
-                       ret = true;
-               }
-               catch (IOException e) {}
-               return ret;
-       }
-
-       protected int ceph_statfs(String pth, CephFileSystem.CephStat fill) {
-               pth = prepare_path(pth);
-               try {
-                       FsStatus stat = localFS.getStatus();
-                       fill.capacity = stat.getCapacity();
-                       fill.used = stat.getUsed();
-                       fill.remaining = stat.getRemaining();
-                       return 0;
-               }
-               catch (Exception e){}
-               return -1; //failure;
-       }
-
-       protected int ceph_replication(String path) {
-               path = prepare_path(path);
-               int ret = -1; //-1 for failure
-               try {
-                       ret = localFS.getFileStatus(new Path(path)).getReplication();
-               }
-               catch (IOException e) {}
-               return ret;
-       }
-
-       protected String ceph_hosts(int fh, long offset) {
-               String ret = null;
-               try {
-                       BlockLocation[] locs = localFS.getFileBlockLocations(
-                                                localFS.getFileStatus(new Path(
-                                                                                       filenames.get(new Integer(fh)))), offset, 1);
-                       ret = locs[0].getNames()[0];
-               }
-               catch (IOException e) {}
-               catch (NullPointerException f) { }
-               return ret;
-       }
-
-       protected int ceph_setTimes(String pth, long mtime, long atime) {
-               pth = prepare_path(pth);
-               Path path = new Path(pth);
-               int ret = -1; //generic fail
-               try {
-                       localFS.setTimes(path, mtime, atime);
-                       ret = 0;
-               }
-               catch (IOException e) {}
-               return ret;
-       }
-
-       protected long ceph_getpos(int fh) {
-               long ret = -1; //generic fail
-               try {
-                       Object stream = files.get(new Integer(fh));
-                       if (stream instanceof FSDataInputStream) {
-                               ret = ((FSDataInputStream)stream).getPos();
-                       }
-                       else if (stream instanceof FSDataOutputStream) {
-                               ret = ((FSDataOutputStream)stream).getPos();
-                       }
-               }
-               catch (IOException e) {}
-               catch (NullPointerException f) { }
-               return ret;
-       }
-
-       protected int ceph_write(int fh, byte[] buffer,
-                                                                                                        int buffer_offset, int length) {
-               debug("ceph_write fh:" + fh + ", buffer_offset:" + buffer_offset
-                                       + ", length:" + length, INFO);
-               long ret = -1;//generic fail
-               try {
-                       FSDataOutputStream os = (FSDataOutputStream) files.get(new Integer(fh));
-                       debug("ceph_write got outputstream", INFO);
-                       long startPos = os.getPos();
-                       os.write(buffer, buffer_offset, length);
-                       ret = os.getPos() - startPos;
-               }
-               catch (IOException e) { debug("ceph_write caught IOException!", WARN);}
-               catch (NullPointerException f) {
-                       debug("ceph_write caught NullPointerException!", WARN); }
-               return (int)ret;
-       }
-
-       protected int ceph_read(int fh, byte[] buffer,
-                                                                                                       int buffer_offset, int length) {
-               long ret = -1;//generic fail
-               try {
-                       FSDataInputStream is = (FSDataInputStream)files.get(new Integer(fh));
-                       long startPos = is.getPos();
-                       is.read(buffer, buffer_offset, length);
-                       ret = is.getPos() - startPos;
-               }
-               catch (IOException e) {}
-               catch (NullPointerException f) {}
-               return (int)ret;
-       }
-
-       protected long ceph_seek_from_start(int fh, long pos) {
-               debug("ceph_seek_from_start(fh " + fh + ", pos " + pos + ")", INFO);
-               long ret = -1;//generic fail
-               try {
-                       debug("ceph_seek_from_start filename is "
-                                               + filenames.get(new Integer(fh)), INFO);
-                       if (null == files.get(new Integer(fh))) {
-                               debug("ceph_seek_from_start: is is null!", WARN);
-                       }
-                       FSDataInputStream is = (FSDataInputStream) files.get(new Integer(fh));
-                       debug("ceph_seek_from_start retrieved is!", INFO);
-                       is.seek(pos);
-                       ret = is.getPos();
-               }
-               catch (IOException e) {debug("ceph_seek_from_start caught IOException!", WARN);}
-               catch (NullPointerException f) {debug("ceph_seek_from_start caught NullPointerException!", WARN);}
-               return (int)ret;
-       }
-
-       /*
-        * We need to remove the localFS file prefix before returning to Ceph
-        */
-       private String sanitize_path(String path) {
-               //debug("sanitize_path(" + path + ")", INFO);
-               /*              if (path.startsWith("file:"))
-                                       path = path.substring("file:".length()); */
-               if (path.startsWith(localPrefix)) {
-                       path = path.substring(localPrefix.length());
-                       if (path.length() == 0) //it was a root path
-                               path = "/";
-               }
-               //debug("sanitize_path returning " + path, INFO);
-               return path;
-       }
-
-       /*
-        * If it's an absolute path we need to shove the
-        * test dir onto the front as a prefix.
-        */
-       private String prepare_path(String path) {
-               //debug("prepare_path(" + path + ")", INFO);
-               if (path.startsWith("/"))
-                       path = localPrefix + path;
-               else if (path.equals("..")) {
-                       if (ceph_getcwd().equals("/"))
-                               path = "."; // you can't go up past root!
-               }
-               //debug("prepare_path returning" + path, INFO);
-               return path;
-       }
+  protected boolean ceph_initializeClient(String args, int block_size) {
+    if (!initialized) {
+      // let's remember the default block_size
+      blockSize = block_size;
+
+      /* for a real Ceph deployment, this starts up the client, 
+       * sets debugging levels, etc. We just need to get the
+       * local FileSystem to use, and we'll ignore any
+       * command-line arguments. */
+      try {
+        localFS = FileSystem.getLocal(conf);
+        localFS.initialize(URI.create("file://localhost"), conf);
+        localFS.setVerifyChecksum(false);
+        String testDir = conf.get("hadoop.tmp.dir");
+
+        localPrefix = localFS.getWorkingDirectory().toString();
+        int testDirLoc = localPrefix.indexOf(testDir) - 1;
+
+        if (-2 == testDirLoc) {
+          testDirLoc = localPrefix.length();
+        }
+        localPrefix = localPrefix.substring(0, testDirLoc) + "/"
+            + conf.get("hadoop.tmp.dir");
+
+        localFS.setWorkingDirectory(
+            new Path(localPrefix + "/user/" + System.getProperty("user.name")));
+        // I don't know why, but the unit tests expect the default
+        // working dir to be /user/username, so satisfy them!
+        // debug("localPrefix is " + localPrefix, INFO);
+      } catch (IOException e) {
+        return false;
+      }
+      initialized = true;
+    }
+    return true;
+  }
+
+  protected String ceph_getcwd() {
+    return sanitize_path(localFS.getWorkingDirectory().toString());
+  }
+
+  protected boolean ceph_setcwd(String path) {
+    localFS.setWorkingDirectory(new Path(prepare_path(path)));
+    return true;
+  }
+
+  // the caller is responsible for ensuring empty dirs
+  protected boolean ceph_rmdir(String pth) {
+    Path path = new Path(prepare_path(pth));
+    boolean ret = false;
+
+    try {
+      if (localFS.listStatus(path).length <= 1) {
+        ret = localFS.delete(path, true);
+      }
+    } catch (IOException e) {}
+    return ret;
+  }
+
+  // this needs to work on (empty) directories too
+  protected boolean ceph_unlink(String path) {
+    path = prepare_path(path);
+    boolean ret = false;
+
+    if (ceph_isdirectory(path)) {
+      ret = ceph_rmdir(path);
+    } else {
+      try {
+        ret = localFS.delete(new Path(path), false);
+      } catch (IOException e) {}
+    }
+    return ret;
+  }
+
+  protected boolean ceph_rename(String oldName, String newName) {
+    oldName = prepare_path(oldName);
+    newName = prepare_path(newName);
+    try {
+      Path parent = new Path(newName).getParent();
+      Path newPath = new Path(newName);
+
+      if (localFS.exists(parent) && !localFS.exists(newPath)) {
+        return localFS.rename(new Path(oldName), newPath);
+      }
+      return false;
+    } catch (IOException e) {
+      return false;
+    }
+  }
+
+  protected boolean ceph_exists(String path) {
+    path = prepare_path(path);
+    boolean ret = false;
+
+    try {
+      ret = localFS.exists(new Path(path));
+    } catch (IOException e) {}
+    return ret;
+  }
+
+  protected long ceph_getblocksize(String path) {
+    path = prepare_path(path);
+    try {
+      FileStatus status = localFS.getFileStatus(new Path(path));
+
+      return status.getBlockSize();
+    } catch (FileNotFoundException e) {
+      return -CephFS.ENOENT;
+    } catch (IOException e) {
+      return -1; // just fail generically
+    }
+  }
+
+  protected boolean ceph_isdirectory(String path) {
+    path = prepare_path(path);
+    try {
+      FileStatus status = localFS.getFileStatus(new Path(path));
+
+      return status.isDir();
+    } catch (IOException e) {
+      return false;
+    }
+  }
+
+  protected boolean ceph_isfile(String path) {
+    path = prepare_path(path);
+    boolean ret = false;
+
+    try {
+      FileStatus status = localFS.getFileStatus(new Path(path));
+
+      ret = !status.isDir();
+    } catch (Exception e) {}
+    return ret;
+  }
+
+  protected String[] ceph_getdir(String path) {
+    path = prepare_path(path);
+    if (!ceph_isdirectory(path)) {
+      return null;
+    }
+    try {
+      FileStatus[] stats = localFS.listStatus(new Path(path));
+      String[] names = new String[stats.length];
+      String name;
+
+      for (int i = 0; i < stats.length; ++i) {
+        name = stats[i].getPath().toString();
+        names[i] = name.substring(name.lastIndexOf(Path.SEPARATOR) + 1);
+      }
+      return names;
+    } catch (IOException e) {}
+    return null;
+  }
+
+  protected int ceph_mkdirs(String path, int mode) {
+    path = prepare_path(path);
+    // debug("ceph_mkdirs on " + path, INFO);
+    try {
+      if (localFS.mkdirs(new Path(path), new FsPermission((short) mode))) {
+        return 0;
+      }
+    } catch (FileAlreadyExistsException fe) {
+      return ENOTDIR;
+    } catch (IOException e) {}
+    if (ceph_isdirectory(path)) {
+      return -EEXIST;
+    } // apparently it already existed
+    return -1;
+  }
+
+  /*
+   * Unlike a real Ceph deployment, you can't do opens on a directory.
+   * Since that has unpredictable behavior and you shouldn't do it anyway,
+   * it's okay.
+   */
+  protected int ceph_open_for_append(String path) {
+    path = prepare_path(path);
+    FSDataOutputStream stream;
+
+    try {
+      stream = localFS.append(new Path(path));
+      files.put(new Integer(fileCount), stream);
+      filenames.put(new Integer(fileCount), path);
+      return fileCount++;
+    } catch (IOException e) {}
+    return -1; // failure
+  }
+
+  protected int ceph_open_for_read(String path) {
+    path = prepare_path(path);
+    FSDataInputStream stream;
+
+    try {
+      stream = localFS.open(new Path(path));
+      files.put(new Integer(fileCount), stream);
+      filenames.put(new Integer(fileCount), path);
+      debug("ceph_open_for_read fh:" + fileCount + ", pathname:" + path, INFO);
+      return fileCount++;
+    } catch (IOException e) {}
+    return -1; // failure
+  }
+
+  protected int ceph_open_for_overwrite(String path, int mode) {
+    path = prepare_path(path);
+    FSDataOutputStream stream;
+
+    try {
+      stream = localFS.create(new Path(path));
+      files.put(new Integer(fileCount), stream);
+      filenames.put(new Integer(fileCount), path);
+      debug("ceph_open_for_overwrite fh:" + fileCount + ", pathname:" + path,
+          INFO);
+      return fileCount++;
+    } catch (IOException e) {}
+    return -1; // failure
+  }
+
+  protected int ceph_close(int filehandle) {
+    debug("ceph_close(filehandle " + filehandle + ")", INFO);
+    try {
+      ((Closeable) files.get(new Integer(filehandle))).close();
+      if (null == files.get(new Integer(filehandle))) {
+        return -ENOENT; // this isn't quite the right error code,
+        // but the important part is it's negative
+      }
+      return 0; // hurray, success
+    } catch (NullPointerException ne) {
+      debug("ceph_close caught NullPointerException!" + ne, WARN);
+    } // err, how?
+    catch (IOException ie) {
+      debug("ceph_close caught IOException!" + ie, WARN);
+    }
+    return -1; // failure
+  }
+
+  protected boolean ceph_setPermission(String pth, int mode) {
+    pth = prepare_path(pth);
+    Path path = new Path(pth);
+    boolean ret = false;
+
+    try {
+      localFS.setPermission(path, new FsPermission((short) mode));
+      ret = true;
+    } catch (IOException e) {}
+    return ret;
+  }
+
+  // rather than try and match a Ceph deployment's behavior exactly,
+  // just make bad things happen if they try and call methods after this
+  protected boolean ceph_kill_client() {
+    // debug("ceph_kill_client", INFO);
+    localFS.setWorkingDirectory(new Path(localPrefix));
+    // debug("working dir is now " + localFS.getWorkingDirectory(), INFO);
+    try {
+      localFS.close();
+    } catch (Exception e) {}
+    localFS = null;
+    files = null;
+    filenames = null;
+    return true;
+  }
+
+  protected boolean ceph_stat(String pth, CephFileSystem.Stat fill) {
+    pth = prepare_path(pth);
+    Path path = new Path(pth);
+    boolean ret = false;
+
+    try {
+      FileStatus status = localFS.getFileStatus(path);
+
+      fill.size = status.getLen();
+      fill.is_dir = status.isDir();
+      fill.block_size = status.getBlockSize();
+      fill.mod_time = status.getModificationTime();
+      fill.access_time = status.getAccessTime();
+      fill.mode = status.getPermission().toShort();
+      ret = true;
+    } catch (IOException e) {}
+    return ret;
+  }
+
+  protected int ceph_statfs(String pth, CephFileSystem.CephStat fill) {
+    pth = prepare_path(pth);
+    try {
+      FsStatus stat = localFS.getStatus();
+
+      fill.capacity = stat.getCapacity();
+      fill.used = stat.getUsed();
+      fill.remaining = stat.getRemaining();
+      return 0;
+    } catch (Exception e) {}
+    return -1; // failure;
+  }
+
+  protected int ceph_replication(String path) {
+    path = prepare_path(path);
+    int ret = -1; // -1 for failure
+
+    try {
+      ret = localFS.getFileStatus(new Path(path)).getReplication();
+    } catch (IOException e) {}
+    return ret;
+  }
+
+  protected String ceph_hosts(int fh, long offset) {
+    String ret = null;
+
+    try {
+      BlockLocation[] locs = localFS.getFileBlockLocations(
+          localFS.getFileStatus(new Path(filenames.get(new Integer(fh)))),
+          offset, 1);
+
+      ret = locs[0].getNames()[0];
+    } catch (IOException e) {} catch (NullPointerException f) {}
+    return ret;
+  }
+
+  protected int ceph_setTimes(String pth, long mtime, long atime) {
+    pth = prepare_path(pth);
+    Path path = new Path(pth);
+    int ret = -1; // generic fail
+
+    try {
+      localFS.setTimes(path, mtime, atime);
+      ret = 0;
+    } catch (IOException e) {}
+    return ret;
+  }
+
+  protected long ceph_getpos(int fh) {
+    long ret = -1; // generic fail
+
+    try {
+      Object stream = files.get(new Integer(fh));
+
+      if (stream instanceof FSDataInputStream) {
+        ret = ((FSDataInputStream) stream).getPos();
+      } else if (stream instanceof FSDataOutputStream) {
+        ret = ((FSDataOutputStream) stream).getPos();
+      }
+    } catch (IOException e) {} catch (NullPointerException f) {}
+    return ret;
+  }
+
+  protected int ceph_write(int fh, byte[] buffer,
+      int buffer_offset, int length) {
+    debug(
+        "ceph_write fh:" + fh + ", buffer_offset:" + buffer_offset + ", length:"
+        + length,
+        INFO);
+    long ret = -1; // generic fail
+
+    try {
+      FSDataOutputStream os = (FSDataOutputStream) files.get(new Integer(fh));
+
+      debug("ceph_write got outputstream", INFO);
+      long startPos = os.getPos();
+
+      os.write(buffer, buffer_offset, length);
+      ret = os.getPos() - startPos;
+    } catch (IOException e) {
+      debug("ceph_write caught IOException!", WARN);
+    } catch (NullPointerException f) {
+      debug("ceph_write caught NullPointerException!", WARN);
+    }
+    return (int) ret;
+  }
+
+  protected int ceph_read(int fh, byte[] buffer,
+      int buffer_offset, int length) {
+    long ret = -1; // generic fail
+
+    try {
+      FSDataInputStream is = (FSDataInputStream) files.get(new Integer(fh));
+      long startPos = is.getPos();
+
+      is.read(buffer, buffer_offset, length);
+      ret = is.getPos() - startPos;
+    } catch (IOException e) {} catch (NullPointerException f) {}
+    return (int) ret;
+  }
+
+  protected long ceph_seek_from_start(int fh, long pos) {
+    debug("ceph_seek_from_start(fh " + fh + ", pos " + pos + ")", INFO);
+    long ret = -1; // generic fail
+
+    try {
+      debug("ceph_seek_from_start filename is " + filenames.get(new Integer(fh)),
+          INFO);
+      if (null == files.get(new Integer(fh))) {
+        debug("ceph_seek_from_start: is is null!", WARN);
+      }
+      FSDataInputStream is = (FSDataInputStream) files.get(new Integer(fh));
+
+      debug("ceph_seek_from_start retrieved is!", INFO);
+      is.seek(pos);
+      ret = is.getPos();
+    } catch (IOException e) {
+      debug("ceph_seek_from_start caught IOException!", WARN);
+    } catch (NullPointerException f) {
+      debug("ceph_seek_from_start caught NullPointerException!", WARN);
+    }
+    return (int) ret;
+  }
+
+  /*
+   * We need to remove the localFS file prefix before returning to Ceph
+   */
+  private String sanitize_path(String path) {
+    // debug("sanitize_path(" + path + ")", INFO);
+    /* if (path.startsWith("file:"))
+     path = path.substring("file:".length()); */
+    if (path.startsWith(localPrefix)) {
+      path = path.substring(localPrefix.length());
+      if (path.length() == 0) { // it was a root path
+        path = "/";
+      }
+    }
+    // debug("sanitize_path returning " + path, INFO);
+    return path;
+  }
+
+  /*
+   * If it's an absolute path we need to shove the
+   * test dir onto the front as a prefix.
+   */
+  private String prepare_path(String path) {
+    // debug("prepare_path(" + path + ")", INFO);
+    if (path.startsWith("/")) {
+      path = localPrefix + path;
+    } else if (path.equals("..")) {
+      if (ceph_getcwd().equals("/")) {
+        path = ".";
+      } // you can't go up past root!
+    }
+    // debug("prepare_path returning" + path, INFO);
+    return path;
+  }
 }
index 09efce2dec3fa0f99e77612744eea477b8bf9e85..2ea2683fa712b2d58cd4b0189c8257e150723b0e 100644 (file)
@@ -1,4 +1,5 @@
-// -*- mode:Java; tab-width:2; c-basic-offset:2; indent-tabs-mode:t -*- 
+// -*- mode:Java; tab-width:2; c-basic-offset:2; indent-tabs-mode:t -*-
+
 /**
  *
  * Licensed under the Apache License, Version 2.0
  * implied. See the License for the specific language governing
  * permissions and limitations under the License.
  *
- * 
+ *
  * Implements the Hadoop FS interfaces to allow applications to store
  * files in Ceph.
  */
 package org.apache.hadoop.fs.ceph;
 
+
 import java.io.IOException;
 import java.io.FileNotFoundException;
 import java.io.OutputStream;
@@ -41,6 +43,7 @@ import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FsStatus;
 import org.apache.hadoop.fs.CreateFlag;
 
+
 /**
  * <p>
  * A {@link FileSystem} backed by <a href="http://ceph.newdream.net">Ceph.</a>.
@@ -68,7 +71,7 @@ public class CephFileSystem extends FileSystem {
 
   private final Path root;
   private boolean initialized = false;
-       private CephFS ceph = null;
+  private CephFS ceph = null;
 
   private boolean debug = false;
   private String fs_default_name;
@@ -80,24 +83,26 @@ public class CephFileSystem extends FileSystem {
     root = new Path("/");
   }
 
-       /**
-        * Used for testing purposes, this constructor
-        * sets the given CephFS instead of defaulting to a
-        * CephTalker (with its assumed real Ceph instance to talk to).
-        */
-       public CephFileSystem(CephFS ceph_fs, String default_path) {
-               super();
-               root = new Path("/");
-               ceph = ceph_fs;
-               fs_default_name = default_path;
-       }
+  /**
+   * Used for testing purposes, this constructor
+   * sets the given CephFS instead of defaulting to a
+   * CephTalker (with its assumed real Ceph instance to talk to).
+   */
+  public CephFileSystem(CephFS ceph_fs, String default_path) {
+    super();
+    root = new Path("/");
+    ceph = ceph_fs;
+    fs_default_name = default_path;
+  }
 
   /**
    * Lets you get the URI of this CephFileSystem.
    * @return the URI.
    */
   public URI getUri() {
-    if (!initialized) return null;
+    if (!initialized) {
+      return null;
+    }
     ceph.debug("getUri:exit with return " + uri, ceph.DEBUG);
     return uri;
   }
@@ -113,53 +118,56 @@ public class CephFileSystem extends FileSystem {
    */
   @Override
   public void initialize(URI uri, Configuration conf) throws IOException {
-    if (!initialized) {
-      super.initialize(uri, conf);
-      setConf(conf);
-      this.uri = URI.create(uri.getScheme() + "://" + uri.getAuthority());  
-      statistics = getStatistics(uri.getScheme(), getClass());
-
-                       if (ceph == null) {
-                               ceph = new CephTalker(conf, LOG);
-                       }
-      if (null == fs_default_name) {
-                               fs_default_name = conf.get("fs.default.name");
-                       }
-      //build up the arguments for Ceph
-      String arguments = "CephFSInterface";
-      arguments += conf.get("fs.ceph.commandLine", "");
-      if (conf.get("fs.ceph.clientDebug") != null) {
-                               arguments += " --debug_client ";
-                               arguments += conf.get("fs.ceph.clientDebug");
-      }
-      if (conf.get("fs.ceph.messengerDebug") != null) {
-                               arguments += " --debug_ms ";
-                               arguments += conf.get("fs.ceph.messengerDebug");
-      }
-      if (conf.get("fs.ceph.monAddr") != null) {
-                               arguments += " -m ";
-                               arguments += conf.get("fs.ceph.monAddr");
-      }
-      arguments += " --client-readahead-max-periods="
-                               + conf.get("fs.ceph.readahead", "1");
-      //make sure they gave us a ceph monitor address or conf file
-                       ceph.debug("initialize:Ceph initialization arguments: " + arguments, ceph.INFO);
-      if ( (conf.get("fs.ceph.monAddr") == null) &&
-                                        (arguments.indexOf("-m") == -1) &&
-                                        (arguments.indexOf("-c") == -1) ) {
-                               ceph.debug("initialize:You need to specify a Ceph monitor address.", ceph.FATAL);
-                               throw new IOException("You must specify a Ceph monitor address or config file!");
-      }
-      //  Initialize the client
-      if (!ceph.ceph_initializeClient(arguments,
-                                                                                                                                conf.getInt("fs.ceph.blockSize", 1<<26))) {
-                               ceph.debug("initialize:Ceph initialization failed!", ceph.FATAL);
-                               throw new IOException("Ceph initialization failed!");
-      }
-      initialized = true;
-      ceph.debug("initialize:Ceph initialized client. Setting cwd to /", ceph.INFO);
-      ceph.ceph_setcwd("/");
+    if (initialized) {
+      return;
+    }
+    super.initialize(uri, conf);
+    setConf(conf);
+    this.uri = URI.create(uri.getScheme() + "://" + uri.getAuthority());
+    statistics = getStatistics(uri.getScheme(), getClass());
+    if (ceph == null) {
+      ceph = new CephTalker(conf, LOG);
+    }
+    if (null == fs_default_name) {
+      fs_default_name = conf.get("fs.default.name");
     }
+    // build up the arguments for Ceph
+    String arguments = "CephFSInterface";
+
+    arguments += conf.get("fs.ceph.commandLine", "");
+    if (conf.get("fs.ceph.clientDebug") != null) {
+      arguments += " --debug_client ";
+      arguments += conf.get("fs.ceph.clientDebug");
+    }
+    if (conf.get("fs.ceph.messengerDebug") != null) {
+      arguments += " --debug_ms ";
+      arguments += conf.get("fs.ceph.messengerDebug");
+    }
+    if (conf.get("fs.ceph.monAddr") != null) {
+      arguments += " -m ";
+      arguments += conf.get("fs.ceph.monAddr");
+    }
+    arguments += " --client-readahead-max-periods="
+        + conf.get("fs.ceph.readahead", "1");
+    // make sure they gave us a ceph monitor address or conf file
+    ceph.debug("initialize:Ceph initialization arguments: " + arguments,
+        ceph.INFO);
+    if ((conf.get("fs.ceph.monAddr") == null) && (arguments.indexOf("-m") == -1)
+        && (arguments.indexOf("-c") == -1)) {
+      ceph.debug("initialize:You need to specify a Ceph monitor address.",
+          ceph.FATAL);
+      throw new IOException(
+          "You must specify a Ceph monitor address or config file!");
+    }
+    // Initialize the client
+    if (!ceph.ceph_initializeClient(arguments,
+        conf.getInt("fs.ceph.blockSize", 1 << 26))) {
+      ceph.debug("initialize:Ceph initialization failed!", ceph.FATAL);
+      throw new IOException("Ceph initialization failed!");
+    }
+    initialized = true;
+    ceph.debug("initialize:Ceph initialized client. Setting cwd to /", ceph.INFO);
+    ceph.ceph_setcwd("/");
     ceph.debug("initialize:exit", ceph.DEBUG);
   }
 
@@ -170,11 +178,14 @@ public class CephFileSystem extends FileSystem {
    */
   @Override
   public void close() throws IOException {
-    if (!initialized) throw new IOException ("You have to initialize the "
-                                                                                                                                                                                +"CephFileSystem before calling other methods.");
+    if (!initialized) {
+      throw new IOException(
+          "You have to initialize the "
+              + "CephFileSystem before calling other methods.");
+    }
     ceph.debug("close:enter", ceph.DEBUG);
-    super.close();//this method does stuff, make sure it's run!
-               ceph.debug("close: Calling ceph_kill_client from Java", ceph.TRACE);
+    super.close(); // this method does stuff, make sure it's run!
+    ceph.debug("close: Calling ceph_kill_client from Java", ceph.TRACE);
     ceph.ceph_kill_client();
     ceph.debug("close:exit", ceph.DEBUG);
   }
@@ -188,23 +199,35 @@ public class CephFileSystem extends FileSystem {
    * @return An FSDataOutputStream that connects to the file on Ceph.
    * @throws IOException If initialize() hasn't been called or the file cannot be found or appended to.
    */
-  public FSDataOutputStream append (Path file, int bufferSize,
-                                                                                                                                               Progressable progress) throws IOException {
-    if (!initialized) throw new IOException ("You have to initialize the "
-                                                                                                                                                                                +"CephFileSystem before calling other methods.");
-    ceph.debug("append:enter with path " + file + " bufferSize " + bufferSize, ceph.DEBUG);
+  public FSDataOutputStream append(Path file, int bufferSize,
+      Progressable progress) throws IOException {
+    if (!initialized) {
+      throw new IOException(
+          "You have to initialize the "
+              + "CephFileSystem before calling other methods.");
+    }
+    ceph.debug("append:enter with path " + file + " bufferSize " + bufferSize,
+        ceph.DEBUG);
     Path abs_path = makeAbsolute(file);
-    if (progress!=null) progress.progress();
-               ceph.debug("append: Entering ceph_open_for_append from Java", ceph.TRACE);
+
+    if (progress != null) {
+      progress.progress();
+    }
+    ceph.debug("append: Entering ceph_open_for_append from Java", ceph.TRACE);
     int fd = ceph.ceph_open_for_append(abs_path.toString());
-               ceph.debug("append: Returned to Java", ceph.TRACE);
-    if (progress!=null) progress.progress();
-    if( fd < 0 ) { //error in open
-      throw new IOException("append: Open for append failed on path \"" +
-                                                                                                               abs_path.toString() + "\"");
-    }
-    CephOutputStream cephOStream = new CephOutputStream(getConf(),
-                                                                                                                                                                                                                               ceph, fd, bufferSize);
+
+    ceph.debug("append: Returned to Java", ceph.TRACE);
+    if (progress != null) {
+      progress.progress();
+    }
+    if (fd < 0) { // error in open
+      throw new IOException(
+          "append: Open for append failed on path \"" + abs_path.toString()
+          + "\"");
+    }
+    CephOutputStream cephOStream = new CephOutputStream(getConf(), ceph, fd,
+        bufferSize);
+
     ceph.debug("append:exit", ceph.DEBUG);
     return new FSDataOutputStream(cephOStream, statistics);
   }
@@ -214,9 +237,12 @@ public class CephFileSystem extends FileSystem {
    * @return the directory Path
    */
   public Path getWorkingDirectory() {
-    if (!initialized) return null;
+    if (!initialized) {
+      return null;
+    }
     ceph.debug("getWorkingDirectory:enter", ceph.DEBUG);
-               String cwd = ceph.ceph_getcwd();
+    String cwd = ceph.ceph_getcwd();
+
     ceph.debug("getWorkingDirectory:exit with path " + cwd, ceph.DEBUG);
     return new Path(fs_default_name + ceph.ceph_getcwd());
   }
@@ -229,13 +255,20 @@ public class CephFileSystem extends FileSystem {
    * @param dir The directory to change to.
    */
   @Override
-       public void setWorkingDirectory(Path dir) {
-    if (!initialized) return;
+  public void setWorkingDirectory(Path dir) {
+    if (!initialized) {
+      return;
+    }
     ceph.debug("setWorkingDirecty:enter with new working dir " + dir, ceph.DEBUG);
     Path abs_path = makeAbsolute(dir);
+
     ceph.debug("setWorkingDirectory:calling ceph_setcwd from Java", ceph.TRACE);
-    if (!ceph.ceph_setcwd(abs_path.toString()))
-      ceph.debug("setWorkingDirectory: WARNING! ceph_setcwd failed for some reason on path " + abs_path, ceph.WARN);
+    if (!ceph.ceph_setcwd(abs_path.toString())) {
+      ceph.debug(
+          "setWorkingDirectory: WARNING! ceph_setcwd failed for some reason on path "
+              + abs_path,
+              ceph.WARN);
+    }
     ceph.debug("setWorkingDirectory:exit", ceph.DEBUG);
   }
 
@@ -247,19 +280,23 @@ public class CephFileSystem extends FileSystem {
    * @throws IOException if initialize() hasn't been called.
    */
   @Override
-       public boolean exists(Path path) throws IOException {
-    if (!initialized) throw new IOException ("You have to initialize the "
-                                                                                                                                                                                +"CephFileSystem before calling other methods.");
+  public boolean exists(Path path) throws IOException {
+    if (!initialized) {
+      throw new IOException(
+          "You have to initialize the "
+              + "CephFileSystem before calling other methods.");
+    }
     ceph.debug("exists:enter with path " + path, ceph.DEBUG);
     boolean result;
     Path abs_path = makeAbsolute(path);
+
     if (abs_path.equals(root)) {
       result = true;
-    }
-    else {
-      ceph.debug("exists:Calling ceph_exists from Java on path "
-                                                                                       + abs_path.toString(), ceph.TRACE);
-      result =  ceph.ceph_exists(abs_path.toString());
+    } else {
+      ceph.debug(
+          "exists:Calling ceph_exists from Java on path " + abs_path.toString(),
+          ceph.TRACE);
+      result = ceph.ceph_exists(abs_path.toString());
       ceph.debug("exists:Returned from ceph_exists to Java", ceph.TRACE);
     }
     ceph.debug("exists:exit with value " + result, ceph.DEBUG);
@@ -273,27 +310,33 @@ public class CephFileSystem extends FileSystem {
    * @param perms The permissions to apply to the created directories.
    * @return true if successful, false otherwise
    * @throws IOException if initialize() hasn't been called or the path
-        *  is a child of a file.
+   *         is a child of a file.
    */
-       @Override
+  @Override
   public boolean mkdirs(Path path, FsPermission perms) throws IOException {
-    if (!initialized) throw new IOException ("You have to initialize the "
-                                                                                                                                                                                +"CephFileSystem before calling other methods.");
+    if (!initialized) {
+      throw new IOException(
+          "You have to initialize the "
+              + "CephFileSystem before calling other methods.");
+    }
     ceph.debug("mkdirs:enter with path " + path, ceph.DEBUG);
     Path abs_path = makeAbsolute(path);
+
     ceph.debug("mkdirs:calling ceph_mkdirs from Java", ceph.TRACE);
-    int result = ceph.ceph_mkdirs(abs_path.toString(), (int)perms.toShort());
+    int result = ceph.ceph_mkdirs(abs_path.toString(), (int) perms.toShort());
+
     if (result != 0) {
-                       ceph.debug("mkdirs: make directory " + abs_path
-                                                                + "Failing with result " + result, ceph.WARN);
-                       if (ceph.ENOTDIR == result)
-                               throw new FileAlreadyExistsException("Parent path is not a directory");
+      ceph.debug(
+          "mkdirs: make directory " + abs_path + "Failing with result " + result,
+          ceph.WARN);
+      if (ceph.ENOTDIR == result) {
+        throw new FileAlreadyExistsException("Parent path is not a directory");
+      }
       return false;
-               }
-    else {
-                       ceph.debug("mkdirs:exiting succesfully", ceph.DEBUG);
-                       return true;
-               }
+    } else {
+      ceph.debug("mkdirs:exiting succesfully", ceph.DEBUG);
+      return true;
+    }
   }
 
   /**
@@ -304,17 +347,20 @@ public class CephFileSystem extends FileSystem {
    * @throws IOException if initialize() hasn't been called.
    */
   @Override
-       public boolean isFile(Path path) throws IOException {
-    if (!initialized) throw new IOException ("You have to initialize the "
-                                                                                                                                                                                +"CephFileSystem before calling other methods.");
+  public boolean isFile(Path path) throws IOException {
+    if (!initialized) {
+      throw new IOException(
+          "You have to initialize the "
+              + "CephFileSystem before calling other methods.");
+    }
     ceph.debug("isFile:enter with path " + path, ceph.DEBUG);
     Path abs_path = makeAbsolute(path);
     boolean result;
+
     if (abs_path.equals(root)) {
-      result =  false;
-    }
-    else {
-                       ceph.debug("isFile:entering ceph_isfile from Java", ceph.TRACE);
+      result = false;
+    } else {
+      ceph.debug("isFile:entering ceph_isfile from Java", ceph.TRACE);
       result = ceph.ceph_isfile(abs_path.toString());
     }
     ceph.debug("isFile:exit with result " + result, ceph.DEBUG);
@@ -329,16 +375,19 @@ public class CephFileSystem extends FileSystem {
    * @throws IOException if initialize() hasn't been called.
    */
   @Override
-       public boolean isDirectory(Path path) throws IOException {
-    if (!initialized) throw new IOException ("You have to initialize the "
-                                                                                                                                                                                +"CephFileSystem before calling other methods.");
+  public boolean isDirectory(Path path) throws IOException {
+    if (!initialized) {
+      throw new IOException(
+          "You have to initialize the "
+              + "CephFileSystem before calling other methods.");
+    }
     ceph.debug("isDirectory:enter with path " + path, ceph.DEBUG);
     Path abs_path = makeAbsolute(path);
     boolean result;
+
     if (abs_path.equals(root)) {
       result = true;
-    }
-    else {
+    } else {
       ceph.debug("calling ceph_isdirectory from Java", ceph.TRACE);
       result = ceph.ceph_isdirectory(abs_path.toString());
       ceph.debug("Returned from ceph_isdirectory to Java", ceph.TRACE);
@@ -356,29 +405,29 @@ public class CephFileSystem extends FileSystem {
    * @throws FileNotFoundException if the path could not be resolved.
    */
   public FileStatus getFileStatus(Path path) throws IOException {
-    if (!initialized) throw new IOException ("You have to initialize the "
-                                                                                                                                                                                +"CephFileSystem before calling other methods.");
+    if (!initialized) {
+      throw new IOException(
+          "You have to initialize the "
+              + "CephFileSystem before calling other methods.");
+    }
     ceph.debug("getFileStatus:enter with path " + path, ceph.DEBUG);
     Path abs_path = makeAbsolute(path);
-    //sadly, Ceph doesn't really do uids/gids just yet, but
-    //everything else is filled
+    // sadly, Ceph doesn't really do uids/gids just yet, but
+    // everything else is filled
     FileStatus status;
     Stat lstat = new Stat();
-               ceph.debug("getFileStatus: calling ceph_stat from Java", ceph.TRACE);
-    if(ceph.ceph_stat(abs_path.toString(), lstat)) {
+
+    ceph.debug("getFileStatus: calling ceph_stat from Java", ceph.TRACE);
+    if (ceph.ceph_stat(abs_path.toString(), lstat)) {
       status = new FileStatus(lstat.size, lstat.is_dir,
-                                                                                                                       ceph.ceph_replication(abs_path.toString()),
-                                                                                                                       lstat.block_size,
-                                                                                                                       lstat.mod_time,
-                                                                                                                       lstat.access_time,
-                                                                                                                       new FsPermission((short)lstat.mode),
-                                                                                                                       null,
-                                                                                                                       null,
-                                                                                                                       new Path(fs_default_name+abs_path.toString()));
-    }
-    else { //fail out
-                       throw new FileNotFoundException("org.apache.hadoop.fs.ceph.CephFileSystem: File "
-                                                                                                                                                       + path + " does not exist or could not be accessed");
+          ceph.ceph_replication(abs_path.toString()), lstat.block_size,
+          lstat.mod_time, lstat.access_time,
+          new FsPermission((short) lstat.mode), null, null,
+          new Path(fs_default_name + abs_path.toString()));
+    } else { // fail out
+      throw new FileNotFoundException(
+          "org.apache.hadoop.fs.ceph.CephFileSystem: File " + path
+          + " does not exist or could not be accessed");
     }
 
     ceph.debug("getFileStatus:exit", ceph.DEBUG);
@@ -394,34 +443,46 @@ public class CephFileSystem extends FileSystem {
    * @throws FileNotFoundException if the input path can't be found.
    */
   public FileStatus[] listStatus(Path path) throws IOException {
-    if (!initialized) throw new IOException ("You have to initialize the "
-                                                                                                                                                                                +"CephFileSystem before calling other methods.");
+    if (!initialized) {
+      throw new IOException(
+          "You have to initialize the "
+              + "CephFileSystem before calling other methods.");
+    }
     ceph.debug("listStatus:enter with path " + path, ceph.WARN);
     Path abs_path = makeAbsolute(path);
     Path[] paths = listPaths(abs_path);
+
     if (paths != null) {
       FileStatus[] statuses = new FileStatus[paths.length];
+
       for (int i = 0; i < paths.length; ++i) {
-                               statuses[i] = getFileStatus(paths[i]);
+        statuses[i] = getFileStatus(paths[i]);
       }
       ceph.debug("listStatus:exit", ceph.DEBUG);
       return statuses;
     }
-    if (!isFile(path)) throw new FileNotFoundException(); //if we get here, listPaths returned null
-    //which means that the input wasn't a directory, so throw an Exception if it's not a file
-    return null; //or return null if it's a file
+    if (!isFile(path)) {
+      throw new FileNotFoundException();
+    } // if we get here, listPaths returned null
+    // which means that the input wasn't a directory, so throw an Exception if it's not a file
+    return null; // or return null if it's a file
   }
 
   @Override
   public void setPermission(Path p, FsPermission permission) throws IOException {
-    if (!initialized) throw new IOException ("You have to initialize the "
-                                                                                                                                                                                +"CephFileSystem before calling other methods.");
-               ceph.debug("setPermission:enter with path " + p
-                                       + " and permissions " + permission, ceph.DEBUG);
+    if (!initialized) {
+      throw new IOException(
+          "You have to initialize the "
+              + "CephFileSystem before calling other methods.");
+    }
+    ceph.debug(
+        "setPermission:enter with path " + p + " and permissions " + permission,
+        ceph.DEBUG);
     Path abs_path = makeAbsolute(p);
-               ceph.debug("setPermission:calling ceph_setpermission from Java", ceph.TRACE);
+
+    ceph.debug("setPermission:calling ceph_setpermission from Java", ceph.TRACE);
     ceph.ceph_setPermission(abs_path.toString(), permission.toShort());
-               ceph.debug("setPermission:exit", ceph.DEBUG);
+    ceph.debug("setPermission:exit", ceph.DEBUG);
   }
 
   /**
@@ -430,19 +491,28 @@ public class CephFileSystem extends FileSystem {
    * @param mtime Set modification time in number of millis since Jan 1, 1970.
    * @param atime Set access time in number of millis since Jan 1, 1970.
    */
-       @Override
-       public void setTimes(Path p, long mtime, long atime) throws IOException {
-               if (!initialized) throw new IOException ("You have to initialize the "
-                                                                                                                                                                                +"CephFileSystem before calling other methods.");
-               ceph.debug("setTimes:enter with path " + p + " mtime:" + mtime +
-                                       " atime:" + atime, ceph.DEBUG);
-               Path abs_path = makeAbsolute(p);
-               ceph.debug("setTimes:calling ceph_setTimes from Java", ceph.TRACE);
-               int r = ceph.ceph_setTimes(abs_path.toString(), mtime, atime);
-               if (r<0) throw new IOException ("Failed to set times on path "
-                                                                                                                                               + abs_path.toString() + " Error code: " + r);
-               ceph.debug("setTimes:exit", ceph.DEBUG);
-       }
+  @Override
+  public void setTimes(Path p, long mtime, long atime) throws IOException {
+    if (!initialized) {
+      throw new IOException(
+          "You have to initialize the "
+              + "CephFileSystem before calling other methods.");
+    }
+    ceph.debug(
+        "setTimes:enter with path " + p + " mtime:" + mtime + " atime:" + atime,
+        ceph.DEBUG);
+    Path abs_path = makeAbsolute(p);
+
+    ceph.debug("setTimes:calling ceph_setTimes from Java", ceph.TRACE);
+    int r = ceph.ceph_setTimes(abs_path.toString(), mtime, atime);
+
+    if (r < 0) {
+      throw new IOException(
+          "Failed to set times on path " + abs_path.toString() + " Error code: "
+          + r);
+    }
+    ceph.debug("setTimes:exit", ceph.DEBUG);
+  }
 
   /**
    * Create a new file and open an FSDataOutputStream that's connected to it.
@@ -451,7 +521,7 @@ public class CephFileSystem extends FileSystem {
    * @param flag If CreateFlag.OVERWRITE, overwrite any existing
    * file with this name; otherwise don't.
    * @param bufferSize Ceph does internal buffering, but you can buffer
-        *   in the Java code too if you like.
+   *   in the Java code too if you like.
    * @param replication Ignored by Ceph. This can be
    * configured via Ceph configuration.
    * @param blockSize Ignored by Ceph. You can set client-wide block sizes
@@ -464,19 +534,24 @@ public class CephFileSystem extends FileSystem {
    * failure in attempting to open for append with Ceph.
    */
   public FSDataOutputStream create(Path path,
-                                                                                                                                        FsPermission permission,
-                                                                                                                                        EnumSet<CreateFlag> flag,
-                                                                                                                                        //boolean overwrite,
-                                                                                                                                        int bufferSize,
-                                                                                                                                        short replication,
-                                                                                                                                        long blockSize,
-                                                                                                                                        Progressable progress
-                                                                                                                                        ) throws IOException {
-    if (!initialized) throw new IOException ("You have to initialize the "
-                                                                                                                                                                                +"CephFileSystem before calling other methods.");
+      FsPermission permission,
+      EnumSet<CreateFlag> flag,
+      // boolean overwrite,
+      int bufferSize,
+      short replication,
+      long blockSize,
+      Progressable progress) throws IOException {
+    if (!initialized) {
+      throw new IOException(
+          "You have to initialize the "
+              + "CephFileSystem before calling other methods.");
+    }
     ceph.debug("create:enter with path " + path, ceph.DEBUG);
     Path abs_path = makeAbsolute(path);
-    if (progress!=null) progress.progress();
+
+    if (progress != null) {
+      progress.progress();
+    }
     // We ignore replication since that's not configurable here, and
     // progress reporting is quite limited.
     // Required semantics: if the file exists, overwrite if CreateFlag.OVERWRITE;
@@ -484,90 +559,118 @@ public class CephFileSystem extends FileSystem {
 
     // Step 1: existence test
     boolean exists = exists(abs_path);
+
     if (exists) {
-      if(isDirectory(abs_path))
-                               throw new IOException("create: Cannot overwrite existing directory \""
-                                                                                                                       + path.toString() + "\" with a file");
-      //if (!overwrite)
-      if (!flag.contains(CreateFlag.OVERWRITE))
-                               throw new IOException("createRaw: Cannot open existing file \"" 
-                                                                                                                       + abs_path.toString() 
-                                                                                                                       + "\" for writing without overwrite flag");
+      if (isDirectory(abs_path)) {
+        throw new IOException(
+            "create: Cannot overwrite existing directory \"" + path.toString()
+            + "\" with a file");
+      }
+      // if (!overwrite)
+      if (!flag.contains(CreateFlag.OVERWRITE)) {
+        throw new IOException(
+            "createRaw: Cannot open existing file \"" + abs_path.toString()
+            + "\" for writing without overwrite flag");
+      }
     }
 
-    if (progress!=null) progress.progress();
+    if (progress != null) {
+      progress.progress();
+    }
 
     // Step 2: create any nonexistent directories in the path
     if (!exists) {
       Path parent = abs_path.getParent();
+
       if (parent != null) { // if parent is root, we're done
-                               int r = ceph.ceph_mkdirs(parent.toString(), permission.toShort());
-                               if (!(r==0 || r==-ceph.EEXIST))
-                                       throw new IOException ("Error creating parent directory; code: " + r);
+        int r = ceph.ceph_mkdirs(parent.toString(), permission.toShort());
+
+        if (!(r == 0 || r == -ceph.EEXIST)) {
+          throw new IOException("Error creating parent directory; code: " + r);
+        }
+      }
+      if (progress != null) {
+        progress.progress();
       }
-      if (progress!=null) progress.progress();
     }
     // Step 3: open the file
     ceph.debug("calling ceph_open_for_overwrite from Java", ceph.TRACE);
-    int fh = ceph.ceph_open_for_overwrite(abs_path.toString(), (int)permission.toShort());
-    if (progress!=null) progress.progress();
-    ceph.debug("Returned from ceph_open_for_overwrite to Java with fh " + fh, ceph.TRACE);
+    int fh = ceph.ceph_open_for_overwrite(abs_path.toString(),
+        (int) permission.toShort());
+
+    if (progress != null) {
+      progress.progress();
+    }
+    ceph.debug("Returned from ceph_open_for_overwrite to Java with fh " + fh,
+        ceph.TRACE);
     if (fh < 0) {
-      throw new IOException("create: Open for overwrite failed on path \"" + 
-                                                                                                               path.toString() + "\"");
+      throw new IOException(
+          "create: Open for overwrite failed on path \"" + path.toString()
+          + "\"");
     }
-      
+
     // Step 4: create the stream
-    OutputStream cephOStream = new CephOutputStream(getConf(),
-                                                                                                                                                                                                               ceph, fh, bufferSize);
+    OutputStream cephOStream = new CephOutputStream(getConf(), ceph, fh,
+        bufferSize);
+
     ceph.debug("create:exit", ceph.DEBUG);
     return new FSDataOutputStream(cephOStream, statistics);
-       }
+  }
 
   /**
    * Open a Ceph file and attach the file handle to an FSDataInputStream.
    * @param path The file to open
    * @param bufferSize Ceph does internal buffering; but you can buffer in
-        *   the Java code too if you like.
+   *   the Java code too if you like.
    * @return FSDataInputStream reading from the given path.
    * @throws IOException if initialize() hasn't been called, the path DNE or is a
    * directory, or there is an error getting data to set up the FSDataInputStream.
    */
   public FSDataInputStream open(Path path, int bufferSize) throws IOException {
-    if (!initialized) throw new IOException ("You have to initialize the "
-                                                                                                                                                                                +"CephFileSystem before calling other methods.");
+    if (!initialized) {
+      throw new IOException(
+          "You have to initialize the "
+              + "CephFileSystem before calling other methods.");
+    }
     ceph.debug("open:enter with path " + path, ceph.DEBUG);
     Path abs_path = makeAbsolute(path);
-    
+
     int fh = ceph.ceph_open_for_read(abs_path.toString());
-    if (fh < 0) { //uh-oh, something's bad!
-      if (fh == -ceph.ENOENT) //well that was a stupid open
-                               throw new IOException("open:  absolute path \""  + abs_path.toString()
-                                                                                                                       + "\" does not exist");
-      else //hrm...the file exists but we can't open it :(
-                               throw new IOException("open: Failed to open file " + abs_path.toString());
+
+    if (fh < 0) { // uh-oh, something's bad!
+      if (fh == -ceph.ENOENT) { // well that was a stupid open
+        throw new IOException(
+            "open:  absolute path \"" + abs_path.toString()
+            + "\" does not exist");
+      } else { // hrm...the file exists but we can't open it :(
+        throw new IOException("open: Failed to open file " + abs_path.toString());
+      }
     }
 
-    if(isDirectory(abs_path)) { //yes, it is possible to open Ceph directories
-      //but that doesn't mean you should in Hadoop!
+    if (isDirectory(abs_path)) { // yes, it is possible to open Ceph directories
+      // but that doesn't mean you should in Hadoop!
       ceph.ceph_close(fh);
-      throw new IOException("open:  absolute path \""  + abs_path.toString()
-                                                                                                               + "\" is a directory!");
+      throw new IOException(
+          "open:  absolute path \"" + abs_path.toString() + "\" is a directory!");
     }
     Stat lstat = new Stat();
-               ceph.debug("open:calling ceph_stat from Java", ceph.TRACE);
+
+    ceph.debug("open:calling ceph_stat from Java", ceph.TRACE);
     ceph.ceph_stat(abs_path.toString(), lstat);
-               ceph.debug("open:returned to Java", ceph.TRACE);
+    ceph.debug("open:returned to Java", ceph.TRACE);
     long size = lstat.size;
+
     if (size < 0) {
-      throw new IOException("Failed to get file size for file " + abs_path.toString() + 
-                                                                                                               " but succeeded in opening file. Something bizarre is going on.");
+      throw new IOException(
+          "Failed to get file size for file " + abs_path.toString()
+          + " but succeeded in opening file. Something bizarre is going on.");
     }
-    FSInputStream cephIStream = new CephInputStream(getConf(), ceph,
-                                                                                                                                                                                                               fh, size, bufferSize);
+    FSInputStream cephIStream = new CephInputStream(getConf(), ceph, fh, size,
+        bufferSize);
+
     ceph.debug("open:exit", ceph.DEBUG);
     return new FSDataInputStream(cephIStream);
-       }
+  }
 
   /**
    * Rename a file or directory.
@@ -577,24 +680,31 @@ public class CephFileSystem extends FileSystem {
    * @throws IOException if initialize() hasn't been called.
    */
   @Override
-       public boolean rename(Path src, Path dst) throws IOException {
-    if (!initialized) throw new IOException ("You have to initialize the "
-                                                                                                                                                                                +"CephFileSystem before calling other methods.");
+  public boolean rename(Path src, Path dst) throws IOException {
+    if (!initialized) {
+      throw new IOException(
+          "You have to initialize the "
+              + "CephFileSystem before calling other methods.");
+    }
     ceph.debug("rename:enter with src:" + src + " and dest:" + dst, ceph.DEBUG);
     Path abs_src = makeAbsolute(src);
     Path abs_dst = makeAbsolute(dst);
+
     ceph.debug("calling ceph_rename from Java", ceph.TRACE);
     boolean result = ceph.ceph_rename(abs_src.toString(), abs_dst.toString());
-               if (!result) {
-                       if (isDirectory(abs_dst)) { //move the srcdir into destdir
-                               ceph.debug("ceph_rename failed but dst is a directory!", ceph.NOLOG);
-                               Path new_dst = new Path(abs_dst, abs_src.getName());
-                               result = rename(abs_src, new_dst);
-                               ceph.debug("attempt to move " + abs_src.toString()
-                                                                        + " to " + new_dst.toString()
-                                                                        + "has result:" + result, ceph.NOLOG);
-                       }
-               }
+
+    if (!result) {
+      if (isDirectory(abs_dst)) { // move the srcdir into destdir
+        ceph.debug("ceph_rename failed but dst is a directory!", ceph.NOLOG);
+        Path new_dst = new Path(abs_dst, abs_src.getName());
+
+        result = rename(abs_src, new_dst);
+        ceph.debug(
+            "attempt to move " + abs_src.toString() + " to "
+            + new_dst.toString() + "has result:" + result,
+            ceph.NOLOG);
+      }
+    }
     ceph.debug("rename:exit with result: " + result, ceph.DEBUG);
     return result;
   }
@@ -614,51 +724,70 @@ public class CephFileSystem extends FileSystem {
    * @throws IOException if initialize() hasn't been called.
    */
   @Override
-       public BlockLocation[] getFileBlockLocations(FileStatus file,
-                                                                                                                                                                                                long start, long len) throws IOException {
-    if (!initialized) throw new IOException ("You have to initialize the "
-                                                                                                                                                                                +"CephFileSystem before calling other methods.");
-               ceph.debug("getFileBlockLocations:enter with path " + file.getPath() +
-                                       ", start pos " + start + ", length " + len, ceph.DEBUG);
-    //sanitize and get the filehandle
+  public BlockLocation[] getFileBlockLocations(FileStatus file, long start, long len) throws IOException {
+    if (!initialized) {
+      throw new IOException(
+          "You have to initialize the "
+              + "CephFileSystem before calling other methods.");
+    }
+    ceph.debug(
+        "getFileBlockLocations:enter with path " + file.getPath()
+        + ", start pos " + start + ", length " + len,
+        ceph.DEBUG);
+    // sanitize and get the filehandle
     Path abs_path = makeAbsolute(file.getPath());
-               ceph.debug("getFileBlockLocations:call ceph_open_for_read from Java", ceph.TRACE);
+
+    ceph.debug("getFileBlockLocations:call ceph_open_for_read from Java",
+        ceph.TRACE);
     int fh = ceph.ceph_open_for_read(abs_path.toString());
-               ceph.debug("getFileBlockLocations:return from ceph_open_for_read to Java with fh "
-                                       + fh, ceph.TRACE);
+
+    ceph.debug(
+        "getFileBlockLocations:return from ceph_open_for_read to Java with fh "
+            + fh,
+            ceph.TRACE);
     if (fh < 0) {
-                       ceph.debug("getFileBlockLocations:got error " + fh +
-                                               ", exiting and returning null!", ceph.ERROR);
+      ceph.debug(
+          "getFileBlockLocations:got error " + fh
+          + ", exiting and returning null!",
+          ceph.ERROR);
       return null;
     }
-    //get the block size
-               ceph.debug("getFileBlockLocations:call ceph_getblocksize from Java", ceph.TRACE);
+    // get the block size
+    ceph.debug("getFileBlockLocations:call ceph_getblocksize from Java",
+        ceph.TRACE);
     long blockSize = ceph.ceph_getblocksize(abs_path.toString());
-               ceph.debug("getFileBlockLocations:return from ceph_getblocksize", ceph.TRACE);
-    BlockLocation[] locations =
-      new BlockLocation[(int)Math.ceil(len/(float)blockSize)];
-               long offset;
+
+    ceph.debug("getFileBlockLocations:return from ceph_getblocksize", ceph.TRACE);
+    BlockLocation[] locations = new BlockLocation[(int) Math.ceil(len / (float) blockSize)];
+    long offset;
+
     for (int i = 0; i < locations.length; ++i) {
-                       offset = start + i*blockSize;
-                       ceph.debug("getFileBlockLocations:call ceph_hosts from Java on fh "
-                                               + fh + " and offset " + offset, ceph.TRACE);
+      offset = start + i * blockSize;
+      ceph.debug(
+          "getFileBlockLocations:call ceph_hosts from Java on fh " + fh
+          + " and offset " + offset,
+          ceph.TRACE);
       String host = ceph.ceph_hosts(fh, offset);
-                       ceph.debug("getFileBlockLocations:return from ceph_hosts to Java with host "
-                                               + host, ceph.TRACE);
+
+      ceph.debug(
+          "getFileBlockLocations:return from ceph_hosts to Java with host "
+              + host,
+              ceph.TRACE);
       String[] hostArray = new String[1];
+
       hostArray[0] = host;
       locations[i] = new BlockLocation(hostArray, hostArray,
-                                                                                                                                                        start+i*blockSize-(start % blockSize),
-                                                                                                                                                        blockSize);
+          start + i * blockSize - (start % blockSize), blockSize);
     }
-               ceph.debug("getFileBlockLocations:call ceph_close from Java on fh "
-                                       + fh, ceph.TRACE);
+    ceph.debug("getFileBlockLocations:call ceph_close from Java on fh " + fh,
+        ceph.TRACE);
     ceph.ceph_close(fh);
-               ceph.debug("getFileBlockLocations:return with " + locations.length
-                                       + " locations", ceph.DEBUG);
+    ceph.debug(
+        "getFileBlockLocations:return with " + locations.length + " locations",
+        ceph.DEBUG);
     return locations;
   }
-  
+
   /**
    * Get usage statistics on the Ceph filesystem.
    * @param path A path to the partition you're interested in.
@@ -668,22 +797,29 @@ public class CephFileSystem extends FileSystem {
    * stat somehow fails.
    */
   @Override
-       public FsStatus getStatus (Path path) throws IOException {
-    if (!initialized) throw new IOException("You have to initialize the "
-                                                                                                                                                                               + " CephFileSystem before calling other methods.");
+  public FsStatus getStatus(Path path) throws IOException {
+    if (!initialized) {
+      throw new IOException(
+          "You have to initialize the "
+              + "CephFileSystem before calling other methods.");
+    }
     ceph.debug("getStatus:enter with path " + path, ceph.DEBUG);
     Path abs_path = makeAbsolute(path);
-    
-    //currently(Ceph .16) Ceph actually ignores the path
-    //but we still pass it in; if Ceph stops ignoring we may need more
-    //error-checking code.
+
+    // currently(Ceph .16) Ceph actually ignores the path
+    // but we still pass it in; if Ceph stops ignoring we may need more
+    // error-checking code.
     CephStat ceph_stat = new CephStat();
-               ceph.debug("getStatus:calling ceph_statfs from Java", ceph.TRACE);
+
+    ceph.debug("getStatus:calling ceph_statfs from Java", ceph.TRACE);
     int result = ceph.ceph_statfs(abs_path.toString(), ceph_stat);
-    if (result!=0) throw new IOException("Somehow failed to statfs the Ceph filesystem. Error code: " + result);
+
+    if (result != 0) {
+      throw new IOException(
+          "Somehow failed to statfs the Ceph filesystem. Error code: " + result);
+    }
     ceph.debug("getStatus:exit successfully", ceph.DEBUG);
-    return new FsStatus(ceph_stat.capacity,
-                                                                                               ceph_stat.used, ceph_stat.remaining);
+    return new FsStatus(ceph_stat.capacity, ceph_stat.used, ceph_stat.remaining);
   }
 
   /**
@@ -698,58 +834,74 @@ public class CephFileSystem extends FileSystem {
    * or you attempt to delete the root directory.
    */
   public boolean delete(Path path, boolean recursive) throws IOException {
-    if (!initialized) throw new IOException ("You have to initialize the "
-                                                                                                                                                                                +"CephFileSystem before calling other methods.");
+    if (!initialized) {
+      throw new IOException(
+          "You have to initialize the "
+              + "CephFileSystem before calling other methods.");
+    }
     ceph.debug("delete:enter with path " + path + " and recursive=" + recursive,
-                                       ceph.DEBUG);
+        ceph.DEBUG);
     Path abs_path = makeAbsolute(path);
-    
+
     // sanity check
-    if (abs_path.equals(root))
+    if (abs_path.equals(root)) {
       throw new IOException("Error: deleting the root directory is a Bad Idea.");
-    if (!exists(abs_path)) return false;
+    }
+    if (!exists(abs_path)) {
+      return false;
+    }
 
     // if the path is a file, try to delete it.
     if (isFile(abs_path)) {
-                       ceph.debug("delete:calling ceph_unlink from Java with path " + abs_path,
-                                               ceph.TRACE);
+      ceph.debug("delete:calling ceph_unlink from Java with path " + abs_path,
+          ceph.TRACE);
       boolean result = ceph.ceph_unlink(abs_path.toString());
-      if(!result)
-                               ceph.debug("delete: failed to delete file \"" +
-                                                                                               abs_path.toString() + "\".", ceph.ERROR);
+
+      if (!result) {
+        ceph.debug(
+            "delete: failed to delete file \"" + abs_path.toString() + "\".",
+            ceph.ERROR);
+      }
       ceph.debug("delete:exit with success=" + result, ceph.DEBUG);
       return result;
     }
 
     /* The path is a directory, so recursively try to delete its contents,
-       and then delete the directory. */
-    //get the entries; listPaths will remove . and .. for us
+     and then delete the directory. */
+    // get the entries; listPaths will remove . and .. for us
     Path[] contents = listPaths(abs_path);
+
     if (contents == null) {
-      ceph.debug("delete: Failed to read contents of directory \"" +
-                                               abs_path.toString() +
-                                               "\" while trying to delete it, BAILING", ceph.ERROR);
+      ceph.debug(
+          "delete: Failed to read contents of directory \""
+              + abs_path.toString() + "\" while trying to delete it, BAILING",
+              ceph.ERROR);
       return false;
     }
     if (!recursive && contents.length > 0) {
       throw new IOException("Directories must be deleted recursively!");
     }
     // delete the entries
-               ceph.debug("delete: recursively calling delete on contents of "
-                                       + abs_path, ceph.DEBUG);
+    ceph.debug("delete: recursively calling delete on contents of " + abs_path,
+        ceph.DEBUG);
     for (Path p : contents) {
       if (!delete(p, true)) {
-                               ceph.debug("delete: Failed to delete file \"" + 
-                                                                                               p.toString() + "\" while recursively deleting \""
-                                                                                               + abs_path.toString() + "\", BAILING", ceph.ERROR );
-                               return false;
+        ceph.debug(
+            "delete: Failed to delete file \"" + p.toString()
+            + "\" while recursively deleting \"" + abs_path.toString()
+            + "\", BAILING",
+            ceph.ERROR);
+        return false;
       }
     }
-    //if we've come this far it's a now-empty directory, so delete it!
+    // if we've come this far it's a now-empty directory, so delete it!
     boolean result = ceph.ceph_rmdir(abs_path.toString());
-    if (!result)
-      ceph.debug("delete: failed to delete \"" + abs_path.toString()
-                                               + "\", BAILING", ceph.ERROR);
+
+    if (!result) {
+      ceph.debug(
+          "delete: failed to delete \"" + abs_path.toString() + "\", BAILING",
+          ceph.ERROR);
+    }
     ceph.debug("delete:exit", ceph.DEBUG);
     return result;
   }
@@ -760,7 +912,7 @@ public class CephFileSystem extends FileSystem {
    * by a separate Ceph configuration.
    */
   @Override
-       public short getDefaultReplication() {
+  public short getDefaultReplication() {
     return 1;
   }
 
@@ -769,18 +921,22 @@ public class CephFileSystem extends FileSystem {
    * @return the default block size, in bytes, as a long.
    */
   @Override
-       public long getDefaultBlockSize() {
-    return getConf().getInt("fs.ceph.blockSize", 1<<26);
+  public long getDefaultBlockSize() {
+    return getConf().getInt("fs.ceph.blockSize", 1 << 26);
   }
 
   // Makes a Path absolute. In a cheap, dirty hack, we're
-  // also going to strip off any fs_default_name prefix we see. 
+  // also going to strip off any fs_default_name prefix we see.
   private Path makeAbsolute(Path path) {
     ceph.debug("makeAbsolute:enter with path " + path, ceph.NOLOG);
-    if (path == null) return new Path("/");
+    if (path == null) {
+      return new Path("/");
+    }
     // first, check for the prefix
-    if (path.toString().startsWith(fs_default_name)) {   
-      Path stripped_path = new Path(path.toString().substring(fs_default_name.length()));
+    if (path.toString().startsWith(fs_default_name)) {
+      Path stripped_path = new Path(
+          path.toString().substring(fs_default_name.length()));
+
       ceph.debug("makeAbsolute:exit with path " + stripped_path, ceph.NOLOG);
       return stripped_path;
     }
@@ -790,10 +946,11 @@ public class CephFileSystem extends FileSystem {
       return path;
     }
     Path new_path = new Path(ceph.ceph_getcwd(), path);
+
     ceph.debug("makeAbsolute:exit with path " + new_path, ceph.NOLOG);
     return new_path;
   }
+
   private Path[] listPaths(Path path) throws IOException {
     ceph.debug("listPaths:enter with path " + path, ceph.NOLOG);
     String dirlist[];
@@ -808,25 +965,28 @@ public class CephFileSystem extends FileSystem {
     if (dirlist == null) {
       return null;
     }
-    
+
     // convert the strings to Paths
     Path[] paths = new Path[dirlist.length];
+
     for (int i = 0; i < dirlist.length; ++i) {
-      ceph.debug("Raw enumeration of paths in \"" + abs_path.toString() + "\": \"" +
-                                                                                       dirlist[i] + "\"", ceph.TRACE);
+      ceph.debug(
+          "Raw enumeration of paths in \"" + abs_path.toString() + "\": \""
+          + dirlist[i] + "\"",
+          ceph.TRACE);
       // convert each listing to an absolute path
       Path raw_path = new Path(dirlist[i]);
-      if (raw_path.isAbsolute())
-                               paths[i] = raw_path;
-      else
-                               paths[i] = new Path(abs_path, raw_path);
+
+      if (raw_path.isAbsolute()) {
+        paths[i] = raw_path;
+      } else {
+        paths[i] = new Path(abs_path, raw_path);
+      }
     }
     ceph.debug("listPaths:exit", ceph.NOLOG);
     return paths;
   }
 
-  
-
   static class Stat {
     public long size;
     public boolean is_dir;
@@ -835,9 +995,10 @@ public class CephFileSystem extends FileSystem {
     public long access_time;
     public int mode;
 
-    public Stat(){}
+    public Stat() {}
   }
 
+
   static class CephStat {
     public long capacity;
     public long used;
index a6684576d2ff2a616ff259913c5a7472881a0cc4..6f350efbc575c29d0a0d1eb88c77baa39a5e7dfc 100644 (file)
@@ -1,4 +1,5 @@
 // -*- mode:Java; tab-width:2; c-basic-offset:2; indent-tabs-mode:t -*- 
+
 /**
  *
  * Licensed under the Apache License, Version 2.0
@@ -19,6 +20,7 @@
  */
 package org.apache.hadoop.fs.ceph;
 
+
 import java.io.IOException;
 
 import org.apache.hadoop.conf.Configuration;
@@ -38,12 +40,12 @@ public class CephInputStream extends FSInputStream {
 
   private long fileLength;
 
-       private CephFS ceph;
+  private CephFS ceph;
 
-       private byte[] buffer;
-       private int bufPos = 0;
-       private int bufValid = 0;
-       private long cephPos = 0;
+  private byte[] buffer;
+  private int bufPos = 0;
+  private int bufValid = 0;
+  private long cephPos = 0;
 
   /**
    * Create a new CephInputStream.
@@ -53,75 +55,84 @@ public class CephInputStream extends FSInputStream {
    * you will need to close and re-open it to access the new data.
    */
   public CephInputStream(Configuration conf, CephFS cephfs,
-                                                                                                int fh, long flength, int bufferSize) {
+      int fh, long flength, int bufferSize) {
     // Whoever's calling the constructor is responsible for doing the actual ceph_open
     // call and providing the file handle.
     fileLength = flength;
     fileHandle = fh;
     closed = false;
-               ceph = cephfs;
-               buffer = new byte[bufferSize];
-    ceph.debug("CephInputStream constructor: initializing stream with fh "
-                                                                               + fh + " and file length " + flength, ceph.DEBUG);
+    ceph = cephfs;
+    buffer = new byte[bufferSize];
+    ceph.debug(
+        "CephInputStream constructor: initializing stream with fh " + fh
+        + " and file length " + flength,
+        ceph.DEBUG);
       
   }
+
   /** Ceph likes things to be closed before it shuts down,
    * so closing the IOStream stuff voluntarily in a finalizer is good
    */
-  protected void finalize () throws Throwable {
+  protected void finalize() throws Throwable {
     try {
-      if (!closed) close();
+      if (!closed) {
+        close();
+      }
+    } finally {
+      super.finalize();
+    }
+  }
+
+  private synchronized boolean fillBuffer() throws IOException {
+    bufValid = ceph.ceph_read(fileHandle, buffer, 0, buffer.length);
+    bufPos = 0;
+    if (bufValid < 0) {
+      int err = bufValid;
+
+      bufValid = 0;
+      // attempt to reset to old position. If it fails, too bad.
+      ceph.ceph_seek_from_start(fileHandle, cephPos);
+      throw new IOException("Failed to fill read buffer! Error code:" + err);
     }
-    finally { super.finalize(); }
+    cephPos += bufValid;
+    return (bufValid != 0);
   }
 
-       private synchronized boolean fillBuffer() throws IOException {
-               bufValid = ceph.ceph_read(fileHandle, buffer, 0, buffer.length);
-               bufPos = 0;
-               if (bufValid < 0) {
-                       int err = bufValid;
-                       bufValid = 0;
-                       //attempt to reset to old position. If it fails, too bad.
-                       ceph.ceph_seek_from_start(fileHandle, cephPos);
-                       throw new IOException("Failed to fill read buffer! Error code:"
-                                                                                                               + err);
-               }
-               cephPos += bufValid;
-               return (bufValid != 0);
-       }
-
-       /*
-        * Get the current position of the stream.
-        */
+  /*
+   * Get the current position of the stream.
+   */
   public synchronized long getPos() throws IOException {
-               return cephPos - bufValid + bufPos;
+    return cephPos - bufValid + bufPos;
   }
 
   /**
    * Find the number of bytes remaining in the file.
    */
   @Override
-       public synchronized int available() throws IOException {
-      return (int) (fileLength - getPos());
-    }
+  public synchronized int available() throws IOException {
+    return (int) (fileLength - getPos());
+  }
 
   public synchronized void seek(long targetPos) throws IOException {
-    ceph.debug("CephInputStream.seek: Seeking to position " + targetPos +
-                                                                               " on fd " + fileHandle, ceph.TRACE);
+    ceph.debug(
+        "CephInputStream.seek: Seeking to position " + targetPos + " on fd "
+        + fileHandle,
+        ceph.TRACE);
     if (targetPos > fileLength) {
-      throw new IOException("CephInputStream.seek: failed seek to position "
-                                                                                                               + targetPos + " on fd " + fileHandle
-                                                                                                               + ": Cannot seek after EOF " + fileLength);
+      throw new IOException(
+          "CephInputStream.seek: failed seek to position " + targetPos
+          + " on fd " + fileHandle + ": Cannot seek after EOF " + fileLength);
+    }
+    long oldPos = cephPos;
+
+    cephPos = ceph.ceph_seek_from_start(fileHandle, targetPos);
+    bufValid = 0;
+    bufPos = 0;
+    if (cephPos < 0) {
+      cephPos = oldPos;
+      throw new IOException("Ceph failed to seek to new position!");
     }
-               long oldPos = cephPos;
-               cephPos = ceph.ceph_seek_from_start(fileHandle, targetPos);
-               bufValid = 0;
-               bufPos = 0;
-               if (cephPos < 0) {
-                       cephPos = oldPos;
-                       throw new IOException ("Ceph failed to seek to new position!");
-               }
-       }
+  }
 
   /**
    * Failovers are handled by the Ceph code at a very low level;
@@ -133,22 +144,31 @@ public class CephInputStream extends FSInputStream {
     return false;
   }
     
-    
   /**
    * Read a byte from the file.
    * @return the next byte.
    */
   @Override
-       public synchronized int read() throws IOException {
-      ceph.debug("CephInputStream.read: Reading a single byte from fd " + fileHandle
-                                                                                       + " by calling general read function", ceph.TRACE);
-
-      byte result[] = new byte[1];
-      if (getPos() >= fileLength) return -1;
-      if (-1 == read(result, 0, 1)) return -1;
-      if (result[0]<0) return 256+(int)result[0];
-      else return result[0];
+  public synchronized int read() throws IOException {
+    ceph.debug(
+        "CephInputStream.read: Reading a single byte from fd " + fileHandle
+        + " by calling general read function",
+        ceph.TRACE);
+
+    byte result[] = new byte[1];
+
+    if (getPos() >= fileLength) {
+      return -1;
+    }
+    if (-1 == read(result, 0, 1)) {
+      return -1;
     }
+    if (result[0] < 0) {
+      return 256 + (int) result[0];
+    } else {
+      return result[0];
+    }
+  }
 
   /**
    * Read a specified number of bytes from the file into a byte[].
@@ -156,80 +176,83 @@ public class CephInputStream extends FSInputStream {
    * @param off the offset to start at in the file
    * @param len the number of bytes to read
    * @return 0 if successful, otherwise an error code.
-        * @throws IOException on bad input.
+   * @throws IOException on bad input.
    */
   @Override
-       public synchronized int read(byte buf[], int off, int len)
-               throws IOException {
-      ceph.debug("CephInputStream.read: Reading " + len  +
-                                                                " bytes from fd " + fileHandle, ceph.TRACE);
+  public synchronized int read(byte buf[], int off, int len)
+    throws IOException {
+    ceph.debug(
+        "CephInputStream.read: Reading " + len + " bytes from fd " + fileHandle,
+        ceph.TRACE);
       
-      if (closed) {
-                               throw new IOException("CephInputStream.read: cannot read " + len  + 
-                                                                                                                       " bytes from fd " + fileHandle +
-                                                                                                                       ": stream closed");
-      }
+    if (closed) {
+      throw new IOException(
+          "CephInputStream.read: cannot read " + len + " bytes from fd "
+          + fileHandle + ": stream closed");
+    }
                        
-      // ensure we're not past the end of the file
-      if (getPos() >= fileLength) {
-                               ceph.debug("CephInputStream.read: cannot read " + len  + 
-                                                                        " bytes from fd " + fileHandle + ": current position is "
-                                                                        + getPos() + " and file length is " + fileLength,
-                                                                        ceph.DEBUG);
+    // ensure we're not past the end of the file
+    if (getPos() >= fileLength) {
+      ceph.debug(
+          "CephInputStream.read: cannot read " + len + " bytes from fd "
+          + fileHandle + ": current position is " + getPos()
+          + " and file length is " + fileLength,
+          ceph.DEBUG);
                                
-                               return -1;
-                       }
-
-                       int totalRead = 0;
-                       int initialLen = len;
-                       int read;
-                       do {
-                               read = Math.min(len, bufValid - bufPos);
-                               try {
-                                       System.arraycopy(buffer, bufPos, buf, off, read);
-                               }
-                               catch(IndexOutOfBoundsException ie) {
-                                       throw new IOException("CephInputStream.read: Indices out of bounds:"
-                                                                                                                               + "read length is " + len
-                                                                                                                               + ", buffer offset is " + off
-                                                                                                                               + ", and buffer size is " + buf.length);
-                               }
-                               catch (ArrayStoreException ae) {
-                                       throw new IOException("Uh-oh, CephInputStream failed to do an array"
-                                                                                                                               + "copy due to type mismatch...");
-                               }
-                               catch (NullPointerException ne) {
-                                       throw new IOException("CephInputStream.read: cannot read "
-                                                                                                                               + len + "bytes from fd:" + fileHandle
-                                                                                                                               + ": buf is null");
-                               }
-                               bufPos += read;
-                               len -= read;
-                               off += read;
-                               totalRead += read;
-                       } while (len > 0 && fillBuffer());
-
-      ceph.debug("CephInputStream.read: Reading " + initialLen
-                                                                + " bytes from fd " + fileHandle
-                                                                + ": succeeded in reading " + totalRead + " bytes",
-                                                                ceph.TRACE);
-      return totalRead;
-       }
+      return -1;
+    }
+
+    int totalRead = 0;
+    int initialLen = len;
+    int read;
+
+    do {
+      read = Math.min(len, bufValid - bufPos);
+      try {
+        System.arraycopy(buffer, bufPos, buf, off, read);
+      } catch (IndexOutOfBoundsException ie) {
+        throw new IOException(
+            "CephInputStream.read: Indices out of bounds:" + "read length is "
+            + len + ", buffer offset is " + off + ", and buffer size is "
+            + buf.length);
+      } catch (ArrayStoreException ae) {
+        throw new IOException(
+            "Uh-oh, CephInputStream failed to do an array"
+                + "copy due to type mismatch...");
+      } catch (NullPointerException ne) {
+        throw new IOException(
+            "CephInputStream.read: cannot read " + len + "bytes from fd:"
+            + fileHandle + ": buf is null");
+      }
+      bufPos += read;
+      len -= read;
+      off += read;
+      totalRead += read;
+    } while (len > 0 && fillBuffer());
+
+    ceph.debug(
+        "CephInputStream.read: Reading " + initialLen + " bytes from fd "
+        + fileHandle + ": succeeded in reading " + totalRead + " bytes",
+        ceph.TRACE);
+    return totalRead;
+  }
 
   /**
    * Close the CephInputStream and release the associated filehandle.
    */
   @Override
-       public void close() throws IOException {
+  public void close() throws IOException {
     ceph.debug("CephOutputStream.close:enter", ceph.TRACE);
     if (!closed) {
-                       int result = ceph.ceph_close(fileHandle);
-                       closed = true;
-                       if (result != 0) {
-                               throw new IOException("Close somehow failed!"
-                                                                                                                       + "Don't try and use this stream again, though");
-                       }
-                       ceph.debug("CephOutputStream.close:exit", ceph.TRACE);
-               }
-       }
+      int result = ceph.ceph_close(fileHandle);
+
+      closed = true;
+      if (result != 0) {
+        throw new IOException(
+            "Close somehow failed!"
+                + "Don't try and use this stream again, though");
+      }
+      ceph.debug("CephOutputStream.close:exit", ceph.TRACE);
+    }
+  }
 }
index 0ca1d9771710dd45e1a2f0267a09589418fe4543..03ea3b9cc8b646318dfd7d67aa610d9168a1d2af 100644 (file)
@@ -1,4 +1,5 @@
 // -*- mode:Java; tab-width:2; c-basic-offset:2; indent-tabs-mode:t -*- 
+
 /**
  *
  * Licensed under the Apache License, Version 2.0
 
 package org.apache.hadoop.fs.ceph;
 
+
 import java.io.IOException;
 import java.io.OutputStream;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.util.Progressable;
 
+
 /**
  * <p>
  * An {@link OutputStream} for a CephFileSystem and corresponding
@@ -35,12 +38,12 @@ public class CephOutputStream extends OutputStream {
 
   private boolean closed;
 
-       private CephFS ceph;
+  private CephFS ceph;
 
   private int fileHandle;
 
-       private byte[] buffer;
-       private int bufUsed = 0;
+  private byte[] buffer;
+  private int bufUsed = 0;
 
   /**
    * Construct the CephOutputStream.
@@ -48,21 +51,24 @@ public class CephOutputStream extends OutputStream {
    * @param fh The Ceph filehandle to connect to.
    */
   public CephOutputStream(Configuration conf, CephFS cephfs,
-                                                                                                       int fh, int bufferSize) {
-               ceph = cephfs;
+      int fh, int bufferSize) {
+    ceph = cephfs;
     fileHandle = fh;
     closed = false;
-               buffer = new byte[bufferSize];
+    buffer = new byte[bufferSize];
   }
 
-  /**Ceph likes things to be closed before it shuts down,
+  /** Ceph likes things to be closed before it shuts down,
    *so closing the IOStream stuff voluntarily is good
    */
-  protected void finalize () throws Throwable {
+  protected void finalize() throws Throwable {
     try {
-      if (!closed) close();
+      if (!closed) {
+        close();
+      }
+    } finally {
+      super.finalize();
     }
-    finally { super.finalize();}
   }
 
   /**
@@ -80,20 +86,23 @@ public class CephOutputStream extends OutputStream {
    * write fails.
    */
   @Override
-       public synchronized void write(int b) throws IOException {
-      ceph.debug("CephOutputStream.write: writing a single byte to fd "
-                                                                + fileHandle, ceph.TRACE);
+  public synchronized void write(int b) throws IOException {
+    ceph.debug(
+        "CephOutputStream.write: writing a single byte to fd " + fileHandle,
+        ceph.TRACE);
 
-      if (closed) {
-                               throw new IOException("CephOutputStream.write: cannot write " + 
-                                                                                                                       "a byte to fd " + fileHandle + ": stream closed");
-      }
-      // Stick the byte in a buffer and write it
-      byte buf[] = new byte[1];
-      buf[0] = (byte) b;    
-      write(buf, 0, 1);
-      return;
+    if (closed) {
+      throw new IOException(
+          "CephOutputStream.write: cannot write " + "a byte to fd " + fileHandle
+          + ": stream closed");
     }
+    // Stick the byte in a buffer and write it
+    byte buf[] = new byte[1];
+
+    buf[0] = (byte) b;    
+    write(buf, 0, 1);
+    return;
+  }
 
   /**
    * Write a byte buffer into the Ceph file.
@@ -101,102 +110,110 @@ public class CephOutputStream extends OutputStream {
    * @param off the position in the file to start writing at.
    * @param len The number of bytes to actually write.
    * @throws IOException if you have closed the CephOutputStream, or
-        * if buf is null or off + len > buf.length, or
-        * if the write fails due to a Ceph error.
+   * if buf is null or off + len > buf.length, or
+   * if the write fails due to a Ceph error.
    */
   @Override
-       public synchronized void write(byte buf[], int off, int len) throws IOException {
-               ceph.debug("CephOutputStream.write: writing " + len + 
-                                                        " bytes to fd " + fileHandle, ceph.TRACE);
-               // make sure stream is open
-               if (closed) {
-                       throw new IOException("CephOutputStream.write: cannot write " + len + 
-                                                                                                               "bytes to fd " + fileHandle + ": stream closed");
-               }
+  public synchronized void write(byte buf[], int off, int len) throws IOException {
+    ceph.debug(
+        "CephOutputStream.write: writing " + len + " bytes to fd " + fileHandle,
+        ceph.TRACE);
+    // make sure stream is open
+    if (closed) {
+      throw new IOException(
+          "CephOutputStream.write: cannot write " + len + "bytes to fd "
+          + fileHandle + ": stream closed");
+    }
                
-               int result;
-               int write;
-               while (len>0) {
-                       write = Math.min(len, buffer.length - bufUsed);
-                       try {
-                               System.arraycopy(buf, off, buffer, bufUsed, write);
-                       }
-                       catch (IndexOutOfBoundsException ie) {
-                               throw new IOException("CephOutputStream.write: Indices out of bounds: "
-                                                                                                                       + "write length is " + len
-                                                                                                                       + ", buffer offset is " + off
-                                                                                                                       + ", and buffer size is " + buf.length);
-                       }
-                       catch (ArrayStoreException ae) {
-                               throw new IOException("Uh-oh, CephOutputStream failed to do an array"
-                                                                                                                       + " copy due to type mismatch...");
-                       }
-                       catch (NullPointerException ne) {
-                               throw new IOException("CephOutputStream.write: cannot write "
-                                                                                                                       + len + "bytes to fd " + fileHandle
-                                                                                                                       + ": buffer is null");
-                       }
-                       bufUsed += write;
-                       len -= write;
-                       off += write;
-                       if (bufUsed == buffer.length) {
-                               result = ceph.ceph_write(fileHandle, buffer, 0, bufUsed);
-                               if (result < 0)
-                                       throw new IOException("CephOutputStream.write: Buffered write of "
-                                                                                                                               + bufUsed + " bytes failed!");
-                               if (result != bufUsed)
-                                       throw new IOException("CephOutputStream.write: Wrote only "
-                                                                                                                               + result + " bytes of " + bufUsed
-                                                                                                                               + " in buffer! Data may be lost or written"
-                                                                                                                               + " twice to Ceph!");
-                               bufUsed = 0;
-                       }
-
-               }
-               return; 
-       }
+    int result;
+    int write;
+
+    while (len > 0) {
+      write = Math.min(len, buffer.length - bufUsed);
+      try {
+        System.arraycopy(buf, off, buffer, bufUsed, write);
+      } catch (IndexOutOfBoundsException ie) {
+        throw new IOException(
+            "CephOutputStream.write: Indices out of bounds: "
+                + "write length is " + len + ", buffer offset is " + off
+                + ", and buffer size is " + buf.length);
+      } catch (ArrayStoreException ae) {
+        throw new IOException(
+            "Uh-oh, CephOutputStream failed to do an array"
+                + " copy due to type mismatch...");
+      } catch (NullPointerException ne) {
+        throw new IOException(
+            "CephOutputStream.write: cannot write " + len + "bytes to fd "
+            + fileHandle + ": buffer is null");
+      }
+      bufUsed += write;
+      len -= write;
+      off += write;
+      if (bufUsed == buffer.length) {
+        result = ceph.ceph_write(fileHandle, buffer, 0, bufUsed);
+        if (result < 0) {
+          throw new IOException(
+              "CephOutputStream.write: Buffered write of " + bufUsed
+              + " bytes failed!");
+        }
+        if (result != bufUsed) {
+          throw new IOException(
+              "CephOutputStream.write: Wrote only " + result + " bytes of "
+              + bufUsed + " in buffer! Data may be lost or written"
+              + " twice to Ceph!");
+        }
+        bufUsed = 0;
+      }
+
+    }
+    return; 
+  }
    
   /**
    * Flush the buffered data.
    * @throws IOException if you've closed the stream or the write fails.
    */
   @Override
-       public synchronized void flush() throws IOException {
-                       if (!closed) {
-                               if (bufUsed == 0) return;
-                               int result = ceph.ceph_write(fileHandle, buffer, 0, bufUsed);
-                               if (result < 0) {
-                                       throw new IOException("CephOutputStream.write: Write of "
-                                                                                                                               + bufUsed + "bytes to fd "
-                                                                                                                               + fileHandle + " failed");
-                               }
-                               if (result != bufUsed) {
-                                       throw new IOException("CephOutputStream.write: Write of " + bufUsed
-                                                                                                                               + "bytes to fd " + fileHandle
-                                                                                                                               + "was incomplete:  only " + result + " of "
-                                                                                                                               + bufUsed + " bytes were written.");
-                               }
-                               bufUsed = 0;
-                               return;
-                       }
-       }
+  public synchronized void flush() throws IOException {
+    if (!closed) {
+      if (bufUsed == 0) {
+        return;
+      }
+      int result = ceph.ceph_write(fileHandle, buffer, 0, bufUsed);
+
+      if (result < 0) {
+        throw new IOException(
+            "CephOutputStream.write: Write of " + bufUsed + "bytes to fd "
+            + fileHandle + " failed");
+      }
+      if (result != bufUsed) {
+        throw new IOException(
+            "CephOutputStream.write: Write of " + bufUsed + "bytes to fd "
+            + fileHandle + "was incomplete:  only " + result + " of " + bufUsed
+            + " bytes were written.");
+      }
+      bufUsed = 0;
+      return;
+    }
+  }
   
   /**
    * Close the CephOutputStream.
    * @throws IOException if Ceph somehow returns an error. In current code it can't.
    */
   @Override
-       public synchronized void close() throws IOException {
-      ceph.debug("CephOutputStream.close:enter", ceph.TRACE);
-      if (!closed) {
-                               flush();
-                               int result = ceph.ceph_close(fileHandle);
-                               if (result != 0) {
-                                       throw new IOException("Close failed!");
-                               }
+  public synchronized void close() throws IOException {
+    ceph.debug("CephOutputStream.close:enter", ceph.TRACE);
+    if (!closed) {
+      flush();
+      int result = ceph.ceph_close(fileHandle);
+
+      if (result != 0) {
+        throw new IOException("Close failed!");
+      }
                                
-                               closed = true;
-                               ceph.debug("CephOutputStream.close:exit", ceph.TRACE);
-                       }
-       }
+      closed = true;
+      ceph.debug("CephOutputStream.close:exit", ceph.TRACE);
+    }
+  }
 }
index 20b494b7342ed2b225839b388c2fd715074634fa..9e416a0e231c0ccdde8e7fbad53465dfa8c29ede 100644 (file)
@@ -1,4 +1,5 @@
 // -*- mode:Java; tab-width:2; c-basic-offset:2; indent-tabs-mode:t -*- 
+
 /**
  *
  * Licensed under the Apache License, Version 2.0
  */
 package org.apache.hadoop.fs.ceph;
 
+
 import org.apache.hadoop.conf.Configuration;
 import org.apache.commons.logging.Log;
 
+
 class CephTalker extends CephFS {
-       //we write a constructor so we can load the libraries
-       public CephTalker(Configuration conf, Log log) {
-               super(conf, log);
-               System.load(conf.get("fs.ceph.libDir")+"/libhadoopcephfs.so");
-               System.load(conf.get("fs.ceph.libDir")+"/libceph.so");
-       }
-       protected native boolean ceph_initializeClient(String arguments, int block_size);
-       protected native String ceph_getcwd();
-       protected native boolean ceph_setcwd(String path);
+  // we write a constructor so we can load the libraries
+  public CephTalker(Configuration conf, Log log) {
+    super(conf, log);
+    System.load(conf.get("fs.ceph.libDir") + "/libhadoopcephfs.so");
+    System.load(conf.get("fs.ceph.libDir") + "/libceph.so");
+  }
+
+  protected native boolean ceph_initializeClient(String arguments, int block_size);
+
+  protected native String ceph_getcwd();
+
+  protected native boolean ceph_setcwd(String path);
+
   protected native boolean ceph_rmdir(String path);
+
   protected native boolean ceph_unlink(String path);
+
   protected native boolean ceph_rename(String old_path, String new_path);
+
   protected native boolean ceph_exists(String path);
+
   protected native long ceph_getblocksize(String path);
+
   protected native boolean ceph_isdirectory(String path);
+
   protected native boolean ceph_isfile(String path);
+
   protected native String[] ceph_getdir(String path);
+
   protected native int ceph_mkdirs(String path, int mode);
+
   protected native int ceph_open_for_append(String path);
+
   protected native int ceph_open_for_read(String path);
+
   protected native int ceph_open_for_overwrite(String path, int mode);
+
   protected native int ceph_close(int filehandle);
+
   protected native boolean ceph_setPermission(String path, int mode);
+
   protected native boolean ceph_kill_client();
+
   protected native boolean ceph_stat(String path, CephFileSystem.Stat fill);
+
   protected native int ceph_statfs(String Path, CephFileSystem.CephStat fill);
+
   protected native int ceph_replication(String path);
+
   protected native String ceph_hosts(int fh, long offset);
+
   protected native int ceph_setTimes(String path, long mtime, long atime);
+
   protected native long ceph_getpos(int fh);
+
   protected native int ceph_write(int fh, byte[] buffer, int buffer_offset, int length);
-       protected native int ceph_read(int fh, byte[] buffer, int buffer_offset, int length);
-       protected native long ceph_seek_from_start(int fh, long pos);
+
+  protected native int ceph_read(int fh, byte[] buffer, int buffer_offset, int length);
+
+  protected native long ceph_seek_from_start(int fh, long pos);
 }
index eaafab0ba1968b97b48858decd3f156feb858c19..7f872c888b2e1928e2aa91f07900ee8e510d212d 100644 (file)
@@ -1,4 +1,5 @@
 // -*- mode:Java; tab-width:2; c-basic-offset:2; indent-tabs-mode:t -*- 
+
 /**
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
@@ -21,6 +22,7 @@
 
 package org.apache.hadoop.fs.ceph;
 
+
 import java.io.IOException;
 import java.net.URI;
 import org.apache.hadoop.fs.FileSystemContractBaseTest;
@@ -28,15 +30,17 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 
+
 public class TestCeph extends FileSystemContractBaseTest {
 
-       @Override
-    protected void setUp() throws IOException {
+  @Override
+  protected void setUp() throws IOException {
     Configuration conf = new Configuration();
     CephFaker cephfaker = new CephFaker(conf, FileSystem.LOG);
     CephFileSystem cephfs = new CephFileSystem(cephfaker, "ceph://null");
+
     cephfs.initialize(URI.create("ceph://null"), conf);
-               fs = cephfs;
-               cephfs.setWorkingDirectory(new Path(getDefaultWorkingDirectory()));
-       }
+    fs = cephfs;
+    cephfs.setWorkingDirectory(new Path(getDefaultWorkingDirectory()));
+  }
 }