*/
public class CephFileSystem extends FileSystem {
- private static final long DEFAULT_BLOCK_SIZE = 8 * 1024 * 1024;
+ private static final long DEFAULT_BLOCK_SIZE = 4 * 1024 * 1024;
* @param progress The Progressable to report progress to.
* Reporting is limited but exists.
* @return An FSDataOutputStream that connects to the file on Ceph.
- * @throws IOException If the file does not exist or Ceph fails.
+ * @throws IOException If initialize() hasn't been called or the file cannot be found or appended to.
*/
public FSDataOutputStream append (Path file, int bufferSize,
Progressable progress) throws IOException {
/**
* Get the FileStatus for each listing in a directory.
- * @param path
+ * @param path The directory to get listings from.
+ * @return FileStatus[] containing one FileStatus for each directory listing;
+ * null if path is not a directory.
+ * @throws IOException if initialize() hasn't been called.
+ * @throws FileNotFoundException if the input path can't be found.
*/
public FileStatus[] listStatus(Path path) throws IOException {
if (!initialized) throw new IOException ("You have to initialize the"
}
/**
- * In order to run this with Hadoop .20, I had to revert back to using
- * a boolean instead of the CreateFlag.
+ * Create a new file and open an FSDataOutputStream that's connected to it.
+ * @param path The file to create.
+ * @param permission The permissions to apply to the file.
+ * @param overwrite If true, overwrite any existing file with this name.
+ * @param bufferSize The size of the write buffer in the returned OutputStream.
+ * @param replication Ignored by Ceph. This can be configured via Ceph configuration.
+ * @param blockSize Ignored by Ceph.
+ * @param progress A Progressable to report back to. Reporting is limited but exists.
+ * @return An FSDataOutputStream pointing to the created file.
+ * @throws IOException if initialize() hasn't been called, or the path is an
+ * existing directory, or the path exists but overwrite is false, or there is a
+ * failure in attempting to open for append with Ceph.
*/
- public FSDataOutputStream create(Path f,
+ public FSDataOutputStream create(Path path,
FsPermission permission,
//EnumSet<CreateFlag> flag,
boolean overwrite,
) throws IOException {
if (!initialized) throw new IOException ("You have to initialize the"
+"CephFileSystem before calling other methods.");
- debug("create:enter with path " + f);
- Path abs_path = makeAbsolute(f);
+ debug("create:enter with path " + path);
+ Path abs_path = makeAbsolute(path);
if (progress!=null) progress.progress();
// We ignore replication since that's not configurable here, and
// progress reporting is quite limited.
// Step 1: existence test
if(isDirectory(abs_path))
throw new IOException("create: Cannot overwrite existing directory \""
- + abs_path.toString() + "\" with a file");
+ + path.toString() + "\" with a file");
if (progress!=null) progress.progress();
// if (!flag.contains(CreateFlag.OVERWRITE)) {
if (!overwrite) {
debug("Returned from ceph_open_for_overwrite to Java with fh " + fh);
if (fh < 0) {
throw new IOException("create: Open for overwrite failed on path \"" +
- abs_path.toString() + "\"");
+ path.toString() + "\"");
}
// Step 4: create the stream
return new FSDataOutputStream(cephOStream);
}
- // Opens a Ceph file and attaches the file handle to an FSDataInputStream.
+ /**
+ * Open a Ceph file and attach the file handle to an FSDataInputStream.
+ * @param path The file to open
+ * @param bufferSize the size of the read buffer in the returned FSDataInputStream.
+ * @return FSDataInputStream reading from the given path.
+ * @throws IOException if initialize() hasn't been called, the path DNE or is a
+ * directory, or there is an error getting data to set up the FSDataInputStream.
+ */
public FSDataInputStream open(Path path, int bufferSize) throws IOException {
if (!initialized) throw new IOException ("You have to initialize the"
+"CephFileSystem before calling other methods.");
return new FSDataInputStream(cephIStream);
}
+ /**
+ * Rename a file or directory.
+ * @param src The current path of the file/directory
+ * @param dst The new name for the path.
+ * @return true if the rename succeeded, false otherwise.
+ * @throws IOException if initialize() hasn't been called.
+ */
@Override
- public boolean rename(Path src, Path dst) throws IOException {
+ public boolean rename(Path src, Path dst) throws IOException {
if (!initialized) throw new IOException ("You have to initialize the"
+"CephFileSystem before calling other methods.");
- // TODO: Check corner cases: dst already exists,
- // or path is directory with children
debug("rename:enter");
debug("calling ceph_rename from Java");
Path abs_src = makeAbsolute(src);
return result;
}
- /*
- * Note that this doesn't set names as the others do; there's no port
- * info since, hey, you aren't going to talk direct to a Ceph instance!
+ /**
+ * Get a BlockLocation object for each block in a file.
+ *
+ * Note that this doesn't include port numbers in the name field as
+ * Ceph handles slow/down servers internally. This data should be used
+ * only for selecting which servers to run which jobs on.
+ *
+ * @param file A FileStatus object corresponding to the file you want locations for.
+ * @param start The offset of the first part of the file you are interested in.
+ * @param len The amount of the file past the offset you are interested in.
+ * @return A BlockLocation[] where each object corresponds to a block within
+ * the given range.
+ * @throws IOException if initialize() hasn't been called.
*/
@Override
- public BlockLocation[] getFileBlockLocations(FileStatus file,
+ public BlockLocation[] getFileBlockLocations(FileStatus file,
long start, long len) throws IOException {
if (!initialized) throw new IOException ("You have to initialize the"
+"CephFileSystem before calling other methods.");
return locations;
}
- /* public FsStatus getStatus (Path path) throws IOException {
+ /**
+ * Get usage statistics on the Ceph filesystem.
+ * @param path A path to the partition you're interested in.
+ * Ceph doesn't partition, so this is ignored.
+ * @return FsStatus reportin capacity, usage, and remaining spac.
+ * @throws IOException if initialize() hasn't been called, or the
+ * stat somehow fails.
+ *
+ public FsStatus getStatus (Path path) throws IOException {
if (!initialized) throw new IOException("You have to initialize the"
+ " CephFileSystem before calling other methods.");
debug("getStatus:enter");
Path abs_path = makeAbsolute(path);
-
+
//currently(Ceph .12) Ceph actually ignores the path
//but we still pass it in; if Ceph stops ignoring we may need more
//error-checking code.
debug("getStatus:exit");
return new FsStatus(ceph_stat.capacity,
ceph_stat.used, ceph_stat.remaining);
- }*/
+ } */
- /* Added in for .20, not required in trunk */
+ /**
+ * Delete the given path, and any children if it's a directory.
+ * @param path The path to delete.
+ * @return true if the delete succeeded, false otherwise.
+ * @throws IOException If initialize() hasn't been called,
+ * or you try to delete the root directory.
+ * @deprecated Use delete(Path path, boolean recursive) instead.
+ */
public boolean delete(Path path) throws IOException { return delete(path, true); };
+ /**
+ * Delete the given path, and optionally its children.
+ * @param path the path to delete.
+ * @param recursive If the path is a directory and this is false,
+ * delete will throw an IOException. If path is a file this is ignored.
+ * @return true if the delete succeeded, false otherwise (including if
+ * path doesn't exist).
+ * @throws IOException if initialize() hasn't been called,
+ * or you attempt to non-recursively delete a directory,
+ * or you attempt to delete the root directory.
+ */
public boolean delete(Path path, boolean recursive) throws IOException {
if (!initialized) throw new IOException ("You have to initialize the"
+"CephFileSystem before calling other methods.");
}
/**
- * User-defined replication is not supported for Ceph file systems at the moment.
+ * Returns the default replication value of 1. This may
+ * NOT be the actual value, as replication is controlled
+ * by a separate Ceph configuration.
*/
@Override
- public short getDefaultReplication() {
+ public short getDefaultReplication() {
return 1;
}
/**
- *
+ * Get the block size for a given file. This will return if path is a
+ * directory, but the value is meaningless.
+ * @param path The Path you want to get the block size for.
+ * @return the block size of path (in bytes) as a long.
+ * @throws IOException if initialize() hasn't been called or
+ * the path doesn't exist.
+ * @deprecated use getFileStatus instead.
*/
@Deprecated
- public long getBlockSize(Path path) throws IOException {
+ public long getBlockSize(Path path) throws IOException {
if (!initialized) throw new IOException ("You have to initialize the"
+"CephFileSystem before calling other methods.");
debug("getBlockSize:enter with path " + path);
if (result < 4096) {
debug("org.apache.hadoop.fs.ceph.CephFileSystem.getBlockSize: " +
- "path exists; strange block size of " + result + " defaulting to 8192");
- return 8192;
+ "path exists; strange block size of " + result + " defaulting to 4096");
+ return 4096;
}
debug("getBlockSize:exit with result " + result);
return result;
}
+ /**
+ * Get the default block size.
+ * @return the default block size, in bytes, as a long.
+ */
@Override
- public long getDefaultBlockSize() {
+ public long getDefaultBlockSize() {
return DEFAULT_BLOCK_SIZE;
//return getConf().getLong("fs.ceph.block.size", DEFAULT_BLOCK_SIZE);
}
// also going to strip off any fs_default_name prefix we see.
private Path makeAbsolute(Path path) {
debug("makeAbsolute:enter with path " + path);
+ if (path == null) return new Path("/");
// first, check for the prefix
if (path.toString().startsWith(fs_default_name)) {