]> git-server-git.apps.pok.os.sepia.ceph.com Git - ceph.git/commitdiff
Hadoop: Javadoc and cleanup in CephInput/OutputStream.
authorGreg Farnum <gregf@hq.newdream.net>
Tue, 18 Aug 2009 15:41:40 +0000 (08:41 -0700)
committerGreg Farnum <gregf@hq.newdream.net>
Tue, 18 Aug 2009 15:41:40 +0000 (08:41 -0700)
src/client/hadoop/ceph/CephInputStream.java
src/client/hadoop/ceph/CephOutputStream.java

index 96c176625c0c16b567f9a62f40dd39d060d9d446..877d528a2b3bd63a61c5e6021e596827d1084519 100644 (file)
@@ -162,6 +162,7 @@ class CephInputStream extends FSInputStream {
            + fileHandle + ": succeeded in reading " + result + " bytes");   
       return result;
   }
+
   /**
    * Close the CephInputStream and release the associated filehandle.
    */
@@ -180,33 +181,6 @@ class CephInputStream extends FSInputStream {
     debug("CephOutputStream.close:exit");
   }
 
-  /**
-   * Marks are not supported.
-   * @return false
-   */
-  @Override
-  public boolean markSupported() {
-    return false;
-  }
-
-  /**
-   * Since marking isn't supported, this function throws an IOException.
-   * @throws IOException whenever called.
-   */
-  @Override
-  public void mark(int readLimit) {
-    throw new IOException("Mark not supported");
-  }
-
-  /**
-   * Since marks aren't supported, this function throws an IOException.
-   * @throws IOException whenever called.
-   */
-  @Override
-  public void reset() throws IOException {
-    throw new IOException("Mark not supported");
-  }
-
   private void debug(String out) {
     if (debug) System.out.println(out);
   }
index 0df04882638e4c3d66fa7b2b303b20fcdaf3215e..0f4f7b1ec12410348134ff558c35f60434aab9c6 100644 (file)
@@ -55,28 +55,11 @@ class CephOutputStream extends OutputStream {
   private native int ceph_write(int fh, byte[] buffer, int buffer_offset, int length);
 
 
-  /*  public CephOutputStream(Configuration conf, FileSystemStore store,
-      Path path, long blockSize, Progressable progress) throws IOException {
-    
-      // basic pseudocode:
-      // call ceph_open_for_write to open the file
-      // store the file handle
-      // store the client pointer
-      // look up and store the block size while we're at it
-      // the following code's old. kill it
-
-      this.store = store;
-      this.path = path;
-      this.blockSize = blockSize;
-      this.backupFile = newBackupFile();
-      this.backupStream = new FileOutputStream(backupFile);
-      this.bufferSize = conf.getInt("io.file.buffer.size", 4096);
-      this.outBuf = new byte[bufferSize];
-
-      }*/
-
-
-  // The file handle 
+  /**
+   * Construct the CephOutputStream.
+   * @param conf The FileSystem configuration.
+   * @param fh The Ceph filehandle to connect to.
+   */
   public CephOutputStream(Configuration conf,  int fh) {
     System.load(conf.get("fs.ceph.libDir")+"/libhadoopcephfs.so");
     System.load(conf.get("fs.ceph.libDir")+"/libceph.so");
@@ -85,7 +68,7 @@ class CephOutputStream extends OutputStream {
     closed = false;
   }
 
-  //Ceph requires that things be closed before it can shutdown,
+  //Ceph likes things to be closed before it shuts down,
   //so closing the IOStream stuff voluntarily is good
   public void finalize () throws Throwable {
     try {
@@ -94,23 +77,23 @@ class CephOutputStream extends OutputStream {
     finally { super.finalize();}
   }
 
-  // possibly useful for the local copy, write later thing?
-  // keep it around for now
-  private File newBackupFile() throws IOException {
-    File result = File.createTempFile("s3fs-out", "");
-    result.deleteOnExit();
-    return result;
-  } 
-
-    public long getPos() throws IOException {
-    // change to get the position from Ceph client
+  /**
+   * Get the current position in the file.
+   * @return The file offset in bytes.
+   */
+  public long getPos() throws IOException {
     return ceph_getpos(fileHandle);
   }
 
-  // writes a byte
+  /**
+   * Write a byte.
+   * @param b The byte to write.
+   * @throws IOException If you have closed the CephOutputStream or the
+   * write fails.
+   */
   @Override
-    public synchronized void write(int b) throws IOException {
-      //System.out.println("CephOutputStream.write: writing a single byte to fd " + fileHandle);
+  public synchronized void write(int b) throws IOException {
+      debug("CephOutputStream.write: writing a single byte to fd " + fileHandle);
 
       if (closed) {
        throw new IOException("CephOutputStream.write: cannot write " + 
@@ -121,16 +104,25 @@ class CephOutputStream extends OutputStream {
       buf[0] = (byte) b;    
       int result = ceph_write(fileHandle, buf, 0, 1);
       if (1 != result)
-       System.out.println("CephOutputStream.write: failed writing a single byte to fd "
-                          + fileHandle + ": Ceph write() result = " + result);
+       debug("CephOutputStream.write: failed writing a single byte to fd "
+             + fileHandle + ": Ceph write() result = " + result);
       return;
     }
 
+  /**
+   * Write a byte buffer into the Ceph file.
+   * @param buf[] the byte array to write from
+   * @param off the position in the file to start writing at.
+   * @param len The number of bytes to actually write.
+   * @throws IOException if you have closed the CephOutputStream, or
+   * the write fails.
+   * @throws NullPointerException if buf is null.
+   * @throws IndexOutOfBoundsException if len > buf.length.
+   */
   @Override
-    public synchronized void write(byte buf[], int off, int len) throws IOException {
-      //System.out.println("CephOutputStream.write: writing " + len + 
-      //                " bytes to fd " + fileHandle);
-
+  public synchronized void write(byte buf[], int off, int len) throws IOException {
+     debug("CephOutputStream.write: writing " + len + 
+          " bytes to fd " + fileHandle);
       // make sure stream is open
       if (closed) {
        throw new IOException("CephOutputStream.write: cannot write " + len + 
@@ -164,16 +156,24 @@ class CephOutputStream extends OutputStream {
       return; 
     }
    
+  /**
+   * Flush the written data. It doesn't actually do anything; all writes are synchronous.
+   * @throws IOException if you've closed the stream.
+   */
   @Override
-    public synchronized void flush() throws IOException {
-      if (closed) {
-       throw new IOException("Stream closed");
-      }
-      return;
+  public synchronized void flush() throws IOException {
+    if (closed) {
+      throw new IOException("Stream closed");
     }
-    
+    return;
+  }
+  
+  /**
+   * Close the CephOutputStream.
+   * @throws IOException if Ceph somehow returns an error. In current code it can't.
+   */
   @Override
-    public synchronized void close() throws IOException {
+  public synchronized void close() throws IOException {
       debug("CephOutputStream.close:enter");
       if (closed) {
        throw new IOException("Stream closed");