+++ /dev/null
-package org.apache.hadoop.fs.ceph;
-
-import java.io.IOException;
-import java.net.URI;
-import java.util.Set;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FSInputStream;
-import org.apache.hadoop.fs.FSOutputStream;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.FileUtil;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.util.Progressable;
-
-/**
- * <p>
- * A {@link FileSystem} backed by <a href="http://ceph.sourceforge.net">Ceph.</a>.
- * This will not start a Ceph instance; one must already be running.
- * </p>
- * @author Esteban Molina-Estolano
- */
-public class CephFileSystem extends FileSystem {
-
- private static final long DEFAULT_BLOCK_SIZE = 8 * 1024 * 1024;
-
- static {
- System.loadLibrary("hadoopcephfs");
- }
-
- private URI uri;
-
- private FileSystem localFs;
-
- private long clientPointer;
-
- private Path root;
-
- private Path parent;
-
- //private Path workingDir = new Path("/user", System.getProperty("user.name"));
-
- /*
- * Native Ceph methods. Each one has long client as a parameter,
- * which should always be the member variable clientPointer. This is
- * to avoid the hideous JNI code required to access the member variable
- * from C++. clientPointer is set at initialization of the
- * CephFileSystem instance, and should be untouched thereafter. I
- * include wrapper functions to automatically add the parameter.
- */
-
-
- private boolean ceph_copyFromLocalFile(String localPath, String cephPath)
- { return ceph_copyFromLocalFile(clientPointer, localPath, cephPath); }
- private boolean ceph_copyToLocalFile(String cephPath, String localPath)
- { return ceph_copyToLocalFile(clientPointer, cephPath, localPath); }
- private String ceph_getcwd() { return ceph_getcwd(clientPointer); }
- private boolean ceph_setcwd(String path) { return ceph_setcwd(clientPointer, path); }
- private boolean ceph_rmdir(String path) { return ceph_rmdir(clientPointer, path); }
- private boolean ceph_mkdir(String path) { return ceph_mkdir(clientPointer, path); }
- private boolean ceph_unlink(String path) { return ceph_unlink(clientPointer, path); }
- private boolean ceph_rename(String old_path, String new_path) { return ceph_rename(clientPointer, old_path, new_path); }
- private boolean ceph_exists(String path) { return ceph_exists(clientPointer, path); }
- private long ceph_getblocksize(String path) { return ceph_getblocksize(clientPointer, path); }
- private long ceph_getfilesize(String path) { return ceph_getfilesize(clientPointer, path); }
- private boolean ceph_isdirectory(String path) { return ceph_isdirectory(clientPointer, path); }
- private boolean ceph_isfile(String path) { return ceph_isfile(clientPointer, path); }
- private String[] ceph_getdir(String path) { return ceph_getdir(clientPointer, path); }
- private int ceph_open_for_read(String path) { return ceph_open_for_read(clientPointer, path); }
- private int ceph_open_for_overwrite(String path) { return ceph_open_for_overwrite(clientPointer, path); }
-
- private boolean ceph_kill_client() {
- System.out.println("Killing Ceph client with pointer " + clientPointer);
- return ceph_kill_client(clientPointer);
- }
-
- private native long ceph_initializeClient();
- private native boolean ceph_copyFromLocalFile (long client, String localPath, String cephPath);
- private native boolean ceph_copyToLocalFile (long client, String cephPath, String localPath);
- private native String ceph_getcwd (long client);
- private native boolean ceph_setcwd (long client, String path);
- private native boolean ceph_rmdir (long client, String path);
- private native boolean ceph_mkdir (long client, String path);
- private native boolean ceph_unlink (long client, String path);
- private native boolean ceph_rename (long client, String old_path, String new_path);
- private native boolean ceph_exists (long client, String path);
- private native long ceph_getblocksize (long client, String path);
- private native long ceph_getfilesize (long client, String path);
- private native boolean ceph_isdirectory (long client, String path);
- private native boolean ceph_isfile (long client, String path);
- private native String[]ceph_getdir (long client, String path);
- private native int ceph_open_for_read (long client, String path);
- private native int ceph_open_for_overwrite(long client, String path);
- private native boolean ceph_kill_client (long client);
-
-
-
-
- public CephFileSystem() {
- root = new Path("/");
- parent = new Path("..");
- }
-
- /*
- public S3FileSystem(FileSystemStore store) {
- this.store = store;
- } */
-
- @Override
- public URI getUri() {
- return uri;
- }
-
- @Override
- public void initialize(URI uri, Configuration conf) throws IOException {
- //store.initialize(uri, conf);
- setConf(conf);
- this.uri = URI.create(uri.getScheme() + "://" + uri.getAuthority());
-
- // TODO: local filesystem? we really need to figure out this conf thingy
- this.localFs = get(URI.create("file:///"), conf);
-
- // Initializes the client
- this.clientPointer = ceph_initializeClient();
- System.out.println("Initialized client with pointer " + clientPointer
- + ". Setting cwd to /");
- ceph_setcwd("/");
- // DEBUG
- // attempt to do three exists operations on root
- //System.out.println("DEBUG: attempting isdir() on root (/)");
- // ceph_isdirectory("/");
- //System.out.println("DEBUG: attempting exists() on root (/)");
- //ceph_exists("/");
- }
-
- @Override
- public void close() throws IOException {
- System.out.println("Pretending to shut down client with pointer " + clientPointer
- + ". Not really doing anything.");
- }
-
- @Override
- public String getName() {
- return getUri().toString();
- }
-
- @Override
- public Path getWorkingDirectory() {
- return makeAbsolute(new Path(ceph_getcwd()));
- }
-
- @Override
- public void setWorkingDirectory(Path dir) {
- Path abs_path = makeAbsolute(dir);
-
- // error conditions if path's not a directory
- boolean isDir = false;
- boolean path_exists = false;
- try {
- isDir = isDirectory(abs_path);
- path_exists = exists(abs_path);
- }
-
- catch (IOException e) {
- System.out.println("Warning: isDirectory threw an exception");
- }
-
- if (!isDir) {
- if (path_exists)
- System.out.println("Warning: SetWorkingDirectory(" + dir.toString() +
- "): path is not a directory");
- else
- System.out.println("Warning: SetWorkingDirectory(" + dir.toString() +
- "): path does not exist");
- }
- else {
- ceph_setcwd(dir.toString());
- }
- //System.out.println("DEBUG: Attempting to change cwd to " + dir.toString() +
- // "changes cwd to" + getWorkingDirectory().toString());
- }
-
- // Makes a Path absolute. In a cheap, dirty hack, we're
- // also going to strip off any "ceph://null" prefix we see.
- private Path makeAbsolute(Path path) {
- // first, check for the prefix
- if (path.toString().startsWith("ceph://null")) {
-
- Path stripped_path = new Path(path.toString().substring("ceph://null".length()));
- //System.out.println("Stripping path \"" + path.toString() + "\" to \""
- // + stripped_path.toString() + "\"");
- return stripped_path;
- }
-
-
- if (path.isAbsolute()) {
- return path;
- }
- Path wd = getWorkingDirectory();
- //System.out.println("Working directory is " + wd.toString());
- if (wd.toString().equals(""))
- return new Path(root, path);
- else
- return new Path(wd, path);
- }
-
- private String[] getEmptyStringArray(int size) {
- return new String[size];
- }
-
- @Override
- public boolean exists(Path path) throws IOException {
- boolean result;
- Path abs_path = makeAbsolute(path);
- if (abs_path.toString().equals("/"))
- {
- //System.out.println("Bug workaround! returning true for exists(/)");
- result = true;
- }
- else
- {
- //System.out.println("Calling ceph_exists from Java on path " + abs_path.toString() + ":");
- result = ceph_exists(abs_path.toString());
- //System.out.println("Returned from ceph_exists to Java");
- }
- // System.out.println("exists \"" + path.toString() + "\"? Absolute path is \"" +
- // abs_path.toString() + "\", result = " + result);
-
- return result;
- }
-
-
- /* Creates the directory and all nonexistent parents. */
- @Override
- public boolean mkdirs(Path path) throws IOException {
-
- Path abs_path = makeAbsolute(path);
- //System.out.println("mkdirs: Creating directory \"" + path.toString() + "\": Absolute path is \"" +
- // abs_path.toString() + "\"");
-
- // If the directory exists, we're happy, right? less work for us!
-
- if(exists(abs_path))
- throw new IOException("Error: attempting to create an existing directory");
-
- // The basic idea:
- // get parent. if parent = null (happens if dir is root), fail. You can't really make
- // the root directory...
- // if parent doesn't exist, recursively make parent; fail if this fails.
- // ceph_mkdir desired path.
-
- Path parent = path.getParent();
- boolean result = true;
-
- // if the parent's null, we're trying to create the root directory. This is BAD.
- if (null == parent) {
- System.out.println("Error: failed making directory \"" + abs_path.toString() +
- "\": directory has null parent (directory is root)") ;
- result = false;
- }
- else {
- // try to make the parent if it doesn't exist
- if (!exists(parent)) {
- //System.out.println("mkdirs: parent of directory \"" + abs_path.toString() +
- // "does not exist. Recursively creating:");
- if(!mkdirs(parent)) {
- System.out.println("mkdirs: failed creating directory \"" +
- abs_path.toString() +
- "because of failure recursively creating parent" +
- parent.toString());
- result = false;
- }
- }
- }
- // try to make the directory, unless the parent was null or we
- // tried and failed to make the parent
- if (result) {
- result = ceph_mkdir(abs_path.toString());
- }
- //System.out.println("mkdirs: attempted to make directory " + abs_path.toString() +
- // ": result is " + result);
- return result;
- }
-
-
- @Override
- public boolean isDirectory(Path path) throws IOException {
- Path abs_path = makeAbsolute(path);
- boolean result;
-
-
- if (abs_path.toString().equals("/"))
- {
- //System.out.println("Bug workaround! returning true for isDirectory(/)");
- result = true;
- }
- else
- result = ceph_isdirectory(abs_path.toString());
- //System.out.println("isDirectory \"" + path.toString() + "\"? Absolute path is \"" +
- // abs_path.toString() + "\", result = " + result);
- return result;
- }
-
- @Override
- public boolean isFile(Path path) throws IOException {
- Path abs_path = makeAbsolute(path);
- boolean result;
- if (abs_path.toString().equals("/"))
- {
- //System.out.println("Bug workaround! returning false for isFile(/)");
- result = false;
- }
- else
- {
- result = ceph_isfile(abs_path.toString());
- }
- //System.out.println("isFile \"" + path.toString() + "\"? Absolute path is \"" +
- // abs_path.toString() + "\", result = " + result);
-
- return result;
- }
-
-
- @Override
- public Path[] listPathsRaw(Path path) throws IOException {
-
- String dirlist[];
-
- Path abs_path = makeAbsolute(path);
-
- //System.out.println("listPathsRaw on path \"" + path.toString() + "\", absolute path \""
- // + abs_path.toString() + "\"");
-
- // If it's a directory, get the listing. Otherwise, complain and give up.
- if (isDirectory(abs_path))
- dirlist = ceph_getdir(abs_path.toString());
- else
- {
- if (exists(abs_path)) { }
- // System.out.println("listPathsRaw on path \"" + abs_path.toString() +
- // "\" failed; the path is not a directory.");
- else {}
- // System.out.println("listPathsRaw on path \"" + abs_path.toString() +
- // "\" failed; the path does not exist.");
- return null;
- }
-
-
- // convert the strings to Paths
- Path paths[] = new Path[dirlist.length];
- for(int i = 0; i < dirlist.length; ++i) {
- //System.out.println("Raw enumeration of paths in \"" + abs_path.toString() + "\": \"" +
- // dirlist[i] + "\"");
-
- // convert each listing to an absolute path
- Path raw_path = new Path(dirlist[i]);
- if (raw_path.isAbsolute())
- paths[i] = raw_path;
- else
- paths[i] = new Path(abs_path, raw_path);
- }
- return paths;
- }
-
- @Override
- public FSOutputStream createRaw(Path file, boolean overwrite,
- short replication, long blockSize) throws IOException {
-
- return createRaw(file, overwrite, replication, blockSize, null);
- }
-
-
- @Override
- public FSOutputStream createRaw(Path file, boolean overwrite,
- short replication, long blockSize, Progressable progress)
- throws IOException {
-
-
- Path absfilepath = makeAbsolute(file);
-
- //System.out.println("createRaw: opening path \"" + file.toString() + "\" as absolute path \""
- // + absfilepath.toString() + "\" for writing");
-
- // we're ignoring progress reporting and replication entirely.
- // required semantics: if the file already exists, overwrite
- // it if overwrite = true and throw an exception if
- // overwrite = false.
-
- // Step 1: existence test
- if(isDirectory(absfilepath))
- throw new IOException("createRaw: Cannot make an output stream to existing directory \""
- + absfilepath.toString() + "\"");
-
- if (!overwrite) {
- if (!exists(absfilepath)) {
- throw new IOException("createRaw: Cannot open existing file \"" + absfilepath.toString()
- + "\" for writing without overwrite flag");
- }
- }
-
- // Step 2: do the directories in the path exist?
- Path parent = absfilepath.getParent();
- if (parent != null) // if parent is root, we're done
- {
- if(!exists(parent))
- {
- //System.out.println("createRaw: parent directory of path \""
- // + absfilepath.toString() + "\" does not exist. Creating:");
- mkdirs(parent);
- }
- }
-
- // Step 3: open the file
- int fh = ceph_open_for_overwrite(absfilepath.toString());
- if (fh < 0) {
- throw new IOException("createRaw: Open for overwrite failed on path \"" +
- absfilepath.toString() + "\"");
- }
-
- // Step 4: create the stream
- CephOutputStream result = new CephOutputStream(getConf(), clientPointer, fh);
- //System.out.println("createRaw: opened absolute path \"" + absfilepath.toString()
- // + "\" for writing with fh " + fh);
-
- return result;
- }
-
- @Override
- public FSInputStream openRaw(Path path) throws IOException {
- Path abs_path = makeAbsolute(path);
-
- //System.out.println("openRaw: opening path \"" + path.toString() + "\" as absolute path \""
- // + abs_path.toString() + "\" for reading");
-
- if(!isFile(abs_path))
- {
- String error;
- if (!exists(abs_path))
- throw new IOException("openRaw: absolute path \"" + abs_path.toString()
- + "\" does not exist");
- else
- throw new IOException("openRaw: absolute path \"" + abs_path.toString()
- + "\" is not a file");
- }
-
- int fh = ceph_open_for_read(abs_path.toString());
- if (fh < 0) {
- throw new IOException("openRaw: Failed to open file " + abs_path.toString());
- }
-
- long size = ceph_getfilesize(abs_path.toString());
- if (size < 0) {
- throw new IOException("Failed to get file size for file " + abs_path.toString() +
- " but succeeded in opening file. Something excitingly bizarre is going on.");
- }
-
- FSInputStream result = new CephInputStream(getConf(), clientPointer, fh, size);
-
- //System.out.println("openRaw: opened absolute path \"" + abs_path.toString() +
- // "\" for reading with fd " + fh + " and size " + size);
-
- return result;
- }
-
- @Override
- public boolean renameRaw(Path src, Path dst) throws IOException {
- // TODO: Check corner cases: dst already exists,
- // or if path is directory with children
-
- return ceph_rename(src.toString(), dst.toString());
- }
-
- @Override
- public boolean deleteRaw(Path path) throws IOException {
-
- Path abs_path = makeAbsolute(path);
-
- //System.out.println("deleteRaw: Deleting path " +
- // abs_path.toString());
- if (abs_path.toString().equals("/"))
- throw new IOException("Error: attempting to delete the root directory");
-
- // if it's a file, try to delete it.
- if (isFile(abs_path)) {
- boolean result = ceph_unlink(path.toString());
- if(!result) {
- // System.out.println("deleteRaw: failed to delete file \"" +
- // abs_path.toString() + "\".");
- return false;
- }
- else
- return true;
- }
-
- // if it's a directory, recursively try to delete its contents,
- // and then delete the directory.
- // Otherwise, return false.
-
- Path[] contents = listPathsRaw(path);
- if (contents == null) {
- // System.out.println("deleteRaw: Failed to read contents of directory \"" +
- // abs_path.toString() + "\" while trying to delete it");
- return false;
- }
-
- // debug listing of directory contents
- //System.out.println("deleteRaw: Enumerating contents of directory \"" +
- // abs_path.toString() + "\" to delete");
-
- // for (Path p : contents) {
- // System.out.println(" Contains path \"" + p.toString() + "\", absolute path is \""
- // + makeAbsolute(p).toString() + "\"");
- //}
-
- // recursively delete the directory's contents
- Path parent = abs_path.getParent();
- for (Path p : contents) {
- // skip self ("." directory entry)
- if (makeAbsolute(p).equals(abs_path)) {
- //System.out.println("Skipping self");
- continue;
- }
- // skip parent (".." directory entry)
- if (null != parent) {
- if (p.equals(parent)) {
- //System.out.println("Skipping parent");
- continue;
- }
- }
-
- //System.out.println("Path " + abs_path.toString() + " contains path "
- // + makeAbsolute(p).toString() +
- // ". Attempting to delete it recursively:");
- if (!deleteRaw(p)) {
- // System.out.println("deleteRaw: Failed to delete file \"" +
- // p.toString() + "\" while recursively deleting \""
- // + abs_path.toString() + "\"" );
- return false;
- }
- }
-
- boolean result = ceph_unlink(path.toString());
- if (!result)
- System.out.println("deleteRaw: failed to delete \"" + abs_path.toString() + "\"");
- return result;
- }
-
-
- @Override
- public long getLength(Path path) throws IOException {
- Path abs_path = makeAbsolute(path);
-
- if (!exists(abs_path)) {
- throw new IOException("org.apache.hadoop.fs.ceph.CephFileSystem.getLength: File or directory " + abs_path.toString() + " does not exist.");
- }
-
- long filesize = ceph_getfilesize(abs_path.toString());
- if (filesize < 0) {
- throw new IOException("org.apache.hadoop.fs.ceph.CephFileSystem.getLength: Size of file or directory " + abs_path.toString() + " could not be retrieved.");
- }
- return filesize;
-
- }
-
- /**
- * User-defined replication is not supported for Ceph file systems at the moment.
- */
- @Override
- public short getReplication(Path path) throws IOException {
- return 1;
- }
-
- @Override
- public short getDefaultReplication() {
- return 1;
- }
-
- /**
- * User-defined replication is not supported for Ceph file systems at the moment.
- */
- @Override
- public boolean setReplicationRaw(Path path, short replication)
- throws IOException {
- return true;
- }
-
- @Override
- public long getBlockSize(Path path) throws IOException {
-
- if (!exists(path)) {
- throw new IOException("org.apache.hadoop.fs.ceph.CephFileSystem.getBlockSize: File or directory " + path.toString() + " does not exist.");
- }
- long result = ceph_getblocksize(path.toString());
- if (!isFile(path)) {
- throw new IOException("org.apache.hadoop.fs.ceph.CephFileSystem.getBlockSize: File or directory " + path.toString() + " is not a file.");
- }
- else {
- System.err.println("DEBUG: getBlockSize: alleged file really is a file");
- }
- if (result < 4096) {
- System.err.println("org.apache.hadoop.fs.ceph.CephFileSystem.getBlockSize: " +
- "path exists; strange block size of " + result + " defaulting to 8192");
- return 8192;
- }
-
-
- return result;
- //return DEFAULT_BLOCK_SIZE;
- // return ceph_getblocksize(path.toString());
-
- }
-
- @Override
- public long getDefaultBlockSize() {
- return DEFAULT_BLOCK_SIZE;
- //return getConf().getLong("fs.ceph.block.size", DEFAULT_BLOCK_SIZE);
- }
-
- /**
- * Return 1x1 'localhost' cell if the file exists. Return null if otherwise.
- */
- @Override
- public String[][] getFileCacheHints(Path f, long start, long len)
- throws IOException {
- // TODO: Check this is the correct behavior
- if (!exists(f)) {
- return null;
- }
- return new String[][] { { "localhost" } };
- }
-
- @Override
- public void lock(Path path, boolean shared) throws IOException {
- // TODO: Design and implement? or just ignore locking?
- return;
- }
-
- @Override
- public void release(Path path) throws IOException {
- return; //deprecated
- }
-
- @Override
- public void reportChecksumFailure(Path f,
- FSInputStream in, long inPos,
- FSInputStream sums, long sumsPos) {
- // TODO: What to do here?
- return;
- }
-
- @Override
- public void moveFromLocalFile(Path src, Path dst) throws IOException {
- if (!ceph_copyFromLocalFile(src.toString(), dst.toString())) {
- throw new IOException("org.apache.hadoop.fs.ceph.CephFileSystem.moveFromLocalFile: failed moving from local file " + src.toString() + " to Ceph file " + dst.toString());
- }
- //FileUtil.copy(localFs, src, this, dst, true, getConf());
- }
-
- @Override
- public void copyFromLocalFile(Path src, Path dst) throws IOException {
- // make sure Ceph path exists
- Path abs_src = makeAbsolute(src);
- Path abs_dst = makeAbsolute(dst);
-
- if (isDirectory(abs_dst))
- throw new IOException("Error in copyFromLocalFile: " +
- "attempting to open an existing directory as a file");
- Path abs_dst_parent = abs_dst.getParent();
-
- if (!exists(abs_dst_parent))
- mkdirs(abs_dst_parent);
-
- if (!ceph_copyFromLocalFile(abs_src.toString(), abs_dst.toString())) {
- throw new IOException("org.apache.hadoop.fs.ceph.CephFileSystem.copyFromLocalFile: failed copying from local file " + abs_src.toString() + " to Ceph file " + abs_dst.toString());
- }
- //FileUtil.copy(localFs, src, this, dst, false, true, getConf());
- }
-
-
-
- @Override
- public void copyToLocalFile(Path ceph_src, Path local_dst, boolean copyCrc) throws IOException {
-
- Path abs_ceph_src = makeAbsolute(ceph_src);
-
- //System.out.println("CopyToLocalFile: copying Ceph file \"" + abs_ceph_src.toString() +
- // "\" to local file \"" + local_dst.toString() + "\" using client "
- // + clientPointer);
-
- // make sure the alleged source file exists, and is actually a file, not
- // a directory or a ballpoint pen or something
- if (!isFile(abs_ceph_src)) {
- if (!exists(abs_ceph_src)) {
- throw new IOException("copyToLocalFile: failed copying Ceph file \"" +
- abs_ceph_src.toString() + "\" to local file \""
- + local_dst.toString() +
- "\" because the source file does not exist");
- }
- else {
- throw new IOException("copyToLocalFile: failed copying Ceph file \"" +
- abs_ceph_src.toString() + "\" to local file \"" +
- local_dst.toString() +
- "\" because the Ceph path is not a file");
- }
- }
-
- // if the destination's parent directory doesn't exist, create it.
- Path local_dst_parent_dir = local_dst.getParent();
- if(null == local_dst_parent_dir)
- throw new IOException("copyToLocalFile: failed copying Ceph file \"" +
- abs_ceph_src.toString() + "\" to local file \"" +
- local_dst.toString() +
- "\": destination is root");
-
- if(!localFs.mkdirs(local_dst_parent_dir))
- throw new IOException("copyToLocalFile: failed copying Ceph file \"" +
- abs_ceph_src.toString() + "\" to local file \"" +
- local_dst.toString() +
- "\": creating the destination's parent directory failed.");
- else
- {
- if (!ceph_copyToLocalFile(abs_ceph_src.toString(), local_dst.toString()))
- {
- throw new IOException("copyToLocalFile: failed copying Ceph file \"" +
- abs_ceph_src.toString() + "\" to local file \""
- + local_dst.toString() + "\"");
- }
- }
- //System.out.println("CopyToLocalFile: copied Ceph file \"" + abs_ceph_src.toString() +
- // "\" to local file \"" + local_dst.toString() + "\"");
- }
-
-
- @Override
- public Path startLocalOutput(Path fsOutputFile, Path tmpLocalFile)
- throws IOException {
- return tmpLocalFile;
- }
-
- @Override
- public void completeLocalOutput(Path fsOutputFile, Path tmpLocalFile)
- throws IOException {
- moveFromLocalFile(tmpLocalFile, fsOutputFile);
- }
-
- // diagnostic methods
-
- /* void dump() throws IOException {
- store.dump();
- }
-
- void purge() throws IOException {
- store.purge();
- }*/
-
-}