From: Greg Farnum Date: Tue, 18 Aug 2009 00:15:01 +0000 (-0700) Subject: Hadoop: Javadoc and cleanup in CephInputStream. X-Git-Tag: v0.13~30^2~4 X-Git-Url: http://git-server-git.apps.pok.os.sepia.ceph.com/?a=commitdiff_plain;h=fea91de21243accf547ab3dea29fcd55c5452935;p=ceph.git Hadoop: Javadoc and cleanup in CephInputStream. --- diff --git a/src/client/hadoop/ceph/CephFileSystem.java b/src/client/hadoop/ceph/CephFileSystem.java index 4286717f37a7..d60c26861e05 100644 --- a/src/client/hadoop/ceph/CephFileSystem.java +++ b/src/client/hadoop/ceph/CephFileSystem.java @@ -161,7 +161,7 @@ public class CephFileSystem extends FileSystem { /** * Get an FSDataOutputStream to append onto a file. * @param file The File you want to append onto - * @param bufferSize The size of the buffer to use in flushing writes + * @param bufferSize Ceph does internal buffering; this is ignored. * @param progress The Progressable to report progress to. * Reporting is limited but exists. * @return An FSDataOutputStream that connects to the file on Ceph. @@ -444,7 +444,7 @@ public class CephFileSystem extends FileSystem { * @param path The file to create. * @param permission The permissions to apply to the file. * @param overwrite If true, overwrite any existing file with this name. - * @param bufferSize The size of the write buffer in the returned OutputStream. + * @param bufferSize Ceph does internal buffering; this is ignored. * @param replication Ignored by Ceph. This can be configured via Ceph configuration. * @param blockSize Ignored by Ceph. * @param progress A Progressable to report back to. Reporting is limited but exists. @@ -513,7 +513,7 @@ public class CephFileSystem extends FileSystem { /** * Open a Ceph file and attach the file handle to an FSDataInputStream. * @param path The file to open - * @param bufferSize the size of the read buffer in the returned FSDataInputStream. + * @param bufferSize Ceph does internal buffering; this is ignored. * @return FSDataInputStream reading from the given path. * @throws IOException if initialize() hasn't been called, the path DNE or is a * directory, or there is an error getting data to set up the FSDataInputStream. diff --git a/src/client/hadoop/ceph/CephInputStream.java b/src/client/hadoop/ceph/CephInputStream.java index b6954d0bafaa..96c176625c0c 100644 --- a/src/client/hadoop/ceph/CephInputStream.java +++ b/src/client/hadoop/ceph/CephInputStream.java @@ -14,6 +14,12 @@ import java.io.OutputStream; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FSInputStream; + +/** + *

+ * An {@link FSInputStream} for a CephFileSystem and corresponding + * Ceph instance. + */ class CephInputStream extends FSInputStream { private int bufferSize; @@ -39,19 +45,13 @@ class CephInputStream extends FSInputStream { private native long ceph_getpos(int fh); private native int ceph_close(int fh); - /* - public S3InputStream(Configuration conf, FileSystemStore store, - INode inode) { - - this.store = store; - this.blocks = inode.getBlocks(); - for (Block block : blocks) { - this.fileLength += block.getLength(); - } - this.bufferSize = conf.getInt("io.file.buffer.size", 4096); - } - */ - + /** + * Create a new CephInputStream. + * @param conf The system configuration. Unused. + * @param fh The filehandle provided by Ceph to reference. + * @param flength The current length of the file. If the length changes + * you will need to close and re-open it to access the new data. + */ public CephInputStream(Configuration conf, int fh, long flength) { System.load(conf.get("fs.ceph.libDir")+"/libhadoopcephfs.so"); System.load(conf.get("fs.ceph.libDir")+"/libceph.so"); @@ -59,14 +59,11 @@ class CephInputStream extends FSInputStream { // call and providing the file handle. fileLength = flength; fileHandle = fh; - //System.out.println("CephInputStream constructor: initializing stream with fh " - // + fh + " and file length " + flength); + debug("CephInputStream constructor: initializing stream with fh " + + fh + " and file length " + flength); - // TODO: Then what do we need from the config? The buffer size maybe? - // Anything? Bueller? - } - //Ceph requires that things be closed before it can shutdown, + //Ceph likes things to be closed before it shuts down, //so closing the IOStream stuff voluntarily is good public void finalize () throws Throwable { try { @@ -80,13 +77,13 @@ class CephInputStream extends FSInputStream { } @Override - public synchronized int available() throws IOException { + public synchronized int available() throws IOException { return (int) (fileLength - getPos()); } public synchronized void seek(long targetPos) throws IOException { - //System.out.println("CephInputStream.seek: Seeking to position " + targetPos + - // " on fd " + fileHandle); + debug("CephInputStream.seek: Seeking to position " + targetPos + + " on fd " + fileHandle); if (targetPos > fileLength) { throw new IOException("CephInputStream.seek: failed seeking to position " + targetPos + " on fd " + fileHandle + ": Cannot seek after EOF " + fileLength); @@ -94,19 +91,25 @@ class CephInputStream extends FSInputStream { ceph_seek_from_start(fileHandle, targetPos); } - //failovers are handled by the Ceph code at a very low level; - //if there are issues that can be solved by changing sources - //they'll be dealt with before anybody even tries to call this method! + /** + * Failovers are handled by the Ceph code at a very low level; + * if there are issues that can be solved by changing sources + * they'll be dealt with before anybody even tries to call this method! + * @return false. + */ public synchronized boolean seekToNewSource(long targetPos) { return false; } - // reads a byte + /** + * Read a byte from the file. + * @return the next byte. + */ @Override - public synchronized int read() throws IOException { - //System.out.println("CephInputStream.read: Reading a single byte from fd " + fileHandle - // + " by calling general read function"); + public synchronized int read() throws IOException { + debug("CephInputStream.read: Reading a single byte from fd " + fileHandle + + " by calling general read function"); byte result[] = new byte[1]; if (getPos() >= fileLength) return -1; @@ -114,10 +117,16 @@ class CephInputStream extends FSInputStream { return result[0]; } - + /** + * Read a specified number of bytes into a byte[] from the file. + * @param buf[] the byte array to read into. + * @param off the offset to start at in the file + * @param len the number of bytes to read + * @return 0 if successful, otherwise an error code. + */ @Override - public synchronized int read(byte buf[], int off, int len) throws IOException { - //System.out.println("CephInputStream.read: Reading " + len + " bytes from fd " + fileHandle); + public synchronized int read(byte buf[], int off, int len) throws IOException { + debug("CephInputStream.read: Reading " + len + " bytes from fd " + fileHandle); if (closed) { throw new IOException("CephInputStream.read: cannot read " + len + @@ -126,40 +135,38 @@ class CephInputStream extends FSInputStream { if (null == buf) { throw new NullPointerException("Read buffer is null"); } - + // check for proper index bounds if((off < 0) || (len < 0) || (off + len > buf.length)) { throw new IndexOutOfBoundsException("CephInputStream.read: Indices out of bounds for read: " - + "read length is " + len + ", buffer offset is " - + off +", and buffer size is " + buf.length); + + "read length is " + len + ", buffer offset is " + + off +", and buffer size is " + buf.length); } - + // ensure we're not past the end of the file if (getPos() >= fileLength) { - System.out.println("CephInputStream.read: cannot read " + len + + debug("CephInputStream.read: cannot read " + len + " bytes from fd " + fileHandle + ": current position is " + getPos() + " and file length is " + fileLength); - + return -1; } // actually do the read int result = ceph_read(fileHandle, buf, off, len); if (result < 0) - System.out.println("CephInputStream.read: Reading " + len + " bytes from fd " - + fileHandle + " failed."); - else {} - // System.out.println("CephInputStream.read: Reading " + len + " bytes from fd " - // + fileHandle + ": succeeded in reading " + result + " bytes"); - - + debug("CephInputStream.read: Reading " + len + + " bytes from fd " + fileHandle + " failed."); + debug("CephInputStream.read: Reading " + len + " bytes from fd " + + fileHandle + ": succeeded in reading " + result + " bytes"); return result; - } - - + } + /** + * Close the CephInputStream and release the associated filehandle. + */ @Override - public void close() throws IOException { + public void close() throws IOException { debug("CephOutputStream.close:enter"); if (closed) { throw new IOException("Stream closed"); @@ -174,20 +181,29 @@ class CephInputStream extends FSInputStream { } /** - * We don't support marks. + * Marks are not supported. + * @return false */ @Override - public boolean markSupported() { + public boolean markSupported() { return false; } + /** + * Since marking isn't supported, this function throws an IOException. + * @throws IOException whenever called. + */ @Override - public void mark(int readLimit) { - // Do nothing + public void mark(int readLimit) { + throw new IOException("Mark not supported"); } + /** + * Since marks aren't supported, this function throws an IOException. + * @throws IOException whenever called. + */ @Override - public void reset() throws IOException { + public void reset() throws IOException { throw new IOException("Mark not supported"); }