]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
Added Hadoop fs components.
authorgreg <gregf@skinny.ops.newdream.net>
Tue, 21 Jul 2009 20:27:25 +0000 (13:27 -0700)
committerGreg Farnum <gregf@hq.newdream.net>
Wed, 22 Jul 2009 18:37:01 +0000 (11:37 -0700)
src/client/hadoop/ceph/CephException.java [new file with mode: 0644]
src/client/hadoop/ceph/CephFileSystem.java [new file with mode: 0644]
src/client/hadoop/ceph/CephFileSystem.java.old [new file with mode: 0644]
src/client/hadoop/ceph/CephFileSystem.java.tmp [new file with mode: 0644]
src/client/hadoop/ceph/CephInputStream.java [new file with mode: 0644]
src/client/hadoop/ceph/CephOutputStream.java [new file with mode: 0644]

diff --git a/src/client/hadoop/ceph/CephException.java b/src/client/hadoop/ceph/CephException.java
new file mode 100644 (file)
index 0000000..0b93324
--- /dev/null
@@ -0,0 +1,12 @@
+package org.apache.hadoop.fs.ceph;
+
+/**
+ * Thrown if something goes wrong with Ceph.
+ *
+ * <p>Unchecked: it extends {@link RuntimeException}, so callers are not
+ * required to declare or catch it.
+ */
+public class CephException extends RuntimeException {
+
+  /**
+   * Wraps an underlying failure.
+   *
+   * @param t the cause; handed unchanged to the {@link RuntimeException} constructor
+   */
+  public CephException(Throwable t) {
+    super(t);
+  }
+
+}
diff --git a/src/client/hadoop/ceph/CephFileSystem.java b/src/client/hadoop/ceph/CephFileSystem.java
new file mode 100644 (file)
index 0000000..5be6fea
--- /dev/null
@@ -0,0 +1,734 @@
+// -*- mode:Java; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- 
+package org.apache.hadoop.fs.ceph;
+
+import java.io.IOException;
+import java.net.URI;
+import java.util.Set;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.FileUtil;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.util.Progressable;
+import org.apache.hadoop.fs.FileStatus;
+
+/**
+ * <p>
+ * A {@link FileSystem} backed by <a href="http://ceph.sourceforge.net">Ceph</a>.
+ * This will not start a Ceph instance; one must already be running.
+ * </p>
+ * @author Esteban Molina-Estolano
+ */
+public class CephFileSystem extends FileSystem {
+
+  // 8 MiB; returned by getDefaultBlockSize().
+  private static final long DEFAULT_BLOCK_SIZE = 8 * 1024 * 1024;
+  
+  // Load the JNI library that provides the native ceph_* methods declared below.
+  static {
+    System.loadLibrary("hadoopcephfs");
+  }
+
+  // scheme://authority of this file system; assigned in initialize().
+  private URI uri;
+
+  // File system obtained for "file:///" in initialize(); used for local-side operations.
+  private FileSystem localFs;
+  
+  // Opaque handle returned by ceph_initializeClient(); passed to every native call.
+  private long clientPointer;
+  
+  // The path "/", set in the constructor; fallback base in makeAbsolute().
+  private Path root;
+
+  // The path "..", set in the constructor.
+  // NOTE(review): several methods declare a local named "parent" that shadows this field.
+  private Path parent;
+  
+  //private Path workingDir = new Path("/user", System.getProperty("user.name"));
+  
+  /*
+   * Native Ceph methods. Each one has long client as a parameter,
+   * which should always be the member variable clientPointer. This is
+   * to avoid the hideous JNI code required to access the member variable
+   * from C++. clientPointer is set at initialization of the
+   * CephFileSystem instance, and should be untouched thereafter. I
+   * include wrapper functions to automatically add the parameter.
+   */ 
+  
+  // Convenience overloads: each forwards to the native method of the same
+  // name, supplying clientPointer as the leading argument.
+
+  private boolean ceph_copyFromLocalFile(String localPath, String cephPath) {
+    return ceph_copyFromLocalFile(clientPointer, localPath, cephPath);
+  }
+
+  private boolean ceph_copyToLocalFile(String cephPath, String localPath) {
+    return ceph_copyToLocalFile(clientPointer, cephPath, localPath);
+  }
+
+  private String ceph_getcwd() {
+    return ceph_getcwd(clientPointer);
+  }
+
+  private boolean ceph_setcwd(String path) {
+    return ceph_setcwd(clientPointer, path);
+  }
+
+  private boolean ceph_rmdir(String path) {
+    return ceph_rmdir(clientPointer, path);
+  }
+
+  private boolean ceph_mkdir(String path) {
+    return ceph_mkdir(clientPointer, path);
+  }
+
+  private boolean ceph_unlink(String path) {
+    return ceph_unlink(clientPointer, path);
+  }
+
+  private boolean ceph_rename(String old_path, String new_path) {
+    return ceph_rename(clientPointer, old_path, new_path);
+  }
+
+  private boolean ceph_exists(String path) {
+    return ceph_exists(clientPointer, path);
+  }
+
+  private long ceph_getblocksize(String path) {
+    return ceph_getblocksize(clientPointer, path);
+  }
+
+  private long ceph_getfilesize(String path) {
+    return ceph_getfilesize(clientPointer, path);
+  }
+
+  private boolean ceph_isdirectory(String path) {
+    return ceph_isdirectory(clientPointer, path);
+  }
+
+  private boolean ceph_isfile(String path) {
+    return ceph_isfile(clientPointer, path);
+  }
+
+  private String[] ceph_getdir(String path) {
+    return ceph_getdir(clientPointer, path);
+  }
+
+  private int ceph_open_for_read(String path) {
+    return ceph_open_for_read(clientPointer, path);
+  }
+
+  private int ceph_open_for_overwrite(String path) {
+    return ceph_open_for_overwrite(clientPointer, path);
+  }
+
+  private boolean ceph_kill_client() {
+      System.out.println("Killing Ceph client with pointer " + clientPointer);
+      return ceph_kill_client(clientPointer);
+  }
+
+  /*
+   * JNI entry points implemented in the "hadoopcephfs" library loaded by the
+   * static initialiser. Apart from ceph_initializeClient(), which produces
+   * the client handle, each takes that handle as its first argument.
+   * Callers in this class treat a false boolean result or a negative
+   * int/long result as failure.
+   */
+  private native long    ceph_initializeClient();
+  private native boolean ceph_copyFromLocalFile (long client, String localPath, String cephPath);
+  private native boolean ceph_copyToLocalFile   (long client, String cephPath, String localPath);
+  private native String  ceph_getcwd            (long client);  
+  private native boolean ceph_setcwd            (long client, String path);
+  private native boolean ceph_rmdir             (long client, String path);
+  private native boolean ceph_mkdir             (long client, String path);
+  private native boolean ceph_unlink            (long client, String path);
+  private native boolean ceph_rename            (long client, String old_path, String new_path);
+  private native boolean ceph_exists            (long client, String path);
+  private native long    ceph_getblocksize      (long client, String path);
+  private native long    ceph_getfilesize       (long client, String path);
+  private native boolean ceph_isdirectory       (long client, String path);
+  private native boolean ceph_isfile            (long client, String path);
+  private native String[]ceph_getdir            (long client, String path);
+  private native int     ceph_open_for_read     (long client, String path);
+  private native int     ceph_open_for_overwrite(long client, String path);
+  private native boolean ceph_kill_client       (long client);
+
+
+
+
+  /**
+   * Sets up the two fixed helper paths only. The native client is not
+   * created here; that happens in {@link #initialize(URI, Configuration)}.
+   */
+  public CephFileSystem() {
+    this.root = new Path("/");
+    this.parent = new Path("..");
+  }
+
+  /*
+  public S3FileSystem(FileSystemStore store) {
+    this.store = store;
+  } */
+
+  @Override
+  public URI getUri() {
+    return uri;
+  }
+
+  @Override
+  public void initialize(URI uri, Configuration conf) throws IOException {
+      //store.initialize(uri, conf);
+    setConf(conf);
+    this.uri = URI.create(uri.getScheme() + "://" + uri.getAuthority());    
+
+    // TODO: local filesystem? we really need to figure out this conf thingy
+    this.localFs = get(URI.create("file:///"), conf);
+
+    //  Initializes the client    
+    this.clientPointer = ceph_initializeClient();
+    System.out.println("Initialized client with pointer " + clientPointer 
+                      + ". Setting cwd to /");
+    ceph_setcwd("/");
+    // DEBUG
+    // attempt to do three exists operations on root
+    //System.out.println("DEBUG: attempting isdir() on root (/)");
+    // ceph_isdirectory("/");
+    //System.out.println("DEBUG: attempting exists() on root (/)");
+    //ceph_exists("/");
+  }  
+
+  @Override
+      public void close() throws IOException {
+      System.out.println("Pretending to shut down client with pointer " + clientPointer
+                        + ". Not really doing anything.");
+  }
+
+  @Override
+  public String getName() {
+    return getUri().toString();
+  }
+
+  @Override
+  public Path getWorkingDirectory() {
+      return makeAbsolute(new Path(ceph_getcwd()));
+  }
+
+  @Override
+  public void setWorkingDirectory(Path dir) {
+      Path abs_path = makeAbsolute(dir);
+
+      // error conditions if path's not a directory
+      boolean isDir = false;
+      boolean path_exists = false;
+      try {
+         isDir = isDirectory(abs_path);
+         path_exists = exists(abs_path);
+      }
+
+      catch (IOException e) {
+         System.out.println("Warning: isDirectory threw an exception");
+      }
+
+    if (!isDir) {
+         if (path_exists)
+             System.out.println("Warning: SetWorkingDirectory(" + dir.toString() + 
+                                   "): path is not a directory");
+         else
+             System.out.println("Warning: SetWorkingDirectory(" + dir.toString() + 
+                                   "): path does not exist");
+      }
+      else {
+         ceph_setcwd(dir.toString());
+      }
+    //System.out.println("DEBUG: Attempting to change cwd to " + dir.toString() +
+    //          "changes cwd to" + getWorkingDirectory().toString());
+  }
+
+    // Makes a Path absolute. In a cheap, dirty hack, we're
+    // also going to strip off any "ceph://null" prefix we see. 
+  private Path makeAbsolute(Path path) {
+      // first, check for the prefix
+      if (path.toString().startsWith("ceph://null")) {
+         
+         Path stripped_path = new Path(path.toString().substring("ceph://null".length()));
+         //System.out.println("Stripping path \"" + path.toString() + "\" to \""
+         //                 + stripped_path.toString() + "\"");
+         return stripped_path;
+      }
+
+
+    if (path.isAbsolute()) {
+      return path;
+    }
+    Path wd = getWorkingDirectory();
+    //System.out.println("Working directory is " + wd.toString());
+    if (wd.toString().equals(""))
+       return new Path(root, path);
+    else
+       return new Path(wd, path);
+  }
+
+    private String[] getEmptyStringArray(int size) {
+       return new String[size];
+    }
+
+  @Override
+  public boolean exists(Path path) throws IOException {
+      boolean result;
+      Path abs_path = makeAbsolute(path);
+      if (abs_path.toString().equals("/"))
+         {
+             //System.out.println("Bug workaround! returning true for exists(/)");
+             result = true;
+         }
+      else 
+         {
+             //System.out.println("Calling ceph_exists from Java on path " + abs_path.toString() + ":");
+             result =  ceph_exists(abs_path.toString());
+             //System.out.println("Returned from ceph_exists to Java");
+         }
+      // System.out.println("exists \"" + path.toString() + "\"? Absolute path is \"" +
+      //               abs_path.toString() + "\", result = " + result);
+
+      return result;
+  }
+
+
+  /* Creates the directory and all nonexistent parents.   */
+  @Override
+  public boolean mkdirs(Path path) throws IOException {
+
+      Path abs_path = makeAbsolute(path);
+      //System.out.println("mkdirs: Creating directory \"" + path.toString() + "\": Absolute path is \"" +
+      //               abs_path.toString() + "\"");
+
+      // If the directory exists, we're happy, right? less work for us!
+      
+      if(exists(abs_path))
+             throw new IOException("Error: attempting to create an existing directory");
+
+      // The basic idea:
+      // get parent. if parent = null (happens if dir is root), fail. You can't really make
+      // the root directory...
+      // if parent doesn't exist, recursively make parent; fail if this fails.
+      // ceph_mkdir desired path.
+
+      Path parent = path.getParent();
+      boolean result = true;
+
+      // if the parent's null, we're trying to create the root directory. This is BAD.
+      if (null == parent)  {
+         System.out.println("Error: failed making directory \"" + abs_path.toString() + 
+                            "\": directory has null parent (directory is root)") ;
+         result = false;
+      }
+      else {
+         // try to make the parent if it doesn't exist
+         if (!exists(parent)) {
+             //System.out.println("mkdirs: parent of directory \"" + abs_path.toString() + 
+             //                         "does not exist. Recursively creating:");
+             if(!mkdirs(parent)) {
+                 System.out.println("mkdirs: failed creating directory \"" + 
+                                    abs_path.toString() + 
+                                    "because of failure recursively creating parent" +
+                                    parent.toString());
+                 result = false;
+             }
+         }
+      }
+      // try to make the directory, unless the parent was null or we
+      // tried and failed to make the parent
+      if (result) {
+         result = ceph_mkdir(abs_path.toString());
+      }
+      //System.out.println("mkdirs: attempted to make directory " + abs_path.toString() + 
+      //                        ": result is " + result);
+      return result;
+  }
+
+
+
+    //   @Override
+
+  public boolean __isDirectory(Path path) throws IOException {
+     Path abs_path = makeAbsolute(path);
+     boolean result;
+
+     if (abs_path.toString().equals("/"))
+     {
+        //System.out.println("Bug workaround! returning true for isDirectory(/)");
+        result = true;
+     }
+     else
+        result = ceph_isdirectory(abs_path.toString());
+     //System.out.println("isDirectory \"" + path.toString() + "\"? Absolute path is \"" +
+     //                abs_path.toString() + "\", result = " + result);
+     return result;
+   }
+
+    @Override
+    public boolean isFile(Path path) throws IOException {
+       Path abs_path = makeAbsolute(path);
+       boolean result;
+       if (abs_path.toString().equals("/"))
+           {
+               //System.out.println("Bug workaround! returning false for isFile(/)");
+               result =  false;
+           }
+       else
+           {
+               result = ceph_isfile(abs_path.toString());
+           }
+       //System.out.println("isFile \"" + path.toString() + "\"? Absolute path is \"" +
+       //      abs_path.toString() + "\", result = " + result);
+
+       return result;
+    }
+
+  @Override
+  public FileStatus getFileStatus(Path p) throws IOException {
+      // For the moment, hardwired block size and replication
+      Path abs_p = makeAbsolute(p);
+      return new FileStatus(__getLength(abs_p), __isDirectory(abs_p), 2,
+                           8388608, 0, abs_p);                     
+  }
+    
+
+    // array of statuses for the directory's contents
+    // steal or factor out iteration code from delete()
+  @Override
+  public FileStatus[] listStatus(Path p) throws IOException {
+    Path abs_p = makeAbsolute(p);
+    
+  }
+
+
+  @Override
+  public Path[] listPathsRaw(Path path) throws IOException {
+      
+      String dirlist[];
+
+      Path abs_path = makeAbsolute(path);
+
+      //System.out.println("listPathsRaw on path \"" + path.toString() + "\", absolute path \""
+      //                + abs_path.toString() + "\"");
+
+      // If it's a directory, get the listing. Otherwise, complain and give up.
+      if (isDirectory(abs_path))
+         dirlist = ceph_getdir(abs_path.toString());
+      else
+         {
+             if (exists(abs_path)) { }
+                 //  System.out.println("listPathsRaw on path \"" + abs_path.toString() + 
+                 //         "\" failed; the path is not a directory.");
+             else {}
+                 // System.out.println("listPathsRaw on path \"" + abs_path.toString() + 
+                 //         "\" failed; the path does not exist.");
+             return null;
+         }
+      
+
+      // convert the strings to Paths
+      Path paths[] = new Path[dirlist.length];
+      for(int i = 0; i < dirlist.length; ++i) {
+         //System.out.println("Raw enumeration of paths in \"" + abs_path.toString() + "\": \"" +
+         //                 dirlist[i] + "\"");
+
+         // convert each listing to an absolute path
+         Path raw_path = new Path(dirlist[i]);
+         if (raw_path.isAbsolute())
+             paths[i] = raw_path;
+         else
+             paths[i] = new Path(abs_path, raw_path);
+      }
+      return paths;     
+  }
+
+  public FSDataOutputStream create(Path f, 
+                                  boolean overwrite,
+                                  int bufferSize,
+                                  short replication,
+                                  long blockSize,
+                                  Progressable progress
+                                  ) throws IOException {
+       
+
+      Path absfilepath = makeAbsolute(file);
+      
+      // We ignore progress reporting and replication.
+      // Required semantics: if the file exists, overwrite if overwrite == true, and
+      // throw an exception if overwrite == false.
+
+      // Step 1: existence test
+      if(isDirectory(absfilepath))
+         throw new IOException("create: Cannot overwrite existing directory \""
+                               + absfilepath.toString() + "\" with a file");      
+      if (!overwrite) {
+         if (exists(absfilepath)) {
+             throw new IOException("createRaw: Cannot open existing file \"" 
+                                   + absfilepath.toString() 
+                                   + "\" for writing without overwrite flag");
+         }
+      }
+      
+      // Step 2: create any nonexistent directories in the path
+      Path parent =  absfilepath.getParent();
+      if (parent != null) { // if parent is root, we're done
+         if(!exists(parent)) {
+             //System.out.println("createRaw: parent directory of path \""  
+             //                 + absfilepath.toString() + "\" does not exist. Creating:");
+             mkdirs(parent);
+         }
+      }
+
+      // Step 3: open the file
+      int fh = ceph_open_for_overwrite(absfilepath.toString());
+      if (fh < 0) {
+         throw new IOException("createRaw: Open for overwrite failed on path \"" + 
+                               absfilepath.toString() + "\"");
+      }
+      
+      // Step 4: create the stream
+      FSDataOutputStream result = new CephOutputStream(getConf(), clientPointer, fh);
+      //System.out.println("createRaw: opened absolute path \""  + absfilepath.toString() 
+      //                + "\" for writing with fh " + fh);
+
+      return result;
+  }
+
+
+
+  // Opens a Ceph file and attaches the file handle to an FSDataInputStream.
+  @Override
+  public FSDataInputStream open(Path path, int bufferSize) throws IOException {
+      Path abs_path = makeAbsolute(path);
+
+      if(!isFile(abs_path)) {
+             if (!exists(abs_path))
+             throw new IOException("open:  absolute path \""  + abs_path.toString()
+                                   + "\" does not exist");
+             else
+             throw new IOException("open:  absolute path \""  + abs_path.toString()
+                                   + "\" is not a file");
+         }
+
+      int fh = ceph_open_for_read(abs_path.toString());
+      if (fh < 0) {
+         throw new IOException("open: Failed to open file " + abs_path.toString());
+      }
+      long size = ceph_getfilesize(abs_path.toString());
+      if (size < 0) {
+         throw new IOException("Failed to get file size for file " + abs_path.toString() + 
+                           " but succeeded in opening file. Something bizarre is going on.");
+      }
+      FSDataInputStream result = new CephInputStream(getConf(), clientPointer, fh, size);
+      return result;
+  }
+
+  @Override
+  public boolean rename(Path src, Path dst) throws IOException {
+    // TODO: Check corner cases: dst already exists,
+    // or path is directory with children
+
+    return ceph_rename(src.toString(), dst.toString());
+  }
+
+  @Override
+  public boolean delete(Path path) throws IOException {
+
+      Path abs_path = makeAbsolute(path);      
+
+      //System.out.println("deleteRaw: Deleting path " + abs_path.toString());
+      // sanity check
+      if (abs_path.toString().equals("/"))
+                    throw new IOException("Error: deleting the root directory is a Bad Idea.");
+
+     // if the path is a file, try to delete it.
+     if (isFile(abs_path)) {            
+       boolean result = ceph_unlink(path.toString());
+       if(!result) {
+           // System.out.println("deleteRaw: failed to delete file \"" +
+           //         abs_path.toString() + "\".");
+           return false;
+       }
+       else
+           return true;
+     }
+       
+     /* If the path is a directory, recursively try to delete its contents,
+       and then delete the directory. */
+
+      Path[] contents = listPathsRaw(path);
+      if (contents == null) {
+         // System.out.println("deleteRaw: Failed to read contents of directory \"" +
+         //         abs_path.toString() + "\" while trying to delete it");
+         return false;
+      }
+
+      // recursively delete, skipping "." and ".." entries
+      Path parent = abs_path.getParent();
+      for (Path p : contents) {
+         if (makeAbsolute(p).equals(abs_path)) continue; // "." entry
+         if (null != parent) {
+             if (p.equals(parent)) continue; // ".." entry
+         }
+
+         if (!deleteRaw(p)) {
+             // System.out.println("deleteRaw: Failed to delete file \"" + 
+             //                 p.toString() + "\" while recursively deleting \""
+             //                 + abs_path.toString() + "\"" );
+             return false;
+         }
+      }
+  
+      boolean result = ceph_unlink(path.toString());
+      if (!result)
+         System.out.println("delete: failed to delete \"" + abs_path.toString() + "\"");
+      return result;
+  }
+   
+
+  //@Override
+  /**
+   * Looks up the size of a file or directory.
+   *
+   * @param path absolute or relative path
+   * @return the size reported by {@code ceph_getfilesize}
+   * @throws java.io.FileNotFoundException if the path does not exist
+   * @throws IOException if the native layer reports a negative size
+   */
+  private long __getLength(Path path) throws IOException {
+      Path abs_path = makeAbsolute(path);
+
+      if (!exists(abs_path)) {
+         // FileNotFoundException is an IOException, so existing callers are
+         // unaffected; it lets callers tell "missing" from other failures.
+         throw new java.io.FileNotFoundException("org.apache.hadoop.fs.ceph.CephFileSystem.__getLength: File or directory " + abs_path.toString() + " does not exist.");
+      }
+
+      long filesize = ceph_getfilesize(abs_path.toString());
+      if (filesize < 0) {
+         throw new IOException("org.apache.hadoop.fs.ceph.CephFileSystem.getLength: Size of file or directory " + abs_path.toString() + " could not be retrieved.");
+      }
+      return filesize;
+    }
+
+  /**
+   * User-defined replication is not supported for Ceph file systems at the moment.
+   *
+   * @param path ignored
+   * @return always 1
+   */
+  @Override
+  public short getReplication(Path path) throws IOException {
+    return 1;
+  }
+
+  /** @return always 1 */
+  @Override
+  public short getDefaultReplication() {
+    return 1;
+  }
+
+  /**
+   * User-defined replication is not supported for Ceph file systems at the moment.
+   *
+   * @param path        ignored
+   * @param replication ignored
+   * @return always true; nothing is changed
+   */
+  @Override
+  public boolean setReplicationRaw(Path path, short replication)
+      throws IOException {
+    return true;
+  }
+
+  @Override
+  public long getBlockSize(Path path) throws IOException {
+   
+    if (!exists(path)) {
+      throw new IOException("org.apache.hadoop.fs.ceph.CephFileSystem.getBlockSize: File or directory " + path.toString() + " does not exist.");
+    }
+    long result = ceph_getblocksize(path.toString());
+    if (!isFile(path)) {
+      throw new IOException("org.apache.hadoop.fs.ceph.CephFileSystem.getBlockSize: File or directory " + path.toString() + " is not a file.");
+    }
+    else {
+       System.err.println("DEBUG: getBlockSize: alleged file really is a file");
+    }
+    if (result < 4096) {
+       System.err.println("org.apache.hadoop.fs.ceph.CephFileSystem.getBlockSize: " + 
+                          "path exists; strange block size of " + result + " defaulting to 8192");
+       return 8192;
+    }
+
+    
+    return result;
+    //return DEFAULT_BLOCK_SIZE;
+    //  return ceph_getblocksize(path.toString());
+
+  }
+
+  /** @return the compiled-in default, {@code DEFAULT_BLOCK_SIZE} (8 MiB); configuration is not consulted */
+  @Override
+  public long getDefaultBlockSize() {
+      return DEFAULT_BLOCK_SIZE;
+      //return getConf().getLong("fs.ceph.block.size", DEFAULT_BLOCK_SIZE);
+    }
+
+  /**
+   * Return 1x1 'localhost' cell if the file exists. Return null if otherwise.
+   */
+  @Override
+  public String[][] getFileCacheHints(Path f, long start, long len)
+      throws IOException {
+    // TODO: Check this is the correct behavior
+    if (!exists(f)) {
+      return null;
+    }
+    return new String[][] { { "localhost" } };
+  }
+
+  /** No-op: locking is not implemented; both arguments are ignored. */
+  @Override
+  public void lock(Path path, boolean shared) throws IOException {
+    // TODO: Design and implement? or just ignore locking?
+      return;
+  }
+
+  /** No-op counterpart of {@code lock}; the argument is ignored. */
+  @Override
+  public void release(Path path) throws IOException {
+      return; //deprecated
+  }
+
+    /* old API
+  @Override
+  public void reportChecksumFailure(Path f, 
+                                    FSDataInputStream in, long inPos, 
+                                    FSDataInputStream sums, long sumsPos) {
+    // TODO: What to do here?
+      return;
+      } */
+
+  @Override
+  public void moveFromLocalFile(Path src, Path dst) throws IOException {
+      if (!ceph_copyFromLocalFile(src.toString(), dst.toString())) {
+         throw new IOException("org.apache.hadoop.fs.ceph.CephFileSystem.moveFromLocalFile: failed moving from local file " + src.toString() + " to Ceph file " + dst.toString());
+      }
+      //FileUtil.copy(localFs, src, this, dst, true, getConf());
+  }
+
+  @Override
+  public void copyFromLocalFile(Path src, Path dst) throws IOException {
+      // make sure Ceph path exists
+      Path abs_src = makeAbsolute(src);
+      Path abs_dst = makeAbsolute(dst);
+
+      if (isDirectory(abs_dst))
+             throw new IOException("Error in copyFromLocalFile: " +
+                                   "attempting to open an existing directory as a file");
+      Path abs_dst_parent = abs_dst.getParent();
+
+      if (!exists(abs_dst_parent))
+         mkdirs(abs_dst_parent);
+
+      if (!ceph_copyFromLocalFile(abs_src.toString(), abs_dst.toString())) {
+         throw new IOException("org.apache.hadoop.fs.ceph.CephFileSystem.copyFromLocalFile: failed copying from local file " + abs_src.toString() + " to Ceph file " + abs_dst.toString());
+      }
+      //FileUtil.copy(localFs, src, this, dst, false, true, getConf());
+  }
+
+
+
+  @Override
+  public void copyToLocalFile(Path ceph_src, Path local_dst, boolean copyCrc) throws IOException {
+
+      Path abs_ceph_src = makeAbsolute(ceph_src);
+      
+      //System.out.println("CopyToLocalFile: copying Ceph file \"" + abs_ceph_src.toString() + 
+      //                "\" to local file \"" + local_dst.toString() + "\" using client "
+      //                + clientPointer);
+
+      // make sure the alleged source file exists, and is actually a file, not
+      // a directory or a ballpoint pen or something
+      if (!isFile(abs_ceph_src)) {
+         if (!exists(abs_ceph_src)) {
+             throw new IOException("copyToLocalFile:  failed copying Ceph file \"" + 
+                                   abs_ceph_src.toString() + "\" to local file \"" 
+                                   + local_dst.toString() + 
+                                   "\" because the source file does not exist");
+         }
+         else {
+             throw new IOException("copyToLocalFile:  failed copying Ceph file \"" + 
+                                   abs_ceph_src.toString() + "\" to local file \"" + 
+                                   local_dst.toString() + 
+                                   "\" because the Ceph path is not a  file");
+         }
+      }
+
+      // if the destination's parent directory doesn't exist, create it.
+      Path local_dst_parent_dir = local_dst.getParent();
+      if(null == local_dst_parent_dir)
+         throw new IOException("copyToLocalFile:  failed copying Ceph file \"" + 
+                               abs_ceph_src.toString() + "\" to local file \"" + 
+                               local_dst.toString() + 
+                               "\": destination is root");
+
+      if(!localFs.mkdirs(local_dst_parent_dir))
+             throw new IOException("copyToLocalFile:  failed copying Ceph file \"" + 
+                                   abs_ceph_src.toString() + "\" to local file \"" + 
+                                   local_dst.toString() + 
+                                   "\": creating the destination's parent directory failed.");
+      else
+         {
+             if (!ceph_copyToLocalFile(abs_ceph_src.toString(), local_dst.toString())) 
+                 {
+                     throw new IOException("copyToLocalFile:  failed copying Ceph file \"" + 
+                                           abs_ceph_src.toString() + "\" to local file \"" 
+                                           + local_dst.toString() + "\"");
+                 }
+         }
+      //System.out.println("CopyToLocalFile: copied Ceph file \"" + abs_ceph_src.toString() + 
+      //                "\" to local file \"" + local_dst.toString() + "\"");
+  }
+
+
+  /**
+   * @param fsOutputFile ignored here
+   * @param tmpLocalFile local scratch file
+   * @return {@code tmpLocalFile}, unchanged
+   */
+  @Override
+  public Path startLocalOutput(Path fsOutputFile, Path tmpLocalFile)
+      throws IOException {
+    return tmpLocalFile;
+  }
+
+  /**
+   * Transfers the scratch file to its final place by delegating to
+   * {@code moveFromLocalFile(tmpLocalFile, fsOutputFile)}.
+   */
+  @Override
+  public void completeLocalOutput(Path fsOutputFile, Path tmpLocalFile)
+      throws IOException {
+    moveFromLocalFile(tmpLocalFile, fsOutputFile);
+  }
+
+  // diagnostic methods
+
+    /*  void dump() throws IOException {
+    store.dump();
+  }
+
+  void purge() throws IOException {
+    store.purge();
+    }*/
+
+}
diff --git a/src/client/hadoop/ceph/CephFileSystem.java.old b/src/client/hadoop/ceph/CephFileSystem.java.old
new file mode 100644 (file)
index 0000000..3c253a7
--- /dev/null
@@ -0,0 +1,755 @@
+package org.apache.hadoop.fs.ceph;
+
+import java.io.IOException;
+import java.net.URI;
+import java.util.Set;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSInputStream;
+import org.apache.hadoop.fs.FSOutputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.FileUtil;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.util.Progressable;
+
+/**
+ * <p>
+ * A {@link FileSystem} backed by <a href="http://ceph.sourceforge.net">Ceph</a>.
+ * This will not start a Ceph instance; one must already be running.
+ * </p>
+ * @author Esteban Molina-Estolano
+ */
+public class CephFileSystem extends FileSystem {
+
+  // Block size (8 MiB) reported by getDefaultBlockSize().
+  private static final long DEFAULT_BLOCK_SIZE = 8 * 1024 * 1024;
+  
+  // Loads the "hadoopcephfs" native library when the class is initialized;
+  // presumably this is the JNI library behind the native ceph_* methods below.
+  static {
+    System.loadLibrary("hadoopcephfs");
+  }
+
+  // URI of this file system; rebuilt from scheme and authority in initialize().
+  private URI uri;
+
+  // Handle on the local file system, obtained in initialize() and used by
+  // copyToLocalFile() to create the destination's parent directory.
+  private FileSystem localFs;
+
+  // Value returned by ceph_initializeClient(); passed as the first argument
+  // of every other native call.
+  private long clientPointer;
+
+  // The path "/", assigned in the constructor.
+  private Path root;
+
+  // The path "..", assigned in the constructor. Several methods declare a
+  // local variable also called "parent", which shadows this field.
+  private Path parent;
+
+    //private Path workingDir = new Path("/user", System.getProperty("user.name"));
+
+  /*
+   * Native Ceph methods. Each one has long client as a parameter,
+   * which should always be the member variable clientPointer. This is
+   * to avoid the hideous JNI code required to access the member variable
+   * from C++. clientPointer is set at initialization of the
+   * CephFileSystem instance, and should be untouched thereafter. I
+   * include wrapper functions to automatically add the parameter.
+   */ 
+
+  
+  private boolean ceph_copyFromLocalFile(String localPath, String cephPath) 
+      { return ceph_copyFromLocalFile(clientPointer, localPath, cephPath); }
+  private boolean ceph_copyToLocalFile(String cephPath, String localPath)
+    { return ceph_copyToLocalFile(clientPointer, cephPath, localPath); }
+  private String  ceph_getcwd() { return ceph_getcwd(clientPointer); }  
+  private boolean ceph_setcwd(String path) { return ceph_setcwd(clientPointer, path); }
+  private boolean ceph_rmdir(String path) { return ceph_rmdir(clientPointer, path); }
+  private boolean ceph_mkdir(String path) { return ceph_mkdir(clientPointer, path); }
+  private boolean ceph_unlink(String path) { return ceph_unlink(clientPointer, path); }
+  private boolean ceph_rename(String old_path, String new_path) { return ceph_rename(clientPointer, old_path, new_path); } 
+  private boolean ceph_exists(String path) { return ceph_exists(clientPointer, path); }
+  private long    ceph_getblocksize(String path) { return ceph_getblocksize(clientPointer, path); }
+  private long    ceph_getfilesize(String path) { return ceph_getfilesize(clientPointer, path); }
+  private boolean ceph_isdirectory(String path) { return ceph_isdirectory(clientPointer, path); }
+  private boolean ceph_isfile(String path) { return ceph_isfile(clientPointer, path); }
+  private String[] ceph_getdir(String path) { return ceph_getdir(clientPointer, path); }
+  private int ceph_open_for_read(String path) { return ceph_open_for_read(clientPointer, path); }
+  private int ceph_open_for_overwrite(String path) { return ceph_open_for_overwrite(clientPointer, path); }
+
+  private boolean ceph_kill_client() {
+      System.out.println("Killing Ceph client with pointer " + clientPointer);
+      return ceph_kill_client(clientPointer);
+  }
+
+  /*
+   * Native entry points. ceph_initializeClient() produces the value stored
+   * in clientPointer; every other method takes that value as its first
+   * ("client") argument. The boolean-returning calls are treated by the
+   * callers in this class as "true means success", and the int-returning
+   * open calls as "negative means failure"; the native side is not visible
+   * here, so confirm against the JNI implementation.
+   */
+  private native long    ceph_initializeClient();
+  private native boolean ceph_copyFromLocalFile (long client, String localPath, String cephPath);
+  private native boolean ceph_copyToLocalFile   (long client, String cephPath, String localPath);
+  private native String  ceph_getcwd            (long client);  
+  private native boolean ceph_setcwd            (long client, String path);
+  private native boolean ceph_rmdir             (long client, String path);
+  private native boolean ceph_mkdir             (long client, String path);
+  private native boolean ceph_unlink            (long client, String path);
+  private native boolean ceph_rename            (long client, String old_path, String new_path);
+  private native boolean ceph_exists            (long client, String path);
+  private native long    ceph_getblocksize      (long client, String path);
+  private native long    ceph_getfilesize       (long client, String path);
+  private native boolean ceph_isdirectory       (long client, String path);
+  private native boolean ceph_isfile            (long client, String path);
+  private native String[]ceph_getdir            (long client, String path);
+  private native int     ceph_open_for_read     (long client, String path);
+  private native int     ceph_open_for_overwrite(long client, String path);
+  private native boolean ceph_kill_client       (long client);
+
+
+
+
+  /**
+   * Creates an uninitialized instance. Only the two helper paths are set
+   * here; the URI, local file system and client handle are assigned in
+   * {@code initialize}.
+   */
+  public CephFileSystem() {
+    this.root = new Path("/");
+    this.parent = new Path("..");
+  }
+
+  /*
+  public S3FileSystem(FileSystemStore store) {
+    this.store = store;
+  } */
+
+  @Override
+  public URI getUri() {
+    return uri;
+  }
+
+  @Override
+  public void initialize(URI uri, Configuration conf) throws IOException {
+      //store.initialize(uri, conf);
+    setConf(conf);
+    this.uri = URI.create(uri.getScheme() + "://" + uri.getAuthority());    
+
+    // TODO: local filesystem? we really need to figure out this conf thingy
+    this.localFs = get(URI.create("file:///"), conf);
+
+    //  Initializes the client    
+    this.clientPointer = ceph_initializeClient();
+    System.out.println("Initialized client with pointer " + clientPointer 
+                      + ". Setting cwd to /");
+    ceph_setcwd("/");
+    // DEBUG
+    // attempt to do three exists operations on root
+    //System.out.println("DEBUG: attempting isdir() on root (/)");
+    // ceph_isdirectory("/");
+    //System.out.println("DEBUG: attempting exists() on root (/)");
+    //ceph_exists("/");
+  }  
+
+  @Override
+      public void close() throws IOException {
+      System.out.println("Pretending to shut down client with pointer " + clientPointer
+                        + ". Not really doing anything.");
+  }
+
+  @Override
+  public String getName() {
+    return getUri().toString();
+  }
+
+  @Override
+  public Path getWorkingDirectory() {
+      return makeAbsolute(new Path(ceph_getcwd()));
+  }
+
+  @Override
+  public void setWorkingDirectory(Path dir) {
+      Path abs_path = makeAbsolute(dir);
+
+      // error conditions if path's not a directory
+      boolean isDir = false;
+      boolean path_exists = false;
+      try {
+         isDir = isDirectory(abs_path);
+         path_exists = exists(abs_path);
+      }
+
+      catch (IOException e) {
+         System.out.println("Warning: isDirectory threw an exception");
+      }
+
+    if (!isDir) {
+         if (path_exists)
+             System.out.println("Warning: SetWorkingDirectory(" + dir.toString() + 
+                                   "): path is not a directory");
+         else
+             System.out.println("Warning: SetWorkingDirectory(" + dir.toString() + 
+                                   "): path does not exist");
+      }
+      else {
+         ceph_setcwd(dir.toString());
+      }
+    //System.out.println("DEBUG: Attempting to change cwd to " + dir.toString() +
+    //          "changes cwd to" + getWorkingDirectory().toString());
+  }
+
+    // Makes a Path absolute. In a cheap, dirty hack, we're
+    // also going to strip off any "ceph://null" prefix we see. 
+  private Path makeAbsolute(Path path) {
+      // first, check for the prefix
+      if (path.toString().startsWith("ceph://null")) {
+         
+         Path stripped_path = new Path(path.toString().substring("ceph://null".length()));
+         //System.out.println("Stripping path \"" + path.toString() + "\" to \""
+         //                 + stripped_path.toString() + "\"");
+         return stripped_path;
+      }
+
+
+    if (path.isAbsolute()) {
+      return path;
+    }
+    Path wd = getWorkingDirectory();
+    //System.out.println("Working directory is " + wd.toString());
+    if (wd.toString().equals(""))
+       return new Path(root, path);
+    else
+       return new Path(wd, path);
+  }
+
+    private String[] getEmptyStringArray(int size) {
+       return new String[size];
+    }
+
+  @Override
+  public boolean exists(Path path) throws IOException {
+      boolean result;
+      Path abs_path = makeAbsolute(path);
+      if (abs_path.toString().equals("/"))
+         {
+             //System.out.println("Bug workaround! returning true for exists(/)");
+             result = true;
+         }
+      else 
+         {
+             //System.out.println("Calling ceph_exists from Java on path " + abs_path.toString() + ":");
+             result =  ceph_exists(abs_path.toString());
+             //System.out.println("Returned from ceph_exists to Java");
+         }
+      // System.out.println("exists \"" + path.toString() + "\"? Absolute path is \"" +
+      //               abs_path.toString() + "\", result = " + result);
+
+      return result;
+  }
+
+
+  /* Creates the directory and all nonexistent parents.   */
+  @Override
+  public boolean mkdirs(Path path) throws IOException {
+
+      Path abs_path = makeAbsolute(path);
+      //System.out.println("mkdirs: Creating directory \"" + path.toString() + "\": Absolute path is \"" +
+      //               abs_path.toString() + "\"");
+
+      // If the directory exists, we're happy, right? less work for us!
+      
+      if(exists(abs_path))
+             throw new IOException("Error: attempting to create an existing directory");
+
+      // The basic idea:
+      // get parent. if parent = null (happens if dir is root), fail. You can't really make
+      // the root directory...
+      // if parent doesn't exist, recursively make parent; fail if this fails.
+      // ceph_mkdir desired path.
+
+      Path parent = path.getParent();
+      boolean result = true;
+
+      // if the parent's null, we're trying to create the root directory. This is BAD.
+      if (null == parent)  {
+         System.out.println("Error: failed making directory \"" + abs_path.toString() + 
+                            "\": directory has null parent (directory is root)") ;
+         result = false;
+      }
+      else {
+         // try to make the parent if it doesn't exist
+         if (!exists(parent)) {
+             //System.out.println("mkdirs: parent of directory \"" + abs_path.toString() + 
+             //                         "does not exist. Recursively creating:");
+             if(!mkdirs(parent)) {
+                 System.out.println("mkdirs: failed creating directory \"" + 
+                                    abs_path.toString() + 
+                                    "because of failure recursively creating parent" +
+                                    parent.toString());
+                 result = false;
+             }
+         }
+      }
+      // try to make the directory, unless the parent was null or we
+      // tried and failed to make the parent
+      if (result) {
+         result = ceph_mkdir(abs_path.toString());
+      }
+      //System.out.println("mkdirs: attempted to make directory " + abs_path.toString() + 
+      //                        ": result is " + result);
+      return result;
+  }
+
+
+   @Override
+  public boolean isDirectory(Path path) throws IOException {
+     Path abs_path = makeAbsolute(path);
+     boolean result;
+
+
+     if (abs_path.toString().equals("/"))
+     {
+        //System.out.println("Bug workaround! returning true for isDirectory(/)");
+        result = true;
+     }
+     else
+        result = ceph_isdirectory(abs_path.toString());
+     //System.out.println("isDirectory \"" + path.toString() + "\"? Absolute path is \"" +
+     //                abs_path.toString() + "\", result = " + result);
+     return result;
+   }
+
+    @Override
+    public boolean isFile(Path path) throws IOException {
+       Path abs_path = makeAbsolute(path);
+       boolean result;
+       if (abs_path.toString().equals("/"))
+           {
+               //System.out.println("Bug workaround! returning false for isFile(/)");
+               result =  false;
+           }
+       else
+           {
+               result = ceph_isfile(abs_path.toString());
+           }
+       //System.out.println("isFile \"" + path.toString() + "\"? Absolute path is \"" +
+       //      abs_path.toString() + "\", result = " + result);
+
+       return result;
+    }
+
+
+  @Override
+  public Path[] listPathsRaw(Path path) throws IOException {
+      
+      String dirlist[];
+
+      Path abs_path = makeAbsolute(path);
+
+      //System.out.println("listPathsRaw on path \"" + path.toString() + "\", absolute path \""
+      //                + abs_path.toString() + "\"");
+
+      // If it's a directory, get the listing. Otherwise, complain and give up.
+      if (isDirectory(abs_path))
+         dirlist = ceph_getdir(abs_path.toString());
+      else
+         {
+             if (exists(abs_path)) { }
+                 //  System.out.println("listPathsRaw on path \"" + abs_path.toString() + 
+                 //         "\" failed; the path is not a directory.");
+             else {}
+                 // System.out.println("listPathsRaw on path \"" + abs_path.toString() + 
+                 //         "\" failed; the path does not exist.");
+             return null;
+         }
+      
+
+      // convert the strings to Paths
+      Path paths[] = new Path[dirlist.length];
+      for(int i = 0; i < dirlist.length; ++i) {
+         //System.out.println("Raw enumeration of paths in \"" + abs_path.toString() + "\": \"" +
+         //                 dirlist[i] + "\"");
+
+         // convert each listing to an absolute path
+         Path raw_path = new Path(dirlist[i]);
+         if (raw_path.isAbsolute())
+             paths[i] = raw_path;
+         else
+             paths[i] = new Path(abs_path, raw_path);
+      }
+      return paths;     
+  }
+
+  @Override
+  public FSOutputStream createRaw(Path file, boolean overwrite,
+      short replication, long blockSize) throws IOException {
+
+    return createRaw(file, overwrite, replication, blockSize, null);
+    }
+
+
+  @Override
+  public FSOutputStream createRaw(Path file, boolean overwrite,
+      short replication, long blockSize, Progressable progress)
+      throws IOException {
+
+
+      Path absfilepath = makeAbsolute(file);
+
+      //System.out.println("createRaw: opening path \"" + file.toString() + "\" as absolute path \""
+      //                + absfilepath.toString() + "\" for writing");
+
+      // we're ignoring progress reporting and replication entirely.
+      // required semantics: if the file already exists, overwrite
+      // it if overwrite = true and throw an exception if
+      // overwrite = false.
+
+      // Step 1: existence test
+      if(isDirectory(absfilepath))
+             throw new IOException("createRaw: Cannot make an output stream to existing directory \""
+                                   + absfilepath.toString() + "\"");
+      
+      if (!overwrite) {
+         if (!exists(absfilepath)) {
+             throw new IOException("createRaw: Cannot open existing file \"" + absfilepath.toString() 
+                                   + "\" for writing without overwrite flag");
+         }
+      }
+      
+      // Step 2: do the directories in the path exist?
+      Path parent =  absfilepath.getParent();
+      if (parent != null) // if parent is root, we're done
+         {
+             if(!exists(parent))
+                 {
+                     //System.out.println("createRaw: parent directory of path \""  
+                     //                 + absfilepath.toString() + "\" does not exist. Creating:");
+                     mkdirs(parent);
+                 }
+         }
+
+      // Step 3: open the file
+      int fh = ceph_open_for_overwrite(absfilepath.toString());
+      if (fh < 0) {
+         throw new IOException("createRaw: Open for overwrite failed on path \"" + 
+                               absfilepath.toString() + "\"");
+      }
+      
+      // Step 4: create the stream
+      CephOutputStream result = new CephOutputStream(getConf(), clientPointer, fh);
+      //System.out.println("createRaw: opened absolute path \""  + absfilepath.toString() 
+      //                + "\" for writing with fh " + fh);
+
+      return result;
+  }
+
+  @Override
+  public FSInputStream openRaw(Path path) throws IOException {
+      Path abs_path = makeAbsolute(path);
+
+      //System.out.println("openRaw: opening path \"" + path.toString() + "\" as absolute path \""
+      //                + abs_path.toString() + "\" for reading");
+
+      if(!isFile(abs_path))
+         {
+             String error;
+             if (!exists(abs_path))
+             throw new IOException("openRaw:  absolute path \""  + abs_path.toString()
+                                   + "\" does not exist");
+             else
+             throw new IOException("openRaw:  absolute path \""  + abs_path.toString()
+                                   + "\" is not a file");
+         }
+
+      int fh = ceph_open_for_read(abs_path.toString());
+      if (fh < 0) {
+         throw new IOException("openRaw: Failed to open file " + abs_path.toString());
+      }
+
+      long size = ceph_getfilesize(abs_path.toString());
+      if (size < 0) {
+         throw new IOException("Failed to get file size for file " + abs_path.toString() + 
+                           " but succeeded in opening file. Something excitingly bizarre is going on.");
+      }
+
+      FSInputStream result =  new CephInputStream(getConf(), clientPointer, fh, size);
+
+      //System.out.println("openRaw: opened absolute path \""  + abs_path.toString() + 
+      //                "\" for reading with fd " + fh + " and size " + size);
+
+      return result;
+  }
+
+  @Override
+  public boolean renameRaw(Path src, Path dst) throws IOException {
+    // TODO: Check corner cases: dst already exists,
+    // or if path is directory with children
+
+    return ceph_rename(src.toString(), dst.toString());
+  }
+
+  @Override
+  public boolean deleteRaw(Path path) throws IOException {
+
+      Path abs_path = makeAbsolute(path);      
+
+      //System.out.println("deleteRaw: Deleting path " +
+      //                abs_path.toString());
+      if (abs_path.toString().equals("/"))
+                    throw new IOException("Error: attempting to delete the root directory");
+
+      // if it's a file, try to delete it.
+     if (isFile(abs_path)) {            
+       boolean result =  ceph_unlink(path.toString());
+       if(!result) {
+           // System.out.println("deleteRaw: failed to delete file \"" +
+           //         abs_path.toString() + "\".");
+           return false;
+       }
+       else
+           return true;
+     }
+       
+     // if it's a directory, recursively try to delete its contents,
+     // and then delete the directory.
+     // Otherwise, return false.
+
+      Path[] contents = listPathsRaw(path);
+      if (contents == null) {
+         // System.out.println("deleteRaw: Failed to read contents of directory \"" +
+         //         abs_path.toString() + "\" while trying to delete it");
+         return false;
+      }
+
+      // debug listing of directory contents
+      //System.out.println("deleteRaw: Enumerating contents of directory \"" +
+      //                abs_path.toString() + "\" to delete");
+
+      //      for (Path p : contents)  {
+      //  System.out.println("    Contains path \"" + p.toString() + "\", absolute path is \""
+      //                    + makeAbsolute(p).toString() + "\""); 
+      //}
+
+      // recursively delete the directory's contents
+      Path parent = abs_path.getParent();
+      for (Path p : contents) {
+         // skip self ("." directory entry)
+         if (makeAbsolute(p).equals(abs_path))  {
+             //System.out.println("Skipping self");
+             continue;
+         }
+         // skip parent (".." directory entry)
+         if (null != parent) {
+             if (p.equals(parent))  {
+                 //System.out.println("Skipping parent");
+                 continue;
+             }
+         }
+
+         //System.out.println("Path " + abs_path.toString() +  " contains path " 
+         //         + makeAbsolute(p).toString() +
+         //         ". Attempting to delete it recursively:");
+         if (!deleteRaw(p)) {
+             // System.out.println("deleteRaw: Failed to delete file \"" + 
+             //                 p.toString() + "\" while recursively deleting \""
+             //                 + abs_path.toString() + "\"" );
+             return false;
+         }
+      }
+  
+      boolean result =  ceph_unlink(path.toString());
+      if (!result)
+         System.out.println("deleteRaw: failed to delete \"" + abs_path.toString() + "\"");
+      return result;
+  }
+   
+
+  @Override
+  public long getLength(Path path) throws IOException {
+      Path abs_path = makeAbsolute(path);
+
+      if (!exists(abs_path)) {
+         throw new IOException("org.apache.hadoop.fs.ceph.CephFileSystem.getLength: File or directory " + abs_path.toString() + " does not exist.");
+      }          
+
+      long filesize = ceph_getfilesize(abs_path.toString());
+      if (filesize < 0) {
+         throw new IOException("org.apache.hadoop.fs.ceph.CephFileSystem.getLength: Size of file or directory " + abs_path.toString() + " could not be retrieved.");
+      }          
+      return filesize;
+
+    }
+
+  /**
+   * User-defined replication is not supported for Ceph file systems at the moment.
+   */
+  @Override
+  public short getReplication(Path path) throws IOException {
+    return 1;
+  }
+
+  @Override
+  public short getDefaultReplication() {
+    return 1;
+  }
+
+  /**
+   * User-defined replication is not supported for Ceph file systems at the moment.
+   */
+  @Override
+  public boolean setReplicationRaw(Path path, short replication)
+      throws IOException {
+    return true;
+  }
+
+  @Override
+  public long getBlockSize(Path path) throws IOException {
+   
+    if (!exists(path)) {
+      throw new IOException("org.apache.hadoop.fs.ceph.CephFileSystem.getBlockSize: File or directory " + path.toString() + " does not exist.");
+    }
+    long result = ceph_getblocksize(path.toString());
+    if (!isFile(path)) {
+      throw new IOException("org.apache.hadoop.fs.ceph.CephFileSystem.getBlockSize: File or directory " + path.toString() + " is not a file.");
+    }
+    else {
+       System.err.println("DEBUG: getBlockSize: alleged file really is a file");
+    }
+    if (result < 4096) {
+       System.err.println("org.apache.hadoop.fs.ceph.CephFileSystem.getBlockSize: " + 
+                          "path exists; strange block size of " + result + " defaulting to 8192");
+       return 8192;
+    }
+
+    
+    return result;
+    //return DEFAULT_BLOCK_SIZE;
+    //  return ceph_getblocksize(path.toString());
+
+  }
+
+  @Override
+  public long getDefaultBlockSize() {
+      return DEFAULT_BLOCK_SIZE;
+      //return getConf().getLong("fs.ceph.block.size", DEFAULT_BLOCK_SIZE);
+    }
+
+  /**
+   * Return 1x1 'localhost' cell if the file exists. Return null if otherwise.
+   */
+  @Override
+  public String[][] getFileCacheHints(Path f, long start, long len)
+      throws IOException {
+    // TODO: Check this is the correct behavior
+    if (!exists(f)) {
+      return null;
+    }
+    return new String[][] { { "localhost" } };
+  }
+
+  @Override
+  public void lock(Path path, boolean shared) throws IOException {
+    // TODO: Design and implement? or just ignore locking?
+      return;
+  }
+
+  @Override
+  public void release(Path path) throws IOException {
+      return; //deprecated
+  }
+
+  @Override
+  public void reportChecksumFailure(Path f, 
+                                    FSInputStream in, long inPos, 
+                                    FSInputStream sums, long sumsPos) {
+    // TODO: What to do here?
+      return;
+  }
+
+  @Override
+  public void moveFromLocalFile(Path src, Path dst) throws IOException {
+      if (!ceph_copyFromLocalFile(src.toString(), dst.toString())) {
+         throw new IOException("org.apache.hadoop.fs.ceph.CephFileSystem.moveFromLocalFile: failed moving from local file " + src.toString() + " to Ceph file " + dst.toString());
+      }
+      //FileUtil.copy(localFs, src, this, dst, true, getConf());
+  }
+
+  @Override
+  public void copyFromLocalFile(Path src, Path dst) throws IOException {
+      // make sure Ceph path exists
+      Path abs_src = makeAbsolute(src);
+      Path abs_dst = makeAbsolute(dst);
+
+      if (isDirectory(abs_dst))
+             throw new IOException("Error in copyFromLocalFile: " +
+                                   "attempting to open an existing directory as a file");
+      Path abs_dst_parent = abs_dst.getParent();
+
+      if (!exists(abs_dst_parent))
+         mkdirs(abs_dst_parent);
+
+      if (!ceph_copyFromLocalFile(abs_src.toString(), abs_dst.toString())) {
+         throw new IOException("org.apache.hadoop.fs.ceph.CephFileSystem.copyFromLocalFile: failed copying from local file " + abs_src.toString() + " to Ceph file " + abs_dst.toString());
+      }
+      //FileUtil.copy(localFs, src, this, dst, false, true, getConf());
+  }
+
+
+
+  @Override
+  public void copyToLocalFile(Path ceph_src, Path local_dst, boolean copyCrc) throws IOException {
+
+      Path abs_ceph_src = makeAbsolute(ceph_src);
+      
+      //System.out.println("CopyToLocalFile: copying Ceph file \"" + abs_ceph_src.toString() + 
+      //                "\" to local file \"" + local_dst.toString() + "\" using client "
+      //                + clientPointer);
+
+      // make sure the alleged source file exists, and is actually a file, not
+      // a directory or a ballpoint pen or something
+      if (!isFile(abs_ceph_src)) {
+         if (!exists(abs_ceph_src)) {
+             throw new IOException("copyToLocalFile:  failed copying Ceph file \"" + 
+                                   abs_ceph_src.toString() + "\" to local file \"" 
+                                   + local_dst.toString() + 
+                                   "\" because the source file does not exist");
+         }
+         else {
+             throw new IOException("copyToLocalFile:  failed copying Ceph file \"" + 
+                                   abs_ceph_src.toString() + "\" to local file \"" + 
+                                   local_dst.toString() + 
+                                   "\" because the Ceph path is not a  file");
+         }
+      }
+
+      // if the destination's parent directory doesn't exist, create it.
+      Path local_dst_parent_dir = local_dst.getParent();
+      if(null == local_dst_parent_dir)
+         throw new IOException("copyToLocalFile:  failed copying Ceph file \"" + 
+                               abs_ceph_src.toString() + "\" to local file \"" + 
+                               local_dst.toString() + 
+                               "\": destination is root");
+
+      if(!localFs.mkdirs(local_dst_parent_dir))
+             throw new IOException("copyToLocalFile:  failed copying Ceph file \"" + 
+                                   abs_ceph_src.toString() + "\" to local file \"" + 
+                                   local_dst.toString() + 
+                                   "\": creating the destination's parent directory failed.");
+      else
+         {
+             if (!ceph_copyToLocalFile(abs_ceph_src.toString(), local_dst.toString())) 
+                 {
+                     throw new IOException("copyToLocalFile:  failed copying Ceph file \"" + 
+                                           abs_ceph_src.toString() + "\" to local file \"" 
+                                           + local_dst.toString() + "\"");
+                 }
+         }
+      //System.out.println("CopyToLocalFile: copied Ceph file \"" + abs_ceph_src.toString() + 
+      //                "\" to local file \"" + local_dst.toString() + "\"");
+  }
+
+
+  @Override
+  public Path startLocalOutput(Path fsOutputFile, Path tmpLocalFile)
+      throws IOException {
+    return tmpLocalFile;
+  }
+
+  @Override
+  public void completeLocalOutput(Path fsOutputFile, Path tmpLocalFile)
+      throws IOException {
+    moveFromLocalFile(tmpLocalFile, fsOutputFile);
+  }
+
+  // diagnostic methods
+
+    /*  void dump() throws IOException {
+    store.dump();
+  }
+
+  void purge() throws IOException {
+    store.purge();
+    }*/
+
+}
diff --git a/src/client/hadoop/ceph/CephFileSystem.java.tmp b/src/client/hadoop/ceph/CephFileSystem.java.tmp
new file mode 100644 (file)
index 0000000..f842e38
--- /dev/null
@@ -0,0 +1,59 @@
+
+
+
+
+package org.apache.hadoop.fs.ceph;
+
+import java.io.IOException;
+import java.net.URI;
+
+
+/**
+ * <p>
+ * A {@link FileSystem} backed by a <a href="http://ceph.sourceforge.net">Ceph</a> store.
+ * </p>
+ * <p>
+ * NOTE(review): this looks like an unfinished scratch copy. It uses
+ * Path, Configuration, localFs, clientPointer, ceph_initializeClient and
+ * ceph_setcwd without importing or declaring them in this file.
+ * </p>
+ */
+// NOTE(review): the superclass is spelled "Filesystem" here while the Javadoc
+// link above says "FileSystem"; likely a typo, confirm.
+public class CephFileSystem extends Filesystem { 
+
+
+    // Initial value is /user/<user.name>; not read anywhere in this file.
+    private Path workingDir = new Path("/user", System.getProperty("user.name"));
+
+    // Assigned in initialize(); returned by getURI().
+    private URI uri;
+
+
+    /** Creates an instance; nothing is set up until initialize() runs. */
+    public CephFileSystem() {
+       // perform all setup in initialize()
+    }
+
+    /**
+     * @return the URI assigned in initialize()
+     */
+    // NOTE(review): the sibling sources name this accessor getUri(); with
+    // @Override the spelling must match the superclass method. Confirm.
+    @Override
+    public URI getURI() {
+       return uri;
+    }
+
+  /**
+   * Stores the configuration, records a URI made of the scheme and
+   * authority of the argument, obtains the local file system, creates the
+   * native client and sets its working directory to "/".
+   *
+   * @param uri  URI this file system was requested with
+   * @param conf configuration to store
+   * @throws IOException declared in the signature
+   */
+  @Override
+  public void initialize(URI uri, Configuration conf) throws IOException {
+    setConf(conf);
+       
+    this.uri = URI.create(uri.getScheme() + "://" + uri.getAuthority());    
+
+    // TODO: local filesystem? we really need to figure out this conf thingy
+    this.localFs = get(URI.create("file:///"), conf);
+    
+    // Create the native client and keep its handle.
+    this.clientPointer = ceph_initializeClient();
+    System.out.println("Initialized client with pointer " + clientPointer 
+                      + ". Setting cwd to /");
+    ceph_setcwd("/");
+  }
+
+
+
+  /**
+   * Prints a message and does nothing else; the client is not shut down.
+   *
+   * @throws IOException declared in the signature; not thrown by this body
+   */
+  @Override
+      public void close() throws IOException {
+      System.out.println("Pretending to shut down client with pointer " + clientPointer
+                        + ". Not really doing anything.");
+  }
+
+
+}
+
diff --git a/src/client/hadoop/ceph/CephInputStream.java b/src/client/hadoop/ceph/CephInputStream.java
new file mode 100644 (file)
index 0000000..e3b4f1f
--- /dev/null
@@ -0,0 +1,189 @@
+package org.apache.hadoop.fs.ceph;
+
+import java.io.BufferedOutputStream;
+import java.io.DataInputStream;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+//import java.lang.IndexOutOfBoundsException;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSInputStream;
+
+class CephInputStream extends FSInputStream {
+
+  static {
+    System.loadLibrary("hadoopcephfs");
+  }
+
+  private int bufferSize;
+
+    //private Block[] blocks;
+
+  private boolean closed;
+
+  private long clientPointer;
+
+  private int fileHandle;
+
+  private long fileLength;
+
+    //private long pos = 0;
+
+    //private DataInputStream blockStream;
+
+    //private long blockEnd = -1;
+
+  private native int ceph_read(long client, int fh, byte[] buffer, int buffer_offset, int length);
+  private native long ceph_seek_from_start(long client, int fh, long pos);
+  private native long ceph_getpos(long client, int fh);
+  private native int ceph_close(long client, int fh);
+
+  private int ceph_read(byte[] buffer, int buffer_offset, int length)
+    { return ceph_read(clientPointer, fileHandle, buffer, buffer_offset, length); }
+  private long ceph_seek_from_start(long pos) { return ceph_seek_from_start(clientPointer, fileHandle, pos); }
+  private long ceph_getpos() { return ceph_getpos(clientPointer, fileHandle); }
+  private int ceph_close() { return ceph_close(clientPointer, fileHandle); }
+    
+    /*
+  public S3InputStream(Configuration conf, FileSystemStore store,
+      INode inode) {
+    
+    this.store = store;
+    this.blocks = inode.getBlocks();
+    for (Block block : blocks) {
+      this.fileLength += block.getLength();
+    }
+    this.bufferSize = conf.getInt("io.file.buffer.size", 4096);    
+  }
+    */
+
+  public CephInputStream(Configuration conf, long clientp, int fh, long flength) {
+
+      // Whoever's calling the constructor is responsible for doing the actual ceph_open
+      // call and providing the file handle.
+      clientPointer = clientp;
+      fileLength = flength;
+      fileHandle = fh;
+      //System.out.println("CephInputStream constructor: initializing stream with fh "
+      //                + fh + " and file length " + flength);
+      
+      // TODO: Then what do we need from the config? The buffer size maybe?
+      // Anything? Bueller?
+
+  }
+
+  @Override
+  public synchronized long getPos() throws IOException {
+    return ceph_getpos();
+  }
+
+  @Override
+  public synchronized int available() throws IOException {
+    return (int) (fileLength - getPos());
+  }
+
+  @Override
+  public synchronized void seek(long targetPos) throws IOException {
+      //System.out.println("CephInputStream.seek: Seeking to position " + targetPos +
+      //                " on fd " + fileHandle);
+    if (targetPos > fileLength) {
+      throw new IOException("CephInputStream.seek: failed seeking to position " + targetPos +
+                        " on fd " + fileHandle + ": Cannot seek after EOF " + fileLength);
+    }
+    ceph_seek_from_start(targetPos);
+  }
+
+    
+  // reads a byte
+  @Override
+  public synchronized int read() throws IOException {
+      //System.out.println("CephInputStream.read: Reading a single byte from fd " + fileHandle
+      //                + " by calling general read function");
+
+      byte result[] = new byte[1];
+      if (getPos() >= fileLength) return -1;
+      if (-1 == read(result, 0, 1)) return -1;
+      return result[0];
+  }
+
+
+  @Override
+  public synchronized int read(byte buf[], int off, int len) throws IOException {
+      //System.out.println("CephInputStream.read: Reading " + len  + " bytes from fd " + fileHandle);
+      
+    if (closed) {
+      throw new IOException("CephInputStream.read: cannot read " + len  + 
+                           " bytes from fd " + fileHandle + ": stream closed");
+    }
+    if (null == buf) {
+       throw new NullPointerException("Read buffer is null");
+    }
+
+    // check for proper index bounds
+    if((off < 0) || (len < 0) || (off + len > buf.length)) {
+       throw new IndexOutOfBoundsException("CephInputStream.read: Indices out of bounds for read: "
+                                           + "read length is " + len + ", buffer offset is " 
+                                           + off +", and buffer size is " + buf.length);
+       }
+
+    // ensure we're not past the end of the file
+    if (getPos() >= fileLength) 
+       {
+           System.out.println("CephInputStream.read: cannot read " + len  + 
+                                 " bytes from fd " + fileHandle + ": current position is " +
+                                 getPos() + " and file length is " + fileLength);
+           
+           return -1;
+       }
+    // actually do the read
+    int result = ceph_read(buf, off, len);
+    if (result < 0)
+      System.out.println("CephInputStream.read: Reading " + len  + " bytes from fd " 
+                        + fileHandle + " failed.");
+    else {}
+    //      System.out.println("CephInputStream.read: Reading " + len  + " bytes from fd " 
+    //          + fileHandle + ": succeeded in reading " + result + " bytes");
+
+
+
+    return result;
+  }
+
+
+  @Override
+  public void close() throws IOException {
+    if (closed) {
+      throw new IOException("Stream closed");
+    }
+
+    int result = ceph_close();
+    if (result != 0) {
+       throw new IOException("Close failed!");
+    }
+       
+    closed = true;
+  }
+
+  /**
+   * We don't support marks.
+   */
+  @Override
+  public boolean markSupported() {
+    return false;
+  }
+
+  @Override
+  public void mark(int readLimit) {
+    // Do nothing
+  }
+
+  @Override
+  public void reset() throws IOException {
+    throw new IOException("Mark not supported");
+  }
+
+}
diff --git a/src/client/hadoop/ceph/CephOutputStream.java b/src/client/hadoop/ceph/CephOutputStream.java
new file mode 100644 (file)
index 0000000..9fac5a4
--- /dev/null
@@ -0,0 +1,201 @@
+package org.apache.hadoop.fs.ceph;
+
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Random;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.util.Progressable;
+
+class CephOutputStream extends FSDataOutputStream {
+
+  static {
+    System.loadLibrary("hadoopcephfs");
+  }
+
+
+  private int bufferSize;
+
+  private long fileLength;
+
+    //private FileSystemStore store;
+
+  private Path path;
+
+  private long blockSize;
+
+  private File backupFile;
+
+  private OutputStream backupStream;
+
+  private Random r = new Random();
+
+  private boolean closed;
+
+  private int fileHandle;
+
+  private long clientPointer;
+
+  private int bytesWrittenToBlock = 0;
+
+  private byte[] outBuf;
+
+    //private List<Block> blocks = new ArrayList<Block>();
+
+    //private Block nextBlock;
+
+    
+
+  private long ceph_seek_from_start(long pos) {
+    return ceph_seek_from_start(clientPointer, fileHandle, pos);
+  }
+  private long ceph_getpos() {
+    return ceph_getpos(clientPointer, fileHandle);
+  }
+  private int ceph_close() { return ceph_close(clientPointer, fileHandle); }
+  private int ceph_write(byte[] buffer, int buffer_offset, int length)
+    { return ceph_write(clientPointer, fileHandle, buffer, buffer_offset, length); }
+
+
+  private native long ceph_seek_from_start(long client, int fh, long pos);
+  private native long ceph_getpos(long client, int fh);
+  private native int ceph_close(long client, int fh);
+  private native int ceph_write(long client, int fh, byte[] buffer, int buffer_offset, int length);
+
+
+    /*  public CephOutputStream(Configuration conf, FileSystemStore store,
+      Path path, long blockSize, Progressable progress) throws IOException {
+    
+    // basic pseudocode:
+    // call ceph_open_for_write to open the file
+    // store the file handle
+    // store the client pointer
+    // look up and store the block size while we're at it
+    // the following code's old. kill it
+
+    this.store = store;
+    this.path = path;
+    this.blockSize = blockSize;
+    this.backupFile = newBackupFile();
+    this.backupStream = new FileOutputStream(backupFile);
+    this.bufferSize = conf.getInt("io.file.buffer.size", 4096);
+    this.outBuf = new byte[bufferSize];
+
+    }*/
+
+
+    // The file handle 
+  public CephOutputStream(Configuration conf, long clientp, int fh) {
+      clientPointer = clientp;
+      fileHandle = fh;
+      //fileLength = flength;
+      closed = false;
+  }
+
+  // possibly useful for the local copy, write later thing?
+  // keep it around for now
+  private File newBackupFile() throws IOException {
+    File result = File.createTempFile("s3fs-out", "");
+    result.deleteOnExit();
+    return result;
+  } 
+
+
+  @Override
+  public long getPos() throws IOException {
+    // change to get the position from Ceph client
+      return ceph_getpos();
+  }
+
+    // writes a byte
+  @Override
+  public synchronized void write(int b) throws IOException {
+      //System.out.println("CephOutputStream.write: writing a single byte to fd " + fileHandle);
+
+    if (closed) {
+      throw new IOException("CephOutputStream.write: cannot write " + 
+                           "a byte to fd " + fileHandle + ": stream closed");
+    }
+    // Stick the byte in a buffer and write it
+    byte buf[] = new byte[1];
+    buf[0] = (byte) b;    
+    int result = ceph_write(buf, 0, 1);
+    if (1 != result)
+       System.out.println("CephOutputStream.write: failed writing a single byte to fd "
+                          + fileHandle + ": Ceph write() result = " + result);
+    return;
+  }
+
+  @Override
+  public synchronized void write(byte buf[], int off, int len) throws IOException {
+      //System.out.println("CephOutputStream.write: writing " + len + 
+      //                " bytes to fd " + fileHandle);
+
+      // make sure stream is open
+      if (closed) {
+         throw new IOException("CephOutputStream.write: cannot write " + len + 
+                               "bytes to fd " + fileHandle + ": stream closed");
+      }
+
+      // sanity check
+      if (null == buf) {
+         throw new NullPointerException("CephOutputStream.write: cannot write " + len + 
+                                        "bytes to fd " + fileHandle + ": write buffer is null");
+      }
+
+    // check for proper index bounds
+    if((off < 0) || (len < 0) || (off + len > buf.length)) {
+       throw new IndexOutOfBoundsException("CephOutputStream.write: Indices out of bounds for write: "
+                                           + "write length is " + len + ", buffer offset is " 
+                                           + off +", and buffer size is " + buf.length);
+       }
+
+    // write!
+    int result = ceph_write(buf, off, len);
+    if (result < 0) {
+       throw new IOException("CephOutputStream.write: Write of " + len + 
+                               "bytes to fd " + fileHandle + " failed");
+    }
+    if (result != len) {
+       throw new IOException("CephOutputStream.write: Write of " + len + 
+                               "bytes to fd " + fileHandle + "was incomplete:  only "
+                             + result + " of " + len + " bytes were written.");
+    }
+    return; 
+  }
+   
+  @Override
+  public synchronized void flush() throws IOException {
+    if (closed) {
+      throw new IOException("Stream closed");
+    }
+    return;
+  }
+    
+  @Override
+  public synchronized void close() throws IOException {
+    if (closed) {
+      throw new IOException("Stream closed");
+    }
+
+    int result = ceph_close();
+    if (result != 0) {
+       throw new IOException("Close failed!");
+    }
+       
+    closed = true;
+
+  }
+    
+
+}
+
+