throw new IOException("append: Open for append failed on path \"" +
abs_path.toString() + "\"");
}
- CephOutputStream cephOStream = new CephOutputStream(getConf(), ceph, fd);
+ CephOutputStream cephOStream = new CephOutputStream(getConf(),
+ ceph, fd, bufferSize);
ceph.debug("append:exit", ceph.DEBUG);
return new FSDataOutputStream(cephOStream, statistics);
}
}
// Step 4: create the stream
- OutputStream cephOStream = new CephOutputStream(getConf(), ceph, fh);
+ OutputStream cephOStream = new CephOutputStream(getConf(),
+ ceph, fh, bufferSize);
ceph.debug("create:exit", ceph.DEBUG);
return new FSDataOutputStream(cephOStream, statistics);
}
throw new IOException("Failed to get file size for file " + abs_path.toString() +
" but succeeded in opening file. Something bizarre is going on.");
}
- FSInputStream cephIStream = new CephInputStream(getConf(), ceph, fh, size);
+ FSInputStream cephIStream = new CephInputStream(getConf(), ceph,
+ fh, size, bufferSize);
ceph.debug("open:exit", ceph.DEBUG);
return new FSDataInputStream(cephIStream);
}
private CephFS ceph;
+ private byte[] buffer;
+ private int bufPos = -1;
+ private long cephPos = 0;
+
/**
* Create a new CephInputStream.
* @param conf The system configuration. Unused.
* you will need to close and re-open it to access the new data.
*/
public CephInputStream(Configuration conf, CephFS cephfs,
- int fh, long flength) {
+ int fh, long flength, int bufferSize) {
// Whoever's calling the constructor is responsible for doing the actual ceph_open
// call and providing the file handle.
fileLength = flength;
fileHandle = fh;
closed = false;
ceph = cephfs;
+ buffer = new byte[bufferSize];
ceph.debug("CephInputStream constructor: initializing stream with fh "
+ fh + " and file length " + flength, ceph.DEBUG);
}
public synchronized long getPos() throws IOException {
- return ceph.ceph_getpos(fileHandle);
+ if (bufPos == -1) {
+ cephPos = ceph.ceph_getpos(fileHandle);
+ return cephPos;
+ }
+ return cephPos - buffer.length + bufPos;
}
+
/**
* Find the number of bytes remaining in the file.
*/
throw new IOException("CephInputStream.seek: failed seeking to position " + targetPos +
" on fd " + fileHandle + ": Cannot seek after EOF " + fileLength);
}
- ceph.ceph_seek_from_start(fileHandle, targetPos);
+ if ((cephPos-targetPos < buffer.length) && -1 != bufPos && cephPos >= 0) {
+ bufPos = buffer.length - (cephPos - targetPos);
+ } else {
+ cephPos = ceph.ceph_seek_from_start(fileHandle, targetPos);
+ if (cephPos < 0) {
+ throw new IOException ("Ceph failed to seek to new position!");
+ }
+ bufPos = -1;
+ }
}
/**
return -1;
}
- // actually do the read
- int result = ceph.ceph_read(fileHandle, buf, off, len);
+ //if the read can be satisfied from buffer, do so.
+ if (bufPos + len < buffer.length) {
+ //refill buffer if it's empty
+ if (bufPos == -1) {
+ ceph.ceph_read(fileHandle, buffer, 0, buffer.length);
+ cephPos = ceph.ceph_getpos(fileHandle);
+ bufPos = 0;
+ }
+ for (int i = 0; i < len; ++i)
+ buf[off+i] = buffer[bufPos+i];
+ bufPos += len;
+ return len;
+ }
+
+ //otherwise copy the rest of the buffer in, and then read straight
+ //into buf if we still need more data
+ int read = 0;
+ if (bufPos != -1) {
+ read = buffer.length - bufPos;
+ for (int i = 0; i < read; ++i)
+ buf[off+i] = buffer[bufPos+i];
+ bufPos = -1; //no data in buffer now;
+ }
+
+ if (read == len)
+ return read;
+
+
+ int result = ceph.ceph_read(fileHandle, buf, off+read, len-read);
+ cephPos += len-read;
if (result < 0)
ceph.debug("CephInputStream.read: Reading " + len
+ " bytes from fd " + fileHandle + " failed.", ceph.WARN);
private int fileHandle;
+ private byte[] buffer;
+ private int bufUsed = 0;
+
/**
* Construct the CephOutputStream.
* @param conf The FileSystem configuration.
* @param fh The Ceph filehandle to connect to.
*/
- public CephOutputStream(Configuration conf, CephFS cephfs, int fh) {
+ public CephOutputStream(Configuration conf, CephFS cephfs,
+ int fh, int bufferSize) {
ceph = cephfs;
fileHandle = fh;
closed = false;
+ buffer = new byte[bufferSize];
}
/**Ceph likes things to be closed before it shuts down,
+ off +", and buffer size is " + buf.length);
}
- // write!
- int result = ceph.ceph_write(fileHandle, buf, off, len);
+ int result;
+
+ // if there's lots of space left, write to the buffer and return
+ if (bufUsed + len < buffer.length) {
+ for (int i = 0; i < len; ++i) {
+ buffer[bufUsed+i] = buf[off+i];
+ }
+ bufUsed += len;
+ return;
+ }
+
+ //if len isn't too large, fill buffer, write, and fill with rest
+ if (bufUsed + len < 2*buffer.length) {
+ for (int i = 0; i + bufUsed < buffer.length; ++i) {
+ buffer[bufUsed+i] = buf[off+i];
+ }
+ int sent = len - (buffer.length - bufUsed);
+ result = ceph.ceph_write(fileHandle, buffer, 0, buffer.length);
+ if (result != buffer.length)
+ throw new IOException("CephOutputStream.write: Failed to write some buffered data to fd " + fileHandle);
+ off += sent;
+ for (int i = 0; i + sent < len; ++i) {
+ buffer[i] = buf[off+i];
+ }
+ bufUsed = len - sent;
+ return;
+ }
+
+
+ //if we make it here, the buffer's huge, so just flush the old buffer...
+ result = ceph.ceph_write(fileHandle, buffer, 0, bufUsed);
+ if (result < 0 || result != bufUsed)
+ throw new IOException("CephOutputStream.write: Failed to write some buffered data to fd " + fileHandle);
+ bufUsed = 0;
+ //...and then write ful buf
+ result = ceph.ceph_write(fileHandle, buf, off, len);
if (result < 0) {
throw new IOException("CephOutputStream.write: Write of " + len +
"bytes to fd " + fileHandle + " failed");
}
/**
- * Flush the written data. It doesn't actually do anything; all writes are synchronous.
- * @throws IOException if you've closed the stream.
+ * Flush the buffered data.
+ * @throws IOException if you've closed the stream or the write fails.
*/
@Override
public synchronized void flush() throws IOException {
if (closed) {
throw new IOException("Stream closed");
}
+ if (bufUsed == 0) return;
+ int result = ceph.ceph_write(fileHandle, buffer, 0, bufUsed);
+ if (result < 0) {
+ throw new IOException("CephOutputStream.write: Write of " + len +
+ "bytes to fd " + fileHandle + " failed");
+ }
+ if (result != len) {
+ throw new IOException("CephOutputStream.write: Write of " + len +
+ "bytes to fd " + fileHandle + "was incomplete: only "
+ + result + " of " + len + " bytes were written.");
+ }
return;
- }
+ }
/**
* Close the CephOutputStream.
if (closed) {
throw new IOException("Stream closed");
}
-
+ flush();
int result = ceph.ceph_close(fileHandle);
if (result != 0) {
throw new IOException("Close failed!");