+Index: src/test/core/org/apache/hadoop/fs/ceph/TestCeph.java
+===================================================================
+--- src/test/core/org/apache/hadoop/fs/ceph/TestCeph.java (revision 0)
++++ src/test/core/org/apache/hadoop/fs/ceph/TestCeph.java (revision 0)
+@@ -0,0 +1,42 @@
++// -*- mode:Java; tab-width:2; c-basic-offset:2; indent-tabs-mode:t -*-
++/**
++ * Licensed to the Apache Software Foundation (ASF) under one
++ * or more contributor license agreements. See the NOTICE file
++ * distributed with this work for additional information
++ * regarding copyright ownership. The ASF licenses this file
++ * to you under the Apache License, Version 2.0 (the
++ * "License"); you may not use this file except in compliance
++ * with the License. You may obtain a copy of the License at
++ *
++ * http://www.apache.org/licenses/LICENSE-2.0
++ *
++ * Unless required by applicable law or agreed to in writing, software
++ * distributed under the License is distributed on an "AS IS" BASIS,
++ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
++ * See the License for the specific language governing permissions and
++ * limitations under the License.
++ *
++ * Unit tests for the CephFileSystem API implementation.
++ */
++
++package org.apache.hadoop.fs.ceph;
++
++import java.io.IOException;
++import java.net.URI;
++import org.apache.hadoop.fs.FileSystemContractBaseTest;
++import org.apache.hadoop.conf.Configuration;
++import org.apache.hadoop.fs.FileSystem;
++import org.apache.hadoop.fs.Path;
++
++public class TestCeph extends FileSystemContractBaseTest {
++
++ @Override
++ protected void setUp() throws IOException {
++ Configuration conf = new Configuration();
++ CephFaker cephfaker = new CephFaker(conf, FileSystem.LOG);
++ CephFileSystem cephfs = new CephFileSystem(cephfaker, "ceph://null");
++ cephfs.initialize(URI.create("ceph://null"), conf);
++ fs = cephfs;
++ cephfs.setWorkingDirectory(new Path(getDefaultWorkingDirectory()));
++ }
++}
+Index: src/java/org/apache/hadoop/fs/ceph/SubmitProcess
+===================================================================
+--- src/java/org/apache/hadoop/fs/ceph/SubmitProcess (revision 0)
++++ src/java/org/apache/hadoop/fs/ceph/SubmitProcess (revision 0)
+@@ -0,0 +1,2 @@
++1) Remove the emacs indententation rules line.
++2) ? No more?
+\ No newline at end of file
+Index: src/java/org/apache/hadoop/fs/ceph/CephTalker.java
+===================================================================
+--- src/java/org/apache/hadoop/fs/ceph/CephTalker.java (revision 0)
++++ src/java/org/apache/hadoop/fs/ceph/CephTalker.java (revision 0)
+@@ -0,0 +1,59 @@
++// -*- mode:Java; tab-width:2; c-basic-offset:2; indent-tabs-mode:t -*-
++/**
++ *
++ * Licensed under the Apache License, Version 2.0
++ * (the "License"); you may not use this file except in compliance with
++ * the License. You may obtain a copy of the License at
++ *
++ * http://www.apache.org/licenses/LICENSE-2.0
++ *
++ * Unless required by applicable law or agreed to in writing, software
++ * distributed under the License is distributed on an "AS IS" BASIS,
++ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
++ * implied. See the License for the specific language governing
++ * permissions and limitations under the License.
++ *
++ *
++ * Wraps a number of native function calls to communicate with the Ceph
++ * filesystem.
++ */
++package org.apache.hadoop.fs.ceph;
++
++import org.apache.hadoop.conf.Configuration;
++import org.apache.commons.logging.Log;
++
++class CephTalker extends CephFS {
++ //we write a constructor so we can load the libraries
++ public CephTalker(Configuration conf, Log log) {
++ super(conf, log);
++ System.load(conf.get("fs.ceph.libDir")+"/libhadoopcephfs.so");
++ System.load(conf.get("fs.ceph.libDir")+"/libceph.so");
++ }
++ protected native boolean ceph_initializeClient(String arguments, int block_size);
++ protected native String ceph_getcwd();
++ protected native boolean ceph_setcwd(String path);
++ protected native boolean ceph_rmdir(String path);
++ protected native boolean ceph_unlink(String path);
++ protected native boolean ceph_rename(String old_path, String new_path);
++ protected native boolean ceph_exists(String path);
++ protected native long ceph_getblocksize(String path);
++ protected native boolean ceph_isdirectory(String path);
++ protected native boolean ceph_isfile(String path);
++ protected native String[] ceph_getdir(String path);
++ protected native int ceph_mkdirs(String path, int mode);
++ protected native int ceph_open_for_append(String path);
++ protected native int ceph_open_for_read(String path);
++ protected native int ceph_open_for_overwrite(String path, int mode);
++ protected native int ceph_close(int filehandle);
++ protected native boolean ceph_setPermission(String path, int mode);
++ protected native boolean ceph_kill_client();
++ protected native boolean ceph_stat(String path, CephFileSystem.Stat fill);
++ protected native int ceph_statfs(String Path, CephFileSystem.CephStat fill);
++ protected native int ceph_replication(String path);
++ protected native String ceph_hosts(int fh, long offset);
++ protected native int ceph_setTimes(String path, long mtime, long atime);
++ protected native long ceph_getpos(int fh);
++ protected native int ceph_write(int fh, byte[] buffer, int buffer_offset, int length);
++ protected native int ceph_read(int fh, byte[] buffer, int buffer_offset, int length);
++ protected native long ceph_seek_from_start(int fh, long pos);
++}
Index: src/java/org/apache/hadoop/fs/ceph/CephFileSystem.java
===================================================================
--- src/java/org/apache/hadoop/fs/ceph/CephFileSystem.java (revision 0)
+++ src/java/org/apache/hadoop/fs/ceph/CephFileSystem.java (revision 0)
-@@ -0,0 +1,810 @@
-+// -*- mode:Java; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+@@ -0,0 +1,848 @@
++// -*- mode:Java; tab-width:2; c-basic-offset:2; indent-tabs-mode:t -*-
+/**
+ *
+ * Licensed under the Apache License, Version 2.0
+
+import java.io.IOException;
+import java.io.FileNotFoundException;
-+import java.io.File;
+import java.io.OutputStream;
+import java.net.URI;
-+import java.util.Set;
+import java.util.EnumSet;
-+import java.util.Vector;
+import java.lang.Math;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.BlockLocation;
++import org.apache.hadoop.fs.FileAlreadyExistsException;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FSInputStream;
+import org.apache.hadoop.fs.FSDataOutputStream;
+ */
+public class CephFileSystem extends FileSystem {
+
-+ private static final int EEXIST = 17;
-+ private static final int ENOENT = 2;
-+
+ private URI uri;
+
+ private final Path root;
+ private boolean initialized = false;
++ private CephFS ceph = null;
+
+ private boolean debug = false;
-+ private String cephDebugLevel;
-+ private String monAddr;
+ private String fs_default_name;
-+
-+ private native boolean ceph_initializeClient(String arguments, int block_size);
-+ private native String ceph_getcwd();
-+ private native boolean ceph_setcwd(String path);
-+ private native boolean ceph_rmdir(String path);
-+ private native boolean ceph_mkdir(String path);
-+ private native boolean ceph_unlink(String path);
-+ private native boolean ceph_rename(String old_path, String new_path);
-+ private native boolean ceph_exists(String path);
-+ private native long ceph_getblocksize(String path);
-+ private native boolean ceph_isdirectory(String path);
-+ private native boolean ceph_isfile(String path);
-+ private native String[] ceph_getdir(String path);
-+ private native int ceph_mkdirs(String path, int mode);
-+ private native int ceph_open_for_append(String path);
-+ private native int ceph_open_for_read(String path);
-+ private native int ceph_open_for_overwrite(String path, int mode);
-+ private native int ceph_close(int filehandle);
-+ private native boolean ceph_setPermission(String path, int mode);
-+ private native boolean ceph_kill_client();
-+ private native boolean ceph_stat(String path, Stat fill);
-+ private native int ceph_statfs(String Path, CephStat fill);
-+ private native int ceph_replication(String path);
-+ private native String ceph_hosts(int fh, long offset);
-+ private native int ceph_setTimes(String path, long mtime, long atime);
+
+ /**
+ * Create a new CephFileSystem.
+ */
+ public CephFileSystem() {
-+ if(debug) debug("CephFileSystem:enter");
+ root = new Path("/");
-+ if(debug) debug("CephFileSystem:exit");
+ }
+
++ /**
++ * Used for testing purposes, this constructor
++ * sets the given CephFS instead of defaulting to a
++ * CephTalker (with its assumed real Ceph instance to talk to).
++ */
++ public CephFileSystem(CephFS ceph_fs, String default_path) {
++ super();
++ root = new Path("/");
++ ceph = ceph_fs;
++ fs_default_name = default_path;
++ }
++
+ /**
+ * Lets you get the URI of this CephFileSystem.
+ * @return the URI.
+ */
+ public URI getUri() {
+ if (!initialized) return null;
-+ if(debug) debug("getUri:enter");
-+ if(debug) debug("getUri:exit with return " + uri);
++ ceph.debug("getUri:exit with return " + uri, ceph.DEBUG);
+ return uri;
+ }
+
+ */
+ @Override
+ public void initialize(URI uri, Configuration conf) throws IOException {
-+ if(debug) debug("initialize:enter");
+ if (!initialized) {
-+ System.load(conf.get("fs.ceph.libDir")+"/libhadoopcephfs.so");
-+ System.load(conf.get("fs.ceph.libDir")+"/libceph.so");
+ super.initialize(uri, conf);
+ setConf(conf);
+ this.uri = URI.create(uri.getScheme() + "://" + uri.getAuthority());
-+ statistics = getStatistics(uri.getScheme(), getClass());
-+
-+ fs_default_name = conf.get("fs.default.name");
-+ debug = ("true".equals(conf.get("fs.ceph.debug", "false")));
++ statistics = getStatistics(uri.getScheme(), getClass());
++
++ if (ceph == null) {
++ ceph = new CephTalker(conf, LOG);
++ }
++ if (null == fs_default_name) {
++ fs_default_name = conf.get("fs.default.name");
++ }
+ //build up the arguments for Ceph
+ String arguments = "CephFSInterface";
+ arguments += conf.get("fs.ceph.commandLine", "");
+ if (conf.get("fs.ceph.clientDebug") != null) {
-+ arguments += " --debug_client ";
-+ arguments += conf.get("fs.ceph.clientDebug");
++ arguments += " --debug_client ";
++ arguments += conf.get("fs.ceph.clientDebug");
+ }
+ if (conf.get("fs.ceph.messengerDebug") != null) {
-+ arguments += " --debug_ms ";
-+ arguments += conf.get("fs.ceph.messengerDebug");
++ arguments += " --debug_ms ";
++ arguments += conf.get("fs.ceph.messengerDebug");
+ }
+ if (conf.get("fs.ceph.monAddr") != null) {
-+ arguments += " -m ";
-+ arguments += conf.get("fs.ceph.monAddr");
++ arguments += " -m ";
++ arguments += conf.get("fs.ceph.monAddr");
+ }
-+ arguments += " --client-readahead-max-periods="
-+ + conf.get("fs.ceph.readahead", "1");
++ arguments += " --client-readahead-max-periods="
++ + conf.get("fs.ceph.readahead", "1");
+ //make sure they gave us a ceph monitor address or conf file
++ ceph.debug("initialize:Ceph initialization arguments: " + arguments, ceph.INFO);
+ if ( (conf.get("fs.ceph.monAddr") == null) &&
-+ (arguments.indexOf("-m") == -1) &&
-+ (arguments.indexOf("-c") == -1) ) {
-+ if(debug) debug("You need to specify a Ceph monitor address.");
-+ throw new IOException("You must specify a Ceph monitor address or config file!");
++ (arguments.indexOf("-m") == -1) &&
++ (arguments.indexOf("-c") == -1) ) {
++ ceph.debug("initialize:You need to specify a Ceph monitor address.", ceph.FATAL);
++ throw new IOException("You must specify a Ceph monitor address or config file!");
+ }
+ // Initialize the client
-+ if (!ceph_initializeClient(arguments,
-+ conf.getInt("fs.ceph.blockSize", 1<<26))) {
-+ if(debug) debug("Ceph initialization failed!");
-+ throw new IOException("Ceph initialization failed!");
++ if (!ceph.ceph_initializeClient(arguments,
++ conf.getInt("fs.ceph.blockSize", 1<<26))) {
++ ceph.debug("initialize:Ceph initialization failed!", ceph.FATAL);
++ throw new IOException("Ceph initialization failed!");
+ }
+ initialized = true;
-+ if(debug) debug("Initialized client. Setting cwd to /");
-+ ceph_setcwd("/");
++ ceph.debug("initialize:Ceph initialized client. Setting cwd to /", ceph.INFO);
++ ceph.ceph_setcwd("/");
+ }
-+ if(debug) debug("initialize:exit");
++ ceph.debug("initialize:exit", ceph.DEBUG);
+ }
+
+ /**
+ */
+ @Override
+ public void close() throws IOException {
-+ if (!initialized) throw new IOException ("You have to initialize the"
-+ +"CephFileSystem before calling other methods.");
-+ if(debug) debug("close:enter");
++ if (!initialized) throw new IOException ("You have to initialize the "
++ +"CephFileSystem before calling other methods.");
++ ceph.debug("close:enter", ceph.DEBUG);
+ super.close();//this method does stuff, make sure it's run!
-+ ceph_kill_client();
-+ if(debug) debug("close:exit");
++ ceph.debug("close: Calling ceph_kill_client from Java", ceph.TRACE);
++ ceph.ceph_kill_client();
++ ceph.debug("close:exit", ceph.DEBUG);
+ }
+
+ /**
+ * Get an FSDataOutputStream to append onto a file.
+ * @param file The File you want to append onto
-+ * @param bufferSize Ceph does internal buffering; this is ignored.
++ * @param bufferSize Ceph does internal buffering but you can buffer in the Java code as well if you like.
+ * @param progress The Progressable to report progress to.
+ * Reporting is limited but exists.
+ * @return An FSDataOutputStream that connects to the file on Ceph.
+ * @throws IOException If initialize() hasn't been called or the file cannot be found or appended to.
+ */
+ public FSDataOutputStream append (Path file, int bufferSize,
-+ Progressable progress) throws IOException {
-+ if (!initialized) throw new IOException ("You have to initialize the"
-+ +"CephFileSystem before calling other methods.");
-+ if(debug) debug("append:enter with path " + file + " bufferSize " + bufferSize);
++ Progressable progress) throws IOException {
++ if (!initialized) throw new IOException ("You have to initialize the "
++ +"CephFileSystem before calling other methods.");
++ ceph.debug("append:enter with path " + file + " bufferSize " + bufferSize, ceph.DEBUG);
+ Path abs_path = makeAbsolute(file);
+ if (progress!=null) progress.progress();
-+ int fd = ceph_open_for_append(abs_path.toString());
++ ceph.debug("append: Entering ceph_open_for_append from Java", ceph.TRACE);
++ int fd = ceph.ceph_open_for_append(abs_path.toString());
++ ceph.debug("append: Returned to Java", ceph.TRACE);
+ if (progress!=null) progress.progress();
+ if( fd < 0 ) { //error in open
+ throw new IOException("append: Open for append failed on path \"" +
-+ abs_path.toString() + "\"");
++ abs_path.toString() + "\"");
+ }
-+ CephOutputStream cephOStream = new CephOutputStream(getConf(), fd);
-+ if(debug) debug("append:exit");
++ CephOutputStream cephOStream = new CephOutputStream(getConf(),
++ ceph, fd, bufferSize);
++ ceph.debug("append:exit", ceph.DEBUG);
+ return new FSDataOutputStream(cephOStream, statistics);
+ }
+
+ */
+ public Path getWorkingDirectory() {
+ if (!initialized) return null;
-+ if(debug) debug("getWorkingDirectory:enter");
-+ if(debug) debug("Working directory is " + ceph_getcwd());
-+ if(debug) debug("getWorkingDirectory:exit");
-+ return new Path(fs_default_name + ceph_getcwd());
++ ceph.debug("getWorkingDirectory:enter", ceph.DEBUG);
++ String cwd = ceph.ceph_getcwd();
++ ceph.debug("getWorkingDirectory:exit with path " + cwd, ceph.DEBUG);
++ return new Path(fs_default_name + ceph.ceph_getcwd());
+ }
+
+ /**
+ * @param dir The directory to change to.
+ */
+ @Override
-+ public void setWorkingDirectory(Path dir) {
++ public void setWorkingDirectory(Path dir) {
+ if (!initialized) return;
-+ if(debug) debug("setWorkingDirecty:enter with new working dir " + dir);
++ ceph.debug("setWorkingDirecty:enter with new working dir " + dir, ceph.DEBUG);
+ Path abs_path = makeAbsolute(dir);
-+ if(debug) debug("calling ceph_setcwd from Java");
-+ if (!ceph_setcwd(abs_path.toString()))
-+ if(debug) debug("Warning:ceph_setcwd failed for some reason on path " + abs_path);
-+ if(debug) debug("returned from ceph_setcwd to Java" );
-+ if(debug) debug("setWorkingDirectory:exit");
++ ceph.debug("setWorkingDirectory:calling ceph_setcwd from Java", ceph.TRACE);
++ if (!ceph.ceph_setcwd(abs_path.toString()))
++ ceph.debug("setWorkingDirectory: WARNING! ceph_setcwd failed for some reason on path " + abs_path, ceph.WARN);
++ ceph.debug("setWorkingDirectory:exit", ceph.DEBUG);
+ }
+
+ /**
+ * @throws IOException if initialize() hasn't been called.
+ */
+ @Override
-+ public boolean exists(Path path) throws IOException {
-+ if (!initialized) throw new IOException ("You have to initialize the"
-+ +"CephFileSystem before calling other methods.");
-+ if(debug) debug("exists:enter with path " + path);
++ public boolean exists(Path path) throws IOException {
++ if (!initialized) throw new IOException ("You have to initialize the "
++ +"CephFileSystem before calling other methods.");
++ ceph.debug("exists:enter with path " + path, ceph.DEBUG);
+ boolean result;
+ Path abs_path = makeAbsolute(path);
+ if (abs_path.equals(root)) {
+ result = true;
+ }
+ else {
-+ if(debug) debug("Calling ceph_exists from Java on path "
-+ + abs_path.toString() + ":");
-+ result = ceph_exists(abs_path.toString());
-+ if(debug) debug("Returned from ceph_exists to Java");
++ ceph.debug("exists:Calling ceph_exists from Java on path "
++ + abs_path.toString(), ceph.TRACE);
++ result = ceph.ceph_exists(abs_path.toString());
++ ceph.debug("exists:Returned from ceph_exists to Java", ceph.TRACE);
+ }
-+ if(debug) debug("exists:exit with value " + result);
++ ceph.debug("exists:exit with value " + result, ceph.DEBUG);
+ return result;
+ }
+
+ * @param path The directory path to create
+ * @param perms The permissions to apply to the created directories.
+ * @return true if successful, false otherwise
-+ * @throws IOException if initialize() hasn't been called.
++ * @throws IOException if initialize() hasn't been called or the path
++ * is a child of a file.
+ */
++ @Override
+ public boolean mkdirs(Path path, FsPermission perms) throws IOException {
-+ if (!initialized) throw new IOException ("You have to initialize the"
-+ +"CephFileSystem before calling other methods.");
-+ if(debug) debug("mkdirs:enter with path " + path);
++ if (!initialized) throw new IOException ("You have to initialize the "
++ +"CephFileSystem before calling other methods.");
++ ceph.debug("mkdirs:enter with path " + path, ceph.DEBUG);
+ Path abs_path = makeAbsolute(path);
-+ if(debug) debug("calling ceph_mkdirs from Java");
-+ int result = ceph_mkdirs(abs_path.toString(), (int)perms.toShort());
-+ if(debug) debug("Returned from ceph_mkdirs to Java with result " + result);
-+ if(debug) debug("mkdirs:exit with result " + result);
-+ if (result != 0)
++ ceph.debug("mkdirs:calling ceph_mkdirs from Java", ceph.TRACE);
++ int result = ceph.ceph_mkdirs(abs_path.toString(), (int)perms.toShort());
++ if (result != 0) {
++ ceph.debug("mkdirs: make directory " + abs_path
++ + "Failing with result " + result, ceph.WARN);
++ if (ceph.ENOTDIR == result)
++ throw new FileAlreadyExistsException("Parent path is not a directory");
+ return false;
-+ else return true;
++ }
++ else {
++ ceph.debug("mkdirs:exiting succesfully", ceph.DEBUG);
++ return true;
++ }
+ }
+
+ /**
+ * Check if a path is a file. This is moderately faster than the
+ * generic implementation.
+ * @param path The path to check.
-+ * @return true if the path is a file, false otherwise.
++ * @return true if the path is definitely a file, false otherwise.
+ * @throws IOException if initialize() hasn't been called.
+ */
+ @Override
-+ public boolean isFile(Path path) throws IOException {
-+ if (!initialized) throw new IOException ("You have to initialize the"
-+ +"CephFileSystem before calling other methods.");
-+ if(debug) debug("isFile:enter with path " + path);
++ public boolean isFile(Path path) throws IOException {
++ if (!initialized) throw new IOException ("You have to initialize the "
++ +"CephFileSystem before calling other methods.");
++ ceph.debug("isFile:enter with path " + path, ceph.DEBUG);
+ Path abs_path = makeAbsolute(path);
+ boolean result;
+ if (abs_path.equals(root)) {
+ result = false;
+ }
+ else {
-+ result = ceph_isfile(abs_path.toString());
++ ceph.debug("isFile:entering ceph_isfile from Java", ceph.TRACE);
++ result = ceph.ceph_isfile(abs_path.toString());
+ }
-+ if(debug) debug("isFile:exit with result " + result);
++ ceph.debug("isFile:exit with result " + result, ceph.DEBUG);
+ return result;
+ }
+
+ * Check if a path is a directory. This is moderately faster than
+ * the generic implementation.
+ * @param path The path to check
-+ * @return true if the path is a directory, false otherwise.
++ * @return true if the path is definitely a directory, false otherwise.
+ * @throws IOException if initialize() hasn't been called.
+ */
+ @Override
-+ public boolean isDirectory(Path path) throws IOException {
-+ if (!initialized) throw new IOException ("You have to initialize the"
-+ +"CephFileSystem before calling other methods.");
-+ if(debug) debug("isDirectory:enter with path " + path);
++ public boolean isDirectory(Path path) throws IOException {
++ if (!initialized) throw new IOException ("You have to initialize the "
++ +"CephFileSystem before calling other methods.");
++ ceph.debug("isDirectory:enter with path " + path, ceph.DEBUG);
+ Path abs_path = makeAbsolute(path);
+ boolean result;
+ if (abs_path.equals(root)) {
+ result = true;
+ }
+ else {
-+ if(debug) debug("calling ceph_isdirectory from Java");
-+ result = ceph_isdirectory(abs_path.toString());
-+ if(debug) debug("Returned from ceph_isdirectory to Java");
++ ceph.debug("calling ceph_isdirectory from Java", ceph.TRACE);
++ result = ceph.ceph_isdirectory(abs_path.toString());
++ ceph.debug("Returned from ceph_isdirectory to Java", ceph.TRACE);
+ }
-+ if(debug) debug("isDirectory:exit with result " + result);
++ ceph.debug("isDirectory:exit with result " + result, ceph.DEBUG);
+ return result;
+ }
+
+ * @throws FileNotFoundException if the path could not be resolved.
+ */
+ public FileStatus getFileStatus(Path path) throws IOException {
-+ if (!initialized) throw new IOException ("You have to initialize the"
-+ +"CephFileSystem before calling other methods.");
-+ if(debug) debug("getFileStatus:enter with path " + path);
++ if (!initialized) throw new IOException ("You have to initialize the "
++ +"CephFileSystem before calling other methods.");
++ ceph.debug("getFileStatus:enter with path " + path, ceph.DEBUG);
+ Path abs_path = makeAbsolute(path);
+ //sadly, Ceph doesn't really do uids/gids just yet, but
+ //everything else is filled
+ FileStatus status;
+ Stat lstat = new Stat();
-+ if(ceph_stat(abs_path.toString(), lstat)) {
++ ceph.debug("getFileStatus: calling ceph_stat from Java", ceph.TRACE);
++ if(ceph.ceph_stat(abs_path.toString(), lstat)) {
+ status = new FileStatus(lstat.size, lstat.is_dir,
-+ ceph_replication(abs_path.toString()),
-+ lstat.block_size,
-+ lstat.mod_time,
-+ lstat.access_time,
-+ new FsPermission((short)lstat.mode),
-+ null,
-+ null,
-+ new Path(fs_default_name+abs_path.toString()));
++ ceph.ceph_replication(abs_path.toString()),
++ lstat.block_size,
++ lstat.mod_time,
++ lstat.access_time,
++ new FsPermission((short)lstat.mode),
++ null,
++ null,
++ new Path(fs_default_name+abs_path.toString()));
+ }
+ else { //fail out
-+ throw new FileNotFoundException("org.apache.hadoop.fs.ceph.CephFileSystem: File "
-+ + path + " does not exist or could not be accessed");
++ throw new FileNotFoundException("org.apache.hadoop.fs.ceph.CephFileSystem: File "
++ + path + " does not exist or could not be accessed");
+ }
+
-+ if(debug) debug("getFileStatus:exit");
++ ceph.debug("getFileStatus:exit", ceph.DEBUG);
+ return status;
+ }
+
+ * @throws FileNotFoundException if the input path can't be found.
+ */
+ public FileStatus[] listStatus(Path path) throws IOException {
-+ if (!initialized) throw new IOException ("You have to initialize the"
-+ +"CephFileSystem before calling other methods.");
-+ if(debug) debug("listStatus:enter with path " + path);
++ if (!initialized) throw new IOException ("You have to initialize the "
++ +"CephFileSystem before calling other methods.");
++ ceph.debug("listStatus:enter with path " + path, ceph.WARN);
+ Path abs_path = makeAbsolute(path);
+ Path[] paths = listPaths(abs_path);
+ if (paths != null) {
+ FileStatus[] statuses = new FileStatus[paths.length];
+ for (int i = 0; i < paths.length; ++i) {
-+ statuses[i] = getFileStatus(paths[i]);
++ statuses[i] = getFileStatus(paths[i]);
+ }
-+ if(debug) debug("listStatus:exit");
++ ceph.debug("listStatus:exit", ceph.DEBUG);
+ return statuses;
+ }
+ if (!isFile(path)) throw new FileNotFoundException(); //if we get here, listPaths returned null
+ }
+
+ @Override
-+ public void setPermission(Path p, FsPermission permission) throws IOException {
-+ if (!initialized) throw new IOException ("You have to initialize the"
-+ +"CephFileSystem before calling other methods.");
++ public void setPermission(Path p, FsPermission permission) throws IOException {
++ if (!initialized) throw new IOException ("You have to initialize the "
++ +"CephFileSystem before calling other methods.");
++ ceph.debug("setPermission:enter with path " + p
++ + " and permissions " + permission, ceph.DEBUG);
+ Path abs_path = makeAbsolute(p);
-+ ceph_setPermission(abs_path.toString(), permission.toShort());
++ ceph.debug("setPermission:calling ceph_setpermission from Java", ceph.TRACE);
++ ceph.ceph_setPermission(abs_path.toString(), permission.toShort());
++ ceph.debug("setPermission:exit", ceph.DEBUG);
+ }
+
+ /**
+ * @param mtime Set modification time in number of millis since Jan 1, 1970.
+ * @param atime Set access time in number of millis since Jan 1, 1970.
+ */
-+@Override
-+ public void setTimes(Path p, long mtime, long atime) throws IOException {
-+ if (!initialized) throw new IOException ("You have to initialize the"
-+ +"CephFileSystem before calling other methods.");
-+ Path abs_path = makeAbsolute(p);
-+ int r = ceph_setTimes(abs_path.toString(), mtime, atime);
-+ if (r<0) throw new IOException ("Failed to set times on path "
-+ + abs_path.toString() + " Error code: " + r);
-+}
++ @Override
++ public void setTimes(Path p, long mtime, long atime) throws IOException {
++ if (!initialized) throw new IOException ("You have to initialize the "
++ +"CephFileSystem before calling other methods.");
++ ceph.debug("setTimes:enter with path " + p + " mtime:" + mtime +
++ " atime:" + atime, ceph.DEBUG);
++ Path abs_path = makeAbsolute(p);
++ ceph.debug("setTimes:calling ceph_setTimes from Java", ceph.TRACE);
++ int r = ceph.ceph_setTimes(abs_path.toString(), mtime, atime);
++ if (r<0) throw new IOException ("Failed to set times on path "
++ + abs_path.toString() + " Error code: " + r);
++ ceph.debug("setTimes:exit", ceph.DEBUG);
++ }
+
+ /**
+ * Create a new file and open an FSDataOutputStream that's connected to it.
+ * @param permission The permissions to apply to the file.
+ * @param flag If CreateFlag.OVERWRITE, overwrite any existing
+ * file with this name; otherwise don't.
-+ * @param bufferSize Ceph does internal buffering; this is ignored.
++ * @param bufferSize Ceph does internal buffering, but you can buffer
++ * in the Java code too if you like.
+ * @param replication Ignored by Ceph. This can be
+ * configured via Ceph configuration.
+ * @param blockSize Ignored by Ceph. You can set client-wide block sizes
+ * failure in attempting to open for append with Ceph.
+ */
+ public FSDataOutputStream create(Path path,
-+ FsPermission permission,
-+ EnumSet<CreateFlag> flag,
-+ //boolean overwrite,
-+ int bufferSize,
-+ short replication,
-+ long blockSize,
-+ Progressable progress
-+ ) throws IOException {
-+ if (!initialized) throw new IOException ("You have to initialize the"
-+ +"CephFileSystem before calling other methods.");
-+ if(debug) debug("create:enter with path " + path);
++ FsPermission permission,
++ EnumSet<CreateFlag> flag,
++ //boolean overwrite,
++ int bufferSize,
++ short replication,
++ long blockSize,
++ Progressable progress
++ ) throws IOException {
++ if (!initialized) throw new IOException ("You have to initialize the "
++ +"CephFileSystem before calling other methods.");
++ ceph.debug("create:enter with path " + path, ceph.DEBUG);
+ Path abs_path = makeAbsolute(path);
+ if (progress!=null) progress.progress();
+ // We ignore replication since that's not configurable here, and
+ boolean exists = exists(abs_path);
+ if (exists) {
+ if(isDirectory(abs_path))
-+ throw new IOException("create: Cannot overwrite existing directory \""
-+ + path.toString() + "\" with a file");
++ throw new IOException("create: Cannot overwrite existing directory \""
++ + path.toString() + "\" with a file");
+ //if (!overwrite)
+ if (!flag.contains(CreateFlag.OVERWRITE))
-+ throw new IOException("createRaw: Cannot open existing file \""
-+ + abs_path.toString()
-+ + "\" for writing without overwrite flag");
++ throw new IOException("createRaw: Cannot open existing file \""
++ + abs_path.toString()
++ + "\" for writing without overwrite flag");
+ }
+
+ if (progress!=null) progress.progress();
+ if (!exists) {
+ Path parent = abs_path.getParent();
+ if (parent != null) { // if parent is root, we're done
-+ int r = ceph_mkdirs(parent.toString(), permission.toShort());
-+ if (!(r==0 || r==-EEXIST))
-+ throw new IOException ("Error creating parent directory; code: " + r);
++ int r = ceph.ceph_mkdirs(parent.toString(), permission.toShort());
++ if (!(r==0 || r==-ceph.EEXIST))
++ throw new IOException ("Error creating parent directory; code: " + r);
+ }
+ if (progress!=null) progress.progress();
+ }
+ // Step 3: open the file
-+ if(debug) debug("calling ceph_open_for_overwrite from Java");
-+ int fh = ceph_open_for_overwrite(abs_path.toString(), (int)permission.toShort());
++ ceph.debug("calling ceph_open_for_overwrite from Java", ceph.TRACE);
++ int fh = ceph.ceph_open_for_overwrite(abs_path.toString(), (int)permission.toShort());
+ if (progress!=null) progress.progress();
-+ if(debug) debug("Returned from ceph_open_for_overwrite to Java with fh " + fh);
++ ceph.debug("Returned from ceph_open_for_overwrite to Java with fh " + fh, ceph.TRACE);
+ if (fh < 0) {
+ throw new IOException("create: Open for overwrite failed on path \"" +
-+ path.toString() + "\"");
++ path.toString() + "\"");
+ }
+
+ // Step 4: create the stream
-+ OutputStream cephOStream = new CephOutputStream(getConf(), fh);
-+ if(debug) debug("create:exit");
++ OutputStream cephOStream = new CephOutputStream(getConf(),
++ ceph, fh, bufferSize);
++ ceph.debug("create:exit", ceph.DEBUG);
+ return new FSDataOutputStream(cephOStream, statistics);
-+ }
++ }
+
+ /**
+ * Open a Ceph file and attach the file handle to an FSDataInputStream.
+ * @param path The file to open
-+ * @param bufferSize Ceph does internal buffering; this is ignored.
++ * @param bufferSize Ceph does internal buffering; but you can buffer in
++ * the Java code too if you like.
+ * @return FSDataInputStream reading from the given path.
+ * @throws IOException if initialize() hasn't been called, the path DNE or is a
+ * directory, or there is an error getting data to set up the FSDataInputStream.
+ */
+ public FSDataInputStream open(Path path, int bufferSize) throws IOException {
-+ if (!initialized) throw new IOException ("You have to initialize the"
-+ +"CephFileSystem before calling other methods.");
-+ if(debug) debug("open:enter with path " + path);
++ if (!initialized) throw new IOException ("You have to initialize the "
++ +"CephFileSystem before calling other methods.");
++ ceph.debug("open:enter with path " + path, ceph.DEBUG);
+ Path abs_path = makeAbsolute(path);
+
-+ int fh = ceph_open_for_read(abs_path.toString());
++ int fh = ceph.ceph_open_for_read(abs_path.toString());
+ if (fh < 0) { //uh-oh, something's bad!
-+ if (fh == -ENOENT) //well that was a stupid open
-+ throw new IOException("open: absolute path \"" + abs_path.toString()
-+ + "\" does not exist");
++ if (fh == -ceph.ENOENT) //well that was a stupid open
++ throw new IOException("open: absolute path \"" + abs_path.toString()
++ + "\" does not exist");
+ else //hrm...the file exists but we can't open it :(
-+ throw new IOException("open: Failed to open file " + abs_path.toString());
++ throw new IOException("open: Failed to open file " + abs_path.toString());
+ }
+
+ if(isDirectory(abs_path)) { //yes, it is possible to open Ceph directories
+ //but that doesn't mean you should in Hadoop!
-+ ceph_close(fh);
++ ceph.ceph_close(fh);
+ throw new IOException("open: absolute path \"" + abs_path.toString()
-+ + "\" is a directory!");
++ + "\" is a directory!");
+ }
+ Stat lstat = new Stat();
-+ ceph_stat(abs_path.toString(), lstat);
++ ceph.debug("open:calling ceph_stat from Java", ceph.TRACE);
++ ceph.ceph_stat(abs_path.toString(), lstat);
++ ceph.debug("open:returned to Java", ceph.TRACE);
+ long size = lstat.size;
+ if (size < 0) {
+ throw new IOException("Failed to get file size for file " + abs_path.toString() +
-+ " but succeeded in opening file. Something bizarre is going on.");
++ " but succeeded in opening file. Something bizarre is going on.");
+ }
-+ FSInputStream cephIStream = new CephInputStream(getConf(), fh, size);
-+ if(debug) debug("open:exit");
++ FSInputStream cephIStream = new CephInputStream(getConf(), ceph,
++ fh, size, bufferSize);
++ ceph.debug("open:exit", ceph.DEBUG);
+ return new FSDataInputStream(cephIStream);
-+ }
++ }
+
+ /**
+ * Rename a file or directory.
+ * @throws IOException if initialize() hasn't been called.
+ */
+ @Override
-+ public boolean rename(Path src, Path dst) throws IOException {
-+ if (!initialized) throw new IOException ("You have to initialize the"
-+ +"CephFileSystem before calling other methods.");
-+ if(debug) debug("rename:enter");
-+ if(debug) debug("calling ceph_rename from Java");
++ public boolean rename(Path src, Path dst) throws IOException {
++ if (!initialized) throw new IOException ("You have to initialize the "
++ +"CephFileSystem before calling other methods.");
++ ceph.debug("rename:enter with src:" + src + " and dest:" + dst, ceph.DEBUG);
+ Path abs_src = makeAbsolute(src);
+ Path abs_dst = makeAbsolute(dst);
-+ boolean result = ceph_rename(abs_src.toString(), abs_dst.toString());
-+ if(debug) debug("return from ceph_rename to Java with result " + result);
-+ if(debug) debug("rename:exit");
++ ceph.debug("calling ceph_rename from Java", ceph.TRACE);
++ boolean result = ceph.ceph_rename(abs_src.toString(), abs_dst.toString());
++ if (!result) {
++ if (isDirectory(abs_dst)) { //move the srcdir into destdir
++ ceph.debug("ceph_rename failed but dst is a directory!", ceph.NOLOG);
++ Path new_dst = new Path(abs_dst, abs_src.getName());
++ result = rename(abs_src, new_dst);
++ ceph.debug("attempt to move " + abs_src.toString()
++ + " to " + new_dst.toString()
++ + "has result:" + result, ceph.NOLOG);
++ }
++ }
++ ceph.debug("rename:exit with result: " + result, ceph.DEBUG);
+ return result;
+ }
+
+ * @throws IOException if initialize() hasn't been called.
+ */
+ @Override
-+ public BlockLocation[] getFileBlockLocations(FileStatus file,
-+ long start, long len) throws IOException {
-+ if (!initialized) throw new IOException ("You have to initialize the"
-+ +"CephFileSystem before calling other methods.");
++ public BlockLocation[] getFileBlockLocations(FileStatus file,
++ long start, long len) throws IOException {
++ if (!initialized) throw new IOException ("You have to initialize the "
++ +"CephFileSystem before calling other methods.");
++ ceph.debug("getFileBlockLocations:enter with path " + file.getPath() +
++ ", start pos " + start + ", length " + len, ceph.DEBUG);
+ //sanitize and get the filehandle
+ Path abs_path = makeAbsolute(file.getPath());
-+ int fh = ceph_open_for_read(abs_path.toString());
++ ceph.debug("getFileBlockLocations:call ceph_open_for_read from Java", ceph.TRACE);
++ int fh = ceph.ceph_open_for_read(abs_path.toString());
++ ceph.debug("getFileBlockLocations:return from ceph_open_for_read to Java with fh "
++ + fh, ceph.TRACE);
+ if (fh < 0) {
++ ceph.debug("getFileBlockLocations:got error " + fh +
++ ", exiting and returning null!", ceph.ERROR);
+ return null;
+ }
+ //get the block size
-+ long blockSize = ceph_getblocksize(abs_path.toString());
++ ceph.debug("getFileBlockLocations:call ceph_getblocksize from Java", ceph.TRACE);
++ long blockSize = ceph.ceph_getblocksize(abs_path.toString());
++ ceph.debug("getFileBlockLocations:return from ceph_getblocksize", ceph.TRACE);
+ BlockLocation[] locations =
-+ new BlockLocation[(int)Math.ceil((len-start)/(float)blockSize)];
++ new BlockLocation[(int)Math.ceil(len/(float)blockSize)];
++ long offset;
+ for (int i = 0; i < locations.length; ++i) {
-+ String host = ceph_hosts(fh, start + i*blockSize);
++ offset = start + i*blockSize;
++ ceph.debug("getFileBlockLocations:call ceph_hosts from Java on fh "
++ + fh + " and offset " + offset, ceph.TRACE);
++ String host = ceph.ceph_hosts(fh, offset);
++ ceph.debug("getFileBlockLocations:return from ceph_hosts to Java with host "
++ + host, ceph.TRACE);
+ String[] hostArray = new String[1];
+ hostArray[0] = host;
+ locations[i] = new BlockLocation(hostArray, hostArray,
-+ start+i*blockSize, blockSize);
++ start+i*blockSize-(start % blockSize),
++ blockSize);
+ }
-+ ceph_close(fh);
++ ceph.debug("getFileBlockLocations:call ceph_close from Java on fh "
++ + fh, ceph.TRACE);
++ ceph.ceph_close(fh);
++ ceph.debug("getFileBlockLocations:return with " + locations.length
++ + " locations", ceph.DEBUG);
+ return locations;
+ }
+
+ * Get usage statistics on the Ceph filesystem.
+ * @param path A path to the partition you're interested in.
+ * Ceph doesn't partition, so this is ignored.
-+ * @return FsStatus reporting capacity, usage, and remaining spac.
++ * @return FsStatus reporting capacity, usage, and remaining space.
+ * @throws IOException if initialize() hasn't been called, or the
+ * stat somehow fails.
+ */
+ @Override
-+ public FsStatus getStatus (Path path) throws IOException {
-+ if (!initialized) throw new IOException("You have to initialize the"
-+ + " CephFileSystem before calling other methods.");
-+ if(debug) debug("getStatus:enter");
++ public FsStatus getStatus (Path path) throws IOException {
++ if (!initialized) throw new IOException("You have to initialize the "
++ + " CephFileSystem before calling other methods.");
++ ceph.debug("getStatus:enter with path " + path, ceph.DEBUG);
+ Path abs_path = makeAbsolute(path);
+
-+ //currently(Ceph .14) Ceph actually ignores the path
++ //currently(Ceph .16) Ceph actually ignores the path
+ //but we still pass it in; if Ceph stops ignoring we may need more
+ //error-checking code.
+ CephStat ceph_stat = new CephStat();
-+ int result = ceph_statfs(abs_path.toString(), ceph_stat);
++ ceph.debug("getStatus:calling ceph_statfs from Java", ceph.TRACE);
++ int result = ceph.ceph_statfs(abs_path.toString(), ceph_stat);
+ if (result!=0) throw new IOException("Somehow failed to statfs the Ceph filesystem. Error code: " + result);
-+ if(debug) debug("getStatus:exit");
++ ceph.debug("getStatus:exit successfully", ceph.DEBUG);
+ return new FsStatus(ceph_stat.capacity,
-+ ceph_stat.used, ceph_stat.remaining);
++ ceph_stat.used, ceph_stat.remaining);
+ }
+
+ /**
+ * Delete the given path, and optionally its children.
+ * @param path the path to delete.
-+ * @param recursive If the path is a directory and this is false,
++ * @param recursive If the path is a non-empty directory and this is false,
+ * delete will throw an IOException. If path is a file this is ignored.
+ * @return true if the delete succeeded, false otherwise (including if
+ * path doesn't exist).
+ * or you attempt to delete the root directory.
+ */
+ public boolean delete(Path path, boolean recursive) throws IOException {
-+ if (!initialized) throw new IOException ("You have to initialize the"
-+ +"CephFileSystem before calling other methods.");
-+ if(debug) debug("delete:enter");
++ if (!initialized) throw new IOException ("You have to initialize the "
++ +"CephFileSystem before calling other methods.");
++ ceph.debug("delete:enter with path " + path + " and recursive=" + recursive,
++ ceph.DEBUG);
+ Path abs_path = makeAbsolute(path);
+
-+ if(debug) debug("delete: Deleting path " + abs_path.toString());
+ // sanity check
+ if (abs_path.equals(root))
+ throw new IOException("Error: deleting the root directory is a Bad Idea.");
-+
+ if (!exists(abs_path)) return false;
+
+ // if the path is a file, try to delete it.
+ if (isFile(abs_path)) {
-+ boolean result = ceph_unlink(abs_path.toString());
++ ceph.debug("delete:calling ceph_unlink from Java with path " + abs_path,
++ ceph.TRACE);
++ boolean result = ceph.ceph_unlink(abs_path.toString());
+ if(!result)
-+ if(debug) debug("delete: failed to delete file \"" +
-+ abs_path.toString() + "\".");
-+ if(debug) debug("delete:exit");
++ ceph.debug("delete: failed to delete file \"" +
++ abs_path.toString() + "\".", ceph.ERROR);
++ ceph.debug("delete:exit with success=" + result, ceph.DEBUG);
+ return result;
+ }
+
+ /* The path is a directory, so recursively try to delete its contents,
+ and then delete the directory. */
-+ if (!recursive) {
-+ throw new IOException("Directories must be deleted recursively!");
-+ }
+ //get the entries; listPaths will remove . and .. for us
+ Path[] contents = listPaths(abs_path);
+ if (contents == null) {
-+ if(debug) debug("delete: Failed to read contents of directory \"" +
-+ abs_path.toString() + "\" while trying to delete it");
-+ if(debug) debug("delete:exit");
++ ceph.debug("delete: Failed to read contents of directory \"" +
++ abs_path.toString() +
++ "\" while trying to delete it, BAILING", ceph.ERROR);
+ return false;
+ }
++ if (!recursive && contents.length > 0) {
++ throw new IOException("Directories must be deleted recursively!");
++ }
+ // delete the entries
++ ceph.debug("delete: recursively calling delete on contents of "
++ + abs_path, ceph.DEBUG);
+ for (Path p : contents) {
+ if (!delete(p, true)) {
-+ if(debug) debug("delete: Failed to delete file \"" +
-+ p.toString() + "\" while recursively deleting \""
-+ + abs_path.toString() + "\"" );
-+ if(debug) debug("delete:exit");
-+ return false;
++ ceph.debug("delete: Failed to delete file \"" +
++ p.toString() + "\" while recursively deleting \""
++ + abs_path.toString() + "\", BAILING", ceph.ERROR );
++ return false;
+ }
+ }
+ //if we've come this far it's a now-empty directory, so delete it!
-+ boolean result = ceph_rmdir(abs_path.toString());
++ boolean result = ceph.ceph_rmdir(abs_path.toString());
+ if (!result)
-+ if(debug) debug("delete: failed to delete \"" + abs_path.toString() + "\"");
-+ if(debug) debug("delete:exit");
++ ceph.debug("delete: failed to delete \"" + abs_path.toString()
++ + "\", BAILING", ceph.ERROR);
++ ceph.debug("delete:exit", ceph.DEBUG);
+ return result;
+ }
+
+ * by a separate Ceph configuration.
+ */
+ @Override
-+ public short getDefaultReplication() {
++ public short getDefaultReplication() {
+ return 1;
+ }
+
+ * @return the default block size, in bytes, as a long.
+ */
+ @Override
-+ public long getDefaultBlockSize() {
++ public long getDefaultBlockSize() {
+ return getConf().getInt("fs.ceph.blockSize", 1<<26);
+ }
+
+ // Makes a Path absolute. In a cheap, dirty hack, we're
+ // also going to strip off any fs_default_name prefix we see.
+ private Path makeAbsolute(Path path) {
-+ if(debug) debug("makeAbsolute:enter with path " + path);
++ ceph.debug("makeAbsolute:enter with path " + path, ceph.NOLOG);
+ if (path == null) return new Path("/");
+ // first, check for the prefix
+ if (path.toString().startsWith(fs_default_name)) {
+ Path stripped_path = new Path(path.toString().substring(fs_default_name.length()));
-+ if(debug) debug("makeAbsolute:exit with path " + stripped_path);
++ ceph.debug("makeAbsolute:exit with path " + stripped_path, ceph.NOLOG);
+ return stripped_path;
+ }
+
+ if (path.isAbsolute()) {
-+ if(debug) debug("makeAbsolute:exit with path " + path);
++ ceph.debug("makeAbsolute:exit with path " + path, ceph.NOLOG);
+ return path;
+ }
-+ Path new_path = new Path(ceph_getcwd(), path);
-+ if(debug) debug("makeAbsolute:exit with path " + new_path);
++ Path new_path = new Path(ceph.ceph_getcwd(), path);
++ ceph.debug("makeAbsolute:exit with path " + new_path, ceph.NOLOG);
+ return new_path;
+ }
+
+ private Path[] listPaths(Path path) throws IOException {
-+ if(debug) debug("listPaths:enter with path " + path);
++ ceph.debug("listPaths:enter with path " + path, ceph.NOLOG);
+ String dirlist[];
+
+ Path abs_path = makeAbsolute(path);
+
+ // If it's a directory, get the listing. Otherwise, complain and give up.
-+ if(debug) debug("calling ceph_getdir from Java with path " + abs_path);
-+ dirlist = ceph_getdir(abs_path.toString());
-+ if(debug) debug("returning from ceph_getdir to Java");
++ ceph.debug("calling ceph_getdir from Java with path " + abs_path, ceph.NOLOG);
++ dirlist = ceph.ceph_getdir(abs_path.toString());
++ ceph.debug("returning from ceph_getdir to Java", ceph.NOLOG);
+
+ if (dirlist == null) {
-+ throw new IOException("listPaths: path " + path.toString() + " is not a directory.");
++ return null;
+ }
+
+ // convert the strings to Paths
+ Path[] paths = new Path[dirlist.length];
+ for (int i = 0; i < dirlist.length; ++i) {
-+ if(debug) debug("Raw enumeration of paths in \"" + abs_path.toString() + "\": \"" +
-+ dirlist[i] + "\"");
++ ceph.debug("Raw enumeration of paths in \"" + abs_path.toString() + "\": \"" +
++ dirlist[i] + "\"", ceph.TRACE);
+ // convert each listing to an absolute path
+ Path raw_path = new Path(dirlist[i]);
+ if (raw_path.isAbsolute())
-+ paths[i] = raw_path;
++ paths[i] = raw_path;
+ else
-+ paths[i] = new Path(abs_path, raw_path);
++ paths[i] = new Path(abs_path, raw_path);
+ }
-+ if(debug) debug("listPaths:exit");
++ ceph.debug("listPaths:exit", ceph.NOLOG);
+ return paths;
+ }
+
-+ private void debug(String statement) {
-+ System.err.println(statement);
-+ }
++
+
-+ private static class Stat {
++ static class Stat {
+ public long size;
+ public boolean is_dir;
+ public long block_size;
+ public Stat(){}
+ }
+
-+ private static class CephStat {
++ static class CephStat {
+ public long capacity;
+ public long used;
+ public long remaining;
+ public CephStat() {}
+ }
+}
+Index: src/java/org/apache/hadoop/fs/ceph/CephFS.java
+===================================================================
+--- src/java/org/apache/hadoop/fs/ceph/CephFS.java (revision 0)
++++ src/java/org/apache/hadoop/fs/ceph/CephFS.java (revision 0)
+@@ -0,0 +1,272 @@
++// -*- mode:Java; tab-width:2; c-basic-offset:2; indent-tabs-mode:t -*-
++/**
++ *
++ * Licensed under the Apache License, Version 2.0
++ * (the "License"); you may not use this file except in compliance with
++ * the License. You may obtain a copy of the License at
++ *
++ * http://www.apache.org/licenses/LICENSE-2.0
++ *
++ * Unless required by applicable law or agreed to in writing, software
++ * distributed under the License is distributed on an "AS IS" BASIS,
++ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
++ * implied. See the License for the specific language governing
++ * permissions and limitations under the License.
++ *
++ *
++ * Abstract base class for communicating with a Ceph filesystem and its
++ * C++ codebase from Java, or pretending to do so (for unit testing purposes).
++ * As only the Ceph package should be using this directly, all methods
++ * are protected.
++ */
++package org.apache.hadoop.fs.ceph;
++
++import org.apache.hadoop.conf.Configuration;
++import org.apache.commons.logging.Log;
++
++abstract class CephFS {
++
++ protected static final int FATAL = 0;
++ protected static final int ERROR = 1;
++ protected static final int WARN = 2;
++ protected static final int INFO = 3;
++ protected static final int DEBUG = 4;
++ protected static final int TRACE = 5;
++ protected static final int NOLOG = 6;
++
++ protected static final int ENOTDIR = 20;
++ protected static final int EEXIST = 17;
++ protected static final int ENOENT = 2;
++
++ private boolean debug = false;
++ private Log LOG;
++
++ public CephFS(Configuration conf, Log log) {
++ debug = ("true".equals(conf.get("fs.ceph.debug", "false")));
++ LOG = log;
++ }
++
++ /*
++ * Performs any necessary setup to allow general use of the filesystem.
++ * Inputs:
++ * String argsuments -- a command-line style input of Ceph config params
++ * int block_size -- the size in bytes to use for blocks
++ * Returns: true on success, false otherwise
++ */
++ abstract protected boolean ceph_initializeClient(String arguments, int block_size);
++
++ /*
++ * Returns the current working directory (absolute) as a String
++ */
++ abstract protected String ceph_getcwd();
++ /*
++ * Changes the working directory.
++ * Inputs:
++ * String path: The path (relative or absolute) to switch to
++ * Returns: true on success, false otherwise.
++ */
++ abstract protected boolean ceph_setcwd(String path);
++ /*
++ * Given a path to a directory, removes the directory if empty.
++ * Inputs:
++ * jstring j_path: The path (relative or absolute) to the directory
++ * Returns: true on successful delete; false otherwise
++ */
++ abstract protected boolean ceph_rmdir(String path);
++ /*
++ * Given a path, unlinks it.
++ * Inputs:
++ * String path: The path (relative or absolute) to the file or empty dir
++ * Returns: true if the unlink occurred, false otherwise.
++ */
++ abstract protected boolean ceph_unlink(String path);
++ /*
++ * Changes a given path name to a new name, assuming new_path doesn't exist.
++ * Inputs:
++ * jstring j_from: The path whose name you want to change.
++ * jstring j_to: The new name for the path.
++ * Returns: true if the rename occurred, false otherwise
++ */
++ abstract protected boolean ceph_rename(String old_path, String new_path);
++ /*
++ * Returns true if it the input path exists, false
++ * if it does not or there is an unexpected failure.
++ */
++ abstract protected boolean ceph_exists(String path);
++ /*
++ * Get the block size for a given path.
++ * Input:
++ * String path: The path (relative or absolute) you want
++ * the block size for.
++ * Returns: block size if the path exists, otherwise a negative number
++ * corresponding to the standard C++ error codes (which are positive).
++ */
++ abstract protected long ceph_getblocksize(String path);
++ /*
++ * Returns true if the given path is a directory, false otherwise.
++ */
++ abstract protected boolean ceph_isdirectory(String path);
++ /*
++ * Returns true if the given path is a file; false otherwise.
++ */
++ abstract protected boolean ceph_isfile(String path);
++ /*
++ * Get the contents of a given directory.
++ * Inputs:
++ * String path: The path (relative or absolute) to the directory.
++ * Returns: A Java String[] of the contents of the directory, or
++ * NULL if there is an error (ie, path is not a dir). This listing
++ * will not contain . or .. entries.
++ */
++ abstract protected String[] ceph_getdir(String path);
++ /*
++ * Create the specified directory and any required intermediate ones with the
++ * given mode.
++ */
++ abstract protected int ceph_mkdirs(String path, int mode);
++ /*
++ * Open a file to append. If the file does not exist, it will be created.
++ * Opening a dir is possible but may have bad results.
++ * Inputs:
++ * String path: The path to open.
++ * Returns: an int filehandle, or a number<0 if an error occurs.
++ */
++ abstract protected int ceph_open_for_append(String path);
++ /*
++ * Open a file for reading.
++ * Opening a dir is possible but may have bad results.
++ * Inputs:
++ * String path: The path to open.
++ * Returns: an int filehandle, or a number<0 if an error occurs.
++ */
++ abstract protected int ceph_open_for_read(String path);
++ /*
++ * Opens a file for overwriting; creates it if necessary.
++ * Opening a dir is possible but may have bad results.
++ * Inputs:
++ * String path: The path to open.
++ * int mode: The mode to open with.
++ * Returns: an int filehandle, or a number<0 if an error occurs.
++ */
++ abstract protected int ceph_open_for_overwrite(String path, int mode);
++ /*
++ * Closes the given file. Returns 0 on success, or a negative
++ * error code otherwise.
++ */
++ abstract protected int ceph_close(int filehandle);
++ /*
++ * Change the mode on a path.
++ * Inputs:
++ * String path: The path to change mode on.
++ * int mode: The mode to apply.
++ * Returns: true if the mode is properly applied, false if there
++ * is any error.
++ */
++ abstract protected boolean ceph_setPermission(String path, int mode);
++ /*
++ * Closes the Ceph client. This should be called before shutting down
++ * (multiple times is okay but redundant).
++ */
++ abstract protected boolean ceph_kill_client();
++ /*
++ * Get the statistics on a path returned in a custom format defined
++ * in CephFileSystem.
++ * Inputs:
++ * String path: The path to stat.
++ * Stat fill: The stat object to fill.
++ * Returns: true if the stat is successful, false otherwise.
++ */
++ abstract protected boolean ceph_stat(String path, CephFileSystem.Stat fill);
++ /*
++ * Statfs a filesystem in a custom format defined in CephFileSystem.
++ * Inputs:
++ * String path: A path on the filesystem that you wish to stat.
++ * CephStat fill: The CephStat object to fill.
++ * Returns: 0 if successful and the CephStat is filled; a negative
++ * error code otherwise.
++ */
++ abstract protected int ceph_statfs(String path, CephFileSystem.CephStat fill);
++ /*
++ * Check how many times a path should be replicated (if it is
++ * degraded it may not actually be replicated this often).
++ * Inputs:
++ * String path: The path to check.
++ * Returns: an int containing the number of times replicated.
++ */
++ abstract protected int ceph_replication(String path);
++ /*
++ * Find the IP address of the primary OSD for a given file and offset.
++ * Inputs:
++ * int fh: The filehandle for the file.
++ * long offset: The offset to get the location of.
++ * Returns: a String of the location as IP, or NULL if there is an error.
++ */
++ abstract protected String ceph_hosts(int fh, long offset);
++ /*
++ * Set the mtime and atime for a given path.
++ * Inputs:
++ * String path: The path to set the times for.
++ * long mtime: The mtime to set, in millis since epoch (-1 to not set).
++ * long atime: The atime to set, in millis since epoch (-1 to not set)
++ * Returns: 0 if successful, an error code otherwise.
++ */
++ abstract protected int ceph_setTimes(String path, long mtime, long atime);
++ /*
++ * Get the current position in a file (as a long) of a given filehandle.
++ * Returns: (long) current file position on success, or a
++ * negative error code on failure.
++ */
++ abstract protected long ceph_getpos(int fh);
++ /*
++ * Write the given buffer contents to the given filehandle.
++ * Inputs:
++ * int fh: The filehandle to write to.
++ * byte[] buffer: The buffer to write from
++ * int buffer_offset: The position in the buffer to write from
++ * int length: The number of (sequential) bytes to write.
++ * Returns: int, on success the number of bytes written, on failure
++ * a negative error code.
++ */
++ abstract protected int ceph_write(int fh, byte[] buffer, int buffer_offset, int length);
++
++ /*
++ * Reads into the given byte array from the current position.
++ * Inputs:
++ * int fh: the filehandle to read from
++ * byte[] buffer: the byte array to read into
++ * int buffer_offset: where in the buffer to start writing
++ * int length: how much to read.
++ * There'd better be enough space in the buffer to write all
++ * the data from the given offset!
++ * Returns: the number of bytes read on success (as an int),
++ * or an error code otherwise. */
++ abstract protected int ceph_read(int fh, byte[] buffer, int buffer_offset, int length);
++ /*
++ * Seeks to the given position in the given file.
++ * Inputs:
++ * int fh: The filehandle to seek in.
++ * long pos: The position to seek to.
++ * Returns: the new position (as a long) of the filehandle on success,
++ * or a negative error code on failure. */
++ abstract protected long ceph_seek_from_start(int fh, long pos);
++
++ protected void debug(String statement, int priority) {
++ if (debug) System.err.println(statement);
++ switch(priority) {
++ case FATAL: LOG.fatal(statement);
++ break;
++ case ERROR: LOG.error(statement);
++ break;
++ case WARN: LOG.warn(statement);
++ break;
++ case INFO: LOG.info(statement);
++ break;
++ case DEBUG: LOG.debug(statement);
++ break;
++ case TRACE: LOG.trace(statement);
++ break;
++ case NOLOG: break;
++ default: break;
++ }
++ }
++}
+Index: src/java/org/apache/hadoop/fs/ceph/CephFaker.java
+===================================================================
+--- src/java/org/apache/hadoop/fs/ceph/CephFaker.java (revision 0)
++++ src/java/org/apache/hadoop/fs/ceph/CephFaker.java (revision 0)
+@@ -0,0 +1,480 @@
++// -*- mode:Java; tab-width:2; c-basic-offset:2; indent-tabs-mode:t -*-
++/**
++ *
++ * Licensed under the Apache License, Version 2.0
++ * (the "License"); you may not use this file except in compliance with
++ * the License. You may obtain a copy of the License at
++ *
++ * http://www.apache.org/licenses/LICENSE-2.0
++ *
++ * Unless required by applicable law or agreed to in writing, software
++ * distributed under the License is distributed on an "AS IS" BASIS,
++ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
++ * implied. See the License for the specific language governing
++ * permissions and limitations under the License.
++ *
++ *
++ * This uses the local Filesystem but pretends to be communicating
++ * with a Ceph deployment, for unit testing the CephFileSystem.
++ */
++
++package org.apache.hadoop.fs.ceph;
++
++import java.net.URI;
++import java.util.Hashtable;
++import java.io.Closeable;
++import java.io.FileNotFoundException;
++import java.io.IOException;
++
++import org.apache.commons.logging.Log;
++import org.apache.hadoop.conf.Configuration;
++import org.apache.hadoop.fs.BlockLocation;
++import org.apache.hadoop.fs.FileAlreadyExistsException;
++import org.apache.hadoop.fs.FileStatus;
++import org.apache.hadoop.fs.FileSystem;
++import org.apache.hadoop.fs.FsStatus;
++import org.apache.hadoop.fs.FSDataInputStream;
++import org.apache.hadoop.fs.FSDataOutputStream;
++import org.apache.hadoop.fs.Path;
++import org.apache.hadoop.fs.permission.FsPermission;
++
++class CephFaker extends CephFS {
++
++ FileSystem localFS;
++ String localPrefix;
++ int blockSize;
++ Configuration conf;
++ Hashtable<Integer, Object> files;
++ Hashtable<Integer, String> filenames;
++ int fileCount = 0;
++ boolean initialized = false;
++
++ public CephFaker(Configuration con, Log log) {
++ super(con, log);
++ conf = con;
++ files = new Hashtable<Integer, Object>();
++ filenames = new Hashtable<Integer, String>();
++ }
++
++ protected boolean ceph_initializeClient(String args, int block_size) {
++ if (!initialized) {
++ //let's remember the default block_size
++ blockSize = block_size;
++ /* for a real Ceph deployment, this starts up the client,
++ * sets debugging levels, etc. We just need to get the
++ * local FileSystem to use, and we'll ignore any
++ * command-line arguments. */
++ try {
++ localFS = FileSystem.getLocal(conf);
++ localFS.initialize(URI.create("file://localhost"), conf);
++ localFS.setVerifyChecksum(false);
++ String testDir = conf.get("hadoop.tmp.dir");
++ localPrefix = localFS.getWorkingDirectory().toString();
++ int testDirLoc = localPrefix.indexOf(testDir) - 1;
++ if (-2 == testDirLoc)
++ testDirLoc = localPrefix.length();
++ localPrefix = localPrefix.substring(0, testDirLoc)
++ + "/" + conf.get("hadoop.tmp.dir");
++
++ localFS.setWorkingDirectory(new Path(localPrefix
++ +"/user/"
++ + System.getProperty("user.name")));
++ //I don't know why, but the unit tests expect the default
++ //working dir to be /user/username, so satisfy them!
++ //debug("localPrefix is " + localPrefix, INFO);
++ }
++ catch (IOException e) {
++ return false;
++ }
++ initialized = true;
++ }
++ return true;
++ }
++
++ protected String ceph_getcwd() {
++ return sanitize_path(localFS.getWorkingDirectory().toString());
++ }
++
++ protected boolean ceph_setcwd(String path) {
++ localFS.setWorkingDirectory(new Path(prepare_path(path)));
++ return true;
++ }
++
++ //the caller is responsible for ensuring empty dirs
++ protected boolean ceph_rmdir(String pth) {
++ Path path = new Path(prepare_path(pth));
++ boolean ret = false;
++ try {
++ if (localFS.listStatus(path).length <= 1) {
++ ret = localFS.delete(path, true);
++ }
++ }
++ catch (IOException e){ }
++ return ret;
++ }
++
++ //this needs to work on (empty) directories too
++ protected boolean ceph_unlink(String path) {
++ path = prepare_path(path);
++ boolean ret = false;
++ if (ceph_isdirectory(path)) {
++ ret = ceph_rmdir(path);
++ }
++ else {
++ try {
++ ret = localFS.delete(new Path(path), false);
++ }
++ catch (IOException e){ }
++ }
++ return ret;
++ }
++
++ protected boolean ceph_rename(String oldName, String newName) {
++ oldName = prepare_path(oldName);
++ newName = prepare_path(newName);
++ try {
++ Path parent = new Path(newName).getParent();
++ Path newPath = new Path(newName);
++ if (localFS.exists(parent) && !localFS.exists(newPath))
++ return localFS.rename(new Path(oldName), newPath);
++ return false;
++ }
++ catch (IOException e) { return false; }
++ }
++
++ protected boolean ceph_exists(String path) {
++ path = prepare_path(path);
++ boolean ret = false;
++ try {
++ ret = localFS.exists(new Path(path));
++ }
++ catch (IOException e){ }
++ return ret;
++ }
++
++ protected long ceph_getblocksize(String path) {
++ path = prepare_path(path);
++ try {
++ FileStatus status = localFS.getFileStatus(new Path(path));
++ return status.getBlockSize();
++ }
++ catch (FileNotFoundException e) {
++ return -CephFS.ENOENT;
++ }
++ catch (IOException e) {
++ return -1; //just fail generically
++ }
++ }
++
++ protected boolean ceph_isdirectory(String path) {
++ path = prepare_path(path);
++ try {
++ FileStatus status = localFS.getFileStatus(new Path(path));
++ return status.isDir();
++ }
++ catch (IOException e) { return false; }
++ }
++
++ protected boolean ceph_isfile(String path) {
++ path = prepare_path(path);
++ boolean ret = false;
++ try {
++ FileStatus status = localFS.getFileStatus(new Path(path));
++ ret = !status.isDir();
++ }
++ catch (Exception e) {}
++ return ret;
++ }
++
++ protected String[] ceph_getdir(String path) {
++ path = prepare_path(path);
++ if (!ceph_isdirectory(path)) {
++ return null;
++ }
++ try {
++ FileStatus[] stats = localFS.listStatus(new Path(path));
++ String[] names = new String[stats.length];
++ String name;
++ for (int i=0; i<stats.length; ++i) {
++ name = stats[i].getPath().toString();
++ names[i] = name.substring(name.lastIndexOf(Path.SEPARATOR)+1);
++ }
++ return names;
++ }
++ catch (IOException e) {}
++ return null;
++ }
++
++ protected int ceph_mkdirs(String path, int mode) {
++ path = prepare_path(path);
++ //debug("ceph_mkdirs on " + path, INFO);
++ try {
++ if(localFS.mkdirs(new Path(path), new FsPermission((short)mode)))
++ return 0;
++ }
++ catch (FileAlreadyExistsException fe) { return ENOTDIR; }
++ catch (IOException e) {}
++ if (ceph_isdirectory(path))
++ return -EEXIST; //apparently it already existed
++ return -1;
++ }
++
++ /*
++ * Unlike a real Ceph deployment, you can't do opens on a directory.
++ * Since that has unpredictable behavior and you shouldn't do it anyway,
++ * it's okay.
++ */
++ protected int ceph_open_for_append(String path) {
++ path = prepare_path(path);
++ FSDataOutputStream stream;
++ try {
++ stream = localFS.append(new Path(path));
++ files.put(new Integer(fileCount), stream);
++ filenames.put(new Integer(fileCount), path);
++ return fileCount++;
++ }
++ catch (IOException e) { }
++ return -1; // failure
++ }
++
++ protected int ceph_open_for_read(String path) {
++ path = prepare_path(path);
++ FSDataInputStream stream;
++ try {
++ stream = localFS.open(new Path(path));
++ files.put(new Integer(fileCount), stream);
++ filenames.put(new Integer(fileCount), path);
++ debug("ceph_open_for_read fh:" + fileCount
++ + ", pathname:" + path, INFO);
++ return fileCount++;
++ }
++ catch (IOException e) { }
++ return -1; //failure
++ }
++
++ protected int ceph_open_for_overwrite(String path, int mode) {
++ path = prepare_path(path);
++ FSDataOutputStream stream;
++ try {
++ stream = localFS.create(new Path(path));
++ files.put(new Integer(fileCount), stream);
++ filenames.put(new Integer(fileCount), path);
++ debug("ceph_open_for_overwrite fh:" + fileCount
++ + ", pathname:" + path, INFO);
++ return fileCount++;
++ }
++ catch (IOException e) { }
++ return -1; //failure
++ }
++
++ protected int ceph_close(int filehandle) {
++ debug("ceph_close(filehandle " + filehandle + ")", INFO);
++ try {
++ ((Closeable)files.get(new Integer(filehandle))).close();
++ if(null == files.get(new Integer(filehandle))) {
++ return -ENOENT; //this isn't quite the right error code,
++ // but the important part is it's negative
++ }
++ return 0; //hurray, success
++ }
++ catch (NullPointerException ne) {
++ debug("ceph_close caught NullPointerException!" + ne, WARN);
++ } //err, how?
++ catch (IOException ie) {
++ debug("ceph_close caught IOException!" + ie, WARN);
++ }
++ return -1; //failure
++ }
++
++ protected boolean ceph_setPermission(String pth, int mode) {
++ pth = prepare_path(pth);
++ Path path = new Path(pth);
++ boolean ret = false;
++ try {
++ localFS.setPermission(path, new FsPermission((short)mode));
++ ret = true;
++ }
++ catch (IOException e) { }
++ return ret;
++ }
++
++ //rather than try and match a Ceph deployment's behavior exactly,
++ //just make bad things happen if they try and call methods after this
++ protected boolean ceph_kill_client() {
++ //debug("ceph_kill_client", INFO);
++ localFS.setWorkingDirectory(new Path(localPrefix));
++ //debug("working dir is now " + localFS.getWorkingDirectory(), INFO);
++ try{
++ localFS.close(); }
++ catch (Exception e) {}
++ localFS = null;
++ files = null;
++ filenames = null;
++ return true;
++ }
++
++ protected boolean ceph_stat(String pth, CephFileSystem.Stat fill) {
++ pth = prepare_path(pth);
++ Path path = new Path(pth);
++ boolean ret = false;
++ try {
++ FileStatus status = localFS.getFileStatus(path);
++ fill.size = status.getLen();
++ fill.is_dir = status.isDir();
++ fill.block_size = status.getBlockSize();
++ fill.mod_time = status.getModificationTime();
++ fill.access_time = status.getAccessTime();
++ fill.mode = status.getPermission().toShort();
++ ret = true;
++ }
++ catch (IOException e) {}
++ return ret;
++ }
++
++ protected int ceph_statfs(String pth, CephFileSystem.CephStat fill) {
++ pth = prepare_path(pth);
++ try {
++ FsStatus stat = localFS.getStatus();
++ fill.capacity = stat.getCapacity();
++ fill.used = stat.getUsed();
++ fill.remaining = stat.getRemaining();
++ return 0;
++ }
++ catch (Exception e){}
++ return -1; //failure;
++ }
++
++ protected int ceph_replication(String path) {
++ path = prepare_path(path);
++ int ret = -1; //-1 for failure
++ try {
++ ret = localFS.getFileStatus(new Path(path)).getReplication();
++ }
++ catch (IOException e) {}
++ return ret;
++ }
++
++ protected String ceph_hosts(int fh, long offset) {
++ String ret = null;
++ try {
++ BlockLocation[] locs = localFS.getFileBlockLocations(
++ localFS.getFileStatus(new Path(
++ filenames.get(new Integer(fh)))), offset, 1);
++ ret = locs[0].getNames()[0];
++ }
++ catch (IOException e) {}
++ catch (NullPointerException f) { }
++ return ret;
++ }
++
++ protected int ceph_setTimes(String pth, long mtime, long atime) {
++ pth = prepare_path(pth);
++ Path path = new Path(pth);
++ int ret = -1; //generic fail
++ try {
++ localFS.setTimes(path, mtime, atime);
++ ret = 0;
++ }
++ catch (IOException e) {}
++ return ret;
++ }
++
++ protected long ceph_getpos(int fh) {
++ long ret = -1; //generic fail
++ try {
++ Object stream = files.get(new Integer(fh));
++ if (stream instanceof FSDataInputStream) {
++ ret = ((FSDataInputStream)stream).getPos();
++ }
++ else if (stream instanceof FSDataOutputStream) {
++ ret = ((FSDataOutputStream)stream).getPos();
++ }
++ }
++ catch (IOException e) {}
++ catch (NullPointerException f) { }
++ return ret;
++ }
++
++ protected int ceph_write(int fh, byte[] buffer,
++ int buffer_offset, int length) {
++ debug("ceph_write fh:" + fh + ", buffer_offset:" + buffer_offset
++ + ", length:" + length, INFO);
++ long ret = -1;//generic fail
++ try {
++ FSDataOutputStream os = (FSDataOutputStream) files.get(new Integer(fh));
++ debug("ceph_write got outputstream", INFO);
++ long startPos = os.getPos();
++ os.write(buffer, buffer_offset, length);
++ ret = os.getPos() - startPos;
++ }
++ catch (IOException e) { debug("ceph_write caught IOException!", WARN);}
++ catch (NullPointerException f) {
++ debug("ceph_write caught NullPointerException!", WARN); }
++ return (int)ret;
++ }
++
++ protected int ceph_read(int fh, byte[] buffer,
++ int buffer_offset, int length) {
++ long ret = -1;//generic fail
++ try {
++ FSDataInputStream is = (FSDataInputStream)files.get(new Integer(fh));
++ long startPos = is.getPos();
++ is.read(buffer, buffer_offset, length);
++ ret = is.getPos() - startPos;
++ }
++ catch (IOException e) {}
++ catch (NullPointerException f) {}
++ return (int)ret;
++ }
++
++ protected long ceph_seek_from_start(int fh, long pos) {
++ debug("ceph_seek_from_start(fh " + fh + ", pos " + pos + ")", INFO);
++ long ret = -1;//generic fail
++ try {
++ debug("ceph_seek_from_start filename is "
++ + filenames.get(new Integer(fh)), INFO);
++ if (null == files.get(new Integer(fh))) {
++ debug("ceph_seek_from_start: is is null!", WARN);
++ }
++ FSDataInputStream is = (FSDataInputStream) files.get(new Integer(fh));
++ debug("ceph_seek_from_start retrieved is!", INFO);
++ is.seek(pos);
++ ret = is.getPos();
++ }
++ catch (IOException e) {debug("ceph_seek_from_start caught IOException!", WARN);}
++ catch (NullPointerException f) {debug("ceph_seek_from_start caught NullPointerException!", WARN);}
++ return (int)ret;
++ }
++
++ /*
++ * We need to remove the localFS file prefix before returning to Ceph
++ */
++ private String sanitize_path(String path) {
++ //debug("sanitize_path(" + path + ")", INFO);
++ /* if (path.startsWith("file:"))
++ path = path.substring("file:".length()); */
++ if (path.startsWith(localPrefix)) {
++ path = path.substring(localPrefix.length());
++ if (path.length() == 0) //it was a root path
++ path = "/";
++ }
++ //debug("sanitize_path returning " + path, INFO);
++ return path;
++ }
++
++ /*
++ * If it's an absolute path we need to shove the
++ * test dir onto the front as a prefix.
++ */
++ private String prepare_path(String path) {
++ //debug("prepare_path(" + path + ")", INFO);
++ if (path.startsWith("/"))
++ path = localPrefix + path;
++ else if (path.equals("..")) {
++ if (ceph_getcwd().equals("/"))
++ path = "."; // you can't go up past root!
++ }
++ //debug("prepare_path returning" + path, INFO);
++ return path;
++ }
++}
Index: src/java/org/apache/hadoop/fs/ceph/CephInputStream.java
===================================================================
--- src/java/org/apache/hadoop/fs/ceph/CephInputStream.java (revision 0)
+++ src/java/org/apache/hadoop/fs/ceph/CephInputStream.java (revision 0)
-@@ -0,0 +1,203 @@
-+// -*- mode:Java; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+@@ -0,0 +1,235 @@
++// -*- mode:Java; tab-width:2; c-basic-offset:2; indent-tabs-mode:t -*-
+/**
+ *
+ * Licensed under the Apache License, Version 2.0
+ */
+package org.apache.hadoop.fs.ceph;
+
-+import java.io.BufferedOutputStream;
-+import java.io.DataInputStream;
-+import java.io.File;
-+import java.io.FileInputStream;
-+import java.io.FileOutputStream;
+import java.io.IOException;
-+import java.io.InputStream;
-+import java.io.OutputStream;
-+//import java.lang.IndexOutOfBoundsException;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSInputStream;
+ */
+public class CephInputStream extends FSInputStream {
+
-+ private int bufferSize;
-+
+ private boolean closed;
+
+ private int fileHandle;
+
+ private long fileLength;
+
-+ private boolean debug;
++ private CephFS ceph;
++
++ private byte[] buffer;
++ private int bufPos = 0;
++ private int bufValid = 0;
++ private long cephPos = 0;
+
-+ private native int ceph_read(int fh, byte[] buffer, int buffer_offset, int length);
-+ private native long ceph_seek_from_start(int fh, long pos);
-+ private native long ceph_getpos(int fh);
-+ private native int ceph_close(int fh);
-+
+ /**
+ * Create a new CephInputStream.
+ * @param conf The system configuration. Unused.
+ * @param flength The current length of the file. If the length changes
+ * you will need to close and re-open it to access the new data.
+ */
-+ public CephInputStream(Configuration conf, int fh, long flength) {
-+ System.load(conf.get("fs.ceph.libDir")+"/libhadoopcephfs.so");
-+ System.load(conf.get("fs.ceph.libDir")+"/libceph.so");
++ public CephInputStream(Configuration conf, CephFS cephfs,
++ int fh, long flength, int bufferSize) {
+ // Whoever's calling the constructor is responsible for doing the actual ceph_open
+ // call and providing the file handle.
+ fileLength = flength;
+ fileHandle = fh;
+ closed = false;
-+ debug = ("true".equals(conf.get("fs.ceph.debug", "false")));
-+ if(debug) debug("CephInputStream constructor: initializing stream with fh "
-+ + fh + " and file length " + flength);
++ ceph = cephfs;
++ buffer = new byte[bufferSize];
++ ceph.debug("CephInputStream constructor: initializing stream with fh "
++ + fh + " and file length " + flength, ceph.DEBUG);
+
+ }
+ /** Ceph likes things to be closed before it shuts down,
+ finally { super.finalize(); }
+ }
+
++ private synchronized boolean fillBuffer() throws IOException {
++ bufValid = ceph.ceph_read(fileHandle, buffer, 0, buffer.length);
++ bufPos = 0;
++ if (bufValid < 0) {
++ int err = bufValid;
++ bufValid = 0;
++ //attempt to reset to old position. If it fails, too bad.
++ ceph.ceph_seek_from_start(fileHandle, cephPos);
++ throw new IOException("Failed to fill read buffer! Error code:"
++ + err);
++ }
++ cephPos += bufValid;
++ return (bufValid != 0);
++ }
++
++ /*
++ * Get the current position of the stream.
++ */
+ public synchronized long getPos() throws IOException {
-+ return ceph_getpos(fileHandle);
++ return cephPos - bufValid + bufPos;
+ }
++
+ /**
+ * Find the number of bytes remaining in the file.
+ */
+ @Override
-+ public synchronized int available() throws IOException {
++ public synchronized int available() throws IOException {
+ return (int) (fileLength - getPos());
+ }
+
+ public synchronized void seek(long targetPos) throws IOException {
-+ if(debug) debug("CephInputStream.seek: Seeking to position " + targetPos +
-+ " on fd " + fileHandle);
++ ceph.debug("CephInputStream.seek: Seeking to position " + targetPos +
++ " on fd " + fileHandle, ceph.TRACE);
+ if (targetPos > fileLength) {
-+ throw new IOException("CephInputStream.seek: failed seeking to position " + targetPos +
-+ " on fd " + fileHandle + ": Cannot seek after EOF " + fileLength);
++ throw new IOException("CephInputStream.seek: failed seek to position "
++ + targetPos + " on fd " + fileHandle
++ + ": Cannot seek after EOF " + fileLength);
+ }
-+ ceph_seek_from_start(fileHandle, targetPos);
-+ }
++ long oldPos = cephPos;
++ cephPos = ceph.ceph_seek_from_start(fileHandle, targetPos);
++ bufValid = 0;
++ bufPos = 0;
++ if (cephPos < 0) {
++ cephPos = oldPos;
++ throw new IOException ("Ceph failed to seek to new position!");
++ }
++ }
+
+ /**
+ * Failovers are handled by the Ceph code at a very low level;
+ * @return the next byte.
+ */
+ @Override
-+ public synchronized int read() throws IOException {
-+ if(debug) debug("CephInputStream.read: Reading a single byte from fd " + fileHandle
-+ + " by calling general read function");
++ public synchronized int read() throws IOException {
++ ceph.debug("CephInputStream.read: Reading a single byte from fd " + fileHandle
++ + " by calling general read function", ceph.TRACE);
+
+ byte result[] = new byte[1];
+ if (getPos() >= fileLength) return -1;
+ }
+
+ /**
-+ * Read a specified number of bytes into a byte[] from the file.
++ * Read a specified number of bytes from the file into a byte[].
+ * @param buf the byte array to read into.
+ * @param off the offset to start at in the file
+ * @param len the number of bytes to read
+ * @return 0 if successful, otherwise an error code.
++ * @throws IOException on bad input.
+ */
+ @Override
-+ public synchronized int read(byte buf[], int off, int len) throws IOException {
-+ if(debug) debug("CephInputStream.read: Reading " + len + " bytes from fd " + fileHandle);
++ public synchronized int read(byte buf[], int off, int len)
++ throws IOException {
++ ceph.debug("CephInputStream.read: Reading " + len +
++ " bytes from fd " + fileHandle, ceph.TRACE);
+
+ if (closed) {
-+ throw new IOException("CephInputStream.read: cannot read " + len +
-+ " bytes from fd " + fileHandle + ": stream closed");
-+ }
-+ if (null == buf) {
-+ throw new NullPointerException("Read buffer is null");
-+ }
-+
-+ // check for proper index bounds
-+ if((off < 0) || (len < 0) || (off + len > buf.length)) {
-+ throw new IndexOutOfBoundsException("CephInputStream.read: Indices out of bounds for read: "
-+ + "read length is " + len + ", buffer offset is "
-+ + off +", and buffer size is " + buf.length);
++ throw new IOException("CephInputStream.read: cannot read " + len +
++ " bytes from fd " + fileHandle +
++ ": stream closed");
+ }
-+
++
+ // ensure we're not past the end of the file
-+ if (getPos() >= fileLength)
-+ {
-+ if(debug) debug("CephInputStream.read: cannot read " + len +
-+ " bytes from fd " + fileHandle + ": current position is " +
-+ getPos() + " and file length is " + fileLength);
-+
-+ return -1;
++ if (getPos() >= fileLength) {
++ ceph.debug("CephInputStream.read: cannot read " + len +
++ " bytes from fd " + fileHandle + ": current position is "
++ + getPos() + " and file length is " + fileLength,
++ ceph.DEBUG);
++
++ return -1;
++ }
++
++ int totalRead = 0;
++ int initialLen = len;
++ int read;
++ do {
++ read = Math.min(len, bufValid - bufPos);
++ try {
++ System.arraycopy(buffer, bufPos, buf, off, read);
++ }
++ catch(IndexOutOfBoundsException ie) {
++ throw new IOException("CephInputStream.read: Indices out of bounds:"
++ + "read length is " + len
++ + ", buffer offset is " + off
++ + ", and buffer size is " + buf.length);
++ }
++ catch (ArrayStoreException ae) {
++ throw new IOException("Uh-oh, CephInputStream failed to do an array"
++ + "copy due to type mismatch...");
++ }
++ catch (NullPointerException ne) {
++ throw new IOException("CephInputStream.read: cannot read "
++ + len + "bytes from fd:" + fileHandle
++ + ": buf is null");
++ }
++ bufPos += read;
++ len -= read;
++ off += read;
++ totalRead += read;
++ } while (len > 0 && fillBuffer());
++
++ ceph.debug("CephInputStream.read: Reading " + initialLen
++ + " bytes from fd " + fileHandle
++ + ": succeeded in reading " + totalRead + " bytes",
++ ceph.TRACE);
++ return totalRead;
+ }
-+ // actually do the read
-+ int result = ceph_read(fileHandle, buf, off, len);
-+ if (result < 0)
-+ if(debug) debug("CephInputStream.read: Reading " + len
-+ + " bytes from fd " + fileHandle + " failed.");
-+
-+ if(debug) debug("CephInputStream.read: Reading " + len + " bytes from fd "
-+ + fileHandle + ": succeeded in reading " + result + " bytes");
-+ return result;
-+ }
+
+ /**
+ * Close the CephInputStream and release the associated filehandle.
+ */
+ @Override
-+ public void close() throws IOException {
-+ if(debug) debug("CephOutputStream.close:enter");
-+ if (closed) {
-+ throw new IOException("Stream closed");
-+ }
-+
-+ int result = ceph_close(fileHandle);
-+ if (result != 0) {
-+ throw new IOException("Close failed!");
-+ }
-+ closed = true;
-+ if(debug) debug("CephOutputStream.close:exit");
-+ }
-+
-+ private void debug(String out) {
-+ System.err.println(out);
-+ }
++ public void close() throws IOException {
++ ceph.debug("CephOutputStream.close:enter", ceph.TRACE);
++ if (!closed) {
++ int result = ceph.ceph_close(fileHandle);
++ closed = true;
++ if (result != 0) {
++ throw new IOException("Close somehow failed!"
++ + "Don't try and use this stream again, though");
++ }
++ ceph.debug("CephOutputStream.close:exit", ceph.TRACE);
++ }
++ }
+}
Index: src/java/org/apache/hadoop/fs/ceph/CephOutputStream.java
===================================================================
--- src/java/org/apache/hadoop/fs/ceph/CephOutputStream.java (revision 0)
+++ src/java/org/apache/hadoop/fs/ceph/CephOutputStream.java (revision 0)
-@@ -0,0 +1,196 @@
-+// -*- mode:Java; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+@@ -0,0 +1,202 @@
++// -*- mode:Java; tab-width:2; c-basic-offset:2; indent-tabs-mode:t -*-
+/**
+ *
+ * Licensed under the Apache License, Version 2.0
+
+package org.apache.hadoop.fs.ceph;
+
-+import java.io.File;
-+import java.io.FileInputStream;
-+import java.io.FileOutputStream;
+import java.io.IOException;
-+import java.io.InputStream;
+import java.io.OutputStream;
-+import java.util.ArrayList;
-+import java.util.List;
-+import java.util.Random;
+
+import org.apache.hadoop.conf.Configuration;
-+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.util.Progressable;
+
+/**
+ */
+public class CephOutputStream extends OutputStream {
+
-+ private int bufferSize;
-+
+ private boolean closed;
+
-+ private int fileHandle;
-+
-+ private boolean debug;
++ private CephFS ceph;
+
++ private int fileHandle;
+
-+ private native long ceph_seek_from_start(int fh, long pos);
-+ private native long ceph_getpos(int fh);
-+ private native int ceph_close(int fh);
-+ private native int ceph_write(int fh, byte[] buffer, int buffer_offset, int length);
-+
++ private byte[] buffer;
++ private int bufUsed = 0;
+
+ /**
+ * Construct the CephOutputStream.
+ * @param conf The FileSystem configuration.
+ * @param fh The Ceph filehandle to connect to.
+ */
-+ public CephOutputStream(Configuration conf, int fh) {
-+ System.load(conf.get("fs.ceph.libDir")+"/libhadoopcephfs.so");
-+ System.load(conf.get("fs.ceph.libDir")+"/libceph.so");
++ public CephOutputStream(Configuration conf, CephFS cephfs,
++ int fh, int bufferSize) {
++ ceph = cephfs;
+ fileHandle = fh;
+ closed = false;
-+ debug = ("true".equals(conf.get("fs.ceph.debug", "false")));
++ buffer = new byte[bufferSize];
+ }
+
+ /**Ceph likes things to be closed before it shuts down,
+ * @return The file offset in bytes.
+ */
+ public long getPos() throws IOException {
-+ return ceph_getpos(fileHandle);
++ return ceph.ceph_getpos(fileHandle);
+ }
+
+ /**
+ * write fails.
+ */
+ @Override
-+ public synchronized void write(int b) throws IOException {
-+ if(debug) debug("CephOutputStream.write: writing a single byte to fd " + fileHandle);
++ public synchronized void write(int b) throws IOException {
++ ceph.debug("CephOutputStream.write: writing a single byte to fd "
++ + fileHandle, ceph.TRACE);
+
+ if (closed) {
-+ throw new IOException("CephOutputStream.write: cannot write " +
-+ "a byte to fd " + fileHandle + ": stream closed");
++ throw new IOException("CephOutputStream.write: cannot write " +
++ "a byte to fd " + fileHandle + ": stream closed");
+ }
+ // Stick the byte in a buffer and write it
+ byte buf[] = new byte[1];
+ buf[0] = (byte) b;
-+ int result = ceph_write(fileHandle, buf, 0, 1);
-+ if (1 != result)
-+ if(debug) debug("CephOutputStream.write: failed writing a single byte to fd "
-+ + fileHandle + ": Ceph write() result = " + result);
++ write(buf, 0, 1);
+ return;
+ }
+
+ * @param off the position in the file to start writing at.
+ * @param len The number of bytes to actually write.
+ * @throws IOException if you have closed the CephOutputStream, or
-+ * the write fails.
-+ * @throws NullPointerException if buf is null.
-+ * @throws IndexOutOfBoundsException if len > buf.length.
++ * if buf is null or off + len > buf.length, or
++ * if the write fails due to a Ceph error.
+ */
+ @Override
-+ public synchronized void write(byte buf[], int off, int len) throws IOException {
-+ if(debug) debug("CephOutputStream.write: writing " + len +
-+ " bytes to fd " + fileHandle);
-+ // make sure stream is open
-+ if (closed) {
-+ throw new IOException("CephOutputStream.write: cannot write " + len +
-+ "bytes to fd " + fileHandle + ": stream closed");
-+ }
-+
-+ // sanity check
-+ if (null == buf) {
-+ throw new NullPointerException("CephOutputStream.write: cannot write " + len +
-+ "bytes to fd " + fileHandle + ": write buffer is null");
-+ }
-+
-+ // check for proper index bounds
-+ if((off < 0) || (len < 0) || (off + len > buf.length)) {
-+ throw new IndexOutOfBoundsException("CephOutputStream.write: Indices out of bounds for write: "
-+ + "write length is " + len + ", buffer offset is "
-+ + off +", and buffer size is " + buf.length);
-+ }
-+
-+ // write!
-+ int result = ceph_write(fileHandle, buf, off, len);
-+ if (result < 0) {
-+ throw new IOException("CephOutputStream.write: Write of " + len +
-+ "bytes to fd " + fileHandle + " failed");
-+ }
-+ if (result != len) {
-+ throw new IOException("CephOutputStream.write: Write of " + len +
-+ "bytes to fd " + fileHandle + "was incomplete: only "
-+ + result + " of " + len + " bytes were written.");
-+ }
-+ return;
-+ }
++ public synchronized void write(byte buf[], int off, int len) throws IOException {
++ ceph.debug("CephOutputStream.write: writing " + len +
++ " bytes to fd " + fileHandle, ceph.TRACE);
++ // make sure stream is open
++ if (closed) {
++ throw new IOException("CephOutputStream.write: cannot write " + len +
++ "bytes to fd " + fileHandle + ": stream closed");
++ }
++
++ int result;
++ int write;
++ while (len>0) {
++ write = Math.min(len, buffer.length - bufUsed);
++ try {
++ System.arraycopy(buf, off, buffer, bufUsed, write);
++ }
++ catch (IndexOutOfBoundsException ie) {
++ throw new IOException("CephOutputStream.write: Indices out of bounds: "
++ + "write length is " + len
++ + ", buffer offset is " + off
++ + ", and buffer size is " + buf.length);
++ }
++ catch (ArrayStoreException ae) {
++ throw new IOException("Uh-oh, CephOutputStream failed to do an array"
++ + " copy due to type mismatch...");
++ }
++ catch (NullPointerException ne) {
++ throw new IOException("CephOutputStream.write: cannot write "
++ + len + "bytes to fd " + fileHandle
++ + ": buffer is null");
++ }
++ bufUsed += write;
++ len -= write;
++ off += write;
++ if (bufUsed == buffer.length) {
++ result = ceph.ceph_write(fileHandle, buffer, 0, bufUsed);
++ if (result < 0)
++ throw new IOException("CephOutputStream.write: Buffered write of "
++ + bufUsed + " bytes failed!");
++ if (result != bufUsed)
++ throw new IOException("CephOutputStream.write: Wrote only "
++ + result + " bytes of " + bufUsed
++ + " in buffer! Data may be lost or written"
++ + " twice to Ceph!");
++ bufUsed = 0;
++ }
++
++ }
++ return;
++ }
+
+ /**
-+ * Flush the written data. It doesn't actually do anything; all writes are synchronous.
-+ * @throws IOException if you've closed the stream.
++ * Flush the buffered data.
++ * @throws IOException if you've closed the stream or the write fails.
+ */
+ @Override
-+ public synchronized void flush() throws IOException {
-+ if (closed) {
-+ throw new IOException("Stream closed");
-+ }
-+ return;
-+ }
++ public synchronized void flush() throws IOException {
++ if (!closed) {
++ if (bufUsed == 0) return;
++ int result = ceph.ceph_write(fileHandle, buffer, 0, bufUsed);
++ if (result < 0) {
++ throw new IOException("CephOutputStream.write: Write of "
++ + bufUsed + "bytes to fd "
++ + fileHandle + " failed");
++ }
++ if (result != bufUsed) {
++ throw new IOException("CephOutputStream.write: Write of " + bufUsed
++ + "bytes to fd " + fileHandle
++ + "was incomplete: only " + result + " of "
++ + bufUsed + " bytes were written.");
++ }
++ bufUsed = 0;
++ return;
++ }
++ }
+
+ /**
+ * Close the CephOutputStream.
+ * @throws IOException if Ceph somehow returns an error. In current code it can't.
+ */
+ @Override
-+ public synchronized void close() throws IOException {
-+ if(debug) debug("CephOutputStream.close:enter");
-+ if (closed) {
-+ throw new IOException("Stream closed");
-+ }
-+
-+ int result = ceph_close(fileHandle);
-+ if (result != 0) {
-+ throw new IOException("Close failed!");
-+ }
-+
-+ closed = true;
-+ if(debug) debug("CephOutputStream.close:exit");
-+ }
-+
-+ private void debug(String out) {
-+ System.err.println(out);
-+ }
++ public synchronized void close() throws IOException {
++ ceph.debug("CephOutputStream.close:enter", ceph.TRACE);
++ if (!closed) {
++ flush();
++ int result = ceph.ceph_close(fileHandle);
++ if (result != 0) {
++ throw new IOException("Close failed!");
++ }
++
++ closed = true;
++ ceph.debug("CephOutputStream.close:exit", ceph.TRACE);
++ }
++ }
+}
Index: src/java/org/apache/hadoop/fs/ceph/package.html
===================================================================
--- src/java/org/apache/hadoop/fs/ceph/package.html (revision 0)
+++ src/java/org/apache/hadoop/fs/ceph/package.html (revision 0)
-@@ -0,0 +1,100 @@
+@@ -0,0 +1,101 @@
+<html>
+
+<!--
+<property>
+ <name>fs.ceph.debug</name>
+ <value>true</value>
-+ <description>If true, the Java-based code will print debugging information.</description>
++ <description>If true, the Java-based code will print debugging information to standard error. This is useful if attempting to debug a Ceph issue as it puts both outputs in the same place.</description>
+</property>
+
+<property>
+ <name>fs.ceph.clientDebug</name>
+ <value>1</value>
-+ <description>If non-zero, the Ceph client will print debugging information (a higher number=more debugging).</description>
++ <description>If non-zero, the Ceph client will print debugging information to standard error (a higher number=more debugging).</description>
+</property>
+
+<property>
+ <name>fs.ceph.messengerDebug</name>
+ <value>1</value>
-+ <description>If non-zero, the Ceph messenger will print debugging information (a higher number=more debugging)</description>
++ <description>If non-zero, the Ceph messenger will print debugging information to standard error(a higher number=more debugging)</description>
+</property>
+
+<property>
+<property>
+ <name>fs.ceph.commandLine</name>
+ <value>a string</value>
-+ <description>If you prefer, you may enter any of Ceph's command-line configuration here and it will get passed to the C client. Note that any filled-in configuration options will override what you put here.</description>
++ <description>If you prefer, you may enter any of Ceph's command-line configuration here and it will get passed to the C client. Note that any filled-in configuration options will override what you put here. <br>
++By default, Ceph performs writes across the network rather than locally. To force local writes, add "set_local_pg" in this property.</description>
+</property>
+</pre>
+
\ No newline at end of file
Index: src/java/core-default.xml
===================================================================
---- src/java/core-default.xml (revision 814022)
+--- src/java/core-default.xml (revision 832912)
+++ src/java/core-default.xml (working copy)
-@@ -175,6 +175,12 @@
+@@ -194,6 +194,12 @@
</property>
<property>