--- /dev/null
+package org.apache.hadoop.fs.ceph;
+
+/**
+ * Thrown if something goes wrong with Ceph.
+ */
+public class CephException extends RuntimeException {
+
+ public CephException(Throwable t) {
+ super(t);
+ }
+
+}
--- /dev/null
+// -*- mode:Java; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+package org.apache.hadoop.fs.ceph;
+
+import java.io.IOException;
+import java.net.URI;
+import java.util.Set;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.FileUtil;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.util.Progressable;
+import org.apache.hadoop.fs.FileStatus;
+
+/**
+ * <p>
+ * A {@link FileSystem} backed by <a href="http://ceph.sourceforge.net">Ceph.</a>.
+ * This will not start a Ceph instance; one must already be running.
+ * </p>
+ * @author Esteban Molina-Estolano
+ */
+public class CephFileSystem extends FileSystem {
+
+ private static final long DEFAULT_BLOCK_SIZE = 8 * 1024 * 1024;
+
+ static {
+ System.loadLibrary("hadoopcephfs");
+ }
+
+ private URI uri;
+
+ private FileSystem localFs;
+
+ private long clientPointer;
+
+ private Path root;
+
+ private Path parent;
+
+ //private Path workingDir = new Path("/user", System.getProperty("user.name"));
+
+ /*
+ * Native Ceph methods. Each one has long client as a parameter,
+ * which should always be the member variable clientPointer. This is
+ * to avoid the hideous JNI code required to access the member variable
+ * from C++. clientPointer is set at initialization of the
+ * CephFileSystem instance, and should be untouched thereafter. I
+ * include wrapper functions to automatically add the parameter.
+ */
+
+ private boolean ceph_copyFromLocalFile(String localPath, String cephPath)
+ { return ceph_copyFromLocalFile(clientPointer, localPath, cephPath); }
+ private boolean ceph_copyToLocalFile(String cephPath, String localPath)
+ { return ceph_copyToLocalFile(clientPointer, cephPath, localPath); }
+ private String ceph_getcwd() { return ceph_getcwd(clientPointer); }
+ private boolean ceph_setcwd(String path) { return ceph_setcwd(clientPointer, path); }
+ private boolean ceph_rmdir(String path) { return ceph_rmdir(clientPointer, path); }
+ private boolean ceph_mkdir(String path) { return ceph_mkdir(clientPointer, path); }
+ private boolean ceph_unlink(String path) { return ceph_unlink(clientPointer, path); }
+ private boolean ceph_rename(String old_path, String new_path) { return ceph_rename(clientPointer, old_path, new_path); }
+ private boolean ceph_exists(String path) { return ceph_exists(clientPointer, path); }
+ private long ceph_getblocksize(String path) { return ceph_getblocksize(clientPointer, path); }
+ private long ceph_getfilesize(String path) { return ceph_getfilesize(clientPointer, path); }
+ private boolean ceph_isdirectory(String path) { return ceph_isdirectory(clientPointer, path); }
+ private boolean ceph_isfile(String path) { return ceph_isfile(clientPointer, path); }
+ private String[] ceph_getdir(String path) { return ceph_getdir(clientPointer, path); }
+ private int ceph_open_for_read(String path) { return ceph_open_for_read(clientPointer, path); }
+ private int ceph_open_for_overwrite(String path) { return ceph_open_for_overwrite(clientPointer, path); }
+
+ private boolean ceph_kill_client() {
+ System.out.println("Killing Ceph client with pointer " + clientPointer);
+ return ceph_kill_client(clientPointer);
+ }
+
+ private native long ceph_initializeClient();
+ private native boolean ceph_copyFromLocalFile (long client, String localPath, String cephPath);
+ private native boolean ceph_copyToLocalFile (long client, String cephPath, String localPath);
+ private native String ceph_getcwd (long client);
+ private native boolean ceph_setcwd (long client, String path);
+ private native boolean ceph_rmdir (long client, String path);
+ private native boolean ceph_mkdir (long client, String path);
+ private native boolean ceph_unlink (long client, String path);
+ private native boolean ceph_rename (long client, String old_path, String new_path);
+ private native boolean ceph_exists (long client, String path);
+ private native long ceph_getblocksize (long client, String path);
+ private native long ceph_getfilesize (long client, String path);
+ private native boolean ceph_isdirectory (long client, String path);
+ private native boolean ceph_isfile (long client, String path);
+ private native String[]ceph_getdir (long client, String path);
+ private native int ceph_open_for_read (long client, String path);
+ private native int ceph_open_for_overwrite(long client, String path);
+ private native boolean ceph_kill_client (long client);
+
+
+
+
+ public CephFileSystem() {
+ root = new Path("/");
+ parent = new Path("..");
+ }
+
+ /*
+ public S3FileSystem(FileSystemStore store) {
+ this.store = store;
+ } */
+
+ @Override
+ public URI getUri() {
+ return uri;
+ }
+
+ @Override
+ public void initialize(URI uri, Configuration conf) throws IOException {
+ //store.initialize(uri, conf);
+ setConf(conf);
+ this.uri = URI.create(uri.getScheme() + "://" + uri.getAuthority());
+
+ // TODO: local filesystem? we really need to figure out this conf thingy
+ this.localFs = get(URI.create("file:///"), conf);
+
+ // Initializes the client
+ this.clientPointer = ceph_initializeClient();
+ System.out.println("Initialized client with pointer " + clientPointer
+ + ". Setting cwd to /");
+ ceph_setcwd("/");
+ // DEBUG
+ // attempt to do three exists operations on root
+ //System.out.println("DEBUG: attempting isdir() on root (/)");
+ // ceph_isdirectory("/");
+ //System.out.println("DEBUG: attempting exists() on root (/)");
+ //ceph_exists("/");
+ }
+
+ @Override
+ public void close() throws IOException {
+ System.out.println("Pretending to shut down client with pointer " + clientPointer
+ + ". Not really doing anything.");
+ }
+
+ @Override
+ public String getName() {
+ return getUri().toString();
+ }
+
+ @Override
+ public Path getWorkingDirectory() {
+ return makeAbsolute(new Path(ceph_getcwd()));
+ }
+
+ @Override
+ public void setWorkingDirectory(Path dir) {
+ Path abs_path = makeAbsolute(dir);
+
+ // error conditions if path's not a directory
+ boolean isDir = false;
+ boolean path_exists = false;
+ try {
+ isDir = isDirectory(abs_path);
+ path_exists = exists(abs_path);
+ }
+
+ catch (IOException e) {
+ System.out.println("Warning: isDirectory threw an exception");
+ }
+
+ if (!isDir) {
+ if (path_exists)
+ System.out.println("Warning: SetWorkingDirectory(" + dir.toString() +
+ "): path is not a directory");
+ else
+ System.out.println("Warning: SetWorkingDirectory(" + dir.toString() +
+ "): path does not exist");
+ }
+ else {
+ ceph_setcwd(dir.toString());
+ }
+ //System.out.println("DEBUG: Attempting to change cwd to " + dir.toString() +
+ // "changes cwd to" + getWorkingDirectory().toString());
+ }
+
+ // Makes a Path absolute. In a cheap, dirty hack, we're
+ // also going to strip off any "ceph://null" prefix we see.
+ private Path makeAbsolute(Path path) {
+ // first, check for the prefix
+ if (path.toString().startsWith("ceph://null")) {
+
+ Path stripped_path = new Path(path.toString().substring("ceph://null".length()));
+ //System.out.println("Stripping path \"" + path.toString() + "\" to \""
+ // + stripped_path.toString() + "\"");
+ return stripped_path;
+ }
+
+
+ if (path.isAbsolute()) {
+ return path;
+ }
+ Path wd = getWorkingDirectory();
+ //System.out.println("Working directory is " + wd.toString());
+ if (wd.toString().equals(""))
+ return new Path(root, path);
+ else
+ return new Path(wd, path);
+ }
+
+ private String[] getEmptyStringArray(int size) {
+ return new String[size];
+ }
+
+ @Override
+ public boolean exists(Path path) throws IOException {
+ boolean result;
+ Path abs_path = makeAbsolute(path);
+ if (abs_path.toString().equals("/"))
+ {
+ //System.out.println("Bug workaround! returning true for exists(/)");
+ result = true;
+ }
+ else
+ {
+ //System.out.println("Calling ceph_exists from Java on path " + abs_path.toString() + ":");
+ result = ceph_exists(abs_path.toString());
+ //System.out.println("Returned from ceph_exists to Java");
+ }
+ // System.out.println("exists \"" + path.toString() + "\"? Absolute path is \"" +
+ // abs_path.toString() + "\", result = " + result);
+
+ return result;
+ }
+
+
+ /* Creates the directory and all nonexistent parents. */
+ @Override
+ public boolean mkdirs(Path path) throws IOException {
+
+ Path abs_path = makeAbsolute(path);
+ //System.out.println("mkdirs: Creating directory \"" + path.toString() + "\": Absolute path is \"" +
+ // abs_path.toString() + "\"");
+
+ // If the directory exists, we're happy, right? less work for us!
+
+ if(exists(abs_path))
+ throw new IOException("Error: attempting to create an existing directory");
+
+ // The basic idea:
+ // get parent. if parent = null (happens if dir is root), fail. You can't really make
+ // the root directory...
+ // if parent doesn't exist, recursively make parent; fail if this fails.
+ // ceph_mkdir desired path.
+
+ Path parent = path.getParent();
+ boolean result = true;
+
+ // if the parent's null, we're trying to create the root directory. This is BAD.
+ if (null == parent) {
+ System.out.println("Error: failed making directory \"" + abs_path.toString() +
+ "\": directory has null parent (directory is root)") ;
+ result = false;
+ }
+ else {
+ // try to make the parent if it doesn't exist
+ if (!exists(parent)) {
+ //System.out.println("mkdirs: parent of directory \"" + abs_path.toString() +
+ // "does not exist. Recursively creating:");
+ if(!mkdirs(parent)) {
+ System.out.println("mkdirs: failed creating directory \"" +
+ abs_path.toString() +
+ "because of failure recursively creating parent" +
+ parent.toString());
+ result = false;
+ }
+ }
+ }
+ // try to make the directory, unless the parent was null or we
+ // tried and failed to make the parent
+ if (result) {
+ result = ceph_mkdir(abs_path.toString());
+ }
+ //System.out.println("mkdirs: attempted to make directory " + abs_path.toString() +
+ // ": result is " + result);
+ return result;
+ }
+
+
+
+ // @Override
+
+ public boolean __isDirectory(Path path) throws IOException {
+ Path abs_path = makeAbsolute(path);
+ boolean result;
+
+ if (abs_path.toString().equals("/"))
+ {
+ //System.out.println("Bug workaround! returning true for isDirectory(/)");
+ result = true;
+ }
+ else
+ result = ceph_isdirectory(abs_path.toString());
+ //System.out.println("isDirectory \"" + path.toString() + "\"? Absolute path is \"" +
+ // abs_path.toString() + "\", result = " + result);
+ return result;
+ }
+
+ @Override
+ public boolean isFile(Path path) throws IOException {
+ Path abs_path = makeAbsolute(path);
+ boolean result;
+ if (abs_path.toString().equals("/"))
+ {
+ //System.out.println("Bug workaround! returning false for isFile(/)");
+ result = false;
+ }
+ else
+ {
+ result = ceph_isfile(abs_path.toString());
+ }
+ //System.out.println("isFile \"" + path.toString() + "\"? Absolute path is \"" +
+ // abs_path.toString() + "\", result = " + result);
+
+ return result;
+ }
+
+ @Override
+ public FileStatus getFileStatus(Path p) throws IOException {
+ // For the moment, hardwired block size and replication
+ Path abs_p = makeAbsolute(p);
+ return new FileStatus(__getLength(abs_p), __isDirectory(abs_p), 2,
+ 8388608, 0, abs_p);
+ }
+
+
+ // array of statuses for the directory's contents
+ // steal or factor out iteration code from delete()
+ @Override
+ public FileStatus[] listStatus(Path p) throws IOException {
+ Path abs_p = makeAbsolute(p);
+
+ }
+
+
+ @Override
+ public Path[] listPathsRaw(Path path) throws IOException {
+
+ String dirlist[];
+
+ Path abs_path = makeAbsolute(path);
+
+ //System.out.println("listPathsRaw on path \"" + path.toString() + "\", absolute path \""
+ // + abs_path.toString() + "\"");
+
+ // If it's a directory, get the listing. Otherwise, complain and give up.
+ if (isDirectory(abs_path))
+ dirlist = ceph_getdir(abs_path.toString());
+ else
+ {
+ if (exists(abs_path)) { }
+ // System.out.println("listPathsRaw on path \"" + abs_path.toString() +
+ // "\" failed; the path is not a directory.");
+ else {}
+ // System.out.println("listPathsRaw on path \"" + abs_path.toString() +
+ // "\" failed; the path does not exist.");
+ return null;
+ }
+
+
+ // convert the strings to Paths
+ Path paths[] = new Path[dirlist.length];
+ for(int i = 0; i < dirlist.length; ++i) {
+ //System.out.println("Raw enumeration of paths in \"" + abs_path.toString() + "\": \"" +
+ // dirlist[i] + "\"");
+
+ // convert each listing to an absolute path
+ Path raw_path = new Path(dirlist[i]);
+ if (raw_path.isAbsolute())
+ paths[i] = raw_path;
+ else
+ paths[i] = new Path(abs_path, raw_path);
+ }
+ return paths;
+ }
+
+ public FSDataOutputStream create(Path f,
+ boolean overwrite,
+ int bufferSize,
+ short replication,
+ long blockSize,
+ Progressable progress
+ ) throws IOException {
+
+
+ Path absfilepath = makeAbsolute(file);
+
+ // We ignore progress reporting and replication.
+ // Required semantics: if the file exists, overwrite if overwrite == true, and
+ // throw an exception if overwrite == false.
+
+ // Step 1: existence test
+ if(isDirectory(absfilepath))
+ throw new IOException("create: Cannot overwrite existing directory \""
+ + absfilepath.toString() + "\" with a file");
+ if (!overwrite) {
+ if (exists(absfilepath)) {
+ throw new IOException("createRaw: Cannot open existing file \""
+ + absfilepath.toString()
+ + "\" for writing without overwrite flag");
+ }
+ }
+
+ // Step 2: create any nonexistent directories in the path
+ Path parent = absfilepath.getParent();
+ if (parent != null) { // if parent is root, we're done
+ if(!exists(parent)) {
+ //System.out.println("createRaw: parent directory of path \""
+ // + absfilepath.toString() + "\" does not exist. Creating:");
+ mkdirs(parent);
+ }
+ }
+
+ // Step 3: open the file
+ int fh = ceph_open_for_overwrite(absfilepath.toString());
+ if (fh < 0) {
+ throw new IOException("createRaw: Open for overwrite failed on path \"" +
+ absfilepath.toString() + "\"");
+ }
+
+ // Step 4: create the stream
+ FSDataOutputStream result = new CephOutputStream(getConf(), clientPointer, fh);
+ //System.out.println("createRaw: opened absolute path \"" + absfilepath.toString()
+ // + "\" for writing with fh " + fh);
+
+ return result;
+ }
+
+
+
+ // Opens a Ceph file and attaches the file handle to an FSDataInputStream.
+ @Override
+ public FSDataInputStream open(Path path, int bufferSize) throws IOException {
+ Path abs_path = makeAbsolute(path);
+
+ if(!isFile(abs_path)) {
+ if (!exists(abs_path))
+ throw new IOException("open: absolute path \"" + abs_path.toString()
+ + "\" does not exist");
+ else
+ throw new IOException("open: absolute path \"" + abs_path.toString()
+ + "\" is not a file");
+ }
+
+ int fh = ceph_open_for_read(abs_path.toString());
+ if (fh < 0) {
+ throw new IOException("open: Failed to open file " + abs_path.toString());
+ }
+ long size = ceph_getfilesize(abs_path.toString());
+ if (size < 0) {
+ throw new IOException("Failed to get file size for file " + abs_path.toString() +
+ " but succeeded in opening file. Something bizarre is going on.");
+ }
+ FSDataInputStream result = new CephInputStream(getConf(), clientPointer, fh, size);
+ return result;
+ }
+
+ @Override
+ public boolean rename(Path src, Path dst) throws IOException {
+ // TODO: Check corner cases: dst already exists,
+ // or path is directory with children
+
+ return ceph_rename(src.toString(), dst.toString());
+ }
+
+ @Override
+ public boolean delete(Path path) throws IOException {
+
+ Path abs_path = makeAbsolute(path);
+
+ //System.out.println("deleteRaw: Deleting path " + abs_path.toString());
+ // sanity check
+ if (abs_path.toString().equals("/"))
+ throw new IOException("Error: deleting the root directory is a Bad Idea.");
+
+ // if the path is a file, try to delete it.
+ if (isFile(abs_path)) {
+ boolean result = ceph_unlink(path.toString());
+ if(!result) {
+ // System.out.println("deleteRaw: failed to delete file \"" +
+ // abs_path.toString() + "\".");
+ return false;
+ }
+ else
+ return true;
+ }
+
+ /* If the path is a directory, recursively try to delete its contents,
+ and then delete the directory. */
+
+ Path[] contents = listPathsRaw(path);
+ if (contents == null) {
+ // System.out.println("deleteRaw: Failed to read contents of directory \"" +
+ // abs_path.toString() + "\" while trying to delete it");
+ return false;
+ }
+
+ // recursively delete, skipping "." and ".." entries
+ Path parent = abs_path.getParent();
+ for (Path p : contents) {
+ if (makeAbsolute(p).equals(abs_path)) continue; // "." entry
+ if (null != parent) {
+ if (p.equals(parent)) continue; // ".." entry
+ }
+
+ if (!deleteRaw(p)) {
+ // System.out.println("deleteRaw: Failed to delete file \"" +
+ // p.toString() + "\" while recursively deleting \""
+ // + abs_path.toString() + "\"" );
+ return false;
+ }
+ }
+
+ boolean result = ceph_unlink(path.toString());
+ if (!result)
+ System.out.println("delete: failed to delete \"" + abs_path.toString() + "\"");
+ return result;
+ }
+
+
+ //@Override
+ private long __getLength(Path path) throws IOException {
+ Path abs_path = makeAbsolute(path);
+
+ if (!exists(abs_path)) {
+ throw new IOException("org.apache.hadoop.fs.ceph.CephFileSystem.__getLength: File or directory " + abs_path.toString() + " does not exist.");
+ }
+
+ long filesize = ceph_getfilesize(abs_path.toString());
+ if (filesize < 0) {
+ throw new IOException("org.apache.hadoop.fs.ceph.CephFileSystem.getLength: Size of file or directory " + abs_path.toString() + " could not be retrieved.");
+ }
+ return filesize;
+ }
+
+ /**
+ * User-defined replication is not supported for Ceph file systems at the moment.
+ */
+ @Override
+ public short getReplication(Path path) throws IOException {
+ return 1;
+ }
+
+ @Override
+ public short getDefaultReplication() {
+ return 1;
+ }
+
+ /**
+ * User-defined replication is not supported for Ceph file systems at the moment.
+ */
+ @Override
+ public boolean setReplicationRaw(Path path, short replication)
+ throws IOException {
+ return true;
+ }
+
+ @Override
+ public long getBlockSize(Path path) throws IOException {
+
+ if (!exists(path)) {
+ throw new IOException("org.apache.hadoop.fs.ceph.CephFileSystem.getBlockSize: File or directory " + path.toString() + " does not exist.");
+ }
+ long result = ceph_getblocksize(path.toString());
+ if (!isFile(path)) {
+ throw new IOException("org.apache.hadoop.fs.ceph.CephFileSystem.getBlockSize: File or directory " + path.toString() + " is not a file.");
+ }
+ else {
+ System.err.println("DEBUG: getBlockSize: alleged file really is a file");
+ }
+ if (result < 4096) {
+ System.err.println("org.apache.hadoop.fs.ceph.CephFileSystem.getBlockSize: " +
+ "path exists; strange block size of " + result + " defaulting to 8192");
+ return 8192;
+ }
+
+
+ return result;
+ //return DEFAULT_BLOCK_SIZE;
+ // return ceph_getblocksize(path.toString());
+
+ }
+
+ @Override
+ public long getDefaultBlockSize() {
+ return DEFAULT_BLOCK_SIZE;
+ //return getConf().getLong("fs.ceph.block.size", DEFAULT_BLOCK_SIZE);
+ }
+
+ /**
+ * Return 1x1 'localhost' cell if the file exists. Return null if otherwise.
+ */
+ @Override
+ public String[][] getFileCacheHints(Path f, long start, long len)
+ throws IOException {
+ // TODO: Check this is the correct behavior
+ if (!exists(f)) {
+ return null;
+ }
+ return new String[][] { { "localhost" } };
+ }
+
+ @Override
+ public void lock(Path path, boolean shared) throws IOException {
+ // TODO: Design and implement? or just ignore locking?
+ return;
+ }
+
+ @Override
+ public void release(Path path) throws IOException {
+ return; //deprecated
+ }
+
+ /* old API
+ @Override
+ public void reportChecksumFailure(Path f,
+ FSDataInputStream in, long inPos,
+ FSDataInputStream sums, long sumsPos) {
+ // TODO: What to do here?
+ return;
+ } */
+
+ @Override
+ public void moveFromLocalFile(Path src, Path dst) throws IOException {
+ if (!ceph_copyFromLocalFile(src.toString(), dst.toString())) {
+ throw new IOException("org.apache.hadoop.fs.ceph.CephFileSystem.moveFromLocalFile: failed moving from local file " + src.toString() + " to Ceph file " + dst.toString());
+ }
+ //FileUtil.copy(localFs, src, this, dst, true, getConf());
+ }
+
+ @Override
+ public void copyFromLocalFile(Path src, Path dst) throws IOException {
+ // make sure Ceph path exists
+ Path abs_src = makeAbsolute(src);
+ Path abs_dst = makeAbsolute(dst);
+
+ if (isDirectory(abs_dst))
+ throw new IOException("Error in copyFromLocalFile: " +
+ "attempting to open an existing directory as a file");
+ Path abs_dst_parent = abs_dst.getParent();
+
+ if (!exists(abs_dst_parent))
+ mkdirs(abs_dst_parent);
+
+ if (!ceph_copyFromLocalFile(abs_src.toString(), abs_dst.toString())) {
+ throw new IOException("org.apache.hadoop.fs.ceph.CephFileSystem.copyFromLocalFile: failed copying from local file " + abs_src.toString() + " to Ceph file " + abs_dst.toString());
+ }
+ //FileUtil.copy(localFs, src, this, dst, false, true, getConf());
+ }
+
+
+
+ @Override
+ public void copyToLocalFile(Path ceph_src, Path local_dst, boolean copyCrc) throws IOException {
+
+ Path abs_ceph_src = makeAbsolute(ceph_src);
+
+ //System.out.println("CopyToLocalFile: copying Ceph file \"" + abs_ceph_src.toString() +
+ // "\" to local file \"" + local_dst.toString() + "\" using client "
+ // + clientPointer);
+
+ // make sure the alleged source file exists, and is actually a file, not
+ // a directory or a ballpoint pen or something
+ if (!isFile(abs_ceph_src)) {
+ if (!exists(abs_ceph_src)) {
+ throw new IOException("copyToLocalFile: failed copying Ceph file \"" +
+ abs_ceph_src.toString() + "\" to local file \""
+ + local_dst.toString() +
+ "\" because the source file does not exist");
+ }
+ else {
+ throw new IOException("copyToLocalFile: failed copying Ceph file \"" +
+ abs_ceph_src.toString() + "\" to local file \"" +
+ local_dst.toString() +
+ "\" because the Ceph path is not a file");
+ }
+ }
+
+ // if the destination's parent directory doesn't exist, create it.
+ Path local_dst_parent_dir = local_dst.getParent();
+ if(null == local_dst_parent_dir)
+ throw new IOException("copyToLocalFile: failed copying Ceph file \"" +
+ abs_ceph_src.toString() + "\" to local file \"" +
+ local_dst.toString() +
+ "\": destination is root");
+
+ if(!localFs.mkdirs(local_dst_parent_dir))
+ throw new IOException("copyToLocalFile: failed copying Ceph file \"" +
+ abs_ceph_src.toString() + "\" to local file \"" +
+ local_dst.toString() +
+ "\": creating the destination's parent directory failed.");
+ else
+ {
+ if (!ceph_copyToLocalFile(abs_ceph_src.toString(), local_dst.toString()))
+ {
+ throw new IOException("copyToLocalFile: failed copying Ceph file \"" +
+ abs_ceph_src.toString() + "\" to local file \""
+ + local_dst.toString() + "\"");
+ }
+ }
+ //System.out.println("CopyToLocalFile: copied Ceph file \"" + abs_ceph_src.toString() +
+ // "\" to local file \"" + local_dst.toString() + "\"");
+ }
+
+
+ @Override
+ public Path startLocalOutput(Path fsOutputFile, Path tmpLocalFile)
+ throws IOException {
+ return tmpLocalFile;
+ }
+
+ @Override
+ public void completeLocalOutput(Path fsOutputFile, Path tmpLocalFile)
+ throws IOException {
+ moveFromLocalFile(tmpLocalFile, fsOutputFile);
+ }
+
+ // diagnostic methods
+
+ /* void dump() throws IOException {
+ store.dump();
+ }
+
+ void purge() throws IOException {
+ store.purge();
+ }*/
+
+}
--- /dev/null
+package org.apache.hadoop.fs.ceph;
+
+import java.io.IOException;
+import java.net.URI;
+import java.util.Set;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSInputStream;
+import org.apache.hadoop.fs.FSOutputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.FileUtil;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.util.Progressable;
+
+/**
+ * <p>
+ * A {@link FileSystem} backed by <a href="http://ceph.sourceforge.net">Ceph.</a>.
+ * This will not start a Ceph instance; one must already be running.
+ * </p>
+ * @author Esteban Molina-Estolano
+ */
+public class CephFileSystem extends FileSystem {
+
+ private static final long DEFAULT_BLOCK_SIZE = 8 * 1024 * 1024;
+
+ static {
+ System.loadLibrary("hadoopcephfs");
+ }
+
+ private URI uri;
+
+ private FileSystem localFs;
+
+ private long clientPointer;
+
+ private Path root;
+
+ private Path parent;
+
+ //private Path workingDir = new Path("/user", System.getProperty("user.name"));
+
+ /*
+ * Native Ceph methods. Each one has long client as a parameter,
+ * which should always be the member variable clientPointer. This is
+ * to avoid the hideous JNI code required to access the member variable
+ * from C++. clientPointer is set at initialization of the
+ * CephFileSystem instance, and should be untouched thereafter. I
+ * include wrapper functions to automatically add the parameter.
+ */
+
+
+ private boolean ceph_copyFromLocalFile(String localPath, String cephPath)
+ { return ceph_copyFromLocalFile(clientPointer, localPath, cephPath); }
+ private boolean ceph_copyToLocalFile(String cephPath, String localPath)
+ { return ceph_copyToLocalFile(clientPointer, cephPath, localPath); }
+ private String ceph_getcwd() { return ceph_getcwd(clientPointer); }
+ private boolean ceph_setcwd(String path) { return ceph_setcwd(clientPointer, path); }
+ private boolean ceph_rmdir(String path) { return ceph_rmdir(clientPointer, path); }
+ private boolean ceph_mkdir(String path) { return ceph_mkdir(clientPointer, path); }
+ private boolean ceph_unlink(String path) { return ceph_unlink(clientPointer, path); }
+ private boolean ceph_rename(String old_path, String new_path) { return ceph_rename(clientPointer, old_path, new_path); }
+ private boolean ceph_exists(String path) { return ceph_exists(clientPointer, path); }
+ private long ceph_getblocksize(String path) { return ceph_getblocksize(clientPointer, path); }
+ private long ceph_getfilesize(String path) { return ceph_getfilesize(clientPointer, path); }
+ private boolean ceph_isdirectory(String path) { return ceph_isdirectory(clientPointer, path); }
+ private boolean ceph_isfile(String path) { return ceph_isfile(clientPointer, path); }
+ private String[] ceph_getdir(String path) { return ceph_getdir(clientPointer, path); }
+ private int ceph_open_for_read(String path) { return ceph_open_for_read(clientPointer, path); }
+ private int ceph_open_for_overwrite(String path) { return ceph_open_for_overwrite(clientPointer, path); }
+
+ private boolean ceph_kill_client() {
+ System.out.println("Killing Ceph client with pointer " + clientPointer);
+ return ceph_kill_client(clientPointer);
+ }
+
+ private native long ceph_initializeClient();
+ private native boolean ceph_copyFromLocalFile (long client, String localPath, String cephPath);
+ private native boolean ceph_copyToLocalFile (long client, String cephPath, String localPath);
+ private native String ceph_getcwd (long client);
+ private native boolean ceph_setcwd (long client, String path);
+ private native boolean ceph_rmdir (long client, String path);
+ private native boolean ceph_mkdir (long client, String path);
+ private native boolean ceph_unlink (long client, String path);
+ private native boolean ceph_rename (long client, String old_path, String new_path);
+ private native boolean ceph_exists (long client, String path);
+ private native long ceph_getblocksize (long client, String path);
+ private native long ceph_getfilesize (long client, String path);
+ private native boolean ceph_isdirectory (long client, String path);
+ private native boolean ceph_isfile (long client, String path);
+ private native String[]ceph_getdir (long client, String path);
+ private native int ceph_open_for_read (long client, String path);
+ private native int ceph_open_for_overwrite(long client, String path);
+ private native boolean ceph_kill_client (long client);
+
+
+
+
+ public CephFileSystem() {
+ root = new Path("/");
+ parent = new Path("..");
+ }
+
+ /*
+ public S3FileSystem(FileSystemStore store) {
+ this.store = store;
+ } */
+
+ @Override
+ public URI getUri() {
+ return uri;
+ }
+
+ @Override
+ public void initialize(URI uri, Configuration conf) throws IOException {
+ //store.initialize(uri, conf);
+ setConf(conf);
+ this.uri = URI.create(uri.getScheme() + "://" + uri.getAuthority());
+
+ // TODO: local filesystem? we really need to figure out this conf thingy
+ this.localFs = get(URI.create("file:///"), conf);
+
+ // Initializes the client
+ this.clientPointer = ceph_initializeClient();
+ System.out.println("Initialized client with pointer " + clientPointer
+ + ". Setting cwd to /");
+ ceph_setcwd("/");
+ // DEBUG
+ // attempt to do three exists operations on root
+ //System.out.println("DEBUG: attempting isdir() on root (/)");
+ // ceph_isdirectory("/");
+ //System.out.println("DEBUG: attempting exists() on root (/)");
+ //ceph_exists("/");
+ }
+
+ @Override
+ public void close() throws IOException {
+ System.out.println("Pretending to shut down client with pointer " + clientPointer
+ + ". Not really doing anything.");
+ }
+
+ @Override
+ public String getName() {
+ return getUri().toString();
+ }
+
+ @Override
+ public Path getWorkingDirectory() {
+ return makeAbsolute(new Path(ceph_getcwd()));
+ }
+
+ @Override
+ public void setWorkingDirectory(Path dir) {
+ Path abs_path = makeAbsolute(dir);
+
+ // error conditions if path's not a directory
+ boolean isDir = false;
+ boolean path_exists = false;
+ try {
+ isDir = isDirectory(abs_path);
+ path_exists = exists(abs_path);
+ }
+
+ catch (IOException e) {
+ System.out.println("Warning: isDirectory threw an exception");
+ }
+
+ if (!isDir) {
+ if (path_exists)
+ System.out.println("Warning: SetWorkingDirectory(" + dir.toString() +
+ "): path is not a directory");
+ else
+ System.out.println("Warning: SetWorkingDirectory(" + dir.toString() +
+ "): path does not exist");
+ }
+ else {
+ ceph_setcwd(dir.toString());
+ }
+ //System.out.println("DEBUG: Attempting to change cwd to " + dir.toString() +
+ // "changes cwd to" + getWorkingDirectory().toString());
+ }
+
+ // Makes a Path absolute. In a cheap, dirty hack, we're
+ // also going to strip off any "ceph://null" prefix we see.
+ private Path makeAbsolute(Path path) {
+ // first, check for the prefix
+ if (path.toString().startsWith("ceph://null")) {
+
+ Path stripped_path = new Path(path.toString().substring("ceph://null".length()));
+ //System.out.println("Stripping path \"" + path.toString() + "\" to \""
+ // + stripped_path.toString() + "\"");
+ return stripped_path;
+ }
+
+
+ if (path.isAbsolute()) {
+ return path;
+ }
+ Path wd = getWorkingDirectory();
+ //System.out.println("Working directory is " + wd.toString());
+ if (wd.toString().equals(""))
+ return new Path(root, path);
+ else
+ return new Path(wd, path);
+ }
+
+ private String[] getEmptyStringArray(int size) {
+ return new String[size];
+ }
+
+ @Override
+ public boolean exists(Path path) throws IOException {
+ boolean result;
+ Path abs_path = makeAbsolute(path);
+ if (abs_path.toString().equals("/"))
+ {
+ //System.out.println("Bug workaround! returning true for exists(/)");
+ result = true;
+ }
+ else
+ {
+ //System.out.println("Calling ceph_exists from Java on path " + abs_path.toString() + ":");
+ result = ceph_exists(abs_path.toString());
+ //System.out.println("Returned from ceph_exists to Java");
+ }
+ // System.out.println("exists \"" + path.toString() + "\"? Absolute path is \"" +
+ // abs_path.toString() + "\", result = " + result);
+
+ return result;
+ }
+
+
+ /* Creates the directory and all nonexistent parents. */
+ @Override
+ public boolean mkdirs(Path path) throws IOException {
+
+ Path abs_path = makeAbsolute(path);
+ //System.out.println("mkdirs: Creating directory \"" + path.toString() + "\": Absolute path is \"" +
+ // abs_path.toString() + "\"");
+
+ // If the directory exists, we're happy, right? less work for us!
+
+ if(exists(abs_path))
+ throw new IOException("Error: attempting to create an existing directory");
+
+ // The basic idea:
+ // get parent. if parent = null (happens if dir is root), fail. You can't really make
+ // the root directory...
+ // if parent doesn't exist, recursively make parent; fail if this fails.
+ // ceph_mkdir desired path.
+
+ Path parent = path.getParent();
+ boolean result = true;
+
+ // if the parent's null, we're trying to create the root directory. This is BAD.
+ if (null == parent) {
+ System.out.println("Error: failed making directory \"" + abs_path.toString() +
+ "\": directory has null parent (directory is root)") ;
+ result = false;
+ }
+ else {
+ // try to make the parent if it doesn't exist
+ if (!exists(parent)) {
+ //System.out.println("mkdirs: parent of directory \"" + abs_path.toString() +
+ // "does not exist. Recursively creating:");
+ if(!mkdirs(parent)) {
+ System.out.println("mkdirs: failed creating directory \"" +
+ abs_path.toString() +
+ "because of failure recursively creating parent" +
+ parent.toString());
+ result = false;
+ }
+ }
+ }
+ // try to make the directory, unless the parent was null or we
+ // tried and failed to make the parent
+ if (result) {
+ result = ceph_mkdir(abs_path.toString());
+ }
+ //System.out.println("mkdirs: attempted to make directory " + abs_path.toString() +
+ // ": result is " + result);
+ return result;
+ }
+
+
+ @Override
+ public boolean isDirectory(Path path) throws IOException {
+ Path abs_path = makeAbsolute(path);
+ boolean result;
+
+
+ if (abs_path.toString().equals("/"))
+ {
+ //System.out.println("Bug workaround! returning true for isDirectory(/)");
+ result = true;
+ }
+ else
+ result = ceph_isdirectory(abs_path.toString());
+ //System.out.println("isDirectory \"" + path.toString() + "\"? Absolute path is \"" +
+ // abs_path.toString() + "\", result = " + result);
+ return result;
+ }
+
+ @Override
+ public boolean isFile(Path path) throws IOException {
+ Path abs_path = makeAbsolute(path);
+ boolean result;
+ if (abs_path.toString().equals("/"))
+ {
+ //System.out.println("Bug workaround! returning false for isFile(/)");
+ result = false;
+ }
+ else
+ {
+ result = ceph_isfile(abs_path.toString());
+ }
+ //System.out.println("isFile \"" + path.toString() + "\"? Absolute path is \"" +
+ // abs_path.toString() + "\", result = " + result);
+
+ return result;
+ }
+
+
+ @Override
+ public Path[] listPathsRaw(Path path) throws IOException {
+
+ String dirlist[];
+
+ Path abs_path = makeAbsolute(path);
+
+ //System.out.println("listPathsRaw on path \"" + path.toString() + "\", absolute path \""
+ // + abs_path.toString() + "\"");
+
+ // If it's a directory, get the listing. Otherwise, complain and give up.
+ if (isDirectory(abs_path))
+ dirlist = ceph_getdir(abs_path.toString());
+ else
+ {
+ if (exists(abs_path)) { }
+ // System.out.println("listPathsRaw on path \"" + abs_path.toString() +
+ // "\" failed; the path is not a directory.");
+ else {}
+ // System.out.println("listPathsRaw on path \"" + abs_path.toString() +
+ // "\" failed; the path does not exist.");
+ return null;
+ }
+
+
+ // convert the strings to Paths
+ Path paths[] = new Path[dirlist.length];
+ for(int i = 0; i < dirlist.length; ++i) {
+ //System.out.println("Raw enumeration of paths in \"" + abs_path.toString() + "\": \"" +
+ // dirlist[i] + "\"");
+
+ // convert each listing to an absolute path
+ Path raw_path = new Path(dirlist[i]);
+ if (raw_path.isAbsolute())
+ paths[i] = raw_path;
+ else
+ paths[i] = new Path(abs_path, raw_path);
+ }
+ return paths;
+ }
+
+ @Override
+ public FSOutputStream createRaw(Path file, boolean overwrite,
+ short replication, long blockSize) throws IOException {
+
+ return createRaw(file, overwrite, replication, blockSize, null);
+ }
+
+
+ @Override
+ public FSOutputStream createRaw(Path file, boolean overwrite,
+ short replication, long blockSize, Progressable progress)
+ throws IOException {
+
+
+ Path absfilepath = makeAbsolute(file);
+
+ //System.out.println("createRaw: opening path \"" + file.toString() + "\" as absolute path \""
+ // + absfilepath.toString() + "\" for writing");
+
+ // we're ignoring progress reporting and replication entirely.
+ // required semantics: if the file already exists, overwrite
+ // it if overwrite = true and throw an exception if
+ // overwrite = false.
+
+ // Step 1: existence test
+ if(isDirectory(absfilepath))
+ throw new IOException("createRaw: Cannot make an output stream to existing directory \""
+ + absfilepath.toString() + "\"");
+
+ if (!overwrite) {
+ if (!exists(absfilepath)) {
+ throw new IOException("createRaw: Cannot open existing file \"" + absfilepath.toString()
+ + "\" for writing without overwrite flag");
+ }
+ }
+
+ // Step 2: do the directories in the path exist?
+ Path parent = absfilepath.getParent();
+ if (parent != null) // if parent is root, we're done
+ {
+ if(!exists(parent))
+ {
+ //System.out.println("createRaw: parent directory of path \""
+ // + absfilepath.toString() + "\" does not exist. Creating:");
+ mkdirs(parent);
+ }
+ }
+
+ // Step 3: open the file
+ int fh = ceph_open_for_overwrite(absfilepath.toString());
+ if (fh < 0) {
+ throw new IOException("createRaw: Open for overwrite failed on path \"" +
+ absfilepath.toString() + "\"");
+ }
+
+ // Step 4: create the stream
+ CephOutputStream result = new CephOutputStream(getConf(), clientPointer, fh);
+ //System.out.println("createRaw: opened absolute path \"" + absfilepath.toString()
+ // + "\" for writing with fh " + fh);
+
+ return result;
+ }
+
+ @Override
+ public FSInputStream openRaw(Path path) throws IOException {
+ Path abs_path = makeAbsolute(path);
+
+ //System.out.println("openRaw: opening path \"" + path.toString() + "\" as absolute path \""
+ // + abs_path.toString() + "\" for reading");
+
+ if(!isFile(abs_path))
+ {
+ String error;
+ if (!exists(abs_path))
+ throw new IOException("openRaw: absolute path \"" + abs_path.toString()
+ + "\" does not exist");
+ else
+ throw new IOException("openRaw: absolute path \"" + abs_path.toString()
+ + "\" is not a file");
+ }
+
+ int fh = ceph_open_for_read(abs_path.toString());
+ if (fh < 0) {
+ throw new IOException("openRaw: Failed to open file " + abs_path.toString());
+ }
+
+ long size = ceph_getfilesize(abs_path.toString());
+ if (size < 0) {
+ throw new IOException("Failed to get file size for file " + abs_path.toString() +
+ " but succeeded in opening file. Something excitingly bizarre is going on.");
+ }
+
+ FSInputStream result = new CephInputStream(getConf(), clientPointer, fh, size);
+
+ //System.out.println("openRaw: opened absolute path \"" + abs_path.toString() +
+ // "\" for reading with fd " + fh + " and size " + size);
+
+ return result;
+ }
+
+ @Override
+ public boolean renameRaw(Path src, Path dst) throws IOException {
+ // TODO: Check corner cases: dst already exists,
+ // or if path is directory with children
+
+ return ceph_rename(src.toString(), dst.toString());
+ }
+
+ @Override
+ public boolean deleteRaw(Path path) throws IOException {
+
+ Path abs_path = makeAbsolute(path);
+
+ //System.out.println("deleteRaw: Deleting path " +
+ // abs_path.toString());
+ if (abs_path.toString().equals("/"))
+ throw new IOException("Error: attempting to delete the root directory");
+
+ // if it's a file, try to delete it.
+ if (isFile(abs_path)) {
+ boolean result = ceph_unlink(path.toString());
+ if(!result) {
+ // System.out.println("deleteRaw: failed to delete file \"" +
+ // abs_path.toString() + "\".");
+ return false;
+ }
+ else
+ return true;
+ }
+
+ // if it's a directory, recursively try to delete its contents,
+ // and then delete the directory.
+ // Otherwise, return false.
+
+ Path[] contents = listPathsRaw(path);
+ if (contents == null) {
+ // System.out.println("deleteRaw: Failed to read contents of directory \"" +
+ // abs_path.toString() + "\" while trying to delete it");
+ return false;
+ }
+
+ // debug listing of directory contents
+ //System.out.println("deleteRaw: Enumerating contents of directory \"" +
+ // abs_path.toString() + "\" to delete");
+
+ // for (Path p : contents) {
+ // System.out.println(" Contains path \"" + p.toString() + "\", absolute path is \""
+ // + makeAbsolute(p).toString() + "\"");
+ //}
+
+ // recursively delete the directory's contents
+ Path parent = abs_path.getParent();
+ for (Path p : contents) {
+ // skip self ("." directory entry)
+ if (makeAbsolute(p).equals(abs_path)) {
+ //System.out.println("Skipping self");
+ continue;
+ }
+ // skip parent (".." directory entry)
+ if (null != parent) {
+ if (p.equals(parent)) {
+ //System.out.println("Skipping parent");
+ continue;
+ }
+ }
+
+ //System.out.println("Path " + abs_path.toString() + " contains path "
+ // + makeAbsolute(p).toString() +
+ // ". Attempting to delete it recursively:");
+ if (!deleteRaw(p)) {
+ // System.out.println("deleteRaw: Failed to delete file \"" +
+ // p.toString() + "\" while recursively deleting \""
+ // + abs_path.toString() + "\"" );
+ return false;
+ }
+ }
+
+ boolean result = ceph_unlink(path.toString());
+ if (!result)
+ System.out.println("deleteRaw: failed to delete \"" + abs_path.toString() + "\"");
+ return result;
+ }
+
+
+ @Override
+ public long getLength(Path path) throws IOException {
+ Path abs_path = makeAbsolute(path);
+
+ if (!exists(abs_path)) {
+ throw new IOException("org.apache.hadoop.fs.ceph.CephFileSystem.getLength: File or directory " + abs_path.toString() + " does not exist.");
+ }
+
+ long filesize = ceph_getfilesize(abs_path.toString());
+ if (filesize < 0) {
+ throw new IOException("org.apache.hadoop.fs.ceph.CephFileSystem.getLength: Size of file or directory " + abs_path.toString() + " could not be retrieved.");
+ }
+ return filesize;
+
+ }
+
+ /**
+ * User-defined replication is not supported for Ceph file systems at the moment.
+ */
+ @Override
+ public short getReplication(Path path) throws IOException {
+ return 1;
+ }
+
+ @Override
+ public short getDefaultReplication() {
+ return 1;
+ }
+
+ /**
+ * User-defined replication is not supported for Ceph file systems at the moment.
+ */
+ @Override
+ public boolean setReplicationRaw(Path path, short replication)
+ throws IOException {
+ return true;
+ }
+
+ @Override
+ public long getBlockSize(Path path) throws IOException {
+
+ if (!exists(path)) {
+ throw new IOException("org.apache.hadoop.fs.ceph.CephFileSystem.getBlockSize: File or directory " + path.toString() + " does not exist.");
+ }
+ long result = ceph_getblocksize(path.toString());
+ if (!isFile(path)) {
+ throw new IOException("org.apache.hadoop.fs.ceph.CephFileSystem.getBlockSize: File or directory " + path.toString() + " is not a file.");
+ }
+ else {
+ System.err.println("DEBUG: getBlockSize: alleged file really is a file");
+ }
+ if (result < 4096) {
+ System.err.println("org.apache.hadoop.fs.ceph.CephFileSystem.getBlockSize: " +
+ "path exists; strange block size of " + result + " defaulting to 8192");
+ return 8192;
+ }
+
+
+ return result;
+ //return DEFAULT_BLOCK_SIZE;
+ // return ceph_getblocksize(path.toString());
+
+ }
+
+ @Override
+ public long getDefaultBlockSize() {
+ return DEFAULT_BLOCK_SIZE;
+ //return getConf().getLong("fs.ceph.block.size", DEFAULT_BLOCK_SIZE);
+ }
+
+ /**
+ * Return 1x1 'localhost' cell if the file exists. Return null if otherwise.
+ */
+ @Override
+ public String[][] getFileCacheHints(Path f, long start, long len)
+ throws IOException {
+ // TODO: Check this is the correct behavior
+ if (!exists(f)) {
+ return null;
+ }
+ return new String[][] { { "localhost" } };
+ }
+
+ @Override
+ public void lock(Path path, boolean shared) throws IOException {
+ // TODO: Design and implement? or just ignore locking?
+ return;
+ }
+
+ @Override
+ public void release(Path path) throws IOException {
+ return; //deprecated
+ }
+
+ @Override
+ public void reportChecksumFailure(Path f,
+ FSInputStream in, long inPos,
+ FSInputStream sums, long sumsPos) {
+ // TODO: What to do here?
+ return;
+ }
+
+ @Override
+ public void moveFromLocalFile(Path src, Path dst) throws IOException {
+ if (!ceph_copyFromLocalFile(src.toString(), dst.toString())) {
+ throw new IOException("org.apache.hadoop.fs.ceph.CephFileSystem.moveFromLocalFile: failed moving from local file " + src.toString() + " to Ceph file " + dst.toString());
+ }
+ //FileUtil.copy(localFs, src, this, dst, true, getConf());
+ }
+
+ @Override
+ public void copyFromLocalFile(Path src, Path dst) throws IOException {
+ // make sure Ceph path exists
+ Path abs_src = makeAbsolute(src);
+ Path abs_dst = makeAbsolute(dst);
+
+ if (isDirectory(abs_dst))
+ throw new IOException("Error in copyFromLocalFile: " +
+ "attempting to open an existing directory as a file");
+ Path abs_dst_parent = abs_dst.getParent();
+
+ if (!exists(abs_dst_parent))
+ mkdirs(abs_dst_parent);
+
+ if (!ceph_copyFromLocalFile(abs_src.toString(), abs_dst.toString())) {
+ throw new IOException("org.apache.hadoop.fs.ceph.CephFileSystem.copyFromLocalFile: failed copying from local file " + abs_src.toString() + " to Ceph file " + abs_dst.toString());
+ }
+ //FileUtil.copy(localFs, src, this, dst, false, true, getConf());
+ }
+
+
+
+ @Override
+ public void copyToLocalFile(Path ceph_src, Path local_dst, boolean copyCrc) throws IOException {
+
+ Path abs_ceph_src = makeAbsolute(ceph_src);
+
+ //System.out.println("CopyToLocalFile: copying Ceph file \"" + abs_ceph_src.toString() +
+ // "\" to local file \"" + local_dst.toString() + "\" using client "
+ // + clientPointer);
+
+ // make sure the alleged source file exists, and is actually a file, not
+ // a directory or a ballpoint pen or something
+ if (!isFile(abs_ceph_src)) {
+ if (!exists(abs_ceph_src)) {
+ throw new IOException("copyToLocalFile: failed copying Ceph file \"" +
+ abs_ceph_src.toString() + "\" to local file \""
+ + local_dst.toString() +
+ "\" because the source file does not exist");
+ }
+ else {
+ throw new IOException("copyToLocalFile: failed copying Ceph file \"" +
+ abs_ceph_src.toString() + "\" to local file \"" +
+ local_dst.toString() +
+ "\" because the Ceph path is not a file");
+ }
+ }
+
+ // if the destination's parent directory doesn't exist, create it.
+ Path local_dst_parent_dir = local_dst.getParent();
+ if(null == local_dst_parent_dir)
+ throw new IOException("copyToLocalFile: failed copying Ceph file \"" +
+ abs_ceph_src.toString() + "\" to local file \"" +
+ local_dst.toString() +
+ "\": destination is root");
+
+ if(!localFs.mkdirs(local_dst_parent_dir))
+ throw new IOException("copyToLocalFile: failed copying Ceph file \"" +
+ abs_ceph_src.toString() + "\" to local file \"" +
+ local_dst.toString() +
+ "\": creating the destination's parent directory failed.");
+ else
+ {
+ if (!ceph_copyToLocalFile(abs_ceph_src.toString(), local_dst.toString()))
+ {
+ throw new IOException("copyToLocalFile: failed copying Ceph file \"" +
+ abs_ceph_src.toString() + "\" to local file \""
+ + local_dst.toString() + "\"");
+ }
+ }
+ //System.out.println("CopyToLocalFile: copied Ceph file \"" + abs_ceph_src.toString() +
+ // "\" to local file \"" + local_dst.toString() + "\"");
+ }
+
+
+ @Override
+ public Path startLocalOutput(Path fsOutputFile, Path tmpLocalFile)
+ throws IOException {
+ return tmpLocalFile;
+ }
+
+ @Override
+ public void completeLocalOutput(Path fsOutputFile, Path tmpLocalFile)
+ throws IOException {
+ moveFromLocalFile(tmpLocalFile, fsOutputFile);
+ }
+
+ // diagnostic methods
+
+ /* void dump() throws IOException {
+ store.dump();
+ }
+
+ void purge() throws IOException {
+ store.purge();
+ }*/
+
+}
--- /dev/null
+
+
+
+
+package org.apache.hadoop.fs.ceph;
+
+import java.io.IOException;
+import java.net.URI;
+
+
+/**
+ * <p>
+ * A {@link FileSystem} backed by a <a href="ceph.sourceforge.net">Ceph</a> store.
+ * </p>
+ */
+public class CephFileSystem extends Filesystem {
+
+
+ private Path workingDir = new Path("/user", System.getProperty("user.name"));
+
+ private URI uri;
+
+
+ public CephFileSystem() {
+ // perform all setup in initialize()
+ }
+
+ @Override
+ public URI getURI() {
+ return uri;
+ }
+
+ @Override
+ public void initialize(URI uri, Configuration conf) throws IOException {
+ setConf(conf);
+
+ this.uri = URI.create(uri.getScheme() + "://" + uri.getAuthority());
+
+ // TODO: local filesystem? we really need to figure out this conf thingy
+ this.localFs = get(URI.create("file:///"), conf);
+
+ // Initializes the client
+ this.clientPointer = ceph_initializeClient();
+ System.out.println("Initialized client with pointer " + clientPointer
+ + ". Setting cwd to /");
+ ceph_setcwd("/");
+ }
+
+
+
+ @Override
+ public void close() throws IOException {
+ System.out.println("Pretending to shut down client with pointer " + clientPointer
+ + ". Not really doing anything.");
+ }
+
+
+}
+
--- /dev/null
+package org.apache.hadoop.fs.ceph;
+
+import java.io.BufferedOutputStream;
+import java.io.DataInputStream;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+//import java.lang.IndexOutOfBoundsException;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSInputStream;
+
+class CephInputStream extends FSInputStream {
+
+ static {
+ System.loadLibrary("hadoopcephfs");
+ }
+
+ private int bufferSize;
+
+ //private Block[] blocks;
+
+ private boolean closed;
+
+ private long clientPointer;
+
+ private int fileHandle;
+
+ private long fileLength;
+
+ //private long pos = 0;
+
+ //private DataInputStream blockStream;
+
+ //private long blockEnd = -1;
+
+ private native int ceph_read(long client, int fh, byte[] buffer, int buffer_offset, int length);
+ private native long ceph_seek_from_start(long client, int fh, long pos);
+ private native long ceph_getpos(long client, int fh);
+ private native int ceph_close(long client, int fh);
+
+ private int ceph_read(byte[] buffer, int buffer_offset, int length)
+ { return ceph_read(clientPointer, fileHandle, buffer, buffer_offset, length); }
+ private long ceph_seek_from_start(long pos) { return ceph_seek_from_start(clientPointer, fileHandle, pos); }
+ private long ceph_getpos() { return ceph_getpos(clientPointer, fileHandle); }
+ private int ceph_close() { return ceph_close(clientPointer, fileHandle); }
+
+ /*
+ public S3InputStream(Configuration conf, FileSystemStore store,
+ INode inode) {
+
+ this.store = store;
+ this.blocks = inode.getBlocks();
+ for (Block block : blocks) {
+ this.fileLength += block.getLength();
+ }
+ this.bufferSize = conf.getInt("io.file.buffer.size", 4096);
+ }
+ */
+
+ public CephInputStream(Configuration conf, long clientp, int fh, long flength) {
+
+ // Whoever's calling the constructor is responsible for doing the actual ceph_open
+ // call and providing the file handle.
+ clientPointer = clientp;
+ fileLength = flength;
+ fileHandle = fh;
+ //System.out.println("CephInputStream constructor: initializing stream with fh "
+ // + fh + " and file length " + flength);
+
+ // TODO: Then what do we need from the config? The buffer size maybe?
+ // Anything? Bueller?
+
+ }
+
+ @Override
+ public synchronized long getPos() throws IOException {
+ return ceph_getpos();
+ }
+
+ @Override
+ public synchronized int available() throws IOException {
+ return (int) (fileLength - getPos());
+ }
+
+ @Override
+ public synchronized void seek(long targetPos) throws IOException {
+ //System.out.println("CephInputStream.seek: Seeking to position " + targetPos +
+ // " on fd " + fileHandle);
+ if (targetPos > fileLength) {
+ throw new IOException("CephInputStream.seek: failed seeking to position " + targetPos +
+ " on fd " + fileHandle + ": Cannot seek after EOF " + fileLength);
+ }
+ ceph_seek_from_start(targetPos);
+ }
+
+
+ // reads a byte
+ @Override
+ public synchronized int read() throws IOException {
+ //System.out.println("CephInputStream.read: Reading a single byte from fd " + fileHandle
+ // + " by calling general read function");
+
+ byte result[] = new byte[1];
+ if (getPos() >= fileLength) return -1;
+ if (-1 == read(result, 0, 1)) return -1;
+ return result[0];
+ }
+
+
+ @Override
+ public synchronized int read(byte buf[], int off, int len) throws IOException {
+ //System.out.println("CephInputStream.read: Reading " + len + " bytes from fd " + fileHandle);
+
+ if (closed) {
+ throw new IOException("CephInputStream.read: cannot read " + len +
+ " bytes from fd " + fileHandle + ": stream closed");
+ }
+ if (null == buf) {
+ throw new NullPointerException("Read buffer is null");
+ }
+
+ // check for proper index bounds
+ if((off < 0) || (len < 0) || (off + len > buf.length)) {
+ throw new IndexOutOfBoundsException("CephInputStream.read: Indices out of bounds for read: "
+ + "read length is " + len + ", buffer offset is "
+ + off +", and buffer size is " + buf.length);
+ }
+
+ // ensure we're not past the end of the file
+ if (getPos() >= fileLength)
+ {
+ System.out.println("CephInputStream.read: cannot read " + len +
+ " bytes from fd " + fileHandle + ": current position is " +
+ getPos() + " and file length is " + fileLength);
+
+ return -1;
+ }
+ // actually do the read
+ int result = ceph_read(buf, off, len);
+ if (result < 0)
+ System.out.println("CephInputStream.read: Reading " + len + " bytes from fd "
+ + fileHandle + " failed.");
+ else {}
+ // System.out.println("CephInputStream.read: Reading " + len + " bytes from fd "
+ // + fileHandle + ": succeeded in reading " + result + " bytes");
+
+
+
+ return result;
+ }
+
+
+ @Override
+ public void close() throws IOException {
+ if (closed) {
+ throw new IOException("Stream closed");
+ }
+
+ int result = ceph_close();
+ if (result != 0) {
+ throw new IOException("Close failed!");
+ }
+
+ closed = true;
+ }
+
+ /**
+ * We don't support marks.
+ */
+ @Override
+ public boolean markSupported() {
+ return false;
+ }
+
+ @Override
+ public void mark(int readLimit) {
+ // Do nothing
+ }
+
+ @Override
+ public void reset() throws IOException {
+ throw new IOException("Mark not supported");
+ }
+
+}
--- /dev/null
+package org.apache.hadoop.fs.ceph;
+
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Random;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.util.Progressable;
+
+class CephOutputStream extends FSDataOutputStream {
+
+ static {
+ System.loadLibrary("hadoopcephfs");
+ }
+
+
+ private int bufferSize;
+
+ private long fileLength;
+
+ //private FileSystemStore store;
+
+ private Path path;
+
+ private long blockSize;
+
+ private File backupFile;
+
+ private OutputStream backupStream;
+
+ private Random r = new Random();
+
+ private boolean closed;
+
+ private int fileHandle;
+
+ private long clientPointer;
+
+ private int bytesWrittenToBlock = 0;
+
+ private byte[] outBuf;
+
+ //private List<Block> blocks = new ArrayList<Block>();
+
+ //private Block nextBlock;
+
+
+
+ private long ceph_seek_from_start(long pos) {
+ return ceph_seek_from_start(clientPointer, fileHandle, pos);
+ }
+ private long ceph_getpos() {
+ return ceph_getpos(clientPointer, fileHandle);
+ }
+ private int ceph_close() { return ceph_close(clientPointer, fileHandle); }
+ private int ceph_write(byte[] buffer, int buffer_offset, int length)
+ { return ceph_write(clientPointer, fileHandle, buffer, buffer_offset, length); }
+
+
+ private native long ceph_seek_from_start(long client, int fh, long pos);
+ private native long ceph_getpos(long client, int fh);
+ private native int ceph_close(long client, int fh);
+ private native int ceph_write(long client, int fh, byte[] buffer, int buffer_offset, int length);
+
+
+ /* public CephOutputStream(Configuration conf, FileSystemStore store,
+ Path path, long blockSize, Progressable progress) throws IOException {
+
+ // basic pseudocode:
+ // call ceph_open_for_write to open the file
+ // store the file handle
+ // store the client pointer
+ // look up and store the block size while we're at it
+ // the following code's old. kill it
+
+ this.store = store;
+ this.path = path;
+ this.blockSize = blockSize;
+ this.backupFile = newBackupFile();
+ this.backupStream = new FileOutputStream(backupFile);
+ this.bufferSize = conf.getInt("io.file.buffer.size", 4096);
+ this.outBuf = new byte[bufferSize];
+
+ }*/
+
+
+ // The file handle
+ public CephOutputStream(Configuration conf, long clientp, int fh) {
+ clientPointer = clientp;
+ fileHandle = fh;
+ //fileLength = flength;
+ closed = false;
+ }
+
+ // possibly useful for the local copy, write later thing?
+ // keep it around for now
+ private File newBackupFile() throws IOException {
+ File result = File.createTempFile("s3fs-out", "");
+ result.deleteOnExit();
+ return result;
+ }
+
+
+ @Override
+ public long getPos() throws IOException {
+ // change to get the position from Ceph client
+ return ceph_getpos();
+ }
+
+ // writes a byte
+ @Override
+ public synchronized void write(int b) throws IOException {
+ //System.out.println("CephOutputStream.write: writing a single byte to fd " + fileHandle);
+
+ if (closed) {
+ throw new IOException("CephOutputStream.write: cannot write " +
+ "a byte to fd " + fileHandle + ": stream closed");
+ }
+ // Stick the byte in a buffer and write it
+ byte buf[] = new byte[1];
+ buf[0] = (byte) b;
+ int result = ceph_write(buf, 0, 1);
+ if (1 != result)
+ System.out.println("CephOutputStream.write: failed writing a single byte to fd "
+ + fileHandle + ": Ceph write() result = " + result);
+ return;
+ }
+
+ @Override
+ public synchronized void write(byte buf[], int off, int len) throws IOException {
+ //System.out.println("CephOutputStream.write: writing " + len +
+ // " bytes to fd " + fileHandle);
+
+ // make sure stream is open
+ if (closed) {
+ throw new IOException("CephOutputStream.write: cannot write " + len +
+ "bytes to fd " + fileHandle + ": stream closed");
+ }
+
+ // sanity check
+ if (null == buf) {
+ throw new NullPointerException("CephOutputStream.write: cannot write " + len +
+ "bytes to fd " + fileHandle + ": write buffer is null");
+ }
+
+ // check for proper index bounds
+ if((off < 0) || (len < 0) || (off + len > buf.length)) {
+ throw new IndexOutOfBoundsException("CephOutputStream.write: Indices out of bounds for write: "
+ + "write length is " + len + ", buffer offset is "
+ + off +", and buffer size is " + buf.length);
+ }
+
+ // write!
+ int result = ceph_write(buf, off, len);
+ if (result < 0) {
+ throw new IOException("CephOutputStream.write: Write of " + len +
+ "bytes to fd " + fileHandle + " failed");
+ }
+ if (result != len) {
+ throw new IOException("CephOutputStream.write: Write of " + len +
+ "bytes to fd " + fileHandle + "was incomplete: only "
+ + result + " of " + len + " bytes were written.");
+ }
+ return;
+ }
+
+ @Override
+ public synchronized void flush() throws IOException {
+ if (closed) {
+ throw new IOException("Stream closed");
+ }
+ return;
+ }
+
+ @Override
+ public synchronized void close() throws IOException {
+ if (closed) {
+ throw new IOException("Stream closed");
+ }
+
+ int result = ceph_close();
+ if (result != 0) {
+ throw new IOException("Close failed!");
+ }
+
+ closed = true;
+
+ }
+
+
+}
+
+