+// -*- mode:Java; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
package org.apache.hadoop.fs.ceph;
import java.io.BufferedOutputStream;
private int bufferSize;
- //private Block[] blocks;
+ //private Block[] blocks;
private boolean closed;
private long fileLength;
- //private long pos = 0;
+ //private long pos = 0;
- //private DataInputStream blockStream;
+ //private DataInputStream blockStream;
- //private long blockEnd = -1;
+ //private long blockEnd = -1;
private native int ceph_read(long client, int fh, byte[] buffer, int buffer_offset, int length);
private native long ceph_seek_from_start(long client, int fh, long pos);
private native int ceph_close(long client, int fh);
private int ceph_read(byte[] buffer, int buffer_offset, int length)
- { return ceph_read(clientPointer, fileHandle, buffer, buffer_offset, length); }
+ { return ceph_read(clientPointer, fileHandle, buffer, buffer_offset, length); }
private long ceph_seek_from_start(long pos) { return ceph_seek_from_start(clientPointer, fileHandle, pos); }
private long ceph_getpos() { return ceph_getpos(clientPointer, fileHandle); }
private int ceph_close() { return ceph_close(clientPointer, fileHandle); }
- /*
- public S3InputStream(Configuration conf, FileSystemStore store,
- INode inode) {
+ /*
+ public S3InputStream(Configuration conf, FileSystemStore store,
+ INode inode) {
this.store = store;
this.blocks = inode.getBlocks();
for (Block block : blocks) {
- this.fileLength += block.getLength();
+ this.fileLength += block.getLength();
}
this.bufferSize = conf.getInt("io.file.buffer.size", 4096);
- }
- */
+ }
+ */
public CephInputStream(Configuration conf, long clientp, int fh, long flength) {
- // Whoever's calling the constructor is responsible for doing the actual ceph_open
- // call and providing the file handle.
- clientPointer = clientp;
- fileLength = flength;
- fileHandle = fh;
- //System.out.println("CephInputStream constructor: initializing stream with fh "
- // + fh + " and file length " + flength);
+ // Whoever's calling the constructor is responsible for doing the actual ceph_open
+ // call and providing the file handle.
+ clientPointer = clientp;
+ fileLength = flength;
+ fileHandle = fh;
+ //System.out.println("CephInputStream constructor: initializing stream with fh "
+ // + fh + " and file length " + flength);
- // TODO: Then what do we need from the config? The buffer size maybe?
- // Anything? Bueller?
+ // TODO: Then what do we need from the config? The buffer size maybe?
+ // Anything? Bueller?
}
- @Override
public synchronized long getPos() throws IOException {
return ceph_getpos();
}
@Override
- public synchronized int available() throws IOException {
- return (int) (fileLength - getPos());
- }
+ public synchronized int available() throws IOException {
+ return (int) (fileLength - getPos());
+ }
- @Override
public synchronized void seek(long targetPos) throws IOException {
- //System.out.println("CephInputStream.seek: Seeking to position " + targetPos +
- // " on fd " + fileHandle);
+ //System.out.println("CephInputStream.seek: Seeking to position " + targetPos +
+ // " on fd " + fileHandle);
if (targetPos > fileLength) {
throw new IOException("CephInputStream.seek: failed seeking to position " + targetPos +
- " on fd " + fileHandle + ": Cannot seek after EOF " + fileLength);
+ " on fd " + fileHandle + ": Cannot seek after EOF " + fileLength);
}
ceph_seek_from_start(targetPos);
}
+ //method stub obviously
+ public synchronized boolean seekToNewSource(long targetPos) {
+ return true;
+ }
+
// reads a byte
@Override
- public synchronized int read() throws IOException {
+ public synchronized int read() throws IOException {
//System.out.println("CephInputStream.read: Reading a single byte from fd " + fileHandle
// + " by calling general read function");
if (getPos() >= fileLength) return -1;
if (-1 == read(result, 0, 1)) return -1;
return result[0];
- }
+ }
@Override
- public synchronized int read(byte buf[], int off, int len) throws IOException {
+ public synchronized int read(byte buf[], int off, int len) throws IOException {
//System.out.println("CephInputStream.read: Reading " + len + " bytes from fd " + fileHandle);
- if (closed) {
- throw new IOException("CephInputStream.read: cannot read " + len +
- " bytes from fd " + fileHandle + ": stream closed");
- }
- if (null == buf) {
+ if (closed) {
+ throw new IOException("CephInputStream.read: cannot read " + len +
+ " bytes from fd " + fileHandle + ": stream closed");
+ }
+ if (null == buf) {
throw new NullPointerException("Read buffer is null");
- }
+ }
- // check for proper index bounds
- if((off < 0) || (len < 0) || (off + len > buf.length)) {
+ // check for proper index bounds
+ if((off < 0) || (len < 0) || (off + len > buf.length)) {
throw new IndexOutOfBoundsException("CephInputStream.read: Indices out of bounds for read: "
+ "read length is " + len + ", buffer offset is "
+ off +", and buffer size is " + buf.length);
- }
+ }
- // ensure we're not past the end of the file
- if (getPos() >= fileLength)
+ // ensure we're not past the end of the file
+ if (getPos() >= fileLength)
{
- System.out.println("CephInputStream.read: cannot read " + len +
- " bytes from fd " + fileHandle + ": current position is " +
- getPos() + " and file length is " + fileLength);
+ System.out.println("CephInputStream.read: cannot read " + len +
+ " bytes from fd " + fileHandle + ": current position is " +
+ getPos() + " and file length is " + fileLength);
- return -1;
+ return -1;
}
- // actually do the read
- int result = ceph_read(buf, off, len);
- if (result < 0)
- System.out.println("CephInputStream.read: Reading " + len + " bytes from fd "
- + fileHandle + " failed.");
- else {}
- // System.out.println("CephInputStream.read: Reading " + len + " bytes from fd "
- // + fileHandle + ": succeeded in reading " + result + " bytes");
+ // actually do the read
+ int result = ceph_read(buf, off, len);
+ if (result < 0)
+ System.out.println("CephInputStream.read: Reading " + len + " bytes from fd "
+ + fileHandle + " failed.");
+ else {}
+ // System.out.println("CephInputStream.read: Reading " + len + " bytes from fd "
+ // + fileHandle + ": succeeded in reading " + result + " bytes");
- return result;
- }
+ return result;
+ }
@Override
- public void close() throws IOException {
+ public void close() throws IOException {
if (closed) {
throw new IOException("Stream closed");
}
int result = ceph_close();
if (result != 0) {
- throw new IOException("Close failed!");
+ throw new IOException("Close failed!");
}
closed = true;
* We don't support marks.
*/
@Override
- public boolean markSupported() {
+ public boolean markSupported() {
return false;
}
@Override
- public void mark(int readLimit) {
+ public void mark(int readLimit) {
// Do nothing
}
@Override
- public void reset() throws IOException {
+ public void reset() throws IOException {
throw new IOException("Mark not supported");
}