]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
Hadoop: CephInputStream retabbing and add seekToNewSource stub.
authorGreg Farnum <gregf@hq.newdream.net>
Wed, 22 Jul 2009 18:24:36 +0000 (11:24 -0700)
committerGreg Farnum <gregf@hq.newdream.net>
Wed, 22 Jul 2009 18:37:01 +0000 (11:37 -0700)
src/client/hadoop/ceph/CephInputStream.java

index e3b4f1f96077557e6f67fd2f05f5d12432c4f1f8..79b747c6a4e6b65426d35752431be2def5f78590 100644 (file)
@@ -1,3 +1,4 @@
+// -*- mode:Java; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- 
 package org.apache.hadoop.fs.ceph;
 
 import java.io.BufferedOutputStream;
@@ -21,7 +22,7 @@ class CephInputStream extends FSInputStream {
 
   private int bufferSize;
 
-    //private Block[] blocks;
+  //private Block[] blocks;
 
   private boolean closed;
 
@@ -31,11 +32,11 @@ class CephInputStream extends FSInputStream {
 
   private long fileLength;
 
-    //private long pos = 0;
+  //private long pos = 0;
 
-    //private DataInputStream blockStream;
+  //private DataInputStream blockStream;
 
-    //private long blockEnd = -1;
+  //private long blockEnd = -1;
 
   private native int ceph_read(long client, int fh, byte[] buffer, int buffer_offset, int length);
   private native long ceph_seek_from_start(long client, int fh, long pos);
@@ -43,64 +44,67 @@ class CephInputStream extends FSInputStream {
   private native int ceph_close(long client, int fh);
 
   private int ceph_read(byte[] buffer, int buffer_offset, int length)
-    { return ceph_read(clientPointer, fileHandle, buffer, buffer_offset, length); }
+  { return ceph_read(clientPointer, fileHandle, buffer, buffer_offset, length); }
   private long ceph_seek_from_start(long pos) { return ceph_seek_from_start(clientPointer, fileHandle, pos); }
   private long ceph_getpos() { return ceph_getpos(clientPointer, fileHandle); }
   private int ceph_close() { return ceph_close(clientPointer, fileHandle); }
     
-    /*
-  public S3InputStream(Configuration conf, FileSystemStore store,
-      INode inode) {
+  /*
+    public S3InputStream(Configuration conf, FileSystemStore store,
+    INode inode) {
     
     this.store = store;
     this.blocks = inode.getBlocks();
     for (Block block : blocks) {
-      this.fileLength += block.getLength();
+    this.fileLength += block.getLength();
     }
     this.bufferSize = conf.getInt("io.file.buffer.size", 4096);    
-  }
-    */
+    }
+  */
 
   public CephInputStream(Configuration conf, long clientp, int fh, long flength) {
 
-      // Whoever's calling the constructor is responsible for doing the actual ceph_open
-      // call and providing the file handle.
-      clientPointer = clientp;
-      fileLength = flength;
-      fileHandle = fh;
-      //System.out.println("CephInputStream constructor: initializing stream with fh "
-      //                + fh + " and file length " + flength);
+    // Whoever's calling the constructor is responsible for doing the actual ceph_open
+    // call and providing the file handle.
+    clientPointer = clientp;
+    fileLength = flength;
+    fileHandle = fh;
+    //System.out.println("CephInputStream constructor: initializing stream with fh "
+    //          + fh + " and file length " + flength);
       
-      // TODO: Then what do we need from the config? The buffer size maybe?
-      // Anything? Bueller?
+    // TODO: Then what do we need from the config? The buffer size maybe?
+    // Anything? Bueller?
 
   }
 
-  @Override
   public synchronized long getPos() throws IOException {
     return ceph_getpos();
   }
 
   @Override
-  public synchronized int available() throws IOException {
-    return (int) (fileLength - getPos());
-  }
+    public synchronized int available() throws IOException {
+      return (int) (fileLength - getPos());
+    }
 
-  @Override
   public synchronized void seek(long targetPos) throws IOException {
-      //System.out.println("CephInputStream.seek: Seeking to position " + targetPos +
-      //                " on fd " + fileHandle);
+    //System.out.println("CephInputStream.seek: Seeking to position " + targetPos +
+    //          " on fd " + fileHandle);
     if (targetPos > fileLength) {
       throw new IOException("CephInputStream.seek: failed seeking to position " + targetPos +
-                        " on fd " + fileHandle + ": Cannot seek after EOF " + fileLength);
+                           " on fd " + fileHandle + ": Cannot seek after EOF " + fileLength);
     }
     ceph_seek_from_start(targetPos);
   }
 
+  //method stub obviously
+  public synchronized boolean seekToNewSource(long targetPos) {
+    return true;
+  }
+    
     
   // reads a byte
   @Override
-  public synchronized int read() throws IOException {
+    public synchronized int read() throws IOException {
       //System.out.println("CephInputStream.read: Reading a single byte from fd " + fileHandle
       //                + " by calling general read function");
 
@@ -108,61 +112,61 @@ class CephInputStream extends FSInputStream {
       if (getPos() >= fileLength) return -1;
       if (-1 == read(result, 0, 1)) return -1;
       return result[0];
-  }
+    }
 
 
   @Override
-  public synchronized int read(byte buf[], int off, int len) throws IOException {
+    public synchronized int read(byte buf[], int off, int len) throws IOException {
       //System.out.println("CephInputStream.read: Reading " + len  + " bytes from fd " + fileHandle);
       
-    if (closed) {
-      throw new IOException("CephInputStream.read: cannot read " + len  + 
-                           " bytes from fd " + fileHandle + ": stream closed");
-    }
-    if (null == buf) {
+      if (closed) {
+       throw new IOException("CephInputStream.read: cannot read " + len  + 
+                             " bytes from fd " + fileHandle + ": stream closed");
+      }
+      if (null == buf) {
        throw new NullPointerException("Read buffer is null");
-    }
+      }
 
-    // check for proper index bounds
-    if((off < 0) || (len < 0) || (off + len > buf.length)) {
+      // check for proper index bounds
+      if((off < 0) || (len < 0) || (off + len > buf.length)) {
        throw new IndexOutOfBoundsException("CephInputStream.read: Indices out of bounds for read: "
                                            + "read length is " + len + ", buffer offset is " 
                                            + off +", and buffer size is " + buf.length);
-       }
+      }
 
-    // ensure we're not past the end of the file
-    if (getPos() >= fileLength) 
+      // ensure we're not past the end of the file
+      if (getPos() >= fileLength) 
        {
-           System.out.println("CephInputStream.read: cannot read " + len  + 
-                                 " bytes from fd " + fileHandle + ": current position is " +
-                                 getPos() + " and file length is " + fileLength);
+         System.out.println("CephInputStream.read: cannot read " + len  + 
+                            " bytes from fd " + fileHandle + ": current position is " +
+                            getPos() + " and file length is " + fileLength);
            
-           return -1;
+         return -1;
        }
-    // actually do the read
-    int result = ceph_read(buf, off, len);
-    if (result < 0)
-      System.out.println("CephInputStream.read: Reading " + len  + " bytes from fd " 
-                        + fileHandle + " failed.");
-    else {}
-    //      System.out.println("CephInputStream.read: Reading " + len  + " bytes from fd " 
-    //          + fileHandle + ": succeeded in reading " + result + " bytes");
+      // actually do the read
+      int result = ceph_read(buf, off, len);
+      if (result < 0)
+       System.out.println("CephInputStream.read: Reading " + len  + " bytes from fd " 
+                          + fileHandle + " failed.");
+      else {}
+      //      System.out.println("CephInputStream.read: Reading " + len  + " bytes from fd " 
+      //                + fileHandle + ": succeeded in reading " + result + " bytes");
 
 
 
-    return result;
-  }
+      return result;
+    }
 
 
   @Override
-  public void close() throws IOException {
+    public void close() throws IOException {
     if (closed) {
       throw new IOException("Stream closed");
     }
 
     int result = ceph_close();
     if (result != 0) {
-       throw new IOException("Close failed!");
+      throw new IOException("Close failed!");
     }
        
     closed = true;
@@ -172,17 +176,17 @@ class CephInputStream extends FSInputStream {
    * We don't support marks.
    */
   @Override
-  public boolean markSupported() {
+    public boolean markSupported() {
     return false;
   }
 
   @Override
-  public void mark(int readLimit) {
+    public void mark(int readLimit) {
     // Do nothing
   }
 
   @Override
-  public void reset() throws IOException {
+    public void reset() throws IOException {
     throw new IOException("Mark not supported");
   }