]> git-server-git.apps.pok.os.sepia.ceph.com Git - ceph.git/commitdiff
Hadoop: Javadoc and cleanup in CephInputStream.
authorGreg Farnum <gregf@hq.newdream.net>
Tue, 18 Aug 2009 00:15:01 +0000 (17:15 -0700)
committerGreg Farnum <gregf@hq.newdream.net>
Tue, 18 Aug 2009 00:16:14 +0000 (17:16 -0700)
src/client/hadoop/ceph/CephFileSystem.java
src/client/hadoop/ceph/CephInputStream.java

index 4286717f37a7425da56ad8ce5ffd83e4d05c74aa..d60c26861e05524a52856ada7b3e60b823a31e73 100644 (file)
@@ -161,7 +161,7 @@ public class CephFileSystem extends FileSystem {
   /**
    * Get an FSDataOutputStream to append onto a file.
    * @param file The File you want to append onto
-   * @param bufferSize The size of the buffer to use in flushing writes
+   * @param bufferSize Ceph does internal buffering; this is ignored.
    * @param progress The Progressable to report progress to.
    * Reporting is limited but exists.
    * @return An FSDataOutputStream that connects to the file on Ceph.
@@ -444,7 +444,7 @@ public class CephFileSystem extends FileSystem {
    * @param path The file to create.
    * @param permission The permissions to apply to the file.
    * @param overwrite If true, overwrite any existing file with this name.
-   * @param bufferSize The size of the write buffer in the returned OutputStream.
+   * @param bufferSize Ceph does internal buffering; this is ignored.
    * @param replication Ignored by Ceph. This can be configured via Ceph configuration.
    * @param blockSize Ignored by Ceph.
    * @param progress A Progressable to report back to. Reporting is limited but exists.
@@ -513,7 +513,7 @@ public class CephFileSystem extends FileSystem {
   /**
    * Open a Ceph file and attach the file handle to an FSDataInputStream.
    * @param path The file to open
-   * @param bufferSize the size of the read buffer in the returned FSDataInputStream.
+   * @param bufferSize Ceph does internal buffering; this is ignored.
    * @return FSDataInputStream reading from the given path.
    * @throws IOException if initialize() hasn't been called, the path DNE or is a
    * directory, or there is an error getting data to set up the FSDataInputStream.
index b6954d0bafaa5b594846edd9f6f563cde90cc57b..96c176625c0c16b567f9a62f40dd39d060d9d446 100644 (file)
@@ -14,6 +14,12 @@ import java.io.OutputStream;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FSInputStream;
 
+
+/**
+ * <p>
+ * An {@link FSInputStream} for a CephFileSystem and corresponding
+ * Ceph instance.
+ */
 class CephInputStream extends FSInputStream {
 
   private int bufferSize;
@@ -39,19 +45,13 @@ class CephInputStream extends FSInputStream {
   private native long ceph_getpos(int fh);
   private native int ceph_close(int fh);
     
-  /*
-    public S3InputStream(Configuration conf, FileSystemStore store,
-    INode inode) {
-    
-    this.store = store;
-    this.blocks = inode.getBlocks();
-    for (Block block : blocks) {
-    this.fileLength += block.getLength();
-    }
-    this.bufferSize = conf.getInt("io.file.buffer.size", 4096);    
-    }
-  */
-
+  /**
+   * Create a new CephInputStream.
+   * @param conf The system configuration. Unused.
+   * @param fh The filehandle provided by Ceph to reference.
+   * @param flength The current length of the file. If the length changes
+   * you will need to close and re-open it to access the new data.
+   */
   public CephInputStream(Configuration conf, int fh, long flength) {
     System.load(conf.get("fs.ceph.libDir")+"/libhadoopcephfs.so");
     System.load(conf.get("fs.ceph.libDir")+"/libceph.so");
@@ -59,14 +59,11 @@ class CephInputStream extends FSInputStream {
     // call and providing the file handle.
     fileLength = flength;
     fileHandle = fh;
-    //System.out.println("CephInputStream constructor: initializing stream with fh "
-    //          + fh + " and file length " + flength);
+    debug("CephInputStream constructor: initializing stream with fh "
+         + fh + " and file length " + flength);
       
-    // TODO: Then what do we need from the config? The buffer size maybe?
-    // Anything? Bueller?
-
   }
-  //Ceph requires that things be closed before it can shutdown,
+  //Ceph likes things to be closed before it shuts down,
   //so closing the IOStream stuff voluntarily is good
   public void finalize () throws Throwable {
     try {
@@ -80,13 +77,13 @@ class CephInputStream extends FSInputStream {
   }
 
   @Override
-    public synchronized int available() throws IOException {
+  public synchronized int available() throws IOException {
       return (int) (fileLength - getPos());
     }
 
   public synchronized void seek(long targetPos) throws IOException {
-    //System.out.println("CephInputStream.seek: Seeking to position " + targetPos +
-    //          " on fd " + fileHandle);
+    debug("CephInputStream.seek: Seeking to position " + targetPos +
+         " on fd " + fileHandle);
     if (targetPos > fileLength) {
       throw new IOException("CephInputStream.seek: failed seeking to position " + targetPos +
                            " on fd " + fileHandle + ": Cannot seek after EOF " + fileLength);
@@ -94,19 +91,25 @@ class CephInputStream extends FSInputStream {
     ceph_seek_from_start(fileHandle, targetPos);
   }
 
-  //failovers are handled by the Ceph code at a very low level;
-  //if there are issues that can be solved by changing sources
-  //they'll be dealt with before anybody even tries to call this method!
+  /**
+   * Failovers are handled by the Ceph code at a very low level;
+   * if there are issues that can be solved by changing sources
+   * they'll be dealt with before anybody even tries to call this method!
+   * @return false.
+   */
   public synchronized boolean seekToNewSource(long targetPos) {
     return false;
   }
     
     
-  // reads a byte
+  /**
+   * Read a byte from the file.
+   * @return the next byte.
+   */
   @Override
-    public synchronized int read() throws IOException {
-      //System.out.println("CephInputStream.read: Reading a single byte from fd " + fileHandle
-      //                + " by calling general read function");
+  public synchronized int read() throws IOException {
+      debug("CephInputStream.read: Reading a single byte from fd " + fileHandle
+           + " by calling general read function");
 
       byte result[] = new byte[1];
       if (getPos() >= fileLength) return -1;
@@ -114,10 +117,16 @@ class CephInputStream extends FSInputStream {
       return result[0];
     }
 
-
+  /**
+   * Read a specified number of bytes into a byte[] from the file.
+   * @param buf[] the byte array to read into.
+   * @param off the offset to start at in the file
+   * @param len the number of bytes to read
+   * @return 0 if successful, otherwise an error code.
+   */
   @Override
-    public synchronized int read(byte buf[], int off, int len) throws IOException {
-      //System.out.println("CephInputStream.read: Reading " + len  + " bytes from fd " + fileHandle);
+  public synchronized int read(byte buf[], int off, int len) throws IOException {
+      debug("CephInputStream.read: Reading " + len  + " bytes from fd " + fileHandle);
       
       if (closed) {
        throw new IOException("CephInputStream.read: cannot read " + len  + 
@@ -126,40 +135,38 @@ class CephInputStream extends FSInputStream {
       if (null == buf) {
        throw new NullPointerException("Read buffer is null");
       }
-
+      
       // check for proper index bounds
       if((off < 0) || (len < 0) || (off + len > buf.length)) {
        throw new IndexOutOfBoundsException("CephInputStream.read: Indices out of bounds for read: "
-                                           + "read length is " + len + ", buffer offset is " 
-                                           + off +", and buffer size is " + buf.length);
+                           + "read length is " + len + ", buffer offset is " 
+                           + off +", and buffer size is " + buf.length);
       }
-
+      
       // ensure we're not past the end of the file
       if (getPos() >= fileLength) 
        {
-         System.out.println("CephInputStream.read: cannot read " + len  + 
+         debug("CephInputStream.read: cannot read " + len  + 
                             " bytes from fd " + fileHandle + ": current position is " +
                             getPos() + " and file length is " + fileLength);
-           
+         
          return -1;
        }
       // actually do the read
       int result = ceph_read(fileHandle, buf, off, len);
       if (result < 0)
-       System.out.println("CephInputStream.read: Reading " + len  + " bytes from fd " 
-                          + fileHandle + " failed.");
-      else {}
-      //      System.out.println("CephInputStream.read: Reading " + len  + " bytes from fd " 
-      //                + fileHandle + ": succeeded in reading " + result + " bytes");
-
-
+       debug("CephInputStream.read: Reading " + len
+                          + " bytes from fd " + fileHandle + " failed.");
 
+      debug("CephInputStream.read: Reading " + len  + " bytes from fd " 
+           + fileHandle + ": succeeded in reading " + result + " bytes");   
       return result;
-    }
-
-
+  }
+  /**
+   * Close the CephInputStream and release the associated filehandle.
+   */
   @Override
-    public void close() throws IOException {
+  public void close() throws IOException {
     debug("CephOutputStream.close:enter");
     if (closed) {
       throw new IOException("Stream closed");
@@ -174,20 +181,29 @@ class CephInputStream extends FSInputStream {
   }
 
   /**
-   * We don't support marks.
+   * Marks are not supported.
+   * @return false
    */
   @Override
-    public boolean markSupported() {
+  public boolean markSupported() {
     return false;
   }
 
+  /**
+   * Since marking isn't supported, this function throws an IOException.
+   * @throws IOException whenever called.
+   */
   @Override
-    public void mark(int readLimit) {
-    // Do nothing
+  public void mark(int readLimit) {
+    throw new IOException("Mark not supported");
   }
 
+  /**
+   * Since marks aren't supported, this function throws an IOException.
+   * @throws IOException whenever called.
+   */
   @Override
-    public void reset() throws IOException {
+  public void reset() throws IOException {
     throw new IOException("Mark not supported");
   }