import org.apache.commons.logging.Log;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.BlockLocation;
+import org.apache.hadoop.fs.FileAlreadyExistsException;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.FsStatus;
protected boolean ceph_rename(String oldName, String newName) {
oldName = prepare_path(oldName);
newName = prepare_path(newName);
- boolean ret = false;
try {
- ret = localFS.rename(new Path(oldName), new Path(newName));
+ Path parent = new Path(newName).getParent();
+ Path newPath = new Path(newName);
+ if (localFS.exists(parent) && !localFS.exists(newPath))
+ return localFS.rename(new Path(oldName), newPath);
+ return false;
}
- catch (IOException e) { }
- return ret;
+ catch (IOException e) { return false; }
}
protected boolean ceph_exists(String path) {
protected boolean ceph_isdirectory(String path) {
path = prepare_path(path);
- boolean ret = false;
try {
FileStatus status = localFS.getFileStatus(new Path(path));
- ret = status.isDir();
+ return status.isDir();
}
- catch (IOException e) {}
- return ret;
+ catch (IOException e) { return false; }
}
protected boolean ceph_isfile(String path) {
if(localFS.mkdirs(new Path(path), new FsPermission((short)mode)))
return 0;
}
- catch (IOException e) { debug(e.toString(), ERROR); }
+ catch (FileAlreadyExistsException fe) { return ENOTDIR; }
+ catch (IOException e) {}
if (ceph_isdirectory(path))
return -EEXIST; //apparently it already existed
return -1;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.BlockLocation;
+import org.apache.hadoop.fs.FileAlreadyExistsException;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
arguments += " --client-readahead-max-periods="
+ conf.get("fs.ceph.readahead", "1");
//make sure they gave us a ceph monitor address or conf file
- ceph.debug("initialize:Ceph intialization arguments: " + arguments, ceph.INFO);
+ ceph.debug("initialize:Ceph initialization arguments: " + arguments, ceph.INFO);
if ( (conf.get("fs.ceph.monAddr") == null) &&
(arguments.indexOf("-m") == -1) &&
(arguments.indexOf("-c") == -1) ) {
Path abs_path = makeAbsolute(dir);
ceph.debug("setWorkingDirectory:calling ceph_setcwd from Java", ceph.TRACE);
if (!ceph.ceph_setcwd(abs_path.toString()))
- ceph.debug("setWorkingDirectory: WARNING! ceph_setcwd failed for some reason on path " + abs_path, ceph.ERROR);
+ ceph.debug("setWorkingDirectory: WARNING! ceph_setcwd failed for some reason on path " + abs_path, ceph.WARN);
ceph.debug("setWorkingDirectory:exit", ceph.DEBUG);
}
* @return true if successful, false otherwise
* @throws IOException if initialize() hasn't been called.
*/
+ @Override
public boolean mkdirs(Path path, FsPermission perms) throws IOException {
if (!initialized) throw new IOException ("You have to initialize the "
+"CephFileSystem before calling other methods.");
Path abs_path = makeAbsolute(path);
ceph.debug("mkdirs:calling ceph_mkdirs from Java", ceph.TRACE);
int result = ceph.ceph_mkdirs(abs_path.toString(), (int)perms.toShort());
- ceph.debug("mkdirs:exit with result " + result, ceph.DEBUG);
- if (result != 0)
+ if (result != 0) {
+ ceph.debug("mkdirs: make directory " + abs_path
+ + "Failing with result " + result, ceph.WARN);
+ if (ceph.ENOTDIR == result)
+ throw new FileAlreadyExistsException("Parent path is not a directory");
return false;
- else return true;
+ }
+ else {
+ ceph.debug("mkdirs:exiting succesfully", ceph.DEBUG);
+ return true;
+ }
}
/**
public FileStatus[] listStatus(Path path) throws IOException {
if (!initialized) throw new IOException ("You have to initialize the "
+"CephFileSystem before calling other methods.");
- ceph.debug("listStatus:enter with path " + path, ceph.DEBUG);
+ ceph.debug("listStatus:enter with path " + path, ceph.WARN);
Path abs_path = makeAbsolute(path);
Path[] paths = listPaths(abs_path);
if (paths != null) {
Path abs_dst = makeAbsolute(dst);
ceph.debug("calling ceph_rename from Java", ceph.TRACE);
boolean result = ceph.ceph_rename(abs_src.toString(), abs_dst.toString());
+ if (!result) {
+ if (isDirectory(abs_dst)) { //move the srcdir into destdir
+ ceph.debug("ceph_rename failed but dst is a directory!", ceph.NOLOG);
+ Path new_dst = new Path(abs_dst, abs_src.getName());
+ result = rename(abs_src, new_dst);
+ ceph.debug("attempt to move " + abs_src.toString()
+ + " to " + new_dst.toString()
+ + "has result:" + result, ceph.NOLOG);
+ }
+ }
ceph.debug("rename:exit with result: " + result, ceph.DEBUG);
return result;
}
/* The path is a directory, so recursively try to delete its contents,
and then delete the directory. */
- if (!recursive) {
- throw new IOException("Directories must be deleted recursively!");
- }
//get the entries; listPaths will remove . and .. for us
Path[] contents = listPaths(abs_path);
if (contents == null) {
"\" while trying to delete it, BAILING", ceph.ERROR);
return false;
}
+ if (!recursive && contents.length > 0) {
+ throw new IOException("Directories must be deleted recursively!");
+ }
// delete the entries
ceph.debug("delete: recursively calling delete on contents of "
+ abs_path, ceph.DEBUG);
Path[] paths = new Path[dirlist.length];
for (int i = 0; i < dirlist.length; ++i) {
ceph.debug("Raw enumeration of paths in \"" + abs_path.toString() + "\": \"" +
- dirlist[i] + "\"", ceph.NOLOG);
+ dirlist[i] + "\"", ceph.TRACE);
// convert each listing to an absolute path
Path raw_path = new Path(dirlist[i]);
if (raw_path.isAbsolute())