]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
hadoop: update patch and Readme.
authorGreg Farnum <gregory.farnum@dreamhost.com>
Wed, 9 Nov 2011 23:23:38 +0000 (15:23 -0800)
committerGreg Farnum <gregory.farnum@dreamhost.com>
Wed, 9 Nov 2011 23:38:57 +0000 (15:38 -0800)
Patch generated by Noah Watkins <noahwatkins@gmail.com>

Signed-off-by: Greg Farnum <gregory.farnum@dreamhost.com>
src/client/hadoop/HADOOP-ceph.patch
src/client/hadoop/Readme

index 388916a7faa30e5c6b4f64bfe5cd799fcb17e27e..84cdb370f774355ec2af21c06c6765122dfd78d3 100644 (file)
-Index: src/test/core/org/apache/hadoop/fs/ceph/TestCeph.java
-===================================================================
---- src/test/core/org/apache/hadoop/fs/ceph/TestCeph.java      (revision 0)
-+++ src/test/core/org/apache/hadoop/fs/ceph/TestCeph.java      (revision 0)
-@@ -0,0 +1,42 @@
+diff --git a/src/core/core-default.xml b/src/core/core-default.xml
+index 8bc3b99..26543bc 100644
+--- a/src/core/core-default.xml
++++ b/src/core/core-default.xml
+@@ -210,6 +210,12 @@
+ </property>
+ <property>
++  <name>fs.ceph.impl</name>
++  <value>org.apache.hadoop.fs.ceph.CephFileSystem</value>
++  <description>The file system for ceph: uris.</description>
++</property>
++
++<property>
+   <name>fs.har.impl.disable.cache</name>
+   <value>true</value>
+   <description>Don't cache 'har' filesystem instances.</description>
+diff --git a/src/core/org/apache/hadoop/fs/ceph/CephFS.java b/src/core/org/apache/hadoop/fs/ceph/CephFS.java
+new file mode 100644
+index 0000000..5d51eb2
+--- /dev/null
++++ b/src/core/org/apache/hadoop/fs/ceph/CephFS.java
+@@ -0,0 +1,250 @@
 +// -*- mode:Java; tab-width:2; c-basic-offset:2; indent-tabs-mode:t -*- 
++
 +/**
-+ * Licensed to the Apache Software Foundation (ASF) under one
-+ * or more contributor license agreements.  See the NOTICE file
-+ * distributed with this work for additional information
-+ * regarding copyright ownership.  The ASF licenses this file
-+ * to you under the Apache License, Version 2.0 (the
-+ * "License"); you may not use this file except in compliance
-+ * with the License.  You may obtain a copy of the License at
 + *
-+ *     http://www.apache.org/licenses/LICENSE-2.0
++ * Licensed under the Apache License, Version 2.0
++ * (the "License"); you may not use this file except in compliance with
++ * the License. You may obtain a copy of the License at
++ *
++ * http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS,
-+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-+ * See the License for the specific language governing permissions and
-+ * limitations under the License.
++ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
++ * implied. See the License for the specific language governing
++ * permissions and limitations under the License.
 + *
-+ * Unit tests for the CephFileSystem API implementation.
++ * 
++ * Abstract base class for communicating with a Ceph filesystem and its
++ * C++ codebase from Java, or pretending to do so (for unit testing purposes).
++ * As only the Ceph package should be using this directly, all methods
++ * are protected.
 + */
-+
 +package org.apache.hadoop.fs.ceph;
 +
-+import java.io.IOException;
-+import java.net.URI;
-+import org.apache.hadoop.fs.FileSystemContractBaseTest;
 +import org.apache.hadoop.conf.Configuration;
-+import org.apache.hadoop.fs.FileSystem;
-+import org.apache.hadoop.fs.Path;
 +
-+public class TestCeph extends FileSystemContractBaseTest {
++abstract class CephFS {
 +
-+      @Override
-+    protected void setUp() throws IOException {
-+    Configuration conf = new Configuration();
-+    CephFaker cephfaker = new CephFaker(conf, FileSystem.LOG);
-+    CephFileSystem cephfs = new CephFileSystem(cephfaker, "ceph://null");
-+    cephfs.initialize(URI.create("ceph://null"), conf);
-+              fs = cephfs;
-+              cephfs.setWorkingDirectory(new Path(getDefaultWorkingDirectory()));
-+      }
++  protected static final int ENOTDIR = 20;
++  protected static final int EEXIST = 17;
++  protected static final int ENOENT = 2;
++
++  /*
++   * Performs any necessary setup to allow general use of the filesystem.
++   * Inputs:
++   *  String argsuments -- a command-line style input of Ceph config params
++   *  int block_size -- the size in bytes to use for blocks
++   * Returns: true on success, false otherwise
++   */
++  abstract protected boolean ceph_initializeClient(String arguments, int block_size);
++      
++  /*
++   * Returns the current working directory (absolute) as a String
++   */
++  abstract protected String ceph_getcwd();
++
++  /*
++   * Changes the working directory.
++   * Inputs:
++   *  String path: The path (relative or absolute) to switch to
++   * Returns: true on success, false otherwise.
++   */
++  abstract protected boolean ceph_setcwd(String path);
++
++  /*
++   * Given a path to a directory, removes the directory if empty.
++   * Inputs:
++   *  jstring j_path: The path (relative or absolute) to the directory
++   * Returns: true on successful delete; false otherwise
++   */
++  abstract protected boolean ceph_rmdir(String path);
++
++  /*
++   * Given a path, unlinks it.
++   * Inputs:
++   *  String path: The path (relative or absolute) to the file or empty dir
++   * Returns: true if the unlink occurred, false otherwise.
++   */
++  abstract protected boolean ceph_unlink(String path);
++
++  /*
++   * Changes a given path name to a new name, assuming new_path doesn't exist.
++   * Inputs:
++   *  jstring j_from: The path whose name you want to change.
++   *  jstring j_to: The new name for the path.
++   * Returns: true if the rename occurred, false otherwise
++   */
++  abstract protected boolean ceph_rename(String old_path, String new_path);
++
++  /*
++   * Returns true if it the input path exists, false
++   * if it does not or there is an unexpected failure.
++   */
++  abstract protected boolean ceph_exists(String path);
++
++  /*
++   * Get the block size for a given path.
++   * Input:
++   *  String path: The path (relative or absolute) you want
++   *  the block size for.
++   * Returns: block size if the path exists, otherwise a negative number
++   *  corresponding to the standard C++ error codes (which are positive).
++   */
++  abstract protected long ceph_getblocksize(String path);
++
++  /*
++   * Returns true if the given path is a directory, false otherwise.
++   */
++  abstract protected boolean ceph_isdirectory(String path);
++
++  /*
++   * Returns true if the given path is a file; false otherwise.
++   */
++  abstract protected boolean ceph_isfile(String path);
++
++  /*
++   * Get the contents of a given directory.
++   * Inputs:
++   *  String path: The path (relative or absolute) to the directory.
++   * Returns: A Java String[] of the contents of the directory, or
++   *  NULL if there is an error (ie, path is not a dir). This listing
++   *  will not contain . or .. entries.
++   */
++  abstract protected String[] ceph_getdir(String path);
++
++  /*
++   * Create the specified directory and any required intermediate ones with the
++   * given mode.
++   */
++  abstract protected int ceph_mkdirs(String path, int mode);
++
++  /*
++   * Open a file to append. If the file does not exist, it will be created.
++   * Opening a dir is possible but may have bad results.
++   * Inputs:
++   *  String path: The path to open.
++   * Returns: an int filehandle, or a number<0 if an error occurs.
++   */
++  abstract protected int ceph_open_for_append(String path);
++
++  /*
++   * Open a file for reading.
++   * Opening a dir is possible but may have bad results.
++   * Inputs:
++   *  String path: The path to open.
++   * Returns: an int filehandle, or a number<0 if an error occurs.
++   */
++  abstract protected int ceph_open_for_read(String path);
++
++  /*
++   * Opens a file for overwriting; creates it if necessary.
++   * Opening a dir is possible but may have bad results.
++   * Inputs:
++   *  String path: The path to open.
++   *  int mode: The mode to open with.
++   * Returns: an int filehandle, or a number<0 if an error occurs.
++   */
++  abstract protected int ceph_open_for_overwrite(String path, int mode);
++
++  /*
++   * Closes the given file. Returns 0 on success, or a negative
++   * error code otherwise.
++   */
++  abstract protected int ceph_close(int filehandle);
++
++  /*
++   * Change the mode on a path.
++   * Inputs:
++   *  String path: The path to change mode on.
++   *  int mode: The mode to apply.
++   * Returns: true if the mode is properly applied, false if there
++   *  is any error.
++   */
++  abstract protected boolean ceph_setPermission(String path, int mode);
++
++  /*
++   * Closes the Ceph client. This should be called before shutting down
++   * (multiple times is okay but redundant).
++   */
++  abstract protected boolean ceph_kill_client();
++
++  /*
++   * Get the statistics on a path returned in a custom format defined
++   * in CephFileSystem.
++   * Inputs:
++   *  String path: The path to stat.
++   *  Stat fill: The stat object to fill.
++   * Returns: true if the stat is successful, false otherwise.
++   */
++  abstract protected boolean ceph_stat(String path, CephFileSystem.Stat fill);
++
++  /*
++   * Check how many times a file should be replicated. If it is,
++   * degraded it may not actually be replicated this often.
++   * Inputs:
++   *  int fh: a file descriptor
++   * Returns: an int containing the number of times replicated.
++   */
++  abstract protected int ceph_replication(String path);
++
++  /*
++   * Find the IP address of the primary OSD for a given file and offset.
++   * Inputs:
++   *  int fh: The filehandle for the file.
++   *  long offset: The offset to get the location of.
++   * Returns: an array of String of the location as IP, or NULL if there is an error.
++   */
++  abstract protected String[] ceph_hosts(int fh, long offset);
++
++  /*
++   * Set the mtime and atime for a given path.
++   * Inputs:
++   *  String path: The path to set the times for.
++   *  long mtime: The mtime to set, in millis since epoch (-1 to not set).
++   *  long atime: The atime to set, in millis since epoch (-1 to not set)
++   * Returns: 0 if successful, an error code otherwise.
++   */
++  abstract protected int ceph_setTimes(String path, long mtime, long atime);
++
++  /*
++   * Get the current position in a file (as a long) of a given filehandle.
++   * Returns: (long) current file position on success, or a
++   *  negative error code on failure.
++   */
++  abstract protected long ceph_getpos(int fh);
++
++  /*
++   * Write the given buffer contents to the given filehandle.
++   * Inputs:
++   *  int fh: The filehandle to write to.
++   *  byte[] buffer: The buffer to write from
++   *  int buffer_offset: The position in the buffer to write from
++   *  int length: The number of (sequential) bytes to write.
++   * Returns: int, on success the number of bytes written, on failure
++   *  a negative error code.
++   */
++  abstract protected int ceph_write(int fh, byte[] buffer, int buffer_offset, int length);
++
++  /*
++   * Reads into the given byte array from the current position.
++   * Inputs:
++   *  int fh: the filehandle to read from
++   *  byte[] buffer: the byte array to read into
++   *  int buffer_offset: where in the buffer to start writing
++   *  int length: how much to read.
++   * There'd better be enough space in the buffer to write all
++   * the data from the given offset!
++   * Returns: the number of bytes read on success (as an int),
++   *  or an error code otherwise.      */
++  abstract protected int ceph_read(int fh, byte[] buffer, int buffer_offset, int length);
++
++  /*
++   * Seeks to the given position in the given file.
++   * Inputs:
++   *  int fh: The filehandle to seek in.
++   *  long pos: The position to seek to.
++   * Returns: the new position (as a long) of the filehandle on success,
++   *  or a negative error code on failure.     */
++  abstract protected long ceph_seek_from_start(int fh, long pos);
 +}
-Index: src/java/org/apache/hadoop/fs/ceph/SubmitProcess
-===================================================================
---- src/java/org/apache/hadoop/fs/ceph/SubmitProcess   (revision 0)
-+++ src/java/org/apache/hadoop/fs/ceph/SubmitProcess   (revision 0)
-@@ -0,0 +1,2 @@
-+1) Remove the emacs indententation rules line.
-+2) ? No more?
-\ No newline at end of file
-Index: src/java/org/apache/hadoop/fs/ceph/CephTalker.java
-===================================================================
---- src/java/org/apache/hadoop/fs/ceph/CephTalker.java (revision 0)
-+++ src/java/org/apache/hadoop/fs/ceph/CephTalker.java (revision 0)
-@@ -0,0 +1,59 @@
+diff --git a/src/core/org/apache/hadoop/fs/ceph/CephFaker.java b/src/core/org/apache/hadoop/fs/ceph/CephFaker.java
+new file mode 100644
+index 0000000..c598f53
+--- /dev/null
++++ b/src/core/org/apache/hadoop/fs/ceph/CephFaker.java
+@@ -0,0 +1,483 @@
 +// -*- mode:Java; tab-width:2; c-basic-offset:2; indent-tabs-mode:t -*- 
++
 +/**
 + *
 + * Licensed under the Apache License, Version 2.0
@@ -74,55 +294,480 @@ Index: src/java/org/apache/hadoop/fs/ceph/CephTalker.java
 + * permissions and limitations under the License.
 + *
 + * 
-+ * Wraps a number of native function calls to communicate with the Ceph
-+ * filesystem.
++ * This uses the local Filesystem but pretends to be communicating
++ * with a Ceph deployment, for unit testing the CephFileSystem.
 + */
++
 +package org.apache.hadoop.fs.ceph;
 +
-+import org.apache.hadoop.conf.Configuration;
++
++import java.net.URI;
++import java.util.Hashtable;
++import java.io.Closeable;
++import java.io.FileNotFoundException;
++import java.io.IOException;
++
 +import org.apache.commons.logging.Log;
++import org.apache.commons.logging.LogFactory;
++import org.apache.hadoop.conf.Configuration;
++import org.apache.hadoop.fs.BlockLocation;
++import org.apache.hadoop.fs.FileStatus;
++import org.apache.hadoop.fs.FileSystem;
++import org.apache.hadoop.fs.FSDataInputStream;
++import org.apache.hadoop.fs.FSDataOutputStream;
++import org.apache.hadoop.fs.Path;
++import org.apache.hadoop.fs.permission.FsPermission;
 +
-+class CephTalker extends CephFS {
-+      //we write a constructor so we can load the libraries
-+      public CephTalker(Configuration conf, Log log) {
-+              super(conf, log);
-+              System.load(conf.get("fs.ceph.libDir")+"/libhadoopcephfs.so");
-+              System.load(conf.get("fs.ceph.libDir")+"/libcephfs.so");
-+      }
-+      protected native boolean ceph_initializeClient(String arguments, int block_size);
-+      protected native String ceph_getcwd();
-+      protected native boolean ceph_setcwd(String path);
-+  protected native boolean ceph_rmdir(String path);
-+  protected native boolean ceph_unlink(String path);
-+  protected native boolean ceph_rename(String old_path, String new_path);
-+  protected native boolean ceph_exists(String path);
-+  protected native long ceph_getblocksize(String path);
-+  protected native boolean ceph_isdirectory(String path);
-+  protected native boolean ceph_isfile(String path);
-+  protected native String[] ceph_getdir(String path);
-+  protected native int ceph_mkdirs(String path, int mode);
-+  protected native int ceph_open_for_append(String path);
-+  protected native int ceph_open_for_read(String path);
-+  protected native int ceph_open_for_overwrite(String path, int mode);
-+  protected native int ceph_close(int filehandle);
-+  protected native boolean ceph_setPermission(String path, int mode);
-+  protected native boolean ceph_kill_client();
-+  protected native boolean ceph_stat(String path, CephFileSystem.Stat fill);
-+  protected native int ceph_statfs(String Path, CephFileSystem.CephStat fill);
-+  protected native int ceph_replication(String path);
-+  protected native String ceph_hosts(int fh, long offset);
-+  protected native int ceph_setTimes(String path, long mtime, long atime);
-+  protected native long ceph_getpos(int fh);
-+  protected native int ceph_write(int fh, byte[] buffer, int buffer_offset, int length);
-+      protected native int ceph_read(int fh, byte[] buffer, int buffer_offset, int length);
-+      protected native long ceph_seek_from_start(int fh, long pos);
++
++class CephFaker extends CephFS {
++  private static final Log LOG = LogFactory.getLog(CephFaker.class);
++  FileSystem localFS;
++  String localPrefix;
++  int blockSize;
++  Configuration conf;
++  Hashtable<Integer, Object> files;
++  Hashtable<Integer, String> filenames;
++  int fileCount = 0;
++  boolean initialized = false;
++      
++  public CephFaker(Configuration con, Log log) {
++    conf = con;
++    files = new Hashtable<Integer, Object>();
++    filenames = new Hashtable<Integer, String>();
++  }
++      
++  protected boolean ceph_initializeClient(String args, int block_size) {
++    if (!initialized) {
++      // let's remember the default block_size
++      blockSize = block_size;
++
++      /* for a real Ceph deployment, this starts up the client, 
++       * sets debugging levels, etc. We just need to get the
++       * local FileSystem to use, and we'll ignore any
++       * command-line arguments. */
++      try {
++        localFS = FileSystem.getLocal(conf);
++        localFS.initialize(URI.create("file://localhost"), conf);
++        localFS.setVerifyChecksum(false);
++        String testDir = conf.get("hadoop.tmp.dir");
++
++        localPrefix = localFS.getWorkingDirectory().toString();
++        int testDirLoc = localPrefix.indexOf(testDir) - 1;
++
++        if (-2 == testDirLoc) {
++          testDirLoc = localPrefix.length();
++        }
++        localPrefix = localPrefix.substring(0, testDirLoc) + "/"
++            + conf.get("hadoop.tmp.dir");
++
++        localFS.setWorkingDirectory(
++            new Path(localPrefix + "/user/" + System.getProperty("user.name")));
++        // I don't know why, but the unit tests expect the default
++        // working dir to be /user/username, so satisfy them!
++        // debug("localPrefix is " + localPrefix, INFO);
++      } catch (IOException e) {
++        return false;
++      }
++      initialized = true;
++    }
++    return true;
++  }
++
++  protected String ceph_getcwd() {
++    return sanitize_path(localFS.getWorkingDirectory().toString());
++  }
++
++  protected boolean ceph_setcwd(String path) {
++    localFS.setWorkingDirectory(new Path(prepare_path(path)));
++    return true;
++  }
++
++  // the caller is responsible for ensuring empty dirs
++  protected boolean ceph_rmdir(String pth) {
++    Path path = new Path(prepare_path(pth));
++    boolean ret = false;
++
++    try {
++      if (localFS.listStatus(path).length <= 1) {
++        ret = localFS.delete(path, true);
++      }
++    } catch (IOException e) {}
++    return ret;
++  }
++
++  // this needs to work on (empty) directories too
++  protected boolean ceph_unlink(String path) {
++    path = prepare_path(path);
++    boolean ret = false;
++
++    if (ceph_isdirectory(path)) {
++      ret = ceph_rmdir(path);
++    } else {
++      try {
++        ret = localFS.delete(new Path(path), false);
++      } catch (IOException e) {}
++    }
++    return ret;
++  }
++
++  protected boolean ceph_rename(String oldName, String newName) {
++    oldName = prepare_path(oldName);
++    newName = prepare_path(newName);
++    try {
++      Path parent = new Path(newName).getParent();
++      Path newPath = new Path(newName);
++
++      if (localFS.exists(parent) && !localFS.exists(newPath)) {
++        return localFS.rename(new Path(oldName), newPath);
++      }
++      return false;
++    } catch (IOException e) {
++      return false;
++    }
++  }
++
++  protected boolean ceph_exists(String path) {
++    path = prepare_path(path);
++    boolean ret = false;
++
++    try {
++      ret = localFS.exists(new Path(path));
++    } catch (IOException e) {}
++    return ret;
++  }
++
++  protected long ceph_getblocksize(String path) {
++    path = prepare_path(path);
++    try {
++      FileStatus status = localFS.getFileStatus(new Path(path));
++
++      return status.getBlockSize();
++    } catch (FileNotFoundException e) {
++      return -CephFS.ENOENT;
++    } catch (IOException e) {
++      return -1; // just fail generically
++    }
++  }
++
++  protected boolean ceph_isdirectory(String path) {
++    path = prepare_path(path);
++    try {
++      FileStatus status = localFS.getFileStatus(new Path(path));
++
++      return status.isDir();
++    } catch (IOException e) {
++      return false;
++    }
++  }
++
++  protected boolean ceph_isfile(String path) {
++    path = prepare_path(path);
++    boolean ret = false;
++
++    try {
++      FileStatus status = localFS.getFileStatus(new Path(path));
++
++      ret = !status.isDir();
++    } catch (Exception e) {}
++    return ret;
++  }
++
++  protected String[] ceph_getdir(String path) {
++    path = prepare_path(path);
++    if (!ceph_isdirectory(path)) {
++      return null;
++    }
++    try {
++      FileStatus[] stats = localFS.listStatus(new Path(path));
++      String[] names = new String[stats.length];
++      String name;
++
++      for (int i = 0; i < stats.length; ++i) {
++        name = stats[i].getPath().toString();
++        names[i] = name.substring(name.lastIndexOf(Path.SEPARATOR) + 1);
++      }
++      return names;
++    } catch (IOException e) {}
++    return null;
++  }
++
++  protected int ceph_mkdirs(String path, int mode) {
++    path = prepare_path(path);
++    // debug("ceph_mkdirs on " + path, INFO);
++    try {
++      if (localFS.mkdirs(new Path(path), new FsPermission((short) mode))) {
++        return 0;
++      }
++    } catch (IOException e) {}
++    if (ceph_isdirectory(path)) { // apparently it already existed
++      return -EEXIST;
++    } else if (ceph_isfile(path)) {
++                      return -ENOTDIR;
++              }
++    return -1;
++  }
++
++  /*
++   * Unlike a real Ceph deployment, you can't do opens on a directory.
++   * Since that has unpredictable behavior and you shouldn't do it anyway,
++   * it's okay.
++   */
++  protected int ceph_open_for_append(String path) {
++    path = prepare_path(path);
++    FSDataOutputStream stream;
++
++    try {
++      stream = localFS.append(new Path(path));
++      files.put(new Integer(fileCount), stream);
++      filenames.put(new Integer(fileCount), path);
++      return fileCount++;
++    } catch (IOException e) {}
++    return -1; // failure
++  }
++
++  protected int ceph_open_for_read(String path) {
++    path = prepare_path(path);
++    FSDataInputStream stream;
++
++    try {
++      stream = localFS.open(new Path(path));
++      files.put(new Integer(fileCount), stream);
++      filenames.put(new Integer(fileCount), path);
++      LOG.info("ceph_open_for_read fh:" + fileCount + ", pathname:" + path);
++      return fileCount++;
++    } catch (IOException e) {}
++    return -1; // failure
++  }
++
++  protected int ceph_open_for_overwrite(String path, int mode) {
++    path = prepare_path(path);
++    FSDataOutputStream stream;
++
++    try {
++      stream = localFS.create(new Path(path));
++      files.put(new Integer(fileCount), stream);
++      filenames.put(new Integer(fileCount), path);
++      LOG.info("ceph_open_for_overwrite fh:" + fileCount + ", pathname:" + path);
++      return fileCount++;
++    } catch (IOException e) {}
++    return -1; // failure
++  }
++
++  protected int ceph_close(int filehandle) {
++    LOG.info("ceph_close(filehandle " + filehandle + ")");
++    try {
++      ((Closeable) files.get(new Integer(filehandle))).close();
++      if (null == files.get(new Integer(filehandle))) {
++        return -ENOENT; // this isn't quite the right error code,
++        // but the important part is it's negative
++      }
++      return 0; // hurray, success
++    } catch (NullPointerException ne) {
++      LOG.warn("ceph_close caught NullPointerException!" + ne);
++    } // err, how?
++    catch (IOException ie) {
++      LOG.warn("ceph_close caught IOException!" + ie);
++    }
++    return -1; // failure
++  }
++
++  protected boolean ceph_setPermission(String pth, int mode) {
++    pth = prepare_path(pth);
++    Path path = new Path(pth);
++    boolean ret = false;
++
++    try {
++      localFS.setPermission(path, new FsPermission((short) mode));
++      ret = true;
++    } catch (IOException e) {}
++    return ret;
++  }
++
++  // rather than try and match a Ceph deployment's behavior exactly,
++  // just make bad things happen if they try and call methods after this
++  protected boolean ceph_kill_client() {
++    // debug("ceph_kill_client", INFO);
++    localFS.setWorkingDirectory(new Path(localPrefix));
++    // debug("working dir is now " + localFS.getWorkingDirectory(), INFO);
++    try {
++      localFS.close();
++    } catch (Exception e) {}
++    localFS = null;
++    files = null;
++    filenames = null;
++    return true;
++  }
++
++  protected boolean ceph_stat(String pth, CephFileSystem.Stat fill) {
++    pth = prepare_path(pth);
++    Path path = new Path(pth);
++    boolean ret = false;
++
++    try {
++      FileStatus status = localFS.getFileStatus(path);
++
++      fill.size = status.getLen();
++      fill.is_dir = status.isDir();
++      fill.block_size = status.getBlockSize();
++      fill.mod_time = status.getModificationTime();
++      fill.access_time = status.getAccessTime();
++      fill.mode = status.getPermission().toShort();
++      ret = true;
++    } catch (IOException e) {}
++    return ret;
++  }
++
++  protected int ceph_replication(String path) {
++    path = prepare_path(path);
++    int ret = -1; // -1 for failure
++
++    try {
++      ret = localFS.getFileStatus(new Path(path)).getReplication();
++    } catch (IOException e) {}
++    return ret;
++  }
++
++  protected String[] ceph_hosts(int fh, long offset) {
++    String[] ret = null;
++
++    try {
++      BlockLocation[] locs = localFS.getFileBlockLocations(
++          localFS.getFileStatus(new Path(filenames.get(new Integer(fh)))),
++          offset, 1);
++
++      ret = locs[0].getNames();
++    } catch (IOException e) {} catch (NullPointerException f) {}
++    return ret;
++  }
++
++  protected int ceph_setTimes(String pth, long mtime, long atime) {
++    pth = prepare_path(pth);
++    Path path = new Path(pth);
++    int ret = -1; // generic fail
++
++    try {
++      localFS.setTimes(path, mtime, atime);
++      ret = 0;
++    } catch (IOException e) {}
++    return ret;
++  }
++
++  protected long ceph_getpos(int fh) {
++    long ret = -1; // generic fail
++
++    try {
++      Object stream = files.get(new Integer(fh));
++
++      if (stream instanceof FSDataInputStream) {
++        ret = ((FSDataInputStream) stream).getPos();
++      } else if (stream instanceof FSDataOutputStream) {
++        ret = ((FSDataOutputStream) stream).getPos();
++      }
++    } catch (IOException e) {} catch (NullPointerException f) {}
++    return ret;
++  }
++
++  protected int ceph_write(int fh, byte[] buffer,
++      int buffer_offset, int length) {
++    LOG.info(
++        "ceph_write fh:" + fh + ", buffer_offset:" + buffer_offset + ", length:"
++        + length);
++    long ret = -1; // generic fail
++
++    try {
++      FSDataOutputStream os = (FSDataOutputStream) files.get(new Integer(fh));
++
++      LOG.info("ceph_write got outputstream");
++      long startPos = os.getPos();
++
++      os.write(buffer, buffer_offset, length);
++      ret = os.getPos() - startPos;
++    } catch (IOException e) {
++      LOG.warn("ceph_write caught IOException!");
++    } catch (NullPointerException f) {
++      LOG.warn("ceph_write caught NullPointerException!");
++    }
++    return (int) ret;
++  }
++
++  protected int ceph_read(int fh, byte[] buffer,
++      int buffer_offset, int length) {
++    long ret = -1; // generic fail
++
++    try {
++      FSDataInputStream is = (FSDataInputStream) files.get(new Integer(fh));
++      long startPos = is.getPos();
++
++      is.read(buffer, buffer_offset, length);
++      ret = is.getPos() - startPos;
++    } catch (IOException e) {} catch (NullPointerException f) {}
++    return (int) ret;
++  }
++
++  protected long ceph_seek_from_start(int fh, long pos) {
++    LOG.info("ceph_seek_from_start(fh " + fh + ", pos " + pos + ")");
++    long ret = -1; // generic fail
++
++    try {
++      LOG.info("ceph_seek_from_start filename is " + filenames.get(new Integer(fh)));
++      if (null == files.get(new Integer(fh))) {
++        LOG.warn("ceph_seek_from_start: is is null!");
++      }
++      FSDataInputStream is = (FSDataInputStream) files.get(new Integer(fh));
++
++      LOG.info("ceph_seek_from_start retrieved is!");
++      is.seek(pos);
++      ret = is.getPos();
++    } catch (IOException e) {
++      LOG.warn("ceph_seek_from_start caught IOException!");
++    } catch (NullPointerException f) {
++      LOG.warn("ceph_seek_from_start caught NullPointerException!");
++    }
++    return (int) ret;
++  }
++
++  /*
++   * We need to remove the localFS file prefix before returning to Ceph
++   */
++  private String sanitize_path(String path) {
++    // debug("sanitize_path(" + path + ")", INFO);
++    /* if (path.startsWith("file:"))
++     path = path.substring("file:".length()); */
++    if (path.startsWith(localPrefix)) {
++      path = path.substring(localPrefix.length());
++      if (path.length() == 0) { // it was a root path
++        path = "/";
++      }
++    }
++    // debug("sanitize_path returning " + path, INFO);
++    return path;
++  }
++
++  /*
++   * If it's an absolute path we need to shove the
++   * test dir onto the front as a prefix.
++   */
++  private String prepare_path(String path) {
++    // debug("prepare_path(" + path + ")", INFO);
++    if (path.startsWith("/")) {
++      path = localPrefix + path;
++    } else if (path.equals("..")) {
++      if (ceph_getcwd().equals("/")) {
++        path = ".";
++      } // you can't go up past root!
++    }
++    // debug("prepare_path returning" + path, INFO);
++    return path;
++  }
 +}
-Index: src/java/org/apache/hadoop/fs/ceph/CephFileSystem.java
-===================================================================
---- src/java/org/apache/hadoop/fs/ceph/CephFileSystem.java     (revision 0)
-+++ src/java/org/apache/hadoop/fs/ceph/CephFileSystem.java     (revision 0)
-@@ -0,0 +1,848 @@
-+// -*- mode:Java; tab-width:2; c-basic-offset:2; indent-tabs-mode:t -*- 
+diff --git a/src/core/org/apache/hadoop/fs/ceph/CephFileSystem.java b/src/core/org/apache/hadoop/fs/ceph/CephFileSystem.java
+new file mode 100644
+index 0000000..95f2223
+--- /dev/null
++++ b/src/core/org/apache/hadoop/fs/ceph/CephFileSystem.java
+@@ -0,0 +1,804 @@
++// -*- mode:Java; tab-width:2; c-basic-offset:2; indent-tabs-mode:t -*-
++
 +/**
 + *
 + * Licensed under the Apache License, Version 2.0
@@ -137,22 +782,26 @@ Index: src/java/org/apache/hadoop/fs/ceph/CephFileSystem.java
 + * implied. See the License for the specific language governing
 + * permissions and limitations under the License.
 + *
-+ * 
++ *
 + * Implements the Hadoop FS interfaces to allow applications to store
 + * files in Ceph.
 + */
 +package org.apache.hadoop.fs.ceph;
 +
++
 +import java.io.IOException;
 +import java.io.FileNotFoundException;
 +import java.io.OutputStream;
 +import java.net.URI;
++import java.net.InetAddress;
 +import java.util.EnumSet;
 +import java.lang.Math;
++import java.util.ArrayList;
 +
++import org.apache.commons.logging.Log;
++import org.apache.commons.logging.LogFactory;
 +import org.apache.hadoop.conf.Configuration;
 +import org.apache.hadoop.fs.BlockLocation;
-+import org.apache.hadoop.fs.FileAlreadyExistsException;
 +import org.apache.hadoop.fs.FSDataInputStream;
 +import org.apache.hadoop.fs.FSInputStream;
 +import org.apache.hadoop.fs.FSDataOutputStream;
@@ -162,8 +811,8 @@ Index: src/java/org/apache/hadoop/fs/ceph/CephFileSystem.java
 +import org.apache.hadoop.fs.permission.FsPermission;
 +import org.apache.hadoop.util.Progressable;
 +import org.apache.hadoop.fs.FileStatus;
-+import org.apache.hadoop.fs.FsStatus;
-+import org.apache.hadoop.fs.CreateFlag;
++import org.apache.hadoop.net.DNS;
++
 +
 +/**
 + * <p>
@@ -187,15 +836,16 @@ Index: src/java/org/apache/hadoop/fs/ceph/CephFileSystem.java
 + * from the respective Ceph system of at least that importance.
 + */
 +public class CephFileSystem extends FileSystem {
-+
++  private static final Log LOG = LogFactory.getLog(CephFileSystem.class);
 +  private URI uri;
 +
++  private Path workingDir;
 +  private final Path root;
-+  private boolean initialized = false;
-+      private CephFS ceph = null;
++  private CephFS ceph = null;
 +
-+  private boolean debug = false;
-+  private String fs_default_name;
++  private static String CEPH_NAMESERVER;
++  private static final String CEPH_NAMESERVER_KEY = "fs.ceph.nameserver";
++  private static final String CEPH_NAMESERVER_DEFAULT = "localhost";
 +
 +  /**
 +   * Create a new CephFileSystem.
@@ -204,25 +854,23 @@ Index: src/java/org/apache/hadoop/fs/ceph/CephFileSystem.java
 +    root = new Path("/");
 +  }
 +
-+      /**
-+       * Used for testing purposes, this constructor
-+       * sets the given CephFS instead of defaulting to a
-+       * CephTalker (with its assumed real Ceph instance to talk to).
-+       */
-+      public CephFileSystem(CephFS ceph_fs, String default_path) {
-+              super();
-+              root = new Path("/");
-+              ceph = ceph_fs;
-+              fs_default_name = default_path;
-+      }
++  /**
++   * Used for testing purposes, this constructor
++   * sets the given CephFS instead of defaulting to a
++   * CephTalker (with its assumed real Ceph instance to talk to).
++   */
++  public CephFileSystem(CephFS ceph_fs) {
++    super();
++    root = new Path("/");
++    ceph = ceph_fs;
++  }
 +
 +  /**
 +   * Lets you get the URI of this CephFileSystem.
 +   * @return the URI.
 +   */
 +  public URI getUri() {
-+    if (!initialized) return null;
-+    ceph.debug("getUri:exit with return " + uri, ceph.DEBUG);
++    LOG.debug("getUri:exit with return " + uri);
 +    return uri;
 +  }
 +
@@ -232,75 +880,69 @@ Index: src/java/org/apache/hadoop/fs/ceph/CephFileSystem.java
 +   * Starts up the connection to Ceph, reads in configuraton options, etc.
 +   * @param uri The URI for this filesystem.
 +   * @param conf The Hadoop Configuration to retrieve properties from.
-+   * @throws IOException if the Ceph client initialization fails
-+   * or necessary properties are unset.
++   * @throws IOException if necessary properties are unset.
 +   */
 +  @Override
 +  public void initialize(URI uri, Configuration conf) throws IOException {
-+    if (!initialized) {
-+      super.initialize(uri, conf);
-+      setConf(conf);
-+      this.uri = URI.create(uri.getScheme() + "://" + uri.getAuthority());  
-+      statistics = getStatistics(uri.getScheme(), getClass());
-+
-+                      if (ceph == null) {
-+                              ceph = new CephTalker(conf, LOG);
-+                      }
-+      if (null == fs_default_name) {
-+                              fs_default_name = conf.get("fs.default.name");
-+                      }
-+      //build up the arguments for Ceph
-+      String arguments = "CephFSInterface";
-+      arguments += conf.get("fs.ceph.commandLine", "");
-+      if (conf.get("fs.ceph.clientDebug") != null) {
-+                              arguments += " --debug_client ";
-+                              arguments += conf.get("fs.ceph.clientDebug");
-+      }
-+      if (conf.get("fs.ceph.messengerDebug") != null) {
-+                              arguments += " --debug_ms ";
-+                              arguments += conf.get("fs.ceph.messengerDebug");
-+      }
-+      if (conf.get("fs.ceph.monAddr") != null) {
-+                              arguments += " -m ";
-+                              arguments += conf.get("fs.ceph.monAddr");
-+      }
-+      arguments += " --client-readahead-max-periods="
-+                              + conf.get("fs.ceph.readahead", "1");
-+      //make sure they gave us a ceph monitor address or conf file
-+                      ceph.debug("initialize:Ceph initialization arguments: " + arguments, ceph.INFO);
-+      if ( (conf.get("fs.ceph.monAddr") == null) &&
-+                                       (arguments.indexOf("-m") == -1) &&
-+                                       (arguments.indexOf("-c") == -1) ) {
-+                              ceph.debug("initialize:You need to specify a Ceph monitor address.", ceph.FATAL);
-+                              throw new IOException("You must specify a Ceph monitor address or config file!");
-+      }
-+      //  Initialize the client
-+      if (!ceph.ceph_initializeClient(arguments,
-+                                                                                                                               conf.getInt("fs.ceph.blockSize", 1<<26))) {
-+                              ceph.debug("initialize:Ceph initialization failed!", ceph.FATAL);
-+                              throw new IOException("Ceph initialization failed!");
-+      }
-+      initialized = true;
-+      ceph.debug("initialize:Ceph initialized client. Setting cwd to /", ceph.INFO);
-+      ceph.ceph_setcwd("/");
++    super.initialize(uri, conf);
++    setConf(conf);
++    this.uri = URI.create(uri.getScheme() + "://" + uri.getAuthority());
++    if (ceph == null) {
++      ceph = new CephTalker(conf, LOG);
++    }
++
++    CEPH_NAMESERVER = conf.get(CEPH_NAMESERVER_KEY, CEPH_NAMESERVER_DEFAULT);
++
++    // build up the arguments for Ceph
++    String arguments = "CephFSInterface";
++
++    arguments += conf.get("fs.ceph.commandLine", "");
++    if (conf.get("fs.ceph.clientDebug") != null) {
++      arguments += " --debug_client ";
++      arguments += conf.get("fs.ceph.clientDebug");
 +    }
-+    ceph.debug("initialize:exit", ceph.DEBUG);
++    if (conf.get("fs.ceph.messengerDebug") != null) {
++      arguments += " --debug_ms ";
++      arguments += conf.get("fs.ceph.messengerDebug");
++    }
++    if (conf.get("fs.ceph.monAddr") != null) {
++      arguments += " -m ";
++      arguments += conf.get("fs.ceph.monAddr");
++    }
++    arguments += " --client-readahead-max-periods="
++        + conf.get("fs.ceph.readahead", "1");
++    // make sure they gave us a ceph monitor address or conf file
++    LOG.info("initialize:Ceph initialization arguments: " + arguments);
++    if ((conf.get("fs.ceph.monAddr") == null) && (arguments.indexOf("-m") == -1)
++        && (arguments.indexOf("-c") == -1)) {
++      LOG.fatal("initialize:You need to specify a Ceph monitor address.");
++      throw new IOException(
++          "You must specify a Ceph monitor address or config file!");
++    }
++    // Initialize the client
++    if (!ceph.ceph_initializeClient(arguments,
++        conf.getInt("fs.ceph.blockSize", 1 << 26))) {
++      LOG.fatal("initialize:Ceph initialization failed!");
++      throw new IOException("Ceph initialization failed!");
++    }
++    LOG.info("initialize:Ceph initialized client. Setting cwd to /");
++    ceph.ceph_setcwd("/");
++    LOG.debug("initialize:exit");
++
++    this.workingDir = getHomeDirectory();
 +  }
 +
 +  /**
 +   * Close down the CephFileSystem. Runs the base-class close method
 +   * and then kills the Ceph client itself.
-+   * @throws IOException if initialize() hasn't been called.
 +   */
 +  @Override
 +  public void close() throws IOException {
-+    if (!initialized) throw new IOException ("You have to initialize the "
-+                                                                                                                                                                               +"CephFileSystem before calling other methods.");
-+    ceph.debug("close:enter", ceph.DEBUG);
-+    super.close();//this method does stuff, make sure it's run!
-+              ceph.debug("close: Calling ceph_kill_client from Java", ceph.TRACE);
++    LOG.debug("close:enter");
++    super.close(); // this method does stuff, make sure it's run!
++    LOG.trace("close: Calling ceph_kill_client from Java");
 +    ceph.ceph_kill_client();
-+    ceph.debug("close:exit", ceph.DEBUG);
++    LOG.debug("close:exit");
 +  }
 +
 +  /**
@@ -310,26 +952,32 @@ Index: src/java/org/apache/hadoop/fs/ceph/CephFileSystem.java
 +   * @param progress The Progressable to report progress to.
 +   * Reporting is limited but exists.
 +   * @return An FSDataOutputStream that connects to the file on Ceph.
-+   * @throws IOException If initialize() hasn't been called or the file cannot be found or appended to.
++   * @throws IOException If the file cannot be found or appended to.
 +   */
-+  public FSDataOutputStream append (Path file, int bufferSize,
-+                                                                                                                                              Progressable progress) throws IOException {
-+    if (!initialized) throw new IOException ("You have to initialize the "
-+                                                                                                                                                                               +"CephFileSystem before calling other methods.");
-+    ceph.debug("append:enter with path " + file + " bufferSize " + bufferSize, ceph.DEBUG);
++  public FSDataOutputStream append(Path file, int bufferSize,
++      Progressable progress) throws IOException {
++    LOG.debug("append:enter with path " + file + " bufferSize " + bufferSize);
 +    Path abs_path = makeAbsolute(file);
-+    if (progress!=null) progress.progress();
-+              ceph.debug("append: Entering ceph_open_for_append from Java", ceph.TRACE);
-+    int fd = ceph.ceph_open_for_append(abs_path.toString());
-+              ceph.debug("append: Returned to Java", ceph.TRACE);
-+    if (progress!=null) progress.progress();
-+    if( fd < 0 ) { //error in open
-+      throw new IOException("append: Open for append failed on path \"" +
-+                                                                                                              abs_path.toString() + "\"");
-+    }
-+    CephOutputStream cephOStream = new CephOutputStream(getConf(),
-+                                                                                                                                                                                                                              ceph, fd, bufferSize);
-+    ceph.debug("append:exit", ceph.DEBUG);
++
++    if (progress != null) {
++      progress.progress();
++    }
++    LOG.trace("append: Entering ceph_open_for_append from Java");
++    int fd = ceph.ceph_open_for_append(getCephPath(abs_path));
++
++    LOG.trace("append: Returned to Java");
++    if (progress != null) {
++      progress.progress();
++    }
++    if (fd < 0) { // error in open
++      throw new IOException(
++          "append: Open for append failed on path \"" + abs_path.toString()
++          + "\"");
++    }
++    CephOutputStream cephOStream = new CephOutputStream(getConf(), ceph, fd,
++        bufferSize);
++
++    LOG.debug("append:exit");
 +    return new FSDataOutputStream(cephOStream, statistics);
 +  }
 +
@@ -338,29 +986,28 @@ Index: src/java/org/apache/hadoop/fs/ceph/CephFileSystem.java
 +   * @return the directory Path
 +   */
 +  public Path getWorkingDirectory() {
-+    if (!initialized) return null;
-+    ceph.debug("getWorkingDirectory:enter", ceph.DEBUG);
-+              String cwd = ceph.ceph_getcwd();
-+    ceph.debug("getWorkingDirectory:exit with path " + cwd, ceph.DEBUG);
-+    return new Path(fs_default_name + ceph.ceph_getcwd());
++    return workingDir;
 +  }
 +
 +  /**
 +   * Set the current working directory for the given file system. All relative
-+   * paths will be resolved relative to it. You need to have initialized the
-+   * filesystem prior to calling this method.
++   * paths will be resolved relative to it.
 +   *
 +   * @param dir The directory to change to.
 +   */
 +  @Override
-+      public void setWorkingDirectory(Path dir) {
-+    if (!initialized) return;
-+    ceph.debug("setWorkingDirecty:enter with new working dir " + dir, ceph.DEBUG);
-+    Path abs_path = makeAbsolute(dir);
-+    ceph.debug("setWorkingDirectory:calling ceph_setcwd from Java", ceph.TRACE);
-+    if (!ceph.ceph_setcwd(abs_path.toString()))
-+      ceph.debug("setWorkingDirectory: WARNING! ceph_setcwd failed for some reason on path " + abs_path, ceph.WARN);
-+    ceph.debug("setWorkingDirectory:exit", ceph.DEBUG);
++  public void setWorkingDirectory(Path dir) {
++    workingDir = makeAbsolute(dir);
++  }
++
++  /**
++   * Return only the path component from a potentially fully qualified path.
++   */
++  private String getCephPath(Path path) {
++    if (!path.isAbsolute()) {
++      throw new IllegalArgumentException("Path must be absolute: " + path);
++    }
++    return path.toUri().getPath();
 +  }
 +
 +  /**
@@ -368,25 +1015,22 @@ Index: src/java/org/apache/hadoop/fs/ceph/CephFileSystem.java
 +   * Overriden because it's moderately faster than the generic implementation.
 +   * @param path The file to check existence on.
 +   * @return true if the file exists, false otherwise.
-+   * @throws IOException if initialize() hasn't been called.
 +   */
 +  @Override
-+      public boolean exists(Path path) throws IOException {
-+    if (!initialized) throw new IOException ("You have to initialize the "
-+                                                                                                                                                                               +"CephFileSystem before calling other methods.");
-+    ceph.debug("exists:enter with path " + path, ceph.DEBUG);
++  public boolean exists(Path path) throws IOException {
++    LOG.debug("exists:enter with path " + path);
 +    boolean result;
 +    Path abs_path = makeAbsolute(path);
++
 +    if (abs_path.equals(root)) {
 +      result = true;
++    } else {
++      LOG.trace(
++          "exists:Calling ceph_exists from Java on path " + abs_path.toString());
++      result = ceph.ceph_exists(getCephPath(abs_path));
++      LOG.trace("exists:Returned from ceph_exists to Java");
 +    }
-+    else {
-+      ceph.debug("exists:Calling ceph_exists from Java on path "
-+                                                                                      + abs_path.toString(), ceph.TRACE);
-+      result =  ceph.ceph_exists(abs_path.toString());
-+      ceph.debug("exists:Returned from ceph_exists to Java", ceph.TRACE);
-+    }
-+    ceph.debug("exists:exit with value " + result, ceph.DEBUG);
++    LOG.debug("exists:exit with value " + result);
 +    return result;
 +  }
 +
@@ -396,28 +1040,27 @@ Index: src/java/org/apache/hadoop/fs/ceph/CephFileSystem.java
 +   * @param path The directory path to create
 +   * @param perms The permissions to apply to the created directories.
 +   * @return true if successful, false otherwise
-+   * @throws IOException if initialize() hasn't been called or the path
-+       *  is a child of a file.
++   * @throws IOException if the path is a child of a file.
 +   */
-+      @Override
++  @Override
 +  public boolean mkdirs(Path path, FsPermission perms) throws IOException {
-+    if (!initialized) throw new IOException ("You have to initialize the "
-+                                                                                                                                                                               +"CephFileSystem before calling other methods.");
-+    ceph.debug("mkdirs:enter with path " + path, ceph.DEBUG);
++    LOG.debug("mkdirs:enter with path " + path);
 +    Path abs_path = makeAbsolute(path);
-+    ceph.debug("mkdirs:calling ceph_mkdirs from Java", ceph.TRACE);
-+    int result = ceph.ceph_mkdirs(abs_path.toString(), (int)perms.toShort());
++
++    LOG.trace("mkdirs:calling ceph_mkdirs from Java");
++    int result = ceph.ceph_mkdirs(getCephPath(abs_path), (int) perms.toShort());
++
 +    if (result != 0) {
-+                      ceph.debug("mkdirs: make directory " + abs_path
-+                                                               + "Failing with result " + result, ceph.WARN);
-+                      if (ceph.ENOTDIR == result)
-+                              throw new FileAlreadyExistsException("Parent path is not a directory");
++      LOG.warn(
++          "mkdirs: make directory " + abs_path + "Failing with result " + result);
++      if (-ceph.ENOTDIR == result) {
++        throw new IOException("Parent path is not a directory");
++      }
 +      return false;
-+              }
-+    else {
-+                      ceph.debug("mkdirs:exiting succesfully", ceph.DEBUG);
-+                      return true;
-+              }
++    } else {
++      LOG.debug("mkdirs:exiting succesfully");
++      return true;
++    }
 +  }
 +
 +  /**
@@ -425,49 +1068,20 @@ Index: src/java/org/apache/hadoop/fs/ceph/CephFileSystem.java
 +   * generic implementation.
 +   * @param path The path to check.
 +   * @return true if the path is definitely a file, false otherwise.
-+   * @throws IOException if initialize() hasn't been called.
 +   */
 +  @Override
-+      public boolean isFile(Path path) throws IOException {
-+    if (!initialized) throw new IOException ("You have to initialize the "
-+                                                                                                                                                                               +"CephFileSystem before calling other methods.");
-+    ceph.debug("isFile:enter with path " + path, ceph.DEBUG);
++  public boolean isFile(Path path) throws IOException {
++    LOG.debug("isFile:enter with path " + path);
 +    Path abs_path = makeAbsolute(path);
 +    boolean result;
-+    if (abs_path.equals(root)) {
-+      result =  false;
-+    }
-+    else {
-+                      ceph.debug("isFile:entering ceph_isfile from Java", ceph.TRACE);
-+      result = ceph.ceph_isfile(abs_path.toString());
-+    }
-+    ceph.debug("isFile:exit with result " + result, ceph.DEBUG);
-+    return result;
-+  }
 +
-+  /**
-+   * Check if a path is a directory. This is moderately faster than
-+   * the generic implementation.
-+   * @param path The path to check
-+   * @return true if the path is definitely a directory, false otherwise.
-+   * @throws IOException if initialize() hasn't been called.
-+   */
-+  @Override
-+      public boolean isDirectory(Path path) throws IOException {
-+    if (!initialized) throw new IOException ("You have to initialize the "
-+                                                                                                                                                                               +"CephFileSystem before calling other methods.");
-+    ceph.debug("isDirectory:enter with path " + path, ceph.DEBUG);
-+    Path abs_path = makeAbsolute(path);
-+    boolean result;
 +    if (abs_path.equals(root)) {
-+      result = true;
++      result = false;
++    } else {
++      LOG.trace("isFile:entering ceph_isfile from Java");
++      result = ceph.ceph_isfile(getCephPath(abs_path));
 +    }
-+    else {
-+      ceph.debug("calling ceph_isdirectory from Java", ceph.TRACE);
-+      result = ceph.ceph_isdirectory(abs_path.toString());
-+      ceph.debug("Returned from ceph_isdirectory to Java", ceph.TRACE);
-+    }
-+    ceph.debug("isDirectory:exit with result " + result, ceph.DEBUG);
++    LOG.debug("isFile:exit with result " + result);
 +    return result;
 +  }
 +
@@ -476,36 +1090,30 @@ Index: src/java/org/apache/hadoop/fs/ceph/CephFileSystem.java
 +   * Ceph's support for these is a bit different than HDFS'.
 +   * @param path The path to stat.
 +   * @return FileStatus object containing the stat information.
-+   * @throws IOException if initialize() hasn't been called
 +   * @throws FileNotFoundException if the path could not be resolved.
 +   */
 +  public FileStatus getFileStatus(Path path) throws IOException {
-+    if (!initialized) throw new IOException ("You have to initialize the "
-+                                                                                                                                                                               +"CephFileSystem before calling other methods.");
-+    ceph.debug("getFileStatus:enter with path " + path, ceph.DEBUG);
++    LOG.debug("getFileStatus:enter with path " + path);
 +    Path abs_path = makeAbsolute(path);
-+    //sadly, Ceph doesn't really do uids/gids just yet, but
-+    //everything else is filled
++    // sadly, Ceph doesn't really do uids/gids just yet, but
++    // everything else is filled
 +    FileStatus status;
 +    Stat lstat = new Stat();
-+              ceph.debug("getFileStatus: calling ceph_stat from Java", ceph.TRACE);
-+    if(ceph.ceph_stat(abs_path.toString(), lstat)) {
++
++    LOG.trace("getFileStatus: calling ceph_stat from Java");
++    if (ceph.ceph_stat(getCephPath(abs_path), lstat)) {
 +      status = new FileStatus(lstat.size, lstat.is_dir,
-+                                                                                                                      ceph.ceph_replication(abs_path.toString()),
-+                                                                                                                      lstat.block_size,
-+                                                                                                                      lstat.mod_time,
-+                                                                                                                      lstat.access_time,
-+                                                                                                                      new FsPermission((short)lstat.mode),
-+                                                                                                                      null,
-+                                                                                                                      null,
-+                                                                                                                      new Path(fs_default_name+abs_path.toString()));
-+    }
-+    else { //fail out
-+                      throw new FileNotFoundException("org.apache.hadoop.fs.ceph.CephFileSystem: File "
-+                                                                                                                                                      + path + " does not exist or could not be accessed");
++          ceph.ceph_replication(getCephPath(abs_path)), lstat.block_size,
++          lstat.mod_time, lstat.access_time,
++          new FsPermission((short) lstat.mode), System.getProperty("user.name"), null,
++          path.makeQualified(this));
++    } else { // fail out
++      throw new FileNotFoundException(
++          "org.apache.hadoop.fs.ceph.CephFileSystem: File " + path
++          + " does not exist or could not be accessed");
 +    }
 +
-+    ceph.debug("getFileStatus:exit", ceph.DEBUG);
++    LOG.debug("getFileStatus:exit");
 +    return status;
 +  }
 +
@@ -513,39 +1121,39 @@ Index: src/java/org/apache/hadoop/fs/ceph/CephFileSystem.java
 +   * Get the FileStatus for each listing in a directory.
 +   * @param path The directory to get listings from.
 +   * @return FileStatus[] containing one FileStatus for each directory listing;
-+   * null if path is not a directory.
-+   * @throws IOException if initialize() hasn't been called.
-+   * @throws FileNotFoundException if the input path can't be found.
++   *         null if path does not exist.
 +   */
 +  public FileStatus[] listStatus(Path path) throws IOException {
-+    if (!initialized) throw new IOException ("You have to initialize the "
-+                                                                                                                                                                               +"CephFileSystem before calling other methods.");
-+    ceph.debug("listStatus:enter with path " + path, ceph.WARN);
++    LOG.debug("listStatus:enter with path " + path);
 +    Path abs_path = makeAbsolute(path);
 +    Path[] paths = listPaths(abs_path);
++
 +    if (paths != null) {
 +      FileStatus[] statuses = new FileStatus[paths.length];
++
 +      for (int i = 0; i < paths.length; ++i) {
-+                              statuses[i] = getFileStatus(paths[i]);
++        statuses[i] = getFileStatus(paths[i]);
 +      }
-+      ceph.debug("listStatus:exit", ceph.DEBUG);
++      LOG.debug("listStatus:exit");
 +      return statuses;
 +    }
-+    if (!isFile(path)) throw new FileNotFoundException(); //if we get here, listPaths returned null
-+    //which means that the input wasn't a directory, so throw an Exception if it's not a file
-+    return null; //or return null if it's a file
++
++    if (isFile(path)) {
++      return new FileStatus[] { getFileStatus(path) };
++    }
++
++    return null;
 +  }
 +
 +  @Override
 +  public void setPermission(Path p, FsPermission permission) throws IOException {
-+    if (!initialized) throw new IOException ("You have to initialize the "
-+                                                                                                                                                                               +"CephFileSystem before calling other methods.");
-+              ceph.debug("setPermission:enter with path " + p
-+                                      + " and permissions " + permission, ceph.DEBUG);
++    LOG.debug(
++        "setPermission:enter with path " + p + " and permissions " + permission);
 +    Path abs_path = makeAbsolute(p);
-+              ceph.debug("setPermission:calling ceph_setpermission from Java", ceph.TRACE);
-+    ceph.ceph_setPermission(abs_path.toString(), permission.toShort());
-+              ceph.debug("setPermission:exit", ceph.DEBUG);
++
++    LOG.trace("setPermission:calling ceph_setpermission from Java");
++    ceph.ceph_setPermission(getCephPath(abs_path), permission.toShort());
++    LOG.debug("setPermission:exit");
 +  }
 +
 +  /**
@@ -554,28 +1162,31 @@ Index: src/java/org/apache/hadoop/fs/ceph/CephFileSystem.java
 +   * @param mtime Set modification time in number of millis since Jan 1, 1970.
 +   * @param atime Set access time in number of millis since Jan 1, 1970.
 +   */
-+      @Override
-+      public void setTimes(Path p, long mtime, long atime) throws IOException {
-+              if (!initialized) throw new IOException ("You have to initialize the "
-+                                                                                                                                                                               +"CephFileSystem before calling other methods.");
-+              ceph.debug("setTimes:enter with path " + p + " mtime:" + mtime +
-+                                      " atime:" + atime, ceph.DEBUG);
-+              Path abs_path = makeAbsolute(p);
-+              ceph.debug("setTimes:calling ceph_setTimes from Java", ceph.TRACE);
-+              int r = ceph.ceph_setTimes(abs_path.toString(), mtime, atime);
-+              if (r<0) throw new IOException ("Failed to set times on path "
-+                                                                                                                                              + abs_path.toString() + " Error code: " + r);
-+              ceph.debug("setTimes:exit", ceph.DEBUG);
-+      }
++  @Override
++  public void setTimes(Path p, long mtime, long atime) throws IOException {
++    LOG.debug(
++        "setTimes:enter with path " + p + " mtime:" + mtime + " atime:" + atime);
++    Path abs_path = makeAbsolute(p);
++
++    LOG.trace("setTimes:calling ceph_setTimes from Java");
++    int r = ceph.ceph_setTimes(getCephPath(abs_path), mtime, atime);
++
++    if (r < 0) {
++      throw new IOException(
++          "Failed to set times on path " + abs_path.toString() + " Error code: "
++          + r);
++    }
++    LOG.debug("setTimes:exit");
++  }
 +
 +  /**
 +   * Create a new file and open an FSDataOutputStream that's connected to it.
 +   * @param path The file to create.
 +   * @param permission The permissions to apply to the file.
-+   * @param flag If CreateFlag.OVERWRITE, overwrite any existing
-+   * file with this name; otherwise don't.
++   * @param overwrite If true, overwrite any existing file with
++       * this name; otherwise don't.
 +   * @param bufferSize Ceph does internal buffering, but you can buffer
-+       *   in the Java code too if you like.
++   *   in the Java code too if you like.
 +   * @param replication Ignored by Ceph. This can be
 +   * configured via Ceph configuration.
 +   * @param blockSize Ignored by Ceph. You can set client-wide block sizes
@@ -583,146 +1194,189 @@ Index: src/java/org/apache/hadoop/fs/ceph/CephFileSystem.java
 +   * @param progress A Progressable to report back to.
 +   * Reporting is limited but exists.
 +   * @return An FSDataOutputStream pointing to the created file.
-+   * @throws IOException if initialize() hasn't been called, or the path is an
++   * @throws IOException if the path is an
 +   * existing directory, or the path exists but overwrite is false, or there is a
 +   * failure in attempting to open for append with Ceph.
 +   */
 +  public FSDataOutputStream create(Path path,
-+                                                                                                                                       FsPermission permission,
-+                                                                                                                                       EnumSet<CreateFlag> flag,
-+                                                                                                                                       //boolean overwrite,
-+                                                                                                                                       int bufferSize,
-+                                                                                                                                       short replication,
-+                                                                                                                                       long blockSize,
-+                                                                                                                                       Progressable progress
-+                                                                                                                                       ) throws IOException {
-+    if (!initialized) throw new IOException ("You have to initialize the "
-+                                                                                                                                                                               +"CephFileSystem before calling other methods.");
-+    ceph.debug("create:enter with path " + path, ceph.DEBUG);
++      FsPermission permission,
++      boolean overwrite,
++      int bufferSize,
++      short replication,
++      long blockSize,
++      Progressable progress) throws IOException {
++    LOG.debug("create:enter with path " + path);
 +    Path abs_path = makeAbsolute(path);
-+    if (progress!=null) progress.progress();
++
++    if (progress != null) {
++      progress.progress();
++    }
 +    // We ignore replication since that's not configurable here, and
 +    // progress reporting is quite limited.
-+    // Required semantics: if the file exists, overwrite if CreateFlag.OVERWRITE;
-+    // throw an exception if !CreateFlag.OVERWRITE.
++    // Required semantics: if the file exists, overwrite if 'overwrite' is set;
++    // otherwise, throw an exception
 +
 +    // Step 1: existence test
 +    boolean exists = exists(abs_path);
++
 +    if (exists) {
-+      if(isDirectory(abs_path))
-+                              throw new IOException("create: Cannot overwrite existing directory \""
-+                                                                                                                      + path.toString() + "\" with a file");
-+      //if (!overwrite)
-+      if (!flag.contains(CreateFlag.OVERWRITE))
-+                              throw new IOException("createRaw: Cannot open existing file \"" 
-+                                                                                                                      + abs_path.toString() 
-+                                                                                                                      + "\" for writing without overwrite flag");
++      if (getFileStatus(abs_path).isDir()) {
++        throw new IOException(
++            "create: Cannot overwrite existing directory \"" + path.toString()
++            + "\" with a file");
++      }
++      if (!overwrite) {
++        throw new IOException(
++            "createRaw: Cannot open existing file \"" + abs_path.toString()
++            + "\" for writing without overwrite flag");
++      }
 +    }
 +
-+    if (progress!=null) progress.progress();
++    if (progress != null) {
++      progress.progress();
++    }
 +
 +    // Step 2: create any nonexistent directories in the path
 +    if (!exists) {
 +      Path parent = abs_path.getParent();
++
 +      if (parent != null) { // if parent is root, we're done
-+                              int r = ceph.ceph_mkdirs(parent.toString(), permission.toShort());
-+                              if (!(r==0 || r==-ceph.EEXIST))
-+                                      throw new IOException ("Error creating parent directory; code: " + r);
++        int r = ceph.ceph_mkdirs(getCephPath(parent), permission.toShort());
++
++        if (!(r == 0 || r == -ceph.EEXIST)) {
++          throw new IOException("Error creating parent directory; code: " + r);
++        }
++      }
++      if (progress != null) {
++        progress.progress();
 +      }
-+      if (progress!=null) progress.progress();
 +    }
 +    // Step 3: open the file
-+    ceph.debug("calling ceph_open_for_overwrite from Java", ceph.TRACE);
-+    int fh = ceph.ceph_open_for_overwrite(abs_path.toString(), (int)permission.toShort());
-+    if (progress!=null) progress.progress();
-+    ceph.debug("Returned from ceph_open_for_overwrite to Java with fh " + fh, ceph.TRACE);
++    LOG.trace("calling ceph_open_for_overwrite from Java");
++    int fh = ceph.ceph_open_for_overwrite(getCephPath(abs_path),
++        (int) permission.toShort());
++
++    if (progress != null) {
++      progress.progress();
++    }
++    LOG.trace("Returned from ceph_open_for_overwrite to Java with fh " + fh);
 +    if (fh < 0) {
-+      throw new IOException("create: Open for overwrite failed on path \"" + 
-+                                                                                                              path.toString() + "\"");
++      throw new IOException(
++          "create: Open for overwrite failed on path \"" + path.toString()
++          + "\"");
 +    }
-+      
++
 +    // Step 4: create the stream
-+    OutputStream cephOStream = new CephOutputStream(getConf(),
-+                                                                                                                                                                                                              ceph, fh, bufferSize);
-+    ceph.debug("create:exit", ceph.DEBUG);
++    OutputStream cephOStream = new CephOutputStream(getConf(), ceph, fh,
++        bufferSize);
++
++    LOG.debug("create:exit");
 +    return new FSDataOutputStream(cephOStream, statistics);
-+      }
++  }
 +
 +  /**
 +   * Open a Ceph file and attach the file handle to an FSDataInputStream.
 +   * @param path The file to open
 +   * @param bufferSize Ceph does internal buffering; but you can buffer in
-+       *   the Java code too if you like.
++   *   the Java code too if you like.
 +   * @return FSDataInputStream reading from the given path.
-+   * @throws IOException if initialize() hasn't been called, the path DNE or is a
++   * @throws IOException if the path DNE or is a
 +   * directory, or there is an error getting data to set up the FSDataInputStream.
 +   */
 +  public FSDataInputStream open(Path path, int bufferSize) throws IOException {
-+    if (!initialized) throw new IOException ("You have to initialize the "
-+                                                                                                                                                                               +"CephFileSystem before calling other methods.");
-+    ceph.debug("open:enter with path " + path, ceph.DEBUG);
++    LOG.debug("open:enter with path " + path);
 +    Path abs_path = makeAbsolute(path);
-+    
-+    int fh = ceph.ceph_open_for_read(abs_path.toString());
-+    if (fh < 0) { //uh-oh, something's bad!
-+      if (fh == -ceph.ENOENT) //well that was a stupid open
-+                              throw new IOException("open:  absolute path \""  + abs_path.toString()
-+                                                                                                                      + "\" does not exist");
-+      else //hrm...the file exists but we can't open it :(
-+                              throw new IOException("open: Failed to open file " + abs_path.toString());
++
++    int fh = ceph.ceph_open_for_read(getCephPath(abs_path));
++
++    if (fh < 0) { // uh-oh, something's bad!
++      if (fh == -ceph.ENOENT) { // well that was a stupid open
++        throw new IOException(
++            "open:  absolute path \"" + abs_path.toString()
++            + "\" does not exist");
++      } else { // hrm...the file exists but we can't open it :(
++        throw new IOException("open: Failed to open file " + abs_path.toString());
++      }
 +    }
 +
-+    if(isDirectory(abs_path)) { //yes, it is possible to open Ceph directories
-+      //but that doesn't mean you should in Hadoop!
++    if (getFileStatus(abs_path).isDir()) { // yes, it is possible to open Ceph directories
++      // but that doesn't mean you should in Hadoop!
 +      ceph.ceph_close(fh);
-+      throw new IOException("open:  absolute path \""  + abs_path.toString()
-+                                                                                                              + "\" is a directory!");
++      throw new IOException(
++          "open:  absolute path \"" + abs_path.toString() + "\" is a directory!");
 +    }
 +    Stat lstat = new Stat();
-+              ceph.debug("open:calling ceph_stat from Java", ceph.TRACE);
-+    ceph.ceph_stat(abs_path.toString(), lstat);
-+              ceph.debug("open:returned to Java", ceph.TRACE);
++
++    LOG.trace("open:calling ceph_stat from Java");
++    ceph.ceph_stat(getCephPath(abs_path), lstat);
++    LOG.trace("open:returned to Java");
 +    long size = lstat.size;
++
 +    if (size < 0) {
-+      throw new IOException("Failed to get file size for file " + abs_path.toString() + 
-+                                                                                                              " but succeeded in opening file. Something bizarre is going on.");
++      throw new IOException(
++          "Failed to get file size for file " + abs_path.toString()
++          + " but succeeded in opening file. Something bizarre is going on.");
 +    }
-+    FSInputStream cephIStream = new CephInputStream(getConf(), ceph,
-+                                                                                                                                                                                                              fh, size, bufferSize);
-+    ceph.debug("open:exit", ceph.DEBUG);
++    FSInputStream cephIStream = new CephInputStream(getConf(), ceph, fh, size,
++        bufferSize);
++
++    LOG.debug("open:exit");
 +    return new FSDataInputStream(cephIStream);
-+      }
++  }
 +
 +  /**
 +   * Rename a file or directory.
 +   * @param src The current path of the file/directory
 +   * @param dst The new name for the path.
 +   * @return true if the rename succeeded, false otherwise.
-+   * @throws IOException if initialize() hasn't been called.
 +   */
 +  @Override
-+      public boolean rename(Path src, Path dst) throws IOException {
-+    if (!initialized) throw new IOException ("You have to initialize the "
-+                                                                                                                                                                               +"CephFileSystem before calling other methods.");
-+    ceph.debug("rename:enter with src:" + src + " and dest:" + dst, ceph.DEBUG);
++  public boolean rename(Path src, Path dst) throws IOException {
++    LOG.debug("rename:enter with src:" + src + " and dest:" + dst);
 +    Path abs_src = makeAbsolute(src);
 +    Path abs_dst = makeAbsolute(dst);
-+    ceph.debug("calling ceph_rename from Java", ceph.TRACE);
-+    boolean result = ceph.ceph_rename(abs_src.toString(), abs_dst.toString());
-+              if (!result) {
-+                      if (isDirectory(abs_dst)) { //move the srcdir into destdir
-+                              ceph.debug("ceph_rename failed but dst is a directory!", ceph.NOLOG);
-+                              Path new_dst = new Path(abs_dst, abs_src.getName());
-+                              result = rename(abs_src, new_dst);
-+                              ceph.debug("attempt to move " + abs_src.toString()
-+                                                                       + " to " + new_dst.toString()
-+                                                                       + "has result:" + result, ceph.NOLOG);
-+                      }
-+              }
-+    ceph.debug("rename:exit with result: " + result, ceph.DEBUG);
++
++    LOG.trace("calling ceph_rename from Java");
++    boolean result = ceph.ceph_rename(getCephPath(abs_src), getCephPath(abs_dst));
++
++    if (!result) {
++      boolean isDir = false;
++      try {
++        isDir = getFileStatus(abs_dst).isDir();
++      } catch (FileNotFoundException e) {}
++      if (isDir) { // move the srcdir into destdir
++        LOG.debug("ceph_rename failed but dst is a directory!");
++        Path new_dst = new Path(abs_dst, abs_src.getName());
++
++        result = rename(abs_src, new_dst);
++        LOG.debug(
++            "attempt to move " + abs_src.toString() + " to "
++            + new_dst.toString() + "has result:" + result);
++      }
++    }
++    LOG.debug("rename:exit with result: " + result);
 +    return result;
 +  }
 +
++  /*
++   * Attempt to convert an IP into its hostname
++   */
++  private String[] ips2Hosts(String[] ips) {
++    ArrayList<String> hosts = new ArrayList<String>();
++    for (String ip : ips) {
++      try {
++        String host = DNS.reverseDns(InetAddress.getByName(ip), CEPH_NAMESERVER);
++        if (host.charAt(host.length()-1) == '.') {
++          host = host.substring(0, host.length()-1);
++        }
++        hosts.add(host); /* append */
++      } catch (Exception e) {
++        LOG.error("reverseDns ["+ip+"] failed: "+ e);
++      }
++    }
++    return hosts.toArray(new String[hosts.size()]);
++  }
++
 +  /**
 +   * Get a BlockLocation object for each block in a file.
 +   *
@@ -735,80 +1389,37 @@ Index: src/java/org/apache/hadoop/fs/ceph/CephFileSystem.java
 +   * @param len The amount of the file past the offset you are interested in.
 +   * @return A BlockLocation[] where each object corresponds to a block within
 +   * the given range.
-+   * @throws IOException if initialize() hasn't been called.
 +   */
 +  @Override
-+      public BlockLocation[] getFileBlockLocations(FileStatus file,
-+                                                                                                                                                                                               long start, long len) throws IOException {
-+    if (!initialized) throw new IOException ("You have to initialize the "
-+                                                                                                                                                                               +"CephFileSystem before calling other methods.");
-+              ceph.debug("getFileBlockLocations:enter with path " + file.getPath() +
-+                                      ", start pos " + start + ", length " + len, ceph.DEBUG);
-+    //sanitize and get the filehandle
++  public BlockLocation[] getFileBlockLocations(FileStatus file, long start, long len) throws IOException {
 +    Path abs_path = makeAbsolute(file.getPath());
-+              ceph.debug("getFileBlockLocations:call ceph_open_for_read from Java", ceph.TRACE);
-+    int fh = ceph.ceph_open_for_read(abs_path.toString());
-+              ceph.debug("getFileBlockLocations:return from ceph_open_for_read to Java with fh "
-+                                      + fh, ceph.TRACE);
++
++    int fh = ceph.ceph_open_for_read(getCephPath(abs_path));
 +    if (fh < 0) {
-+                      ceph.debug("getFileBlockLocations:got error " + fh +
-+                                              ", exiting and returning null!", ceph.ERROR);
++      LOG.error("getFileBlockLocations:got error " + fh + ", exiting and returning null!");
 +      return null;
 +    }
-+    //get the block size
-+              ceph.debug("getFileBlockLocations:call ceph_getblocksize from Java", ceph.TRACE);
-+    long blockSize = ceph.ceph_getblocksize(abs_path.toString());
-+              ceph.debug("getFileBlockLocations:return from ceph_getblocksize", ceph.TRACE);
-+    BlockLocation[] locations =
-+      new BlockLocation[(int)Math.ceil(len/(float)blockSize)];
-+              long offset;
++
++    long blockSize = ceph.ceph_getblocksize(getCephPath(abs_path));
++    BlockLocation[] locations = new BlockLocation[(int) Math.ceil(len / (float) blockSize)];
++
 +    for (int i = 0; i < locations.length; ++i) {
-+                      offset = start + i*blockSize;
-+                      ceph.debug("getFileBlockLocations:call ceph_hosts from Java on fh "
-+                                              + fh + " and offset " + offset, ceph.TRACE);
-+      String host = ceph.ceph_hosts(fh, offset);
-+                      ceph.debug("getFileBlockLocations:return from ceph_hosts to Java with host "
-+                                              + host, ceph.TRACE);
-+      String[] hostArray = new String[1];
-+      hostArray[0] = host;
-+      locations[i] = new BlockLocation(hostArray, hostArray,
-+                                                                                                                                                       start+i*blockSize-(start % blockSize),
-+                                                                                                                                                       blockSize);
-+    }
-+              ceph.debug("getFileBlockLocations:call ceph_close from Java on fh "
-+                                      + fh, ceph.TRACE);
++      long offset = start + i * blockSize;
++      long blockStart = start + i * blockSize - (start % blockSize);
++      String ips[] = ceph.ceph_hosts(fh, offset);
++      String hosts[] = ips2Hosts(ips);
++      locations[i] = new BlockLocation(null, hosts, blockStart, blockSize);
++      LOG.debug("getFileBlockLocations: location[" + i + "]: " + locations[i]);
++    }
++
 +    ceph.ceph_close(fh);
-+              ceph.debug("getFileBlockLocations:return with " + locations.length
-+                                      + " locations", ceph.DEBUG);
 +    return locations;
 +  }
-+  
-+  /**
-+   * Get usage statistics on the Ceph filesystem.
-+   * @param path A path to the partition you're interested in.
-+   * Ceph doesn't partition, so this is ignored.
-+   * @return FsStatus reporting capacity, usage, and remaining space.
-+   * @throws IOException if initialize() hasn't been called, or the
-+   * stat somehow fails.
-+   */
-+  @Override
-+      public FsStatus getStatus (Path path) throws IOException {
-+    if (!initialized) throw new IOException("You have to initialize the "
-+                                                                                                                                                                              + " CephFileSystem before calling other methods.");
-+    ceph.debug("getStatus:enter with path " + path, ceph.DEBUG);
-+    Path abs_path = makeAbsolute(path);
-+    
-+    //currently(Ceph .16) Ceph actually ignores the path
-+    //but we still pass it in; if Ceph stops ignoring we may need more
-+    //error-checking code.
-+    CephStat ceph_stat = new CephStat();
-+              ceph.debug("getStatus:calling ceph_statfs from Java", ceph.TRACE);
-+    int result = ceph.ceph_statfs(abs_path.toString(), ceph_stat);
-+    if (result!=0) throw new IOException("Somehow failed to statfs the Ceph filesystem. Error code: " + result);
-+    ceph.debug("getStatus:exit successfully", ceph.DEBUG);
-+    return new FsStatus(ceph_stat.capacity,
-+                                                                                              ceph_stat.used, ceph_stat.remaining);
-+  }
++
++  @Deprecated
++      public boolean delete(Path path) throws IOException {
++              return delete(path, false);
++      }
 +
 +  /**
 +   * Delete the given path, and optionally its children.
@@ -817,64 +1428,67 @@ Index: src/java/org/apache/hadoop/fs/ceph/CephFileSystem.java
 +   * delete will throw an IOException. If path is a file this is ignored.
 +   * @return true if the delete succeeded, false otherwise (including if
 +   * path doesn't exist).
-+   * @throws IOException if initialize() hasn't been called,
-+   * or you attempt to non-recursively delete a directory,
++   * @throws IOException if you attempt to non-recursively delete a directory,
 +   * or you attempt to delete the root directory.
 +   */
 +  public boolean delete(Path path, boolean recursive) throws IOException {
-+    if (!initialized) throw new IOException ("You have to initialize the "
-+                                                                                                                                                                               +"CephFileSystem before calling other methods.");
-+    ceph.debug("delete:enter with path " + path + " and recursive=" + recursive,
-+                                      ceph.DEBUG);
++    LOG.debug("delete:enter with path " + path + " and recursive=" + recursive);
 +    Path abs_path = makeAbsolute(path);
-+    
++
 +    // sanity check
-+    if (abs_path.equals(root))
++    if (abs_path.equals(root)) {
 +      throw new IOException("Error: deleting the root directory is a Bad Idea.");
-+    if (!exists(abs_path)) return false;
++    }
++    if (!exists(abs_path)) {
++      return false;
++    }
 +
 +    // if the path is a file, try to delete it.
 +    if (isFile(abs_path)) {
-+                      ceph.debug("delete:calling ceph_unlink from Java with path " + abs_path,
-+                                              ceph.TRACE);
-+      boolean result = ceph.ceph_unlink(abs_path.toString());
-+      if(!result)
-+                              ceph.debug("delete: failed to delete file \"" +
-+                                                                                              abs_path.toString() + "\".", ceph.ERROR);
-+      ceph.debug("delete:exit with success=" + result, ceph.DEBUG);
++      LOG.trace("delete:calling ceph_unlink from Java with path " + abs_path);
++      boolean result = ceph.ceph_unlink(getCephPath(abs_path));
++
++      if (!result) {
++        LOG.error(
++            "delete: failed to delete file \"" + abs_path.toString() + "\".");
++      }
++      LOG.debug("delete:exit with success=" + result);
 +      return result;
 +    }
 +
 +    /* The path is a directory, so recursively try to delete its contents,
-+       and then delete the directory. */
-+    //get the entries; listPaths will remove . and .. for us
++     and then delete the directory. */
++    // get the entries; listPaths will remove . and .. for us
 +    Path[] contents = listPaths(abs_path);
++
 +    if (contents == null) {
-+      ceph.debug("delete: Failed to read contents of directory \"" +
-+                                              abs_path.toString() +
-+                                              "\" while trying to delete it, BAILING", ceph.ERROR);
++      LOG.error(
++          "delete: Failed to read contents of directory \""
++              + abs_path.toString() + "\" while trying to delete it, BAILING");
 +      return false;
 +    }
 +    if (!recursive && contents.length > 0) {
 +      throw new IOException("Directories must be deleted recursively!");
 +    }
 +    // delete the entries
-+              ceph.debug("delete: recursively calling delete on contents of "
-+                                      + abs_path, ceph.DEBUG);
++    LOG.debug("delete: recursively calling delete on contents of " + abs_path);
 +    for (Path p : contents) {
 +      if (!delete(p, true)) {
-+                              ceph.debug("delete: Failed to delete file \"" + 
-+                                                                                              p.toString() + "\" while recursively deleting \""
-+                                                                                              + abs_path.toString() + "\", BAILING", ceph.ERROR );
-+                              return false;
++        LOG.error(
++            "delete: Failed to delete file \"" + p.toString()
++            + "\" while recursively deleting \"" + abs_path.toString()
++            + "\", BAILING");
++        return false;
 +      }
 +    }
-+    //if we've come this far it's a now-empty directory, so delete it!
-+    boolean result = ceph.ceph_rmdir(abs_path.toString());
-+    if (!result)
-+      ceph.debug("delete: failed to delete \"" + abs_path.toString()
-+                                              + "\", BAILING", ceph.ERROR);
-+    ceph.debug("delete:exit", ceph.DEBUG);
++    // if we've come this far it's a now-empty directory, so delete it!
++    boolean result = ceph.ceph_rmdir(getCephPath(abs_path));
++
++    if (!result) {
++      LOG.error(
++          "delete: failed to delete \"" + abs_path.toString() + "\", BAILING");
++    }
++    LOG.debug("delete:exit");
 +    return result;
 +  }
 +
@@ -884,860 +1498,86 @@ Index: src/java/org/apache/hadoop/fs/ceph/CephFileSystem.java
 +   * by a separate Ceph configuration.
 +   */
 +  @Override
-+      public short getDefaultReplication() {
++  public short getDefaultReplication() {
 +    return 1;
 +  }
 +
 +  /**
 +   * Get the default block size.
-+   * @return the default block size, in bytes, as a long.
-+   */
-+  @Override
-+      public long getDefaultBlockSize() {
-+    return getConf().getInt("fs.ceph.blockSize", 1<<26);
-+  }
-+
-+  // Makes a Path absolute. In a cheap, dirty hack, we're
-+  // also going to strip off any fs_default_name prefix we see. 
-+  private Path makeAbsolute(Path path) {
-+    ceph.debug("makeAbsolute:enter with path " + path, ceph.NOLOG);
-+    if (path == null) return new Path("/");
-+    // first, check for the prefix
-+    if (path.toString().startsWith(fs_default_name)) {          
-+      Path stripped_path = new Path(path.toString().substring(fs_default_name.length()));
-+      ceph.debug("makeAbsolute:exit with path " + stripped_path, ceph.NOLOG);
-+      return stripped_path;
-+    }
-+
-+    if (path.isAbsolute()) {
-+      ceph.debug("makeAbsolute:exit with path " + path, ceph.NOLOG);
-+      return path;
-+    }
-+    Path new_path = new Path(ceph.ceph_getcwd(), path);
-+    ceph.debug("makeAbsolute:exit with path " + new_path, ceph.NOLOG);
-+    return new_path;
-+  }
-+ 
-+  private Path[] listPaths(Path path) throws IOException {
-+    ceph.debug("listPaths:enter with path " + path, ceph.NOLOG);
-+    String dirlist[];
-+
-+    Path abs_path = makeAbsolute(path);
-+
-+    // If it's a directory, get the listing. Otherwise, complain and give up.
-+    ceph.debug("calling ceph_getdir from Java with path " + abs_path, ceph.NOLOG);
-+    dirlist = ceph.ceph_getdir(abs_path.toString());
-+    ceph.debug("returning from ceph_getdir to Java", ceph.NOLOG);
-+
-+    if (dirlist == null) {
-+      return null;
-+    }
-+    
-+    // convert the strings to Paths
-+    Path[] paths = new Path[dirlist.length];
-+    for (int i = 0; i < dirlist.length; ++i) {
-+      ceph.debug("Raw enumeration of paths in \"" + abs_path.toString() + "\": \"" +
-+                                                                                      dirlist[i] + "\"", ceph.TRACE);
-+      // convert each listing to an absolute path
-+      Path raw_path = new Path(dirlist[i]);
-+      if (raw_path.isAbsolute())
-+                              paths[i] = raw_path;
-+      else
-+                              paths[i] = new Path(abs_path, raw_path);
-+    }
-+    ceph.debug("listPaths:exit", ceph.NOLOG);
-+    return paths;
-+  }
-+
-+  
-+
-+  static class Stat {
-+    public long size;
-+    public boolean is_dir;
-+    public long block_size;
-+    public long mod_time;
-+    public long access_time;
-+    public int mode;
-+
-+    public Stat(){}
-+  }
-+
-+  static class CephStat {
-+    public long capacity;
-+    public long used;
-+    public long remaining;
-+
-+    public CephStat() {}
-+  }
-+}
-Index: src/java/org/apache/hadoop/fs/ceph/CephFS.java
-===================================================================
---- src/java/org/apache/hadoop/fs/ceph/CephFS.java     (revision 0)
-+++ src/java/org/apache/hadoop/fs/ceph/CephFS.java     (revision 0)
-@@ -0,0 +1,272 @@
-+// -*- mode:Java; tab-width:2; c-basic-offset:2; indent-tabs-mode:t -*- 
-+/**
-+ *
-+ * Licensed under the Apache License, Version 2.0
-+ * (the "License"); you may not use this file except in compliance with
-+ * the License. You may obtain a copy of the License at
-+ *
-+ * http://www.apache.org/licenses/LICENSE-2.0
-+ *
-+ * Unless required by applicable law or agreed to in writing, software
-+ * distributed under the License is distributed on an "AS IS" BASIS,
-+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
-+ * implied. See the License for the specific language governing
-+ * permissions and limitations under the License.
-+ *
-+ * 
-+ * Abstract base class for communicating with a Ceph filesystem and its
-+ * C++ codebase from Java, or pretending to do so (for unit testing purposes).
-+ * As only the Ceph package should be using this directly, all methods
-+ * are protected.
-+ */
-+package org.apache.hadoop.fs.ceph;
-+
-+import org.apache.hadoop.conf.Configuration;
-+import org.apache.commons.logging.Log;
-+
-+abstract class CephFS {
-+
-+      protected static final int FATAL = 0;
-+      protected static final int ERROR = 1;
-+      protected static final int WARN = 2;
-+      protected static final int INFO = 3;
-+      protected static final int DEBUG = 4;
-+      protected static final int TRACE = 5;
-+      protected static final int NOLOG = 6;
-+
-+      protected static final int ENOTDIR = 20;
-+  protected static final int EEXIST = 17;
-+  protected static final int ENOENT = 2;
-+
-+      private boolean debug = false;
-+      private Log LOG;
-+
-+      public CephFS(Configuration conf, Log log) {
-+      debug = ("true".equals(conf.get("fs.ceph.debug", "false")));
-+                      LOG = log;
-+      }
-+
-+      /*
-+       * Performs any necessary setup to allow general use of the filesystem.
-+       * Inputs:
-+       *  String argsuments -- a command-line style input of Ceph config params
-+       *  int block_size -- the size in bytes to use for blocks
-+       * Returns: true on success, false otherwise
-+       */
-+      abstract protected boolean ceph_initializeClient(String arguments, int block_size);
-+      
-+      /*
-+       * Returns the current working directory (absolute) as a String
-+       */
-+      abstract protected String ceph_getcwd();
-+      /*
-+       * Changes the working directory.
-+       * Inputs:
-+       *  String path: The path (relative or absolute) to switch to
-+       * Returns: true on success, false otherwise.
-+       */
-+      abstract protected boolean ceph_setcwd(String path);
-+      /*
-+       * Given a path to a directory, removes the directory if empty.
-+       * Inputs:
-+       *  jstring j_path: The path (relative or absolute) to the directory
-+       * Returns: true on successful delete; false otherwise
-+       */
-+  abstract protected boolean ceph_rmdir(String path);
-+      /*
-+       * Given a path, unlinks it.
-+       * Inputs:
-+       *  String path: The path (relative or absolute) to the file or empty dir
-+       * Returns: true if the unlink occurred, false otherwise.
-+       */
-+  abstract protected boolean ceph_unlink(String path);
-+      /*
-+       * Changes a given path name to a new name, assuming new_path doesn't exist.
-+       * Inputs:
-+       *  jstring j_from: The path whose name you want to change.
-+       *  jstring j_to: The new name for the path.
-+       * Returns: true if the rename occurred, false otherwise
-+       */
-+  abstract protected boolean ceph_rename(String old_path, String new_path);
-+      /*
-+       * Returns true if it the input path exists, false
-+       * if it does not or there is an unexpected failure.
-+       */
-+  abstract protected boolean ceph_exists(String path);
-+      /*
-+       * Get the block size for a given path.
-+       * Input:
-+       *  String path: The path (relative or absolute) you want
-+       *  the block size for.
-+       * Returns: block size if the path exists, otherwise a negative number
-+       *  corresponding to the standard C++ error codes (which are positive).
-+       */
-+  abstract protected long ceph_getblocksize(String path);
-+      /*
-+       * Returns true if the given path is a directory, false otherwise.
-+       */
-+  abstract protected boolean ceph_isdirectory(String path);
-+      /*
-+       * Returns true if the given path is a file; false otherwise.
-+       */
-+  abstract protected boolean ceph_isfile(String path);
-+      /*
-+       * Get the contents of a given directory.
-+       * Inputs:
-+       *  String path: The path (relative or absolute) to the directory.
-+       * Returns: A Java String[] of the contents of the directory, or
-+       *  NULL if there is an error (ie, path is not a dir). This listing
-+       *  will not contain . or .. entries.
-+       */
-+  abstract protected String[] ceph_getdir(String path);
-+      /*
-+       * Create the specified directory and any required intermediate ones with the
-+       * given mode.
-+       */
-+  abstract protected int ceph_mkdirs(String path, int mode);
-+      /*
-+       * Open a file to append. If the file does not exist, it will be created.
-+       * Opening a dir is possible but may have bad results.
-+       * Inputs:
-+       *  String path: The path to open.
-+       * Returns: an int filehandle, or a number<0 if an error occurs.
-+       */
-+  abstract protected int ceph_open_for_append(String path);
-+      /*
-+       * Open a file for reading.
-+       * Opening a dir is possible but may have bad results.
-+       * Inputs:
-+       *  String path: The path to open.
-+       * Returns: an int filehandle, or a number<0 if an error occurs.
-+       */
-+  abstract protected int ceph_open_for_read(String path);
-+      /*
-+       * Opens a file for overwriting; creates it if necessary.
-+       * Opening a dir is possible but may have bad results.
-+       * Inputs:
-+       *  String path: The path to open.
-+       *  int mode: The mode to open with.
-+       * Returns: an int filehandle, or a number<0 if an error occurs.
-+       */
-+  abstract protected int ceph_open_for_overwrite(String path, int mode);
-+      /*
-+       * Closes the given file. Returns 0 on success, or a negative
-+       * error code otherwise.
-+       */
-+  abstract protected int ceph_close(int filehandle);
-+      /*
-+       * Change the mode on a path.
-+       * Inputs:
-+       *  String path: The path to change mode on.
-+       *  int mode: The mode to apply.
-+       * Returns: true if the mode is properly applied, false if there
-+       *  is any error.
-+       */
-+  abstract protected boolean ceph_setPermission(String path, int mode);
-+      /*
-+       * Closes the Ceph client. This should be called before shutting down
-+       * (multiple times is okay but redundant).
-+       */
-+  abstract protected boolean ceph_kill_client();
-+      /*
-+       * Get the statistics on a path returned in a custom format defined
-+       * in CephFileSystem.
-+       * Inputs:
-+       *  String path: The path to stat.
-+       *  Stat fill: The stat object to fill.
-+       * Returns: true if the stat is successful, false otherwise.
-+       */
-+  abstract protected boolean ceph_stat(String path, CephFileSystem.Stat fill);
-+      /*
-+       * Statfs a filesystem in a custom format defined in CephFileSystem.
-+       * Inputs:
-+       *  String path: A path on the filesystem that you wish to stat.
-+       *  CephStat fill: The CephStat object to fill.
-+       * Returns: 0 if successful and the CephStat is filled; a negative
-+       *  error code otherwise.
-+       */
-+  abstract protected int ceph_statfs(String path, CephFileSystem.CephStat fill);
-+      /*
-+       * Check how many times a path should be replicated (if it is
-+       * degraded it may not actually be replicated this often).
-+       * Inputs:
-+       *  String path: The path to check.
-+       * Returns: an int containing the number of times replicated.
-+       */
-+  abstract protected int ceph_replication(String path);
-+      /*
-+       * Find the IP address of the primary OSD for a given file and offset.
-+       * Inputs:
-+       *  int fh: The filehandle for the file.
-+       *  long offset: The offset to get the location of.
-+       * Returns: a String of the location as IP, or NULL if there is an error.
-+       */
-+  abstract protected String ceph_hosts(int fh, long offset);
-+      /*
-+       * Set the mtime and atime for a given path.
-+       * Inputs:
-+       *  String path: The path to set the times for.
-+       *  long mtime: The mtime to set, in millis since epoch (-1 to not set).
-+       *  long atime: The atime to set, in millis since epoch (-1 to not set)
-+       * Returns: 0 if successful, an error code otherwise.
-+       */
-+  abstract protected int ceph_setTimes(String path, long mtime, long atime);
-+      /*
-+       * Get the current position in a file (as a long) of a given filehandle.
-+       * Returns: (long) current file position on success, or a
-+       *  negative error code on failure.
-+       */
-+  abstract protected long ceph_getpos(int fh);
-+      /*
-+       * Write the given buffer contents to the given filehandle.
-+       * Inputs:
-+       *  int fh: The filehandle to write to.
-+       *  byte[] buffer: The buffer to write from
-+       *  int buffer_offset: The position in the buffer to write from
-+       *  int length: The number of (sequential) bytes to write.
-+       * Returns: int, on success the number of bytes written, on failure
-+       *  a negative error code.
-+       */
-+  abstract protected int ceph_write(int fh, byte[] buffer, int buffer_offset, int length);
-+
-+      /*
-+       * Reads into the given byte array from the current position.
-+       * Inputs:
-+       *  int fh: the filehandle to read from
-+       *  byte[] buffer: the byte array to read into
-+       *  int buffer_offset: where in the buffer to start writing
-+       *  int length: how much to read.
-+       * There'd better be enough space in the buffer to write all
-+       * the data from the given offset!
-+       * Returns: the number of bytes read on success (as an int),
-+       *  or an error code otherwise.  */
-+  abstract protected int ceph_read(int fh, byte[] buffer, int buffer_offset, int length);
-+      /*
-+       * Seeks to the given position in the given file.
-+       * Inputs:
-+       *  int fh: The filehandle to seek in.
-+       *  long pos: The position to seek to.
-+       * Returns: the new position (as a long) of the filehandle on success,
-+       *  or a negative error code on failure.         */
-+  abstract protected long ceph_seek_from_start(int fh, long pos);
-+    
-+      protected void debug(String statement, int priority) {
-+    if (debug) System.err.println(statement);
-+              switch(priority) {
-+              case FATAL: LOG.fatal(statement);
-+                      break;
-+              case ERROR: LOG.error(statement);
-+                      break;
-+              case WARN: LOG.warn(statement);
-+                      break;
-+              case INFO: LOG.info(statement);
-+                      break;
-+              case DEBUG: LOG.debug(statement);
-+                      break;
-+              case TRACE: LOG.trace(statement);
-+                      break;
-+              case NOLOG: break;
-+              default: break;
-+              }
-+  }
-+}
-Index: src/java/org/apache/hadoop/fs/ceph/CephFaker.java
-===================================================================
---- src/java/org/apache/hadoop/fs/ceph/CephFaker.java  (revision 0)
-+++ src/java/org/apache/hadoop/fs/ceph/CephFaker.java  (revision 0)
-@@ -0,0 +1,480 @@
-+// -*- mode:Java; tab-width:2; c-basic-offset:2; indent-tabs-mode:t -*- 
-+/**
-+ *
-+ * Licensed under the Apache License, Version 2.0
-+ * (the "License"); you may not use this file except in compliance with
-+ * the License. You may obtain a copy of the License at
-+ *
-+ * http://www.apache.org/licenses/LICENSE-2.0
-+ *
-+ * Unless required by applicable law or agreed to in writing, software
-+ * distributed under the License is distributed on an "AS IS" BASIS,
-+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
-+ * implied. See the License for the specific language governing
-+ * permissions and limitations under the License.
-+ *
-+ * 
-+ * This uses the local Filesystem but pretends to be communicating
-+ * with a Ceph deployment, for unit testing the CephFileSystem.
-+ */
-+
-+package org.apache.hadoop.fs.ceph;
-+
-+import java.net.URI;
-+import java.util.Hashtable;
-+import java.io.Closeable;
-+import java.io.FileNotFoundException;
-+import java.io.IOException;
-+
-+import org.apache.commons.logging.Log;
-+import org.apache.hadoop.conf.Configuration;
-+import org.apache.hadoop.fs.BlockLocation;
-+import org.apache.hadoop.fs.FileAlreadyExistsException;
-+import org.apache.hadoop.fs.FileStatus;
-+import org.apache.hadoop.fs.FileSystem;
-+import org.apache.hadoop.fs.FsStatus;
-+import org.apache.hadoop.fs.FSDataInputStream;
-+import org.apache.hadoop.fs.FSDataOutputStream;
-+import org.apache.hadoop.fs.Path;
-+import org.apache.hadoop.fs.permission.FsPermission;
-+
-+class CephFaker extends CephFS {
-+      
-+      FileSystem localFS;
-+      String localPrefix;
-+      int blockSize;
-+      Configuration conf;
-+      Hashtable<Integer, Object> files;
-+      Hashtable<Integer, String> filenames;
-+      int fileCount = 0;
-+      boolean initialized = false;
-+      
-+      public CephFaker(Configuration con, Log log) {
-+              super(con, log);
-+              conf = con;
-+              files = new Hashtable<Integer, Object>();
-+              filenames = new Hashtable<Integer, String>();
-+      }
-+      
-+      protected boolean ceph_initializeClient(String args, int block_size) {
-+              if (!initialized) {
-+                      //let's remember the default block_size
-+                      blockSize = block_size;
-+                      /* for a real Ceph deployment, this starts up the client, 
-+                       * sets debugging levels, etc. We just need to get the
-+                       * local FileSystem to use, and we'll ignore any
-+                       * command-line arguments. */
-+                      try {
-+                              localFS = FileSystem.getLocal(conf);
-+                              localFS.initialize(URI.create("file://localhost"), conf);
-+                              localFS.setVerifyChecksum(false);
-+                              String testDir = conf.get("hadoop.tmp.dir");
-+                              localPrefix = localFS.getWorkingDirectory().toString();
-+                              int testDirLoc = localPrefix.indexOf(testDir) - 1;
-+                              if (-2 == testDirLoc)
-+                                      testDirLoc = localPrefix.length();
-+                              localPrefix = localPrefix.substring(0, testDirLoc)
-+                                      + "/" + conf.get("hadoop.tmp.dir");
-+
-+                              localFS.setWorkingDirectory(new Path(localPrefix
-+                                                                                                                                                                               +"/user/"
-+                                                                                                                                                                               + System.getProperty("user.name")));
-+                              //I don't know why, but the unit tests expect the default
-+                              //working dir to be /user/username, so satisfy them!
-+                              //debug("localPrefix is " + localPrefix, INFO);
-+                      }
-+                      catch (IOException e) {
-+                              return false;
-+                      }
-+                      initialized = true;
-+              }
-+              return true;
-+      }
-+
-+      protected String ceph_getcwd() {
-+              return sanitize_path(localFS.getWorkingDirectory().toString());
-+      }
-+
-+      protected boolean ceph_setcwd(String path) {
-+              localFS.setWorkingDirectory(new Path(prepare_path(path)));
-+              return true;
-+      }
-+
-+      //the caller is responsible for ensuring empty dirs
-+      protected boolean ceph_rmdir(String pth) {
-+              Path path = new Path(prepare_path(pth));
-+              boolean ret = false;
-+              try {
-+                      if (localFS.listStatus(path).length <= 1) {
-+                              ret = localFS.delete(path, true);
-+                      }
-+              }
-+              catch (IOException e){ }
-+              return ret;
-+      }
-+
-+      //this needs to work on (empty) directories too
-+      protected boolean ceph_unlink(String path) {
-+              path = prepare_path(path);
-+              boolean ret = false;
-+              if (ceph_isdirectory(path)) {
-+                      ret = ceph_rmdir(path);
-+              }
-+              else {
-+                      try {
-+                              ret = localFS.delete(new Path(path), false);
-+                      }
-+                      catch (IOException e){ }
-+              }
-+              return ret;
-+      }
-+
-+      protected boolean ceph_rename(String oldName, String newName) {
-+              oldName = prepare_path(oldName);
-+              newName = prepare_path(newName);
-+              try {
-+                      Path parent = new Path(newName).getParent();
-+                      Path newPath = new Path(newName);
-+                      if (localFS.exists(parent) && !localFS.exists(newPath))
-+                              return localFS.rename(new Path(oldName), newPath);
-+                      return false;
-+              }
-+              catch (IOException e) { return false; }
-+      }
-+
-+      protected boolean ceph_exists(String path) {
-+              path = prepare_path(path);
-+              boolean ret = false;
-+              try {
-+                      ret = localFS.exists(new Path(path));
-+              }
-+              catch (IOException e){ }
-+              return ret;
-+      }
-+
-+      protected long ceph_getblocksize(String path) {
-+              path = prepare_path(path);
-+              try {
-+                      FileStatus status = localFS.getFileStatus(new Path(path));
-+                      return status.getBlockSize();
-+              }
-+              catch (FileNotFoundException e) {
-+                      return -CephFS.ENOENT;
-+              }
-+              catch (IOException e) {
-+                      return -1; //just fail generically
-+              }
-+      }
-+
-+      protected boolean ceph_isdirectory(String path) {
-+              path = prepare_path(path);
-+              try {
-+                      FileStatus status = localFS.getFileStatus(new Path(path));
-+                      return status.isDir();
-+              }
-+              catch (IOException e) { return false; }
-+      }
-+
-+      protected boolean ceph_isfile(String path) {
-+              path = prepare_path(path);
-+              boolean ret = false;
-+              try {
-+                      FileStatus status = localFS.getFileStatus(new Path(path));
-+                      ret = !status.isDir();
-+              }
-+              catch (Exception e) {}
-+              return ret;
-+      }
-+
-+      protected String[] ceph_getdir(String path) {
-+              path = prepare_path(path);
-+              if (!ceph_isdirectory(path)) {
-+                      return null;
-+              }
-+              try {
-+                      FileStatus[] stats = localFS.listStatus(new Path(path));
-+                      String[] names = new String[stats.length];
-+                      String name;
-+                      for (int i=0; i<stats.length; ++i) {
-+                              name = stats[i].getPath().toString();
-+                              names[i] = name.substring(name.lastIndexOf(Path.SEPARATOR)+1);
-+                      }
-+                      return names;
-+              }
-+              catch (IOException e) {}
-+              return null;
-+      }
-+
-+      protected int ceph_mkdirs(String path, int mode) {
-+              path = prepare_path(path);
-+              //debug("ceph_mkdirs on " + path, INFO);
-+              try {
-+                      if(localFS.mkdirs(new Path(path), new FsPermission((short)mode)))
-+                              return 0;
-+              }
-+              catch (FileAlreadyExistsException fe) { return ENOTDIR; }
-+              catch (IOException e) {}
-+              if (ceph_isdirectory(path))
-+                      return -EEXIST; //apparently it already existed
-+              return -1;
-+      }
-+
-+      /* 
-+       * Unlike a real Ceph deployment, you can't do opens on a directory.
-+       * Since that has unpredictable behavior and you shouldn't do it anyway,
-+       * it's okay.
-+       */
-+      protected int ceph_open_for_append(String path) {
-+              path = prepare_path(path);
-+              FSDataOutputStream stream;
-+              try {
-+                      stream = localFS.append(new Path(path));
-+                      files.put(new Integer(fileCount), stream);
-+                      filenames.put(new Integer(fileCount), path);
-+                      return fileCount++;
-+              }
-+              catch (IOException e) { }
-+              return -1; // failure
-+      }
-+
-+      protected int ceph_open_for_read(String path) {
-+              path = prepare_path(path);
-+              FSDataInputStream stream;
-+              try {
-+                      stream = localFS.open(new Path(path));
-+                      files.put(new Integer(fileCount), stream);
-+                      filenames.put(new Integer(fileCount), path);
-+                      debug("ceph_open_for_read fh:" + fileCount
-+                                              + ", pathname:" + path, INFO);
-+                      return fileCount++;
-+              }
-+              catch (IOException e) { }
-+              return -1; //failure
-+      }
-+
-+      protected int ceph_open_for_overwrite(String path, int mode) {
-+              path = prepare_path(path);
-+              FSDataOutputStream stream;
-+              try {
-+                      stream = localFS.create(new Path(path));
-+                      files.put(new Integer(fileCount), stream);
-+                      filenames.put(new Integer(fileCount), path);
-+                      debug("ceph_open_for_overwrite fh:" + fileCount
-+                                              + ", pathname:" + path, INFO);
-+                      return fileCount++;
-+              }
-+              catch (IOException e) { }
-+              return -1; //failure
-+      }
-+
-+      protected int ceph_close(int filehandle) {
-+              debug("ceph_close(filehandle " + filehandle + ")", INFO);
-+              try {
-+                      ((Closeable)files.get(new Integer(filehandle))).close();
-+                      if(null == files.get(new Integer(filehandle))) {
-+                              return -ENOENT; //this isn't quite the right error code,
-+                              // but the important part is it's negative
-+                      }
-+                      return 0; //hurray, success
-+              }
-+              catch (NullPointerException ne) {
-+                      debug("ceph_close caught NullPointerException!" + ne, WARN);
-+              } //err, how?
-+              catch (IOException ie) {
-+                      debug("ceph_close caught IOException!" + ie, WARN);
-+              }
-+              return -1; //failure
-+      }
-+
-+      protected boolean ceph_setPermission(String pth, int mode) {
-+              pth = prepare_path(pth);
-+              Path path = new Path(pth);
-+              boolean ret = false;
-+              try {
-+                      localFS.setPermission(path, new FsPermission((short)mode));
-+                      ret = true;
-+              }
-+              catch (IOException e) { }
-+              return ret;
-+      }
-+
-+      //rather than try and match a Ceph deployment's behavior exactly,
-+      //just make bad things happen if they try and call methods after this
-+      protected boolean ceph_kill_client() {
-+              //debug("ceph_kill_client", INFO);
-+              localFS.setWorkingDirectory(new Path(localPrefix));
-+              //debug("working dir is now " + localFS.getWorkingDirectory(), INFO);
-+              try{
-+                      localFS.close(); }
-+              catch (Exception e) {}
-+              localFS = null;
-+              files = null;
-+              filenames = null;
-+              return true;
-+      }
-+
-+      protected boolean ceph_stat(String pth, CephFileSystem.Stat fill) {
-+              pth = prepare_path(pth);
-+              Path path = new Path(pth);
-+              boolean ret = false;
-+              try {
-+                      FileStatus status = localFS.getFileStatus(path);
-+                      fill.size = status.getLen();
-+                      fill.is_dir = status.isDir();
-+                      fill.block_size = status.getBlockSize();
-+                      fill.mod_time = status.getModificationTime();
-+                      fill.access_time = status.getAccessTime();
-+                      fill.mode = status.getPermission().toShort();
-+                      ret = true;
-+              }
-+              catch (IOException e) {}
-+              return ret;
-+      }
++   * @return the default block size, in bytes, as a long.
++   */
++  @Override
++  public long getDefaultBlockSize() {
++    return getConf().getInt("fs.ceph.blockSize", 1 << 26);
++  }
 +
-+      protected int ceph_statfs(String pth, CephFileSystem.CephStat fill) {
-+              pth = prepare_path(pth);
-+              try {
-+                      FsStatus stat = localFS.getStatus();
-+                      fill.capacity = stat.getCapacity();
-+                      fill.used = stat.getUsed();
-+                      fill.remaining = stat.getRemaining();
-+                      return 0;
-+              }
-+              catch (Exception e){}
-+              return -1; //failure;
-+      }
++  /**
++   * Adds the working directory to path if path is not already
++   * an absolute path. The URI scheme is not removed here. It
++   * is removed only when users (e.g. ceph native calls) need
++   * the path-only portion.
++   */
++  private Path makeAbsolute(Path path) {
++    if (path.isAbsolute()) {
++      return path;
++    }
++    return new Path(workingDir, path);
++  }
 +
-+      protected int ceph_replication(String path) {
-+              path = prepare_path(path);
-+              int ret = -1; //-1 for failure
-+              try {
-+                      ret = localFS.getFileStatus(new Path(path)).getReplication();
-+              }
-+              catch (IOException e) {}
-+              return ret;
-+      }
++  private Path[] listPaths(Path path) throws IOException {
++    LOG.debug("listPaths:enter with path " + path);
++    String dirlist[];
 +
-+      protected String ceph_hosts(int fh, long offset) {
-+              String ret = null;
-+              try {
-+                      BlockLocation[] locs = localFS.getFileBlockLocations(
-+                                               localFS.getFileStatus(new Path(
-+                                                                                      filenames.get(new Integer(fh)))), offset, 1);
-+                      ret = locs[0].getNames()[0];
-+              }
-+              catch (IOException e) {}
-+              catch (NullPointerException f) { }
-+              return ret;
-+      }
++    Path abs_path = makeAbsolute(path);
 +
-+      protected int ceph_setTimes(String pth, long mtime, long atime) {
-+              pth = prepare_path(pth);
-+              Path path = new Path(pth);
-+              int ret = -1; //generic fail
-+              try {
-+                      localFS.setTimes(path, mtime, atime);
-+                      ret = 0;
-+              }
-+              catch (IOException e) {}
-+              return ret;
-+      }
++    // If it's a directory, get the listing. Otherwise, complain and give up.
++    LOG.debug("calling ceph_getdir from Java with path " + abs_path);
++    dirlist = ceph.ceph_getdir(getCephPath(abs_path));
++    LOG.debug("returning from ceph_getdir to Java");
 +
-+      protected long ceph_getpos(int fh) {
-+              long ret = -1; //generic fail
-+              try {
-+                      Object stream = files.get(new Integer(fh));
-+                      if (stream instanceof FSDataInputStream) {
-+                              ret = ((FSDataInputStream)stream).getPos();
-+                      }
-+                      else if (stream instanceof FSDataOutputStream) {
-+                              ret = ((FSDataOutputStream)stream).getPos();
-+                      }
-+              }
-+              catch (IOException e) {}
-+              catch (NullPointerException f) { }
-+              return ret;
-+      }
++    if (dirlist == null) {
++      return null;
++    }
 +
-+      protected int ceph_write(int fh, byte[] buffer,
-+                                                                                                       int buffer_offset, int length) {
-+              debug("ceph_write fh:" + fh + ", buffer_offset:" + buffer_offset
-+                                      + ", length:" + length, INFO);
-+              long ret = -1;//generic fail
-+              try {
-+                      FSDataOutputStream os = (FSDataOutputStream) files.get(new Integer(fh));
-+                      debug("ceph_write got outputstream", INFO);
-+                      long startPos = os.getPos();
-+                      os.write(buffer, buffer_offset, length);
-+                      ret = os.getPos() - startPos;
-+              }
-+              catch (IOException e) { debug("ceph_write caught IOException!", WARN);}
-+              catch (NullPointerException f) {
-+                      debug("ceph_write caught NullPointerException!", WARN); }
-+              return (int)ret;
-+      }
++    // convert the strings to Paths
++    Path[] paths = new Path[dirlist.length];
 +
-+      protected int ceph_read(int fh, byte[] buffer,
-+                                                                                                      int buffer_offset, int length) {
-+              long ret = -1;//generic fail
-+              try {
-+                      FSDataInputStream is = (FSDataInputStream)files.get(new Integer(fh));
-+                      long startPos = is.getPos();
-+                      is.read(buffer, buffer_offset, length);
-+                      ret = is.getPos() - startPos;
-+              }
-+              catch (IOException e) {}
-+              catch (NullPointerException f) {}
-+              return (int)ret;
-+      }
++    for (int i = 0; i < dirlist.length; ++i) {
++      LOG.trace(
++          "Raw enumeration of paths in \"" + abs_path.toString() + "\": \""
++          + dirlist[i] + "\"");
++      // convert each listing to an absolute path
++      Path raw_path = new Path(dirlist[i]);
 +
-+      protected long ceph_seek_from_start(int fh, long pos) {
-+              debug("ceph_seek_from_start(fh " + fh + ", pos " + pos + ")", INFO);
-+              long ret = -1;//generic fail
-+              try {
-+                      debug("ceph_seek_from_start filename is "
-+                                              + filenames.get(new Integer(fh)), INFO);
-+                      if (null == files.get(new Integer(fh))) {
-+                              debug("ceph_seek_from_start: is is null!", WARN);
-+                      }
-+                      FSDataInputStream is = (FSDataInputStream) files.get(new Integer(fh));
-+                      debug("ceph_seek_from_start retrieved is!", INFO);
-+                      is.seek(pos);
-+                      ret = is.getPos();
-+              }
-+              catch (IOException e) {debug("ceph_seek_from_start caught IOException!", WARN);}
-+              catch (NullPointerException f) {debug("ceph_seek_from_start caught NullPointerException!", WARN);}
-+              return (int)ret;
-+      }
++      if (raw_path.isAbsolute()) {
++        paths[i] = raw_path;
++      } else {
++        paths[i] = new Path(abs_path, raw_path);
++      }
++    }
++    LOG.debug("listPaths:exit");
++    return paths;
++  }
 +
-+      /*
-+       * We need to remove the localFS file prefix before returning to Ceph
-+       */
-+      private String sanitize_path(String path) {
-+              //debug("sanitize_path(" + path + ")", INFO);
-+              /*              if (path.startsWith("file:"))
-+                                      path = path.substring("file:".length()); */
-+              if (path.startsWith(localPrefix)) {
-+                      path = path.substring(localPrefix.length());
-+                      if (path.length() == 0) //it was a root path
-+                              path = "/";
-+              }
-+              //debug("sanitize_path returning " + path, INFO);
-+              return path;
-+      }
++  static class Stat {
++    public long size;
++    public boolean is_dir;
++    public long block_size;
++    public long mod_time;
++    public long access_time;
++    public int mode;
 +
-+      /*
-+       * If it's an absolute path we need to shove the
-+       * test dir onto the front as a prefix.
-+       */
-+      private String prepare_path(String path) {
-+              //debug("prepare_path(" + path + ")", INFO);
-+              if (path.startsWith("/"))
-+                      path = localPrefix + path;
-+              else if (path.equals("..")) {
-+                      if (ceph_getcwd().equals("/"))
-+                              path = "."; // you can't go up past root!
-+              }
-+              //debug("prepare_path returning" + path, INFO);
-+              return path;
-+      }
++    public Stat() {}
++  }
 +}
-Index: src/java/org/apache/hadoop/fs/ceph/CephInputStream.java
-===================================================================
---- src/java/org/apache/hadoop/fs/ceph/CephInputStream.java    (revision 0)
-+++ src/java/org/apache/hadoop/fs/ceph/CephInputStream.java    (revision 0)
-@@ -0,0 +1,235 @@
+diff --git a/src/core/org/apache/hadoop/fs/ceph/CephInputStream.java b/src/core/org/apache/hadoop/fs/ceph/CephInputStream.java
+new file mode 100644
+index 0000000..d9668d0
+--- /dev/null
++++ b/src/core/org/apache/hadoop/fs/ceph/CephInputStream.java
+@@ -0,0 +1,254 @@
 +// -*- mode:Java; tab-width:2; c-basic-offset:2; indent-tabs-mode:t -*- 
++
 +/**
 + *
 + * Licensed under the Apache License, Version 2.0
@@ -1758,8 +1598,11 @@ Index: src/java/org/apache/hadoop/fs/ceph/CephInputStream.java
 + */
 +package org.apache.hadoop.fs.ceph;
 +
++
 +import java.io.IOException;
 +
++import org.apache.commons.logging.Log;
++import org.apache.commons.logging.LogFactory;
 +import org.apache.hadoop.conf.Configuration;
 +import org.apache.hadoop.fs.FSInputStream;
 +
@@ -1770,19 +1613,19 @@ Index: src/java/org/apache/hadoop/fs/ceph/CephInputStream.java
 + * Ceph instance.
 + */
 +public class CephInputStream extends FSInputStream {
-+
++  private static final Log LOG = LogFactory.getLog(CephInputStream.class);
 +  private boolean closed;
 +
 +  private int fileHandle;
 +
 +  private long fileLength;
 +
-+      private CephFS ceph;
++  private CephFS ceph;
 +
-+      private byte[] buffer;
-+      private int bufPos = 0;
-+      private int bufValid = 0;
-+      private long cephPos = 0;
++  private byte[] buffer;
++  private int bufPos = 0;
++  private int bufValid = 0;
++  private long cephPos = 0;
 +
 +  /**
 +   * Create a new CephInputStream.
@@ -1792,75 +1635,82 @@ Index: src/java/org/apache/hadoop/fs/ceph/CephInputStream.java
 +   * you will need to close and re-open it to access the new data.
 +   */
 +  public CephInputStream(Configuration conf, CephFS cephfs,
-+                                                                                               int fh, long flength, int bufferSize) {
++      int fh, long flength, int bufferSize) {
 +    // Whoever's calling the constructor is responsible for doing the actual ceph_open
 +    // call and providing the file handle.
 +    fileLength = flength;
 +    fileHandle = fh;
 +    closed = false;
-+              ceph = cephfs;
-+              buffer = new byte[bufferSize];
-+    ceph.debug("CephInputStream constructor: initializing stream with fh "
-+                                                                              + fh + " and file length " + flength, ceph.DEBUG);
++    ceph = cephfs;
++    buffer = new byte[bufferSize];
++    LOG.debug(
++        "CephInputStream constructor: initializing stream with fh " + fh
++        + " and file length " + flength);
 +      
 +  }
++
 +  /** Ceph likes things to be closed before it shuts down,
 +   * so closing the IOStream stuff voluntarily in a finalizer is good
 +   */
-+  protected void finalize () throws Throwable {
++  protected void finalize() throws Throwable {
 +    try {
-+      if (!closed) close();
++      if (!closed) {
++        close();
++      }
++    } finally {
++      super.finalize();
 +    }
-+    finally { super.finalize(); }
 +  }
 +
-+      private synchronized boolean fillBuffer() throws IOException {
-+              bufValid = ceph.ceph_read(fileHandle, buffer, 0, buffer.length);
-+              bufPos = 0;
-+              if (bufValid < 0) {
-+                      int err = bufValid;
-+                      bufValid = 0;
-+                      //attempt to reset to old position. If it fails, too bad.
-+                      ceph.ceph_seek_from_start(fileHandle, cephPos);
-+                      throw new IOException("Failed to fill read buffer! Error code:"
-+                                                                                                              + err);
-+              }
-+              cephPos += bufValid;
-+              return (bufValid != 0);
-+      }
++  private synchronized boolean fillBuffer() throws IOException {
++    bufValid = ceph.ceph_read(fileHandle, buffer, 0, buffer.length);
++    bufPos = 0;
++    if (bufValid < 0) {
++      int err = bufValid;
 +
-+      /*
-+       * Get the current position of the stream.
-+       */
++      bufValid = 0;
++      // attempt to reset to old position. If it fails, too bad.
++      ceph.ceph_seek_from_start(fileHandle, cephPos);
++      throw new IOException("Failed to fill read buffer! Error code:" + err);
++    }
++    cephPos += bufValid;
++    return (bufValid != 0);
++  }
++
++  /*
++   * Get the current position of the stream.
++   */
 +  public synchronized long getPos() throws IOException {
-+              return cephPos - bufValid + bufPos;
++    return cephPos - bufValid + bufPos;
 +  }
 +
 +  /**
 +   * Find the number of bytes remaining in the file.
 +   */
 +  @Override
-+      public synchronized int available() throws IOException {
-+      return (int) (fileLength - getPos());
-+    }
++  public synchronized int available() throws IOException {
++    return (int) (fileLength - getPos());
++  }
 +
 +  public synchronized void seek(long targetPos) throws IOException {
-+    ceph.debug("CephInputStream.seek: Seeking to position " + targetPos +
-+                                                                              " on fd " + fileHandle, ceph.TRACE);
++    LOG.trace(
++        "CephInputStream.seek: Seeking to position " + targetPos + " on fd "
++        + fileHandle);
 +    if (targetPos > fileLength) {
-+      throw new IOException("CephInputStream.seek: failed seek to position "
-+                                                                                                              + targetPos + " on fd " + fileHandle
-+                                                                                                              + ": Cannot seek after EOF " + fileLength);
-+    }
-+              long oldPos = cephPos;
-+              cephPos = ceph.ceph_seek_from_start(fileHandle, targetPos);
-+              bufValid = 0;
-+              bufPos = 0;
-+              if (cephPos < 0) {
-+                      cephPos = oldPos;
-+                      throw new IOException ("Ceph failed to seek to new position!");
-+              }
-+      }
++      throw new IOException(
++          "CephInputStream.seek: failed seek to position " + targetPos
++          + " on fd " + fileHandle + ": Cannot seek after EOF " + fileLength);
++    }
++    long oldPos = cephPos;
++
++    cephPos = ceph.ceph_seek_from_start(fileHandle, targetPos);
++    bufValid = 0;
++    bufPos = 0;
++    if (cephPos < 0) {
++      cephPos = oldPos;
++      throw new IOException("Ceph failed to seek to new position!");
++    }
++  }
 +
 +  /**
 +   * Failovers are handled by the Ceph code at a very low level;
@@ -1872,22 +1722,30 @@ Index: src/java/org/apache/hadoop/fs/ceph/CephInputStream.java
 +    return false;
 +  }
 +    
-+    
 +  /**
 +   * Read a byte from the file.
 +   * @return the next byte.
 +   */
 +  @Override
-+      public synchronized int read() throws IOException {
-+      ceph.debug("CephInputStream.read: Reading a single byte from fd " + fileHandle
-+                                                                                      + " by calling general read function", ceph.TRACE);
++  public synchronized int read() throws IOException {
++    LOG.trace(
++        "CephInputStream.read: Reading a single byte from fd " + fileHandle
++        + " by calling general read function");
 +
-+      byte result[] = new byte[1];
-+      if (getPos() >= fileLength) return -1;
-+      if (-1 == read(result, 0, 1)) return -1;
-+      if (result[0]<0) return 256+(int)result[0];
-+      else return result[0];
++    byte result[] = new byte[1];
++
++    if (getPos() >= fileLength) {
++      return -1;
++    }
++    if (-1 == read(result, 0, 1)) {
++      return -1;
 +    }
++    if (result[0] < 0) {
++      return 256 + (int) result[0];
++    } else {
++      return result[0];
++    }
++  }
 +
 +  /**
 +   * Read a specified number of bytes from the file into a byte[].
@@ -1895,89 +1753,91 @@ Index: src/java/org/apache/hadoop/fs/ceph/CephInputStream.java
 +   * @param off the offset to start at in the file
 +   * @param len the number of bytes to read
 +   * @return 0 if successful, otherwise an error code.
-+       * @throws IOException on bad input.
++   * @throws IOException on bad input.
 +   */
 +  @Override
-+      public synchronized int read(byte buf[], int off, int len)
-+              throws IOException {
-+      ceph.debug("CephInputStream.read: Reading " + len  +
-+                                                               " bytes from fd " + fileHandle, ceph.TRACE);
++  public synchronized int read(byte buf[], int off, int len)
++    throws IOException {
++    LOG.trace(
++        "CephInputStream.read: Reading " + len + " bytes from fd " + fileHandle);
 +      
-+      if (closed) {
-+                              throw new IOException("CephInputStream.read: cannot read " + len  + 
-+                                                                                                                      " bytes from fd " + fileHandle +
-+                                                                                                                      ": stream closed");
-+      }
++    if (closed) {
++      throw new IOException(
++          "CephInputStream.read: cannot read " + len + " bytes from fd "
++          + fileHandle + ": stream closed");
++    }
 +                      
-+      // ensure we're not past the end of the file
-+      if (getPos() >= fileLength) {
-+                              ceph.debug("CephInputStream.read: cannot read " + len  + 
-+                                                                       " bytes from fd " + fileHandle + ": current position is "
-+                                                                       + getPos() + " and file length is " + fileLength,
-+                                                                       ceph.DEBUG);
++    // ensure we're not past the end of the file
++    if (getPos() >= fileLength) {
++      LOG.debug(
++          "CephInputStream.read: cannot read " + len + " bytes from fd "
++          + fileHandle + ": current position is " + getPos()
++          + " and file length is " + fileLength);
 +                              
-+                              return -1;
-+                      }
-+
-+                      int totalRead = 0;
-+                      int initialLen = len;
-+                      int read;
-+                      do {
-+                              read = Math.min(len, bufValid - bufPos);
-+                              try {
-+                                      System.arraycopy(buffer, bufPos, buf, off, read);
-+                              }
-+                              catch(IndexOutOfBoundsException ie) {
-+                                      throw new IOException("CephInputStream.read: Indices out of bounds:"
-+                                                                                                                              + "read length is " + len
-+                                                                                                                              + ", buffer offset is " + off
-+                                                                                                                              + ", and buffer size is " + buf.length);
-+                              }
-+                              catch (ArrayStoreException ae) {
-+                                      throw new IOException("Uh-oh, CephInputStream failed to do an array"
-+                                                                                                                              + "copy due to type mismatch...");
-+                              }
-+                              catch (NullPointerException ne) {
-+                                      throw new IOException("CephInputStream.read: cannot read "
-+                                                                                                                              + len + "bytes from fd:" + fileHandle
-+                                                                                                                              + ": buf is null");
-+                              }
-+                              bufPos += read;
-+                              len -= read;
-+                              off += read;
-+                              totalRead += read;
-+                      } while (len > 0 && fillBuffer());
-+
-+      ceph.debug("CephInputStream.read: Reading " + initialLen
-+                                                               + " bytes from fd " + fileHandle
-+                                                               + ": succeeded in reading " + totalRead + " bytes",
-+                                                               ceph.TRACE);
-+      return totalRead;
-+      }
++      return -1;
++    }
++
++    int totalRead = 0;
++    int initialLen = len;
++    int read;
++
++    do {
++      read = Math.min(len, bufValid - bufPos);
++      try {
++        System.arraycopy(buffer, bufPos, buf, off, read);
++      } catch (IndexOutOfBoundsException ie) {
++        throw new IOException(
++            "CephInputStream.read: Indices out of bounds:" + "read length is "
++            + len + ", buffer offset is " + off + ", and buffer size is "
++            + buf.length);
++      } catch (ArrayStoreException ae) {
++        throw new IOException(
++            "Uh-oh, CephInputStream failed to do an array"
++                + "copy due to type mismatch...");
++      } catch (NullPointerException ne) {
++        throw new IOException(
++            "CephInputStream.read: cannot read " + len + "bytes from fd:"
++            + fileHandle + ": buf is null");
++      }
++      bufPos += read;
++      len -= read;
++      off += read;
++      totalRead += read;
++    } while (len > 0 && fillBuffer());
++
++    LOG.trace(
++        "CephInputStream.read: Reading " + initialLen + " bytes from fd "
++        + fileHandle + ": succeeded in reading " + totalRead + " bytes");
++    return totalRead;
++  }
 +
 +  /**
 +   * Close the CephInputStream and release the associated filehandle.
 +   */
 +  @Override
-+      public void close() throws IOException {
-+    ceph.debug("CephOutputStream.close:enter", ceph.TRACE);
++  public void close() throws IOException {
++    LOG.trace("CephOutputStream.close:enter");
 +    if (!closed) {
-+                      int result = ceph.ceph_close(fileHandle);
-+                      closed = true;
-+                      if (result != 0) {
-+                              throw new IOException("Close somehow failed!"
-+                                                                                                                      + "Don't try and use this stream again, though");
-+                      }
-+                      ceph.debug("CephOutputStream.close:exit", ceph.TRACE);
-+              }
-+      }
++      int result = ceph.ceph_close(fileHandle);
++
++      closed = true;
++      if (result != 0) {
++        throw new IOException(
++            "Close somehow failed!"
++                + "Don't try and use this stream again, though");
++      }
++      LOG.trace("CephOutputStream.close:exit");
++    }
++  }
 +}
-Index: src/java/org/apache/hadoop/fs/ceph/CephOutputStream.java
-===================================================================
---- src/java/org/apache/hadoop/fs/ceph/CephOutputStream.java   (revision 0)
-+++ src/java/org/apache/hadoop/fs/ceph/CephOutputStream.java   (revision 0)
-@@ -0,0 +1,202 @@
+diff --git a/src/core/org/apache/hadoop/fs/ceph/CephOutputStream.java b/src/core/org/apache/hadoop/fs/ceph/CephOutputStream.java
+new file mode 100644
+index 0000000..4c50f88
+--- /dev/null
++++ b/src/core/org/apache/hadoop/fs/ceph/CephOutputStream.java
+@@ -0,0 +1,219 @@
 +// -*- mode:Java; tab-width:2; c-basic-offset:2; indent-tabs-mode:t -*- 
++
 +/**
 + *
 + * Licensed under the Apache License, Version 2.0
@@ -1999,27 +1859,31 @@ Index: src/java/org/apache/hadoop/fs/ceph/CephOutputStream.java
 +
 +package org.apache.hadoop.fs.ceph;
 +
++
 +import java.io.IOException;
 +import java.io.OutputStream;
 +
++import org.apache.commons.logging.Log;
++import org.apache.commons.logging.LogFactory;
 +import org.apache.hadoop.conf.Configuration;
 +import org.apache.hadoop.util.Progressable;
 +
++
 +/**
 + * <p>
 + * An {@link OutputStream} for a CephFileSystem and corresponding
 + * Ceph instance.
 + */
 +public class CephOutputStream extends OutputStream {
-+
++  private static final Log LOG = LogFactory.getLog(CephOutputStream.class);
 +  private boolean closed;
 +
-+      private CephFS ceph;
++  private CephFS ceph;
 +
 +  private int fileHandle;
 +
-+      private byte[] buffer;
-+      private int bufUsed = 0;
++  private byte[] buffer;
++  private int bufUsed = 0;
 +
 +  /**
 +   * Construct the CephOutputStream.
@@ -2027,21 +1891,24 @@ Index: src/java/org/apache/hadoop/fs/ceph/CephOutputStream.java
 +   * @param fh The Ceph filehandle to connect to.
 +   */
 +  public CephOutputStream(Configuration conf, CephFS cephfs,
-+                                                                                                      int fh, int bufferSize) {
-+              ceph = cephfs;
++      int fh, int bufferSize) {
++    ceph = cephfs;
 +    fileHandle = fh;
 +    closed = false;
-+              buffer = new byte[bufferSize];
++    buffer = new byte[bufferSize];
 +  }
 +
-+  /**Ceph likes things to be closed before it shuts down,
++  /** Ceph likes things to be closed before it shuts down,
 +   *so closing the IOStream stuff voluntarily is good
 +   */
-+  protected void finalize () throws Throwable {
++  protected void finalize() throws Throwable {
 +    try {
-+      if (!closed) close();
++      if (!closed) {
++        close();
++      }
++    } finally {
++      super.finalize();
 +    }
-+    finally { super.finalize();}
 +  }
 +
 +  /**
@@ -2059,20 +1926,22 @@ Index: src/java/org/apache/hadoop/fs/ceph/CephOutputStream.java
 +   * write fails.
 +   */
 +  @Override
-+      public synchronized void write(int b) throws IOException {
-+      ceph.debug("CephOutputStream.write: writing a single byte to fd "
-+                                                               + fileHandle, ceph.TRACE);
-+
-+      if (closed) {
-+                              throw new IOException("CephOutputStream.write: cannot write " + 
-+                                                                                                                      "a byte to fd " + fileHandle + ": stream closed");
-+      }
-+      // Stick the byte in a buffer and write it
-+      byte buf[] = new byte[1];
-+      buf[0] = (byte) b;    
-+      write(buf, 0, 1);
-+      return;
++  public synchronized void write(int b) throws IOException {
++    LOG.trace(
++        "CephOutputStream.write: writing a single byte to fd " + fileHandle);
++
++    if (closed) {
++      throw new IOException(
++          "CephOutputStream.write: cannot write " + "a byte to fd " + fileHandle
++          + ": stream closed");
 +    }
++    // Stick the byte in a buffer and write it
++    byte buf[] = new byte[1];
++
++    buf[0] = (byte) b;    
++    write(buf, 0, 1);
++    return;
++  }
 +
 +  /**
 +   * Write a byte buffer into the Ceph file.
@@ -2080,226 +1949,286 @@ Index: src/java/org/apache/hadoop/fs/ceph/CephOutputStream.java
 +   * @param off the position in the file to start writing at.
 +   * @param len The number of bytes to actually write.
 +   * @throws IOException if you have closed the CephOutputStream, or
-+       * if buf is null or off + len > buf.length, or
-+       * if the write fails due to a Ceph error.
++   * if buf is null or off + len > buf.length, or
++   * if the write fails due to a Ceph error.
 +   */
 +  @Override
-+      public synchronized void write(byte buf[], int off, int len) throws IOException {
-+              ceph.debug("CephOutputStream.write: writing " + len + 
-+                                                       " bytes to fd " + fileHandle, ceph.TRACE);
-+              // make sure stream is open
-+              if (closed) {
-+                      throw new IOException("CephOutputStream.write: cannot write " + len + 
-+                                                                                                              "bytes to fd " + fileHandle + ": stream closed");
-+              }
++  public synchronized void write(byte buf[], int off, int len) throws IOException {
++    LOG.trace(
++        "CephOutputStream.write: writing " + len + " bytes to fd " + fileHandle);
++    // make sure stream is open
++    if (closed) {
++      throw new IOException(
++          "CephOutputStream.write: cannot write " + len + "bytes to fd "
++          + fileHandle + ": stream closed");
++    }
 +              
-+              int result;
-+              int write;
-+              while (len>0) {
-+                      write = Math.min(len, buffer.length - bufUsed);
-+                      try {
-+                              System.arraycopy(buf, off, buffer, bufUsed, write);
-+                      }
-+                      catch (IndexOutOfBoundsException ie) {
-+                              throw new IOException("CephOutputStream.write: Indices out of bounds: "
-+                                                                                                                      + "write length is " + len
-+                                                                                                                      + ", buffer offset is " + off
-+                                                                                                                      + ", and buffer size is " + buf.length);
-+                      }
-+                      catch (ArrayStoreException ae) {
-+                              throw new IOException("Uh-oh, CephOutputStream failed to do an array"
-+                                                                                                                      + " copy due to type mismatch...");
-+                      }
-+                      catch (NullPointerException ne) {
-+                              throw new IOException("CephOutputStream.write: cannot write "
-+                                                                                                                      + len + "bytes to fd " + fileHandle
-+                                                                                                                      + ": buffer is null");
-+                      }
-+                      bufUsed += write;
-+                      len -= write;
-+                      off += write;
-+                      if (bufUsed == buffer.length) {
-+                              result = ceph.ceph_write(fileHandle, buffer, 0, bufUsed);
-+                              if (result < 0)
-+                                      throw new IOException("CephOutputStream.write: Buffered write of "
-+                                                                                                                              + bufUsed + " bytes failed!");
-+                              if (result != bufUsed)
-+                                      throw new IOException("CephOutputStream.write: Wrote only "
-+                                                                                                                              + result + " bytes of " + bufUsed
-+                                                                                                                              + " in buffer! Data may be lost or written"
-+                                                                                                                              + " twice to Ceph!");
-+                              bufUsed = 0;
-+                      }
++    int result;
++    int write;
++
++    while (len > 0) {
++      write = Math.min(len, buffer.length - bufUsed);
++      try {
++        System.arraycopy(buf, off, buffer, bufUsed, write);
++      } catch (IndexOutOfBoundsException ie) {
++        throw new IOException(
++            "CephOutputStream.write: Indices out of bounds: "
++                + "write length is " + len + ", buffer offset is " + off
++                + ", and buffer size is " + buf.length);
++      } catch (ArrayStoreException ae) {
++        throw new IOException(
++            "Uh-oh, CephOutputStream failed to do an array"
++                + " copy due to type mismatch...");
++      } catch (NullPointerException ne) {
++        throw new IOException(
++            "CephOutputStream.write: cannot write " + len + "bytes to fd "
++            + fileHandle + ": buffer is null");
++      }
++      bufUsed += write;
++      len -= write;
++      off += write;
++      if (bufUsed == buffer.length) {
++        result = ceph.ceph_write(fileHandle, buffer, 0, bufUsed);
++        if (result < 0) {
++          throw new IOException(
++              "CephOutputStream.write: Buffered write of " + bufUsed
++              + " bytes failed!");
++        }
++        if (result != bufUsed) {
++          throw new IOException(
++              "CephOutputStream.write: Wrote only " + result + " bytes of "
++              + bufUsed + " in buffer! Data may be lost or written"
++              + " twice to Ceph!");
++        }
++        bufUsed = 0;
++      }
 +
-+              }
-+              return; 
-+      }
++    }
++    return; 
++  }
 +   
 +  /**
 +   * Flush the buffered data.
 +   * @throws IOException if you've closed the stream or the write fails.
 +   */
 +  @Override
-+      public synchronized void flush() throws IOException {
-+                      if (!closed) {
-+                              if (bufUsed == 0) return;
-+                              int result = ceph.ceph_write(fileHandle, buffer, 0, bufUsed);
-+                              if (result < 0) {
-+                                      throw new IOException("CephOutputStream.write: Write of "
-+                                                                                                                              + bufUsed + "bytes to fd "
-+                                                                                                                              + fileHandle + " failed");
-+                              }
-+                              if (result != bufUsed) {
-+                                      throw new IOException("CephOutputStream.write: Write of " + bufUsed
-+                                                                                                                              + "bytes to fd " + fileHandle
-+                                                                                                                              + "was incomplete:  only " + result + " of "
-+                                                                                                                              + bufUsed + " bytes were written.");
-+                              }
-+                              bufUsed = 0;
-+                              return;
-+                      }
-+      }
++  public synchronized void flush() throws IOException {
++    if (!closed) {
++      if (bufUsed == 0) {
++        return;
++      }
++      int result = ceph.ceph_write(fileHandle, buffer, 0, bufUsed);
++
++      if (result < 0) {
++        throw new IOException(
++            "CephOutputStream.write: Write of " + bufUsed + "bytes to fd "
++            + fileHandle + " failed");
++      }
++      if (result != bufUsed) {
++        throw new IOException(
++            "CephOutputStream.write: Write of " + bufUsed + "bytes to fd "
++            + fileHandle + "was incomplete:  only " + result + " of " + bufUsed
++            + " bytes were written.");
++      }
++      bufUsed = 0;
++      return;
++    }
++  }
 +  
 +  /**
 +   * Close the CephOutputStream.
 +   * @throws IOException if Ceph somehow returns an error. In current code it can't.
 +   */
 +  @Override
-+      public synchronized void close() throws IOException {
-+      ceph.debug("CephOutputStream.close:enter", ceph.TRACE);
-+      if (!closed) {
-+                              flush();
-+                              int result = ceph.ceph_close(fileHandle);
-+                              if (result != 0) {
-+                                      throw new IOException("Close failed!");
-+                              }
++  public synchronized void close() throws IOException {
++    LOG.trace("CephOutputStream.close:enter");
++    if (!closed) {
++      flush();
++      int result = ceph.ceph_close(fileHandle);
++
++      if (result != 0) {
++        throw new IOException("Close failed!");
++      }
 +                              
-+                              closed = true;
-+                              ceph.debug("CephOutputStream.close:exit", ceph.TRACE);
-+                      }
-+      }
++      closed = true;
++      LOG.trace("CephOutputStream.close:exit");
++    }
++  }
 +}
-Index: src/java/org/apache/hadoop/fs/ceph/package.html
-===================================================================
---- src/java/org/apache/hadoop/fs/ceph/package.html    (revision 0)
-+++ src/java/org/apache/hadoop/fs/ceph/package.html    (revision 0)
-@@ -0,0 +1,101 @@
-+<html>
-+
-+<!--
-+   Licensed to the Apache Software Foundation (ASF) under one or more
-+   contributor license agreements.  See the NOTICE file distributed with
-+   this work for additional information regarding copyright ownership.
-+   The ASF licenses this file to You under the Apache License, Version 2.0
-+   (the "License"); you may not use this file except in compliance with
-+   the License.  You may obtain a copy of the License at
-+
-+       http://www.apache.org/licenses/LICENSE-2.0
-+
-+   Unless required by applicable law or agreed to in writing, software
-+   distributed under the License is distributed on an "AS IS" BASIS,
-+   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-+   See the License for the specific language governing permissions and
-+   limitations under the License.
-+-->
-+
-+<head></head>
-+<body>
-+<h1>A client for the Ceph filesystem</h1>
-+
-+<h3>Introduction</h3>
-+
-+This page describes how to use <a href="http://ceph.newdream.net">Ceph</a>
-+as a backing store with Hadoop. This page assumes that you have downloaded
-+the Ceph software and installed necessary binaries as outlined in the Ceph
-+documentation.
-+
-+<h3>Steps</h3>
-+<ul>
-+  <li>In the Hadoop conf directory edit core-site.xml,
-+      adding the following (with appropriate substitutions). Note that
-+      different nodes can connect to different monitors in the same cluster
-+      without issue (the Ceph client will automatically redirect as necessary).
-+<pre>
-+&lt;property&gt;
-+  &lt;name&gt;fs.default.name&lt;/name&gt;
-+  &lt;value&gt;ceph://null&lt;/value&gt; 
-+&lt;/property&gt;
-+
-+&lt;property&gt;
-+  &lt;name&gt;fs.ceph.monAddr&lt;/name&gt;
-+  &lt;value&gt;&lt;serverIP:port&gt;&lt;/value&gt;
-+  &lt;description&gt;The location of the Ceph monitor to connect to.
-+  This should be an IP address, not a domain-based web address.&lt;/description&gt;
-+&lt;/property&gt;
-+
-+&lt;property&gt;
-+  &lt;name&gt;fs.ceph.libDir&lt;/name&gt;
-+  &lt;value&gt;/usr/local/lib&lt;/value&gt;
-+  &lt;description&gt;The folder holding libcephfs and libhadoopceph&lt;/description&gt;
-+  &lt;/property&gt;
-+</pre>
-+  <li>There are also a number of optional Ceph configuration options.
-+<pre>
-+&lt;property&gt;
-+  &lt;name&gt;fs.ceph.blockSize&lt;/name&gt;
-+  &lt;value&gt;67108864&lt;/value&gt;
-+  &lt;description&gt;Defaulting to 64MB, this is the size (in bytes) you want Ceph to use in striping data internally and presenting it to Hadoop.&lt;/description&gt;
-+&lt;/property&gt;
-+
-+&lt;property&gt;
-+  &lt;name&gt;fs.ceph.debug&lt;/name&gt;
-+  &lt;value&gt;true&lt;/value&gt;
-+  &lt;description&gt;If true, the Java-based code will print debugging information to standard error. This is useful if attempting to debug a Ceph issue as it puts both outputs in the same place.&lt;/description&gt;
-+&lt;/property&gt;
-+
-+&lt;property&gt;
-+  &lt;name&gt;fs.ceph.clientDebug&lt;/name&gt;
-+  &lt;value&gt;1&lt;/value&gt;
-+  &lt;description&gt;If non-zero, the Ceph client will print debugging information to standard error (a higher number=more debugging).&lt;/description&gt;
-+&lt;/property&gt;
-+
-+&lt;property&gt;
-+  &lt;name&gt;fs.ceph.messengerDebug&lt;/name&gt;
-+  &lt;value&gt;1&lt;/value&gt;
-+  &lt;description&gt;If non-zero, the Ceph messenger will print debugging information to standard error(a higher number=more debugging)&lt;/description&gt;
-+&lt;/property&gt;
-+
-+&lt;property&gt;
-+  &lt;name&gt;fs.ceph.readahead&lt;/name&gt;
-+  &lt;value&gt;1&lt;/value&gt;
-+  &lt;description&gt;Sets the number of object periods to read ahead in prefetching. This should probably be left at the default of 1.&lt;/description&gt;
-+&lt;/property&gt;
-+
-+&lt;property&gt;
-+  &lt;name&gt;fs.ceph.commandLine&lt;/name&gt;
-+  &lt;value&gt;a string&lt;/value&gt;
-+  &lt;description&gt;If you prefer, you may enter any of Ceph's command-line configuration here and it will get passed to the C client. Note that any filled-in configuration options will override what you put here. <br>
-+By default, Ceph performs writes across the network rather than locally. To force local writes, add "set_local_pg" in this property.&lt;/description&gt;
-+&lt;/property&gt;
-+</pre>
-+  
-+  <li>Start up your Ceph instance according to the Ceph documentation.</li>
-+  <li>Do not use the bin/start-all.sh commands, as they will attempt to start
-+      up an hdfs instance. Just start whatever systems you need and they will
-+      automatically make use of the Ceph filesystem once configured as above.</li>
-+</body>
-+</html>
-\ No newline at end of file
-Index: src/java/core-default.xml
-===================================================================
---- src/java/core-default.xml  (revision 832912)
-+++ src/java/core-default.xml  (working copy)
-@@ -194,6 +194,12 @@
- </property>
- <property>
-+  <name>fs.ceph.impl</name>
-+  <value>org.apache.hadoop.fs.ceph.CephFileSystem</value>
-+  <description>The FileSystem for ceph: uris.</description>
-+</property>
+diff --git a/src/core/org/apache/hadoop/fs/ceph/CephTalker.java b/src/core/org/apache/hadoop/fs/ceph/CephTalker.java
+new file mode 100644
+index 0000000..569652f
+--- /dev/null
++++ b/src/core/org/apache/hadoop/fs/ceph/CephTalker.java
+@@ -0,0 +1,91 @@
++// -*- mode:Java; tab-width:2; c-basic-offset:2; indent-tabs-mode:t -*- 
 +
-+<property>
-   <name>fs.hftp.impl</name>
-   <value>org.apache.hadoop.hdfs.HftpFileSystem</value>
- </property>
++/**
++ *
++ * Licensed under the Apache License, Version 2.0
++ * (the "License"); you may not use this file except in compliance with
++ * the License. You may obtain a copy of the License at
++ *
++ * http://www.apache.org/licenses/LICENSE-2.0
++ *
++ * Unless required by applicable law or agreed to in writing, software
++ * distributed under the License is distributed on an "AS IS" BASIS,
++ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
++ * implied. See the License for the specific language governing
++ * permissions and limitations under the License.
++ *
++ * 
++ * Wraps a number of native function calls to communicate with the Ceph
++ * filesystem.
++ */
++package org.apache.hadoop.fs.ceph;
++
++
++import org.apache.hadoop.conf.Configuration;
++import org.apache.commons.logging.Log;
++
++
++class CephTalker extends CephFS {
++  // JNI doesn't give us any way to store pointers, so use a long.
++  // Here we're assuming pointers aren't longer than 8 bytes.
++  long cluster;
++
++  // we write a constructor so we can load the libraries
++  public CephTalker(Configuration conf, Log log) {
++    System.load(conf.get("fs.ceph.libDir") + "/libcephfs.so");
++    System.load(conf.get("fs.ceph.libDir") + "/libhadoopcephfs.so");
++    cluster = 0;
++  }
++
++  protected native boolean ceph_initializeClient(String arguments, int block_size);
++
++  protected native String ceph_getcwd();
++
++  protected native boolean ceph_setcwd(String path);
++
++  protected native boolean ceph_rmdir(String path);
++
++  protected native boolean ceph_unlink(String path);
++
++  protected native boolean ceph_rename(String old_path, String new_path);
++
++  protected native boolean ceph_exists(String path);
++
++  protected native long ceph_getblocksize(String path);
++
++  protected native boolean ceph_isdirectory(String path);
++
++  protected native boolean ceph_isfile(String path);
++
++  protected native String[] ceph_getdir(String path);
++
++  protected native int ceph_mkdirs(String path, int mode);
++
++  protected native int ceph_open_for_append(String path);
++
++  protected native int ceph_open_for_read(String path);
++
++  protected native int ceph_open_for_overwrite(String path, int mode);
++
++  protected native int ceph_close(int filehandle);
++
++  protected native boolean ceph_setPermission(String path, int mode);
++
++  protected native boolean ceph_kill_client();
++
++  protected native boolean ceph_stat(String path, CephFileSystem.Stat fill);
++
++  protected native int ceph_replication(String Path);
++
++  protected native String[] ceph_hosts(int fh, long offset);
++
++  protected native int ceph_setTimes(String path, long mtime, long atime);
++
++  protected native long ceph_getpos(int fh);
++
++  protected native int ceph_write(int fh, byte[] buffer, int buffer_offset, int length);
++
++  protected native int ceph_read(int fh, byte[] buffer, int buffer_offset, int length);
++
++  protected native long ceph_seek_from_start(int fh, long pos);
++}
+diff --git a/src/mapred/org/apache/hadoop/filecache/TrackerDistributedCacheManager.java b/src/mapred/org/apache/hadoop/filecache/TrackerDistributedCacheManager.java
+index 9e22f1f..cd55361 100644
+--- a/src/mapred/org/apache/hadoop/filecache/TrackerDistributedCacheManager.java
++++ b/src/mapred/org/apache/hadoop/filecache/TrackerDistributedCacheManager.java
+@@ -386,10 +386,12 @@ public class TrackerDistributedCacheManager {
+     if (modifiedTime != desiredTimestamp) {
+       DateFormat df = DateFormat.getDateTimeInstance(DateFormat.SHORT, 
+                                                      DateFormat.SHORT);
++      /*
+       throw new IOException("The distributed cache object " + source + 
+                             " changed during the job from " + 
+                             df.format(new Date(desiredTimestamp)) + " to " +
+                             df.format(new Date(modifiedTime)));
++      */
+     }
+     
+     Path parchive = null;
+diff --git a/src/test/commit-tests b/src/test/commit-tests
+index 1148c8b..85fa53d 100644
+--- a/src/test/commit-tests
++++ b/src/test/commit-tests
+@@ -53,6 +53,7 @@
+ **/TestRPC.java
+ **/TestS3Credentials.java
+ **/TestS3FileSystem.java
++**/TestCeph.java
+ **/TestSaslRPC.java
+ **/TestScriptBasedMapping.java
+ **/TestSequenceFileSerialization.java
+diff --git a/src/test/org/apache/hadoop/fs/ceph/TestCeph.java b/src/test/org/apache/hadoop/fs/ceph/TestCeph.java
+new file mode 100644
+index 0000000..e46b0ee
+--- /dev/null
++++ b/src/test/org/apache/hadoop/fs/ceph/TestCeph.java
+@@ -0,0 +1,45 @@
++// -*- mode:Java; tab-width:2; c-basic-offset:2; indent-tabs-mode:t -*- 
++
++/**
++ * Licensed to the Apache Software Foundation (ASF) under one
++ * or more contributor license agreements.  See the NOTICE file
++ * distributed with this work for additional information
++ * regarding copyright ownership.  The ASF licenses this file
++ * to you under the Apache License, Version 2.0 (the
++ * "License"); you may not use this file except in compliance
++ * with the License.  You may obtain a copy of the License at
++ *
++ *     http://www.apache.org/licenses/LICENSE-2.0
++ *
++ * Unless required by applicable law or agreed to in writing, software
++ * distributed under the License is distributed on an "AS IS" BASIS,
++ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
++ * See the License for the specific language governing permissions and
++ * limitations under the License.
++ *
++ * Unit tests for the CephFileSystem API implementation.
++ */
++
++package org.apache.hadoop.fs.ceph;
++
++
++import java.io.IOException;
++import java.net.URI;
++import org.apache.hadoop.conf.Configuration;
++import org.apache.hadoop.fs.FileSystemContractBaseTest;
++import org.apache.hadoop.fs.FileSystem;
++import org.apache.hadoop.fs.Path;
++
++
++public class TestCeph extends FileSystemContractBaseTest {
++
++  @Override
++  protected void setUp() throws IOException {
++    Configuration conf = new Configuration();
++    CephFaker cephfaker = new CephFaker(conf, FileSystem.LOG);
++    CephFileSystem cephfs = new CephFileSystem(cephfaker);
++
++    cephfs.initialize(URI.create("ceph://null"), conf);
++    fs = cephfs;
++  }
++}
index c3b967e3a38a90ea32b3ac800d4e0a23042a5333..2967b96cf5a9b1fdc4c1eb96cd46fb57397c7bac 100644 (file)
@@ -2,8 +2,8 @@ This directory contains:
 CephFSInterface.cc/h: A C++ JNI library used by the Hadoop Java code.
 ceph: A directory containing all the Java source files for a 
 Hadoop-compliant CephFileSystem.
-HADOOP-ceph.patch: A patch for Hadoop. It should apply fine to the trunk 
-build but will need some (minor) edits to apply to a .20 version. This 
+HADOOP-ceph.patch: A patch for Hadoop. It should apply fine to one of the
+.20 branches. (It was generated against .20.205.0) This
 patch adds in all the files contained in the ceph dir as well as making 
 some changes so that Hadoop's configuration code will recognize the 
 CephFileSystem properties and classes. It is possible that this will be