From 66b84fa7863ffc132f553fba62b529cbf6a34cf8 Mon Sep 17 00:00:00 2001 From: Colin Patrick McCabe Date: Wed, 20 Apr 2011 17:29:23 -0700 Subject: [PATCH] hadoop: whitespace fixes Signed-off-by: Colin McCabe --- src/client/hadoop/ceph/CephFS.java | 466 +++++----- src/client/hadoop/ceph/CephFaker.java | 891 ++++++++++--------- src/client/hadoop/ceph/CephFileSystem.java | 793 ++++++++++------- src/client/hadoop/ceph/CephInputStream.java | 269 +++--- src/client/hadoop/ceph/CephOutputStream.java | 225 ++--- src/client/hadoop/ceph/CephTalker.java | 52 +- src/client/hadoop/ceph/TestCeph.java | 14 +- 7 files changed, 1505 insertions(+), 1205 deletions(-) diff --git a/src/client/hadoop/ceph/CephFS.java b/src/client/hadoop/ceph/CephFS.java index ac09dc5da1f9a..7c92fce76fcb8 100644 --- a/src/client/hadoop/ceph/CephFS.java +++ b/src/client/hadoop/ceph/CephFS.java @@ -1,4 +1,5 @@ // -*- mode:Java; tab-width:2; c-basic-offset:2; indent-tabs-mode:t -*- + /** * * Licensed under the Apache License, Version 2.0 @@ -21,252 +22,295 @@ */ package org.apache.hadoop.fs.ceph; + import org.apache.hadoop.conf.Configuration; import org.apache.commons.logging.Log; + abstract class CephFS { - protected static final int FATAL = 0; - protected static final int ERROR = 1; - protected static final int WARN = 2; - protected static final int INFO = 3; - protected static final int DEBUG = 4; - protected static final int TRACE = 5; - protected static final int NOLOG = 6; + protected static final int FATAL = 0; + protected static final int ERROR = 1; + protected static final int WARN = 2; + protected static final int INFO = 3; + protected static final int DEBUG = 4; + protected static final int TRACE = 5; + protected static final int NOLOG = 6; - protected static final int ENOTDIR = 20; + protected static final int ENOTDIR = 20; protected static final int EEXIST = 17; protected static final int ENOENT = 2; - private boolean debug = false; - private Log LOG; - - public CephFS(Configuration conf, Log log) { - debug = ("true".equals(conf.get("fs.ceph.debug", "false"))); - LOG = log; - } - - /* - * Performs any necessary setup to allow general use of the filesystem. - * Inputs: - * String argsuments -- a command-line style input of Ceph config params - * int block_size -- the size in bytes to use for blocks - * Returns: true on success, false otherwise - */ - abstract protected boolean ceph_initializeClient(String arguments, int block_size); + private boolean debug = false; + private Log LOG; + + public CephFS(Configuration conf, Log log) { + debug = ("true".equals(conf.get("fs.ceph.debug", "false"))); + LOG = log; + } + + /* + * Performs any necessary setup to allow general use of the filesystem. + * Inputs: + * String argsuments -- a command-line style input of Ceph config params + * int block_size -- the size in bytes to use for blocks + * Returns: true on success, false otherwise + */ + abstract protected boolean ceph_initializeClient(String arguments, int block_size); - /* - * Returns the current working directory (absolute) as a String - */ - abstract protected String ceph_getcwd(); - /* - * Changes the working directory. - * Inputs: - * String path: The path (relative or absolute) to switch to - * Returns: true on success, false otherwise. - */ - abstract protected boolean ceph_setcwd(String path); - /* - * Given a path to a directory, removes the directory if empty. - * Inputs: - * jstring j_path: The path (relative or absolute) to the directory - * Returns: true on successful delete; false otherwise - */ + /* + * Returns the current working directory (absolute) as a String + */ + abstract protected String ceph_getcwd(); + + /* + * Changes the working directory. + * Inputs: + * String path: The path (relative or absolute) to switch to + * Returns: true on success, false otherwise. + */ + abstract protected boolean ceph_setcwd(String path); + + /* + * Given a path to a directory, removes the directory if empty. + * Inputs: + * jstring j_path: The path (relative or absolute) to the directory + * Returns: true on successful delete; false otherwise + */ abstract protected boolean ceph_rmdir(String path); - /* - * Given a path, unlinks it. - * Inputs: - * String path: The path (relative or absolute) to the file or empty dir - * Returns: true if the unlink occurred, false otherwise. - */ + + /* + * Given a path, unlinks it. + * Inputs: + * String path: The path (relative or absolute) to the file or empty dir + * Returns: true if the unlink occurred, false otherwise. + */ abstract protected boolean ceph_unlink(String path); - /* - * Changes a given path name to a new name, assuming new_path doesn't exist. - * Inputs: - * jstring j_from: The path whose name you want to change. - * jstring j_to: The new name for the path. - * Returns: true if the rename occurred, false otherwise - */ + + /* + * Changes a given path name to a new name, assuming new_path doesn't exist. + * Inputs: + * jstring j_from: The path whose name you want to change. + * jstring j_to: The new name for the path. + * Returns: true if the rename occurred, false otherwise + */ abstract protected boolean ceph_rename(String old_path, String new_path); - /* - * Returns true if it the input path exists, false - * if it does not or there is an unexpected failure. - */ + + /* + * Returns true if it the input path exists, false + * if it does not or there is an unexpected failure. + */ abstract protected boolean ceph_exists(String path); - /* - * Get the block size for a given path. - * Input: - * String path: The path (relative or absolute) you want - * the block size for. - * Returns: block size if the path exists, otherwise a negative number - * corresponding to the standard C++ error codes (which are positive). - */ + + /* + * Get the block size for a given path. + * Input: + * String path: The path (relative or absolute) you want + * the block size for. + * Returns: block size if the path exists, otherwise a negative number + * corresponding to the standard C++ error codes (which are positive). + */ abstract protected long ceph_getblocksize(String path); - /* - * Returns true if the given path is a directory, false otherwise. - */ + + /* + * Returns true if the given path is a directory, false otherwise. + */ abstract protected boolean ceph_isdirectory(String path); - /* - * Returns true if the given path is a file; false otherwise. - */ + + /* + * Returns true if the given path is a file; false otherwise. + */ abstract protected boolean ceph_isfile(String path); - /* - * Get the contents of a given directory. - * Inputs: - * String path: The path (relative or absolute) to the directory. - * Returns: A Java String[] of the contents of the directory, or - * NULL if there is an error (ie, path is not a dir). This listing - * will not contain . or .. entries. - */ + + /* + * Get the contents of a given directory. + * Inputs: + * String path: The path (relative or absolute) to the directory. + * Returns: A Java String[] of the contents of the directory, or + * NULL if there is an error (ie, path is not a dir). This listing + * will not contain . or .. entries. + */ abstract protected String[] ceph_getdir(String path); - /* - * Create the specified directory and any required intermediate ones with the - * given mode. - */ + + /* + * Create the specified directory and any required intermediate ones with the + * given mode. + */ abstract protected int ceph_mkdirs(String path, int mode); - /* - * Open a file to append. If the file does not exist, it will be created. - * Opening a dir is possible but may have bad results. - * Inputs: - * String path: The path to open. - * Returns: an int filehandle, or a number<0 if an error occurs. - */ + + /* + * Open a file to append. If the file does not exist, it will be created. + * Opening a dir is possible but may have bad results. + * Inputs: + * String path: The path to open. + * Returns: an int filehandle, or a number<0 if an error occurs. + */ abstract protected int ceph_open_for_append(String path); - /* - * Open a file for reading. - * Opening a dir is possible but may have bad results. - * Inputs: - * String path: The path to open. - * Returns: an int filehandle, or a number<0 if an error occurs. - */ + + /* + * Open a file for reading. + * Opening a dir is possible but may have bad results. + * Inputs: + * String path: The path to open. + * Returns: an int filehandle, or a number<0 if an error occurs. + */ abstract protected int ceph_open_for_read(String path); - /* - * Opens a file for overwriting; creates it if necessary. - * Opening a dir is possible but may have bad results. - * Inputs: - * String path: The path to open. - * int mode: The mode to open with. - * Returns: an int filehandle, or a number<0 if an error occurs. - */ + + /* + * Opens a file for overwriting; creates it if necessary. + * Opening a dir is possible but may have bad results. + * Inputs: + * String path: The path to open. + * int mode: The mode to open with. + * Returns: an int filehandle, or a number<0 if an error occurs. + */ abstract protected int ceph_open_for_overwrite(String path, int mode); - /* - * Closes the given file. Returns 0 on success, or a negative - * error code otherwise. - */ + + /* + * Closes the given file. Returns 0 on success, or a negative + * error code otherwise. + */ abstract protected int ceph_close(int filehandle); - /* - * Change the mode on a path. - * Inputs: - * String path: The path to change mode on. - * int mode: The mode to apply. - * Returns: true if the mode is properly applied, false if there - * is any error. - */ + + /* + * Change the mode on a path. + * Inputs: + * String path: The path to change mode on. + * int mode: The mode to apply. + * Returns: true if the mode is properly applied, false if there + * is any error. + */ abstract protected boolean ceph_setPermission(String path, int mode); - /* - * Closes the Ceph client. This should be called before shutting down - * (multiple times is okay but redundant). - */ + + /* + * Closes the Ceph client. This should be called before shutting down + * (multiple times is okay but redundant). + */ abstract protected boolean ceph_kill_client(); - /* - * Get the statistics on a path returned in a custom format defined - * in CephFileSystem. - * Inputs: - * String path: The path to stat. - * Stat fill: The stat object to fill. - * Returns: true if the stat is successful, false otherwise. - */ + + /* + * Get the statistics on a path returned in a custom format defined + * in CephFileSystem. + * Inputs: + * String path: The path to stat. + * Stat fill: The stat object to fill. + * Returns: true if the stat is successful, false otherwise. + */ abstract protected boolean ceph_stat(String path, CephFileSystem.Stat fill); - /* - * Statfs a filesystem in a custom format defined in CephFileSystem. - * Inputs: - * String path: A path on the filesystem that you wish to stat. - * CephStat fill: The CephStat object to fill. - * Returns: 0 if successful and the CephStat is filled; a negative - * error code otherwise. - */ + + /* + * Statfs a filesystem in a custom format defined in CephFileSystem. + * Inputs: + * String path: A path on the filesystem that you wish to stat. + * CephStat fill: The CephStat object to fill. + * Returns: 0 if successful and the CephStat is filled; a negative + * error code otherwise. + */ abstract protected int ceph_statfs(String path, CephFileSystem.CephStat fill); - /* - * Check how many times a path should be replicated (if it is - * degraded it may not actually be replicated this often). - * Inputs: - * String path: The path to check. - * Returns: an int containing the number of times replicated. - */ + + /* + * Check how many times a path should be replicated (if it is + * degraded it may not actually be replicated this often). + * Inputs: + * String path: The path to check. + * Returns: an int containing the number of times replicated. + */ abstract protected int ceph_replication(String path); - /* - * Find the IP address of the primary OSD for a given file and offset. - * Inputs: - * int fh: The filehandle for the file. - * long offset: The offset to get the location of. - * Returns: a String of the location as IP, or NULL if there is an error. - */ + + /* + * Find the IP address of the primary OSD for a given file and offset. + * Inputs: + * int fh: The filehandle for the file. + * long offset: The offset to get the location of. + * Returns: a String of the location as IP, or NULL if there is an error. + */ abstract protected String ceph_hosts(int fh, long offset); - /* - * Set the mtime and atime for a given path. - * Inputs: - * String path: The path to set the times for. - * long mtime: The mtime to set, in millis since epoch (-1 to not set). - * long atime: The atime to set, in millis since epoch (-1 to not set) - * Returns: 0 if successful, an error code otherwise. - */ + + /* + * Set the mtime and atime for a given path. + * Inputs: + * String path: The path to set the times for. + * long mtime: The mtime to set, in millis since epoch (-1 to not set). + * long atime: The atime to set, in millis since epoch (-1 to not set) + * Returns: 0 if successful, an error code otherwise. + */ abstract protected int ceph_setTimes(String path, long mtime, long atime); - /* - * Get the current position in a file (as a long) of a given filehandle. - * Returns: (long) current file position on success, or a - * negative error code on failure. - */ + + /* + * Get the current position in a file (as a long) of a given filehandle. + * Returns: (long) current file position on success, or a + * negative error code on failure. + */ abstract protected long ceph_getpos(int fh); - /* - * Write the given buffer contents to the given filehandle. - * Inputs: - * int fh: The filehandle to write to. - * byte[] buffer: The buffer to write from - * int buffer_offset: The position in the buffer to write from - * int length: The number of (sequential) bytes to write. - * Returns: int, on success the number of bytes written, on failure - * a negative error code. - */ + + /* + * Write the given buffer contents to the given filehandle. + * Inputs: + * int fh: The filehandle to write to. + * byte[] buffer: The buffer to write from + * int buffer_offset: The position in the buffer to write from + * int length: The number of (sequential) bytes to write. + * Returns: int, on success the number of bytes written, on failure + * a negative error code. + */ abstract protected int ceph_write(int fh, byte[] buffer, int buffer_offset, int length); - /* - * Reads into the given byte array from the current position. - * Inputs: - * int fh: the filehandle to read from - * byte[] buffer: the byte array to read into - * int buffer_offset: where in the buffer to start writing - * int length: how much to read. - * There'd better be enough space in the buffer to write all - * the data from the given offset! - * Returns: the number of bytes read on success (as an int), - * or an error code otherwise. */ + /* + * Reads into the given byte array from the current position. + * Inputs: + * int fh: the filehandle to read from + * byte[] buffer: the byte array to read into + * int buffer_offset: where in the buffer to start writing + * int length: how much to read. + * There'd better be enough space in the buffer to write all + * the data from the given offset! + * Returns: the number of bytes read on success (as an int), + * or an error code otherwise. */ abstract protected int ceph_read(int fh, byte[] buffer, int buffer_offset, int length); - /* - * Seeks to the given position in the given file. - * Inputs: - * int fh: The filehandle to seek in. - * long pos: The position to seek to. - * Returns: the new position (as a long) of the filehandle on success, - * or a negative error code on failure. */ + + /* + * Seeks to the given position in the given file. + * Inputs: + * int fh: The filehandle to seek in. + * long pos: The position to seek to. + * Returns: the new position (as a long) of the filehandle on success, + * or a negative error code on failure. */ abstract protected long ceph_seek_from_start(int fh, long pos); - protected void debug(String statement, int priority) { - if (debug) System.err.println(statement); - switch(priority) { - case FATAL: LOG.fatal(statement); - break; - case ERROR: LOG.error(statement); - break; - case WARN: LOG.warn(statement); - break; - case INFO: LOG.info(statement); - break; - case DEBUG: LOG.debug(statement); - break; - case TRACE: LOG.trace(statement); - break; - case NOLOG: break; - default: break; - } + protected void debug(String statement, int priority) { + if (debug) { + System.err.println(statement); + } + switch (priority) { + case FATAL: + LOG.fatal(statement); + break; + + case ERROR: + LOG.error(statement); + break; + + case WARN: + LOG.warn(statement); + break; + + case INFO: + LOG.info(statement); + break; + + case DEBUG: + LOG.debug(statement); + break; + + case TRACE: + LOG.trace(statement); + break; + + case NOLOG: + break; + + default: + break; + } } } diff --git a/src/client/hadoop/ceph/CephFaker.java b/src/client/hadoop/ceph/CephFaker.java index 4257320ddd08a..5e638035d87d5 100644 --- a/src/client/hadoop/ceph/CephFaker.java +++ b/src/client/hadoop/ceph/CephFaker.java @@ -1,4 +1,5 @@ // -*- mode:Java; tab-width:2; c-basic-offset:2; indent-tabs-mode:t -*- + /** * * Licensed under the Apache License, Version 2.0 @@ -20,6 +21,7 @@ package org.apache.hadoop.fs.ceph; + import java.net.URI; import java.util.Hashtable; import java.io.Closeable; @@ -38,443 +40,462 @@ import org.apache.hadoop.fs.FSDataOutputStream; import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.permission.FsPermission; + class CephFaker extends CephFS { - FileSystem localFS; - String localPrefix; - int blockSize; - Configuration conf; - Hashtable files; - Hashtable filenames; - int fileCount = 0; - boolean initialized = false; + FileSystem localFS; + String localPrefix; + int blockSize; + Configuration conf; + Hashtable files; + Hashtable filenames; + int fileCount = 0; + boolean initialized = false; - public CephFaker(Configuration con, Log log) { - super(con, log); - conf = con; - files = new Hashtable(); - filenames = new Hashtable(); - } + public CephFaker(Configuration con, Log log) { + super(con, log); + conf = con; + files = new Hashtable(); + filenames = new Hashtable(); + } - protected boolean ceph_initializeClient(String args, int block_size) { - if (!initialized) { - //let's remember the default block_size - blockSize = block_size; - /* for a real Ceph deployment, this starts up the client, - * sets debugging levels, etc. We just need to get the - * local FileSystem to use, and we'll ignore any - * command-line arguments. */ - try { - localFS = FileSystem.getLocal(conf); - localFS.initialize(URI.create("file://localhost"), conf); - localFS.setVerifyChecksum(false); - String testDir = conf.get("hadoop.tmp.dir"); - localPrefix = localFS.getWorkingDirectory().toString(); - int testDirLoc = localPrefix.indexOf(testDir) - 1; - if (-2 == testDirLoc) - testDirLoc = localPrefix.length(); - localPrefix = localPrefix.substring(0, testDirLoc) - + "/" + conf.get("hadoop.tmp.dir"); - - localFS.setWorkingDirectory(new Path(localPrefix - +"/user/" - + System.getProperty("user.name"))); - //I don't know why, but the unit tests expect the default - //working dir to be /user/username, so satisfy them! - //debug("localPrefix is " + localPrefix, INFO); - } - catch (IOException e) { - return false; - } - initialized = true; - } - return true; - } - - protected String ceph_getcwd() { - return sanitize_path(localFS.getWorkingDirectory().toString()); - } - - protected boolean ceph_setcwd(String path) { - localFS.setWorkingDirectory(new Path(prepare_path(path))); - return true; - } - - //the caller is responsible for ensuring empty dirs - protected boolean ceph_rmdir(String pth) { - Path path = new Path(prepare_path(pth)); - boolean ret = false; - try { - if (localFS.listStatus(path).length <= 1) { - ret = localFS.delete(path, true); - } - } - catch (IOException e){ } - return ret; - } - - //this needs to work on (empty) directories too - protected boolean ceph_unlink(String path) { - path = prepare_path(path); - boolean ret = false; - if (ceph_isdirectory(path)) { - ret = ceph_rmdir(path); - } - else { - try { - ret = localFS.delete(new Path(path), false); - } - catch (IOException e){ } - } - return ret; - } - - protected boolean ceph_rename(String oldName, String newName) { - oldName = prepare_path(oldName); - newName = prepare_path(newName); - try { - Path parent = new Path(newName).getParent(); - Path newPath = new Path(newName); - if (localFS.exists(parent) && !localFS.exists(newPath)) - return localFS.rename(new Path(oldName), newPath); - return false; - } - catch (IOException e) { return false; } - } - - protected boolean ceph_exists(String path) { - path = prepare_path(path); - boolean ret = false; - try { - ret = localFS.exists(new Path(path)); - } - catch (IOException e){ } - return ret; - } - - protected long ceph_getblocksize(String path) { - path = prepare_path(path); - try { - FileStatus status = localFS.getFileStatus(new Path(path)); - return status.getBlockSize(); - } - catch (FileNotFoundException e) { - return -CephFS.ENOENT; - } - catch (IOException e) { - return -1; //just fail generically - } - } - - protected boolean ceph_isdirectory(String path) { - path = prepare_path(path); - try { - FileStatus status = localFS.getFileStatus(new Path(path)); - return status.isDir(); - } - catch (IOException e) { return false; } - } - - protected boolean ceph_isfile(String path) { - path = prepare_path(path); - boolean ret = false; - try { - FileStatus status = localFS.getFileStatus(new Path(path)); - ret = !status.isDir(); - } - catch (Exception e) {} - return ret; - } - - protected String[] ceph_getdir(String path) { - path = prepare_path(path); - if (!ceph_isdirectory(path)) { - return null; - } - try { - FileStatus[] stats = localFS.listStatus(new Path(path)); - String[] names = new String[stats.length]; - String name; - for (int i=0; i * A {@link FileSystem} backed by Ceph.. @@ -68,7 +71,7 @@ public class CephFileSystem extends FileSystem { private final Path root; private boolean initialized = false; - private CephFS ceph = null; + private CephFS ceph = null; private boolean debug = false; private String fs_default_name; @@ -80,24 +83,26 @@ public class CephFileSystem extends FileSystem { root = new Path("/"); } - /** - * Used for testing purposes, this constructor - * sets the given CephFS instead of defaulting to a - * CephTalker (with its assumed real Ceph instance to talk to). - */ - public CephFileSystem(CephFS ceph_fs, String default_path) { - super(); - root = new Path("/"); - ceph = ceph_fs; - fs_default_name = default_path; - } + /** + * Used for testing purposes, this constructor + * sets the given CephFS instead of defaulting to a + * CephTalker (with its assumed real Ceph instance to talk to). + */ + public CephFileSystem(CephFS ceph_fs, String default_path) { + super(); + root = new Path("/"); + ceph = ceph_fs; + fs_default_name = default_path; + } /** * Lets you get the URI of this CephFileSystem. * @return the URI. */ public URI getUri() { - if (!initialized) return null; + if (!initialized) { + return null; + } ceph.debug("getUri:exit with return " + uri, ceph.DEBUG); return uri; } @@ -113,53 +118,56 @@ public class CephFileSystem extends FileSystem { */ @Override public void initialize(URI uri, Configuration conf) throws IOException { - if (!initialized) { - super.initialize(uri, conf); - setConf(conf); - this.uri = URI.create(uri.getScheme() + "://" + uri.getAuthority()); - statistics = getStatistics(uri.getScheme(), getClass()); - - if (ceph == null) { - ceph = new CephTalker(conf, LOG); - } - if (null == fs_default_name) { - fs_default_name = conf.get("fs.default.name"); - } - //build up the arguments for Ceph - String arguments = "CephFSInterface"; - arguments += conf.get("fs.ceph.commandLine", ""); - if (conf.get("fs.ceph.clientDebug") != null) { - arguments += " --debug_client "; - arguments += conf.get("fs.ceph.clientDebug"); - } - if (conf.get("fs.ceph.messengerDebug") != null) { - arguments += " --debug_ms "; - arguments += conf.get("fs.ceph.messengerDebug"); - } - if (conf.get("fs.ceph.monAddr") != null) { - arguments += " -m "; - arguments += conf.get("fs.ceph.monAddr"); - } - arguments += " --client-readahead-max-periods=" - + conf.get("fs.ceph.readahead", "1"); - //make sure they gave us a ceph monitor address or conf file - ceph.debug("initialize:Ceph initialization arguments: " + arguments, ceph.INFO); - if ( (conf.get("fs.ceph.monAddr") == null) && - (arguments.indexOf("-m") == -1) && - (arguments.indexOf("-c") == -1) ) { - ceph.debug("initialize:You need to specify a Ceph monitor address.", ceph.FATAL); - throw new IOException("You must specify a Ceph monitor address or config file!"); - } - // Initialize the client - if (!ceph.ceph_initializeClient(arguments, - conf.getInt("fs.ceph.blockSize", 1<<26))) { - ceph.debug("initialize:Ceph initialization failed!", ceph.FATAL); - throw new IOException("Ceph initialization failed!"); - } - initialized = true; - ceph.debug("initialize:Ceph initialized client. Setting cwd to /", ceph.INFO); - ceph.ceph_setcwd("/"); + if (initialized) { + return; + } + super.initialize(uri, conf); + setConf(conf); + this.uri = URI.create(uri.getScheme() + "://" + uri.getAuthority()); + statistics = getStatistics(uri.getScheme(), getClass()); + if (ceph == null) { + ceph = new CephTalker(conf, LOG); + } + if (null == fs_default_name) { + fs_default_name = conf.get("fs.default.name"); } + // build up the arguments for Ceph + String arguments = "CephFSInterface"; + + arguments += conf.get("fs.ceph.commandLine", ""); + if (conf.get("fs.ceph.clientDebug") != null) { + arguments += " --debug_client "; + arguments += conf.get("fs.ceph.clientDebug"); + } + if (conf.get("fs.ceph.messengerDebug") != null) { + arguments += " --debug_ms "; + arguments += conf.get("fs.ceph.messengerDebug"); + } + if (conf.get("fs.ceph.monAddr") != null) { + arguments += " -m "; + arguments += conf.get("fs.ceph.monAddr"); + } + arguments += " --client-readahead-max-periods=" + + conf.get("fs.ceph.readahead", "1"); + // make sure they gave us a ceph monitor address or conf file + ceph.debug("initialize:Ceph initialization arguments: " + arguments, + ceph.INFO); + if ((conf.get("fs.ceph.monAddr") == null) && (arguments.indexOf("-m") == -1) + && (arguments.indexOf("-c") == -1)) { + ceph.debug("initialize:You need to specify a Ceph monitor address.", + ceph.FATAL); + throw new IOException( + "You must specify a Ceph monitor address or config file!"); + } + // Initialize the client + if (!ceph.ceph_initializeClient(arguments, + conf.getInt("fs.ceph.blockSize", 1 << 26))) { + ceph.debug("initialize:Ceph initialization failed!", ceph.FATAL); + throw new IOException("Ceph initialization failed!"); + } + initialized = true; + ceph.debug("initialize:Ceph initialized client. Setting cwd to /", ceph.INFO); + ceph.ceph_setcwd("/"); ceph.debug("initialize:exit", ceph.DEBUG); } @@ -170,11 +178,14 @@ public class CephFileSystem extends FileSystem { */ @Override public void close() throws IOException { - if (!initialized) throw new IOException ("You have to initialize the " - +"CephFileSystem before calling other methods."); + if (!initialized) { + throw new IOException( + "You have to initialize the " + + "CephFileSystem before calling other methods."); + } ceph.debug("close:enter", ceph.DEBUG); - super.close();//this method does stuff, make sure it's run! - ceph.debug("close: Calling ceph_kill_client from Java", ceph.TRACE); + super.close(); // this method does stuff, make sure it's run! + ceph.debug("close: Calling ceph_kill_client from Java", ceph.TRACE); ceph.ceph_kill_client(); ceph.debug("close:exit", ceph.DEBUG); } @@ -188,23 +199,35 @@ public class CephFileSystem extends FileSystem { * @return An FSDataOutputStream that connects to the file on Ceph. * @throws IOException If initialize() hasn't been called or the file cannot be found or appended to. */ - public FSDataOutputStream append (Path file, int bufferSize, - Progressable progress) throws IOException { - if (!initialized) throw new IOException ("You have to initialize the " - +"CephFileSystem before calling other methods."); - ceph.debug("append:enter with path " + file + " bufferSize " + bufferSize, ceph.DEBUG); + public FSDataOutputStream append(Path file, int bufferSize, + Progressable progress) throws IOException { + if (!initialized) { + throw new IOException( + "You have to initialize the " + + "CephFileSystem before calling other methods."); + } + ceph.debug("append:enter with path " + file + " bufferSize " + bufferSize, + ceph.DEBUG); Path abs_path = makeAbsolute(file); - if (progress!=null) progress.progress(); - ceph.debug("append: Entering ceph_open_for_append from Java", ceph.TRACE); + + if (progress != null) { + progress.progress(); + } + ceph.debug("append: Entering ceph_open_for_append from Java", ceph.TRACE); int fd = ceph.ceph_open_for_append(abs_path.toString()); - ceph.debug("append: Returned to Java", ceph.TRACE); - if (progress!=null) progress.progress(); - if( fd < 0 ) { //error in open - throw new IOException("append: Open for append failed on path \"" + - abs_path.toString() + "\""); - } - CephOutputStream cephOStream = new CephOutputStream(getConf(), - ceph, fd, bufferSize); + + ceph.debug("append: Returned to Java", ceph.TRACE); + if (progress != null) { + progress.progress(); + } + if (fd < 0) { // error in open + throw new IOException( + "append: Open for append failed on path \"" + abs_path.toString() + + "\""); + } + CephOutputStream cephOStream = new CephOutputStream(getConf(), ceph, fd, + bufferSize); + ceph.debug("append:exit", ceph.DEBUG); return new FSDataOutputStream(cephOStream, statistics); } @@ -214,9 +237,12 @@ public class CephFileSystem extends FileSystem { * @return the directory Path */ public Path getWorkingDirectory() { - if (!initialized) return null; + if (!initialized) { + return null; + } ceph.debug("getWorkingDirectory:enter", ceph.DEBUG); - String cwd = ceph.ceph_getcwd(); + String cwd = ceph.ceph_getcwd(); + ceph.debug("getWorkingDirectory:exit with path " + cwd, ceph.DEBUG); return new Path(fs_default_name + ceph.ceph_getcwd()); } @@ -229,13 +255,20 @@ public class CephFileSystem extends FileSystem { * @param dir The directory to change to. */ @Override - public void setWorkingDirectory(Path dir) { - if (!initialized) return; + public void setWorkingDirectory(Path dir) { + if (!initialized) { + return; + } ceph.debug("setWorkingDirecty:enter with new working dir " + dir, ceph.DEBUG); Path abs_path = makeAbsolute(dir); + ceph.debug("setWorkingDirectory:calling ceph_setcwd from Java", ceph.TRACE); - if (!ceph.ceph_setcwd(abs_path.toString())) - ceph.debug("setWorkingDirectory: WARNING! ceph_setcwd failed for some reason on path " + abs_path, ceph.WARN); + if (!ceph.ceph_setcwd(abs_path.toString())) { + ceph.debug( + "setWorkingDirectory: WARNING! ceph_setcwd failed for some reason on path " + + abs_path, + ceph.WARN); + } ceph.debug("setWorkingDirectory:exit", ceph.DEBUG); } @@ -247,19 +280,23 @@ public class CephFileSystem extends FileSystem { * @throws IOException if initialize() hasn't been called. */ @Override - public boolean exists(Path path) throws IOException { - if (!initialized) throw new IOException ("You have to initialize the " - +"CephFileSystem before calling other methods."); + public boolean exists(Path path) throws IOException { + if (!initialized) { + throw new IOException( + "You have to initialize the " + + "CephFileSystem before calling other methods."); + } ceph.debug("exists:enter with path " + path, ceph.DEBUG); boolean result; Path abs_path = makeAbsolute(path); + if (abs_path.equals(root)) { result = true; - } - else { - ceph.debug("exists:Calling ceph_exists from Java on path " - + abs_path.toString(), ceph.TRACE); - result = ceph.ceph_exists(abs_path.toString()); + } else { + ceph.debug( + "exists:Calling ceph_exists from Java on path " + abs_path.toString(), + ceph.TRACE); + result = ceph.ceph_exists(abs_path.toString()); ceph.debug("exists:Returned from ceph_exists to Java", ceph.TRACE); } ceph.debug("exists:exit with value " + result, ceph.DEBUG); @@ -273,27 +310,33 @@ public class CephFileSystem extends FileSystem { * @param perms The permissions to apply to the created directories. * @return true if successful, false otherwise * @throws IOException if initialize() hasn't been called or the path - * is a child of a file. + * is a child of a file. */ - @Override + @Override public boolean mkdirs(Path path, FsPermission perms) throws IOException { - if (!initialized) throw new IOException ("You have to initialize the " - +"CephFileSystem before calling other methods."); + if (!initialized) { + throw new IOException( + "You have to initialize the " + + "CephFileSystem before calling other methods."); + } ceph.debug("mkdirs:enter with path " + path, ceph.DEBUG); Path abs_path = makeAbsolute(path); + ceph.debug("mkdirs:calling ceph_mkdirs from Java", ceph.TRACE); - int result = ceph.ceph_mkdirs(abs_path.toString(), (int)perms.toShort()); + int result = ceph.ceph_mkdirs(abs_path.toString(), (int) perms.toShort()); + if (result != 0) { - ceph.debug("mkdirs: make directory " + abs_path - + "Failing with result " + result, ceph.WARN); - if (ceph.ENOTDIR == result) - throw new FileAlreadyExistsException("Parent path is not a directory"); + ceph.debug( + "mkdirs: make directory " + abs_path + "Failing with result " + result, + ceph.WARN); + if (ceph.ENOTDIR == result) { + throw new FileAlreadyExistsException("Parent path is not a directory"); + } return false; - } - else { - ceph.debug("mkdirs:exiting succesfully", ceph.DEBUG); - return true; - } + } else { + ceph.debug("mkdirs:exiting succesfully", ceph.DEBUG); + return true; + } } /** @@ -304,17 +347,20 @@ public class CephFileSystem extends FileSystem { * @throws IOException if initialize() hasn't been called. */ @Override - public boolean isFile(Path path) throws IOException { - if (!initialized) throw new IOException ("You have to initialize the " - +"CephFileSystem before calling other methods."); + public boolean isFile(Path path) throws IOException { + if (!initialized) { + throw new IOException( + "You have to initialize the " + + "CephFileSystem before calling other methods."); + } ceph.debug("isFile:enter with path " + path, ceph.DEBUG); Path abs_path = makeAbsolute(path); boolean result; + if (abs_path.equals(root)) { - result = false; - } - else { - ceph.debug("isFile:entering ceph_isfile from Java", ceph.TRACE); + result = false; + } else { + ceph.debug("isFile:entering ceph_isfile from Java", ceph.TRACE); result = ceph.ceph_isfile(abs_path.toString()); } ceph.debug("isFile:exit with result " + result, ceph.DEBUG); @@ -329,16 +375,19 @@ public class CephFileSystem extends FileSystem { * @throws IOException if initialize() hasn't been called. */ @Override - public boolean isDirectory(Path path) throws IOException { - if (!initialized) throw new IOException ("You have to initialize the " - +"CephFileSystem before calling other methods."); + public boolean isDirectory(Path path) throws IOException { + if (!initialized) { + throw new IOException( + "You have to initialize the " + + "CephFileSystem before calling other methods."); + } ceph.debug("isDirectory:enter with path " + path, ceph.DEBUG); Path abs_path = makeAbsolute(path); boolean result; + if (abs_path.equals(root)) { result = true; - } - else { + } else { ceph.debug("calling ceph_isdirectory from Java", ceph.TRACE); result = ceph.ceph_isdirectory(abs_path.toString()); ceph.debug("Returned from ceph_isdirectory to Java", ceph.TRACE); @@ -356,29 +405,29 @@ public class CephFileSystem extends FileSystem { * @throws FileNotFoundException if the path could not be resolved. */ public FileStatus getFileStatus(Path path) throws IOException { - if (!initialized) throw new IOException ("You have to initialize the " - +"CephFileSystem before calling other methods."); + if (!initialized) { + throw new IOException( + "You have to initialize the " + + "CephFileSystem before calling other methods."); + } ceph.debug("getFileStatus:enter with path " + path, ceph.DEBUG); Path abs_path = makeAbsolute(path); - //sadly, Ceph doesn't really do uids/gids just yet, but - //everything else is filled + // sadly, Ceph doesn't really do uids/gids just yet, but + // everything else is filled FileStatus status; Stat lstat = new Stat(); - ceph.debug("getFileStatus: calling ceph_stat from Java", ceph.TRACE); - if(ceph.ceph_stat(abs_path.toString(), lstat)) { + + ceph.debug("getFileStatus: calling ceph_stat from Java", ceph.TRACE); + if (ceph.ceph_stat(abs_path.toString(), lstat)) { status = new FileStatus(lstat.size, lstat.is_dir, - ceph.ceph_replication(abs_path.toString()), - lstat.block_size, - lstat.mod_time, - lstat.access_time, - new FsPermission((short)lstat.mode), - null, - null, - new Path(fs_default_name+abs_path.toString())); - } - else { //fail out - throw new FileNotFoundException("org.apache.hadoop.fs.ceph.CephFileSystem: File " - + path + " does not exist or could not be accessed"); + ceph.ceph_replication(abs_path.toString()), lstat.block_size, + lstat.mod_time, lstat.access_time, + new FsPermission((short) lstat.mode), null, null, + new Path(fs_default_name + abs_path.toString())); + } else { // fail out + throw new FileNotFoundException( + "org.apache.hadoop.fs.ceph.CephFileSystem: File " + path + + " does not exist or could not be accessed"); } ceph.debug("getFileStatus:exit", ceph.DEBUG); @@ -394,34 +443,46 @@ public class CephFileSystem extends FileSystem { * @throws FileNotFoundException if the input path can't be found. */ public FileStatus[] listStatus(Path path) throws IOException { - if (!initialized) throw new IOException ("You have to initialize the " - +"CephFileSystem before calling other methods."); + if (!initialized) { + throw new IOException( + "You have to initialize the " + + "CephFileSystem before calling other methods."); + } ceph.debug("listStatus:enter with path " + path, ceph.WARN); Path abs_path = makeAbsolute(path); Path[] paths = listPaths(abs_path); + if (paths != null) { FileStatus[] statuses = new FileStatus[paths.length]; + for (int i = 0; i < paths.length; ++i) { - statuses[i] = getFileStatus(paths[i]); + statuses[i] = getFileStatus(paths[i]); } ceph.debug("listStatus:exit", ceph.DEBUG); return statuses; } - if (!isFile(path)) throw new FileNotFoundException(); //if we get here, listPaths returned null - //which means that the input wasn't a directory, so throw an Exception if it's not a file - return null; //or return null if it's a file + if (!isFile(path)) { + throw new FileNotFoundException(); + } // if we get here, listPaths returned null + // which means that the input wasn't a directory, so throw an Exception if it's not a file + return null; // or return null if it's a file } @Override public void setPermission(Path p, FsPermission permission) throws IOException { - if (!initialized) throw new IOException ("You have to initialize the " - +"CephFileSystem before calling other methods."); - ceph.debug("setPermission:enter with path " + p - + " and permissions " + permission, ceph.DEBUG); + if (!initialized) { + throw new IOException( + "You have to initialize the " + + "CephFileSystem before calling other methods."); + } + ceph.debug( + "setPermission:enter with path " + p + " and permissions " + permission, + ceph.DEBUG); Path abs_path = makeAbsolute(p); - ceph.debug("setPermission:calling ceph_setpermission from Java", ceph.TRACE); + + ceph.debug("setPermission:calling ceph_setpermission from Java", ceph.TRACE); ceph.ceph_setPermission(abs_path.toString(), permission.toShort()); - ceph.debug("setPermission:exit", ceph.DEBUG); + ceph.debug("setPermission:exit", ceph.DEBUG); } /** @@ -430,19 +491,28 @@ public class CephFileSystem extends FileSystem { * @param mtime Set modification time in number of millis since Jan 1, 1970. * @param atime Set access time in number of millis since Jan 1, 1970. */ - @Override - public void setTimes(Path p, long mtime, long atime) throws IOException { - if (!initialized) throw new IOException ("You have to initialize the " - +"CephFileSystem before calling other methods."); - ceph.debug("setTimes:enter with path " + p + " mtime:" + mtime + - " atime:" + atime, ceph.DEBUG); - Path abs_path = makeAbsolute(p); - ceph.debug("setTimes:calling ceph_setTimes from Java", ceph.TRACE); - int r = ceph.ceph_setTimes(abs_path.toString(), mtime, atime); - if (r<0) throw new IOException ("Failed to set times on path " - + abs_path.toString() + " Error code: " + r); - ceph.debug("setTimes:exit", ceph.DEBUG); - } + @Override + public void setTimes(Path p, long mtime, long atime) throws IOException { + if (!initialized) { + throw new IOException( + "You have to initialize the " + + "CephFileSystem before calling other methods."); + } + ceph.debug( + "setTimes:enter with path " + p + " mtime:" + mtime + " atime:" + atime, + ceph.DEBUG); + Path abs_path = makeAbsolute(p); + + ceph.debug("setTimes:calling ceph_setTimes from Java", ceph.TRACE); + int r = ceph.ceph_setTimes(abs_path.toString(), mtime, atime); + + if (r < 0) { + throw new IOException( + "Failed to set times on path " + abs_path.toString() + " Error code: " + + r); + } + ceph.debug("setTimes:exit", ceph.DEBUG); + } /** * Create a new file and open an FSDataOutputStream that's connected to it. @@ -451,7 +521,7 @@ public class CephFileSystem extends FileSystem { * @param flag If CreateFlag.OVERWRITE, overwrite any existing * file with this name; otherwise don't. * @param bufferSize Ceph does internal buffering, but you can buffer - * in the Java code too if you like. + * in the Java code too if you like. * @param replication Ignored by Ceph. This can be * configured via Ceph configuration. * @param blockSize Ignored by Ceph. You can set client-wide block sizes @@ -464,19 +534,24 @@ public class CephFileSystem extends FileSystem { * failure in attempting to open for append with Ceph. */ public FSDataOutputStream create(Path path, - FsPermission permission, - EnumSet flag, - //boolean overwrite, - int bufferSize, - short replication, - long blockSize, - Progressable progress - ) throws IOException { - if (!initialized) throw new IOException ("You have to initialize the " - +"CephFileSystem before calling other methods."); + FsPermission permission, + EnumSet flag, + // boolean overwrite, + int bufferSize, + short replication, + long blockSize, + Progressable progress) throws IOException { + if (!initialized) { + throw new IOException( + "You have to initialize the " + + "CephFileSystem before calling other methods."); + } ceph.debug("create:enter with path " + path, ceph.DEBUG); Path abs_path = makeAbsolute(path); - if (progress!=null) progress.progress(); + + if (progress != null) { + progress.progress(); + } // We ignore replication since that's not configurable here, and // progress reporting is quite limited. // Required semantics: if the file exists, overwrite if CreateFlag.OVERWRITE; @@ -484,90 +559,118 @@ public class CephFileSystem extends FileSystem { // Step 1: existence test boolean exists = exists(abs_path); + if (exists) { - if(isDirectory(abs_path)) - throw new IOException("create: Cannot overwrite existing directory \"" - + path.toString() + "\" with a file"); - //if (!overwrite) - if (!flag.contains(CreateFlag.OVERWRITE)) - throw new IOException("createRaw: Cannot open existing file \"" - + abs_path.toString() - + "\" for writing without overwrite flag"); + if (isDirectory(abs_path)) { + throw new IOException( + "create: Cannot overwrite existing directory \"" + path.toString() + + "\" with a file"); + } + // if (!overwrite) + if (!flag.contains(CreateFlag.OVERWRITE)) { + throw new IOException( + "createRaw: Cannot open existing file \"" + abs_path.toString() + + "\" for writing without overwrite flag"); + } } - if (progress!=null) progress.progress(); + if (progress != null) { + progress.progress(); + } // Step 2: create any nonexistent directories in the path if (!exists) { Path parent = abs_path.getParent(); + if (parent != null) { // if parent is root, we're done - int r = ceph.ceph_mkdirs(parent.toString(), permission.toShort()); - if (!(r==0 || r==-ceph.EEXIST)) - throw new IOException ("Error creating parent directory; code: " + r); + int r = ceph.ceph_mkdirs(parent.toString(), permission.toShort()); + + if (!(r == 0 || r == -ceph.EEXIST)) { + throw new IOException("Error creating parent directory; code: " + r); + } + } + if (progress != null) { + progress.progress(); } - if (progress!=null) progress.progress(); } // Step 3: open the file ceph.debug("calling ceph_open_for_overwrite from Java", ceph.TRACE); - int fh = ceph.ceph_open_for_overwrite(abs_path.toString(), (int)permission.toShort()); - if (progress!=null) progress.progress(); - ceph.debug("Returned from ceph_open_for_overwrite to Java with fh " + fh, ceph.TRACE); + int fh = ceph.ceph_open_for_overwrite(abs_path.toString(), + (int) permission.toShort()); + + if (progress != null) { + progress.progress(); + } + ceph.debug("Returned from ceph_open_for_overwrite to Java with fh " + fh, + ceph.TRACE); if (fh < 0) { - throw new IOException("create: Open for overwrite failed on path \"" + - path.toString() + "\""); + throw new IOException( + "create: Open for overwrite failed on path \"" + path.toString() + + "\""); } - + // Step 4: create the stream - OutputStream cephOStream = new CephOutputStream(getConf(), - ceph, fh, bufferSize); + OutputStream cephOStream = new CephOutputStream(getConf(), ceph, fh, + bufferSize); + ceph.debug("create:exit", ceph.DEBUG); return new FSDataOutputStream(cephOStream, statistics); - } + } /** * Open a Ceph file and attach the file handle to an FSDataInputStream. * @param path The file to open * @param bufferSize Ceph does internal buffering; but you can buffer in - * the Java code too if you like. + * the Java code too if you like. * @return FSDataInputStream reading from the given path. * @throws IOException if initialize() hasn't been called, the path DNE or is a * directory, or there is an error getting data to set up the FSDataInputStream. */ public FSDataInputStream open(Path path, int bufferSize) throws IOException { - if (!initialized) throw new IOException ("You have to initialize the " - +"CephFileSystem before calling other methods."); + if (!initialized) { + throw new IOException( + "You have to initialize the " + + "CephFileSystem before calling other methods."); + } ceph.debug("open:enter with path " + path, ceph.DEBUG); Path abs_path = makeAbsolute(path); - + int fh = ceph.ceph_open_for_read(abs_path.toString()); - if (fh < 0) { //uh-oh, something's bad! - if (fh == -ceph.ENOENT) //well that was a stupid open - throw new IOException("open: absolute path \"" + abs_path.toString() - + "\" does not exist"); - else //hrm...the file exists but we can't open it :( - throw new IOException("open: Failed to open file " + abs_path.toString()); + + if (fh < 0) { // uh-oh, something's bad! + if (fh == -ceph.ENOENT) { // well that was a stupid open + throw new IOException( + "open: absolute path \"" + abs_path.toString() + + "\" does not exist"); + } else { // hrm...the file exists but we can't open it :( + throw new IOException("open: Failed to open file " + abs_path.toString()); + } } - if(isDirectory(abs_path)) { //yes, it is possible to open Ceph directories - //but that doesn't mean you should in Hadoop! + if (isDirectory(abs_path)) { // yes, it is possible to open Ceph directories + // but that doesn't mean you should in Hadoop! ceph.ceph_close(fh); - throw new IOException("open: absolute path \"" + abs_path.toString() - + "\" is a directory!"); + throw new IOException( + "open: absolute path \"" + abs_path.toString() + "\" is a directory!"); } Stat lstat = new Stat(); - ceph.debug("open:calling ceph_stat from Java", ceph.TRACE); + + ceph.debug("open:calling ceph_stat from Java", ceph.TRACE); ceph.ceph_stat(abs_path.toString(), lstat); - ceph.debug("open:returned to Java", ceph.TRACE); + ceph.debug("open:returned to Java", ceph.TRACE); long size = lstat.size; + if (size < 0) { - throw new IOException("Failed to get file size for file " + abs_path.toString() + - " but succeeded in opening file. Something bizarre is going on."); + throw new IOException( + "Failed to get file size for file " + abs_path.toString() + + " but succeeded in opening file. Something bizarre is going on."); } - FSInputStream cephIStream = new CephInputStream(getConf(), ceph, - fh, size, bufferSize); + FSInputStream cephIStream = new CephInputStream(getConf(), ceph, fh, size, + bufferSize); + ceph.debug("open:exit", ceph.DEBUG); return new FSDataInputStream(cephIStream); - } + } /** * Rename a file or directory. @@ -577,24 +680,31 @@ public class CephFileSystem extends FileSystem { * @throws IOException if initialize() hasn't been called. */ @Override - public boolean rename(Path src, Path dst) throws IOException { - if (!initialized) throw new IOException ("You have to initialize the " - +"CephFileSystem before calling other methods."); + public boolean rename(Path src, Path dst) throws IOException { + if (!initialized) { + throw new IOException( + "You have to initialize the " + + "CephFileSystem before calling other methods."); + } ceph.debug("rename:enter with src:" + src + " and dest:" + dst, ceph.DEBUG); Path abs_src = makeAbsolute(src); Path abs_dst = makeAbsolute(dst); + ceph.debug("calling ceph_rename from Java", ceph.TRACE); boolean result = ceph.ceph_rename(abs_src.toString(), abs_dst.toString()); - if (!result) { - if (isDirectory(abs_dst)) { //move the srcdir into destdir - ceph.debug("ceph_rename failed but dst is a directory!", ceph.NOLOG); - Path new_dst = new Path(abs_dst, abs_src.getName()); - result = rename(abs_src, new_dst); - ceph.debug("attempt to move " + abs_src.toString() - + " to " + new_dst.toString() - + "has result:" + result, ceph.NOLOG); - } - } + + if (!result) { + if (isDirectory(abs_dst)) { // move the srcdir into destdir + ceph.debug("ceph_rename failed but dst is a directory!", ceph.NOLOG); + Path new_dst = new Path(abs_dst, abs_src.getName()); + + result = rename(abs_src, new_dst); + ceph.debug( + "attempt to move " + abs_src.toString() + " to " + + new_dst.toString() + "has result:" + result, + ceph.NOLOG); + } + } ceph.debug("rename:exit with result: " + result, ceph.DEBUG); return result; } @@ -614,51 +724,70 @@ public class CephFileSystem extends FileSystem { * @throws IOException if initialize() hasn't been called. */ @Override - public BlockLocation[] getFileBlockLocations(FileStatus file, - long start, long len) throws IOException { - if (!initialized) throw new IOException ("You have to initialize the " - +"CephFileSystem before calling other methods."); - ceph.debug("getFileBlockLocations:enter with path " + file.getPath() + - ", start pos " + start + ", length " + len, ceph.DEBUG); - //sanitize and get the filehandle + public BlockLocation[] getFileBlockLocations(FileStatus file, long start, long len) throws IOException { + if (!initialized) { + throw new IOException( + "You have to initialize the " + + "CephFileSystem before calling other methods."); + } + ceph.debug( + "getFileBlockLocations:enter with path " + file.getPath() + + ", start pos " + start + ", length " + len, + ceph.DEBUG); + // sanitize and get the filehandle Path abs_path = makeAbsolute(file.getPath()); - ceph.debug("getFileBlockLocations:call ceph_open_for_read from Java", ceph.TRACE); + + ceph.debug("getFileBlockLocations:call ceph_open_for_read from Java", + ceph.TRACE); int fh = ceph.ceph_open_for_read(abs_path.toString()); - ceph.debug("getFileBlockLocations:return from ceph_open_for_read to Java with fh " - + fh, ceph.TRACE); + + ceph.debug( + "getFileBlockLocations:return from ceph_open_for_read to Java with fh " + + fh, + ceph.TRACE); if (fh < 0) { - ceph.debug("getFileBlockLocations:got error " + fh + - ", exiting and returning null!", ceph.ERROR); + ceph.debug( + "getFileBlockLocations:got error " + fh + + ", exiting and returning null!", + ceph.ERROR); return null; } - //get the block size - ceph.debug("getFileBlockLocations:call ceph_getblocksize from Java", ceph.TRACE); + // get the block size + ceph.debug("getFileBlockLocations:call ceph_getblocksize from Java", + ceph.TRACE); long blockSize = ceph.ceph_getblocksize(abs_path.toString()); - ceph.debug("getFileBlockLocations:return from ceph_getblocksize", ceph.TRACE); - BlockLocation[] locations = - new BlockLocation[(int)Math.ceil(len/(float)blockSize)]; - long offset; + + ceph.debug("getFileBlockLocations:return from ceph_getblocksize", ceph.TRACE); + BlockLocation[] locations = new BlockLocation[(int) Math.ceil(len / (float) blockSize)]; + long offset; + for (int i = 0; i < locations.length; ++i) { - offset = start + i*blockSize; - ceph.debug("getFileBlockLocations:call ceph_hosts from Java on fh " - + fh + " and offset " + offset, ceph.TRACE); + offset = start + i * blockSize; + ceph.debug( + "getFileBlockLocations:call ceph_hosts from Java on fh " + fh + + " and offset " + offset, + ceph.TRACE); String host = ceph.ceph_hosts(fh, offset); - ceph.debug("getFileBlockLocations:return from ceph_hosts to Java with host " - + host, ceph.TRACE); + + ceph.debug( + "getFileBlockLocations:return from ceph_hosts to Java with host " + + host, + ceph.TRACE); String[] hostArray = new String[1]; + hostArray[0] = host; locations[i] = new BlockLocation(hostArray, hostArray, - start+i*blockSize-(start % blockSize), - blockSize); + start + i * blockSize - (start % blockSize), blockSize); } - ceph.debug("getFileBlockLocations:call ceph_close from Java on fh " - + fh, ceph.TRACE); + ceph.debug("getFileBlockLocations:call ceph_close from Java on fh " + fh, + ceph.TRACE); ceph.ceph_close(fh); - ceph.debug("getFileBlockLocations:return with " + locations.length - + " locations", ceph.DEBUG); + ceph.debug( + "getFileBlockLocations:return with " + locations.length + " locations", + ceph.DEBUG); return locations; } - + /** * Get usage statistics on the Ceph filesystem. * @param path A path to the partition you're interested in. @@ -668,22 +797,29 @@ public class CephFileSystem extends FileSystem { * stat somehow fails. */ @Override - public FsStatus getStatus (Path path) throws IOException { - if (!initialized) throw new IOException("You have to initialize the " - + " CephFileSystem before calling other methods."); + public FsStatus getStatus(Path path) throws IOException { + if (!initialized) { + throw new IOException( + "You have to initialize the " + + "CephFileSystem before calling other methods."); + } ceph.debug("getStatus:enter with path " + path, ceph.DEBUG); Path abs_path = makeAbsolute(path); - - //currently(Ceph .16) Ceph actually ignores the path - //but we still pass it in; if Ceph stops ignoring we may need more - //error-checking code. + + // currently(Ceph .16) Ceph actually ignores the path + // but we still pass it in; if Ceph stops ignoring we may need more + // error-checking code. CephStat ceph_stat = new CephStat(); - ceph.debug("getStatus:calling ceph_statfs from Java", ceph.TRACE); + + ceph.debug("getStatus:calling ceph_statfs from Java", ceph.TRACE); int result = ceph.ceph_statfs(abs_path.toString(), ceph_stat); - if (result!=0) throw new IOException("Somehow failed to statfs the Ceph filesystem. Error code: " + result); + + if (result != 0) { + throw new IOException( + "Somehow failed to statfs the Ceph filesystem. Error code: " + result); + } ceph.debug("getStatus:exit successfully", ceph.DEBUG); - return new FsStatus(ceph_stat.capacity, - ceph_stat.used, ceph_stat.remaining); + return new FsStatus(ceph_stat.capacity, ceph_stat.used, ceph_stat.remaining); } /** @@ -698,58 +834,74 @@ public class CephFileSystem extends FileSystem { * or you attempt to delete the root directory. */ public boolean delete(Path path, boolean recursive) throws IOException { - if (!initialized) throw new IOException ("You have to initialize the " - +"CephFileSystem before calling other methods."); + if (!initialized) { + throw new IOException( + "You have to initialize the " + + "CephFileSystem before calling other methods."); + } ceph.debug("delete:enter with path " + path + " and recursive=" + recursive, - ceph.DEBUG); + ceph.DEBUG); Path abs_path = makeAbsolute(path); - + // sanity check - if (abs_path.equals(root)) + if (abs_path.equals(root)) { throw new IOException("Error: deleting the root directory is a Bad Idea."); - if (!exists(abs_path)) return false; + } + if (!exists(abs_path)) { + return false; + } // if the path is a file, try to delete it. if (isFile(abs_path)) { - ceph.debug("delete:calling ceph_unlink from Java with path " + abs_path, - ceph.TRACE); + ceph.debug("delete:calling ceph_unlink from Java with path " + abs_path, + ceph.TRACE); boolean result = ceph.ceph_unlink(abs_path.toString()); - if(!result) - ceph.debug("delete: failed to delete file \"" + - abs_path.toString() + "\".", ceph.ERROR); + + if (!result) { + ceph.debug( + "delete: failed to delete file \"" + abs_path.toString() + "\".", + ceph.ERROR); + } ceph.debug("delete:exit with success=" + result, ceph.DEBUG); return result; } /* The path is a directory, so recursively try to delete its contents, - and then delete the directory. */ - //get the entries; listPaths will remove . and .. for us + and then delete the directory. */ + // get the entries; listPaths will remove . and .. for us Path[] contents = listPaths(abs_path); + if (contents == null) { - ceph.debug("delete: Failed to read contents of directory \"" + - abs_path.toString() + - "\" while trying to delete it, BAILING", ceph.ERROR); + ceph.debug( + "delete: Failed to read contents of directory \"" + + abs_path.toString() + "\" while trying to delete it, BAILING", + ceph.ERROR); return false; } if (!recursive && contents.length > 0) { throw new IOException("Directories must be deleted recursively!"); } // delete the entries - ceph.debug("delete: recursively calling delete on contents of " - + abs_path, ceph.DEBUG); + ceph.debug("delete: recursively calling delete on contents of " + abs_path, + ceph.DEBUG); for (Path p : contents) { if (!delete(p, true)) { - ceph.debug("delete: Failed to delete file \"" + - p.toString() + "\" while recursively deleting \"" - + abs_path.toString() + "\", BAILING", ceph.ERROR ); - return false; + ceph.debug( + "delete: Failed to delete file \"" + p.toString() + + "\" while recursively deleting \"" + abs_path.toString() + + "\", BAILING", + ceph.ERROR); + return false; } } - //if we've come this far it's a now-empty directory, so delete it! + // if we've come this far it's a now-empty directory, so delete it! boolean result = ceph.ceph_rmdir(abs_path.toString()); - if (!result) - ceph.debug("delete: failed to delete \"" + abs_path.toString() - + "\", BAILING", ceph.ERROR); + + if (!result) { + ceph.debug( + "delete: failed to delete \"" + abs_path.toString() + "\", BAILING", + ceph.ERROR); + } ceph.debug("delete:exit", ceph.DEBUG); return result; } @@ -760,7 +912,7 @@ public class CephFileSystem extends FileSystem { * by a separate Ceph configuration. */ @Override - public short getDefaultReplication() { + public short getDefaultReplication() { return 1; } @@ -769,18 +921,22 @@ public class CephFileSystem extends FileSystem { * @return the default block size, in bytes, as a long. */ @Override - public long getDefaultBlockSize() { - return getConf().getInt("fs.ceph.blockSize", 1<<26); + public long getDefaultBlockSize() { + return getConf().getInt("fs.ceph.blockSize", 1 << 26); } // Makes a Path absolute. In a cheap, dirty hack, we're - // also going to strip off any fs_default_name prefix we see. + // also going to strip off any fs_default_name prefix we see. private Path makeAbsolute(Path path) { ceph.debug("makeAbsolute:enter with path " + path, ceph.NOLOG); - if (path == null) return new Path("/"); + if (path == null) { + return new Path("/"); + } // first, check for the prefix - if (path.toString().startsWith(fs_default_name)) { - Path stripped_path = new Path(path.toString().substring(fs_default_name.length())); + if (path.toString().startsWith(fs_default_name)) { + Path stripped_path = new Path( + path.toString().substring(fs_default_name.length())); + ceph.debug("makeAbsolute:exit with path " + stripped_path, ceph.NOLOG); return stripped_path; } @@ -790,10 +946,11 @@ public class CephFileSystem extends FileSystem { return path; } Path new_path = new Path(ceph.ceph_getcwd(), path); + ceph.debug("makeAbsolute:exit with path " + new_path, ceph.NOLOG); return new_path; } - + private Path[] listPaths(Path path) throws IOException { ceph.debug("listPaths:enter with path " + path, ceph.NOLOG); String dirlist[]; @@ -808,25 +965,28 @@ public class CephFileSystem extends FileSystem { if (dirlist == null) { return null; } - + // convert the strings to Paths Path[] paths = new Path[dirlist.length]; + for (int i = 0; i < dirlist.length; ++i) { - ceph.debug("Raw enumeration of paths in \"" + abs_path.toString() + "\": \"" + - dirlist[i] + "\"", ceph.TRACE); + ceph.debug( + "Raw enumeration of paths in \"" + abs_path.toString() + "\": \"" + + dirlist[i] + "\"", + ceph.TRACE); // convert each listing to an absolute path Path raw_path = new Path(dirlist[i]); - if (raw_path.isAbsolute()) - paths[i] = raw_path; - else - paths[i] = new Path(abs_path, raw_path); + + if (raw_path.isAbsolute()) { + paths[i] = raw_path; + } else { + paths[i] = new Path(abs_path, raw_path); + } } ceph.debug("listPaths:exit", ceph.NOLOG); return paths; } - - static class Stat { public long size; public boolean is_dir; @@ -835,9 +995,10 @@ public class CephFileSystem extends FileSystem { public long access_time; public int mode; - public Stat(){} + public Stat() {} } + static class CephStat { public long capacity; public long used; diff --git a/src/client/hadoop/ceph/CephInputStream.java b/src/client/hadoop/ceph/CephInputStream.java index a6684576d2ff2..6f350efbc575c 100644 --- a/src/client/hadoop/ceph/CephInputStream.java +++ b/src/client/hadoop/ceph/CephInputStream.java @@ -1,4 +1,5 @@ // -*- mode:Java; tab-width:2; c-basic-offset:2; indent-tabs-mode:t -*- + /** * * Licensed under the Apache License, Version 2.0 @@ -19,6 +20,7 @@ */ package org.apache.hadoop.fs.ceph; + import java.io.IOException; import org.apache.hadoop.conf.Configuration; @@ -38,12 +40,12 @@ public class CephInputStream extends FSInputStream { private long fileLength; - private CephFS ceph; + private CephFS ceph; - private byte[] buffer; - private int bufPos = 0; - private int bufValid = 0; - private long cephPos = 0; + private byte[] buffer; + private int bufPos = 0; + private int bufValid = 0; + private long cephPos = 0; /** * Create a new CephInputStream. @@ -53,75 +55,84 @@ public class CephInputStream extends FSInputStream { * you will need to close and re-open it to access the new data. */ public CephInputStream(Configuration conf, CephFS cephfs, - int fh, long flength, int bufferSize) { + int fh, long flength, int bufferSize) { // Whoever's calling the constructor is responsible for doing the actual ceph_open // call and providing the file handle. fileLength = flength; fileHandle = fh; closed = false; - ceph = cephfs; - buffer = new byte[bufferSize]; - ceph.debug("CephInputStream constructor: initializing stream with fh " - + fh + " and file length " + flength, ceph.DEBUG); + ceph = cephfs; + buffer = new byte[bufferSize]; + ceph.debug( + "CephInputStream constructor: initializing stream with fh " + fh + + " and file length " + flength, + ceph.DEBUG); } + /** Ceph likes things to be closed before it shuts down, * so closing the IOStream stuff voluntarily in a finalizer is good */ - protected void finalize () throws Throwable { + protected void finalize() throws Throwable { try { - if (!closed) close(); + if (!closed) { + close(); + } + } finally { + super.finalize(); + } + } + + private synchronized boolean fillBuffer() throws IOException { + bufValid = ceph.ceph_read(fileHandle, buffer, 0, buffer.length); + bufPos = 0; + if (bufValid < 0) { + int err = bufValid; + + bufValid = 0; + // attempt to reset to old position. If it fails, too bad. + ceph.ceph_seek_from_start(fileHandle, cephPos); + throw new IOException("Failed to fill read buffer! Error code:" + err); } - finally { super.finalize(); } + cephPos += bufValid; + return (bufValid != 0); } - private synchronized boolean fillBuffer() throws IOException { - bufValid = ceph.ceph_read(fileHandle, buffer, 0, buffer.length); - bufPos = 0; - if (bufValid < 0) { - int err = bufValid; - bufValid = 0; - //attempt to reset to old position. If it fails, too bad. - ceph.ceph_seek_from_start(fileHandle, cephPos); - throw new IOException("Failed to fill read buffer! Error code:" - + err); - } - cephPos += bufValid; - return (bufValid != 0); - } - - /* - * Get the current position of the stream. - */ + /* + * Get the current position of the stream. + */ public synchronized long getPos() throws IOException { - return cephPos - bufValid + bufPos; + return cephPos - bufValid + bufPos; } /** * Find the number of bytes remaining in the file. */ @Override - public synchronized int available() throws IOException { - return (int) (fileLength - getPos()); - } + public synchronized int available() throws IOException { + return (int) (fileLength - getPos()); + } public synchronized void seek(long targetPos) throws IOException { - ceph.debug("CephInputStream.seek: Seeking to position " + targetPos + - " on fd " + fileHandle, ceph.TRACE); + ceph.debug( + "CephInputStream.seek: Seeking to position " + targetPos + " on fd " + + fileHandle, + ceph.TRACE); if (targetPos > fileLength) { - throw new IOException("CephInputStream.seek: failed seek to position " - + targetPos + " on fd " + fileHandle - + ": Cannot seek after EOF " + fileLength); + throw new IOException( + "CephInputStream.seek: failed seek to position " + targetPos + + " on fd " + fileHandle + ": Cannot seek after EOF " + fileLength); + } + long oldPos = cephPos; + + cephPos = ceph.ceph_seek_from_start(fileHandle, targetPos); + bufValid = 0; + bufPos = 0; + if (cephPos < 0) { + cephPos = oldPos; + throw new IOException("Ceph failed to seek to new position!"); } - long oldPos = cephPos; - cephPos = ceph.ceph_seek_from_start(fileHandle, targetPos); - bufValid = 0; - bufPos = 0; - if (cephPos < 0) { - cephPos = oldPos; - throw new IOException ("Ceph failed to seek to new position!"); - } - } + } /** * Failovers are handled by the Ceph code at a very low level; @@ -133,22 +144,31 @@ public class CephInputStream extends FSInputStream { return false; } - /** * Read a byte from the file. * @return the next byte. */ @Override - public synchronized int read() throws IOException { - ceph.debug("CephInputStream.read: Reading a single byte from fd " + fileHandle - + " by calling general read function", ceph.TRACE); - - byte result[] = new byte[1]; - if (getPos() >= fileLength) return -1; - if (-1 == read(result, 0, 1)) return -1; - if (result[0]<0) return 256+(int)result[0]; - else return result[0]; + public synchronized int read() throws IOException { + ceph.debug( + "CephInputStream.read: Reading a single byte from fd " + fileHandle + + " by calling general read function", + ceph.TRACE); + + byte result[] = new byte[1]; + + if (getPos() >= fileLength) { + return -1; + } + if (-1 == read(result, 0, 1)) { + return -1; } + if (result[0] < 0) { + return 256 + (int) result[0]; + } else { + return result[0]; + } + } /** * Read a specified number of bytes from the file into a byte[]. @@ -156,80 +176,83 @@ public class CephInputStream extends FSInputStream { * @param off the offset to start at in the file * @param len the number of bytes to read * @return 0 if successful, otherwise an error code. - * @throws IOException on bad input. + * @throws IOException on bad input. */ @Override - public synchronized int read(byte buf[], int off, int len) - throws IOException { - ceph.debug("CephInputStream.read: Reading " + len + - " bytes from fd " + fileHandle, ceph.TRACE); + public synchronized int read(byte buf[], int off, int len) + throws IOException { + ceph.debug( + "CephInputStream.read: Reading " + len + " bytes from fd " + fileHandle, + ceph.TRACE); - if (closed) { - throw new IOException("CephInputStream.read: cannot read " + len + - " bytes from fd " + fileHandle + - ": stream closed"); - } + if (closed) { + throw new IOException( + "CephInputStream.read: cannot read " + len + " bytes from fd " + + fileHandle + ": stream closed"); + } - // ensure we're not past the end of the file - if (getPos() >= fileLength) { - ceph.debug("CephInputStream.read: cannot read " + len + - " bytes from fd " + fileHandle + ": current position is " - + getPos() + " and file length is " + fileLength, - ceph.DEBUG); + // ensure we're not past the end of the file + if (getPos() >= fileLength) { + ceph.debug( + "CephInputStream.read: cannot read " + len + " bytes from fd " + + fileHandle + ": current position is " + getPos() + + " and file length is " + fileLength, + ceph.DEBUG); - return -1; - } - - int totalRead = 0; - int initialLen = len; - int read; - do { - read = Math.min(len, bufValid - bufPos); - try { - System.arraycopy(buffer, bufPos, buf, off, read); - } - catch(IndexOutOfBoundsException ie) { - throw new IOException("CephInputStream.read: Indices out of bounds:" - + "read length is " + len - + ", buffer offset is " + off - + ", and buffer size is " + buf.length); - } - catch (ArrayStoreException ae) { - throw new IOException("Uh-oh, CephInputStream failed to do an array" - + "copy due to type mismatch..."); - } - catch (NullPointerException ne) { - throw new IOException("CephInputStream.read: cannot read " - + len + "bytes from fd:" + fileHandle - + ": buf is null"); - } - bufPos += read; - len -= read; - off += read; - totalRead += read; - } while (len > 0 && fillBuffer()); - - ceph.debug("CephInputStream.read: Reading " + initialLen - + " bytes from fd " + fileHandle - + ": succeeded in reading " + totalRead + " bytes", - ceph.TRACE); - return totalRead; - } + return -1; + } + + int totalRead = 0; + int initialLen = len; + int read; + + do { + read = Math.min(len, bufValid - bufPos); + try { + System.arraycopy(buffer, bufPos, buf, off, read); + } catch (IndexOutOfBoundsException ie) { + throw new IOException( + "CephInputStream.read: Indices out of bounds:" + "read length is " + + len + ", buffer offset is " + off + ", and buffer size is " + + buf.length); + } catch (ArrayStoreException ae) { + throw new IOException( + "Uh-oh, CephInputStream failed to do an array" + + "copy due to type mismatch..."); + } catch (NullPointerException ne) { + throw new IOException( + "CephInputStream.read: cannot read " + len + "bytes from fd:" + + fileHandle + ": buf is null"); + } + bufPos += read; + len -= read; + off += read; + totalRead += read; + } while (len > 0 && fillBuffer()); + + ceph.debug( + "CephInputStream.read: Reading " + initialLen + " bytes from fd " + + fileHandle + ": succeeded in reading " + totalRead + " bytes", + ceph.TRACE); + return totalRead; + } /** * Close the CephInputStream and release the associated filehandle. */ @Override - public void close() throws IOException { + public void close() throws IOException { ceph.debug("CephOutputStream.close:enter", ceph.TRACE); if (!closed) { - int result = ceph.ceph_close(fileHandle); - closed = true; - if (result != 0) { - throw new IOException("Close somehow failed!" - + "Don't try and use this stream again, though"); - } - ceph.debug("CephOutputStream.close:exit", ceph.TRACE); - } - } + int result = ceph.ceph_close(fileHandle); + + closed = true; + if (result != 0) { + throw new IOException( + "Close somehow failed!" + + "Don't try and use this stream again, though"); + } + ceph.debug("CephOutputStream.close:exit", ceph.TRACE); + } + } } diff --git a/src/client/hadoop/ceph/CephOutputStream.java b/src/client/hadoop/ceph/CephOutputStream.java index 0ca1d9771710d..03ea3b9cc8b64 100644 --- a/src/client/hadoop/ceph/CephOutputStream.java +++ b/src/client/hadoop/ceph/CephOutputStream.java @@ -1,4 +1,5 @@ // -*- mode:Java; tab-width:2; c-basic-offset:2; indent-tabs-mode:t -*- + /** * * Licensed under the Apache License, Version 2.0 @@ -20,12 +21,14 @@ package org.apache.hadoop.fs.ceph; + import java.io.IOException; import java.io.OutputStream; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.util.Progressable; + /** *

* An {@link OutputStream} for a CephFileSystem and corresponding @@ -35,12 +38,12 @@ public class CephOutputStream extends OutputStream { private boolean closed; - private CephFS ceph; + private CephFS ceph; private int fileHandle; - private byte[] buffer; - private int bufUsed = 0; + private byte[] buffer; + private int bufUsed = 0; /** * Construct the CephOutputStream. @@ -48,21 +51,24 @@ public class CephOutputStream extends OutputStream { * @param fh The Ceph filehandle to connect to. */ public CephOutputStream(Configuration conf, CephFS cephfs, - int fh, int bufferSize) { - ceph = cephfs; + int fh, int bufferSize) { + ceph = cephfs; fileHandle = fh; closed = false; - buffer = new byte[bufferSize]; + buffer = new byte[bufferSize]; } - /**Ceph likes things to be closed before it shuts down, + /** Ceph likes things to be closed before it shuts down, *so closing the IOStream stuff voluntarily is good */ - protected void finalize () throws Throwable { + protected void finalize() throws Throwable { try { - if (!closed) close(); + if (!closed) { + close(); + } + } finally { + super.finalize(); } - finally { super.finalize();} } /** @@ -80,20 +86,23 @@ public class CephOutputStream extends OutputStream { * write fails. */ @Override - public synchronized void write(int b) throws IOException { - ceph.debug("CephOutputStream.write: writing a single byte to fd " - + fileHandle, ceph.TRACE); + public synchronized void write(int b) throws IOException { + ceph.debug( + "CephOutputStream.write: writing a single byte to fd " + fileHandle, + ceph.TRACE); - if (closed) { - throw new IOException("CephOutputStream.write: cannot write " + - "a byte to fd " + fileHandle + ": stream closed"); - } - // Stick the byte in a buffer and write it - byte buf[] = new byte[1]; - buf[0] = (byte) b; - write(buf, 0, 1); - return; + if (closed) { + throw new IOException( + "CephOutputStream.write: cannot write " + "a byte to fd " + fileHandle + + ": stream closed"); } + // Stick the byte in a buffer and write it + byte buf[] = new byte[1]; + + buf[0] = (byte) b; + write(buf, 0, 1); + return; + } /** * Write a byte buffer into the Ceph file. @@ -101,102 +110,110 @@ public class CephOutputStream extends OutputStream { * @param off the position in the file to start writing at. * @param len The number of bytes to actually write. * @throws IOException if you have closed the CephOutputStream, or - * if buf is null or off + len > buf.length, or - * if the write fails due to a Ceph error. + * if buf is null or off + len > buf.length, or + * if the write fails due to a Ceph error. */ @Override - public synchronized void write(byte buf[], int off, int len) throws IOException { - ceph.debug("CephOutputStream.write: writing " + len + - " bytes to fd " + fileHandle, ceph.TRACE); - // make sure stream is open - if (closed) { - throw new IOException("CephOutputStream.write: cannot write " + len + - "bytes to fd " + fileHandle + ": stream closed"); - } + public synchronized void write(byte buf[], int off, int len) throws IOException { + ceph.debug( + "CephOutputStream.write: writing " + len + " bytes to fd " + fileHandle, + ceph.TRACE); + // make sure stream is open + if (closed) { + throw new IOException( + "CephOutputStream.write: cannot write " + len + "bytes to fd " + + fileHandle + ": stream closed"); + } - int result; - int write; - while (len>0) { - write = Math.min(len, buffer.length - bufUsed); - try { - System.arraycopy(buf, off, buffer, bufUsed, write); - } - catch (IndexOutOfBoundsException ie) { - throw new IOException("CephOutputStream.write: Indices out of bounds: " - + "write length is " + len - + ", buffer offset is " + off - + ", and buffer size is " + buf.length); - } - catch (ArrayStoreException ae) { - throw new IOException("Uh-oh, CephOutputStream failed to do an array" - + " copy due to type mismatch..."); - } - catch (NullPointerException ne) { - throw new IOException("CephOutputStream.write: cannot write " - + len + "bytes to fd " + fileHandle - + ": buffer is null"); - } - bufUsed += write; - len -= write; - off += write; - if (bufUsed == buffer.length) { - result = ceph.ceph_write(fileHandle, buffer, 0, bufUsed); - if (result < 0) - throw new IOException("CephOutputStream.write: Buffered write of " - + bufUsed + " bytes failed!"); - if (result != bufUsed) - throw new IOException("CephOutputStream.write: Wrote only " - + result + " bytes of " + bufUsed - + " in buffer! Data may be lost or written" - + " twice to Ceph!"); - bufUsed = 0; - } - - } - return; - } + int result; + int write; + + while (len > 0) { + write = Math.min(len, buffer.length - bufUsed); + try { + System.arraycopy(buf, off, buffer, bufUsed, write); + } catch (IndexOutOfBoundsException ie) { + throw new IOException( + "CephOutputStream.write: Indices out of bounds: " + + "write length is " + len + ", buffer offset is " + off + + ", and buffer size is " + buf.length); + } catch (ArrayStoreException ae) { + throw new IOException( + "Uh-oh, CephOutputStream failed to do an array" + + " copy due to type mismatch..."); + } catch (NullPointerException ne) { + throw new IOException( + "CephOutputStream.write: cannot write " + len + "bytes to fd " + + fileHandle + ": buffer is null"); + } + bufUsed += write; + len -= write; + off += write; + if (bufUsed == buffer.length) { + result = ceph.ceph_write(fileHandle, buffer, 0, bufUsed); + if (result < 0) { + throw new IOException( + "CephOutputStream.write: Buffered write of " + bufUsed + + " bytes failed!"); + } + if (result != bufUsed) { + throw new IOException( + "CephOutputStream.write: Wrote only " + result + " bytes of " + + bufUsed + " in buffer! Data may be lost or written" + + " twice to Ceph!"); + } + bufUsed = 0; + } + + } + return; + } /** * Flush the buffered data. * @throws IOException if you've closed the stream or the write fails. */ @Override - public synchronized void flush() throws IOException { - if (!closed) { - if (bufUsed == 0) return; - int result = ceph.ceph_write(fileHandle, buffer, 0, bufUsed); - if (result < 0) { - throw new IOException("CephOutputStream.write: Write of " - + bufUsed + "bytes to fd " - + fileHandle + " failed"); - } - if (result != bufUsed) { - throw new IOException("CephOutputStream.write: Write of " + bufUsed - + "bytes to fd " + fileHandle - + "was incomplete: only " + result + " of " - + bufUsed + " bytes were written."); - } - bufUsed = 0; - return; - } - } + public synchronized void flush() throws IOException { + if (!closed) { + if (bufUsed == 0) { + return; + } + int result = ceph.ceph_write(fileHandle, buffer, 0, bufUsed); + + if (result < 0) { + throw new IOException( + "CephOutputStream.write: Write of " + bufUsed + "bytes to fd " + + fileHandle + " failed"); + } + if (result != bufUsed) { + throw new IOException( + "CephOutputStream.write: Write of " + bufUsed + "bytes to fd " + + fileHandle + "was incomplete: only " + result + " of " + bufUsed + + " bytes were written."); + } + bufUsed = 0; + return; + } + } /** * Close the CephOutputStream. * @throws IOException if Ceph somehow returns an error. In current code it can't. */ @Override - public synchronized void close() throws IOException { - ceph.debug("CephOutputStream.close:enter", ceph.TRACE); - if (!closed) { - flush(); - int result = ceph.ceph_close(fileHandle); - if (result != 0) { - throw new IOException("Close failed!"); - } + public synchronized void close() throws IOException { + ceph.debug("CephOutputStream.close:enter", ceph.TRACE); + if (!closed) { + flush(); + int result = ceph.ceph_close(fileHandle); + + if (result != 0) { + throw new IOException("Close failed!"); + } - closed = true; - ceph.debug("CephOutputStream.close:exit", ceph.TRACE); - } - } + closed = true; + ceph.debug("CephOutputStream.close:exit", ceph.TRACE); + } + } } diff --git a/src/client/hadoop/ceph/CephTalker.java b/src/client/hadoop/ceph/CephTalker.java index 20b494b7342ed..9e416a0e231c0 100644 --- a/src/client/hadoop/ceph/CephTalker.java +++ b/src/client/hadoop/ceph/CephTalker.java @@ -1,4 +1,5 @@ // -*- mode:Java; tab-width:2; c-basic-offset:2; indent-tabs-mode:t -*- + /** * * Licensed under the Apache License, Version 2.0 @@ -19,41 +20,70 @@ */ package org.apache.hadoop.fs.ceph; + import org.apache.hadoop.conf.Configuration; import org.apache.commons.logging.Log; + class CephTalker extends CephFS { - //we write a constructor so we can load the libraries - public CephTalker(Configuration conf, Log log) { - super(conf, log); - System.load(conf.get("fs.ceph.libDir")+"/libhadoopcephfs.so"); - System.load(conf.get("fs.ceph.libDir")+"/libceph.so"); - } - protected native boolean ceph_initializeClient(String arguments, int block_size); - protected native String ceph_getcwd(); - protected native boolean ceph_setcwd(String path); + // we write a constructor so we can load the libraries + public CephTalker(Configuration conf, Log log) { + super(conf, log); + System.load(conf.get("fs.ceph.libDir") + "/libhadoopcephfs.so"); + System.load(conf.get("fs.ceph.libDir") + "/libceph.so"); + } + + protected native boolean ceph_initializeClient(String arguments, int block_size); + + protected native String ceph_getcwd(); + + protected native boolean ceph_setcwd(String path); + protected native boolean ceph_rmdir(String path); + protected native boolean ceph_unlink(String path); + protected native boolean ceph_rename(String old_path, String new_path); + protected native boolean ceph_exists(String path); + protected native long ceph_getblocksize(String path); + protected native boolean ceph_isdirectory(String path); + protected native boolean ceph_isfile(String path); + protected native String[] ceph_getdir(String path); + protected native int ceph_mkdirs(String path, int mode); + protected native int ceph_open_for_append(String path); + protected native int ceph_open_for_read(String path); + protected native int ceph_open_for_overwrite(String path, int mode); + protected native int ceph_close(int filehandle); + protected native boolean ceph_setPermission(String path, int mode); + protected native boolean ceph_kill_client(); + protected native boolean ceph_stat(String path, CephFileSystem.Stat fill); + protected native int ceph_statfs(String Path, CephFileSystem.CephStat fill); + protected native int ceph_replication(String path); + protected native String ceph_hosts(int fh, long offset); + protected native int ceph_setTimes(String path, long mtime, long atime); + protected native long ceph_getpos(int fh); + protected native int ceph_write(int fh, byte[] buffer, int buffer_offset, int length); - protected native int ceph_read(int fh, byte[] buffer, int buffer_offset, int length); - protected native long ceph_seek_from_start(int fh, long pos); + + protected native int ceph_read(int fh, byte[] buffer, int buffer_offset, int length); + + protected native long ceph_seek_from_start(int fh, long pos); } diff --git a/src/client/hadoop/ceph/TestCeph.java b/src/client/hadoop/ceph/TestCeph.java index eaafab0ba1968..7f872c888b2e1 100644 --- a/src/client/hadoop/ceph/TestCeph.java +++ b/src/client/hadoop/ceph/TestCeph.java @@ -1,4 +1,5 @@ // -*- mode:Java; tab-width:2; c-basic-offset:2; indent-tabs-mode:t -*- + /** * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file @@ -21,6 +22,7 @@ package org.apache.hadoop.fs.ceph; + import java.io.IOException; import java.net.URI; import org.apache.hadoop.fs.FileSystemContractBaseTest; @@ -28,15 +30,17 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; + public class TestCeph extends FileSystemContractBaseTest { - @Override - protected void setUp() throws IOException { + @Override + protected void setUp() throws IOException { Configuration conf = new Configuration(); CephFaker cephfaker = new CephFaker(conf, FileSystem.LOG); CephFileSystem cephfs = new CephFileSystem(cephfaker, "ceph://null"); + cephfs.initialize(URI.create("ceph://null"), conf); - fs = cephfs; - cephfs.setWorkingDirectory(new Path(getDefaultWorkingDirectory())); - } + fs = cephfs; + cephfs.setWorkingDirectory(new Path(getDefaultWorkingDirectory())); + } } -- 2.39.5