import org.apache.hadoop.fs.permission.FsAction;
import org.apache.hadoop.util.Progressable;
import org.apache.hadoop.fs.FileStatus;
-import org.apache.hadoop.fs.FsStatus;
-import org.apache.hadoop.fs.CreateFlag;
+//import org.apache.hadoop.fs.FsStatus;
+//import org.apache.hadoop.fs.CreateFlag;
/**
* <p>
- * A {@link FileSystem} backed by <a href="http://ceph.sourceforge.net">Ceph.</a>.
+ * A {@link FileSystem} backed by <a href="http://ceph.newdream.net">Ceph.</a>.
* This will not start a Ceph instance; one must already be running.
* </p>
- */
+ * Configuration of the CephFileSystem is handled via a few Hadoop
+ * Configuration properties: <br>
+ * fs.ceph.monAddr -- the ip address of the monitor to connect to. <br>
+ * fs.ceph.libDir -- the directory that libceph and libhadoopceph are
+ * located in. This assumes Hadoop is being run on a linux-style machine
+ * with names like libceph.so.
+ * <p>
+ * You can also enable debugging of the CephFileSystem and Ceph itself: <br>
+ * fs.ceph.debug -- if 'true' will print out method enter/exit messages,
+ * plus a little more.
+ * fs.ceph.debugLevel -- a number that will print out diagnostic messages
+ * from Ceph of at least that importance.
+ */
public class CephFileSystem extends FileSystem {
private static final long DEFAULT_BLOCK_SIZE = 8 * 1024 * 1024;
private native String ceph_hosts(int fh, long offset);
private native int ceph_setTimes(String path, long mtime, long atime);
+ /**
+ * Create a new CephFileSystem.
+ */
public CephFileSystem() {
debug("CephFileSystem:enter");
root = new Path("/");
debug("CephFileSystem:exit");
}
- /*
- public S3FileSystem(FileSystemStore store) {
- this.store = store;
- } */
-
+ /**
+ * Lets you get the URI of this CephFileSystem.
+ * @return the URI.
+ */
public URI getUri() {
if (!initialized) return null;
debug("getUri:enter");
return uri;
}
+ /**
+ * Should be called after constructing a CephFileSystem but before calling
+ * any other methods.
+ * Starts up the connection to Ceph, reads in configuraton options, etc.
+ * @param uri The URI for this filesystem.
+ * @param conf The Hadoop Configuration to retrieve properties from.
+ * @throws IOException if the Ceph client initialization fails.
+ */
@Override
- public void initialize(URI uri, Configuration conf) throws IOException {
+ public void initialize(URI uri, Configuration conf) throws IOException {
debug("initialize:enter");
System.load(conf.get("fs.ceph.libDir")+"/libhadoopcephfs.so");
System.load(conf.get("fs.ceph.libDir")+"/libceph.so");
debug("initialize:exit");
}
+ /**
+ * Close down the CephFileSystem. Runs the base-class close method
+ * and then kills the Ceph client itself.
+ * @throws IOException if initialize() hasn't been called.
+ */
@Override
- public void close() throws IOException {
+ public void close() throws IOException {
if (!initialized) throw new IOException ("You have to initialize the"
+"CephFileSystem before calling other methods.");
debug("close:enter");
super.close();//this method does stuff, make sure it's run!
- System.gc(); //to run the finalizers on CephInput/OutputStreams
- //this is kinda a hack and we may need to adjust it
ceph_kill_client();
debug("close:exit");
}
+ /**
+ * Get an FSDataOutputStream to append onto a file.
+ * @param file The File you want to append onto
+ * @param bufferSize The size of the buffer to use in flushing writes
+ * @param progress The Progressable to report progress to.
+ * Reporting is limited but exists.
+ * @return An FSDataOutputStream that connects to the file on Ceph.
+ * @throws IOException If the file does not exist or Ceph fails.
+ */
public FSDataOutputStream append (Path file, int bufferSize,
Progressable progress) throws IOException {
if (!initialized) throw new IOException ("You have to initialize the"
return new FSDataOutputStream(cephOStream);
}
+ /**
+ * Get the name of this CephFileSystem.
+ * @return The name of the CephFileSystem as a string.
+ * @deprecated Use getUri() instead.
+ */
@Deprecated
- public String getName() {
+ public String getName() {
if (!initialized) return null;
debug("getName:enter");
debug("getName:exit with value " + getUri().toString());
return getUri().toString();
}
+ /**
+ * Get the current working directory for the given file system
+ * @return the directory Path
+ */
public Path getWorkingDirectory() {
if (!initialized) return null;
debug("getWorkingDirectory:enter");
return new Path(fs_default_name + ceph_getcwd());
}
+ /**
+ * Set the current working directory for the given file system. All relative
+ * paths will be resolved relative to it.
+ *
+ * @param dir The directory to change to.
+ */
@Override
- public void setWorkingDirectory(Path dir) {
+ public void setWorkingDirectory(Path dir) {
if (!initialized) return;
debug("setWorkingDirecty:enter with new working dir " + dir);
Path abs_path = makeAbsolute(dir);
debug("setWorkingDirectory:exit");
}
-
-
+ /**
+ * Check if a path exists.
+ * Overriden because it's moderately faster than the generic implementation.
+ * @param path The file to check existence on.
+ * @return true if the file exists, false otherwise.
+ * @throws IOException if initialize() hasn't been called.
+ */
@Override
- /**
- * There's not much point to using a full getFileStatus for this since
- * that makes more calls than we need, so override.
- */
- public boolean exists(Path path) throws IOException {
+ public boolean exists(Path path) throws IOException {
if (!initialized) throw new IOException ("You have to initialize the"
+"CephFileSystem before calling other methods.");
debug("exists:enter with path " + path);
return result;
}
- /* Creates the directory and all nonexistent parents. */
+ /**
+ * Create a directory and any nonexistent parents. Any portion
+ * of the directory tree can exist without error.
+ * @param path The directory path to create
+ * @param perms The permissions to apply to the created directories.
+ * @return true if successful, false otherwise
+ * @throws IOException if initialize() hasn't been called.
+ */
public boolean mkdirs(Path path, FsPermission perms) throws IOException {
if (!initialized) throw new IOException ("You have to initialize the"
+"CephFileSystem before calling other methods.");
}
/**
- * As with exists, this is faster than their method
+ * Check if a path is a file. This is moderately faster than the
+ * generic implementation.
+ * @param path The path to check.
+ * @return true if the path is a file, false otherwise.
+ * @throws IOException if initialize() hasn't been called.
*/
@Override
- public boolean isFile(Path path) throws IOException {
+ public boolean isFile(Path path) throws IOException {
if (!initialized) throw new IOException ("You have to initialize the"
+"CephFileSystem before calling other methods.");
debug("isFile:enter with path " + path);
return result;
}
- /*
- * And again, a bit faster than using the default, which does a full stat.
+ /**
+ * Check if a path is a directory. This is moderately faster than
+ * the generic implementation.
+ * @param path The path to check
+ * @return true if the path is a directory, false otherwise.
+ * @throws IOException if initialize() hasn't been called.
*/
@Override
public boolean isDirectory(Path path) throws IOException {
return result;
}
- public FileStatus getFileStatus(Path p) throws IOException {
+ /**
+ * Get stat information on a file. This does not fill owner or group, as
+ * Ceph's support for these is a bit different than HDFS'.
+ * @param path The path to stat.
+ * @return FileStatus object containing the stat information.
+ * @throws IOException if initialize() hasn't been called
+ * @throws FileNotFoundException if the path could not be resolved.
+ */
+ public FileStatus getFileStatus(Path path) throws IOException {
if (!initialized) throw new IOException ("You have to initialize the"
+"CephFileSystem before calling other methods.");
- debug("getFileStatus:enter with path " + p);
- Path abs_p = makeAbsolute(p);
+ debug("getFileStatus:enter with path " + path);
+ Path abs_path = makeAbsolute(path);
//sadly, Ceph doesn't really do uids/gids just yet, but
//everything else is filled
FileStatus status;
Stat lstat = new Stat();
- if(ceph_stat(abs_p.toString(), lstat)) {
+ if(ceph_stat(abs_path.toString(), lstat)) {
status = new FileStatus(lstat.size, lstat.is_dir,
- ceph_replication(abs_p.toString()),
+ ceph_replication(abs_path.toString()),
lstat.block_size,
//these times in seconds get converted to millis
lstat.mod_time*1000,
new FsPermission((short)lstat.mode),
null,
null,
- new Path(fs_default_name+abs_p.toString()));
+ new Path(fs_default_name+abs_path.toString()));
}
else { //fail out
throw new FileNotFoundException("org.apache.hadoop.fs.ceph.CephFileSystem: File "
- + p + " does not exist or could not be accessed");
+ + path + " does not exist or could not be accessed");
}
debug("getFileStatus:exit");
return status;
}
- // array of statuses for the directory's contents
- public FileStatus[] listStatus(Path p) throws IOException {
+ /**
+ * Get the FileStatus for each listing in a directory.
+ * @param path
+ */
+ public FileStatus[] listStatus(Path path) throws IOException {
if (!initialized) throw new IOException ("You have to initialize the"
+"CephFileSystem before calling other methods.");
- debug("listStatus:enter with path " + p);
- Path abs_path = makeAbsolute(p);
+ debug("listStatus:enter with path " + path);
+ Path abs_path = makeAbsolute(path);
if (isDirectory(abs_path)) {
Path[] paths = listPaths(abs_path);
FileStatus[] statuses = new FileStatus[paths.length];
if (isFile(abs_path)) return null;
//shouldn't get here
- throw new FileNotFoundException("listStatus found no such file " + p);
+ throw new FileNotFoundException("listStatus found no such file " + path);
}
@Override