]> git-server-git.apps.pok.os.sepia.ceph.com Git - ceph.git/commitdiff
Hadoop: f5 e3
authorGreg Farnum <gregf@hq.newdream.net>
Fri, 30 Oct 2009 22:37:06 +0000 (15:37 -0700)
committerGreg Farnum <gregf@hq.newdream.net>
Tue, 3 Nov 2009 00:38:15 +0000 (16:38 -0800)
src/client/hadoop/ceph/CephFaker.java

index 5ebd7cf260ef085f81194c41eadd4d1b28f7aef2..5fa612af73ed90dcd0e008c8bf2c6ad73f86087a 100644 (file)
@@ -22,6 +22,7 @@ package org.apache.hadoop.fs.ceph;
 
 import java.net.URI;
 import java.util.Hashtable;
+import java.io.Closeable;
 import java.io.FileNotFoundException;
 import java.io.IOException;
 
@@ -65,19 +66,21 @@ class CephFaker extends CephFS {
                        try {
                                localFS = FileSystem.getLocal(conf);
                                localFS.initialize(URI.create("file://localhost"), conf);
-                               String userpath = "/user/" + System.getProperty("user.name");
+                               localFS.setVerifyChecksum(false);
+                               String testDir = conf.get("hadoop.tmp.dir");
                                localPrefix = localFS.getWorkingDirectory().toString();
-                               int userpathinworkingdir = localPrefix.indexOf(userpath);
-                               if (-1 == userpathinworkingdir)
-                                       userpathinworkingdir = localPrefix.length();
-                               localPrefix = localPrefix.substring(0, userpathinworkingdir);
+                               int testDirLoc = localPrefix.indexOf(testDir) - 1;
+                               if (-2 == testDirLoc)
+                                       testDirLoc = localPrefix.length();
+                               localPrefix = localPrefix.substring(0, testDirLoc)
+                                       + "/" + conf.get("hadoop.tmp.dir");
 
                                localFS.setWorkingDirectory(new Path(localPrefix
                                                                                                                                                                                 +"/user/"
                                                                                                                                                                                 + System.getProperty("user.name")));
                                //I don't know why, but the unit tests expect the default
                                //working dir to be /user/username, so satisfy them!
-                               debug("localPrefix is " + localPrefix, INFO);
+                               //debug("localPrefix is " + localPrefix, INFO);
                        }
                        catch (IOException e) {
                                return false;
@@ -203,7 +206,7 @@ class CephFaker extends CephFS {
 
        protected int ceph_mkdirs(String path, int mode) {
                path = prepare_path(path);
-               debug("ceph_mkdirs on " + path, INFO);
+               //debug("ceph_mkdirs on " + path, INFO);
                try {
                        if(localFS.mkdirs(new Path(path), new FsPermission((short)mode)))
                                return 0;
@@ -239,6 +242,8 @@ class CephFaker extends CephFS {
                        stream = localFS.open(new Path(path));
                        files.put(new Integer(fileCount), stream);
                        filenames.put(new Integer(fileCount), path);
+                       debug("ceph_open_for_read fh:" + fileCount
+                                               + ", pathname:" + path, INFO);
                        return fileCount++;
                }
                catch (IOException e) { }
@@ -252,6 +257,8 @@ class CephFaker extends CephFS {
                        stream = localFS.create(new Path(path));
                        files.put(new Integer(fileCount), stream);
                        filenames.put(new Integer(fileCount), path);
+                       debug("ceph_open_for_overwrite fh:" + fileCount
+                                               + ", pathname:" + path, INFO);
                        return fileCount++;
                }
                catch (IOException e) { }
@@ -259,17 +266,21 @@ class CephFaker extends CephFS {
        }
 
        protected int ceph_close(int filehandle) {
+               debug("ceph_close(filehandle " + filehandle + ")", INFO);
                try {
-                       if(files.remove(new Integer(filehandle)) == null) {
+                       ((Closeable)files.get(new Integer(filehandle))).close();
+                       if(null == files.get(new Integer(filehandle))) {
                                return -ENOENT; //this isn't quite the right error code,
                                // but the important part is it's negative
                        }
-                       else {
-                               filenames.remove(new Integer(filehandle));
-                               return 0; //hurray, success
-                       }
+                       return 0; //hurray, success
+               }
+               catch (NullPointerException ne) {
+                       debug("ceph_close caught NullPointerException!" + ne, WARN);
+               } //err, how?
+               catch (IOException ie) {
+                       debug("ceph_close caught IOException!" + ie, WARN);
                }
-               catch (NullPointerException e) { } //err, how?
                return -1; //failure
        }
 
@@ -288,9 +299,9 @@ class CephFaker extends CephFS {
        //rather than try and match a Ceph deployment's behavior exactly,
        //just make bad things happen if they try and call methods after this
        protected boolean ceph_kill_client() {
-               debug("ceph_kill_client", INFO);
+               //debug("ceph_kill_client", INFO);
                localFS.setWorkingDirectory(new Path(localPrefix));
-               debug("working dir is now " + localFS.getWorkingDirectory(), INFO);
+               //debug("working dir is now " + localFS.getWorkingDirectory(), INFO);
                try{
                        localFS.close(); }
                catch (Exception e) {}
@@ -384,15 +395,19 @@ class CephFaker extends CephFS {
 
        protected int ceph_write(int fh, byte[] buffer,
                                                                                                         int buffer_offset, int length) {
+               debug("ceph_write fh:" + fh + ", buffer_offset:" + buffer_offset
+                                       + ", length:" + length, INFO);
                long ret = -1;//generic fail
                try {
                        FSDataOutputStream os = (FSDataOutputStream) files.get(new Integer(fh));
+                       debug("ceph_write got outputstream", INFO);
                        long startPos = os.getPos();
                        os.write(buffer, buffer_offset, length);
                        ret = os.getPos() - startPos;
                }
-               catch (IOException e) {}
-               catch (NullPointerException f) { }
+               catch (IOException e) { debug("ceph_write caught IOException!", WARN);}
+               catch (NullPointerException f) {
+                       debug("ceph_write caught NullPointerException!", WARN); }
                return (int)ret;
        }
 
@@ -411,15 +426,21 @@ class CephFaker extends CephFS {
        }
 
        protected long ceph_seek_from_start(int fh, long pos) {
+               debug("ceph_seek_from_start(fh " + fh + ", pos " + pos + ")", INFO);
                long ret = -1;//generic fail
                try {
-                       FSDataInputStream is = (FSDataInputStream) files.get(new Integer(fh));
-                       if(is.seekToNewSource(pos)) {
-                               ret = is.getPos();
+                       debug("ceph_seek_from_start filename is "
+                                               + filenames.get(new Integer(fh)), INFO);
+                       if (null == files.get(new Integer(fh))) {
+                               debug("ceph_seek_from_start: is is null!", WARN);
                        }
+                       FSDataInputStream is = (FSDataInputStream) files.get(new Integer(fh));
+                       debug("ceph_seek_from_start retrieved is!", INFO);
+                       is.seek(pos);
+                       ret = is.getPos();
                }
-               catch (IOException e) {}
-               catch (NullPointerException f) {}
+               catch (IOException e) {debug("ceph_seek_from_start caught IOException!", WARN);}
+               catch (NullPointerException f) {debug("ceph_seek_from_start caught NullPointerException!", WARN);}
                return (int)ret;
        }
 
@@ -427,7 +448,7 @@ class CephFaker extends CephFS {
         * We need to remove the localFS file prefix before returning to Ceph
         */
        private String sanitize_path(String path) {
-               debug("sanitize_path(" + path + ")", INFO);
+               //debug("sanitize_path(" + path + ")", INFO);
                /*              if (path.startsWith("file:"))
                                        path = path.substring("file:".length()); */
                if (path.startsWith(localPrefix)) {
@@ -435,7 +456,7 @@ class CephFaker extends CephFS {
                        if (path.length() == 0) //it was a root path
                                path = "/";
                }
-               debug("sanitize_path returning " + path, INFO);
+               //debug("sanitize_path returning " + path, INFO);
                return path;
        }
 
@@ -444,14 +465,14 @@ class CephFaker extends CephFS {
         * test dir onto the front as a prefix.
         */
        private String prepare_path(String path) {
-               debug("prepare_path(" + path + ")", INFO);
+               //debug("prepare_path(" + path + ")", INFO);
                if (path.startsWith("/"))
-                       return localPrefix + path;
+                       path = localPrefix + path;
                else if (path.equals("..")) {
                        if (ceph_getcwd().equals("/"))
                                path = "."; // you can't go up past root!
                }
-               debug("prepare_path returning" + path, INFO);
+               //debug("prepare_path returning" + path, INFO);
                return path;
        }
 }