From 3eec8bb6f53f95a922fa8a05e1479626cb2b3f81 Mon Sep 17 00:00:00 2001 From: Greg Farnum Date: Tue, 21 Jul 2009 17:20:08 -0700 Subject: [PATCH] Hadoop: Continued cleanup work. --- src/client/hadoop/ceph/CephFileSystem.java | 85 +++++++++++----------- 1 file changed, 43 insertions(+), 42 deletions(-) diff --git a/src/client/hadoop/ceph/CephFileSystem.java b/src/client/hadoop/ceph/CephFileSystem.java index 95690164765f4..e4dee02e835a5 100644 --- a/src/client/hadoop/ceph/CephFileSystem.java +++ b/src/client/hadoop/ceph/CephFileSystem.java @@ -4,15 +4,18 @@ package org.apache.hadoop.fs.ceph; import java.io.IOException; import java.net.URI; import java.util.Set; +import java.util.EnumSet; import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.FSDataInputStream; +import org.apache.hadoop.fs.FSInputStream; import org.apache.hadoop.fs.FSDataOutputStream; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.FileUtil; import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.permission.FsPermission; import org.apache.hadoop.util.Progressable; import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.CreateFlag; /** *

@@ -92,7 +95,7 @@ public class CephFileSystem extends FileSystem { private native boolean ceph_isfile (long client, String path); private native String[]ceph_getdir (long client, String path); private native int ceph_mkdirs (long client, String path, int mode); - private native int ceph_open_for_append (long client, String path, int mode); + private native int ceph_open_for_append (long client, String path); private native int ceph_open_for_read (long client, String path); private native int ceph_open_for_overwrite(long client, String path, int mode); private native boolean ceph_kill_client (long client); @@ -142,14 +145,18 @@ public class CephFileSystem extends FileSystem { + ". Not really doing anything."); } - public FsDataOutputStream append (Path file, int bufferSize, + public FSDataOutputStream append (Path file, int bufferSize, Progressable progress) throws IOException { - ceph_open_for_append(....); - return new CephDataOutstream(...); + Path abs_path = makeAbsolute(file); + int fd = ceph_open_for_append(abs_path.toString()); + if( fd < 0 ) { //error in open + throw new IOException("append: Open for append failed on path \"" + + abs_path.toString() + "\""); + } + return new CephOutputStream(getConf(), clientPointer, fd); } - @Override - public String getName() { + public String getName() { return getUri().toString(); } @@ -241,10 +248,12 @@ public class CephFileSystem extends FileSystem { /* Creates the directory and all nonexistent parents. */ public boolean mkdirs(Path path, FsPermission perms) throws IOException { Path abs_path = makeAbsolute(path); - result = ceph_mkdirs(abs_path.toString(), (int)perms.toShort()); + int result = ceph_mkdirs(abs_path.toString(), (int)perms.toShort()); /*System.out.println("mkdirs: attempted to make directory " + abs_path.toString() + ": result is " + result); */ - return result; + if (result != 0) + return true; + else return false; } @@ -299,16 +308,15 @@ public class CephFileSystem extends FileSystem { public FileStatus[] listStatus(Path p) throws IOException { Path abs_p = makeAbsolute(p); Path[] paths = listPathsRaw(abs_p); - FileStatus[] statuses = new FileStatus[paths.size()]; - for (Path path : paths) { - statuses[i] = getFileStatus(path); + FileStatus[] statuses = new FileStatus[paths.length]; + for (int i = 0; i < paths.length; ++i) { + statuses[i] = getFileStatus(paths[i]); } return statuses; } - @Override - public Path[] listPathsRaw(Path path) throws IOException { + public Path[] listPathsRaw(Path path) throws IOException { String dirlist[]; @@ -352,7 +360,7 @@ public class CephFileSystem extends FileSystem { public FSDataOutputStream create(Path f, FsPermission permission, - boolean overwrite, + EnumSet flag, int bufferSize, short replication, long blockSize, @@ -360,26 +368,26 @@ public class CephFileSystem extends FileSystem { ) throws IOException { - Path absfilepath = makeAbsolute(file); + Path abs_path = makeAbsolute(f); // We ignore progress reporting and replication. // Required semantics: if the file exists, overwrite if overwrite == true, and // throw an exception if overwrite == false. // Step 1: existence test - if(isDirectory(absfilepath)) + if(isDirectory(abs_path)) throw new IOException("create: Cannot overwrite existing directory \"" - + absfilepath.toString() + "\" with a file"); - if (!overwrite) { - if (exists(absfilepath)) { + + abs_path.toString() + "\" with a file"); + if (!flag.contains(CreateFlag.OVERWRITE)) { + if (exists(abs_path)) { throw new IOException("createRaw: Cannot open existing file \"" - + absfilepath.toString() + + abs_path.toString() + "\" for writing without overwrite flag"); } } - + // Step 2: create any nonexistent directories in the path - Path parent = absfilepath.getParent(); + Path parent = abs_path.getParent(); if (parent != null) { // if parent is root, we're done if(!exists(parent)) { //System.out.println("createRaw: parent directory of path \"" @@ -389,10 +397,10 @@ public class CephFileSystem extends FileSystem { } // Step 3: open the file - int fh = ceph_open_for_overwrite(absfilepath.toString(), int(permission.toShort())); + int fh = ceph_open_for_overwrite(abs_path.toString(), (int)permission.toShort()); if (fh < 0) { throw new IOException("createRaw: Open for overwrite failed on path \"" + - absfilepath.toString() + "\""); + abs_path.toString() + "\""); } // Step 4: create the stream @@ -407,7 +415,7 @@ public class CephFileSystem extends FileSystem { // Opens a Ceph file and attaches the file handle to an FSDataInputStream. @Override - public FSDataInputStream open(Path path, int bufferSize) throws IOException { + public FSInputStream open(Path path, int bufferSize) throws IOException { Path abs_path = makeAbsolute(path); if(!isFile(abs_path)) { @@ -428,7 +436,7 @@ public class CephFileSystem extends FileSystem { throw new IOException("Failed to get file size for file " + abs_path.toString() + " but succeeded in opening file. Something bizarre is going on."); } - FSDataInputStream result = new CephInputStream(getConf(), clientPointer, fh, size); + FSInputStream result = new CephInputStream(getConf(), clientPointer, fh, size); return result; } @@ -462,7 +470,7 @@ public class CephFileSystem extends FileSystem { /* If the path is a directory, recursively try to delete its contents, and then delete the directory. */ if (!recursive) { - throw IOException("Directories must be deleted recursively!"); + throw new IOException("Directories must be deleted recursively!"); } Path[] contents = listPathsRaw(path); if (contents == null) { @@ -507,12 +515,11 @@ public class CephFileSystem extends FileSystem { /** * User-defined replication is not supported for Ceph file systems at the moment. */ - @Override + public short getReplication(Path path) throws IOException { return 1; } - @Override public short getDefaultReplication() { return 1; } @@ -520,14 +527,12 @@ public class CephFileSystem extends FileSystem { /** * User-defined replication is not supported for Ceph file systems at the moment. */ - @Override - public boolean setReplicationRaw(Path path, short replication) + public boolean setReplicationRaw(Path path, short replication) throws IOException { return true; } - @Override - public long getBlockSize(Path path) throws IOException { + public long getBlockSize(Path path) throws IOException { if (!exists(path)) { throw new IOException("org.apache.hadoop.fs.ceph.CephFileSystem.getBlockSize: File or directory " + path.toString() + " does not exist."); @@ -561,8 +566,7 @@ public class CephFileSystem extends FileSystem { /** * Return 1x1 'localhost' cell if the file exists. Return null if otherwise. */ - @Override - public String[][] getFileCacheHints(Path f, long start, long len) + public String[][] getFileCacheHints(Path f, long start, long len) throws IOException { // TODO: Check this is the correct behavior if (!exists(f)) { @@ -571,14 +575,12 @@ public class CephFileSystem extends FileSystem { return new String[][] { { "localhost" } }; } - @Override - public void lock(Path path, boolean shared) throws IOException { + public void lock(Path path, boolean shared) throws IOException { // TODO: Design and implement? or just ignore locking? return; } - @Override - public void release(Path path) throws IOException { + public void release(Path path) throws IOException { return; //deprecated } @@ -621,8 +623,7 @@ public class CephFileSystem extends FileSystem { - @Override - public void copyToLocalFile(Path ceph_src, Path local_dst, boolean copyCrc) throws IOException { + public void copyToLocalFile(Path ceph_src, Path local_dst, boolean copyCrc) throws IOException { Path abs_ceph_src = makeAbsolute(ceph_src); -- 2.39.5