]> git-server-git.apps.pok.os.sepia.ceph.com Git - ceph.git/commitdiff
Hadoop: Reorder a few things for better safety and fix compile bugs
authorGreg Farnum <gregf@hq.newdream.net>
Tue, 27 Oct 2009 20:02:30 +0000 (13:02 -0700)
committerGreg Farnum <gregf@hq.newdream.net>
Tue, 3 Nov 2009 00:38:15 +0000 (16:38 -0800)
src/client/hadoop/ceph/CephInputStream.java
src/client/hadoop/ceph/CephOutputStream.java

index 4e7cefbe106fe92ddfacf0e951a6b59088e75162..09c009eba3ef57558de0cac301164f8e071094b7 100644 (file)
@@ -97,14 +97,14 @@ public class CephInputStream extends FSInputStream {
       throw new IOException("CephInputStream.seek: failed seeking to position " + targetPos +
                                                                                                                " on fd " + fileHandle + ": Cannot seek after EOF " + fileLength);
     }
-               if ((cephPos-targetPos < buffer.length) && -1 != bufPos && cephPos >= 0) {
-                       bufPos = buffer.length - (cephPos - targetPos);
+               if ((cephPos-targetPos < buffer.length) && -1 != bufPos && cephPos >= targetPos) {
+                       bufPos = buffer.length - (int)(cephPos - targetPos);
                } else {
                        cephPos = ceph.ceph_seek_from_start(fileHandle, targetPos);
+                       bufPos = -1;
                        if (cephPos < 0) {
                                throw new IOException ("Ceph failed to seek to new position!");
                        }
-                       bufPos = -1;
                }
   }
 
@@ -203,14 +203,14 @@ public class CephInputStream extends FSInputStream {
 
 
       int result = ceph.ceph_read(fileHandle, buf, off+read, len-read);
-                       cephPos += len-read;
       if (result < 0)
                                ceph.debug("CephInputStream.read: Reading " + len
                                                                         + " bytes from fd " + fileHandle + " failed.", ceph.WARN);
 
       ceph.debug("CephInputStream.read: Reading " + len  + " bytes from fd " 
                                                                 + fileHandle + ": succeeded in reading " + result + " bytes",
-                                                                ceph.TRACE);   
+                                                                ceph.TRACE);
+                       cephPos += len-read;
       return result;
                }
 
index 852decfc3560ddaa60482ee2ba646edb3e93ee1c..4126cb94c52a238f5bcf7cb8871bdcfd83135d6f 100644 (file)
@@ -91,7 +91,7 @@ public class CephOutputStream extends OutputStream {
       // Stick the byte in a buffer and write it
       byte buf[] = new byte[1];
       buf[0] = (byte) b;    
-      int result = ceph.ceph_write(fileHandle, buf, 0, 1);
+      int result = write(buf, 0, 1);
       if (1 != result)
                                ceph.debug("CephOutputStream.write: failed writing a single byte to fd "
                                                                                                + fileHandle + ": Ceph write() result = " + result,
@@ -161,10 +161,12 @@ ceph.WARN);
 
 
                        //if we make it here, the buffer's huge, so just flush the old buffer...
-                       result = ceph.ceph_write(fileHandle, buffer, 0, bufUsed);
-                       if (result < 0 || result != bufUsed)
-                               throw new IOException("CephOutputStream.write: Failed to write some buffered data to fd " + fileHandle);
-                       bufUsed = 0;
+                       if (bufUsed > 0) {
+                               result = ceph.ceph_write(fileHandle, buffer, 0, bufUsed);
+                               if (result < 0 || result != bufUsed)
+                                       throw new IOException("CephOutputStream.write: Failed to write some buffered data to fd " + fileHandle);
+                               bufUsed = 0;
+                       }
                        //...and then write ful buf
       result = ceph.ceph_write(fileHandle, buf, off, len);
       if (result < 0) {
@@ -190,14 +192,16 @@ ceph.WARN);
                        }
                        if (bufUsed == 0) return;
                        int result = ceph.ceph_write(fileHandle, buffer, 0, bufUsed);
+                       int oldbufUsed = bufUsed;
+                       bufUsed = 0;
       if (result < 0) {
-                               throw new IOException("CephOutputStream.write: Write of " + len + 
+                               throw new IOException("CephOutputStream.write: Write of " + oldbufUsed + 
                                                                                                                        "bytes to fd " + fileHandle + " failed");
       }
-      if (result != len) {
-                               throw new IOException("CephOutputStream.write: Write of " + len + 
+      if (result != oldbufUsed) {
+                               throw new IOException("CephOutputStream.write: Write of " + oldbufUsed + 
                                                                                                                        "bytes to fd " + fileHandle + "was incomplete:  only "
-                                                                                                                       + result + " of " + len + " bytes were written.");
+                                                                                                                       + result + " of " + oldbufUsed + " bytes were written.");
       }
                        return;
        }
@@ -210,7 +214,7 @@ ceph.WARN);
        public synchronized void close() throws IOException {
       ceph.debug("CephOutputStream.close:enter", ceph.TRACE);
       if (closed) {
-                               throw new IOException("Stream closed");
+                               throw new IOException("Stream already closed");
       }
                        flush();
       int result = ceph.ceph_close(fileHandle);