From: Greg Farnum Date: Tue, 18 Aug 2009 15:41:40 +0000 (-0700) Subject: Hadoop: Javadoc and cleanup in CephInput/OutputStream. X-Git-Tag: v0.13~30^2~3 X-Git-Url: http://git-server-git.apps.pok.os.sepia.ceph.com/?a=commitdiff_plain;h=bca0fbac2d8a1d8d035e3d71bd47dbffd32c947e;p=ceph.git Hadoop: Javadoc and cleanup in CephInput/OutputStream. --- diff --git a/src/client/hadoop/ceph/CephInputStream.java b/src/client/hadoop/ceph/CephInputStream.java index 96c176625c0c..877d528a2b3b 100644 --- a/src/client/hadoop/ceph/CephInputStream.java +++ b/src/client/hadoop/ceph/CephInputStream.java @@ -162,6 +162,7 @@ class CephInputStream extends FSInputStream { + fileHandle + ": succeeded in reading " + result + " bytes"); return result; } + /** * Close the CephInputStream and release the associated filehandle. */ @@ -180,33 +181,6 @@ class CephInputStream extends FSInputStream { debug("CephOutputStream.close:exit"); } - /** - * Marks are not supported. - * @return false - */ - @Override - public boolean markSupported() { - return false; - } - - /** - * Since marking isn't supported, this function throws an IOException. - * @throws IOException whenever called. - */ - @Override - public void mark(int readLimit) { - throw new IOException("Mark not supported"); - } - - /** - * Since marks aren't supported, this function throws an IOException. - * @throws IOException whenever called. - */ - @Override - public void reset() throws IOException { - throw new IOException("Mark not supported"); - } - private void debug(String out) { if (debug) System.out.println(out); } diff --git a/src/client/hadoop/ceph/CephOutputStream.java b/src/client/hadoop/ceph/CephOutputStream.java index 0df04882638e..0f4f7b1ec124 100644 --- a/src/client/hadoop/ceph/CephOutputStream.java +++ b/src/client/hadoop/ceph/CephOutputStream.java @@ -55,28 +55,11 @@ class CephOutputStream extends OutputStream { private native int ceph_write(int fh, byte[] buffer, int buffer_offset, int length); - /* public CephOutputStream(Configuration conf, FileSystemStore store, - Path path, long blockSize, Progressable progress) throws IOException { - - // basic pseudocode: - // call ceph_open_for_write to open the file - // store the file handle - // store the client pointer - // look up and store the block size while we're at it - // the following code's old. kill it - - this.store = store; - this.path = path; - this.blockSize = blockSize; - this.backupFile = newBackupFile(); - this.backupStream = new FileOutputStream(backupFile); - this.bufferSize = conf.getInt("io.file.buffer.size", 4096); - this.outBuf = new byte[bufferSize]; - - }*/ - - - // The file handle + /** + * Construct the CephOutputStream. + * @param conf The FileSystem configuration. + * @param fh The Ceph filehandle to connect to. + */ public CephOutputStream(Configuration conf, int fh) { System.load(conf.get("fs.ceph.libDir")+"/libhadoopcephfs.so"); System.load(conf.get("fs.ceph.libDir")+"/libceph.so"); @@ -85,7 +68,7 @@ class CephOutputStream extends OutputStream { closed = false; } - //Ceph requires that things be closed before it can shutdown, + //Ceph likes things to be closed before it shuts down, //so closing the IOStream stuff voluntarily is good public void finalize () throws Throwable { try { @@ -94,23 +77,23 @@ class CephOutputStream extends OutputStream { finally { super.finalize();} } - // possibly useful for the local copy, write later thing? - // keep it around for now - private File newBackupFile() throws IOException { - File result = File.createTempFile("s3fs-out", ""); - result.deleteOnExit(); - return result; - } - - public long getPos() throws IOException { - // change to get the position from Ceph client + /** + * Get the current position in the file. + * @return The file offset in bytes. + */ + public long getPos() throws IOException { return ceph_getpos(fileHandle); } - // writes a byte + /** + * Write a byte. + * @param b The byte to write. + * @throws IOException If you have closed the CephOutputStream or the + * write fails. + */ @Override - public synchronized void write(int b) throws IOException { - //System.out.println("CephOutputStream.write: writing a single byte to fd " + fileHandle); + public synchronized void write(int b) throws IOException { + debug("CephOutputStream.write: writing a single byte to fd " + fileHandle); if (closed) { throw new IOException("CephOutputStream.write: cannot write " + @@ -121,16 +104,25 @@ class CephOutputStream extends OutputStream { buf[0] = (byte) b; int result = ceph_write(fileHandle, buf, 0, 1); if (1 != result) - System.out.println("CephOutputStream.write: failed writing a single byte to fd " - + fileHandle + ": Ceph write() result = " + result); + debug("CephOutputStream.write: failed writing a single byte to fd " + + fileHandle + ": Ceph write() result = " + result); return; } + /** + * Write a byte buffer into the Ceph file. + * @param buf[] the byte array to write from + * @param off the position in the file to start writing at. + * @param len The number of bytes to actually write. + * @throws IOException if you have closed the CephOutputStream, or + * the write fails. + * @throws NullPointerException if buf is null. + * @throws IndexOutOfBoundsException if len > buf.length. + */ @Override - public synchronized void write(byte buf[], int off, int len) throws IOException { - //System.out.println("CephOutputStream.write: writing " + len + - // " bytes to fd " + fileHandle); - + public synchronized void write(byte buf[], int off, int len) throws IOException { + debug("CephOutputStream.write: writing " + len + + " bytes to fd " + fileHandle); // make sure stream is open if (closed) { throw new IOException("CephOutputStream.write: cannot write " + len + @@ -164,16 +156,24 @@ class CephOutputStream extends OutputStream { return; } + /** + * Flush the written data. It doesn't actually do anything; all writes are synchronous. + * @throws IOException if you've closed the stream. + */ @Override - public synchronized void flush() throws IOException { - if (closed) { - throw new IOException("Stream closed"); - } - return; + public synchronized void flush() throws IOException { + if (closed) { + throw new IOException("Stream closed"); } - + return; + } + + /** + * Close the CephOutputStream. + * @throws IOException if Ceph somehow returns an error. In current code it can't. + */ @Override - public synchronized void close() throws IOException { + public synchronized void close() throws IOException { debug("CephOutputStream.close:enter"); if (closed) { throw new IOException("Stream closed");