From 9e08f0ec3dfdb3617f4d5bf3f6a70b0bd4bc3ba5 Mon Sep 17 00:00:00 2001 From: Greg Farnum Date: Wed, 14 Oct 2009 16:17:44 -0700 Subject: [PATCH] Hadoop: Now also uses their logging framework --- src/client/hadoop/ceph/CephFileSystem.java | 208 +++++++++++++-------- 1 file changed, 133 insertions(+), 75 deletions(-) diff --git a/src/client/hadoop/ceph/CephFileSystem.java b/src/client/hadoop/ceph/CephFileSystem.java index e2c4728129ae8..d522167570163 100644 --- a/src/client/hadoop/ceph/CephFileSystem.java +++ b/src/client/hadoop/ceph/CephFileSystem.java @@ -65,6 +65,14 @@ public class CephFileSystem extends FileSystem { private static final int EEXIST = 17; private static final int ENOENT = 2; + private static final int FATAL = 0; + private static final int ERROR = 1; + private static final int WARN = 2; + private static final int INFO = 3; + private static final int DEBUG = 4; + private static final int TRACE = 5; + private static final int NOLOG = 6; + private URI uri; @@ -241,9 +249,8 @@ public class CephFileSystem extends FileSystem { * Create a new CephFileSystem. */ public CephFileSystem() { - debug("CephFileSystem:enter"); root = new Path("/"); - debug("CephFileSystem:exit"); + debug("CephFileSystem:exit", DEBUG); } /** @@ -252,8 +259,7 @@ public class CephFileSystem extends FileSystem { */ public URI getUri() { if (!initialized) return null; - debug("getUri:enter"); - debug("getUri:exit with return " + uri); + debug("getUri:exit with return " + uri, DEBUG); return uri; } @@ -268,7 +274,7 @@ public class CephFileSystem extends FileSystem { */ @Override public void initialize(URI uri, Configuration conf) throws IOException { - debug("initialize:enter"); + debug("initialize:enter", DEBUG); if (!initialized) { System.load(conf.get("fs.ceph.libDir")+"/libhadoopcephfs.so"); System.load(conf.get("fs.ceph.libDir")+"/libceph.so"); @@ -297,23 +303,24 @@ public class CephFileSystem extends FileSystem { arguments += " --client-readahead-max-periods=" + conf.get("fs.ceph.readahead", "1"); //make sure they gave us a ceph monitor address or conf file + debug("initialize:Ceph intialization arguments: " + arguments, INFO); if ( (conf.get("fs.ceph.monAddr") == null) && (arguments.indexOf("-m") == -1) && (arguments.indexOf("-c") == -1) ) { - debug("You need to specify a Ceph monitor address."); + debug("initialize:You need to specify a Ceph monitor address.", FATAL); throw new IOException("You must specify a Ceph monitor address or config file!"); } // Initialize the client if (!ceph_initializeClient(arguments, conf.getInt("fs.ceph.blockSize", 1<<26))) { - debug("Ceph initialization failed!"); + debug("initialize:Ceph initialization failed!", FATAL); throw new IOException("Ceph initialization failed!"); } initialized = true; - debug("Initialized client. Setting cwd to /"); + debug("initialize:Ceph initialized client. Setting cwd to /", INFO); ceph_setcwd("/"); } - debug("initialize:exit"); + debug("initialize:exit", DEBUG); } /** @@ -325,10 +332,11 @@ public class CephFileSystem extends FileSystem { public void close() throws IOException { if (!initialized) throw new IOException ("You have to initialize the " +"CephFileSystem before calling other methods."); - debug("close:enter"); + debug("close:enter", DEBUG); super.close();//this method does stuff, make sure it's run! + debug("close: Calling ceph_kill_client from Java", TRACE); ceph_kill_client(); - debug("close:exit"); + debug("close:exit", DEBUG); } /** @@ -344,17 +352,19 @@ public class CephFileSystem extends FileSystem { Progressable progress) throws IOException { if (!initialized) throw new IOException ("You have to initialize the " +"CephFileSystem before calling other methods."); - debug("append:enter with path " + file + " bufferSize " + bufferSize); + debug("append:enter with path " + file + " bufferSize " + bufferSize, DEBUG); Path abs_path = makeAbsolute(file); if (progress!=null) progress.progress(); + debug("append: Entering ceph_open_for_append from Java", TRACE); int fd = ceph_open_for_append(abs_path.toString()); + debug("append: Returned to Java", TRACE); if (progress!=null) progress.progress(); if( fd < 0 ) { //error in open throw new IOException("append: Open for append failed on path \"" + abs_path.toString() + "\""); } CephOutputStream cephOStream = new CephOutputStream(getConf(), fd); - debug("append:exit"); + debug("append:exit", DEBUG); return new FSDataOutputStream(cephOStream, statistics); } @@ -364,9 +374,9 @@ public class CephFileSystem extends FileSystem { */ public Path getWorkingDirectory() { if (!initialized) return null; - debug("getWorkingDirectory:enter"); - debug("Working directory is " + ceph_getcwd()); - debug("getWorkingDirectory:exit"); + debug("getWorkingDirectory:enter", DEBUG); + String cwd = ceph_getcwd(); + debug("getWorkingDirectory:exit with path " + cwd, DEBUG); return new Path(fs_default_name + ceph_getcwd()); } @@ -380,13 +390,12 @@ public class CephFileSystem extends FileSystem { @Override public void setWorkingDirectory(Path dir) { if (!initialized) return; - debug("setWorkingDirecty:enter with new working dir " + dir); + debug("setWorkingDirecty:enter with new working dir " + dir, DEBUG); Path abs_path = makeAbsolute(dir); - debug("calling ceph_setcwd from Java"); + debug("setWorkingDirectory:calling ceph_setcwd from Java", TRACE); if (!ceph_setcwd(abs_path.toString())) - debug("Warning:ceph_setcwd failed for some reason on path " + abs_path); - debug("returned from ceph_setcwd to Java" ); - debug("setWorkingDirectory:exit"); + debug("setWorkingDirectory: WARNING! ceph_setcwd failed for some reason on path " + abs_path, ERROR); + debug("setWorkingDirectory:exit", DEBUG); } /** @@ -400,19 +409,19 @@ public class CephFileSystem extends FileSystem { public boolean exists(Path path) throws IOException { if (!initialized) throw new IOException ("You have to initialize the " +"CephFileSystem before calling other methods."); - debug("exists:enter with path " + path); + debug("exists:enter with path " + path, DEBUG); boolean result; Path abs_path = makeAbsolute(path); if (abs_path.equals(root)) { result = true; } else { - debug("Calling ceph_exists from Java on path " - + abs_path.toString() + ":"); + debug("exists:Calling ceph_exists from Java on path " + + abs_path.toString(), TRACE); result = ceph_exists(abs_path.toString()); - debug("Returned from ceph_exists to Java"); + debug("exists:Returned from ceph_exists to Java", TRACE); } - debug("exists:exit with value " + result); + debug("exists:exit with value " + result, DEBUG); return result; } @@ -427,12 +436,11 @@ public class CephFileSystem extends FileSystem { public boolean mkdirs(Path path, FsPermission perms) throws IOException { if (!initialized) throw new IOException ("You have to initialize the " +"CephFileSystem before calling other methods."); - debug("mkdirs:enter with path " + path); + debug("mkdirs:enter with path " + path, DEBUG); Path abs_path = makeAbsolute(path); - debug("calling ceph_mkdirs from Java"); + debug("mkdirs:calling ceph_mkdirs from Java", TRACE); int result = ceph_mkdirs(abs_path.toString(), (int)perms.toShort()); - debug("Returned from ceph_mkdirs to Java with result " + result); - debug("mkdirs:exit with result " + result); + debug("mkdirs:exit with result " + result, DEBUG); if (result != 0) return false; else return true; @@ -449,16 +457,17 @@ public class CephFileSystem extends FileSystem { public boolean isFile(Path path) throws IOException { if (!initialized) throw new IOException ("You have to initialize the " +"CephFileSystem before calling other methods."); - debug("isFile:enter with path " + path); + debug("isFile:enter with path " + path, DEBUG); Path abs_path = makeAbsolute(path); boolean result; if (abs_path.equals(root)) { result = false; } else { + debug("isFile:entering ceph_isfile from Java", TRACE); result = ceph_isfile(abs_path.toString()); } - debug("isFile:exit with result " + result); + debug("isFile:exit with result " + result, DEBUG); return result; } @@ -473,18 +482,18 @@ public class CephFileSystem extends FileSystem { public boolean isDirectory(Path path) throws IOException { if (!initialized) throw new IOException ("You have to initialize the " +"CephFileSystem before calling other methods."); - debug("isDirectory:enter with path " + path); + debug("isDirectory:enter with path " + path, DEBUG); Path abs_path = makeAbsolute(path); boolean result; if (abs_path.equals(root)) { result = true; } else { - debug("calling ceph_isdirectory from Java"); + debug("calling ceph_isdirectory from Java", TRACE); result = ceph_isdirectory(abs_path.toString()); - debug("Returned from ceph_isdirectory to Java"); + debug("Returned from ceph_isdirectory to Java", TRACE); } - debug("isDirectory:exit with result " + result); + debug("isDirectory:exit with result " + result, DEBUG); return result; } @@ -499,12 +508,13 @@ public class CephFileSystem extends FileSystem { public FileStatus getFileStatus(Path path) throws IOException { if (!initialized) throw new IOException ("You have to initialize the " +"CephFileSystem before calling other methods."); - debug("getFileStatus:enter with path " + path); + debug("getFileStatus:enter with path " + path, DEBUG); Path abs_path = makeAbsolute(path); //sadly, Ceph doesn't really do uids/gids just yet, but //everything else is filled FileStatus status; Stat lstat = new Stat(); + debug("getFileStatus: calling ceph_stat from Java", TRACE); if(ceph_stat(abs_path.toString(), lstat)) { status = new FileStatus(lstat.size, lstat.is_dir, ceph_replication(abs_path.toString()), @@ -521,7 +531,7 @@ public class CephFileSystem extends FileSystem { + path + " does not exist or could not be accessed"); } - debug("getFileStatus:exit"); + debug("getFileStatus:exit", DEBUG); return status; } @@ -536,7 +546,7 @@ public class CephFileSystem extends FileSystem { public FileStatus[] listStatus(Path path) throws IOException { if (!initialized) throw new IOException ("You have to initialize the " +"CephFileSystem before calling other methods."); - debug("listStatus:enter with path " + path); + debug("listStatus:enter with path " + path, DEBUG); Path abs_path = makeAbsolute(path); Path[] paths = listPaths(abs_path); if (paths != null) { @@ -544,7 +554,7 @@ public class CephFileSystem extends FileSystem { for (int i = 0; i < paths.length; ++i) { statuses[i] = getFileStatus(paths[i]); } - debug("listStatus:exit"); + debug("listStatus:exit", DEBUG); return statuses; } if (!isFile(path)) throw new FileNotFoundException(); //if we get here, listPaths returned null @@ -556,8 +566,12 @@ public class CephFileSystem extends FileSystem { public void setPermission(Path p, FsPermission permission) throws IOException { if (!initialized) throw new IOException ("You have to initialize the " +"CephFileSystem before calling other methods."); + debug("setPermission:enter with path " + p + + " and permissions " + permission, DEBUG); Path abs_path = makeAbsolute(p); + debug("setPermission:calling ceph_setpermission from Java", TRACE); ceph_setPermission(abs_path.toString(), permission.toShort()); + debug("setPermission:exit", DEBUG); } /** @@ -570,10 +584,14 @@ public class CephFileSystem extends FileSystem { public void setTimes(Path p, long mtime, long atime) throws IOException { if (!initialized) throw new IOException ("You have to initialize the " +"CephFileSystem before calling other methods."); + debug("setTimes:enter with path " + p + " mtime:" + mtime + + " atime:" + atime, DEBUG); Path abs_path = makeAbsolute(p); + debug("setTimes:calling ceph_setTimes from Java", TRACE); int r = ceph_setTimes(abs_path.toString(), mtime, atime); if (r<0) throw new IOException ("Failed to set times on path " + abs_path.toString() + " Error code: " + r); + debug("setTimes:exit", DEBUG); } /** @@ -605,7 +623,7 @@ public class CephFileSystem extends FileSystem { ) throws IOException { if (!initialized) throw new IOException ("You have to initialize the " +"CephFileSystem before calling other methods."); - debug("create:enter with path " + path); + debug("create:enter with path " + path, DEBUG); Path abs_path = makeAbsolute(path); if (progress!=null) progress.progress(); // We ignore replication since that's not configurable here, and @@ -639,10 +657,10 @@ public class CephFileSystem extends FileSystem { if (progress!=null) progress.progress(); } // Step 3: open the file - debug("calling ceph_open_for_overwrite from Java"); + debug("calling ceph_open_for_overwrite from Java", TRACE); int fh = ceph_open_for_overwrite(abs_path.toString(), (int)permission.toShort()); if (progress!=null) progress.progress(); - debug("Returned from ceph_open_for_overwrite to Java with fh " + fh); + debug("Returned from ceph_open_for_overwrite to Java with fh " + fh, TRACE); if (fh < 0) { throw new IOException("create: Open for overwrite failed on path \"" + path.toString() + "\""); @@ -650,7 +668,7 @@ public class CephFileSystem extends FileSystem { // Step 4: create the stream OutputStream cephOStream = new CephOutputStream(getConf(), fh); - debug("create:exit"); + debug("create:exit", DEBUG); return new FSDataOutputStream(cephOStream, statistics); } @@ -665,7 +683,7 @@ public class CephFileSystem extends FileSystem { public FSDataInputStream open(Path path, int bufferSize) throws IOException { if (!initialized) throw new IOException ("You have to initialize the " +"CephFileSystem before calling other methods."); - debug("open:enter with path " + path); + debug("open:enter with path " + path, DEBUG); Path abs_path = makeAbsolute(path); int fh = ceph_open_for_read(abs_path.toString()); @@ -684,14 +702,16 @@ public class CephFileSystem extends FileSystem { + "\" is a directory!"); } Stat lstat = new Stat(); + debug("open:calling ceph_stat from Java", TRACE); ceph_stat(abs_path.toString(), lstat); + debug("open:returned to Java", TRACE); long size = lstat.size; if (size < 0) { throw new IOException("Failed to get file size for file " + abs_path.toString() + " but succeeded in opening file. Something bizarre is going on."); } FSInputStream cephIStream = new CephInputStream(getConf(), fh, size); - debug("open:exit"); + debug("open:exit", DEBUG); return new FSDataInputStream(cephIStream); } @@ -706,13 +726,12 @@ public class CephFileSystem extends FileSystem { public boolean rename(Path src, Path dst) throws IOException { if (!initialized) throw new IOException ("You have to initialize the " +"CephFileSystem before calling other methods."); - debug("rename:enter"); - debug("calling ceph_rename from Java"); + debug("rename:enter with src:" + src + " and dest:" + dst, DEBUG); Path abs_src = makeAbsolute(src); Path abs_dst = makeAbsolute(dst); + debug("calling ceph_rename from Java", TRACE); boolean result = ceph_rename(abs_src.toString(), abs_dst.toString()); - debug("return from ceph_rename to Java with result " + result); - debug("rename:exit"); + debug("rename:exit with result: " + result, DEBUG); return result; } @@ -735,25 +754,44 @@ public class CephFileSystem extends FileSystem { long start, long len) throws IOException { if (!initialized) throw new IOException ("You have to initialize the " +"CephFileSystem before calling other methods."); + debug("getFileBlockLocations:enter with path " + file.getPath() + + ", start pos " + start + ", length " + len, DEBUG); //sanitize and get the filehandle Path abs_path = makeAbsolute(file.getPath()); + debug("getFileBlockLocations:call ceph_open_for_read from Java", TRACE); int fh = ceph_open_for_read(abs_path.toString()); + debug("getFileBlockLocations:return from ceph_open_for_read to Java with fh " + + fh, TRACE); if (fh < 0) { + debug("getFileBlockLocations:got error " + fh + + ", exiting and returning null!", ERROR); return null; } //get the block size + debug("getFileBlockLocations:call ceph_getblocksize from Java", TRACE); long blockSize = ceph_getblocksize(abs_path.toString()); + debug("getFileBlockLocations:return from ceph_getblocksize", TRACE); BlockLocation[] locations = new BlockLocation[(int)Math.ceil(len/(float)blockSize)]; + long offset; for (int i = 0; i < locations.length; ++i) { - String host = ceph_hosts(fh, start + i*blockSize); + offset = start + i*blockSize; + debug("getFileBlockLocations:call ceph_hosts from Java on fh " + + fh + " and offset " + offset, TRACE); + String host = ceph_hosts(fh, offset); + debug("getFileBlockLocations:return from ceph_hosts to Java with host " + + host, TRACE); String[] hostArray = new String[1]; hostArray[0] = host; locations[i] = new BlockLocation(hostArray, hostArray, start+i*blockSize-(start % blockSize), blockSize); } + debug("getFileBlockLocations:call ceph_close from Java on fh " + + fh, TRACE); ceph_close(fh); + debug("getFileBlockLocations:return with " + locations.length + + " locations", DEBUG); return locations; } @@ -769,16 +807,17 @@ public class CephFileSystem extends FileSystem { public FsStatus getStatus (Path path) throws IOException { if (!initialized) throw new IOException("You have to initialize the " + " CephFileSystem before calling other methods."); - debug("getStatus:enter"); + debug("getStatus:enter with path " + path, DEBUG); Path abs_path = makeAbsolute(path); - //currently(Ceph .14) Ceph actually ignores the path + //currently(Ceph .16) Ceph actually ignores the path //but we still pass it in; if Ceph stops ignoring we may need more //error-checking code. CephStat ceph_stat = new CephStat(); + debug("getStatus:calling ceph_statfs from Java", TRACE); int result = ceph_statfs(abs_path.toString(), ceph_stat); if (result!=0) throw new IOException("Somehow failed to statfs the Ceph filesystem. Error code: " + result); - debug("getStatus:exit"); + debug("getStatus:exit successfully", DEBUG); return new FsStatus(ceph_stat.capacity, ceph_stat.used, ceph_stat.remaining); } @@ -797,23 +836,24 @@ public class CephFileSystem extends FileSystem { public boolean delete(Path path, boolean recursive) throws IOException { if (!initialized) throw new IOException ("You have to initialize the " +"CephFileSystem before calling other methods."); - debug("delete:enter"); + debug("delete:enter with path " + path + " and recursive=" + recursive, + DEBUG); Path abs_path = makeAbsolute(path); - debug("delete: Deleting path " + abs_path.toString()); // sanity check if (abs_path.equals(root)) throw new IOException("Error: deleting the root directory is a Bad Idea."); - if (!exists(abs_path)) return false; // if the path is a file, try to delete it. if (isFile(abs_path)) { + debug("delete:calling ceph_unlink from Java with path " + abs_path, + TRACE); boolean result = ceph_unlink(abs_path.toString()); if(!result) debug("delete: failed to delete file \"" + - abs_path.toString() + "\"."); - debug("delete:exit"); + abs_path.toString() + "\".", ERROR); + debug("delete:exit with success=" + result, DEBUG); return result; } @@ -826,25 +866,27 @@ public class CephFileSystem extends FileSystem { Path[] contents = listPaths(abs_path); if (contents == null) { debug("delete: Failed to read contents of directory \"" + - abs_path.toString() + "\" while trying to delete it"); - debug("delete:exit"); + abs_path.toString() + + "\" while trying to delete it, BAILING", ERROR); return false; } // delete the entries + debug("delete: recursively calling delete on contents of " + + abs_path, DEBUG); for (Path p : contents) { if (!delete(p, true)) { debug("delete: Failed to delete file \"" + p.toString() + "\" while recursively deleting \"" - + abs_path.toString() + "\"" ); - debug("delete:exit"); + + abs_path.toString() + "\", BAILING", ERROR ); return false; } } //if we've come this far it's a now-empty directory, so delete it! boolean result = ceph_rmdir(abs_path.toString()); if (!result) - debug("delete: failed to delete \"" + abs_path.toString() + "\""); - debug("delete:exit"); + debug("delete: failed to delete \"" + abs_path.toString() + + "\", BAILING", ERROR); + debug("delete:exit", DEBUG); return result; } @@ -870,34 +912,34 @@ public class CephFileSystem extends FileSystem { // Makes a Path absolute. In a cheap, dirty hack, we're // also going to strip off any fs_default_name prefix we see. private Path makeAbsolute(Path path) { - debug("makeAbsolute:enter with path " + path); + debug("makeAbsolute:enter with path " + path, NOLOG); if (path == null) return new Path("/"); // first, check for the prefix if (path.toString().startsWith(fs_default_name)) { Path stripped_path = new Path(path.toString().substring(fs_default_name.length())); - debug("makeAbsolute:exit with path " + stripped_path); + debug("makeAbsolute:exit with path " + stripped_path, NOLOG); return stripped_path; } if (path.isAbsolute()) { - debug("makeAbsolute:exit with path " + path); + debug("makeAbsolute:exit with path " + path, NOLOG); return path; } Path new_path = new Path(ceph_getcwd(), path); - debug("makeAbsolute:exit with path " + new_path); + debug("makeAbsolute:exit with path " + new_path, NOLOG); return new_path; } private Path[] listPaths(Path path) throws IOException { - debug("listPaths:enter with path " + path); + debug("listPaths:enter with path " + path, NOLOG); String dirlist[]; Path abs_path = makeAbsolute(path); // If it's a directory, get the listing. Otherwise, complain and give up. - debug("calling ceph_getdir from Java with path " + abs_path); + debug("calling ceph_getdir from Java with path " + abs_path, NOLOG); dirlist = ceph_getdir(abs_path.toString()); - debug("returning from ceph_getdir to Java"); + debug("returning from ceph_getdir to Java", NOLOG); if (dirlist == null) { throw new IOException("listPaths: path " + path.toString() + " is not a directory."); @@ -907,7 +949,7 @@ public class CephFileSystem extends FileSystem { Path[] paths = new Path[dirlist.length]; for (int i = 0; i < dirlist.length; ++i) { debug("Raw enumeration of paths in \"" + abs_path.toString() + "\": \"" + - dirlist[i] + "\""); + dirlist[i] + "\"", NOLOG); // convert each listing to an absolute path Path raw_path = new Path(dirlist[i]); if (raw_path.isAbsolute()) @@ -915,12 +957,28 @@ public class CephFileSystem extends FileSystem { else paths[i] = new Path(abs_path, raw_path); } - debug("listPaths:exit"); + debug("listPaths:exit", NOLOG); return paths; } - private void debug(String statement) { + private void debug(String statement, int priority) { if (debug) System.err.println(statement); + switch(priority) { + case FATAL: LOG.fatal(statement); + break; + case ERROR: LOG.error(statement); + break; + case WARN: LOG.warn(statement); + break; + case INFO: LOG.info(statement); + break; + case DEBUG: LOG.debug(statement); + break; + case TRACE: LOG.trace(statement); + break; + case NOLOG: break; + default: break; + } } private static class Stat { -- 2.39.5