]> git-server-git.apps.pok.os.sepia.ceph.com Git - ceph.git/commitdiff
Simplify CephInputStream so that it actually works
authorGreg Farnum <gregf@hq.newdream.net>
Wed, 28 Oct 2009 21:04:58 +0000 (14:04 -0700)
committerGreg Farnum <gregf@hq.newdream.net>
Tue, 3 Nov 2009 00:38:15 +0000 (16:38 -0800)
src/client/hadoop/ceph/CephInputStream.java

index 09c009eba3ef57558de0cac301164f8e071094b7..15ea48abf0651fbace3706ac41c9d88b596b0e04 100644 (file)
@@ -41,7 +41,8 @@ public class CephInputStream extends FSInputStream {
        private CephFS ceph;
 
        private byte[] buffer;
-       private int bufPos = -1;
+       private int bufPos = 0;
+       private int bufValid = 0;
        private long cephPos = 0;
 
   /**
@@ -74,12 +75,26 @@ public class CephInputStream extends FSInputStream {
     finally { super.finalize(); }
   }
 
-  public synchronized long getPos() throws IOException {
-               if (bufPos == -1) {
-                       cephPos = ceph.ceph_getpos(fileHandle);
-                       return cephPos;
+       private synchronized boolean fillBuffer() throws IOException {
+               bufValid = ceph.ceph_read(fileHandle, buffer, 0, buffer.length);
+               bufPos = 0;
+               if (bufValid < 0) {
+                       int err = bufValid;
+                       bufValid = 0;
+                       //attempt to reset to old position. If it fails, too bad.
+                       ceph.ceph_seek_from_start(fileHandle, cephPos);
+                       throw new IOException("Failed to fill read buffer! Error code:"
+                                                                                                               + err);
                }
-               return cephPos - buffer.length + bufPos;
+               cephPos += bufValid;
+               return (bufValid != 0);
+       }
+
+       /*
+        * Make sure this works!
+        */
+  public synchronized long getPos() throws IOException {
+               return cephPos - bufValid + bufPos;
   }
 
   /**
@@ -94,19 +109,19 @@ public class CephInputStream extends FSInputStream {
     ceph.debug("CephInputStream.seek: Seeking to position " + targetPos +
                                                                                " on fd " + fileHandle, ceph.TRACE);
     if (targetPos > fileLength) {
-      throw new IOException("CephInputStream.seek: failed seeking to position " + targetPos +
-                                                                                                               " on fd " + fileHandle + ": Cannot seek after EOF " + fileLength);
+      throw new IOException("CephInputStream.seek: failed seek to position "
+                                                                                                               + targetPos + " on fd " + fileHandle
+                                                                                                               + ": Cannot seek after EOF " + fileLength);
     }
-               if ((cephPos-targetPos < buffer.length) && -1 != bufPos && cephPos >= targetPos) {
-                       bufPos = buffer.length - (int)(cephPos - targetPos);
-               } else {
-                       cephPos = ceph.ceph_seek_from_start(fileHandle, targetPos);
-                       bufPos = -1;
-                       if (cephPos < 0) {
-                               throw new IOException ("Ceph failed to seek to new position!");
-                       }
+               long oldPos = cephPos;
+               cephPos = ceph.ceph_seek_from_start(fileHandle, targetPos);
+               bufValid = 0;
+               bufPos = 0;
+               if (cephPos < 0) {
+                       cephPos = oldPos;
+                       throw new IOException ("Ceph failed to seek to new position!");
                }
-  }
+       }
 
   /**
    * Failovers are handled by the Ceph code at a very low level;
@@ -143,7 +158,8 @@ public class CephInputStream extends FSInputStream {
    * @return 0 if successful, otherwise an error code.
    */
   @Override
-       public synchronized int read(byte buf[], int off, int len) throws IOException {
+       public synchronized int read(byte buf[], int off, int len)
+               throws IOException {
       ceph.debug("CephInputStream.read: Reading " + len  +
                                                                 " bytes from fd " + fileHandle, ceph.TRACE);
       
@@ -152,67 +168,52 @@ public class CephInputStream extends FSInputStream {
                                                                                                                        " bytes from fd " + fileHandle +
                                                                                                                        ": stream closed");
       }
-      if (null == buf) {
-                               throw new IOException("Read buffer is null");
-      }
-      
-      // check for proper index bounds
-      if((off < 0) || (len < 0) || (off + len > buf.length)) {
-                               throw new IOException("CephInputStream.read: Indices out of bounds for read: "
-                                                                                                                       + "read length is " + len +
-                                                                                                                       ", buffer offset is " + off +
-                                                                                                                       ", and buffer size is " + buf.length);
-      }
-      
+                       
       // ensure we're not past the end of the file
-      if (getPos() >= fileLength) 
-                               {
-                                       ceph.debug("CephInputStream.read: cannot read " + len  + 
-                                                                                " bytes from fd " + fileHandle + ": current position is "
-                                                                                + getPos() + " and file length is " + fileLength,
-                                                                                ceph.WARN);
-         
-                                       return -1;
-                               }
-                       //if the read can be satisfied from buffer, do so.
-                       if (bufPos + len < buffer.length) {
-                               //refill buffer if it's empty
-                               if (bufPos == -1) {
-                                       ceph.ceph_read(fileHandle, buffer, 0, buffer.length);
-                                       cephPos = ceph.ceph_getpos(fileHandle);
-                                       bufPos = 0;
-                               }
-                               for (int i = 0; i < len; ++i)
-                                       buf[off+i] = buffer[bufPos+i];
-                               bufPos += len;
-                               return len;
+      if (getPos() >= fileLength) {
+                               ceph.debug("CephInputStream.read: cannot read " + len  + 
+                                                                        " bytes from fd " + fileHandle + ": current position is "
+                                                                        + getPos() + " and file length is " + fileLength,
+                                                                        ceph.WARN);
+                               
+                               return -1;
                        }
 
-                       //otherwise copy the rest of the buffer in, and then read straight
-                       //into buf if we still need more data
-                       int read = 0;
-                       if (bufPos != -1) {
-                               read =  buffer.length - bufPos;
-                               for (int i = 0; i < read; ++i)
-                                       buf[off+i] = buffer[bufPos+i];
-                               bufPos = -1; //no data in buffer now;
-                       }
-
-                       if (read == len)
-                               return read;
-
-
-      int result = ceph.ceph_read(fileHandle, buf, off+read, len-read);
-      if (result < 0)
-                               ceph.debug("CephInputStream.read: Reading " + len
-                                                                        + " bytes from fd " + fileHandle + " failed.", ceph.WARN);
-
-      ceph.debug("CephInputStream.read: Reading " + len  + " bytes from fd " 
-                                                                + fileHandle + ": succeeded in reading " + result + " bytes",
+                       int totalRead = 0;
+                       int initialLen = len;
+                       int read;
+                       do {
+                               read = Math.min(len, bufValid - bufPos);
+                               try {
+                                       System.arraycopy(buffer, bufPos, buf, off, read);
+                               }
+                               catch(IndexOutOfBoundsException ie) {
+                                       throw new IOException("CephInputStream.read: Indices out of bounds:"
+                                                                                                                               + "read length is " + len
+                                                                                                                               + ", buffer offset is " + off
+                                                                                                                               + ", and buffer size is " + buf.length);
+                               }
+                               catch (ArrayStoreException ae) {
+                                       throw new IOException("Uh-oh, CephInputStream failed to do an array"
+                                                                                                                               + "copy due to type mismatch...");
+                               }
+                               catch (NullPointerException ne) {
+                                       throw new IOException("CephInputStream.read: cannot read "
+                                                                                                                               + len + "bytes from fd:" + fileHandle
+                                                                                                                               + ": buf is null");
+                               }
+                               bufPos += read;
+                               len -= read;
+                               off += read;
+                               totalRead += read;
+                       } while (len > 0 && fillBuffer());
+
+      ceph.debug("CephInputStream.read: Reading " + initialLen
+                                                                + " bytes from fd " + fileHandle
+                                                                + ": succeeded in reading " + totalRead + " bytes",
                                                                 ceph.TRACE);
-                       cephPos += len-read;
-      return result;
-               }
+      return totalRead;
+       }
 
   /**
    * Close the CephInputStream and release the associated filehandle.
@@ -225,10 +226,11 @@ public class CephInputStream extends FSInputStream {
     }
 
     int result = ceph.ceph_close(fileHandle);
+    closed = true;
     if (result != 0) {
-      throw new IOException("Close failed!");
+      throw new IOException("Close somehow failed!"
+                                                                                                               + "Don't try and use this stream again, though");
     }
-    closed = true;
     ceph.debug("CephOutputStream.close:exit", ceph.TRACE);
   }
 }