From 035f80312cc560eaecd1bd06eb3ddff174f75466 Mon Sep 17 00:00:00 2001 From: Greg Farnum Date: Wed, 22 Jul 2009 11:24:36 -0700 Subject: [PATCH] Hadoop: CephInputStream retabbing and add seekToNewSource stub. --- src/client/hadoop/ceph/CephInputStream.java | 126 ++++++++++---------- 1 file changed, 65 insertions(+), 61 deletions(-) diff --git a/src/client/hadoop/ceph/CephInputStream.java b/src/client/hadoop/ceph/CephInputStream.java index e3b4f1f960775..79b747c6a4e6b 100644 --- a/src/client/hadoop/ceph/CephInputStream.java +++ b/src/client/hadoop/ceph/CephInputStream.java @@ -1,3 +1,4 @@ +// -*- mode:Java; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- package org.apache.hadoop.fs.ceph; import java.io.BufferedOutputStream; @@ -21,7 +22,7 @@ class CephInputStream extends FSInputStream { private int bufferSize; - //private Block[] blocks; + //private Block[] blocks; private boolean closed; @@ -31,11 +32,11 @@ class CephInputStream extends FSInputStream { private long fileLength; - //private long pos = 0; + //private long pos = 0; - //private DataInputStream blockStream; + //private DataInputStream blockStream; - //private long blockEnd = -1; + //private long blockEnd = -1; private native int ceph_read(long client, int fh, byte[] buffer, int buffer_offset, int length); private native long ceph_seek_from_start(long client, int fh, long pos); @@ -43,64 +44,67 @@ class CephInputStream extends FSInputStream { private native int ceph_close(long client, int fh); private int ceph_read(byte[] buffer, int buffer_offset, int length) - { return ceph_read(clientPointer, fileHandle, buffer, buffer_offset, length); } + { return ceph_read(clientPointer, fileHandle, buffer, buffer_offset, length); } private long ceph_seek_from_start(long pos) { return ceph_seek_from_start(clientPointer, fileHandle, pos); } private long ceph_getpos() { return ceph_getpos(clientPointer, fileHandle); } private int ceph_close() { return ceph_close(clientPointer, fileHandle); } - /* - public S3InputStream(Configuration conf, FileSystemStore store, - INode inode) { + /* + public S3InputStream(Configuration conf, FileSystemStore store, + INode inode) { this.store = store; this.blocks = inode.getBlocks(); for (Block block : blocks) { - this.fileLength += block.getLength(); + this.fileLength += block.getLength(); } this.bufferSize = conf.getInt("io.file.buffer.size", 4096); - } - */ + } + */ public CephInputStream(Configuration conf, long clientp, int fh, long flength) { - // Whoever's calling the constructor is responsible for doing the actual ceph_open - // call and providing the file handle. - clientPointer = clientp; - fileLength = flength; - fileHandle = fh; - //System.out.println("CephInputStream constructor: initializing stream with fh " - // + fh + " and file length " + flength); + // Whoever's calling the constructor is responsible for doing the actual ceph_open + // call and providing the file handle. + clientPointer = clientp; + fileLength = flength; + fileHandle = fh; + //System.out.println("CephInputStream constructor: initializing stream with fh " + // + fh + " and file length " + flength); - // TODO: Then what do we need from the config? The buffer size maybe? - // Anything? Bueller? + // TODO: Then what do we need from the config? The buffer size maybe? + // Anything? Bueller? } - @Override public synchronized long getPos() throws IOException { return ceph_getpos(); } @Override - public synchronized int available() throws IOException { - return (int) (fileLength - getPos()); - } + public synchronized int available() throws IOException { + return (int) (fileLength - getPos()); + } - @Override public synchronized void seek(long targetPos) throws IOException { - //System.out.println("CephInputStream.seek: Seeking to position " + targetPos + - // " on fd " + fileHandle); + //System.out.println("CephInputStream.seek: Seeking to position " + targetPos + + // " on fd " + fileHandle); if (targetPos > fileLength) { throw new IOException("CephInputStream.seek: failed seeking to position " + targetPos + - " on fd " + fileHandle + ": Cannot seek after EOF " + fileLength); + " on fd " + fileHandle + ": Cannot seek after EOF " + fileLength); } ceph_seek_from_start(targetPos); } + //method stub obviously + public synchronized boolean seekToNewSource(long targetPos) { + return true; + } + // reads a byte @Override - public synchronized int read() throws IOException { + public synchronized int read() throws IOException { //System.out.println("CephInputStream.read: Reading a single byte from fd " + fileHandle // + " by calling general read function"); @@ -108,61 +112,61 @@ class CephInputStream extends FSInputStream { if (getPos() >= fileLength) return -1; if (-1 == read(result, 0, 1)) return -1; return result[0]; - } + } @Override - public synchronized int read(byte buf[], int off, int len) throws IOException { + public synchronized int read(byte buf[], int off, int len) throws IOException { //System.out.println("CephInputStream.read: Reading " + len + " bytes from fd " + fileHandle); - if (closed) { - throw new IOException("CephInputStream.read: cannot read " + len + - " bytes from fd " + fileHandle + ": stream closed"); - } - if (null == buf) { + if (closed) { + throw new IOException("CephInputStream.read: cannot read " + len + + " bytes from fd " + fileHandle + ": stream closed"); + } + if (null == buf) { throw new NullPointerException("Read buffer is null"); - } + } - // check for proper index bounds - if((off < 0) || (len < 0) || (off + len > buf.length)) { + // check for proper index bounds + if((off < 0) || (len < 0) || (off + len > buf.length)) { throw new IndexOutOfBoundsException("CephInputStream.read: Indices out of bounds for read: " + "read length is " + len + ", buffer offset is " + off +", and buffer size is " + buf.length); - } + } - // ensure we're not past the end of the file - if (getPos() >= fileLength) + // ensure we're not past the end of the file + if (getPos() >= fileLength) { - System.out.println("CephInputStream.read: cannot read " + len + - " bytes from fd " + fileHandle + ": current position is " + - getPos() + " and file length is " + fileLength); + System.out.println("CephInputStream.read: cannot read " + len + + " bytes from fd " + fileHandle + ": current position is " + + getPos() + " and file length is " + fileLength); - return -1; + return -1; } - // actually do the read - int result = ceph_read(buf, off, len); - if (result < 0) - System.out.println("CephInputStream.read: Reading " + len + " bytes from fd " - + fileHandle + " failed."); - else {} - // System.out.println("CephInputStream.read: Reading " + len + " bytes from fd " - // + fileHandle + ": succeeded in reading " + result + " bytes"); + // actually do the read + int result = ceph_read(buf, off, len); + if (result < 0) + System.out.println("CephInputStream.read: Reading " + len + " bytes from fd " + + fileHandle + " failed."); + else {} + // System.out.println("CephInputStream.read: Reading " + len + " bytes from fd " + // + fileHandle + ": succeeded in reading " + result + " bytes"); - return result; - } + return result; + } @Override - public void close() throws IOException { + public void close() throws IOException { if (closed) { throw new IOException("Stream closed"); } int result = ceph_close(); if (result != 0) { - throw new IOException("Close failed!"); + throw new IOException("Close failed!"); } closed = true; @@ -172,17 +176,17 @@ class CephInputStream extends FSInputStream { * We don't support marks. */ @Override - public boolean markSupported() { + public boolean markSupported() { return false; } @Override - public void mark(int readLimit) { + public void mark(int readLimit) { // Do nothing } @Override - public void reset() throws IOException { + public void reset() throws IOException { throw new IOException("Mark not supported"); } -- 2.39.5