From 4d41507183cb7c234daa2a0a270529b3623ad3f8 Mon Sep 17 00:00:00 2001 From: Colin Patrick McCabe Date: Wed, 20 Apr 2011 17:33:28 -0700 Subject: [PATCH] hadoop: convert to new libceph interface Signed-off-by: Colin McCabe --- src/client/hadoop/CephFSInterface.cc | 374 +++++++++++++++---------- src/client/hadoop/ceph/CephTalker.java | 5 + 2 files changed, 230 insertions(+), 149 deletions(-) diff --git a/src/client/hadoop/CephFSInterface.cc b/src/client/hadoop/CephFSInterface.cc index b84932483d2dc..107f943e4d5fa 100644 --- a/src/client/hadoop/CephFSInterface.cc +++ b/src/client/hadoop/CephFSInterface.cc @@ -1,16 +1,42 @@ -// -*- mode:c++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// -*- mode:c++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- #include "CephFSInterface.h" - #include "client/libceph.h" +#include "common/Timer.h" // why? +#include "common/ceph_argparse.h" #include "common/config.h" #include "msg/SimpleMessenger.h" -#include "common/Timer.h" #include +#include + +union ceph_mount_union_t { + ceph_mount_t *cmount; + jlong cjlong; +}; + +static void set_ceph_mount_t(JNIEnv *env, jobject obj, ceph_mount_t *cmount) +{ + jclass cls = env->GetObjectClass(obj); + jfieldID fid = env->GetFieldID(cls, "cmount", "Ljava/lang/Long;"); + if (fid == NULL) + return; + ceph_mount_union_t ceph_mount_union; + ceph_mount_union.cjlong= 0; + ceph_mount_union.cmount = cmount; + env->SetLongField(obj, fid, ceph_mount_union.cjlong); +} + +static ceph_mount_t *get_ceph_mount_t(JNIEnv *env, jobject obj) +{ + jclass cls = env->GetObjectClass(obj); + jfieldID fid = env->GetFieldID(cls, "cmount", "Ljava/lang/Long;"); + if (fid == NULL) + return NULL; + ceph_mount_union_t ceph_mount_union; + ceph_mount_union.cjlong = env->GetLongField(obj, fid); + return ceph_mount_union.cmount; +} -using namespace std; -const static int IP_ADDR_LENGTH = 24;//a buffer size; may want to up for IPv6. -static int path_size; /* * Class: org_apache_hadoop_fs_ceph_CephTalker * Method: ceph_initializeClient @@ -26,56 +52,60 @@ JNIEXPORT jboolean JNICALL Java_org_apache_hadoop_fs_ceph_CephTalker_ceph_1initi (JNIEnv *env, jobject obj, jstring j_args, jint block_size) { dout(3) << "CephFSInterface: Initializing Ceph client:" << dendl; + + // Convert Java argument string to argv const char *c_args = env->GetStringUTFChars(j_args, 0); - if (c_args == NULL) return false; //out of memory! - string args(c_args); - path_size = 64; //reasonable starting point? - - //construct an arguments vector - vector args_vec; - string arg; - size_t i = 0; - size_t j = 0; - bool local_writes = false; - while (1) { - j = args.find(' ', i); - if (j == string::npos) { - if (i == 0) { //there were no spaces? That can't happen! - env->ReleaseStringUTFChars(j_args, c_args); - return false; - } - //otherwise it's the last argument, so push it on and exit loop - args_vec.push_back(args.substr(i, args.size())); + if (c_args == NULL) + return false; //out of memory! + string cppargs(c_args); + char b[cppargs.length()+1]; + strcpy(b, cppargs.c_str()); + env->ReleaseStringUTFChars(j_args, c_args); + std::vector args; + char *p = b; + while (*p) { + args.push_back(p); + while (*p && *p != ' ') + p++; + if (!*p) break; + *p++ = 0; + while (*p && *p == ' ') + p++; + } + + // parse the arguments + bool set_local_writes = false; + std::string mount_root, val; + for (std::vector::iterator i = args.begin(); i != args.end(); ) { + if (ceph_argparse_witharg(args, i, &val, "mount_root", (char*)NULL)) { + mount_root = val; + } else if (ceph_argparse_flag(args, i, "set_local_pg", (char*)NULL)) { + set_local_writes = true; + } else { + ++i; } - if (j!=i) { //if there are two spaces in a row, don't make a new arg - arg = args.substr(i, j-i); - if (arg.compare("set_local_pg") == 0) - local_writes = true; - else - args_vec.push_back(arg); - } - i = j+1; } - //convert to array - const char ** argv = new const char*[args_vec.size()]; - for (size_t i = 0; i < args_vec.size(); ++i) - argv[i] = args_vec[i].c_str(); + // connect to the cmount + ceph_mount_t *cmount; + int ret = ceph_create(&cmount, NULL); + if (ret) + return false; + ceph_conf_read_file(cmount, NULL); // read config file from the default location + ceph_conf_parse_argv(cmount, args.size(), &args[0]); - int r = ceph_initialize(args_vec.size(), argv); - env->ReleaseStringUTFChars(j_args, c_args); - delete argv; + ceph_localize_reads(cmount, true); + ceph_set_default_file_stripe_unit(cmount, block_size); + ceph_set_default_object_size(cmount, block_size); - ceph_localize_reads(true); - ceph_set_default_file_stripe_unit(block_size); - ceph_set_default_object_size(block_size); + ret = ceph_mount(cmount, mount_root.c_str()); + if (ret) + return false; + if (set_local_writes) + ceph_set_default_preferred_pg(cmount, ceph_get_local_osd(cmount)); - if (r < 0) return false; - r = ceph_mount(); - if (r < 0) return false; - if (local_writes) - ceph_set_default_preferred_pg(ceph_get_local_osd()); + set_ceph_mount_t(env, obj, cmount); return true; } @@ -90,17 +120,8 @@ JNIEXPORT jstring JNICALL Java_org_apache_hadoop_fs_ceph_CephTalker_ceph_1getcwd (JNIEnv *env, jobject obj) { dout(10) << "CephFSInterface: In getcwd" << dendl; - - char *path = new char[path_size]; - int r = ceph_getcwd(path, path_size); - if (r==-ERANGE) { //path is too short - path_size = ceph_getcwd(path, 0) * 1.2; //leave some extra - delete [] path; - path = new char[path_size]; - ceph_getcwd(path, path_size); - } - jstring j_path = env->NewStringUTF(path); - delete [] path; + ceph_mount_t *cmount = get_ceph_mount_t(env, obj); + jstring j_path = env->NewStringUTF(ceph_getcwd(cmount)); return j_path; } @@ -120,10 +141,12 @@ JNIEXPORT jboolean JNICALL Java_org_apache_hadoop_fs_ceph_CephTalker_ceph_1setcw dout(10) << "CephFSInterface: In setcwd" << dendl; const char *c_path = env->GetStringUTFChars(j_path, 0); - if(c_path == NULL ) return false; - jboolean success = (0 <= ceph_chdir(c_path)) ? JNI_TRUE : JNI_FALSE; + if (c_path == NULL) + return false; + ceph_mount_t *cmount = get_ceph_mount_t(env, obj); + int ret = ceph_chdir(cmount, c_path); env->ReleaseStringUTFChars(j_path, c_path); - return success; + return ret ? JNI_FALSE : JNI_TRUE; } /* @@ -137,15 +160,17 @@ JNIEXPORT jboolean JNICALL Java_org_apache_hadoop_fs_ceph_CephTalker_ceph_1setcw * Returns: true on successful delete; false otherwise */ JNIEXPORT jboolean JNICALL Java_org_apache_hadoop_fs_ceph_CephTalker_ceph_1rmdir - (JNIEnv *env, jobject, jstring j_path) + (JNIEnv *env, jobject obj, jstring j_path) { dout(10) << "CephFSInterface: In rmdir" << dendl; const char *c_path = env->GetStringUTFChars(j_path, 0); - if(c_path == NULL ) return false; - jboolean success = (0 == ceph_rmdir(c_path)) ? JNI_TRUE : JNI_FALSE; + if(c_path == NULL) + return false; + ceph_mount_t *cmount = get_ceph_mount_t(env, obj); + int ret = ceph_rmdir(cmount, c_path); env->ReleaseStringUTFChars(j_path, c_path); - return success; + return ret ? JNI_FALSE : JNI_TRUE; } /* @@ -158,14 +183,16 @@ JNIEXPORT jboolean JNICALL Java_org_apache_hadoop_fs_ceph_CephTalker_ceph_1rmdir * Returns: true if the unlink occurred, false otherwise. */ JNIEXPORT jboolean JNICALL Java_org_apache_hadoop_fs_ceph_CephTalker_ceph_1unlink - (JNIEnv *env, jobject, jstring j_path) + (JNIEnv *env, jobject obj, jstring j_path) { const char *c_path = env->GetStringUTFChars(j_path, 0); - if (c_path == NULL) return false; + if (c_path == NULL) + return false; + ceph_mount_t *cmount = get_ceph_mount_t(env, obj); dout(10) << "CephFSInterface: In unlink for path " << c_path << ":" << dendl; - int result = ceph_unlink(c_path); + int ret = ceph_unlink(cmount, c_path); env->ReleaseStringUTFChars(j_path, c_path); - return (0 == result) ? JNI_TRUE : JNI_FALSE; + return ret ? JNI_FALSE : JNI_TRUE; } /* @@ -179,23 +206,29 @@ JNIEXPORT jboolean JNICALL Java_org_apache_hadoop_fs_ceph_CephTalker_ceph_1unlin * Returns: true if the rename occurred, false otherwise */ JNIEXPORT jboolean JNICALL Java_org_apache_hadoop_fs_ceph_CephTalker_ceph_1rename - (JNIEnv *env, jobject, jstring j_from, jstring j_to) + (JNIEnv *env, jobject obj, jstring j_from, jstring j_to) { dout(10) << "CephFSInterface: In rename" << dendl; const char *c_from = env->GetStringUTFChars(j_from, 0); - if (c_from == NULL) return false; - const char *c_to = env->GetStringUTFChars(j_to, 0); + if (c_from == NULL) + return false; + const char *c_to = env->GetStringUTFChars(j_to, 0); if (c_to == NULL) { env->ReleaseStringUTFChars(j_from, c_from); return false; } - jboolean success = false; struct stat stbuf; - if (ceph_lstat(c_to, &stbuf) < 0) //Hadoop doesn't want to overwrite files in a rename - success = (0 <= ceph_rename(c_from, c_to)) ? JNI_TRUE : JNI_FALSE; - env->ReleaseStringUTFChars(j_from, c_from); - env->ReleaseStringUTFChars(j_to, c_to); - return success; + ceph_mount_t *cmount = get_ceph_mount_t(env, obj); + int ret = ceph_lstat(cmount, c_to, &stbuf); + if (ret != -ENOENT) { + // Hadoop doesn't want to overwrite files in a rename. + env->ReleaseStringUTFChars(j_from, c_from); + env->ReleaseStringUTFChars(j_to, c_to); + return JNI_FALSE; + } + + ret = ceph_rename(cmount, c_from, c_to); + return ret ? JNI_FALSE : JNI_TRUE; } /* @@ -206,7 +239,7 @@ JNIEXPORT jboolean JNICALL Java_org_apache_hadoop_fs_ceph_CephTalker_ceph_1renam * if it does not or there is an unexpected failure. */ JNIEXPORT jboolean JNICALL Java_org_apache_hadoop_fs_ceph_CephTalker_ceph_1exists -(JNIEnv *env, jobject, jstring j_path) +(JNIEnv *env, jobject obj, jstring j_path) { dout(10) << "CephFSInterface: In exists" << dendl; @@ -214,12 +247,15 @@ JNIEXPORT jboolean JNICALL Java_org_apache_hadoop_fs_ceph_CephTalker_ceph_1exist struct stat stbuf; const char *c_path = env->GetStringUTFChars(j_path, 0); - if (c_path == NULL) return false; + if (c_path == NULL) + return false; dout(10) << "Attempting lstat with file " << c_path << ":" << dendl; - int result = ceph_lstat(c_path, &stbuf); - dout(10) << "result is " << result << dendl; + + ceph_mount_t *cmount = get_ceph_mount_t(env, obj); + int ret = ceph_lstat(cmount, c_path, &stbuf); + dout(10) << "result is " << ret << dendl; env->ReleaseStringUTFChars(j_path, c_path); - if (result < 0) { + if (ret < 0) { dout(10) << "Returning false (file does not exist)" << dendl; return JNI_FALSE; } @@ -246,21 +282,25 @@ JNIEXPORT jlong JNICALL Java_org_apache_hadoop_fs_ceph_CephTalker_ceph_1getblock dout(10) << "In getblocksize" << dendl; //struct stat stbuf; - + jlong result; const char *c_path = env->GetStringUTFChars(j_path, 0); - if (c_path == NULL) return -ENOMEM; + if (c_path == NULL) + return -ENOMEM; // we need to open the file to retrieve the stripe size dout(10) << "CephFSInterface: getblocksize: opening file" << dendl; - int fh = ceph_open(c_path, O_RDONLY); + ceph_mount_t *cmount = get_ceph_mount_t(env, obj); + int fh = ceph_open(cmount, c_path, O_RDONLY, 0); env->ReleaseStringUTFChars(j_path, c_path); - if (fh < 0) return fh; + if (fh < 0) + return fh; - result = ceph_get_file_stripe_unit(fh); + result = ceph_get_file_stripe_unit(cmount, fh); - int close_result = ceph_close(fh); - assert (close_result > -1); + int close_result = ceph_close(cmount, fh); + if (close_result < 0) + return close_result; return result; } @@ -279,15 +319,18 @@ JNIEXPORT jboolean JNICALL Java_org_apache_hadoop_fs_ceph_CephTalker_ceph_1isfil struct stat stbuf; const char *c_path = env->GetStringUTFChars(j_path, 0); - if (c_path == NULL) return false; - int result = ceph_lstat(c_path, &stbuf); + if (c_path == NULL) + return false; + ceph_mount_t *cmount = get_ceph_mount_t(env, obj); + int ret = ceph_lstat(cmount, c_path, &stbuf); env->ReleaseStringUTFChars(j_path, c_path); // if the stat call failed, it's definitely not a file... - if (0 > result) return false; + if (ret < 0) + return false; // check the stat result - return (!(0 == S_ISREG(stbuf.st_mode))); + return !!S_ISREG(stbuf.st_mode); } @@ -298,22 +341,25 @@ JNIEXPORT jboolean JNICALL Java_org_apache_hadoop_fs_ceph_CephTalker_ceph_1isfil * Returns true if the given path is a directory, false otherwise. */ JNIEXPORT jboolean JNICALL Java_org_apache_hadoop_fs_ceph_CephTalker_ceph_1isdirectory - (JNIEnv *env, jobject, jstring j_path) + (JNIEnv *env, jobject obj, jstring j_path) { dout(10) << "In isdirectory" << dendl; struct stat stbuf; const char *c_path = env->GetStringUTFChars(j_path, 0); - if (c_path == NULL) return false; - int result = ceph_lstat(c_path, &stbuf); + if (c_path == NULL) + return false; + ceph_mount_t *cmount = get_ceph_mount_t(env, obj); + int result = ceph_lstat(cmount, c_path, &stbuf); env->ReleaseStringUTFChars(j_path, c_path); // if the stat call failed, it's definitely not a directory... - if (0 > result) return JNI_FALSE; + if (result < 0) + return JNI_FALSE; // check the stat result - return (!(0 == S_ISDIR(stbuf.st_mode))); + return !!S_ISDIR(stbuf.st_mode); } /* @@ -328,7 +374,7 @@ JNIEXPORT jboolean JNICALL Java_org_apache_hadoop_fs_ceph_CephTalker_ceph_1isdir * will not contain . or .. entries. */ JNIEXPORT jobjectArray JNICALL Java_org_apache_hadoop_fs_ceph_CephTalker_ceph_1getdir -(JNIEnv *env, jobject obj, jstring j_path) +(JNIEnv *env, jobject obj, jstring j_path) { dout(10) << "In getdir" << dendl; @@ -336,9 +382,10 @@ JNIEXPORT jobjectArray JNICALL Java_org_apache_hadoop_fs_ceph_CephTalker_ceph_1g list contents; const char *c_path = env->GetStringUTFChars(j_path, 0); if (c_path == NULL) return NULL; - DIR *dirp; + ceph_dir_result_t *dirp; int r; - r = ceph_opendir(c_path, &dirp); + ceph_mount_t *cmount = get_ceph_mount_t(env, obj); + r = ceph_opendir(cmount, c_path, &dirp); if (r<0) { env->ReleaseStringUTFChars(j_path, c_path); return NULL; @@ -348,7 +395,7 @@ JNIEXPORT jobjectArray JNICALL Java_org_apache_hadoop_fs_ceph_CephTalker_ceph_1g string *ent; int bufpos; while (1) { - r = ceph_getdnames(dirp, buf, buflen); + r = ceph_getdnames(cmount, dirp, buf, buflen); if (r==-ERANGE) { //expand the buffer delete [] buf; buflen *= 2; @@ -369,9 +416,9 @@ JNIEXPORT jobjectArray JNICALL Java_org_apache_hadoop_fs_ceph_CephTalker_ceph_1g } } delete [] buf; - ceph_closedir(dirp); + ceph_closedir(cmount, dirp); env->ReleaseStringUTFChars(j_path, c_path); - + if (r < 0) return NULL; // Create a Java String array of the size of the directory listing @@ -388,11 +435,11 @@ JNIEXPORT jobjectArray JNICALL Java_org_apache_hadoop_fs_ceph_CephTalker_ceph_1g for (list::iterator it = contents.begin(); it != contents.end(); it++) { - env->SetObjectArrayElement(dirListingStringArray, i, + env->SetObjectArrayElement(dirListingStringArray, i, env->NewStringUTF(it->c_str())); ++i; } - + return dirListingStringArray; } @@ -404,15 +451,17 @@ JNIEXPORT jobjectArray JNICALL Java_org_apache_hadoop_fs_ceph_CephTalker_ceph_1g * given mode. */ JNIEXPORT jint JNICALL Java_org_apache_hadoop_fs_ceph_CephTalker_ceph_1mkdirs -(JNIEnv *env, jobject, jstring j_path, jint mode) +(JNIEnv *env, jobject obj, jstring j_path, jint mode) { dout(10) << "In Hadoop mk_dirs" << dendl; //get c-style string and make the call, clean up the string... jint result; const char *c_path = env->GetStringUTFChars(j_path, 0); - if (c_path == NULL) return -ENOMEM; - result = ceph_mkdirs(c_path, mode); + if (c_path == NULL) + return -ENOMEM; + ceph_mount_t *cmount = get_ceph_mount_t(env, obj); + result = ceph_mkdirs(cmount, c_path, mode); env->ReleaseStringUTFChars(j_path, c_path); //...and return @@ -437,8 +486,10 @@ JNIEXPORT jint JNICALL Java_org_apache_hadoop_fs_ceph_CephTalker_ceph_1open_1for jint result; const char *c_path = env->GetStringUTFChars(j_path, 0); - if (c_path == NULL) return -ENOMEM; - result = ceph_open(c_path, O_WRONLY|O_CREAT|O_APPEND); + if (c_path == NULL) + return -ENOMEM; + ceph_mount_t *cmount = get_ceph_mount_t(env, obj); + result = ceph_open(cmount, c_path, O_WRONLY|O_CREAT|O_APPEND, 0); env->ReleaseStringUTFChars(j_path, c_path); return result; @@ -460,12 +511,14 @@ JNIEXPORT jint JNICALL Java_org_apache_hadoop_fs_ceph_CephTalker_ceph_1open_1for { dout(10) << "In open_for_read" << dendl; - jint result; + jint result; // open as read-only: flag = O_RDONLY const char *c_path = env->GetStringUTFChars(j_path, 0); - if (c_path == NULL) return -ENOMEM; - result = ceph_open(c_path, O_RDONLY); + if (c_path == NULL) + return -ENOMEM; + ceph_mount_t *cmount = get_ceph_mount_t(env, obj); + result = ceph_open(cmount, c_path, O_RDONLY, 0); env->ReleaseStringUTFChars(j_path, c_path); // returns file handle, or -1 on failure @@ -488,16 +541,18 @@ JNIEXPORT jint JNICALL Java_org_apache_hadoop_fs_ceph_CephTalker_ceph_1open_1for { dout(10) << "In open_for_overwrite" << dendl; - jint result; + jint result; const char *c_path = env->GetStringUTFChars(j_path, 0); - if (c_path == NULL) return -ENOMEM; - result = ceph_open(c_path, O_WRONLY|O_CREAT|O_TRUNC, mode); + if (c_path == NULL) + return -ENOMEM; + ceph_mount_t *cmount = get_ceph_mount_t(env, obj); + result = ceph_open(cmount, c_path, O_WRONLY|O_CREAT|O_TRUNC, mode); env->ReleaseStringUTFChars(j_path, c_path); // returns file handle, or -1 on failure - return result; + return result; } /* @@ -507,11 +562,12 @@ JNIEXPORT jint JNICALL Java_org_apache_hadoop_fs_ceph_CephTalker_ceph_1open_1for * Closes a given filehandle. */ JNIEXPORT jint JNICALL Java_org_apache_hadoop_fs_ceph_CephTalker_ceph_1close -(JNIEnv *env, jobject ojb, jint fh) +(JNIEnv *env, jobject obj, jint fh) { dout(10) << "In CephTalker::ceph_close" << dendl; - return ceph_close(fh); + ceph_mount_t *cmount = get_ceph_mount_t(env, obj); + return ceph_close(cmount, fh); } /* @@ -529,8 +585,10 @@ JNIEXPORT jboolean JNICALL Java_org_apache_hadoop_fs_ceph_CephTalker_ceph_1setPe (JNIEnv *env, jobject obj, jstring j_path, jint j_new_mode) { const char *c_path = env->GetStringUTFChars(j_path, 0); - if (c_path == NULL) return false; - int result = ceph_chmod(c_path, j_new_mode); + if (c_path == NULL) + return false; + ceph_mount_t *cmount = get_ceph_mount_t(env, obj); + int result = ceph_chmod(cmount, c_path, j_new_mode); env->ReleaseStringUTFChars(j_path, c_path); return (result==0); @@ -540,14 +598,18 @@ JNIEXPORT jboolean JNICALL Java_org_apache_hadoop_fs_ceph_CephTalker_ceph_1setPe * Class: org_apache_hadoop_fs_ceph_CephTalker * Method: ceph_kill_client * Signature: (J)Z - * + * * Closes the Ceph client. This should be called before shutting down * (multiple times is okay but redundant). */ JNIEXPORT jboolean JNICALL Java_org_apache_hadoop_fs_ceph_CephTalker_ceph_1kill_1client (JNIEnv *env, jobject obj) -{ - ceph_deinitialize(); +{ + ceph_mount_t *cmount = get_ceph_mount_t(env, obj); + if (!cmount) + return true; + ceph_shutdown(cmount); + set_ceph_mount_t(env, obj, NULL); return true; } @@ -585,7 +647,8 @@ JNIEXPORT jboolean JNICALL Java_org_apache_hadoop_fs_ceph_CephTalker_ceph_1stat jfieldID c_mode_id = env->GetFieldID(cls, "mode", "I"); if (c_mode_id == NULL) return false; //do actual lstat - int r = ceph_lstat_precise(c_path, &st); + ceph_mount_t *cmount = get_ceph_mount_t(env, obj); + int r = ceph_lstat_precise(cmount, c_path, &st); env->ReleaseStringUTFChars(j_path, c_path); if (r < 0) return false; //fail out; file DNE or Ceph broke @@ -620,19 +683,21 @@ JNIEXPORT jint JNICALL Java_org_apache_hadoop_fs_ceph_CephTalker_ceph_1statfs //setup variables struct statvfs stbuf; const char *c_path = env->GetStringUTFChars(j_path, 0); - if (c_path == NULL) return -ENOMEM; + if (c_path == NULL) + return -ENOMEM; jclass cls = env->GetObjectClass(j_cephstat); - if (cls == NULL) return 1; //JVM error of some kind + if (cls == NULL) + return 1; //JVM error of some kind jfieldID c_capacity_id = env->GetFieldID(cls, "capacity", "J"); jfieldID c_used_id = env->GetFieldID(cls, "used", "J"); jfieldID c_remaining_id = env->GetFieldID(cls, "remaining", "J"); //do the statfs - int r = ceph_statfs(c_path, &stbuf); + ceph_mount_t *cmount = get_ceph_mount_t(env, obj); + int r = ceph_statfs(cmount, c_path, &stbuf); env->ReleaseStringUTFChars(j_path, c_path); - - - if (r!=0) return r; //something broke + if (r != 0) + return r; //something broke //place info into Java; convert from bytes to kilobytes env->SetLongField(j_cephstat, c_capacity_id, @@ -659,8 +724,10 @@ JNIEXPORT jint JNICALL Java_org_apache_hadoop_fs_ceph_CephTalker_ceph_1replicati { //get c-string of path, send off to libceph, release c-string, return const char *c_path = env->GetStringUTFChars(j_path, 0); - if (c_path == NULL) return -ENOMEM; - int replication = ceph_get_file_replication(c_path); + if (c_path == NULL) + return -ENOMEM; + ceph_mount_t *cmount = get_ceph_mount_t(env, obj); + int replication = ceph_get_file_replication(cmount, c_path); env->ReleaseStringUTFChars(j_path, c_path); return replication; } @@ -679,13 +746,16 @@ JNIEXPORT jstring JNICALL Java_org_apache_hadoop_fs_ceph_CephTalker_ceph_1hosts (JNIEnv *env, jobject obj, jint j_fh, jlong j_offset) { //get the address + const static int IP_ADDR_LENGTH = 24;//a buffer size; may want to up for IPv6. char *address = new char[IP_ADDR_LENGTH]; - int r = ceph_get_file_stripe_address(j_fh, j_offset, address, IP_ADDR_LENGTH); + ceph_mount_t *cmount = get_ceph_mount_t(env, obj); + int r = ceph_get_file_stripe_address(cmount, j_fh, j_offset, + address, IP_ADDR_LENGTH); if (r == -ERANGE) {//buffer's too small delete [] address; - int size = ceph_get_file_stripe_address(j_fh, j_offset, address, 0); + int size = ceph_get_file_stripe_address(cmount, j_fh, j_offset, address, 0); address = new char[size]; - r = ceph_get_file_stripe_address(j_fh, j_offset, address, size); + r = ceph_get_file_stripe_address(cmount, j_fh, j_offset, address, size); } if (r != 0) { //some rather worse problem if (r == -EINVAL) return NULL; //ceph thinks there are no OSDs @@ -724,7 +794,8 @@ JNIEXPORT jint JNICALL Java_org_apache_hadoop_fs_ceph_CephTalker_ceph_1setTimes attr.st_mtime_micro = (mtime % 1000) * 1000; attr.st_atime_sec = atime / 1000; attr.st_atime_micro = (atime % 1000) * 1000; - return ceph_setattr_precise(c_path, &attr, mask); + ceph_mount_t *cmount = get_ceph_mount_t(env, obj); + return ceph_setattr_precise(cmount, c_path, &attr, mask); } /* @@ -750,7 +821,7 @@ JNIEXPORT jint JNICALL Java_org_apache_hadoop_fs_ceph_CephTalker_ceph_1read // Make sure to convert the Hadoop read arguments into a // more ceph-friendly form - jint result; + jint result; // Step 1: get a pointer to the buffer. jbyte *j_buffer_ptr = env->GetByteArrayElements(j_buffer, NULL); @@ -761,11 +832,12 @@ JNIEXPORT jint JNICALL Java_org_apache_hadoop_fs_ceph_CephTalker_ceph_1read c_buffer += (int)buffer_offset; // Step 3: do the read - result = ceph_read((int)fh, c_buffer, length, -1); + ceph_mount_t *cmount = get_ceph_mount_t(env, obj); + result = ceph_read(cmount, (int)fh, c_buffer, length, -1); // Step 4: release the pointer to the buffer env->ReleaseByteArrayElements(j_buffer, j_buffer_ptr, 0); - + return result; } @@ -785,7 +857,8 @@ JNIEXPORT jlong JNICALL Java_org_apache_hadoop_fs_ceph_CephTalker_ceph_1seek_1fr { dout(10) << "In CephTalker::seek_from_start" << dendl; - return ceph_lseek(fh, pos, SEEK_SET); + ceph_mount_t *cmount = get_ceph_mount_t(env, obj); + return ceph_lseek(cmount, fh, pos, SEEK_SET); } /* @@ -803,7 +876,8 @@ JNIEXPORT jlong JNICALL Java_org_apache_hadoop_fs_ceph_CephTalker_ceph_1getpos dout(10) << "In CephTalker::ceph_getpos" << dendl; // seek a distance of 0 to get current offset - return ceph_lseek(fh, 0, SEEK_CUR); + ceph_mount_t *cmount = get_ceph_mount_t(env, obj); + return ceph_lseek(cmount, fh, 0, SEEK_CUR); } /* @@ -827,19 +901,21 @@ JNIEXPORT jint JNICALL Java_org_apache_hadoop_fs_ceph_CephTalker_ceph_1write // IMPORTANT NOTE: Hadoop write arguments are a bit different from POSIX so we // have to convert. The write is *always* from the current position in the file, // and buffer_offset is the location in the *buffer* where we start writing. - jint result; + jint result; // Step 1: get a pointer to the buffer. jbyte *j_buffer_ptr = env->GetByteArrayElements(j_buffer, NULL); - if (j_buffer_ptr == NULL) return -ENOMEM; + if (j_buffer_ptr == NULL) + return -ENOMEM; char *c_buffer = (char*) j_buffer_ptr; // Step 2: pointer arithmetic to start in the right buffer position c_buffer += (int)buffer_offset; // Step 3: do the write - result = ceph_write((int)fh, c_buffer, length, -1); - + ceph_mount_t *cmount = get_ceph_mount_t(env, obj); + result = ceph_write(cmount, (int)fh, c_buffer, length, -1); + // Step 4: release the pointer to the buffer env->ReleaseByteArrayElements(j_buffer, j_buffer_ptr, 0); diff --git a/src/client/hadoop/ceph/CephTalker.java b/src/client/hadoop/ceph/CephTalker.java index 9e416a0e231c0..9df4e94eac6d2 100644 --- a/src/client/hadoop/ceph/CephTalker.java +++ b/src/client/hadoop/ceph/CephTalker.java @@ -26,11 +26,16 @@ import org.apache.commons.logging.Log; class CephTalker extends CephFS { + // JNI doesn't give us any way to store pointers, so use a long. + // Here we're assuming pointers aren't longer than 8 bytes. + long cluster; + // we write a constructor so we can load the libraries public CephTalker(Configuration conf, Log log) { super(conf, log); System.load(conf.get("fs.ceph.libDir") + "/libhadoopcephfs.so"); System.load(conf.get("fs.ceph.libDir") + "/libceph.so"); + cluster = 0; } protected native boolean ceph_initializeClient(String arguments, int block_size); -- 2.39.5