* Create a new CephFileSystem.
*/
public CephFileSystem() {
- if(debug) debug("CephFileSystem:enter");
+ debug("CephFileSystem:enter");
root = new Path("/");
- if(debug) debug("CephFileSystem:exit");
+ debug("CephFileSystem:exit");
}
/**
*/
public URI getUri() {
if (!initialized) return null;
- if(debug) debug("getUri:enter");
- if(debug) debug("getUri:exit with return " + uri);
+ debug("getUri:enter");
+ debug("getUri:exit with return " + uri);
return uri;
}
*/
@Override
public void initialize(URI uri, Configuration conf) throws IOException {
- if(debug) debug("initialize:enter");
+ debug("initialize:enter");
if (!initialized) {
System.load(conf.get("fs.ceph.libDir")+"/libhadoopcephfs.so");
System.load(conf.get("fs.ceph.libDir")+"/libceph.so");
if ( (conf.get("fs.ceph.monAddr") == null) &&
(arguments.indexOf("-m") == -1) &&
(arguments.indexOf("-c") == -1) ) {
- if(debug) debug("You need to specify a Ceph monitor address.");
+ debug("You need to specify a Ceph monitor address.");
throw new IOException("You must specify a Ceph monitor address or config file!");
}
// Initialize the client
if (!ceph_initializeClient(arguments,
conf.getInt("fs.ceph.blockSize", 1<<26))) {
- if(debug) debug("Ceph initialization failed!");
+ debug("Ceph initialization failed!");
throw new IOException("Ceph initialization failed!");
}
initialized = true;
- if(debug) debug("Initialized client. Setting cwd to /");
+ debug("Initialized client. Setting cwd to /");
ceph_setcwd("/");
}
- if(debug) debug("initialize:exit");
+ debug("initialize:exit");
}
/**
public void close() throws IOException {
if (!initialized) throw new IOException ("You have to initialize the "
+"CephFileSystem before calling other methods.");
- if(debug) debug("close:enter");
+ debug("close:enter");
super.close();//this method does stuff, make sure it's run!
ceph_kill_client();
- if(debug) debug("close:exit");
+ debug("close:exit");
}
/**
Progressable progress) throws IOException {
if (!initialized) throw new IOException ("You have to initialize the "
+"CephFileSystem before calling other methods.");
- if(debug) debug("append:enter with path " + file + " bufferSize " + bufferSize);
+ debug("append:enter with path " + file + " bufferSize " + bufferSize);
Path abs_path = makeAbsolute(file);
if (progress!=null) progress.progress();
int fd = ceph_open_for_append(abs_path.toString());
abs_path.toString() + "\"");
}
CephOutputStream cephOStream = new CephOutputStream(getConf(), fd);
- if(debug) debug("append:exit");
+ debug("append:exit");
return new FSDataOutputStream(cephOStream, statistics);
}
*/
public Path getWorkingDirectory() {
if (!initialized) return null;
- if(debug) debug("getWorkingDirectory:enter");
- if(debug) debug("Working directory is " + ceph_getcwd());
- if(debug) debug("getWorkingDirectory:exit");
+ debug("getWorkingDirectory:enter");
+ debug("Working directory is " + ceph_getcwd());
+ debug("getWorkingDirectory:exit");
return new Path(fs_default_name + ceph_getcwd());
}
@Override
public void setWorkingDirectory(Path dir) {
if (!initialized) return;
- if(debug) debug("setWorkingDirecty:enter with new working dir " + dir);
+ debug("setWorkingDirecty:enter with new working dir " + dir);
Path abs_path = makeAbsolute(dir);
- if(debug) debug("calling ceph_setcwd from Java");
+ debug("calling ceph_setcwd from Java");
if (!ceph_setcwd(abs_path.toString()))
- if(debug) debug("Warning:ceph_setcwd failed for some reason on path " + abs_path);
- if(debug) debug("returned from ceph_setcwd to Java" );
- if(debug) debug("setWorkingDirectory:exit");
+ debug("Warning:ceph_setcwd failed for some reason on path " + abs_path);
+ debug("returned from ceph_setcwd to Java" );
+ debug("setWorkingDirectory:exit");
}
/**
public boolean exists(Path path) throws IOException {
if (!initialized) throw new IOException ("You have to initialize the "
+"CephFileSystem before calling other methods.");
- if(debug) debug("exists:enter with path " + path);
+ debug("exists:enter with path " + path);
boolean result;
Path abs_path = makeAbsolute(path);
if (abs_path.equals(root)) {
result = true;
}
else {
- if(debug) debug("Calling ceph_exists from Java on path "
+ debug("Calling ceph_exists from Java on path "
+ abs_path.toString() + ":");
result = ceph_exists(abs_path.toString());
- if(debug) debug("Returned from ceph_exists to Java");
+ debug("Returned from ceph_exists to Java");
}
- if(debug) debug("exists:exit with value " + result);
+ debug("exists:exit with value " + result);
return result;
}
public boolean mkdirs(Path path, FsPermission perms) throws IOException {
if (!initialized) throw new IOException ("You have to initialize the "
+"CephFileSystem before calling other methods.");
- if(debug) debug("mkdirs:enter with path " + path);
+ debug("mkdirs:enter with path " + path);
Path abs_path = makeAbsolute(path);
- if(debug) debug("calling ceph_mkdirs from Java");
+ debug("calling ceph_mkdirs from Java");
int result = ceph_mkdirs(abs_path.toString(), (int)perms.toShort());
- if(debug) debug("Returned from ceph_mkdirs to Java with result " + result);
- if(debug) debug("mkdirs:exit with result " + result);
+ debug("Returned from ceph_mkdirs to Java with result " + result);
+ debug("mkdirs:exit with result " + result);
if (result != 0)
return false;
else return true;
public boolean isFile(Path path) throws IOException {
if (!initialized) throw new IOException ("You have to initialize the "
+"CephFileSystem before calling other methods.");
- if(debug) debug("isFile:enter with path " + path);
+ debug("isFile:enter with path " + path);
Path abs_path = makeAbsolute(path);
boolean result;
if (abs_path.equals(root)) {
else {
result = ceph_isfile(abs_path.toString());
}
- if(debug) debug("isFile:exit with result " + result);
+ debug("isFile:exit with result " + result);
return result;
}
public boolean isDirectory(Path path) throws IOException {
if (!initialized) throw new IOException ("You have to initialize the "
+"CephFileSystem before calling other methods.");
- if(debug) debug("isDirectory:enter with path " + path);
+ debug("isDirectory:enter with path " + path);
Path abs_path = makeAbsolute(path);
boolean result;
if (abs_path.equals(root)) {
result = true;
}
else {
- if(debug) debug("calling ceph_isdirectory from Java");
+ debug("calling ceph_isdirectory from Java");
result = ceph_isdirectory(abs_path.toString());
- if(debug) debug("Returned from ceph_isdirectory to Java");
+ debug("Returned from ceph_isdirectory to Java");
}
- if(debug) debug("isDirectory:exit with result " + result);
+ debug("isDirectory:exit with result " + result);
return result;
}
public FileStatus getFileStatus(Path path) throws IOException {
if (!initialized) throw new IOException ("You have to initialize the "
+"CephFileSystem before calling other methods.");
- if(debug) debug("getFileStatus:enter with path " + path);
+ debug("getFileStatus:enter with path " + path);
Path abs_path = makeAbsolute(path);
//sadly, Ceph doesn't really do uids/gids just yet, but
//everything else is filled
+ path + " does not exist or could not be accessed");
}
- if(debug) debug("getFileStatus:exit");
+ debug("getFileStatus:exit");
return status;
}
public FileStatus[] listStatus(Path path) throws IOException {
if (!initialized) throw new IOException ("You have to initialize the "
+"CephFileSystem before calling other methods.");
- if(debug) debug("listStatus:enter with path " + path);
+ debug("listStatus:enter with path " + path);
Path abs_path = makeAbsolute(path);
Path[] paths = listPaths(abs_path);
if (paths != null) {
for (int i = 0; i < paths.length; ++i) {
statuses[i] = getFileStatus(paths[i]);
}
- if(debug) debug("listStatus:exit");
+ debug("listStatus:exit");
return statuses;
}
if (!isFile(path)) throw new FileNotFoundException(); //if we get here, listPaths returned null
) throws IOException {
if (!initialized) throw new IOException ("You have to initialize the "
+"CephFileSystem before calling other methods.");
- if(debug) debug("create:enter with path " + path);
+ debug("create:enter with path " + path);
Path abs_path = makeAbsolute(path);
if (progress!=null) progress.progress();
// We ignore replication since that's not configurable here, and
if (progress!=null) progress.progress();
}
// Step 3: open the file
- if(debug) debug("calling ceph_open_for_overwrite from Java");
+ debug("calling ceph_open_for_overwrite from Java");
int fh = ceph_open_for_overwrite(abs_path.toString(), (int)permission.toShort());
if (progress!=null) progress.progress();
- if(debug) debug("Returned from ceph_open_for_overwrite to Java with fh " + fh);
+ debug("Returned from ceph_open_for_overwrite to Java with fh " + fh);
if (fh < 0) {
throw new IOException("create: Open for overwrite failed on path \"" +
path.toString() + "\"");
// Step 4: create the stream
OutputStream cephOStream = new CephOutputStream(getConf(), fh);
- if(debug) debug("create:exit");
+ debug("create:exit");
return new FSDataOutputStream(cephOStream, statistics);
}
public FSDataInputStream open(Path path, int bufferSize) throws IOException {
if (!initialized) throw new IOException ("You have to initialize the "
+"CephFileSystem before calling other methods.");
- if(debug) debug("open:enter with path " + path);
+ debug("open:enter with path " + path);
Path abs_path = makeAbsolute(path);
int fh = ceph_open_for_read(abs_path.toString());
" but succeeded in opening file. Something bizarre is going on.");
}
FSInputStream cephIStream = new CephInputStream(getConf(), fh, size);
- if(debug) debug("open:exit");
+ debug("open:exit");
return new FSDataInputStream(cephIStream);
}
public boolean rename(Path src, Path dst) throws IOException {
if (!initialized) throw new IOException ("You have to initialize the "
+"CephFileSystem before calling other methods.");
- if(debug) debug("rename:enter");
- if(debug) debug("calling ceph_rename from Java");
+ debug("rename:enter");
+ debug("calling ceph_rename from Java");
Path abs_src = makeAbsolute(src);
Path abs_dst = makeAbsolute(dst);
boolean result = ceph_rename(abs_src.toString(), abs_dst.toString());
- if(debug) debug("return from ceph_rename to Java with result " + result);
- if(debug) debug("rename:exit");
+ debug("return from ceph_rename to Java with result " + result);
+ debug("rename:exit");
return result;
}
public FsStatus getStatus (Path path) throws IOException {
if (!initialized) throw new IOException("You have to initialize the "
+ " CephFileSystem before calling other methods.");
- if(debug) debug("getStatus:enter");
+ debug("getStatus:enter");
Path abs_path = makeAbsolute(path);
//currently(Ceph .14) Ceph actually ignores the path
CephStat ceph_stat = new CephStat();
int result = ceph_statfs(abs_path.toString(), ceph_stat);
if (result!=0) throw new IOException("Somehow failed to statfs the Ceph filesystem. Error code: " + result);
- if(debug) debug("getStatus:exit");
+ debug("getStatus:exit");
return new FsStatus(ceph_stat.capacity,
ceph_stat.used, ceph_stat.remaining);
}
public boolean delete(Path path, boolean recursive) throws IOException {
if (!initialized) throw new IOException ("You have to initialize the "
+"CephFileSystem before calling other methods.");
- if(debug) debug("delete:enter");
+ debug("delete:enter");
Path abs_path = makeAbsolute(path);
- if(debug) debug("delete: Deleting path " + abs_path.toString());
+ debug("delete: Deleting path " + abs_path.toString());
// sanity check
if (abs_path.equals(root))
throw new IOException("Error: deleting the root directory is a Bad Idea.");
if (isFile(abs_path)) {
boolean result = ceph_unlink(abs_path.toString());
if(!result)
- if(debug) debug("delete: failed to delete file \"" +
+ debug("delete: failed to delete file \"" +
abs_path.toString() + "\".");
- if(debug) debug("delete:exit");
+ debug("delete:exit");
return result;
}
//get the entries; listPaths will remove . and .. for us
Path[] contents = listPaths(abs_path);
if (contents == null) {
- if(debug) debug("delete: Failed to read contents of directory \"" +
+ debug("delete: Failed to read contents of directory \"" +
abs_path.toString() + "\" while trying to delete it");
- if(debug) debug("delete:exit");
+ debug("delete:exit");
return false;
}
// delete the entries
for (Path p : contents) {
if (!delete(p, true)) {
- if(debug) debug("delete: Failed to delete file \"" +
+ debug("delete: Failed to delete file \"" +
p.toString() + "\" while recursively deleting \""
+ abs_path.toString() + "\"" );
- if(debug) debug("delete:exit");
+ debug("delete:exit");
return false;
}
}
//if we've come this far it's a now-empty directory, so delete it!
boolean result = ceph_rmdir(abs_path.toString());
if (!result)
- if(debug) debug("delete: failed to delete \"" + abs_path.toString() + "\"");
- if(debug) debug("delete:exit");
+ debug("delete: failed to delete \"" + abs_path.toString() + "\"");
+ debug("delete:exit");
return result;
}
// Makes a Path absolute. In a cheap, dirty hack, we're
// also going to strip off any fs_default_name prefix we see.
private Path makeAbsolute(Path path) {
- if(debug) debug("makeAbsolute:enter with path " + path);
+ debug("makeAbsolute:enter with path " + path);
if (path == null) return new Path("/");
// first, check for the prefix
if (path.toString().startsWith(fs_default_name)) {
Path stripped_path = new Path(path.toString().substring(fs_default_name.length()));
- if(debug) debug("makeAbsolute:exit with path " + stripped_path);
+ debug("makeAbsolute:exit with path " + stripped_path);
return stripped_path;
}
if (path.isAbsolute()) {
- if(debug) debug("makeAbsolute:exit with path " + path);
+ debug("makeAbsolute:exit with path " + path);
return path;
}
Path new_path = new Path(ceph_getcwd(), path);
- if(debug) debug("makeAbsolute:exit with path " + new_path);
+ debug("makeAbsolute:exit with path " + new_path);
return new_path;
}
private Path[] listPaths(Path path) throws IOException {
- if(debug) debug("listPaths:enter with path " + path);
+ debug("listPaths:enter with path " + path);
String dirlist[];
Path abs_path = makeAbsolute(path);
// If it's a directory, get the listing. Otherwise, complain and give up.
- if(debug) debug("calling ceph_getdir from Java with path " + abs_path);
+ debug("calling ceph_getdir from Java with path " + abs_path);
dirlist = ceph_getdir(abs_path.toString());
- if(debug) debug("returning from ceph_getdir to Java");
+ debug("returning from ceph_getdir to Java");
if (dirlist == null) {
throw new IOException("listPaths: path " + path.toString() + " is not a directory.");
// convert the strings to Paths
Path[] paths = new Path[dirlist.length];
for (int i = 0; i < dirlist.length; ++i) {
- if(debug) debug("Raw enumeration of paths in \"" + abs_path.toString() + "\": \"" +
+ debug("Raw enumeration of paths in \"" + abs_path.toString() + "\": \"" +
dirlist[i] + "\"");
// convert each listing to an absolute path
Path raw_path = new Path(dirlist[i]);
else
paths[i] = new Path(abs_path, raw_path);
}
- if(debug) debug("listPaths:exit");
+ debug("listPaths:exit");
return paths;
}
private void debug(String statement) {
- System.err.println(statement);
+ if (debug) System.err.println(statement);
}
private static class Stat {
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
-//import java.lang.IndexOutOfBoundsException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSInputStream;
fileHandle = fh;
closed = false;
debug = ("true".equals(conf.get("fs.ceph.debug", "false")));
- if(debug) debug("CephInputStream constructor: initializing stream with fh "
+ debug("CephInputStream constructor: initializing stream with fh "
+ fh + " and file length " + flength);
}
}
public synchronized void seek(long targetPos) throws IOException {
- if(debug) debug("CephInputStream.seek: Seeking to position " + targetPos +
+ debug("CephInputStream.seek: Seeking to position " + targetPos +
" on fd " + fileHandle);
if (targetPos > fileLength) {
throw new IOException("CephInputStream.seek: failed seeking to position " + targetPos +
*/
@Override
public synchronized int read() throws IOException {
- if(debug) debug("CephInputStream.read: Reading a single byte from fd " + fileHandle
+ debug("CephInputStream.read: Reading a single byte from fd " + fileHandle
+ " by calling general read function");
byte result[] = new byte[1];
*/
@Override
public synchronized int read(byte buf[], int off, int len) throws IOException {
- if(debug) debug("CephInputStream.read: Reading " + len + " bytes from fd " + fileHandle);
+ debug("CephInputStream.read: Reading " + len + " bytes from fd " + fileHandle);
if (closed) {
throw new IOException("CephInputStream.read: cannot read " + len +
" bytes from fd " + fileHandle + ": stream closed");
}
if (null == buf) {
- throw new NullPointerException("Read buffer is null");
+ throw new IOException("Read buffer is null");
}
// check for proper index bounds
if((off < 0) || (len < 0) || (off + len > buf.length)) {
- throw new IndexOutOfBoundsException("CephInputStream.read: Indices out of bounds for read: "
+ throw new IOException("CephInputStream.read: Indices out of bounds for read: "
+ "read length is " + len + ", buffer offset is "
+ off +", and buffer size is " + buf.length);
}
// ensure we're not past the end of the file
if (getPos() >= fileLength)
{
- if(debug) debug("CephInputStream.read: cannot read " + len +
+ debug("CephInputStream.read: cannot read " + len +
" bytes from fd " + fileHandle + ": current position is " +
getPos() + " and file length is " + fileLength);
// actually do the read
int result = ceph_read(fileHandle, buf, off, len);
if (result < 0)
- if(debug) debug("CephInputStream.read: Reading " + len
+ debug("CephInputStream.read: Reading " + len
+ " bytes from fd " + fileHandle + " failed.");
- if(debug) debug("CephInputStream.read: Reading " + len + " bytes from fd "
+ debug("CephInputStream.read: Reading " + len + " bytes from fd "
+ fileHandle + ": succeeded in reading " + result + " bytes");
return result;
}
*/
@Override
public void close() throws IOException {
- if(debug) debug("CephOutputStream.close:enter");
+ debug("CephOutputStream.close:enter");
if (closed) {
throw new IOException("Stream closed");
}
throw new IOException("Close failed!");
}
closed = true;
- if(debug) debug("CephOutputStream.close:exit");
+ debug("CephOutputStream.close:exit");
}
private void debug(String out) {