// -*- mode:Java; tab-width:2; c-basic-offset:2; indent-tabs-mode:t -*-
+
/**
*
* Licensed under the Apache License, Version 2.0
*/
package org.apache.hadoop.fs.ceph;
+
import org.apache.hadoop.conf.Configuration;
import org.apache.commons.logging.Log;
+
abstract class CephFS {
- protected static final int FATAL = 0;
- protected static final int ERROR = 1;
- protected static final int WARN = 2;
- protected static final int INFO = 3;
- protected static final int DEBUG = 4;
- protected static final int TRACE = 5;
- protected static final int NOLOG = 6;
+ protected static final int FATAL = 0;
+ protected static final int ERROR = 1;
+ protected static final int WARN = 2;
+ protected static final int INFO = 3;
+ protected static final int DEBUG = 4;
+ protected static final int TRACE = 5;
+ protected static final int NOLOG = 6;
- protected static final int ENOTDIR = 20;
+ protected static final int ENOTDIR = 20;
protected static final int EEXIST = 17;
protected static final int ENOENT = 2;
- private boolean debug = false;
- private Log LOG;
-
- public CephFS(Configuration conf, Log log) {
- debug = ("true".equals(conf.get("fs.ceph.debug", "false")));
- LOG = log;
- }
-
- /*
- * Performs any necessary setup to allow general use of the filesystem.
- * Inputs:
- * String argsuments -- a command-line style input of Ceph config params
- * int block_size -- the size in bytes to use for blocks
- * Returns: true on success, false otherwise
- */
- abstract protected boolean ceph_initializeClient(String arguments, int block_size);
+ private boolean debug = false;
+ private Log LOG;
+
+ public CephFS(Configuration conf, Log log) {
+ debug = ("true".equals(conf.get("fs.ceph.debug", "false")));
+ LOG = log;
+ }
+
+ /*
+ * Performs any necessary setup to allow general use of the filesystem.
+ * Inputs:
+ * String argsuments -- a command-line style input of Ceph config params
+ * int block_size -- the size in bytes to use for blocks
+ * Returns: true on success, false otherwise
+ */
+ abstract protected boolean ceph_initializeClient(String arguments, int block_size);
- /*
- * Returns the current working directory (absolute) as a String
- */
- abstract protected String ceph_getcwd();
- /*
- * Changes the working directory.
- * Inputs:
- * String path: The path (relative or absolute) to switch to
- * Returns: true on success, false otherwise.
- */
- abstract protected boolean ceph_setcwd(String path);
- /*
- * Given a path to a directory, removes the directory if empty.
- * Inputs:
- * jstring j_path: The path (relative or absolute) to the directory
- * Returns: true on successful delete; false otherwise
- */
+ /*
+ * Returns the current working directory (absolute) as a String
+ */
+ abstract protected String ceph_getcwd();
+
+ /*
+ * Changes the working directory.
+ * Inputs:
+ * String path: The path (relative or absolute) to switch to
+ * Returns: true on success, false otherwise.
+ */
+ abstract protected boolean ceph_setcwd(String path);
+
+ /*
+ * Given a path to a directory, removes the directory if empty.
+ * Inputs:
+ * jstring j_path: The path (relative or absolute) to the directory
+ * Returns: true on successful delete; false otherwise
+ */
abstract protected boolean ceph_rmdir(String path);
- /*
- * Given a path, unlinks it.
- * Inputs:
- * String path: The path (relative or absolute) to the file or empty dir
- * Returns: true if the unlink occurred, false otherwise.
- */
+
+ /*
+ * Given a path, unlinks it.
+ * Inputs:
+ * String path: The path (relative or absolute) to the file or empty dir
+ * Returns: true if the unlink occurred, false otherwise.
+ */
abstract protected boolean ceph_unlink(String path);
- /*
- * Changes a given path name to a new name, assuming new_path doesn't exist.
- * Inputs:
- * jstring j_from: The path whose name you want to change.
- * jstring j_to: The new name for the path.
- * Returns: true if the rename occurred, false otherwise
- */
+
+ /*
+ * Changes a given path name to a new name, assuming new_path doesn't exist.
+ * Inputs:
+ * jstring j_from: The path whose name you want to change.
+ * jstring j_to: The new name for the path.
+ * Returns: true if the rename occurred, false otherwise
+ */
abstract protected boolean ceph_rename(String old_path, String new_path);
- /*
- * Returns true if it the input path exists, false
- * if it does not or there is an unexpected failure.
- */
+
+ /*
+ * Returns true if it the input path exists, false
+ * if it does not or there is an unexpected failure.
+ */
abstract protected boolean ceph_exists(String path);
- /*
- * Get the block size for a given path.
- * Input:
- * String path: The path (relative or absolute) you want
- * the block size for.
- * Returns: block size if the path exists, otherwise a negative number
- * corresponding to the standard C++ error codes (which are positive).
- */
+
+ /*
+ * Get the block size for a given path.
+ * Input:
+ * String path: The path (relative or absolute) you want
+ * the block size for.
+ * Returns: block size if the path exists, otherwise a negative number
+ * corresponding to the standard C++ error codes (which are positive).
+ */
abstract protected long ceph_getblocksize(String path);
- /*
- * Returns true if the given path is a directory, false otherwise.
- */
+
+ /*
+ * Returns true if the given path is a directory, false otherwise.
+ */
abstract protected boolean ceph_isdirectory(String path);
- /*
- * Returns true if the given path is a file; false otherwise.
- */
+
+ /*
+ * Returns true if the given path is a file; false otherwise.
+ */
abstract protected boolean ceph_isfile(String path);
- /*
- * Get the contents of a given directory.
- * Inputs:
- * String path: The path (relative or absolute) to the directory.
- * Returns: A Java String[] of the contents of the directory, or
- * NULL if there is an error (ie, path is not a dir). This listing
- * will not contain . or .. entries.
- */
+
+ /*
+ * Get the contents of a given directory.
+ * Inputs:
+ * String path: The path (relative or absolute) to the directory.
+ * Returns: A Java String[] of the contents of the directory, or
+ * NULL if there is an error (ie, path is not a dir). This listing
+ * will not contain . or .. entries.
+ */
abstract protected String[] ceph_getdir(String path);
- /*
- * Create the specified directory and any required intermediate ones with the
- * given mode.
- */
+
+ /*
+ * Create the specified directory and any required intermediate ones with the
+ * given mode.
+ */
abstract protected int ceph_mkdirs(String path, int mode);
- /*
- * Open a file to append. If the file does not exist, it will be created.
- * Opening a dir is possible but may have bad results.
- * Inputs:
- * String path: The path to open.
- * Returns: an int filehandle, or a number<0 if an error occurs.
- */
+
+ /*
+ * Open a file to append. If the file does not exist, it will be created.
+ * Opening a dir is possible but may have bad results.
+ * Inputs:
+ * String path: The path to open.
+ * Returns: an int filehandle, or a number<0 if an error occurs.
+ */
abstract protected int ceph_open_for_append(String path);
- /*
- * Open a file for reading.
- * Opening a dir is possible but may have bad results.
- * Inputs:
- * String path: The path to open.
- * Returns: an int filehandle, or a number<0 if an error occurs.
- */
+
+ /*
+ * Open a file for reading.
+ * Opening a dir is possible but may have bad results.
+ * Inputs:
+ * String path: The path to open.
+ * Returns: an int filehandle, or a number<0 if an error occurs.
+ */
abstract protected int ceph_open_for_read(String path);
- /*
- * Opens a file for overwriting; creates it if necessary.
- * Opening a dir is possible but may have bad results.
- * Inputs:
- * String path: The path to open.
- * int mode: The mode to open with.
- * Returns: an int filehandle, or a number<0 if an error occurs.
- */
+
+ /*
+ * Opens a file for overwriting; creates it if necessary.
+ * Opening a dir is possible but may have bad results.
+ * Inputs:
+ * String path: The path to open.
+ * int mode: The mode to open with.
+ * Returns: an int filehandle, or a number<0 if an error occurs.
+ */
abstract protected int ceph_open_for_overwrite(String path, int mode);
- /*
- * Closes the given file. Returns 0 on success, or a negative
- * error code otherwise.
- */
+
+ /*
+ * Closes the given file. Returns 0 on success, or a negative
+ * error code otherwise.
+ */
abstract protected int ceph_close(int filehandle);
- /*
- * Change the mode on a path.
- * Inputs:
- * String path: The path to change mode on.
- * int mode: The mode to apply.
- * Returns: true if the mode is properly applied, false if there
- * is any error.
- */
+
+ /*
+ * Change the mode on a path.
+ * Inputs:
+ * String path: The path to change mode on.
+ * int mode: The mode to apply.
+ * Returns: true if the mode is properly applied, false if there
+ * is any error.
+ */
abstract protected boolean ceph_setPermission(String path, int mode);
- /*
- * Closes the Ceph client. This should be called before shutting down
- * (multiple times is okay but redundant).
- */
+
+ /*
+ * Closes the Ceph client. This should be called before shutting down
+ * (multiple times is okay but redundant).
+ */
abstract protected boolean ceph_kill_client();
- /*
- * Get the statistics on a path returned in a custom format defined
- * in CephFileSystem.
- * Inputs:
- * String path: The path to stat.
- * Stat fill: The stat object to fill.
- * Returns: true if the stat is successful, false otherwise.
- */
+
+ /*
+ * Get the statistics on a path returned in a custom format defined
+ * in CephFileSystem.
+ * Inputs:
+ * String path: The path to stat.
+ * Stat fill: The stat object to fill.
+ * Returns: true if the stat is successful, false otherwise.
+ */
abstract protected boolean ceph_stat(String path, CephFileSystem.Stat fill);
- /*
- * Statfs a filesystem in a custom format defined in CephFileSystem.
- * Inputs:
- * String path: A path on the filesystem that you wish to stat.
- * CephStat fill: The CephStat object to fill.
- * Returns: 0 if successful and the CephStat is filled; a negative
- * error code otherwise.
- */
+
+ /*
+ * Statfs a filesystem in a custom format defined in CephFileSystem.
+ * Inputs:
+ * String path: A path on the filesystem that you wish to stat.
+ * CephStat fill: The CephStat object to fill.
+ * Returns: 0 if successful and the CephStat is filled; a negative
+ * error code otherwise.
+ */
abstract protected int ceph_statfs(String path, CephFileSystem.CephStat fill);
- /*
- * Check how many times a path should be replicated (if it is
- * degraded it may not actually be replicated this often).
- * Inputs:
- * String path: The path to check.
- * Returns: an int containing the number of times replicated.
- */
+
+ /*
+ * Check how many times a path should be replicated (if it is
+ * degraded it may not actually be replicated this often).
+ * Inputs:
+ * String path: The path to check.
+ * Returns: an int containing the number of times replicated.
+ */
abstract protected int ceph_replication(String path);
- /*
- * Find the IP address of the primary OSD for a given file and offset.
- * Inputs:
- * int fh: The filehandle for the file.
- * long offset: The offset to get the location of.
- * Returns: a String of the location as IP, or NULL if there is an error.
- */
+
+ /*
+ * Find the IP address of the primary OSD for a given file and offset.
+ * Inputs:
+ * int fh: The filehandle for the file.
+ * long offset: The offset to get the location of.
+ * Returns: a String of the location as IP, or NULL if there is an error.
+ */
abstract protected String ceph_hosts(int fh, long offset);
- /*
- * Set the mtime and atime for a given path.
- * Inputs:
- * String path: The path to set the times for.
- * long mtime: The mtime to set, in millis since epoch (-1 to not set).
- * long atime: The atime to set, in millis since epoch (-1 to not set)
- * Returns: 0 if successful, an error code otherwise.
- */
+
+ /*
+ * Set the mtime and atime for a given path.
+ * Inputs:
+ * String path: The path to set the times for.
+ * long mtime: The mtime to set, in millis since epoch (-1 to not set).
+ * long atime: The atime to set, in millis since epoch (-1 to not set)
+ * Returns: 0 if successful, an error code otherwise.
+ */
abstract protected int ceph_setTimes(String path, long mtime, long atime);
- /*
- * Get the current position in a file (as a long) of a given filehandle.
- * Returns: (long) current file position on success, or a
- * negative error code on failure.
- */
+
+ /*
+ * Get the current position in a file (as a long) of a given filehandle.
+ * Returns: (long) current file position on success, or a
+ * negative error code on failure.
+ */
abstract protected long ceph_getpos(int fh);
- /*
- * Write the given buffer contents to the given filehandle.
- * Inputs:
- * int fh: The filehandle to write to.
- * byte[] buffer: The buffer to write from
- * int buffer_offset: The position in the buffer to write from
- * int length: The number of (sequential) bytes to write.
- * Returns: int, on success the number of bytes written, on failure
- * a negative error code.
- */
+
+ /*
+ * Write the given buffer contents to the given filehandle.
+ * Inputs:
+ * int fh: The filehandle to write to.
+ * byte[] buffer: The buffer to write from
+ * int buffer_offset: The position in the buffer to write from
+ * int length: The number of (sequential) bytes to write.
+ * Returns: int, on success the number of bytes written, on failure
+ * a negative error code.
+ */
abstract protected int ceph_write(int fh, byte[] buffer, int buffer_offset, int length);
- /*
- * Reads into the given byte array from the current position.
- * Inputs:
- * int fh: the filehandle to read from
- * byte[] buffer: the byte array to read into
- * int buffer_offset: where in the buffer to start writing
- * int length: how much to read.
- * There'd better be enough space in the buffer to write all
- * the data from the given offset!
- * Returns: the number of bytes read on success (as an int),
- * or an error code otherwise. */
+ /*
+ * Reads into the given byte array from the current position.
+ * Inputs:
+ * int fh: the filehandle to read from
+ * byte[] buffer: the byte array to read into
+ * int buffer_offset: where in the buffer to start writing
+ * int length: how much to read.
+ * There'd better be enough space in the buffer to write all
+ * the data from the given offset!
+ * Returns: the number of bytes read on success (as an int),
+ * or an error code otherwise. */
abstract protected int ceph_read(int fh, byte[] buffer, int buffer_offset, int length);
- /*
- * Seeks to the given position in the given file.
- * Inputs:
- * int fh: The filehandle to seek in.
- * long pos: The position to seek to.
- * Returns: the new position (as a long) of the filehandle on success,
- * or a negative error code on failure. */
+
+ /*
+ * Seeks to the given position in the given file.
+ * Inputs:
+ * int fh: The filehandle to seek in.
+ * long pos: The position to seek to.
+ * Returns: the new position (as a long) of the filehandle on success,
+ * or a negative error code on failure. */
abstract protected long ceph_seek_from_start(int fh, long pos);
- protected void debug(String statement, int priority) {
- if (debug) System.err.println(statement);
- switch(priority) {
- case FATAL: LOG.fatal(statement);
- break;
- case ERROR: LOG.error(statement);
- break;
- case WARN: LOG.warn(statement);
- break;
- case INFO: LOG.info(statement);
- break;
- case DEBUG: LOG.debug(statement);
- break;
- case TRACE: LOG.trace(statement);
- break;
- case NOLOG: break;
- default: break;
- }
+ protected void debug(String statement, int priority) {
+ if (debug) {
+ System.err.println(statement);
+ }
+ switch (priority) {
+ case FATAL:
+ LOG.fatal(statement);
+ break;
+
+ case ERROR:
+ LOG.error(statement);
+ break;
+
+ case WARN:
+ LOG.warn(statement);
+ break;
+
+ case INFO:
+ LOG.info(statement);
+ break;
+
+ case DEBUG:
+ LOG.debug(statement);
+ break;
+
+ case TRACE:
+ LOG.trace(statement);
+ break;
+
+ case NOLOG:
+ break;
+
+ default:
+ break;
+ }
}
}
// -*- mode:Java; tab-width:2; c-basic-offset:2; indent-tabs-mode:t -*-
+
/**
*
* Licensed under the Apache License, Version 2.0
package org.apache.hadoop.fs.ceph;
+
import java.net.URI;
import java.util.Hashtable;
import java.io.Closeable;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.permission.FsPermission;
+
class CephFaker extends CephFS {
- FileSystem localFS;
- String localPrefix;
- int blockSize;
- Configuration conf;
- Hashtable<Integer, Object> files;
- Hashtable<Integer, String> filenames;
- int fileCount = 0;
- boolean initialized = false;
+ FileSystem localFS;
+ String localPrefix;
+ int blockSize;
+ Configuration conf;
+ Hashtable<Integer, Object> files;
+ Hashtable<Integer, String> filenames;
+ int fileCount = 0;
+ boolean initialized = false;
- public CephFaker(Configuration con, Log log) {
- super(con, log);
- conf = con;
- files = new Hashtable<Integer, Object>();
- filenames = new Hashtable<Integer, String>();
- }
+ public CephFaker(Configuration con, Log log) {
+ super(con, log);
+ conf = con;
+ files = new Hashtable<Integer, Object>();
+ filenames = new Hashtable<Integer, String>();
+ }
- protected boolean ceph_initializeClient(String args, int block_size) {
- if (!initialized) {
- //let's remember the default block_size
- blockSize = block_size;
- /* for a real Ceph deployment, this starts up the client,
- * sets debugging levels, etc. We just need to get the
- * local FileSystem to use, and we'll ignore any
- * command-line arguments. */
- try {
- localFS = FileSystem.getLocal(conf);
- localFS.initialize(URI.create("file://localhost"), conf);
- localFS.setVerifyChecksum(false);
- String testDir = conf.get("hadoop.tmp.dir");
- localPrefix = localFS.getWorkingDirectory().toString();
- int testDirLoc = localPrefix.indexOf(testDir) - 1;
- if (-2 == testDirLoc)
- testDirLoc = localPrefix.length();
- localPrefix = localPrefix.substring(0, testDirLoc)
- + "/" + conf.get("hadoop.tmp.dir");
-
- localFS.setWorkingDirectory(new Path(localPrefix
- +"/user/"
- + System.getProperty("user.name")));
- //I don't know why, but the unit tests expect the default
- //working dir to be /user/username, so satisfy them!
- //debug("localPrefix is " + localPrefix, INFO);
- }
- catch (IOException e) {
- return false;
- }
- initialized = true;
- }
- return true;
- }
-
- protected String ceph_getcwd() {
- return sanitize_path(localFS.getWorkingDirectory().toString());
- }
-
- protected boolean ceph_setcwd(String path) {
- localFS.setWorkingDirectory(new Path(prepare_path(path)));
- return true;
- }
-
- //the caller is responsible for ensuring empty dirs
- protected boolean ceph_rmdir(String pth) {
- Path path = new Path(prepare_path(pth));
- boolean ret = false;
- try {
- if (localFS.listStatus(path).length <= 1) {
- ret = localFS.delete(path, true);
- }
- }
- catch (IOException e){ }
- return ret;
- }
-
- //this needs to work on (empty) directories too
- protected boolean ceph_unlink(String path) {
- path = prepare_path(path);
- boolean ret = false;
- if (ceph_isdirectory(path)) {
- ret = ceph_rmdir(path);
- }
- else {
- try {
- ret = localFS.delete(new Path(path), false);
- }
- catch (IOException e){ }
- }
- return ret;
- }
-
- protected boolean ceph_rename(String oldName, String newName) {
- oldName = prepare_path(oldName);
- newName = prepare_path(newName);
- try {
- Path parent = new Path(newName).getParent();
- Path newPath = new Path(newName);
- if (localFS.exists(parent) && !localFS.exists(newPath))
- return localFS.rename(new Path(oldName), newPath);
- return false;
- }
- catch (IOException e) { return false; }
- }
-
- protected boolean ceph_exists(String path) {
- path = prepare_path(path);
- boolean ret = false;
- try {
- ret = localFS.exists(new Path(path));
- }
- catch (IOException e){ }
- return ret;
- }
-
- protected long ceph_getblocksize(String path) {
- path = prepare_path(path);
- try {
- FileStatus status = localFS.getFileStatus(new Path(path));
- return status.getBlockSize();
- }
- catch (FileNotFoundException e) {
- return -CephFS.ENOENT;
- }
- catch (IOException e) {
- return -1; //just fail generically
- }
- }
-
- protected boolean ceph_isdirectory(String path) {
- path = prepare_path(path);
- try {
- FileStatus status = localFS.getFileStatus(new Path(path));
- return status.isDir();
- }
- catch (IOException e) { return false; }
- }
-
- protected boolean ceph_isfile(String path) {
- path = prepare_path(path);
- boolean ret = false;
- try {
- FileStatus status = localFS.getFileStatus(new Path(path));
- ret = !status.isDir();
- }
- catch (Exception e) {}
- return ret;
- }
-
- protected String[] ceph_getdir(String path) {
- path = prepare_path(path);
- if (!ceph_isdirectory(path)) {
- return null;
- }
- try {
- FileStatus[] stats = localFS.listStatus(new Path(path));
- String[] names = new String[stats.length];
- String name;
- for (int i=0; i<stats.length; ++i) {
- name = stats[i].getPath().toString();
- names[i] = name.substring(name.lastIndexOf(Path.SEPARATOR)+1);
- }
- return names;
- }
- catch (IOException e) {}
- return null;
- }
-
- protected int ceph_mkdirs(String path, int mode) {
- path = prepare_path(path);
- //debug("ceph_mkdirs on " + path, INFO);
- try {
- if(localFS.mkdirs(new Path(path), new FsPermission((short)mode)))
- return 0;
- }
- catch (FileAlreadyExistsException fe) { return ENOTDIR; }
- catch (IOException e) {}
- if (ceph_isdirectory(path))
- return -EEXIST; //apparently it already existed
- return -1;
- }
-
- /*
- * Unlike a real Ceph deployment, you can't do opens on a directory.
- * Since that has unpredictable behavior and you shouldn't do it anyway,
- * it's okay.
- */
- protected int ceph_open_for_append(String path) {
- path = prepare_path(path);
- FSDataOutputStream stream;
- try {
- stream = localFS.append(new Path(path));
- files.put(new Integer(fileCount), stream);
- filenames.put(new Integer(fileCount), path);
- return fileCount++;
- }
- catch (IOException e) { }
- return -1; // failure
- }
-
- protected int ceph_open_for_read(String path) {
- path = prepare_path(path);
- FSDataInputStream stream;
- try {
- stream = localFS.open(new Path(path));
- files.put(new Integer(fileCount), stream);
- filenames.put(new Integer(fileCount), path);
- debug("ceph_open_for_read fh:" + fileCount
- + ", pathname:" + path, INFO);
- return fileCount++;
- }
- catch (IOException e) { }
- return -1; //failure
- }
-
- protected int ceph_open_for_overwrite(String path, int mode) {
- path = prepare_path(path);
- FSDataOutputStream stream;
- try {
- stream = localFS.create(new Path(path));
- files.put(new Integer(fileCount), stream);
- filenames.put(new Integer(fileCount), path);
- debug("ceph_open_for_overwrite fh:" + fileCount
- + ", pathname:" + path, INFO);
- return fileCount++;
- }
- catch (IOException e) { }
- return -1; //failure
- }
-
- protected int ceph_close(int filehandle) {
- debug("ceph_close(filehandle " + filehandle + ")", INFO);
- try {
- ((Closeable)files.get(new Integer(filehandle))).close();
- if(null == files.get(new Integer(filehandle))) {
- return -ENOENT; //this isn't quite the right error code,
- // but the important part is it's negative
- }
- return 0; //hurray, success
- }
- catch (NullPointerException ne) {
- debug("ceph_close caught NullPointerException!" + ne, WARN);
- } //err, how?
- catch (IOException ie) {
- debug("ceph_close caught IOException!" + ie, WARN);
- }
- return -1; //failure
- }
-
- protected boolean ceph_setPermission(String pth, int mode) {
- pth = prepare_path(pth);
- Path path = new Path(pth);
- boolean ret = false;
- try {
- localFS.setPermission(path, new FsPermission((short)mode));
- ret = true;
- }
- catch (IOException e) { }
- return ret;
- }
-
- //rather than try and match a Ceph deployment's behavior exactly,
- //just make bad things happen if they try and call methods after this
- protected boolean ceph_kill_client() {
- //debug("ceph_kill_client", INFO);
- localFS.setWorkingDirectory(new Path(localPrefix));
- //debug("working dir is now " + localFS.getWorkingDirectory(), INFO);
- try{
- localFS.close(); }
- catch (Exception e) {}
- localFS = null;
- files = null;
- filenames = null;
- return true;
- }
-
- protected boolean ceph_stat(String pth, CephFileSystem.Stat fill) {
- pth = prepare_path(pth);
- Path path = new Path(pth);
- boolean ret = false;
- try {
- FileStatus status = localFS.getFileStatus(path);
- fill.size = status.getLen();
- fill.is_dir = status.isDir();
- fill.block_size = status.getBlockSize();
- fill.mod_time = status.getModificationTime();
- fill.access_time = status.getAccessTime();
- fill.mode = status.getPermission().toShort();
- ret = true;
- }
- catch (IOException e) {}
- return ret;
- }
-
- protected int ceph_statfs(String pth, CephFileSystem.CephStat fill) {
- pth = prepare_path(pth);
- try {
- FsStatus stat = localFS.getStatus();
- fill.capacity = stat.getCapacity();
- fill.used = stat.getUsed();
- fill.remaining = stat.getRemaining();
- return 0;
- }
- catch (Exception e){}
- return -1; //failure;
- }
-
- protected int ceph_replication(String path) {
- path = prepare_path(path);
- int ret = -1; //-1 for failure
- try {
- ret = localFS.getFileStatus(new Path(path)).getReplication();
- }
- catch (IOException e) {}
- return ret;
- }
-
- protected String ceph_hosts(int fh, long offset) {
- String ret = null;
- try {
- BlockLocation[] locs = localFS.getFileBlockLocations(
- localFS.getFileStatus(new Path(
- filenames.get(new Integer(fh)))), offset, 1);
- ret = locs[0].getNames()[0];
- }
- catch (IOException e) {}
- catch (NullPointerException f) { }
- return ret;
- }
-
- protected int ceph_setTimes(String pth, long mtime, long atime) {
- pth = prepare_path(pth);
- Path path = new Path(pth);
- int ret = -1; //generic fail
- try {
- localFS.setTimes(path, mtime, atime);
- ret = 0;
- }
- catch (IOException e) {}
- return ret;
- }
-
- protected long ceph_getpos(int fh) {
- long ret = -1; //generic fail
- try {
- Object stream = files.get(new Integer(fh));
- if (stream instanceof FSDataInputStream) {
- ret = ((FSDataInputStream)stream).getPos();
- }
- else if (stream instanceof FSDataOutputStream) {
- ret = ((FSDataOutputStream)stream).getPos();
- }
- }
- catch (IOException e) {}
- catch (NullPointerException f) { }
- return ret;
- }
-
- protected int ceph_write(int fh, byte[] buffer,
- int buffer_offset, int length) {
- debug("ceph_write fh:" + fh + ", buffer_offset:" + buffer_offset
- + ", length:" + length, INFO);
- long ret = -1;//generic fail
- try {
- FSDataOutputStream os = (FSDataOutputStream) files.get(new Integer(fh));
- debug("ceph_write got outputstream", INFO);
- long startPos = os.getPos();
- os.write(buffer, buffer_offset, length);
- ret = os.getPos() - startPos;
- }
- catch (IOException e) { debug("ceph_write caught IOException!", WARN);}
- catch (NullPointerException f) {
- debug("ceph_write caught NullPointerException!", WARN); }
- return (int)ret;
- }
-
- protected int ceph_read(int fh, byte[] buffer,
- int buffer_offset, int length) {
- long ret = -1;//generic fail
- try {
- FSDataInputStream is = (FSDataInputStream)files.get(new Integer(fh));
- long startPos = is.getPos();
- is.read(buffer, buffer_offset, length);
- ret = is.getPos() - startPos;
- }
- catch (IOException e) {}
- catch (NullPointerException f) {}
- return (int)ret;
- }
-
- protected long ceph_seek_from_start(int fh, long pos) {
- debug("ceph_seek_from_start(fh " + fh + ", pos " + pos + ")", INFO);
- long ret = -1;//generic fail
- try {
- debug("ceph_seek_from_start filename is "
- + filenames.get(new Integer(fh)), INFO);
- if (null == files.get(new Integer(fh))) {
- debug("ceph_seek_from_start: is is null!", WARN);
- }
- FSDataInputStream is = (FSDataInputStream) files.get(new Integer(fh));
- debug("ceph_seek_from_start retrieved is!", INFO);
- is.seek(pos);
- ret = is.getPos();
- }
- catch (IOException e) {debug("ceph_seek_from_start caught IOException!", WARN);}
- catch (NullPointerException f) {debug("ceph_seek_from_start caught NullPointerException!", WARN);}
- return (int)ret;
- }
-
- /*
- * We need to remove the localFS file prefix before returning to Ceph
- */
- private String sanitize_path(String path) {
- //debug("sanitize_path(" + path + ")", INFO);
- /* if (path.startsWith("file:"))
- path = path.substring("file:".length()); */
- if (path.startsWith(localPrefix)) {
- path = path.substring(localPrefix.length());
- if (path.length() == 0) //it was a root path
- path = "/";
- }
- //debug("sanitize_path returning " + path, INFO);
- return path;
- }
-
- /*
- * If it's an absolute path we need to shove the
- * test dir onto the front as a prefix.
- */
- private String prepare_path(String path) {
- //debug("prepare_path(" + path + ")", INFO);
- if (path.startsWith("/"))
- path = localPrefix + path;
- else if (path.equals("..")) {
- if (ceph_getcwd().equals("/"))
- path = "."; // you can't go up past root!
- }
- //debug("prepare_path returning" + path, INFO);
- return path;
- }
+ protected boolean ceph_initializeClient(String args, int block_size) {
+ if (!initialized) {
+ // let's remember the default block_size
+ blockSize = block_size;
+
+ /* for a real Ceph deployment, this starts up the client,
+ * sets debugging levels, etc. We just need to get the
+ * local FileSystem to use, and we'll ignore any
+ * command-line arguments. */
+ try {
+ localFS = FileSystem.getLocal(conf);
+ localFS.initialize(URI.create("file://localhost"), conf);
+ localFS.setVerifyChecksum(false);
+ String testDir = conf.get("hadoop.tmp.dir");
+
+ localPrefix = localFS.getWorkingDirectory().toString();
+ int testDirLoc = localPrefix.indexOf(testDir) - 1;
+
+ if (-2 == testDirLoc) {
+ testDirLoc = localPrefix.length();
+ }
+ localPrefix = localPrefix.substring(0, testDirLoc) + "/"
+ + conf.get("hadoop.tmp.dir");
+
+ localFS.setWorkingDirectory(
+ new Path(localPrefix + "/user/" + System.getProperty("user.name")));
+ // I don't know why, but the unit tests expect the default
+ // working dir to be /user/username, so satisfy them!
+ // debug("localPrefix is " + localPrefix, INFO);
+ } catch (IOException e) {
+ return false;
+ }
+ initialized = true;
+ }
+ return true;
+ }
+
+ protected String ceph_getcwd() {
+ return sanitize_path(localFS.getWorkingDirectory().toString());
+ }
+
+ protected boolean ceph_setcwd(String path) {
+ localFS.setWorkingDirectory(new Path(prepare_path(path)));
+ return true;
+ }
+
+ // the caller is responsible for ensuring empty dirs
+ protected boolean ceph_rmdir(String pth) {
+ Path path = new Path(prepare_path(pth));
+ boolean ret = false;
+
+ try {
+ if (localFS.listStatus(path).length <= 1) {
+ ret = localFS.delete(path, true);
+ }
+ } catch (IOException e) {}
+ return ret;
+ }
+
+ // this needs to work on (empty) directories too
+ protected boolean ceph_unlink(String path) {
+ path = prepare_path(path);
+ boolean ret = false;
+
+ if (ceph_isdirectory(path)) {
+ ret = ceph_rmdir(path);
+ } else {
+ try {
+ ret = localFS.delete(new Path(path), false);
+ } catch (IOException e) {}
+ }
+ return ret;
+ }
+
+ protected boolean ceph_rename(String oldName, String newName) {
+ oldName = prepare_path(oldName);
+ newName = prepare_path(newName);
+ try {
+ Path parent = new Path(newName).getParent();
+ Path newPath = new Path(newName);
+
+ if (localFS.exists(parent) && !localFS.exists(newPath)) {
+ return localFS.rename(new Path(oldName), newPath);
+ }
+ return false;
+ } catch (IOException e) {
+ return false;
+ }
+ }
+
+ protected boolean ceph_exists(String path) {
+ path = prepare_path(path);
+ boolean ret = false;
+
+ try {
+ ret = localFS.exists(new Path(path));
+ } catch (IOException e) {}
+ return ret;
+ }
+
+ protected long ceph_getblocksize(String path) {
+ path = prepare_path(path);
+ try {
+ FileStatus status = localFS.getFileStatus(new Path(path));
+
+ return status.getBlockSize();
+ } catch (FileNotFoundException e) {
+ return -CephFS.ENOENT;
+ } catch (IOException e) {
+ return -1; // just fail generically
+ }
+ }
+
+ protected boolean ceph_isdirectory(String path) {
+ path = prepare_path(path);
+ try {
+ FileStatus status = localFS.getFileStatus(new Path(path));
+
+ return status.isDir();
+ } catch (IOException e) {
+ return false;
+ }
+ }
+
+ protected boolean ceph_isfile(String path) {
+ path = prepare_path(path);
+ boolean ret = false;
+
+ try {
+ FileStatus status = localFS.getFileStatus(new Path(path));
+
+ ret = !status.isDir();
+ } catch (Exception e) {}
+ return ret;
+ }
+
+ protected String[] ceph_getdir(String path) {
+ path = prepare_path(path);
+ if (!ceph_isdirectory(path)) {
+ return null;
+ }
+ try {
+ FileStatus[] stats = localFS.listStatus(new Path(path));
+ String[] names = new String[stats.length];
+ String name;
+
+ for (int i = 0; i < stats.length; ++i) {
+ name = stats[i].getPath().toString();
+ names[i] = name.substring(name.lastIndexOf(Path.SEPARATOR) + 1);
+ }
+ return names;
+ } catch (IOException e) {}
+ return null;
+ }
+
+ protected int ceph_mkdirs(String path, int mode) {
+ path = prepare_path(path);
+ // debug("ceph_mkdirs on " + path, INFO);
+ try {
+ if (localFS.mkdirs(new Path(path), new FsPermission((short) mode))) {
+ return 0;
+ }
+ } catch (FileAlreadyExistsException fe) {
+ return ENOTDIR;
+ } catch (IOException e) {}
+ if (ceph_isdirectory(path)) {
+ return -EEXIST;
+ } // apparently it already existed
+ return -1;
+ }
+
+ /*
+ * Unlike a real Ceph deployment, you can't do opens on a directory.
+ * Since that has unpredictable behavior and you shouldn't do it anyway,
+ * it's okay.
+ */
+ protected int ceph_open_for_append(String path) {
+ path = prepare_path(path);
+ FSDataOutputStream stream;
+
+ try {
+ stream = localFS.append(new Path(path));
+ files.put(new Integer(fileCount), stream);
+ filenames.put(new Integer(fileCount), path);
+ return fileCount++;
+ } catch (IOException e) {}
+ return -1; // failure
+ }
+
+ protected int ceph_open_for_read(String path) {
+ path = prepare_path(path);
+ FSDataInputStream stream;
+
+ try {
+ stream = localFS.open(new Path(path));
+ files.put(new Integer(fileCount), stream);
+ filenames.put(new Integer(fileCount), path);
+ debug("ceph_open_for_read fh:" + fileCount + ", pathname:" + path, INFO);
+ return fileCount++;
+ } catch (IOException e) {}
+ return -1; // failure
+ }
+
+ protected int ceph_open_for_overwrite(String path, int mode) {
+ path = prepare_path(path);
+ FSDataOutputStream stream;
+
+ try {
+ stream = localFS.create(new Path(path));
+ files.put(new Integer(fileCount), stream);
+ filenames.put(new Integer(fileCount), path);
+ debug("ceph_open_for_overwrite fh:" + fileCount + ", pathname:" + path,
+ INFO);
+ return fileCount++;
+ } catch (IOException e) {}
+ return -1; // failure
+ }
+
+ protected int ceph_close(int filehandle) {
+ debug("ceph_close(filehandle " + filehandle + ")", INFO);
+ try {
+ ((Closeable) files.get(new Integer(filehandle))).close();
+ if (null == files.get(new Integer(filehandle))) {
+ return -ENOENT; // this isn't quite the right error code,
+ // but the important part is it's negative
+ }
+ return 0; // hurray, success
+ } catch (NullPointerException ne) {
+ debug("ceph_close caught NullPointerException!" + ne, WARN);
+ } // err, how?
+ catch (IOException ie) {
+ debug("ceph_close caught IOException!" + ie, WARN);
+ }
+ return -1; // failure
+ }
+
+ protected boolean ceph_setPermission(String pth, int mode) {
+ pth = prepare_path(pth);
+ Path path = new Path(pth);
+ boolean ret = false;
+
+ try {
+ localFS.setPermission(path, new FsPermission((short) mode));
+ ret = true;
+ } catch (IOException e) {}
+ return ret;
+ }
+
+ // rather than try and match a Ceph deployment's behavior exactly,
+ // just make bad things happen if they try and call methods after this
+ protected boolean ceph_kill_client() {
+ // debug("ceph_kill_client", INFO);
+ localFS.setWorkingDirectory(new Path(localPrefix));
+ // debug("working dir is now " + localFS.getWorkingDirectory(), INFO);
+ try {
+ localFS.close();
+ } catch (Exception e) {}
+ localFS = null;
+ files = null;
+ filenames = null;
+ return true;
+ }
+
+ protected boolean ceph_stat(String pth, CephFileSystem.Stat fill) {
+ pth = prepare_path(pth);
+ Path path = new Path(pth);
+ boolean ret = false;
+
+ try {
+ FileStatus status = localFS.getFileStatus(path);
+
+ fill.size = status.getLen();
+ fill.is_dir = status.isDir();
+ fill.block_size = status.getBlockSize();
+ fill.mod_time = status.getModificationTime();
+ fill.access_time = status.getAccessTime();
+ fill.mode = status.getPermission().toShort();
+ ret = true;
+ } catch (IOException e) {}
+ return ret;
+ }
+
+ protected int ceph_statfs(String pth, CephFileSystem.CephStat fill) {
+ pth = prepare_path(pth);
+ try {
+ FsStatus stat = localFS.getStatus();
+
+ fill.capacity = stat.getCapacity();
+ fill.used = stat.getUsed();
+ fill.remaining = stat.getRemaining();
+ return 0;
+ } catch (Exception e) {}
+ return -1; // failure;
+ }
+
+ protected int ceph_replication(String path) {
+ path = prepare_path(path);
+ int ret = -1; // -1 for failure
+
+ try {
+ ret = localFS.getFileStatus(new Path(path)).getReplication();
+ } catch (IOException e) {}
+ return ret;
+ }
+
+ protected String ceph_hosts(int fh, long offset) {
+ String ret = null;
+
+ try {
+ BlockLocation[] locs = localFS.getFileBlockLocations(
+ localFS.getFileStatus(new Path(filenames.get(new Integer(fh)))),
+ offset, 1);
+
+ ret = locs[0].getNames()[0];
+ } catch (IOException e) {} catch (NullPointerException f) {}
+ return ret;
+ }
+
+ protected int ceph_setTimes(String pth, long mtime, long atime) {
+ pth = prepare_path(pth);
+ Path path = new Path(pth);
+ int ret = -1; // generic fail
+
+ try {
+ localFS.setTimes(path, mtime, atime);
+ ret = 0;
+ } catch (IOException e) {}
+ return ret;
+ }
+
+ protected long ceph_getpos(int fh) {
+ long ret = -1; // generic fail
+
+ try {
+ Object stream = files.get(new Integer(fh));
+
+ if (stream instanceof FSDataInputStream) {
+ ret = ((FSDataInputStream) stream).getPos();
+ } else if (stream instanceof FSDataOutputStream) {
+ ret = ((FSDataOutputStream) stream).getPos();
+ }
+ } catch (IOException e) {} catch (NullPointerException f) {}
+ return ret;
+ }
+
+ protected int ceph_write(int fh, byte[] buffer,
+ int buffer_offset, int length) {
+ debug(
+ "ceph_write fh:" + fh + ", buffer_offset:" + buffer_offset + ", length:"
+ + length,
+ INFO);
+ long ret = -1; // generic fail
+
+ try {
+ FSDataOutputStream os = (FSDataOutputStream) files.get(new Integer(fh));
+
+ debug("ceph_write got outputstream", INFO);
+ long startPos = os.getPos();
+
+ os.write(buffer, buffer_offset, length);
+ ret = os.getPos() - startPos;
+ } catch (IOException e) {
+ debug("ceph_write caught IOException!", WARN);
+ } catch (NullPointerException f) {
+ debug("ceph_write caught NullPointerException!", WARN);
+ }
+ return (int) ret;
+ }
+
+ protected int ceph_read(int fh, byte[] buffer,
+ int buffer_offset, int length) {
+ long ret = -1; // generic fail
+
+ try {
+ FSDataInputStream is = (FSDataInputStream) files.get(new Integer(fh));
+ long startPos = is.getPos();
+
+ is.read(buffer, buffer_offset, length);
+ ret = is.getPos() - startPos;
+ } catch (IOException e) {} catch (NullPointerException f) {}
+ return (int) ret;
+ }
+
+ protected long ceph_seek_from_start(int fh, long pos) {
+ debug("ceph_seek_from_start(fh " + fh + ", pos " + pos + ")", INFO);
+ long ret = -1; // generic fail
+
+ try {
+ debug("ceph_seek_from_start filename is " + filenames.get(new Integer(fh)),
+ INFO);
+ if (null == files.get(new Integer(fh))) {
+ debug("ceph_seek_from_start: is is null!", WARN);
+ }
+ FSDataInputStream is = (FSDataInputStream) files.get(new Integer(fh));
+
+ debug("ceph_seek_from_start retrieved is!", INFO);
+ is.seek(pos);
+ ret = is.getPos();
+ } catch (IOException e) {
+ debug("ceph_seek_from_start caught IOException!", WARN);
+ } catch (NullPointerException f) {
+ debug("ceph_seek_from_start caught NullPointerException!", WARN);
+ }
+ return (int) ret;
+ }
+
+ /*
+ * We need to remove the localFS file prefix before returning to Ceph
+ */
+ private String sanitize_path(String path) {
+ // debug("sanitize_path(" + path + ")", INFO);
+ /* if (path.startsWith("file:"))
+ path = path.substring("file:".length()); */
+ if (path.startsWith(localPrefix)) {
+ path = path.substring(localPrefix.length());
+ if (path.length() == 0) { // it was a root path
+ path = "/";
+ }
+ }
+ // debug("sanitize_path returning " + path, INFO);
+ return path;
+ }
+
+ /*
+ * If it's an absolute path we need to shove the
+ * test dir onto the front as a prefix.
+ */
+ private String prepare_path(String path) {
+ // debug("prepare_path(" + path + ")", INFO);
+ if (path.startsWith("/")) {
+ path = localPrefix + path;
+ } else if (path.equals("..")) {
+ if (ceph_getcwd().equals("/")) {
+ path = ".";
+ } // you can't go up past root!
+ }
+ // debug("prepare_path returning" + path, INFO);
+ return path;
+ }
}
-// -*- mode:Java; tab-width:2; c-basic-offset:2; indent-tabs-mode:t -*-
+// -*- mode:Java; tab-width:2; c-basic-offset:2; indent-tabs-mode:t -*-
+
/**
*
* Licensed under the Apache License, Version 2.0
* implied. See the License for the specific language governing
* permissions and limitations under the License.
*
- *
+ *
* Implements the Hadoop FS interfaces to allow applications to store
* files in Ceph.
*/
package org.apache.hadoop.fs.ceph;
+
import java.io.IOException;
import java.io.FileNotFoundException;
import java.io.OutputStream;
import org.apache.hadoop.fs.FsStatus;
import org.apache.hadoop.fs.CreateFlag;
+
/**
* <p>
* A {@link FileSystem} backed by <a href="http://ceph.newdream.net">Ceph.</a>.
private final Path root;
private boolean initialized = false;
- private CephFS ceph = null;
+ private CephFS ceph = null;
private boolean debug = false;
private String fs_default_name;
root = new Path("/");
}
- /**
- * Used for testing purposes, this constructor
- * sets the given CephFS instead of defaulting to a
- * CephTalker (with its assumed real Ceph instance to talk to).
- */
- public CephFileSystem(CephFS ceph_fs, String default_path) {
- super();
- root = new Path("/");
- ceph = ceph_fs;
- fs_default_name = default_path;
- }
+ /**
+ * Used for testing purposes, this constructor
+ * sets the given CephFS instead of defaulting to a
+ * CephTalker (with its assumed real Ceph instance to talk to).
+ */
+ public CephFileSystem(CephFS ceph_fs, String default_path) {
+ super();
+ root = new Path("/");
+ ceph = ceph_fs;
+ fs_default_name = default_path;
+ }
/**
* Lets you get the URI of this CephFileSystem.
* @return the URI.
*/
public URI getUri() {
- if (!initialized) return null;
+ if (!initialized) {
+ return null;
+ }
ceph.debug("getUri:exit with return " + uri, ceph.DEBUG);
return uri;
}
*/
@Override
public void initialize(URI uri, Configuration conf) throws IOException {
- if (!initialized) {
- super.initialize(uri, conf);
- setConf(conf);
- this.uri = URI.create(uri.getScheme() + "://" + uri.getAuthority());
- statistics = getStatistics(uri.getScheme(), getClass());
-
- if (ceph == null) {
- ceph = new CephTalker(conf, LOG);
- }
- if (null == fs_default_name) {
- fs_default_name = conf.get("fs.default.name");
- }
- //build up the arguments for Ceph
- String arguments = "CephFSInterface";
- arguments += conf.get("fs.ceph.commandLine", "");
- if (conf.get("fs.ceph.clientDebug") != null) {
- arguments += " --debug_client ";
- arguments += conf.get("fs.ceph.clientDebug");
- }
- if (conf.get("fs.ceph.messengerDebug") != null) {
- arguments += " --debug_ms ";
- arguments += conf.get("fs.ceph.messengerDebug");
- }
- if (conf.get("fs.ceph.monAddr") != null) {
- arguments += " -m ";
- arguments += conf.get("fs.ceph.monAddr");
- }
- arguments += " --client-readahead-max-periods="
- + conf.get("fs.ceph.readahead", "1");
- //make sure they gave us a ceph monitor address or conf file
- ceph.debug("initialize:Ceph initialization arguments: " + arguments, ceph.INFO);
- if ( (conf.get("fs.ceph.monAddr") == null) &&
- (arguments.indexOf("-m") == -1) &&
- (arguments.indexOf("-c") == -1) ) {
- ceph.debug("initialize:You need to specify a Ceph monitor address.", ceph.FATAL);
- throw new IOException("You must specify a Ceph monitor address or config file!");
- }
- // Initialize the client
- if (!ceph.ceph_initializeClient(arguments,
- conf.getInt("fs.ceph.blockSize", 1<<26))) {
- ceph.debug("initialize:Ceph initialization failed!", ceph.FATAL);
- throw new IOException("Ceph initialization failed!");
- }
- initialized = true;
- ceph.debug("initialize:Ceph initialized client. Setting cwd to /", ceph.INFO);
- ceph.ceph_setcwd("/");
+ if (initialized) {
+ return;
+ }
+ super.initialize(uri, conf);
+ setConf(conf);
+ this.uri = URI.create(uri.getScheme() + "://" + uri.getAuthority());
+ statistics = getStatistics(uri.getScheme(), getClass());
+ if (ceph == null) {
+ ceph = new CephTalker(conf, LOG);
+ }
+ if (null == fs_default_name) {
+ fs_default_name = conf.get("fs.default.name");
}
+ // build up the arguments for Ceph
+ String arguments = "CephFSInterface";
+
+ arguments += conf.get("fs.ceph.commandLine", "");
+ if (conf.get("fs.ceph.clientDebug") != null) {
+ arguments += " --debug_client ";
+ arguments += conf.get("fs.ceph.clientDebug");
+ }
+ if (conf.get("fs.ceph.messengerDebug") != null) {
+ arguments += " --debug_ms ";
+ arguments += conf.get("fs.ceph.messengerDebug");
+ }
+ if (conf.get("fs.ceph.monAddr") != null) {
+ arguments += " -m ";
+ arguments += conf.get("fs.ceph.monAddr");
+ }
+ arguments += " --client-readahead-max-periods="
+ + conf.get("fs.ceph.readahead", "1");
+ // make sure they gave us a ceph monitor address or conf file
+ ceph.debug("initialize:Ceph initialization arguments: " + arguments,
+ ceph.INFO);
+ if ((conf.get("fs.ceph.monAddr") == null) && (arguments.indexOf("-m") == -1)
+ && (arguments.indexOf("-c") == -1)) {
+ ceph.debug("initialize:You need to specify a Ceph monitor address.",
+ ceph.FATAL);
+ throw new IOException(
+ "You must specify a Ceph monitor address or config file!");
+ }
+ // Initialize the client
+ if (!ceph.ceph_initializeClient(arguments,
+ conf.getInt("fs.ceph.blockSize", 1 << 26))) {
+ ceph.debug("initialize:Ceph initialization failed!", ceph.FATAL);
+ throw new IOException("Ceph initialization failed!");
+ }
+ initialized = true;
+ ceph.debug("initialize:Ceph initialized client. Setting cwd to /", ceph.INFO);
+ ceph.ceph_setcwd("/");
ceph.debug("initialize:exit", ceph.DEBUG);
}
*/
@Override
public void close() throws IOException {
- if (!initialized) throw new IOException ("You have to initialize the "
- +"CephFileSystem before calling other methods.");
+ if (!initialized) {
+ throw new IOException(
+ "You have to initialize the "
+ + "CephFileSystem before calling other methods.");
+ }
ceph.debug("close:enter", ceph.DEBUG);
- super.close();//this method does stuff, make sure it's run!
- ceph.debug("close: Calling ceph_kill_client from Java", ceph.TRACE);
+ super.close(); // this method does stuff, make sure it's run!
+ ceph.debug("close: Calling ceph_kill_client from Java", ceph.TRACE);
ceph.ceph_kill_client();
ceph.debug("close:exit", ceph.DEBUG);
}
* @return An FSDataOutputStream that connects to the file on Ceph.
* @throws IOException If initialize() hasn't been called or the file cannot be found or appended to.
*/
- public FSDataOutputStream append (Path file, int bufferSize,
- Progressable progress) throws IOException {
- if (!initialized) throw new IOException ("You have to initialize the "
- +"CephFileSystem before calling other methods.");
- ceph.debug("append:enter with path " + file + " bufferSize " + bufferSize, ceph.DEBUG);
+ public FSDataOutputStream append(Path file, int bufferSize,
+ Progressable progress) throws IOException {
+ if (!initialized) {
+ throw new IOException(
+ "You have to initialize the "
+ + "CephFileSystem before calling other methods.");
+ }
+ ceph.debug("append:enter with path " + file + " bufferSize " + bufferSize,
+ ceph.DEBUG);
Path abs_path = makeAbsolute(file);
- if (progress!=null) progress.progress();
- ceph.debug("append: Entering ceph_open_for_append from Java", ceph.TRACE);
+
+ if (progress != null) {
+ progress.progress();
+ }
+ ceph.debug("append: Entering ceph_open_for_append from Java", ceph.TRACE);
int fd = ceph.ceph_open_for_append(abs_path.toString());
- ceph.debug("append: Returned to Java", ceph.TRACE);
- if (progress!=null) progress.progress();
- if( fd < 0 ) { //error in open
- throw new IOException("append: Open for append failed on path \"" +
- abs_path.toString() + "\"");
- }
- CephOutputStream cephOStream = new CephOutputStream(getConf(),
- ceph, fd, bufferSize);
+
+ ceph.debug("append: Returned to Java", ceph.TRACE);
+ if (progress != null) {
+ progress.progress();
+ }
+ if (fd < 0) { // error in open
+ throw new IOException(
+ "append: Open for append failed on path \"" + abs_path.toString()
+ + "\"");
+ }
+ CephOutputStream cephOStream = new CephOutputStream(getConf(), ceph, fd,
+ bufferSize);
+
ceph.debug("append:exit", ceph.DEBUG);
return new FSDataOutputStream(cephOStream, statistics);
}
* @return the directory Path
*/
public Path getWorkingDirectory() {
- if (!initialized) return null;
+ if (!initialized) {
+ return null;
+ }
ceph.debug("getWorkingDirectory:enter", ceph.DEBUG);
- String cwd = ceph.ceph_getcwd();
+ String cwd = ceph.ceph_getcwd();
+
ceph.debug("getWorkingDirectory:exit with path " + cwd, ceph.DEBUG);
return new Path(fs_default_name + ceph.ceph_getcwd());
}
* @param dir The directory to change to.
*/
@Override
- public void setWorkingDirectory(Path dir) {
- if (!initialized) return;
+ public void setWorkingDirectory(Path dir) {
+ if (!initialized) {
+ return;
+ }
ceph.debug("setWorkingDirecty:enter with new working dir " + dir, ceph.DEBUG);
Path abs_path = makeAbsolute(dir);
+
ceph.debug("setWorkingDirectory:calling ceph_setcwd from Java", ceph.TRACE);
- if (!ceph.ceph_setcwd(abs_path.toString()))
- ceph.debug("setWorkingDirectory: WARNING! ceph_setcwd failed for some reason on path " + abs_path, ceph.WARN);
+ if (!ceph.ceph_setcwd(abs_path.toString())) {
+ ceph.debug(
+ "setWorkingDirectory: WARNING! ceph_setcwd failed for some reason on path "
+ + abs_path,
+ ceph.WARN);
+ }
ceph.debug("setWorkingDirectory:exit", ceph.DEBUG);
}
* @throws IOException if initialize() hasn't been called.
*/
@Override
- public boolean exists(Path path) throws IOException {
- if (!initialized) throw new IOException ("You have to initialize the "
- +"CephFileSystem before calling other methods.");
+ public boolean exists(Path path) throws IOException {
+ if (!initialized) {
+ throw new IOException(
+ "You have to initialize the "
+ + "CephFileSystem before calling other methods.");
+ }
ceph.debug("exists:enter with path " + path, ceph.DEBUG);
boolean result;
Path abs_path = makeAbsolute(path);
+
if (abs_path.equals(root)) {
result = true;
- }
- else {
- ceph.debug("exists:Calling ceph_exists from Java on path "
- + abs_path.toString(), ceph.TRACE);
- result = ceph.ceph_exists(abs_path.toString());
+ } else {
+ ceph.debug(
+ "exists:Calling ceph_exists from Java on path " + abs_path.toString(),
+ ceph.TRACE);
+ result = ceph.ceph_exists(abs_path.toString());
ceph.debug("exists:Returned from ceph_exists to Java", ceph.TRACE);
}
ceph.debug("exists:exit with value " + result, ceph.DEBUG);
* @param perms The permissions to apply to the created directories.
* @return true if successful, false otherwise
* @throws IOException if initialize() hasn't been called or the path
- * is a child of a file.
+ * is a child of a file.
*/
- @Override
+ @Override
public boolean mkdirs(Path path, FsPermission perms) throws IOException {
- if (!initialized) throw new IOException ("You have to initialize the "
- +"CephFileSystem before calling other methods.");
+ if (!initialized) {
+ throw new IOException(
+ "You have to initialize the "
+ + "CephFileSystem before calling other methods.");
+ }
ceph.debug("mkdirs:enter with path " + path, ceph.DEBUG);
Path abs_path = makeAbsolute(path);
+
ceph.debug("mkdirs:calling ceph_mkdirs from Java", ceph.TRACE);
- int result = ceph.ceph_mkdirs(abs_path.toString(), (int)perms.toShort());
+ int result = ceph.ceph_mkdirs(abs_path.toString(), (int) perms.toShort());
+
if (result != 0) {
- ceph.debug("mkdirs: make directory " + abs_path
- + "Failing with result " + result, ceph.WARN);
- if (ceph.ENOTDIR == result)
- throw new FileAlreadyExistsException("Parent path is not a directory");
+ ceph.debug(
+ "mkdirs: make directory " + abs_path + "Failing with result " + result,
+ ceph.WARN);
+ if (ceph.ENOTDIR == result) {
+ throw new FileAlreadyExistsException("Parent path is not a directory");
+ }
return false;
- }
- else {
- ceph.debug("mkdirs:exiting succesfully", ceph.DEBUG);
- return true;
- }
+ } else {
+ ceph.debug("mkdirs:exiting succesfully", ceph.DEBUG);
+ return true;
+ }
}
/**
* @throws IOException if initialize() hasn't been called.
*/
@Override
- public boolean isFile(Path path) throws IOException {
- if (!initialized) throw new IOException ("You have to initialize the "
- +"CephFileSystem before calling other methods.");
+ public boolean isFile(Path path) throws IOException {
+ if (!initialized) {
+ throw new IOException(
+ "You have to initialize the "
+ + "CephFileSystem before calling other methods.");
+ }
ceph.debug("isFile:enter with path " + path, ceph.DEBUG);
Path abs_path = makeAbsolute(path);
boolean result;
+
if (abs_path.equals(root)) {
- result = false;
- }
- else {
- ceph.debug("isFile:entering ceph_isfile from Java", ceph.TRACE);
+ result = false;
+ } else {
+ ceph.debug("isFile:entering ceph_isfile from Java", ceph.TRACE);
result = ceph.ceph_isfile(abs_path.toString());
}
ceph.debug("isFile:exit with result " + result, ceph.DEBUG);
* @throws IOException if initialize() hasn't been called.
*/
@Override
- public boolean isDirectory(Path path) throws IOException {
- if (!initialized) throw new IOException ("You have to initialize the "
- +"CephFileSystem before calling other methods.");
+ public boolean isDirectory(Path path) throws IOException {
+ if (!initialized) {
+ throw new IOException(
+ "You have to initialize the "
+ + "CephFileSystem before calling other methods.");
+ }
ceph.debug("isDirectory:enter with path " + path, ceph.DEBUG);
Path abs_path = makeAbsolute(path);
boolean result;
+
if (abs_path.equals(root)) {
result = true;
- }
- else {
+ } else {
ceph.debug("calling ceph_isdirectory from Java", ceph.TRACE);
result = ceph.ceph_isdirectory(abs_path.toString());
ceph.debug("Returned from ceph_isdirectory to Java", ceph.TRACE);
* @throws FileNotFoundException if the path could not be resolved.
*/
public FileStatus getFileStatus(Path path) throws IOException {
- if (!initialized) throw new IOException ("You have to initialize the "
- +"CephFileSystem before calling other methods.");
+ if (!initialized) {
+ throw new IOException(
+ "You have to initialize the "
+ + "CephFileSystem before calling other methods.");
+ }
ceph.debug("getFileStatus:enter with path " + path, ceph.DEBUG);
Path abs_path = makeAbsolute(path);
- //sadly, Ceph doesn't really do uids/gids just yet, but
- //everything else is filled
+ // sadly, Ceph doesn't really do uids/gids just yet, but
+ // everything else is filled
FileStatus status;
Stat lstat = new Stat();
- ceph.debug("getFileStatus: calling ceph_stat from Java", ceph.TRACE);
- if(ceph.ceph_stat(abs_path.toString(), lstat)) {
+
+ ceph.debug("getFileStatus: calling ceph_stat from Java", ceph.TRACE);
+ if (ceph.ceph_stat(abs_path.toString(), lstat)) {
status = new FileStatus(lstat.size, lstat.is_dir,
- ceph.ceph_replication(abs_path.toString()),
- lstat.block_size,
- lstat.mod_time,
- lstat.access_time,
- new FsPermission((short)lstat.mode),
- null,
- null,
- new Path(fs_default_name+abs_path.toString()));
- }
- else { //fail out
- throw new FileNotFoundException("org.apache.hadoop.fs.ceph.CephFileSystem: File "
- + path + " does not exist or could not be accessed");
+ ceph.ceph_replication(abs_path.toString()), lstat.block_size,
+ lstat.mod_time, lstat.access_time,
+ new FsPermission((short) lstat.mode), null, null,
+ new Path(fs_default_name + abs_path.toString()));
+ } else { // fail out
+ throw new FileNotFoundException(
+ "org.apache.hadoop.fs.ceph.CephFileSystem: File " + path
+ + " does not exist or could not be accessed");
}
ceph.debug("getFileStatus:exit", ceph.DEBUG);
* @throws FileNotFoundException if the input path can't be found.
*/
public FileStatus[] listStatus(Path path) throws IOException {
- if (!initialized) throw new IOException ("You have to initialize the "
- +"CephFileSystem before calling other methods.");
+ if (!initialized) {
+ throw new IOException(
+ "You have to initialize the "
+ + "CephFileSystem before calling other methods.");
+ }
ceph.debug("listStatus:enter with path " + path, ceph.WARN);
Path abs_path = makeAbsolute(path);
Path[] paths = listPaths(abs_path);
+
if (paths != null) {
FileStatus[] statuses = new FileStatus[paths.length];
+
for (int i = 0; i < paths.length; ++i) {
- statuses[i] = getFileStatus(paths[i]);
+ statuses[i] = getFileStatus(paths[i]);
}
ceph.debug("listStatus:exit", ceph.DEBUG);
return statuses;
}
- if (!isFile(path)) throw new FileNotFoundException(); //if we get here, listPaths returned null
- //which means that the input wasn't a directory, so throw an Exception if it's not a file
- return null; //or return null if it's a file
+ if (!isFile(path)) {
+ throw new FileNotFoundException();
+ } // if we get here, listPaths returned null
+ // which means that the input wasn't a directory, so throw an Exception if it's not a file
+ return null; // or return null if it's a file
}
@Override
public void setPermission(Path p, FsPermission permission) throws IOException {
- if (!initialized) throw new IOException ("You have to initialize the "
- +"CephFileSystem before calling other methods.");
- ceph.debug("setPermission:enter with path " + p
- + " and permissions " + permission, ceph.DEBUG);
+ if (!initialized) {
+ throw new IOException(
+ "You have to initialize the "
+ + "CephFileSystem before calling other methods.");
+ }
+ ceph.debug(
+ "setPermission:enter with path " + p + " and permissions " + permission,
+ ceph.DEBUG);
Path abs_path = makeAbsolute(p);
- ceph.debug("setPermission:calling ceph_setpermission from Java", ceph.TRACE);
+
+ ceph.debug("setPermission:calling ceph_setpermission from Java", ceph.TRACE);
ceph.ceph_setPermission(abs_path.toString(), permission.toShort());
- ceph.debug("setPermission:exit", ceph.DEBUG);
+ ceph.debug("setPermission:exit", ceph.DEBUG);
}
/**
* @param mtime Set modification time in number of millis since Jan 1, 1970.
* @param atime Set access time in number of millis since Jan 1, 1970.
*/
- @Override
- public void setTimes(Path p, long mtime, long atime) throws IOException {
- if (!initialized) throw new IOException ("You have to initialize the "
- +"CephFileSystem before calling other methods.");
- ceph.debug("setTimes:enter with path " + p + " mtime:" + mtime +
- " atime:" + atime, ceph.DEBUG);
- Path abs_path = makeAbsolute(p);
- ceph.debug("setTimes:calling ceph_setTimes from Java", ceph.TRACE);
- int r = ceph.ceph_setTimes(abs_path.toString(), mtime, atime);
- if (r<0) throw new IOException ("Failed to set times on path "
- + abs_path.toString() + " Error code: " + r);
- ceph.debug("setTimes:exit", ceph.DEBUG);
- }
+ @Override
+ public void setTimes(Path p, long mtime, long atime) throws IOException {
+ if (!initialized) {
+ throw new IOException(
+ "You have to initialize the "
+ + "CephFileSystem before calling other methods.");
+ }
+ ceph.debug(
+ "setTimes:enter with path " + p + " mtime:" + mtime + " atime:" + atime,
+ ceph.DEBUG);
+ Path abs_path = makeAbsolute(p);
+
+ ceph.debug("setTimes:calling ceph_setTimes from Java", ceph.TRACE);
+ int r = ceph.ceph_setTimes(abs_path.toString(), mtime, atime);
+
+ if (r < 0) {
+ throw new IOException(
+ "Failed to set times on path " + abs_path.toString() + " Error code: "
+ + r);
+ }
+ ceph.debug("setTimes:exit", ceph.DEBUG);
+ }
/**
* Create a new file and open an FSDataOutputStream that's connected to it.
* @param flag If CreateFlag.OVERWRITE, overwrite any existing
* file with this name; otherwise don't.
* @param bufferSize Ceph does internal buffering, but you can buffer
- * in the Java code too if you like.
+ * in the Java code too if you like.
* @param replication Ignored by Ceph. This can be
* configured via Ceph configuration.
* @param blockSize Ignored by Ceph. You can set client-wide block sizes
* failure in attempting to open for append with Ceph.
*/
public FSDataOutputStream create(Path path,
- FsPermission permission,
- EnumSet<CreateFlag> flag,
- //boolean overwrite,
- int bufferSize,
- short replication,
- long blockSize,
- Progressable progress
- ) throws IOException {
- if (!initialized) throw new IOException ("You have to initialize the "
- +"CephFileSystem before calling other methods.");
+ FsPermission permission,
+ EnumSet<CreateFlag> flag,
+ // boolean overwrite,
+ int bufferSize,
+ short replication,
+ long blockSize,
+ Progressable progress) throws IOException {
+ if (!initialized) {
+ throw new IOException(
+ "You have to initialize the "
+ + "CephFileSystem before calling other methods.");
+ }
ceph.debug("create:enter with path " + path, ceph.DEBUG);
Path abs_path = makeAbsolute(path);
- if (progress!=null) progress.progress();
+
+ if (progress != null) {
+ progress.progress();
+ }
// We ignore replication since that's not configurable here, and
// progress reporting is quite limited.
// Required semantics: if the file exists, overwrite if CreateFlag.OVERWRITE;
// Step 1: existence test
boolean exists = exists(abs_path);
+
if (exists) {
- if(isDirectory(abs_path))
- throw new IOException("create: Cannot overwrite existing directory \""
- + path.toString() + "\" with a file");
- //if (!overwrite)
- if (!flag.contains(CreateFlag.OVERWRITE))
- throw new IOException("createRaw: Cannot open existing file \""
- + abs_path.toString()
- + "\" for writing without overwrite flag");
+ if (isDirectory(abs_path)) {
+ throw new IOException(
+ "create: Cannot overwrite existing directory \"" + path.toString()
+ + "\" with a file");
+ }
+ // if (!overwrite)
+ if (!flag.contains(CreateFlag.OVERWRITE)) {
+ throw new IOException(
+ "createRaw: Cannot open existing file \"" + abs_path.toString()
+ + "\" for writing without overwrite flag");
+ }
}
- if (progress!=null) progress.progress();
+ if (progress != null) {
+ progress.progress();
+ }
// Step 2: create any nonexistent directories in the path
if (!exists) {
Path parent = abs_path.getParent();
+
if (parent != null) { // if parent is root, we're done
- int r = ceph.ceph_mkdirs(parent.toString(), permission.toShort());
- if (!(r==0 || r==-ceph.EEXIST))
- throw new IOException ("Error creating parent directory; code: " + r);
+ int r = ceph.ceph_mkdirs(parent.toString(), permission.toShort());
+
+ if (!(r == 0 || r == -ceph.EEXIST)) {
+ throw new IOException("Error creating parent directory; code: " + r);
+ }
+ }
+ if (progress != null) {
+ progress.progress();
}
- if (progress!=null) progress.progress();
}
// Step 3: open the file
ceph.debug("calling ceph_open_for_overwrite from Java", ceph.TRACE);
- int fh = ceph.ceph_open_for_overwrite(abs_path.toString(), (int)permission.toShort());
- if (progress!=null) progress.progress();
- ceph.debug("Returned from ceph_open_for_overwrite to Java with fh " + fh, ceph.TRACE);
+ int fh = ceph.ceph_open_for_overwrite(abs_path.toString(),
+ (int) permission.toShort());
+
+ if (progress != null) {
+ progress.progress();
+ }
+ ceph.debug("Returned from ceph_open_for_overwrite to Java with fh " + fh,
+ ceph.TRACE);
if (fh < 0) {
- throw new IOException("create: Open for overwrite failed on path \"" +
- path.toString() + "\"");
+ throw new IOException(
+ "create: Open for overwrite failed on path \"" + path.toString()
+ + "\"");
}
-
+
// Step 4: create the stream
- OutputStream cephOStream = new CephOutputStream(getConf(),
- ceph, fh, bufferSize);
+ OutputStream cephOStream = new CephOutputStream(getConf(), ceph, fh,
+ bufferSize);
+
ceph.debug("create:exit", ceph.DEBUG);
return new FSDataOutputStream(cephOStream, statistics);
- }
+ }
/**
* Open a Ceph file and attach the file handle to an FSDataInputStream.
* @param path The file to open
* @param bufferSize Ceph does internal buffering; but you can buffer in
- * the Java code too if you like.
+ * the Java code too if you like.
* @return FSDataInputStream reading from the given path.
* @throws IOException if initialize() hasn't been called, the path DNE or is a
* directory, or there is an error getting data to set up the FSDataInputStream.
*/
public FSDataInputStream open(Path path, int bufferSize) throws IOException {
- if (!initialized) throw new IOException ("You have to initialize the "
- +"CephFileSystem before calling other methods.");
+ if (!initialized) {
+ throw new IOException(
+ "You have to initialize the "
+ + "CephFileSystem before calling other methods.");
+ }
ceph.debug("open:enter with path " + path, ceph.DEBUG);
Path abs_path = makeAbsolute(path);
-
+
int fh = ceph.ceph_open_for_read(abs_path.toString());
- if (fh < 0) { //uh-oh, something's bad!
- if (fh == -ceph.ENOENT) //well that was a stupid open
- throw new IOException("open: absolute path \"" + abs_path.toString()
- + "\" does not exist");
- else //hrm...the file exists but we can't open it :(
- throw new IOException("open: Failed to open file " + abs_path.toString());
+
+ if (fh < 0) { // uh-oh, something's bad!
+ if (fh == -ceph.ENOENT) { // well that was a stupid open
+ throw new IOException(
+ "open: absolute path \"" + abs_path.toString()
+ + "\" does not exist");
+ } else { // hrm...the file exists but we can't open it :(
+ throw new IOException("open: Failed to open file " + abs_path.toString());
+ }
}
- if(isDirectory(abs_path)) { //yes, it is possible to open Ceph directories
- //but that doesn't mean you should in Hadoop!
+ if (isDirectory(abs_path)) { // yes, it is possible to open Ceph directories
+ // but that doesn't mean you should in Hadoop!
ceph.ceph_close(fh);
- throw new IOException("open: absolute path \"" + abs_path.toString()
- + "\" is a directory!");
+ throw new IOException(
+ "open: absolute path \"" + abs_path.toString() + "\" is a directory!");
}
Stat lstat = new Stat();
- ceph.debug("open:calling ceph_stat from Java", ceph.TRACE);
+
+ ceph.debug("open:calling ceph_stat from Java", ceph.TRACE);
ceph.ceph_stat(abs_path.toString(), lstat);
- ceph.debug("open:returned to Java", ceph.TRACE);
+ ceph.debug("open:returned to Java", ceph.TRACE);
long size = lstat.size;
+
if (size < 0) {
- throw new IOException("Failed to get file size for file " + abs_path.toString() +
- " but succeeded in opening file. Something bizarre is going on.");
+ throw new IOException(
+ "Failed to get file size for file " + abs_path.toString()
+ + " but succeeded in opening file. Something bizarre is going on.");
}
- FSInputStream cephIStream = new CephInputStream(getConf(), ceph,
- fh, size, bufferSize);
+ FSInputStream cephIStream = new CephInputStream(getConf(), ceph, fh, size,
+ bufferSize);
+
ceph.debug("open:exit", ceph.DEBUG);
return new FSDataInputStream(cephIStream);
- }
+ }
/**
* Rename a file or directory.
* @throws IOException if initialize() hasn't been called.
*/
@Override
- public boolean rename(Path src, Path dst) throws IOException {
- if (!initialized) throw new IOException ("You have to initialize the "
- +"CephFileSystem before calling other methods.");
+ public boolean rename(Path src, Path dst) throws IOException {
+ if (!initialized) {
+ throw new IOException(
+ "You have to initialize the "
+ + "CephFileSystem before calling other methods.");
+ }
ceph.debug("rename:enter with src:" + src + " and dest:" + dst, ceph.DEBUG);
Path abs_src = makeAbsolute(src);
Path abs_dst = makeAbsolute(dst);
+
ceph.debug("calling ceph_rename from Java", ceph.TRACE);
boolean result = ceph.ceph_rename(abs_src.toString(), abs_dst.toString());
- if (!result) {
- if (isDirectory(abs_dst)) { //move the srcdir into destdir
- ceph.debug("ceph_rename failed but dst is a directory!", ceph.NOLOG);
- Path new_dst = new Path(abs_dst, abs_src.getName());
- result = rename(abs_src, new_dst);
- ceph.debug("attempt to move " + abs_src.toString()
- + " to " + new_dst.toString()
- + "has result:" + result, ceph.NOLOG);
- }
- }
+
+ if (!result) {
+ if (isDirectory(abs_dst)) { // move the srcdir into destdir
+ ceph.debug("ceph_rename failed but dst is a directory!", ceph.NOLOG);
+ Path new_dst = new Path(abs_dst, abs_src.getName());
+
+ result = rename(abs_src, new_dst);
+ ceph.debug(
+ "attempt to move " + abs_src.toString() + " to "
+ + new_dst.toString() + "has result:" + result,
+ ceph.NOLOG);
+ }
+ }
ceph.debug("rename:exit with result: " + result, ceph.DEBUG);
return result;
}
* @throws IOException if initialize() hasn't been called.
*/
@Override
- public BlockLocation[] getFileBlockLocations(FileStatus file,
- long start, long len) throws IOException {
- if (!initialized) throw new IOException ("You have to initialize the "
- +"CephFileSystem before calling other methods.");
- ceph.debug("getFileBlockLocations:enter with path " + file.getPath() +
- ", start pos " + start + ", length " + len, ceph.DEBUG);
- //sanitize and get the filehandle
+ public BlockLocation[] getFileBlockLocations(FileStatus file, long start, long len) throws IOException {
+ if (!initialized) {
+ throw new IOException(
+ "You have to initialize the "
+ + "CephFileSystem before calling other methods.");
+ }
+ ceph.debug(
+ "getFileBlockLocations:enter with path " + file.getPath()
+ + ", start pos " + start + ", length " + len,
+ ceph.DEBUG);
+ // sanitize and get the filehandle
Path abs_path = makeAbsolute(file.getPath());
- ceph.debug("getFileBlockLocations:call ceph_open_for_read from Java", ceph.TRACE);
+
+ ceph.debug("getFileBlockLocations:call ceph_open_for_read from Java",
+ ceph.TRACE);
int fh = ceph.ceph_open_for_read(abs_path.toString());
- ceph.debug("getFileBlockLocations:return from ceph_open_for_read to Java with fh "
- + fh, ceph.TRACE);
+
+ ceph.debug(
+ "getFileBlockLocations:return from ceph_open_for_read to Java with fh "
+ + fh,
+ ceph.TRACE);
if (fh < 0) {
- ceph.debug("getFileBlockLocations:got error " + fh +
- ", exiting and returning null!", ceph.ERROR);
+ ceph.debug(
+ "getFileBlockLocations:got error " + fh
+ + ", exiting and returning null!",
+ ceph.ERROR);
return null;
}
- //get the block size
- ceph.debug("getFileBlockLocations:call ceph_getblocksize from Java", ceph.TRACE);
+ // get the block size
+ ceph.debug("getFileBlockLocations:call ceph_getblocksize from Java",
+ ceph.TRACE);
long blockSize = ceph.ceph_getblocksize(abs_path.toString());
- ceph.debug("getFileBlockLocations:return from ceph_getblocksize", ceph.TRACE);
- BlockLocation[] locations =
- new BlockLocation[(int)Math.ceil(len/(float)blockSize)];
- long offset;
+
+ ceph.debug("getFileBlockLocations:return from ceph_getblocksize", ceph.TRACE);
+ BlockLocation[] locations = new BlockLocation[(int) Math.ceil(len / (float) blockSize)];
+ long offset;
+
for (int i = 0; i < locations.length; ++i) {
- offset = start + i*blockSize;
- ceph.debug("getFileBlockLocations:call ceph_hosts from Java on fh "
- + fh + " and offset " + offset, ceph.TRACE);
+ offset = start + i * blockSize;
+ ceph.debug(
+ "getFileBlockLocations:call ceph_hosts from Java on fh " + fh
+ + " and offset " + offset,
+ ceph.TRACE);
String host = ceph.ceph_hosts(fh, offset);
- ceph.debug("getFileBlockLocations:return from ceph_hosts to Java with host "
- + host, ceph.TRACE);
+
+ ceph.debug(
+ "getFileBlockLocations:return from ceph_hosts to Java with host "
+ + host,
+ ceph.TRACE);
String[] hostArray = new String[1];
+
hostArray[0] = host;
locations[i] = new BlockLocation(hostArray, hostArray,
- start+i*blockSize-(start % blockSize),
- blockSize);
+ start + i * blockSize - (start % blockSize), blockSize);
}
- ceph.debug("getFileBlockLocations:call ceph_close from Java on fh "
- + fh, ceph.TRACE);
+ ceph.debug("getFileBlockLocations:call ceph_close from Java on fh " + fh,
+ ceph.TRACE);
ceph.ceph_close(fh);
- ceph.debug("getFileBlockLocations:return with " + locations.length
- + " locations", ceph.DEBUG);
+ ceph.debug(
+ "getFileBlockLocations:return with " + locations.length + " locations",
+ ceph.DEBUG);
return locations;
}
-
+
/**
* Get usage statistics on the Ceph filesystem.
* @param path A path to the partition you're interested in.
* stat somehow fails.
*/
@Override
- public FsStatus getStatus (Path path) throws IOException {
- if (!initialized) throw new IOException("You have to initialize the "
- + " CephFileSystem before calling other methods.");
+ public FsStatus getStatus(Path path) throws IOException {
+ if (!initialized) {
+ throw new IOException(
+ "You have to initialize the "
+ + "CephFileSystem before calling other methods.");
+ }
ceph.debug("getStatus:enter with path " + path, ceph.DEBUG);
Path abs_path = makeAbsolute(path);
-
- //currently(Ceph .16) Ceph actually ignores the path
- //but we still pass it in; if Ceph stops ignoring we may need more
- //error-checking code.
+
+ // currently(Ceph .16) Ceph actually ignores the path
+ // but we still pass it in; if Ceph stops ignoring we may need more
+ // error-checking code.
CephStat ceph_stat = new CephStat();
- ceph.debug("getStatus:calling ceph_statfs from Java", ceph.TRACE);
+
+ ceph.debug("getStatus:calling ceph_statfs from Java", ceph.TRACE);
int result = ceph.ceph_statfs(abs_path.toString(), ceph_stat);
- if (result!=0) throw new IOException("Somehow failed to statfs the Ceph filesystem. Error code: " + result);
+
+ if (result != 0) {
+ throw new IOException(
+ "Somehow failed to statfs the Ceph filesystem. Error code: " + result);
+ }
ceph.debug("getStatus:exit successfully", ceph.DEBUG);
- return new FsStatus(ceph_stat.capacity,
- ceph_stat.used, ceph_stat.remaining);
+ return new FsStatus(ceph_stat.capacity, ceph_stat.used, ceph_stat.remaining);
}
/**
* or you attempt to delete the root directory.
*/
public boolean delete(Path path, boolean recursive) throws IOException {
- if (!initialized) throw new IOException ("You have to initialize the "
- +"CephFileSystem before calling other methods.");
+ if (!initialized) {
+ throw new IOException(
+ "You have to initialize the "
+ + "CephFileSystem before calling other methods.");
+ }
ceph.debug("delete:enter with path " + path + " and recursive=" + recursive,
- ceph.DEBUG);
+ ceph.DEBUG);
Path abs_path = makeAbsolute(path);
-
+
// sanity check
- if (abs_path.equals(root))
+ if (abs_path.equals(root)) {
throw new IOException("Error: deleting the root directory is a Bad Idea.");
- if (!exists(abs_path)) return false;
+ }
+ if (!exists(abs_path)) {
+ return false;
+ }
// if the path is a file, try to delete it.
if (isFile(abs_path)) {
- ceph.debug("delete:calling ceph_unlink from Java with path " + abs_path,
- ceph.TRACE);
+ ceph.debug("delete:calling ceph_unlink from Java with path " + abs_path,
+ ceph.TRACE);
boolean result = ceph.ceph_unlink(abs_path.toString());
- if(!result)
- ceph.debug("delete: failed to delete file \"" +
- abs_path.toString() + "\".", ceph.ERROR);
+
+ if (!result) {
+ ceph.debug(
+ "delete: failed to delete file \"" + abs_path.toString() + "\".",
+ ceph.ERROR);
+ }
ceph.debug("delete:exit with success=" + result, ceph.DEBUG);
return result;
}
/* The path is a directory, so recursively try to delete its contents,
- and then delete the directory. */
- //get the entries; listPaths will remove . and .. for us
+ and then delete the directory. */
+ // get the entries; listPaths will remove . and .. for us
Path[] contents = listPaths(abs_path);
+
if (contents == null) {
- ceph.debug("delete: Failed to read contents of directory \"" +
- abs_path.toString() +
- "\" while trying to delete it, BAILING", ceph.ERROR);
+ ceph.debug(
+ "delete: Failed to read contents of directory \""
+ + abs_path.toString() + "\" while trying to delete it, BAILING",
+ ceph.ERROR);
return false;
}
if (!recursive && contents.length > 0) {
throw new IOException("Directories must be deleted recursively!");
}
// delete the entries
- ceph.debug("delete: recursively calling delete on contents of "
- + abs_path, ceph.DEBUG);
+ ceph.debug("delete: recursively calling delete on contents of " + abs_path,
+ ceph.DEBUG);
for (Path p : contents) {
if (!delete(p, true)) {
- ceph.debug("delete: Failed to delete file \"" +
- p.toString() + "\" while recursively deleting \""
- + abs_path.toString() + "\", BAILING", ceph.ERROR );
- return false;
+ ceph.debug(
+ "delete: Failed to delete file \"" + p.toString()
+ + "\" while recursively deleting \"" + abs_path.toString()
+ + "\", BAILING",
+ ceph.ERROR);
+ return false;
}
}
- //if we've come this far it's a now-empty directory, so delete it!
+ // if we've come this far it's a now-empty directory, so delete it!
boolean result = ceph.ceph_rmdir(abs_path.toString());
- if (!result)
- ceph.debug("delete: failed to delete \"" + abs_path.toString()
- + "\", BAILING", ceph.ERROR);
+
+ if (!result) {
+ ceph.debug(
+ "delete: failed to delete \"" + abs_path.toString() + "\", BAILING",
+ ceph.ERROR);
+ }
ceph.debug("delete:exit", ceph.DEBUG);
return result;
}
* by a separate Ceph configuration.
*/
@Override
- public short getDefaultReplication() {
+ public short getDefaultReplication() {
return 1;
}
* @return the default block size, in bytes, as a long.
*/
@Override
- public long getDefaultBlockSize() {
- return getConf().getInt("fs.ceph.blockSize", 1<<26);
+ public long getDefaultBlockSize() {
+ return getConf().getInt("fs.ceph.blockSize", 1 << 26);
}
// Makes a Path absolute. In a cheap, dirty hack, we're
- // also going to strip off any fs_default_name prefix we see.
+ // also going to strip off any fs_default_name prefix we see.
private Path makeAbsolute(Path path) {
ceph.debug("makeAbsolute:enter with path " + path, ceph.NOLOG);
- if (path == null) return new Path("/");
+ if (path == null) {
+ return new Path("/");
+ }
// first, check for the prefix
- if (path.toString().startsWith(fs_default_name)) {
- Path stripped_path = new Path(path.toString().substring(fs_default_name.length()));
+ if (path.toString().startsWith(fs_default_name)) {
+ Path stripped_path = new Path(
+ path.toString().substring(fs_default_name.length()));
+
ceph.debug("makeAbsolute:exit with path " + stripped_path, ceph.NOLOG);
return stripped_path;
}
return path;
}
Path new_path = new Path(ceph.ceph_getcwd(), path);
+
ceph.debug("makeAbsolute:exit with path " + new_path, ceph.NOLOG);
return new_path;
}
-
+
private Path[] listPaths(Path path) throws IOException {
ceph.debug("listPaths:enter with path " + path, ceph.NOLOG);
String dirlist[];
if (dirlist == null) {
return null;
}
-
+
// convert the strings to Paths
Path[] paths = new Path[dirlist.length];
+
for (int i = 0; i < dirlist.length; ++i) {
- ceph.debug("Raw enumeration of paths in \"" + abs_path.toString() + "\": \"" +
- dirlist[i] + "\"", ceph.TRACE);
+ ceph.debug(
+ "Raw enumeration of paths in \"" + abs_path.toString() + "\": \""
+ + dirlist[i] + "\"",
+ ceph.TRACE);
// convert each listing to an absolute path
Path raw_path = new Path(dirlist[i]);
- if (raw_path.isAbsolute())
- paths[i] = raw_path;
- else
- paths[i] = new Path(abs_path, raw_path);
+
+ if (raw_path.isAbsolute()) {
+ paths[i] = raw_path;
+ } else {
+ paths[i] = new Path(abs_path, raw_path);
+ }
}
ceph.debug("listPaths:exit", ceph.NOLOG);
return paths;
}
-
-
static class Stat {
public long size;
public boolean is_dir;
public long access_time;
public int mode;
- public Stat(){}
+ public Stat() {}
}
+
static class CephStat {
public long capacity;
public long used;
// -*- mode:Java; tab-width:2; c-basic-offset:2; indent-tabs-mode:t -*-
+
/**
*
* Licensed under the Apache License, Version 2.0
*/
package org.apache.hadoop.fs.ceph;
+
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
private long fileLength;
- private CephFS ceph;
+ private CephFS ceph;
- private byte[] buffer;
- private int bufPos = 0;
- private int bufValid = 0;
- private long cephPos = 0;
+ private byte[] buffer;
+ private int bufPos = 0;
+ private int bufValid = 0;
+ private long cephPos = 0;
/**
* Create a new CephInputStream.
* you will need to close and re-open it to access the new data.
*/
public CephInputStream(Configuration conf, CephFS cephfs,
- int fh, long flength, int bufferSize) {
+ int fh, long flength, int bufferSize) {
// Whoever's calling the constructor is responsible for doing the actual ceph_open
// call and providing the file handle.
fileLength = flength;
fileHandle = fh;
closed = false;
- ceph = cephfs;
- buffer = new byte[bufferSize];
- ceph.debug("CephInputStream constructor: initializing stream with fh "
- + fh + " and file length " + flength, ceph.DEBUG);
+ ceph = cephfs;
+ buffer = new byte[bufferSize];
+ ceph.debug(
+ "CephInputStream constructor: initializing stream with fh " + fh
+ + " and file length " + flength,
+ ceph.DEBUG);
}
+
/** Ceph likes things to be closed before it shuts down,
* so closing the IOStream stuff voluntarily in a finalizer is good
*/
- protected void finalize () throws Throwable {
+ protected void finalize() throws Throwable {
try {
- if (!closed) close();
+ if (!closed) {
+ close();
+ }
+ } finally {
+ super.finalize();
+ }
+ }
+
+ private synchronized boolean fillBuffer() throws IOException {
+ bufValid = ceph.ceph_read(fileHandle, buffer, 0, buffer.length);
+ bufPos = 0;
+ if (bufValid < 0) {
+ int err = bufValid;
+
+ bufValid = 0;
+ // attempt to reset to old position. If it fails, too bad.
+ ceph.ceph_seek_from_start(fileHandle, cephPos);
+ throw new IOException("Failed to fill read buffer! Error code:" + err);
}
- finally { super.finalize(); }
+ cephPos += bufValid;
+ return (bufValid != 0);
}
- private synchronized boolean fillBuffer() throws IOException {
- bufValid = ceph.ceph_read(fileHandle, buffer, 0, buffer.length);
- bufPos = 0;
- if (bufValid < 0) {
- int err = bufValid;
- bufValid = 0;
- //attempt to reset to old position. If it fails, too bad.
- ceph.ceph_seek_from_start(fileHandle, cephPos);
- throw new IOException("Failed to fill read buffer! Error code:"
- + err);
- }
- cephPos += bufValid;
- return (bufValid != 0);
- }
-
- /*
- * Get the current position of the stream.
- */
+ /*
+ * Get the current position of the stream.
+ */
public synchronized long getPos() throws IOException {
- return cephPos - bufValid + bufPos;
+ return cephPos - bufValid + bufPos;
}
/**
* Find the number of bytes remaining in the file.
*/
@Override
- public synchronized int available() throws IOException {
- return (int) (fileLength - getPos());
- }
+ public synchronized int available() throws IOException {
+ return (int) (fileLength - getPos());
+ }
public synchronized void seek(long targetPos) throws IOException {
- ceph.debug("CephInputStream.seek: Seeking to position " + targetPos +
- " on fd " + fileHandle, ceph.TRACE);
+ ceph.debug(
+ "CephInputStream.seek: Seeking to position " + targetPos + " on fd "
+ + fileHandle,
+ ceph.TRACE);
if (targetPos > fileLength) {
- throw new IOException("CephInputStream.seek: failed seek to position "
- + targetPos + " on fd " + fileHandle
- + ": Cannot seek after EOF " + fileLength);
+ throw new IOException(
+ "CephInputStream.seek: failed seek to position " + targetPos
+ + " on fd " + fileHandle + ": Cannot seek after EOF " + fileLength);
+ }
+ long oldPos = cephPos;
+
+ cephPos = ceph.ceph_seek_from_start(fileHandle, targetPos);
+ bufValid = 0;
+ bufPos = 0;
+ if (cephPos < 0) {
+ cephPos = oldPos;
+ throw new IOException("Ceph failed to seek to new position!");
}
- long oldPos = cephPos;
- cephPos = ceph.ceph_seek_from_start(fileHandle, targetPos);
- bufValid = 0;
- bufPos = 0;
- if (cephPos < 0) {
- cephPos = oldPos;
- throw new IOException ("Ceph failed to seek to new position!");
- }
- }
+ }
/**
* Failovers are handled by the Ceph code at a very low level;
return false;
}
-
/**
* Read a byte from the file.
* @return the next byte.
*/
@Override
- public synchronized int read() throws IOException {
- ceph.debug("CephInputStream.read: Reading a single byte from fd " + fileHandle
- + " by calling general read function", ceph.TRACE);
-
- byte result[] = new byte[1];
- if (getPos() >= fileLength) return -1;
- if (-1 == read(result, 0, 1)) return -1;
- if (result[0]<0) return 256+(int)result[0];
- else return result[0];
+ public synchronized int read() throws IOException {
+ ceph.debug(
+ "CephInputStream.read: Reading a single byte from fd " + fileHandle
+ + " by calling general read function",
+ ceph.TRACE);
+
+ byte result[] = new byte[1];
+
+ if (getPos() >= fileLength) {
+ return -1;
+ }
+ if (-1 == read(result, 0, 1)) {
+ return -1;
}
+ if (result[0] < 0) {
+ return 256 + (int) result[0];
+ } else {
+ return result[0];
+ }
+ }
/**
* Read a specified number of bytes from the file into a byte[].
* @param off the offset to start at in the file
* @param len the number of bytes to read
* @return 0 if successful, otherwise an error code.
- * @throws IOException on bad input.
+ * @throws IOException on bad input.
*/
@Override
- public synchronized int read(byte buf[], int off, int len)
- throws IOException {
- ceph.debug("CephInputStream.read: Reading " + len +
- " bytes from fd " + fileHandle, ceph.TRACE);
+ public synchronized int read(byte buf[], int off, int len)
+ throws IOException {
+ ceph.debug(
+ "CephInputStream.read: Reading " + len + " bytes from fd " + fileHandle,
+ ceph.TRACE);
- if (closed) {
- throw new IOException("CephInputStream.read: cannot read " + len +
- " bytes from fd " + fileHandle +
- ": stream closed");
- }
+ if (closed) {
+ throw new IOException(
+ "CephInputStream.read: cannot read " + len + " bytes from fd "
+ + fileHandle + ": stream closed");
+ }
- // ensure we're not past the end of the file
- if (getPos() >= fileLength) {
- ceph.debug("CephInputStream.read: cannot read " + len +
- " bytes from fd " + fileHandle + ": current position is "
- + getPos() + " and file length is " + fileLength,
- ceph.DEBUG);
+ // ensure we're not past the end of the file
+ if (getPos() >= fileLength) {
+ ceph.debug(
+ "CephInputStream.read: cannot read " + len + " bytes from fd "
+ + fileHandle + ": current position is " + getPos()
+ + " and file length is " + fileLength,
+ ceph.DEBUG);
- return -1;
- }
-
- int totalRead = 0;
- int initialLen = len;
- int read;
- do {
- read = Math.min(len, bufValid - bufPos);
- try {
- System.arraycopy(buffer, bufPos, buf, off, read);
- }
- catch(IndexOutOfBoundsException ie) {
- throw new IOException("CephInputStream.read: Indices out of bounds:"
- + "read length is " + len
- + ", buffer offset is " + off
- + ", and buffer size is " + buf.length);
- }
- catch (ArrayStoreException ae) {
- throw new IOException("Uh-oh, CephInputStream failed to do an array"
- + "copy due to type mismatch...");
- }
- catch (NullPointerException ne) {
- throw new IOException("CephInputStream.read: cannot read "
- + len + "bytes from fd:" + fileHandle
- + ": buf is null");
- }
- bufPos += read;
- len -= read;
- off += read;
- totalRead += read;
- } while (len > 0 && fillBuffer());
-
- ceph.debug("CephInputStream.read: Reading " + initialLen
- + " bytes from fd " + fileHandle
- + ": succeeded in reading " + totalRead + " bytes",
- ceph.TRACE);
- return totalRead;
- }
+ return -1;
+ }
+
+ int totalRead = 0;
+ int initialLen = len;
+ int read;
+
+ do {
+ read = Math.min(len, bufValid - bufPos);
+ try {
+ System.arraycopy(buffer, bufPos, buf, off, read);
+ } catch (IndexOutOfBoundsException ie) {
+ throw new IOException(
+ "CephInputStream.read: Indices out of bounds:" + "read length is "
+ + len + ", buffer offset is " + off + ", and buffer size is "
+ + buf.length);
+ } catch (ArrayStoreException ae) {
+ throw new IOException(
+ "Uh-oh, CephInputStream failed to do an array"
+ + "copy due to type mismatch...");
+ } catch (NullPointerException ne) {
+ throw new IOException(
+ "CephInputStream.read: cannot read " + len + "bytes from fd:"
+ + fileHandle + ": buf is null");
+ }
+ bufPos += read;
+ len -= read;
+ off += read;
+ totalRead += read;
+ } while (len > 0 && fillBuffer());
+
+ ceph.debug(
+ "CephInputStream.read: Reading " + initialLen + " bytes from fd "
+ + fileHandle + ": succeeded in reading " + totalRead + " bytes",
+ ceph.TRACE);
+ return totalRead;
+ }
/**
* Close the CephInputStream and release the associated filehandle.
*/
@Override
- public void close() throws IOException {
+ public void close() throws IOException {
ceph.debug("CephOutputStream.close:enter", ceph.TRACE);
if (!closed) {
- int result = ceph.ceph_close(fileHandle);
- closed = true;
- if (result != 0) {
- throw new IOException("Close somehow failed!"
- + "Don't try and use this stream again, though");
- }
- ceph.debug("CephOutputStream.close:exit", ceph.TRACE);
- }
- }
+ int result = ceph.ceph_close(fileHandle);
+
+ closed = true;
+ if (result != 0) {
+ throw new IOException(
+ "Close somehow failed!"
+ + "Don't try and use this stream again, though");
+ }
+ ceph.debug("CephOutputStream.close:exit", ceph.TRACE);
+ }
+ }
}
// -*- mode:Java; tab-width:2; c-basic-offset:2; indent-tabs-mode:t -*-
+
/**
*
* Licensed under the Apache License, Version 2.0
package org.apache.hadoop.fs.ceph;
+
import java.io.IOException;
import java.io.OutputStream;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.util.Progressable;
+
/**
* <p>
* An {@link OutputStream} for a CephFileSystem and corresponding
private boolean closed;
- private CephFS ceph;
+ private CephFS ceph;
private int fileHandle;
- private byte[] buffer;
- private int bufUsed = 0;
+ private byte[] buffer;
+ private int bufUsed = 0;
/**
* Construct the CephOutputStream.
* @param fh The Ceph filehandle to connect to.
*/
public CephOutputStream(Configuration conf, CephFS cephfs,
- int fh, int bufferSize) {
- ceph = cephfs;
+ int fh, int bufferSize) {
+ ceph = cephfs;
fileHandle = fh;
closed = false;
- buffer = new byte[bufferSize];
+ buffer = new byte[bufferSize];
}
- /**Ceph likes things to be closed before it shuts down,
+ /** Ceph likes things to be closed before it shuts down,
*so closing the IOStream stuff voluntarily is good
*/
- protected void finalize () throws Throwable {
+ protected void finalize() throws Throwable {
try {
- if (!closed) close();
+ if (!closed) {
+ close();
+ }
+ } finally {
+ super.finalize();
}
- finally { super.finalize();}
}
/**
* write fails.
*/
@Override
- public synchronized void write(int b) throws IOException {
- ceph.debug("CephOutputStream.write: writing a single byte to fd "
- + fileHandle, ceph.TRACE);
+ public synchronized void write(int b) throws IOException {
+ ceph.debug(
+ "CephOutputStream.write: writing a single byte to fd " + fileHandle,
+ ceph.TRACE);
- if (closed) {
- throw new IOException("CephOutputStream.write: cannot write " +
- "a byte to fd " + fileHandle + ": stream closed");
- }
- // Stick the byte in a buffer and write it
- byte buf[] = new byte[1];
- buf[0] = (byte) b;
- write(buf, 0, 1);
- return;
+ if (closed) {
+ throw new IOException(
+ "CephOutputStream.write: cannot write " + "a byte to fd " + fileHandle
+ + ": stream closed");
}
+ // Stick the byte in a buffer and write it
+ byte buf[] = new byte[1];
+
+ buf[0] = (byte) b;
+ write(buf, 0, 1);
+ return;
+ }
/**
* Write a byte buffer into the Ceph file.
* @param off the position in the file to start writing at.
* @param len The number of bytes to actually write.
* @throws IOException if you have closed the CephOutputStream, or
- * if buf is null or off + len > buf.length, or
- * if the write fails due to a Ceph error.
+ * if buf is null or off + len > buf.length, or
+ * if the write fails due to a Ceph error.
*/
@Override
- public synchronized void write(byte buf[], int off, int len) throws IOException {
- ceph.debug("CephOutputStream.write: writing " + len +
- " bytes to fd " + fileHandle, ceph.TRACE);
- // make sure stream is open
- if (closed) {
- throw new IOException("CephOutputStream.write: cannot write " + len +
- "bytes to fd " + fileHandle + ": stream closed");
- }
+ public synchronized void write(byte buf[], int off, int len) throws IOException {
+ ceph.debug(
+ "CephOutputStream.write: writing " + len + " bytes to fd " + fileHandle,
+ ceph.TRACE);
+ // make sure stream is open
+ if (closed) {
+ throw new IOException(
+ "CephOutputStream.write: cannot write " + len + "bytes to fd "
+ + fileHandle + ": stream closed");
+ }
- int result;
- int write;
- while (len>0) {
- write = Math.min(len, buffer.length - bufUsed);
- try {
- System.arraycopy(buf, off, buffer, bufUsed, write);
- }
- catch (IndexOutOfBoundsException ie) {
- throw new IOException("CephOutputStream.write: Indices out of bounds: "
- + "write length is " + len
- + ", buffer offset is " + off
- + ", and buffer size is " + buf.length);
- }
- catch (ArrayStoreException ae) {
- throw new IOException("Uh-oh, CephOutputStream failed to do an array"
- + " copy due to type mismatch...");
- }
- catch (NullPointerException ne) {
- throw new IOException("CephOutputStream.write: cannot write "
- + len + "bytes to fd " + fileHandle
- + ": buffer is null");
- }
- bufUsed += write;
- len -= write;
- off += write;
- if (bufUsed == buffer.length) {
- result = ceph.ceph_write(fileHandle, buffer, 0, bufUsed);
- if (result < 0)
- throw new IOException("CephOutputStream.write: Buffered write of "
- + bufUsed + " bytes failed!");
- if (result != bufUsed)
- throw new IOException("CephOutputStream.write: Wrote only "
- + result + " bytes of " + bufUsed
- + " in buffer! Data may be lost or written"
- + " twice to Ceph!");
- bufUsed = 0;
- }
-
- }
- return;
- }
+ int result;
+ int write;
+
+ while (len > 0) {
+ write = Math.min(len, buffer.length - bufUsed);
+ try {
+ System.arraycopy(buf, off, buffer, bufUsed, write);
+ } catch (IndexOutOfBoundsException ie) {
+ throw new IOException(
+ "CephOutputStream.write: Indices out of bounds: "
+ + "write length is " + len + ", buffer offset is " + off
+ + ", and buffer size is " + buf.length);
+ } catch (ArrayStoreException ae) {
+ throw new IOException(
+ "Uh-oh, CephOutputStream failed to do an array"
+ + " copy due to type mismatch...");
+ } catch (NullPointerException ne) {
+ throw new IOException(
+ "CephOutputStream.write: cannot write " + len + "bytes to fd "
+ + fileHandle + ": buffer is null");
+ }
+ bufUsed += write;
+ len -= write;
+ off += write;
+ if (bufUsed == buffer.length) {
+ result = ceph.ceph_write(fileHandle, buffer, 0, bufUsed);
+ if (result < 0) {
+ throw new IOException(
+ "CephOutputStream.write: Buffered write of " + bufUsed
+ + " bytes failed!");
+ }
+ if (result != bufUsed) {
+ throw new IOException(
+ "CephOutputStream.write: Wrote only " + result + " bytes of "
+ + bufUsed + " in buffer! Data may be lost or written"
+ + " twice to Ceph!");
+ }
+ bufUsed = 0;
+ }
+
+ }
+ return;
+ }
/**
* Flush the buffered data.
* @throws IOException if you've closed the stream or the write fails.
*/
@Override
- public synchronized void flush() throws IOException {
- if (!closed) {
- if (bufUsed == 0) return;
- int result = ceph.ceph_write(fileHandle, buffer, 0, bufUsed);
- if (result < 0) {
- throw new IOException("CephOutputStream.write: Write of "
- + bufUsed + "bytes to fd "
- + fileHandle + " failed");
- }
- if (result != bufUsed) {
- throw new IOException("CephOutputStream.write: Write of " + bufUsed
- + "bytes to fd " + fileHandle
- + "was incomplete: only " + result + " of "
- + bufUsed + " bytes were written.");
- }
- bufUsed = 0;
- return;
- }
- }
+ public synchronized void flush() throws IOException {
+ if (!closed) {
+ if (bufUsed == 0) {
+ return;
+ }
+ int result = ceph.ceph_write(fileHandle, buffer, 0, bufUsed);
+
+ if (result < 0) {
+ throw new IOException(
+ "CephOutputStream.write: Write of " + bufUsed + "bytes to fd "
+ + fileHandle + " failed");
+ }
+ if (result != bufUsed) {
+ throw new IOException(
+ "CephOutputStream.write: Write of " + bufUsed + "bytes to fd "
+ + fileHandle + "was incomplete: only " + result + " of " + bufUsed
+ + " bytes were written.");
+ }
+ bufUsed = 0;
+ return;
+ }
+ }
/**
* Close the CephOutputStream.
* @throws IOException if Ceph somehow returns an error. In current code it can't.
*/
@Override
- public synchronized void close() throws IOException {
- ceph.debug("CephOutputStream.close:enter", ceph.TRACE);
- if (!closed) {
- flush();
- int result = ceph.ceph_close(fileHandle);
- if (result != 0) {
- throw new IOException("Close failed!");
- }
+ public synchronized void close() throws IOException {
+ ceph.debug("CephOutputStream.close:enter", ceph.TRACE);
+ if (!closed) {
+ flush();
+ int result = ceph.ceph_close(fileHandle);
+
+ if (result != 0) {
+ throw new IOException("Close failed!");
+ }
- closed = true;
- ceph.debug("CephOutputStream.close:exit", ceph.TRACE);
- }
- }
+ closed = true;
+ ceph.debug("CephOutputStream.close:exit", ceph.TRACE);
+ }
+ }
}
// -*- mode:Java; tab-width:2; c-basic-offset:2; indent-tabs-mode:t -*-
+
/**
*
* Licensed under the Apache License, Version 2.0
*/
package org.apache.hadoop.fs.ceph;
+
import org.apache.hadoop.conf.Configuration;
import org.apache.commons.logging.Log;
+
class CephTalker extends CephFS {
- //we write a constructor so we can load the libraries
- public CephTalker(Configuration conf, Log log) {
- super(conf, log);
- System.load(conf.get("fs.ceph.libDir")+"/libhadoopcephfs.so");
- System.load(conf.get("fs.ceph.libDir")+"/libceph.so");
- }
- protected native boolean ceph_initializeClient(String arguments, int block_size);
- protected native String ceph_getcwd();
- protected native boolean ceph_setcwd(String path);
+ // we write a constructor so we can load the libraries
+ public CephTalker(Configuration conf, Log log) {
+ super(conf, log);
+ System.load(conf.get("fs.ceph.libDir") + "/libhadoopcephfs.so");
+ System.load(conf.get("fs.ceph.libDir") + "/libceph.so");
+ }
+
+ protected native boolean ceph_initializeClient(String arguments, int block_size);
+
+ protected native String ceph_getcwd();
+
+ protected native boolean ceph_setcwd(String path);
+
protected native boolean ceph_rmdir(String path);
+
protected native boolean ceph_unlink(String path);
+
protected native boolean ceph_rename(String old_path, String new_path);
+
protected native boolean ceph_exists(String path);
+
protected native long ceph_getblocksize(String path);
+
protected native boolean ceph_isdirectory(String path);
+
protected native boolean ceph_isfile(String path);
+
protected native String[] ceph_getdir(String path);
+
protected native int ceph_mkdirs(String path, int mode);
+
protected native int ceph_open_for_append(String path);
+
protected native int ceph_open_for_read(String path);
+
protected native int ceph_open_for_overwrite(String path, int mode);
+
protected native int ceph_close(int filehandle);
+
protected native boolean ceph_setPermission(String path, int mode);
+
protected native boolean ceph_kill_client();
+
protected native boolean ceph_stat(String path, CephFileSystem.Stat fill);
+
protected native int ceph_statfs(String Path, CephFileSystem.CephStat fill);
+
protected native int ceph_replication(String path);
+
protected native String ceph_hosts(int fh, long offset);
+
protected native int ceph_setTimes(String path, long mtime, long atime);
+
protected native long ceph_getpos(int fh);
+
protected native int ceph_write(int fh, byte[] buffer, int buffer_offset, int length);
- protected native int ceph_read(int fh, byte[] buffer, int buffer_offset, int length);
- protected native long ceph_seek_from_start(int fh, long pos);
+
+ protected native int ceph_read(int fh, byte[] buffer, int buffer_offset, int length);
+
+ protected native long ceph_seek_from_start(int fh, long pos);
}
// -*- mode:Java; tab-width:2; c-basic-offset:2; indent-tabs-mode:t -*-
+
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
package org.apache.hadoop.fs.ceph;
+
import java.io.IOException;
import java.net.URI;
import org.apache.hadoop.fs.FileSystemContractBaseTest;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
+
public class TestCeph extends FileSystemContractBaseTest {
- @Override
- protected void setUp() throws IOException {
+ @Override
+ protected void setUp() throws IOException {
Configuration conf = new Configuration();
CephFaker cephfaker = new CephFaker(conf, FileSystem.LOG);
CephFileSystem cephfs = new CephFileSystem(cephfaker, "ceph://null");
+
cephfs.initialize(URI.create("ceph://null"), conf);
- fs = cephfs;
- cephfs.setWorkingDirectory(new Path(getDefaultWorkingDirectory()));
- }
+ fs = cephfs;
+ cephfs.setWorkingDirectory(new Path(getDefaultWorkingDirectory()));
+ }
}