]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
Hadoop: Move CephIOStream native methods into CephFS/CephTalker
authorGreg Farnum <gregf@hq.newdream.net>
Fri, 16 Oct 2009 01:19:39 +0000 (18:19 -0700)
committerGreg Farnum <gregf@hq.newdream.net>
Fri, 16 Oct 2009 19:21:25 +0000 (12:21 -0700)
src/client/hadoop/CephFSInterface.cc
src/client/hadoop/CephFSInterface.h
src/client/hadoop/ceph/CephFS.java
src/client/hadoop/ceph/CephFileSystem.java
src/client/hadoop/ceph/CephInputStream.java
src/client/hadoop/ceph/CephOutputStream.java
src/client/hadoop/ceph/CephTalker.java

index 64dff547d814e16ddb633c054ddfea52c478668b..fdcbbfdfafbd7211c97c2f5ec861680b5188faf8 100644 (file)
@@ -718,7 +718,7 @@ JNIEXPORT jint JNICALL Java_org_apache_hadoop_fs_ceph_CephTalker_ceph_1setTimes
 }
 
 /*
- * Class:     org_apache_hadoop_fs_ceph_CephInputStream
+ * Class:     org_apache_hadoop_fs_ceph_CephTalker
  * Method:    ceph_read
  * Signature: (JI[BII)I
  * Reads into the given byte array from the current position.
@@ -732,7 +732,7 @@ JNIEXPORT jint JNICALL Java_org_apache_hadoop_fs_ceph_CephTalker_ceph_1setTimes
  * Returns: the number of bytes read on success (as jint),
  *  or an error code otherwise.
  */
-JNIEXPORT jint JNICALL Java_org_apache_hadoop_fs_ceph_CephInputStream_ceph_1read
+JNIEXPORT jint JNICALL Java_org_apache_hadoop_fs_ceph_CephTalker_ceph_1read
   (JNIEnv *env, jobject obj, jint fh, jbyteArray j_buffer, jint buffer_offset, jint length)
 {
   dout(10) << "In read" << dendl;
@@ -760,7 +760,7 @@ JNIEXPORT jint JNICALL Java_org_apache_hadoop_fs_ceph_CephInputStream_ceph_1read
 }
 
 /*
- * Class:     org_apache_hadoop_fs_ceph_CephInputStream
+ * Class:     org_apache_hadoop_fs_ceph_CephTalker
  * Method:    ceph_seek_from_start
  * Signature: (JIJ)J
  * Seeks to the given position in the given file.
@@ -770,16 +770,16 @@ JNIEXPORT jint JNICALL Java_org_apache_hadoop_fs_ceph_CephInputStream_ceph_1read
  * Returns: the new position (as a jlong) of the filehandle on success,
  *  or a negative error code on failure.
  */
-JNIEXPORT jlong JNICALL Java_org_apache_hadoop_fs_ceph_CephInputStream_ceph_1seek_1from_1start
+JNIEXPORT jlong JNICALL Java_org_apache_hadoop_fs_ceph_CephTalker_ceph_1seek_1from_1start
   (JNIEnv *env, jobject obj, jint fh, jlong pos)
 {
-  dout(10) << "In CephInputStream::seek_from_start" << dendl;
+  dout(10) << "In CephTalker::seek_from_start" << dendl;
 
   return ceph_lseek(fh, pos, SEEK_SET);
 }
 
 /*
- * Class:     org_apache_hadoop_fs_ceph_CephInputStream
+ * Class:     org_apache_hadoop_fs_ceph_CephTalker
  * Method:    ceph_getpos
  * Signature: (I)J
  *
@@ -787,64 +787,17 @@ JNIEXPORT jlong JNICALL Java_org_apache_hadoop_fs_ceph_CephInputStream_ceph_1see
  * Returns: jlong current file position on success, or a
  *  negative error code on failure.
  */
-JNIEXPORT jlong JNICALL Java_org_apache_hadoop_fs_ceph_CephInputStream_ceph_1getpos
+JNIEXPORT jlong JNICALL Java_org_apache_hadoop_fs_ceph_CephTalker_ceph_1getpos
   (JNIEnv *env, jobject obj, jint fh)
 {
-  dout(10) << "In CephInputStream::ceph_getpos" << dendl;
+  dout(10) << "In CephTalker::ceph_getpos" << dendl;
 
   // seek a distance of 0 to get current offset
   return ceph_lseek(fh, 0, SEEK_CUR);  
 }
 
 /*
- * Class:     org_apache_hadoop_fs_ceph_CephInputStream
- * Method:    ceph_close
- * Signature: (JI)I
- * Closes the given file. Returns 0 on success (it will always succeed
- *  unless it dies in a horrible fiery assert failure).
- */
-JNIEXPORT jint JNICALL Java_org_apache_hadoop_fs_ceph_CephInputStream_ceph_1close
-  (JNIEnv *env, jobject obj, jint fh)
-{
-  dout(10) << "In CephInputStream::ceph_close" << dendl;
-
-  return ceph_close(fh);
-}
-
-/*
- * Class:     org_apache_hadoop_fs_ceph_CephOutputStream
- * Method:    ceph_getpos
- * Signature: (JI)J
- * Get the current position in a file (as a jlong) of a given filehandle.
- * Returns: jlong current file position on success, or a
- *  negative error code on failure.
- */
-JNIEXPORT jlong JNICALL Java_org_apache_hadoop_fs_ceph_CephOutputStream_ceph_1getpos
-  (JNIEnv *env, jobject obj, jint fh)
-{
-  dout(10) << "In CephOutputStream::ceph_getpos" << dendl;
-
-  // seek a distance of 0 to get current offset
-  return ceph_lseek(fh, 0, SEEK_CUR);
-}
-
-/*
- * Class:     org_apache_hadoop_fs_ceph_CephOutputStream
- * Method:    ceph_close
- * Signature: (I)I
- * Closes the given file. Returns 0 on success (it will always succeed
- *  unless it dies in a horrible fiery assert failure).
- */
-JNIEXPORT jint JNICALL Java_org_apache_hadoop_fs_ceph_CephOutputStream_ceph_1close
-  (JNIEnv *env, jobject obj, jint fh)
-{
-  dout(10) << "In CephOutputStream::ceph_close" << dendl;
-
-  return ceph_close(fh);
-}
-
-/*
- * Class:     org_apache_hadoop_fs_ceph_CephOutputStream
+ * Class:     org_apache_hadoop_fs_ceph_CephTalker
  * Method:    ceph_write
  * Signature: (I[BII)I
  * Write the given buffer contents to the given filehandle.
@@ -856,7 +809,7 @@ JNIEXPORT jint JNICALL Java_org_apache_hadoop_fs_ceph_CephOutputStream_ceph_1clo
  * Returns: jint, on success the number of bytes written, on failure
  *  a negative error code.
  */
-JNIEXPORT jint JNICALL Java_org_apache_hadoop_fs_ceph_CephOutputStream_ceph_1write
+JNIEXPORT jint JNICALL Java_org_apache_hadoop_fs_ceph_CephTalker_ceph_1write
   (JNIEnv *env, jobject obj, jint fh, jbyteArray j_buffer, jint buffer_offset, jint length)
 {
   dout(10) << "In write" << dendl;
index 3dd57b270457ef23661a4bd9c8beadbb2485d080..0db93437b2800b0a72e1dbcdf14eb5d104f62164 100644 (file)
@@ -199,59 +199,35 @@ JNIEXPORT jint JNICALL Java_org_apache_hadoop_fs_ceph_CephTalker_ceph_1setTimes
   (JNIEnv *, jobject, jstring, jlong, jlong);
 
 /*
- * Class:     org_apache_hadoop_fs_ceph_CephInputStream
+ * Class:     org_apache_hadoop_fs_ceph_CephTalker
  * Method:    ceph_read
  * Signature: (I[BII)I
  */
-JNIEXPORT jint JNICALL Java_org_apache_hadoop_fs_ceph_CephInputStream_ceph_1read
+JNIEXPORT jint JNICALL Java_org_apache_hadoop_fs_ceph_CephTalker_ceph_1read
   (JNIEnv *, jobject, jint, jbyteArray, jint, jint);
 
 /*
- * Class:     org_apache_hadoop_fs_ceph_CephInputStream
+ * Class:     org_apache_hadoop_fs_ceph_CephTalker
  * Method:    ceph_seek_from_start
  * Signature: (IJ)J
  */
-JNIEXPORT jlong JNICALL Java_org_apache_hadoop_fs_ceph_CephInputStream_ceph_1seek_1from_1start
+JNIEXPORT jlong JNICALL Java_org_apache_hadoop_fs_ceph_CephTalker_ceph_1seek_1from_1start
   (JNIEnv *, jobject, jint, jlong);
 
 /*
- * Class:     org_apache_hadoop_fs_ceph_CephInputStream
- * Method:    ceph_getpos
- * Signature: (I)J
- */
-JNIEXPORT jlong JNICALL Java_org_apache_hadoop_fs_ceph_CephInputStream_ceph_1getpos
-  (JNIEnv *, jobject, jint);
-
-/*
- * Class:     org_apache_hadoop_fs_ceph_CephInputStream
- * Method:    ceph_close
- * Signature: (I)I
- */
-JNIEXPORT jint JNICALL Java_org_apache_hadoop_fs_ceph_CephInputStream_ceph_1close
-  (JNIEnv *, jobject, jint);
-
-/*
- * Class:     org_apache_hadoop_fs_ceph_CephOutputStream
+ * Class:     org_apache_hadoop_fs_ceph_CephTalker
  * Method:    ceph_getpos
  * Signature: (I)J
  */
-JNIEXPORT jlong JNICALL Java_org_apache_hadoop_fs_ceph_CephOutputStream_ceph_1getpos
+JNIEXPORT jlong JNICALL Java_org_apache_hadoop_fs_ceph_CephTalker_ceph_1getpos
   (JNIEnv *, jobject, jint);
 
 /*
- * Class:     org_apache_hadoop_fs_ceph_CephOutputStream
- * Method:    ceph_close
- * Signature: (I)I
- */
-JNIEXPORT jint JNICALL Java_org_apache_hadoop_fs_ceph_CephOutputStream_ceph_1close
-  (JNIEnv *, jobject, jint);
-
-/*
- * Class:     org_apache_hadoop_fs_ceph_CephOutputStream
+ * Class:     org_apache_hadoop_fs_ceph_CephTalker
  * Method:    ceph_write
  * Signature: (I[BII)I
  */
-JNIEXPORT jint JNICALL Java_org_apache_hadoop_fs_ceph_CephOutputStream_ceph_1write
+JNIEXPORT jint JNICALL Java_org_apache_hadoop_fs_ceph_CephTalker_ceph_1write
   (JNIEnv *, jobject, jint, jbyteArray, jint, jint);
 
 #ifdef __cplusplus
index 0360e733959895c29e7e1de0cc7f024159738661..544cb93faf2ef7794ec2fa71c96c050bca644fc6 100644 (file)
@@ -146,7 +146,8 @@ abstract class CephFS {
         */
   abstract protected int ceph_open_for_overwrite(String path, int mode);
        /*
-        * Closes a given filehandle.
+        * Closes the given file. Returns 0 on success, or a negative
+        * error code otherwise.
         */
   abstract protected int ceph_close(int filehandle);
        /*
@@ -204,7 +205,45 @@ abstract class CephFS {
         * Returns: 0 if successful, an error code otherwise.
         */
   abstract protected int ceph_setTimes(String path, long mtime, long atime);
+       /*
+        * Get the current position in a file (as a long) of a given filehandle.
+        * Returns: (long) current file position on success, or a
+        *  negative error code on failure.
+        */
+  abstract protected long ceph_getpos(int fh);
+       /*
+        * Write the given buffer contents to the given filehandle.
+        * Inputs:
+        *  int fh: The filehandle to write to.
+        *  byte[] buffer: The buffer to write from
+        *  int buffer_offset: The position in the buffer to write from
+        *  int length: The number of (sequential) bytes to write.
+        * Returns: int, on success the number of bytes written, on failure
+        *  a negative error code.
+        */
+  abstract protected int ceph_write(int fh, byte[] buffer, int buffer_offset, int length);
 
+       /*
+        * Reads into the given byte array from the current position.
+        * Inputs:
+        *  int fh: the filehandle to read from
+        *  byte[] buffer: the byte array to read into
+        *  int buffer_offset: where in the buffer to start writing
+        *  int length: how much to read.
+        * There'd better be enough space in the buffer to write all
+        * the data from the given offset!
+        * Returns: the number of bytes read on success (as an int),
+        *  or an error code otherwise.  */
+  protected native int ceph_read(int fh, byte[] buffer, int buffer_offset, int length);
+       /*
+        * Seeks to the given position in the given file.
+        * Inputs:
+        *  int fh: The filehandle to seek in.
+        *  long pos: The position to seek to.
+        * Returns: the new position (as a long) of the filehandle on success,
+        *  or a negative error code on failure.         */
+  protected native long ceph_seek_from_start(int fh, long pos);
+    
        protected void debug(String statement, int priority) {
     if (debug) System.err.println(statement);
                switch(priority) {
index ed1bc15dbcd3fcfe842835cea5b52191d133adc1..8760f3837ed441568794896b07d58eb42533927e 100644 (file)
@@ -80,7 +80,6 @@ public class CephFileSystem extends FileSystem {
    */
   public CephFileSystem() {
     root = new Path("/");
-    ceph.debug("CephFileSystem:exit", ceph.DEBUG);
   }
 
   /**
@@ -104,7 +103,6 @@ public class CephFileSystem extends FileSystem {
    */
   @Override
   public void initialize(URI uri, Configuration conf) throws IOException {
-    ceph.debug("initialize:enter", ceph.DEBUG);
     if (!initialized) {
       super.initialize(uri, conf);
       setConf(conf);
@@ -205,7 +203,7 @@ public class CephFileSystem extends FileSystem {
       throw new IOException("append: Open for append failed on path \"" +
                                                                                                                abs_path.toString() + "\"");
     }
-    CephOutputStream cephOStream = new CephOutputStream(getConf(), fd);
+    CephOutputStream cephOStream = new CephOutputStream(getConf(), ceph, fd);
     ceph.debug("append:exit", ceph.DEBUG);
     return new FSDataOutputStream(cephOStream, statistics);
   }
@@ -509,7 +507,7 @@ public class CephFileSystem extends FileSystem {
     }
       
     // Step 4: create the stream
-    OutputStream cephOStream = new CephOutputStream(getConf(), fh);
+    OutputStream cephOStream = new CephOutputStream(getConf(), ceph, fh);
     ceph.debug("create:exit", ceph.DEBUG);
     return new FSDataOutputStream(cephOStream, statistics);
        }
@@ -552,7 +550,7 @@ public class CephFileSystem extends FileSystem {
       throw new IOException("Failed to get file size for file " + abs_path.toString() + 
                                                                                                                " but succeeded in opening file. Something bizarre is going on.");
     }
-    FSInputStream cephIStream = new CephInputStream(getConf(), fh, size);
+    FSInputStream cephIStream = new CephInputStream(getConf(), ceph, fh, size);
     ceph.debug("open:exit", ceph.DEBUG);
     return new FSDataInputStream(cephIStream);
        }
index 4204feabeebbaf6a98a06926b6c568632b692084..2d4447aa47c5eba67f23585cf5d6f72049223ded 100644 (file)
@@ -38,40 +38,8 @@ public class CephInputStream extends FSInputStream {
 
   private long fileLength;
 
-  private boolean debug;
-
-       /*
-        * Reads into the given byte array from the current position.
-        * Inputs:
-        *  int fh: the filehandle to read from
-        *  byte[] buffer: the byte array to read into
-        *  int buffer_offset: where in the buffer to start writing
-        *  int length: how much to read.
-        * There'd better be enough space in the buffer to write all
-        * the data from the given offset!
-        * Returns: the number of bytes read on success (as an int),
-        *  or an error code otherwise.  */
-  private native int ceph_read(int fh, byte[] buffer, int buffer_offset, int length);
-       /*
-        * Seeks to the given position in the given file.
-        * Inputs:
-        *  int fh: The filehandle to seek in.
-        *  long pos: The position to seek to.
-        * Returns: the new position (as a long) of the filehandle on success,
-        *  or a negative error code on failure.         */
-  private native long ceph_seek_from_start(int fh, long pos);
-       /*
-        * Get the current position in a file (as a long) of a given filehandle.
-        * Returns: (long) current file position on success, or a
-        *  negative error code on failure.
-        */
-  private native long ceph_getpos(int fh);
-       /*
-        * Closes the given file. Returns 0 on success, or a negative
-        * error code otherwise.
-        */
-  private native int ceph_close(int fh);
-    
+       private CephFS ceph;
+
   /**
    * Create a new CephInputStream.
    * @param conf The system configuration. Unused.
@@ -79,17 +47,16 @@ public class CephInputStream extends FSInputStream {
    * @param flength The current length of the file. If the length changes
    * you will need to close and re-open it to access the new data.
    */
-  public CephInputStream(Configuration conf, int fh, long flength) {
-    System.load(conf.get("fs.ceph.libDir")+"/libhadoopcephfs.so");
-    System.load(conf.get("fs.ceph.libDir")+"/libceph.so");
+  public CephInputStream(Configuration conf, CephFS cephfs,
+                                                                                                int fh, long flength) {
     // Whoever's calling the constructor is responsible for doing the actual ceph_open
     // call and providing the file handle.
     fileLength = flength;
     fileHandle = fh;
     closed = false;
-    debug = ("true".equals(conf.get("fs.ceph.debug", "false")));
-    debug("CephInputStream constructor: initializing stream with fh "
-                                                                               + fh + " and file length " + flength);
+               ceph = cephfs;
+    ceph.debug("CephInputStream constructor: initializing stream with fh "
+                                                                               + fh + " and file length " + flength, ceph.DEBUG);
       
   }
   /** Ceph likes things to be closed before it shuts down,
@@ -103,7 +70,7 @@ public class CephInputStream extends FSInputStream {
   }
 
   public synchronized long getPos() throws IOException {
-    return ceph_getpos(fileHandle);
+    return ceph.ceph_getpos(fileHandle);
   }
   /**
    * Find the number of bytes remaining in the file.
@@ -114,13 +81,13 @@ public class CephInputStream extends FSInputStream {
     }
 
   public synchronized void seek(long targetPos) throws IOException {
-    debug("CephInputStream.seek: Seeking to position " + targetPos +
-                                                                               " on fd " + fileHandle);
+    ceph.debug("CephInputStream.seek: Seeking to position " + targetPos +
+                                                                               " on fd " + fileHandle, ceph.TRACE);
     if (targetPos > fileLength) {
       throw new IOException("CephInputStream.seek: failed seeking to position " + targetPos +
                                                                                                                " on fd " + fileHandle + ": Cannot seek after EOF " + fileLength);
     }
-    ceph_seek_from_start(fileHandle, targetPos);
+    ceph.ceph_seek_from_start(fileHandle, targetPos);
   }
 
   /**
@@ -140,8 +107,8 @@ public class CephInputStream extends FSInputStream {
    */
   @Override
        public synchronized int read() throws IOException {
-      debug("CephInputStream.read: Reading a single byte from fd " + fileHandle
-                                                                                       + " by calling general read function");
+      ceph.debug("CephInputStream.read: Reading a single byte from fd " + fileHandle
+                                                                                       + " by calling general read function", ceph.TRACE);
 
       byte result[] = new byte[1];
       if (getPos() >= fileLength) return -1;
@@ -159,11 +126,13 @@ public class CephInputStream extends FSInputStream {
    */
   @Override
        public synchronized int read(byte buf[], int off, int len) throws IOException {
-      debug("CephInputStream.read: Reading " + len  + " bytes from fd " + fileHandle);
+      ceph.debug("CephInputStream.read: Reading " + len  +
+                                                                " bytes from fd " + fileHandle, ceph.TRACE);
       
       if (closed) {
                                throw new IOException("CephInputStream.read: cannot read " + len  + 
-                                                                                                                       " bytes from fd " + fileHandle + ": stream closed");
+                                                                                                                       " bytes from fd " + fileHandle +
+                                                                                                                       ": stream closed");
       }
       if (null == buf) {
                                throw new IOException("Read buffer is null");
@@ -172,27 +141,30 @@ public class CephInputStream extends FSInputStream {
       // check for proper index bounds
       if((off < 0) || (len < 0) || (off + len > buf.length)) {
                                throw new IOException("CephInputStream.read: Indices out of bounds for read: "
-                                                                                                                                                                               + "read length is " + len + ", buffer offset is " 
-                                                                                                                                                                               + off +", and buffer size is " + buf.length);
+                                                                                                                       + "read length is " + len +
+                                                                                                                       ", buffer offset is " + off +
+                                                                                                                       ", and buffer size is " + buf.length);
       }
       
       // ensure we're not past the end of the file
       if (getPos() >= fileLength) 
                                {
-                                       debug("CephInputStream.read: cannot read " + len  + 
-                                                                                                       " bytes from fd " + fileHandle + ": current position is " +
-                                                                                                       getPos() + " and file length is " + fileLength);
+                                       ceph.debug("CephInputStream.read: cannot read " + len  + 
+                                                                                " bytes from fd " + fileHandle + ": current position is "
+                                                                                + getPos() + " and file length is " + fileLength,
+                                                                                ceph.WARN);
          
                                        return -1;
                                }
       // actually do the read
-      int result = ceph_read(fileHandle, buf, off, len);
+      int result = ceph.ceph_read(fileHandle, buf, off, len);
       if (result < 0)
-                               debug("CephInputStream.read: Reading " + len
-                                                                                               + " bytes from fd " + fileHandle + " failed.");
+                               ceph.debug("CephInputStream.read: Reading " + len
+                                                                        + " bytes from fd " + fileHandle + " failed.", ceph.WARN);
 
-      debug("CephInputStream.read: Reading " + len  + " bytes from fd " 
-                                                                                       + fileHandle + ": succeeded in reading " + result + " bytes");   
+      ceph.debug("CephInputStream.read: Reading " + len  + " bytes from fd " 
+                                                                + fileHandle + ": succeeded in reading " + result + " bytes",
+                                                                ceph.TRACE);   
       return result;
                }
 
@@ -201,20 +173,16 @@ public class CephInputStream extends FSInputStream {
    */
   @Override
        public void close() throws IOException {
-    debug("CephOutputStream.close:enter");
+    ceph.debug("CephOutputStream.close:enter", ceph.TRACE);
     if (closed) {
       throw new IOException("Stream closed");
     }
 
-    int result = ceph_close(fileHandle);
+    int result = ceph.ceph_close(fileHandle);
     if (result != 0) {
       throw new IOException("Close failed!");
     }
     closed = true;
-    debug("CephOutputStream.close:exit");
-  }
-
-  private void debug(String out) {
-    System.err.println(out);
+    ceph.debug("CephOutputStream.close:exit", ceph.TRACE);
   }
 }
index 4694c1f90e4e42d5b9a1e103f3e5f085b7d6c6a0..51fa92cefe74114ff29e6d4b339587eae2035704 100644 (file)
@@ -35,45 +35,19 @@ public class CephOutputStream extends OutputStream {
 
   private boolean closed;
 
-  private int fileHandle;
-
-  private boolean debug;
-
-       /*
-        * Get the current position in a file (as a long) of a given filehandle.
-        * Returns: (long) current file position on success, or a
-        *  negative error code on failure.
-        */
-  private native long ceph_getpos(int fh);
-       /*
-        * Closes the given file. Returns 0 on success, or a negative
-        * error code otherwise.
-        */
-  private native int ceph_close(int fh);
-       /*
-        * Write the given buffer contents to the given filehandle.
-        * Inputs:
-        *  int fh: The filehandle to write to.
-        *  byte[] buffer: The buffer to write from
-        *  int buffer_offset: The position in the buffer to write from
-        *  int length: The number of (sequential) bytes to write.
-        * Returns: int, on success the number of bytes written, on failure
-        *  a negative error code.
-        */
-  private native int ceph_write(int fh, byte[] buffer, int buffer_offset, int length);
+       private CephFS ceph;
 
+  private int fileHandle;
 
   /**
    * Construct the CephOutputStream.
    * @param conf The FileSystem configuration.
    * @param fh The Ceph filehandle to connect to.
    */
-  public CephOutputStream(Configuration conf,  int fh) {
-    System.load(conf.get("fs.ceph.libDir")+"/libhadoopcephfs.so");
-    System.load(conf.get("fs.ceph.libDir")+"/libceph.so");
+  public CephOutputStream(Configuration conf, CephFS cephfs, int fh) {
+               ceph = cephfs;
     fileHandle = fh;
     closed = false;
-    debug = ("true".equals(conf.get("fs.ceph.debug", "false")));
   }
 
   /**Ceph likes things to be closed before it shuts down,
@@ -91,7 +65,7 @@ public class CephOutputStream extends OutputStream {
    * @return The file offset in bytes.
    */
   public long getPos() throws IOException {
-    return ceph_getpos(fileHandle);
+    return ceph.ceph_getpos(fileHandle);
   }
 
   /**
@@ -102,7 +76,8 @@ public class CephOutputStream extends OutputStream {
    */
   @Override
        public synchronized void write(int b) throws IOException {
-      debug("CephOutputStream.write: writing a single byte to fd " + fileHandle);
+      ceph.debug("CephOutputStream.write: writing a single byte to fd "
+                                                                + fileHandle, ceph.TRACE);
 
       if (closed) {
                                throw new IOException("CephOutputStream.write: cannot write " + 
@@ -111,10 +86,11 @@ public class CephOutputStream extends OutputStream {
       // Stick the byte in a buffer and write it
       byte buf[] = new byte[1];
       buf[0] = (byte) b;    
-      int result = ceph_write(fileHandle, buf, 0, 1);
+      int result = ceph.ceph_write(fileHandle, buf, 0, 1);
       if (1 != result)
-                               debug("CephOutputStream.write: failed writing a single byte to fd "
-                                                                                               + fileHandle + ": Ceph write() result = " + result);
+                               ceph.debug("CephOutputStream.write: failed writing a single byte to fd "
+                                                                                               + fileHandle + ": Ceph write() result = " + result,
+ceph.WARN);
       return;
     }
 
@@ -129,8 +105,8 @@ public class CephOutputStream extends OutputStream {
    */
   @Override
        public synchronized void write(byte buf[], int off, int len) throws IOException {
-                       debug("CephOutputStream.write: writing " + len + 
-                                                                                       " bytes to fd " + fileHandle);
+                       ceph.debug("CephOutputStream.write: writing " + len + 
+                                                                                       " bytes to fd " + fileHandle, ceph.TRACE);
       // make sure stream is open
       if (closed) {
                                throw new IOException("CephOutputStream.write: cannot write " + len + 
@@ -151,7 +127,7 @@ public class CephOutputStream extends OutputStream {
       }
 
       // write!
-      int result = ceph_write(fileHandle, buf, off, len);
+      int result = ceph.ceph_write(fileHandle, buf, off, len);
       if (result < 0) {
                                throw new IOException("CephOutputStream.write: Write of " + len + 
                                                                                                                        "bytes to fd " + fileHandle + " failed");
@@ -182,21 +158,18 @@ public class CephOutputStream extends OutputStream {
    */
   @Override
        public synchronized void close() throws IOException {
-      debug("CephOutputStream.close:enter");
+      ceph.debug("CephOutputStream.close:enter", ceph.TRACE);
       if (closed) {
                                throw new IOException("Stream closed");
       }
 
-      int result = ceph_close(fileHandle);
+      int result = ceph.ceph_close(fileHandle);
       if (result != 0) {
                                throw new IOException("Close failed!");
       }
        
       closed = true;
-      debug("CephOutputStream.close:exit");
+      ceph.debug("CephOutputStream.close:exit", ceph.TRACE);
     }
 
-  private void debug(String out) {
-    System.err.println(out);
-  }
 }
index cbb821596aa38af4558cd034d36b0eb6bf023e9d..0e6ee502ef21b7f4531af8f17daf252e5632a64f 100644 (file)
@@ -52,4 +52,6 @@ class CephTalker extends CephFS {
   protected native int ceph_replication(String path);
   protected native String ceph_hosts(int fh, long offset);
   protected native int ceph_setTimes(String path, long mtime, long atime);
+  protected native long ceph_getpos(int fh);
+  protected native int ceph_write(int fh, byte[] buffer, int buffer_offset, int length);
 }