From: Greg Farnum Date: Tue, 27 Oct 2009 20:02:30 +0000 (-0700) Subject: Hadoop: Reorder a few things for better safety and fix compile bugs X-Git-Tag: v0.18~128^2~53 X-Git-Url: http://git-server-git.apps.pok.os.sepia.ceph.com/?a=commitdiff_plain;h=81aca0e70a6b39e51b497121d29991bd8fd9f227;p=ceph.git Hadoop: Reorder a few things for better safety and fix compile bugs --- diff --git a/src/client/hadoop/ceph/CephInputStream.java b/src/client/hadoop/ceph/CephInputStream.java index 4e7cefbe106f..09c009eba3ef 100644 --- a/src/client/hadoop/ceph/CephInputStream.java +++ b/src/client/hadoop/ceph/CephInputStream.java @@ -97,14 +97,14 @@ public class CephInputStream extends FSInputStream { throw new IOException("CephInputStream.seek: failed seeking to position " + targetPos + " on fd " + fileHandle + ": Cannot seek after EOF " + fileLength); } - if ((cephPos-targetPos < buffer.length) && -1 != bufPos && cephPos >= 0) { - bufPos = buffer.length - (cephPos - targetPos); + if ((cephPos-targetPos < buffer.length) && -1 != bufPos && cephPos >= targetPos) { + bufPos = buffer.length - (int)(cephPos - targetPos); } else { cephPos = ceph.ceph_seek_from_start(fileHandle, targetPos); + bufPos = -1; if (cephPos < 0) { throw new IOException ("Ceph failed to seek to new position!"); } - bufPos = -1; } } @@ -203,14 +203,14 @@ public class CephInputStream extends FSInputStream { int result = ceph.ceph_read(fileHandle, buf, off+read, len-read); - cephPos += len-read; if (result < 0) ceph.debug("CephInputStream.read: Reading " + len + " bytes from fd " + fileHandle + " failed.", ceph.WARN); ceph.debug("CephInputStream.read: Reading " + len + " bytes from fd " + fileHandle + ": succeeded in reading " + result + " bytes", - ceph.TRACE); + ceph.TRACE); + cephPos += len-read; return result; } diff --git a/src/client/hadoop/ceph/CephOutputStream.java b/src/client/hadoop/ceph/CephOutputStream.java index 852decfc3560..4126cb94c52a 100644 --- a/src/client/hadoop/ceph/CephOutputStream.java +++ b/src/client/hadoop/ceph/CephOutputStream.java @@ -91,7 +91,7 @@ public class CephOutputStream extends OutputStream { // Stick the byte in a buffer and write it byte buf[] = new byte[1]; buf[0] = (byte) b; - int result = ceph.ceph_write(fileHandle, buf, 0, 1); + int result = write(buf, 0, 1); if (1 != result) ceph.debug("CephOutputStream.write: failed writing a single byte to fd " + fileHandle + ": Ceph write() result = " + result, @@ -161,10 +161,12 @@ ceph.WARN); //if we make it here, the buffer's huge, so just flush the old buffer... - result = ceph.ceph_write(fileHandle, buffer, 0, bufUsed); - if (result < 0 || result != bufUsed) - throw new IOException("CephOutputStream.write: Failed to write some buffered data to fd " + fileHandle); - bufUsed = 0; + if (bufUsed > 0) { + result = ceph.ceph_write(fileHandle, buffer, 0, bufUsed); + if (result < 0 || result != bufUsed) + throw new IOException("CephOutputStream.write: Failed to write some buffered data to fd " + fileHandle); + bufUsed = 0; + } //...and then write ful buf result = ceph.ceph_write(fileHandle, buf, off, len); if (result < 0) { @@ -190,14 +192,16 @@ ceph.WARN); } if (bufUsed == 0) return; int result = ceph.ceph_write(fileHandle, buffer, 0, bufUsed); + int oldbufUsed = bufUsed; + bufUsed = 0; if (result < 0) { - throw new IOException("CephOutputStream.write: Write of " + len + + throw new IOException("CephOutputStream.write: Write of " + oldbufUsed + "bytes to fd " + fileHandle + " failed"); } - if (result != len) { - throw new IOException("CephOutputStream.write: Write of " + len + + if (result != oldbufUsed) { + throw new IOException("CephOutputStream.write: Write of " + oldbufUsed + "bytes to fd " + fileHandle + "was incomplete: only " - + result + " of " + len + " bytes were written."); + + result + " of " + oldbufUsed + " bytes were written."); } return; } @@ -210,7 +214,7 @@ ceph.WARN); public synchronized void close() throws IOException { ceph.debug("CephOutputStream.close:enter", ceph.TRACE); if (closed) { - throw new IOException("Stream closed"); + throw new IOException("Stream already closed"); } flush(); int result = ceph.ceph_close(fileHandle);