From: Greg Farnum Date: Thu, 5 Nov 2009 20:58:29 +0000 (-0800) Subject: Hadoop: Update JavaDoc and put in new patch file X-Git-Tag: v0.18~128^2~29 X-Git-Url: http://git-server-git.apps.pok.os.sepia.ceph.com/?a=commitdiff_plain;h=db142df3c85e9bc33e9aa96a937603440303e579;p=ceph.git Hadoop: Update JavaDoc and put in new patch file --- diff --git a/src/client/hadoop/HADOOP-ceph.patch b/src/client/hadoop/HADOOP-ceph.patch index 966453e11108..1f3c7a3a919c 100644 --- a/src/client/hadoop/HADOOP-ceph.patch +++ b/src/client/hadoop/HADOOP-ceph.patch @@ -1,9 +1,128 @@ +Index: src/test/core/org/apache/hadoop/fs/ceph/TestCeph.java +=================================================================== +--- src/test/core/org/apache/hadoop/fs/ceph/TestCeph.java (revision 0) ++++ src/test/core/org/apache/hadoop/fs/ceph/TestCeph.java (revision 0) +@@ -0,0 +1,42 @@ ++// -*- mode:Java; tab-width:2; c-basic-offset:2; indent-tabs-mode:t -*- ++/** ++ * Licensed to the Apache Software Foundation (ASF) under one ++ * or more contributor license agreements. See the NOTICE file ++ * distributed with this work for additional information ++ * regarding copyright ownership. The ASF licenses this file ++ * to you under the Apache License, Version 2.0 (the ++ * "License"); you may not use this file except in compliance ++ * with the License. You may obtain a copy of the License at ++ * ++ * http://www.apache.org/licenses/LICENSE-2.0 ++ * ++ * Unless required by applicable law or agreed to in writing, software ++ * distributed under the License is distributed on an "AS IS" BASIS, ++ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. ++ * See the License for the specific language governing permissions and ++ * limitations under the License. ++ * ++ * Unit tests for the CephFileSystem API implementation. 
++ */ ++ ++package org.apache.hadoop.fs.ceph; ++ ++import java.io.IOException; ++import java.net.URI; ++import org.apache.hadoop.fs.FileSystemContractBaseTest; ++import org.apache.hadoop.conf.Configuration; ++import org.apache.hadoop.fs.FileSystem; ++import org.apache.hadoop.fs.Path; ++ ++public class TestCeph extends FileSystemContractBaseTest { ++ ++ @Override ++ protected void setUp() throws IOException { ++ Configuration conf = new Configuration(); ++ CephFaker cephfaker = new CephFaker(conf, FileSystem.LOG); ++ CephFileSystem cephfs = new CephFileSystem(cephfaker, "ceph://null"); ++ cephfs.initialize(URI.create("ceph://null"), conf); ++ fs = cephfs; ++ cephfs.setWorkingDirectory(new Path(getDefaultWorkingDirectory())); ++ } ++} +Index: src/java/org/apache/hadoop/fs/ceph/SubmitProcess +=================================================================== +--- src/java/org/apache/hadoop/fs/ceph/SubmitProcess (revision 0) ++++ src/java/org/apache/hadoop/fs/ceph/SubmitProcess (revision 0) +@@ -0,0 +1,2 @@ ++1) Remove the emacs indententation rules line. ++2) ? No more? +\ No newline at end of file +Index: src/java/org/apache/hadoop/fs/ceph/CephTalker.java +=================================================================== +--- src/java/org/apache/hadoop/fs/ceph/CephTalker.java (revision 0) ++++ src/java/org/apache/hadoop/fs/ceph/CephTalker.java (revision 0) +@@ -0,0 +1,59 @@ ++// -*- mode:Java; tab-width:2; c-basic-offset:2; indent-tabs-mode:t -*- ++/** ++ * ++ * Licensed under the Apache License, Version 2.0 ++ * (the "License"); you may not use this file except in compliance with ++ * the License. You may obtain a copy of the License at ++ * ++ * http://www.apache.org/licenses/LICENSE-2.0 ++ * ++ * Unless required by applicable law or agreed to in writing, software ++ * distributed under the License is distributed on an "AS IS" BASIS, ++ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or ++ * implied. 
See the License for the specific language governing ++ * permissions and limitations under the License. ++ * ++ * ++ * Wraps a number of native function calls to communicate with the Ceph ++ * filesystem. ++ */ ++package org.apache.hadoop.fs.ceph; ++ ++import org.apache.hadoop.conf.Configuration; ++import org.apache.commons.logging.Log; ++ ++class CephTalker extends CephFS { ++ //we write a constructor so we can load the libraries ++ public CephTalker(Configuration conf, Log log) { ++ super(conf, log); ++ System.load(conf.get("fs.ceph.libDir")+"/libhadoopcephfs.so"); ++ System.load(conf.get("fs.ceph.libDir")+"/libceph.so"); ++ } ++ protected native boolean ceph_initializeClient(String arguments, int block_size); ++ protected native String ceph_getcwd(); ++ protected native boolean ceph_setcwd(String path); ++ protected native boolean ceph_rmdir(String path); ++ protected native boolean ceph_unlink(String path); ++ protected native boolean ceph_rename(String old_path, String new_path); ++ protected native boolean ceph_exists(String path); ++ protected native long ceph_getblocksize(String path); ++ protected native boolean ceph_isdirectory(String path); ++ protected native boolean ceph_isfile(String path); ++ protected native String[] ceph_getdir(String path); ++ protected native int ceph_mkdirs(String path, int mode); ++ protected native int ceph_open_for_append(String path); ++ protected native int ceph_open_for_read(String path); ++ protected native int ceph_open_for_overwrite(String path, int mode); ++ protected native int ceph_close(int filehandle); ++ protected native boolean ceph_setPermission(String path, int mode); ++ protected native boolean ceph_kill_client(); ++ protected native boolean ceph_stat(String path, CephFileSystem.Stat fill); ++ protected native int ceph_statfs(String Path, CephFileSystem.CephStat fill); ++ protected native int ceph_replication(String path); ++ protected native String ceph_hosts(int fh, long offset); ++ protected native int 
ceph_setTimes(String path, long mtime, long atime); ++ protected native long ceph_getpos(int fh); ++ protected native int ceph_write(int fh, byte[] buffer, int buffer_offset, int length); ++ protected native int ceph_read(int fh, byte[] buffer, int buffer_offset, int length); ++ protected native long ceph_seek_from_start(int fh, long pos); ++} Index: src/java/org/apache/hadoop/fs/ceph/CephFileSystem.java =================================================================== --- src/java/org/apache/hadoop/fs/ceph/CephFileSystem.java (revision 0) +++ src/java/org/apache/hadoop/fs/ceph/CephFileSystem.java (revision 0) -@@ -0,0 +1,810 @@ -+// -*- mode:Java; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +@@ -0,0 +1,848 @@ ++// -*- mode:Java; tab-width:2; c-basic-offset:2; indent-tabs-mode:t -*- +/** + * + * Licensed under the Apache License, Version 2.0 @@ -26,16 +145,14 @@ Index: src/java/org/apache/hadoop/fs/ceph/CephFileSystem.java + +import java.io.IOException; +import java.io.FileNotFoundException; -+import java.io.File; +import java.io.OutputStream; +import java.net.URI; -+import java.util.Set; +import java.util.EnumSet; -+import java.util.Vector; +import java.lang.Math; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.BlockLocation; ++import org.apache.hadoop.fs.FileAlreadyExistsException; +import org.apache.hadoop.fs.FSDataInputStream; +import org.apache.hadoop.fs.FSInputStream; +import org.apache.hadoop.fs.FSDataOutputStream; @@ -71,61 +188,41 @@ Index: src/java/org/apache/hadoop/fs/ceph/CephFileSystem.java + */ +public class CephFileSystem extends FileSystem { + -+ private static final int EEXIST = 17; -+ private static final int ENOENT = 2; -+ + private URI uri; + + private final Path root; + private boolean initialized = false; ++ private CephFS ceph = null; + + private boolean debug = false; -+ private String cephDebugLevel; -+ private String monAddr; + private String fs_default_name; -+ -+ private native boolean 
ceph_initializeClient(String arguments, int block_size); -+ private native String ceph_getcwd(); -+ private native boolean ceph_setcwd(String path); -+ private native boolean ceph_rmdir(String path); -+ private native boolean ceph_mkdir(String path); -+ private native boolean ceph_unlink(String path); -+ private native boolean ceph_rename(String old_path, String new_path); -+ private native boolean ceph_exists(String path); -+ private native long ceph_getblocksize(String path); -+ private native boolean ceph_isdirectory(String path); -+ private native boolean ceph_isfile(String path); -+ private native String[] ceph_getdir(String path); -+ private native int ceph_mkdirs(String path, int mode); -+ private native int ceph_open_for_append(String path); -+ private native int ceph_open_for_read(String path); -+ private native int ceph_open_for_overwrite(String path, int mode); -+ private native int ceph_close(int filehandle); -+ private native boolean ceph_setPermission(String path, int mode); -+ private native boolean ceph_kill_client(); -+ private native boolean ceph_stat(String path, Stat fill); -+ private native int ceph_statfs(String Path, CephStat fill); -+ private native int ceph_replication(String path); -+ private native String ceph_hosts(int fh, long offset); -+ private native int ceph_setTimes(String path, long mtime, long atime); + + /** + * Create a new CephFileSystem. + */ + public CephFileSystem() { -+ if(debug) debug("CephFileSystem:enter"); + root = new Path("/"); -+ if(debug) debug("CephFileSystem:exit"); + } + ++ /** ++ * Used for testing purposes, this constructor ++ * sets the given CephFS instead of defaulting to a ++ * CephTalker (with its assumed real Ceph instance to talk to). ++ */ ++ public CephFileSystem(CephFS ceph_fs, String default_path) { ++ super(); ++ root = new Path("/"); ++ ceph = ceph_fs; ++ fs_default_name = default_path; ++ } ++ + /** + * Lets you get the URI of this CephFileSystem. + * @return the URI. 
+ */ + public URI getUri() { + if (!initialized) return null; -+ if(debug) debug("getUri:enter"); -+ if(debug) debug("getUri:exit with return " + uri); ++ ceph.debug("getUri:exit with return " + uri, ceph.DEBUG); + return uri; + } + @@ -140,52 +237,54 @@ Index: src/java/org/apache/hadoop/fs/ceph/CephFileSystem.java + */ + @Override + public void initialize(URI uri, Configuration conf) throws IOException { -+ if(debug) debug("initialize:enter"); + if (!initialized) { -+ System.load(conf.get("fs.ceph.libDir")+"/libhadoopcephfs.so"); -+ System.load(conf.get("fs.ceph.libDir")+"/libceph.so"); + super.initialize(uri, conf); + setConf(conf); + this.uri = URI.create(uri.getScheme() + "://" + uri.getAuthority()); -+ statistics = getStatistics(uri.getScheme(), getClass()); -+ -+ fs_default_name = conf.get("fs.default.name"); -+ debug = ("true".equals(conf.get("fs.ceph.debug", "false"))); ++ statistics = getStatistics(uri.getScheme(), getClass()); ++ ++ if (ceph == null) { ++ ceph = new CephTalker(conf, LOG); ++ } ++ if (null == fs_default_name) { ++ fs_default_name = conf.get("fs.default.name"); ++ } + //build up the arguments for Ceph + String arguments = "CephFSInterface"; + arguments += conf.get("fs.ceph.commandLine", ""); + if (conf.get("fs.ceph.clientDebug") != null) { -+ arguments += " --debug_client "; -+ arguments += conf.get("fs.ceph.clientDebug"); ++ arguments += " --debug_client "; ++ arguments += conf.get("fs.ceph.clientDebug"); + } + if (conf.get("fs.ceph.messengerDebug") != null) { -+ arguments += " --debug_ms "; -+ arguments += conf.get("fs.ceph.messengerDebug"); ++ arguments += " --debug_ms "; ++ arguments += conf.get("fs.ceph.messengerDebug"); + } + if (conf.get("fs.ceph.monAddr") != null) { -+ arguments += " -m "; -+ arguments += conf.get("fs.ceph.monAddr"); ++ arguments += " -m "; ++ arguments += conf.get("fs.ceph.monAddr"); + } -+ arguments += " --client-readahead-max-periods=" -+ + conf.get("fs.ceph.readahead", "1"); ++ arguments += " 
--client-readahead-max-periods=" ++ + conf.get("fs.ceph.readahead", "1"); + //make sure they gave us a ceph monitor address or conf file ++ ceph.debug("initialize:Ceph initialization arguments: " + arguments, ceph.INFO); + if ( (conf.get("fs.ceph.monAddr") == null) && -+ (arguments.indexOf("-m") == -1) && -+ (arguments.indexOf("-c") == -1) ) { -+ if(debug) debug("You need to specify a Ceph monitor address."); -+ throw new IOException("You must specify a Ceph monitor address or config file!"); ++ (arguments.indexOf("-m") == -1) && ++ (arguments.indexOf("-c") == -1) ) { ++ ceph.debug("initialize:You need to specify a Ceph monitor address.", ceph.FATAL); ++ throw new IOException("You must specify a Ceph monitor address or config file!"); + } + // Initialize the client -+ if (!ceph_initializeClient(arguments, -+ conf.getInt("fs.ceph.blockSize", 1<<26))) { -+ if(debug) debug("Ceph initialization failed!"); -+ throw new IOException("Ceph initialization failed!"); ++ if (!ceph.ceph_initializeClient(arguments, ++ conf.getInt("fs.ceph.blockSize", 1<<26))) { ++ ceph.debug("initialize:Ceph initialization failed!", ceph.FATAL); ++ throw new IOException("Ceph initialization failed!"); + } + initialized = true; -+ if(debug) debug("Initialized client. Setting cwd to /"); -+ ceph_setcwd("/"); ++ ceph.debug("initialize:Ceph initialized client. 
Setting cwd to /", ceph.INFO); ++ ceph.ceph_setcwd("/"); + } -+ if(debug) debug("initialize:exit"); ++ ceph.debug("initialize:exit", ceph.DEBUG); + } + + /** @@ -195,38 +294,42 @@ Index: src/java/org/apache/hadoop/fs/ceph/CephFileSystem.java + */ + @Override + public void close() throws IOException { -+ if (!initialized) throw new IOException ("You have to initialize the" -+ +"CephFileSystem before calling other methods."); -+ if(debug) debug("close:enter"); ++ if (!initialized) throw new IOException ("You have to initialize the " ++ +"CephFileSystem before calling other methods."); ++ ceph.debug("close:enter", ceph.DEBUG); + super.close();//this method does stuff, make sure it's run! -+ ceph_kill_client(); -+ if(debug) debug("close:exit"); ++ ceph.debug("close: Calling ceph_kill_client from Java", ceph.TRACE); ++ ceph.ceph_kill_client(); ++ ceph.debug("close:exit", ceph.DEBUG); + } + + /** + * Get an FSDataOutputStream to append onto a file. + * @param file The File you want to append onto -+ * @param bufferSize Ceph does internal buffering; this is ignored. ++ * @param bufferSize Ceph does internal buffering but you can buffer in the Java code as well if you like. + * @param progress The Progressable to report progress to. + * Reporting is limited but exists. + * @return An FSDataOutputStream that connects to the file on Ceph. + * @throws IOException If initialize() hasn't been called or the file cannot be found or appended to. 
+ */ + public FSDataOutputStream append (Path file, int bufferSize, -+ Progressable progress) throws IOException { -+ if (!initialized) throw new IOException ("You have to initialize the" -+ +"CephFileSystem before calling other methods."); -+ if(debug) debug("append:enter with path " + file + " bufferSize " + bufferSize); ++ Progressable progress) throws IOException { ++ if (!initialized) throw new IOException ("You have to initialize the " ++ +"CephFileSystem before calling other methods."); ++ ceph.debug("append:enter with path " + file + " bufferSize " + bufferSize, ceph.DEBUG); + Path abs_path = makeAbsolute(file); + if (progress!=null) progress.progress(); -+ int fd = ceph_open_for_append(abs_path.toString()); ++ ceph.debug("append: Entering ceph_open_for_append from Java", ceph.TRACE); ++ int fd = ceph.ceph_open_for_append(abs_path.toString()); ++ ceph.debug("append: Returned to Java", ceph.TRACE); + if (progress!=null) progress.progress(); + if( fd < 0 ) { //error in open + throw new IOException("append: Open for append failed on path \"" + -+ abs_path.toString() + "\""); ++ abs_path.toString() + "\""); + } -+ CephOutputStream cephOStream = new CephOutputStream(getConf(), fd); -+ if(debug) debug("append:exit"); ++ CephOutputStream cephOStream = new CephOutputStream(getConf(), ++ ceph, fd, bufferSize); ++ ceph.debug("append:exit", ceph.DEBUG); + return new FSDataOutputStream(cephOStream, statistics); + } + @@ -236,10 +339,10 @@ Index: src/java/org/apache/hadoop/fs/ceph/CephFileSystem.java + */ + public Path getWorkingDirectory() { + if (!initialized) return null; -+ if(debug) debug("getWorkingDirectory:enter"); -+ if(debug) debug("Working directory is " + ceph_getcwd()); -+ if(debug) debug("getWorkingDirectory:exit"); -+ return new Path(fs_default_name + ceph_getcwd()); ++ ceph.debug("getWorkingDirectory:enter", ceph.DEBUG); ++ String cwd = ceph.ceph_getcwd(); ++ ceph.debug("getWorkingDirectory:exit with path " + cwd, ceph.DEBUG); ++ return new 
Path(fs_default_name + ceph.ceph_getcwd()); + } + + /** @@ -250,15 +353,14 @@ Index: src/java/org/apache/hadoop/fs/ceph/CephFileSystem.java + * @param dir The directory to change to. + */ + @Override -+ public void setWorkingDirectory(Path dir) { ++ public void setWorkingDirectory(Path dir) { + if (!initialized) return; -+ if(debug) debug("setWorkingDirecty:enter with new working dir " + dir); ++ ceph.debug("setWorkingDirecty:enter with new working dir " + dir, ceph.DEBUG); + Path abs_path = makeAbsolute(dir); -+ if(debug) debug("calling ceph_setcwd from Java"); -+ if (!ceph_setcwd(abs_path.toString())) -+ if(debug) debug("Warning:ceph_setcwd failed for some reason on path " + abs_path); -+ if(debug) debug("returned from ceph_setcwd to Java" ); -+ if(debug) debug("setWorkingDirectory:exit"); ++ ceph.debug("setWorkingDirectory:calling ceph_setcwd from Java", ceph.TRACE); ++ if (!ceph.ceph_setcwd(abs_path.toString())) ++ ceph.debug("setWorkingDirectory: WARNING! ceph_setcwd failed for some reason on path " + abs_path, ceph.WARN); ++ ceph.debug("setWorkingDirectory:exit", ceph.DEBUG); + } + + /** @@ -269,22 +371,22 @@ Index: src/java/org/apache/hadoop/fs/ceph/CephFileSystem.java + * @throws IOException if initialize() hasn't been called. 
+ */ + @Override -+ public boolean exists(Path path) throws IOException { -+ if (!initialized) throw new IOException ("You have to initialize the" -+ +"CephFileSystem before calling other methods."); -+ if(debug) debug("exists:enter with path " + path); ++ public boolean exists(Path path) throws IOException { ++ if (!initialized) throw new IOException ("You have to initialize the " ++ +"CephFileSystem before calling other methods."); ++ ceph.debug("exists:enter with path " + path, ceph.DEBUG); + boolean result; + Path abs_path = makeAbsolute(path); + if (abs_path.equals(root)) { + result = true; + } + else { -+ if(debug) debug("Calling ceph_exists from Java on path " -+ + abs_path.toString() + ":"); -+ result = ceph_exists(abs_path.toString()); -+ if(debug) debug("Returned from ceph_exists to Java"); ++ ceph.debug("exists:Calling ceph_exists from Java on path " ++ + abs_path.toString(), ceph.TRACE); ++ result = ceph.ceph_exists(abs_path.toString()); ++ ceph.debug("exists:Returned from ceph_exists to Java", ceph.TRACE); + } -+ if(debug) debug("exists:exit with value " + result); ++ ceph.debug("exists:exit with value " + result, ceph.DEBUG); + return result; + } + @@ -294,43 +396,52 @@ Index: src/java/org/apache/hadoop/fs/ceph/CephFileSystem.java + * @param path The directory path to create + * @param perms The permissions to apply to the created directories. + * @return true if successful, false otherwise -+ * @throws IOException if initialize() hasn't been called. ++ * @throws IOException if initialize() hasn't been called or the path ++ * is a child of a file. 
+ */ ++ @Override + public boolean mkdirs(Path path, FsPermission perms) throws IOException { -+ if (!initialized) throw new IOException ("You have to initialize the" -+ +"CephFileSystem before calling other methods."); -+ if(debug) debug("mkdirs:enter with path " + path); ++ if (!initialized) throw new IOException ("You have to initialize the " ++ +"CephFileSystem before calling other methods."); ++ ceph.debug("mkdirs:enter with path " + path, ceph.DEBUG); + Path abs_path = makeAbsolute(path); -+ if(debug) debug("calling ceph_mkdirs from Java"); -+ int result = ceph_mkdirs(abs_path.toString(), (int)perms.toShort()); -+ if(debug) debug("Returned from ceph_mkdirs to Java with result " + result); -+ if(debug) debug("mkdirs:exit with result " + result); -+ if (result != 0) ++ ceph.debug("mkdirs:calling ceph_mkdirs from Java", ceph.TRACE); ++ int result = ceph.ceph_mkdirs(abs_path.toString(), (int)perms.toShort()); ++ if (result != 0) { ++ ceph.debug("mkdirs: make directory " + abs_path ++ + "Failing with result " + result, ceph.WARN); ++ if (ceph.ENOTDIR == result) ++ throw new FileAlreadyExistsException("Parent path is not a directory"); + return false; -+ else return true; ++ } ++ else { ++ ceph.debug("mkdirs:exiting succesfully", ceph.DEBUG); ++ return true; ++ } + } + + /** + * Check if a path is a file. This is moderately faster than the + * generic implementation. + * @param path The path to check. -+ * @return true if the path is a file, false otherwise. ++ * @return true if the path is definitely a file, false otherwise. + * @throws IOException if initialize() hasn't been called. 
+ */ + @Override -+ public boolean isFile(Path path) throws IOException { -+ if (!initialized) throw new IOException ("You have to initialize the" -+ +"CephFileSystem before calling other methods."); -+ if(debug) debug("isFile:enter with path " + path); ++ public boolean isFile(Path path) throws IOException { ++ if (!initialized) throw new IOException ("You have to initialize the " ++ +"CephFileSystem before calling other methods."); ++ ceph.debug("isFile:enter with path " + path, ceph.DEBUG); + Path abs_path = makeAbsolute(path); + boolean result; + if (abs_path.equals(root)) { + result = false; + } + else { -+ result = ceph_isfile(abs_path.toString()); ++ ceph.debug("isFile:entering ceph_isfile from Java", ceph.TRACE); ++ result = ceph.ceph_isfile(abs_path.toString()); + } -+ if(debug) debug("isFile:exit with result " + result); ++ ceph.debug("isFile:exit with result " + result, ceph.DEBUG); + return result; + } + @@ -338,25 +449,25 @@ Index: src/java/org/apache/hadoop/fs/ceph/CephFileSystem.java + * Check if a path is a directory. This is moderately faster than + * the generic implementation. + * @param path The path to check -+ * @return true if the path is a directory, false otherwise. ++ * @return true if the path is definitely a directory, false otherwise. + * @throws IOException if initialize() hasn't been called. 
+ */ + @Override -+ public boolean isDirectory(Path path) throws IOException { -+ if (!initialized) throw new IOException ("You have to initialize the" -+ +"CephFileSystem before calling other methods."); -+ if(debug) debug("isDirectory:enter with path " + path); ++ public boolean isDirectory(Path path) throws IOException { ++ if (!initialized) throw new IOException ("You have to initialize the " ++ +"CephFileSystem before calling other methods."); ++ ceph.debug("isDirectory:enter with path " + path, ceph.DEBUG); + Path abs_path = makeAbsolute(path); + boolean result; + if (abs_path.equals(root)) { + result = true; + } + else { -+ if(debug) debug("calling ceph_isdirectory from Java"); -+ result = ceph_isdirectory(abs_path.toString()); -+ if(debug) debug("Returned from ceph_isdirectory to Java"); ++ ceph.debug("calling ceph_isdirectory from Java", ceph.TRACE); ++ result = ceph.ceph_isdirectory(abs_path.toString()); ++ ceph.debug("Returned from ceph_isdirectory to Java", ceph.TRACE); + } -+ if(debug) debug("isDirectory:exit with result " + result); ++ ceph.debug("isDirectory:exit with result " + result, ceph.DEBUG); + return result; + } + @@ -369,31 +480,32 @@ Index: src/java/org/apache/hadoop/fs/ceph/CephFileSystem.java + * @throws FileNotFoundException if the path could not be resolved. 
+ */ + public FileStatus getFileStatus(Path path) throws IOException { -+ if (!initialized) throw new IOException ("You have to initialize the" -+ +"CephFileSystem before calling other methods."); -+ if(debug) debug("getFileStatus:enter with path " + path); ++ if (!initialized) throw new IOException ("You have to initialize the " ++ +"CephFileSystem before calling other methods."); ++ ceph.debug("getFileStatus:enter with path " + path, ceph.DEBUG); + Path abs_path = makeAbsolute(path); + //sadly, Ceph doesn't really do uids/gids just yet, but + //everything else is filled + FileStatus status; + Stat lstat = new Stat(); -+ if(ceph_stat(abs_path.toString(), lstat)) { ++ ceph.debug("getFileStatus: calling ceph_stat from Java", ceph.TRACE); ++ if(ceph.ceph_stat(abs_path.toString(), lstat)) { + status = new FileStatus(lstat.size, lstat.is_dir, -+ ceph_replication(abs_path.toString()), -+ lstat.block_size, -+ lstat.mod_time, -+ lstat.access_time, -+ new FsPermission((short)lstat.mode), -+ null, -+ null, -+ new Path(fs_default_name+abs_path.toString())); ++ ceph.ceph_replication(abs_path.toString()), ++ lstat.block_size, ++ lstat.mod_time, ++ lstat.access_time, ++ new FsPermission((short)lstat.mode), ++ null, ++ null, ++ new Path(fs_default_name+abs_path.toString())); + } + else { //fail out -+ throw new FileNotFoundException("org.apache.hadoop.fs.ceph.CephFileSystem: File " -+ + path + " does not exist or could not be accessed"); ++ throw new FileNotFoundException("org.apache.hadoop.fs.ceph.CephFileSystem: File " ++ + path + " does not exist or could not be accessed"); + } + -+ if(debug) debug("getFileStatus:exit"); ++ ceph.debug("getFileStatus:exit", ceph.DEBUG); + return status; + } + @@ -406,17 +518,17 @@ Index: src/java/org/apache/hadoop/fs/ceph/CephFileSystem.java + * @throws FileNotFoundException if the input path can't be found. 
+ */ + public FileStatus[] listStatus(Path path) throws IOException { -+ if (!initialized) throw new IOException ("You have to initialize the" -+ +"CephFileSystem before calling other methods."); -+ if(debug) debug("listStatus:enter with path " + path); ++ if (!initialized) throw new IOException ("You have to initialize the " ++ +"CephFileSystem before calling other methods."); ++ ceph.debug("listStatus:enter with path " + path, ceph.WARN); + Path abs_path = makeAbsolute(path); + Path[] paths = listPaths(abs_path); + if (paths != null) { + FileStatus[] statuses = new FileStatus[paths.length]; + for (int i = 0; i < paths.length; ++i) { -+ statuses[i] = getFileStatus(paths[i]); ++ statuses[i] = getFileStatus(paths[i]); + } -+ if(debug) debug("listStatus:exit"); ++ ceph.debug("listStatus:exit", ceph.DEBUG); + return statuses; + } + if (!isFile(path)) throw new FileNotFoundException(); //if we get here, listPaths returned null @@ -425,11 +537,15 @@ Index: src/java/org/apache/hadoop/fs/ceph/CephFileSystem.java + } + + @Override -+ public void setPermission(Path p, FsPermission permission) throws IOException { -+ if (!initialized) throw new IOException ("You have to initialize the" -+ +"CephFileSystem before calling other methods."); ++ public void setPermission(Path p, FsPermission permission) throws IOException { ++ if (!initialized) throw new IOException ("You have to initialize the " ++ +"CephFileSystem before calling other methods."); ++ ceph.debug("setPermission:enter with path " + p ++ + " and permissions " + permission, ceph.DEBUG); + Path abs_path = makeAbsolute(p); -+ ceph_setPermission(abs_path.toString(), permission.toShort()); ++ ceph.debug("setPermission:calling ceph_setpermission from Java", ceph.TRACE); ++ ceph.ceph_setPermission(abs_path.toString(), permission.toShort()); ++ ceph.debug("setPermission:exit", ceph.DEBUG); + } + + /** @@ -438,15 +554,19 @@ Index: src/java/org/apache/hadoop/fs/ceph/CephFileSystem.java + * @param mtime Set modification time 
in number of millis since Jan 1, 1970. + * @param atime Set access time in number of millis since Jan 1, 1970. + */ -+@Override -+ public void setTimes(Path p, long mtime, long atime) throws IOException { -+ if (!initialized) throw new IOException ("You have to initialize the" -+ +"CephFileSystem before calling other methods."); -+ Path abs_path = makeAbsolute(p); -+ int r = ceph_setTimes(abs_path.toString(), mtime, atime); -+ if (r<0) throw new IOException ("Failed to set times on path " -+ + abs_path.toString() + " Error code: " + r); -+} ++ @Override ++ public void setTimes(Path p, long mtime, long atime) throws IOException { ++ if (!initialized) throw new IOException ("You have to initialize the " ++ +"CephFileSystem before calling other methods."); ++ ceph.debug("setTimes:enter with path " + p + " mtime:" + mtime + ++ " atime:" + atime, ceph.DEBUG); ++ Path abs_path = makeAbsolute(p); ++ ceph.debug("setTimes:calling ceph_setTimes from Java", ceph.TRACE); ++ int r = ceph.ceph_setTimes(abs_path.toString(), mtime, atime); ++ if (r<0) throw new IOException ("Failed to set times on path " ++ + abs_path.toString() + " Error code: " + r); ++ ceph.debug("setTimes:exit", ceph.DEBUG); ++ } + + /** + * Create a new file and open an FSDataOutputStream that's connected to it. @@ -454,7 +574,8 @@ Index: src/java/org/apache/hadoop/fs/ceph/CephFileSystem.java + * @param permission The permissions to apply to the file. + * @param flag If CreateFlag.OVERWRITE, overwrite any existing + * file with this name; otherwise don't. -+ * @param bufferSize Ceph does internal buffering; this is ignored. ++ * @param bufferSize Ceph does internal buffering, but you can buffer ++ * in the Java code too if you like. + * @param replication Ignored by Ceph. This can be + * configured via Ceph configuration. + * @param blockSize Ignored by Ceph. 
You can set client-wide block sizes @@ -467,17 +588,17 @@ Index: src/java/org/apache/hadoop/fs/ceph/CephFileSystem.java + * failure in attempting to open for append with Ceph. + */ + public FSDataOutputStream create(Path path, -+ FsPermission permission, -+ EnumSet flag, -+ //boolean overwrite, -+ int bufferSize, -+ short replication, -+ long blockSize, -+ Progressable progress -+ ) throws IOException { -+ if (!initialized) throw new IOException ("You have to initialize the" -+ +"CephFileSystem before calling other methods."); -+ if(debug) debug("create:enter with path " + path); ++ FsPermission permission, ++ EnumSet flag, ++ //boolean overwrite, ++ int bufferSize, ++ short replication, ++ long blockSize, ++ Progressable progress ++ ) throws IOException { ++ if (!initialized) throw new IOException ("You have to initialize the " ++ +"CephFileSystem before calling other methods."); ++ ceph.debug("create:enter with path " + path, ceph.DEBUG); + Path abs_path = makeAbsolute(path); + if (progress!=null) progress.progress(); + // We ignore replication since that's not configurable here, and @@ -489,13 +610,13 @@ Index: src/java/org/apache/hadoop/fs/ceph/CephFileSystem.java + boolean exists = exists(abs_path); + if (exists) { + if(isDirectory(abs_path)) -+ throw new IOException("create: Cannot overwrite existing directory \"" -+ + path.toString() + "\" with a file"); ++ throw new IOException("create: Cannot overwrite existing directory \"" ++ + path.toString() + "\" with a file"); + //if (!overwrite) + if (!flag.contains(CreateFlag.OVERWRITE)) -+ throw new IOException("createRaw: Cannot open existing file \"" -+ + abs_path.toString() -+ + "\" for writing without overwrite flag"); ++ throw new IOException("createRaw: Cannot open existing file \"" ++ + abs_path.toString() ++ + "\" for writing without overwrite flag"); + } + + if (progress!=null) progress.progress(); @@ -504,68 +625,73 @@ Index: src/java/org/apache/hadoop/fs/ceph/CephFileSystem.java + if (!exists) { + Path 
parent = abs_path.getParent(); + if (parent != null) { // if parent is root, we're done -+ int r = ceph_mkdirs(parent.toString(), permission.toShort()); -+ if (!(r==0 || r==-EEXIST)) -+ throw new IOException ("Error creating parent directory; code: " + r); ++ int r = ceph.ceph_mkdirs(parent.toString(), permission.toShort()); ++ if (!(r==0 || r==-ceph.EEXIST)) ++ throw new IOException ("Error creating parent directory; code: " + r); + } + if (progress!=null) progress.progress(); + } + // Step 3: open the file -+ if(debug) debug("calling ceph_open_for_overwrite from Java"); -+ int fh = ceph_open_for_overwrite(abs_path.toString(), (int)permission.toShort()); ++ ceph.debug("calling ceph_open_for_overwrite from Java", ceph.TRACE); ++ int fh = ceph.ceph_open_for_overwrite(abs_path.toString(), (int)permission.toShort()); + if (progress!=null) progress.progress(); -+ if(debug) debug("Returned from ceph_open_for_overwrite to Java with fh " + fh); ++ ceph.debug("Returned from ceph_open_for_overwrite to Java with fh " + fh, ceph.TRACE); + if (fh < 0) { + throw new IOException("create: Open for overwrite failed on path \"" + -+ path.toString() + "\""); ++ path.toString() + "\""); + } + + // Step 4: create the stream -+ OutputStream cephOStream = new CephOutputStream(getConf(), fh); -+ if(debug) debug("create:exit"); ++ OutputStream cephOStream = new CephOutputStream(getConf(), ++ ceph, fh, bufferSize); ++ ceph.debug("create:exit", ceph.DEBUG); + return new FSDataOutputStream(cephOStream, statistics); -+ } ++ } + + /** + * Open a Ceph file and attach the file handle to an FSDataInputStream. + * @param path The file to open -+ * @param bufferSize Ceph does internal buffering; this is ignored. ++ * @param bufferSize Ceph does internal buffering; but you can buffer in ++ * the Java code too if you like. + * @return FSDataInputStream reading from the given path. 
+ * @throws IOException if initialize() hasn't been called, the path DNE or is a + * directory, or there is an error getting data to set up the FSDataInputStream. + */ + public FSDataInputStream open(Path path, int bufferSize) throws IOException { -+ if (!initialized) throw new IOException ("You have to initialize the" -+ +"CephFileSystem before calling other methods."); -+ if(debug) debug("open:enter with path " + path); ++ if (!initialized) throw new IOException ("You have to initialize the " ++ +"CephFileSystem before calling other methods."); ++ ceph.debug("open:enter with path " + path, ceph.DEBUG); + Path abs_path = makeAbsolute(path); + -+ int fh = ceph_open_for_read(abs_path.toString()); ++ int fh = ceph.ceph_open_for_read(abs_path.toString()); + if (fh < 0) { //uh-oh, something's bad! -+ if (fh == -ENOENT) //well that was a stupid open -+ throw new IOException("open: absolute path \"" + abs_path.toString() -+ + "\" does not exist"); ++ if (fh == -ceph.ENOENT) //well that was a stupid open ++ throw new IOException("open: absolute path \"" + abs_path.toString() ++ + "\" does not exist"); + else //hrm...the file exists but we can't open it :( -+ throw new IOException("open: Failed to open file " + abs_path.toString()); ++ throw new IOException("open: Failed to open file " + abs_path.toString()); + } + + if(isDirectory(abs_path)) { //yes, it is possible to open Ceph directories + //but that doesn't mean you should in Hadoop! 
-+ ceph_close(fh); ++ ceph.ceph_close(fh); + throw new IOException("open: absolute path \"" + abs_path.toString() -+ + "\" is a directory!"); ++ + "\" is a directory!"); + } + Stat lstat = new Stat(); -+ ceph_stat(abs_path.toString(), lstat); ++ ceph.debug("open:calling ceph_stat from Java", ceph.TRACE); ++ ceph.ceph_stat(abs_path.toString(), lstat); ++ ceph.debug("open:returned to Java", ceph.TRACE); + long size = lstat.size; + if (size < 0) { + throw new IOException("Failed to get file size for file " + abs_path.toString() + -+ " but succeeded in opening file. Something bizarre is going on."); ++ " but succeeded in opening file. Something bizarre is going on."); + } -+ FSInputStream cephIStream = new CephInputStream(getConf(), fh, size); -+ if(debug) debug("open:exit"); ++ FSInputStream cephIStream = new CephInputStream(getConf(), ceph, ++ fh, size, bufferSize); ++ ceph.debug("open:exit", ceph.DEBUG); + return new FSDataInputStream(cephIStream); -+ } ++ } + + /** + * Rename a file or directory. @@ -575,16 +701,25 @@ Index: src/java/org/apache/hadoop/fs/ceph/CephFileSystem.java + * @throws IOException if initialize() hasn't been called. 
+ */ + @Override -+ public boolean rename(Path src, Path dst) throws IOException { -+ if (!initialized) throw new IOException ("You have to initialize the" -+ +"CephFileSystem before calling other methods."); -+ if(debug) debug("rename:enter"); -+ if(debug) debug("calling ceph_rename from Java"); ++ public boolean rename(Path src, Path dst) throws IOException { ++ if (!initialized) throw new IOException ("You have to initialize the " ++ +"CephFileSystem before calling other methods."); ++ ceph.debug("rename:enter with src:" + src + " and dest:" + dst, ceph.DEBUG); + Path abs_src = makeAbsolute(src); + Path abs_dst = makeAbsolute(dst); -+ boolean result = ceph_rename(abs_src.toString(), abs_dst.toString()); -+ if(debug) debug("return from ceph_rename to Java with result " + result); -+ if(debug) debug("rename:exit"); ++ ceph.debug("calling ceph_rename from Java", ceph.TRACE); ++ boolean result = ceph.ceph_rename(abs_src.toString(), abs_dst.toString()); ++ if (!result) { ++ if (isDirectory(abs_dst)) { //move the srcdir into destdir ++ ceph.debug("ceph_rename failed but dst is a directory!", ceph.NOLOG); ++ Path new_dst = new Path(abs_dst, abs_src.getName()); ++ result = rename(abs_src, new_dst); ++ ceph.debug("attempt to move " + abs_src.toString() ++ + " to " + new_dst.toString() ++ + "has result:" + result, ceph.NOLOG); ++ } ++ } ++ ceph.debug("rename:exit with result: " + result, ceph.DEBUG); + return result; + } + @@ -603,28 +738,48 @@ Index: src/java/org/apache/hadoop/fs/ceph/CephFileSystem.java + * @throws IOException if initialize() hasn't been called. 
+ */ + @Override -+ public BlockLocation[] getFileBlockLocations(FileStatus file, -+ long start, long len) throws IOException { -+ if (!initialized) throw new IOException ("You have to initialize the" -+ +"CephFileSystem before calling other methods."); ++ public BlockLocation[] getFileBlockLocations(FileStatus file, ++ long start, long len) throws IOException { ++ if (!initialized) throw new IOException ("You have to initialize the " ++ +"CephFileSystem before calling other methods."); ++ ceph.debug("getFileBlockLocations:enter with path " + file.getPath() + ++ ", start pos " + start + ", length " + len, ceph.DEBUG); + //sanitize and get the filehandle + Path abs_path = makeAbsolute(file.getPath()); -+ int fh = ceph_open_for_read(abs_path.toString()); ++ ceph.debug("getFileBlockLocations:call ceph_open_for_read from Java", ceph.TRACE); ++ int fh = ceph.ceph_open_for_read(abs_path.toString()); ++ ceph.debug("getFileBlockLocations:return from ceph_open_for_read to Java with fh " ++ + fh, ceph.TRACE); + if (fh < 0) { ++ ceph.debug("getFileBlockLocations:got error " + fh + ++ ", exiting and returning null!", ceph.ERROR); + return null; + } + //get the block size -+ long blockSize = ceph_getblocksize(abs_path.toString()); ++ ceph.debug("getFileBlockLocations:call ceph_getblocksize from Java", ceph.TRACE); ++ long blockSize = ceph.ceph_getblocksize(abs_path.toString()); ++ ceph.debug("getFileBlockLocations:return from ceph_getblocksize", ceph.TRACE); + BlockLocation[] locations = -+ new BlockLocation[(int)Math.ceil((len-start)/(float)blockSize)]; ++ new BlockLocation[(int)Math.ceil(len/(float)blockSize)]; ++ long offset; + for (int i = 0; i < locations.length; ++i) { -+ String host = ceph_hosts(fh, start + i*blockSize); ++ offset = start + i*blockSize; ++ ceph.debug("getFileBlockLocations:call ceph_hosts from Java on fh " ++ + fh + " and offset " + offset, ceph.TRACE); ++ String host = ceph.ceph_hosts(fh, offset); ++ ceph.debug("getFileBlockLocations:return from 
ceph_hosts to Java with host " ++ + host, ceph.TRACE); + String[] hostArray = new String[1]; + hostArray[0] = host; + locations[i] = new BlockLocation(hostArray, hostArray, -+ start+i*blockSize, blockSize); ++ start+i*blockSize-(start % blockSize), ++ blockSize); + } -+ ceph_close(fh); ++ ceph.debug("getFileBlockLocations:call ceph_close from Java on fh " ++ + fh, ceph.TRACE); ++ ceph.ceph_close(fh); ++ ceph.debug("getFileBlockLocations:return with " + locations.length ++ + " locations", ceph.DEBUG); + return locations; + } + @@ -632,32 +787,33 @@ Index: src/java/org/apache/hadoop/fs/ceph/CephFileSystem.java + * Get usage statistics on the Ceph filesystem. + * @param path A path to the partition you're interested in. + * Ceph doesn't partition, so this is ignored. -+ * @return FsStatus reporting capacity, usage, and remaining spac. ++ * @return FsStatus reporting capacity, usage, and remaining space. + * @throws IOException if initialize() hasn't been called, or the + * stat somehow fails. + */ + @Override -+ public FsStatus getStatus (Path path) throws IOException { -+ if (!initialized) throw new IOException("You have to initialize the" -+ + " CephFileSystem before calling other methods."); -+ if(debug) debug("getStatus:enter"); ++ public FsStatus getStatus (Path path) throws IOException { ++ if (!initialized) throw new IOException("You have to initialize the " ++ + " CephFileSystem before calling other methods."); ++ ceph.debug("getStatus:enter with path " + path, ceph.DEBUG); + Path abs_path = makeAbsolute(path); + -+ //currently(Ceph .14) Ceph actually ignores the path ++ //currently(Ceph .16) Ceph actually ignores the path + //but we still pass it in; if Ceph stops ignoring we may need more + //error-checking code. 
+ CephStat ceph_stat = new CephStat(); -+ int result = ceph_statfs(abs_path.toString(), ceph_stat); ++ ceph.debug("getStatus:calling ceph_statfs from Java", ceph.TRACE); ++ int result = ceph.ceph_statfs(abs_path.toString(), ceph_stat); + if (result!=0) throw new IOException("Somehow failed to statfs the Ceph filesystem. Error code: " + result); -+ if(debug) debug("getStatus:exit"); ++ ceph.debug("getStatus:exit successfully", ceph.DEBUG); + return new FsStatus(ceph_stat.capacity, -+ ceph_stat.used, ceph_stat.remaining); ++ ceph_stat.used, ceph_stat.remaining); + } + + /** + * Delete the given path, and optionally its children. + * @param path the path to delete. -+ * @param recursive If the path is a directory and this is false, ++ * @param recursive If the path is a non-empty directory and this is false, + * delete will throw an IOException. If path is a file this is ignored. + * @return true if the delete succeeded, false otherwise (including if + * path doesn't exist). @@ -666,56 +822,59 @@ Index: src/java/org/apache/hadoop/fs/ceph/CephFileSystem.java + * or you attempt to delete the root directory. + */ + public boolean delete(Path path, boolean recursive) throws IOException { -+ if (!initialized) throw new IOException ("You have to initialize the" -+ +"CephFileSystem before calling other methods."); -+ if(debug) debug("delete:enter"); ++ if (!initialized) throw new IOException ("You have to initialize the " ++ +"CephFileSystem before calling other methods."); ++ ceph.debug("delete:enter with path " + path + " and recursive=" + recursive, ++ ceph.DEBUG); + Path abs_path = makeAbsolute(path); + -+ if(debug) debug("delete: Deleting path " + abs_path.toString()); + // sanity check + if (abs_path.equals(root)) + throw new IOException("Error: deleting the root directory is a Bad Idea."); -+ + if (!exists(abs_path)) return false; + + // if the path is a file, try to delete it. 
+ if (isFile(abs_path)) { -+ boolean result = ceph_unlink(abs_path.toString()); ++ ceph.debug("delete:calling ceph_unlink from Java with path " + abs_path, ++ ceph.TRACE); ++ boolean result = ceph.ceph_unlink(abs_path.toString()); + if(!result) -+ if(debug) debug("delete: failed to delete file \"" + -+ abs_path.toString() + "\"."); -+ if(debug) debug("delete:exit"); ++ ceph.debug("delete: failed to delete file \"" + ++ abs_path.toString() + "\".", ceph.ERROR); ++ ceph.debug("delete:exit with success=" + result, ceph.DEBUG); + return result; + } + + /* The path is a directory, so recursively try to delete its contents, + and then delete the directory. */ -+ if (!recursive) { -+ throw new IOException("Directories must be deleted recursively!"); -+ } + //get the entries; listPaths will remove . and .. for us + Path[] contents = listPaths(abs_path); + if (contents == null) { -+ if(debug) debug("delete: Failed to read contents of directory \"" + -+ abs_path.toString() + "\" while trying to delete it"); -+ if(debug) debug("delete:exit"); ++ ceph.debug("delete: Failed to read contents of directory \"" + ++ abs_path.toString() + ++ "\" while trying to delete it, BAILING", ceph.ERROR); + return false; + } ++ if (!recursive && contents.length > 0) { ++ throw new IOException("Directories must be deleted recursively!"); ++ } + // delete the entries ++ ceph.debug("delete: recursively calling delete on contents of " ++ + abs_path, ceph.DEBUG); + for (Path p : contents) { + if (!delete(p, true)) { -+ if(debug) debug("delete: Failed to delete file \"" + -+ p.toString() + "\" while recursively deleting \"" -+ + abs_path.toString() + "\"" ); -+ if(debug) debug("delete:exit"); -+ return false; ++ ceph.debug("delete: Failed to delete file \"" + ++ p.toString() + "\" while recursively deleting \"" ++ + abs_path.toString() + "\", BAILING", ceph.ERROR ); ++ return false; + } + } + //if we've come this far it's a now-empty directory, so delete it! 
-+ boolean result = ceph_rmdir(abs_path.toString()); ++ boolean result = ceph.ceph_rmdir(abs_path.toString()); + if (!result) -+ if(debug) debug("delete: failed to delete \"" + abs_path.toString() + "\""); -+ if(debug) debug("delete:exit"); ++ ceph.debug("delete: failed to delete \"" + abs_path.toString() ++ + "\", BAILING", ceph.ERROR); ++ ceph.debug("delete:exit", ceph.DEBUG); + return result; + } + @@ -725,7 +884,7 @@ Index: src/java/org/apache/hadoop/fs/ceph/CephFileSystem.java + * by a separate Ceph configuration. + */ + @Override -+ public short getDefaultReplication() { ++ public short getDefaultReplication() { + return 1; + } + @@ -734,67 +893,65 @@ Index: src/java/org/apache/hadoop/fs/ceph/CephFileSystem.java + * @return the default block size, in bytes, as a long. + */ + @Override -+ public long getDefaultBlockSize() { ++ public long getDefaultBlockSize() { + return getConf().getInt("fs.ceph.blockSize", 1<<26); + } + + // Makes a Path absolute. In a cheap, dirty hack, we're + // also going to strip off any fs_default_name prefix we see. 
+ private Path makeAbsolute(Path path) { -+ if(debug) debug("makeAbsolute:enter with path " + path); ++ ceph.debug("makeAbsolute:enter with path " + path, ceph.NOLOG); + if (path == null) return new Path("/"); + // first, check for the prefix + if (path.toString().startsWith(fs_default_name)) { + Path stripped_path = new Path(path.toString().substring(fs_default_name.length())); -+ if(debug) debug("makeAbsolute:exit with path " + stripped_path); ++ ceph.debug("makeAbsolute:exit with path " + stripped_path, ceph.NOLOG); + return stripped_path; + } + + if (path.isAbsolute()) { -+ if(debug) debug("makeAbsolute:exit with path " + path); ++ ceph.debug("makeAbsolute:exit with path " + path, ceph.NOLOG); + return path; + } -+ Path new_path = new Path(ceph_getcwd(), path); -+ if(debug) debug("makeAbsolute:exit with path " + new_path); ++ Path new_path = new Path(ceph.ceph_getcwd(), path); ++ ceph.debug("makeAbsolute:exit with path " + new_path, ceph.NOLOG); + return new_path; + } + + private Path[] listPaths(Path path) throws IOException { -+ if(debug) debug("listPaths:enter with path " + path); ++ ceph.debug("listPaths:enter with path " + path, ceph.NOLOG); + String dirlist[]; + + Path abs_path = makeAbsolute(path); + + // If it's a directory, get the listing. Otherwise, complain and give up. 
-+ if(debug) debug("calling ceph_getdir from Java with path " + abs_path); -+ dirlist = ceph_getdir(abs_path.toString()); -+ if(debug) debug("returning from ceph_getdir to Java"); ++ ceph.debug("calling ceph_getdir from Java with path " + abs_path, ceph.NOLOG); ++ dirlist = ceph.ceph_getdir(abs_path.toString()); ++ ceph.debug("returning from ceph_getdir to Java", ceph.NOLOG); + + if (dirlist == null) { -+ throw new IOException("listPaths: path " + path.toString() + " is not a directory."); ++ return null; + } + + // convert the strings to Paths + Path[] paths = new Path[dirlist.length]; + for (int i = 0; i < dirlist.length; ++i) { -+ if(debug) debug("Raw enumeration of paths in \"" + abs_path.toString() + "\": \"" + -+ dirlist[i] + "\""); ++ ceph.debug("Raw enumeration of paths in \"" + abs_path.toString() + "\": \"" + ++ dirlist[i] + "\"", ceph.TRACE); + // convert each listing to an absolute path + Path raw_path = new Path(dirlist[i]); + if (raw_path.isAbsolute()) -+ paths[i] = raw_path; ++ paths[i] = raw_path; + else -+ paths[i] = new Path(abs_path, raw_path); ++ paths[i] = new Path(abs_path, raw_path); + } -+ if(debug) debug("listPaths:exit"); ++ ceph.debug("listPaths:exit", ceph.NOLOG); + return paths; + } + -+ private void debug(String statement) { -+ System.err.println(statement); -+ } ++ + -+ private static class Stat { ++ static class Stat { + public long size; + public boolean is_dir; + public long block_size; @@ -805,7 +962,7 @@ Index: src/java/org/apache/hadoop/fs/ceph/CephFileSystem.java + public Stat(){} + } + -+ private static class CephStat { ++ static class CephStat { + public long capacity; + public long used; + public long remaining; @@ -813,12 +970,774 @@ Index: src/java/org/apache/hadoop/fs/ceph/CephFileSystem.java + public CephStat() {} + } +} +Index: src/java/org/apache/hadoop/fs/ceph/CephFS.java +=================================================================== +--- src/java/org/apache/hadoop/fs/ceph/CephFS.java (revision 0) ++++ 
src/java/org/apache/hadoop/fs/ceph/CephFS.java (revision 0) +@@ -0,0 +1,272 @@ ++// -*- mode:Java; tab-width:2; c-basic-offset:2; indent-tabs-mode:t -*- ++/** ++ * ++ * Licensed under the Apache License, Version 2.0 ++ * (the "License"); you may not use this file except in compliance with ++ * the License. You may obtain a copy of the License at ++ * ++ * http://www.apache.org/licenses/LICENSE-2.0 ++ * ++ * Unless required by applicable law or agreed to in writing, software ++ * distributed under the License is distributed on an "AS IS" BASIS, ++ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or ++ * implied. See the License for the specific language governing ++ * permissions and limitations under the License. ++ * ++ * ++ * Abstract base class for communicating with a Ceph filesystem and its ++ * C++ codebase from Java, or pretending to do so (for unit testing purposes). ++ * As only the Ceph package should be using this directly, all methods ++ * are protected. ++ */ ++package org.apache.hadoop.fs.ceph; ++ ++import org.apache.hadoop.conf.Configuration; ++import org.apache.commons.logging.Log; ++ ++abstract class CephFS { ++ ++ protected static final int FATAL = 0; ++ protected static final int ERROR = 1; ++ protected static final int WARN = 2; ++ protected static final int INFO = 3; ++ protected static final int DEBUG = 4; ++ protected static final int TRACE = 5; ++ protected static final int NOLOG = 6; ++ ++ protected static final int ENOTDIR = 20; ++ protected static final int EEXIST = 17; ++ protected static final int ENOENT = 2; ++ ++ private boolean debug = false; ++ private Log LOG; ++ ++ public CephFS(Configuration conf, Log log) { ++ debug = ("true".equals(conf.get("fs.ceph.debug", "false"))); ++ LOG = log; ++ } ++ ++ /* ++ * Performs any necessary setup to allow general use of the filesystem. 
++ * Inputs: ++ * String arguments -- a command-line style input of Ceph config params ++ * int block_size -- the size in bytes to use for blocks ++ * Returns: true on success, false otherwise ++ */ ++ abstract protected boolean ceph_initializeClient(String arguments, int block_size); ++ ++ /* ++ * Returns the current working directory (absolute) as a String ++ */ ++ abstract protected String ceph_getcwd(); ++ /* ++ * Changes the working directory. ++ * Inputs: ++ * String path: The path (relative or absolute) to switch to ++ * Returns: true on success, false otherwise. ++ */ ++ abstract protected boolean ceph_setcwd(String path); ++ /* ++ * Given a path to a directory, removes the directory if empty. ++ * Inputs: ++ * String path: The path (relative or absolute) to the directory ++ * Returns: true on successful delete; false otherwise ++ */ ++ abstract protected boolean ceph_rmdir(String path); ++ /* ++ * Given a path, unlinks it. ++ * Inputs: ++ * String path: The path (relative or absolute) to the file or empty dir ++ * Returns: true if the unlink occurred, false otherwise. ++ */ ++ abstract protected boolean ceph_unlink(String path); ++ /* ++ * Changes a given path name to a new name, assuming new_path doesn't exist. ++ * Inputs: ++ * String old_path: The path whose name you want to change. ++ * String new_path: The new name for the path. ++ * Returns: true if the rename occurred, false otherwise ++ */ ++ abstract protected boolean ceph_rename(String old_path, String new_path); ++ /* ++ * Returns true if the input path exists, false ++ * if it does not or there is an unexpected failure. ++ */ ++ abstract protected boolean ceph_exists(String path); ++ /* ++ * Get the block size for a given path. ++ * Input: ++ * String path: The path (relative or absolute) you want ++ * the block size for. ++ * Returns: block size if the path exists, otherwise a negative number ++ * corresponding to the standard C++ error codes (which are positive).
++ */ ++ abstract protected long ceph_getblocksize(String path); ++ /* ++ * Returns true if the given path is a directory, false otherwise. ++ */ ++ abstract protected boolean ceph_isdirectory(String path); ++ /* ++ * Returns true if the given path is a file; false otherwise. ++ */ ++ abstract protected boolean ceph_isfile(String path); ++ /* ++ * Get the contents of a given directory. ++ * Inputs: ++ * String path: The path (relative or absolute) to the directory. ++ * Returns: A Java String[] of the contents of the directory, or ++ * NULL if there is an error (ie, path is not a dir). This listing ++ * will not contain . or .. entries. ++ */ ++ abstract protected String[] ceph_getdir(String path); ++ /* ++ * Create the specified directory and any required intermediate ones with the ++ * given mode. ++ */ ++ abstract protected int ceph_mkdirs(String path, int mode); ++ /* ++ * Open a file to append. If the file does not exist, it will be created. ++ * Opening a dir is possible but may have bad results. ++ * Inputs: ++ * String path: The path to open. ++ * Returns: an int filehandle, or a number<0 if an error occurs. ++ */ ++ abstract protected int ceph_open_for_append(String path); ++ /* ++ * Open a file for reading. ++ * Opening a dir is possible but may have bad results. ++ * Inputs: ++ * String path: The path to open. ++ * Returns: an int filehandle, or a number<0 if an error occurs. ++ */ ++ abstract protected int ceph_open_for_read(String path); ++ /* ++ * Opens a file for overwriting; creates it if necessary. ++ * Opening a dir is possible but may have bad results. ++ * Inputs: ++ * String path: The path to open. ++ * int mode: The mode to open with. ++ * Returns: an int filehandle, or a number<0 if an error occurs. ++ */ ++ abstract protected int ceph_open_for_overwrite(String path, int mode); ++ /* ++ * Closes the given file. Returns 0 on success, or a negative ++ * error code otherwise. 
++ */ ++ abstract protected int ceph_close(int filehandle); ++ /* ++ * Change the mode on a path. ++ * Inputs: ++ * String path: The path to change mode on. ++ * int mode: The mode to apply. ++ * Returns: true if the mode is properly applied, false if there ++ * is any error. ++ */ ++ abstract protected boolean ceph_setPermission(String path, int mode); ++ /* ++ * Closes the Ceph client. This should be called before shutting down ++ * (multiple times is okay but redundant). ++ */ ++ abstract protected boolean ceph_kill_client(); ++ /* ++ * Get the statistics on a path returned in a custom format defined ++ * in CephFileSystem. ++ * Inputs: ++ * String path: The path to stat. ++ * Stat fill: The stat object to fill. ++ * Returns: true if the stat is successful, false otherwise. ++ */ ++ abstract protected boolean ceph_stat(String path, CephFileSystem.Stat fill); ++ /* ++ * Statfs a filesystem in a custom format defined in CephFileSystem. ++ * Inputs: ++ * String path: A path on the filesystem that you wish to stat. ++ * CephStat fill: The CephStat object to fill. ++ * Returns: 0 if successful and the CephStat is filled; a negative ++ * error code otherwise. ++ */ ++ abstract protected int ceph_statfs(String path, CephFileSystem.CephStat fill); ++ /* ++ * Check how many times a path should be replicated (if it is ++ * degraded it may not actually be replicated this often). ++ * Inputs: ++ * String path: The path to check. ++ * Returns: an int containing the number of times replicated. ++ */ ++ abstract protected int ceph_replication(String path); ++ /* ++ * Find the IP address of the primary OSD for a given file and offset. ++ * Inputs: ++ * int fh: The filehandle for the file. ++ * long offset: The offset to get the location of. ++ * Returns: a String of the location as IP, or NULL if there is an error. ++ */ ++ abstract protected String ceph_hosts(int fh, long offset); ++ /* ++ * Set the mtime and atime for a given path. 
++ * Inputs: ++ * String path: The path to set the times for. ++ * long mtime: The mtime to set, in millis since epoch (-1 to not set). ++ * long atime: The atime to set, in millis since epoch (-1 to not set) ++ * Returns: 0 if successful, an error code otherwise. ++ */ ++ abstract protected int ceph_setTimes(String path, long mtime, long atime); ++ /* ++ * Get the current position in a file (as a long) of a given filehandle. ++ * Returns: (long) current file position on success, or a ++ * negative error code on failure. ++ */ ++ abstract protected long ceph_getpos(int fh); ++ /* ++ * Write the given buffer contents to the given filehandle. ++ * Inputs: ++ * int fh: The filehandle to write to. ++ * byte[] buffer: The buffer to write from ++ * int buffer_offset: The position in the buffer to write from ++ * int length: The number of (sequential) bytes to write. ++ * Returns: int, on success the number of bytes written, on failure ++ * a negative error code. ++ */ ++ abstract protected int ceph_write(int fh, byte[] buffer, int buffer_offset, int length); ++ ++ /* ++ * Reads into the given byte array from the current position. ++ * Inputs: ++ * int fh: the filehandle to read from ++ * byte[] buffer: the byte array to read into ++ * int buffer_offset: where in the buffer to start writing ++ * int length: how much to read. ++ * There'd better be enough space in the buffer to write all ++ * the data from the given offset! ++ * Returns: the number of bytes read on success (as an int), ++ * or an error code otherwise. */ ++ abstract protected int ceph_read(int fh, byte[] buffer, int buffer_offset, int length); ++ /* ++ * Seeks to the given position in the given file. ++ * Inputs: ++ * int fh: The filehandle to seek in. ++ * long pos: The position to seek to. ++ * Returns: the new position (as a long) of the filehandle on success, ++ * or a negative error code on failure. 
*/ ++ abstract protected long ceph_seek_from_start(int fh, long pos); ++ ++ protected void debug(String statement, int priority) { ++ if (debug) System.err.println(statement); ++ switch(priority) { ++ case FATAL: LOG.fatal(statement); ++ break; ++ case ERROR: LOG.error(statement); ++ break; ++ case WARN: LOG.warn(statement); ++ break; ++ case INFO: LOG.info(statement); ++ break; ++ case DEBUG: LOG.debug(statement); ++ break; ++ case TRACE: LOG.trace(statement); ++ break; ++ case NOLOG: break; ++ default: break; ++ } ++ } ++} +Index: src/java/org/apache/hadoop/fs/ceph/CephFaker.java +=================================================================== +--- src/java/org/apache/hadoop/fs/ceph/CephFaker.java (revision 0) ++++ src/java/org/apache/hadoop/fs/ceph/CephFaker.java (revision 0) +@@ -0,0 +1,480 @@ ++// -*- mode:Java; tab-width:2; c-basic-offset:2; indent-tabs-mode:t -*- ++/** ++ * ++ * Licensed under the Apache License, Version 2.0 ++ * (the "License"); you may not use this file except in compliance with ++ * the License. You may obtain a copy of the License at ++ * ++ * http://www.apache.org/licenses/LICENSE-2.0 ++ * ++ * Unless required by applicable law or agreed to in writing, software ++ * distributed under the License is distributed on an "AS IS" BASIS, ++ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or ++ * implied. See the License for the specific language governing ++ * permissions and limitations under the License. ++ * ++ * ++ * This uses the local Filesystem but pretends to be communicating ++ * with a Ceph deployment, for unit testing the CephFileSystem. 
++ */ ++ ++package org.apache.hadoop.fs.ceph; ++ ++import java.net.URI; ++import java.util.Hashtable; ++import java.io.Closeable; ++import java.io.FileNotFoundException; ++import java.io.IOException; ++ ++import org.apache.commons.logging.Log; ++import org.apache.hadoop.conf.Configuration; ++import org.apache.hadoop.fs.BlockLocation; ++import org.apache.hadoop.fs.FileAlreadyExistsException; ++import org.apache.hadoop.fs.FileStatus; ++import org.apache.hadoop.fs.FileSystem; ++import org.apache.hadoop.fs.FsStatus; ++import org.apache.hadoop.fs.FSDataInputStream; ++import org.apache.hadoop.fs.FSDataOutputStream; ++import org.apache.hadoop.fs.Path; ++import org.apache.hadoop.fs.permission.FsPermission; ++ ++class CephFaker extends CephFS { ++ ++ FileSystem localFS; ++ String localPrefix; ++ int blockSize; ++ Configuration conf; ++ Hashtable files; ++ Hashtable filenames; ++ int fileCount = 0; ++ boolean initialized = false; ++ ++ public CephFaker(Configuration con, Log log) { ++ super(con, log); ++ conf = con; ++ files = new Hashtable(); ++ filenames = new Hashtable(); ++ } ++ ++ protected boolean ceph_initializeClient(String args, int block_size) { ++ if (!initialized) { ++ //let's remember the default block_size ++ blockSize = block_size; ++ /* for a real Ceph deployment, this starts up the client, ++ * sets debugging levels, etc. We just need to get the ++ * local FileSystem to use, and we'll ignore any ++ * command-line arguments. 
*/ ++ try { ++ localFS = FileSystem.getLocal(conf); ++ localFS.initialize(URI.create("file://localhost"), conf); ++ localFS.setVerifyChecksum(false); ++ String testDir = conf.get("hadoop.tmp.dir"); ++ localPrefix = localFS.getWorkingDirectory().toString(); ++ int testDirLoc = localPrefix.indexOf(testDir) - 1; ++ if (-2 == testDirLoc) ++ testDirLoc = localPrefix.length(); ++ localPrefix = localPrefix.substring(0, testDirLoc) ++ + "/" + conf.get("hadoop.tmp.dir"); ++ ++ localFS.setWorkingDirectory(new Path(localPrefix ++ +"/user/" ++ + System.getProperty("user.name"))); ++ //I don't know why, but the unit tests expect the default ++ //working dir to be /user/username, so satisfy them! ++ //debug("localPrefix is " + localPrefix, INFO); ++ } ++ catch (IOException e) { ++ return false; ++ } ++ initialized = true; ++ } ++ return true; ++ } ++ ++ protected String ceph_getcwd() { ++ return sanitize_path(localFS.getWorkingDirectory().toString()); ++ } ++ ++ protected boolean ceph_setcwd(String path) { ++ localFS.setWorkingDirectory(new Path(prepare_path(path))); ++ return true; ++ } ++ ++ //the caller is responsible for ensuring empty dirs ++ protected boolean ceph_rmdir(String pth) { ++ Path path = new Path(prepare_path(pth)); ++ boolean ret = false; ++ try { ++ if (localFS.listStatus(path).length <= 1) { ++ ret = localFS.delete(path, true); ++ } ++ } ++ catch (IOException e){ } ++ return ret; ++ } ++ ++ //this needs to work on (empty) directories too ++ protected boolean ceph_unlink(String path) { ++ path = prepare_path(path); ++ boolean ret = false; ++ if (ceph_isdirectory(path)) { ++ ret = ceph_rmdir(path); ++ } ++ else { ++ try { ++ ret = localFS.delete(new Path(path), false); ++ } ++ catch (IOException e){ } ++ } ++ return ret; ++ } ++ ++ protected boolean ceph_rename(String oldName, String newName) { ++ oldName = prepare_path(oldName); ++ newName = prepare_path(newName); ++ try { ++ Path parent = new Path(newName).getParent(); ++ Path newPath = new Path(newName); ++ 
if (localFS.exists(parent) && !localFS.exists(newPath)) ++ return localFS.rename(new Path(oldName), newPath); ++ return false; ++ } ++ catch (IOException e) { return false; } ++ } ++ ++ protected boolean ceph_exists(String path) { ++ path = prepare_path(path); ++ boolean ret = false; ++ try { ++ ret = localFS.exists(new Path(path)); ++ } ++ catch (IOException e){ } ++ return ret; ++ } ++ ++ protected long ceph_getblocksize(String path) { ++ path = prepare_path(path); ++ try { ++ FileStatus status = localFS.getFileStatus(new Path(path)); ++ return status.getBlockSize(); ++ } ++ catch (FileNotFoundException e) { ++ return -CephFS.ENOENT; ++ } ++ catch (IOException e) { ++ return -1; //just fail generically ++ } ++ } ++ ++ protected boolean ceph_isdirectory(String path) { ++ path = prepare_path(path); ++ try { ++ FileStatus status = localFS.getFileStatus(new Path(path)); ++ return status.isDir(); ++ } ++ catch (IOException e) { return false; } ++ } ++ ++ protected boolean ceph_isfile(String path) { ++ path = prepare_path(path); ++ boolean ret = false; ++ try { ++ FileStatus status = localFS.getFileStatus(new Path(path)); ++ ret = !status.isDir(); ++ } ++ catch (Exception e) {} ++ return ret; ++ } ++ ++ protected String[] ceph_getdir(String path) { ++ path = prepare_path(path); ++ if (!ceph_isdirectory(path)) { ++ return null; ++ } ++ try { ++ FileStatus[] stats = localFS.listStatus(new Path(path)); ++ String[] names = new String[stats.length]; ++ String name; ++ for (int i=0; i fileLength) { -+ throw new IOException("CephInputStream.seek: failed seeking to position " + targetPos + -+ " on fd " + fileHandle + ": Cannot seek after EOF " + fileLength); ++ throw new IOException("CephInputStream.seek: failed seek to position " ++ + targetPos + " on fd " + fileHandle ++ + ": Cannot seek after EOF " + fileLength); + } -+ ceph_seek_from_start(fileHandle, targetPos); -+ } ++ long oldPos = cephPos; ++ cephPos = ceph.ceph_seek_from_start(fileHandle, targetPos); ++ bufValid = 0; 
++ bufPos = 0; ++ if (cephPos < 0) { ++ cephPos = oldPos; ++ throw new IOException ("Ceph failed to seek to new position!"); ++ } ++ } + + /** + * Failovers are handled by the Ceph code at a very low level; @@ -942,9 +1878,9 @@ Index: src/java/org/apache/hadoop/fs/ceph/CephInputStream.java + * @return the next byte. + */ + @Override -+ public synchronized int read() throws IOException { -+ if(debug) debug("CephInputStream.read: Reading a single byte from fd " + fileHandle -+ + " by calling general read function"); ++ public synchronized int read() throws IOException { ++ ceph.debug("CephInputStream.read: Reading a single byte from fd " + fileHandle ++ + " by calling general read function", ceph.TRACE); + + byte result[] = new byte[1]; + if (getPos() >= fileLength) return -1; @@ -954,79 +1890,94 @@ Index: src/java/org/apache/hadoop/fs/ceph/CephInputStream.java + } + + /** -+ * Read a specified number of bytes into a byte[] from the file. ++ * Read a specified number of bytes from the file into a byte[]. + * @param buf the byte array to read into. + * @param off the offset to start at in the file + * @param len the number of bytes to read + * @return 0 if successful, otherwise an error code. ++ * @throws IOException on bad input. 
+ */ + @Override -+ public synchronized int read(byte buf[], int off, int len) throws IOException { -+ if(debug) debug("CephInputStream.read: Reading " + len + " bytes from fd " + fileHandle); ++ public synchronized int read(byte buf[], int off, int len) ++ throws IOException { ++ ceph.debug("CephInputStream.read: Reading " + len + ++ " bytes from fd " + fileHandle, ceph.TRACE); + + if (closed) { -+ throw new IOException("CephInputStream.read: cannot read " + len + -+ " bytes from fd " + fileHandle + ": stream closed"); -+ } -+ if (null == buf) { -+ throw new NullPointerException("Read buffer is null"); -+ } -+ -+ // check for proper index bounds -+ if((off < 0) || (len < 0) || (off + len > buf.length)) { -+ throw new IndexOutOfBoundsException("CephInputStream.read: Indices out of bounds for read: " -+ + "read length is " + len + ", buffer offset is " -+ + off +", and buffer size is " + buf.length); ++ throw new IOException("CephInputStream.read: cannot read " + len + ++ " bytes from fd " + fileHandle + ++ ": stream closed"); + } -+ ++ + // ensure we're not past the end of the file -+ if (getPos() >= fileLength) -+ { -+ if(debug) debug("CephInputStream.read: cannot read " + len + -+ " bytes from fd " + fileHandle + ": current position is " + -+ getPos() + " and file length is " + fileLength); -+ -+ return -1; ++ if (getPos() >= fileLength) { ++ ceph.debug("CephInputStream.read: cannot read " + len + ++ " bytes from fd " + fileHandle + ": current position is " ++ + getPos() + " and file length is " + fileLength, ++ ceph.DEBUG); ++ ++ return -1; ++ } ++ ++ int totalRead = 0; ++ int initialLen = len; ++ int read; ++ do { ++ read = Math.min(len, bufValid - bufPos); ++ try { ++ System.arraycopy(buffer, bufPos, buf, off, read); ++ } ++ catch(IndexOutOfBoundsException ie) { ++ throw new IOException("CephInputStream.read: Indices out of bounds:" ++ + "read length is " + len ++ + ", buffer offset is " + off ++ + ", and buffer size is " + buf.length); ++ } ++ catch 
(ArrayStoreException ae) { ++ throw new IOException("Uh-oh, CephInputStream failed to do an array" ++ + "copy due to type mismatch..."); ++ } ++ catch (NullPointerException ne) { ++ throw new IOException("CephInputStream.read: cannot read " ++ + len + "bytes from fd:" + fileHandle ++ + ": buf is null"); ++ } ++ bufPos += read; ++ len -= read; ++ off += read; ++ totalRead += read; ++ } while (len > 0 && fillBuffer()); ++ ++ ceph.debug("CephInputStream.read: Reading " + initialLen ++ + " bytes from fd " + fileHandle ++ + ": succeeded in reading " + totalRead + " bytes", ++ ceph.TRACE); ++ return totalRead; + } -+ // actually do the read -+ int result = ceph_read(fileHandle, buf, off, len); -+ if (result < 0) -+ if(debug) debug("CephInputStream.read: Reading " + len -+ + " bytes from fd " + fileHandle + " failed."); -+ -+ if(debug) debug("CephInputStream.read: Reading " + len + " bytes from fd " -+ + fileHandle + ": succeeded in reading " + result + " bytes"); -+ return result; -+ } + + /** + * Close the CephInputStream and release the associated filehandle. + */ + @Override -+ public void close() throws IOException { -+ if(debug) debug("CephOutputStream.close:enter"); -+ if (closed) { -+ throw new IOException("Stream closed"); -+ } -+ -+ int result = ceph_close(fileHandle); -+ if (result != 0) { -+ throw new IOException("Close failed!"); -+ } -+ closed = true; -+ if(debug) debug("CephOutputStream.close:exit"); -+ } -+ -+ private void debug(String out) { -+ System.err.println(out); -+ } ++ public void close() throws IOException { ++ ceph.debug("CephOutputStream.close:enter", ceph.TRACE); ++ if (!closed) { ++ int result = ceph.ceph_close(fileHandle); ++ closed = true; ++ if (result != 0) { ++ throw new IOException("Close somehow failed!" 
++ + "Don't try and use this stream again, though"); ++ } ++ ceph.debug("CephOutputStream.close:exit", ceph.TRACE); ++ } ++ } +} Index: src/java/org/apache/hadoop/fs/ceph/CephOutputStream.java =================================================================== --- src/java/org/apache/hadoop/fs/ceph/CephOutputStream.java (revision 0) +++ src/java/org/apache/hadoop/fs/ceph/CephOutputStream.java (revision 0) -@@ -0,0 +1,196 @@ -+// -*- mode:Java; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +@@ -0,0 +1,202 @@ ++// -*- mode:Java; tab-width:2; c-basic-offset:2; indent-tabs-mode:t -*- +/** + * + * Licensed under the Apache License, Version 2.0 @@ -1048,18 +1999,10 @@ Index: src/java/org/apache/hadoop/fs/ceph/CephOutputStream.java + +package org.apache.hadoop.fs.ceph; + -+import java.io.File; -+import java.io.FileInputStream; -+import java.io.FileOutputStream; +import java.io.IOException; -+import java.io.InputStream; +import java.io.OutputStream; -+import java.util.ArrayList; -+import java.util.List; -+import java.util.Random; + +import org.apache.hadoop.conf.Configuration; -+import org.apache.hadoop.fs.Path; +import org.apache.hadoop.util.Progressable; + +/** @@ -1069,32 +2012,26 @@ Index: src/java/org/apache/hadoop/fs/ceph/CephOutputStream.java + */ +public class CephOutputStream extends OutputStream { + -+ private int bufferSize; -+ + private boolean closed; + -+ private int fileHandle; -+ -+ private boolean debug; ++ private CephFS ceph; + ++ private int fileHandle; + -+ private native long ceph_seek_from_start(int fh, long pos); -+ private native long ceph_getpos(int fh); -+ private native int ceph_close(int fh); -+ private native int ceph_write(int fh, byte[] buffer, int buffer_offset, int length); -+ ++ private byte[] buffer; ++ private int bufUsed = 0; + + /** + * Construct the CephOutputStream. + * @param conf The FileSystem configuration. + * @param fh The Ceph filehandle to connect to. 
+ */ -+ public CephOutputStream(Configuration conf, int fh) { -+ System.load(conf.get("fs.ceph.libDir")+"/libhadoopcephfs.so"); -+ System.load(conf.get("fs.ceph.libDir")+"/libceph.so"); ++ public CephOutputStream(Configuration conf, CephFS cephfs, ++ int fh, int bufferSize) { ++ ceph = cephfs; + fileHandle = fh; + closed = false; -+ debug = ("true".equals(conf.get("fs.ceph.debug", "false"))); ++ buffer = new byte[bufferSize]; + } + + /**Ceph likes things to be closed before it shuts down, @@ -1112,7 +2049,7 @@ Index: src/java/org/apache/hadoop/fs/ceph/CephOutputStream.java + * @return The file offset in bytes. + */ + public long getPos() throws IOException { -+ return ceph_getpos(fileHandle); ++ return ceph.ceph_getpos(fileHandle); + } + + /** @@ -1122,20 +2059,18 @@ Index: src/java/org/apache/hadoop/fs/ceph/CephOutputStream.java + * write fails. + */ + @Override -+ public synchronized void write(int b) throws IOException { -+ if(debug) debug("CephOutputStream.write: writing a single byte to fd " + fileHandle); ++ public synchronized void write(int b) throws IOException { ++ ceph.debug("CephOutputStream.write: writing a single byte to fd " ++ + fileHandle, ceph.TRACE); + + if (closed) { -+ throw new IOException("CephOutputStream.write: cannot write " + -+ "a byte to fd " + fileHandle + ": stream closed"); ++ throw new IOException("CephOutputStream.write: cannot write " + ++ "a byte to fd " + fileHandle + ": stream closed"); + } + // Stick the byte in a buffer and write it + byte buf[] = new byte[1]; + buf[0] = (byte) b; -+ int result = ceph_write(fileHandle, buf, 0, 1); -+ if (1 != result) -+ if(debug) debug("CephOutputStream.write: failed writing a single byte to fd " -+ + fileHandle + ": Ceph write() result = " + result); ++ write(buf, 0, 1); + return; + } + @@ -1145,88 +2080,110 @@ Index: src/java/org/apache/hadoop/fs/ceph/CephOutputStream.java + * @param off the position in the file to start writing at. + * @param len The number of bytes to actually write. 
+ * @throws IOException if you have closed the CephOutputStream, or -+ * the write fails. -+ * @throws NullPointerException if buf is null. -+ * @throws IndexOutOfBoundsException if len > buf.length. ++ * if buf is null or off + len > buf.length, or ++ * if the write fails due to a Ceph error. + */ + @Override -+ public synchronized void write(byte buf[], int off, int len) throws IOException { -+ if(debug) debug("CephOutputStream.write: writing " + len + -+ " bytes to fd " + fileHandle); -+ // make sure stream is open -+ if (closed) { -+ throw new IOException("CephOutputStream.write: cannot write " + len + -+ "bytes to fd " + fileHandle + ": stream closed"); -+ } -+ -+ // sanity check -+ if (null == buf) { -+ throw new NullPointerException("CephOutputStream.write: cannot write " + len + -+ "bytes to fd " + fileHandle + ": write buffer is null"); -+ } -+ -+ // check for proper index bounds -+ if((off < 0) || (len < 0) || (off + len > buf.length)) { -+ throw new IndexOutOfBoundsException("CephOutputStream.write: Indices out of bounds for write: " -+ + "write length is " + len + ", buffer offset is " -+ + off +", and buffer size is " + buf.length); -+ } -+ -+ // write! 
-+ int result = ceph_write(fileHandle, buf, off, len); -+ if (result < 0) { -+ throw new IOException("CephOutputStream.write: Write of " + len + -+ "bytes to fd " + fileHandle + " failed"); -+ } -+ if (result != len) { -+ throw new IOException("CephOutputStream.write: Write of " + len + -+ "bytes to fd " + fileHandle + "was incomplete: only " -+ + result + " of " + len + " bytes were written."); -+ } -+ return; -+ } ++ public synchronized void write(byte buf[], int off, int len) throws IOException { ++ ceph.debug("CephOutputStream.write: writing " + len + ++ " bytes to fd " + fileHandle, ceph.TRACE); ++ // make sure stream is open ++ if (closed) { ++ throw new IOException("CephOutputStream.write: cannot write " + len + ++ "bytes to fd " + fileHandle + ": stream closed"); ++ } ++ ++ int result; ++ int write; ++ while (len>0) { ++ write = Math.min(len, buffer.length - bufUsed); ++ try { ++ System.arraycopy(buf, off, buffer, bufUsed, write); ++ } ++ catch (IndexOutOfBoundsException ie) { ++ throw new IOException("CephOutputStream.write: Indices out of bounds: " ++ + "write length is " + len ++ + ", buffer offset is " + off ++ + ", and buffer size is " + buf.length); ++ } ++ catch (ArrayStoreException ae) { ++ throw new IOException("Uh-oh, CephOutputStream failed to do an array" ++ + " copy due to type mismatch..."); ++ } ++ catch (NullPointerException ne) { ++ throw new IOException("CephOutputStream.write: cannot write " ++ + len + "bytes to fd " + fileHandle ++ + ": buffer is null"); ++ } ++ bufUsed += write; ++ len -= write; ++ off += write; ++ if (bufUsed == buffer.length) { ++ result = ceph.ceph_write(fileHandle, buffer, 0, bufUsed); ++ if (result < 0) ++ throw new IOException("CephOutputStream.write: Buffered write of " ++ + bufUsed + " bytes failed!"); ++ if (result != bufUsed) ++ throw new IOException("CephOutputStream.write: Wrote only " ++ + result + " bytes of " + bufUsed ++ + " in buffer! 
Data may be lost or written" ++ + " twice to Ceph!"); ++ bufUsed = 0; ++ } ++ ++ } ++ return; ++ } + + /** -+ * Flush the written data. It doesn't actually do anything; all writes are synchronous. -+ * @throws IOException if you've closed the stream. ++ * Flush the buffered data. ++ * @throws IOException if you've closed the stream or the write fails. + */ + @Override -+ public synchronized void flush() throws IOException { -+ if (closed) { -+ throw new IOException("Stream closed"); -+ } -+ return; -+ } ++ public synchronized void flush() throws IOException { ++ if (!closed) { ++ if (bufUsed == 0) return; ++ int result = ceph.ceph_write(fileHandle, buffer, 0, bufUsed); ++ if (result < 0) { ++ throw new IOException("CephOutputStream.write: Write of " ++ + bufUsed + "bytes to fd " ++ + fileHandle + " failed"); ++ } ++ if (result != bufUsed) { ++ throw new IOException("CephOutputStream.write: Write of " + bufUsed ++ + "bytes to fd " + fileHandle ++ + "was incomplete: only " + result + " of " ++ + bufUsed + " bytes were written."); ++ } ++ bufUsed = 0; ++ return; ++ } ++ } + + /** + * Close the CephOutputStream. + * @throws IOException if Ceph somehow returns an error. In current code it can't. 
+ */ + @Override -+ public synchronized void close() throws IOException { -+ if(debug) debug("CephOutputStream.close:enter"); -+ if (closed) { -+ throw new IOException("Stream closed"); -+ } -+ -+ int result = ceph_close(fileHandle); -+ if (result != 0) { -+ throw new IOException("Close failed!"); -+ } -+ -+ closed = true; -+ if(debug) debug("CephOutputStream.close:exit"); -+ } -+ -+ private void debug(String out) { -+ System.err.println(out); -+ } ++ public synchronized void close() throws IOException { ++ ceph.debug("CephOutputStream.close:enter", ceph.TRACE); ++ if (!closed) { ++ flush(); ++ int result = ceph.ceph_close(fileHandle); ++ if (result != 0) { ++ throw new IOException("Close failed!"); ++ } ++ ++ closed = true; ++ ceph.debug("CephOutputStream.close:exit", ceph.TRACE); ++ } ++ } +} Index: src/java/org/apache/hadoop/fs/ceph/package.html =================================================================== --- src/java/org/apache/hadoop/fs/ceph/package.html (revision 0) +++ src/java/org/apache/hadoop/fs/ceph/package.html (revision 0) -@@ -0,0 +1,100 @@ +@@ -0,0 +1,101 @@ + + +