private static final int EEXIST = 17;
private static final int ENOENT = 2;
+ private static final int FATAL = 0;
+ private static final int ERROR = 1;
+ private static final int WARN = 2;
+ private static final int INFO = 3;
+ private static final int DEBUG = 4;
+ private static final int TRACE = 5;
+ private static final int NOLOG = 6;
+
private URI uri;
* Create a new CephFileSystem.
*/
public CephFileSystem() {
- debug("CephFileSystem:enter");
root = new Path("/");
- debug("CephFileSystem:exit");
+ debug("CephFileSystem:exit", DEBUG);
}
/**
*/
public URI getUri() {
if (!initialized) return null;
- debug("getUri:enter");
- debug("getUri:exit with return " + uri);
+ debug("getUri:exit with return " + uri, DEBUG);
return uri;
}
*/
@Override
public void initialize(URI uri, Configuration conf) throws IOException {
- debug("initialize:enter");
+ debug("initialize:enter", DEBUG);
if (!initialized) {
System.load(conf.get("fs.ceph.libDir")+"/libhadoopcephfs.so");
System.load(conf.get("fs.ceph.libDir")+"/libceph.so");
arguments += " --client-readahead-max-periods="
+ conf.get("fs.ceph.readahead", "1");
//make sure they gave us a ceph monitor address or conf file
+ debug("initialize:Ceph intialization arguments: " + arguments, INFO);
if ( (conf.get("fs.ceph.monAddr") == null) &&
(arguments.indexOf("-m") == -1) &&
(arguments.indexOf("-c") == -1) ) {
- debug("You need to specify a Ceph monitor address.");
+ debug("initialize:You need to specify a Ceph monitor address.", FATAL);
throw new IOException("You must specify a Ceph monitor address or config file!");
}
// Initialize the client
if (!ceph_initializeClient(arguments,
conf.getInt("fs.ceph.blockSize", 1<<26))) {
- debug("Ceph initialization failed!");
+ debug("initialize:Ceph initialization failed!", FATAL);
throw new IOException("Ceph initialization failed!");
}
initialized = true;
- debug("Initialized client. Setting cwd to /");
+ debug("initialize:Ceph initialized client. Setting cwd to /", INFO);
ceph_setcwd("/");
}
- debug("initialize:exit");
+ debug("initialize:exit", DEBUG);
}
/**
public void close() throws IOException {
if (!initialized) throw new IOException ("You have to initialize the "
+"CephFileSystem before calling other methods.");
- debug("close:enter");
+ debug("close:enter", DEBUG);
super.close();//this method does stuff, make sure it's run!
+ debug("close: Calling ceph_kill_client from Java", TRACE);
ceph_kill_client();
- debug("close:exit");
+ debug("close:exit", DEBUG);
}
/**
Progressable progress) throws IOException {
if (!initialized) throw new IOException ("You have to initialize the "
+"CephFileSystem before calling other methods.");
- debug("append:enter with path " + file + " bufferSize " + bufferSize);
+ debug("append:enter with path " + file + " bufferSize " + bufferSize, DEBUG);
Path abs_path = makeAbsolute(file);
if (progress!=null) progress.progress();
+ debug("append: Entering ceph_open_for_append from Java", TRACE);
int fd = ceph_open_for_append(abs_path.toString());
+ debug("append: Returned to Java", TRACE);
if (progress!=null) progress.progress();
if( fd < 0 ) { //error in open
throw new IOException("append: Open for append failed on path \"" +
abs_path.toString() + "\"");
}
CephOutputStream cephOStream = new CephOutputStream(getConf(), fd);
- debug("append:exit");
+ debug("append:exit", DEBUG);
return new FSDataOutputStream(cephOStream, statistics);
}
*/
public Path getWorkingDirectory() {
if (!initialized) return null;
- debug("getWorkingDirectory:enter");
- debug("Working directory is " + ceph_getcwd());
- debug("getWorkingDirectory:exit");
+ debug("getWorkingDirectory:enter", DEBUG);
+ String cwd = ceph_getcwd();
+ debug("getWorkingDirectory:exit with path " + cwd, DEBUG);
return new Path(fs_default_name + ceph_getcwd());
}
@Override
public void setWorkingDirectory(Path dir) {
if (!initialized) return;
- debug("setWorkingDirecty:enter with new working dir " + dir);
+ debug("setWorkingDirecty:enter with new working dir " + dir, DEBUG);
Path abs_path = makeAbsolute(dir);
- debug("calling ceph_setcwd from Java");
+ debug("setWorkingDirectory:calling ceph_setcwd from Java", TRACE);
if (!ceph_setcwd(abs_path.toString()))
- debug("Warning:ceph_setcwd failed for some reason on path " + abs_path);
- debug("returned from ceph_setcwd to Java" );
- debug("setWorkingDirectory:exit");
+ debug("setWorkingDirectory: WARNING! ceph_setcwd failed for some reason on path " + abs_path, ERROR);
+ debug("setWorkingDirectory:exit", DEBUG);
}
/**
public boolean exists(Path path) throws IOException {
if (!initialized) throw new IOException ("You have to initialize the "
+"CephFileSystem before calling other methods.");
- debug("exists:enter with path " + path);
+ debug("exists:enter with path " + path, DEBUG);
boolean result;
Path abs_path = makeAbsolute(path);
if (abs_path.equals(root)) {
result = true;
}
else {
- debug("Calling ceph_exists from Java on path "
- + abs_path.toString() + ":");
+ debug("exists:Calling ceph_exists from Java on path "
+ + abs_path.toString(), TRACE);
result = ceph_exists(abs_path.toString());
- debug("Returned from ceph_exists to Java");
+ debug("exists:Returned from ceph_exists to Java", TRACE);
}
- debug("exists:exit with value " + result);
+ debug("exists:exit with value " + result, DEBUG);
return result;
}
public boolean mkdirs(Path path, FsPermission perms) throws IOException {
if (!initialized) throw new IOException ("You have to initialize the "
+"CephFileSystem before calling other methods.");
- debug("mkdirs:enter with path " + path);
+ debug("mkdirs:enter with path " + path, DEBUG);
Path abs_path = makeAbsolute(path);
- debug("calling ceph_mkdirs from Java");
+ debug("mkdirs:calling ceph_mkdirs from Java", TRACE);
int result = ceph_mkdirs(abs_path.toString(), (int)perms.toShort());
- debug("Returned from ceph_mkdirs to Java with result " + result);
- debug("mkdirs:exit with result " + result);
+ debug("mkdirs:exit with result " + result, DEBUG);
if (result != 0)
return false;
else return true;
public boolean isFile(Path path) throws IOException {
if (!initialized) throw new IOException ("You have to initialize the "
+"CephFileSystem before calling other methods.");
- debug("isFile:enter with path " + path);
+ debug("isFile:enter with path " + path, DEBUG);
Path abs_path = makeAbsolute(path);
boolean result;
if (abs_path.equals(root)) {
result = false;
}
else {
+ debug("isFile:entering ceph_isfile from Java", TRACE);
result = ceph_isfile(abs_path.toString());
}
- debug("isFile:exit with result " + result);
+ debug("isFile:exit with result " + result, DEBUG);
return result;
}
public boolean isDirectory(Path path) throws IOException {
if (!initialized) throw new IOException ("You have to initialize the "
+"CephFileSystem before calling other methods.");
- debug("isDirectory:enter with path " + path);
+ debug("isDirectory:enter with path " + path, DEBUG);
Path abs_path = makeAbsolute(path);
boolean result;
if (abs_path.equals(root)) {
result = true;
}
else {
- debug("calling ceph_isdirectory from Java");
+ debug("calling ceph_isdirectory from Java", TRACE);
result = ceph_isdirectory(abs_path.toString());
- debug("Returned from ceph_isdirectory to Java");
+ debug("Returned from ceph_isdirectory to Java", TRACE);
}
- debug("isDirectory:exit with result " + result);
+ debug("isDirectory:exit with result " + result, DEBUG);
return result;
}
public FileStatus getFileStatus(Path path) throws IOException {
if (!initialized) throw new IOException ("You have to initialize the "
+"CephFileSystem before calling other methods.");
- debug("getFileStatus:enter with path " + path);
+ debug("getFileStatus:enter with path " + path, DEBUG);
Path abs_path = makeAbsolute(path);
//sadly, Ceph doesn't really do uids/gids just yet, but
//everything else is filled
FileStatus status;
Stat lstat = new Stat();
+ debug("getFileStatus: calling ceph_stat from Java", TRACE);
if(ceph_stat(abs_path.toString(), lstat)) {
status = new FileStatus(lstat.size, lstat.is_dir,
ceph_replication(abs_path.toString()),
+ path + " does not exist or could not be accessed");
}
- debug("getFileStatus:exit");
+ debug("getFileStatus:exit", DEBUG);
return status;
}
public FileStatus[] listStatus(Path path) throws IOException {
if (!initialized) throw new IOException ("You have to initialize the "
+"CephFileSystem before calling other methods.");
- debug("listStatus:enter with path " + path);
+ debug("listStatus:enter with path " + path, DEBUG);
Path abs_path = makeAbsolute(path);
Path[] paths = listPaths(abs_path);
if (paths != null) {
for (int i = 0; i < paths.length; ++i) {
statuses[i] = getFileStatus(paths[i]);
}
- debug("listStatus:exit");
+ debug("listStatus:exit", DEBUG);
return statuses;
}
if (!isFile(path)) throw new FileNotFoundException(); //if we get here, listPaths returned null
public void setPermission(Path p, FsPermission permission) throws IOException {
if (!initialized) throw new IOException ("You have to initialize the "
+"CephFileSystem before calling other methods.");
+ debug("setPermission:enter with path " + p
+ + " and permissions " + permission, DEBUG);
Path abs_path = makeAbsolute(p);
+ debug("setPermission:calling ceph_setpermission from Java", TRACE);
ceph_setPermission(abs_path.toString(), permission.toShort());
+ debug("setPermission:exit", DEBUG);
}
/**
public void setTimes(Path p, long mtime, long atime) throws IOException {
if (!initialized) throw new IOException ("You have to initialize the "
+"CephFileSystem before calling other methods.");
+ debug("setTimes:enter with path " + p + " mtime:" + mtime +
+ " atime:" + atime, DEBUG);
Path abs_path = makeAbsolute(p);
+ debug("setTimes:calling ceph_setTimes from Java", TRACE);
int r = ceph_setTimes(abs_path.toString(), mtime, atime);
if (r<0) throw new IOException ("Failed to set times on path "
+ abs_path.toString() + " Error code: " + r);
+ debug("setTimes:exit", DEBUG);
}
/**
) throws IOException {
if (!initialized) throw new IOException ("You have to initialize the "
+"CephFileSystem before calling other methods.");
- debug("create:enter with path " + path);
+ debug("create:enter with path " + path, DEBUG);
Path abs_path = makeAbsolute(path);
if (progress!=null) progress.progress();
// We ignore replication since that's not configurable here, and
if (progress!=null) progress.progress();
}
// Step 3: open the file
- debug("calling ceph_open_for_overwrite from Java");
+ debug("calling ceph_open_for_overwrite from Java", TRACE);
int fh = ceph_open_for_overwrite(abs_path.toString(), (int)permission.toShort());
if (progress!=null) progress.progress();
- debug("Returned from ceph_open_for_overwrite to Java with fh " + fh);
+ debug("Returned from ceph_open_for_overwrite to Java with fh " + fh, TRACE);
if (fh < 0) {
throw new IOException("create: Open for overwrite failed on path \"" +
path.toString() + "\"");
// Step 4: create the stream
OutputStream cephOStream = new CephOutputStream(getConf(), fh);
- debug("create:exit");
+ debug("create:exit", DEBUG);
return new FSDataOutputStream(cephOStream, statistics);
}
public FSDataInputStream open(Path path, int bufferSize) throws IOException {
if (!initialized) throw new IOException ("You have to initialize the "
+"CephFileSystem before calling other methods.");
- debug("open:enter with path " + path);
+ debug("open:enter with path " + path, DEBUG);
Path abs_path = makeAbsolute(path);
int fh = ceph_open_for_read(abs_path.toString());
+ "\" is a directory!");
}
Stat lstat = new Stat();
+ debug("open:calling ceph_stat from Java", TRACE);
ceph_stat(abs_path.toString(), lstat);
+ debug("open:returned to Java", TRACE);
long size = lstat.size;
if (size < 0) {
throw new IOException("Failed to get file size for file " + abs_path.toString() +
" but succeeded in opening file. Something bizarre is going on.");
}
FSInputStream cephIStream = new CephInputStream(getConf(), fh, size);
- debug("open:exit");
+ debug("open:exit", DEBUG);
return new FSDataInputStream(cephIStream);
}
public boolean rename(Path src, Path dst) throws IOException {
if (!initialized) throw new IOException ("You have to initialize the "
+"CephFileSystem before calling other methods.");
- debug("rename:enter");
- debug("calling ceph_rename from Java");
+ debug("rename:enter with src:" + src + " and dest:" + dst, DEBUG);
Path abs_src = makeAbsolute(src);
Path abs_dst = makeAbsolute(dst);
+ debug("calling ceph_rename from Java", TRACE);
boolean result = ceph_rename(abs_src.toString(), abs_dst.toString());
- debug("return from ceph_rename to Java with result " + result);
- debug("rename:exit");
+ debug("rename:exit with result: " + result, DEBUG);
return result;
}
long start, long len) throws IOException {
if (!initialized) throw new IOException ("You have to initialize the "
+"CephFileSystem before calling other methods.");
+ debug("getFileBlockLocations:enter with path " + file.getPath() +
+ ", start pos " + start + ", length " + len, DEBUG);
//sanitize and get the filehandle
Path abs_path = makeAbsolute(file.getPath());
+ debug("getFileBlockLocations:call ceph_open_for_read from Java", TRACE);
int fh = ceph_open_for_read(abs_path.toString());
+ debug("getFileBlockLocations:return from ceph_open_for_read to Java with fh "
+ + fh, TRACE);
if (fh < 0) {
+ debug("getFileBlockLocations:got error " + fh +
+ ", exiting and returning null!", ERROR);
return null;
}
//get the block size
+ debug("getFileBlockLocations:call ceph_getblocksize from Java", TRACE);
long blockSize = ceph_getblocksize(abs_path.toString());
+ debug("getFileBlockLocations:return from ceph_getblocksize", TRACE);
BlockLocation[] locations =
new BlockLocation[(int)Math.ceil(len/(float)blockSize)];
+ long offset;
for (int i = 0; i < locations.length; ++i) {
- String host = ceph_hosts(fh, start + i*blockSize);
+ offset = start + i*blockSize;
+ debug("getFileBlockLocations:call ceph_hosts from Java on fh "
+ + fh + " and offset " + offset, TRACE);
+ String host = ceph_hosts(fh, offset);
+ debug("getFileBlockLocations:return from ceph_hosts to Java with host "
+ + host, TRACE);
String[] hostArray = new String[1];
hostArray[0] = host;
locations[i] = new BlockLocation(hostArray, hostArray,
start+i*blockSize-(start % blockSize),
blockSize);
}
+ debug("getFileBlockLocations:call ceph_close from Java on fh "
+ + fh, TRACE);
ceph_close(fh);
+ debug("getFileBlockLocations:return with " + locations.length
+ + " locations", DEBUG);
return locations;
}
public FsStatus getStatus (Path path) throws IOException {
if (!initialized) throw new IOException("You have to initialize the "
+ " CephFileSystem before calling other methods.");
- debug("getStatus:enter");
+ debug("getStatus:enter with path " + path, DEBUG);
Path abs_path = makeAbsolute(path);
- //currently(Ceph .14) Ceph actually ignores the path
+ //currently(Ceph .16) Ceph actually ignores the path
//but we still pass it in; if Ceph stops ignoring we may need more
//error-checking code.
CephStat ceph_stat = new CephStat();
+ debug("getStatus:calling ceph_statfs from Java", TRACE);
int result = ceph_statfs(abs_path.toString(), ceph_stat);
if (result!=0) throw new IOException("Somehow failed to statfs the Ceph filesystem. Error code: " + result);
- debug("getStatus:exit");
+ debug("getStatus:exit successfully", DEBUG);
return new FsStatus(ceph_stat.capacity,
ceph_stat.used, ceph_stat.remaining);
}
public boolean delete(Path path, boolean recursive) throws IOException {
if (!initialized) throw new IOException ("You have to initialize the "
+"CephFileSystem before calling other methods.");
- debug("delete:enter");
+ debug("delete:enter with path " + path + " and recursive=" + recursive,
+ DEBUG);
Path abs_path = makeAbsolute(path);
- debug("delete: Deleting path " + abs_path.toString());
// sanity check
if (abs_path.equals(root))
throw new IOException("Error: deleting the root directory is a Bad Idea.");
-
if (!exists(abs_path)) return false;
// if the path is a file, try to delete it.
if (isFile(abs_path)) {
+ debug("delete:calling ceph_unlink from Java with path " + abs_path,
+ TRACE);
boolean result = ceph_unlink(abs_path.toString());
if(!result)
debug("delete: failed to delete file \"" +
- abs_path.toString() + "\".");
- debug("delete:exit");
+ abs_path.toString() + "\".", ERROR);
+ debug("delete:exit with success=" + result, DEBUG);
return result;
}
Path[] contents = listPaths(abs_path);
if (contents == null) {
debug("delete: Failed to read contents of directory \"" +
- abs_path.toString() + "\" while trying to delete it");
- debug("delete:exit");
+ abs_path.toString() +
+ "\" while trying to delete it, BAILING", ERROR);
return false;
}
// delete the entries
+ debug("delete: recursively calling delete on contents of "
+ + abs_path, DEBUG);
for (Path p : contents) {
if (!delete(p, true)) {
debug("delete: Failed to delete file \"" +
p.toString() + "\" while recursively deleting \""
- + abs_path.toString() + "\"" );
- debug("delete:exit");
+ + abs_path.toString() + "\", BAILING", ERROR );
return false;
}
}
//if we've come this far it's a now-empty directory, so delete it!
boolean result = ceph_rmdir(abs_path.toString());
if (!result)
- debug("delete: failed to delete \"" + abs_path.toString() + "\"");
- debug("delete:exit");
+ debug("delete: failed to delete \"" + abs_path.toString()
+ + "\", BAILING", ERROR);
+ debug("delete:exit", DEBUG);
return result;
}
// Makes a Path absolute. In a cheap, dirty hack, we're
// also going to strip off any fs_default_name prefix we see.
private Path makeAbsolute(Path path) {
- debug("makeAbsolute:enter with path " + path);
+ debug("makeAbsolute:enter with path " + path, NOLOG);
if (path == null) return new Path("/");
// first, check for the prefix
if (path.toString().startsWith(fs_default_name)) {
Path stripped_path = new Path(path.toString().substring(fs_default_name.length()));
- debug("makeAbsolute:exit with path " + stripped_path);
+ debug("makeAbsolute:exit with path " + stripped_path, NOLOG);
return stripped_path;
}
if (path.isAbsolute()) {
- debug("makeAbsolute:exit with path " + path);
+ debug("makeAbsolute:exit with path " + path, NOLOG);
return path;
}
Path new_path = new Path(ceph_getcwd(), path);
- debug("makeAbsolute:exit with path " + new_path);
+ debug("makeAbsolute:exit with path " + new_path, NOLOG);
return new_path;
}
private Path[] listPaths(Path path) throws IOException {
- debug("listPaths:enter with path " + path);
+ debug("listPaths:enter with path " + path, NOLOG);
String dirlist[];
Path abs_path = makeAbsolute(path);
// If it's a directory, get the listing. Otherwise, complain and give up.
- debug("calling ceph_getdir from Java with path " + abs_path);
+ debug("calling ceph_getdir from Java with path " + abs_path, NOLOG);
dirlist = ceph_getdir(abs_path.toString());
- debug("returning from ceph_getdir to Java");
+ debug("returning from ceph_getdir to Java", NOLOG);
if (dirlist == null) {
throw new IOException("listPaths: path " + path.toString() + " is not a directory.");
Path[] paths = new Path[dirlist.length];
for (int i = 0; i < dirlist.length; ++i) {
debug("Raw enumeration of paths in \"" + abs_path.toString() + "\": \"" +
- dirlist[i] + "\"");
+ dirlist[i] + "\"", NOLOG);
// convert each listing to an absolute path
Path raw_path = new Path(dirlist[i]);
if (raw_path.isAbsolute())
else
paths[i] = new Path(abs_path, raw_path);
}
- debug("listPaths:exit");
+ debug("listPaths:exit", NOLOG);
return paths;
}
- private void debug(String statement) {
+ private void debug(String statement, int priority) {
if (debug) System.err.println(statement);
+ switch(priority) {
+ case FATAL: LOG.fatal(statement);
+ break;
+ case ERROR: LOG.error(statement);
+ break;
+ case WARN: LOG.warn(statement);
+ break;
+ case INFO: LOG.info(statement);
+ break;
+ case DEBUG: LOG.debug(statement);
+ break;
+ case TRACE: LOG.trace(statement);
+ break;
+ case NOLOG: break;
+ default: break;
+ }
}
private static class Stat {