-Index: src/test/core/org/apache/hadoop/fs/ceph/TestCeph.java
-===================================================================
---- src/test/core/org/apache/hadoop/fs/ceph/TestCeph.java (revision 0)
-+++ src/test/core/org/apache/hadoop/fs/ceph/TestCeph.java (revision 0)
-@@ -0,0 +1,42 @@
+diff --git a/src/core/core-default.xml b/src/core/core-default.xml
+index 8bc3b99..26543bc 100644
+--- a/src/core/core-default.xml
++++ b/src/core/core-default.xml
+@@ -210,6 +210,12 @@
+ </property>
+
+ <property>
++ <name>fs.ceph.impl</name>
++ <value>org.apache.hadoop.fs.ceph.CephFileSystem</value>
++ <description>The file system for ceph: uris.</description>
++</property>
++
++<property>
+ <name>fs.har.impl.disable.cache</name>
+ <value>true</value>
+ <description>Don't cache 'har' filesystem instances.</description>
+diff --git a/src/core/org/apache/hadoop/fs/ceph/CephFS.java b/src/core/org/apache/hadoop/fs/ceph/CephFS.java
+new file mode 100644
+index 0000000..5d51eb2
+--- /dev/null
++++ b/src/core/org/apache/hadoop/fs/ceph/CephFS.java
+@@ -0,0 +1,250 @@
+// -*- mode:Java; tab-width:2; c-basic-offset:2; indent-tabs-mode:t -*-
++
+/**
-+ * Licensed to the Apache Software Foundation (ASF) under one
-+ * or more contributor license agreements. See the NOTICE file
-+ * distributed with this work for additional information
-+ * regarding copyright ownership. The ASF licenses this file
-+ * to you under the Apache License, Version 2.0 (the
-+ * "License"); you may not use this file except in compliance
-+ * with the License. You may obtain a copy of the License at
+ *
-+ * http://www.apache.org/licenses/LICENSE-2.0
++ * Licensed under the Apache License, Version 2.0
++ * (the "License"); you may not use this file except in compliance with
++ * the License. You may obtain a copy of the License at
++ *
++ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
-+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-+ * See the License for the specific language governing permissions and
-+ * limitations under the License.
++ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
++ * implied. See the License for the specific language governing
++ * permissions and limitations under the License.
+ *
-+ * Unit tests for the CephFileSystem API implementation.
++ *
++ * Abstract base class for communicating with a Ceph filesystem and its
++ * C++ codebase from Java, or pretending to do so (for unit testing purposes).
++ * As only the Ceph package should be using this directly, all methods
++ * are protected.
+ */
-+
+package org.apache.hadoop.fs.ceph;
+
-+import java.io.IOException;
-+import java.net.URI;
-+import org.apache.hadoop.fs.FileSystemContractBaseTest;
+import org.apache.hadoop.conf.Configuration;
-+import org.apache.hadoop.fs.FileSystem;
-+import org.apache.hadoop.fs.Path;
+
-+public class TestCeph extends FileSystemContractBaseTest {
++abstract class CephFS {
+
-+ @Override
-+ protected void setUp() throws IOException {
-+ Configuration conf = new Configuration();
-+ CephFaker cephfaker = new CephFaker(conf, FileSystem.LOG);
-+ CephFileSystem cephfs = new CephFileSystem(cephfaker, "ceph://null");
-+ cephfs.initialize(URI.create("ceph://null"), conf);
-+ fs = cephfs;
-+ cephfs.setWorkingDirectory(new Path(getDefaultWorkingDirectory()));
-+ }
++ protected static final int ENOTDIR = 20;
++ protected static final int EEXIST = 17;
++ protected static final int ENOENT = 2;
++
++ /*
++ * Performs any necessary setup to allow general use of the filesystem.
++ * Inputs:
++ * String argsuments -- a command-line style input of Ceph config params
++ * int block_size -- the size in bytes to use for blocks
++ * Returns: true on success, false otherwise
++ */
++ abstract protected boolean ceph_initializeClient(String arguments, int block_size);
++
++ /*
++ * Returns the current working directory (absolute) as a String
++ */
++ abstract protected String ceph_getcwd();
++
++ /*
++ * Changes the working directory.
++ * Inputs:
++ * String path: The path (relative or absolute) to switch to
++ * Returns: true on success, false otherwise.
++ */
++ abstract protected boolean ceph_setcwd(String path);
++
++ /*
++ * Given a path to a directory, removes the directory if empty.
++ * Inputs:
++ * jstring j_path: The path (relative or absolute) to the directory
++ * Returns: true on successful delete; false otherwise
++ */
++ abstract protected boolean ceph_rmdir(String path);
++
++ /*
++ * Given a path, unlinks it.
++ * Inputs:
++ * String path: The path (relative or absolute) to the file or empty dir
++ * Returns: true if the unlink occurred, false otherwise.
++ */
++ abstract protected boolean ceph_unlink(String path);
++
++ /*
++ * Changes a given path name to a new name, assuming new_path doesn't exist.
++ * Inputs:
++ * jstring j_from: The path whose name you want to change.
++ * jstring j_to: The new name for the path.
++ * Returns: true if the rename occurred, false otherwise
++ */
++ abstract protected boolean ceph_rename(String old_path, String new_path);
++
++ /*
++ * Returns true if it the input path exists, false
++ * if it does not or there is an unexpected failure.
++ */
++ abstract protected boolean ceph_exists(String path);
++
++ /*
++ * Get the block size for a given path.
++ * Input:
++ * String path: The path (relative or absolute) you want
++ * the block size for.
++ * Returns: block size if the path exists, otherwise a negative number
++ * corresponding to the standard C++ error codes (which are positive).
++ */
++ abstract protected long ceph_getblocksize(String path);
++
++ /*
++ * Returns true if the given path is a directory, false otherwise.
++ */
++ abstract protected boolean ceph_isdirectory(String path);
++
++ /*
++ * Returns true if the given path is a file; false otherwise.
++ */
++ abstract protected boolean ceph_isfile(String path);
++
++ /*
++ * Get the contents of a given directory.
++ * Inputs:
++ * String path: The path (relative or absolute) to the directory.
++ * Returns: A Java String[] of the contents of the directory, or
++ * NULL if there is an error (ie, path is not a dir). This listing
++ * will not contain . or .. entries.
++ */
++ abstract protected String[] ceph_getdir(String path);
++
++ /*
++ * Create the specified directory and any required intermediate ones with the
++ * given mode.
++ */
++ abstract protected int ceph_mkdirs(String path, int mode);
++
++ /*
++ * Open a file to append. If the file does not exist, it will be created.
++ * Opening a dir is possible but may have bad results.
++ * Inputs:
++ * String path: The path to open.
++ * Returns: an int filehandle, or a number<0 if an error occurs.
++ */
++ abstract protected int ceph_open_for_append(String path);
++
++ /*
++ * Open a file for reading.
++ * Opening a dir is possible but may have bad results.
++ * Inputs:
++ * String path: The path to open.
++ * Returns: an int filehandle, or a number<0 if an error occurs.
++ */
++ abstract protected int ceph_open_for_read(String path);
++
++ /*
++ * Opens a file for overwriting; creates it if necessary.
++ * Opening a dir is possible but may have bad results.
++ * Inputs:
++ * String path: The path to open.
++ * int mode: The mode to open with.
++ * Returns: an int filehandle, or a number<0 if an error occurs.
++ */
++ abstract protected int ceph_open_for_overwrite(String path, int mode);
++
++ /*
++ * Closes the given file. Returns 0 on success, or a negative
++ * error code otherwise.
++ */
++ abstract protected int ceph_close(int filehandle);
++
++ /*
++ * Change the mode on a path.
++ * Inputs:
++ * String path: The path to change mode on.
++ * int mode: The mode to apply.
++ * Returns: true if the mode is properly applied, false if there
++ * is any error.
++ */
++ abstract protected boolean ceph_setPermission(String path, int mode);
++
++ /*
++ * Closes the Ceph client. This should be called before shutting down
++ * (multiple times is okay but redundant).
++ */
++ abstract protected boolean ceph_kill_client();
++
++ /*
++ * Get the statistics on a path returned in a custom format defined
++ * in CephFileSystem.
++ * Inputs:
++ * String path: The path to stat.
++ * Stat fill: The stat object to fill.
++ * Returns: true if the stat is successful, false otherwise.
++ */
++ abstract protected boolean ceph_stat(String path, CephFileSystem.Stat fill);
++
++ /*
++ * Check how many times a file should be replicated. If it is,
++ * degraded it may not actually be replicated this often.
++ * Inputs:
++ * int fh: a file descriptor
++ * Returns: an int containing the number of times replicated.
++ */
++ abstract protected int ceph_replication(String path);
++
++ /*
++ * Find the IP address of the primary OSD for a given file and offset.
++ * Inputs:
++ * int fh: The filehandle for the file.
++ * long offset: The offset to get the location of.
++ * Returns: an array of String of the location as IP, or NULL if there is an error.
++ */
++ abstract protected String[] ceph_hosts(int fh, long offset);
++
++ /*
++ * Set the mtime and atime for a given path.
++ * Inputs:
++ * String path: The path to set the times for.
++ * long mtime: The mtime to set, in millis since epoch (-1 to not set).
++ * long atime: The atime to set, in millis since epoch (-1 to not set)
++ * Returns: 0 if successful, an error code otherwise.
++ */
++ abstract protected int ceph_setTimes(String path, long mtime, long atime);
++
++ /*
++ * Get the current position in a file (as a long) of a given filehandle.
++ * Returns: (long) current file position on success, or a
++ * negative error code on failure.
++ */
++ abstract protected long ceph_getpos(int fh);
++
++ /*
++ * Write the given buffer contents to the given filehandle.
++ * Inputs:
++ * int fh: The filehandle to write to.
++ * byte[] buffer: The buffer to write from
++ * int buffer_offset: The position in the buffer to write from
++ * int length: The number of (sequential) bytes to write.
++ * Returns: int, on success the number of bytes written, on failure
++ * a negative error code.
++ */
++ abstract protected int ceph_write(int fh, byte[] buffer, int buffer_offset, int length);
++
++ /*
++ * Reads into the given byte array from the current position.
++ * Inputs:
++ * int fh: the filehandle to read from
++ * byte[] buffer: the byte array to read into
++ * int buffer_offset: where in the buffer to start writing
++ * int length: how much to read.
++ * There'd better be enough space in the buffer to write all
++ * the data from the given offset!
++ * Returns: the number of bytes read on success (as an int),
++ * or an error code otherwise. */
++ abstract protected int ceph_read(int fh, byte[] buffer, int buffer_offset, int length);
++
++ /*
++ * Seeks to the given position in the given file.
++ * Inputs:
++ * int fh: The filehandle to seek in.
++ * long pos: The position to seek to.
++ * Returns: the new position (as a long) of the filehandle on success,
++ * or a negative error code on failure. */
++ abstract protected long ceph_seek_from_start(int fh, long pos);
+}
-Index: src/java/org/apache/hadoop/fs/ceph/SubmitProcess
-===================================================================
---- src/java/org/apache/hadoop/fs/ceph/SubmitProcess (revision 0)
-+++ src/java/org/apache/hadoop/fs/ceph/SubmitProcess (revision 0)
-@@ -0,0 +1,2 @@
-+1) Remove the emacs indententation rules line.
-+2) ? No more?
-\ No newline at end of file
-Index: src/java/org/apache/hadoop/fs/ceph/CephTalker.java
-===================================================================
---- src/java/org/apache/hadoop/fs/ceph/CephTalker.java (revision 0)
-+++ src/java/org/apache/hadoop/fs/ceph/CephTalker.java (revision 0)
-@@ -0,0 +1,59 @@
+diff --git a/src/core/org/apache/hadoop/fs/ceph/CephFaker.java b/src/core/org/apache/hadoop/fs/ceph/CephFaker.java
+new file mode 100644
+index 0000000..c598f53
+--- /dev/null
++++ b/src/core/org/apache/hadoop/fs/ceph/CephFaker.java
+@@ -0,0 +1,483 @@
+// -*- mode:Java; tab-width:2; c-basic-offset:2; indent-tabs-mode:t -*-
++
+/**
+ *
+ * Licensed under the Apache License, Version 2.0
+ * permissions and limitations under the License.
+ *
+ *
-+ * Wraps a number of native function calls to communicate with the Ceph
-+ * filesystem.
++ * This uses the local Filesystem but pretends to be communicating
++ * with a Ceph deployment, for unit testing the CephFileSystem.
+ */
++
+package org.apache.hadoop.fs.ceph;
+
-+import org.apache.hadoop.conf.Configuration;
++
++import java.net.URI;
++import java.util.Hashtable;
++import java.io.Closeable;
++import java.io.FileNotFoundException;
++import java.io.IOException;
++
+import org.apache.commons.logging.Log;
++import org.apache.commons.logging.LogFactory;
++import org.apache.hadoop.conf.Configuration;
++import org.apache.hadoop.fs.BlockLocation;
++import org.apache.hadoop.fs.FileStatus;
++import org.apache.hadoop.fs.FileSystem;
++import org.apache.hadoop.fs.FSDataInputStream;
++import org.apache.hadoop.fs.FSDataOutputStream;
++import org.apache.hadoop.fs.Path;
++import org.apache.hadoop.fs.permission.FsPermission;
+
-+class CephTalker extends CephFS {
-+ //we write a constructor so we can load the libraries
-+ public CephTalker(Configuration conf, Log log) {
-+ super(conf, log);
-+ System.load(conf.get("fs.ceph.libDir")+"/libhadoopcephfs.so");
-+ System.load(conf.get("fs.ceph.libDir")+"/libcephfs.so");
-+ }
-+ protected native boolean ceph_initializeClient(String arguments, int block_size);
-+ protected native String ceph_getcwd();
-+ protected native boolean ceph_setcwd(String path);
-+ protected native boolean ceph_rmdir(String path);
-+ protected native boolean ceph_unlink(String path);
-+ protected native boolean ceph_rename(String old_path, String new_path);
-+ protected native boolean ceph_exists(String path);
-+ protected native long ceph_getblocksize(String path);
-+ protected native boolean ceph_isdirectory(String path);
-+ protected native boolean ceph_isfile(String path);
-+ protected native String[] ceph_getdir(String path);
-+ protected native int ceph_mkdirs(String path, int mode);
-+ protected native int ceph_open_for_append(String path);
-+ protected native int ceph_open_for_read(String path);
-+ protected native int ceph_open_for_overwrite(String path, int mode);
-+ protected native int ceph_close(int filehandle);
-+ protected native boolean ceph_setPermission(String path, int mode);
-+ protected native boolean ceph_kill_client();
-+ protected native boolean ceph_stat(String path, CephFileSystem.Stat fill);
-+ protected native int ceph_statfs(String Path, CephFileSystem.CephStat fill);
-+ protected native int ceph_replication(String path);
-+ protected native String ceph_hosts(int fh, long offset);
-+ protected native int ceph_setTimes(String path, long mtime, long atime);
-+ protected native long ceph_getpos(int fh);
-+ protected native int ceph_write(int fh, byte[] buffer, int buffer_offset, int length);
-+ protected native int ceph_read(int fh, byte[] buffer, int buffer_offset, int length);
-+ protected native long ceph_seek_from_start(int fh, long pos);
++
++class CephFaker extends CephFS {
++ private static final Log LOG = LogFactory.getLog(CephFaker.class);
++ FileSystem localFS;
++ String localPrefix;
++ int blockSize;
++ Configuration conf;
++ Hashtable<Integer, Object> files;
++ Hashtable<Integer, String> filenames;
++ int fileCount = 0;
++ boolean initialized = false;
++
++ public CephFaker(Configuration con, Log log) {
++ conf = con;
++ files = new Hashtable<Integer, Object>();
++ filenames = new Hashtable<Integer, String>();
++ }
++
++ protected boolean ceph_initializeClient(String args, int block_size) {
++ if (!initialized) {
++ // let's remember the default block_size
++ blockSize = block_size;
++
++ /* for a real Ceph deployment, this starts up the client,
++ * sets debugging levels, etc. We just need to get the
++ * local FileSystem to use, and we'll ignore any
++ * command-line arguments. */
++ try {
++ localFS = FileSystem.getLocal(conf);
++ localFS.initialize(URI.create("file://localhost"), conf);
++ localFS.setVerifyChecksum(false);
++ String testDir = conf.get("hadoop.tmp.dir");
++
++ localPrefix = localFS.getWorkingDirectory().toString();
++ int testDirLoc = localPrefix.indexOf(testDir) - 1;
++
++ if (-2 == testDirLoc) {
++ testDirLoc = localPrefix.length();
++ }
++ localPrefix = localPrefix.substring(0, testDirLoc) + "/"
++ + conf.get("hadoop.tmp.dir");
++
++ localFS.setWorkingDirectory(
++ new Path(localPrefix + "/user/" + System.getProperty("user.name")));
++ // I don't know why, but the unit tests expect the default
++ // working dir to be /user/username, so satisfy them!
++ // debug("localPrefix is " + localPrefix, INFO);
++ } catch (IOException e) {
++ return false;
++ }
++ initialized = true;
++ }
++ return true;
++ }
++
++ protected String ceph_getcwd() {
++ return sanitize_path(localFS.getWorkingDirectory().toString());
++ }
++
++ protected boolean ceph_setcwd(String path) {
++ localFS.setWorkingDirectory(new Path(prepare_path(path)));
++ return true;
++ }
++
++ // the caller is responsible for ensuring empty dirs
++ protected boolean ceph_rmdir(String pth) {
++ Path path = new Path(prepare_path(pth));
++ boolean ret = false;
++
++ try {
++ if (localFS.listStatus(path).length <= 1) {
++ ret = localFS.delete(path, true);
++ }
++ } catch (IOException e) {}
++ return ret;
++ }
++
++ // this needs to work on (empty) directories too
++ protected boolean ceph_unlink(String path) {
++ path = prepare_path(path);
++ boolean ret = false;
++
++ if (ceph_isdirectory(path)) {
++ ret = ceph_rmdir(path);
++ } else {
++ try {
++ ret = localFS.delete(new Path(path), false);
++ } catch (IOException e) {}
++ }
++ return ret;
++ }
++
++ protected boolean ceph_rename(String oldName, String newName) {
++ oldName = prepare_path(oldName);
++ newName = prepare_path(newName);
++ try {
++ Path parent = new Path(newName).getParent();
++ Path newPath = new Path(newName);
++
++ if (localFS.exists(parent) && !localFS.exists(newPath)) {
++ return localFS.rename(new Path(oldName), newPath);
++ }
++ return false;
++ } catch (IOException e) {
++ return false;
++ }
++ }
++
++ protected boolean ceph_exists(String path) {
++ path = prepare_path(path);
++ boolean ret = false;
++
++ try {
++ ret = localFS.exists(new Path(path));
++ } catch (IOException e) {}
++ return ret;
++ }
++
++ protected long ceph_getblocksize(String path) {
++ path = prepare_path(path);
++ try {
++ FileStatus status = localFS.getFileStatus(new Path(path));
++
++ return status.getBlockSize();
++ } catch (FileNotFoundException e) {
++ return -CephFS.ENOENT;
++ } catch (IOException e) {
++ return -1; // just fail generically
++ }
++ }
++
++ protected boolean ceph_isdirectory(String path) {
++ path = prepare_path(path);
++ try {
++ FileStatus status = localFS.getFileStatus(new Path(path));
++
++ return status.isDir();
++ } catch (IOException e) {
++ return false;
++ }
++ }
++
++ protected boolean ceph_isfile(String path) {
++ path = prepare_path(path);
++ boolean ret = false;
++
++ try {
++ FileStatus status = localFS.getFileStatus(new Path(path));
++
++ ret = !status.isDir();
++ } catch (Exception e) {}
++ return ret;
++ }
++
++ protected String[] ceph_getdir(String path) {
++ path = prepare_path(path);
++ if (!ceph_isdirectory(path)) {
++ return null;
++ }
++ try {
++ FileStatus[] stats = localFS.listStatus(new Path(path));
++ String[] names = new String[stats.length];
++ String name;
++
++ for (int i = 0; i < stats.length; ++i) {
++ name = stats[i].getPath().toString();
++ names[i] = name.substring(name.lastIndexOf(Path.SEPARATOR) + 1);
++ }
++ return names;
++ } catch (IOException e) {}
++ return null;
++ }
++
++ protected int ceph_mkdirs(String path, int mode) {
++ path = prepare_path(path);
++ // debug("ceph_mkdirs on " + path, INFO);
++ try {
++ if (localFS.mkdirs(new Path(path), new FsPermission((short) mode))) {
++ return 0;
++ }
++ } catch (IOException e) {}
++ if (ceph_isdirectory(path)) { // apparently it already existed
++ return -EEXIST;
++ } else if (ceph_isfile(path)) {
++ return -ENOTDIR;
++ }
++ return -1;
++ }
++
++ /*
++ * Unlike a real Ceph deployment, you can't do opens on a directory.
++ * Since that has unpredictable behavior and you shouldn't do it anyway,
++ * it's okay.
++ */
++ protected int ceph_open_for_append(String path) {
++ path = prepare_path(path);
++ FSDataOutputStream stream;
++
++ try {
++ stream = localFS.append(new Path(path));
++ files.put(new Integer(fileCount), stream);
++ filenames.put(new Integer(fileCount), path);
++ return fileCount++;
++ } catch (IOException e) {}
++ return -1; // failure
++ }
++
++ protected int ceph_open_for_read(String path) {
++ path = prepare_path(path);
++ FSDataInputStream stream;
++
++ try {
++ stream = localFS.open(new Path(path));
++ files.put(new Integer(fileCount), stream);
++ filenames.put(new Integer(fileCount), path);
++ LOG.info("ceph_open_for_read fh:" + fileCount + ", pathname:" + path);
++ return fileCount++;
++ } catch (IOException e) {}
++ return -1; // failure
++ }
++
++ protected int ceph_open_for_overwrite(String path, int mode) {
++ path = prepare_path(path);
++ FSDataOutputStream stream;
++
++ try {
++ stream = localFS.create(new Path(path));
++ files.put(new Integer(fileCount), stream);
++ filenames.put(new Integer(fileCount), path);
++ LOG.info("ceph_open_for_overwrite fh:" + fileCount + ", pathname:" + path);
++ return fileCount++;
++ } catch (IOException e) {}
++ return -1; // failure
++ }
++
++ protected int ceph_close(int filehandle) {
++ LOG.info("ceph_close(filehandle " + filehandle + ")");
++ try {
++ ((Closeable) files.get(new Integer(filehandle))).close();
++ if (null == files.get(new Integer(filehandle))) {
++ return -ENOENT; // this isn't quite the right error code,
++ // but the important part is it's negative
++ }
++ return 0; // hurray, success
++ } catch (NullPointerException ne) {
++ LOG.warn("ceph_close caught NullPointerException!" + ne);
++ } // err, how?
++ catch (IOException ie) {
++ LOG.warn("ceph_close caught IOException!" + ie);
++ }
++ return -1; // failure
++ }
++
++ protected boolean ceph_setPermission(String pth, int mode) {
++ pth = prepare_path(pth);
++ Path path = new Path(pth);
++ boolean ret = false;
++
++ try {
++ localFS.setPermission(path, new FsPermission((short) mode));
++ ret = true;
++ } catch (IOException e) {}
++ return ret;
++ }
++
++ // rather than try and match a Ceph deployment's behavior exactly,
++ // just make bad things happen if they try and call methods after this
++ protected boolean ceph_kill_client() {
++ // debug("ceph_kill_client", INFO);
++ localFS.setWorkingDirectory(new Path(localPrefix));
++ // debug("working dir is now " + localFS.getWorkingDirectory(), INFO);
++ try {
++ localFS.close();
++ } catch (Exception e) {}
++ localFS = null;
++ files = null;
++ filenames = null;
++ return true;
++ }
++
++ protected boolean ceph_stat(String pth, CephFileSystem.Stat fill) {
++ pth = prepare_path(pth);
++ Path path = new Path(pth);
++ boolean ret = false;
++
++ try {
++ FileStatus status = localFS.getFileStatus(path);
++
++ fill.size = status.getLen();
++ fill.is_dir = status.isDir();
++ fill.block_size = status.getBlockSize();
++ fill.mod_time = status.getModificationTime();
++ fill.access_time = status.getAccessTime();
++ fill.mode = status.getPermission().toShort();
++ ret = true;
++ } catch (IOException e) {}
++ return ret;
++ }
++
++ protected int ceph_replication(String path) {
++ path = prepare_path(path);
++ int ret = -1; // -1 for failure
++
++ try {
++ ret = localFS.getFileStatus(new Path(path)).getReplication();
++ } catch (IOException e) {}
++ return ret;
++ }
++
++ protected String[] ceph_hosts(int fh, long offset) {
++ String[] ret = null;
++
++ try {
++ BlockLocation[] locs = localFS.getFileBlockLocations(
++ localFS.getFileStatus(new Path(filenames.get(new Integer(fh)))),
++ offset, 1);
++
++ ret = locs[0].getNames();
++ } catch (IOException e) {} catch (NullPointerException f) {}
++ return ret;
++ }
++
++ protected int ceph_setTimes(String pth, long mtime, long atime) {
++ pth = prepare_path(pth);
++ Path path = new Path(pth);
++ int ret = -1; // generic fail
++
++ try {
++ localFS.setTimes(path, mtime, atime);
++ ret = 0;
++ } catch (IOException e) {}
++ return ret;
++ }
++
++ protected long ceph_getpos(int fh) {
++ long ret = -1; // generic fail
++
++ try {
++ Object stream = files.get(new Integer(fh));
++
++ if (stream instanceof FSDataInputStream) {
++ ret = ((FSDataInputStream) stream).getPos();
++ } else if (stream instanceof FSDataOutputStream) {
++ ret = ((FSDataOutputStream) stream).getPos();
++ }
++ } catch (IOException e) {} catch (NullPointerException f) {}
++ return ret;
++ }
++
++ protected int ceph_write(int fh, byte[] buffer,
++ int buffer_offset, int length) {
++ LOG.info(
++ "ceph_write fh:" + fh + ", buffer_offset:" + buffer_offset + ", length:"
++ + length);
++ long ret = -1; // generic fail
++
++ try {
++ FSDataOutputStream os = (FSDataOutputStream) files.get(new Integer(fh));
++
++ LOG.info("ceph_write got outputstream");
++ long startPos = os.getPos();
++
++ os.write(buffer, buffer_offset, length);
++ ret = os.getPos() - startPos;
++ } catch (IOException e) {
++ LOG.warn("ceph_write caught IOException!");
++ } catch (NullPointerException f) {
++ LOG.warn("ceph_write caught NullPointerException!");
++ }
++ return (int) ret;
++ }
++
++ protected int ceph_read(int fh, byte[] buffer,
++ int buffer_offset, int length) {
++ long ret = -1; // generic fail
++
++ try {
++ FSDataInputStream is = (FSDataInputStream) files.get(new Integer(fh));
++ long startPos = is.getPos();
++
++ is.read(buffer, buffer_offset, length);
++ ret = is.getPos() - startPos;
++ } catch (IOException e) {} catch (NullPointerException f) {}
++ return (int) ret;
++ }
++
++ protected long ceph_seek_from_start(int fh, long pos) {
++ LOG.info("ceph_seek_from_start(fh " + fh + ", pos " + pos + ")");
++ long ret = -1; // generic fail
++
++ try {
++ LOG.info("ceph_seek_from_start filename is " + filenames.get(new Integer(fh)));
++ if (null == files.get(new Integer(fh))) {
++ LOG.warn("ceph_seek_from_start: is is null!");
++ }
++ FSDataInputStream is = (FSDataInputStream) files.get(new Integer(fh));
++
++ LOG.info("ceph_seek_from_start retrieved is!");
++ is.seek(pos);
++ ret = is.getPos();
++ } catch (IOException e) {
++ LOG.warn("ceph_seek_from_start caught IOException!");
++ } catch (NullPointerException f) {
++ LOG.warn("ceph_seek_from_start caught NullPointerException!");
++ }
++ return (int) ret;
++ }
++
++ /*
++ * We need to remove the localFS file prefix before returning to Ceph
++ */
++ private String sanitize_path(String path) {
++ // debug("sanitize_path(" + path + ")", INFO);
++ /* if (path.startsWith("file:"))
++ path = path.substring("file:".length()); */
++ if (path.startsWith(localPrefix)) {
++ path = path.substring(localPrefix.length());
++ if (path.length() == 0) { // it was a root path
++ path = "/";
++ }
++ }
++ // debug("sanitize_path returning " + path, INFO);
++ return path;
++ }
++
++ /*
++ * If it's an absolute path we need to shove the
++ * test dir onto the front as a prefix.
++ */
++ private String prepare_path(String path) {
++ // debug("prepare_path(" + path + ")", INFO);
++ if (path.startsWith("/")) {
++ path = localPrefix + path;
++ } else if (path.equals("..")) {
++ if (ceph_getcwd().equals("/")) {
++ path = ".";
++ } // you can't go up past root!
++ }
++ // debug("prepare_path returning" + path, INFO);
++ return path;
++ }
+}
-Index: src/java/org/apache/hadoop/fs/ceph/CephFileSystem.java
-===================================================================
---- src/java/org/apache/hadoop/fs/ceph/CephFileSystem.java (revision 0)
-+++ src/java/org/apache/hadoop/fs/ceph/CephFileSystem.java (revision 0)
-@@ -0,0 +1,848 @@
-+// -*- mode:Java; tab-width:2; c-basic-offset:2; indent-tabs-mode:t -*-
+diff --git a/src/core/org/apache/hadoop/fs/ceph/CephFileSystem.java b/src/core/org/apache/hadoop/fs/ceph/CephFileSystem.java
+new file mode 100644
+index 0000000..95f2223
+--- /dev/null
++++ b/src/core/org/apache/hadoop/fs/ceph/CephFileSystem.java
+@@ -0,0 +1,804 @@
++// -*- mode:Java; tab-width:2; c-basic-offset:2; indent-tabs-mode:t -*-
++
+/**
+ *
+ * Licensed under the Apache License, Version 2.0
+ * implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ *
-+ *
++ *
+ * Implements the Hadoop FS interfaces to allow applications to store
+ * files in Ceph.
+ */
+package org.apache.hadoop.fs.ceph;
+
++
+import java.io.IOException;
+import java.io.FileNotFoundException;
+import java.io.OutputStream;
+import java.net.URI;
++import java.net.InetAddress;
+import java.util.EnumSet;
+import java.lang.Math;
++import java.util.ArrayList;
+
++import org.apache.commons.logging.Log;
++import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.BlockLocation;
-+import org.apache.hadoop.fs.FileAlreadyExistsException;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FSInputStream;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.util.Progressable;
+import org.apache.hadoop.fs.FileStatus;
-+import org.apache.hadoop.fs.FsStatus;
-+import org.apache.hadoop.fs.CreateFlag;
++import org.apache.hadoop.net.DNS;
++
+
+/**
+ * <p>
+ * from the respective Ceph system of at least that importance.
+ */
+public class CephFileSystem extends FileSystem {
-+
++ private static final Log LOG = LogFactory.getLog(CephFileSystem.class);
+ private URI uri;
+
++ private Path workingDir;
+ private final Path root;
-+ private boolean initialized = false;
-+ private CephFS ceph = null;
++ private CephFS ceph = null;
+
-+ private boolean debug = false;
-+ private String fs_default_name;
++ private static String CEPH_NAMESERVER;
++ private static final String CEPH_NAMESERVER_KEY = "fs.ceph.nameserver";
++ private static final String CEPH_NAMESERVER_DEFAULT = "localhost";
+
+ /**
+ * Create a new CephFileSystem.
+ root = new Path("/");
+ }
+
-+ /**
-+ * Used for testing purposes, this constructor
-+ * sets the given CephFS instead of defaulting to a
-+ * CephTalker (with its assumed real Ceph instance to talk to).
-+ */
-+ public CephFileSystem(CephFS ceph_fs, String default_path) {
-+ super();
-+ root = new Path("/");
-+ ceph = ceph_fs;
-+ fs_default_name = default_path;
-+ }
++ /**
++ * Used for testing purposes, this constructor
++ * sets the given CephFS instead of defaulting to a
++ * CephTalker (with its assumed real Ceph instance to talk to).
++ */
++ public CephFileSystem(CephFS ceph_fs) {
++ super();
++ root = new Path("/");
++ ceph = ceph_fs;
++ }
+
+ /**
+ * Lets you get the URI of this CephFileSystem.
+ * @return the URI.
+ */
+ public URI getUri() {
-+ if (!initialized) return null;
-+ ceph.debug("getUri:exit with return " + uri, ceph.DEBUG);
++ LOG.debug("getUri:exit with return " + uri);
+ return uri;
+ }
+
+ * Starts up the connection to Ceph, reads in configuraton options, etc.
+ * @param uri The URI for this filesystem.
+ * @param conf The Hadoop Configuration to retrieve properties from.
-+ * @throws IOException if the Ceph client initialization fails
-+ * or necessary properties are unset.
++ * @throws IOException if necessary properties are unset.
+ */
+ @Override
+ public void initialize(URI uri, Configuration conf) throws IOException {
-+ if (!initialized) {
-+ super.initialize(uri, conf);
-+ setConf(conf);
-+ this.uri = URI.create(uri.getScheme() + "://" + uri.getAuthority());
-+ statistics = getStatistics(uri.getScheme(), getClass());
-+
-+ if (ceph == null) {
-+ ceph = new CephTalker(conf, LOG);
-+ }
-+ if (null == fs_default_name) {
-+ fs_default_name = conf.get("fs.default.name");
-+ }
-+ //build up the arguments for Ceph
-+ String arguments = "CephFSInterface";
-+ arguments += conf.get("fs.ceph.commandLine", "");
-+ if (conf.get("fs.ceph.clientDebug") != null) {
-+ arguments += " --debug_client ";
-+ arguments += conf.get("fs.ceph.clientDebug");
-+ }
-+ if (conf.get("fs.ceph.messengerDebug") != null) {
-+ arguments += " --debug_ms ";
-+ arguments += conf.get("fs.ceph.messengerDebug");
-+ }
-+ if (conf.get("fs.ceph.monAddr") != null) {
-+ arguments += " -m ";
-+ arguments += conf.get("fs.ceph.monAddr");
-+ }
-+ arguments += " --client-readahead-max-periods="
-+ + conf.get("fs.ceph.readahead", "1");
-+ //make sure they gave us a ceph monitor address or conf file
-+ ceph.debug("initialize:Ceph initialization arguments: " + arguments, ceph.INFO);
-+ if ( (conf.get("fs.ceph.monAddr") == null) &&
-+ (arguments.indexOf("-m") == -1) &&
-+ (arguments.indexOf("-c") == -1) ) {
-+ ceph.debug("initialize:You need to specify a Ceph monitor address.", ceph.FATAL);
-+ throw new IOException("You must specify a Ceph monitor address or config file!");
-+ }
-+ // Initialize the client
-+ if (!ceph.ceph_initializeClient(arguments,
-+ conf.getInt("fs.ceph.blockSize", 1<<26))) {
-+ ceph.debug("initialize:Ceph initialization failed!", ceph.FATAL);
-+ throw new IOException("Ceph initialization failed!");
-+ }
-+ initialized = true;
-+ ceph.debug("initialize:Ceph initialized client. Setting cwd to /", ceph.INFO);
-+ ceph.ceph_setcwd("/");
++ super.initialize(uri, conf);
++ setConf(conf);
++ this.uri = URI.create(uri.getScheme() + "://" + uri.getAuthority());
++ if (ceph == null) {
++ ceph = new CephTalker(conf, LOG);
++ }
++
++ CEPH_NAMESERVER = conf.get(CEPH_NAMESERVER_KEY, CEPH_NAMESERVER_DEFAULT);
++
++ // build up the arguments for Ceph
++ String arguments = "CephFSInterface";
++
++ arguments += conf.get("fs.ceph.commandLine", "");
++ if (conf.get("fs.ceph.clientDebug") != null) {
++ arguments += " --debug_client ";
++ arguments += conf.get("fs.ceph.clientDebug");
+ }
-+ ceph.debug("initialize:exit", ceph.DEBUG);
++ if (conf.get("fs.ceph.messengerDebug") != null) {
++ arguments += " --debug_ms ";
++ arguments += conf.get("fs.ceph.messengerDebug");
++ }
++ if (conf.get("fs.ceph.monAddr") != null) {
++ arguments += " -m ";
++ arguments += conf.get("fs.ceph.monAddr");
++ }
++ arguments += " --client-readahead-max-periods="
++ + conf.get("fs.ceph.readahead", "1");
++ // make sure they gave us a ceph monitor address or conf file
++ LOG.info("initialize:Ceph initialization arguments: " + arguments);
++ if ((conf.get("fs.ceph.monAddr") == null) && (arguments.indexOf("-m") == -1)
++ && (arguments.indexOf("-c") == -1)) {
++ LOG.fatal("initialize:You need to specify a Ceph monitor address.");
++ throw new IOException(
++ "You must specify a Ceph monitor address or config file!");
++ }
++ // Initialize the client
++ if (!ceph.ceph_initializeClient(arguments,
++ conf.getInt("fs.ceph.blockSize", 1 << 26))) {
++ LOG.fatal("initialize:Ceph initialization failed!");
++ throw new IOException("Ceph initialization failed!");
++ }
++ LOG.info("initialize:Ceph initialized client. Setting cwd to /");
++ ceph.ceph_setcwd("/");
++ LOG.debug("initialize:exit");
++
++ this.workingDir = getHomeDirectory();
+ }
+
+ /**
+ * Close down the CephFileSystem. Runs the base-class close method
+ * and then kills the Ceph client itself.
-+ * @throws IOException if initialize() hasn't been called.
+ */
+ @Override
+ public void close() throws IOException {
-+ if (!initialized) throw new IOException ("You have to initialize the "
-+ +"CephFileSystem before calling other methods.");
-+ ceph.debug("close:enter", ceph.DEBUG);
-+ super.close();//this method does stuff, make sure it's run!
-+ ceph.debug("close: Calling ceph_kill_client from Java", ceph.TRACE);
++ LOG.debug("close:enter");
++ super.close(); // this method does stuff, make sure it's run!
++ LOG.trace("close: Calling ceph_kill_client from Java");
+ ceph.ceph_kill_client();
-+ ceph.debug("close:exit", ceph.DEBUG);
++ LOG.debug("close:exit");
+ }
+
+ /**
+ * @param progress The Progressable to report progress to.
+ * Reporting is limited but exists.
+ * @return An FSDataOutputStream that connects to the file on Ceph.
-+ * @throws IOException If initialize() hasn't been called or the file cannot be found or appended to.
++ * @throws IOException If the file cannot be found or appended to.
+ */
-+ public FSDataOutputStream append (Path file, int bufferSize,
-+ Progressable progress) throws IOException {
-+ if (!initialized) throw new IOException ("You have to initialize the "
-+ +"CephFileSystem before calling other methods.");
-+ ceph.debug("append:enter with path " + file + " bufferSize " + bufferSize, ceph.DEBUG);
++ public FSDataOutputStream append(Path file, int bufferSize,
++ Progressable progress) throws IOException {
++ LOG.debug("append:enter with path " + file + " bufferSize " + bufferSize);
+ Path abs_path = makeAbsolute(file);
-+ if (progress!=null) progress.progress();
-+ ceph.debug("append: Entering ceph_open_for_append from Java", ceph.TRACE);
-+ int fd = ceph.ceph_open_for_append(abs_path.toString());
-+ ceph.debug("append: Returned to Java", ceph.TRACE);
-+ if (progress!=null) progress.progress();
-+ if( fd < 0 ) { //error in open
-+ throw new IOException("append: Open for append failed on path \"" +
-+ abs_path.toString() + "\"");
-+ }
-+ CephOutputStream cephOStream = new CephOutputStream(getConf(),
-+ ceph, fd, bufferSize);
-+ ceph.debug("append:exit", ceph.DEBUG);
++
++ if (progress != null) {
++ progress.progress();
++ }
++ LOG.trace("append: Entering ceph_open_for_append from Java");
++ int fd = ceph.ceph_open_for_append(getCephPath(abs_path));
++
++ LOG.trace("append: Returned to Java");
++ if (progress != null) {
++ progress.progress();
++ }
++ if (fd < 0) { // error in open
++ throw new IOException(
++ "append: Open for append failed on path \"" + abs_path.toString()
++ + "\"");
++ }
++ CephOutputStream cephOStream = new CephOutputStream(getConf(), ceph, fd,
++ bufferSize);
++
++ LOG.debug("append:exit");
+ return new FSDataOutputStream(cephOStream, statistics);
+ }
+
+ * @return the directory Path
+ */
+ public Path getWorkingDirectory() {
-+ if (!initialized) return null;
-+ ceph.debug("getWorkingDirectory:enter", ceph.DEBUG);
-+ String cwd = ceph.ceph_getcwd();
-+ ceph.debug("getWorkingDirectory:exit with path " + cwd, ceph.DEBUG);
-+ return new Path(fs_default_name + ceph.ceph_getcwd());
++ return workingDir;
+ }
+
+ /**
+ * Set the current working directory for the given file system. All relative
-+ * paths will be resolved relative to it. You need to have initialized the
-+ * filesystem prior to calling this method.
++ * paths will be resolved relative to it.
+ *
+ * @param dir The directory to change to.
+ */
+ @Override
-+ public void setWorkingDirectory(Path dir) {
-+ if (!initialized) return;
-+ ceph.debug("setWorkingDirecty:enter with new working dir " + dir, ceph.DEBUG);
-+ Path abs_path = makeAbsolute(dir);
-+ ceph.debug("setWorkingDirectory:calling ceph_setcwd from Java", ceph.TRACE);
-+ if (!ceph.ceph_setcwd(abs_path.toString()))
-+ ceph.debug("setWorkingDirectory: WARNING! ceph_setcwd failed for some reason on path " + abs_path, ceph.WARN);
-+ ceph.debug("setWorkingDirectory:exit", ceph.DEBUG);
++ public void setWorkingDirectory(Path dir) {
++ workingDir = makeAbsolute(dir);
++ }
++
++ /**
++ * Return only the path component from a potentially fully qualified path.
++ */
++ private String getCephPath(Path path) {
++ if (!path.isAbsolute()) {
++ throw new IllegalArgumentException("Path must be absolute: " + path);
++ }
++ return path.toUri().getPath();
+ }
+
+ /**
+ * Overriden because it's moderately faster than the generic implementation.
+ * @param path The file to check existence on.
+ * @return true if the file exists, false otherwise.
-+ * @throws IOException if initialize() hasn't been called.
+ */
+ @Override
-+ public boolean exists(Path path) throws IOException {
-+ if (!initialized) throw new IOException ("You have to initialize the "
-+ +"CephFileSystem before calling other methods.");
-+ ceph.debug("exists:enter with path " + path, ceph.DEBUG);
++ public boolean exists(Path path) throws IOException {
++ LOG.debug("exists:enter with path " + path);
+ boolean result;
+ Path abs_path = makeAbsolute(path);
++
+ if (abs_path.equals(root)) {
+ result = true;
++ } else {
++ LOG.trace(
++ "exists:Calling ceph_exists from Java on path " + abs_path.toString());
++ result = ceph.ceph_exists(getCephPath(abs_path));
++ LOG.trace("exists:Returned from ceph_exists to Java");
+ }
-+ else {
-+ ceph.debug("exists:Calling ceph_exists from Java on path "
-+ + abs_path.toString(), ceph.TRACE);
-+ result = ceph.ceph_exists(abs_path.toString());
-+ ceph.debug("exists:Returned from ceph_exists to Java", ceph.TRACE);
-+ }
-+ ceph.debug("exists:exit with value " + result, ceph.DEBUG);
++ LOG.debug("exists:exit with value " + result);
+ return result;
+ }
+
+ * @param path The directory path to create
+ * @param perms The permissions to apply to the created directories.
+ * @return true if successful, false otherwise
-+ * @throws IOException if initialize() hasn't been called or the path
-+ * is a child of a file.
++ * @throws IOException if the path is a child of a file.
+ */
-+ @Override
++ @Override
+ public boolean mkdirs(Path path, FsPermission perms) throws IOException {
-+ if (!initialized) throw new IOException ("You have to initialize the "
-+ +"CephFileSystem before calling other methods.");
-+ ceph.debug("mkdirs:enter with path " + path, ceph.DEBUG);
++ LOG.debug("mkdirs:enter with path " + path);
+ Path abs_path = makeAbsolute(path);
-+ ceph.debug("mkdirs:calling ceph_mkdirs from Java", ceph.TRACE);
-+ int result = ceph.ceph_mkdirs(abs_path.toString(), (int)perms.toShort());
++
++ LOG.trace("mkdirs:calling ceph_mkdirs from Java");
++ int result = ceph.ceph_mkdirs(getCephPath(abs_path), (int) perms.toShort());
++
+ if (result != 0) {
-+ ceph.debug("mkdirs: make directory " + abs_path
-+ + "Failing with result " + result, ceph.WARN);
-+ if (ceph.ENOTDIR == result)
-+ throw new FileAlreadyExistsException("Parent path is not a directory");
++ LOG.warn(
++ "mkdirs: make directory " + abs_path + "Failing with result " + result);
++ if (-ceph.ENOTDIR == result) {
++ throw new IOException("Parent path is not a directory");
++ }
+ return false;
-+ }
-+ else {
-+ ceph.debug("mkdirs:exiting succesfully", ceph.DEBUG);
-+ return true;
-+ }
++ } else {
++ LOG.debug("mkdirs:exiting succesfully");
++ return true;
++ }
+ }
+
+ /**
+ * generic implementation.
+ * @param path The path to check.
+ * @return true if the path is definitely a file, false otherwise.
-+ * @throws IOException if initialize() hasn't been called.
+ */
+ @Override
-+ public boolean isFile(Path path) throws IOException {
-+ if (!initialized) throw new IOException ("You have to initialize the "
-+ +"CephFileSystem before calling other methods.");
-+ ceph.debug("isFile:enter with path " + path, ceph.DEBUG);
++ public boolean isFile(Path path) throws IOException {
++ LOG.debug("isFile:enter with path " + path);
+ Path abs_path = makeAbsolute(path);
+ boolean result;
-+ if (abs_path.equals(root)) {
-+ result = false;
-+ }
-+ else {
-+ ceph.debug("isFile:entering ceph_isfile from Java", ceph.TRACE);
-+ result = ceph.ceph_isfile(abs_path.toString());
-+ }
-+ ceph.debug("isFile:exit with result " + result, ceph.DEBUG);
-+ return result;
-+ }
+
-+ /**
-+ * Check if a path is a directory. This is moderately faster than
-+ * the generic implementation.
-+ * @param path The path to check
-+ * @return true if the path is definitely a directory, false otherwise.
-+ * @throws IOException if initialize() hasn't been called.
-+ */
-+ @Override
-+ public boolean isDirectory(Path path) throws IOException {
-+ if (!initialized) throw new IOException ("You have to initialize the "
-+ +"CephFileSystem before calling other methods.");
-+ ceph.debug("isDirectory:enter with path " + path, ceph.DEBUG);
-+ Path abs_path = makeAbsolute(path);
-+ boolean result;
+ if (abs_path.equals(root)) {
-+ result = true;
++ result = false;
++ } else {
++ LOG.trace("isFile:entering ceph_isfile from Java");
++ result = ceph.ceph_isfile(getCephPath(abs_path));
+ }
-+ else {
-+ ceph.debug("calling ceph_isdirectory from Java", ceph.TRACE);
-+ result = ceph.ceph_isdirectory(abs_path.toString());
-+ ceph.debug("Returned from ceph_isdirectory to Java", ceph.TRACE);
-+ }
-+ ceph.debug("isDirectory:exit with result " + result, ceph.DEBUG);
++ LOG.debug("isFile:exit with result " + result);
+ return result;
+ }
+
+ * Ceph's support for these is a bit different than HDFS'.
+ * @param path The path to stat.
+ * @return FileStatus object containing the stat information.
-+ * @throws IOException if initialize() hasn't been called
+ * @throws FileNotFoundException if the path could not be resolved.
+ */
+ public FileStatus getFileStatus(Path path) throws IOException {
-+ if (!initialized) throw new IOException ("You have to initialize the "
-+ +"CephFileSystem before calling other methods.");
-+ ceph.debug("getFileStatus:enter with path " + path, ceph.DEBUG);
++ LOG.debug("getFileStatus:enter with path " + path);
+ Path abs_path = makeAbsolute(path);
-+ //sadly, Ceph doesn't really do uids/gids just yet, but
-+ //everything else is filled
++ // sadly, Ceph doesn't really do uids/gids just yet, but
++ // everything else is filled
+ FileStatus status;
+ Stat lstat = new Stat();
-+ ceph.debug("getFileStatus: calling ceph_stat from Java", ceph.TRACE);
-+ if(ceph.ceph_stat(abs_path.toString(), lstat)) {
++
++ LOG.trace("getFileStatus: calling ceph_stat from Java");
++ if (ceph.ceph_stat(getCephPath(abs_path), lstat)) {
+ status = new FileStatus(lstat.size, lstat.is_dir,
-+ ceph.ceph_replication(abs_path.toString()),
-+ lstat.block_size,
-+ lstat.mod_time,
-+ lstat.access_time,
-+ new FsPermission((short)lstat.mode),
-+ null,
-+ null,
-+ new Path(fs_default_name+abs_path.toString()));
-+ }
-+ else { //fail out
-+ throw new FileNotFoundException("org.apache.hadoop.fs.ceph.CephFileSystem: File "
-+ + path + " does not exist or could not be accessed");
++ ceph.ceph_replication(getCephPath(abs_path)), lstat.block_size,
++ lstat.mod_time, lstat.access_time,
++ new FsPermission((short) lstat.mode), System.getProperty("user.name"), null,
++ path.makeQualified(this));
++ } else { // fail out
++ throw new FileNotFoundException(
++ "org.apache.hadoop.fs.ceph.CephFileSystem: File " + path
++ + " does not exist or could not be accessed");
+ }
+
-+ ceph.debug("getFileStatus:exit", ceph.DEBUG);
++ LOG.debug("getFileStatus:exit");
+ return status;
+ }
+
+ * Get the FileStatus for each listing in a directory.
+ * @param path The directory to get listings from.
+ * @return FileStatus[] containing one FileStatus for each directory listing;
-+ * null if path is not a directory.
-+ * @throws IOException if initialize() hasn't been called.
-+ * @throws FileNotFoundException if the input path can't be found.
++ * null if path does not exist.
+ */
+ public FileStatus[] listStatus(Path path) throws IOException {
-+ if (!initialized) throw new IOException ("You have to initialize the "
-+ +"CephFileSystem before calling other methods.");
-+ ceph.debug("listStatus:enter with path " + path, ceph.WARN);
++ LOG.debug("listStatus:enter with path " + path);
+ Path abs_path = makeAbsolute(path);
+ Path[] paths = listPaths(abs_path);
++
+ if (paths != null) {
+ FileStatus[] statuses = new FileStatus[paths.length];
++
+ for (int i = 0; i < paths.length; ++i) {
-+ statuses[i] = getFileStatus(paths[i]);
++ statuses[i] = getFileStatus(paths[i]);
+ }
-+ ceph.debug("listStatus:exit", ceph.DEBUG);
++ LOG.debug("listStatus:exit");
+ return statuses;
+ }
-+ if (!isFile(path)) throw new FileNotFoundException(); //if we get here, listPaths returned null
-+ //which means that the input wasn't a directory, so throw an Exception if it's not a file
-+ return null; //or return null if it's a file
++
++ if (isFile(path)) {
++ return new FileStatus[] { getFileStatus(path) };
++ }
++
++ return null;
+ }
+
+ @Override
+ public void setPermission(Path p, FsPermission permission) throws IOException {
-+ if (!initialized) throw new IOException ("You have to initialize the "
-+ +"CephFileSystem before calling other methods.");
-+ ceph.debug("setPermission:enter with path " + p
-+ + " and permissions " + permission, ceph.DEBUG);
++ LOG.debug(
++ "setPermission:enter with path " + p + " and permissions " + permission);
+ Path abs_path = makeAbsolute(p);
-+ ceph.debug("setPermission:calling ceph_setpermission from Java", ceph.TRACE);
-+ ceph.ceph_setPermission(abs_path.toString(), permission.toShort());
-+ ceph.debug("setPermission:exit", ceph.DEBUG);
++
++ LOG.trace("setPermission:calling ceph_setpermission from Java");
++ ceph.ceph_setPermission(getCephPath(abs_path), permission.toShort());
++ LOG.debug("setPermission:exit");
+ }
+
+ /**
+ * @param mtime Set modification time in number of millis since Jan 1, 1970.
+ * @param atime Set access time in number of millis since Jan 1, 1970.
+ */
-+ @Override
-+ public void setTimes(Path p, long mtime, long atime) throws IOException {
-+ if (!initialized) throw new IOException ("You have to initialize the "
-+ +"CephFileSystem before calling other methods.");
-+ ceph.debug("setTimes:enter with path " + p + " mtime:" + mtime +
-+ " atime:" + atime, ceph.DEBUG);
-+ Path abs_path = makeAbsolute(p);
-+ ceph.debug("setTimes:calling ceph_setTimes from Java", ceph.TRACE);
-+ int r = ceph.ceph_setTimes(abs_path.toString(), mtime, atime);
-+ if (r<0) throw new IOException ("Failed to set times on path "
-+ + abs_path.toString() + " Error code: " + r);
-+ ceph.debug("setTimes:exit", ceph.DEBUG);
-+ }
++ @Override
++ public void setTimes(Path p, long mtime, long atime) throws IOException {
++ LOG.debug(
++ "setTimes:enter with path " + p + " mtime:" + mtime + " atime:" + atime);
++ Path abs_path = makeAbsolute(p);
++
++ LOG.trace("setTimes:calling ceph_setTimes from Java");
++ int r = ceph.ceph_setTimes(getCephPath(abs_path), mtime, atime);
++
++ if (r < 0) {
++ throw new IOException(
++ "Failed to set times on path " + abs_path.toString() + " Error code: "
++ + r);
++ }
++ LOG.debug("setTimes:exit");
++ }
+
+ /**
+ * Create a new file and open an FSDataOutputStream that's connected to it.
+ * @param path The file to create.
+ * @param permission The permissions to apply to the file.
-+ * @param flag If CreateFlag.OVERWRITE, overwrite any existing
-+ * file with this name; otherwise don't.
++ * @param overwrite If true, overwrite any existing file with
++ * this name; otherwise don't.
+ * @param bufferSize Ceph does internal buffering, but you can buffer
-+ * in the Java code too if you like.
++ * in the Java code too if you like.
+ * @param replication Ignored by Ceph. This can be
+ * configured via Ceph configuration.
+ * @param blockSize Ignored by Ceph. You can set client-wide block sizes
+ * @param progress A Progressable to report back to.
+ * Reporting is limited but exists.
+ * @return An FSDataOutputStream pointing to the created file.
-+ * @throws IOException if initialize() hasn't been called, or the path is an
++ * @throws IOException if the path is an
+ * existing directory, or the path exists but overwrite is false, or there is a
+ * failure in attempting to open for append with Ceph.
+ */
+ public FSDataOutputStream create(Path path,
-+ FsPermission permission,
-+ EnumSet<CreateFlag> flag,
-+ //boolean overwrite,
-+ int bufferSize,
-+ short replication,
-+ long blockSize,
-+ Progressable progress
-+ ) throws IOException {
-+ if (!initialized) throw new IOException ("You have to initialize the "
-+ +"CephFileSystem before calling other methods.");
-+ ceph.debug("create:enter with path " + path, ceph.DEBUG);
++ FsPermission permission,
++ boolean overwrite,
++ int bufferSize,
++ short replication,
++ long blockSize,
++ Progressable progress) throws IOException {
++ LOG.debug("create:enter with path " + path);
+ Path abs_path = makeAbsolute(path);
-+ if (progress!=null) progress.progress();
++
++ if (progress != null) {
++ progress.progress();
++ }
+ // We ignore replication since that's not configurable here, and
+ // progress reporting is quite limited.
-+ // Required semantics: if the file exists, overwrite if CreateFlag.OVERWRITE;
-+ // throw an exception if !CreateFlag.OVERWRITE.
++ // Required semantics: if the file exists, overwrite if 'overwrite' is set;
++ // otherwise, throw an exception
+
+ // Step 1: existence test
+ boolean exists = exists(abs_path);
++
+ if (exists) {
-+ if(isDirectory(abs_path))
-+ throw new IOException("create: Cannot overwrite existing directory \""
-+ + path.toString() + "\" with a file");
-+ //if (!overwrite)
-+ if (!flag.contains(CreateFlag.OVERWRITE))
-+ throw new IOException("createRaw: Cannot open existing file \""
-+ + abs_path.toString()
-+ + "\" for writing without overwrite flag");
++ if (getFileStatus(abs_path).isDir()) {
++ throw new IOException(
++ "create: Cannot overwrite existing directory \"" + path.toString()
++ + "\" with a file");
++ }
++ if (!overwrite) {
++ throw new IOException(
++ "createRaw: Cannot open existing file \"" + abs_path.toString()
++ + "\" for writing without overwrite flag");
++ }
+ }
+
-+ if (progress!=null) progress.progress();
++ if (progress != null) {
++ progress.progress();
++ }
+
+ // Step 2: create any nonexistent directories in the path
+ if (!exists) {
+ Path parent = abs_path.getParent();
++
+ if (parent != null) { // if parent is root, we're done
-+ int r = ceph.ceph_mkdirs(parent.toString(), permission.toShort());
-+ if (!(r==0 || r==-ceph.EEXIST))
-+ throw new IOException ("Error creating parent directory; code: " + r);
++ int r = ceph.ceph_mkdirs(getCephPath(parent), permission.toShort());
++
++ if (!(r == 0 || r == -ceph.EEXIST)) {
++ throw new IOException("Error creating parent directory; code: " + r);
++ }
++ }
++ if (progress != null) {
++ progress.progress();
+ }
-+ if (progress!=null) progress.progress();
+ }
+ // Step 3: open the file
-+ ceph.debug("calling ceph_open_for_overwrite from Java", ceph.TRACE);
-+ int fh = ceph.ceph_open_for_overwrite(abs_path.toString(), (int)permission.toShort());
-+ if (progress!=null) progress.progress();
-+ ceph.debug("Returned from ceph_open_for_overwrite to Java with fh " + fh, ceph.TRACE);
++ LOG.trace("calling ceph_open_for_overwrite from Java");
++ int fh = ceph.ceph_open_for_overwrite(getCephPath(abs_path),
++ (int) permission.toShort());
++
++ if (progress != null) {
++ progress.progress();
++ }
++ LOG.trace("Returned from ceph_open_for_overwrite to Java with fh " + fh);
+ if (fh < 0) {
-+ throw new IOException("create: Open for overwrite failed on path \"" +
-+ path.toString() + "\"");
++ throw new IOException(
++ "create: Open for overwrite failed on path \"" + path.toString()
++ + "\"");
+ }
-+
++
+ // Step 4: create the stream
-+ OutputStream cephOStream = new CephOutputStream(getConf(),
-+ ceph, fh, bufferSize);
-+ ceph.debug("create:exit", ceph.DEBUG);
++ OutputStream cephOStream = new CephOutputStream(getConf(), ceph, fh,
++ bufferSize);
++
++ LOG.debug("create:exit");
+ return new FSDataOutputStream(cephOStream, statistics);
-+ }
++ }
+
+ /**
+ * Open a Ceph file and attach the file handle to an FSDataInputStream.
+ * @param path The file to open
+ * @param bufferSize Ceph does internal buffering; but you can buffer in
-+ * the Java code too if you like.
++ * the Java code too if you like.
+ * @return FSDataInputStream reading from the given path.
-+ * @throws IOException if initialize() hasn't been called, the path DNE or is a
++ * @throws IOException if the path DNE or is a
+ * directory, or there is an error getting data to set up the FSDataInputStream.
+ */
+ public FSDataInputStream open(Path path, int bufferSize) throws IOException {
-+ if (!initialized) throw new IOException ("You have to initialize the "
-+ +"CephFileSystem before calling other methods.");
-+ ceph.debug("open:enter with path " + path, ceph.DEBUG);
++ LOG.debug("open:enter with path " + path);
+ Path abs_path = makeAbsolute(path);
-+
-+ int fh = ceph.ceph_open_for_read(abs_path.toString());
-+ if (fh < 0) { //uh-oh, something's bad!
-+ if (fh == -ceph.ENOENT) //well that was a stupid open
-+ throw new IOException("open: absolute path \"" + abs_path.toString()
-+ + "\" does not exist");
-+ else //hrm...the file exists but we can't open it :(
-+ throw new IOException("open: Failed to open file " + abs_path.toString());
++
++ int fh = ceph.ceph_open_for_read(getCephPath(abs_path));
++
++ if (fh < 0) { // uh-oh, something's bad!
++ if (fh == -ceph.ENOENT) { // well that was a stupid open
++ throw new IOException(
++ "open: absolute path \"" + abs_path.toString()
++ + "\" does not exist");
++ } else { // hrm...the file exists but we can't open it :(
++ throw new IOException("open: Failed to open file " + abs_path.toString());
++ }
+ }
+
-+ if(isDirectory(abs_path)) { //yes, it is possible to open Ceph directories
-+ //but that doesn't mean you should in Hadoop!
++ if (getFileStatus(abs_path).isDir()) { // yes, it is possible to open Ceph directories
++ // but that doesn't mean you should in Hadoop!
+ ceph.ceph_close(fh);
-+ throw new IOException("open: absolute path \"" + abs_path.toString()
-+ + "\" is a directory!");
++ throw new IOException(
++ "open: absolute path \"" + abs_path.toString() + "\" is a directory!");
+ }
+ Stat lstat = new Stat();
-+ ceph.debug("open:calling ceph_stat from Java", ceph.TRACE);
-+ ceph.ceph_stat(abs_path.toString(), lstat);
-+ ceph.debug("open:returned to Java", ceph.TRACE);
++
++ LOG.trace("open:calling ceph_stat from Java");
++ ceph.ceph_stat(getCephPath(abs_path), lstat);
++ LOG.trace("open:returned to Java");
+ long size = lstat.size;
++
+ if (size < 0) {
-+ throw new IOException("Failed to get file size for file " + abs_path.toString() +
-+ " but succeeded in opening file. Something bizarre is going on.");
++ throw new IOException(
++ "Failed to get file size for file " + abs_path.toString()
++ + " but succeeded in opening file. Something bizarre is going on.");
+ }
-+ FSInputStream cephIStream = new CephInputStream(getConf(), ceph,
-+ fh, size, bufferSize);
-+ ceph.debug("open:exit", ceph.DEBUG);
++ FSInputStream cephIStream = new CephInputStream(getConf(), ceph, fh, size,
++ bufferSize);
++
++ LOG.debug("open:exit");
+ return new FSDataInputStream(cephIStream);
-+ }
++ }
+
+ /**
+ * Rename a file or directory.
+ * @param src The current path of the file/directory
+ * @param dst The new name for the path.
+ * @return true if the rename succeeded, false otherwise.
-+ * @throws IOException if initialize() hasn't been called.
+ */
+ @Override
-+ public boolean rename(Path src, Path dst) throws IOException {
-+ if (!initialized) throw new IOException ("You have to initialize the "
-+ +"CephFileSystem before calling other methods.");
-+ ceph.debug("rename:enter with src:" + src + " and dest:" + dst, ceph.DEBUG);
++ public boolean rename(Path src, Path dst) throws IOException {
++ LOG.debug("rename:enter with src:" + src + " and dest:" + dst);
+ Path abs_src = makeAbsolute(src);
+ Path abs_dst = makeAbsolute(dst);
-+ ceph.debug("calling ceph_rename from Java", ceph.TRACE);
-+ boolean result = ceph.ceph_rename(abs_src.toString(), abs_dst.toString());
-+ if (!result) {
-+ if (isDirectory(abs_dst)) { //move the srcdir into destdir
-+ ceph.debug("ceph_rename failed but dst is a directory!", ceph.NOLOG);
-+ Path new_dst = new Path(abs_dst, abs_src.getName());
-+ result = rename(abs_src, new_dst);
-+ ceph.debug("attempt to move " + abs_src.toString()
-+ + " to " + new_dst.toString()
-+ + "has result:" + result, ceph.NOLOG);
-+ }
-+ }
-+ ceph.debug("rename:exit with result: " + result, ceph.DEBUG);
++
++ LOG.trace("calling ceph_rename from Java");
++ boolean result = ceph.ceph_rename(getCephPath(abs_src), getCephPath(abs_dst));
++
++ if (!result) {
++ boolean isDir = false;
++ try {
++ isDir = getFileStatus(abs_dst).isDir();
++ } catch (FileNotFoundException e) {}
++ if (isDir) { // move the srcdir into destdir
++ LOG.debug("ceph_rename failed but dst is a directory!");
++ Path new_dst = new Path(abs_dst, abs_src.getName());
++
++ result = rename(abs_src, new_dst);
++ LOG.debug(
++ "attempt to move " + abs_src.toString() + " to "
++ + new_dst.toString() + "has result:" + result);
++ }
++ }
++ LOG.debug("rename:exit with result: " + result);
+ return result;
+ }
+
++ /*
++ * Attempt to convert an IP into its hostname
++ */
++ private String[] ips2Hosts(String[] ips) {
++ ArrayList<String> hosts = new ArrayList<String>();
++ for (String ip : ips) {
++ try {
++ String host = DNS.reverseDns(InetAddress.getByName(ip), CEPH_NAMESERVER);
++ if (host.charAt(host.length()-1) == '.') {
++ host = host.substring(0, host.length()-1);
++ }
++ hosts.add(host); /* append */
++ } catch (Exception e) {
++ LOG.error("reverseDns ["+ip+"] failed: "+ e);
++ }
++ }
++ return hosts.toArray(new String[hosts.size()]);
++ }
++
+ /**
+ * Get a BlockLocation object for each block in a file.
+ *
+ * @param len The amount of the file past the offset you are interested in.
+ * @return A BlockLocation[] where each object corresponds to a block within
+ * the given range.
-+ * @throws IOException if initialize() hasn't been called.
+ */
+ @Override
-+ public BlockLocation[] getFileBlockLocations(FileStatus file,
-+ long start, long len) throws IOException {
-+ if (!initialized) throw new IOException ("You have to initialize the "
-+ +"CephFileSystem before calling other methods.");
-+ ceph.debug("getFileBlockLocations:enter with path " + file.getPath() +
-+ ", start pos " + start + ", length " + len, ceph.DEBUG);
-+ //sanitize and get the filehandle
++ public BlockLocation[] getFileBlockLocations(FileStatus file, long start, long len) throws IOException {
+ Path abs_path = makeAbsolute(file.getPath());
-+ ceph.debug("getFileBlockLocations:call ceph_open_for_read from Java", ceph.TRACE);
-+ int fh = ceph.ceph_open_for_read(abs_path.toString());
-+ ceph.debug("getFileBlockLocations:return from ceph_open_for_read to Java with fh "
-+ + fh, ceph.TRACE);
++
++ int fh = ceph.ceph_open_for_read(getCephPath(abs_path));
+ if (fh < 0) {
-+ ceph.debug("getFileBlockLocations:got error " + fh +
-+ ", exiting and returning null!", ceph.ERROR);
++ LOG.error("getFileBlockLocations:got error " + fh + ", exiting and returning null!");
+ return null;
+ }
-+ //get the block size
-+ ceph.debug("getFileBlockLocations:call ceph_getblocksize from Java", ceph.TRACE);
-+ long blockSize = ceph.ceph_getblocksize(abs_path.toString());
-+ ceph.debug("getFileBlockLocations:return from ceph_getblocksize", ceph.TRACE);
-+ BlockLocation[] locations =
-+ new BlockLocation[(int)Math.ceil(len/(float)blockSize)];
-+ long offset;
++
++ long blockSize = ceph.ceph_getblocksize(getCephPath(abs_path));
++ BlockLocation[] locations = new BlockLocation[(int) Math.ceil(len / (float) blockSize)];
++
+ for (int i = 0; i < locations.length; ++i) {
-+ offset = start + i*blockSize;
-+ ceph.debug("getFileBlockLocations:call ceph_hosts from Java on fh "
-+ + fh + " and offset " + offset, ceph.TRACE);
-+ String host = ceph.ceph_hosts(fh, offset);
-+ ceph.debug("getFileBlockLocations:return from ceph_hosts to Java with host "
-+ + host, ceph.TRACE);
-+ String[] hostArray = new String[1];
-+ hostArray[0] = host;
-+ locations[i] = new BlockLocation(hostArray, hostArray,
-+ start+i*blockSize-(start % blockSize),
-+ blockSize);
-+ }
-+ ceph.debug("getFileBlockLocations:call ceph_close from Java on fh "
-+ + fh, ceph.TRACE);
++ long offset = start + i * blockSize;
++ long blockStart = start + i * blockSize - (start % blockSize);
++ String ips[] = ceph.ceph_hosts(fh, offset);
++ String hosts[] = ips2Hosts(ips);
++ locations[i] = new BlockLocation(null, hosts, blockStart, blockSize);
++ LOG.debug("getFileBlockLocations: location[" + i + "]: " + locations[i]);
++ }
++
+ ceph.ceph_close(fh);
-+ ceph.debug("getFileBlockLocations:return with " + locations.length
-+ + " locations", ceph.DEBUG);
+ return locations;
+ }
-+
-+ /**
-+ * Get usage statistics on the Ceph filesystem.
-+ * @param path A path to the partition you're interested in.
-+ * Ceph doesn't partition, so this is ignored.
-+ * @return FsStatus reporting capacity, usage, and remaining space.
-+ * @throws IOException if initialize() hasn't been called, or the
-+ * stat somehow fails.
-+ */
-+ @Override
-+ public FsStatus getStatus (Path path) throws IOException {
-+ if (!initialized) throw new IOException("You have to initialize the "
-+ + " CephFileSystem before calling other methods.");
-+ ceph.debug("getStatus:enter with path " + path, ceph.DEBUG);
-+ Path abs_path = makeAbsolute(path);
-+
-+ //currently(Ceph .16) Ceph actually ignores the path
-+ //but we still pass it in; if Ceph stops ignoring we may need more
-+ //error-checking code.
-+ CephStat ceph_stat = new CephStat();
-+ ceph.debug("getStatus:calling ceph_statfs from Java", ceph.TRACE);
-+ int result = ceph.ceph_statfs(abs_path.toString(), ceph_stat);
-+ if (result!=0) throw new IOException("Somehow failed to statfs the Ceph filesystem. Error code: " + result);
-+ ceph.debug("getStatus:exit successfully", ceph.DEBUG);
-+ return new FsStatus(ceph_stat.capacity,
-+ ceph_stat.used, ceph_stat.remaining);
-+ }
++
++ @Deprecated
++ public boolean delete(Path path) throws IOException {
++ return delete(path, false);
++ }
+
+ /**
+ * Delete the given path, and optionally its children.
+ * delete will throw an IOException. If path is a file this is ignored.
+ * @return true if the delete succeeded, false otherwise (including if
+ * path doesn't exist).
-+ * @throws IOException if initialize() hasn't been called,
-+ * or you attempt to non-recursively delete a directory,
++ * @throws IOException if you attempt to non-recursively delete a directory,
+ * or you attempt to delete the root directory.
+ */
+ public boolean delete(Path path, boolean recursive) throws IOException {
-+ if (!initialized) throw new IOException ("You have to initialize the "
-+ +"CephFileSystem before calling other methods.");
-+ ceph.debug("delete:enter with path " + path + " and recursive=" + recursive,
-+ ceph.DEBUG);
++ LOG.debug("delete:enter with path " + path + " and recursive=" + recursive);
+ Path abs_path = makeAbsolute(path);
-+
++
+ // sanity check
-+ if (abs_path.equals(root))
++ if (abs_path.equals(root)) {
+ throw new IOException("Error: deleting the root directory is a Bad Idea.");
-+ if (!exists(abs_path)) return false;
++ }
++ if (!exists(abs_path)) {
++ return false;
++ }
+
+ // if the path is a file, try to delete it.
+ if (isFile(abs_path)) {
-+ ceph.debug("delete:calling ceph_unlink from Java with path " + abs_path,
-+ ceph.TRACE);
-+ boolean result = ceph.ceph_unlink(abs_path.toString());
-+ if(!result)
-+ ceph.debug("delete: failed to delete file \"" +
-+ abs_path.toString() + "\".", ceph.ERROR);
-+ ceph.debug("delete:exit with success=" + result, ceph.DEBUG);
++ LOG.trace("delete:calling ceph_unlink from Java with path " + abs_path);
++ boolean result = ceph.ceph_unlink(getCephPath(abs_path));
++
++ if (!result) {
++ LOG.error(
++ "delete: failed to delete file \"" + abs_path.toString() + "\".");
++ }
++ LOG.debug("delete:exit with success=" + result);
+ return result;
+ }
+
+ /* The path is a directory, so recursively try to delete its contents,
-+ and then delete the directory. */
-+ //get the entries; listPaths will remove . and .. for us
++ and then delete the directory. */
++ // get the entries; listPaths will remove . and .. for us
+ Path[] contents = listPaths(abs_path);
++
+ if (contents == null) {
-+ ceph.debug("delete: Failed to read contents of directory \"" +
-+ abs_path.toString() +
-+ "\" while trying to delete it, BAILING", ceph.ERROR);
++ LOG.error(
++ "delete: Failed to read contents of directory \""
++ + abs_path.toString() + "\" while trying to delete it, BAILING");
+ return false;
+ }
+ if (!recursive && contents.length > 0) {
+ throw new IOException("Directories must be deleted recursively!");
+ }
+ // delete the entries
-+ ceph.debug("delete: recursively calling delete on contents of "
-+ + abs_path, ceph.DEBUG);
++ LOG.debug("delete: recursively calling delete on contents of " + abs_path);
+ for (Path p : contents) {
+ if (!delete(p, true)) {
-+ ceph.debug("delete: Failed to delete file \"" +
-+ p.toString() + "\" while recursively deleting \""
-+ + abs_path.toString() + "\", BAILING", ceph.ERROR );
-+ return false;
++ LOG.error(
++ "delete: Failed to delete file \"" + p.toString()
++ + "\" while recursively deleting \"" + abs_path.toString()
++ + "\", BAILING");
++ return false;
+ }
+ }
-+ //if we've come this far it's a now-empty directory, so delete it!
-+ boolean result = ceph.ceph_rmdir(abs_path.toString());
-+ if (!result)
-+ ceph.debug("delete: failed to delete \"" + abs_path.toString()
-+ + "\", BAILING", ceph.ERROR);
-+ ceph.debug("delete:exit", ceph.DEBUG);
++ // if we've come this far it's a now-empty directory, so delete it!
++ boolean result = ceph.ceph_rmdir(getCephPath(abs_path));
++
++ if (!result) {
++ LOG.error(
++ "delete: failed to delete \"" + abs_path.toString() + "\", BAILING");
++ }
++ LOG.debug("delete:exit");
+ return result;
+ }
+
+ * by a separate Ceph configuration.
+ */
+ @Override
-+ public short getDefaultReplication() {
++ public short getDefaultReplication() {
+ return 1;
+ }
+
+ /**
+ * Get the default block size.
-+ * @return the default block size, in bytes, as a long.
-+ */
-+ @Override
-+ public long getDefaultBlockSize() {
-+ return getConf().getInt("fs.ceph.blockSize", 1<<26);
-+ }
-+
-+ // Makes a Path absolute. In a cheap, dirty hack, we're
-+ // also going to strip off any fs_default_name prefix we see.
-+ private Path makeAbsolute(Path path) {
-+ ceph.debug("makeAbsolute:enter with path " + path, ceph.NOLOG);
-+ if (path == null) return new Path("/");
-+ // first, check for the prefix
-+ if (path.toString().startsWith(fs_default_name)) {
-+ Path stripped_path = new Path(path.toString().substring(fs_default_name.length()));
-+ ceph.debug("makeAbsolute:exit with path " + stripped_path, ceph.NOLOG);
-+ return stripped_path;
-+ }
-+
-+ if (path.isAbsolute()) {
-+ ceph.debug("makeAbsolute:exit with path " + path, ceph.NOLOG);
-+ return path;
-+ }
-+ Path new_path = new Path(ceph.ceph_getcwd(), path);
-+ ceph.debug("makeAbsolute:exit with path " + new_path, ceph.NOLOG);
-+ return new_path;
-+ }
-+
-+ private Path[] listPaths(Path path) throws IOException {
-+ ceph.debug("listPaths:enter with path " + path, ceph.NOLOG);
-+ String dirlist[];
-+
-+ Path abs_path = makeAbsolute(path);
-+
-+ // If it's a directory, get the listing. Otherwise, complain and give up.
-+ ceph.debug("calling ceph_getdir from Java with path " + abs_path, ceph.NOLOG);
-+ dirlist = ceph.ceph_getdir(abs_path.toString());
-+ ceph.debug("returning from ceph_getdir to Java", ceph.NOLOG);
-+
-+ if (dirlist == null) {
-+ return null;
-+ }
-+
-+ // convert the strings to Paths
-+ Path[] paths = new Path[dirlist.length];
-+ for (int i = 0; i < dirlist.length; ++i) {
-+ ceph.debug("Raw enumeration of paths in \"" + abs_path.toString() + "\": \"" +
-+ dirlist[i] + "\"", ceph.TRACE);
-+ // convert each listing to an absolute path
-+ Path raw_path = new Path(dirlist[i]);
-+ if (raw_path.isAbsolute())
-+ paths[i] = raw_path;
-+ else
-+ paths[i] = new Path(abs_path, raw_path);
-+ }
-+ ceph.debug("listPaths:exit", ceph.NOLOG);
-+ return paths;
-+ }
-+
-+
-+
-+ static class Stat {
-+ public long size;
-+ public boolean is_dir;
-+ public long block_size;
-+ public long mod_time;
-+ public long access_time;
-+ public int mode;
-+
-+ public Stat(){}
-+ }
-+
-+ static class CephStat {
-+ public long capacity;
-+ public long used;
-+ public long remaining;
-+
-+ public CephStat() {}
-+ }
-+}
-Index: src/java/org/apache/hadoop/fs/ceph/CephFS.java
-===================================================================
---- src/java/org/apache/hadoop/fs/ceph/CephFS.java (revision 0)
-+++ src/java/org/apache/hadoop/fs/ceph/CephFS.java (revision 0)
-@@ -0,0 +1,272 @@
-+// -*- mode:Java; tab-width:2; c-basic-offset:2; indent-tabs-mode:t -*-
-+/**
-+ *
-+ * Licensed under the Apache License, Version 2.0
-+ * (the "License"); you may not use this file except in compliance with
-+ * the License. You may obtain a copy of the License at
-+ *
-+ * http://www.apache.org/licenses/LICENSE-2.0
-+ *
-+ * Unless required by applicable law or agreed to in writing, software
-+ * distributed under the License is distributed on an "AS IS" BASIS,
-+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
-+ * implied. See the License for the specific language governing
-+ * permissions and limitations under the License.
-+ *
-+ *
-+ * Abstract base class for communicating with a Ceph filesystem and its
-+ * C++ codebase from Java, or pretending to do so (for unit testing purposes).
-+ * As only the Ceph package should be using this directly, all methods
-+ * are protected.
-+ */
-+package org.apache.hadoop.fs.ceph;
-+
-+import org.apache.hadoop.conf.Configuration;
-+import org.apache.commons.logging.Log;
-+
-+abstract class CephFS {
-+
-+ protected static final int FATAL = 0;
-+ protected static final int ERROR = 1;
-+ protected static final int WARN = 2;
-+ protected static final int INFO = 3;
-+ protected static final int DEBUG = 4;
-+ protected static final int TRACE = 5;
-+ protected static final int NOLOG = 6;
-+
-+ protected static final int ENOTDIR = 20;
-+ protected static final int EEXIST = 17;
-+ protected static final int ENOENT = 2;
-+
-+ private boolean debug = false;
-+ private Log LOG;
-+
-+ public CephFS(Configuration conf, Log log) {
-+ debug = ("true".equals(conf.get("fs.ceph.debug", "false")));
-+ LOG = log;
-+ }
-+
-+ /*
-+ * Performs any necessary setup to allow general use of the filesystem.
-+ * Inputs:
-+ * String argsuments -- a command-line style input of Ceph config params
-+ * int block_size -- the size in bytes to use for blocks
-+ * Returns: true on success, false otherwise
-+ */
-+ abstract protected boolean ceph_initializeClient(String arguments, int block_size);
-+
-+ /*
-+ * Returns the current working directory (absolute) as a String
-+ */
-+ abstract protected String ceph_getcwd();
-+ /*
-+ * Changes the working directory.
-+ * Inputs:
-+ * String path: The path (relative or absolute) to switch to
-+ * Returns: true on success, false otherwise.
-+ */
-+ abstract protected boolean ceph_setcwd(String path);
-+ /*
-+ * Given a path to a directory, removes the directory if empty.
-+ * Inputs:
-+ * jstring j_path: The path (relative or absolute) to the directory
-+ * Returns: true on successful delete; false otherwise
-+ */
-+ abstract protected boolean ceph_rmdir(String path);
-+ /*
-+ * Given a path, unlinks it.
-+ * Inputs:
-+ * String path: The path (relative or absolute) to the file or empty dir
-+ * Returns: true if the unlink occurred, false otherwise.
-+ */
-+ abstract protected boolean ceph_unlink(String path);
-+ /*
-+ * Changes a given path name to a new name, assuming new_path doesn't exist.
-+ * Inputs:
-+ * jstring j_from: The path whose name you want to change.
-+ * jstring j_to: The new name for the path.
-+ * Returns: true if the rename occurred, false otherwise
-+ */
-+ abstract protected boolean ceph_rename(String old_path, String new_path);
-+ /*
-+ * Returns true if it the input path exists, false
-+ * if it does not or there is an unexpected failure.
-+ */
-+ abstract protected boolean ceph_exists(String path);
-+ /*
-+ * Get the block size for a given path.
-+ * Input:
-+ * String path: The path (relative or absolute) you want
-+ * the block size for.
-+ * Returns: block size if the path exists, otherwise a negative number
-+ * corresponding to the standard C++ error codes (which are positive).
-+ */
-+ abstract protected long ceph_getblocksize(String path);
-+ /*
-+ * Returns true if the given path is a directory, false otherwise.
-+ */
-+ abstract protected boolean ceph_isdirectory(String path);
-+ /*
-+ * Returns true if the given path is a file; false otherwise.
-+ */
-+ abstract protected boolean ceph_isfile(String path);
-+ /*
-+ * Get the contents of a given directory.
-+ * Inputs:
-+ * String path: The path (relative or absolute) to the directory.
-+ * Returns: A Java String[] of the contents of the directory, or
-+ * NULL if there is an error (ie, path is not a dir). This listing
-+ * will not contain . or .. entries.
-+ */
-+ abstract protected String[] ceph_getdir(String path);
-+ /*
-+ * Create the specified directory and any required intermediate ones with the
-+ * given mode.
-+ */
-+ abstract protected int ceph_mkdirs(String path, int mode);
-+ /*
-+ * Open a file to append. If the file does not exist, it will be created.
-+ * Opening a dir is possible but may have bad results.
-+ * Inputs:
-+ * String path: The path to open.
-+ * Returns: an int filehandle, or a number<0 if an error occurs.
-+ */
-+ abstract protected int ceph_open_for_append(String path);
-+ /*
-+ * Open a file for reading.
-+ * Opening a dir is possible but may have bad results.
-+ * Inputs:
-+ * String path: The path to open.
-+ * Returns: an int filehandle, or a number<0 if an error occurs.
-+ */
-+ abstract protected int ceph_open_for_read(String path);
-+ /*
-+ * Opens a file for overwriting; creates it if necessary.
-+ * Opening a dir is possible but may have bad results.
-+ * Inputs:
-+ * String path: The path to open.
-+ * int mode: The mode to open with.
-+ * Returns: an int filehandle, or a number<0 if an error occurs.
-+ */
-+ abstract protected int ceph_open_for_overwrite(String path, int mode);
-+ /*
-+ * Closes the given file. Returns 0 on success, or a negative
-+ * error code otherwise.
-+ */
-+ abstract protected int ceph_close(int filehandle);
-+ /*
-+ * Change the mode on a path.
-+ * Inputs:
-+ * String path: The path to change mode on.
-+ * int mode: The mode to apply.
-+ * Returns: true if the mode is properly applied, false if there
-+ * is any error.
-+ */
-+ abstract protected boolean ceph_setPermission(String path, int mode);
-+ /*
-+ * Closes the Ceph client. This should be called before shutting down
-+ * (multiple times is okay but redundant).
-+ */
-+ abstract protected boolean ceph_kill_client();
-+ /*
-+ * Get the statistics on a path returned in a custom format defined
-+ * in CephFileSystem.
-+ * Inputs:
-+ * String path: The path to stat.
-+ * Stat fill: The stat object to fill.
-+ * Returns: true if the stat is successful, false otherwise.
-+ */
-+ abstract protected boolean ceph_stat(String path, CephFileSystem.Stat fill);
-+ /*
-+ * Statfs a filesystem in a custom format defined in CephFileSystem.
-+ * Inputs:
-+ * String path: A path on the filesystem that you wish to stat.
-+ * CephStat fill: The CephStat object to fill.
-+ * Returns: 0 if successful and the CephStat is filled; a negative
-+ * error code otherwise.
-+ */
-+ abstract protected int ceph_statfs(String path, CephFileSystem.CephStat fill);
-+ /*
-+ * Check how many times a path should be replicated (if it is
-+ * degraded it may not actually be replicated this often).
-+ * Inputs:
-+ * String path: The path to check.
-+ * Returns: an int containing the number of times replicated.
-+ */
-+ abstract protected int ceph_replication(String path);
-+ /*
-+ * Find the IP address of the primary OSD for a given file and offset.
-+ * Inputs:
-+ * int fh: The filehandle for the file.
-+ * long offset: The offset to get the location of.
-+ * Returns: a String of the location as IP, or NULL if there is an error.
-+ */
-+ abstract protected String ceph_hosts(int fh, long offset);
-+ /*
-+ * Set the mtime and atime for a given path.
-+ * Inputs:
-+ * String path: The path to set the times for.
-+ * long mtime: The mtime to set, in millis since epoch (-1 to not set).
-+ * long atime: The atime to set, in millis since epoch (-1 to not set)
-+ * Returns: 0 if successful, an error code otherwise.
-+ */
-+ abstract protected int ceph_setTimes(String path, long mtime, long atime);
-+ /*
-+ * Get the current position in a file (as a long) of a given filehandle.
-+ * Returns: (long) current file position on success, or a
-+ * negative error code on failure.
-+ */
-+ abstract protected long ceph_getpos(int fh);
-+ /*
-+ * Write the given buffer contents to the given filehandle.
-+ * Inputs:
-+ * int fh: The filehandle to write to.
-+ * byte[] buffer: The buffer to write from
-+ * int buffer_offset: The position in the buffer to write from
-+ * int length: The number of (sequential) bytes to write.
-+ * Returns: int, on success the number of bytes written, on failure
-+ * a negative error code.
-+ */
-+ abstract protected int ceph_write(int fh, byte[] buffer, int buffer_offset, int length);
-+
-+ /*
-+ * Reads into the given byte array from the current position.
-+ * Inputs:
-+ * int fh: the filehandle to read from
-+ * byte[] buffer: the byte array to read into
-+ * int buffer_offset: where in the buffer to start writing
-+ * int length: how much to read.
-+ * There'd better be enough space in the buffer to write all
-+ * the data from the given offset!
-+ * Returns: the number of bytes read on success (as an int),
-+ * or an error code otherwise. */
-+ abstract protected int ceph_read(int fh, byte[] buffer, int buffer_offset, int length);
-+ /*
-+ * Seeks to the given position in the given file.
-+ * Inputs:
-+ * int fh: The filehandle to seek in.
-+ * long pos: The position to seek to.
-+ * Returns: the new position (as a long) of the filehandle on success,
-+ * or a negative error code on failure. */
-+ abstract protected long ceph_seek_from_start(int fh, long pos);
-+
-+ protected void debug(String statement, int priority) {
-+ if (debug) System.err.println(statement);
-+ switch(priority) {
-+ case FATAL: LOG.fatal(statement);
-+ break;
-+ case ERROR: LOG.error(statement);
-+ break;
-+ case WARN: LOG.warn(statement);
-+ break;
-+ case INFO: LOG.info(statement);
-+ break;
-+ case DEBUG: LOG.debug(statement);
-+ break;
-+ case TRACE: LOG.trace(statement);
-+ break;
-+ case NOLOG: break;
-+ default: break;
-+ }
-+ }
-+}
-Index: src/java/org/apache/hadoop/fs/ceph/CephFaker.java
-===================================================================
---- src/java/org/apache/hadoop/fs/ceph/CephFaker.java (revision 0)
-+++ src/java/org/apache/hadoop/fs/ceph/CephFaker.java (revision 0)
-@@ -0,0 +1,480 @@
-+// -*- mode:Java; tab-width:2; c-basic-offset:2; indent-tabs-mode:t -*-
-+/**
-+ *
-+ * Licensed under the Apache License, Version 2.0
-+ * (the "License"); you may not use this file except in compliance with
-+ * the License. You may obtain a copy of the License at
-+ *
-+ * http://www.apache.org/licenses/LICENSE-2.0
-+ *
-+ * Unless required by applicable law or agreed to in writing, software
-+ * distributed under the License is distributed on an "AS IS" BASIS,
-+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
-+ * implied. See the License for the specific language governing
-+ * permissions and limitations under the License.
-+ *
-+ *
-+ * This uses the local Filesystem but pretends to be communicating
-+ * with a Ceph deployment, for unit testing the CephFileSystem.
-+ */
-+
-+package org.apache.hadoop.fs.ceph;
-+
-+import java.net.URI;
-+import java.util.Hashtable;
-+import java.io.Closeable;
-+import java.io.FileNotFoundException;
-+import java.io.IOException;
-+
-+import org.apache.commons.logging.Log;
-+import org.apache.hadoop.conf.Configuration;
-+import org.apache.hadoop.fs.BlockLocation;
-+import org.apache.hadoop.fs.FileAlreadyExistsException;
-+import org.apache.hadoop.fs.FileStatus;
-+import org.apache.hadoop.fs.FileSystem;
-+import org.apache.hadoop.fs.FsStatus;
-+import org.apache.hadoop.fs.FSDataInputStream;
-+import org.apache.hadoop.fs.FSDataOutputStream;
-+import org.apache.hadoop.fs.Path;
-+import org.apache.hadoop.fs.permission.FsPermission;
-+
-+class CephFaker extends CephFS {
-+
-+ FileSystem localFS;
-+ String localPrefix;
-+ int blockSize;
-+ Configuration conf;
-+ Hashtable<Integer, Object> files;
-+ Hashtable<Integer, String> filenames;
-+ int fileCount = 0;
-+ boolean initialized = false;
-+
-+ public CephFaker(Configuration con, Log log) {
-+ super(con, log);
-+ conf = con;
-+ files = new Hashtable<Integer, Object>();
-+ filenames = new Hashtable<Integer, String>();
-+ }
-+
-+ protected boolean ceph_initializeClient(String args, int block_size) {
-+ if (!initialized) {
-+ //let's remember the default block_size
-+ blockSize = block_size;
-+ /* for a real Ceph deployment, this starts up the client,
-+ * sets debugging levels, etc. We just need to get the
-+ * local FileSystem to use, and we'll ignore any
-+ * command-line arguments. */
-+ try {
-+ localFS = FileSystem.getLocal(conf);
-+ localFS.initialize(URI.create("file://localhost"), conf);
-+ localFS.setVerifyChecksum(false);
-+ String testDir = conf.get("hadoop.tmp.dir");
-+ localPrefix = localFS.getWorkingDirectory().toString();
-+ int testDirLoc = localPrefix.indexOf(testDir) - 1;
-+ if (-2 == testDirLoc)
-+ testDirLoc = localPrefix.length();
-+ localPrefix = localPrefix.substring(0, testDirLoc)
-+ + "/" + conf.get("hadoop.tmp.dir");
-+
-+ localFS.setWorkingDirectory(new Path(localPrefix
-+ +"/user/"
-+ + System.getProperty("user.name")));
-+ //I don't know why, but the unit tests expect the default
-+ //working dir to be /user/username, so satisfy them!
-+ //debug("localPrefix is " + localPrefix, INFO);
-+ }
-+ catch (IOException e) {
-+ return false;
-+ }
-+ initialized = true;
-+ }
-+ return true;
-+ }
-+
-+ protected String ceph_getcwd() {
-+ return sanitize_path(localFS.getWorkingDirectory().toString());
-+ }
-+
-+ protected boolean ceph_setcwd(String path) {
-+ localFS.setWorkingDirectory(new Path(prepare_path(path)));
-+ return true;
-+ }
-+
-+ //the caller is responsible for ensuring empty dirs
-+ protected boolean ceph_rmdir(String pth) {
-+ Path path = new Path(prepare_path(pth));
-+ boolean ret = false;
-+ try {
-+ if (localFS.listStatus(path).length <= 1) {
-+ ret = localFS.delete(path, true);
-+ }
-+ }
-+ catch (IOException e){ }
-+ return ret;
-+ }
-+
-+ //this needs to work on (empty) directories too
-+ protected boolean ceph_unlink(String path) {
-+ path = prepare_path(path);
-+ boolean ret = false;
-+ if (ceph_isdirectory(path)) {
-+ ret = ceph_rmdir(path);
-+ }
-+ else {
-+ try {
-+ ret = localFS.delete(new Path(path), false);
-+ }
-+ catch (IOException e){ }
-+ }
-+ return ret;
-+ }
-+
-+ protected boolean ceph_rename(String oldName, String newName) {
-+ oldName = prepare_path(oldName);
-+ newName = prepare_path(newName);
-+ try {
-+ Path parent = new Path(newName).getParent();
-+ Path newPath = new Path(newName);
-+ if (localFS.exists(parent) && !localFS.exists(newPath))
-+ return localFS.rename(new Path(oldName), newPath);
-+ return false;
-+ }
-+ catch (IOException e) { return false; }
-+ }
-+
-+ protected boolean ceph_exists(String path) {
-+ path = prepare_path(path);
-+ boolean ret = false;
-+ try {
-+ ret = localFS.exists(new Path(path));
-+ }
-+ catch (IOException e){ }
-+ return ret;
-+ }
-+
-+ protected long ceph_getblocksize(String path) {
-+ path = prepare_path(path);
-+ try {
-+ FileStatus status = localFS.getFileStatus(new Path(path));
-+ return status.getBlockSize();
-+ }
-+ catch (FileNotFoundException e) {
-+ return -CephFS.ENOENT;
-+ }
-+ catch (IOException e) {
-+ return -1; //just fail generically
-+ }
-+ }
-+
-+ protected boolean ceph_isdirectory(String path) {
-+ path = prepare_path(path);
-+ try {
-+ FileStatus status = localFS.getFileStatus(new Path(path));
-+ return status.isDir();
-+ }
-+ catch (IOException e) { return false; }
-+ }
-+
-+ protected boolean ceph_isfile(String path) {
-+ path = prepare_path(path);
-+ boolean ret = false;
-+ try {
-+ FileStatus status = localFS.getFileStatus(new Path(path));
-+ ret = !status.isDir();
-+ }
-+ catch (Exception e) {}
-+ return ret;
-+ }
-+
-+ protected String[] ceph_getdir(String path) {
-+ path = prepare_path(path);
-+ if (!ceph_isdirectory(path)) {
-+ return null;
-+ }
-+ try {
-+ FileStatus[] stats = localFS.listStatus(new Path(path));
-+ String[] names = new String[stats.length];
-+ String name;
-+ for (int i=0; i<stats.length; ++i) {
-+ name = stats[i].getPath().toString();
-+ names[i] = name.substring(name.lastIndexOf(Path.SEPARATOR)+1);
-+ }
-+ return names;
-+ }
-+ catch (IOException e) {}
-+ return null;
-+ }
-+
-+ protected int ceph_mkdirs(String path, int mode) {
-+ path = prepare_path(path);
-+ //debug("ceph_mkdirs on " + path, INFO);
-+ try {
-+ if(localFS.mkdirs(new Path(path), new FsPermission((short)mode)))
-+ return 0;
-+ }
-+ catch (FileAlreadyExistsException fe) { return ENOTDIR; }
-+ catch (IOException e) {}
-+ if (ceph_isdirectory(path))
-+ return -EEXIST; //apparently it already existed
-+ return -1;
-+ }
-+
-+ /*
-+ * Unlike a real Ceph deployment, you can't do opens on a directory.
-+ * Since that has unpredictable behavior and you shouldn't do it anyway,
-+ * it's okay.
-+ */
-+ protected int ceph_open_for_append(String path) {
-+ path = prepare_path(path);
-+ FSDataOutputStream stream;
-+ try {
-+ stream = localFS.append(new Path(path));
-+ files.put(new Integer(fileCount), stream);
-+ filenames.put(new Integer(fileCount), path);
-+ return fileCount++;
-+ }
-+ catch (IOException e) { }
-+ return -1; // failure
-+ }
-+
-+ protected int ceph_open_for_read(String path) {
-+ path = prepare_path(path);
-+ FSDataInputStream stream;
-+ try {
-+ stream = localFS.open(new Path(path));
-+ files.put(new Integer(fileCount), stream);
-+ filenames.put(new Integer(fileCount), path);
-+ debug("ceph_open_for_read fh:" + fileCount
-+ + ", pathname:" + path, INFO);
-+ return fileCount++;
-+ }
-+ catch (IOException e) { }
-+ return -1; //failure
-+ }
-+
-+ protected int ceph_open_for_overwrite(String path, int mode) {
-+ path = prepare_path(path);
-+ FSDataOutputStream stream;
-+ try {
-+ stream = localFS.create(new Path(path));
-+ files.put(new Integer(fileCount), stream);
-+ filenames.put(new Integer(fileCount), path);
-+ debug("ceph_open_for_overwrite fh:" + fileCount
-+ + ", pathname:" + path, INFO);
-+ return fileCount++;
-+ }
-+ catch (IOException e) { }
-+ return -1; //failure
-+ }
-+
-+ protected int ceph_close(int filehandle) {
-+ debug("ceph_close(filehandle " + filehandle + ")", INFO);
-+ try {
-+ ((Closeable)files.get(new Integer(filehandle))).close();
-+ if(null == files.get(new Integer(filehandle))) {
-+ return -ENOENT; //this isn't quite the right error code,
-+ // but the important part is it's negative
-+ }
-+ return 0; //hurray, success
-+ }
-+ catch (NullPointerException ne) {
-+ debug("ceph_close caught NullPointerException!" + ne, WARN);
-+ } //err, how?
-+ catch (IOException ie) {
-+ debug("ceph_close caught IOException!" + ie, WARN);
-+ }
-+ return -1; //failure
-+ }
-+
-+ protected boolean ceph_setPermission(String pth, int mode) {
-+ pth = prepare_path(pth);
-+ Path path = new Path(pth);
-+ boolean ret = false;
-+ try {
-+ localFS.setPermission(path, new FsPermission((short)mode));
-+ ret = true;
-+ }
-+ catch (IOException e) { }
-+ return ret;
-+ }
-+
-+ //rather than try and match a Ceph deployment's behavior exactly,
-+ //just make bad things happen if they try and call methods after this
-+ protected boolean ceph_kill_client() {
-+ //debug("ceph_kill_client", INFO);
-+ localFS.setWorkingDirectory(new Path(localPrefix));
-+ //debug("working dir is now " + localFS.getWorkingDirectory(), INFO);
-+ try{
-+ localFS.close(); }
-+ catch (Exception e) {}
-+ localFS = null;
-+ files = null;
-+ filenames = null;
-+ return true;
-+ }
-+
-+ protected boolean ceph_stat(String pth, CephFileSystem.Stat fill) {
-+ pth = prepare_path(pth);
-+ Path path = new Path(pth);
-+ boolean ret = false;
-+ try {
-+ FileStatus status = localFS.getFileStatus(path);
-+ fill.size = status.getLen();
-+ fill.is_dir = status.isDir();
-+ fill.block_size = status.getBlockSize();
-+ fill.mod_time = status.getModificationTime();
-+ fill.access_time = status.getAccessTime();
-+ fill.mode = status.getPermission().toShort();
-+ ret = true;
-+ }
-+ catch (IOException e) {}
-+ return ret;
-+ }
++ * @return the default block size, in bytes, as a long.
++ */
++ @Override
++ public long getDefaultBlockSize() {
++ return getConf().getInt("fs.ceph.blockSize", 1 << 26);
++ }
+
-+ protected int ceph_statfs(String pth, CephFileSystem.CephStat fill) {
-+ pth = prepare_path(pth);
-+ try {
-+ FsStatus stat = localFS.getStatus();
-+ fill.capacity = stat.getCapacity();
-+ fill.used = stat.getUsed();
-+ fill.remaining = stat.getRemaining();
-+ return 0;
-+ }
-+ catch (Exception e){}
-+ return -1; //failure;
-+ }
++ /**
++ * Adds the working directory to path if path is not already
++ * an absolute path. The URI scheme is not removed here. It
++ * is removed only when users (e.g. ceph native calls) need
++ * the path-only portion.
++ */
++ private Path makeAbsolute(Path path) {
++ if (path.isAbsolute()) {
++ return path;
++ }
++ return new Path(workingDir, path);
++ }
+
-+ protected int ceph_replication(String path) {
-+ path = prepare_path(path);
-+ int ret = -1; //-1 for failure
-+ try {
-+ ret = localFS.getFileStatus(new Path(path)).getReplication();
-+ }
-+ catch (IOException e) {}
-+ return ret;
-+ }
++ private Path[] listPaths(Path path) throws IOException {
++ LOG.debug("listPaths:enter with path " + path);
++ String dirlist[];
+
-+ protected String ceph_hosts(int fh, long offset) {
-+ String ret = null;
-+ try {
-+ BlockLocation[] locs = localFS.getFileBlockLocations(
-+ localFS.getFileStatus(new Path(
-+ filenames.get(new Integer(fh)))), offset, 1);
-+ ret = locs[0].getNames()[0];
-+ }
-+ catch (IOException e) {}
-+ catch (NullPointerException f) { }
-+ return ret;
-+ }
++ Path abs_path = makeAbsolute(path);
+
-+ protected int ceph_setTimes(String pth, long mtime, long atime) {
-+ pth = prepare_path(pth);
-+ Path path = new Path(pth);
-+ int ret = -1; //generic fail
-+ try {
-+ localFS.setTimes(path, mtime, atime);
-+ ret = 0;
-+ }
-+ catch (IOException e) {}
-+ return ret;
-+ }
++ // If it's a directory, get the listing. Otherwise, complain and give up.
++ LOG.debug("calling ceph_getdir from Java with path " + abs_path);
++ dirlist = ceph.ceph_getdir(getCephPath(abs_path));
++ LOG.debug("returning from ceph_getdir to Java");
+
-+ protected long ceph_getpos(int fh) {
-+ long ret = -1; //generic fail
-+ try {
-+ Object stream = files.get(new Integer(fh));
-+ if (stream instanceof FSDataInputStream) {
-+ ret = ((FSDataInputStream)stream).getPos();
-+ }
-+ else if (stream instanceof FSDataOutputStream) {
-+ ret = ((FSDataOutputStream)stream).getPos();
-+ }
-+ }
-+ catch (IOException e) {}
-+ catch (NullPointerException f) { }
-+ return ret;
-+ }
++ if (dirlist == null) {
++ return null;
++ }
+
-+ protected int ceph_write(int fh, byte[] buffer,
-+ int buffer_offset, int length) {
-+ debug("ceph_write fh:" + fh + ", buffer_offset:" + buffer_offset
-+ + ", length:" + length, INFO);
-+ long ret = -1;//generic fail
-+ try {
-+ FSDataOutputStream os = (FSDataOutputStream) files.get(new Integer(fh));
-+ debug("ceph_write got outputstream", INFO);
-+ long startPos = os.getPos();
-+ os.write(buffer, buffer_offset, length);
-+ ret = os.getPos() - startPos;
-+ }
-+ catch (IOException e) { debug("ceph_write caught IOException!", WARN);}
-+ catch (NullPointerException f) {
-+ debug("ceph_write caught NullPointerException!", WARN); }
-+ return (int)ret;
-+ }
++ // convert the strings to Paths
++ Path[] paths = new Path[dirlist.length];
+
-+ protected int ceph_read(int fh, byte[] buffer,
-+ int buffer_offset, int length) {
-+ long ret = -1;//generic fail
-+ try {
-+ FSDataInputStream is = (FSDataInputStream)files.get(new Integer(fh));
-+ long startPos = is.getPos();
-+ is.read(buffer, buffer_offset, length);
-+ ret = is.getPos() - startPos;
-+ }
-+ catch (IOException e) {}
-+ catch (NullPointerException f) {}
-+ return (int)ret;
-+ }
++ for (int i = 0; i < dirlist.length; ++i) {
++ LOG.trace(
++ "Raw enumeration of paths in \"" + abs_path.toString() + "\": \""
++ + dirlist[i] + "\"");
++ // convert each listing to an absolute path
++ Path raw_path = new Path(dirlist[i]);
+
-+ protected long ceph_seek_from_start(int fh, long pos) {
-+ debug("ceph_seek_from_start(fh " + fh + ", pos " + pos + ")", INFO);
-+ long ret = -1;//generic fail
-+ try {
-+ debug("ceph_seek_from_start filename is "
-+ + filenames.get(new Integer(fh)), INFO);
-+ if (null == files.get(new Integer(fh))) {
-+ debug("ceph_seek_from_start: is is null!", WARN);
-+ }
-+ FSDataInputStream is = (FSDataInputStream) files.get(new Integer(fh));
-+ debug("ceph_seek_from_start retrieved is!", INFO);
-+ is.seek(pos);
-+ ret = is.getPos();
-+ }
-+ catch (IOException e) {debug("ceph_seek_from_start caught IOException!", WARN);}
-+ catch (NullPointerException f) {debug("ceph_seek_from_start caught NullPointerException!", WARN);}
-+ return (int)ret;
-+ }
++ if (raw_path.isAbsolute()) {
++ paths[i] = raw_path;
++ } else {
++ paths[i] = new Path(abs_path, raw_path);
++ }
++ }
++ LOG.debug("listPaths:exit");
++ return paths;
++ }
+
-+ /*
-+ * We need to remove the localFS file prefix before returning to Ceph
-+ */
-+ private String sanitize_path(String path) {
-+ //debug("sanitize_path(" + path + ")", INFO);
-+ /* if (path.startsWith("file:"))
-+ path = path.substring("file:".length()); */
-+ if (path.startsWith(localPrefix)) {
-+ path = path.substring(localPrefix.length());
-+ if (path.length() == 0) //it was a root path
-+ path = "/";
-+ }
-+ //debug("sanitize_path returning " + path, INFO);
-+ return path;
-+ }
++ static class Stat {
++ public long size;
++ public boolean is_dir;
++ public long block_size;
++ public long mod_time;
++ public long access_time;
++ public int mode;
+
-+ /*
-+ * If it's an absolute path we need to shove the
-+ * test dir onto the front as a prefix.
-+ */
-+ private String prepare_path(String path) {
-+ //debug("prepare_path(" + path + ")", INFO);
-+ if (path.startsWith("/"))
-+ path = localPrefix + path;
-+ else if (path.equals("..")) {
-+ if (ceph_getcwd().equals("/"))
-+ path = "."; // you can't go up past root!
-+ }
-+ //debug("prepare_path returning" + path, INFO);
-+ return path;
-+ }
++ public Stat() {}
++ }
+}
-Index: src/java/org/apache/hadoop/fs/ceph/CephInputStream.java
-===================================================================
---- src/java/org/apache/hadoop/fs/ceph/CephInputStream.java (revision 0)
-+++ src/java/org/apache/hadoop/fs/ceph/CephInputStream.java (revision 0)
-@@ -0,0 +1,235 @@
+diff --git a/src/core/org/apache/hadoop/fs/ceph/CephInputStream.java b/src/core/org/apache/hadoop/fs/ceph/CephInputStream.java
+new file mode 100644
+index 0000000..d9668d0
+--- /dev/null
++++ b/src/core/org/apache/hadoop/fs/ceph/CephInputStream.java
+@@ -0,0 +1,254 @@
+// -*- mode:Java; tab-width:2; c-basic-offset:2; indent-tabs-mode:t -*-
++
+/**
+ *
+ * Licensed under the Apache License, Version 2.0
+ */
+package org.apache.hadoop.fs.ceph;
+
++
+import java.io.IOException;
+
++import org.apache.commons.logging.Log;
++import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSInputStream;
+
+ * Ceph instance.
+ */
+public class CephInputStream extends FSInputStream {
-+
++ private static final Log LOG = LogFactory.getLog(CephInputStream.class);
+ private boolean closed;
+
+ private int fileHandle;
+
+ private long fileLength;
+
-+ private CephFS ceph;
++ private CephFS ceph;
+
-+ private byte[] buffer;
-+ private int bufPos = 0;
-+ private int bufValid = 0;
-+ private long cephPos = 0;
++ private byte[] buffer;
++ private int bufPos = 0;
++ private int bufValid = 0;
++ private long cephPos = 0;
+
+ /**
+ * Create a new CephInputStream.
+ * you will need to close and re-open it to access the new data.
+ */
+ public CephInputStream(Configuration conf, CephFS cephfs,
-+ int fh, long flength, int bufferSize) {
++ int fh, long flength, int bufferSize) {
+ // Whoever's calling the constructor is responsible for doing the actual ceph_open
+ // call and providing the file handle.
+ fileLength = flength;
+ fileHandle = fh;
+ closed = false;
-+ ceph = cephfs;
-+ buffer = new byte[bufferSize];
-+ ceph.debug("CephInputStream constructor: initializing stream with fh "
-+ + fh + " and file length " + flength, ceph.DEBUG);
++ ceph = cephfs;
++ buffer = new byte[bufferSize];
++ LOG.debug(
++ "CephInputStream constructor: initializing stream with fh " + fh
++ + " and file length " + flength);
+
+ }
++
+ /** Ceph likes things to be closed before it shuts down,
+ * so closing the IOStream stuff voluntarily in a finalizer is good
+ */
-+ protected void finalize () throws Throwable {
++ protected void finalize() throws Throwable {
+ try {
-+ if (!closed) close();
++ if (!closed) {
++ close();
++ }
++ } finally {
++ super.finalize();
+ }
-+ finally { super.finalize(); }
+ }
+
-+ private synchronized boolean fillBuffer() throws IOException {
-+ bufValid = ceph.ceph_read(fileHandle, buffer, 0, buffer.length);
-+ bufPos = 0;
-+ if (bufValid < 0) {
-+ int err = bufValid;
-+ bufValid = 0;
-+ //attempt to reset to old position. If it fails, too bad.
-+ ceph.ceph_seek_from_start(fileHandle, cephPos);
-+ throw new IOException("Failed to fill read buffer! Error code:"
-+ + err);
-+ }
-+ cephPos += bufValid;
-+ return (bufValid != 0);
-+ }
++ private synchronized boolean fillBuffer() throws IOException {
++ bufValid = ceph.ceph_read(fileHandle, buffer, 0, buffer.length);
++ bufPos = 0;
++ if (bufValid < 0) {
++ int err = bufValid;
+
-+ /*
-+ * Get the current position of the stream.
-+ */
++ bufValid = 0;
++ // attempt to reset to old position. If it fails, too bad.
++ ceph.ceph_seek_from_start(fileHandle, cephPos);
++ throw new IOException("Failed to fill read buffer! Error code:" + err);
++ }
++ cephPos += bufValid;
++ return (bufValid != 0);
++ }
++
++ /*
++ * Get the current position of the stream.
++ */
+ public synchronized long getPos() throws IOException {
-+ return cephPos - bufValid + bufPos;
++ return cephPos - bufValid + bufPos;
+ }
+
+ /**
+ * Find the number of bytes remaining in the file.
+ */
+ @Override
-+ public synchronized int available() throws IOException {
-+ return (int) (fileLength - getPos());
-+ }
++ public synchronized int available() throws IOException {
++ return (int) (fileLength - getPos());
++ }
+
+ public synchronized void seek(long targetPos) throws IOException {
-+ ceph.debug("CephInputStream.seek: Seeking to position " + targetPos +
-+ " on fd " + fileHandle, ceph.TRACE);
++ LOG.trace(
++ "CephInputStream.seek: Seeking to position " + targetPos + " on fd "
++ + fileHandle);
+ if (targetPos > fileLength) {
-+ throw new IOException("CephInputStream.seek: failed seek to position "
-+ + targetPos + " on fd " + fileHandle
-+ + ": Cannot seek after EOF " + fileLength);
-+ }
-+ long oldPos = cephPos;
-+ cephPos = ceph.ceph_seek_from_start(fileHandle, targetPos);
-+ bufValid = 0;
-+ bufPos = 0;
-+ if (cephPos < 0) {
-+ cephPos = oldPos;
-+ throw new IOException ("Ceph failed to seek to new position!");
-+ }
-+ }
++ throw new IOException(
++ "CephInputStream.seek: failed seek to position " + targetPos
++ + " on fd " + fileHandle + ": Cannot seek after EOF " + fileLength);
++ }
++ long oldPos = cephPos;
++
++ cephPos = ceph.ceph_seek_from_start(fileHandle, targetPos);
++ bufValid = 0;
++ bufPos = 0;
++ if (cephPos < 0) {
++ cephPos = oldPos;
++ throw new IOException("Ceph failed to seek to new position!");
++ }
++ }
+
+ /**
+ * Failovers are handled by the Ceph code at a very low level;
+ return false;
+ }
+
-+
+ /**
+ * Read a byte from the file.
+ * @return the next byte.
+ */
+ @Override
-+ public synchronized int read() throws IOException {
-+ ceph.debug("CephInputStream.read: Reading a single byte from fd " + fileHandle
-+ + " by calling general read function", ceph.TRACE);
++ public synchronized int read() throws IOException {
++ LOG.trace(
++ "CephInputStream.read: Reading a single byte from fd " + fileHandle
++ + " by calling general read function");
+
-+ byte result[] = new byte[1];
-+ if (getPos() >= fileLength) return -1;
-+ if (-1 == read(result, 0, 1)) return -1;
-+ if (result[0]<0) return 256+(int)result[0];
-+ else return result[0];
++ byte result[] = new byte[1];
++
++ if (getPos() >= fileLength) {
++ return -1;
++ }
++ if (-1 == read(result, 0, 1)) {
++ return -1;
+ }
++ if (result[0] < 0) {
++ return 256 + (int) result[0];
++ } else {
++ return result[0];
++ }
++ }
+
+ /**
+ * Read a specified number of bytes from the file into a byte[].
+ * @param off the offset to start at in the file
+ * @param len the number of bytes to read
+ * @return 0 if successful, otherwise an error code.
-+ * @throws IOException on bad input.
++ * @throws IOException on bad input.
+ */
+ @Override
-+ public synchronized int read(byte buf[], int off, int len)
-+ throws IOException {
-+ ceph.debug("CephInputStream.read: Reading " + len +
-+ " bytes from fd " + fileHandle, ceph.TRACE);
++ public synchronized int read(byte buf[], int off, int len)
++ throws IOException {
++ LOG.trace(
++ "CephInputStream.read: Reading " + len + " bytes from fd " + fileHandle);
+
-+ if (closed) {
-+ throw new IOException("CephInputStream.read: cannot read " + len +
-+ " bytes from fd " + fileHandle +
-+ ": stream closed");
-+ }
++ if (closed) {
++ throw new IOException(
++ "CephInputStream.read: cannot read " + len + " bytes from fd "
++ + fileHandle + ": stream closed");
++ }
+
-+ // ensure we're not past the end of the file
-+ if (getPos() >= fileLength) {
-+ ceph.debug("CephInputStream.read: cannot read " + len +
-+ " bytes from fd " + fileHandle + ": current position is "
-+ + getPos() + " and file length is " + fileLength,
-+ ceph.DEBUG);
++ // ensure we're not past the end of the file
++ if (getPos() >= fileLength) {
++ LOG.debug(
++ "CephInputStream.read: cannot read " + len + " bytes from fd "
++ + fileHandle + ": current position is " + getPos()
++ + " and file length is " + fileLength);
+
-+ return -1;
-+ }
-+
-+ int totalRead = 0;
-+ int initialLen = len;
-+ int read;
-+ do {
-+ read = Math.min(len, bufValid - bufPos);
-+ try {
-+ System.arraycopy(buffer, bufPos, buf, off, read);
-+ }
-+ catch(IndexOutOfBoundsException ie) {
-+ throw new IOException("CephInputStream.read: Indices out of bounds:"
-+ + "read length is " + len
-+ + ", buffer offset is " + off
-+ + ", and buffer size is " + buf.length);
-+ }
-+ catch (ArrayStoreException ae) {
-+ throw new IOException("Uh-oh, CephInputStream failed to do an array"
-+ + "copy due to type mismatch...");
-+ }
-+ catch (NullPointerException ne) {
-+ throw new IOException("CephInputStream.read: cannot read "
-+ + len + "bytes from fd:" + fileHandle
-+ + ": buf is null");
-+ }
-+ bufPos += read;
-+ len -= read;
-+ off += read;
-+ totalRead += read;
-+ } while (len > 0 && fillBuffer());
-+
-+ ceph.debug("CephInputStream.read: Reading " + initialLen
-+ + " bytes from fd " + fileHandle
-+ + ": succeeded in reading " + totalRead + " bytes",
-+ ceph.TRACE);
-+ return totalRead;
-+ }
++ return -1;
++ }
++
++ int totalRead = 0;
++ int initialLen = len;
++ int read;
++
++ do {
++ read = Math.min(len, bufValid - bufPos);
++ try {
++ System.arraycopy(buffer, bufPos, buf, off, read);
++ } catch (IndexOutOfBoundsException ie) {
++ throw new IOException(
++ "CephInputStream.read: Indices out of bounds:" + "read length is "
++ + len + ", buffer offset is " + off + ", and buffer size is "
++ + buf.length);
++ } catch (ArrayStoreException ae) {
++ throw new IOException(
++ "Uh-oh, CephInputStream failed to do an array"
++ + "copy due to type mismatch...");
++ } catch (NullPointerException ne) {
++ throw new IOException(
++ "CephInputStream.read: cannot read " + len + "bytes from fd:"
++ + fileHandle + ": buf is null");
++ }
++ bufPos += read;
++ len -= read;
++ off += read;
++ totalRead += read;
++ } while (len > 0 && fillBuffer());
++
++ LOG.trace(
++ "CephInputStream.read: Reading " + initialLen + " bytes from fd "
++ + fileHandle + ": succeeded in reading " + totalRead + " bytes");
++ return totalRead;
++ }
+
+ /**
+ * Close the CephInputStream and release the associated filehandle.
+ */
+ @Override
-+ public void close() throws IOException {
-+ ceph.debug("CephOutputStream.close:enter", ceph.TRACE);
++ public void close() throws IOException {
++ LOG.trace("CephOutputStream.close:enter");
+ if (!closed) {
-+ int result = ceph.ceph_close(fileHandle);
-+ closed = true;
-+ if (result != 0) {
-+ throw new IOException("Close somehow failed!"
-+ + "Don't try and use this stream again, though");
-+ }
-+ ceph.debug("CephOutputStream.close:exit", ceph.TRACE);
-+ }
-+ }
++ int result = ceph.ceph_close(fileHandle);
++
++ closed = true;
++ if (result != 0) {
++ throw new IOException(
++ "Close somehow failed!"
++ + "Don't try and use this stream again, though");
++ }
++ LOG.trace("CephOutputStream.close:exit");
++ }
++ }
+}
-Index: src/java/org/apache/hadoop/fs/ceph/CephOutputStream.java
-===================================================================
---- src/java/org/apache/hadoop/fs/ceph/CephOutputStream.java (revision 0)
-+++ src/java/org/apache/hadoop/fs/ceph/CephOutputStream.java (revision 0)
-@@ -0,0 +1,202 @@
+diff --git a/src/core/org/apache/hadoop/fs/ceph/CephOutputStream.java b/src/core/org/apache/hadoop/fs/ceph/CephOutputStream.java
+new file mode 100644
+index 0000000..4c50f88
+--- /dev/null
++++ b/src/core/org/apache/hadoop/fs/ceph/CephOutputStream.java
+@@ -0,0 +1,219 @@
+// -*- mode:Java; tab-width:2; c-basic-offset:2; indent-tabs-mode:t -*-
++
+/**
+ *
+ * Licensed under the Apache License, Version 2.0
+
+package org.apache.hadoop.fs.ceph;
+
++
+import java.io.IOException;
+import java.io.OutputStream;
+
++import org.apache.commons.logging.Log;
++import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.util.Progressable;
+
++
+/**
+ * <p>
+ * An {@link OutputStream} for a CephFileSystem and corresponding
+ * Ceph instance.
+ */
+public class CephOutputStream extends OutputStream {
-+
++ private static final Log LOG = LogFactory.getLog(CephOutputStream.class);
+ private boolean closed;
+
-+ private CephFS ceph;
++ private CephFS ceph;
+
+ private int fileHandle;
+
-+ private byte[] buffer;
-+ private int bufUsed = 0;
++ private byte[] buffer;
++ private int bufUsed = 0;
+
+ /**
+ * Construct the CephOutputStream.
+ * @param fh The Ceph filehandle to connect to.
+ */
+ public CephOutputStream(Configuration conf, CephFS cephfs,
-+ int fh, int bufferSize) {
-+ ceph = cephfs;
++ int fh, int bufferSize) {
++ ceph = cephfs;
+ fileHandle = fh;
+ closed = false;
-+ buffer = new byte[bufferSize];
++ buffer = new byte[bufferSize];
+ }
+
-+ /**Ceph likes things to be closed before it shuts down,
++ /** Ceph likes things to be closed before it shuts down,
+ *so closing the IOStream stuff voluntarily is good
+ */
-+ protected void finalize () throws Throwable {
++ protected void finalize() throws Throwable {
+ try {
-+ if (!closed) close();
++ if (!closed) {
++ close();
++ }
++ } finally {
++ super.finalize();
+ }
-+ finally { super.finalize();}
+ }
+
+ /**
+ * write fails.
+ */
+ @Override
-+ public synchronized void write(int b) throws IOException {
-+ ceph.debug("CephOutputStream.write: writing a single byte to fd "
-+ + fileHandle, ceph.TRACE);
-+
-+ if (closed) {
-+ throw new IOException("CephOutputStream.write: cannot write " +
-+ "a byte to fd " + fileHandle + ": stream closed");
-+ }
-+ // Stick the byte in a buffer and write it
-+ byte buf[] = new byte[1];
-+ buf[0] = (byte) b;
-+ write(buf, 0, 1);
-+ return;
++ public synchronized void write(int b) throws IOException {
++ LOG.trace(
++ "CephOutputStream.write: writing a single byte to fd " + fileHandle);
++
++ if (closed) {
++ throw new IOException(
++ "CephOutputStream.write: cannot write " + "a byte to fd " + fileHandle
++ + ": stream closed");
+ }
++ // Stick the byte in a buffer and write it
++ byte buf[] = new byte[1];
++
++ buf[0] = (byte) b;
++ write(buf, 0, 1);
++ return;
++ }
+
+ /**
+ * Write a byte buffer into the Ceph file.
+ * @param off the position in the file to start writing at.
+ * @param len The number of bytes to actually write.
+ * @throws IOException if you have closed the CephOutputStream, or
-+ * if buf is null or off + len > buf.length, or
-+ * if the write fails due to a Ceph error.
++ * if buf is null or off + len > buf.length, or
++ * if the write fails due to a Ceph error.
+ */
+ @Override
-+ public synchronized void write(byte buf[], int off, int len) throws IOException {
-+ ceph.debug("CephOutputStream.write: writing " + len +
-+ " bytes to fd " + fileHandle, ceph.TRACE);
-+ // make sure stream is open
-+ if (closed) {
-+ throw new IOException("CephOutputStream.write: cannot write " + len +
-+ "bytes to fd " + fileHandle + ": stream closed");
-+ }
++ public synchronized void write(byte buf[], int off, int len) throws IOException {
++ LOG.trace(
++ "CephOutputStream.write: writing " + len + " bytes to fd " + fileHandle);
++ // make sure stream is open
++ if (closed) {
++ throw new IOException(
++ "CephOutputStream.write: cannot write " + len + "bytes to fd "
++ + fileHandle + ": stream closed");
++ }
+
-+ int result;
-+ int write;
-+ while (len>0) {
-+ write = Math.min(len, buffer.length - bufUsed);
-+ try {
-+ System.arraycopy(buf, off, buffer, bufUsed, write);
-+ }
-+ catch (IndexOutOfBoundsException ie) {
-+ throw new IOException("CephOutputStream.write: Indices out of bounds: "
-+ + "write length is " + len
-+ + ", buffer offset is " + off
-+ + ", and buffer size is " + buf.length);
-+ }
-+ catch (ArrayStoreException ae) {
-+ throw new IOException("Uh-oh, CephOutputStream failed to do an array"
-+ + " copy due to type mismatch...");
-+ }
-+ catch (NullPointerException ne) {
-+ throw new IOException("CephOutputStream.write: cannot write "
-+ + len + "bytes to fd " + fileHandle
-+ + ": buffer is null");
-+ }
-+ bufUsed += write;
-+ len -= write;
-+ off += write;
-+ if (bufUsed == buffer.length) {
-+ result = ceph.ceph_write(fileHandle, buffer, 0, bufUsed);
-+ if (result < 0)
-+ throw new IOException("CephOutputStream.write: Buffered write of "
-+ + bufUsed + " bytes failed!");
-+ if (result != bufUsed)
-+ throw new IOException("CephOutputStream.write: Wrote only "
-+ + result + " bytes of " + bufUsed
-+ + " in buffer! Data may be lost or written"
-+ + " twice to Ceph!");
-+ bufUsed = 0;
-+ }
++ int result;
++ int write;
++
++ while (len > 0) {
++ write = Math.min(len, buffer.length - bufUsed);
++ try {
++ System.arraycopy(buf, off, buffer, bufUsed, write);
++ } catch (IndexOutOfBoundsException ie) {
++ throw new IOException(
++ "CephOutputStream.write: Indices out of bounds: "
++ + "write length is " + len + ", buffer offset is " + off
++ + ", and buffer size is " + buf.length);
++ } catch (ArrayStoreException ae) {
++ throw new IOException(
++ "Uh-oh, CephOutputStream failed to do an array"
++ + " copy due to type mismatch...");
++ } catch (NullPointerException ne) {
++ throw new IOException(
++ "CephOutputStream.write: cannot write " + len + "bytes to fd "
++ + fileHandle + ": buffer is null");
++ }
++ bufUsed += write;
++ len -= write;
++ off += write;
++ if (bufUsed == buffer.length) {
++ result = ceph.ceph_write(fileHandle, buffer, 0, bufUsed);
++ if (result < 0) {
++ throw new IOException(
++ "CephOutputStream.write: Buffered write of " + bufUsed
++ + " bytes failed!");
++ }
++ if (result != bufUsed) {
++ throw new IOException(
++ "CephOutputStream.write: Wrote only " + result + " bytes of "
++ + bufUsed + " in buffer! Data may be lost or written"
++ + " twice to Ceph!");
++ }
++ bufUsed = 0;
++ }
+
-+ }
-+ return;
-+ }
++ }
++ return;
++ }
+
+ /**
+ * Flush the buffered data.
+ * @throws IOException if you've closed the stream or the write fails.
+ */
+ @Override
-+ public synchronized void flush() throws IOException {
-+ if (!closed) {
-+ if (bufUsed == 0) return;
-+ int result = ceph.ceph_write(fileHandle, buffer, 0, bufUsed);
-+ if (result < 0) {
-+ throw new IOException("CephOutputStream.write: Write of "
-+ + bufUsed + "bytes to fd "
-+ + fileHandle + " failed");
-+ }
-+ if (result != bufUsed) {
-+ throw new IOException("CephOutputStream.write: Write of " + bufUsed
-+ + "bytes to fd " + fileHandle
-+ + "was incomplete: only " + result + " of "
-+ + bufUsed + " bytes were written.");
-+ }
-+ bufUsed = 0;
-+ return;
-+ }
-+ }
++ public synchronized void flush() throws IOException {
++ if (!closed) {
++ if (bufUsed == 0) {
++ return;
++ }
++ int result = ceph.ceph_write(fileHandle, buffer, 0, bufUsed);
++
++ if (result < 0) {
++ throw new IOException(
++ "CephOutputStream.write: Write of " + bufUsed + "bytes to fd "
++ + fileHandle + " failed");
++ }
++ if (result != bufUsed) {
++ throw new IOException(
++ "CephOutputStream.write: Write of " + bufUsed + "bytes to fd "
++ + fileHandle + "was incomplete: only " + result + " of " + bufUsed
++ + " bytes were written.");
++ }
++ bufUsed = 0;
++ return;
++ }
++ }
+
+ /**
+ * Close the CephOutputStream.
+ * @throws IOException if Ceph somehow returns an error. In current code it can't.
+ */
+ @Override
-+ public synchronized void close() throws IOException {
-+ ceph.debug("CephOutputStream.close:enter", ceph.TRACE);
-+ if (!closed) {
-+ flush();
-+ int result = ceph.ceph_close(fileHandle);
-+ if (result != 0) {
-+ throw new IOException("Close failed!");
-+ }
++ public synchronized void close() throws IOException {
++ LOG.trace("CephOutputStream.close:enter");
++ if (!closed) {
++ flush();
++ int result = ceph.ceph_close(fileHandle);
++
++ if (result != 0) {
++ throw new IOException("Close failed!");
++ }
+
-+ closed = true;
-+ ceph.debug("CephOutputStream.close:exit", ceph.TRACE);
-+ }
-+ }
++ closed = true;
++ LOG.trace("CephOutputStream.close:exit");
++ }
++ }
+}
-Index: src/java/org/apache/hadoop/fs/ceph/package.html
-===================================================================
---- src/java/org/apache/hadoop/fs/ceph/package.html (revision 0)
-+++ src/java/org/apache/hadoop/fs/ceph/package.html (revision 0)
-@@ -0,0 +1,101 @@
-+<html>
-+
-+<!--
-+ Licensed to the Apache Software Foundation (ASF) under one or more
-+ contributor license agreements. See the NOTICE file distributed with
-+ this work for additional information regarding copyright ownership.
-+ The ASF licenses this file to You under the Apache License, Version 2.0
-+ (the "License"); you may not use this file except in compliance with
-+ the License. You may obtain a copy of the License at
-+
-+ http://www.apache.org/licenses/LICENSE-2.0
-+
-+ Unless required by applicable law or agreed to in writing, software
-+ distributed under the License is distributed on an "AS IS" BASIS,
-+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-+ See the License for the specific language governing permissions and
-+ limitations under the License.
-+-->
-+
-+<head></head>
-+<body>
-+<h1>A client for the Ceph filesystem</h1>
-+
-+<h3>Introduction</h3>
-+
-+This page describes how to use <a href="http://ceph.newdream.net">Ceph</a>
-+as a backing store with Hadoop. This page assumes that you have downloaded
-+the Ceph software and installed necessary binaries as outlined in the Ceph
-+documentation.
-+
-+<h3>Steps</h3>
-+<ul>
-+ <li>In the Hadoop conf directory edit core-site.xml,
-+ adding the following (with appropriate substitutions). Note that
-+ different nodes can connect to different monitors in the same cluster
-+ without issue (the Ceph client will automatically redirect as necessary).
-+<pre>
-+<property>
-+ <name>fs.default.name</name>
-+ <value>ceph://null</value>
-+</property>
-+
-+<property>
-+ <name>fs.ceph.monAddr</name>
-+ <value><serverIP:port></value>
-+ <description>The location of the Ceph monitor to connect to.
-+ This should be an IP address, not a domain-based web address.</description>
-+</property>
-+
-+<property>
-+ <name>fs.ceph.libDir</name>
-+ <value>/usr/local/lib</value>
-+ <description>The folder holding libcephfs and libhadoopceph</description>
-+ </property>
-+</pre>
-+ <li>There are also a number of optional Ceph configuration options.
-+<pre>
-+<property>
-+ <name>fs.ceph.blockSize</name>
-+ <value>67108864</value>
-+ <description>Defaulting to 64MB, this is the size (in bytes) you want Ceph to use in striping data internally and presenting it to Hadoop.</description>
-+</property>
-+
-+<property>
-+ <name>fs.ceph.debug</name>
-+ <value>true</value>
-+ <description>If true, the Java-based code will print debugging information to standard error. This is useful if attempting to debug a Ceph issue as it puts both outputs in the same place.</description>
-+</property>
-+
-+<property>
-+ <name>fs.ceph.clientDebug</name>
-+ <value>1</value>
-+ <description>If non-zero, the Ceph client will print debugging information to standard error (a higher number=more debugging).</description>
-+</property>
-+
-+<property>
-+ <name>fs.ceph.messengerDebug</name>
-+ <value>1</value>
-+ <description>If non-zero, the Ceph messenger will print debugging information to standard error(a higher number=more debugging)</description>
-+</property>
-+
-+<property>
-+ <name>fs.ceph.readahead</name>
-+ <value>1</value>
-+ <description>Sets the number of object periods to read ahead in prefetching. This should probably be left at the default of 1.</description>
-+</property>
-+
-+<property>
-+ <name>fs.ceph.commandLine</name>
-+ <value>a string</value>
-+ <description>If you prefer, you may enter any of Ceph's command-line configuration here and it will get passed to the C client. Note that any filled-in configuration options will override what you put here. <br>
-+By default, Ceph performs writes across the network rather than locally. To force local writes, add "set_local_pg" in this property.</description>
-+</property>
-+</pre>
-+
-+ <li>Start up your Ceph instance according to the Ceph documentation.</li>
-+ <li>Do not use the bin/start-all.sh commands, as they will attempt to start
-+ up an hdfs instance. Just start whatever systems you need and they will
-+ automatically make use of the Ceph filesystem once configured as above.</li>
-+</body>
-+</html>
-\ No newline at end of file
-Index: src/java/core-default.xml
-===================================================================
---- src/java/core-default.xml (revision 832912)
-+++ src/java/core-default.xml (working copy)
-@@ -194,6 +194,12 @@
- </property>
-
- <property>
-+ <name>fs.ceph.impl</name>
-+ <value>org.apache.hadoop.fs.ceph.CephFileSystem</value>
-+ <description>The FileSystem for ceph: uris.</description>
-+</property>
+diff --git a/src/core/org/apache/hadoop/fs/ceph/CephTalker.java b/src/core/org/apache/hadoop/fs/ceph/CephTalker.java
+new file mode 100644
+index 0000000..569652f
+--- /dev/null
++++ b/src/core/org/apache/hadoop/fs/ceph/CephTalker.java
+@@ -0,0 +1,91 @@
++// -*- mode:Java; tab-width:2; c-basic-offset:2; indent-tabs-mode:t -*-
+
-+<property>
- <name>fs.hftp.impl</name>
- <value>org.apache.hadoop.hdfs.HftpFileSystem</value>
- </property>
++/**
++ *
++ * Licensed under the Apache License, Version 2.0
++ * (the "License"); you may not use this file except in compliance with
++ * the License. You may obtain a copy of the License at
++ *
++ * http://www.apache.org/licenses/LICENSE-2.0
++ *
++ * Unless required by applicable law or agreed to in writing, software
++ * distributed under the License is distributed on an "AS IS" BASIS,
++ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
++ * implied. See the License for the specific language governing
++ * permissions and limitations under the License.
++ *
++ *
++ * Wraps a number of native function calls to communicate with the Ceph
++ * filesystem.
++ */
++package org.apache.hadoop.fs.ceph;
++
++
++import org.apache.hadoop.conf.Configuration;
++import org.apache.commons.logging.Log;
++
++
++class CephTalker extends CephFS {
++ // JNI doesn't give us any way to store pointers, so use a long.
++ // Here we're assuming pointers aren't longer than 8 bytes.
++ long cluster;
++
++ // we write a constructor so we can load the libraries
++ public CephTalker(Configuration conf, Log log) {
++ System.load(conf.get("fs.ceph.libDir") + "/libcephfs.so");
++ System.load(conf.get("fs.ceph.libDir") + "/libhadoopcephfs.so");
++ cluster = 0;
++ }
++
++ protected native boolean ceph_initializeClient(String arguments, int block_size);
++
++ protected native String ceph_getcwd();
++
++ protected native boolean ceph_setcwd(String path);
++
++ protected native boolean ceph_rmdir(String path);
++
++ protected native boolean ceph_unlink(String path);
++
++ protected native boolean ceph_rename(String old_path, String new_path);
++
++ protected native boolean ceph_exists(String path);
++
++ protected native long ceph_getblocksize(String path);
++
++ protected native boolean ceph_isdirectory(String path);
++
++ protected native boolean ceph_isfile(String path);
++
++ protected native String[] ceph_getdir(String path);
++
++ protected native int ceph_mkdirs(String path, int mode);
++
++ protected native int ceph_open_for_append(String path);
++
++ protected native int ceph_open_for_read(String path);
++
++ protected native int ceph_open_for_overwrite(String path, int mode);
++
++ protected native int ceph_close(int filehandle);
++
++ protected native boolean ceph_setPermission(String path, int mode);
++
++ protected native boolean ceph_kill_client();
++
++ protected native boolean ceph_stat(String path, CephFileSystem.Stat fill);
++
++ protected native int ceph_replication(String Path);
++
++ protected native String[] ceph_hosts(int fh, long offset);
++
++ protected native int ceph_setTimes(String path, long mtime, long atime);
++
++ protected native long ceph_getpos(int fh);
++
++ protected native int ceph_write(int fh, byte[] buffer, int buffer_offset, int length);
++
++ protected native int ceph_read(int fh, byte[] buffer, int buffer_offset, int length);
++
++ protected native long ceph_seek_from_start(int fh, long pos);
++}
+diff --git a/src/mapred/org/apache/hadoop/filecache/TrackerDistributedCacheManager.java b/src/mapred/org/apache/hadoop/filecache/TrackerDistributedCacheManager.java
+index 9e22f1f..cd55361 100644
+--- a/src/mapred/org/apache/hadoop/filecache/TrackerDistributedCacheManager.java
++++ b/src/mapred/org/apache/hadoop/filecache/TrackerDistributedCacheManager.java
+@@ -386,10 +386,12 @@ public class TrackerDistributedCacheManager {
+ if (modifiedTime != desiredTimestamp) {
+ DateFormat df = DateFormat.getDateTimeInstance(DateFormat.SHORT,
+ DateFormat.SHORT);
++ /*
+ throw new IOException("The distributed cache object " + source +
+ " changed during the job from " +
+ df.format(new Date(desiredTimestamp)) + " to " +
+ df.format(new Date(modifiedTime)));
++ */
+ }
+
+ Path parchive = null;
+diff --git a/src/test/commit-tests b/src/test/commit-tests
+index 1148c8b..85fa53d 100644
+--- a/src/test/commit-tests
++++ b/src/test/commit-tests
+@@ -53,6 +53,7 @@
+ **/TestRPC.java
+ **/TestS3Credentials.java
+ **/TestS3FileSystem.java
++**/TestCeph.java
+ **/TestSaslRPC.java
+ **/TestScriptBasedMapping.java
+ **/TestSequenceFileSerialization.java
+diff --git a/src/test/org/apache/hadoop/fs/ceph/TestCeph.java b/src/test/org/apache/hadoop/fs/ceph/TestCeph.java
+new file mode 100644
+index 0000000..e46b0ee
+--- /dev/null
++++ b/src/test/org/apache/hadoop/fs/ceph/TestCeph.java
+@@ -0,0 +1,45 @@
++// -*- mode:Java; tab-width:2; c-basic-offset:2; indent-tabs-mode:t -*-
++
++/**
++ * Licensed to the Apache Software Foundation (ASF) under one
++ * or more contributor license agreements. See the NOTICE file
++ * distributed with this work for additional information
++ * regarding copyright ownership. The ASF licenses this file
++ * to you under the Apache License, Version 2.0 (the
++ * "License"); you may not use this file except in compliance
++ * with the License. You may obtain a copy of the License at
++ *
++ * http://www.apache.org/licenses/LICENSE-2.0
++ *
++ * Unless required by applicable law or agreed to in writing, software
++ * distributed under the License is distributed on an "AS IS" BASIS,
++ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
++ * See the License for the specific language governing permissions and
++ * limitations under the License.
++ *
++ * Unit tests for the CephFileSystem API implementation.
++ */
++
++package org.apache.hadoop.fs.ceph;
++
++
++import java.io.IOException;
++import java.net.URI;
++import org.apache.hadoop.conf.Configuration;
++import org.apache.hadoop.fs.FileSystemContractBaseTest;
++import org.apache.hadoop.fs.FileSystem;
++import org.apache.hadoop.fs.Path;
++
++
++public class TestCeph extends FileSystemContractBaseTest {
++
++ @Override
++ protected void setUp() throws IOException {
++ Configuration conf = new Configuration();
++ CephFaker cephfaker = new CephFaker(conf, FileSystem.LOG);
++ CephFileSystem cephfs = new CephFileSystem(cephfaker);
++
++ cephfs.initialize(URI.create("ceph://null"), conf);
++ fs = cephfs;
++ }
++}