/**
* Get an FSDataOutputStream to append onto a file.
* @param file The File you want to append onto
- * @param bufferSize The size of the buffer to use in flushing writes
+ * @param bufferSize Ceph does internal buffering; this is ignored.
* @param progress The Progressable to report progress to.
* Reporting is limited but exists.
* @return An FSDataOutputStream that connects to the file on Ceph.
* @param path The file to create.
* @param permission The permissions to apply to the file.
* @param overwrite If true, overwrite any existing file with this name.
- * @param bufferSize The size of the write buffer in the returned OutputStream.
+ * @param bufferSize Ceph does internal buffering; this is ignored.
* @param replication Ignored by Ceph. This can be configured via Ceph configuration.
* @param blockSize Ignored by Ceph.
* @param progress A Progressable to report back to. Reporting is limited but exists.
/**
* Open a Ceph file and attach the file handle to an FSDataInputStream.
* @param path The file to open
- * @param bufferSize the size of the read buffer in the returned FSDataInputStream.
+ * @param bufferSize Ceph does internal buffering; this is ignored.
* @return FSDataInputStream reading from the given path.
* @throws IOException if initialize() hasn't been called, the path DNE or is a
* directory, or there is an error getting data to set up the FSDataInputStream.
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSInputStream;
+
+/**
+ * <p>
+ * An {@link FSInputStream} for a CephFileSystem and corresponding
+ * Ceph instance.
+ */
class CephInputStream extends FSInputStream {
private int bufferSize;
private native long ceph_getpos(int fh);
private native int ceph_close(int fh);
- /*
- public S3InputStream(Configuration conf, FileSystemStore store,
- INode inode) {
-
- this.store = store;
- this.blocks = inode.getBlocks();
- for (Block block : blocks) {
- this.fileLength += block.getLength();
- }
- this.bufferSize = conf.getInt("io.file.buffer.size", 4096);
- }
- */
-
+ /**
+ * Create a new CephInputStream.
+ * @param conf The system configuration. Unused.
+ * @param fh The filehandle provided by Ceph to reference.
+ * @param flength The current length of the file. If the length changes
+ * you will need to close and re-open it to access the new data.
+ */
public CephInputStream(Configuration conf, int fh, long flength) {
System.load(conf.get("fs.ceph.libDir")+"/libhadoopcephfs.so");
System.load(conf.get("fs.ceph.libDir")+"/libceph.so");
// call and providing the file handle.
fileLength = flength;
fileHandle = fh;
- //System.out.println("CephInputStream constructor: initializing stream with fh "
- // + fh + " and file length " + flength);
+ debug("CephInputStream constructor: initializing stream with fh "
+ + fh + " and file length " + flength);
- // TODO: Then what do we need from the config? The buffer size maybe?
- // Anything? Bueller?
-
}
- //Ceph requires that things be closed before it can shutdown,
+ //Ceph likes things to be closed before it shuts down,
//so closing the IOStream stuff voluntarily is good
public void finalize () throws Throwable {
try {
}
@Override
- public synchronized int available() throws IOException {
+ public synchronized int available() throws IOException {
return (int) (fileLength - getPos());
}
public synchronized void seek(long targetPos) throws IOException {
- //System.out.println("CephInputStream.seek: Seeking to position " + targetPos +
- // " on fd " + fileHandle);
+ debug("CephInputStream.seek: Seeking to position " + targetPos +
+ " on fd " + fileHandle);
if (targetPos > fileLength) {
throw new IOException("CephInputStream.seek: failed seeking to position " + targetPos +
" on fd " + fileHandle + ": Cannot seek after EOF " + fileLength);
ceph_seek_from_start(fileHandle, targetPos);
}
- //failovers are handled by the Ceph code at a very low level;
- //if there are issues that can be solved by changing sources
- //they'll be dealt with before anybody even tries to call this method!
+ /**
+ * Failovers are handled by the Ceph code at a very low level;
+ * if there are issues that can be solved by changing sources
+ * they'll be dealt with before anybody even tries to call this method!
+ * @return false.
+ */
public synchronized boolean seekToNewSource(long targetPos) {
return false;
}
- // reads a byte
+ /**
+ * Read a byte from the file.
+ * @return the next byte.
+ */
@Override
- public synchronized int read() throws IOException {
- //System.out.println("CephInputStream.read: Reading a single byte from fd " + fileHandle
- // + " by calling general read function");
+ public synchronized int read() throws IOException {
+ debug("CephInputStream.read: Reading a single byte from fd " + fileHandle
+ + " by calling general read function");
byte result[] = new byte[1];
if (getPos() >= fileLength) return -1;
return result[0];
}
-
+ /**
+ * Read a specified number of bytes into a byte[] from the file.
+ * @param buf[] the byte array to read into.
+ * @param off the offset to start at in the file
+ * @param len the number of bytes to read
+ * @return 0 if successful, otherwise an error code.
+ */
@Override
- public synchronized int read(byte buf[], int off, int len) throws IOException {
- //System.out.println("CephInputStream.read: Reading " + len + " bytes from fd " + fileHandle);
+ public synchronized int read(byte buf[], int off, int len) throws IOException {
+ debug("CephInputStream.read: Reading " + len + " bytes from fd " + fileHandle);
if (closed) {
throw new IOException("CephInputStream.read: cannot read " + len +
if (null == buf) {
throw new NullPointerException("Read buffer is null");
}
-
+
// check for proper index bounds
if((off < 0) || (len < 0) || (off + len > buf.length)) {
throw new IndexOutOfBoundsException("CephInputStream.read: Indices out of bounds for read: "
- + "read length is " + len + ", buffer offset is "
- + off +", and buffer size is " + buf.length);
+ + "read length is " + len + ", buffer offset is "
+ + off +", and buffer size is " + buf.length);
}
-
+
// ensure we're not past the end of the file
if (getPos() >= fileLength)
{
- System.out.println("CephInputStream.read: cannot read " + len +
+ debug("CephInputStream.read: cannot read " + len +
" bytes from fd " + fileHandle + ": current position is " +
getPos() + " and file length is " + fileLength);
-
+
return -1;
}
// actually do the read
int result = ceph_read(fileHandle, buf, off, len);
if (result < 0)
- System.out.println("CephInputStream.read: Reading " + len + " bytes from fd "
- + fileHandle + " failed.");
- else {}
- // System.out.println("CephInputStream.read: Reading " + len + " bytes from fd "
- // + fileHandle + ": succeeded in reading " + result + " bytes");
-
-
+ debug("CephInputStream.read: Reading " + len
+ + " bytes from fd " + fileHandle + " failed.");
+ debug("CephInputStream.read: Reading " + len + " bytes from fd "
+ + fileHandle + ": succeeded in reading " + result + " bytes");
return result;
- }
-
-
+ }
+ /**
+ * Close the CephInputStream and release the associated filehandle.
+ */
@Override
- public void close() throws IOException {
+ public void close() throws IOException {
debug("CephOutputStream.close:enter");
if (closed) {
throw new IOException("Stream closed");
}
/**
- * We don't support marks.
+ * Marks are not supported.
+ * @return false
*/
@Override
- public boolean markSupported() {
+ public boolean markSupported() {
return false;
}
+ /**
+ * Since marking isn't supported, this function throws an IOException.
+ * @throws IOException whenever called.
+ */
@Override
- public void mark(int readLimit) {
- // Do nothing
+ public void mark(int readLimit) {
+ throw new IOException("Mark not supported");
}
+ /**
+ * Since marks aren't supported, this function throws an IOException.
+ * @throws IOException whenever called.
+ */
@Override
- public void reset() throws IOException {
+ public void reset() throws IOException {
throw new IOException("Mark not supported");
}