class CephFaker extends CephFS {
FileSystem localFS;
+ String localPrefix;
int blockSize;
Configuration conf;
Hashtable<Integer, Object> files;
* command-line arguments. */
try {
localFS = FileSystem.getLocal(conf);
- localFS.setWorkingDirectory(new Path(conf.get("test.build.data", "/tmp")));
+ localPrefix = conf.get("hadoop.tmp.dir", "/tmp");
+ localFS.setWorkingDirectory(new Path(localPrefix));
}
catch (IOException e) {
return false;
}
protected String ceph_getcwd() {
- return localFS.getWorkingDirectory().toString();
+ return sanitize_path(localFS.getWorkingDirectory().toString());
}
protected boolean ceph_setcwd(String path) {
- localFS.setWorkingDirectory(new Path(path));
+ localFS.setWorkingDirectory(new Path(prepare_path(path)));
return true;
}
//the caller is responsible for ensuring empty dirs
protected boolean ceph_rmdir(String pth) {
- Path path = new Path(pth);
+ Path path = new Path(prepare_path(pth));
boolean ret = false;
try {
if (localFS.listStatus(path).length <= 1) {
//this needs to work on (empty) directories too
protected boolean ceph_unlink(String path) {
+ path = prepare_path(path);
boolean ret = false;
if (ceph_isdirectory(path)) {
ret = ceph_rmdir(path);
}
protected boolean ceph_rename(String oldName, String newName) {
+ oldName = prepare_path(oldName);
+ newName = prepare_path(newName);
boolean ret = false;
try {
ret = localFS.rename(new Path(oldName), new Path(newName));
}
protected boolean ceph_exists(String path) {
+ path = prepare_path(path);
boolean ret = false;
try {
ret = localFS.exists(new Path(path));
}
protected long ceph_getblocksize(String path) {
+ path = prepare_path(path);
try {
FileStatus status = localFS.getFileStatus(new Path(path));
return status.getBlockSize();
}
protected boolean ceph_isdirectory(String path) {
+ path = prepare_path(path);
boolean ret = false;
try {
FileStatus status = localFS.getFileStatus(new Path(path));
}
protected boolean ceph_isfile(String path) {
+ path = prepare_path(path);
boolean ret = false;
try {
FileStatus status = localFS.getFileStatus(new Path(path));
}
protected String[] ceph_getdir(String path) {
+ path = prepare_path(path);
if (!ceph_isdirectory(path)) {
return null;
}
FileStatus[] stats = localFS.listStatus(new Path(path));
String[] names = new String[stats.length];
for (int i=0; i<stats.length; ++i) {
- names[i] = stats[i].getPath().toString();
+ names[i] = sanitize_path(stats[i].getPath().toString());
}
return names;
}
}
protected int ceph_mkdirs(String path, int mode) {
- boolean success = false;
+ path = prepare_path(path);
try {
- success = localFS.mkdirs(new Path(path), new FsPermission((short)mode));
- }
- catch (IOException e) { }
- if (success) {
- return 0;
+ if(localFS.mkdirs(new Path(path), new FsPermission((short)mode)))
+ return 0;
}
- if (ceph_isdirectory(path)) {
+ catch (IOException e) { System.out.println(e); }
+ if (ceph_isdirectory(path))
return -EEXIST; //apparently it already existed
- }
return -1;
}
* it's okay.
*/
protected int ceph_open_for_append(String path) {
+ path = prepare_path(path);
FSDataOutputStream stream;
try {
stream = localFS.append(new Path(path));
}
protected int ceph_open_for_read(String path) {
+ path = prepare_path(path);
FSDataInputStream stream;
try {
stream = localFS.open(new Path(path));
}
protected int ceph_open_for_overwrite(String path, int mode) {
+ path = prepare_path(path);
FSDataOutputStream stream;
try {
stream = localFS.create(new Path(path));
}
protected boolean ceph_setPermission(String pth, int mode) {
+ pth = prepare_path(pth);
Path path = new Path(pth);
boolean ret = false;
try {
}
protected boolean ceph_stat(String pth, CephFileSystem.Stat fill) {
+ pth = prepare_path(pth);
Path path = new Path(pth);
boolean ret = false;
try {
}
protected int ceph_statfs(String pth, CephFileSystem.CephStat fill) {
- Path path = new Path(pth);
+ pth = prepare_path(pth);
try {
FsStatus stat = localFS.getStatus();
fill.capacity = stat.getCapacity();
}
protected int ceph_replication(String path) {
+ path = prepare_path(path);
int ret = -1; //-1 for failure
try {
ret = localFS.getFileStatus(new Path(path)).getReplication();
}
protected int ceph_setTimes(String pth, long mtime, long atime) {
+ pth = prepare_path(pth);
Path path = new Path(pth);
int ret = -1; //generic fail
try {
catch (NullPointerException f) {}
return (int)ret;
}
+
+ /*
+ * We need to remove the localFS file prefix before returning to Ceph
+ */
+ private String sanitize_path(String path) {
+ debug("sanitize_path(" + path + ")", INFO);
+ if (path.startsWith("file:"))
+ path = path.substring("file:".length());
+ if (path.startsWith(localPrefix))
+ path = "/" + path.substring(localPrefix.length());
+ return path;
+ }
+
+ /*
+ * If it's an absolute path we need to shove the
+ * test dir onto the front as a prefix.
+ */
+ private String prepare_path(String path) {
+ if (path.startsWith("/"))
+ return localPrefix + path.substring(1);
+ return path;
+ }
}