]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
Hadoop: Continued cleanup work.
authorGreg Farnum <gregf@hq.newdream.net>
Wed, 22 Jul 2009 00:20:08 +0000 (17:20 -0700)
committerGreg Farnum <gregf@hq.newdream.net>
Wed, 22 Jul 2009 18:37:01 +0000 (11:37 -0700)
src/client/hadoop/ceph/CephFileSystem.java

index 95690164765f439270d396f0cc44f1a67d02e231..e4dee02e835a5e6af825f66f1faca333e1dedec1 100644 (file)
@@ -4,15 +4,18 @@ package org.apache.hadoop.fs.ceph;
 import java.io.IOException;
 import java.net.URI;
 import java.util.Set;
+import java.util.EnumSet;
 
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FSInputStream;
 import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.FileUtil;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.permission.FsPermission;
 import org.apache.hadoop.util.Progressable;
 import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.CreateFlag;
 
 /**
  * <p>
@@ -92,7 +95,7 @@ public class CephFileSystem extends FileSystem {
   private native boolean ceph_isfile            (long client, String path);
   private native String[]ceph_getdir            (long client, String path);
   private native int     ceph_mkdirs            (long client, String path, int mode);
-  private native int     ceph_open_for_append   (long client, String path, int mode);
+  private native int     ceph_open_for_append   (long client, String path);
   private native int     ceph_open_for_read     (long client, String path);
   private native int     ceph_open_for_overwrite(long client, String path, int mode);
   private native boolean ceph_kill_client       (long client);
@@ -142,14 +145,18 @@ public class CephFileSystem extends FileSystem {
                       + ". Not really doing anything.");
   }
 
-  public FsDataOutputStream append (Path file, int bufferSize,
+  public FSDataOutputStream append (Path file, int bufferSize,
                                    Progressable progress) throws IOException {
-    ceph_open_for_append(....);
-    return new CephDataOutstream(...);
+    Path abs_path = makeAbsolute(file);
+    int fd = ceph_open_for_append(abs_path.toString());
+    if( fd < 0 ) { //error in open
+      throw new IOException("append: Open for append failed on path \"" +
+                           abs_path.toString() + "\"");
+    }
+    return new CephOutputStream(getConf(), clientPointer, fd);
   }
 
-  @Override
-    public String getName() {
+  public String getName() {
     return getUri().toString();
   }
 
@@ -241,10 +248,12 @@ public class CephFileSystem extends FileSystem {
   /* Creates the directory and all nonexistent parents.   */
   public boolean mkdirs(Path path, FsPermission perms) throws IOException {
     Path abs_path = makeAbsolute(path);
-    result = ceph_mkdirs(abs_path.toString(), (int)perms.toShort());
+    int result = ceph_mkdirs(abs_path.toString(), (int)perms.toShort());
     /*System.out.println("mkdirs: attempted to make directory "
       + abs_path.toString() +  ": result is " + result); */
-    return result;
+    if (result != 0)
+      return true;
+    else return false;
   }
 
 
@@ -299,16 +308,15 @@ public class CephFileSystem extends FileSystem {
     public FileStatus[] listStatus(Path p) throws IOException {
     Path abs_p = makeAbsolute(p);
     Path[] paths = listPathsRaw(abs_p);
-    FileStatus[] statuses = new FileStatus[paths.size()];
-    for (Path path : paths) {
-      statuses[i] = getFileStatus(path);
+    FileStatus[] statuses = new FileStatus[paths.length];
+    for (int i = 0; i < paths.length; ++i) {
+      statuses[i] = getFileStatus(paths[i]);
     }
     return statuses;
   }
 
 
-  @Override
-    public Path[] listPathsRaw(Path path) throws IOException {
+  public Path[] listPathsRaw(Path path) throws IOException {
 
     String dirlist[];
 
@@ -352,7 +360,7 @@ public class CephFileSystem extends FileSystem {
 
   public FSDataOutputStream create(Path f,
                                   FsPermission permission,
-                                  boolean overwrite,
+                                  EnumSet<CreateFlag> flag,
                                   int bufferSize,
                                   short replication,
                                   long blockSize,
@@ -360,26 +368,26 @@ public class CephFileSystem extends FileSystem {
                                   ) throws IOException {
        
 
-    Path absfilepath = makeAbsolute(file);
+    Path abs_path = makeAbsolute(f);
       
     // We ignore progress reporting and replication.
     // Required semantics: if the file exists, overwrite if overwrite == true, and
     // throw an exception if overwrite == false.
 
     // Step 1: existence test
-    if(isDirectory(absfilepath))
+    if(isDirectory(abs_path))
       throw new IOException("create: Cannot overwrite existing directory \""
-                           + absfilepath.toString() + "\" with a file");      
-    if (!overwrite) {
-      if (exists(absfilepath)) {
+                           + abs_path.toString() + "\" with a file");      
+    if (!flag.contains(CreateFlag.OVERWRITE)) {
+      if (exists(abs_path)) {
        throw new IOException("createRaw: Cannot open existing file \"" 
-                             + absfilepath.toString() 
+                             + abs_path.toString() 
                              + "\" for writing without overwrite flag");
       }
     }
-      
+
     // Step 2: create any nonexistent directories in the path
-    Path parent =  absfilepath.getParent();
+    Path parent =  abs_path.getParent();
     if (parent != null) { // if parent is root, we're done
       if(!exists(parent)) {
        //System.out.println("createRaw: parent directory of path \""  
@@ -389,10 +397,10 @@ public class CephFileSystem extends FileSystem {
     }
 
     // Step 3: open the file
-    int fh = ceph_open_for_overwrite(absfilepath.toString(), int(permission.toShort()));
+    int fh = ceph_open_for_overwrite(abs_path.toString(), (int)permission.toShort());
     if (fh < 0) {
       throw new IOException("createRaw: Open for overwrite failed on path \"" + 
-                           absfilepath.toString() + "\"");
+                           abs_path.toString() + "\"");
     }
       
     // Step 4: create the stream
@@ -407,7 +415,7 @@ public class CephFileSystem extends FileSystem {
 
   // Opens a Ceph file and attaches the file handle to an FSDataInputStream.
   @Override
-    public FSDataInputStream open(Path path, int bufferSize) throws IOException {
+    public FSInputStream open(Path path, int bufferSize) throws IOException {
     Path abs_path = makeAbsolute(path);
 
     if(!isFile(abs_path)) {
@@ -428,7 +436,7 @@ public class CephFileSystem extends FileSystem {
       throw new IOException("Failed to get file size for file " + abs_path.toString() + 
                            " but succeeded in opening file. Something bizarre is going on.");
     }
-    FSDataInputStream result = new CephInputStream(getConf(), clientPointer, fh, size);
+    FSInputStream result = new CephInputStream(getConf(), clientPointer, fh, size);
     return result;
   }
 
@@ -462,7 +470,7 @@ public class CephFileSystem extends FileSystem {
     /* If the path is a directory, recursively try to delete its contents,
        and then delete the directory. */
     if (!recursive) {
-      throw IOException("Directories must be deleted recursively!");
+      throw new IOException("Directories must be deleted recursively!");
     }
     Path[] contents = listPathsRaw(path);
     if (contents == null) {
@@ -507,12 +515,11 @@ public class CephFileSystem extends FileSystem {
   /**
    * User-defined replication is not supported for Ceph file systems at the moment.
    */
-  @Override
+
     public short getReplication(Path path) throws IOException {
     return 1;
   }
 
-  @Override
     public short getDefaultReplication() {
     return 1;
   }
@@ -520,14 +527,12 @@ public class CephFileSystem extends FileSystem {
   /**
    * User-defined replication is not supported for Ceph file systems at the moment.
    */
-  @Override
-    public boolean setReplicationRaw(Path path, short replication)
+  public boolean setReplicationRaw(Path path, short replication)
     throws IOException {
     return true;
   }
 
-  @Override
-    public long getBlockSize(Path path) throws IOException {
+  public long getBlockSize(Path path) throws IOException {
    
     if (!exists(path)) {
       throw new IOException("org.apache.hadoop.fs.ceph.CephFileSystem.getBlockSize: File or directory " + path.toString() + " does not exist.");
@@ -561,8 +566,7 @@ public class CephFileSystem extends FileSystem {
   /**
    * Return 1x1 'localhost' cell if the file exists. Return null if otherwise.
    */
-  @Override
-    public String[][] getFileCacheHints(Path f, long start, long len)
+  public String[][] getFileCacheHints(Path f, long start, long len)
     throws IOException {
     // TODO: Check this is the correct behavior
     if (!exists(f)) {
@@ -571,14 +575,12 @@ public class CephFileSystem extends FileSystem {
     return new String[][] { { "localhost" } };
   }
 
-  @Override
-    public void lock(Path path, boolean shared) throws IOException {
+  public void lock(Path path, boolean shared) throws IOException {
     // TODO: Design and implement? or just ignore locking?
     return;
   }
 
-  @Override
-    public void release(Path path) throws IOException {
+  public void release(Path path) throws IOException {
     return; //deprecated
   }
 
@@ -621,8 +623,7 @@ public class CephFileSystem extends FileSystem {
 
 
 
-  @Override
-    public void copyToLocalFile(Path ceph_src, Path local_dst, boolean copyCrc) throws IOException {
+  public void copyToLocalFile(Path ceph_src, Path local_dst, boolean copyCrc) throws IOException {
 
     Path abs_ceph_src = makeAbsolute(ceph_src);