From f275c16a46b8000bbfdab3d8863682540fa3b873 Mon Sep 17 00:00:00 2001 From: eestolan Date: Tue, 20 Mar 2007 20:12:15 +0000 Subject: [PATCH] more Hadoop stuff git-svn-id: https://ceph.svn.sf.net/svnroot/ceph@1267 29311d96-e01e-0410-9327-a35deaab8ce9 --- trunk/ceph/Makefile | 9 +- trunk/ceph/client/Client.h | 1 + .../ceph/client/hadoop/CephClientInterface.cc | 217 ------- .../ceph/client/hadoop/CephClientInterface.h | 115 ---- trunk/ceph/client/hadoop/CephFSInterface.cc | 612 ++++++++++++++++++ trunk/ceph/client/hadoop/CephFSInterface.h | 229 +++++++ 6 files changed, 849 insertions(+), 334 deletions(-) delete mode 100644 trunk/ceph/client/hadoop/CephClientInterface.cc delete mode 100644 trunk/ceph/client/hadoop/CephClientInterface.h create mode 100644 trunk/ceph/client/hadoop/CephFSInterface.cc create mode 100644 trunk/ceph/client/hadoop/CephFSInterface.h diff --git a/trunk/ceph/Makefile b/trunk/ceph/Makefile index 8fa037a3984e2..0ef52e8295df8 100644 --- a/trunk/ceph/Makefile +++ b/trunk/ceph/Makefile @@ -97,12 +97,13 @@ COMMON_OBJS= \ common/Timer.o\ config.o - CLIENT_OBJS= \ client/FileCache.o\ client/Client.o\ client/SyntheticClient.o\ - client/Trace.o + client/Trace.o\ + client/hadoop/CephFSInterface.o\ + ifeq ($(want_bdb),yes) OSBDB_OBJS = \ @@ -112,11 +113,15 @@ OSBDB_OBJ = osbdb.o endif TARGETS = cmon cosd cmds csyn newsyn fakesyn mkmonmap cfuse fakefuse +NO_FUSE = cmon cosd cmds csyn newsyn fakesyn + SRCS=*.cc */*.cc *.h */*.h */*/*.h all: depend ${TARGETS} +nofuse: depend ${NO_FUSE} + test: depend ${TEST_TARGETS} obfs: depend obfstest diff --git a/trunk/ceph/client/Client.h b/trunk/ceph/client/Client.h index 313d0e29dcfcd..513a840d62670 100644 --- a/trunk/ceph/client/Client.h +++ b/trunk/ceph/client/Client.h @@ -529,6 +529,7 @@ protected: // crap int chdir(const char *s); + const string getcwd() { return cwd; } // namespace ops int getdir(const char *path, list& contents); diff --git a/trunk/ceph/client/hadoop/CephClientInterface.cc b/trunk/ceph/client/hadoop/CephClientInterface.cc deleted file mode 100644 index 6466dd6300891..0000000000000 --- a/trunk/ceph/client/hadoop/CephClientInterface.cc +++ /dev/null @@ -1,217 +0,0 @@ -// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- -/* - * Ceph - scalable distributed file system - * - * Copyright (C) 2004-2006 Sage Weil - * - * This is free software; you can redistribute it and/or - * modify it under the terms of the GNU Lesser General Public - * License version 2.1, as published by the Free Software - * Foundation. See file COPYING. - * - */ - - -//#include - - -using namespace std; - -// globals -//Client *client; // the ceph client -//this has to go - the real client will have to hold the pointer. -//Every function will need to take a Client pointer. - -// ------ -// fuse hooks - -static int ceph_getattr(Client* client, const char *path, struct stat *stbuf) -{ - return client->lstat(path, stbuf); -} - -static int ceph_readlink(Client* client, const char *path, char *buf, size_t size) -{ - int res; - - res = client->readlink(path, buf, size - 1); - if (res < 0) return res; - - buf[res] = '\0'; - return 0; -} - -// get rid of the callback thing, perhaps? and return the answer some other way? -/* -static int ceph_getdir(Client* client, const char *path, fuse_dirh_t h, fuse_dirfil_t filler) -{ - map contents; - - int res = client->getdir(path, contents); - if (res < 0) return res; - - // return contents to fuse via callback - for (map::iterator it = contents.begin(); - it != contents.end(); - it++) { - // (immutable) inode contents too. - res = filler(h, // fuse's handle - it->first.c_str(), // dentry as char* - it->second.mode & INODE_TYPE_MASK, // mask type bits from mode - it->second.ino); // ino.. 64->32 bit issue here? FIXME - if (res != 0) break; // fuse has had enough - } - return res; -} -*/ - -static int ceph_mknod(Client* client, const char *path, mode_t mode, dev_t rdev) -{ - return client->mknod(path, mode); -} - -static int ceph_mkdir(Client* client, const char *path, mode_t mode) -{ - return client->mkdir(path, mode); -} - -static int ceph_unlink(Client* client, const char *path) -{ - return client->unlink(path); -} - -static int ceph_rmdir(Client* client, const char *path) -{ - return client->rmdir(path); -} - -static int ceph_symlink(Client* client, const char *from, const char *to) -{ - return client->symlink(from, to); -} - - -static int ceph_rename(Client* client, const char *from, const char *to) -{ - return client->rename(from, to); -} - -static int ceph_link(Client* client, const char *from, const char *to) -{ - return client->link(from, to); -} - -static int ceph_chmod(Client* client, const char *path, mode_t mode) -{ - return client->chmod(path, mode); -} - -static int ceph_chown(Client* client, const char *path, uid_t uid, gid_t gid) -{ - return client->chown(path, uid, gid); -} - -static int ceph_truncate(Client* client, const char *path, off_t size) -{ - return client->truncate(path, size); -} - -static int ceph_utime(Client* client, const char *path, struct utimbuf *buf) -{ - return client->utime(path, buf); -} - - -static int ceph_open(Client* client, const char *path, struct fuse_file_info *fi) -{ - int res; - - res = client->open(path, fi->flags); - if (res < 0) return res; - fi->fh = res; - return 0; // fuse wants 0 onsucess -} - -static int ceph_read(Client* client, const char *path, char *buf, size_t size, off_t offset, - struct fuse_file_info *fi) -{ - fh_t fh = fi->fh; - return client->read(fh, buf, size, offset); -} - -static int ceph_write(Client* client, const char *path, const char *buf, size_t size, - off_t offset, struct fuse_file_info *fi) -{ - fh_t fh = fi->fh; - return client->write(fh, buf, size, offset); -} - -/* -static int ceph_flush(const char *path, struct fuse_file_info *fi) -{ - fh_t fh = fi->fh; - return client->flush(fh); -} -*/ - - -#ifdef DARWIN -static int ceph_statfs(Client* client, const char *path, struct statvfs *stbuf) -{ - return client->statfs(path, stbuf); -} -#else -static int ceph_statfs(Client* client, const char *path, struct statfs *stbuf) -{ - return client->statfs(path, stbuf); -} -#endif - - -/* remove fuse stuff from these two -static int ceph_release(Client* client, const char *path, struct fuse_file_info *fi) -{ - fh_t fh = fi->fh; - int r = client->close(fh); // close the file - return r; -} - -static int ceph_fsync(Client* client, const char *path, int isdatasync, - struct fuse_file_info *fi) -{ - fh_t fh = fi->fh; - return client->fsync(fh, isdatasync ? true:false); -} -*/ - -/* -static struct fuse_operations ceph_oper = { - getattr: ceph_getattr, - readlink: ceph_readlink, - getdir: ceph_getdir, - mknod: ceph_mknod, - mkdir: ceph_mkdir, - unlink: ceph_unlink, - rmdir: ceph_rmdir, - symlink: ceph_symlink, - rename: ceph_rename, - link: ceph_link, - chmod: ceph_chmod, - chown: ceph_chown, - truncate: ceph_truncate, - utime: ceph_utime, - open: ceph_open, - read: ceph_read, - write: ceph_write, - statfs: ceph_statfs, - flush: 0, //ceph_flush, - release: ceph_release, - fsync: ceph_fsync -}; - -*/ - - -// Does this do anything we need? No. All it does is assemble a bunch of -// arguments and call fuse_main. - diff --git a/trunk/ceph/client/hadoop/CephClientInterface.h b/trunk/ceph/client/hadoop/CephClientInterface.h deleted file mode 100644 index e0b37c305029e..0000000000000 --- a/trunk/ceph/client/hadoop/CephClientInterface.h +++ /dev/null @@ -1,115 +0,0 @@ -// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- -/* - * Ceph - scalable distributed file system - * - * Copyright (C) 2004-2006 Sage Weil - * - * This is free software; you can redistribute it and/or - * modify it under the terms of the GNU Lesser General Public - * License version 2.1, as published by the Free Software - * Foundation. See file COPYING. - * - */ - -#include -#include -#include -#include -#include -#include -#ifdef DARWIN -#include -#else -#include -#endif // DARWIN - -// ceph stuff -#include "include/types.h" - -#include "Client.h" - -#include "config.h" - -// stl -#include - - - - - - -// stbuf holds the attributes -static int ceph_getattr(Client* client, const char *path, struct stat *stbuf); - -// reads a symlink -static int ceph_readlink(Client* client, const char *path, char *buf, size_t size); - -// to do: remove fuse stuff from this one -//static int ceph_getdir(Client* client, const char *path, fuse_dirh_t h, fuse_dirfil_t filler); - -// looks irrelevant - it's for special device files -static int ceph_mknod(Client* client, const char *path, mode_t mode, dev_t rdev); - -// mode is the file permission bits -static int ceph_mkdir(Client* client, const char *path, mode_t mode); - -// delete! -static int ceph_unlink(Client* client, const char *path); - -// delete! if it's an empty directory -static int ceph_rmdir(Client* client, const char *path); - -// make a symlink -static int ceph_symlink(Client* client, const char *from, const char *to); - -// self-explanatory -static int ceph_rename(Client* client, const char *from, const char *to); - -static int ceph_link(Client* client, const char *from, const char *to); //hard link - -static int ceph_chmod(Client* client, const char *path, mode_t mode); //just chmod - -static int ceph_chown(Client* client, const char *path, uid_t uid, gid_t gid); //duh - -static int ceph_truncate(Client* client, const char *path, off_t size); //chop or zero-pad to size - -// set file access/modification times -static int ceph_utime(Client* client, const char *path, struct utimbuf *buf); - -// ok, gotta figure out what's in fuse_file_info and how to use it. Presumably it includes -// a file descriptor and the open flags? -static int ceph_open(Client* client, const char *path, struct fuse_file_info *fi); - -// read! -static int ceph_read(Client* client, const char *path, char *buf, size_t size, off_t offset, - struct fuse_file_info *fi); - -// write! -static int ceph_write(Client* client, const char *path, const char *buf, size_t size, - off_t offset, struct fuse_file_info *fi); - -/* was already commented out -static int ceph_flush(const char *path, struct fuse_file_info *fi); -*/ - - -// is this statvfs perhaps? we probably don't need it -#ifdef DARWIN -static int ceph_statfs(Client* client, const char *path, struct statvfs *stbuf); -#else -static int ceph_statfs(Client* client, const char *path, struct statfs *stbuf); -#endif - -// Remove fuse stuff from these two -//static int ceph_release(Client* client, const char *path, struct fuse_file_info *fi); - -//static int ceph_fsync(Client* client, const char *path, int isdatasync, struct fuse_file_info *fi); //kinda like flush? - -/* ceph_fuse_main - * - start up fuse glue, attached to Client* cl. - * - argc, argv should include a mount point, and - * any weird fuse options you want. by default, - * we will put fuse in the foreground so that it - * won't fork and we can see stdout. - */ -// int ceph_fuse_main(Client *cl, int argc, char *argv[]); diff --git a/trunk/ceph/client/hadoop/CephFSInterface.cc b/trunk/ceph/client/hadoop/CephFSInterface.cc new file mode 100644 index 0000000000000..184587aaea5d8 --- /dev/null +++ b/trunk/ceph/client/hadoop/CephFSInterface.cc @@ -0,0 +1,612 @@ +#include "CephFSInterface.h" + +using namespace std; + +/* + * Class: org_apache_hadoop_fs_ceph_CephFileSystem + * Method: ceph_initializeClient + * Signature: ()J + * Initializes a ceph client. + */ +JNIEXPORT jlong JNICALL Java_org_apache_hadoop_fs_ceph_CephFileSystem_ceph_1initializeClient + (JNIEnv *, jobject) +{ + + + //vector args; + //argv_to_vec(argc, argv, args); + //parse_config_options(args); + + // args for fuse + // vec_to_argv(args, argc, argv); + + // FUSE will chdir("/"); be ready. + g_conf.use_abspaths = true; + + // load monmap + MonMap monmap; + int r = monmap.read(".ceph_monmap"); + if (r < 0) { + cout << "could not find .ceph_monmap"; + return 0; + } + //assert(r >= 0); + + // start up network + // ERROR - undefined reference to "rank" + // rank.start_rank(); + + // start client + Client *client; + // ERROR - undefined reference to "rank" + // client = new Client(rank.register_entity(MSG_ADDR_CLIENT_NEW), &monmap); + client->init(); + + // start up fuse + // use my argc, argv (make sure you pass a mount point!) + cout << "mounting" << endl; + client->mount(); + + //cerr << "starting fuse on pid " << getpid() << endl; + //ceph_fuse_main(client, argc, argv); + //cerr << "fuse finished on pid " << getpid() << endl; + + //client->unmount(); + //cout << "unmounted" << endl; + //client->shutdown(); + + //delete client; + + // wait for messenger to finish + //rank.wait(); + + return (jlong)client; + +} + + +/* + * Class: org_apache_hadoop_fs_ceph_CephFileSystem + * Method: ceph_copyFromLocalFile + * Signature: (JLjava/lang/String;Ljava/lang/String;)Z + */ +JNIEXPORT jboolean JNICALL Java_org_apache_hadoop_fs_ceph_CephFileSystem_ceph_1copyFromLocalFile +(JNIEnv * env, jobject obj, jlong clientp, jstring j_local_path, jstring j_ceph_path) { + + Client* client; + client = (Client*) clientp; + const char* c_local_path = env->GetStringUTFChars(j_local_path, 0); + const char* c_ceph_path = env->GetStringUTFChars(j_ceph_path, 0); + + struct stat st; + int r = ::stat(c_local_path, &st); + assert (r == 0); + + // open the files + int fh_local = ::open(c_local_path, O_RDONLY); + int fh_ceph = client->open(c_ceph_path, O_WRONLY|O_CREAT|O_TRUNC); + assert (fh_local > -1); + assert (fh_ceph > -1); + + // get the source file size + bufferlist finalbl; + off_t remaining = st.st_size; + + + // copy the file a chunk at a time + const int chunk = 1048576; + + while (remaining > 0) { + bufferlist bl; + bufferptr bp(MIN(remaining, chunk)); + bl.push_back(bp); + off_t got = ::read(fh_local, bp.c_str(), MIN(remaining,chunk)); + assert(got > 0); + remaining -= got; + off_t wrote = client->write(fh_ceph, bp.c_str(), got, -1); + assert (got == wrote); + finalbl.claim_append(bl); + } + client->close(fh_ceph); + ::close(fh_local); + + env->ReleaseStringUTFChars(j_local_path, c_local_path); + env->ReleaseStringUTFChars(j_ceph_path, c_ceph_path); + + return JNI_TRUE; +} + +/* + * Class: org_apache_hadoop_fs_ceph_CephFileSystem + * Method: ceph_copyToLocalFile + * Signature: (JLjava/lang/String;Ljava/lang/String;)Z + */ +JNIEXPORT jboolean JNICALL Java_org_apache_hadoop_fs_ceph_CephFileSystem_ceph_1copyToLocalFile +(JNIEnv *env, jobject obj, jlong clientp, jstring j_ceph_path, jstring j_local_path) +{ + Client* client; + client = (Client*) clientp; + const char* c_ceph_path = env->GetStringUTFChars(j_ceph_path, 0); + const char* c_local_path = env->GetStringUTFChars(j_local_path, 0); + + // get source file size + struct stat st; + int r = client->lstat(c_ceph_path, &st); + assert (r == 0); + + int fh_ceph = client->open(c_ceph_path, O_WRONLY|O_CREAT|O_TRUNC); + int fh_local = ::open(c_local_path, O_RDONLY); + assert (fh_ceph > -1); + assert (fh_local > -1); + + // copy the file a chunk at a time + const int chunk = 1048576; + + bufferlist finalbl; + off_t remaining = st.st_size; + while (remaining > 0) { + bufferlist bl; + bufferptr bp(MIN(remaining, chunk)); + bl.push_back(bp); + off_t got = client->read(fh_ceph, bp.c_str(), MIN(remaining,chunk), -1); + assert(got > 0); + remaining -= got; + off_t wrote = ::write(fh_local, bp.c_str(), got); + assert (got == wrote); + finalbl.claim_append(bl); + } + client->close(fh_ceph); + ::close(fh_local); + + env->ReleaseStringUTFChars(j_local_path, c_local_path); + env->ReleaseStringUTFChars(j_ceph_path, c_ceph_path); + + return JNI_TRUE; +} + +/* + * Class: org_apache_hadoop_fs_ceph_CephFileSystem + * Method: ceph_getcwd + * Signature: (J)Ljava/lang/String; + * Returns the current working directory. + */ +JNIEXPORT jstring JNICALL Java_org_apache_hadoop_fs_ceph_CephFileSystem_ceph_1getcwd + (JNIEnv *env, jobject obj, jlong clientp) +{ + Client* client; + client = (Client*) clientp; + + return (env->NewStringUTF(client->getcwd().c_str())); +} + +/* + * Class: org_apache_hadoop_fs_ceph_CephFileSystem + * Method: ceph_setcwd + * Signature: (JLjava/lang/String;)Z + * + * Changes the working directory. + */ +JNIEXPORT jboolean JNICALL Java_org_apache_hadoop_fs_ceph_CephFileSystem_ceph_1setcwd +(JNIEnv *env, jobject obj, jlong clientp, jstring j_path) +{ + Client* client; + client = (Client*) clientp; + + const char* c_path = env->GetStringUTFChars(j_path, 0); + return (0 <= client->chdir(c_path)) ? JNI_TRUE : JNI_FALSE; +} + +/* + * Class: org_apache_hadoop_fs_ceph_CephFileSystem + * Method: ceph_rmdir + * Signature: (JLjava/lang/String;)Z + * Removes an empty directory. + */ +JNIEXPORT jboolean JNICALL Java_org_apache_hadoop_fs_ceph_CephFileSystem_ceph_1rmdir + (JNIEnv *env, jobject, jlong clientp, jstring j_path) +{ + Client* client; + client = (Client*) clientp; + + const char* c_path = env->GetStringUTFChars(j_path, 0); + return (0 == client->rmdir(c_path)) ? JNI_TRUE : JNI_FALSE; +} + + +/* + * Class: org_apache_hadoop_fs_ceph_CephFileSystem + * Method: ceph_mkdir + * Signature: (JLjava/lang/String;)Z + * Creates a directory with full permissions. + */ +JNIEXPORT jboolean JNICALL Java_org_apache_hadoop_fs_ceph_CephFileSystem_ceph_1mkdir + (JNIEnv * env, jobject, jlong clientp, jstring j_path) +{ + Client* client; + client = (Client*) clientp; + + const char* c_path = env->GetStringUTFChars(j_path, 0); + return (0 == client->mkdir(c_path, 0xFF)) ? JNI_TRUE : JNI_FALSE; +} + +/* + * Class: org_apache_hadoop_fs_ceph_CephFileSystem + * Method: ceph_unlink + * Signature: (JLjava/lang/String;)Z + * Unlinks a path. + */ +JNIEXPORT jboolean JNICALL Java_org_apache_hadoop_fs_ceph_CephFileSystem_ceph_1unlink + (JNIEnv * env, jobject, jlong clientp, jstring j_path) +{ + Client* client; + client = (Client*) clientp; + + const char* c_path = env->GetStringUTFChars(j_path, 0); + return (0 == client->unlink(c_path)) ? JNI_TRUE : JNI_FALSE; +} + +/* + * Class: org_apache_hadoop_fs_ceph_CephFileSystem + * Method: ceph_rename + * Signature: (JLjava/lang/String;Ljava/lang/String;)Z + * Renames a file. + */ +JNIEXPORT jboolean JNICALL Java_org_apache_hadoop_fs_ceph_CephFileSystem_ceph_1rename + (JNIEnv *env, jobject, jlong clientp, jstring j_from, jstring j_to) +{ + Client* client; + client = (Client*) clientp; + + const char* c_from = env->GetStringUTFChars(j_from, 0); + const char* c_to = env->GetStringUTFChars(j_to, 0); + + return (0 <= client->rename(c_from, c_to)) ? JNI_TRUE : JNI_FALSE; +} + +/* + * Class: org_apache_hadoop_fs_ceph_CephFileSystem + * Method: ceph_exists + * Signature: (JLjava/lang/String;)Z + * Returns true if the path exists. + */ +JNIEXPORT jboolean JNICALL Java_org_apache_hadoop_fs_ceph_CephFileSystem_ceph_1exists +(JNIEnv *env, jobject, jlong clientp, jstring j_path) +{ + Client* client; + struct stat *stbuf; + client = (Client*) clientp; + + const char* c_path = env->GetStringUTFChars(j_path, 0); + return (0 <= client->lstat(c_path, stbuf)) ? JNI_TRUE : JNI_FALSE; +} + +/* + * Class: org_apache_hadoop_fs_ceph_CephFileSystem + * Method: ceph_getblocksize + * Signature: (JLjava/lang/String;)J + * Returns the block size. Size is -1 if the file + * does not exist. + */ +JNIEXPORT jlong JNICALL Java_org_apache_hadoop_fs_ceph_CephFileSystem_ceph_1getblocksize + (JNIEnv *env, jobject obj, jlong clientp, jstring j_path) + +{ + Client* client; + struct stat *stbuf; + client = (Client*) clientp; + + jint result; + + const char* c_path = env->GetStringUTFChars(j_path, 0); + if (0 > client->lstat(c_path, stbuf)) + result = -1; + else + result = stbuf->st_blksize; + + env->ReleaseStringUTFChars(j_path, c_path); + return result; +} + + +/* + * Class: org_apache_hadoop_fs_ceph_CephFileSystem + * Method: ceph_getfilesize + * Signature: (JLjava/lang/String;)J + * Returns the file size, or -1 on failure. + */ +JNIEXPORT jlong JNICALL Java_org_apache_hadoop_fs_ceph_CephFileSystem_ceph_1getfilesize + (JNIEnv *env, jobject, jlong clientp, jstring j_path) +{ + Client* client; + struct stat *stbuf; + client = (Client*) clientp; + + jlong result; + + const char* c_path = env->GetStringUTFChars(j_path, 0); + if (0 > client->lstat(c_path, stbuf)) result = -1; + else result = stbuf->st_size; + env->ReleaseStringUTFChars(j_path, c_path); + + return result; +} + + +/* + * Class: org_apache_hadoop_fs_ceph_CephFileSystem + * Method: ceph_isdirectory + * Signature: (JLjava/lang/String;)Z + * Returns true if the path is a directory. + */ +JNIEXPORT jboolean JNICALL Java_org_apache_hadoop_fs_ceph_CephFileSystem_ceph_1isdirectory + (JNIEnv *env, jobject, jlong clientp, jstring j_path) +{ + Client* client; + struct stat *stbuf; + client = (Client*) clientp; + + const char* c_path = env->GetStringUTFChars(j_path, 0); + int result = client->lstat(c_path, stbuf); + env->ReleaseStringUTFChars(j_path, c_path); + + // if the stat call failed, it's definitely not a directory... + if (0 > result) return JNI_FALSE; + + // check the stat result + return (0 == S_ISDIR(stbuf->st_mode)) ? JNI_FALSE : JNI_TRUE; + + +} + +/* + * Class: org_apache_hadoop_fs_ceph_CephFileSystem + * Method: ceph_getdir + * Signature: (JLjava/lang/String;)[Ljava/lang/String; + */ +JNIEXPORT jobjectArray JNICALL Java_org_apache_hadoop_fs_ceph_CephFileSystem_ceph_1getdir +(JNIEnv *env, jobject obj, jlong clientp, jstring j_path) { + + Client* client; + client = (Client*) clientp; + + // get the directory listing + map contents; + const char* c_path = env->GetStringUTFChars(j_path, 0); + int result = client->getdir(c_path, contents); + env->ReleaseStringUTFChars(j_path, c_path); + + if (result < 0) return NULL; + + jint dir_size = contents.size(); + if (dir_size < 1) return NULL; + + // Create a Java String array of the size of the directory listing + //jstring blankString = env->NewStringUTF(""); + jclass stringClass = env->FindClass("java/lang/string"); + jobjectArray dirListingStringArray = (jobjectArray) env->NewObjectArray(dir_size, stringClass, NULL); + + + // populate the array with the elements of the directory list + int i = 0; + for (map::iterator it = contents.begin(); + it != contents.end(); + it++) { + env->SetObjectArrayElement(dirListingStringArray, i, + env->NewStringUTF(it->first.c_str())); + } + + return dirListingStringArray; +} + + +/* + * Class: org_apache_hadoop_fs_ceph_CephFileSystem + * Method: ceph_open_for_read + * Signature: (JLjava/lang/String;)I + * Open a file for reading. + */ +JNIEXPORT jint JNICALL Java_org_apache_hadoop_fs_ceph_CephFileSystem_ceph_1open_1for_1read + (JNIEnv *env, jobject obj, jlong clientp, jstring j_path) + +{ + Client* client; + client = (Client*) clientp; + + jint result; + + // open as read-only: flag = O_RDONLY + + const char* c_path = env->GetStringUTFChars(j_path, 0); + result = client->open(c_path, O_RDONLY); + env->ReleaseStringUTFChars(j_path, c_path); + + // returns file handle, or -1 on failure + return result; +} + + +/* + * Class: org_apache_hadoop_fs_ceph_CephFileSystem + * Method: ceph_open_for_overwrite + * Signature: (JLjava/lang/String;)I + * Opens a file for overwriting; creates it if necessary. + */ +JNIEXPORT jint JNICALL Java_org_apache_hadoop_fs_ceph_CephFileSystem_ceph_1open_1for_1overwrite + (JNIEnv *env, jobject obj, jlong clientp, jstring j_path) +{ + Client* client; + client = (Client*) clientp; + + jint result; + + + const char* c_path = env->GetStringUTFChars(j_path, 0); + result = client->open(c_path, O_WRONLY|O_CREAT|O_TRUNC); + env->ReleaseStringUTFChars(j_path, c_path); + + // returns file handle, or -1 on failure + return result; +} + + +/* + * Class: org_apache_hadoop_fs_ceph_CephInputStream + * Method: ceph_read + * Signature: (JI[BII)I + * Reads into the given byte array from the current position. + */ +JNIEXPORT jint JNICALL Java_org_apache_hadoop_fs_ceph_CephInputStream_ceph_1read + (JNIEnv *env, jobject obj, jlong clientp, jint fh, jbyteArray j_buffer, jint buffer_offset, jint length) +{ + // IMPORTANT NOTE: Hadoop read arguments are a bit different from POSIX so we + // have to convert. The read is *always* from the current position in the file, + // and buffer_offset is the location in the *buffer* where we start writing. + + + Client* client; + client = (Client*) clientp; + jint result; + + // Step 1: get a pointer to the buffer. + jbyte* j_buffer_ptr = env->GetByteArrayElements(j_buffer, NULL); + char* c_buffer = (char*) j_buffer_ptr; + + // Step 2: pointer arithmetic to start in the right buffer position + c_buffer += (int)buffer_offset; + + // Step 3: do the read + result = client->read((int)fh, c_buffer, length, -1); + + // Step 4: release the pointer to the buffer + env->ReleaseByteArrayElements(j_buffer, j_buffer_ptr, 0); + + + return result; + +} + + +/* + * Class: org_apache_hadoop_fs_ceph_CephInputStream + * Method: ceph_seek_from_start + * Signature: (JIJ)J + * Seeks to the given position. + */ +JNIEXPORT jlong JNICALL Java_org_apache_hadoop_fs_ceph_CephInputStream_ceph_1seek_1from_1start + (JNIEnv *env, jobject obj, jlong clientp, jint fh, jlong pos) +{ + + Client* client; + client = (Client*) clientp; + jint result; + + result = client->lseek(fh, pos, SEEK_SET); + + return result; +} + + +JNIEXPORT jlong JNICALL Java_org_apache_hadoop_fs_ceph_CephInputStream_ceph_1getpos + (JNIEnv *env, jobject obj, jlong clientp, jint fh) +{ + Client* client; + client = (Client*) clientp; + jint result; + + // seek a distance of 0 to get current offset + result = client->lseek(fh, 0, SEEK_CUR); + + return result; +} + + +/* + * Class: org_apache_hadoop_fs_ceph_CephInputStream + * Method: ceph_close + * Signature: (JI)I + * Closes the file. + */ +JNIEXPORT jint JNICALL Java_org_apache_hadoop_fs_ceph_CephInputStream_ceph_1close + (JNIEnv *env, jobject obj, jlong clientp, jint fh) +{ + Client* client; + client = (Client*) clientp; + jint result; + + result = client->close(fh); + + return result; +} + +/* + * Class: org_apache_hadoop_fs_ceph_CephOutputStream + * Method: ceph_seek_from_start + * Signature: (JIJ)J + */ +JNIEXPORT jlong JNICALL Java_org_apache_hadoop_fs_ceph_CephOutputStream_ceph_1seek_1from_1start + (JNIEnv *env, jobject obj, jlong clientp, jint fh, jlong pos) +{ + + Client* client; + client = (Client*) clientp; + jint result; + + result = client->lseek(fh, pos, SEEK_SET); + + return result; +} + + +/* + * Class: org_apache_hadoop_fs_ceph_CephOutputStream + * Method: ceph_getpos + * Signature: (JI)J + */ +JNIEXPORT jlong JNICALL Java_org_apache_hadoop_fs_ceph_CephOutputStream_ceph_1getpos + (JNIEnv *env, jobject obj, jlong clientp, jint fh) +{ + Client* client; + client = (Client*) clientp; + jint result; + + // seek a distance of 0 to get current offset + result = client->lseek(fh, 0, SEEK_CUR); + + return result; +} + + +/* + * Class: org_apache_hadoop_fs_ceph_CephOutputStream + * Method: ceph_write + * Signature: (JI[BII)I + */ +JNIEXPORT jint JNICALL Java_org_apache_hadoop_fs_ceph_CephOutputStream_ceph_1write + (JNIEnv *env, jobject obj, jlong clientp, jint fh, jbyteArray j_buffer, jint buffer_offset, jint length) +{ + // IMPORTANT NOTE: Hadoop write arguments are a bit different from POSIX so we + // have to convert. The write is *always* from the current position in the file, + // and buffer_offset is the location in the *buffer* where we start writing. + + + Client* client; + client = (Client*) clientp; + jint result; + + // Step 1: get a pointer to the buffer. + jbyte* j_buffer_ptr = env->GetByteArrayElements(j_buffer, NULL); + char* c_buffer = (char*) j_buffer_ptr; + + // Step 2: pointer arithmetic to start in the right buffer position + c_buffer += (int)buffer_offset; + + // Step 3: do the write + result = client->write((int)fh, c_buffer, length, -1); + + // Step 4: release the pointer to the buffer + env->ReleaseByteArrayElements(j_buffer, j_buffer_ptr, 0); + + return result; + +} + diff --git a/trunk/ceph/client/hadoop/CephFSInterface.h b/trunk/ceph/client/hadoop/CephFSInterface.h new file mode 100644 index 0000000000000..8cba3248c33dc --- /dev/null +++ b/trunk/ceph/client/hadoop/CephFSInterface.h @@ -0,0 +1,229 @@ +/* DO NOT EDIT THIS FILE - it is machine generated */ +#include +/* Header for class org_apache_hadoop_fs_ceph_CephFileSystem */ + +#include +#include "client/Client.h" +#include "config.h" +#include "client/fuse.h" +#include "msg/SimpleMessenger.h" +#include "common/Timer.h" + +#ifndef _Included_org_apache_hadoop_fs_ceph_CephFileSystem +#define _Included_org_apache_hadoop_fs_ceph_CephFileSystem +#ifdef __cplusplus +extern "C" { +#endif + +#undef org_apache_hadoop_fs_ceph_CephFileSystem_DEFAULT_BLOCK_SIZE +#define org_apache_hadoop_fs_ceph_CephFileSystem_DEFAULT_BLOCK_SIZE 1048576LL +/* + * Class: org_apache_hadoop_fs_ceph_CephFileSystem + * Method: ceph_initializeClient + * Signature: ()J + * Initializes a ceph client. + */ +JNIEXPORT jlong JNICALL Java_org_apache_hadoop_fs_ceph_CephFileSystem_ceph_1initializeClient +(JNIEnv *, jobject); + +/* + * Class: org_apache_hadoop_fs_ceph_CephFileSystem + * Method: ceph_copyFromLocalFile + * Signature: (JLjava/lang/String;Ljava/lang/String;)Z + */ +JNIEXPORT jboolean JNICALL Java_org_apache_hadoop_fs_ceph_CephFileSystem_ceph_1copyFromLocalFile + (JNIEnv *, jobject, jlong, jstring, jstring); + +/* + * Class: org_apache_hadoop_fs_ceph_CephFileSystem + * Method: ceph_copyToLocalFile + * Signature: (JLjava/lang/String;Ljava/lang/String;)Z + */ +JNIEXPORT jboolean JNICALL Java_org_apache_hadoop_fs_ceph_CephFileSystem_ceph_1copyToLocalFile + (JNIEnv *, jobject, jlong, jstring, jstring); + +/* + * Class: org_apache_hadoop_fs_ceph_CephFileSystem + * Method: ceph_getcwd + * Signature: (J)Ljava/lang/String; + */ +JNIEXPORT jstring JNICALL Java_org_apache_hadoop_fs_ceph_CephFileSystem_ceph_1getcwd + (JNIEnv *, jobject, jlong); + +/* + * Class: org_apache_hadoop_fs_ceph_CephFileSystem + * Method: ceph_setcwd + * Signature: (JLjava/lang/String;)Z + */ +JNIEXPORT jboolean JNICALL Java_org_apache_hadoop_fs_ceph_CephFileSystem_ceph_1setcwd + (JNIEnv *, jobject, jlong, jstring); + +/* + * Class: org_apache_hadoop_fs_ceph_CephFileSystem + * Method: ceph_rmdir + * Signature: (JLjava/lang/String;)Z + */ +JNIEXPORT jboolean JNICALL Java_org_apache_hadoop_fs_ceph_CephFileSystem_ceph_1rmdir + (JNIEnv *, jobject, jlong, jstring); + +/* + * Class: org_apache_hadoop_fs_ceph_CephFileSystem + * Method: ceph_mkdir + * Signature: (JLjava/lang/String;)Z + */ +JNIEXPORT jboolean JNICALL Java_org_apache_hadoop_fs_ceph_CephFileSystem_ceph_1mkdir + (JNIEnv *, jobject, jlong, jstring); + +/* + * Class: org_apache_hadoop_fs_ceph_CephFileSystem + * Method: ceph_unlink + * Signature: (JLjava/lang/String;)Z + */ +JNIEXPORT jboolean JNICALL Java_org_apache_hadoop_fs_ceph_CephFileSystem_ceph_1unlink + (JNIEnv *, jobject, jlong, jstring); + +/* + * Class: org_apache_hadoop_fs_ceph_CephFileSystem + * Method: ceph_rename + * Signature: (JLjava/lang/String;Ljava/lang/String;)Z + */ +JNIEXPORT jboolean JNICALL Java_org_apache_hadoop_fs_ceph_CephFileSystem_ceph_1rename + (JNIEnv *, jobject, jlong, jstring, jstring); + +/* + * Class: org_apache_hadoop_fs_ceph_CephFileSystem + * Method: ceph_exists + * Signature: (JLjava/lang/String;)Z + */ +JNIEXPORT jboolean JNICALL Java_org_apache_hadoop_fs_ceph_CephFileSystem_ceph_1exists + (JNIEnv *, jobject, jlong, jstring); + + +/* + * Class: org_apache_hadoop_fs_ceph_CephFileSystem + * Method: ceph_getblocksize + * Signature: (JLjava/lang/String;)J + */ +JNIEXPORT jlong JNICALL Java_org_apache_hadoop_fs_ceph_CephFileSystem_ceph_1getblocksize + (JNIEnv *, jobject, jlong, jstring); + +/* + * Class: org_apache_hadoop_fs_ceph_CephFileSystem + * Method: ceph_getfilesize + * Signature: (JLjava/lang/String;)J + */ +JNIEXPORT jlong JNICALL Java_org_apache_hadoop_fs_ceph_CephFileSystem_ceph_1getfilesize + (JNIEnv *, jobject, jlong, jstring); + +/* + * Class: org_apache_hadoop_fs_ceph_CephFileSystem + * Method: ceph_isdirectory + * Signature: (JLjava/lang/String;)Z + */ +JNIEXPORT jboolean JNICALL Java_org_apache_hadoop_fs_ceph_CephFileSystem_ceph_1isdirectory + (JNIEnv *, jobject, jlong, jstring); + +/* + * Class: org_apache_hadoop_fs_ceph_CephFileSystem + * Method: ceph_isfile + * Signature: (JLjava/lang/String;)Z + */ +JNIEXPORT jboolean JNICALL Java_org_apache_hadoop_fs_ceph_CephFileSystem_ceph_1isfile + (JNIEnv *, jobject, jlong, jstring); + +/* + * Class: org_apache_hadoop_fs_ceph_CephFileSystem + * Method: ceph_getdir + * Signature: (JLjava/lang/String;)[Ljava/lang/String; + */ +JNIEXPORT jobjectArray JNICALL Java_org_apache_hadoop_fs_ceph_CephFileSystem_ceph_1getdir + (JNIEnv *, jobject, jlong, jstring); + +/* + * Class: org_apache_hadoop_fs_ceph_CephFileSystem + * Method: ceph_open_for_read + * Signature: (JLjava/lang/String;)I + */ +JNIEXPORT jint JNICALL Java_org_apache_hadoop_fs_ceph_CephFileSystem_ceph_1open_1for_1read + (JNIEnv *, jobject, jlong, jstring); + +/* + * Class: org_apache_hadoop_fs_ceph_CephFileSystem + * Method: ceph_open_for_overwrite + * Signature: (JLjava/lang/String;)I + */ +JNIEXPORT jint JNICALL Java_org_apache_hadoop_fs_ceph_CephFileSystem_ceph_1open_1for_1overwrite + (JNIEnv *, jobject, jlong, jstring); + +#undef org_apache_hadoop_fs_ceph_CephInputStream_SKIP_BUFFER_SIZE +#define org_apache_hadoop_fs_ceph_CephInputStream_SKIP_BUFFER_SIZE 2048L +/* + * Class: org_apache_hadoop_fs_ceph_CephInputStream + * Method: ceph_read + * Signature: (JI[BII)I + */ +JNIEXPORT jint JNICALL Java_org_apache_hadoop_fs_ceph_CephInputStream_ceph_1read + (JNIEnv *, jobject, jlong, jint, jbyteArray, jint, jint); + +/* + * Class: org_apache_hadoop_fs_ceph_CephInputStream + * Method: ceph_seek_from_start + * Signature: (JIJ)J + */ +JNIEXPORT jlong JNICALL Java_org_apache_hadoop_fs_ceph_CephInputStream_ceph_1seek_1from_1start + (JNIEnv *, jobject, jlong, jint, jlong); + +/* + * Class: org_apache_hadoop_fs_ceph_CephInputStream + * Method: ceph_getpos + * Signature: (JI)J + */ +JNIEXPORT jlong JNICALL Java_org_apache_hadoop_fs_ceph_CephInputStream_ceph_1getpos + (JNIEnv *, jobject, jlong, jint); + +/* + * Class: org_apache_hadoop_fs_ceph_CephInputStream + * Method: ceph_close + * Signature: (JI)I + */ +JNIEXPORT jint JNICALL Java_org_apache_hadoop_fs_ceph_CephInputStream_ceph_1close + (JNIEnv *, jobject, jlong, jint); + +/* Header for class org_apache_hadoop_fs_ceph_CephOutputStream */ + +/* + * Class: org_apache_hadoop_fs_ceph_CephOutputStream + * Method: ceph_seek_from_start + * Signature: (JIJ)J + */ +JNIEXPORT jlong JNICALL Java_org_apache_hadoop_fs_ceph_CephOutputStream_ceph_1seek_1from_1start + (JNIEnv *, jobject, jlong, jint, jlong); + +/* + * Class: org_apache_hadoop_fs_ceph_CephOutputStream + * Method: ceph_getpos + * Signature: (JI)J + */ +JNIEXPORT jlong JNICALL Java_org_apache_hadoop_fs_ceph_CephOutputStream_ceph_1getpos + (JNIEnv *, jobject, jlong, jint); + +/* + * Class: org_apache_hadoop_fs_ceph_CephOutputStream + * Method: ceph_close + * Signature: (JI)I + */ +JNIEXPORT jint JNICALL Java_org_apache_hadoop_fs_ceph_CephOutputStream_ceph_1close + (JNIEnv *, jobject, jlong, jint); + +/* + * Class: org_apache_hadoop_fs_ceph_CephOutputStream + * Method: ceph_write + * Signature: (JI[BII)I + */ +JNIEXPORT jint JNICALL Java_org_apache_hadoop_fs_ceph_CephOutputStream_ceph_1write + (JNIEnv *, jobject, jlong, jint, jbyteArray, jint, jint); + +#ifdef __cplusplus +} +#endif +#endif -- 2.39.5