import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.permission.FsPermission;
-import org.apache.hadoop.fs.permission.FsAction;
import org.apache.hadoop.util.Progressable;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FsStatus;
private URI uri;
- private Path root;
+ private final Path root;
private boolean initialized = false;
- private static boolean debug = false;
- private static String cephDebugLevel;
- private static String monAddr;
- private static String fs_default_name;
+ private boolean debug = false;
+ private String cephDebugLevel;
+ private String monAddr;
+ private String fs_default_name;
private native boolean ceph_initializeClient(String arguments, int block_size);
private native String ceph_getcwd();
System.load(conf.get("fs.ceph.libDir")+"/libceph.so");
super.initialize(uri, conf);
setConf(conf);
- this.uri = URI.create(uri.getScheme() + "://" + uri.getAuthority());
+ this.uri = URI.create(uri.getScheme() + "://" + uri.getAuthority());
+ statistics = getStatistics(uri.getScheme(), getClass());
fs_default_name = conf.get("fs.default.name");
debug = ("true".equals(conf.get("fs.ceph.debug", "false")));
//build up the arguments for Ceph
- String arguments = new String("CephFSInterface");
+ String arguments = "CephFSInterface";
arguments += conf.get("fs.ceph.commandLine", "");
if (conf.get("fs.ceph.clientDebug") != null) {
arguments += " --debug_client ";
}
CephOutputStream cephOStream = new CephOutputStream(getConf(), fd);
if(debug) debug("append:exit");
- return new FSDataOutputStream(cephOStream);
- }
-
- /**
- * Get the name of this CephFileSystem.
- * @return The name of the CephFileSystem as a string.
- * @deprecated Use getUri() instead.
- */
- @Deprecated
- public String getName() {
- if (!initialized) return null;
- if(debug) debug("getName:enter");
- if(debug) debug("getName:exit with value " + getUri().toString());
- return getUri().toString();
+ return new FSDataOutputStream(cephOStream, statistics);
}
/**
if(debug) debug("exists:enter with path " + path);
boolean result;
Path abs_path = makeAbsolute(path);
- if (abs_path.toString().equals("/")) {
+ if (abs_path.equals(root)) {
result = true;
}
else {
if(debug) debug("isFile:enter with path " + path);
Path abs_path = makeAbsolute(path);
boolean result;
- if (abs_path.toString().equals("/")) {
+ if (abs_path.equals(root)) {
result = false;
}
else {
if(debug) debug("isDirectory:enter with path " + path);
Path abs_path = makeAbsolute(path);
boolean result;
- if (abs_path.toString().equals(root)) {
+ if (abs_path.equals(root)) {
result = true;
}
else {
// Step 4: create the stream
OutputStream cephOStream = new CephOutputStream(getConf(), fh);
if(debug) debug("create:exit");
- return new FSDataOutputStream(cephOStream);
+ return new FSDataOutputStream(cephOStream, statistics);
}
/**
* Get usage statistics on the Ceph filesystem.
* @param path A path to the partition you're interested in.
* Ceph doesn't partition, so this is ignored.
- * @return FsStatus reportin capacity, usage, and remaining spac.
+ * @return FsStatus reporting capacity, usage, and remaining spac.
* @throws IOException if initialize() hasn't been called, or the
* stat somehow fails.
*/
+ @Override
public FsStatus getStatus (Path path) throws IOException {
if (!initialized) throw new IOException("You have to initialize the"
+ " CephFileSystem before calling other methods.");
if(debug) debug("getStatus:enter");
Path abs_path = makeAbsolute(path);
- //currently(Ceph .12) Ceph actually ignores the path
+ //currently(Ceph .14) Ceph actually ignores the path
//but we still pass it in; if Ceph stops ignoring we may need more
//error-checking code.
CephStat ceph_stat = new CephStat();
ceph_stat.used, ceph_stat.remaining);
}
- /**
- * Delete the given path, and any children if it's a directory.
- * @param path The path to delete.
- * @return true if the delete succeeded, false otherwise.
- * @throws IOException If initialize() hasn't been called,
- * or you try to delete the root directory.
- * @deprecated Use delete(Path path, boolean recursive) instead.
- */
- public boolean delete(Path path) throws IOException { return delete(path, true); };
-
/**
* Delete the given path, and optionally its children.
* @param path the path to delete.
if(debug) debug("delete: Deleting path " + abs_path.toString());
// sanity check
- if (abs_path.toString().equals(root))
+ if (abs_path.equals(root))
throw new IOException("Error: deleting the root directory is a Bad Idea.");
if (!exists(abs_path)) return false;
return false;
}
// delete the entries
- Path parent = abs_path.getParent();
for (Path p : contents) {
if (!delete(p, true)) {
if(debug) debug("delete: Failed to delete file \"" +
public short getDefaultReplication() {
return 1;
}
-
- /**
- * Get the block size for a given file. This will return if path is a
- * directory, but the value is meaningless.
- * @param path The Path you want to get the block size for.
- * @return the block size of path (in bytes) as a long.
- * @throws IOException if initialize() hasn't been called or
- * the path doesn't exist.
- * @deprecated use getFileStatus instead.
- */
- @Deprecated
- public long getBlockSize(Path path) throws IOException {
- if (!initialized) throw new IOException ("You have to initialize the"
- +"CephFileSystem before calling other methods.");
- if(debug) debug("getBlockSize:enter with path " + path);
- Path abs_path = makeAbsolute(path);
- long result = ceph_getblocksize(abs_path.toString());
-
- if (result < 0)
- throw new IOException("org.apache.hadoop.fs.ceph.CephFileSystem.getBlockSize: File or directory "
- + path.toString() + " does not exist.");
-
- if(debug) debug("getBlockSize:exit with result " + result);
- return result;
- }
/**
* Get the default block size.
System.err.println(statement);
}
- private class Stat {
+ private static class Stat {
public long size;
public boolean is_dir;
public long block_size;
public Stat(){}
}
- private class CephStat {
+ private static class CephStat {
public long capacity;
public long used;
public long remaining;