private CephFS ceph;
private byte[] buffer;
- private int bufPos = -1;
+ private int bufPos = 0;
+ private int bufValid = 0;
private long cephPos = 0;
/**
finally { super.finalize(); }
}
- public synchronized long getPos() throws IOException {
- if (bufPos == -1) {
- cephPos = ceph.ceph_getpos(fileHandle);
- return cephPos;
+ private synchronized boolean fillBuffer() throws IOException {
+ bufValid = ceph.ceph_read(fileHandle, buffer, 0, buffer.length);
+ bufPos = 0;
+ if (bufValid < 0) {
+ int err = bufValid;
+ bufValid = 0;
+ //attempt to reset to old position. If it fails, too bad.
+ ceph.ceph_seek_from_start(fileHandle, cephPos);
+ throw new IOException("Failed to fill read buffer! Error code:"
+ + err);
}
- return cephPos - buffer.length + bufPos;
+ cephPos += bufValid;
+ return (bufValid != 0);
+ }
+
+ /*
+ * Make sure this works!
+ */
+ public synchronized long getPos() throws IOException {
+ return cephPos - bufValid + bufPos;
}
/**
ceph.debug("CephInputStream.seek: Seeking to position " + targetPos +
" on fd " + fileHandle, ceph.TRACE);
if (targetPos > fileLength) {
- throw new IOException("CephInputStream.seek: failed seeking to position " + targetPos +
- " on fd " + fileHandle + ": Cannot seek after EOF " + fileLength);
+ throw new IOException("CephInputStream.seek: failed seek to position "
+ + targetPos + " on fd " + fileHandle
+ + ": Cannot seek after EOF " + fileLength);
}
- if ((cephPos-targetPos < buffer.length) && -1 != bufPos && cephPos >= targetPos) {
- bufPos = buffer.length - (int)(cephPos - targetPos);
- } else {
- cephPos = ceph.ceph_seek_from_start(fileHandle, targetPos);
- bufPos = -1;
- if (cephPos < 0) {
- throw new IOException ("Ceph failed to seek to new position!");
- }
+ long oldPos = cephPos;
+ cephPos = ceph.ceph_seek_from_start(fileHandle, targetPos);
+ bufValid = 0;
+ bufPos = 0;
+ if (cephPos < 0) {
+ cephPos = oldPos;
+ throw new IOException ("Ceph failed to seek to new position!");
}
- }
+ }
/**
* Failovers are handled by the Ceph code at a very low level;
* @return 0 if successful, otherwise an error code.
*/
@Override
- public synchronized int read(byte buf[], int off, int len) throws IOException {
+ public synchronized int read(byte buf[], int off, int len)
+ throws IOException {
ceph.debug("CephInputStream.read: Reading " + len +
" bytes from fd " + fileHandle, ceph.TRACE);
" bytes from fd " + fileHandle +
": stream closed");
}
- if (null == buf) {
- throw new IOException("Read buffer is null");
- }
-
- // check for proper index bounds
- if((off < 0) || (len < 0) || (off + len > buf.length)) {
- throw new IOException("CephInputStream.read: Indices out of bounds for read: "
- + "read length is " + len +
- ", buffer offset is " + off +
- ", and buffer size is " + buf.length);
- }
-
+
// ensure we're not past the end of the file
- if (getPos() >= fileLength)
- {
- ceph.debug("CephInputStream.read: cannot read " + len +
- " bytes from fd " + fileHandle + ": current position is "
- + getPos() + " and file length is " + fileLength,
- ceph.WARN);
-
- return -1;
- }
- //if the read can be satisfied from buffer, do so.
- if (bufPos + len < buffer.length) {
- //refill buffer if it's empty
- if (bufPos == -1) {
- ceph.ceph_read(fileHandle, buffer, 0, buffer.length);
- cephPos = ceph.ceph_getpos(fileHandle);
- bufPos = 0;
- }
- for (int i = 0; i < len; ++i)
- buf[off+i] = buffer[bufPos+i];
- bufPos += len;
- return len;
+ if (getPos() >= fileLength) {
+ ceph.debug("CephInputStream.read: cannot read " + len +
+ " bytes from fd " + fileHandle + ": current position is "
+ + getPos() + " and file length is " + fileLength,
+ ceph.WARN);
+
+ return -1;
}
- //otherwise copy the rest of the buffer in, and then read straight
- //into buf if we still need more data
- int read = 0;
- if (bufPos != -1) {
- read = buffer.length - bufPos;
- for (int i = 0; i < read; ++i)
- buf[off+i] = buffer[bufPos+i];
- bufPos = -1; //no data in buffer now;
- }
-
- if (read == len)
- return read;
-
-
- int result = ceph.ceph_read(fileHandle, buf, off+read, len-read);
- if (result < 0)
- ceph.debug("CephInputStream.read: Reading " + len
- + " bytes from fd " + fileHandle + " failed.", ceph.WARN);
-
- ceph.debug("CephInputStream.read: Reading " + len + " bytes from fd "
- + fileHandle + ": succeeded in reading " + result + " bytes",
+ int totalRead = 0;
+ int initialLen = len;
+ int read;
+ do {
+ read = Math.min(len, bufValid - bufPos);
+ try {
+ System.arraycopy(buffer, bufPos, buf, off, read);
+ }
+ catch(IndexOutOfBoundsException ie) {
+ throw new IOException("CephInputStream.read: Indices out of bounds:"
+ + "read length is " + len
+ + ", buffer offset is " + off
+ + ", and buffer size is " + buf.length);
+ }
+ catch (ArrayStoreException ae) {
+ throw new IOException("Uh-oh, CephInputStream failed to do an array"
+ + "copy due to type mismatch...");
+ }
+ catch (NullPointerException ne) {
+ throw new IOException("CephInputStream.read: cannot read "
+ + len + "bytes from fd:" + fileHandle
+ + ": buf is null");
+ }
+ bufPos += read;
+ len -= read;
+ off += read;
+ totalRead += read;
+ } while (len > 0 && fillBuffer());
+
+ ceph.debug("CephInputStream.read: Reading " + initialLen
+ + " bytes from fd " + fileHandle
+ + ": succeeded in reading " + totalRead + " bytes",
ceph.TRACE);
- cephPos += len-read;
- return result;
- }
+ return totalRead;
+ }
/**
* Close the CephInputStream and release the associated filehandle.
}
int result = ceph.ceph_close(fileHandle);
+ closed = true;
if (result != 0) {
- throw new IOException("Close failed!");
+ throw new IOException("Close somehow failed!"
+ + "Don't try and use this stream again, though");
}
- closed = true;
ceph.debug("CephOutputStream.close:exit", ceph.TRACE);
}
}