From 391b2fa0e5d5b5df1e8f067933a795d8343e56e1 Mon Sep 17 00:00:00 2001 From: greg Date: Tue, 21 Jul 2009 13:27:25 -0700 Subject: [PATCH] Added Hadoop fs components. --- src/client/hadoop/ceph/CephException.java | 12 + src/client/hadoop/ceph/CephFileSystem.java | 734 +++++++++++++++++ .../hadoop/ceph/CephFileSystem.java.old | 755 ++++++++++++++++++ .../hadoop/ceph/CephFileSystem.java.tmp | 59 ++ src/client/hadoop/ceph/CephInputStream.java | 189 +++++ src/client/hadoop/ceph/CephOutputStream.java | 201 +++++ 6 files changed, 1950 insertions(+) create mode 100644 src/client/hadoop/ceph/CephException.java create mode 100644 src/client/hadoop/ceph/CephFileSystem.java create mode 100644 src/client/hadoop/ceph/CephFileSystem.java.old create mode 100644 src/client/hadoop/ceph/CephFileSystem.java.tmp create mode 100644 src/client/hadoop/ceph/CephInputStream.java create mode 100644 src/client/hadoop/ceph/CephOutputStream.java diff --git a/src/client/hadoop/ceph/CephException.java b/src/client/hadoop/ceph/CephException.java new file mode 100644 index 0000000000000..0b9332414b561 --- /dev/null +++ b/src/client/hadoop/ceph/CephException.java @@ -0,0 +1,12 @@ +package org.apache.hadoop.fs.ceph; + +/** + * Thrown if something goes wrong with Ceph. + */ +public class CephException extends RuntimeException { + + public CephException(Throwable t) { + super(t); + } + +} diff --git a/src/client/hadoop/ceph/CephFileSystem.java b/src/client/hadoop/ceph/CephFileSystem.java new file mode 100644 index 0000000000000..5be6fea3138bb --- /dev/null +++ b/src/client/hadoop/ceph/CephFileSystem.java @@ -0,0 +1,734 @@ +// -*- mode:Java; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +package org.apache.hadoop.fs.ceph; + +import java.io.IOException; +import java.net.URI; +import java.util.Set; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FSDataInputStream; +import org.apache.hadoop.fs.FSDataOutputStream; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.FileUtil; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.util.Progressable; +import org.apache.hadoop.fs.FileStatus; + +/** + *

+ * A {@link FileSystem} backed by Ceph.. + * This will not start a Ceph instance; one must already be running. + *

+ * @author Esteban Molina-Estolano + */ +public class CephFileSystem extends FileSystem { + + private static final long DEFAULT_BLOCK_SIZE = 8 * 1024 * 1024; + + static { + System.loadLibrary("hadoopcephfs"); + } + + private URI uri; + + private FileSystem localFs; + + private long clientPointer; + + private Path root; + + private Path parent; + + //private Path workingDir = new Path("/user", System.getProperty("user.name")); + + /* + * Native Ceph methods. Each one has long client as a parameter, + * which should always be the member variable clientPointer. This is + * to avoid the hideous JNI code required to access the member variable + * from C++. clientPointer is set at initialization of the + * CephFileSystem instance, and should be untouched thereafter. I + * include wrapper functions to automatically add the parameter. + */ + + private boolean ceph_copyFromLocalFile(String localPath, String cephPath) + { return ceph_copyFromLocalFile(clientPointer, localPath, cephPath); } + private boolean ceph_copyToLocalFile(String cephPath, String localPath) + { return ceph_copyToLocalFile(clientPointer, cephPath, localPath); } + private String ceph_getcwd() { return ceph_getcwd(clientPointer); } + private boolean ceph_setcwd(String path) { return ceph_setcwd(clientPointer, path); } + private boolean ceph_rmdir(String path) { return ceph_rmdir(clientPointer, path); } + private boolean ceph_mkdir(String path) { return ceph_mkdir(clientPointer, path); } + private boolean ceph_unlink(String path) { return ceph_unlink(clientPointer, path); } + private boolean ceph_rename(String old_path, String new_path) { return ceph_rename(clientPointer, old_path, new_path); } + private boolean ceph_exists(String path) { return ceph_exists(clientPointer, path); } + private long ceph_getblocksize(String path) { return ceph_getblocksize(clientPointer, path); } + private long ceph_getfilesize(String path) { return ceph_getfilesize(clientPointer, path); } + private boolean ceph_isdirectory(String path) { return ceph_isdirectory(clientPointer, path); } + private boolean ceph_isfile(String path) { return ceph_isfile(clientPointer, path); } + private String[] ceph_getdir(String path) { return ceph_getdir(clientPointer, path); } + private int ceph_open_for_read(String path) { return ceph_open_for_read(clientPointer, path); } + private int ceph_open_for_overwrite(String path) { return ceph_open_for_overwrite(clientPointer, path); } + + private boolean ceph_kill_client() { + System.out.println("Killing Ceph client with pointer " + clientPointer); + return ceph_kill_client(clientPointer); + } + + private native long ceph_initializeClient(); + private native boolean ceph_copyFromLocalFile (long client, String localPath, String cephPath); + private native boolean ceph_copyToLocalFile (long client, String cephPath, String localPath); + private native String ceph_getcwd (long client); + private native boolean ceph_setcwd (long client, String path); + private native boolean ceph_rmdir (long client, String path); + private native boolean ceph_mkdir (long client, String path); + private native boolean ceph_unlink (long client, String path); + private native boolean ceph_rename (long client, String old_path, String new_path); + private native boolean ceph_exists (long client, String path); + private native long ceph_getblocksize (long client, String path); + private native long ceph_getfilesize (long client, String path); + private native boolean ceph_isdirectory (long client, String path); + private native boolean ceph_isfile (long client, String path); + private native String[]ceph_getdir (long client, String path); + private native int ceph_open_for_read (long client, String path); + private native int ceph_open_for_overwrite(long client, String path); + private native boolean ceph_kill_client (long client); + + + + + public CephFileSystem() { + root = new Path("/"); + parent = new Path(".."); + } + + /* + public S3FileSystem(FileSystemStore store) { + this.store = store; + } */ + + @Override + public URI getUri() { + return uri; + } + + @Override + public void initialize(URI uri, Configuration conf) throws IOException { + //store.initialize(uri, conf); + setConf(conf); + this.uri = URI.create(uri.getScheme() + "://" + uri.getAuthority()); + + // TODO: local filesystem? we really need to figure out this conf thingy + this.localFs = get(URI.create("file:///"), conf); + + // Initializes the client + this.clientPointer = ceph_initializeClient(); + System.out.println("Initialized client with pointer " + clientPointer + + ". Setting cwd to /"); + ceph_setcwd("/"); + // DEBUG + // attempt to do three exists operations on root + //System.out.println("DEBUG: attempting isdir() on root (/)"); + // ceph_isdirectory("/"); + //System.out.println("DEBUG: attempting exists() on root (/)"); + //ceph_exists("/"); + } + + @Override + public void close() throws IOException { + System.out.println("Pretending to shut down client with pointer " + clientPointer + + ". Not really doing anything."); + } + + @Override + public String getName() { + return getUri().toString(); + } + + @Override + public Path getWorkingDirectory() { + return makeAbsolute(new Path(ceph_getcwd())); + } + + @Override + public void setWorkingDirectory(Path dir) { + Path abs_path = makeAbsolute(dir); + + // error conditions if path's not a directory + boolean isDir = false; + boolean path_exists = false; + try { + isDir = isDirectory(abs_path); + path_exists = exists(abs_path); + } + + catch (IOException e) { + System.out.println("Warning: isDirectory threw an exception"); + } + + if (!isDir) { + if (path_exists) + System.out.println("Warning: SetWorkingDirectory(" + dir.toString() + + "): path is not a directory"); + else + System.out.println("Warning: SetWorkingDirectory(" + dir.toString() + + "): path does not exist"); + } + else { + ceph_setcwd(dir.toString()); + } + //System.out.println("DEBUG: Attempting to change cwd to " + dir.toString() + + // "changes cwd to" + getWorkingDirectory().toString()); + } + + // Makes a Path absolute. In a cheap, dirty hack, we're + // also going to strip off any "ceph://null" prefix we see. + private Path makeAbsolute(Path path) { + // first, check for the prefix + if (path.toString().startsWith("ceph://null")) { + + Path stripped_path = new Path(path.toString().substring("ceph://null".length())); + //System.out.println("Stripping path \"" + path.toString() + "\" to \"" + // + stripped_path.toString() + "\""); + return stripped_path; + } + + + if (path.isAbsolute()) { + return path; + } + Path wd = getWorkingDirectory(); + //System.out.println("Working directory is " + wd.toString()); + if (wd.toString().equals("")) + return new Path(root, path); + else + return new Path(wd, path); + } + + private String[] getEmptyStringArray(int size) { + return new String[size]; + } + + @Override + public boolean exists(Path path) throws IOException { + boolean result; + Path abs_path = makeAbsolute(path); + if (abs_path.toString().equals("/")) + { + //System.out.println("Bug workaround! returning true for exists(/)"); + result = true; + } + else + { + //System.out.println("Calling ceph_exists from Java on path " + abs_path.toString() + ":"); + result = ceph_exists(abs_path.toString()); + //System.out.println("Returned from ceph_exists to Java"); + } + // System.out.println("exists \"" + path.toString() + "\"? Absolute path is \"" + + // abs_path.toString() + "\", result = " + result); + + return result; + } + + + /* Creates the directory and all nonexistent parents. */ + @Override + public boolean mkdirs(Path path) throws IOException { + + Path abs_path = makeAbsolute(path); + //System.out.println("mkdirs: Creating directory \"" + path.toString() + "\": Absolute path is \"" + + // abs_path.toString() + "\""); + + // If the directory exists, we're happy, right? less work for us! + + if(exists(abs_path)) + throw new IOException("Error: attempting to create an existing directory"); + + // The basic idea: + // get parent. if parent = null (happens if dir is root), fail. You can't really make + // the root directory... + // if parent doesn't exist, recursively make parent; fail if this fails. + // ceph_mkdir desired path. + + Path parent = path.getParent(); + boolean result = true; + + // if the parent's null, we're trying to create the root directory. This is BAD. + if (null == parent) { + System.out.println("Error: failed making directory \"" + abs_path.toString() + + "\": directory has null parent (directory is root)") ; + result = false; + } + else { + // try to make the parent if it doesn't exist + if (!exists(parent)) { + //System.out.println("mkdirs: parent of directory \"" + abs_path.toString() + + // "does not exist. Recursively creating:"); + if(!mkdirs(parent)) { + System.out.println("mkdirs: failed creating directory \"" + + abs_path.toString() + + "because of failure recursively creating parent" + + parent.toString()); + result = false; + } + } + } + // try to make the directory, unless the parent was null or we + // tried and failed to make the parent + if (result) { + result = ceph_mkdir(abs_path.toString()); + } + //System.out.println("mkdirs: attempted to make directory " + abs_path.toString() + + // ": result is " + result); + return result; + } + + + + // @Override + + public boolean __isDirectory(Path path) throws IOException { + Path abs_path = makeAbsolute(path); + boolean result; + + if (abs_path.toString().equals("/")) + { + //System.out.println("Bug workaround! returning true for isDirectory(/)"); + result = true; + } + else + result = ceph_isdirectory(abs_path.toString()); + //System.out.println("isDirectory \"" + path.toString() + "\"? Absolute path is \"" + + // abs_path.toString() + "\", result = " + result); + return result; + } + + @Override + public boolean isFile(Path path) throws IOException { + Path abs_path = makeAbsolute(path); + boolean result; + if (abs_path.toString().equals("/")) + { + //System.out.println("Bug workaround! returning false for isFile(/)"); + result = false; + } + else + { + result = ceph_isfile(abs_path.toString()); + } + //System.out.println("isFile \"" + path.toString() + "\"? Absolute path is \"" + + // abs_path.toString() + "\", result = " + result); + + return result; + } + + @Override + public FileStatus getFileStatus(Path p) throws IOException { + // For the moment, hardwired block size and replication + Path abs_p = makeAbsolute(p); + return new FileStatus(__getLength(abs_p), __isDirectory(abs_p), 2, + 8388608, 0, abs_p); + } + + + // array of statuses for the directory's contents + // steal or factor out iteration code from delete() + @Override + public FileStatus[] listStatus(Path p) throws IOException { + Path abs_p = makeAbsolute(p); + + } + + + @Override + public Path[] listPathsRaw(Path path) throws IOException { + + String dirlist[]; + + Path abs_path = makeAbsolute(path); + + //System.out.println("listPathsRaw on path \"" + path.toString() + "\", absolute path \"" + // + abs_path.toString() + "\""); + + // If it's a directory, get the listing. Otherwise, complain and give up. + if (isDirectory(abs_path)) + dirlist = ceph_getdir(abs_path.toString()); + else + { + if (exists(abs_path)) { } + // System.out.println("listPathsRaw on path \"" + abs_path.toString() + + // "\" failed; the path is not a directory."); + else {} + // System.out.println("listPathsRaw on path \"" + abs_path.toString() + + // "\" failed; the path does not exist."); + return null; + } + + + // convert the strings to Paths + Path paths[] = new Path[dirlist.length]; + for(int i = 0; i < dirlist.length; ++i) { + //System.out.println("Raw enumeration of paths in \"" + abs_path.toString() + "\": \"" + + // dirlist[i] + "\""); + + // convert each listing to an absolute path + Path raw_path = new Path(dirlist[i]); + if (raw_path.isAbsolute()) + paths[i] = raw_path; + else + paths[i] = new Path(abs_path, raw_path); + } + return paths; + } + + public FSDataOutputStream create(Path f, + boolean overwrite, + int bufferSize, + short replication, + long blockSize, + Progressable progress + ) throws IOException { + + + Path absfilepath = makeAbsolute(file); + + // We ignore progress reporting and replication. + // Required semantics: if the file exists, overwrite if overwrite == true, and + // throw an exception if overwrite == false. + + // Step 1: existence test + if(isDirectory(absfilepath)) + throw new IOException("create: Cannot overwrite existing directory \"" + + absfilepath.toString() + "\" with a file"); + if (!overwrite) { + if (exists(absfilepath)) { + throw new IOException("createRaw: Cannot open existing file \"" + + absfilepath.toString() + + "\" for writing without overwrite flag"); + } + } + + // Step 2: create any nonexistent directories in the path + Path parent = absfilepath.getParent(); + if (parent != null) { // if parent is root, we're done + if(!exists(parent)) { + //System.out.println("createRaw: parent directory of path \"" + // + absfilepath.toString() + "\" does not exist. Creating:"); + mkdirs(parent); + } + } + + // Step 3: open the file + int fh = ceph_open_for_overwrite(absfilepath.toString()); + if (fh < 0) { + throw new IOException("createRaw: Open for overwrite failed on path \"" + + absfilepath.toString() + "\""); + } + + // Step 4: create the stream + FSDataOutputStream result = new CephOutputStream(getConf(), clientPointer, fh); + //System.out.println("createRaw: opened absolute path \"" + absfilepath.toString() + // + "\" for writing with fh " + fh); + + return result; + } + + + + // Opens a Ceph file and attaches the file handle to an FSDataInputStream. + @Override + public FSDataInputStream open(Path path, int bufferSize) throws IOException { + Path abs_path = makeAbsolute(path); + + if(!isFile(abs_path)) { + if (!exists(abs_path)) + throw new IOException("open: absolute path \"" + abs_path.toString() + + "\" does not exist"); + else + throw new IOException("open: absolute path \"" + abs_path.toString() + + "\" is not a file"); + } + + int fh = ceph_open_for_read(abs_path.toString()); + if (fh < 0) { + throw new IOException("open: Failed to open file " + abs_path.toString()); + } + long size = ceph_getfilesize(abs_path.toString()); + if (size < 0) { + throw new IOException("Failed to get file size for file " + abs_path.toString() + + " but succeeded in opening file. Something bizarre is going on."); + } + FSDataInputStream result = new CephInputStream(getConf(), clientPointer, fh, size); + return result; + } + + @Override + public boolean rename(Path src, Path dst) throws IOException { + // TODO: Check corner cases: dst already exists, + // or path is directory with children + + return ceph_rename(src.toString(), dst.toString()); + } + + @Override + public boolean delete(Path path) throws IOException { + + Path abs_path = makeAbsolute(path); + + //System.out.println("deleteRaw: Deleting path " + abs_path.toString()); + // sanity check + if (abs_path.toString().equals("/")) + throw new IOException("Error: deleting the root directory is a Bad Idea."); + + // if the path is a file, try to delete it. + if (isFile(abs_path)) { + boolean result = ceph_unlink(path.toString()); + if(!result) { + // System.out.println("deleteRaw: failed to delete file \"" + + // abs_path.toString() + "\"."); + return false; + } + else + return true; + } + + /* If the path is a directory, recursively try to delete its contents, + and then delete the directory. */ + + Path[] contents = listPathsRaw(path); + if (contents == null) { + // System.out.println("deleteRaw: Failed to read contents of directory \"" + + // abs_path.toString() + "\" while trying to delete it"); + return false; + } + + // recursively delete, skipping "." and ".." entries + Path parent = abs_path.getParent(); + for (Path p : contents) { + if (makeAbsolute(p).equals(abs_path)) continue; // "." entry + if (null != parent) { + if (p.equals(parent)) continue; // ".." entry + } + + if (!deleteRaw(p)) { + // System.out.println("deleteRaw: Failed to delete file \"" + + // p.toString() + "\" while recursively deleting \"" + // + abs_path.toString() + "\"" ); + return false; + } + } + + boolean result = ceph_unlink(path.toString()); + if (!result) + System.out.println("delete: failed to delete \"" + abs_path.toString() + "\""); + return result; + } + + + //@Override + private long __getLength(Path path) throws IOException { + Path abs_path = makeAbsolute(path); + + if (!exists(abs_path)) { + throw new IOException("org.apache.hadoop.fs.ceph.CephFileSystem.__getLength: File or directory " + abs_path.toString() + " does not exist."); + } + + long filesize = ceph_getfilesize(abs_path.toString()); + if (filesize < 0) { + throw new IOException("org.apache.hadoop.fs.ceph.CephFileSystem.getLength: Size of file or directory " + abs_path.toString() + " could not be retrieved."); + } + return filesize; + } + + /** + * User-defined replication is not supported for Ceph file systems at the moment. + */ + @Override + public short getReplication(Path path) throws IOException { + return 1; + } + + @Override + public short getDefaultReplication() { + return 1; + } + + /** + * User-defined replication is not supported for Ceph file systems at the moment. + */ + @Override + public boolean setReplicationRaw(Path path, short replication) + throws IOException { + return true; + } + + @Override + public long getBlockSize(Path path) throws IOException { + + if (!exists(path)) { + throw new IOException("org.apache.hadoop.fs.ceph.CephFileSystem.getBlockSize: File or directory " + path.toString() + " does not exist."); + } + long result = ceph_getblocksize(path.toString()); + if (!isFile(path)) { + throw new IOException("org.apache.hadoop.fs.ceph.CephFileSystem.getBlockSize: File or directory " + path.toString() + " is not a file."); + } + else { + System.err.println("DEBUG: getBlockSize: alleged file really is a file"); + } + if (result < 4096) { + System.err.println("org.apache.hadoop.fs.ceph.CephFileSystem.getBlockSize: " + + "path exists; strange block size of " + result + " defaulting to 8192"); + return 8192; + } + + + return result; + //return DEFAULT_BLOCK_SIZE; + // return ceph_getblocksize(path.toString()); + + } + + @Override + public long getDefaultBlockSize() { + return DEFAULT_BLOCK_SIZE; + //return getConf().getLong("fs.ceph.block.size", DEFAULT_BLOCK_SIZE); + } + + /** + * Return 1x1 'localhost' cell if the file exists. Return null if otherwise. + */ + @Override + public String[][] getFileCacheHints(Path f, long start, long len) + throws IOException { + // TODO: Check this is the correct behavior + if (!exists(f)) { + return null; + } + return new String[][] { { "localhost" } }; + } + + @Override + public void lock(Path path, boolean shared) throws IOException { + // TODO: Design and implement? or just ignore locking? + return; + } + + @Override + public void release(Path path) throws IOException { + return; //deprecated + } + + /* old API + @Override + public void reportChecksumFailure(Path f, + FSDataInputStream in, long inPos, + FSDataInputStream sums, long sumsPos) { + // TODO: What to do here? + return; + } */ + + @Override + public void moveFromLocalFile(Path src, Path dst) throws IOException { + if (!ceph_copyFromLocalFile(src.toString(), dst.toString())) { + throw new IOException("org.apache.hadoop.fs.ceph.CephFileSystem.moveFromLocalFile: failed moving from local file " + src.toString() + " to Ceph file " + dst.toString()); + } + //FileUtil.copy(localFs, src, this, dst, true, getConf()); + } + + @Override + public void copyFromLocalFile(Path src, Path dst) throws IOException { + // make sure Ceph path exists + Path abs_src = makeAbsolute(src); + Path abs_dst = makeAbsolute(dst); + + if (isDirectory(abs_dst)) + throw new IOException("Error in copyFromLocalFile: " + + "attempting to open an existing directory as a file"); + Path abs_dst_parent = abs_dst.getParent(); + + if (!exists(abs_dst_parent)) + mkdirs(abs_dst_parent); + + if (!ceph_copyFromLocalFile(abs_src.toString(), abs_dst.toString())) { + throw new IOException("org.apache.hadoop.fs.ceph.CephFileSystem.copyFromLocalFile: failed copying from local file " + abs_src.toString() + " to Ceph file " + abs_dst.toString()); + } + //FileUtil.copy(localFs, src, this, dst, false, true, getConf()); + } + + + + @Override + public void copyToLocalFile(Path ceph_src, Path local_dst, boolean copyCrc) throws IOException { + + Path abs_ceph_src = makeAbsolute(ceph_src); + + //System.out.println("CopyToLocalFile: copying Ceph file \"" + abs_ceph_src.toString() + + // "\" to local file \"" + local_dst.toString() + "\" using client " + // + clientPointer); + + // make sure the alleged source file exists, and is actually a file, not + // a directory or a ballpoint pen or something + if (!isFile(abs_ceph_src)) { + if (!exists(abs_ceph_src)) { + throw new IOException("copyToLocalFile: failed copying Ceph file \"" + + abs_ceph_src.toString() + "\" to local file \"" + + local_dst.toString() + + "\" because the source file does not exist"); + } + else { + throw new IOException("copyToLocalFile: failed copying Ceph file \"" + + abs_ceph_src.toString() + "\" to local file \"" + + local_dst.toString() + + "\" because the Ceph path is not a file"); + } + } + + // if the destination's parent directory doesn't exist, create it. + Path local_dst_parent_dir = local_dst.getParent(); + if(null == local_dst_parent_dir) + throw new IOException("copyToLocalFile: failed copying Ceph file \"" + + abs_ceph_src.toString() + "\" to local file \"" + + local_dst.toString() + + "\": destination is root"); + + if(!localFs.mkdirs(local_dst_parent_dir)) + throw new IOException("copyToLocalFile: failed copying Ceph file \"" + + abs_ceph_src.toString() + "\" to local file \"" + + local_dst.toString() + + "\": creating the destination's parent directory failed."); + else + { + if (!ceph_copyToLocalFile(abs_ceph_src.toString(), local_dst.toString())) + { + throw new IOException("copyToLocalFile: failed copying Ceph file \"" + + abs_ceph_src.toString() + "\" to local file \"" + + local_dst.toString() + "\""); + } + } + //System.out.println("CopyToLocalFile: copied Ceph file \"" + abs_ceph_src.toString() + + // "\" to local file \"" + local_dst.toString() + "\""); + } + + + @Override + public Path startLocalOutput(Path fsOutputFile, Path tmpLocalFile) + throws IOException { + return tmpLocalFile; + } + + @Override + public void completeLocalOutput(Path fsOutputFile, Path tmpLocalFile) + throws IOException { + moveFromLocalFile(tmpLocalFile, fsOutputFile); + } + + // diagnostic methods + + /* void dump() throws IOException { + store.dump(); + } + + void purge() throws IOException { + store.purge(); + }*/ + +} diff --git a/src/client/hadoop/ceph/CephFileSystem.java.old b/src/client/hadoop/ceph/CephFileSystem.java.old new file mode 100644 index 0000000000000..3c253a7c60286 --- /dev/null +++ b/src/client/hadoop/ceph/CephFileSystem.java.old @@ -0,0 +1,755 @@ +package org.apache.hadoop.fs.ceph; + +import java.io.IOException; +import java.net.URI; +import java.util.Set; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FSInputStream; +import org.apache.hadoop.fs.FSOutputStream; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.FileUtil; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.util.Progressable; + +/** + *

+ * A {@link FileSystem} backed by Ceph.. + * This will not start a Ceph instance; one must already be running. + *

+ * @author Esteban Molina-Estolano + */ +public class CephFileSystem extends FileSystem { + + private static final long DEFAULT_BLOCK_SIZE = 8 * 1024 * 1024; + + static { + System.loadLibrary("hadoopcephfs"); + } + + private URI uri; + + private FileSystem localFs; + + private long clientPointer; + + private Path root; + + private Path parent; + + //private Path workingDir = new Path("/user", System.getProperty("user.name")); + + /* + * Native Ceph methods. Each one has long client as a parameter, + * which should always be the member variable clientPointer. This is + * to avoid the hideous JNI code required to access the member variable + * from C++. clientPointer is set at initialization of the + * CephFileSystem instance, and should be untouched thereafter. I + * include wrapper functions to automatically add the parameter. + */ + + + private boolean ceph_copyFromLocalFile(String localPath, String cephPath) + { return ceph_copyFromLocalFile(clientPointer, localPath, cephPath); } + private boolean ceph_copyToLocalFile(String cephPath, String localPath) + { return ceph_copyToLocalFile(clientPointer, cephPath, localPath); } + private String ceph_getcwd() { return ceph_getcwd(clientPointer); } + private boolean ceph_setcwd(String path) { return ceph_setcwd(clientPointer, path); } + private boolean ceph_rmdir(String path) { return ceph_rmdir(clientPointer, path); } + private boolean ceph_mkdir(String path) { return ceph_mkdir(clientPointer, path); } + private boolean ceph_unlink(String path) { return ceph_unlink(clientPointer, path); } + private boolean ceph_rename(String old_path, String new_path) { return ceph_rename(clientPointer, old_path, new_path); } + private boolean ceph_exists(String path) { return ceph_exists(clientPointer, path); } + private long ceph_getblocksize(String path) { return ceph_getblocksize(clientPointer, path); } + private long ceph_getfilesize(String path) { return ceph_getfilesize(clientPointer, path); } + private boolean ceph_isdirectory(String path) { return ceph_isdirectory(clientPointer, path); } + private boolean ceph_isfile(String path) { return ceph_isfile(clientPointer, path); } + private String[] ceph_getdir(String path) { return ceph_getdir(clientPointer, path); } + private int ceph_open_for_read(String path) { return ceph_open_for_read(clientPointer, path); } + private int ceph_open_for_overwrite(String path) { return ceph_open_for_overwrite(clientPointer, path); } + + private boolean ceph_kill_client() { + System.out.println("Killing Ceph client with pointer " + clientPointer); + return ceph_kill_client(clientPointer); + } + + private native long ceph_initializeClient(); + private native boolean ceph_copyFromLocalFile (long client, String localPath, String cephPath); + private native boolean ceph_copyToLocalFile (long client, String cephPath, String localPath); + private native String ceph_getcwd (long client); + private native boolean ceph_setcwd (long client, String path); + private native boolean ceph_rmdir (long client, String path); + private native boolean ceph_mkdir (long client, String path); + private native boolean ceph_unlink (long client, String path); + private native boolean ceph_rename (long client, String old_path, String new_path); + private native boolean ceph_exists (long client, String path); + private native long ceph_getblocksize (long client, String path); + private native long ceph_getfilesize (long client, String path); + private native boolean ceph_isdirectory (long client, String path); + private native boolean ceph_isfile (long client, String path); + private native String[]ceph_getdir (long client, String path); + private native int ceph_open_for_read (long client, String path); + private native int ceph_open_for_overwrite(long client, String path); + private native boolean ceph_kill_client (long client); + + + + + public CephFileSystem() { + root = new Path("/"); + parent = new Path(".."); + } + + /* + public S3FileSystem(FileSystemStore store) { + this.store = store; + } */ + + @Override + public URI getUri() { + return uri; + } + + @Override + public void initialize(URI uri, Configuration conf) throws IOException { + //store.initialize(uri, conf); + setConf(conf); + this.uri = URI.create(uri.getScheme() + "://" + uri.getAuthority()); + + // TODO: local filesystem? we really need to figure out this conf thingy + this.localFs = get(URI.create("file:///"), conf); + + // Initializes the client + this.clientPointer = ceph_initializeClient(); + System.out.println("Initialized client with pointer " + clientPointer + + ". Setting cwd to /"); + ceph_setcwd("/"); + // DEBUG + // attempt to do three exists operations on root + //System.out.println("DEBUG: attempting isdir() on root (/)"); + // ceph_isdirectory("/"); + //System.out.println("DEBUG: attempting exists() on root (/)"); + //ceph_exists("/"); + } + + @Override + public void close() throws IOException { + System.out.println("Pretending to shut down client with pointer " + clientPointer + + ". Not really doing anything."); + } + + @Override + public String getName() { + return getUri().toString(); + } + + @Override + public Path getWorkingDirectory() { + return makeAbsolute(new Path(ceph_getcwd())); + } + + @Override + public void setWorkingDirectory(Path dir) { + Path abs_path = makeAbsolute(dir); + + // error conditions if path's not a directory + boolean isDir = false; + boolean path_exists = false; + try { + isDir = isDirectory(abs_path); + path_exists = exists(abs_path); + } + + catch (IOException e) { + System.out.println("Warning: isDirectory threw an exception"); + } + + if (!isDir) { + if (path_exists) + System.out.println("Warning: SetWorkingDirectory(" + dir.toString() + + "): path is not a directory"); + else + System.out.println("Warning: SetWorkingDirectory(" + dir.toString() + + "): path does not exist"); + } + else { + ceph_setcwd(dir.toString()); + } + //System.out.println("DEBUG: Attempting to change cwd to " + dir.toString() + + // "changes cwd to" + getWorkingDirectory().toString()); + } + + // Makes a Path absolute. In a cheap, dirty hack, we're + // also going to strip off any "ceph://null" prefix we see. + private Path makeAbsolute(Path path) { + // first, check for the prefix + if (path.toString().startsWith("ceph://null")) { + + Path stripped_path = new Path(path.toString().substring("ceph://null".length())); + //System.out.println("Stripping path \"" + path.toString() + "\" to \"" + // + stripped_path.toString() + "\""); + return stripped_path; + } + + + if (path.isAbsolute()) { + return path; + } + Path wd = getWorkingDirectory(); + //System.out.println("Working directory is " + wd.toString()); + if (wd.toString().equals("")) + return new Path(root, path); + else + return new Path(wd, path); + } + + private String[] getEmptyStringArray(int size) { + return new String[size]; + } + + @Override + public boolean exists(Path path) throws IOException { + boolean result; + Path abs_path = makeAbsolute(path); + if (abs_path.toString().equals("/")) + { + //System.out.println("Bug workaround! returning true for exists(/)"); + result = true; + } + else + { + //System.out.println("Calling ceph_exists from Java on path " + abs_path.toString() + ":"); + result = ceph_exists(abs_path.toString()); + //System.out.println("Returned from ceph_exists to Java"); + } + // System.out.println("exists \"" + path.toString() + "\"? Absolute path is \"" + + // abs_path.toString() + "\", result = " + result); + + return result; + } + + + /* Creates the directory and all nonexistent parents. */ + @Override + public boolean mkdirs(Path path) throws IOException { + + Path abs_path = makeAbsolute(path); + //System.out.println("mkdirs: Creating directory \"" + path.toString() + "\": Absolute path is \"" + + // abs_path.toString() + "\""); + + // If the directory exists, we're happy, right? less work for us! + + if(exists(abs_path)) + throw new IOException("Error: attempting to create an existing directory"); + + // The basic idea: + // get parent. if parent = null (happens if dir is root), fail. You can't really make + // the root directory... + // if parent doesn't exist, recursively make parent; fail if this fails. + // ceph_mkdir desired path. + + Path parent = path.getParent(); + boolean result = true; + + // if the parent's null, we're trying to create the root directory. This is BAD. + if (null == parent) { + System.out.println("Error: failed making directory \"" + abs_path.toString() + + "\": directory has null parent (directory is root)") ; + result = false; + } + else { + // try to make the parent if it doesn't exist + if (!exists(parent)) { + //System.out.println("mkdirs: parent of directory \"" + abs_path.toString() + + // "does not exist. Recursively creating:"); + if(!mkdirs(parent)) { + System.out.println("mkdirs: failed creating directory \"" + + abs_path.toString() + + "because of failure recursively creating parent" + + parent.toString()); + result = false; + } + } + } + // try to make the directory, unless the parent was null or we + // tried and failed to make the parent + if (result) { + result = ceph_mkdir(abs_path.toString()); + } + //System.out.println("mkdirs: attempted to make directory " + abs_path.toString() + + // ": result is " + result); + return result; + } + + + @Override + public boolean isDirectory(Path path) throws IOException { + Path abs_path = makeAbsolute(path); + boolean result; + + + if (abs_path.toString().equals("/")) + { + //System.out.println("Bug workaround! returning true for isDirectory(/)"); + result = true; + } + else + result = ceph_isdirectory(abs_path.toString()); + //System.out.println("isDirectory \"" + path.toString() + "\"? Absolute path is \"" + + // abs_path.toString() + "\", result = " + result); + return result; + } + + @Override + public boolean isFile(Path path) throws IOException { + Path abs_path = makeAbsolute(path); + boolean result; + if (abs_path.toString().equals("/")) + { + //System.out.println("Bug workaround! returning false for isFile(/)"); + result = false; + } + else + { + result = ceph_isfile(abs_path.toString()); + } + //System.out.println("isFile \"" + path.toString() + "\"? Absolute path is \"" + + // abs_path.toString() + "\", result = " + result); + + return result; + } + + + @Override + public Path[] listPathsRaw(Path path) throws IOException { + + String dirlist[]; + + Path abs_path = makeAbsolute(path); + + //System.out.println("listPathsRaw on path \"" + path.toString() + "\", absolute path \"" + // + abs_path.toString() + "\""); + + // If it's a directory, get the listing. Otherwise, complain and give up. + if (isDirectory(abs_path)) + dirlist = ceph_getdir(abs_path.toString()); + else + { + if (exists(abs_path)) { } + // System.out.println("listPathsRaw on path \"" + abs_path.toString() + + // "\" failed; the path is not a directory."); + else {} + // System.out.println("listPathsRaw on path \"" + abs_path.toString() + + // "\" failed; the path does not exist."); + return null; + } + + + // convert the strings to Paths + Path paths[] = new Path[dirlist.length]; + for(int i = 0; i < dirlist.length; ++i) { + //System.out.println("Raw enumeration of paths in \"" + abs_path.toString() + "\": \"" + + // dirlist[i] + "\""); + + // convert each listing to an absolute path + Path raw_path = new Path(dirlist[i]); + if (raw_path.isAbsolute()) + paths[i] = raw_path; + else + paths[i] = new Path(abs_path, raw_path); + } + return paths; + } + + @Override + public FSOutputStream createRaw(Path file, boolean overwrite, + short replication, long blockSize) throws IOException { + + return createRaw(file, overwrite, replication, blockSize, null); + } + + + @Override + public FSOutputStream createRaw(Path file, boolean overwrite, + short replication, long blockSize, Progressable progress) + throws IOException { + + + Path absfilepath = makeAbsolute(file); + + //System.out.println("createRaw: opening path \"" + file.toString() + "\" as absolute path \"" + // + absfilepath.toString() + "\" for writing"); + + // we're ignoring progress reporting and replication entirely. + // required semantics: if the file already exists, overwrite + // it if overwrite = true and throw an exception if + // overwrite = false. + + // Step 1: existence test + if(isDirectory(absfilepath)) + throw new IOException("createRaw: Cannot make an output stream to existing directory \"" + + absfilepath.toString() + "\""); + + if (!overwrite) { + if (!exists(absfilepath)) { + throw new IOException("createRaw: Cannot open existing file \"" + absfilepath.toString() + + "\" for writing without overwrite flag"); + } + } + + // Step 2: do the directories in the path exist? + Path parent = absfilepath.getParent(); + if (parent != null) // if parent is root, we're done + { + if(!exists(parent)) + { + //System.out.println("createRaw: parent directory of path \"" + // + absfilepath.toString() + "\" does not exist. Creating:"); + mkdirs(parent); + } + } + + // Step 3: open the file + int fh = ceph_open_for_overwrite(absfilepath.toString()); + if (fh < 0) { + throw new IOException("createRaw: Open for overwrite failed on path \"" + + absfilepath.toString() + "\""); + } + + // Step 4: create the stream + CephOutputStream result = new CephOutputStream(getConf(), clientPointer, fh); + //System.out.println("createRaw: opened absolute path \"" + absfilepath.toString() + // + "\" for writing with fh " + fh); + + return result; + } + + @Override + public FSInputStream openRaw(Path path) throws IOException { + Path abs_path = makeAbsolute(path); + + //System.out.println("openRaw: opening path \"" + path.toString() + "\" as absolute path \"" + // + abs_path.toString() + "\" for reading"); + + if(!isFile(abs_path)) + { + String error; + if (!exists(abs_path)) + throw new IOException("openRaw: absolute path \"" + abs_path.toString() + + "\" does not exist"); + else + throw new IOException("openRaw: absolute path \"" + abs_path.toString() + + "\" is not a file"); + } + + int fh = ceph_open_for_read(abs_path.toString()); + if (fh < 0) { + throw new IOException("openRaw: Failed to open file " + abs_path.toString()); + } + + long size = ceph_getfilesize(abs_path.toString()); + if (size < 0) { + throw new IOException("Failed to get file size for file " + abs_path.toString() + + " but succeeded in opening file. Something excitingly bizarre is going on."); + } + + FSInputStream result = new CephInputStream(getConf(), clientPointer, fh, size); + + //System.out.println("openRaw: opened absolute path \"" + abs_path.toString() + + // "\" for reading with fd " + fh + " and size " + size); + + return result; + } + + @Override + public boolean renameRaw(Path src, Path dst) throws IOException { + // TODO: Check corner cases: dst already exists, + // or if path is directory with children + + return ceph_rename(src.toString(), dst.toString()); + } + + @Override + public boolean deleteRaw(Path path) throws IOException { + + Path abs_path = makeAbsolute(path); + + //System.out.println("deleteRaw: Deleting path " + + // abs_path.toString()); + if (abs_path.toString().equals("/")) + throw new IOException("Error: attempting to delete the root directory"); + + // if it's a file, try to delete it. + if (isFile(abs_path)) { + boolean result = ceph_unlink(path.toString()); + if(!result) { + // System.out.println("deleteRaw: failed to delete file \"" + + // abs_path.toString() + "\"."); + return false; + } + else + return true; + } + + // if it's a directory, recursively try to delete its contents, + // and then delete the directory. + // Otherwise, return false. + + Path[] contents = listPathsRaw(path); + if (contents == null) { + // System.out.println("deleteRaw: Failed to read contents of directory \"" + + // abs_path.toString() + "\" while trying to delete it"); + return false; + } + + // debug listing of directory contents + //System.out.println("deleteRaw: Enumerating contents of directory \"" + + // abs_path.toString() + "\" to delete"); + + // for (Path p : contents) { + // System.out.println(" Contains path \"" + p.toString() + "\", absolute path is \"" + // + makeAbsolute(p).toString() + "\""); + //} + + // recursively delete the directory's contents + Path parent = abs_path.getParent(); + for (Path p : contents) { + // skip self ("." directory entry) + if (makeAbsolute(p).equals(abs_path)) { + //System.out.println("Skipping self"); + continue; + } + // skip parent (".." directory entry) + if (null != parent) { + if (p.equals(parent)) { + //System.out.println("Skipping parent"); + continue; + } + } + + //System.out.println("Path " + abs_path.toString() + " contains path " + // + makeAbsolute(p).toString() + + // ". Attempting to delete it recursively:"); + if (!deleteRaw(p)) { + // System.out.println("deleteRaw: Failed to delete file \"" + + // p.toString() + "\" while recursively deleting \"" + // + abs_path.toString() + "\"" ); + return false; + } + } + + boolean result = ceph_unlink(path.toString()); + if (!result) + System.out.println("deleteRaw: failed to delete \"" + abs_path.toString() + "\""); + return result; + } + + + @Override + public long getLength(Path path) throws IOException { + Path abs_path = makeAbsolute(path); + + if (!exists(abs_path)) { + throw new IOException("org.apache.hadoop.fs.ceph.CephFileSystem.getLength: File or directory " + abs_path.toString() + " does not exist."); + } + + long filesize = ceph_getfilesize(abs_path.toString()); + if (filesize < 0) { + throw new IOException("org.apache.hadoop.fs.ceph.CephFileSystem.getLength: Size of file or directory " + abs_path.toString() + " could not be retrieved."); + } + return filesize; + + } + + /** + * User-defined replication is not supported for Ceph file systems at the moment. + */ + @Override + public short getReplication(Path path) throws IOException { + return 1; + } + + @Override + public short getDefaultReplication() { + return 1; + } + + /** + * User-defined replication is not supported for Ceph file systems at the moment. + */ + @Override + public boolean setReplicationRaw(Path path, short replication) + throws IOException { + return true; + } + + @Override + public long getBlockSize(Path path) throws IOException { + + if (!exists(path)) { + throw new IOException("org.apache.hadoop.fs.ceph.CephFileSystem.getBlockSize: File or directory " + path.toString() + " does not exist."); + } + long result = ceph_getblocksize(path.toString()); + if (!isFile(path)) { + throw new IOException("org.apache.hadoop.fs.ceph.CephFileSystem.getBlockSize: File or directory " + path.toString() + " is not a file."); + } + else { + System.err.println("DEBUG: getBlockSize: alleged file really is a file"); + } + if (result < 4096) { + System.err.println("org.apache.hadoop.fs.ceph.CephFileSystem.getBlockSize: " + + "path exists; strange block size of " + result + " defaulting to 8192"); + return 8192; + } + + + return result; + //return DEFAULT_BLOCK_SIZE; + // return ceph_getblocksize(path.toString()); + + } + + @Override + public long getDefaultBlockSize() { + return DEFAULT_BLOCK_SIZE; + //return getConf().getLong("fs.ceph.block.size", DEFAULT_BLOCK_SIZE); + } + + /** + * Return 1x1 'localhost' cell if the file exists. Return null if otherwise. + */ + @Override + public String[][] getFileCacheHints(Path f, long start, long len) + throws IOException { + // TODO: Check this is the correct behavior + if (!exists(f)) { + return null; + } + return new String[][] { { "localhost" } }; + } + + @Override + public void lock(Path path, boolean shared) throws IOException { + // TODO: Design and implement? or just ignore locking? + return; + } + + @Override + public void release(Path path) throws IOException { + return; //deprecated + } + + @Override + public void reportChecksumFailure(Path f, + FSInputStream in, long inPos, + FSInputStream sums, long sumsPos) { + // TODO: What to do here? + return; + } + + @Override + public void moveFromLocalFile(Path src, Path dst) throws IOException { + if (!ceph_copyFromLocalFile(src.toString(), dst.toString())) { + throw new IOException("org.apache.hadoop.fs.ceph.CephFileSystem.moveFromLocalFile: failed moving from local file " + src.toString() + " to Ceph file " + dst.toString()); + } + //FileUtil.copy(localFs, src, this, dst, true, getConf()); + } + + @Override + public void copyFromLocalFile(Path src, Path dst) throws IOException { + // make sure Ceph path exists + Path abs_src = makeAbsolute(src); + Path abs_dst = makeAbsolute(dst); + + if (isDirectory(abs_dst)) + throw new IOException("Error in copyFromLocalFile: " + + "attempting to open an existing directory as a file"); + Path abs_dst_parent = abs_dst.getParent(); + + if (!exists(abs_dst_parent)) + mkdirs(abs_dst_parent); + + if (!ceph_copyFromLocalFile(abs_src.toString(), abs_dst.toString())) { + throw new IOException("org.apache.hadoop.fs.ceph.CephFileSystem.copyFromLocalFile: failed copying from local file " + abs_src.toString() + " to Ceph file " + abs_dst.toString()); + } + //FileUtil.copy(localFs, src, this, dst, false, true, getConf()); + } + + + + @Override + public void copyToLocalFile(Path ceph_src, Path local_dst, boolean copyCrc) throws IOException { + + Path abs_ceph_src = makeAbsolute(ceph_src); + + //System.out.println("CopyToLocalFile: copying Ceph file \"" + abs_ceph_src.toString() + + // "\" to local file \"" + local_dst.toString() + "\" using client " + // + clientPointer); + + // make sure the alleged source file exists, and is actually a file, not + // a directory or a ballpoint pen or something + if (!isFile(abs_ceph_src)) { + if (!exists(abs_ceph_src)) { + throw new IOException("copyToLocalFile: failed copying Ceph file \"" + + abs_ceph_src.toString() + "\" to local file \"" + + local_dst.toString() + + "\" because the source file does not exist"); + } + else { + throw new IOException("copyToLocalFile: failed copying Ceph file \"" + + abs_ceph_src.toString() + "\" to local file \"" + + local_dst.toString() + + "\" because the Ceph path is not a file"); + } + } + + // if the destination's parent directory doesn't exist, create it. + Path local_dst_parent_dir = local_dst.getParent(); + if(null == local_dst_parent_dir) + throw new IOException("copyToLocalFile: failed copying Ceph file \"" + + abs_ceph_src.toString() + "\" to local file \"" + + local_dst.toString() + + "\": destination is root"); + + if(!localFs.mkdirs(local_dst_parent_dir)) + throw new IOException("copyToLocalFile: failed copying Ceph file \"" + + abs_ceph_src.toString() + "\" to local file \"" + + local_dst.toString() + + "\": creating the destination's parent directory failed."); + else + { + if (!ceph_copyToLocalFile(abs_ceph_src.toString(), local_dst.toString())) + { + throw new IOException("copyToLocalFile: failed copying Ceph file \"" + + abs_ceph_src.toString() + "\" to local file \"" + + local_dst.toString() + "\""); + } + } + //System.out.println("CopyToLocalFile: copied Ceph file \"" + abs_ceph_src.toString() + + // "\" to local file \"" + local_dst.toString() + "\""); + } + + + @Override + public Path startLocalOutput(Path fsOutputFile, Path tmpLocalFile) + throws IOException { + return tmpLocalFile; + } + + @Override + public void completeLocalOutput(Path fsOutputFile, Path tmpLocalFile) + throws IOException { + moveFromLocalFile(tmpLocalFile, fsOutputFile); + } + + // diagnostic methods + + /* void dump() throws IOException { + store.dump(); + } + + void purge() throws IOException { + store.purge(); + }*/ + +} diff --git a/src/client/hadoop/ceph/CephFileSystem.java.tmp b/src/client/hadoop/ceph/CephFileSystem.java.tmp new file mode 100644 index 0000000000000..f842e380a4ecd --- /dev/null +++ b/src/client/hadoop/ceph/CephFileSystem.java.tmp @@ -0,0 +1,59 @@ + + + + +package org.apache.hadoop.fs.ceph; + +import java.io.IOException; +import java.net.URI; + + +/** + *

+ * A {@link FileSystem} backed by a Ceph store. + *

+ */ +public class CephFileSystem extends Filesystem { + + + private Path workingDir = new Path("/user", System.getProperty("user.name")); + + private URI uri; + + + public CephFileSystem() { + // perform all setup in initialize() + } + + @Override + public URI getURI() { + return uri; + } + + @Override + public void initialize(URI uri, Configuration conf) throws IOException { + setConf(conf); + + this.uri = URI.create(uri.getScheme() + "://" + uri.getAuthority()); + + // TODO: local filesystem? we really need to figure out this conf thingy + this.localFs = get(URI.create("file:///"), conf); + + // Initializes the client + this.clientPointer = ceph_initializeClient(); + System.out.println("Initialized client with pointer " + clientPointer + + ". Setting cwd to /"); + ceph_setcwd("/"); + } + + + + @Override + public void close() throws IOException { + System.out.println("Pretending to shut down client with pointer " + clientPointer + + ". Not really doing anything."); + } + + +} + diff --git a/src/client/hadoop/ceph/CephInputStream.java b/src/client/hadoop/ceph/CephInputStream.java new file mode 100644 index 0000000000000..e3b4f1f960775 --- /dev/null +++ b/src/client/hadoop/ceph/CephInputStream.java @@ -0,0 +1,189 @@ +package org.apache.hadoop.fs.ceph; + +import java.io.BufferedOutputStream; +import java.io.DataInputStream; +import java.io.File; +import java.io.FileInputStream; +import java.io.FileOutputStream; +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +//import java.lang.IndexOutOfBoundsException; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FSInputStream; + +class CephInputStream extends FSInputStream { + + static { + System.loadLibrary("hadoopcephfs"); + } + + private int bufferSize; + + //private Block[] blocks; + + private boolean closed; + + private long clientPointer; + + private int fileHandle; + + private long fileLength; + + //private long pos = 0; + + //private DataInputStream blockStream; + + //private long blockEnd = -1; + + private native int ceph_read(long client, int fh, byte[] buffer, int buffer_offset, int length); + private native long ceph_seek_from_start(long client, int fh, long pos); + private native long ceph_getpos(long client, int fh); + private native int ceph_close(long client, int fh); + + private int ceph_read(byte[] buffer, int buffer_offset, int length) + { return ceph_read(clientPointer, fileHandle, buffer, buffer_offset, length); } + private long ceph_seek_from_start(long pos) { return ceph_seek_from_start(clientPointer, fileHandle, pos); } + private long ceph_getpos() { return ceph_getpos(clientPointer, fileHandle); } + private int ceph_close() { return ceph_close(clientPointer, fileHandle); } + + /* + public S3InputStream(Configuration conf, FileSystemStore store, + INode inode) { + + this.store = store; + this.blocks = inode.getBlocks(); + for (Block block : blocks) { + this.fileLength += block.getLength(); + } + this.bufferSize = conf.getInt("io.file.buffer.size", 4096); + } + */ + + public CephInputStream(Configuration conf, long clientp, int fh, long flength) { + + // Whoever's calling the constructor is responsible for doing the actual ceph_open + // call and providing the file handle. + clientPointer = clientp; + fileLength = flength; + fileHandle = fh; + //System.out.println("CephInputStream constructor: initializing stream with fh " + // + fh + " and file length " + flength); + + // TODO: Then what do we need from the config? The buffer size maybe? + // Anything? Bueller? + + } + + @Override + public synchronized long getPos() throws IOException { + return ceph_getpos(); + } + + @Override + public synchronized int available() throws IOException { + return (int) (fileLength - getPos()); + } + + @Override + public synchronized void seek(long targetPos) throws IOException { + //System.out.println("CephInputStream.seek: Seeking to position " + targetPos + + // " on fd " + fileHandle); + if (targetPos > fileLength) { + throw new IOException("CephInputStream.seek: failed seeking to position " + targetPos + + " on fd " + fileHandle + ": Cannot seek after EOF " + fileLength); + } + ceph_seek_from_start(targetPos); + } + + + // reads a byte + @Override + public synchronized int read() throws IOException { + //System.out.println("CephInputStream.read: Reading a single byte from fd " + fileHandle + // + " by calling general read function"); + + byte result[] = new byte[1]; + if (getPos() >= fileLength) return -1; + if (-1 == read(result, 0, 1)) return -1; + return result[0]; + } + + + @Override + public synchronized int read(byte buf[], int off, int len) throws IOException { + //System.out.println("CephInputStream.read: Reading " + len + " bytes from fd " + fileHandle); + + if (closed) { + throw new IOException("CephInputStream.read: cannot read " + len + + " bytes from fd " + fileHandle + ": stream closed"); + } + if (null == buf) { + throw new NullPointerException("Read buffer is null"); + } + + // check for proper index bounds + if((off < 0) || (len < 0) || (off + len > buf.length)) { + throw new IndexOutOfBoundsException("CephInputStream.read: Indices out of bounds for read: " + + "read length is " + len + ", buffer offset is " + + off +", and buffer size is " + buf.length); + } + + // ensure we're not past the end of the file + if (getPos() >= fileLength) + { + System.out.println("CephInputStream.read: cannot read " + len + + " bytes from fd " + fileHandle + ": current position is " + + getPos() + " and file length is " + fileLength); + + return -1; + } + // actually do the read + int result = ceph_read(buf, off, len); + if (result < 0) + System.out.println("CephInputStream.read: Reading " + len + " bytes from fd " + + fileHandle + " failed."); + else {} + // System.out.println("CephInputStream.read: Reading " + len + " bytes from fd " + // + fileHandle + ": succeeded in reading " + result + " bytes"); + + + + return result; + } + + + @Override + public void close() throws IOException { + if (closed) { + throw new IOException("Stream closed"); + } + + int result = ceph_close(); + if (result != 0) { + throw new IOException("Close failed!"); + } + + closed = true; + } + + /** + * We don't support marks. + */ + @Override + public boolean markSupported() { + return false; + } + + @Override + public void mark(int readLimit) { + // Do nothing + } + + @Override + public void reset() throws IOException { + throw new IOException("Mark not supported"); + } + +} diff --git a/src/client/hadoop/ceph/CephOutputStream.java b/src/client/hadoop/ceph/CephOutputStream.java new file mode 100644 index 0000000000000..9fac5a4e3e2d3 --- /dev/null +++ b/src/client/hadoop/ceph/CephOutputStream.java @@ -0,0 +1,201 @@ +package org.apache.hadoop.fs.ceph; + +import java.io.File; +import java.io.FileInputStream; +import java.io.FileOutputStream; +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import java.util.ArrayList; +import java.util.List; +import java.util.Random; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FSDataOutputStream; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.util.Progressable; + +class CephOutputStream extends FSDataOutputStream { + + static { + System.loadLibrary("hadoopcephfs"); + } + + + private int bufferSize; + + private long fileLength; + + //private FileSystemStore store; + + private Path path; + + private long blockSize; + + private File backupFile; + + private OutputStream backupStream; + + private Random r = new Random(); + + private boolean closed; + + private int fileHandle; + + private long clientPointer; + + private int bytesWrittenToBlock = 0; + + private byte[] outBuf; + + //private List blocks = new ArrayList(); + + //private Block nextBlock; + + + + private long ceph_seek_from_start(long pos) { + return ceph_seek_from_start(clientPointer, fileHandle, pos); + } + private long ceph_getpos() { + return ceph_getpos(clientPointer, fileHandle); + } + private int ceph_close() { return ceph_close(clientPointer, fileHandle); } + private int ceph_write(byte[] buffer, int buffer_offset, int length) + { return ceph_write(clientPointer, fileHandle, buffer, buffer_offset, length); } + + + private native long ceph_seek_from_start(long client, int fh, long pos); + private native long ceph_getpos(long client, int fh); + private native int ceph_close(long client, int fh); + private native int ceph_write(long client, int fh, byte[] buffer, int buffer_offset, int length); + + + /* public CephOutputStream(Configuration conf, FileSystemStore store, + Path path, long blockSize, Progressable progress) throws IOException { + + // basic pseudocode: + // call ceph_open_for_write to open the file + // store the file handle + // store the client pointer + // look up and store the block size while we're at it + // the following code's old. kill it + + this.store = store; + this.path = path; + this.blockSize = blockSize; + this.backupFile = newBackupFile(); + this.backupStream = new FileOutputStream(backupFile); + this.bufferSize = conf.getInt("io.file.buffer.size", 4096); + this.outBuf = new byte[bufferSize]; + + }*/ + + + // The file handle + public CephOutputStream(Configuration conf, long clientp, int fh) { + clientPointer = clientp; + fileHandle = fh; + //fileLength = flength; + closed = false; + } + + // possibly useful for the local copy, write later thing? + // keep it around for now + private File newBackupFile() throws IOException { + File result = File.createTempFile("s3fs-out", ""); + result.deleteOnExit(); + return result; + } + + + @Override + public long getPos() throws IOException { + // change to get the position from Ceph client + return ceph_getpos(); + } + + // writes a byte + @Override + public synchronized void write(int b) throws IOException { + //System.out.println("CephOutputStream.write: writing a single byte to fd " + fileHandle); + + if (closed) { + throw new IOException("CephOutputStream.write: cannot write " + + "a byte to fd " + fileHandle + ": stream closed"); + } + // Stick the byte in a buffer and write it + byte buf[] = new byte[1]; + buf[0] = (byte) b; + int result = ceph_write(buf, 0, 1); + if (1 != result) + System.out.println("CephOutputStream.write: failed writing a single byte to fd " + + fileHandle + ": Ceph write() result = " + result); + return; + } + + @Override + public synchronized void write(byte buf[], int off, int len) throws IOException { + //System.out.println("CephOutputStream.write: writing " + len + + // " bytes to fd " + fileHandle); + + // make sure stream is open + if (closed) { + throw new IOException("CephOutputStream.write: cannot write " + len + + "bytes to fd " + fileHandle + ": stream closed"); + } + + // sanity check + if (null == buf) { + throw new NullPointerException("CephOutputStream.write: cannot write " + len + + "bytes to fd " + fileHandle + ": write buffer is null"); + } + + // check for proper index bounds + if((off < 0) || (len < 0) || (off + len > buf.length)) { + throw new IndexOutOfBoundsException("CephOutputStream.write: Indices out of bounds for write: " + + "write length is " + len + ", buffer offset is " + + off +", and buffer size is " + buf.length); + } + + // write! + int result = ceph_write(buf, off, len); + if (result < 0) { + throw new IOException("CephOutputStream.write: Write of " + len + + "bytes to fd " + fileHandle + " failed"); + } + if (result != len) { + throw new IOException("CephOutputStream.write: Write of " + len + + "bytes to fd " + fileHandle + "was incomplete: only " + + result + " of " + len + " bytes were written."); + } + return; + } + + @Override + public synchronized void flush() throws IOException { + if (closed) { + throw new IOException("Stream closed"); + } + return; + } + + @Override + public synchronized void close() throws IOException { + if (closed) { + throw new IOException("Stream closed"); + } + + int result = ceph_close(); + if (result != 0) { + throw new IOException("Close failed!"); + } + + closed = true; + + } + + +} + + -- 2.39.5