]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
hadoop: convert to new libceph interface
authorColin Patrick McCabe <cmccabe@alumni.cmu.edu>
Thu, 21 Apr 2011 00:33:28 +0000 (17:33 -0700)
committerColin Patrick McCabe <cmccabe@alumni.cmu.edu>
Mon, 25 Apr 2011 18:05:43 +0000 (11:05 -0700)
Signed-off-by: Colin McCabe <colin.mccabe@dreamhost.com>
src/client/hadoop/CephFSInterface.cc
src/client/hadoop/ceph/CephTalker.java

index b84932483d2dc5bcd038c778e7d58a60c66c45dd..107f943e4d5fa46888c16255ce9283b227e8ab0b 100644 (file)
@@ -1,16 +1,42 @@
-// -*- mode:c++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- 
+// -*- mode:c++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
 #include "CephFSInterface.h"
-
 #include "client/libceph.h"
+#include "common/Timer.h" // why?
+#include "common/ceph_argparse.h"
 #include "common/config.h"
 #include "msg/SimpleMessenger.h"
-#include "common/Timer.h"
 
 #include <sys/stat.h>
+#include <sys/statvfs.h>
+
+union ceph_mount_union_t {
+  ceph_mount_t *cmount;
+  jlong cjlong;
+};
+
+static void set_ceph_mount_t(JNIEnv *env, jobject obj, ceph_mount_t *cmount)
+{
+  jclass cls = env->GetObjectClass(obj);
+  jfieldID fid = env->GetFieldID(cls, "cmount", "Ljava/lang/Long;");
+  if (fid == NULL)
+    return;
+  ceph_mount_union_t ceph_mount_union;
+  ceph_mount_union.cjlong= 0;
+  ceph_mount_union.cmount = cmount;
+  env->SetLongField(obj, fid, ceph_mount_union.cjlong);
+}
+
+static ceph_mount_t *get_ceph_mount_t(JNIEnv *env, jobject obj)
+{
+  jclass cls = env->GetObjectClass(obj);
+  jfieldID fid = env->GetFieldID(cls, "cmount", "Ljava/lang/Long;");
+  if (fid == NULL)
+    return NULL;
+  ceph_mount_union_t ceph_mount_union;
+  ceph_mount_union.cjlong = env->GetLongField(obj, fid);
+  return ceph_mount_union.cmount;
+}
 
-using namespace std;
-const static int IP_ADDR_LENGTH = 24;//a buffer size; may want to up for IPv6.
-static int path_size;
 /*
  * Class:     org_apache_hadoop_fs_ceph_CephTalker
  * Method:    ceph_initializeClient
@@ -26,56 +52,60 @@ JNIEXPORT jboolean JNICALL Java_org_apache_hadoop_fs_ceph_CephTalker_ceph_1initi
   (JNIEnv *env, jobject obj, jstring j_args, jint block_size)
 {
   dout(3) << "CephFSInterface: Initializing Ceph client:" << dendl;
+
+  // Convert Java argument string to argv
   const char *c_args = env->GetStringUTFChars(j_args, 0);
-  if (c_args == NULL) return false; //out of memory!
-  string args(c_args);
-  path_size = 64; //reasonable starting point?
-
-  //construct an arguments vector
-  vector<string> args_vec;
-  string arg;
-  size_t i = 0;
-  size_t j = 0;
-  bool local_writes = false;
-  while (1) {
-    j = args.find(' ', i);
-    if (j == string::npos) {
-      if (i == 0) { //there were no spaces? That can't happen!
-       env->ReleaseStringUTFChars(j_args, c_args);
-       return false;
-      }
-      //otherwise it's the last argument, so push it on and exit loop
-      args_vec.push_back(args.substr(i, args.size()));
+  if (c_args == NULL)
+    return false; //out of memory!
+  string cppargs(c_args);
+  char b[cppargs.length()+1];
+  strcpy(b, cppargs.c_str());
+  env->ReleaseStringUTFChars(j_args, c_args);
+  std::vector<const char*> args;
+  char *p = b;
+  while (*p) {
+    args.push_back(p);
+    while (*p && *p != ' ')
+      p++;
+    if (!*p)
       break;
+    *p++ = 0;
+    while (*p && *p == ' ')
+      p++;
+  }
+
+  // parse the arguments
+  bool set_local_writes = false;
+  std::string mount_root, val;
+  for (std::vector<const char*>::iterator i = args.begin(); i != args.end(); ) {
+    if (ceph_argparse_witharg(args, i, &val, "mount_root", (char*)NULL)) {
+      mount_root = val;
+    } else if (ceph_argparse_flag(args, i, "set_local_pg", (char*)NULL)) {
+      set_local_writes = true;
+    } else {
+      ++i;
     }
-    if (j!=i) { //if there are two spaces in a row, don't make a new arg
-      arg = args.substr(i, j-i);
-      if (arg.compare("set_local_pg") == 0)
-       local_writes = true;
-      else
-       args_vec.push_back(arg);
-    }
-    i = j+1;
   }
 
-  //convert to array
-  const char ** argv = new const char*[args_vec.size()];
-  for (size_t i = 0; i < args_vec.size(); ++i)
-    argv[i] = args_vec[i].c_str();
+  // connect to the cmount
+  ceph_mount_t *cmount;
+  int ret = ceph_create(&cmount, NULL);
+  if (ret)
+    return false;
+  ceph_conf_read_file(cmount, NULL); // read config file from the default location
+  ceph_conf_parse_argv(cmount, args.size(), &args[0]);
 
-  int r = ceph_initialize(args_vec.size(), argv);
-  env->ReleaseStringUTFChars(j_args, c_args);
-  delete argv;
+  ceph_localize_reads(cmount, true);
+  ceph_set_default_file_stripe_unit(cmount, block_size);
+  ceph_set_default_object_size(cmount, block_size);
 
-  ceph_localize_reads(true);
-  ceph_set_default_file_stripe_unit(block_size);
-  ceph_set_default_object_size(block_size);
+  ret = ceph_mount(cmount, mount_root.c_str());
+  if (ret)
+    return false;
+  if (set_local_writes)
+    ceph_set_default_preferred_pg(cmount, ceph_get_local_osd(cmount));
 
-  if (r < 0) return false;
-  r = ceph_mount();
-  if (r < 0) return false;
-  if (local_writes)
-    ceph_set_default_preferred_pg(ceph_get_local_osd());
+  set_ceph_mount_t(env, obj, cmount);
   return true;
 }
 
@@ -90,17 +120,8 @@ JNIEXPORT jstring JNICALL Java_org_apache_hadoop_fs_ceph_CephTalker_ceph_1getcwd
   (JNIEnv *env, jobject obj)
 {
   dout(10) << "CephFSInterface: In getcwd" << dendl;
-
-  char *path = new char[path_size];
-  int r = ceph_getcwd(path, path_size);
-  if (r==-ERANGE) { //path is too short
-    path_size = ceph_getcwd(path, 0) * 1.2; //leave some extra
-    delete [] path;
-    path = new char[path_size];
-    ceph_getcwd(path, path_size);
-  }
-  jstring j_path = env->NewStringUTF(path);
-  delete [] path;
+  ceph_mount_t *cmount = get_ceph_mount_t(env, obj);
+  jstring j_path = env->NewStringUTF(ceph_getcwd(cmount));
   return j_path;
 }
 
@@ -120,10 +141,12 @@ JNIEXPORT jboolean JNICALL Java_org_apache_hadoop_fs_ceph_CephTalker_ceph_1setcw
   dout(10) << "CephFSInterface: In setcwd" << dendl;
 
   const char *c_path = env->GetStringUTFChars(j_path, 0);
-  if(c_path == NULL ) return false;
-  jboolean success = (0 <= ceph_chdir(c_path)) ? JNI_TRUE : JNI_FALSE; 
+  if (c_path == NULL)
+    return false;
+  ceph_mount_t *cmount = get_ceph_mount_t(env, obj);
+  int ret = ceph_chdir(cmount, c_path);
   env->ReleaseStringUTFChars(j_path, c_path);
-  return success;
+  return ret ? JNI_FALSE : JNI_TRUE;
 }
 
 /*
@@ -137,15 +160,17 @@ JNIEXPORT jboolean JNICALL Java_org_apache_hadoop_fs_ceph_CephTalker_ceph_1setcw
  * Returns: true on successful delete; false otherwise
  */
 JNIEXPORT jboolean JNICALL Java_org_apache_hadoop_fs_ceph_CephTalker_ceph_1rmdir
-  (JNIEnv *env, jobject, jstring j_path)
+  (JNIEnv *env, jobject obj, jstring j_path)
 {
   dout(10) << "CephFSInterface: In rmdir" << dendl;
 
   const char *c_path = env->GetStringUTFChars(j_path, 0);
-  if(c_path == NULL ) return false;
-  jboolean success = (0 == ceph_rmdir(c_path)) ? JNI_TRUE : JNI_FALSE; 
+  if(c_path == NULL)
+    return false;
+  ceph_mount_t *cmount = get_ceph_mount_t(env, obj);
+  int ret = ceph_rmdir(cmount, c_path);
   env->ReleaseStringUTFChars(j_path, c_path);
-  return success;
+  return ret ? JNI_FALSE : JNI_TRUE;
 }
 
 /*
@@ -158,14 +183,16 @@ JNIEXPORT jboolean JNICALL Java_org_apache_hadoop_fs_ceph_CephTalker_ceph_1rmdir
  * Returns: true if the unlink occurred, false otherwise.
  */
 JNIEXPORT jboolean JNICALL Java_org_apache_hadoop_fs_ceph_CephTalker_ceph_1unlink
-  (JNIEnv *env, jobject, jstring j_path)
+  (JNIEnv *env, jobject obj, jstring j_path)
 {
   const char *c_path = env->GetStringUTFChars(j_path, 0);
-  if (c_path == NULL) return false;
+  if (c_path == NULL)
+    return false;
+  ceph_mount_t *cmount = get_ceph_mount_t(env, obj);
   dout(10) << "CephFSInterface: In unlink for path " << c_path <<  ":" << dendl;
-  int result = ceph_unlink(c_path);
+  int ret = ceph_unlink(cmount, c_path);
   env->ReleaseStringUTFChars(j_path, c_path);
-  return (0 == result) ? JNI_TRUE : JNI_FALSE; 
+  return ret ? JNI_FALSE : JNI_TRUE;
 }
 
 /*
@@ -179,23 +206,29 @@ JNIEXPORT jboolean JNICALL Java_org_apache_hadoop_fs_ceph_CephTalker_ceph_1unlin
  * Returns: true if the rename occurred, false otherwise
  */
 JNIEXPORT jboolean JNICALL Java_org_apache_hadoop_fs_ceph_CephTalker_ceph_1rename
-  (JNIEnv *env, jobject, jstring j_from, jstring j_to)
+  (JNIEnv *env, jobject obj, jstring j_from, jstring j_to)
 {
   dout(10) << "CephFSInterface: In rename" << dendl;
   const char *c_from = env->GetStringUTFChars(j_from, 0);
-  if (c_from == NULL) return false;
-  const char *c_to   = env->GetStringUTFChars(j_to,   0);
+  if (c_from == NULL)
+    return false;
+  const char *c_to = env->GetStringUTFChars(j_to,   0);
   if (c_to == NULL) {
     env->ReleaseStringUTFChars(j_from, c_from);
     return false;
   }
-  jboolean success = false;
   struct stat stbuf;
-  if (ceph_lstat(c_to, &stbuf) < 0) //Hadoop doesn't want to overwrite files in a rename
-    success = (0 <= ceph_rename(c_from, c_to)) ? JNI_TRUE : JNI_FALSE; 
-  env->ReleaseStringUTFChars(j_from, c_from);
-  env->ReleaseStringUTFChars(j_to, c_to);
-  return success;
+  ceph_mount_t *cmount = get_ceph_mount_t(env, obj);
+  int ret = ceph_lstat(cmount, c_to, &stbuf);
+  if (ret != -ENOENT) {
+    // Hadoop doesn't want to overwrite files in a rename.
+    env->ReleaseStringUTFChars(j_from, c_from);
+    env->ReleaseStringUTFChars(j_to, c_to);
+    return JNI_FALSE;
+  }
+
+  ret = ceph_rename(cmount, c_from, c_to);
+  return ret ? JNI_FALSE : JNI_TRUE;
 }
 
 /*
@@ -206,7 +239,7 @@ JNIEXPORT jboolean JNICALL Java_org_apache_hadoop_fs_ceph_CephTalker_ceph_1renam
  * if it does not or there is an unexpected failure.
  */
 JNIEXPORT jboolean JNICALL Java_org_apache_hadoop_fs_ceph_CephTalker_ceph_1exists
-(JNIEnv *env, jobject, jstring j_path)
+(JNIEnv *env, jobject obj, jstring j_path)
 {
 
   dout(10) << "CephFSInterface: In exists" << dendl;
@@ -214,12 +247,15 @@ JNIEXPORT jboolean JNICALL Java_org_apache_hadoop_fs_ceph_CephTalker_ceph_1exist
   struct stat stbuf;
 
   const char *c_path = env->GetStringUTFChars(j_path, 0);
-  if (c_path == NULL) return false;
+  if (c_path == NULL)
+    return false;
   dout(10) << "Attempting lstat with file " << c_path << ":" << dendl;
-  int result = ceph_lstat(c_path, &stbuf);
-  dout(10) << "result is " << result << dendl;
+
+  ceph_mount_t *cmount = get_ceph_mount_t(env, obj);
+  int ret = ceph_lstat(cmount, c_path, &stbuf);
+  dout(10) << "result is " << ret << dendl;
   env->ReleaseStringUTFChars(j_path, c_path);
-  if (result < 0) {
+  if (ret < 0) {
     dout(10) << "Returning false (file does not exist)" << dendl;
     return JNI_FALSE;
   }
@@ -246,21 +282,25 @@ JNIEXPORT jlong JNICALL Java_org_apache_hadoop_fs_ceph_CephTalker_ceph_1getblock
   dout(10) << "In getblocksize" << dendl;
 
   //struct stat stbuf;
-  
+
   jlong result;
 
   const char *c_path = env->GetStringUTFChars(j_path, 0);
-  if (c_path == NULL) return -ENOMEM;
+  if (c_path == NULL)
+    return -ENOMEM;
   // we need to open the file to retrieve the stripe size
   dout(10) << "CephFSInterface: getblocksize: opening file" << dendl;
-  int fh = ceph_open(c_path, O_RDONLY);  
+  ceph_mount_t *cmount = get_ceph_mount_t(env, obj);
+  int fh = ceph_open(cmount, c_path, O_RDONLY, 0);
   env->ReleaseStringUTFChars(j_path, c_path);
-  if (fh < 0) return fh;
+  if (fh < 0)
+    return fh;
 
-  result = ceph_get_file_stripe_unit(fh);
+  result = ceph_get_file_stripe_unit(cmount, fh);
 
-  int close_result = ceph_close(fh);
-  assert (close_result > -1);
+  int close_result = ceph_close(cmount, fh);
+  if (close_result < 0)
+    return close_result;
 
   return result;
 }
@@ -279,15 +319,18 @@ JNIEXPORT jboolean JNICALL Java_org_apache_hadoop_fs_ceph_CephTalker_ceph_1isfil
   struct stat stbuf;
 
   const char *c_path = env->GetStringUTFChars(j_path, 0);
-  if (c_path == NULL) return false;
-  int result = ceph_lstat(c_path, &stbuf);
+  if (c_path == NULL)
+    return false;
+  ceph_mount_t *cmount = get_ceph_mount_t(env, obj);
+  int ret = ceph_lstat(cmount, c_path, &stbuf);
   env->ReleaseStringUTFChars(j_path, c_path);
 
   // if the stat call failed, it's definitely not a file...
-  if (0 > result) return false; 
+  if (ret < 0)
+    return false;
 
   // check the stat result
-  return (!(0 == S_ISREG(stbuf.st_mode)));
+  return !!S_ISREG(stbuf.st_mode);
 }
 
 
@@ -298,22 +341,25 @@ JNIEXPORT jboolean JNICALL Java_org_apache_hadoop_fs_ceph_CephTalker_ceph_1isfil
  * Returns true if the given path is a directory, false otherwise.
  */
 JNIEXPORT jboolean JNICALL Java_org_apache_hadoop_fs_ceph_CephTalker_ceph_1isdirectory
-  (JNIEnv *env, jobject, jstring j_path)
+  (JNIEnv *env, jobject obj, jstring j_path)
 {
   dout(10) << "In isdirectory" << dendl;
 
   struct stat stbuf;
 
   const char *c_path = env->GetStringUTFChars(j_path, 0);
-  if (c_path == NULL) return false;
-  int result = ceph_lstat(c_path, &stbuf);
+  if (c_path == NULL)
+    return false;
+  ceph_mount_t *cmount = get_ceph_mount_t(env, obj);
+  int result = ceph_lstat(cmount, c_path, &stbuf);
   env->ReleaseStringUTFChars(j_path, c_path);
 
   // if the stat call failed, it's definitely not a directory...
-  if (0 > result) return JNI_FALSE; 
+  if (result < 0)
+    return JNI_FALSE;
 
   // check the stat result
-  return (!(0 == S_ISDIR(stbuf.st_mode)));
+  return !!S_ISDIR(stbuf.st_mode);
 }
 
 /*
@@ -328,7 +374,7 @@ JNIEXPORT jboolean JNICALL Java_org_apache_hadoop_fs_ceph_CephTalker_ceph_1isdir
  *  will not contain . or .. entries.
  */
 JNIEXPORT jobjectArray JNICALL Java_org_apache_hadoop_fs_ceph_CephTalker_ceph_1getdir
-(JNIEnv *env, jobject obj, jstring j_path)
+(JNIEnv *env, jobject  obj, jstring j_path)
 {
   dout(10) << "In getdir" << dendl;
 
@@ -336,9 +382,10 @@ JNIEXPORT jobjectArray JNICALL Java_org_apache_hadoop_fs_ceph_CephTalker_ceph_1g
   list<string> contents;
   const char *c_path = env->GetStringUTFChars(j_path, 0);
   if (c_path == NULL) return NULL;
-  DIR *dirp;
+  ceph_dir_result_t *dirp;
   int r;
-  r = ceph_opendir(c_path, &dirp);
+  ceph_mount_t *cmount = get_ceph_mount_t(env, obj);
+  r = ceph_opendir(cmount, c_path, &dirp);
   if (r<0) {
     env->ReleaseStringUTFChars(j_path, c_path);
     return NULL;
@@ -348,7 +395,7 @@ JNIEXPORT jobjectArray JNICALL Java_org_apache_hadoop_fs_ceph_CephTalker_ceph_1g
   string *ent;
   int bufpos;
   while (1) {
-    r = ceph_getdnames(dirp, buf, buflen);
+    r = ceph_getdnames(cmount, dirp, buf, buflen);
     if (r==-ERANGE) { //expand the buffer
       delete [] buf;
       buflen *= 2;
@@ -369,9 +416,9 @@ JNIEXPORT jobjectArray JNICALL Java_org_apache_hadoop_fs_ceph_CephTalker_ceph_1g
     }
   }
   delete [] buf;
-  ceph_closedir(dirp);
+  ceph_closedir(cmount, dirp);
   env->ReleaseStringUTFChars(j_path, c_path);
-  
+
   if (r < 0) return NULL;
 
   // Create a Java String array of the size of the directory listing
@@ -388,11 +435,11 @@ JNIEXPORT jobjectArray JNICALL Java_org_apache_hadoop_fs_ceph_CephTalker_ceph_1g
   for (list<string>::iterator it = contents.begin();
        it != contents.end();
        it++) {
-    env->SetObjectArrayElement(dirListingStringArray, i, 
+    env->SetObjectArrayElement(dirListingStringArray, i,
                               env->NewStringUTF(it->c_str()));
     ++i;
   }
-  
+
   return dirListingStringArray;
 }
 
@@ -404,15 +451,17 @@ JNIEXPORT jobjectArray JNICALL Java_org_apache_hadoop_fs_ceph_CephTalker_ceph_1g
  * given mode.
  */
 JNIEXPORT jint JNICALL Java_org_apache_hadoop_fs_ceph_CephTalker_ceph_1mkdirs
-(JNIEnv *env, jobject, jstring j_path, jint mode)
+(JNIEnv *env, jobject obj, jstring j_path, jint mode)
 {
   dout(10) << "In Hadoop mk_dirs" << dendl;
 
   //get c-style string and make the call, clean up the string...
   jint result;
   const char *c_path = env->GetStringUTFChars(j_path, 0);
-  if (c_path == NULL) return -ENOMEM;
-  result = ceph_mkdirs(c_path, mode);
+  if (c_path == NULL)
+    return -ENOMEM;
+  ceph_mount_t *cmount = get_ceph_mount_t(env, obj);
+  result = ceph_mkdirs(cmount, c_path, mode);
   env->ReleaseStringUTFChars(j_path, c_path);
 
   //...and return
@@ -437,8 +486,10 @@ JNIEXPORT jint JNICALL Java_org_apache_hadoop_fs_ceph_CephTalker_ceph_1open_1for
   jint result;
 
   const char *c_path = env->GetStringUTFChars(j_path, 0);
-  if (c_path == NULL) return -ENOMEM;
-  result = ceph_open(c_path, O_WRONLY|O_CREAT|O_APPEND);
+  if (c_path == NULL)
+    return -ENOMEM;
+  ceph_mount_t *cmount = get_ceph_mount_t(env, obj);
+  result = ceph_open(cmount, c_path, O_WRONLY|O_CREAT|O_APPEND, 0);
   env->ReleaseStringUTFChars(j_path, c_path);
 
   return result;
@@ -460,12 +511,14 @@ JNIEXPORT jint JNICALL Java_org_apache_hadoop_fs_ceph_CephTalker_ceph_1open_1for
 {
   dout(10) << "In open_for_read" << dendl;
 
-  jint result; 
+  jint result;
 
   // open as read-only: flag = O_RDONLY
   const char *c_path = env->GetStringUTFChars(j_path, 0);
-  if (c_path == NULL) return -ENOMEM;
-  result = ceph_open(c_path, O_RDONLY);
+  if (c_path == NULL)
+    return -ENOMEM;
+  ceph_mount_t *cmount = get_ceph_mount_t(env, obj);
+  result = ceph_open(cmount, c_path, O_RDONLY, 0);
   env->ReleaseStringUTFChars(j_path, c_path);
 
   // returns file handle, or -1 on failure
@@ -488,16 +541,18 @@ JNIEXPORT jint JNICALL Java_org_apache_hadoop_fs_ceph_CephTalker_ceph_1open_1for
 {
   dout(10) << "In open_for_overwrite" << dendl;
 
-  jint result; 
+  jint result;
 
 
   const char *c_path = env->GetStringUTFChars(j_path, 0);
-  if (c_path == NULL) return -ENOMEM;
-  result = ceph_open(c_path, O_WRONLY|O_CREAT|O_TRUNC, mode);
+  if (c_path == NULL)
+    return -ENOMEM;
+  ceph_mount_t *cmount = get_ceph_mount_t(env, obj);
+  result = ceph_open(cmount, c_path, O_WRONLY|O_CREAT|O_TRUNC, mode);
   env->ReleaseStringUTFChars(j_path, c_path);
 
   // returns file handle, or -1 on failure
-  return result;       
+  return result;
 }
 
 /*
@@ -507,11 +562,12 @@ JNIEXPORT jint JNICALL Java_org_apache_hadoop_fs_ceph_CephTalker_ceph_1open_1for
  * Closes a given filehandle.
  */
 JNIEXPORT jint JNICALL Java_org_apache_hadoop_fs_ceph_CephTalker_ceph_1close
-(JNIEnv *env, jobject ojb, jint fh)
+(JNIEnv *env, jobject obj, jint fh)
 {
   dout(10) << "In CephTalker::ceph_close" << dendl;
 
-  return ceph_close(fh);
+  ceph_mount_t *cmount = get_ceph_mount_t(env, obj);
+  return ceph_close(cmount, fh);
 }
 
 /*
@@ -529,8 +585,10 @@ JNIEXPORT jboolean JNICALL Java_org_apache_hadoop_fs_ceph_CephTalker_ceph_1setPe
 (JNIEnv *env, jobject obj, jstring j_path, jint j_new_mode)
 {
   const char *c_path = env->GetStringUTFChars(j_path, 0);
-  if (c_path == NULL) return false;
-  int result = ceph_chmod(c_path, j_new_mode);
+  if (c_path == NULL)
+    return false;
+  ceph_mount_t *cmount = get_ceph_mount_t(env, obj);
+  int result = ceph_chmod(cmount, c_path, j_new_mode);
   env->ReleaseStringUTFChars(j_path, c_path);
 
   return (result==0);
@@ -540,14 +598,18 @@ JNIEXPORT jboolean JNICALL Java_org_apache_hadoop_fs_ceph_CephTalker_ceph_1setPe
  * Class:     org_apache_hadoop_fs_ceph_CephTalker
  * Method:    ceph_kill_client
  * Signature: (J)Z
- * 
+ *
  * Closes the Ceph client. This should be called before shutting down
  * (multiple times is okay but redundant).
  */
 JNIEXPORT jboolean JNICALL Java_org_apache_hadoop_fs_ceph_CephTalker_ceph_1kill_1client
   (JNIEnv *env, jobject obj)
-{  
-  ceph_deinitialize();  
+{
+  ceph_mount_t *cmount = get_ceph_mount_t(env, obj);
+  if (!cmount)
+    return true;
+  ceph_shutdown(cmount);
+  set_ceph_mount_t(env, obj, NULL);
   return true;
 }
 
@@ -585,7 +647,8 @@ JNIEXPORT jboolean JNICALL Java_org_apache_hadoop_fs_ceph_CephTalker_ceph_1stat
   jfieldID c_mode_id = env->GetFieldID(cls, "mode", "I");
   if (c_mode_id == NULL) return false;
   //do actual lstat
-  int r = ceph_lstat_precise(c_path, &st);
+  ceph_mount_t *cmount = get_ceph_mount_t(env, obj);
+  int r = ceph_lstat_precise(cmount, c_path, &st);
   env->ReleaseStringUTFChars(j_path, c_path);
 
   if (r < 0) return false; //fail out; file DNE or Ceph broke
@@ -620,19 +683,21 @@ JNIEXPORT jint JNICALL Java_org_apache_hadoop_fs_ceph_CephTalker_ceph_1statfs
   //setup variables
   struct statvfs stbuf;
   const char *c_path = env->GetStringUTFChars(j_path, 0);
-  if (c_path == NULL) return -ENOMEM;
+  if (c_path == NULL)
+    return -ENOMEM;
   jclass cls = env->GetObjectClass(j_cephstat);
-  if (cls == NULL) return 1; //JVM error of some kind
+  if (cls == NULL)
+    return 1; //JVM error of some kind
   jfieldID c_capacity_id = env->GetFieldID(cls, "capacity", "J");
   jfieldID c_used_id = env->GetFieldID(cls, "used", "J");
   jfieldID c_remaining_id = env->GetFieldID(cls, "remaining", "J");
 
   //do the statfs
-  int r = ceph_statfs(c_path, &stbuf);
+  ceph_mount_t *cmount = get_ceph_mount_t(env, obj);
+  int r = ceph_statfs(cmount, c_path, &stbuf);
   env->ReleaseStringUTFChars(j_path, c_path);
-
-
-  if (r!=0) return r; //something broke
+  if (r != 0)
+    return r; //something broke
 
   //place info into Java; convert from bytes to kilobytes
   env->SetLongField(j_cephstat, c_capacity_id,
@@ -659,8 +724,10 @@ JNIEXPORT jint JNICALL Java_org_apache_hadoop_fs_ceph_CephTalker_ceph_1replicati
 {
   //get c-string of path, send off to libceph, release c-string, return
   const char *c_path = env->GetStringUTFChars(j_path, 0);
-  if (c_path == NULL) return -ENOMEM;
-  int replication = ceph_get_file_replication(c_path);
+  if (c_path == NULL)
+    return -ENOMEM;
+  ceph_mount_t *cmount = get_ceph_mount_t(env, obj);
+  int replication = ceph_get_file_replication(cmount, c_path);
   env->ReleaseStringUTFChars(j_path, c_path);
   return replication;
 }
@@ -679,13 +746,16 @@ JNIEXPORT jstring JNICALL Java_org_apache_hadoop_fs_ceph_CephTalker_ceph_1hosts
 (JNIEnv *env, jobject obj, jint j_fh, jlong j_offset)
 {
   //get the address
+  const static int IP_ADDR_LENGTH = 24;//a buffer size; may want to up for IPv6.
   char *address = new char[IP_ADDR_LENGTH];
-  int r = ceph_get_file_stripe_address(j_fh, j_offset, address, IP_ADDR_LENGTH);
+  ceph_mount_t *cmount = get_ceph_mount_t(env, obj);
+  int r = ceph_get_file_stripe_address(cmount, j_fh, j_offset,
+                                      address, IP_ADDR_LENGTH);
   if (r == -ERANGE) {//buffer's too small
     delete [] address;
-    int size = ceph_get_file_stripe_address(j_fh, j_offset, address, 0);
+    int size = ceph_get_file_stripe_address(cmount, j_fh, j_offset, address, 0);
     address = new char[size];
-    r = ceph_get_file_stripe_address(j_fh, j_offset, address, size);
+    r = ceph_get_file_stripe_address(cmount, j_fh, j_offset, address, size);
   }
   if (r != 0) { //some rather worse problem
     if (r == -EINVAL) return NULL; //ceph thinks there are no OSDs
@@ -724,7 +794,8 @@ JNIEXPORT jint JNICALL Java_org_apache_hadoop_fs_ceph_CephTalker_ceph_1setTimes
   attr.st_mtime_micro = (mtime % 1000) * 1000;
   attr.st_atime_sec = atime / 1000;
   attr.st_atime_micro = (atime % 1000) * 1000;
-  return ceph_setattr_precise(c_path, &attr, mask);
+  ceph_mount_t *cmount = get_ceph_mount_t(env, obj);
+  return ceph_setattr_precise(cmount, c_path, &attr, mask);
 }
 
 /*
@@ -750,7 +821,7 @@ JNIEXPORT jint JNICALL Java_org_apache_hadoop_fs_ceph_CephTalker_ceph_1read
 
   // Make sure to convert the Hadoop read arguments into a
   // more ceph-friendly form
-  jint result; 
+  jint result;
 
   // Step 1: get a pointer to the buffer.
   jbyte *j_buffer_ptr = env->GetByteArrayElements(j_buffer, NULL);
@@ -761,11 +832,12 @@ JNIEXPORT jint JNICALL Java_org_apache_hadoop_fs_ceph_CephTalker_ceph_1read
   c_buffer += (int)buffer_offset;
 
   // Step 3: do the read
-  result = ceph_read((int)fh, c_buffer, length, -1);
+  ceph_mount_t *cmount = get_ceph_mount_t(env, obj);
+  result = ceph_read(cmount, (int)fh, c_buffer, length, -1);
 
   // Step 4: release the pointer to the buffer
   env->ReleaseByteArrayElements(j_buffer, j_buffer_ptr, 0);
-  
+
   return result;
 }
 
@@ -785,7 +857,8 @@ JNIEXPORT jlong JNICALL Java_org_apache_hadoop_fs_ceph_CephTalker_ceph_1seek_1fr
 {
   dout(10) << "In CephTalker::seek_from_start" << dendl;
 
-  return ceph_lseek(fh, pos, SEEK_SET);
+  ceph_mount_t *cmount = get_ceph_mount_t(env, obj);
+  return ceph_lseek(cmount, fh, pos, SEEK_SET);
 }
 
 /*
@@ -803,7 +876,8 @@ JNIEXPORT jlong JNICALL Java_org_apache_hadoop_fs_ceph_CephTalker_ceph_1getpos
   dout(10) << "In CephTalker::ceph_getpos" << dendl;
 
   // seek a distance of 0 to get current offset
-  return ceph_lseek(fh, 0, SEEK_CUR);  
+  ceph_mount_t *cmount = get_ceph_mount_t(env, obj);
+  return ceph_lseek(cmount, fh, 0, SEEK_CUR);
 }
 
 /*
@@ -827,19 +901,21 @@ JNIEXPORT jint JNICALL Java_org_apache_hadoop_fs_ceph_CephTalker_ceph_1write
   // IMPORTANT NOTE: Hadoop write arguments are a bit different from POSIX so we
   // have to convert.  The write is *always* from the current position in the file,
   // and buffer_offset is the location in the *buffer* where we start writing.
-  jint result; 
+  jint result;
 
   // Step 1: get a pointer to the buffer.
   jbyte *j_buffer_ptr = env->GetByteArrayElements(j_buffer, NULL);
-  if (j_buffer_ptr == NULL) return -ENOMEM;
+  if (j_buffer_ptr == NULL)
+    return -ENOMEM;
   char *c_buffer = (char*) j_buffer_ptr;
 
   // Step 2: pointer arithmetic to start in the right buffer position
   c_buffer += (int)buffer_offset;
 
   // Step 3: do the write
-  result = ceph_write((int)fh, c_buffer, length, -1);
-  
+  ceph_mount_t *cmount = get_ceph_mount_t(env, obj);
+  result = ceph_write(cmount, (int)fh, c_buffer, length, -1);
+
   // Step 4: release the pointer to the buffer
   env->ReleaseByteArrayElements(j_buffer, j_buffer_ptr, 0);
 
index 9e416a0e231c0ccdde8e7fbad53465dfa8c29ede..9df4e94eac6d231d3c07773e811ab6c32c0d8468 100644 (file)
@@ -26,11 +26,16 @@ import org.apache.commons.logging.Log;
 
 
 class CephTalker extends CephFS {
+  // JNI doesn't give us any way to store pointers, so use a long.
+  // Here we're assuming pointers aren't longer than 8 bytes.
+  long cluster;
+
   // we write a constructor so we can load the libraries
   public CephTalker(Configuration conf, Log log) {
     super(conf, log);
     System.load(conf.get("fs.ceph.libDir") + "/libhadoopcephfs.so");
     System.load(conf.get("fs.ceph.libDir") + "/libceph.so");
+    cluster = 0;
   }
 
   protected native boolean ceph_initializeClient(String arguments, int block_size);