}
/*
- * Class: org_apache_hadoop_fs_ceph_CephInputStream
+ * Class: org_apache_hadoop_fs_ceph_CephTalker
* Method: ceph_read
* Signature: (JI[BII)I
* Reads into the given byte array from the current position.
* Returns: the number of bytes read on success (as jint),
* or an error code otherwise.
*/
-JNIEXPORT jint JNICALL Java_org_apache_hadoop_fs_ceph_CephInputStream_ceph_1read
+JNIEXPORT jint JNICALL Java_org_apache_hadoop_fs_ceph_CephTalker_ceph_1read
(JNIEnv *env, jobject obj, jint fh, jbyteArray j_buffer, jint buffer_offset, jint length)
{
dout(10) << "In read" << dendl;
}
/*
- * Class: org_apache_hadoop_fs_ceph_CephInputStream
+ * Class: org_apache_hadoop_fs_ceph_CephTalker
* Method: ceph_seek_from_start
* Signature: (JIJ)J
* Seeks to the given position in the given file.
* Returns: the new position (as a jlong) of the filehandle on success,
* or a negative error code on failure.
*/
-JNIEXPORT jlong JNICALL Java_org_apache_hadoop_fs_ceph_CephInputStream_ceph_1seek_1from_1start
+JNIEXPORT jlong JNICALL Java_org_apache_hadoop_fs_ceph_CephTalker_ceph_1seek_1from_1start
(JNIEnv *env, jobject obj, jint fh, jlong pos)
{
- dout(10) << "In CephInputStream::seek_from_start" << dendl;
+ dout(10) << "In CephTalker::seek_from_start" << dendl;
return ceph_lseek(fh, pos, SEEK_SET);
}
/*
- * Class: org_apache_hadoop_fs_ceph_CephInputStream
+ * Class: org_apache_hadoop_fs_ceph_CephTalker
* Method: ceph_getpos
* Signature: (I)J
*
* Returns: jlong current file position on success, or a
* negative error code on failure.
*/
-JNIEXPORT jlong JNICALL Java_org_apache_hadoop_fs_ceph_CephInputStream_ceph_1getpos
+JNIEXPORT jlong JNICALL Java_org_apache_hadoop_fs_ceph_CephTalker_ceph_1getpos
(JNIEnv *env, jobject obj, jint fh)
{
- dout(10) << "In CephInputStream::ceph_getpos" << dendl;
+ dout(10) << "In CephTalker::ceph_getpos" << dendl;
// seek a distance of 0 to get current offset
return ceph_lseek(fh, 0, SEEK_CUR);
}
/*
- * Class: org_apache_hadoop_fs_ceph_CephInputStream
- * Method: ceph_close
- * Signature: (JI)I
- * Closes the given file. Returns 0 on success (it will always succeed
- * unless it dies in a horrible fiery assert failure).
- */
-JNIEXPORT jint JNICALL Java_org_apache_hadoop_fs_ceph_CephInputStream_ceph_1close
- (JNIEnv *env, jobject obj, jint fh)
-{
- dout(10) << "In CephInputStream::ceph_close" << dendl;
-
- return ceph_close(fh);
-}
-
-/*
- * Class: org_apache_hadoop_fs_ceph_CephOutputStream
- * Method: ceph_getpos
- * Signature: (JI)J
- * Get the current position in a file (as a jlong) of a given filehandle.
- * Returns: jlong current file position on success, or a
- * negative error code on failure.
- */
-JNIEXPORT jlong JNICALL Java_org_apache_hadoop_fs_ceph_CephOutputStream_ceph_1getpos
- (JNIEnv *env, jobject obj, jint fh)
-{
- dout(10) << "In CephOutputStream::ceph_getpos" << dendl;
-
- // seek a distance of 0 to get current offset
- return ceph_lseek(fh, 0, SEEK_CUR);
-}
-
-/*
- * Class: org_apache_hadoop_fs_ceph_CephOutputStream
- * Method: ceph_close
- * Signature: (I)I
- * Closes the given file. Returns 0 on success (it will always succeed
- * unless it dies in a horrible fiery assert failure).
- */
-JNIEXPORT jint JNICALL Java_org_apache_hadoop_fs_ceph_CephOutputStream_ceph_1close
- (JNIEnv *env, jobject obj, jint fh)
-{
- dout(10) << "In CephOutputStream::ceph_close" << dendl;
-
- return ceph_close(fh);
-}
-
-/*
- * Class: org_apache_hadoop_fs_ceph_CephOutputStream
+ * Class: org_apache_hadoop_fs_ceph_CephTalker
* Method: ceph_write
* Signature: (I[BII)I
* Write the given buffer contents to the given filehandle.
* Returns: jint, on success the number of bytes written, on failure
* a negative error code.
*/
-JNIEXPORT jint JNICALL Java_org_apache_hadoop_fs_ceph_CephOutputStream_ceph_1write
+JNIEXPORT jint JNICALL Java_org_apache_hadoop_fs_ceph_CephTalker_ceph_1write
(JNIEnv *env, jobject obj, jint fh, jbyteArray j_buffer, jint buffer_offset, jint length)
{
dout(10) << "In write" << dendl;
(JNIEnv *, jobject, jstring, jlong, jlong);
/*
- * Class: org_apache_hadoop_fs_ceph_CephInputStream
+ * Class: org_apache_hadoop_fs_ceph_CephTalker
* Method: ceph_read
* Signature: (I[BII)I
*/
-JNIEXPORT jint JNICALL Java_org_apache_hadoop_fs_ceph_CephInputStream_ceph_1read
+JNIEXPORT jint JNICALL Java_org_apache_hadoop_fs_ceph_CephTalker_ceph_1read
(JNIEnv *, jobject, jint, jbyteArray, jint, jint);
/*
- * Class: org_apache_hadoop_fs_ceph_CephInputStream
+ * Class: org_apache_hadoop_fs_ceph_CephTalker
* Method: ceph_seek_from_start
* Signature: (IJ)J
*/
-JNIEXPORT jlong JNICALL Java_org_apache_hadoop_fs_ceph_CephInputStream_ceph_1seek_1from_1start
+JNIEXPORT jlong JNICALL Java_org_apache_hadoop_fs_ceph_CephTalker_ceph_1seek_1from_1start
(JNIEnv *, jobject, jint, jlong);
/*
- * Class: org_apache_hadoop_fs_ceph_CephInputStream
- * Method: ceph_getpos
- * Signature: (I)J
- */
-JNIEXPORT jlong JNICALL Java_org_apache_hadoop_fs_ceph_CephInputStream_ceph_1getpos
- (JNIEnv *, jobject, jint);
-
-/*
- * Class: org_apache_hadoop_fs_ceph_CephInputStream
- * Method: ceph_close
- * Signature: (I)I
- */
-JNIEXPORT jint JNICALL Java_org_apache_hadoop_fs_ceph_CephInputStream_ceph_1close
- (JNIEnv *, jobject, jint);
-
-/*
- * Class: org_apache_hadoop_fs_ceph_CephOutputStream
+ * Class: org_apache_hadoop_fs_ceph_CephTalker
* Method: ceph_getpos
* Signature: (I)J
*/
-JNIEXPORT jlong JNICALL Java_org_apache_hadoop_fs_ceph_CephOutputStream_ceph_1getpos
+JNIEXPORT jlong JNICALL Java_org_apache_hadoop_fs_ceph_CephTalker_ceph_1getpos
(JNIEnv *, jobject, jint);
/*
- * Class: org_apache_hadoop_fs_ceph_CephOutputStream
- * Method: ceph_close
- * Signature: (I)I
- */
-JNIEXPORT jint JNICALL Java_org_apache_hadoop_fs_ceph_CephOutputStream_ceph_1close
- (JNIEnv *, jobject, jint);
-
-/*
- * Class: org_apache_hadoop_fs_ceph_CephOutputStream
+ * Class: org_apache_hadoop_fs_ceph_CephTalker
* Method: ceph_write
* Signature: (I[BII)I
*/
-JNIEXPORT jint JNICALL Java_org_apache_hadoop_fs_ceph_CephOutputStream_ceph_1write
+JNIEXPORT jint JNICALL Java_org_apache_hadoop_fs_ceph_CephTalker_ceph_1write
(JNIEnv *, jobject, jint, jbyteArray, jint, jint);
#ifdef __cplusplus
*/
abstract protected int ceph_open_for_overwrite(String path, int mode);
/*
- * Closes a given filehandle.
+ * Closes the given file. Returns 0 on success, or a negative
+ * error code otherwise.
*/
abstract protected int ceph_close(int filehandle);
/*
* Returns: 0 if successful, an error code otherwise.
*/
abstract protected int ceph_setTimes(String path, long mtime, long atime);
+ /*
+ * Get the current position in a file (as a long) of a given filehandle.
+ * Returns: (long) current file position on success, or a
+ * negative error code on failure.
+ */
+ abstract protected long ceph_getpos(int fh);
+ /*
+ * Write the given buffer contents to the given filehandle.
+ * Inputs:
+ * int fh: The filehandle to write to.
+ * byte[] buffer: The buffer to write from
+ * int buffer_offset: The position in the buffer to write from
+ * int length: The number of (sequential) bytes to write.
+ * Returns: int, on success the number of bytes written, on failure
+ * a negative error code.
+ */
+ abstract protected int ceph_write(int fh, byte[] buffer, int buffer_offset, int length);
+ /*
+ * Reads into the given byte array from the current position.
+ * Inputs:
+ * int fh: the filehandle to read from
+ * byte[] buffer: the byte array to read into
+ * int buffer_offset: where in the buffer to start writing
+ * int length: how much to read.
+ * There'd better be enough space in the buffer to write all
+ * the data from the given offset!
+ * Returns: the number of bytes read on success (as an int),
+ * or an error code otherwise. */
+ protected native int ceph_read(int fh, byte[] buffer, int buffer_offset, int length);
+ /*
+ * Seeks to the given position in the given file.
+ * Inputs:
+ * int fh: The filehandle to seek in.
+ * long pos: The position to seek to.
+ * Returns: the new position (as a long) of the filehandle on success,
+ * or a negative error code on failure. */
+ protected native long ceph_seek_from_start(int fh, long pos);
+
protected void debug(String statement, int priority) {
if (debug) System.err.println(statement);
switch(priority) {
*/
public CephFileSystem() {
root = new Path("/");
- ceph.debug("CephFileSystem:exit", ceph.DEBUG);
}
/**
*/
@Override
public void initialize(URI uri, Configuration conf) throws IOException {
- ceph.debug("initialize:enter", ceph.DEBUG);
if (!initialized) {
super.initialize(uri, conf);
setConf(conf);
throw new IOException("append: Open for append failed on path \"" +
abs_path.toString() + "\"");
}
- CephOutputStream cephOStream = new CephOutputStream(getConf(), fd);
+ CephOutputStream cephOStream = new CephOutputStream(getConf(), ceph, fd);
ceph.debug("append:exit", ceph.DEBUG);
return new FSDataOutputStream(cephOStream, statistics);
}
}
// Step 4: create the stream
- OutputStream cephOStream = new CephOutputStream(getConf(), fh);
+ OutputStream cephOStream = new CephOutputStream(getConf(), ceph, fh);
ceph.debug("create:exit", ceph.DEBUG);
return new FSDataOutputStream(cephOStream, statistics);
}
throw new IOException("Failed to get file size for file " + abs_path.toString() +
" but succeeded in opening file. Something bizarre is going on.");
}
- FSInputStream cephIStream = new CephInputStream(getConf(), fh, size);
+ FSInputStream cephIStream = new CephInputStream(getConf(), ceph, fh, size);
ceph.debug("open:exit", ceph.DEBUG);
return new FSDataInputStream(cephIStream);
}
private long fileLength;
- private boolean debug;
-
- /*
- * Reads into the given byte array from the current position.
- * Inputs:
- * int fh: the filehandle to read from
- * byte[] buffer: the byte array to read into
- * int buffer_offset: where in the buffer to start writing
- * int length: how much to read.
- * There'd better be enough space in the buffer to write all
- * the data from the given offset!
- * Returns: the number of bytes read on success (as an int),
- * or an error code otherwise. */
- private native int ceph_read(int fh, byte[] buffer, int buffer_offset, int length);
- /*
- * Seeks to the given position in the given file.
- * Inputs:
- * int fh: The filehandle to seek in.
- * long pos: The position to seek to.
- * Returns: the new position (as a long) of the filehandle on success,
- * or a negative error code on failure. */
- private native long ceph_seek_from_start(int fh, long pos);
- /*
- * Get the current position in a file (as a long) of a given filehandle.
- * Returns: (long) current file position on success, or a
- * negative error code on failure.
- */
- private native long ceph_getpos(int fh);
- /*
- * Closes the given file. Returns 0 on success, or a negative
- * error code otherwise.
- */
- private native int ceph_close(int fh);
-
+ private CephFS ceph;
+
/**
* Create a new CephInputStream.
* @param conf The system configuration. Unused.
* @param flength The current length of the file. If the length changes
* you will need to close and re-open it to access the new data.
*/
- public CephInputStream(Configuration conf, int fh, long flength) {
- System.load(conf.get("fs.ceph.libDir")+"/libhadoopcephfs.so");
- System.load(conf.get("fs.ceph.libDir")+"/libceph.so");
+ public CephInputStream(Configuration conf, CephFS cephfs,
+ int fh, long flength) {
// Whoever's calling the constructor is responsible for doing the actual ceph_open
// call and providing the file handle.
fileLength = flength;
fileHandle = fh;
closed = false;
- debug = ("true".equals(conf.get("fs.ceph.debug", "false")));
- debug("CephInputStream constructor: initializing stream with fh "
- + fh + " and file length " + flength);
+ ceph = cephfs;
+ ceph.debug("CephInputStream constructor: initializing stream with fh "
+ + fh + " and file length " + flength, ceph.DEBUG);
}
/** Ceph likes things to be closed before it shuts down,
}
public synchronized long getPos() throws IOException {
- return ceph_getpos(fileHandle);
+ return ceph.ceph_getpos(fileHandle);
}
/**
* Find the number of bytes remaining in the file.
}
public synchronized void seek(long targetPos) throws IOException {
- debug("CephInputStream.seek: Seeking to position " + targetPos +
- " on fd " + fileHandle);
+ ceph.debug("CephInputStream.seek: Seeking to position " + targetPos +
+ " on fd " + fileHandle, ceph.TRACE);
if (targetPos > fileLength) {
throw new IOException("CephInputStream.seek: failed seeking to position " + targetPos +
" on fd " + fileHandle + ": Cannot seek after EOF " + fileLength);
}
- ceph_seek_from_start(fileHandle, targetPos);
+ ceph.ceph_seek_from_start(fileHandle, targetPos);
}
/**
*/
@Override
public synchronized int read() throws IOException {
- debug("CephInputStream.read: Reading a single byte from fd " + fileHandle
- + " by calling general read function");
+ ceph.debug("CephInputStream.read: Reading a single byte from fd " + fileHandle
+ + " by calling general read function", ceph.TRACE);
byte result[] = new byte[1];
if (getPos() >= fileLength) return -1;
*/
@Override
public synchronized int read(byte buf[], int off, int len) throws IOException {
- debug("CephInputStream.read: Reading " + len + " bytes from fd " + fileHandle);
+ ceph.debug("CephInputStream.read: Reading " + len +
+ " bytes from fd " + fileHandle, ceph.TRACE);
if (closed) {
throw new IOException("CephInputStream.read: cannot read " + len +
- " bytes from fd " + fileHandle + ": stream closed");
+ " bytes from fd " + fileHandle +
+ ": stream closed");
}
if (null == buf) {
throw new IOException("Read buffer is null");
// check for proper index bounds
if((off < 0) || (len < 0) || (off + len > buf.length)) {
throw new IOException("CephInputStream.read: Indices out of bounds for read: "
- + "read length is " + len + ", buffer offset is "
- + off +", and buffer size is " + buf.length);
+ + "read length is " + len +
+ ", buffer offset is " + off +
+ ", and buffer size is " + buf.length);
}
// ensure we're not past the end of the file
if (getPos() >= fileLength)
{
- debug("CephInputStream.read: cannot read " + len +
- " bytes from fd " + fileHandle + ": current position is " +
- getPos() + " and file length is " + fileLength);
+ ceph.debug("CephInputStream.read: cannot read " + len +
+ " bytes from fd " + fileHandle + ": current position is "
+ + getPos() + " and file length is " + fileLength,
+ ceph.WARN);
return -1;
}
// actually do the read
- int result = ceph_read(fileHandle, buf, off, len);
+ int result = ceph.ceph_read(fileHandle, buf, off, len);
if (result < 0)
- debug("CephInputStream.read: Reading " + len
- + " bytes from fd " + fileHandle + " failed.");
+ ceph.debug("CephInputStream.read: Reading " + len
+ + " bytes from fd " + fileHandle + " failed.", ceph.WARN);
- debug("CephInputStream.read: Reading " + len + " bytes from fd "
- + fileHandle + ": succeeded in reading " + result + " bytes");
+ ceph.debug("CephInputStream.read: Reading " + len + " bytes from fd "
+ + fileHandle + ": succeeded in reading " + result + " bytes",
+ ceph.TRACE);
return result;
}
*/
@Override
public void close() throws IOException {
- debug("CephOutputStream.close:enter");
+ ceph.debug("CephOutputStream.close:enter", ceph.TRACE);
if (closed) {
throw new IOException("Stream closed");
}
- int result = ceph_close(fileHandle);
+ int result = ceph.ceph_close(fileHandle);
if (result != 0) {
throw new IOException("Close failed!");
}
closed = true;
- debug("CephOutputStream.close:exit");
- }
-
- private void debug(String out) {
- System.err.println(out);
+ ceph.debug("CephOutputStream.close:exit", ceph.TRACE);
}
}
private boolean closed;
- private int fileHandle;
-
- private boolean debug;
-
- /*
- * Get the current position in a file (as a long) of a given filehandle.
- * Returns: (long) current file position on success, or a
- * negative error code on failure.
- */
- private native long ceph_getpos(int fh);
- /*
- * Closes the given file. Returns 0 on success, or a negative
- * error code otherwise.
- */
- private native int ceph_close(int fh);
- /*
- * Write the given buffer contents to the given filehandle.
- * Inputs:
- * int fh: The filehandle to write to.
- * byte[] buffer: The buffer to write from
- * int buffer_offset: The position in the buffer to write from
- * int length: The number of (sequential) bytes to write.
- * Returns: int, on success the number of bytes written, on failure
- * a negative error code.
- */
- private native int ceph_write(int fh, byte[] buffer, int buffer_offset, int length);
+ private CephFS ceph;
+ private int fileHandle;
/**
* Construct the CephOutputStream.
* @param conf The FileSystem configuration.
* @param fh The Ceph filehandle to connect to.
*/
- public CephOutputStream(Configuration conf, int fh) {
- System.load(conf.get("fs.ceph.libDir")+"/libhadoopcephfs.so");
- System.load(conf.get("fs.ceph.libDir")+"/libceph.so");
+ public CephOutputStream(Configuration conf, CephFS cephfs, int fh) {
+ ceph = cephfs;
fileHandle = fh;
closed = false;
- debug = ("true".equals(conf.get("fs.ceph.debug", "false")));
}
/**Ceph likes things to be closed before it shuts down,
* @return The file offset in bytes.
*/
public long getPos() throws IOException {
- return ceph_getpos(fileHandle);
+ return ceph.ceph_getpos(fileHandle);
}
/**
*/
@Override
public synchronized void write(int b) throws IOException {
- debug("CephOutputStream.write: writing a single byte to fd " + fileHandle);
+ ceph.debug("CephOutputStream.write: writing a single byte to fd "
+ + fileHandle, ceph.TRACE);
if (closed) {
throw new IOException("CephOutputStream.write: cannot write " +
// Stick the byte in a buffer and write it
byte buf[] = new byte[1];
buf[0] = (byte) b;
- int result = ceph_write(fileHandle, buf, 0, 1);
+ int result = ceph.ceph_write(fileHandle, buf, 0, 1);
if (1 != result)
- debug("CephOutputStream.write: failed writing a single byte to fd "
- + fileHandle + ": Ceph write() result = " + result);
+ ceph.debug("CephOutputStream.write: failed writing a single byte to fd "
+ + fileHandle + ": Ceph write() result = " + result,
+ceph.WARN);
return;
}
*/
@Override
public synchronized void write(byte buf[], int off, int len) throws IOException {
- debug("CephOutputStream.write: writing " + len +
- " bytes to fd " + fileHandle);
+ ceph.debug("CephOutputStream.write: writing " + len +
+ " bytes to fd " + fileHandle, ceph.TRACE);
// make sure stream is open
if (closed) {
throw new IOException("CephOutputStream.write: cannot write " + len +
}
// write!
- int result = ceph_write(fileHandle, buf, off, len);
+ int result = ceph.ceph_write(fileHandle, buf, off, len);
if (result < 0) {
throw new IOException("CephOutputStream.write: Write of " + len +
"bytes to fd " + fileHandle + " failed");
*/
@Override
public synchronized void close() throws IOException {
- debug("CephOutputStream.close:enter");
+ ceph.debug("CephOutputStream.close:enter", ceph.TRACE);
if (closed) {
throw new IOException("Stream closed");
}
- int result = ceph_close(fileHandle);
+ int result = ceph.ceph_close(fileHandle);
if (result != 0) {
throw new IOException("Close failed!");
}
closed = true;
- debug("CephOutputStream.close:exit");
+ ceph.debug("CephOutputStream.close:exit", ceph.TRACE);
}
- private void debug(String out) {
- System.err.println(out);
- }
}
protected native int ceph_replication(String path);
protected native String ceph_hosts(int fh, long offset);
protected native int ceph_setTimes(String path, long mtime, long atime);
+ protected native long ceph_getpos(int fh);
+ protected native int ceph_write(int fh, byte[] buffer, int buffer_offset, int length);
}