From f3db8c5c2fb995f66eee2f510bb293f3122268b0 Mon Sep 17 00:00:00 2001 From: Greg Farnum Date: Thu, 6 Aug 2009 21:49:50 -0700 Subject: [PATCH] Hadoop: Added in initialization checks and limited use of Progressables. --- src/client/hadoop/ceph/CephFileSystem.java | 81 ++++++++++++++-------- 1 file changed, 51 insertions(+), 30 deletions(-) diff --git a/src/client/hadoop/ceph/CephFileSystem.java b/src/client/hadoop/ceph/CephFileSystem.java index 17b7614937762..5c5aa63f9019b 100644 --- a/src/client/hadoop/ceph/CephFileSystem.java +++ b/src/client/hadoop/ceph/CephFileSystem.java @@ -41,6 +41,7 @@ public class CephFileSystem extends FileSystem { private FileSystem localFs; private Path root; private Path parent; + private boolean initialized = false; private static boolean debug; private static String cephDebugLevel; @@ -86,6 +87,7 @@ public class CephFileSystem extends FileSystem { } */ public URI getUri() { + if (!initialized) return null; debug("getUri:enter"); debug("getUri:exit with return " + uri); return uri; @@ -112,6 +114,7 @@ public class CephFileSystem extends FileSystem { if (!ceph_initializeClient(cephDebugLevel, monAddr)) { throw new IOException("Ceph initialization failed!"); } + initialized = true; debug("Initialized client. Setting cwd to /"); ceph_setcwd("/"); debug("initialize:exit"); @@ -119,6 +122,7 @@ public class CephFileSystem extends FileSystem { @Override public void close() throws IOException { + if (!initialized) throw new IOException ("close:You have to initialize the CephFileSystem before calling other methods."); debug("close:enter"); super.close();//this method does stuff, make sure it's run! System.gc(); //to run the finalizers on CephInput/OutputStreams @@ -129,9 +133,12 @@ public class CephFileSystem extends FileSystem { public FSDataOutputStream append (Path file, int bufferSize, Progressable progress) throws IOException { + if (!initialized) throw new IOException ("append:You have to initialize the CephFileSystem before calling other methods."); debug("append:enter with path " + file + " bufferSize " + bufferSize); Path abs_path = makeAbsolute(file); + if (progress!=null) progress.progress(); int fd = ceph_open_for_append(abs_path.toString()); + if (progress!=null) progress.progress(); if( fd < 0 ) { //error in open throw new IOException("append: Open for append failed on path \"" + abs_path.toString() + "\""); @@ -143,20 +150,23 @@ public class CephFileSystem extends FileSystem { @Deprecated public String getName() { + if (!initialized) return null; debug("getName:enter"); debug("getName:exit with value " + getUri().toString()); return getUri().toString(); } public Path getWorkingDirectory() { + if (!initialized) return null; debug("getWorkingDirectory:enter"); debug("Working directory is " + ceph_getcwd()); debug("getWorkingDirectory:exit"); - return new Path(fs_default_name + makeAbsolute(new Path(ceph_getcwd()))); + return new Path(fs_default_name + ceph_getcwd()); } @Override public void setWorkingDirectory(Path dir) { + if (!initialized) return; debug("setWorkingDirecty:enter with new working dir " + dir); Path abs_path = makeAbsolute(dir); @@ -196,6 +206,7 @@ public class CephFileSystem extends FileSystem { * that makes more calls than we need, so override. */ public boolean exists(Path path) throws IOException { + if (!initialized) throw new IOException ("You have to initialize the CephFileSystem before calling other methods."); debug("exists:enter with path " + path); boolean result; Path abs_path = makeAbsolute(path); @@ -213,6 +224,7 @@ public class CephFileSystem extends FileSystem { /* Creates the directory and all nonexistent parents. */ public boolean mkdirs(Path path, FsPermission perms) throws IOException { + if (!initialized) throw new IOException ("You have to initialize the CephFileSystem before calling other methods."); debug("mkdirs:enter with path " + path); Path abs_path = makeAbsolute(path); debug("calling ceph_mkdirs from Java"); @@ -229,6 +241,7 @@ public class CephFileSystem extends FileSystem { */ @Override public boolean isFile(Path path) throws IOException { + if (!initialized) throw new IOException ("You have to initialize the CephFileSystem before calling other methods."); debug("isFile:enter with path " + path); Path abs_path = makeAbsolute(path); boolean result; @@ -242,7 +255,29 @@ public class CephFileSystem extends FileSystem { return result; } + /* + * And again, a bit faster than using the default, which does a full stat. + */ + @Override + public boolean isDirectory(Path path) throws IOException { + if (!initialized) throw new IOException ("You have to initialize the CephFileSystem before calling other methods."); + debug("isDirectory:enter with path " + path); + Path abs_path = makeAbsolute(path); + boolean result; + if (abs_path.toString().equals("/")) { + result = true; + } + else { + debug("calling ceph_isdirectory from Java"); + result = ceph_isdirectory(abs_path.toString()); + debug("Returned from ceph_isdirectory to Java"); + } + debug("isDirectory:exit with result " + result); + return result; + } + public FileStatus getFileStatus(Path p) throws IOException { + if (!initialized) throw new IOException ("You have to initialize the CephFileSystem before calling other methods."); debug("getFileStatus:enter with path " + p); Path abs_p = makeAbsolute(p); //sadly, Ceph doesn't really do uids/gids just yet, but @@ -250,7 +285,6 @@ public class CephFileSystem extends FileSystem { FileStatus status; Stat lstat = new Stat(); if(ceph_stat(abs_p.toString(), lstat)) { - debug("getFileStatus: mod_time is " + lstat.mod_time); status = new FileStatus(lstat.size, lstat.is_dir, ceph_replication(abs_p.toString()), lstat.block_size, @@ -273,6 +307,7 @@ public class CephFileSystem extends FileSystem { // array of statuses for the directory's contents public FileStatus[] listStatus(Path p) throws IOException { + if (!initialized) throw new IOException ("You have to initialize the CephFileSystem before calling other methods."); debug("listStatus:enter with path " + p); Path abs_p = makeAbsolute(p); Path[] paths = listPaths(abs_p); @@ -286,6 +321,7 @@ public class CephFileSystem extends FileSystem { @Override public void setPermission(Path p, FsPermission permission) throws IOException { + if (!initialized) throw new IOException ("You have to initialize the CephFileSystem before calling other methods."); Path abs_path = makeAbsolute(p); ceph_setPermission(abs_path.toString(), permission.toShort()); } @@ -303,11 +339,12 @@ public class CephFileSystem extends FileSystem { long blockSize, Progressable progress ) throws IOException { - + if (!initialized) throw new IOException ("You have to initialize the CephFileSystem before calling other methods."); debug("create:enter with path " + f); Path abs_path = makeAbsolute(f); - - // We ignore progress reporting and replication. + if (progress!=null) progress.progress(); + // We ignore replication since that's not configurable here, and + // progress reporting is quite limited. // Required semantics: if the file exists, overwrite if CreateFlag.OVERWRITE; // throw an exception if !CreateFlag.OVERWRITE. @@ -315,6 +352,7 @@ public class CephFileSystem extends FileSystem { if(isDirectory(abs_path)) throw new IOException("create: Cannot overwrite existing directory \"" + abs_path.toString() + "\" with a file"); + if (progress!=null) progress.progress(); //if (!flag.contains(CreateFlag.OVERWRITE)) { if (!overwrite) { if (exists(abs_path)) { @@ -331,10 +369,11 @@ public class CephFileSystem extends FileSystem { mkdirs(parent); } } - + if (progress!=null) progress.progress(); // Step 3: open the file debug("calling ceph_open_for_overwrite from Java"); int fh = ceph_open_for_overwrite(abs_path.toString(), (int)permission.toShort()); + if (progress!=null) progress.progress(); debug("Returned from ceph_open_for_overwrite to Java with fh " + fh); if (fh < 0) { throw new IOException("create: Open for overwrite failed on path \"" + @@ -343,15 +382,13 @@ public class CephFileSystem extends FileSystem { // Step 4: create the stream OutputStream cephOStream = new CephOutputStream(getConf(), fh); - //debug("createRaw: opened absolute path \"" + absfilepath.toString() - // + "\" for writing with fh " + fh); - debug("create:exit"); return new FSDataOutputStream(cephOStream); } // Opens a Ceph file and attaches the file handle to an FSDataInputStream. public FSDataInputStream open(Path path, int bufferSize) throws IOException { + if (!initialized) throw new IOException ("You have to initialize the CephFileSystem before calling other methods."); debug("open:enter with path " + path); Path abs_path = makeAbsolute(path); @@ -380,6 +417,7 @@ public class CephFileSystem extends FileSystem { @Override public boolean rename(Path src, Path dst) throws IOException { + if (!initialized) throw new IOException ("You have to initialize the CephFileSystem before calling other methods."); // TODO: Check corner cases: dst already exists, // or path is directory with children debug("rename:enter"); @@ -399,6 +437,7 @@ public class CephFileSystem extends FileSystem { @Override public BlockLocation[] getFileBlockLocations(FileStatus file, long start, long len) throws IOException { + if (!initialized) throw new IOException ("You have to initialize the CephFileSystem before calling other methods."); //sanitize and get the filehandle Path abs_path = makeAbsolute(file.getPath()); int fh = ceph_open_for_read(abs_path.toString()); @@ -424,6 +463,7 @@ public class CephFileSystem extends FileSystem { public boolean delete(Path path) throws IOException { return delete(path, true); }; public boolean delete(Path path, boolean recursive) throws IOException { + if (!initialized) throw new IOException ("You have to initialize the CephFileSystem before calling other methods."); debug("delete:enter"); Path abs_path = makeAbsolute(path); @@ -484,10 +524,11 @@ public class CephFileSystem extends FileSystem { } /** - * You need to guarantee the path exists before calling this method, for now. + * */ @Deprecated public long getBlockSize(Path path) throws IOException { + if (!initialized) throw new IOException ("You have to initialize the CephFileSystem before calling other methods."); debug("getBlockSize:enter with path " + path); Path abs_path = makeAbsolute(path); if (!exists(abs_path)) { @@ -540,26 +581,6 @@ public class CephFileSystem extends FileSystem { } } - /* - * This method is a bit faster than doing a full stat is. - */ - @Override - public boolean isDirectory(Path path) throws IOException { - debug("isDirectory:enter with path " + path); - Path abs_path = makeAbsolute(path); - boolean result; - if (abs_path.toString().equals("/")) { - result = true; - } - else { - debug("calling ceph_isdirectory from Java"); - result = ceph_isdirectory(abs_path.toString()); - debug("Returned from ceph_isdirectory to Java"); - } - debug("isDirectory:exit with result " + result); - return result; - } - private long __getLength(Path path) throws IOException { debug("__getLength:enter with path " + path); Path abs_path = makeAbsolute(path); -- 2.39.5