]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
Hadoop: Add buffering to CephIOStreams
authorGreg Farnum <gregf@hq.newdream.net>
Wed, 21 Oct 2009 23:56:29 +0000 (16:56 -0700)
committerGreg Farnum <gregf@hq.newdream.net>
Fri, 23 Oct 2009 21:14:46 +0000 (14:14 -0700)
src/client/hadoop/ceph/CephFileSystem.java
src/client/hadoop/ceph/CephInputStream.java
src/client/hadoop/ceph/CephOutputStream.java

index 4aea7a24671324bb370aff1fbc80455d79bf65d5..6d05e8ff80c8aef4bc8fcded57575eb6727ea6f3 100644 (file)
@@ -200,7 +200,8 @@ public class CephFileSystem extends FileSystem {
       throw new IOException("append: Open for append failed on path \"" +
                                                                                                                abs_path.toString() + "\"");
     }
-    CephOutputStream cephOStream = new CephOutputStream(getConf(), ceph, fd);
+    CephOutputStream cephOStream = new CephOutputStream(getConf(),
+                                                                                                                                                                                                                               ceph, fd, bufferSize);
     ceph.debug("append:exit", ceph.DEBUG);
     return new FSDataOutputStream(cephOStream, statistics);
   }
@@ -504,7 +505,8 @@ public class CephFileSystem extends FileSystem {
     }
       
     // Step 4: create the stream
-    OutputStream cephOStream = new CephOutputStream(getConf(), ceph, fh);
+    OutputStream cephOStream = new CephOutputStream(getConf(),
+                                                                                                                                                                                                               ceph, fh, bufferSize);
     ceph.debug("create:exit", ceph.DEBUG);
     return new FSDataOutputStream(cephOStream, statistics);
        }
@@ -547,7 +549,8 @@ public class CephFileSystem extends FileSystem {
       throw new IOException("Failed to get file size for file " + abs_path.toString() + 
                                                                                                                " but succeeded in opening file. Something bizarre is going on.");
     }
-    FSInputStream cephIStream = new CephInputStream(getConf(), ceph, fh, size);
+    FSInputStream cephIStream = new CephInputStream(getConf(), ceph,
+                                                                                                                                                                                                               fh, size, bufferSize);
     ceph.debug("open:exit", ceph.DEBUG);
     return new FSDataInputStream(cephIStream);
        }
index 2d4447aa47c5eba67f23585cf5d6f72049223ded..4e7cefbe106fe92ddfacf0e951a6b59088e75162 100644 (file)
@@ -40,6 +40,10 @@ public class CephInputStream extends FSInputStream {
 
        private CephFS ceph;
 
+       private byte[] buffer;
+       private int bufPos = -1;
+       private long cephPos = 0;
+
   /**
    * Create a new CephInputStream.
    * @param conf The system configuration. Unused.
@@ -48,13 +52,14 @@ public class CephInputStream extends FSInputStream {
    * you will need to close and re-open it to access the new data.
    */
   public CephInputStream(Configuration conf, CephFS cephfs,
-                                                                                                int fh, long flength) {
+                                                                                                int fh, long flength, int bufferSize) {
     // Whoever's calling the constructor is responsible for doing the actual ceph_open
     // call and providing the file handle.
     fileLength = flength;
     fileHandle = fh;
     closed = false;
                ceph = cephfs;
+               buffer = new byte[bufferSize];
     ceph.debug("CephInputStream constructor: initializing stream with fh "
                                                                                + fh + " and file length " + flength, ceph.DEBUG);
       
@@ -70,8 +75,13 @@ public class CephInputStream extends FSInputStream {
   }
 
   public synchronized long getPos() throws IOException {
-    return ceph.ceph_getpos(fileHandle);
+               if (bufPos == -1) {
+                       cephPos = ceph.ceph_getpos(fileHandle);
+                       return cephPos;
+               }
+               return cephPos - buffer.length + bufPos;
   }
+
   /**
    * Find the number of bytes remaining in the file.
    */
@@ -87,7 +97,15 @@ public class CephInputStream extends FSInputStream {
       throw new IOException("CephInputStream.seek: failed seeking to position " + targetPos +
                                                                                                                " on fd " + fileHandle + ": Cannot seek after EOF " + fileLength);
     }
-    ceph.ceph_seek_from_start(fileHandle, targetPos);
+               if ((cephPos-targetPos < buffer.length) && -1 != bufPos && cephPos >= 0) {
+                       bufPos = buffer.length - (cephPos - targetPos);
+               } else {
+                       cephPos = ceph.ceph_seek_from_start(fileHandle, targetPos);
+                       if (cephPos < 0) {
+                               throw new IOException ("Ceph failed to seek to new position!");
+                       }
+                       bufPos = -1;
+               }
   }
 
   /**
@@ -156,8 +174,36 @@ public class CephInputStream extends FSInputStream {
          
                                        return -1;
                                }
-      // actually do the read
-      int result = ceph.ceph_read(fileHandle, buf, off, len);
+                       //if the read can be satisfied from buffer, do so.
+                       if (bufPos + len < buffer.length) {
+                               //refill buffer if it's empty
+                               if (bufPos == -1) {
+                                       ceph.ceph_read(fileHandle, buffer, 0, buffer.length);
+                                       cephPos = ceph.ceph_getpos(fileHandle);
+                                       bufPos = 0;
+                               }
+                               for (int i = 0; i < len; ++i)
+                                       buf[off+i] = buffer[bufPos+i];
+                               bufPos += len;
+                               return len;
+                       }
+
+                       //otherwise copy the rest of the buffer in, and then read straight
+                       //into buf if we still need more data
+                       int read = 0;
+                       if (bufPos != -1) {
+                               read =  buffer.length - bufPos;
+                               for (int i = 0; i < read; ++i)
+                                       buf[off+i] = buffer[bufPos+i];
+                               bufPos = -1; //no data in buffer now;
+                       }
+
+                       if (read == len)
+                               return read;
+
+
+      int result = ceph.ceph_read(fileHandle, buf, off+read, len-read);
+                       cephPos += len-read;
       if (result < 0)
                                ceph.debug("CephInputStream.read: Reading " + len
                                                                         + " bytes from fd " + fileHandle + " failed.", ceph.WARN);
index 51fa92cefe74114ff29e6d4b339587eae2035704..852decfc3560ddaa60482ee2ba646edb3e93ee1c 100644 (file)
@@ -39,15 +39,20 @@ public class CephOutputStream extends OutputStream {
 
   private int fileHandle;
 
+       private byte[] buffer;
+       private int bufUsed = 0;
+
   /**
    * Construct the CephOutputStream.
    * @param conf The FileSystem configuration.
    * @param fh The Ceph filehandle to connect to.
    */
-  public CephOutputStream(Configuration conf, CephFS cephfs, int fh) {
+  public CephOutputStream(Configuration conf, CephFS cephfs,
+                                                                                                       int fh, int bufferSize) {
                ceph = cephfs;
     fileHandle = fh;
     closed = false;
+               buffer = new byte[bufferSize];
   }
 
   /**Ceph likes things to be closed before it shuts down,
@@ -126,8 +131,42 @@ ceph.WARN);
                                                                                                                                                                                + off +", and buffer size is " + buf.length);
       }
 
-      // write!
-      int result = ceph.ceph_write(fileHandle, buf, off, len);
+                       int result;
+
+      // if there's lots of space left, write to the buffer and return
+                       if (bufUsed + len < buffer.length) {
+                               for (int i = 0; i < len; ++i) {
+                                       buffer[bufUsed+i] = buf[off+i];
+                               }
+                               bufUsed += len;
+                               return;
+                       }
+
+                       //if len isn't too large, fill buffer, write, and fill with rest
+                       if (bufUsed + len < 2*buffer.length) {
+                               for (int i = 0; i + bufUsed < buffer.length; ++i) {
+                                       buffer[bufUsed+i] = buf[off+i];
+                               }
+                               int sent = len - (buffer.length - bufUsed);
+                               result = ceph.ceph_write(fileHandle, buffer, 0, buffer.length);
+                               if (result != buffer.length)
+                                       throw new IOException("CephOutputStream.write: Failed to write some buffered data to fd " + fileHandle);
+                               off += sent;
+                               for (int i = 0; i + sent < len; ++i) {
+                                       buffer[i] = buf[off+i];
+                               }
+                               bufUsed = len - sent;
+                               return;
+                       }
+
+
+                       //if we make it here, the buffer's huge, so just flush the old buffer...
+                       result = ceph.ceph_write(fileHandle, buffer, 0, bufUsed);
+                       if (result < 0 || result != bufUsed)
+                               throw new IOException("CephOutputStream.write: Failed to write some buffered data to fd " + fileHandle);
+                       bufUsed = 0;
+                       //...and then write ful buf
+      result = ceph.ceph_write(fileHandle, buf, off, len);
       if (result < 0) {
                                throw new IOException("CephOutputStream.write: Write of " + len + 
                                                                                                                        "bytes to fd " + fileHandle + " failed");
@@ -141,16 +180,27 @@ ceph.WARN);
     }
    
   /**
-   * Flush the written data. It doesn't actually do anything; all writes are synchronous.
-   * @throws IOException if you've closed the stream.
+   * Flush the buffered data.
+   * @throws IOException if you've closed the stream or the write fails.
    */
   @Override
        public synchronized void flush() throws IOException {
                        if (closed) {
                                throw new IOException("Stream closed");
                        }
+                       if (bufUsed == 0) return;
+                       int result = ceph.ceph_write(fileHandle, buffer, 0, bufUsed);
+      if (result < 0) {
+                               throw new IOException("CephOutputStream.write: Write of " + len + 
+                                                                                                                       "bytes to fd " + fileHandle + " failed");
+      }
+      if (result != len) {
+                               throw new IOException("CephOutputStream.write: Write of " + len + 
+                                                                                                                       "bytes to fd " + fileHandle + "was incomplete:  only "
+                                                                                                                       + result + " of " + len + " bytes were written.");
+      }
                        return;
-               }
+       }
   
   /**
    * Close the CephOutputStream.
@@ -162,7 +212,7 @@ ceph.WARN);
       if (closed) {
                                throw new IOException("Stream closed");
       }
-
+                       flush();
       int result = ceph.ceph_close(fileHandle);
       if (result != 0) {
                                throw new IOException("Close failed!");