]> git-server-git.apps.pok.os.sepia.ceph.com Git - ceph.git/commitdiff
Hadoop: CephOutputStream retabbed, and it's an OutputStream now.
authorGreg Farnum <gregf@hq.newdream.net>
Wed, 22 Jul 2009 18:25:04 +0000 (11:25 -0700)
committerGreg Farnum <gregf@hq.newdream.net>
Wed, 22 Jul 2009 18:37:01 +0000 (11:37 -0700)
src/client/hadoop/ceph/CephFileSystem.java
src/client/hadoop/ceph/CephOutputStream.java

index c14f1aa53682f91176afe6272788f855c4509a7f..e1079bdf89f8f50932118a5ad9dc3577924097db 100644 (file)
@@ -2,12 +2,14 @@
 package org.apache.hadoop.fs.ceph;
 
 import java.io.IOException;
+import java.io.OutputStream;
 import java.net.URI;
 import java.util.Set;
 import java.util.EnumSet;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FSInputStream;
 import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.FileUtil;
@@ -153,7 +155,8 @@ public class CephFileSystem extends FileSystem {
       throw new IOException("append: Open for append failed on path \"" +
                            abs_path.toString() + "\"");
     }
-    return new CephOutputStream(getConf(), clientPointer, fd);
+    CephOutputStream cephOStream = new CephOutputStream(getConf(), clientPointer, fd);
+    return new FSDataOutputStream(cephOStream);
   }
 
   public String getName() {
@@ -403,7 +406,7 @@ public class CephFileSystem extends FileSystem {
     }
       
     // Step 4: create the stream
-    FSOutputStream cephOStream = new CephOutputStream(getConf(), clientPointer, fh);
+    OutputStream cephOStream = new CephOutputStream(getConf(), clientPointer, fh);
     //System.out.println("createRaw: opened absolute path \""  + absfilepath.toString() 
     //          + "\" for writing with fh " + fh);
 
index 9fac5a4e3e2d37b9ed4c1d1502ea17db8d0e6974..254c83830b4bc6a46e38c9c9bc7ec50ebd7c7042 100644 (file)
@@ -1,3 +1,4 @@
+// -*- mode:Java; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- 
 package org.apache.hadoop.fs.ceph;
 
 import java.io.File;
@@ -11,11 +12,10 @@ import java.util.List;
 import java.util.Random;
 
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.util.Progressable;
 
-class CephOutputStream extends FSDataOutputStream {
+class CephOutputStream extends OutputStream {
 
   static {
     System.loadLibrary("hadoopcephfs");
@@ -26,7 +26,7 @@ class CephOutputStream extends FSDataOutputStream {
 
   private long fileLength;
 
-    //private FileSystemStore store;
+  //private FileSystemStore store;
 
   private Path path;
 
@@ -48,9 +48,9 @@ class CephOutputStream extends FSDataOutputStream {
 
   private byte[] outBuf;
 
-    //private List<Block> blocks = new ArrayList<Block>();
+  //private List<Block> blocks = new ArrayList<Block>();
 
-    //private Block nextBlock;
+  //private Block nextBlock;
 
     
 
@@ -62,7 +62,7 @@ class CephOutputStream extends FSDataOutputStream {
   }
   private int ceph_close() { return ceph_close(clientPointer, fileHandle); }
   private int ceph_write(byte[] buffer, int buffer_offset, int length)
-    { return ceph_write(clientPointer, fileHandle, buffer, buffer_offset, length); }
+  { return ceph_write(clientPointer, fileHandle, buffer, buffer_offset, length); }
 
 
   private native long ceph_seek_from_start(long client, int fh, long pos);
@@ -71,33 +71,33 @@ class CephOutputStream extends FSDataOutputStream {
   private native int ceph_write(long client, int fh, byte[] buffer, int buffer_offset, int length);
 
 
-    /*  public CephOutputStream(Configuration conf, FileSystemStore store,
+  /*  public CephOutputStream(Configuration conf, FileSystemStore store,
       Path path, long blockSize, Progressable progress) throws IOException {
     
-    // basic pseudocode:
-    // call ceph_open_for_write to open the file
-    // store the file handle
-    // store the client pointer
-    // look up and store the block size while we're at it
-    // the following code's old. kill it
+      // basic pseudocode:
+      // call ceph_open_for_write to open the file
+      // store the file handle
+      // store the client pointer
+      // look up and store the block size while we're at it
+      // the following code's old. kill it
 
-    this.store = store;
-    this.path = path;
-    this.blockSize = blockSize;
-    this.backupFile = newBackupFile();
-    this.backupStream = new FileOutputStream(backupFile);
-    this.bufferSize = conf.getInt("io.file.buffer.size", 4096);
-    this.outBuf = new byte[bufferSize];
+      this.store = store;
+      this.path = path;
+      this.blockSize = blockSize;
+      this.backupFile = newBackupFile();
+      this.backupStream = new FileOutputStream(backupFile);
+      this.bufferSize = conf.getInt("io.file.buffer.size", 4096);
+      this.outBuf = new byte[bufferSize];
 
-    }*/
+      }*/
 
 
-    // The file handle 
+  // The file handle 
   public CephOutputStream(Configuration conf, long clientp, int fh) {
-      clientPointer = clientp;
-      fileHandle = fh;
-      //fileLength = flength;
-      closed = false;
+    clientPointer = clientp;
+    fileHandle = fh;
+    //fileLength = flength;
+    closed = false;
   }
 
   // possibly useful for the local copy, write later thing?
@@ -108,92 +108,90 @@ class CephOutputStream extends FSDataOutputStream {
     return result;
   } 
 
-
-  @Override
-  public long getPos() throws IOException {
+    public long getPos() throws IOException {
     // change to get the position from Ceph client
-      return ceph_getpos();
+    return ceph_getpos();
   }
 
-    // writes a byte
+  // writes a byte
   @Override
-  public synchronized void write(int b) throws IOException {
+    public synchronized void write(int b) throws IOException {
       //System.out.println("CephOutputStream.write: writing a single byte to fd " + fileHandle);
 
-    if (closed) {
-      throw new IOException("CephOutputStream.write: cannot write " + 
-                           "a byte to fd " + fileHandle + ": stream closed");
-    }
-    // Stick the byte in a buffer and write it
-    byte buf[] = new byte[1];
-    buf[0] = (byte) b;    
-    int result = ceph_write(buf, 0, 1);
-    if (1 != result)
+      if (closed) {
+       throw new IOException("CephOutputStream.write: cannot write " + 
+                             "a byte to fd " + fileHandle + ": stream closed");
+      }
+      // Stick the byte in a buffer and write it
+      byte buf[] = new byte[1];
+      buf[0] = (byte) b;    
+      int result = ceph_write(buf, 0, 1);
+      if (1 != result)
        System.out.println("CephOutputStream.write: failed writing a single byte to fd "
                           + fileHandle + ": Ceph write() result = " + result);
-    return;
-  }
+      return;
+    }
 
   @Override
-  public synchronized void write(byte buf[], int off, int len) throws IOException {
+    public synchronized void write(byte buf[], int off, int len) throws IOException {
       //System.out.println("CephOutputStream.write: writing " + len + 
       //                " bytes to fd " + fileHandle);
 
       // make sure stream is open
       if (closed) {
-         throw new IOException("CephOutputStream.write: cannot write " + len + 
-                               "bytes to fd " + fileHandle + ": stream closed");
+       throw new IOException("CephOutputStream.write: cannot write " + len + 
+                             "bytes to fd " + fileHandle + ": stream closed");
       }
 
       // sanity check
       if (null == buf) {
-         throw new NullPointerException("CephOutputStream.write: cannot write " + len + 
-                                        "bytes to fd " + fileHandle + ": write buffer is null");
+       throw new NullPointerException("CephOutputStream.write: cannot write " + len + 
+                                      "bytes to fd " + fileHandle + ": write buffer is null");
       }
 
-    // check for proper index bounds
-    if((off < 0) || (len < 0) || (off + len > buf.length)) {
+      // check for proper index bounds
+      if((off < 0) || (len < 0) || (off + len > buf.length)) {
        throw new IndexOutOfBoundsException("CephOutputStream.write: Indices out of bounds for write: "
                                            + "write length is " + len + ", buffer offset is " 
                                            + off +", and buffer size is " + buf.length);
-       }
+      }
 
-    // write!
-    int result = ceph_write(buf, off, len);
-    if (result < 0) {
+      // write!
+      int result = ceph_write(buf, off, len);
+      if (result < 0) {
        throw new IOException("CephOutputStream.write: Write of " + len + 
-                               "bytes to fd " + fileHandle + " failed");
-    }
-    if (result != len) {
+                             "bytes to fd " + fileHandle + " failed");
+      }
+      if (result != len) {
        throw new IOException("CephOutputStream.write: Write of " + len + 
-                               "bytes to fd " + fileHandle + "was incomplete:  only "
+                             "bytes to fd " + fileHandle + "was incomplete:  only "
                              + result + " of " + len + " bytes were written.");
+      }
+      return; 
     }
-    return; 
-  }
    
   @Override
-  public synchronized void flush() throws IOException {
-    if (closed) {
-      throw new IOException("Stream closed");
+    public synchronized void flush() throws IOException {
+      if (closed) {
+       throw new IOException("Stream closed");
+      }
+      return;
     }
-    return;
-  }
     
   @Override
-  public synchronized void close() throws IOException {
-    if (closed) {
-      throw new IOException("Stream closed");
-    }
+    public synchronized void close() throws IOException {
+      if (closed) {
+       throw new IOException("Stream closed");
+      }
 
-    int result = ceph_close();
-    if (result != 0) {
+      int result = ceph_close();
+      if (result != 0) {
        throw new IOException("Close failed!");
-    }
+      }
        
-    closed = true;
+      closed = true;
 
-  }
+    }
     
 
 }