-// -*- mode:c++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// -*- mode:c++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
#include "CephFSInterface.h"
-
#include "client/libceph.h"
+#include "common/Timer.h" // why?
+#include "common/ceph_argparse.h"
#include "common/config.h"
#include "msg/SimpleMessenger.h"
-#include "common/Timer.h"
#include <sys/stat.h>
+#include <sys/statvfs.h>
+
+union ceph_mount_union_t {
+ ceph_mount_t *cmount;
+ jlong cjlong;
+};
+
+static void set_ceph_mount_t(JNIEnv *env, jobject obj, ceph_mount_t *cmount)
+{
+ jclass cls = env->GetObjectClass(obj);
+ jfieldID fid = env->GetFieldID(cls, "cmount", "Ljava/lang/Long;");
+ if (fid == NULL)
+ return;
+ ceph_mount_union_t ceph_mount_union;
+ ceph_mount_union.cjlong= 0;
+ ceph_mount_union.cmount = cmount;
+ env->SetLongField(obj, fid, ceph_mount_union.cjlong);
+}
+
+static ceph_mount_t *get_ceph_mount_t(JNIEnv *env, jobject obj)
+{
+ jclass cls = env->GetObjectClass(obj);
+ jfieldID fid = env->GetFieldID(cls, "cmount", "Ljava/lang/Long;");
+ if (fid == NULL)
+ return NULL;
+ ceph_mount_union_t ceph_mount_union;
+ ceph_mount_union.cjlong = env->GetLongField(obj, fid);
+ return ceph_mount_union.cmount;
+}
-using namespace std;
-const static int IP_ADDR_LENGTH = 24;//a buffer size; may want to up for IPv6.
-static int path_size;
/*
* Class: org_apache_hadoop_fs_ceph_CephTalker
* Method: ceph_initializeClient
(JNIEnv *env, jobject obj, jstring j_args, jint block_size)
{
dout(3) << "CephFSInterface: Initializing Ceph client:" << dendl;
+
+ // Convert Java argument string to argv
const char *c_args = env->GetStringUTFChars(j_args, 0);
- if (c_args == NULL) return false; //out of memory!
- string args(c_args);
- path_size = 64; //reasonable starting point?
-
- //construct an arguments vector
- vector<string> args_vec;
- string arg;
- size_t i = 0;
- size_t j = 0;
- bool local_writes = false;
- while (1) {
- j = args.find(' ', i);
- if (j == string::npos) {
- if (i == 0) { //there were no spaces? That can't happen!
- env->ReleaseStringUTFChars(j_args, c_args);
- return false;
- }
- //otherwise it's the last argument, so push it on and exit loop
- args_vec.push_back(args.substr(i, args.size()));
+ if (c_args == NULL)
+ return false; //out of memory!
+ string cppargs(c_args);
+ char b[cppargs.length()+1];
+ strcpy(b, cppargs.c_str());
+ env->ReleaseStringUTFChars(j_args, c_args);
+ std::vector<const char*> args;
+ char *p = b;
+ while (*p) {
+ args.push_back(p);
+ while (*p && *p != ' ')
+ p++;
+ if (!*p)
break;
+ *p++ = 0;
+ while (*p && *p == ' ')
+ p++;
+ }
+
+ // parse the arguments
+ bool set_local_writes = false;
+ std::string mount_root, val;
+ for (std::vector<const char*>::iterator i = args.begin(); i != args.end(); ) {
+ if (ceph_argparse_witharg(args, i, &val, "mount_root", (char*)NULL)) {
+ mount_root = val;
+ } else if (ceph_argparse_flag(args, i, "set_local_pg", (char*)NULL)) {
+ set_local_writes = true;
+ } else {
+ ++i;
}
- if (j!=i) { //if there are two spaces in a row, don't make a new arg
- arg = args.substr(i, j-i);
- if (arg.compare("set_local_pg") == 0)
- local_writes = true;
- else
- args_vec.push_back(arg);
- }
- i = j+1;
}
- //convert to array
- const char ** argv = new const char*[args_vec.size()];
- for (size_t i = 0; i < args_vec.size(); ++i)
- argv[i] = args_vec[i].c_str();
+ // connect to the cmount
+ ceph_mount_t *cmount;
+ int ret = ceph_create(&cmount, NULL);
+ if (ret)
+ return false;
+ ceph_conf_read_file(cmount, NULL); // read config file from the default location
+ ceph_conf_parse_argv(cmount, args.size(), &args[0]);
- int r = ceph_initialize(args_vec.size(), argv);
- env->ReleaseStringUTFChars(j_args, c_args);
- delete argv;
+ ceph_localize_reads(cmount, true);
+ ceph_set_default_file_stripe_unit(cmount, block_size);
+ ceph_set_default_object_size(cmount, block_size);
- ceph_localize_reads(true);
- ceph_set_default_file_stripe_unit(block_size);
- ceph_set_default_object_size(block_size);
+ ret = ceph_mount(cmount, mount_root.c_str());
+ if (ret)
+ return false;
+ if (set_local_writes)
+ ceph_set_default_preferred_pg(cmount, ceph_get_local_osd(cmount));
- if (r < 0) return false;
- r = ceph_mount();
- if (r < 0) return false;
- if (local_writes)
- ceph_set_default_preferred_pg(ceph_get_local_osd());
+ set_ceph_mount_t(env, obj, cmount);
return true;
}
(JNIEnv *env, jobject obj)
{
dout(10) << "CephFSInterface: In getcwd" << dendl;
-
- char *path = new char[path_size];
- int r = ceph_getcwd(path, path_size);
- if (r==-ERANGE) { //path is too short
- path_size = ceph_getcwd(path, 0) * 1.2; //leave some extra
- delete [] path;
- path = new char[path_size];
- ceph_getcwd(path, path_size);
- }
- jstring j_path = env->NewStringUTF(path);
- delete [] path;
+ ceph_mount_t *cmount = get_ceph_mount_t(env, obj);
+ jstring j_path = env->NewStringUTF(ceph_getcwd(cmount));
return j_path;
}
dout(10) << "CephFSInterface: In setcwd" << dendl;
const char *c_path = env->GetStringUTFChars(j_path, 0);
- if(c_path == NULL ) return false;
- jboolean success = (0 <= ceph_chdir(c_path)) ? JNI_TRUE : JNI_FALSE;
+ if (c_path == NULL)
+ return false;
+ ceph_mount_t *cmount = get_ceph_mount_t(env, obj);
+ int ret = ceph_chdir(cmount, c_path);
env->ReleaseStringUTFChars(j_path, c_path);
- return success;
+ return ret ? JNI_FALSE : JNI_TRUE;
}
/*
* Returns: true on successful delete; false otherwise
*/
JNIEXPORT jboolean JNICALL Java_org_apache_hadoop_fs_ceph_CephTalker_ceph_1rmdir
- (JNIEnv *env, jobject, jstring j_path)
+ (JNIEnv *env, jobject obj, jstring j_path)
{
dout(10) << "CephFSInterface: In rmdir" << dendl;
const char *c_path = env->GetStringUTFChars(j_path, 0);
- if(c_path == NULL ) return false;
- jboolean success = (0 == ceph_rmdir(c_path)) ? JNI_TRUE : JNI_FALSE;
+ if(c_path == NULL)
+ return false;
+ ceph_mount_t *cmount = get_ceph_mount_t(env, obj);
+ int ret = ceph_rmdir(cmount, c_path);
env->ReleaseStringUTFChars(j_path, c_path);
- return success;
+ return ret ? JNI_FALSE : JNI_TRUE;
}
/*
* Returns: true if the unlink occurred, false otherwise.
*/
JNIEXPORT jboolean JNICALL Java_org_apache_hadoop_fs_ceph_CephTalker_ceph_1unlink
- (JNIEnv *env, jobject, jstring j_path)
+ (JNIEnv *env, jobject obj, jstring j_path)
{
const char *c_path = env->GetStringUTFChars(j_path, 0);
- if (c_path == NULL) return false;
+ if (c_path == NULL)
+ return false;
+ ceph_mount_t *cmount = get_ceph_mount_t(env, obj);
dout(10) << "CephFSInterface: In unlink for path " << c_path << ":" << dendl;
- int result = ceph_unlink(c_path);
+ int ret = ceph_unlink(cmount, c_path);
env->ReleaseStringUTFChars(j_path, c_path);
- return (0 == result) ? JNI_TRUE : JNI_FALSE;
+ return ret ? JNI_FALSE : JNI_TRUE;
}
/*
* Returns: true if the rename occurred, false otherwise
*/
JNIEXPORT jboolean JNICALL Java_org_apache_hadoop_fs_ceph_CephTalker_ceph_1rename
- (JNIEnv *env, jobject, jstring j_from, jstring j_to)
+ (JNIEnv *env, jobject obj, jstring j_from, jstring j_to)
{
dout(10) << "CephFSInterface: In rename" << dendl;
const char *c_from = env->GetStringUTFChars(j_from, 0);
- if (c_from == NULL) return false;
- const char *c_to = env->GetStringUTFChars(j_to, 0);
+ if (c_from == NULL)
+ return false;
+ const char *c_to = env->GetStringUTFChars(j_to, 0);
if (c_to == NULL) {
env->ReleaseStringUTFChars(j_from, c_from);
return false;
}
- jboolean success = false;
struct stat stbuf;
- if (ceph_lstat(c_to, &stbuf) < 0) //Hadoop doesn't want to overwrite files in a rename
- success = (0 <= ceph_rename(c_from, c_to)) ? JNI_TRUE : JNI_FALSE;
- env->ReleaseStringUTFChars(j_from, c_from);
- env->ReleaseStringUTFChars(j_to, c_to);
- return success;
+ ceph_mount_t *cmount = get_ceph_mount_t(env, obj);
+ int ret = ceph_lstat(cmount, c_to, &stbuf);
+ if (ret != -ENOENT) {
+ // Hadoop doesn't want to overwrite files in a rename.
+ env->ReleaseStringUTFChars(j_from, c_from);
+ env->ReleaseStringUTFChars(j_to, c_to);
+ return JNI_FALSE;
+ }
+
+ ret = ceph_rename(cmount, c_from, c_to);
+ return ret ? JNI_FALSE : JNI_TRUE;
}
/*
* if it does not or there is an unexpected failure.
*/
JNIEXPORT jboolean JNICALL Java_org_apache_hadoop_fs_ceph_CephTalker_ceph_1exists
-(JNIEnv *env, jobject, jstring j_path)
+(JNIEnv *env, jobject obj, jstring j_path)
{
dout(10) << "CephFSInterface: In exists" << dendl;
struct stat stbuf;
const char *c_path = env->GetStringUTFChars(j_path, 0);
- if (c_path == NULL) return false;
+ if (c_path == NULL)
+ return false;
dout(10) << "Attempting lstat with file " << c_path << ":" << dendl;
- int result = ceph_lstat(c_path, &stbuf);
- dout(10) << "result is " << result << dendl;
+
+ ceph_mount_t *cmount = get_ceph_mount_t(env, obj);
+ int ret = ceph_lstat(cmount, c_path, &stbuf);
+ dout(10) << "result is " << ret << dendl;
env->ReleaseStringUTFChars(j_path, c_path);
- if (result < 0) {
+ if (ret < 0) {
dout(10) << "Returning false (file does not exist)" << dendl;
return JNI_FALSE;
}
dout(10) << "In getblocksize" << dendl;
//struct stat stbuf;
-
+
jlong result;
const char *c_path = env->GetStringUTFChars(j_path, 0);
- if (c_path == NULL) return -ENOMEM;
+ if (c_path == NULL)
+ return -ENOMEM;
// we need to open the file to retrieve the stripe size
dout(10) << "CephFSInterface: getblocksize: opening file" << dendl;
- int fh = ceph_open(c_path, O_RDONLY);
+ ceph_mount_t *cmount = get_ceph_mount_t(env, obj);
+ int fh = ceph_open(cmount, c_path, O_RDONLY, 0);
env->ReleaseStringUTFChars(j_path, c_path);
- if (fh < 0) return fh;
+ if (fh < 0)
+ return fh;
- result = ceph_get_file_stripe_unit(fh);
+ result = ceph_get_file_stripe_unit(cmount, fh);
- int close_result = ceph_close(fh);
- assert (close_result > -1);
+ int close_result = ceph_close(cmount, fh);
+ if (close_result < 0)
+ return close_result;
return result;
}
struct stat stbuf;
const char *c_path = env->GetStringUTFChars(j_path, 0);
- if (c_path == NULL) return false;
- int result = ceph_lstat(c_path, &stbuf);
+ if (c_path == NULL)
+ return false;
+ ceph_mount_t *cmount = get_ceph_mount_t(env, obj);
+ int ret = ceph_lstat(cmount, c_path, &stbuf);
env->ReleaseStringUTFChars(j_path, c_path);
// if the stat call failed, it's definitely not a file...
- if (0 > result) return false;
+ if (ret < 0)
+ return false;
// check the stat result
- return (!(0 == S_ISREG(stbuf.st_mode)));
+ return !!S_ISREG(stbuf.st_mode);
}
* Returns true if the given path is a directory, false otherwise.
*/
JNIEXPORT jboolean JNICALL Java_org_apache_hadoop_fs_ceph_CephTalker_ceph_1isdirectory
- (JNIEnv *env, jobject, jstring j_path)
+ (JNIEnv *env, jobject obj, jstring j_path)
{
dout(10) << "In isdirectory" << dendl;
struct stat stbuf;
const char *c_path = env->GetStringUTFChars(j_path, 0);
- if (c_path == NULL) return false;
- int result = ceph_lstat(c_path, &stbuf);
+ if (c_path == NULL)
+ return false;
+ ceph_mount_t *cmount = get_ceph_mount_t(env, obj);
+ int result = ceph_lstat(cmount, c_path, &stbuf);
env->ReleaseStringUTFChars(j_path, c_path);
// if the stat call failed, it's definitely not a directory...
- if (0 > result) return JNI_FALSE;
+ if (result < 0)
+ return JNI_FALSE;
// check the stat result
- return (!(0 == S_ISDIR(stbuf.st_mode)));
+ return !!S_ISDIR(stbuf.st_mode);
}
/*
* will not contain . or .. entries.
*/
JNIEXPORT jobjectArray JNICALL Java_org_apache_hadoop_fs_ceph_CephTalker_ceph_1getdir
-(JNIEnv *env, jobject obj, jstring j_path)
+(JNIEnv *env, jobject obj, jstring j_path)
{
dout(10) << "In getdir" << dendl;
list<string> contents;
const char *c_path = env->GetStringUTFChars(j_path, 0);
if (c_path == NULL) return NULL;
- DIR *dirp;
+ ceph_dir_result_t *dirp;
int r;
- r = ceph_opendir(c_path, &dirp);
+ ceph_mount_t *cmount = get_ceph_mount_t(env, obj);
+ r = ceph_opendir(cmount, c_path, &dirp);
if (r<0) {
env->ReleaseStringUTFChars(j_path, c_path);
return NULL;
string *ent;
int bufpos;
while (1) {
- r = ceph_getdnames(dirp, buf, buflen);
+ r = ceph_getdnames(cmount, dirp, buf, buflen);
if (r==-ERANGE) { //expand the buffer
delete [] buf;
buflen *= 2;
}
}
delete [] buf;
- ceph_closedir(dirp);
+ ceph_closedir(cmount, dirp);
env->ReleaseStringUTFChars(j_path, c_path);
-
+
if (r < 0) return NULL;
// Create a Java String array of the size of the directory listing
for (list<string>::iterator it = contents.begin();
it != contents.end();
it++) {
- env->SetObjectArrayElement(dirListingStringArray, i,
+ env->SetObjectArrayElement(dirListingStringArray, i,
env->NewStringUTF(it->c_str()));
++i;
}
-
+
return dirListingStringArray;
}
* given mode.
*/
JNIEXPORT jint JNICALL Java_org_apache_hadoop_fs_ceph_CephTalker_ceph_1mkdirs
-(JNIEnv *env, jobject, jstring j_path, jint mode)
+(JNIEnv *env, jobject obj, jstring j_path, jint mode)
{
dout(10) << "In Hadoop mk_dirs" << dendl;
//get c-style string and make the call, clean up the string...
jint result;
const char *c_path = env->GetStringUTFChars(j_path, 0);
- if (c_path == NULL) return -ENOMEM;
- result = ceph_mkdirs(c_path, mode);
+ if (c_path == NULL)
+ return -ENOMEM;
+ ceph_mount_t *cmount = get_ceph_mount_t(env, obj);
+ result = ceph_mkdirs(cmount, c_path, mode);
env->ReleaseStringUTFChars(j_path, c_path);
//...and return
jint result;
const char *c_path = env->GetStringUTFChars(j_path, 0);
- if (c_path == NULL) return -ENOMEM;
- result = ceph_open(c_path, O_WRONLY|O_CREAT|O_APPEND);
+ if (c_path == NULL)
+ return -ENOMEM;
+ ceph_mount_t *cmount = get_ceph_mount_t(env, obj);
+ result = ceph_open(cmount, c_path, O_WRONLY|O_CREAT|O_APPEND, 0);
env->ReleaseStringUTFChars(j_path, c_path);
return result;
{
dout(10) << "In open_for_read" << dendl;
- jint result;
+ jint result;
// open as read-only: flag = O_RDONLY
const char *c_path = env->GetStringUTFChars(j_path, 0);
- if (c_path == NULL) return -ENOMEM;
- result = ceph_open(c_path, O_RDONLY);
+ if (c_path == NULL)
+ return -ENOMEM;
+ ceph_mount_t *cmount = get_ceph_mount_t(env, obj);
+ result = ceph_open(cmount, c_path, O_RDONLY, 0);
env->ReleaseStringUTFChars(j_path, c_path);
// returns file handle, or -1 on failure
{
dout(10) << "In open_for_overwrite" << dendl;
- jint result;
+ jint result;
const char *c_path = env->GetStringUTFChars(j_path, 0);
- if (c_path == NULL) return -ENOMEM;
- result = ceph_open(c_path, O_WRONLY|O_CREAT|O_TRUNC, mode);
+ if (c_path == NULL)
+ return -ENOMEM;
+ ceph_mount_t *cmount = get_ceph_mount_t(env, obj);
+ result = ceph_open(cmount, c_path, O_WRONLY|O_CREAT|O_TRUNC, mode);
env->ReleaseStringUTFChars(j_path, c_path);
// returns file handle, or -1 on failure
- return result;
+ return result;
}
/*
* Closes a given filehandle.
*/
JNIEXPORT jint JNICALL Java_org_apache_hadoop_fs_ceph_CephTalker_ceph_1close
-(JNIEnv *env, jobject ojb, jint fh)
+(JNIEnv *env, jobject obj, jint fh)
{
dout(10) << "In CephTalker::ceph_close" << dendl;
- return ceph_close(fh);
+ ceph_mount_t *cmount = get_ceph_mount_t(env, obj);
+ return ceph_close(cmount, fh);
}
/*
(JNIEnv *env, jobject obj, jstring j_path, jint j_new_mode)
{
const char *c_path = env->GetStringUTFChars(j_path, 0);
- if (c_path == NULL) return false;
- int result = ceph_chmod(c_path, j_new_mode);
+ if (c_path == NULL)
+ return false;
+ ceph_mount_t *cmount = get_ceph_mount_t(env, obj);
+ int result = ceph_chmod(cmount, c_path, j_new_mode);
env->ReleaseStringUTFChars(j_path, c_path);
return (result==0);
* Class: org_apache_hadoop_fs_ceph_CephTalker
* Method: ceph_kill_client
* Signature: (J)Z
- *
+ *
* Closes the Ceph client. This should be called before shutting down
* (multiple times is okay but redundant).
*/
JNIEXPORT jboolean JNICALL Java_org_apache_hadoop_fs_ceph_CephTalker_ceph_1kill_1client
(JNIEnv *env, jobject obj)
-{
- ceph_deinitialize();
+{
+ ceph_mount_t *cmount = get_ceph_mount_t(env, obj);
+ if (!cmount)
+ return true;
+ ceph_shutdown(cmount);
+ set_ceph_mount_t(env, obj, NULL);
return true;
}
jfieldID c_mode_id = env->GetFieldID(cls, "mode", "I");
if (c_mode_id == NULL) return false;
//do actual lstat
- int r = ceph_lstat_precise(c_path, &st);
+ ceph_mount_t *cmount = get_ceph_mount_t(env, obj);
+ int r = ceph_lstat_precise(cmount, c_path, &st);
env->ReleaseStringUTFChars(j_path, c_path);
if (r < 0) return false; //fail out; file DNE or Ceph broke
//setup variables
struct statvfs stbuf;
const char *c_path = env->GetStringUTFChars(j_path, 0);
- if (c_path == NULL) return -ENOMEM;
+ if (c_path == NULL)
+ return -ENOMEM;
jclass cls = env->GetObjectClass(j_cephstat);
- if (cls == NULL) return 1; //JVM error of some kind
+ if (cls == NULL)
+ return 1; //JVM error of some kind
jfieldID c_capacity_id = env->GetFieldID(cls, "capacity", "J");
jfieldID c_used_id = env->GetFieldID(cls, "used", "J");
jfieldID c_remaining_id = env->GetFieldID(cls, "remaining", "J");
//do the statfs
- int r = ceph_statfs(c_path, &stbuf);
+ ceph_mount_t *cmount = get_ceph_mount_t(env, obj);
+ int r = ceph_statfs(cmount, c_path, &stbuf);
env->ReleaseStringUTFChars(j_path, c_path);
-
-
- if (r!=0) return r; //something broke
+ if (r != 0)
+ return r; //something broke
//place info into Java; convert from bytes to kilobytes
env->SetLongField(j_cephstat, c_capacity_id,
{
//get c-string of path, send off to libceph, release c-string, return
const char *c_path = env->GetStringUTFChars(j_path, 0);
- if (c_path == NULL) return -ENOMEM;
- int replication = ceph_get_file_replication(c_path);
+ if (c_path == NULL)
+ return -ENOMEM;
+ ceph_mount_t *cmount = get_ceph_mount_t(env, obj);
+ int replication = ceph_get_file_replication(cmount, c_path);
env->ReleaseStringUTFChars(j_path, c_path);
return replication;
}
(JNIEnv *env, jobject obj, jint j_fh, jlong j_offset)
{
//get the address
+ const static int IP_ADDR_LENGTH = 24;//a buffer size; may want to up for IPv6.
char *address = new char[IP_ADDR_LENGTH];
- int r = ceph_get_file_stripe_address(j_fh, j_offset, address, IP_ADDR_LENGTH);
+ ceph_mount_t *cmount = get_ceph_mount_t(env, obj);
+ int r = ceph_get_file_stripe_address(cmount, j_fh, j_offset,
+ address, IP_ADDR_LENGTH);
if (r == -ERANGE) {//buffer's too small
delete [] address;
- int size = ceph_get_file_stripe_address(j_fh, j_offset, address, 0);
+ int size = ceph_get_file_stripe_address(cmount, j_fh, j_offset, address, 0);
address = new char[size];
- r = ceph_get_file_stripe_address(j_fh, j_offset, address, size);
+ r = ceph_get_file_stripe_address(cmount, j_fh, j_offset, address, size);
}
if (r != 0) { //some rather worse problem
if (r == -EINVAL) return NULL; //ceph thinks there are no OSDs
attr.st_mtime_micro = (mtime % 1000) * 1000;
attr.st_atime_sec = atime / 1000;
attr.st_atime_micro = (atime % 1000) * 1000;
- return ceph_setattr_precise(c_path, &attr, mask);
+ ceph_mount_t *cmount = get_ceph_mount_t(env, obj);
+ return ceph_setattr_precise(cmount, c_path, &attr, mask);
}
/*
// Make sure to convert the Hadoop read arguments into a
// more ceph-friendly form
- jint result;
+ jint result;
// Step 1: get a pointer to the buffer.
jbyte *j_buffer_ptr = env->GetByteArrayElements(j_buffer, NULL);
c_buffer += (int)buffer_offset;
// Step 3: do the read
- result = ceph_read((int)fh, c_buffer, length, -1);
+ ceph_mount_t *cmount = get_ceph_mount_t(env, obj);
+ result = ceph_read(cmount, (int)fh, c_buffer, length, -1);
// Step 4: release the pointer to the buffer
env->ReleaseByteArrayElements(j_buffer, j_buffer_ptr, 0);
-
+
return result;
}
{
dout(10) << "In CephTalker::seek_from_start" << dendl;
- return ceph_lseek(fh, pos, SEEK_SET);
+ ceph_mount_t *cmount = get_ceph_mount_t(env, obj);
+ return ceph_lseek(cmount, fh, pos, SEEK_SET);
}
/*
dout(10) << "In CephTalker::ceph_getpos" << dendl;
// seek a distance of 0 to get current offset
- return ceph_lseek(fh, 0, SEEK_CUR);
+ ceph_mount_t *cmount = get_ceph_mount_t(env, obj);
+ return ceph_lseek(cmount, fh, 0, SEEK_CUR);
}
/*
// IMPORTANT NOTE: Hadoop write arguments are a bit different from POSIX so we
// have to convert. The write is *always* from the current position in the file,
// and buffer_offset is the location in the *buffer* where we start writing.
- jint result;
+ jint result;
// Step 1: get a pointer to the buffer.
jbyte *j_buffer_ptr = env->GetByteArrayElements(j_buffer, NULL);
- if (j_buffer_ptr == NULL) return -ENOMEM;
+ if (j_buffer_ptr == NULL)
+ return -ENOMEM;
char *c_buffer = (char*) j_buffer_ptr;
// Step 2: pointer arithmetic to start in the right buffer position
c_buffer += (int)buffer_offset;
// Step 3: do the write
- result = ceph_write((int)fh, c_buffer, length, -1);
-
+ ceph_mount_t *cmount = get_ceph_mount_t(env, obj);
+ result = ceph_write(cmount, (int)fh, c_buffer, length, -1);
+
// Step 4: release the pointer to the buffer
env->ReleaseByteArrayElements(j_buffer, j_buffer_ptr, 0);