]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
Hadoop: Update JavaDoc and put in new patch file
authorGreg Farnum <gregf@hq.newdream.net>
Thu, 5 Nov 2009 20:58:29 +0000 (12:58 -0800)
committerGreg Farnum <gregf@hq.newdream.net>
Thu, 5 Nov 2009 21:06:39 +0000 (13:06 -0800)
src/client/hadoop/HADOOP-ceph.patch
src/client/hadoop/ceph/CephFS.java
src/client/hadoop/ceph/CephFileSystem.java
src/client/hadoop/ceph/CephInputStream.java
src/client/hadoop/ceph/CephOutputStream.java
src/client/hadoop/ceph/package.html

index 966453e11108d3b1393c0329e25c130f88aa425e..1f3c7a3a919c159109f629a6d781e505c20231f3 100644 (file)
@@ -1,9 +1,128 @@
+Index: src/test/core/org/apache/hadoop/fs/ceph/TestCeph.java
+===================================================================
+--- src/test/core/org/apache/hadoop/fs/ceph/TestCeph.java      (revision 0)
++++ src/test/core/org/apache/hadoop/fs/ceph/TestCeph.java      (revision 0)
+@@ -0,0 +1,42 @@
++// -*- mode:Java; tab-width:2; c-basic-offset:2; indent-tabs-mode:t -*- 
++/**
++ * Licensed to the Apache Software Foundation (ASF) under one
++ * or more contributor license agreements.  See the NOTICE file
++ * distributed with this work for additional information
++ * regarding copyright ownership.  The ASF licenses this file
++ * to you under the Apache License, Version 2.0 (the
++ * "License"); you may not use this file except in compliance
++ * with the License.  You may obtain a copy of the License at
++ *
++ *     http://www.apache.org/licenses/LICENSE-2.0
++ *
++ * Unless required by applicable law or agreed to in writing, software
++ * distributed under the License is distributed on an "AS IS" BASIS,
++ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
++ * See the License for the specific language governing permissions and
++ * limitations under the License.
++ *
++ * Unit tests for the CephFileSystem API implementation.
++ */
++
++package org.apache.hadoop.fs.ceph;
++
++import java.io.IOException;
++import java.net.URI;
++import org.apache.hadoop.fs.FileSystemContractBaseTest;
++import org.apache.hadoop.conf.Configuration;
++import org.apache.hadoop.fs.FileSystem;
++import org.apache.hadoop.fs.Path;
++
++public class TestCeph extends FileSystemContractBaseTest {
++
++      @Override
++    protected void setUp() throws IOException {
++    Configuration conf = new Configuration();
++    CephFaker cephfaker = new CephFaker(conf, FileSystem.LOG);
++    CephFileSystem cephfs = new CephFileSystem(cephfaker, "ceph://null");
++    cephfs.initialize(URI.create("ceph://null"), conf);
++              fs = cephfs;
++              cephfs.setWorkingDirectory(new Path(getDefaultWorkingDirectory()));
++      }
++}
+Index: src/java/org/apache/hadoop/fs/ceph/SubmitProcess
+===================================================================
+--- src/java/org/apache/hadoop/fs/ceph/SubmitProcess   (revision 0)
++++ src/java/org/apache/hadoop/fs/ceph/SubmitProcess   (revision 0)
+@@ -0,0 +1,2 @@
++1) Remove the emacs indententation rules line.
++2) ? No more?
+\ No newline at end of file
+Index: src/java/org/apache/hadoop/fs/ceph/CephTalker.java
+===================================================================
+--- src/java/org/apache/hadoop/fs/ceph/CephTalker.java (revision 0)
++++ src/java/org/apache/hadoop/fs/ceph/CephTalker.java (revision 0)
+@@ -0,0 +1,59 @@
++// -*- mode:Java; tab-width:2; c-basic-offset:2; indent-tabs-mode:t -*- 
++/**
++ *
++ * Licensed under the Apache License, Version 2.0
++ * (the "License"); you may not use this file except in compliance with
++ * the License. You may obtain a copy of the License at
++ *
++ * http://www.apache.org/licenses/LICENSE-2.0
++ *
++ * Unless required by applicable law or agreed to in writing, software
++ * distributed under the License is distributed on an "AS IS" BASIS,
++ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
++ * implied. See the License for the specific language governing
++ * permissions and limitations under the License.
++ *
++ * 
++ * Wraps a number of native function calls to communicate with the Ceph
++ * filesystem.
++ */
++package org.apache.hadoop.fs.ceph;
++
++import org.apache.hadoop.conf.Configuration;
++import org.apache.commons.logging.Log;
++
++class CephTalker extends CephFS {
++      //we write a constructor so we can load the libraries
++      public CephTalker(Configuration conf, Log log) {
++              super(conf, log);
++              System.load(conf.get("fs.ceph.libDir")+"/libhadoopcephfs.so");
++              System.load(conf.get("fs.ceph.libDir")+"/libceph.so");
++      }
++      protected native boolean ceph_initializeClient(String arguments, int block_size);
++      protected native String ceph_getcwd();
++      protected native boolean ceph_setcwd(String path);
++  protected native boolean ceph_rmdir(String path);
++  protected native boolean ceph_unlink(String path);
++  protected native boolean ceph_rename(String old_path, String new_path);
++  protected native boolean ceph_exists(String path);
++  protected native long ceph_getblocksize(String path);
++  protected native boolean ceph_isdirectory(String path);
++  protected native boolean ceph_isfile(String path);
++  protected native String[] ceph_getdir(String path);
++  protected native int ceph_mkdirs(String path, int mode);
++  protected native int ceph_open_for_append(String path);
++  protected native int ceph_open_for_read(String path);
++  protected native int ceph_open_for_overwrite(String path, int mode);
++  protected native int ceph_close(int filehandle);
++  protected native boolean ceph_setPermission(String path, int mode);
++  protected native boolean ceph_kill_client();
++  protected native boolean ceph_stat(String path, CephFileSystem.Stat fill);
++  protected native int ceph_statfs(String Path, CephFileSystem.CephStat fill);
++  protected native int ceph_replication(String path);
++  protected native String ceph_hosts(int fh, long offset);
++  protected native int ceph_setTimes(String path, long mtime, long atime);
++  protected native long ceph_getpos(int fh);
++  protected native int ceph_write(int fh, byte[] buffer, int buffer_offset, int length);
++      protected native int ceph_read(int fh, byte[] buffer, int buffer_offset, int length);
++      protected native long ceph_seek_from_start(int fh, long pos);
++}
 Index: src/java/org/apache/hadoop/fs/ceph/CephFileSystem.java
 ===================================================================
 --- src/java/org/apache/hadoop/fs/ceph/CephFileSystem.java     (revision 0)
 +++ src/java/org/apache/hadoop/fs/ceph/CephFileSystem.java     (revision 0)
-@@ -0,0 +1,810 @@
-+// -*- mode:Java; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- 
+@@ -0,0 +1,848 @@
++// -*- mode:Java; tab-width:2; c-basic-offset:2; indent-tabs-mode:t -*- 
 +/**
 + *
 + * Licensed under the Apache License, Version 2.0
@@ -26,16 +145,14 @@ Index: src/java/org/apache/hadoop/fs/ceph/CephFileSystem.java
 +
 +import java.io.IOException;
 +import java.io.FileNotFoundException;
-+import java.io.File;
 +import java.io.OutputStream;
 +import java.net.URI;
-+import java.util.Set;
 +import java.util.EnumSet;
-+import java.util.Vector;
 +import java.lang.Math;
 +
 +import org.apache.hadoop.conf.Configuration;
 +import org.apache.hadoop.fs.BlockLocation;
++import org.apache.hadoop.fs.FileAlreadyExistsException;
 +import org.apache.hadoop.fs.FSDataInputStream;
 +import org.apache.hadoop.fs.FSInputStream;
 +import org.apache.hadoop.fs.FSDataOutputStream;
@@ -71,61 +188,41 @@ Index: src/java/org/apache/hadoop/fs/ceph/CephFileSystem.java
 + */
 +public class CephFileSystem extends FileSystem {
 +
-+  private static final int EEXIST = 17;
-+  private static final int ENOENT = 2;
-+
 +  private URI uri;
 +
 +  private final Path root;
 +  private boolean initialized = false;
++      private CephFS ceph = null;
 +
 +  private boolean debug = false;
-+  private String cephDebugLevel;
-+  private String monAddr;
 +  private String fs_default_name;
-+  
-+  private native boolean ceph_initializeClient(String arguments, int block_size);
-+  private native String  ceph_getcwd();
-+  private native boolean ceph_setcwd(String path);
-+  private native boolean ceph_rmdir(String path);
-+  private native boolean ceph_mkdir(String path);
-+  private native boolean ceph_unlink(String path);
-+  private native boolean ceph_rename(String old_path, String new_path);
-+  private native boolean ceph_exists(String path);
-+  private native long    ceph_getblocksize(String path);
-+  private native boolean ceph_isdirectory(String path);
-+  private native boolean ceph_isfile(String path);
-+  private native String[] ceph_getdir(String path);
-+  private native int ceph_mkdirs(String path, int mode);
-+  private native int ceph_open_for_append(String path);
-+  private native int ceph_open_for_read(String path);
-+  private native int ceph_open_for_overwrite(String path, int mode);
-+  private native int ceph_close(int filehandle);
-+  private native boolean ceph_setPermission(String path, int mode);
-+  private native boolean ceph_kill_client();
-+  private native boolean ceph_stat(String path, Stat fill);
-+  private native int ceph_statfs(String Path, CephStat fill);
-+  private native int ceph_replication(String path);
-+  private native String ceph_hosts(int fh, long offset);
-+  private native int ceph_setTimes(String path, long mtime, long atime);
 +
 +  /**
 +   * Create a new CephFileSystem.
 +   */
 +  public CephFileSystem() {
-+    if(debug) debug("CephFileSystem:enter");
 +    root = new Path("/");
-+    if(debug) debug("CephFileSystem:exit");
 +  }
 +
++      /**
++       * Used for testing purposes, this constructor
++       * sets the given CephFS instead of defaulting to a
++       * CephTalker (with its assumed real Ceph instance to talk to).
++       */
++      public CephFileSystem(CephFS ceph_fs, String default_path) {
++              super();
++              root = new Path("/");
++              ceph = ceph_fs;
++              fs_default_name = default_path;
++      }
++
 +  /**
 +   * Lets you get the URI of this CephFileSystem.
 +   * @return the URI.
 +   */
 +  public URI getUri() {
 +    if (!initialized) return null;
-+    if(debug) debug("getUri:enter");
-+    if(debug) debug("getUri:exit with return " + uri);
++    ceph.debug("getUri:exit with return " + uri, ceph.DEBUG);
 +    return uri;
 +  }
 +
@@ -140,52 +237,54 @@ Index: src/java/org/apache/hadoop/fs/ceph/CephFileSystem.java
 +   */
 +  @Override
 +  public void initialize(URI uri, Configuration conf) throws IOException {
-+    if(debug) debug("initialize:enter");
 +    if (!initialized) {
-+      System.load(conf.get("fs.ceph.libDir")+"/libhadoopcephfs.so");
-+      System.load(conf.get("fs.ceph.libDir")+"/libceph.so");
 +      super.initialize(uri, conf);
 +      setConf(conf);
 +      this.uri = URI.create(uri.getScheme() + "://" + uri.getAuthority());  
-+      statistics = getStatistics(uri.getScheme(), getClass());  
-+      
-+      fs_default_name = conf.get("fs.default.name");
-+      debug = ("true".equals(conf.get("fs.ceph.debug", "false")));
++      statistics = getStatistics(uri.getScheme(), getClass());
++
++                      if (ceph == null) {
++                              ceph = new CephTalker(conf, LOG);
++                      }
++      if (null == fs_default_name) {
++                              fs_default_name = conf.get("fs.default.name");
++                      }
 +      //build up the arguments for Ceph
 +      String arguments = "CephFSInterface";
 +      arguments += conf.get("fs.ceph.commandLine", "");
 +      if (conf.get("fs.ceph.clientDebug") != null) {
-+      arguments += " --debug_client ";
-+      arguments += conf.get("fs.ceph.clientDebug");
++                              arguments += " --debug_client ";
++                              arguments += conf.get("fs.ceph.clientDebug");
 +      }
 +      if (conf.get("fs.ceph.messengerDebug") != null) {
-+      arguments += " --debug_ms ";
-+      arguments += conf.get("fs.ceph.messengerDebug");
++                              arguments += " --debug_ms ";
++                              arguments += conf.get("fs.ceph.messengerDebug");
 +      }
 +      if (conf.get("fs.ceph.monAddr") != null) {
-+      arguments += " -m ";
-+      arguments += conf.get("fs.ceph.monAddr");
++                              arguments += " -m ";
++                              arguments += conf.get("fs.ceph.monAddr");
 +      }
-+      arguments += " --client-readahead-max-periods="
-+        + conf.get("fs.ceph.readahead", "1");
++      arguments += " --client-readahead-max-periods="
++                              + conf.get("fs.ceph.readahead", "1");
 +      //make sure they gave us a ceph monitor address or conf file
++                      ceph.debug("initialize:Ceph initialization arguments: " + arguments, ceph.INFO);
 +      if ( (conf.get("fs.ceph.monAddr") == null) &&
-+         (arguments.indexOf("-m") == -1) &&
-+         (arguments.indexOf("-c") == -1) ) {
-+      if(debug) debug("You need to specify a Ceph monitor address.");
-+      throw new IOException("You must specify a Ceph monitor address or config file!");
++                                       (arguments.indexOf("-m") == -1) &&
++                                       (arguments.indexOf("-c") == -1) ) {
++                              ceph.debug("initialize:You need to specify a Ceph monitor address.", ceph.FATAL);
++                              throw new IOException("You must specify a Ceph monitor address or config file!");
 +      }
 +      //  Initialize the client
-+      if (!ceph_initializeClient(arguments,
-+                               conf.getInt("fs.ceph.blockSize", 1<<26))) {
-+      if(debug) debug("Ceph initialization failed!");
-+      throw new IOException("Ceph initialization failed!");
++      if (!ceph.ceph_initializeClient(arguments,
++                                                                                                                               conf.getInt("fs.ceph.blockSize", 1<<26))) {
++                              ceph.debug("initialize:Ceph initialization failed!", ceph.FATAL);
++                              throw new IOException("Ceph initialization failed!");
 +      }
 +      initialized = true;
-+      if(debug) debug("Initialized client. Setting cwd to /");
-+      ceph_setcwd("/");
++      ceph.debug("initialize:Ceph initialized client. Setting cwd to /", ceph.INFO);
++      ceph.ceph_setcwd("/");
 +    }
-+    if(debug) debug("initialize:exit");
++    ceph.debug("initialize:exit", ceph.DEBUG);
 +  }
 +
 +  /**
@@ -195,38 +294,42 @@ Index: src/java/org/apache/hadoop/fs/ceph/CephFileSystem.java
 +   */
 +  @Override
 +  public void close() throws IOException {
-+    if (!initialized) throw new IOException ("You have to initialize the"
-+                       +"CephFileSystem before calling other methods.");
-+    if(debug) debug("close:enter");
++    if (!initialized) throw new IOException ("You have to initialize the "
++                                                                                                                                                                               +"CephFileSystem before calling other methods.");
++    ceph.debug("close:enter", ceph.DEBUG);
 +    super.close();//this method does stuff, make sure it's run!
-+    ceph_kill_client();
-+    if(debug) debug("close:exit");
++              ceph.debug("close: Calling ceph_kill_client from Java", ceph.TRACE);
++    ceph.ceph_kill_client();
++    ceph.debug("close:exit", ceph.DEBUG);
 +  }
 +
 +  /**
 +   * Get an FSDataOutputStream to append onto a file.
 +   * @param file The File you want to append onto
-+   * @param bufferSize Ceph does internal buffering; this is ignored.
++   * @param bufferSize Ceph does internal buffering but you can buffer in the Java code as well if you like.
 +   * @param progress The Progressable to report progress to.
 +   * Reporting is limited but exists.
 +   * @return An FSDataOutputStream that connects to the file on Ceph.
 +   * @throws IOException If initialize() hasn't been called or the file cannot be found or appended to.
 +   */
 +  public FSDataOutputStream append (Path file, int bufferSize,
-+                                  Progressable progress) throws IOException {
-+    if (!initialized) throw new IOException ("You have to initialize the"
-+                       +"CephFileSystem before calling other methods.");
-+    if(debug) debug("append:enter with path " + file + " bufferSize " + bufferSize);
++                                                                                                                                              Progressable progress) throws IOException {
++    if (!initialized) throw new IOException ("You have to initialize the "
++                                                                                                                                                                               +"CephFileSystem before calling other methods.");
++    ceph.debug("append:enter with path " + file + " bufferSize " + bufferSize, ceph.DEBUG);
 +    Path abs_path = makeAbsolute(file);
 +    if (progress!=null) progress.progress();
-+    int fd = ceph_open_for_append(abs_path.toString());
++              ceph.debug("append: Entering ceph_open_for_append from Java", ceph.TRACE);
++    int fd = ceph.ceph_open_for_append(abs_path.toString());
++              ceph.debug("append: Returned to Java", ceph.TRACE);
 +    if (progress!=null) progress.progress();
 +    if( fd < 0 ) { //error in open
 +      throw new IOException("append: Open for append failed on path \"" +
-+                          abs_path.toString() + "\"");
++                                                                                                              abs_path.toString() + "\"");
 +    }
-+    CephOutputStream cephOStream = new CephOutputStream(getConf(), fd);
-+    if(debug) debug("append:exit");
++    CephOutputStream cephOStream = new CephOutputStream(getConf(),
++                                                                                                                                                                                                                              ceph, fd, bufferSize);
++    ceph.debug("append:exit", ceph.DEBUG);
 +    return new FSDataOutputStream(cephOStream, statistics);
 +  }
 +
@@ -236,10 +339,10 @@ Index: src/java/org/apache/hadoop/fs/ceph/CephFileSystem.java
 +   */
 +  public Path getWorkingDirectory() {
 +    if (!initialized) return null;
-+    if(debug) debug("getWorkingDirectory:enter");
-+    if(debug) debug("Working directory is " + ceph_getcwd());
-+    if(debug) debug("getWorkingDirectory:exit");
-+    return new Path(fs_default_name + ceph_getcwd());
++    ceph.debug("getWorkingDirectory:enter", ceph.DEBUG);
++              String cwd = ceph.ceph_getcwd();
++    ceph.debug("getWorkingDirectory:exit with path " + cwd, ceph.DEBUG);
++    return new Path(fs_default_name + ceph.ceph_getcwd());
 +  }
 +
 +  /**
@@ -250,15 +353,14 @@ Index: src/java/org/apache/hadoop/fs/ceph/CephFileSystem.java
 +   * @param dir The directory to change to.
 +   */
 +  @Override
-+  public void setWorkingDirectory(Path dir) {
++      public void setWorkingDirectory(Path dir) {
 +    if (!initialized) return;
-+    if(debug) debug("setWorkingDirecty:enter with new working dir " + dir);
++    ceph.debug("setWorkingDirecty:enter with new working dir " + dir, ceph.DEBUG);
 +    Path abs_path = makeAbsolute(dir);
-+    if(debug) debug("calling ceph_setcwd from Java");
-+    if (!ceph_setcwd(abs_path.toString()))
-+      if(debug) debug("Warning:ceph_setcwd failed for some reason on path " + abs_path);
-+    if(debug) debug("returned from ceph_setcwd to Java" );
-+    if(debug) debug("setWorkingDirectory:exit");
++    ceph.debug("setWorkingDirectory:calling ceph_setcwd from Java", ceph.TRACE);
++    if (!ceph.ceph_setcwd(abs_path.toString()))
++      ceph.debug("setWorkingDirectory: WARNING! ceph_setcwd failed for some reason on path " + abs_path, ceph.WARN);
++    ceph.debug("setWorkingDirectory:exit", ceph.DEBUG);
 +  }
 +
 +  /**
@@ -269,22 +371,22 @@ Index: src/java/org/apache/hadoop/fs/ceph/CephFileSystem.java
 +   * @throws IOException if initialize() hasn't been called.
 +   */
 +  @Override
-+  public boolean exists(Path path) throws IOException {
-+    if (!initialized) throw new IOException ("You have to initialize the"
-+                     +"CephFileSystem before calling other methods.");
-+    if(debug) debug("exists:enter with path " + path);
++      public boolean exists(Path path) throws IOException {
++    if (!initialized) throw new IOException ("You have to initialize the "
++                                                                                                                                                                               +"CephFileSystem before calling other methods.");
++    ceph.debug("exists:enter with path " + path, ceph.DEBUG);
 +    boolean result;
 +    Path abs_path = makeAbsolute(path);
 +    if (abs_path.equals(root)) {
 +      result = true;
 +    }
 +    else {
-+      if(debug) debug("Calling ceph_exists from Java on path "
-+          + abs_path.toString() + ":");
-+      result =  ceph_exists(abs_path.toString());
-+      if(debug) debug("Returned from ceph_exists to Java");
++      ceph.debug("exists:Calling ceph_exists from Java on path "
++                                                                                      + abs_path.toString(), ceph.TRACE);
++      result =  ceph.ceph_exists(abs_path.toString());
++      ceph.debug("exists:Returned from ceph_exists to Java", ceph.TRACE);
 +    }
-+    if(debug) debug("exists:exit with value " + result);
++    ceph.debug("exists:exit with value " + result, ceph.DEBUG);
 +    return result;
 +  }
 +
@@ -294,43 +396,52 @@ Index: src/java/org/apache/hadoop/fs/ceph/CephFileSystem.java
 +   * @param path The directory path to create
 +   * @param perms The permissions to apply to the created directories.
 +   * @return true if successful, false otherwise
-+   * @throws IOException if initialize() hasn't been called.
++   * @throws IOException if initialize() hasn't been called or the path
++       *  is a child of a file.
 +   */
++      @Override
 +  public boolean mkdirs(Path path, FsPermission perms) throws IOException {
-+    if (!initialized) throw new IOException ("You have to initialize the"
-+                     +"CephFileSystem before calling other methods.");
-+    if(debug) debug("mkdirs:enter with path " + path);
++    if (!initialized) throw new IOException ("You have to initialize the "
++                                                                                                                                                                               +"CephFileSystem before calling other methods.");
++    ceph.debug("mkdirs:enter with path " + path, ceph.DEBUG);
 +    Path abs_path = makeAbsolute(path);
-+    if(debug) debug("calling ceph_mkdirs from Java");
-+    int result = ceph_mkdirs(abs_path.toString(), (int)perms.toShort());
-+    if(debug) debug("Returned from ceph_mkdirs to Java with result " + result);
-+    if(debug) debug("mkdirs:exit with result " + result);
-+    if (result != 0)
++    ceph.debug("mkdirs:calling ceph_mkdirs from Java", ceph.TRACE);
++    int result = ceph.ceph_mkdirs(abs_path.toString(), (int)perms.toShort());
++    if (result != 0) {
++                      ceph.debug("mkdirs: make directory " + abs_path
++                                                               + "Failing with result " + result, ceph.WARN);
++                      if (ceph.ENOTDIR == result)
++                              throw new FileAlreadyExistsException("Parent path is not a directory");
 +      return false;
-+    else return true;
++              }
++    else {
++                      ceph.debug("mkdirs:exiting succesfully", ceph.DEBUG);
++                      return true;
++              }
 +  }
 +
 +  /**
 +   * Check if a path is a file. This is moderately faster than the
 +   * generic implementation.
 +   * @param path The path to check.
-+   * @return true if the path is a file, false otherwise.
++   * @return true if the path is definitely a file, false otherwise.
 +   * @throws IOException if initialize() hasn't been called.
 +   */
 +  @Override
-+  public boolean isFile(Path path) throws IOException {
-+    if (!initialized) throw new IOException ("You have to initialize the"
-+                     +"CephFileSystem before calling other methods.");
-+    if(debug) debug("isFile:enter with path " + path);
++      public boolean isFile(Path path) throws IOException {
++    if (!initialized) throw new IOException ("You have to initialize the "
++                                                                                                                                                                               +"CephFileSystem before calling other methods.");
++    ceph.debug("isFile:enter with path " + path, ceph.DEBUG);
 +    Path abs_path = makeAbsolute(path);
 +    boolean result;
 +    if (abs_path.equals(root)) {
 +      result =  false;
 +    }
 +    else {
-+      result = ceph_isfile(abs_path.toString());
++                      ceph.debug("isFile:entering ceph_isfile from Java", ceph.TRACE);
++      result = ceph.ceph_isfile(abs_path.toString());
 +    }
-+    if(debug) debug("isFile:exit with result " + result);
++    ceph.debug("isFile:exit with result " + result, ceph.DEBUG);
 +    return result;
 +  }
 +
@@ -338,25 +449,25 @@ Index: src/java/org/apache/hadoop/fs/ceph/CephFileSystem.java
 +   * Check if a path is a directory. This is moderately faster than
 +   * the generic implementation.
 +   * @param path The path to check
-+   * @return true if the path is a directory, false otherwise.
++   * @return true if the path is definitely a directory, false otherwise.
 +   * @throws IOException if initialize() hasn't been called.
 +   */
 +  @Override
-+  public boolean isDirectory(Path path) throws IOException {
-+    if (!initialized) throw new IOException ("You have to initialize the"
-+                     +"CephFileSystem before calling other methods.");
-+    if(debug) debug("isDirectory:enter with path " + path);
++      public boolean isDirectory(Path path) throws IOException {
++    if (!initialized) throw new IOException ("You have to initialize the "
++                                                                                                                                                                               +"CephFileSystem before calling other methods.");
++    ceph.debug("isDirectory:enter with path " + path, ceph.DEBUG);
 +    Path abs_path = makeAbsolute(path);
 +    boolean result;
 +    if (abs_path.equals(root)) {
 +      result = true;
 +    }
 +    else {
-+      if(debug) debug("calling ceph_isdirectory from Java");
-+      result = ceph_isdirectory(abs_path.toString());
-+      if(debug) debug("Returned from ceph_isdirectory to Java");
++      ceph.debug("calling ceph_isdirectory from Java", ceph.TRACE);
++      result = ceph.ceph_isdirectory(abs_path.toString());
++      ceph.debug("Returned from ceph_isdirectory to Java", ceph.TRACE);
 +    }
-+    if(debug) debug("isDirectory:exit with result " + result);
++    ceph.debug("isDirectory:exit with result " + result, ceph.DEBUG);
 +    return result;
 +  }
 +
@@ -369,31 +480,32 @@ Index: src/java/org/apache/hadoop/fs/ceph/CephFileSystem.java
 +   * @throws FileNotFoundException if the path could not be resolved.
 +   */
 +  public FileStatus getFileStatus(Path path) throws IOException {
-+    if (!initialized) throw new IOException ("You have to initialize the"
-+                     +"CephFileSystem before calling other methods.");
-+    if(debug) debug("getFileStatus:enter with path " + path);
++    if (!initialized) throw new IOException ("You have to initialize the "
++                                                                                                                                                                               +"CephFileSystem before calling other methods.");
++    ceph.debug("getFileStatus:enter with path " + path, ceph.DEBUG);
 +    Path abs_path = makeAbsolute(path);
 +    //sadly, Ceph doesn't really do uids/gids just yet, but
 +    //everything else is filled
 +    FileStatus status;
 +    Stat lstat = new Stat();
-+    if(ceph_stat(abs_path.toString(), lstat)) {
++              ceph.debug("getFileStatus: calling ceph_stat from Java", ceph.TRACE);
++    if(ceph.ceph_stat(abs_path.toString(), lstat)) {
 +      status = new FileStatus(lstat.size, lstat.is_dir,
-+                            ceph_replication(abs_path.toString()),
-+                            lstat.block_size,
-+                            lstat.mod_time,
-+                            lstat.access_time,
-+                            new FsPermission((short)lstat.mode),
-+                            null,
-+                            null,
-+                            new Path(fs_default_name+abs_path.toString()));
++                                                                                                                      ceph.ceph_replication(abs_path.toString()),
++                                                                                                                      lstat.block_size,
++                                                                                                                      lstat.mod_time,
++                                                                                                                      lstat.access_time,
++                                                                                                                      new FsPermission((short)lstat.mode),
++                                                                                                                      null,
++                                                                                                                      null,
++                                                                                                                      new Path(fs_default_name+abs_path.toString()));
 +    }
 +    else { //fail out
-+      throw new FileNotFoundException("org.apache.hadoop.fs.ceph.CephFileSystem: File "
-+                                      + path + " does not exist or could not be accessed");
++                      throw new FileNotFoundException("org.apache.hadoop.fs.ceph.CephFileSystem: File "
++                                                                                                                                                      + path + " does not exist or could not be accessed");
 +    }
 +
-+    if(debug) debug("getFileStatus:exit");
++    ceph.debug("getFileStatus:exit", ceph.DEBUG);
 +    return status;
 +  }
 +
@@ -406,17 +518,17 @@ Index: src/java/org/apache/hadoop/fs/ceph/CephFileSystem.java
 +   * @throws FileNotFoundException if the input path can't be found.
 +   */
 +  public FileStatus[] listStatus(Path path) throws IOException {
-+    if (!initialized) throw new IOException ("You have to initialize the"
-+                     +"CephFileSystem before calling other methods.");
-+    if(debug) debug("listStatus:enter with path " + path);
++    if (!initialized) throw new IOException ("You have to initialize the "
++                                                                                                                                                                               +"CephFileSystem before calling other methods.");
++    ceph.debug("listStatus:enter with path " + path, ceph.WARN);
 +    Path abs_path = makeAbsolute(path);
 +    Path[] paths = listPaths(abs_path);
 +    if (paths != null) {
 +      FileStatus[] statuses = new FileStatus[paths.length];
 +      for (int i = 0; i < paths.length; ++i) {
-+      statuses[i] = getFileStatus(paths[i]);
++                              statuses[i] = getFileStatus(paths[i]);
 +      }
-+      if(debug) debug("listStatus:exit");
++      ceph.debug("listStatus:exit", ceph.DEBUG);
 +      return statuses;
 +    }
 +    if (!isFile(path)) throw new FileNotFoundException(); //if we get here, listPaths returned null
@@ -425,11 +537,15 @@ Index: src/java/org/apache/hadoop/fs/ceph/CephFileSystem.java
 +  }
 +
 +  @Override
-+    public void setPermission(Path p, FsPermission permission) throws IOException {
-+    if (!initialized) throw new IOException ("You have to initialize the"
-+                     +"CephFileSystem before calling other methods.");
++  public void setPermission(Path p, FsPermission permission) throws IOException {
++    if (!initialized) throw new IOException ("You have to initialize the "
++                                                                                                                                                                               +"CephFileSystem before calling other methods.");
++              ceph.debug("setPermission:enter with path " + p
++                                      + " and permissions " + permission, ceph.DEBUG);
 +    Path abs_path = makeAbsolute(p);
-+    ceph_setPermission(abs_path.toString(), permission.toShort());
++              ceph.debug("setPermission:calling ceph_setpermission from Java", ceph.TRACE);
++    ceph.ceph_setPermission(abs_path.toString(), permission.toShort());
++              ceph.debug("setPermission:exit", ceph.DEBUG);
 +  }
 +
 +  /**
@@ -438,15 +554,19 @@ Index: src/java/org/apache/hadoop/fs/ceph/CephFileSystem.java
 +   * @param mtime Set modification time in number of millis since Jan 1, 1970.
 +   * @param atime Set access time in number of millis since Jan 1, 1970.
 +   */
-+@Override
-+  public void setTimes(Path p, long mtime, long atime) throws IOException {
-+  if (!initialized) throw new IOException ("You have to initialize the"
-+                     +"CephFileSystem before calling other methods.");
-+  Path abs_path = makeAbsolute(p);
-+  int r = ceph_setTimes(abs_path.toString(), mtime, atime);
-+  if (r<0) throw new IOException ("Failed to set times on path "
-+                                + abs_path.toString() + " Error code: " + r);
-+}
++      @Override
++      public void setTimes(Path p, long mtime, long atime) throws IOException {
++              if (!initialized) throw new IOException ("You have to initialize the "
++                                                                                                                                                                               +"CephFileSystem before calling other methods.");
++              ceph.debug("setTimes:enter with path " + p + " mtime:" + mtime +
++                                      " atime:" + atime, ceph.DEBUG);
++              Path abs_path = makeAbsolute(p);
++              ceph.debug("setTimes:calling ceph_setTimes from Java", ceph.TRACE);
++              int r = ceph.ceph_setTimes(abs_path.toString(), mtime, atime);
++              if (r<0) throw new IOException ("Failed to set times on path "
++                                                                                                                                              + abs_path.toString() + " Error code: " + r);
++              ceph.debug("setTimes:exit", ceph.DEBUG);
++      }
 +
 +  /**
 +   * Create a new file and open an FSDataOutputStream that's connected to it.
@@ -454,7 +574,8 @@ Index: src/java/org/apache/hadoop/fs/ceph/CephFileSystem.java
 +   * @param permission The permissions to apply to the file.
 +   * @param flag If CreateFlag.OVERWRITE, overwrite any existing
 +   * file with this name; otherwise don't.
-+   * @param bufferSize Ceph does internal buffering; this is ignored.
++   * @param bufferSize Ceph does internal buffering, but you can buffer
++       *   in the Java code too if you like.
 +   * @param replication Ignored by Ceph. This can be
 +   * configured via Ceph configuration.
 +   * @param blockSize Ignored by Ceph. You can set client-wide block sizes
@@ -467,17 +588,17 @@ Index: src/java/org/apache/hadoop/fs/ceph/CephFileSystem.java
 +   * failure in attempting to open for append with Ceph.
 +   */
 +  public FSDataOutputStream create(Path path,
-+                                 FsPermission permission,
-+                                 EnumSet<CreateFlag> flag,
-+                                 //boolean overwrite,
-+                                 int bufferSize,
-+                                 short replication,
-+                                 long blockSize,
-+                                 Progressable progress
-+                                 ) throws IOException {
-+    if (!initialized) throw new IOException ("You have to initialize the"
-+                     +"CephFileSystem before calling other methods.");
-+    if(debug) debug("create:enter with path " + path);
++                                                                                                                                       FsPermission permission,
++                                                                                                                                       EnumSet<CreateFlag> flag,
++                                                                                                                                       //boolean overwrite,
++                                                                                                                                       int bufferSize,
++                                                                                                                                       short replication,
++                                                                                                                                       long blockSize,
++                                                                                                                                       Progressable progress
++                                                                                                                                       ) throws IOException {
++    if (!initialized) throw new IOException ("You have to initialize the "
++                                                                                                                                                                               +"CephFileSystem before calling other methods.");
++    ceph.debug("create:enter with path " + path, ceph.DEBUG);
 +    Path abs_path = makeAbsolute(path);
 +    if (progress!=null) progress.progress();
 +    // We ignore replication since that's not configurable here, and
@@ -489,13 +610,13 @@ Index: src/java/org/apache/hadoop/fs/ceph/CephFileSystem.java
 +    boolean exists = exists(abs_path);
 +    if (exists) {
 +      if(isDirectory(abs_path))
-+      throw new IOException("create: Cannot overwrite existing directory \""
-+                            + path.toString() + "\" with a file");
++                              throw new IOException("create: Cannot overwrite existing directory \""
++                                                                                                                      + path.toString() + "\" with a file");
 +      //if (!overwrite)
 +      if (!flag.contains(CreateFlag.OVERWRITE))
-+      throw new IOException("createRaw: Cannot open existing file \"" 
-+                            + abs_path.toString() 
-+                            + "\" for writing without overwrite flag");
++                              throw new IOException("createRaw: Cannot open existing file \"" 
++                                                                                                                      + abs_path.toString() 
++                                                                                                                      + "\" for writing without overwrite flag");
 +    }
 +
 +    if (progress!=null) progress.progress();
@@ -504,68 +625,73 @@ Index: src/java/org/apache/hadoop/fs/ceph/CephFileSystem.java
 +    if (!exists) {
 +      Path parent = abs_path.getParent();
 +      if (parent != null) { // if parent is root, we're done
-+      int r = ceph_mkdirs(parent.toString(), permission.toShort());
-+      if (!(r==0 || r==-EEXIST))
-+        throw new IOException ("Error creating parent directory; code: " + r);
++                              int r = ceph.ceph_mkdirs(parent.toString(), permission.toShort());
++                              if (!(r==0 || r==-ceph.EEXIST))
++                                      throw new IOException ("Error creating parent directory; code: " + r);
 +      }
 +      if (progress!=null) progress.progress();
 +    }
 +    // Step 3: open the file
-+    if(debug) debug("calling ceph_open_for_overwrite from Java");
-+    int fh = ceph_open_for_overwrite(abs_path.toString(), (int)permission.toShort());
++    ceph.debug("calling ceph_open_for_overwrite from Java", ceph.TRACE);
++    int fh = ceph.ceph_open_for_overwrite(abs_path.toString(), (int)permission.toShort());
 +    if (progress!=null) progress.progress();
-+    if(debug) debug("Returned from ceph_open_for_overwrite to Java with fh " + fh);
++    ceph.debug("Returned from ceph_open_for_overwrite to Java with fh " + fh, ceph.TRACE);
 +    if (fh < 0) {
 +      throw new IOException("create: Open for overwrite failed on path \"" + 
-+                          path.toString() + "\"");
++                                                                                                              path.toString() + "\"");
 +    }
 +      
 +    // Step 4: create the stream
-+    OutputStream cephOStream = new CephOutputStream(getConf(), fh);
-+    if(debug) debug("create:exit");
++    OutputStream cephOStream = new CephOutputStream(getConf(),
++                                                                                                                                                                                                              ceph, fh, bufferSize);
++    ceph.debug("create:exit", ceph.DEBUG);
 +    return new FSDataOutputStream(cephOStream, statistics);
-+    }
++      }
 +
 +  /**
 +   * Open a Ceph file and attach the file handle to an FSDataInputStream.
 +   * @param path The file to open
-+   * @param bufferSize Ceph does internal buffering; this is ignored.
++   * @param bufferSize Ceph does internal buffering; but you can buffer in
++       *   the Java code too if you like.
 +   * @return FSDataInputStream reading from the given path.
 +   * @throws IOException if initialize() hasn't been called, the path DNE or is a
 +   * directory, or there is an error getting data to set up the FSDataInputStream.
 +   */
 +  public FSDataInputStream open(Path path, int bufferSize) throws IOException {
-+    if (!initialized) throw new IOException ("You have to initialize the"
-+                     +"CephFileSystem before calling other methods.");
-+    if(debug) debug("open:enter with path " + path);
++    if (!initialized) throw new IOException ("You have to initialize the "
++                                                                                                                                                                               +"CephFileSystem before calling other methods.");
++    ceph.debug("open:enter with path " + path, ceph.DEBUG);
 +    Path abs_path = makeAbsolute(path);
 +    
-+    int fh = ceph_open_for_read(abs_path.toString());
++    int fh = ceph.ceph_open_for_read(abs_path.toString());
 +    if (fh < 0) { //uh-oh, something's bad!
-+      if (fh == -ENOENT) //well that was a stupid open
-+      throw new IOException("open:  absolute path \""  + abs_path.toString()
-+                            + "\" does not exist");
++      if (fh == -ceph.ENOENT) //well that was a stupid open
++                              throw new IOException("open:  absolute path \""  + abs_path.toString()
++                                                                                                                      + "\" does not exist");
 +      else //hrm...the file exists but we can't open it :(
-+      throw new IOException("open: Failed to open file " + abs_path.toString());
++                              throw new IOException("open: Failed to open file " + abs_path.toString());
 +    }
 +
 +    if(isDirectory(abs_path)) { //yes, it is possible to open Ceph directories
 +      //but that doesn't mean you should in Hadoop!
-+      ceph_close(fh);
++      ceph.ceph_close(fh);
 +      throw new IOException("open:  absolute path \""  + abs_path.toString()
-+                          + "\" is a directory!");
++                                                                                                              + "\" is a directory!");
 +    }
 +    Stat lstat = new Stat();
-+    ceph_stat(abs_path.toString(), lstat);
++              ceph.debug("open:calling ceph_stat from Java", ceph.TRACE);
++    ceph.ceph_stat(abs_path.toString(), lstat);
++              ceph.debug("open:returned to Java", ceph.TRACE);
 +    long size = lstat.size;
 +    if (size < 0) {
 +      throw new IOException("Failed to get file size for file " + abs_path.toString() + 
-+                          " but succeeded in opening file. Something bizarre is going on.");
++                                                                                                              " but succeeded in opening file. Something bizarre is going on.");
 +    }
-+    FSInputStream cephIStream = new CephInputStream(getConf(), fh, size);
-+    if(debug) debug("open:exit");
++    FSInputStream cephIStream = new CephInputStream(getConf(), ceph,
++                                                                                                                                                                                                              fh, size, bufferSize);
++    ceph.debug("open:exit", ceph.DEBUG);
 +    return new FSDataInputStream(cephIStream);
-+    }
++      }
 +
 +  /**
 +   * Rename a file or directory.
@@ -575,16 +701,25 @@ Index: src/java/org/apache/hadoop/fs/ceph/CephFileSystem.java
 +   * @throws IOException if initialize() hasn't been called.
 +   */
 +  @Override
-+  public boolean rename(Path src, Path dst) throws IOException {
-+    if (!initialized) throw new IOException ("You have to initialize the"
-+                     +"CephFileSystem before calling other methods.");
-+    if(debug) debug("rename:enter");
-+    if(debug) debug("calling ceph_rename from Java");
++      public boolean rename(Path src, Path dst) throws IOException {
++    if (!initialized) throw new IOException ("You have to initialize the "
++                                                                                                                                                                               +"CephFileSystem before calling other methods.");
++    ceph.debug("rename:enter with src:" + src + " and dest:" + dst, ceph.DEBUG);
 +    Path abs_src = makeAbsolute(src);
 +    Path abs_dst = makeAbsolute(dst);
-+    boolean result = ceph_rename(abs_src.toString(), abs_dst.toString());
-+    if(debug) debug("return from ceph_rename to Java with result " + result);
-+    if(debug) debug("rename:exit");
++    ceph.debug("calling ceph_rename from Java", ceph.TRACE);
++    boolean result = ceph.ceph_rename(abs_src.toString(), abs_dst.toString());
++              if (!result) {
++                      if (isDirectory(abs_dst)) { //move the srcdir into destdir
++                              ceph.debug("ceph_rename failed but dst is a directory!", ceph.NOLOG);
++                              Path new_dst = new Path(abs_dst, abs_src.getName());
++                              result = rename(abs_src, new_dst);
++                              ceph.debug("attempt to move " + abs_src.toString()
++                                                                       + " to " + new_dst.toString()
++                                                                       + "has result:" + result, ceph.NOLOG);
++                      }
++              }
++    ceph.debug("rename:exit with result: " + result, ceph.DEBUG);
 +    return result;
 +  }
 +
@@ -603,28 +738,48 @@ Index: src/java/org/apache/hadoop/fs/ceph/CephFileSystem.java
 +   * @throws IOException if initialize() hasn't been called.
 +   */
 +  @Override
-+  public BlockLocation[] getFileBlockLocations(FileStatus file,
-+                        long start, long len) throws IOException {
-+    if (!initialized) throw new IOException ("You have to initialize the"
-+                     +"CephFileSystem before calling other methods.");
++      public BlockLocation[] getFileBlockLocations(FileStatus file,
++                                                                                                                                                                                               long start, long len) throws IOException {
++    if (!initialized) throw new IOException ("You have to initialize the "
++                                                                                                                                                                               +"CephFileSystem before calling other methods.");
++              ceph.debug("getFileBlockLocations:enter with path " + file.getPath() +
++                                      ", start pos " + start + ", length " + len, ceph.DEBUG);
 +    //sanitize and get the filehandle
 +    Path abs_path = makeAbsolute(file.getPath());
-+    int fh = ceph_open_for_read(abs_path.toString());
++              ceph.debug("getFileBlockLocations:call ceph_open_for_read from Java", ceph.TRACE);
++    int fh = ceph.ceph_open_for_read(abs_path.toString());
++              ceph.debug("getFileBlockLocations:return from ceph_open_for_read to Java with fh "
++                                      + fh, ceph.TRACE);
 +    if (fh < 0) {
++                      ceph.debug("getFileBlockLocations:got error " + fh +
++                                              ", exiting and returning null!", ceph.ERROR);
 +      return null;
 +    }
 +    //get the block size
-+    long blockSize = ceph_getblocksize(abs_path.toString());
++              ceph.debug("getFileBlockLocations:call ceph_getblocksize from Java", ceph.TRACE);
++    long blockSize = ceph.ceph_getblocksize(abs_path.toString());
++              ceph.debug("getFileBlockLocations:return from ceph_getblocksize", ceph.TRACE);
 +    BlockLocation[] locations =
-+      new BlockLocation[(int)Math.ceil((len-start)/(float)blockSize)];
++      new BlockLocation[(int)Math.ceil(len/(float)blockSize)];
++              long offset;
 +    for (int i = 0; i < locations.length; ++i) {
-+      String host = ceph_hosts(fh, start + i*blockSize);
++                      offset = start + i*blockSize;
++                      ceph.debug("getFileBlockLocations:call ceph_hosts from Java on fh "
++                                              + fh + " and offset " + offset, ceph.TRACE);
++      String host = ceph.ceph_hosts(fh, offset);
++                      ceph.debug("getFileBlockLocations:return from ceph_hosts to Java with host "
++                                              + host, ceph.TRACE);
 +      String[] hostArray = new String[1];
 +      hostArray[0] = host;
 +      locations[i] = new BlockLocation(hostArray, hostArray,
-+                                     start+i*blockSize, blockSize);
++                                                                                                                                                       start+i*blockSize-(start % blockSize),
++                                                                                                                                                       blockSize);
 +    }
-+    ceph_close(fh);
++              ceph.debug("getFileBlockLocations:call ceph_close from Java on fh "
++                                      + fh, ceph.TRACE);
++    ceph.ceph_close(fh);
++              ceph.debug("getFileBlockLocations:return with " + locations.length
++                                      + " locations", ceph.DEBUG);
 +    return locations;
 +  }
 +  
@@ -632,32 +787,33 @@ Index: src/java/org/apache/hadoop/fs/ceph/CephFileSystem.java
 +   * Get usage statistics on the Ceph filesystem.
 +   * @param path A path to the partition you're interested in.
 +   * Ceph doesn't partition, so this is ignored.
-+   * @return FsStatus reporting capacity, usage, and remaining spac.
++   * @return FsStatus reporting capacity, usage, and remaining space.
 +   * @throws IOException if initialize() hasn't been called, or the
 +   * stat somehow fails.
 +   */
 +  @Override
-+  public FsStatus getStatus (Path path) throws IOException {
-+    if (!initialized) throw new IOException("You have to initialize the"
-+                    + " CephFileSystem before calling other methods.");
-+    if(debug) debug("getStatus:enter");
++      public FsStatus getStatus (Path path) throws IOException {
++    if (!initialized) throw new IOException("You have to initialize the "
++                                                                                                                                                                              + " CephFileSystem before calling other methods.");
++    ceph.debug("getStatus:enter with path " + path, ceph.DEBUG);
 +    Path abs_path = makeAbsolute(path);
 +    
-+    //currently(Ceph .14) Ceph actually ignores the path
++    //currently(Ceph .16) Ceph actually ignores the path
 +    //but we still pass it in; if Ceph stops ignoring we may need more
 +    //error-checking code.
 +    CephStat ceph_stat = new CephStat();
-+    int result = ceph_statfs(abs_path.toString(), ceph_stat);
++              ceph.debug("getStatus:calling ceph_statfs from Java", ceph.TRACE);
++    int result = ceph.ceph_statfs(abs_path.toString(), ceph_stat);
 +    if (result!=0) throw new IOException("Somehow failed to statfs the Ceph filesystem. Error code: " + result);
-+    if(debug) debug("getStatus:exit");
++    ceph.debug("getStatus:exit successfully", ceph.DEBUG);
 +    return new FsStatus(ceph_stat.capacity,
-+                      ceph_stat.used, ceph_stat.remaining);
++                                                                                              ceph_stat.used, ceph_stat.remaining);
 +  }
 +
 +  /**
 +   * Delete the given path, and optionally its children.
 +   * @param path the path to delete.
-+   * @param recursive If the path is a directory and this is false,
++   * @param recursive If the path is a non-empty directory and this is false,
 +   * delete will throw an IOException. If path is a file this is ignored.
 +   * @return true if the delete succeeded, false otherwise (including if
 +   * path doesn't exist).
@@ -666,56 +822,59 @@ Index: src/java/org/apache/hadoop/fs/ceph/CephFileSystem.java
 +   * or you attempt to delete the root directory.
 +   */
 +  public boolean delete(Path path, boolean recursive) throws IOException {
-+    if (!initialized) throw new IOException ("You have to initialize the"
-+                     +"CephFileSystem before calling other methods.");
-+    if(debug) debug("delete:enter");
++    if (!initialized) throw new IOException ("You have to initialize the "
++                                                                                                                                                                               +"CephFileSystem before calling other methods.");
++    ceph.debug("delete:enter with path " + path + " and recursive=" + recursive,
++                                      ceph.DEBUG);
 +    Path abs_path = makeAbsolute(path);
 +    
-+    if(debug) debug("delete: Deleting path " + abs_path.toString());
 +    // sanity check
 +    if (abs_path.equals(root))
 +      throw new IOException("Error: deleting the root directory is a Bad Idea.");
-+
 +    if (!exists(abs_path)) return false;
 +
 +    // if the path is a file, try to delete it.
 +    if (isFile(abs_path)) {
-+      boolean result = ceph_unlink(abs_path.toString());
++                      ceph.debug("delete:calling ceph_unlink from Java with path " + abs_path,
++                                              ceph.TRACE);
++      boolean result = ceph.ceph_unlink(abs_path.toString());
 +      if(!result)
-+      if(debug) debug("delete: failed to delete file \"" +
-+                      abs_path.toString() + "\".");
-+      if(debug) debug("delete:exit");
++                              ceph.debug("delete: failed to delete file \"" +
++                                                                                              abs_path.toString() + "\".", ceph.ERROR);
++      ceph.debug("delete:exit with success=" + result, ceph.DEBUG);
 +      return result;
 +    }
 +
 +    /* The path is a directory, so recursively try to delete its contents,
 +       and then delete the directory. */
-+    if (!recursive) {
-+      throw new IOException("Directories must be deleted recursively!");
-+    }
 +    //get the entries; listPaths will remove . and .. for us
 +    Path[] contents = listPaths(abs_path);
 +    if (contents == null) {
-+      if(debug) debug("delete: Failed to read contents of directory \"" +
-+                    abs_path.toString() + "\" while trying to delete it");
-+      if(debug) debug("delete:exit");
++      ceph.debug("delete: Failed to read contents of directory \"" +
++                                              abs_path.toString() +
++                                              "\" while trying to delete it, BAILING", ceph.ERROR);
 +      return false;
 +    }
++    if (!recursive && contents.length > 0) {
++      throw new IOException("Directories must be deleted recursively!");
++    }
 +    // delete the entries
++              ceph.debug("delete: recursively calling delete on contents of "
++                                      + abs_path, ceph.DEBUG);
 +    for (Path p : contents) {
 +      if (!delete(p, true)) {
-+      if(debug) debug("delete: Failed to delete file \"" + 
-+                      p.toString() + "\" while recursively deleting \""
-+                      + abs_path.toString() + "\"" );
-+      if(debug) debug("delete:exit");
-+      return false;
++                              ceph.debug("delete: Failed to delete file \"" + 
++                                                                                              p.toString() + "\" while recursively deleting \""
++                                                                                              + abs_path.toString() + "\", BAILING", ceph.ERROR );
++                              return false;
 +      }
 +    }
 +    //if we've come this far it's a now-empty directory, so delete it!
-+    boolean result = ceph_rmdir(abs_path.toString());
++    boolean result = ceph.ceph_rmdir(abs_path.toString());
 +    if (!result)
-+      if(debug) debug("delete: failed to delete \"" + abs_path.toString() + "\"");
-+    if(debug) debug("delete:exit");
++      ceph.debug("delete: failed to delete \"" + abs_path.toString()
++                                              + "\", BAILING", ceph.ERROR);
++    ceph.debug("delete:exit", ceph.DEBUG);
 +    return result;
 +  }
 +
@@ -725,7 +884,7 @@ Index: src/java/org/apache/hadoop/fs/ceph/CephFileSystem.java
 +   * by a separate Ceph configuration.
 +   */
 +  @Override
-+  public short getDefaultReplication() {
++      public short getDefaultReplication() {
 +    return 1;
 +  }
 +
@@ -734,67 +893,65 @@ Index: src/java/org/apache/hadoop/fs/ceph/CephFileSystem.java
 +   * @return the default block size, in bytes, as a long.
 +   */
 +  @Override
-+  public long getDefaultBlockSize() {
++      public long getDefaultBlockSize() {
 +    return getConf().getInt("fs.ceph.blockSize", 1<<26);
 +  }
 +
 +  // Makes a Path absolute. In a cheap, dirty hack, we're
 +  // also going to strip off any fs_default_name prefix we see. 
 +  private Path makeAbsolute(Path path) {
-+    if(debug) debug("makeAbsolute:enter with path " + path);
++    ceph.debug("makeAbsolute:enter with path " + path, ceph.NOLOG);
 +    if (path == null) return new Path("/");
 +    // first, check for the prefix
 +    if (path.toString().startsWith(fs_default_name)) {          
 +      Path stripped_path = new Path(path.toString().substring(fs_default_name.length()));
-+      if(debug) debug("makeAbsolute:exit with path " + stripped_path);
++      ceph.debug("makeAbsolute:exit with path " + stripped_path, ceph.NOLOG);
 +      return stripped_path;
 +    }
 +
 +    if (path.isAbsolute()) {
-+      if(debug) debug("makeAbsolute:exit with path " + path);
++      ceph.debug("makeAbsolute:exit with path " + path, ceph.NOLOG);
 +      return path;
 +    }
-+    Path new_path = new Path(ceph_getcwd(), path);
-+    if(debug) debug("makeAbsolute:exit with path " + new_path);
++    Path new_path = new Path(ceph.ceph_getcwd(), path);
++    ceph.debug("makeAbsolute:exit with path " + new_path, ceph.NOLOG);
 +    return new_path;
 +  }
 + 
 +  private Path[] listPaths(Path path) throws IOException {
-+    if(debug) debug("listPaths:enter with path " + path);
++    ceph.debug("listPaths:enter with path " + path, ceph.NOLOG);
 +    String dirlist[];
 +
 +    Path abs_path = makeAbsolute(path);
 +
 +    // If it's a directory, get the listing. Otherwise, complain and give up.
-+    if(debug) debug("calling ceph_getdir from Java with path " + abs_path);
-+    dirlist = ceph_getdir(abs_path.toString());
-+    if(debug) debug("returning from ceph_getdir to Java");
++    ceph.debug("calling ceph_getdir from Java with path " + abs_path, ceph.NOLOG);
++    dirlist = ceph.ceph_getdir(abs_path.toString());
++    ceph.debug("returning from ceph_getdir to Java", ceph.NOLOG);
 +
 +    if (dirlist == null) {
-+      throw new IOException("listPaths: path " + path.toString() + " is not a directory.");
++      return null;
 +    }
 +    
 +    // convert the strings to Paths
 +    Path[] paths = new Path[dirlist.length];
 +    for (int i = 0; i < dirlist.length; ++i) {
-+      if(debug) debug("Raw enumeration of paths in \"" + abs_path.toString() + "\": \"" +
-+                       dirlist[i] + "\"");
++      ceph.debug("Raw enumeration of paths in \"" + abs_path.toString() + "\": \"" +
++                                                                                      dirlist[i] + "\"", ceph.TRACE);
 +      // convert each listing to an absolute path
 +      Path raw_path = new Path(dirlist[i]);
 +      if (raw_path.isAbsolute())
-+      paths[i] = raw_path;
++                              paths[i] = raw_path;
 +      else
-+      paths[i] = new Path(abs_path, raw_path);
++                              paths[i] = new Path(abs_path, raw_path);
 +    }
-+    if(debug) debug("listPaths:exit");
++    ceph.debug("listPaths:exit", ceph.NOLOG);
 +    return paths;
 +  }
 +
-+  private void debug(String statement) {
-+    System.err.println(statement);
-+  }
++  
 +
-+  private static class Stat {
++  static class Stat {
 +    public long size;
 +    public boolean is_dir;
 +    public long block_size;
@@ -805,7 +962,7 @@ Index: src/java/org/apache/hadoop/fs/ceph/CephFileSystem.java
 +    public Stat(){}
 +  }
 +
-+  private static class CephStat {
++  static class CephStat {
 +    public long capacity;
 +    public long used;
 +    public long remaining;
@@ -813,12 +970,774 @@ Index: src/java/org/apache/hadoop/fs/ceph/CephFileSystem.java
 +    public CephStat() {}
 +  }
 +}
+Index: src/java/org/apache/hadoop/fs/ceph/CephFS.java
+===================================================================
+--- src/java/org/apache/hadoop/fs/ceph/CephFS.java     (revision 0)
++++ src/java/org/apache/hadoop/fs/ceph/CephFS.java     (revision 0)
+@@ -0,0 +1,272 @@
++// -*- mode:Java; tab-width:2; c-basic-offset:2; indent-tabs-mode:t -*- 
++/**
++ *
++ * Licensed under the Apache License, Version 2.0
++ * (the "License"); you may not use this file except in compliance with
++ * the License. You may obtain a copy of the License at
++ *
++ * http://www.apache.org/licenses/LICENSE-2.0
++ *
++ * Unless required by applicable law or agreed to in writing, software
++ * distributed under the License is distributed on an "AS IS" BASIS,
++ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
++ * implied. See the License for the specific language governing
++ * permissions and limitations under the License.
++ *
++ * 
++ * Abstract base class for communicating with a Ceph filesystem and its
++ * C++ codebase from Java, or pretending to do so (for unit testing purposes).
++ * As only the Ceph package should be using this directly, all methods
++ * are protected.
++ */
++package org.apache.hadoop.fs.ceph;
++
++import org.apache.hadoop.conf.Configuration;
++import org.apache.commons.logging.Log;
++
++abstract class CephFS {
++
++      protected static final int FATAL = 0;
++      protected static final int ERROR = 1;
++      protected static final int WARN = 2;
++      protected static final int INFO = 3;
++      protected static final int DEBUG = 4;
++      protected static final int TRACE = 5;
++      protected static final int NOLOG = 6;
++
++      protected static final int ENOTDIR = 20;
++  protected static final int EEXIST = 17;
++  protected static final int ENOENT = 2;
++
++      private boolean debug = false;
++      private Log LOG;
++
++      public CephFS(Configuration conf, Log log) {
++      debug = ("true".equals(conf.get("fs.ceph.debug", "false")));
++                      LOG = log;
++      }
++
++      /*
++       * Performs any necessary setup to allow general use of the filesystem.
++       * Inputs:
++       *  String argsuments -- a command-line style input of Ceph config params
++       *  int block_size -- the size in bytes to use for blocks
++       * Returns: true on success, false otherwise
++       */
++      abstract protected boolean ceph_initializeClient(String arguments, int block_size);
++      
++      /*
++       * Returns the current working directory (absolute) as a String
++       */
++      abstract protected String ceph_getcwd();
++      /*
++       * Changes the working directory.
++       * Inputs:
++       *  String path: The path (relative or absolute) to switch to
++       * Returns: true on success, false otherwise.
++       */
++      abstract protected boolean ceph_setcwd(String path);
++      /*
++       * Given a path to a directory, removes the directory if empty.
++       * Inputs:
++       *  jstring j_path: The path (relative or absolute) to the directory
++       * Returns: true on successful delete; false otherwise
++       */
++  abstract protected boolean ceph_rmdir(String path);
++      /*
++       * Given a path, unlinks it.
++       * Inputs:
++       *  String path: The path (relative or absolute) to the file or empty dir
++       * Returns: true if the unlink occurred, false otherwise.
++       */
++  abstract protected boolean ceph_unlink(String path);
++      /*
++       * Changes a given path name to a new name, assuming new_path doesn't exist.
++       * Inputs:
++       *  jstring j_from: The path whose name you want to change.
++       *  jstring j_to: The new name for the path.
++       * Returns: true if the rename occurred, false otherwise
++       */
++  abstract protected boolean ceph_rename(String old_path, String new_path);
++      /*
++       * Returns true if it the input path exists, false
++       * if it does not or there is an unexpected failure.
++       */
++  abstract protected boolean ceph_exists(String path);
++      /*
++       * Get the block size for a given path.
++       * Input:
++       *  String path: The path (relative or absolute) you want
++       *  the block size for.
++       * Returns: block size if the path exists, otherwise a negative number
++       *  corresponding to the standard C++ error codes (which are positive).
++       */
++  abstract protected long ceph_getblocksize(String path);
++      /*
++       * Returns true if the given path is a directory, false otherwise.
++       */
++  abstract protected boolean ceph_isdirectory(String path);
++      /*
++       * Returns true if the given path is a file; false otherwise.
++       */
++  abstract protected boolean ceph_isfile(String path);
++      /*
++       * Get the contents of a given directory.
++       * Inputs:
++       *  String path: The path (relative or absolute) to the directory.
++       * Returns: A Java String[] of the contents of the directory, or
++       *  NULL if there is an error (ie, path is not a dir). This listing
++       *  will not contain . or .. entries.
++       */
++  abstract protected String[] ceph_getdir(String path);
++      /*
++       * Create the specified directory and any required intermediate ones with the
++       * given mode.
++       */
++  abstract protected int ceph_mkdirs(String path, int mode);
++      /*
++       * Open a file to append. If the file does not exist, it will be created.
++       * Opening a dir is possible but may have bad results.
++       * Inputs:
++       *  String path: The path to open.
++       * Returns: an int filehandle, or a number<0 if an error occurs.
++       */
++  abstract protected int ceph_open_for_append(String path);
++      /*
++       * Open a file for reading.
++       * Opening a dir is possible but may have bad results.
++       * Inputs:
++       *  String path: The path to open.
++       * Returns: an int filehandle, or a number<0 if an error occurs.
++       */
++  abstract protected int ceph_open_for_read(String path);
++      /*
++       * Opens a file for overwriting; creates it if necessary.
++       * Opening a dir is possible but may have bad results.
++       * Inputs:
++       *  String path: The path to open.
++       *  int mode: The mode to open with.
++       * Returns: an int filehandle, or a number<0 if an error occurs.
++       */
++  abstract protected int ceph_open_for_overwrite(String path, int mode);
++      /*
++       * Closes the given file. Returns 0 on success, or a negative
++       * error code otherwise.
++       */
++  abstract protected int ceph_close(int filehandle);
++      /*
++       * Change the mode on a path.
++       * Inputs:
++       *  String path: The path to change mode on.
++       *  int mode: The mode to apply.
++       * Returns: true if the mode is properly applied, false if there
++       *  is any error.
++       */
++  abstract protected boolean ceph_setPermission(String path, int mode);
++      /*
++       * Closes the Ceph client. This should be called before shutting down
++       * (multiple times is okay but redundant).
++       */
++  abstract protected boolean ceph_kill_client();
++      /*
++       * Get the statistics on a path returned in a custom format defined
++       * in CephFileSystem.
++       * Inputs:
++       *  String path: The path to stat.
++       *  Stat fill: The stat object to fill.
++       * Returns: true if the stat is successful, false otherwise.
++       */
++  abstract protected boolean ceph_stat(String path, CephFileSystem.Stat fill);
++      /*
++       * Statfs a filesystem in a custom format defined in CephFileSystem.
++       * Inputs:
++       *  String path: A path on the filesystem that you wish to stat.
++       *  CephStat fill: The CephStat object to fill.
++       * Returns: 0 if successful and the CephStat is filled; a negative
++       *  error code otherwise.
++       */
++  abstract protected int ceph_statfs(String path, CephFileSystem.CephStat fill);
++      /*
++       * Check how many times a path should be replicated (if it is
++       * degraded it may not actually be replicated this often).
++       * Inputs:
++       *  String path: The path to check.
++       * Returns: an int containing the number of times replicated.
++       */
++  abstract protected int ceph_replication(String path);
++      /*
++       * Find the IP address of the primary OSD for a given file and offset.
++       * Inputs:
++       *  int fh: The filehandle for the file.
++       *  long offset: The offset to get the location of.
++       * Returns: a String of the location as IP, or NULL if there is an error.
++       */
++  abstract protected String ceph_hosts(int fh, long offset);
++      /*
++       * Set the mtime and atime for a given path.
++       * Inputs:
++       *  String path: The path to set the times for.
++       *  long mtime: The mtime to set, in millis since epoch (-1 to not set).
++       *  long atime: The atime to set, in millis since epoch (-1 to not set)
++       * Returns: 0 if successful, an error code otherwise.
++       */
++  abstract protected int ceph_setTimes(String path, long mtime, long atime);
++      /*
++       * Get the current position in a file (as a long) of a given filehandle.
++       * Returns: (long) current file position on success, or a
++       *  negative error code on failure.
++       */
++  abstract protected long ceph_getpos(int fh);
++      /*
++       * Write the given buffer contents to the given filehandle.
++       * Inputs:
++       *  int fh: The filehandle to write to.
++       *  byte[] buffer: The buffer to write from
++       *  int buffer_offset: The position in the buffer to write from
++       *  int length: The number of (sequential) bytes to write.
++       * Returns: int, on success the number of bytes written, on failure
++       *  a negative error code.
++       */
++  abstract protected int ceph_write(int fh, byte[] buffer, int buffer_offset, int length);
++
++      /*
++       * Reads into the given byte array from the current position.
++       * Inputs:
++       *  int fh: the filehandle to read from
++       *  byte[] buffer: the byte array to read into
++       *  int buffer_offset: where in the buffer to start writing
++       *  int length: how much to read.
++       * There'd better be enough space in the buffer to write all
++       * the data from the given offset!
++       * Returns: the number of bytes read on success (as an int),
++       *  or an error code otherwise.  */
++  abstract protected int ceph_read(int fh, byte[] buffer, int buffer_offset, int length);
++      /*
++       * Seeks to the given position in the given file.
++       * Inputs:
++       *  int fh: The filehandle to seek in.
++       *  long pos: The position to seek to.
++       * Returns: the new position (as a long) of the filehandle on success,
++       *  or a negative error code on failure.         */
++  abstract protected long ceph_seek_from_start(int fh, long pos);
++    
++      protected void debug(String statement, int priority) {
++    if (debug) System.err.println(statement);
++              switch(priority) {
++              case FATAL: LOG.fatal(statement);
++                      break;
++              case ERROR: LOG.error(statement);
++                      break;
++              case WARN: LOG.warn(statement);
++                      break;
++              case INFO: LOG.info(statement);
++                      break;
++              case DEBUG: LOG.debug(statement);
++                      break;
++              case TRACE: LOG.trace(statement);
++                      break;
++              case NOLOG: break;
++              default: break;
++              }
++  }
++}
+Index: src/java/org/apache/hadoop/fs/ceph/CephFaker.java
+===================================================================
+--- src/java/org/apache/hadoop/fs/ceph/CephFaker.java  (revision 0)
++++ src/java/org/apache/hadoop/fs/ceph/CephFaker.java  (revision 0)
+@@ -0,0 +1,480 @@
++// -*- mode:Java; tab-width:2; c-basic-offset:2; indent-tabs-mode:t -*- 
++/**
++ *
++ * Licensed under the Apache License, Version 2.0
++ * (the "License"); you may not use this file except in compliance with
++ * the License. You may obtain a copy of the License at
++ *
++ * http://www.apache.org/licenses/LICENSE-2.0
++ *
++ * Unless required by applicable law or agreed to in writing, software
++ * distributed under the License is distributed on an "AS IS" BASIS,
++ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
++ * implied. See the License for the specific language governing
++ * permissions and limitations under the License.
++ *
++ * 
++ * This uses the local Filesystem but pretends to be communicating
++ * with a Ceph deployment, for unit testing the CephFileSystem.
++ */
++
++package org.apache.hadoop.fs.ceph;
++
++import java.net.URI;
++import java.util.Hashtable;
++import java.io.Closeable;
++import java.io.FileNotFoundException;
++import java.io.IOException;
++
++import org.apache.commons.logging.Log;
++import org.apache.hadoop.conf.Configuration;
++import org.apache.hadoop.fs.BlockLocation;
++import org.apache.hadoop.fs.FileAlreadyExistsException;
++import org.apache.hadoop.fs.FileStatus;
++import org.apache.hadoop.fs.FileSystem;
++import org.apache.hadoop.fs.FsStatus;
++import org.apache.hadoop.fs.FSDataInputStream;
++import org.apache.hadoop.fs.FSDataOutputStream;
++import org.apache.hadoop.fs.Path;
++import org.apache.hadoop.fs.permission.FsPermission;
++
++class CephFaker extends CephFS {
++      
++      FileSystem localFS;
++      String localPrefix;
++      int blockSize;
++      Configuration conf;
++      Hashtable<Integer, Object> files;
++      Hashtable<Integer, String> filenames;
++      int fileCount = 0;
++      boolean initialized = false;
++      
++      public CephFaker(Configuration con, Log log) {
++              super(con, log);
++              conf = con;
++              files = new Hashtable<Integer, Object>();
++              filenames = new Hashtable<Integer, String>();
++      }
++      
++      protected boolean ceph_initializeClient(String args, int block_size) {
++              if (!initialized) {
++                      //let's remember the default block_size
++                      blockSize = block_size;
++                      /* for a real Ceph deployment, this starts up the client, 
++                       * sets debugging levels, etc. We just need to get the
++                       * local FileSystem to use, and we'll ignore any
++                       * command-line arguments. */
++                      try {
++                              localFS = FileSystem.getLocal(conf);
++                              localFS.initialize(URI.create("file://localhost"), conf);
++                              localFS.setVerifyChecksum(false);
++                              String testDir = conf.get("hadoop.tmp.dir");
++                              localPrefix = localFS.getWorkingDirectory().toString();
++                              int testDirLoc = localPrefix.indexOf(testDir) - 1;
++                              if (-2 == testDirLoc)
++                                      testDirLoc = localPrefix.length();
++                              localPrefix = localPrefix.substring(0, testDirLoc)
++                                      + "/" + conf.get("hadoop.tmp.dir");
++
++                              localFS.setWorkingDirectory(new Path(localPrefix
++                                                                                                                                                                               +"/user/"
++                                                                                                                                                                               + System.getProperty("user.name")));
++                              //I don't know why, but the unit tests expect the default
++                              //working dir to be /user/username, so satisfy them!
++                              //debug("localPrefix is " + localPrefix, INFO);
++                      }
++                      catch (IOException e) {
++                              return false;
++                      }
++                      initialized = true;
++              }
++              return true;
++      }
++
++      protected String ceph_getcwd() {
++              return sanitize_path(localFS.getWorkingDirectory().toString());
++      }
++
++      protected boolean ceph_setcwd(String path) {
++              localFS.setWorkingDirectory(new Path(prepare_path(path)));
++              return true;
++      }
++
++      //the caller is responsible for ensuring empty dirs
++      protected boolean ceph_rmdir(String pth) {
++              Path path = new Path(prepare_path(pth));
++              boolean ret = false;
++              try {
++                      if (localFS.listStatus(path).length <= 1) {
++                              ret = localFS.delete(path, true);
++                      }
++              }
++              catch (IOException e){ }
++              return ret;
++      }
++
++      //this needs to work on (empty) directories too
++      protected boolean ceph_unlink(String path) {
++              path = prepare_path(path);
++              boolean ret = false;
++              if (ceph_isdirectory(path)) {
++                      ret = ceph_rmdir(path);
++              }
++              else {
++                      try {
++                              ret = localFS.delete(new Path(path), false);
++                      }
++                      catch (IOException e){ }
++              }
++              return ret;
++      }
++
++      protected boolean ceph_rename(String oldName, String newName) {
++              oldName = prepare_path(oldName);
++              newName = prepare_path(newName);
++              try {
++                      Path parent = new Path(newName).getParent();
++                      Path newPath = new Path(newName);
++                      if (localFS.exists(parent) && !localFS.exists(newPath))
++                              return localFS.rename(new Path(oldName), newPath);
++                      return false;
++              }
++              catch (IOException e) { return false; }
++      }
++
++      protected boolean ceph_exists(String path) {
++              path = prepare_path(path);
++              boolean ret = false;
++              try {
++                      ret = localFS.exists(new Path(path));
++              }
++              catch (IOException e){ }
++              return ret;
++      }
++
++      protected long ceph_getblocksize(String path) {
++              path = prepare_path(path);
++              try {
++                      FileStatus status = localFS.getFileStatus(new Path(path));
++                      return status.getBlockSize();
++              }
++              catch (FileNotFoundException e) {
++                      return -CephFS.ENOENT;
++              }
++              catch (IOException e) {
++                      return -1; //just fail generically
++              }
++      }
++
++      protected boolean ceph_isdirectory(String path) {
++              path = prepare_path(path);
++              try {
++                      FileStatus status = localFS.getFileStatus(new Path(path));
++                      return status.isDir();
++              }
++              catch (IOException e) { return false; }
++      }
++
++      protected boolean ceph_isfile(String path) {
++              path = prepare_path(path);
++              boolean ret = false;
++              try {
++                      FileStatus status = localFS.getFileStatus(new Path(path));
++                      ret = !status.isDir();
++              }
++              catch (Exception e) {}
++              return ret;
++      }
++
++      protected String[] ceph_getdir(String path) {
++              path = prepare_path(path);
++              if (!ceph_isdirectory(path)) {
++                      return null;
++              }
++              try {
++                      FileStatus[] stats = localFS.listStatus(new Path(path));
++                      String[] names = new String[stats.length];
++                      String name;
++                      for (int i=0; i<stats.length; ++i) {
++                              name = stats[i].getPath().toString();
++                              names[i] = name.substring(name.lastIndexOf(Path.SEPARATOR)+1);
++                      }
++                      return names;
++              }
++              catch (IOException e) {}
++              return null;
++      }
++
++      protected int ceph_mkdirs(String path, int mode) {
++              path = prepare_path(path);
++              //debug("ceph_mkdirs on " + path, INFO);
++              try {
++                      if(localFS.mkdirs(new Path(path), new FsPermission((short)mode)))
++                              return 0;
++              }
++              catch (FileAlreadyExistsException fe) { return ENOTDIR; }
++              catch (IOException e) {}
++              if (ceph_isdirectory(path))
++                      return -EEXIST; //apparently it already existed
++              return -1;
++      }
++
++      /* 
++       * Unlike a real Ceph deployment, you can't do opens on a directory.
++       * Since that has unpredictable behavior and you shouldn't do it anyway,
++       * it's okay.
++       */
++      protected int ceph_open_for_append(String path) {
++              path = prepare_path(path);
++              FSDataOutputStream stream;
++              try {
++                      stream = localFS.append(new Path(path));
++                      files.put(new Integer(fileCount), stream);
++                      filenames.put(new Integer(fileCount), path);
++                      return fileCount++;
++              }
++              catch (IOException e) { }
++              return -1; // failure
++      }
++
++      protected int ceph_open_for_read(String path) {
++              path = prepare_path(path);
++              FSDataInputStream stream;
++              try {
++                      stream = localFS.open(new Path(path));
++                      files.put(new Integer(fileCount), stream);
++                      filenames.put(new Integer(fileCount), path);
++                      debug("ceph_open_for_read fh:" + fileCount
++                                              + ", pathname:" + path, INFO);
++                      return fileCount++;
++              }
++              catch (IOException e) { }
++              return -1; //failure
++      }
++
++      protected int ceph_open_for_overwrite(String path, int mode) {
++              path = prepare_path(path);
++              FSDataOutputStream stream;
++              try {
++                      stream = localFS.create(new Path(path));
++                      files.put(new Integer(fileCount), stream);
++                      filenames.put(new Integer(fileCount), path);
++                      debug("ceph_open_for_overwrite fh:" + fileCount
++                                              + ", pathname:" + path, INFO);
++                      return fileCount++;
++              }
++              catch (IOException e) { }
++              return -1; //failure
++      }
++
++      protected int ceph_close(int filehandle) {
++              debug("ceph_close(filehandle " + filehandle + ")", INFO);
++              try {
++                      ((Closeable)files.get(new Integer(filehandle))).close();
++                      if(null == files.get(new Integer(filehandle))) {
++                              return -ENOENT; //this isn't quite the right error code,
++                              // but the important part is it's negative
++                      }
++                      return 0; //hurray, success
++              }
++              catch (NullPointerException ne) {
++                      debug("ceph_close caught NullPointerException!" + ne, WARN);
++              } //err, how?
++              catch (IOException ie) {
++                      debug("ceph_close caught IOException!" + ie, WARN);
++              }
++              return -1; //failure
++      }
++
++      protected boolean ceph_setPermission(String pth, int mode) {
++              pth = prepare_path(pth);
++              Path path = new Path(pth);
++              boolean ret = false;
++              try {
++                      localFS.setPermission(path, new FsPermission((short)mode));
++                      ret = true;
++              }
++              catch (IOException e) { }
++              return ret;
++      }
++
++      //rather than try and match a Ceph deployment's behavior exactly,
++      //just make bad things happen if they try and call methods after this
++      protected boolean ceph_kill_client() {
++              //debug("ceph_kill_client", INFO);
++              localFS.setWorkingDirectory(new Path(localPrefix));
++              //debug("working dir is now " + localFS.getWorkingDirectory(), INFO);
++              try{
++                      localFS.close(); }
++              catch (Exception e) {}
++              localFS = null;
++              files = null;
++              filenames = null;
++              return true;
++      }
++
++      protected boolean ceph_stat(String pth, CephFileSystem.Stat fill) {
++              pth = prepare_path(pth);
++              Path path = new Path(pth);
++              boolean ret = false;
++              try {
++                      FileStatus status = localFS.getFileStatus(path);
++                      fill.size = status.getLen();
++                      fill.is_dir = status.isDir();
++                      fill.block_size = status.getBlockSize();
++                      fill.mod_time = status.getModificationTime();
++                      fill.access_time = status.getAccessTime();
++                      fill.mode = status.getPermission().toShort();
++                      ret = true;
++              }
++              catch (IOException e) {}
++              return ret;
++      }
++
++      protected int ceph_statfs(String pth, CephFileSystem.CephStat fill) {
++              pth = prepare_path(pth);
++              try {
++                      FsStatus stat = localFS.getStatus();
++                      fill.capacity = stat.getCapacity();
++                      fill.used = stat.getUsed();
++                      fill.remaining = stat.getRemaining();
++                      return 0;
++              }
++              catch (Exception e){}
++              return -1; //failure;
++      }
++
++      protected int ceph_replication(String path) {
++              path = prepare_path(path);
++              int ret = -1; //-1 for failure
++              try {
++                      ret = localFS.getFileStatus(new Path(path)).getReplication();
++              }
++              catch (IOException e) {}
++              return ret;
++      }
++
++      protected String ceph_hosts(int fh, long offset) {
++              String ret = null;
++              try {
++                      BlockLocation[] locs = localFS.getFileBlockLocations(
++                                               localFS.getFileStatus(new Path(
++                                                                                      filenames.get(new Integer(fh)))), offset, 1);
++                      ret = locs[0].getNames()[0];
++              }
++              catch (IOException e) {}
++              catch (NullPointerException f) { }
++              return ret;
++      }
++
++      protected int ceph_setTimes(String pth, long mtime, long atime) {
++              pth = prepare_path(pth);
++              Path path = new Path(pth);
++              int ret = -1; //generic fail
++              try {
++                      localFS.setTimes(path, mtime, atime);
++                      ret = 0;
++              }
++              catch (IOException e) {}
++              return ret;
++      }
++
++      protected long ceph_getpos(int fh) {
++              long ret = -1; //generic fail
++              try {
++                      Object stream = files.get(new Integer(fh));
++                      if (stream instanceof FSDataInputStream) {
++                              ret = ((FSDataInputStream)stream).getPos();
++                      }
++                      else if (stream instanceof FSDataOutputStream) {
++                              ret = ((FSDataOutputStream)stream).getPos();
++                      }
++              }
++              catch (IOException e) {}
++              catch (NullPointerException f) { }
++              return ret;
++      }
++
++      protected int ceph_write(int fh, byte[] buffer,
++                                                                                                       int buffer_offset, int length) {
++              debug("ceph_write fh:" + fh + ", buffer_offset:" + buffer_offset
++                                      + ", length:" + length, INFO);
++              long ret = -1;//generic fail
++              try {
++                      FSDataOutputStream os = (FSDataOutputStream) files.get(new Integer(fh));
++                      debug("ceph_write got outputstream", INFO);
++                      long startPos = os.getPos();
++                      os.write(buffer, buffer_offset, length);
++                      ret = os.getPos() - startPos;
++              }
++              catch (IOException e) { debug("ceph_write caught IOException!", WARN);}
++              catch (NullPointerException f) {
++                      debug("ceph_write caught NullPointerException!", WARN); }
++              return (int)ret;
++      }
++
++      protected int ceph_read(int fh, byte[] buffer,
++                                                                                                      int buffer_offset, int length) {
++              long ret = -1;//generic fail
++              try {
++                      FSDataInputStream is = (FSDataInputStream)files.get(new Integer(fh));
++                      long startPos = is.getPos();
++                      is.read(buffer, buffer_offset, length);
++                      ret = is.getPos() - startPos;
++              }
++              catch (IOException e) {}
++              catch (NullPointerException f) {}
++              return (int)ret;
++      }
++
++      protected long ceph_seek_from_start(int fh, long pos) {
++              debug("ceph_seek_from_start(fh " + fh + ", pos " + pos + ")", INFO);
++              long ret = -1;//generic fail
++              try {
++                      debug("ceph_seek_from_start filename is "
++                                              + filenames.get(new Integer(fh)), INFO);
++                      if (null == files.get(new Integer(fh))) {
++                              debug("ceph_seek_from_start: is is null!", WARN);
++                      }
++                      FSDataInputStream is = (FSDataInputStream) files.get(new Integer(fh));
++                      debug("ceph_seek_from_start retrieved is!", INFO);
++                      is.seek(pos);
++                      ret = is.getPos();
++              }
++              catch (IOException e) {debug("ceph_seek_from_start caught IOException!", WARN);}
++              catch (NullPointerException f) {debug("ceph_seek_from_start caught NullPointerException!", WARN);}
++              return (int)ret;
++      }
++
++      /*
++       * We need to remove the localFS file prefix before returning to Ceph
++       */
++      private String sanitize_path(String path) {
++              //debug("sanitize_path(" + path + ")", INFO);
++              /*              if (path.startsWith("file:"))
++                                      path = path.substring("file:".length()); */
++              if (path.startsWith(localPrefix)) {
++                      path = path.substring(localPrefix.length());
++                      if (path.length() == 0) //it was a root path
++                              path = "/";
++              }
++              //debug("sanitize_path returning " + path, INFO);
++              return path;
++      }
++
++      /*
++       * If it's an absolute path we need to shove the
++       * test dir onto the front as a prefix.
++       */
++      private String prepare_path(String path) {
++              //debug("prepare_path(" + path + ")", INFO);
++              if (path.startsWith("/"))
++                      path = localPrefix + path;
++              else if (path.equals("..")) {
++                      if (ceph_getcwd().equals("/"))
++                              path = "."; // you can't go up past root!
++              }
++              //debug("prepare_path returning" + path, INFO);
++              return path;
++      }
++}
 Index: src/java/org/apache/hadoop/fs/ceph/CephInputStream.java
 ===================================================================
 --- src/java/org/apache/hadoop/fs/ceph/CephInputStream.java    (revision 0)
 +++ src/java/org/apache/hadoop/fs/ceph/CephInputStream.java    (revision 0)
-@@ -0,0 +1,203 @@
-+// -*- mode:Java; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- 
+@@ -0,0 +1,235 @@
++// -*- mode:Java; tab-width:2; c-basic-offset:2; indent-tabs-mode:t -*- 
 +/**
 + *
 + * Licensed under the Apache License, Version 2.0
@@ -839,15 +1758,7 @@ Index: src/java/org/apache/hadoop/fs/ceph/CephInputStream.java
 + */
 +package org.apache.hadoop.fs.ceph;
 +
-+import java.io.BufferedOutputStream;
-+import java.io.DataInputStream;
-+import java.io.File;
-+import java.io.FileInputStream;
-+import java.io.FileOutputStream;
 +import java.io.IOException;
-+import java.io.InputStream;
-+import java.io.OutputStream;
-+//import java.lang.IndexOutOfBoundsException;
 +
 +import org.apache.hadoop.conf.Configuration;
 +import org.apache.hadoop.fs.FSInputStream;
@@ -860,21 +1771,19 @@ Index: src/java/org/apache/hadoop/fs/ceph/CephInputStream.java
 + */
 +public class CephInputStream extends FSInputStream {
 +
-+  private int bufferSize;
-+
 +  private boolean closed;
 +
 +  private int fileHandle;
 +
 +  private long fileLength;
 +
-+  private boolean debug;
++      private CephFS ceph;
++
++      private byte[] buffer;
++      private int bufPos = 0;
++      private int bufValid = 0;
++      private long cephPos = 0;
 +
-+  private native int ceph_read(int fh, byte[] buffer, int buffer_offset, int length);
-+  private native long ceph_seek_from_start(int fh, long pos);
-+  private native long ceph_getpos(int fh);
-+  private native int ceph_close(int fh);
-+    
 +  /**
 +   * Create a new CephInputStream.
 +   * @param conf The system configuration. Unused.
@@ -882,17 +1791,17 @@ Index: src/java/org/apache/hadoop/fs/ceph/CephInputStream.java
 +   * @param flength The current length of the file. If the length changes
 +   * you will need to close and re-open it to access the new data.
 +   */
-+  public CephInputStream(Configuration conf, int fh, long flength) {
-+    System.load(conf.get("fs.ceph.libDir")+"/libhadoopcephfs.so");
-+    System.load(conf.get("fs.ceph.libDir")+"/libceph.so");
++  public CephInputStream(Configuration conf, CephFS cephfs,
++                                                                                               int fh, long flength, int bufferSize) {
 +    // Whoever's calling the constructor is responsible for doing the actual ceph_open
 +    // call and providing the file handle.
 +    fileLength = flength;
 +    fileHandle = fh;
 +    closed = false;
-+    debug = ("true".equals(conf.get("fs.ceph.debug", "false")));
-+    if(debug) debug("CephInputStream constructor: initializing stream with fh "
-+        + fh + " and file length " + flength);
++              ceph = cephfs;
++              buffer = new byte[bufferSize];
++    ceph.debug("CephInputStream constructor: initializing stream with fh "
++                                                                              + fh + " and file length " + flength, ceph.DEBUG);
 +      
 +  }
 +  /** Ceph likes things to be closed before it shuts down,
@@ -905,26 +1814,53 @@ Index: src/java/org/apache/hadoop/fs/ceph/CephInputStream.java
 +    finally { super.finalize(); }
 +  }
 +
++      private synchronized boolean fillBuffer() throws IOException {
++              bufValid = ceph.ceph_read(fileHandle, buffer, 0, buffer.length);
++              bufPos = 0;
++              if (bufValid < 0) {
++                      int err = bufValid;
++                      bufValid = 0;
++                      //attempt to reset to old position. If it fails, too bad.
++                      ceph.ceph_seek_from_start(fileHandle, cephPos);
++                      throw new IOException("Failed to fill read buffer! Error code:"
++                                                                                                              + err);
++              }
++              cephPos += bufValid;
++              return (bufValid != 0);
++      }
++
++      /*
++       * Get the current position of the stream.
++       */
 +  public synchronized long getPos() throws IOException {
-+    return ceph_getpos(fileHandle);
++              return cephPos - bufValid + bufPos;
 +  }
++
 +  /**
 +   * Find the number of bytes remaining in the file.
 +   */
 +  @Override
-+  public synchronized int available() throws IOException {
++      public synchronized int available() throws IOException {
 +      return (int) (fileLength - getPos());
 +    }
 +
 +  public synchronized void seek(long targetPos) throws IOException {
-+    if(debug) debug("CephInputStream.seek: Seeking to position " + targetPos +
-+        " on fd " + fileHandle);
++    ceph.debug("CephInputStream.seek: Seeking to position " + targetPos +
++                                                                              " on fd " + fileHandle, ceph.TRACE);
 +    if (targetPos > fileLength) {
-+      throw new IOException("CephInputStream.seek: failed seeking to position " + targetPos +
-+                          " on fd " + fileHandle + ": Cannot seek after EOF " + fileLength);
++      throw new IOException("CephInputStream.seek: failed seek to position "
++                                                                                                              + targetPos + " on fd " + fileHandle
++                                                                                                              + ": Cannot seek after EOF " + fileLength);
 +    }
-+    ceph_seek_from_start(fileHandle, targetPos);
-+  }
++              long oldPos = cephPos;
++              cephPos = ceph.ceph_seek_from_start(fileHandle, targetPos);
++              bufValid = 0;
++              bufPos = 0;
++              if (cephPos < 0) {
++                      cephPos = oldPos;
++                      throw new IOException ("Ceph failed to seek to new position!");
++              }
++      }
 +
 +  /**
 +   * Failovers are handled by the Ceph code at a very low level;
@@ -942,9 +1878,9 @@ Index: src/java/org/apache/hadoop/fs/ceph/CephInputStream.java
 +   * @return the next byte.
 +   */
 +  @Override
-+  public synchronized int read() throws IOException {
-+      if(debug) debug("CephInputStream.read: Reading a single byte from fd " + fileHandle
-+          + " by calling general read function");
++      public synchronized int read() throws IOException {
++      ceph.debug("CephInputStream.read: Reading a single byte from fd " + fileHandle
++                                                                                      + " by calling general read function", ceph.TRACE);
 +
 +      byte result[] = new byte[1];
 +      if (getPos() >= fileLength) return -1;
@@ -954,79 +1890,94 @@ Index: src/java/org/apache/hadoop/fs/ceph/CephInputStream.java
 +    }
 +
 +  /**
-+   * Read a specified number of bytes into a byte[] from the file.
++   * Read a specified number of bytes from the file into a byte[].
 +   * @param buf the byte array to read into.
 +   * @param off the offset to start at in the file
 +   * @param len the number of bytes to read
 +   * @return 0 if successful, otherwise an error code.
++       * @throws IOException on bad input.
 +   */
 +  @Override
-+  public synchronized int read(byte buf[], int off, int len) throws IOException {
-+      if(debug) debug("CephInputStream.read: Reading " + len  + " bytes from fd " + fileHandle);
++      public synchronized int read(byte buf[], int off, int len)
++              throws IOException {
++      ceph.debug("CephInputStream.read: Reading " + len  +
++                                                               " bytes from fd " + fileHandle, ceph.TRACE);
 +      
 +      if (closed) {
-+      throw new IOException("CephInputStream.read: cannot read " + len  + 
-+                            " bytes from fd " + fileHandle + ": stream closed");
-+      }
-+      if (null == buf) {
-+      throw new NullPointerException("Read buffer is null");
-+      }
-+      
-+      // check for proper index bounds
-+      if((off < 0) || (len < 0) || (off + len > buf.length)) {
-+      throw new IndexOutOfBoundsException("CephInputStream.read: Indices out of bounds for read: "
-+                          + "read length is " + len + ", buffer offset is " 
-+                          + off +", and buffer size is " + buf.length);
++                              throw new IOException("CephInputStream.read: cannot read " + len  + 
++                                                                                                                      " bytes from fd " + fileHandle +
++                                                                                                                      ": stream closed");
 +      }
-+      
++                      
 +      // ensure we're not past the end of the file
-+      if (getPos() >= fileLength) 
-+      {
-+        if(debug) debug("CephInputStream.read: cannot read " + len  + 
-+                           " bytes from fd " + fileHandle + ": current position is " +
-+                           getPos() + " and file length is " + fileLength);
-+        
-+        return -1;
++      if (getPos() >= fileLength) {
++                              ceph.debug("CephInputStream.read: cannot read " + len  + 
++                                                                       " bytes from fd " + fileHandle + ": current position is "
++                                                                       + getPos() + " and file length is " + fileLength,
++                                                                       ceph.DEBUG);
++                              
++                              return -1;
++                      }
++
++                      int totalRead = 0;
++                      int initialLen = len;
++                      int read;
++                      do {
++                              read = Math.min(len, bufValid - bufPos);
++                              try {
++                                      System.arraycopy(buffer, bufPos, buf, off, read);
++                              }
++                              catch(IndexOutOfBoundsException ie) {
++                                      throw new IOException("CephInputStream.read: Indices out of bounds:"
++                                                                                                                              + "read length is " + len
++                                                                                                                              + ", buffer offset is " + off
++                                                                                                                              + ", and buffer size is " + buf.length);
++                              }
++                              catch (ArrayStoreException ae) {
++                                      throw new IOException("Uh-oh, CephInputStream failed to do an array"
++                                                                                                                              + "copy due to type mismatch...");
++                              }
++                              catch (NullPointerException ne) {
++                                      throw new IOException("CephInputStream.read: cannot read "
++                                                                                                                              + len + "bytes from fd:" + fileHandle
++                                                                                                                              + ": buf is null");
++                              }
++                              bufPos += read;
++                              len -= read;
++                              off += read;
++                              totalRead += read;
++                      } while (len > 0 && fillBuffer());
++
++      ceph.debug("CephInputStream.read: Reading " + initialLen
++                                                               + " bytes from fd " + fileHandle
++                                                               + ": succeeded in reading " + totalRead + " bytes",
++                                                               ceph.TRACE);
++      return totalRead;
 +      }
-+      // actually do the read
-+      int result = ceph_read(fileHandle, buf, off, len);
-+      if (result < 0)
-+      if(debug) debug("CephInputStream.read: Reading " + len
-+                         + " bytes from fd " + fileHandle + " failed.");
-+
-+      if(debug) debug("CephInputStream.read: Reading " + len  + " bytes from fd " 
-+          + fileHandle + ": succeeded in reading " + result + " bytes");   
-+      return result;
-+  }
 +
 +  /**
 +   * Close the CephInputStream and release the associated filehandle.
 +   */
 +  @Override
-+  public void close() throws IOException {
-+    if(debug) debug("CephOutputStream.close:enter");
-+    if (closed) {
-+      throw new IOException("Stream closed");
-+    }
-+
-+    int result = ceph_close(fileHandle);
-+    if (result != 0) {
-+      throw new IOException("Close failed!");
-+    }
-+    closed = true;
-+    if(debug) debug("CephOutputStream.close:exit");
-+  }
-+
-+  private void debug(String out) {
-+    System.err.println(out);
-+  }
++      public void close() throws IOException {
++    ceph.debug("CephOutputStream.close:enter", ceph.TRACE);
++    if (!closed) {
++                      int result = ceph.ceph_close(fileHandle);
++                      closed = true;
++                      if (result != 0) {
++                              throw new IOException("Close somehow failed!"
++                                                                                                                      + "Don't try and use this stream again, though");
++                      }
++                      ceph.debug("CephOutputStream.close:exit", ceph.TRACE);
++              }
++      }
 +}
 Index: src/java/org/apache/hadoop/fs/ceph/CephOutputStream.java
 ===================================================================
 --- src/java/org/apache/hadoop/fs/ceph/CephOutputStream.java   (revision 0)
 +++ src/java/org/apache/hadoop/fs/ceph/CephOutputStream.java   (revision 0)
-@@ -0,0 +1,196 @@
-+// -*- mode:Java; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- 
+@@ -0,0 +1,202 @@
++// -*- mode:Java; tab-width:2; c-basic-offset:2; indent-tabs-mode:t -*- 
 +/**
 + *
 + * Licensed under the Apache License, Version 2.0
@@ -1048,18 +1999,10 @@ Index: src/java/org/apache/hadoop/fs/ceph/CephOutputStream.java
 +
 +package org.apache.hadoop.fs.ceph;
 +
-+import java.io.File;
-+import java.io.FileInputStream;
-+import java.io.FileOutputStream;
 +import java.io.IOException;
-+import java.io.InputStream;
 +import java.io.OutputStream;
-+import java.util.ArrayList;
-+import java.util.List;
-+import java.util.Random;
 +
 +import org.apache.hadoop.conf.Configuration;
-+import org.apache.hadoop.fs.Path;
 +import org.apache.hadoop.util.Progressable;
 +
 +/**
@@ -1069,32 +2012,26 @@ Index: src/java/org/apache/hadoop/fs/ceph/CephOutputStream.java
 + */
 +public class CephOutputStream extends OutputStream {
 +
-+  private int bufferSize;
-+
 +  private boolean closed;
 +
-+  private int fileHandle;
-+
-+  private boolean debug;
++      private CephFS ceph;
 +
++  private int fileHandle;
 +
-+  private native long ceph_seek_from_start(int fh, long pos);
-+  private native long ceph_getpos(int fh);
-+  private native int ceph_close(int fh);
-+  private native int ceph_write(int fh, byte[] buffer, int buffer_offset, int length);
-+
++      private byte[] buffer;
++      private int bufUsed = 0;
 +
 +  /**
 +   * Construct the CephOutputStream.
 +   * @param conf The FileSystem configuration.
 +   * @param fh The Ceph filehandle to connect to.
 +   */
-+  public CephOutputStream(Configuration conf,  int fh) {
-+    System.load(conf.get("fs.ceph.libDir")+"/libhadoopcephfs.so");
-+    System.load(conf.get("fs.ceph.libDir")+"/libceph.so");
++  public CephOutputStream(Configuration conf, CephFS cephfs,
++                                                                                                      int fh, int bufferSize) {
++              ceph = cephfs;
 +    fileHandle = fh;
 +    closed = false;
-+    debug = ("true".equals(conf.get("fs.ceph.debug", "false")));
++              buffer = new byte[bufferSize];
 +  }
 +
 +  /**Ceph likes things to be closed before it shuts down,
@@ -1112,7 +2049,7 @@ Index: src/java/org/apache/hadoop/fs/ceph/CephOutputStream.java
 +   * @return The file offset in bytes.
 +   */
 +  public long getPos() throws IOException {
-+    return ceph_getpos(fileHandle);
++    return ceph.ceph_getpos(fileHandle);
 +  }
 +
 +  /**
@@ -1122,20 +2059,18 @@ Index: src/java/org/apache/hadoop/fs/ceph/CephOutputStream.java
 +   * write fails.
 +   */
 +  @Override
-+  public synchronized void write(int b) throws IOException {
-+      if(debug) debug("CephOutputStream.write: writing a single byte to fd " + fileHandle);
++      public synchronized void write(int b) throws IOException {
++      ceph.debug("CephOutputStream.write: writing a single byte to fd "
++                                                               + fileHandle, ceph.TRACE);
 +
 +      if (closed) {
-+      throw new IOException("CephOutputStream.write: cannot write " + 
-+                            "a byte to fd " + fileHandle + ": stream closed");
++                              throw new IOException("CephOutputStream.write: cannot write " + 
++                                                                                                                      "a byte to fd " + fileHandle + ": stream closed");
 +      }
 +      // Stick the byte in a buffer and write it
 +      byte buf[] = new byte[1];
 +      buf[0] = (byte) b;    
-+      int result = ceph_write(fileHandle, buf, 0, 1);
-+      if (1 != result)
-+      if(debug) debug("CephOutputStream.write: failed writing a single byte to fd "
-+            + fileHandle + ": Ceph write() result = " + result);
++      write(buf, 0, 1);
 +      return;
 +    }
 +
@@ -1145,88 +2080,110 @@ Index: src/java/org/apache/hadoop/fs/ceph/CephOutputStream.java
 +   * @param off the position in the file to start writing at.
 +   * @param len The number of bytes to actually write.
 +   * @throws IOException if you have closed the CephOutputStream, or
-+   * the write fails.
-+   * @throws NullPointerException if buf is null.
-+   * @throws IndexOutOfBoundsException if len > buf.length.
++       * if buf is null or off + len > buf.length, or
++       * if the write fails due to a Ceph error.
 +   */
 +  @Override
-+  public synchronized void write(byte buf[], int off, int len) throws IOException {
-+     if(debug) debug("CephOutputStream.write: writing " + len + 
-+         " bytes to fd " + fileHandle);
-+      // make sure stream is open
-+      if (closed) {
-+      throw new IOException("CephOutputStream.write: cannot write " + len + 
-+                            "bytes to fd " + fileHandle + ": stream closed");
-+      }
-+
-+      // sanity check
-+      if (null == buf) {
-+      throw new NullPointerException("CephOutputStream.write: cannot write " + len + 
-+                                     "bytes to fd " + fileHandle + ": write buffer is null");
-+      }
-+
-+      // check for proper index bounds
-+      if((off < 0) || (len < 0) || (off + len > buf.length)) {
-+      throw new IndexOutOfBoundsException("CephOutputStream.write: Indices out of bounds for write: "
-+                                          + "write length is " + len + ", buffer offset is " 
-+                                          + off +", and buffer size is " + buf.length);
-+      }
-+
-+      // write!
-+      int result = ceph_write(fileHandle, buf, off, len);
-+      if (result < 0) {
-+      throw new IOException("CephOutputStream.write: Write of " + len + 
-+                            "bytes to fd " + fileHandle + " failed");
-+      }
-+      if (result != len) {
-+      throw new IOException("CephOutputStream.write: Write of " + len + 
-+                            "bytes to fd " + fileHandle + "was incomplete:  only "
-+                            + result + " of " + len + " bytes were written.");
-+      }
-+      return; 
-+    }
++      public synchronized void write(byte buf[], int off, int len) throws IOException {
++              ceph.debug("CephOutputStream.write: writing " + len + 
++                                                       " bytes to fd " + fileHandle, ceph.TRACE);
++              // make sure stream is open
++              if (closed) {
++                      throw new IOException("CephOutputStream.write: cannot write " + len + 
++                                                                                                              "bytes to fd " + fileHandle + ": stream closed");
++              }
++              
++              int result;
++              int write;
++              while (len>0) {
++                      write = Math.min(len, buffer.length - bufUsed);
++                      try {
++                              System.arraycopy(buf, off, buffer, bufUsed, write);
++                      }
++                      catch (IndexOutOfBoundsException ie) {
++                              throw new IOException("CephOutputStream.write: Indices out of bounds: "
++                                                                                                                      + "write length is " + len
++                                                                                                                      + ", buffer offset is " + off
++                                                                                                                      + ", and buffer size is " + buf.length);
++                      }
++                      catch (ArrayStoreException ae) {
++                              throw new IOException("Uh-oh, CephOutputStream failed to do an array"
++                                                                                                                      + " copy due to type mismatch...");
++                      }
++                      catch (NullPointerException ne) {
++                              throw new IOException("CephOutputStream.write: cannot write "
++                                                                                                                      + len + "bytes to fd " + fileHandle
++                                                                                                                      + ": buffer is null");
++                      }
++                      bufUsed += write;
++                      len -= write;
++                      off += write;
++                      if (bufUsed == buffer.length) {
++                              result = ceph.ceph_write(fileHandle, buffer, 0, bufUsed);
++                              if (result < 0)
++                                      throw new IOException("CephOutputStream.write: Buffered write of "
++                                                                                                                              + bufUsed + " bytes failed!");
++                              if (result != bufUsed)
++                                      throw new IOException("CephOutputStream.write: Wrote only "
++                                                                                                                              + result + " bytes of " + bufUsed
++                                                                                                                              + " in buffer! Data may be lost or written"
++                                                                                                                              + " twice to Ceph!");
++                              bufUsed = 0;
++                      }
++
++              }
++              return; 
++      }
 +   
 +  /**
-+   * Flush the written data. It doesn't actually do anything; all writes are synchronous.
-+   * @throws IOException if you've closed the stream.
++   * Flush the buffered data.
++   * @throws IOException if you've closed the stream or the write fails.
 +   */
 +  @Override
-+  public synchronized void flush() throws IOException {
-+    if (closed) {
-+      throw new IOException("Stream closed");
-+    }
-+    return;
-+  }
++      public synchronized void flush() throws IOException {
++                      if (!closed) {
++                              if (bufUsed == 0) return;
++                              int result = ceph.ceph_write(fileHandle, buffer, 0, bufUsed);
++                              if (result < 0) {
++                                      throw new IOException("CephOutputStream.write: Write of "
++                                                                                                                              + bufUsed + "bytes to fd "
++                                                                                                                              + fileHandle + " failed");
++                              }
++                              if (result != bufUsed) {
++                                      throw new IOException("CephOutputStream.write: Write of " + bufUsed
++                                                                                                                              + "bytes to fd " + fileHandle
++                                                                                                                              + "was incomplete:  only " + result + " of "
++                                                                                                                              + bufUsed + " bytes were written.");
++                              }
++                              bufUsed = 0;
++                              return;
++                      }
++      }
 +  
 +  /**
 +   * Close the CephOutputStream.
 +   * @throws IOException if Ceph somehow returns an error. In current code it can't.
 +   */
 +  @Override
-+  public synchronized void close() throws IOException {
-+      if(debug) debug("CephOutputStream.close:enter");
-+      if (closed) {
-+      throw new IOException("Stream closed");
-+      }
-+
-+      int result = ceph_close(fileHandle);
-+      if (result != 0) {
-+      throw new IOException("Close failed!");
-+      }
-+      
-+      closed = true;
-+      if(debug) debug("CephOutputStream.close:exit");
-+    }
-+
-+  private void debug(String out) {
-+    System.err.println(out);
-+  }
++      public synchronized void close() throws IOException {
++      ceph.debug("CephOutputStream.close:enter", ceph.TRACE);
++      if (!closed) {
++                              flush();
++                              int result = ceph.ceph_close(fileHandle);
++                              if (result != 0) {
++                                      throw new IOException("Close failed!");
++                              }
++                              
++                              closed = true;
++                              ceph.debug("CephOutputStream.close:exit", ceph.TRACE);
++                      }
++      }
 +}
 Index: src/java/org/apache/hadoop/fs/ceph/package.html
 ===================================================================
 --- src/java/org/apache/hadoop/fs/ceph/package.html    (revision 0)
 +++ src/java/org/apache/hadoop/fs/ceph/package.html    (revision 0)
-@@ -0,0 +1,100 @@
+@@ -0,0 +1,101 @@
 +<html>
 +
 +<!--
@@ -1293,19 +2250,19 @@ Index: src/java/org/apache/hadoop/fs/ceph/package.html
 +&lt;property&gt;
 +  &lt;name&gt;fs.ceph.debug&lt;/name&gt;
 +  &lt;value&gt;true&lt;/value&gt;
-+  &lt;description&gt;If true, the Java-based code will print debugging information.&lt;/description&gt;
++  &lt;description&gt;If true, the Java-based code will print debugging information to standard error. This is useful if attempting to debug a Ceph issue as it puts both outputs in the same place.&lt;/description&gt;
 +&lt;/property&gt;
 +
 +&lt;property&gt;
 +  &lt;name&gt;fs.ceph.clientDebug&lt;/name&gt;
 +  &lt;value&gt;1&lt;/value&gt;
-+  &lt;description&gt;If non-zero, the Ceph client will print debugging information (a higher number=more debugging).&lt;/description&gt;
++  &lt;description&gt;If non-zero, the Ceph client will print debugging information to standard error (a higher number=more debugging).&lt;/description&gt;
 +&lt;/property&gt;
 +
 +&lt;property&gt;
 +  &lt;name&gt;fs.ceph.messengerDebug&lt;/name&gt;
 +  &lt;value&gt;1&lt;/value&gt;
-+  &lt;description&gt;If non-zero, the Ceph messenger will print debugging information (a higher number=more debugging)&lt;/description&gt;
++  &lt;description&gt;If non-zero, the Ceph messenger will print debugging information to standard error(a higher number=more debugging)&lt;/description&gt;
 +&lt;/property&gt;
 +
 +&lt;property&gt;
@@ -1317,7 +2274,8 @@ Index: src/java/org/apache/hadoop/fs/ceph/package.html
 +&lt;property&gt;
 +  &lt;name&gt;fs.ceph.commandLine&lt;/name&gt;
 +  &lt;value&gt;a string&lt;/value&gt;
-+  &lt;description&gt;If you prefer, you may enter any of Ceph's command-line configuration here and it will get passed to the C client. Note that any filled-in configuration options will override what you put here.&lt;/description&gt;
++  &lt;description&gt;If you prefer, you may enter any of Ceph's command-line configuration here and it will get passed to the C client. Note that any filled-in configuration options will override what you put here. <br>
++By default, Ceph performs writes across the network rather than locally. To force local writes, add "set_local_pg" in this property.&lt;/description&gt;
 +&lt;/property&gt;
 +</pre>
 +  
@@ -1330,9 +2288,9 @@ Index: src/java/org/apache/hadoop/fs/ceph/package.html
 \ No newline at end of file
 Index: src/java/core-default.xml
 ===================================================================
---- src/java/core-default.xml  (revision 814022)
+--- src/java/core-default.xml  (revision 832912)
 +++ src/java/core-default.xml  (working copy)
-@@ -175,6 +175,12 @@
+@@ -194,6 +194,12 @@
  </property>
  
  <property>
index 0de7f44575fb1e81905d5fb6260ce7795eb97337..ac09dc5da1f9aaf93b62e5a9ad80e0d55514bb6c 100644 (file)
@@ -56,7 +56,7 @@ abstract class CephFS {
        abstract protected boolean ceph_initializeClient(String arguments, int block_size);
        
        /*
-        * Returns the current working directory.(absolute) as a String
+        * Returns the current working directory (absolute) as a String
         */
        abstract protected String ceph_getcwd();
        /*
@@ -67,7 +67,7 @@ abstract class CephFS {
         */
        abstract protected boolean ceph_setcwd(String path);
        /*
-        * Given a path to a directory, removes the directory.if empty.
+        * Given a path to a directory, removes the directory if empty.
         * Inputs:
         *  jstring j_path: The path (relative or absolute) to the directory
         * Returns: true on successful delete; false otherwise
@@ -81,7 +81,7 @@ abstract class CephFS {
         */
   abstract protected boolean ceph_unlink(String path);
        /*
-        * Changes a given path name to a new name.
+        * Changes a given path name to a new name, assuming new_path doesn't exist.
         * Inputs:
         *  jstring j_from: The path whose name you want to change.
         *  jstring j_to: The new name for the path.
@@ -169,7 +169,8 @@ abstract class CephFS {
         */
   abstract protected boolean ceph_kill_client();
        /*
-        * Get the statistics on a path returned in a custom format.
+        * Get the statistics on a path returned in a custom format defined
+        * in CephFileSystem.
         * Inputs:
         *  String path: The path to stat.
         *  Stat fill: The stat object to fill.
index fa571a04afafbeaf8fe15c98b1cc50ac337c2ec6..09efce2dec3fa0f99e77612744eea477b8bf9e85 100644 (file)
@@ -182,7 +182,7 @@ public class CephFileSystem extends FileSystem {
   /**
    * Get an FSDataOutputStream to append onto a file.
    * @param file The File you want to append onto
-   * @param bufferSize Ceph does internal buffering; this is ignored.
+   * @param bufferSize Ceph does internal buffering but you can buffer in the Java code as well if you like.
    * @param progress The Progressable to report progress to.
    * Reporting is limited but exists.
    * @return An FSDataOutputStream that connects to the file on Ceph.
@@ -272,7 +272,8 @@ public class CephFileSystem extends FileSystem {
    * @param path The directory path to create
    * @param perms The permissions to apply to the created directories.
    * @return true if successful, false otherwise
-   * @throws IOException if initialize() hasn't been called.
+   * @throws IOException if initialize() hasn't been called or the path
+        *  is a child of a file.
    */
        @Override
   public boolean mkdirs(Path path, FsPermission perms) throws IOException {
@@ -299,7 +300,7 @@ public class CephFileSystem extends FileSystem {
    * Check if a path is a file. This is moderately faster than the
    * generic implementation.
    * @param path The path to check.
-   * @return true if the path is a file, false otherwise.
+   * @return true if the path is definitely a file, false otherwise.
    * @throws IOException if initialize() hasn't been called.
    */
   @Override
@@ -324,7 +325,7 @@ public class CephFileSystem extends FileSystem {
    * Check if a path is a directory. This is moderately faster than
    * the generic implementation.
    * @param path The path to check
-   * @return true if the path is a directory, false otherwise.
+   * @return true if the path is definitely a directory, false otherwise.
    * @throws IOException if initialize() hasn't been called.
    */
   @Override
@@ -449,7 +450,8 @@ public class CephFileSystem extends FileSystem {
    * @param permission The permissions to apply to the file.
    * @param flag If CreateFlag.OVERWRITE, overwrite any existing
    * file with this name; otherwise don't.
-   * @param bufferSize Ceph does internal buffering; this is ignored.
+   * @param bufferSize Ceph does internal buffering, but you can buffer
+        *   in the Java code too if you like.
    * @param replication Ignored by Ceph. This can be
    * configured via Ceph configuration.
    * @param blockSize Ignored by Ceph. You can set client-wide block sizes
@@ -525,7 +527,8 @@ public class CephFileSystem extends FileSystem {
   /**
    * Open a Ceph file and attach the file handle to an FSDataInputStream.
    * @param path The file to open
-   * @param bufferSize Ceph does internal buffering; this is ignored.
+   * @param bufferSize Ceph does internal buffering; but you can buffer in
+        *   the Java code too if you like.
    * @return FSDataInputStream reading from the given path.
    * @throws IOException if initialize() hasn't been called, the path DNE or is a
    * directory, or there is an error getting data to set up the FSDataInputStream.
@@ -660,7 +663,7 @@ public class CephFileSystem extends FileSystem {
    * Get usage statistics on the Ceph filesystem.
    * @param path A path to the partition you're interested in.
    * Ceph doesn't partition, so this is ignored.
-   * @return FsStatus reporting capacity, usage, and remaining spac.
+   * @return FsStatus reporting capacity, usage, and remaining space.
    * @throws IOException if initialize() hasn't been called, or the
    * stat somehow fails.
    */
@@ -686,7 +689,7 @@ public class CephFileSystem extends FileSystem {
   /**
    * Delete the given path, and optionally its children.
    * @param path the path to delete.
-   * @param recursive If the path is a directory and this is false,
+   * @param recursive If the path is a non-empty directory and this is false,
    * delete will throw an IOException. If path is a file this is ignored.
    * @return true if the delete succeeded, false otherwise (including if
    * path doesn't exist).
index 74a1698096e46800e4d907a75ce3fe9c38a3a9cd..a6684576d2ff2a616ff259913c5a7472881a0cc4 100644 (file)
@@ -91,7 +91,7 @@ public class CephInputStream extends FSInputStream {
        }
 
        /*
-        * Make sure this works!
+        * Get the current position of the stream.
         */
   public synchronized long getPos() throws IOException {
                return cephPos - bufValid + bufPos;
@@ -151,11 +151,12 @@ public class CephInputStream extends FSInputStream {
     }
 
   /**
-   * Read a specified number of bytes into a byte[] from the file.
+   * Read a specified number of bytes from the file into a byte[].
    * @param buf the byte array to read into.
    * @param off the offset to start at in the file
    * @param len the number of bytes to read
    * @return 0 if successful, otherwise an error code.
+        * @throws IOException on bad input.
    */
   @Override
        public synchronized int read(byte buf[], int off, int len)
index c7dc3a4a0e8a7674c94c23933d7c5571a4637567..0ca1d9771710dd45e1a2f0267a09589418fe4543 100644 (file)
@@ -101,7 +101,7 @@ public class CephOutputStream extends OutputStream {
    * @param off the position in the file to start writing at.
    * @param len The number of bytes to actually write.
    * @throws IOException if you have closed the CephOutputStream, or
-        * if buf is null or len > buf.length, or
+        * if buf is null or off + len > buf.length, or
         * if the write fails due to a Ceph error.
    */
   @Override
index bcf561a5e1c89bb4297462015dfdce8da422566d..058a3ec83f60453beba08cb9a99b4f1704c7d4a2 100644 (file)
@@ -64,19 +64,19 @@ documentation.
 &lt;property&gt;
   &lt;name&gt;fs.ceph.debug&lt;/name&gt;
   &lt;value&gt;true&lt;/value&gt;
-  &lt;description&gt;If true, the Java-based code will print debugging information.&lt;/description&gt;
+  &lt;description&gt;If true, the Java-based code will print debugging information to standard error. This is useful if attempting to debug a Ceph issue as it puts both outputs in the same place.&lt;/description&gt;
 &lt;/property&gt;
 
 &lt;property&gt;
   &lt;name&gt;fs.ceph.clientDebug&lt;/name&gt;
   &lt;value&gt;1&lt;/value&gt;
-  &lt;description&gt;If non-zero, the Ceph client will print debugging information (a higher number=more debugging).&lt;/description&gt;
+  &lt;description&gt;If non-zero, the Ceph client will print debugging information to standard error (a higher number=more debugging).&lt;/description&gt;
 &lt;/property&gt;
 
 &lt;property&gt;
   &lt;name&gt;fs.ceph.messengerDebug&lt;/name&gt;
   &lt;value&gt;1&lt;/value&gt;
-  &lt;description&gt;If non-zero, the Ceph messenger will print debugging information (a higher number=more debugging)&lt;/description&gt;
+  &lt;description&gt;If non-zero, the Ceph messenger will print debugging information to standard error(a higher number=more debugging)&lt;/description&gt;
 &lt;/property&gt;
 
 &lt;property&gt;
@@ -88,7 +88,8 @@ documentation.
 &lt;property&gt;
   &lt;name&gt;fs.ceph.commandLine&lt;/name&gt;
   &lt;value&gt;a string&lt;/value&gt;
-  &lt;description&gt;If you prefer, you may enter any of Ceph's command-line configuration here and it will get passed to the C client. Note that any filled-in configuration options will override what you put here.&lt;/description&gt;
+  &lt;description&gt;If you prefer, you may enter any of Ceph's command-line configuration here and it will get passed to the C client. Note that any filled-in configuration options will override what you put here. <br>
+By default, Ceph performs writes across the network rather than locally. To force local writes, add "set_local_pg" in this property.&lt;/description&gt;
 &lt;/property&gt;
 </pre>