--- /dev/null
+// -*- mode:Java; tab-width:2; c-basic-offset:2; indent-tabs-mode:t -*-
+/**
+ *
+ * Licensed under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ *
+ *
+ * Abstract base class for communicating with a Ceph filesystem and its
+ * C++ codebase from Java, or pretending to do so (for unit testing purposes).
+ * As only the Ceph package should be using this directly, all methods
+ * are protected.
+ */
+package org.apache.hadoop.fs.ceph;
+
+abstract class CephFS {
+
+ /*
+ * Performs any necessary setup to allow general use of the filesystem.
+ * Inputs:
+ * String argsuments -- a command-line style input of Ceph config params
+ * int block_size -- the size in bytes to use for blocks
+ * Returns: true on success, false otherwise
+ */
+ abstract protected boolean ceph_initializeClient(String arguments, int block_size);
+
+ /*
+ * Returns the current working directory.(absolute) as a String
+ */
+ abstract protected String ceph_getcwd();
+ /*
+ * Changes the working directory.
+ * Inputs:
+ * String path: The path (relative or absolute) to switch to
+ * Returns: true on success, false otherwise.
+ */
+ abstract protected boolean ceph_setcwd(String path);
+ /*
+ * Given a path to a directory, removes the directory.if empty.
+ * Inputs:
+ * jstring j_path: The path (relative or absolute) to the directory
+ * Returns: true on successful delete; false otherwise
+ */
+ abstract protected boolean ceph_rmdir(String path);
+ /*
+ * Given a path, unlinks it.
+ * Inputs:
+ * String path: The path (relative or absolute) to the file or empty dir
+ * Returns: true if the unlink occurred, false otherwise.
+ */
+ abstract protected boolean ceph_unlink(String path);
+ /*
+ * Changes a given path name to a new name.
+ * Inputs:
+ * jstring j_from: The path whose name you want to change.
+ * jstring j_to: The new name for the path.
+ * Returns: true if the rename occurred, false otherwise
+ */
+ abstract protected boolean ceph_rename(String old_path, String new_path);
+ /*
+ * Returns true if it the input path exists, false
+ * if it does not or there is an unexpected failure.
+ */
+ abstract protected boolean ceph_exists(String path);
+ /*
+ * Get the block size for a given path.
+ * Input:
+ * String path: The path (relative or absolute) you want
+ * the block size for.
+ * Returns: block size if the path exists, otherwise a negative number
+ * corresponding to the standard C++ error codes (which are positive).
+ */
+ abstract protected long ceph_getblocksize(String path);
+ /*
+ * Returns true if the given path is a directory, false otherwise.
+ */
+ abstract protected boolean ceph_isdirectory(String path);
+ /*
+ * Returns true if the given path is a file; false otherwise.
+ */
+ abstract protected boolean ceph_isfile(String path);
+ /*
+ * Get the contents of a given directory.
+ * Inputs:
+ * String path: The path (relative or absolute) to the directory.
+ * Returns: A Java String[] of the contents of the directory, or
+ * NULL if there is an error (ie, path is not a dir). This listing
+ * will not contain . or .. entries.
+ */
+ abstract protected String[] ceph_getdir(String path);
+ /*
+ * Create the specified directory and any required intermediate ones with the
+ * given mode.
+ */
+ abstract protected int ceph_mkdirs(String path, int mode);
+ /*
+ * Open a file to append. If the file does not exist, it will be created.
+ * Opening a dir is possible but may have bad results.
+ * Inputs:
+ * String path: The path to open.
+ * Returns: an int filehandle, or a number<0 if an error occurs.
+ */
+ abstract protected int ceph_open_for_append(String path);
+ /*
+ * Open a file for reading.
+ * Opening a dir is possible but may have bad results.
+ * Inputs:
+ * String path: The path to open.
+ * Returns: an int filehandle, or a number<0 if an error occurs.
+ */
+ abstract protected int ceph_open_for_read(String path);
+ /*
+ * Opens a file for overwriting; creates it if necessary.
+ * Opening a dir is possible but may have bad results.
+ * Inputs:
+ * String path: The path to open.
+ * int mode: The mode to open with.
+ * Returns: an int filehandle, or a number<0 if an error occurs.
+ */
+ abstract protected int ceph_open_for_overwrite(String path, int mode);
+ /*
+ * Closes a given filehandle.
+ */
+ abstract protected int ceph_close(int filehandle);
+ /*
+ * Change the mode on a path.
+ * Inputs:
+ * String path: The path to change mode on.
+ * int mode: The mode to apply.
+ * Returns: true if the mode is properly applied, false if there
+ * is any error.
+ */
+ abstract protected boolean ceph_setPermission(String path, int mode);
+ /*
+ * Closes the Ceph client. This should be called before shutting down
+ * (multiple times is okay but redundant).
+ */
+ abstract protected boolean ceph_kill_client();
+ /*
+ * Get the statistics on a path returned in a custom format defined below.
+ * Inputs:
+ * String path: The path to stat.
+ * Stat fill: The stat object to fill.
+ * Returns: true if the stat is successful, false otherwise.
+ */
+ abstract protected boolean ceph_stat(String path, CephFileSystem.Stat fill);
+ /*
+ * Statfs a filesystem in a custom format defined in CephFileSystem.
+ * Inputs:
+ * String path: A path on the filesystem that you wish to stat.
+ * CephStat fill: The CephStat object to fill.
+ * Returns: true if successful and the CephStat is filled; false otherwise.
+ */
+ abstract protected int ceph_statfs(String Path, CephFileSystem.CephStat fill);
+ /*
+ * Check how many times a path should be replicated (if it is
+ * degraded it may not actually be replicated this often).
+ * Inputs:
+ * String path: The path to check.
+ * Returns: an int containing the number of times replicated.
+ */
+ abstract protected int ceph_replication(String path);
+ /*
+ * Find the IP:port addresses of the primary OSD for a given file and offset.
+ * Inputs:
+ * int fh: The filehandle for the file.
+ * long offset: The offset to get the location of.
+ * Returns: a String of the location as IP, or NULL if there is an error.
+ */
+ abstract protected String ceph_hosts(int fh, long offset);
+ /*
+ * Set the mtime and atime for a given path.
+ * Inputs:
+ * String path: The path to set the times for.
+ * long mtime: The mtime to set, in millis since epoch (-1 to not set).
+ * long atime: The atime to set, in millis since epoch (-1 to not set)
+ * Returns: 0 if successful, an error code otherwise.
+ */
+ abstract protected int ceph_setTimes(String path, long mtime, long atime);
+
+}
private final Path root;
private boolean initialized = false;
+ private CephFS ceph = null;
private boolean debug = false;
private String fs_default_name;
-
- /*
- * Performs any necessary setup to allow general use of the filesystem.
- * Inputs:
- * String argsuments -- a command-line style input of Ceph config params
- * int block_size -- the size in bytes to use for blocks
- * Returns: true on success, false otherwise
- */
- private native boolean ceph_initializeClient(String arguments, int block_size);
-
- /*
- * Returns the current working directory.(absolute) as a String
- */
- private native String ceph_getcwd();
- /*
- * Changes the working directory.
- * Inputs:
- * String path: The path (relative or absolute) to switch to
- * Returns: true on success, false otherwise.
- */
- private native boolean ceph_setcwd(String path);
- /*
- * Given a path to a directory, removes the directory.if empty.
- * Inputs:
- * jstring j_path: The path (relative or absolute) to the directory
- * Returns: true on successful delete; false otherwise
- */
- private native boolean ceph_rmdir(String path);
- /*
- * Given a path, unlinks it.
- * Inputs:
- * String path: The path (relative or absolute) to the file or empty dir
- * Returns: true if the unlink occurred, false otherwise.
- */
- private native boolean ceph_unlink(String path);
- /*
- * Changes a given path name to a new name.
- * Inputs:
- * jstring j_from: The path whose name you want to change.
- * jstring j_to: The new name for the path.
- * Returns: true if the rename occurred, false otherwise
- */
- private native boolean ceph_rename(String old_path, String new_path);
- /*
- * Returns true if it the input path exists, false
- * if it does not or there is an unexpected failure.
- */
- private native boolean ceph_exists(String path);
- /*
- * Get the block size for a given path.
- * Input:
- * String path: The path (relative or absolute) you want
- * the block size for.
- * Returns: block size if the path exists, otherwise a negative number
- * corresponding to the standard C++ error codes (which are positive).
- */
- private native long ceph_getblocksize(String path);
- /*
- * Returns true if the given path is a directory, false otherwise.
- */
- private native boolean ceph_isdirectory(String path);
- /*
- * Returns true if the given path is a file; false otherwise.
- */
- private native boolean ceph_isfile(String path);
- /*
- * Get the contents of a given directory.
- * Inputs:
- * String path: The path (relative or absolute) to the directory.
- * Returns: A Java String[] of the contents of the directory, or
- * NULL if there is an error (ie, path is not a dir). This listing
- * will not contain . or .. entries.
- */
- private native String[] ceph_getdir(String path);
- /*
- * Create the specified directory and any required intermediate ones with the
- * given mode.
- */
- private native int ceph_mkdirs(String path, int mode);
- /*
- * Open a file to append. If the file does not exist, it will be created.
- * Opening a dir is possible but may have bad results.
- * Inputs:
- * String path: The path to open.
- * Returns: an int filehandle, or a number<0 if an error occurs.
- */
- private native int ceph_open_for_append(String path);
- /*
- * Open a file for reading.
- * Opening a dir is possible but may have bad results.
- * Inputs:
- * String path: The path to open.
- * Returns: an int filehandle, or a number<0 if an error occurs.
- */
- private native int ceph_open_for_read(String path);
- /*
- * Opens a file for overwriting; creates it if necessary.
- * Opening a dir is possible but may have bad results.
- * Inputs:
- * String path: The path to open.
- * int mode: The mode to open with.
- * Returns: an int filehandle, or a number<0 if an error occurs.
- */
- private native int ceph_open_for_overwrite(String path, int mode);
- /*
- * Closes a given filehandle.
- */
- private native int ceph_close(int filehandle);
- /*
- * Change the mode on a path.
- * Inputs:
- * String path: The path to change mode on.
- * int mode: The mode to apply.
- * Returns: true if the mode is properly applied, false if there
- * is any error.
- */
- private native boolean ceph_setPermission(String path, int mode);
- /*
- * Closes the Ceph client. This should be called before shutting down
- * (multiple times is okay but redundant).
- */
- private native boolean ceph_kill_client();
- /*
- * Get the statistics on a path returned in a custom format defined below.
- * Inputs:
- * String path: The path to stat.
- * Stat fill: The stat object to fill.
- * Returns: true if the stat is successful, false otherwise.
- */
- private native boolean ceph_stat(String path, Stat fill);
- /*
- * Statfs a filesystem in a custom format defined in CephFileSystem.
- * Inputs:
- * String path: A path on the filesystem that you wish to stat.
- * CephStat fill: The CephStat object to fill.
- * Returns: true if successful and the CephStat is filled; false otherwise.
- */
- private native int ceph_statfs(String Path, CephStat fill);
- /*
- * Check how many times a path should be replicated (if it is
- * degraded it may not actually be replicated this often).
- * Inputs:
- * String path: The path to check.
- * Returns: an int containing the number of times replicated.
- */
- private native int ceph_replication(String path);
- /*
- * Find the IP:port addresses of the primary OSD for a given file and offset.
- * Inputs:
- * int fh: The filehandle for the file.
- * long offset: The offset to get the location of.
- * Returns: a String of the location as IP, or NULL if there is an error.
- */
- private native String ceph_hosts(int fh, long offset);
- /*
- * Set the mtime and atime for a given path.
- * Inputs:
- * String path: The path to set the times for.
- * long mtime: The mtime to set, in millis since epoch (-1 to not set).
- * long atime: The atime to set, in millis since epoch (-1 to not set)
- * Returns: 0 if successful, an error code otherwise.
- */
- private native int ceph_setTimes(String path, long mtime, long atime);
/**
* Create a new CephFileSystem.
public void initialize(URI uri, Configuration conf) throws IOException {
debug("initialize:enter", DEBUG);
if (!initialized) {
- System.load(conf.get("fs.ceph.libDir")+"/libhadoopcephfs.so");
- System.load(conf.get("fs.ceph.libDir")+"/libceph.so");
super.initialize(uri, conf);
setConf(conf);
this.uri = URI.create(uri.getScheme() + "://" + uri.getAuthority());
- statistics = getStatistics(uri.getScheme(), getClass());
+ statistics = getStatistics(uri.getScheme(), getClass());
+
+ if (ceph == null) {
+ ceph = new CephTalker(conf);
+ }
fs_default_name = conf.get("fs.default.name");
debug = ("true".equals(conf.get("fs.ceph.debug", "false")));
throw new IOException("You must specify a Ceph monitor address or config file!");
}
// Initialize the client
- if (!ceph_initializeClient(arguments,
+ if (!ceph.ceph_initializeClient(arguments,
conf.getInt("fs.ceph.blockSize", 1<<26))) {
debug("initialize:Ceph initialization failed!", FATAL);
throw new IOException("Ceph initialization failed!");
}
initialized = true;
debug("initialize:Ceph initialized client. Setting cwd to /", INFO);
- ceph_setcwd("/");
+ ceph.ceph_setcwd("/");
}
debug("initialize:exit", DEBUG);
}
+ /**
+ * Used for testing purposes, this version of initialize
+ * sets the given CephFS instead of defaulting to a
+ * CephTalker (with its assumed real Ceph instance to talk to).
+ */
+ protected void initialize(URI uri, Configuration conf, CephFS ceph_fs)
+ throws IOException{
+ ceph = ceph_fs;
+ initialize(uri, conf);
+ }
+
/**
* Close down the CephFileSystem. Runs the base-class close method
* and then kills the Ceph client itself.
debug("close:enter", DEBUG);
super.close();//this method does stuff, make sure it's run!
debug("close: Calling ceph_kill_client from Java", TRACE);
- ceph_kill_client();
+ ceph.ceph_kill_client();
debug("close:exit", DEBUG);
}
Path abs_path = makeAbsolute(file);
if (progress!=null) progress.progress();
debug("append: Entering ceph_open_for_append from Java", TRACE);
- int fd = ceph_open_for_append(abs_path.toString());
+ int fd = ceph.ceph_open_for_append(abs_path.toString());
debug("append: Returned to Java", TRACE);
if (progress!=null) progress.progress();
if( fd < 0 ) { //error in open
public Path getWorkingDirectory() {
if (!initialized) return null;
debug("getWorkingDirectory:enter", DEBUG);
- String cwd = ceph_getcwd();
+ String cwd = ceph.ceph_getcwd();
debug("getWorkingDirectory:exit with path " + cwd, DEBUG);
- return new Path(fs_default_name + ceph_getcwd());
+ return new Path(fs_default_name + ceph.ceph_getcwd());
}
/**
debug("setWorkingDirecty:enter with new working dir " + dir, DEBUG);
Path abs_path = makeAbsolute(dir);
debug("setWorkingDirectory:calling ceph_setcwd from Java", TRACE);
- if (!ceph_setcwd(abs_path.toString()))
+ if (!ceph.ceph_setcwd(abs_path.toString()))
debug("setWorkingDirectory: WARNING! ceph_setcwd failed for some reason on path " + abs_path, ERROR);
debug("setWorkingDirectory:exit", DEBUG);
}
else {
debug("exists:Calling ceph_exists from Java on path "
+ abs_path.toString(), TRACE);
- result = ceph_exists(abs_path.toString());
+ result = ceph.ceph_exists(abs_path.toString());
debug("exists:Returned from ceph_exists to Java", TRACE);
}
debug("exists:exit with value " + result, DEBUG);
debug("mkdirs:enter with path " + path, DEBUG);
Path abs_path = makeAbsolute(path);
debug("mkdirs:calling ceph_mkdirs from Java", TRACE);
- int result = ceph_mkdirs(abs_path.toString(), (int)perms.toShort());
+ int result = ceph.ceph_mkdirs(abs_path.toString(), (int)perms.toShort());
debug("mkdirs:exit with result " + result, DEBUG);
if (result != 0)
return false;
}
else {
debug("isFile:entering ceph_isfile from Java", TRACE);
- result = ceph_isfile(abs_path.toString());
+ result = ceph.ceph_isfile(abs_path.toString());
}
debug("isFile:exit with result " + result, DEBUG);
return result;
}
else {
debug("calling ceph_isdirectory from Java", TRACE);
- result = ceph_isdirectory(abs_path.toString());
+ result = ceph.ceph_isdirectory(abs_path.toString());
debug("Returned from ceph_isdirectory to Java", TRACE);
}
debug("isDirectory:exit with result " + result, DEBUG);
FileStatus status;
Stat lstat = new Stat();
debug("getFileStatus: calling ceph_stat from Java", TRACE);
- if(ceph_stat(abs_path.toString(), lstat)) {
+ if(ceph.ceph_stat(abs_path.toString(), lstat)) {
status = new FileStatus(lstat.size, lstat.is_dir,
- ceph_replication(abs_path.toString()),
+ ceph.ceph_replication(abs_path.toString()),
lstat.block_size,
lstat.mod_time,
lstat.access_time,
+ " and permissions " + permission, DEBUG);
Path abs_path = makeAbsolute(p);
debug("setPermission:calling ceph_setpermission from Java", TRACE);
- ceph_setPermission(abs_path.toString(), permission.toShort());
+ ceph.ceph_setPermission(abs_path.toString(), permission.toShort());
debug("setPermission:exit", DEBUG);
}
" atime:" + atime, DEBUG);
Path abs_path = makeAbsolute(p);
debug("setTimes:calling ceph_setTimes from Java", TRACE);
- int r = ceph_setTimes(abs_path.toString(), mtime, atime);
+ int r = ceph.ceph_setTimes(abs_path.toString(), mtime, atime);
if (r<0) throw new IOException ("Failed to set times on path "
+ abs_path.toString() + " Error code: " + r);
debug("setTimes:exit", DEBUG);
if (!exists) {
Path parent = abs_path.getParent();
if (parent != null) { // if parent is root, we're done
- int r = ceph_mkdirs(parent.toString(), permission.toShort());
+ int r = ceph.ceph_mkdirs(parent.toString(), permission.toShort());
if (!(r==0 || r==-EEXIST))
throw new IOException ("Error creating parent directory; code: " + r);
}
}
// Step 3: open the file
debug("calling ceph_open_for_overwrite from Java", TRACE);
- int fh = ceph_open_for_overwrite(abs_path.toString(), (int)permission.toShort());
+ int fh = ceph.ceph_open_for_overwrite(abs_path.toString(), (int)permission.toShort());
if (progress!=null) progress.progress();
debug("Returned from ceph_open_for_overwrite to Java with fh " + fh, TRACE);
if (fh < 0) {
debug("open:enter with path " + path, DEBUG);
Path abs_path = makeAbsolute(path);
- int fh = ceph_open_for_read(abs_path.toString());
+ int fh = ceph.ceph_open_for_read(abs_path.toString());
if (fh < 0) { //uh-oh, something's bad!
if (fh == -ENOENT) //well that was a stupid open
throw new IOException("open: absolute path \"" + abs_path.toString()
if(isDirectory(abs_path)) { //yes, it is possible to open Ceph directories
//but that doesn't mean you should in Hadoop!
- ceph_close(fh);
+ ceph.ceph_close(fh);
throw new IOException("open: absolute path \"" + abs_path.toString()
+ "\" is a directory!");
}
Stat lstat = new Stat();
debug("open:calling ceph_stat from Java", TRACE);
- ceph_stat(abs_path.toString(), lstat);
+ ceph.ceph_stat(abs_path.toString(), lstat);
debug("open:returned to Java", TRACE);
long size = lstat.size;
if (size < 0) {
Path abs_src = makeAbsolute(src);
Path abs_dst = makeAbsolute(dst);
debug("calling ceph_rename from Java", TRACE);
- boolean result = ceph_rename(abs_src.toString(), abs_dst.toString());
+ boolean result = ceph.ceph_rename(abs_src.toString(), abs_dst.toString());
debug("rename:exit with result: " + result, DEBUG);
return result;
}
//sanitize and get the filehandle
Path abs_path = makeAbsolute(file.getPath());
debug("getFileBlockLocations:call ceph_open_for_read from Java", TRACE);
- int fh = ceph_open_for_read(abs_path.toString());
+ int fh = ceph.ceph_open_for_read(abs_path.toString());
debug("getFileBlockLocations:return from ceph_open_for_read to Java with fh "
+ fh, TRACE);
if (fh < 0) {
}
//get the block size
debug("getFileBlockLocations:call ceph_getblocksize from Java", TRACE);
- long blockSize = ceph_getblocksize(abs_path.toString());
+ long blockSize = ceph.ceph_getblocksize(abs_path.toString());
debug("getFileBlockLocations:return from ceph_getblocksize", TRACE);
BlockLocation[] locations =
new BlockLocation[(int)Math.ceil(len/(float)blockSize)];
offset = start + i*blockSize;
debug("getFileBlockLocations:call ceph_hosts from Java on fh "
+ fh + " and offset " + offset, TRACE);
- String host = ceph_hosts(fh, offset);
+ String host = ceph.ceph_hosts(fh, offset);
debug("getFileBlockLocations:return from ceph_hosts to Java with host "
+ host, TRACE);
String[] hostArray = new String[1];
}
debug("getFileBlockLocations:call ceph_close from Java on fh "
+ fh, TRACE);
- ceph_close(fh);
+ ceph.ceph_close(fh);
debug("getFileBlockLocations:return with " + locations.length
+ " locations", DEBUG);
return locations;
//error-checking code.
CephStat ceph_stat = new CephStat();
debug("getStatus:calling ceph_statfs from Java", TRACE);
- int result = ceph_statfs(abs_path.toString(), ceph_stat);
+ int result = ceph.ceph_statfs(abs_path.toString(), ceph_stat);
if (result!=0) throw new IOException("Somehow failed to statfs the Ceph filesystem. Error code: " + result);
debug("getStatus:exit successfully", DEBUG);
return new FsStatus(ceph_stat.capacity,
if (isFile(abs_path)) {
debug("delete:calling ceph_unlink from Java with path " + abs_path,
TRACE);
- boolean result = ceph_unlink(abs_path.toString());
+ boolean result = ceph.ceph_unlink(abs_path.toString());
if(!result)
debug("delete: failed to delete file \"" +
abs_path.toString() + "\".", ERROR);
}
}
//if we've come this far it's a now-empty directory, so delete it!
- boolean result = ceph_rmdir(abs_path.toString());
+ boolean result = ceph.ceph_rmdir(abs_path.toString());
if (!result)
debug("delete: failed to delete \"" + abs_path.toString()
+ "\", BAILING", ERROR);
debug("makeAbsolute:exit with path " + path, NOLOG);
return path;
}
- Path new_path = new Path(ceph_getcwd(), path);
+ Path new_path = new Path(ceph.ceph_getcwd(), path);
debug("makeAbsolute:exit with path " + new_path, NOLOG);
return new_path;
}
// If it's a directory, get the listing. Otherwise, complain and give up.
debug("calling ceph_getdir from Java with path " + abs_path, NOLOG);
- dirlist = ceph_getdir(abs_path.toString());
+ dirlist = ceph.ceph_getdir(abs_path.toString());
debug("returning from ceph_getdir to Java", NOLOG);
if (dirlist == null) {
}
}
- private static class Stat {
+ static class Stat {
public long size;
public boolean is_dir;
public long block_size;
public Stat(){}
}
- private static class CephStat {
+ static class CephStat {
public long capacity;
public long used;
public long remaining;