import java.net.URI;
import java.util.Hashtable;
+import java.io.Closeable;
import java.io.FileNotFoundException;
import java.io.IOException;
try {
localFS = FileSystem.getLocal(conf);
localFS.initialize(URI.create("file://localhost"), conf);
- String userpath = "/user/" + System.getProperty("user.name");
+ localFS.setVerifyChecksum(false);
+ String testDir = conf.get("hadoop.tmp.dir");
localPrefix = localFS.getWorkingDirectory().toString();
- int userpathinworkingdir = localPrefix.indexOf(userpath);
- if (-1 == userpathinworkingdir)
- userpathinworkingdir = localPrefix.length();
- localPrefix = localPrefix.substring(0, userpathinworkingdir);
+ int testDirLoc = localPrefix.indexOf(testDir) - 1;
+ if (-2 == testDirLoc)
+ testDirLoc = localPrefix.length();
+ localPrefix = localPrefix.substring(0, testDirLoc)
+ + "/" + conf.get("hadoop.tmp.dir");
localFS.setWorkingDirectory(new Path(localPrefix
+"/user/"
+ System.getProperty("user.name")));
//I don't know why, but the unit tests expect the default
//working dir to be /user/username, so satisfy them!
- debug("localPrefix is " + localPrefix, INFO);
+ //debug("localPrefix is " + localPrefix, INFO);
}
catch (IOException e) {
return false;
protected int ceph_mkdirs(String path, int mode) {
path = prepare_path(path);
- debug("ceph_mkdirs on " + path, INFO);
+ //debug("ceph_mkdirs on " + path, INFO);
try {
if(localFS.mkdirs(new Path(path), new FsPermission((short)mode)))
return 0;
stream = localFS.open(new Path(path));
files.put(new Integer(fileCount), stream);
filenames.put(new Integer(fileCount), path);
+ debug("ceph_open_for_read fh:" + fileCount
+ + ", pathname:" + path, INFO);
return fileCount++;
}
catch (IOException e) { }
stream = localFS.create(new Path(path));
files.put(new Integer(fileCount), stream);
filenames.put(new Integer(fileCount), path);
+ debug("ceph_open_for_overwrite fh:" + fileCount
+ + ", pathname:" + path, INFO);
return fileCount++;
}
catch (IOException e) { }
}
protected int ceph_close(int filehandle) {
+ debug("ceph_close(filehandle " + filehandle + ")", INFO);
try {
- if(files.remove(new Integer(filehandle)) == null) {
+ ((Closeable)files.get(new Integer(filehandle))).close();
+ if(null == files.get(new Integer(filehandle))) {
return -ENOENT; //this isn't quite the right error code,
// but the important part is it's negative
}
- else {
- filenames.remove(new Integer(filehandle));
- return 0; //hurray, success
- }
+ return 0; //hurray, success
+ }
+ catch (NullPointerException ne) {
+ debug("ceph_close caught NullPointerException!" + ne, WARN);
+ } //err, how?
+ catch (IOException ie) {
+ debug("ceph_close caught IOException!" + ie, WARN);
}
- catch (NullPointerException e) { } //err, how?
return -1; //failure
}
//rather than try and match a Ceph deployment's behavior exactly,
//just make bad things happen if they try and call methods after this
protected boolean ceph_kill_client() {
- debug("ceph_kill_client", INFO);
+ //debug("ceph_kill_client", INFO);
localFS.setWorkingDirectory(new Path(localPrefix));
- debug("working dir is now " + localFS.getWorkingDirectory(), INFO);
+ //debug("working dir is now " + localFS.getWorkingDirectory(), INFO);
try{
localFS.close(); }
catch (Exception e) {}
protected int ceph_write(int fh, byte[] buffer,
int buffer_offset, int length) {
+ debug("ceph_write fh:" + fh + ", buffer_offset:" + buffer_offset
+ + ", length:" + length, INFO);
long ret = -1;//generic fail
try {
FSDataOutputStream os = (FSDataOutputStream) files.get(new Integer(fh));
+ debug("ceph_write got outputstream", INFO);
long startPos = os.getPos();
os.write(buffer, buffer_offset, length);
ret = os.getPos() - startPos;
}
- catch (IOException e) {}
- catch (NullPointerException f) { }
+ catch (IOException e) { debug("ceph_write caught IOException!", WARN);}
+ catch (NullPointerException f) {
+ debug("ceph_write caught NullPointerException!", WARN); }
return (int)ret;
}
}
protected long ceph_seek_from_start(int fh, long pos) {
+ debug("ceph_seek_from_start(fh " + fh + ", pos " + pos + ")", INFO);
long ret = -1;//generic fail
try {
- FSDataInputStream is = (FSDataInputStream) files.get(new Integer(fh));
- if(is.seekToNewSource(pos)) {
- ret = is.getPos();
+ debug("ceph_seek_from_start filename is "
+ + filenames.get(new Integer(fh)), INFO);
+ if (null == files.get(new Integer(fh))) {
+ debug("ceph_seek_from_start: is is null!", WARN);
}
+ FSDataInputStream is = (FSDataInputStream) files.get(new Integer(fh));
+ debug("ceph_seek_from_start retrieved is!", INFO);
+ is.seek(pos);
+ ret = is.getPos();
}
- catch (IOException e) {}
- catch (NullPointerException f) {}
+ catch (IOException e) {debug("ceph_seek_from_start caught IOException!", WARN);}
+ catch (NullPointerException f) {debug("ceph_seek_from_start caught NullPointerException!", WARN);}
return (int)ret;
}
* We need to remove the localFS file prefix before returning to Ceph
*/
private String sanitize_path(String path) {
- debug("sanitize_path(" + path + ")", INFO);
+ //debug("sanitize_path(" + path + ")", INFO);
/* if (path.startsWith("file:"))
path = path.substring("file:".length()); */
if (path.startsWith(localPrefix)) {
if (path.length() == 0) //it was a root path
path = "/";
}
- debug("sanitize_path returning " + path, INFO);
+ //debug("sanitize_path returning " + path, INFO);
return path;
}
* test dir onto the front as a prefix.
*/
private String prepare_path(String path) {
- debug("prepare_path(" + path + ")", INFO);
+ //debug("prepare_path(" + path + ")", INFO);
if (path.startsWith("/"))
- return localPrefix + path;
+ path = localPrefix + path;
else if (path.equals("..")) {
if (ceph_getcwd().equals("/"))
path = "."; // you can't go up past root!
}
- debug("prepare_path returning" + path, INFO);
+ //debug("prepare_path returning" + path, INFO);
return path;
}
}