]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
Hadoop: Added in initialization checks and
authorGreg Farnum <gregf@hq.newdream.net>
Fri, 7 Aug 2009 04:49:50 +0000 (21:49 -0700)
committerGreg Farnum <gregf@hq.newdream.net>
Fri, 7 Aug 2009 22:05:43 +0000 (15:05 -0700)
limited use of Progressables.

src/client/hadoop/ceph/CephFileSystem.java

index 17b7614937762da3bc640f5ff6220c8f770713ff..5c5aa63f9019bea2d0e9c37860abaabb2fdfa6c9 100644 (file)
@@ -41,6 +41,7 @@ public class CephFileSystem extends FileSystem {
   private FileSystem localFs;
   private Path root;
   private Path parent;
+  private boolean initialized = false;
 
   private static boolean debug;
   private static String cephDebugLevel;
@@ -86,6 +87,7 @@ public class CephFileSystem extends FileSystem {
     } */
 
   public URI getUri() {
+    if (!initialized) return null;
     debug("getUri:enter");
     debug("getUri:exit with return " + uri);
     return uri;
@@ -112,6 +114,7 @@ public class CephFileSystem extends FileSystem {
     if (!ceph_initializeClient(cephDebugLevel, monAddr)) {
       throw new IOException("Ceph initialization failed!");
     }
+    initialized = true;
     debug("Initialized client. Setting cwd to /");
     ceph_setcwd("/");
     debug("initialize:exit");
@@ -119,6 +122,7 @@ public class CephFileSystem extends FileSystem {
 
   @Override
     public void close() throws IOException {
+    if (!initialized) throw new IOException ("close:You have to initialize the CephFileSystem before calling other methods.");
     debug("close:enter");
     super.close();//this method does stuff, make sure it's run!
     System.gc(); //to run the finalizers on CephInput/OutputStreams
@@ -129,9 +133,12 @@ public class CephFileSystem extends FileSystem {
 
   public FSDataOutputStream append (Path file, int bufferSize,
                                    Progressable progress) throws IOException {
+    if (!initialized) throw new IOException ("append:You have to initialize the CephFileSystem before calling other methods.");
     debug("append:enter with path " + file + " bufferSize " + bufferSize);
     Path abs_path = makeAbsolute(file);
+    if (progress!=null) progress.progress();
     int fd = ceph_open_for_append(abs_path.toString());
+    if (progress!=null) progress.progress();
     if( fd < 0 ) { //error in open
       throw new IOException("append: Open for append failed on path \"" +
                            abs_path.toString() + "\"");
@@ -143,20 +150,23 @@ public class CephFileSystem extends FileSystem {
 
   @Deprecated
     public String getName() {
+    if (!initialized) return null;
     debug("getName:enter");
     debug("getName:exit with value " + getUri().toString());
     return getUri().toString();
   }
 
   public Path getWorkingDirectory() {
+    if (!initialized) return null;
     debug("getWorkingDirectory:enter");
     debug("Working directory is " + ceph_getcwd());
     debug("getWorkingDirectory:exit");
-    return new Path(fs_default_name + makeAbsolute(new Path(ceph_getcwd())));
+    return new Path(fs_default_name + ceph_getcwd());
   }
 
   @Override
     public void setWorkingDirectory(Path dir) {
+    if (!initialized) return;
     debug("setWorkingDirecty:enter with new working dir " + dir);
     Path abs_path = makeAbsolute(dir);
 
@@ -196,6 +206,7 @@ public class CephFileSystem extends FileSystem {
      * that makes more calls than we need, so override.
      */
     public boolean exists(Path path) throws IOException {
+    if (!initialized) throw new IOException ("You have to initialize the CephFileSystem before calling other methods.");
     debug("exists:enter with path " + path);
     boolean result;
     Path abs_path = makeAbsolute(path);
@@ -213,6 +224,7 @@ public class CephFileSystem extends FileSystem {
 
   /* Creates the directory and all nonexistent parents.   */
   public boolean mkdirs(Path path, FsPermission perms) throws IOException {
+    if (!initialized) throw new IOException ("You have to initialize the CephFileSystem before calling other methods.");
     debug("mkdirs:enter with path " + path);
     Path abs_path = makeAbsolute(path);
     debug("calling ceph_mkdirs from Java");
@@ -229,6 +241,7 @@ public class CephFileSystem extends FileSystem {
    */
   @Override
     public boolean isFile(Path path) throws IOException {
+    if (!initialized) throw new IOException ("You have to initialize the CephFileSystem before calling other methods.");
     debug("isFile:enter with path " + path);
     Path abs_path = makeAbsolute(path);
     boolean result;
@@ -242,7 +255,29 @@ public class CephFileSystem extends FileSystem {
     return result;
   }
 
+  /*
+   * And again, a bit faster than using the default, which does a full stat.
+   */
+  @Override
+  public boolean isDirectory(Path path) throws IOException {
+    if (!initialized) throw new IOException ("You have to initialize the CephFileSystem before calling other methods.");
+    debug("isDirectory:enter with path " + path);
+    Path abs_path = makeAbsolute(path);
+    boolean result;
+    if (abs_path.toString().equals("/")) {
+      result = true;
+    }
+    else {
+      debug("calling ceph_isdirectory from Java");
+      result = ceph_isdirectory(abs_path.toString());
+      debug("Returned from ceph_isdirectory to Java");
+    }
+    debug("isDirectory:exit with result " + result);
+    return result;
+  }
+
   public FileStatus getFileStatus(Path p) throws IOException {
+    if (!initialized) throw new IOException ("You have to initialize the CephFileSystem before calling other methods.");
     debug("getFileStatus:enter with path " + p);
     Path abs_p = makeAbsolute(p);
     //sadly, Ceph doesn't really do uids/gids just yet, but
@@ -250,7 +285,6 @@ public class CephFileSystem extends FileSystem {
     FileStatus status;
     Stat lstat = new Stat();
     if(ceph_stat(abs_p.toString(), lstat)) {
-      debug("getFileStatus: mod_time is " + lstat.mod_time);
       status = new FileStatus(lstat.size, lstat.is_dir,
                              ceph_replication(abs_p.toString()),
                              lstat.block_size,
@@ -273,6 +307,7 @@ public class CephFileSystem extends FileSystem {
 
   // array of statuses for the directory's contents
   public FileStatus[] listStatus(Path p) throws IOException {
+    if (!initialized) throw new IOException ("You have to initialize the CephFileSystem before calling other methods.");
     debug("listStatus:enter with path " + p);
     Path abs_p = makeAbsolute(p);
     Path[] paths = listPaths(abs_p);
@@ -286,6 +321,7 @@ public class CephFileSystem extends FileSystem {
 
   @Override
     public void setPermission(Path p, FsPermission permission) throws IOException {
+    if (!initialized) throw new IOException ("You have to initialize the CephFileSystem before calling other methods.");
     Path abs_path = makeAbsolute(p);
     ceph_setPermission(abs_path.toString(), permission.toShort());
   }
@@ -303,11 +339,12 @@ public class CephFileSystem extends FileSystem {
                                   long blockSize,
                                   Progressable progress
                                   ) throws IOException {
-       
+    if (!initialized) throw new IOException ("You have to initialize the CephFileSystem before calling other methods.");
     debug("create:enter with path " + f);
     Path abs_path = makeAbsolute(f);
-      
-    // We ignore progress reporting and replication.
+    if (progress!=null) progress.progress();
+    // We ignore replication since that's not configurable here, and
+    // progress reporting is quite limited.
     // Required semantics: if the file exists, overwrite if CreateFlag.OVERWRITE;
     // throw an exception if !CreateFlag.OVERWRITE.
 
@@ -315,6 +352,7 @@ public class CephFileSystem extends FileSystem {
     if(isDirectory(abs_path))
       throw new IOException("create: Cannot overwrite existing directory \""
                            + abs_path.toString() + "\" with a file");      
+    if (progress!=null) progress.progress();
     //if (!flag.contains(CreateFlag.OVERWRITE)) {
     if (!overwrite) {
       if (exists(abs_path)) {
@@ -331,10 +369,11 @@ public class CephFileSystem extends FileSystem {
        mkdirs(parent);
       }
     }
-
+    if (progress!=null) progress.progress();
     // Step 3: open the file
     debug("calling ceph_open_for_overwrite from Java");
     int fh = ceph_open_for_overwrite(abs_path.toString(), (int)permission.toShort());
+    if (progress!=null) progress.progress();
     debug("Returned from ceph_open_for_overwrite to Java with fh " + fh);
     if (fh < 0) {
       throw new IOException("create: Open for overwrite failed on path \"" + 
@@ -343,15 +382,13 @@ public class CephFileSystem extends FileSystem {
       
     // Step 4: create the stream
     OutputStream cephOStream = new CephOutputStream(getConf(), fh);
-    //debug("createRaw: opened absolute path \""  + absfilepath.toString() 
-    //          + "\" for writing with fh " + fh);
-
     debug("create:exit");
     return new FSDataOutputStream(cephOStream);
   }
 
   // Opens a Ceph file and attaches the file handle to an FSDataInputStream.
   public FSDataInputStream open(Path path, int bufferSize) throws IOException {
+    if (!initialized) throw new IOException ("You have to initialize the CephFileSystem before calling other methods.");
     debug("open:enter with path " + path);
     Path abs_path = makeAbsolute(path);
     
@@ -380,6 +417,7 @@ public class CephFileSystem extends FileSystem {
 
   @Override
     public boolean rename(Path src, Path dst) throws IOException {
+    if (!initialized) throw new IOException ("You have to initialize the CephFileSystem before calling other methods.");
     // TODO: Check corner cases: dst already exists,
     // or path is directory with children
     debug("rename:enter");
@@ -399,6 +437,7 @@ public class CephFileSystem extends FileSystem {
   @Override
     public BlockLocation[] getFileBlockLocations(FileStatus file,
                          long start, long len) throws IOException {
+    if (!initialized) throw new IOException ("You have to initialize the CephFileSystem before calling other methods.");
     //sanitize and get the filehandle
     Path abs_path = makeAbsolute(file.getPath());
     int fh = ceph_open_for_read(abs_path.toString());
@@ -424,6 +463,7 @@ public class CephFileSystem extends FileSystem {
   public boolean delete(Path path) throws IOException { return delete(path, true); };
 
   public boolean delete(Path path, boolean recursive) throws IOException {
+    if (!initialized) throw new IOException ("You have to initialize the CephFileSystem before calling other methods.");
     debug("delete:enter");
     Path abs_path = makeAbsolute(path);
     
@@ -484,10 +524,11 @@ public class CephFileSystem extends FileSystem {
   }
   
   /**
-   * You need to guarantee the path exists before calling this method, for now.
+   * 
    */
   @Deprecated
     public long getBlockSize(Path path) throws IOException {
+    if (!initialized) throw new IOException ("You have to initialize the CephFileSystem before calling other methods.");
     debug("getBlockSize:enter with path " + path);
     Path abs_path = makeAbsolute(path);
     if (!exists(abs_path)) {
@@ -540,26 +581,6 @@ public class CephFileSystem extends FileSystem {
     }
   }
  
-  /*
-   * This method is a bit faster than doing a full stat is.
-   */
-  @Override
-  public boolean isDirectory(Path path) throws IOException {
-    debug("isDirectory:enter with path " + path);
-    Path abs_path = makeAbsolute(path);
-    boolean result;
-    if (abs_path.toString().equals("/")) {
-      result = true;
-    }
-    else {
-      debug("calling ceph_isdirectory from Java");
-      result = ceph_isdirectory(abs_path.toString());
-      debug("Returned from ceph_isdirectory to Java");
-    }
-    debug("isDirectory:exit with result " + result);
-    return result;
-  }
-
   private long __getLength(Path path) throws IOException {
     debug("__getLength:enter with path " + path);
     Path abs_path = makeAbsolute(path);