From f958d576853e67fc4a120177ee5f1d194f1a7035 Mon Sep 17 00:00:00 2001 From: Greg Farnum Date: Thu, 15 Oct 2009 11:35:38 -0700 Subject: [PATCH] Hadoop: Now wraps all ceph calls in a CephFS ABC with a CephTalker implementing --- src/client/hadoop/ceph/CephFS.java | 189 ++++++++++++++++ src/client/hadoop/ceph/CephFileSystem.java | 247 ++++----------------- src/client/hadoop/ceph/CephTalker.java | 53 +++++ 3 files changed, 291 insertions(+), 198 deletions(-) create mode 100644 src/client/hadoop/ceph/CephFS.java create mode 100644 src/client/hadoop/ceph/CephTalker.java diff --git a/src/client/hadoop/ceph/CephFS.java b/src/client/hadoop/ceph/CephFS.java new file mode 100644 index 0000000000000..a95fdc87cdb6c --- /dev/null +++ b/src/client/hadoop/ceph/CephFS.java @@ -0,0 +1,189 @@ +// -*- mode:Java; tab-width:2; c-basic-offset:2; indent-tabs-mode:t -*- +/** + * + * Licensed under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + * implied. See the License for the specific language governing + * permissions and limitations under the License. + * + * + * Abstract base class for communicating with a Ceph filesystem and its + * C++ codebase from Java, or pretending to do so (for unit testing purposes). + * As only the Ceph package should be using this directly, all methods + * are protected. + */ +package org.apache.hadoop.fs.ceph; + +abstract class CephFS { + + /* + * Performs any necessary setup to allow general use of the filesystem. + * Inputs: + * String argsuments -- a command-line style input of Ceph config params + * int block_size -- the size in bytes to use for blocks + * Returns: true on success, false otherwise + */ + abstract protected boolean ceph_initializeClient(String arguments, int block_size); + + /* + * Returns the current working directory.(absolute) as a String + */ + abstract protected String ceph_getcwd(); + /* + * Changes the working directory. + * Inputs: + * String path: The path (relative or absolute) to switch to + * Returns: true on success, false otherwise. + */ + abstract protected boolean ceph_setcwd(String path); + /* + * Given a path to a directory, removes the directory.if empty. + * Inputs: + * jstring j_path: The path (relative or absolute) to the directory + * Returns: true on successful delete; false otherwise + */ + abstract protected boolean ceph_rmdir(String path); + /* + * Given a path, unlinks it. + * Inputs: + * String path: The path (relative or absolute) to the file or empty dir + * Returns: true if the unlink occurred, false otherwise. + */ + abstract protected boolean ceph_unlink(String path); + /* + * Changes a given path name to a new name. + * Inputs: + * jstring j_from: The path whose name you want to change. + * jstring j_to: The new name for the path. + * Returns: true if the rename occurred, false otherwise + */ + abstract protected boolean ceph_rename(String old_path, String new_path); + /* + * Returns true if it the input path exists, false + * if it does not or there is an unexpected failure. + */ + abstract protected boolean ceph_exists(String path); + /* + * Get the block size for a given path. + * Input: + * String path: The path (relative or absolute) you want + * the block size for. + * Returns: block size if the path exists, otherwise a negative number + * corresponding to the standard C++ error codes (which are positive). + */ + abstract protected long ceph_getblocksize(String path); + /* + * Returns true if the given path is a directory, false otherwise. + */ + abstract protected boolean ceph_isdirectory(String path); + /* + * Returns true if the given path is a file; false otherwise. + */ + abstract protected boolean ceph_isfile(String path); + /* + * Get the contents of a given directory. + * Inputs: + * String path: The path (relative or absolute) to the directory. + * Returns: A Java String[] of the contents of the directory, or + * NULL if there is an error (ie, path is not a dir). This listing + * will not contain . or .. entries. + */ + abstract protected String[] ceph_getdir(String path); + /* + * Create the specified directory and any required intermediate ones with the + * given mode. + */ + abstract protected int ceph_mkdirs(String path, int mode); + /* + * Open a file to append. If the file does not exist, it will be created. + * Opening a dir is possible but may have bad results. + * Inputs: + * String path: The path to open. + * Returns: an int filehandle, or a number<0 if an error occurs. + */ + abstract protected int ceph_open_for_append(String path); + /* + * Open a file for reading. + * Opening a dir is possible but may have bad results. + * Inputs: + * String path: The path to open. + * Returns: an int filehandle, or a number<0 if an error occurs. + */ + abstract protected int ceph_open_for_read(String path); + /* + * Opens a file for overwriting; creates it if necessary. + * Opening a dir is possible but may have bad results. + * Inputs: + * String path: The path to open. + * int mode: The mode to open with. + * Returns: an int filehandle, or a number<0 if an error occurs. + */ + abstract protected int ceph_open_for_overwrite(String path, int mode); + /* + * Closes a given filehandle. + */ + abstract protected int ceph_close(int filehandle); + /* + * Change the mode on a path. + * Inputs: + * String path: The path to change mode on. + * int mode: The mode to apply. + * Returns: true if the mode is properly applied, false if there + * is any error. + */ + abstract protected boolean ceph_setPermission(String path, int mode); + /* + * Closes the Ceph client. This should be called before shutting down + * (multiple times is okay but redundant). + */ + abstract protected boolean ceph_kill_client(); + /* + * Get the statistics on a path returned in a custom format defined below. + * Inputs: + * String path: The path to stat. + * Stat fill: The stat object to fill. + * Returns: true if the stat is successful, false otherwise. + */ + abstract protected boolean ceph_stat(String path, CephFileSystem.Stat fill); + /* + * Statfs a filesystem in a custom format defined in CephFileSystem. + * Inputs: + * String path: A path on the filesystem that you wish to stat. + * CephStat fill: The CephStat object to fill. + * Returns: true if successful and the CephStat is filled; false otherwise. + */ + abstract protected int ceph_statfs(String Path, CephFileSystem.CephStat fill); + /* + * Check how many times a path should be replicated (if it is + * degraded it may not actually be replicated this often). + * Inputs: + * String path: The path to check. + * Returns: an int containing the number of times replicated. + */ + abstract protected int ceph_replication(String path); + /* + * Find the IP:port addresses of the primary OSD for a given file and offset. + * Inputs: + * int fh: The filehandle for the file. + * long offset: The offset to get the location of. + * Returns: a String of the location as IP, or NULL if there is an error. + */ + abstract protected String ceph_hosts(int fh, long offset); + /* + * Set the mtime and atime for a given path. + * Inputs: + * String path: The path to set the times for. + * long mtime: The mtime to set, in millis since epoch (-1 to not set). + * long atime: The atime to set, in millis since epoch (-1 to not set) + * Returns: 0 if successful, an error code otherwise. + */ + abstract protected int ceph_setTimes(String path, long mtime, long atime); + +} diff --git a/src/client/hadoop/ceph/CephFileSystem.java b/src/client/hadoop/ceph/CephFileSystem.java index d522167570163..1cebf706daa2c 100644 --- a/src/client/hadoop/ceph/CephFileSystem.java +++ b/src/client/hadoop/ceph/CephFileSystem.java @@ -78,172 +78,10 @@ public class CephFileSystem extends FileSystem { private final Path root; private boolean initialized = false; + private CephFS ceph = null; private boolean debug = false; private String fs_default_name; - - /* - * Performs any necessary setup to allow general use of the filesystem. - * Inputs: - * String argsuments -- a command-line style input of Ceph config params - * int block_size -- the size in bytes to use for blocks - * Returns: true on success, false otherwise - */ - private native boolean ceph_initializeClient(String arguments, int block_size); - - /* - * Returns the current working directory.(absolute) as a String - */ - private native String ceph_getcwd(); - /* - * Changes the working directory. - * Inputs: - * String path: The path (relative or absolute) to switch to - * Returns: true on success, false otherwise. - */ - private native boolean ceph_setcwd(String path); - /* - * Given a path to a directory, removes the directory.if empty. - * Inputs: - * jstring j_path: The path (relative or absolute) to the directory - * Returns: true on successful delete; false otherwise - */ - private native boolean ceph_rmdir(String path); - /* - * Given a path, unlinks it. - * Inputs: - * String path: The path (relative or absolute) to the file or empty dir - * Returns: true if the unlink occurred, false otherwise. - */ - private native boolean ceph_unlink(String path); - /* - * Changes a given path name to a new name. - * Inputs: - * jstring j_from: The path whose name you want to change. - * jstring j_to: The new name for the path. - * Returns: true if the rename occurred, false otherwise - */ - private native boolean ceph_rename(String old_path, String new_path); - /* - * Returns true if it the input path exists, false - * if it does not or there is an unexpected failure. - */ - private native boolean ceph_exists(String path); - /* - * Get the block size for a given path. - * Input: - * String path: The path (relative or absolute) you want - * the block size for. - * Returns: block size if the path exists, otherwise a negative number - * corresponding to the standard C++ error codes (which are positive). - */ - private native long ceph_getblocksize(String path); - /* - * Returns true if the given path is a directory, false otherwise. - */ - private native boolean ceph_isdirectory(String path); - /* - * Returns true if the given path is a file; false otherwise. - */ - private native boolean ceph_isfile(String path); - /* - * Get the contents of a given directory. - * Inputs: - * String path: The path (relative or absolute) to the directory. - * Returns: A Java String[] of the contents of the directory, or - * NULL if there is an error (ie, path is not a dir). This listing - * will not contain . or .. entries. - */ - private native String[] ceph_getdir(String path); - /* - * Create the specified directory and any required intermediate ones with the - * given mode. - */ - private native int ceph_mkdirs(String path, int mode); - /* - * Open a file to append. If the file does not exist, it will be created. - * Opening a dir is possible but may have bad results. - * Inputs: - * String path: The path to open. - * Returns: an int filehandle, or a number<0 if an error occurs. - */ - private native int ceph_open_for_append(String path); - /* - * Open a file for reading. - * Opening a dir is possible but may have bad results. - * Inputs: - * String path: The path to open. - * Returns: an int filehandle, or a number<0 if an error occurs. - */ - private native int ceph_open_for_read(String path); - /* - * Opens a file for overwriting; creates it if necessary. - * Opening a dir is possible but may have bad results. - * Inputs: - * String path: The path to open. - * int mode: The mode to open with. - * Returns: an int filehandle, or a number<0 if an error occurs. - */ - private native int ceph_open_for_overwrite(String path, int mode); - /* - * Closes a given filehandle. - */ - private native int ceph_close(int filehandle); - /* - * Change the mode on a path. - * Inputs: - * String path: The path to change mode on. - * int mode: The mode to apply. - * Returns: true if the mode is properly applied, false if there - * is any error. - */ - private native boolean ceph_setPermission(String path, int mode); - /* - * Closes the Ceph client. This should be called before shutting down - * (multiple times is okay but redundant). - */ - private native boolean ceph_kill_client(); - /* - * Get the statistics on a path returned in a custom format defined below. - * Inputs: - * String path: The path to stat. - * Stat fill: The stat object to fill. - * Returns: true if the stat is successful, false otherwise. - */ - private native boolean ceph_stat(String path, Stat fill); - /* - * Statfs a filesystem in a custom format defined in CephFileSystem. - * Inputs: - * String path: A path on the filesystem that you wish to stat. - * CephStat fill: The CephStat object to fill. - * Returns: true if successful and the CephStat is filled; false otherwise. - */ - private native int ceph_statfs(String Path, CephStat fill); - /* - * Check how many times a path should be replicated (if it is - * degraded it may not actually be replicated this often). - * Inputs: - * String path: The path to check. - * Returns: an int containing the number of times replicated. - */ - private native int ceph_replication(String path); - /* - * Find the IP:port addresses of the primary OSD for a given file and offset. - * Inputs: - * int fh: The filehandle for the file. - * long offset: The offset to get the location of. - * Returns: a String of the location as IP, or NULL if there is an error. - */ - private native String ceph_hosts(int fh, long offset); - /* - * Set the mtime and atime for a given path. - * Inputs: - * String path: The path to set the times for. - * long mtime: The mtime to set, in millis since epoch (-1 to not set). - * long atime: The atime to set, in millis since epoch (-1 to not set) - * Returns: 0 if successful, an error code otherwise. - */ - private native int ceph_setTimes(String path, long mtime, long atime); /** * Create a new CephFileSystem. @@ -276,12 +114,14 @@ public class CephFileSystem extends FileSystem { public void initialize(URI uri, Configuration conf) throws IOException { debug("initialize:enter", DEBUG); if (!initialized) { - System.load(conf.get("fs.ceph.libDir")+"/libhadoopcephfs.so"); - System.load(conf.get("fs.ceph.libDir")+"/libceph.so"); super.initialize(uri, conf); setConf(conf); this.uri = URI.create(uri.getScheme() + "://" + uri.getAuthority()); - statistics = getStatistics(uri.getScheme(), getClass()); + statistics = getStatistics(uri.getScheme(), getClass()); + + if (ceph == null) { + ceph = new CephTalker(conf); + } fs_default_name = conf.get("fs.default.name"); debug = ("true".equals(conf.get("fs.ceph.debug", "false"))); @@ -311,18 +151,29 @@ public class CephFileSystem extends FileSystem { throw new IOException("You must specify a Ceph monitor address or config file!"); } // Initialize the client - if (!ceph_initializeClient(arguments, + if (!ceph.ceph_initializeClient(arguments, conf.getInt("fs.ceph.blockSize", 1<<26))) { debug("initialize:Ceph initialization failed!", FATAL); throw new IOException("Ceph initialization failed!"); } initialized = true; debug("initialize:Ceph initialized client. Setting cwd to /", INFO); - ceph_setcwd("/"); + ceph.ceph_setcwd("/"); } debug("initialize:exit", DEBUG); } + /** + * Used for testing purposes, this version of initialize + * sets the given CephFS instead of defaulting to a + * CephTalker (with its assumed real Ceph instance to talk to). + */ + protected void initialize(URI uri, Configuration conf, CephFS ceph_fs) + throws IOException{ + ceph = ceph_fs; + initialize(uri, conf); + } + /** * Close down the CephFileSystem. Runs the base-class close method * and then kills the Ceph client itself. @@ -335,7 +186,7 @@ public class CephFileSystem extends FileSystem { debug("close:enter", DEBUG); super.close();//this method does stuff, make sure it's run! debug("close: Calling ceph_kill_client from Java", TRACE); - ceph_kill_client(); + ceph.ceph_kill_client(); debug("close:exit", DEBUG); } @@ -356,7 +207,7 @@ public class CephFileSystem extends FileSystem { Path abs_path = makeAbsolute(file); if (progress!=null) progress.progress(); debug("append: Entering ceph_open_for_append from Java", TRACE); - int fd = ceph_open_for_append(abs_path.toString()); + int fd = ceph.ceph_open_for_append(abs_path.toString()); debug("append: Returned to Java", TRACE); if (progress!=null) progress.progress(); if( fd < 0 ) { //error in open @@ -375,9 +226,9 @@ public class CephFileSystem extends FileSystem { public Path getWorkingDirectory() { if (!initialized) return null; debug("getWorkingDirectory:enter", DEBUG); - String cwd = ceph_getcwd(); + String cwd = ceph.ceph_getcwd(); debug("getWorkingDirectory:exit with path " + cwd, DEBUG); - return new Path(fs_default_name + ceph_getcwd()); + return new Path(fs_default_name + ceph.ceph_getcwd()); } /** @@ -393,7 +244,7 @@ public class CephFileSystem extends FileSystem { debug("setWorkingDirecty:enter with new working dir " + dir, DEBUG); Path abs_path = makeAbsolute(dir); debug("setWorkingDirectory:calling ceph_setcwd from Java", TRACE); - if (!ceph_setcwd(abs_path.toString())) + if (!ceph.ceph_setcwd(abs_path.toString())) debug("setWorkingDirectory: WARNING! ceph_setcwd failed for some reason on path " + abs_path, ERROR); debug("setWorkingDirectory:exit", DEBUG); } @@ -418,7 +269,7 @@ public class CephFileSystem extends FileSystem { else { debug("exists:Calling ceph_exists from Java on path " + abs_path.toString(), TRACE); - result = ceph_exists(abs_path.toString()); + result = ceph.ceph_exists(abs_path.toString()); debug("exists:Returned from ceph_exists to Java", TRACE); } debug("exists:exit with value " + result, DEBUG); @@ -439,7 +290,7 @@ public class CephFileSystem extends FileSystem { debug("mkdirs:enter with path " + path, DEBUG); Path abs_path = makeAbsolute(path); debug("mkdirs:calling ceph_mkdirs from Java", TRACE); - int result = ceph_mkdirs(abs_path.toString(), (int)perms.toShort()); + int result = ceph.ceph_mkdirs(abs_path.toString(), (int)perms.toShort()); debug("mkdirs:exit with result " + result, DEBUG); if (result != 0) return false; @@ -465,7 +316,7 @@ public class CephFileSystem extends FileSystem { } else { debug("isFile:entering ceph_isfile from Java", TRACE); - result = ceph_isfile(abs_path.toString()); + result = ceph.ceph_isfile(abs_path.toString()); } debug("isFile:exit with result " + result, DEBUG); return result; @@ -490,7 +341,7 @@ public class CephFileSystem extends FileSystem { } else { debug("calling ceph_isdirectory from Java", TRACE); - result = ceph_isdirectory(abs_path.toString()); + result = ceph.ceph_isdirectory(abs_path.toString()); debug("Returned from ceph_isdirectory to Java", TRACE); } debug("isDirectory:exit with result " + result, DEBUG); @@ -515,9 +366,9 @@ public class CephFileSystem extends FileSystem { FileStatus status; Stat lstat = new Stat(); debug("getFileStatus: calling ceph_stat from Java", TRACE); - if(ceph_stat(abs_path.toString(), lstat)) { + if(ceph.ceph_stat(abs_path.toString(), lstat)) { status = new FileStatus(lstat.size, lstat.is_dir, - ceph_replication(abs_path.toString()), + ceph.ceph_replication(abs_path.toString()), lstat.block_size, lstat.mod_time, lstat.access_time, @@ -570,7 +421,7 @@ public class CephFileSystem extends FileSystem { + " and permissions " + permission, DEBUG); Path abs_path = makeAbsolute(p); debug("setPermission:calling ceph_setpermission from Java", TRACE); - ceph_setPermission(abs_path.toString(), permission.toShort()); + ceph.ceph_setPermission(abs_path.toString(), permission.toShort()); debug("setPermission:exit", DEBUG); } @@ -588,7 +439,7 @@ public class CephFileSystem extends FileSystem { " atime:" + atime, DEBUG); Path abs_path = makeAbsolute(p); debug("setTimes:calling ceph_setTimes from Java", TRACE); - int r = ceph_setTimes(abs_path.toString(), mtime, atime); + int r = ceph.ceph_setTimes(abs_path.toString(), mtime, atime); if (r<0) throw new IOException ("Failed to set times on path " + abs_path.toString() + " Error code: " + r); debug("setTimes:exit", DEBUG); @@ -650,7 +501,7 @@ public class CephFileSystem extends FileSystem { if (!exists) { Path parent = abs_path.getParent(); if (parent != null) { // if parent is root, we're done - int r = ceph_mkdirs(parent.toString(), permission.toShort()); + int r = ceph.ceph_mkdirs(parent.toString(), permission.toShort()); if (!(r==0 || r==-EEXIST)) throw new IOException ("Error creating parent directory; code: " + r); } @@ -658,7 +509,7 @@ public class CephFileSystem extends FileSystem { } // Step 3: open the file debug("calling ceph_open_for_overwrite from Java", TRACE); - int fh = ceph_open_for_overwrite(abs_path.toString(), (int)permission.toShort()); + int fh = ceph.ceph_open_for_overwrite(abs_path.toString(), (int)permission.toShort()); if (progress!=null) progress.progress(); debug("Returned from ceph_open_for_overwrite to Java with fh " + fh, TRACE); if (fh < 0) { @@ -686,7 +537,7 @@ public class CephFileSystem extends FileSystem { debug("open:enter with path " + path, DEBUG); Path abs_path = makeAbsolute(path); - int fh = ceph_open_for_read(abs_path.toString()); + int fh = ceph.ceph_open_for_read(abs_path.toString()); if (fh < 0) { //uh-oh, something's bad! if (fh == -ENOENT) //well that was a stupid open throw new IOException("open: absolute path \"" + abs_path.toString() @@ -697,13 +548,13 @@ public class CephFileSystem extends FileSystem { if(isDirectory(abs_path)) { //yes, it is possible to open Ceph directories //but that doesn't mean you should in Hadoop! - ceph_close(fh); + ceph.ceph_close(fh); throw new IOException("open: absolute path \"" + abs_path.toString() + "\" is a directory!"); } Stat lstat = new Stat(); debug("open:calling ceph_stat from Java", TRACE); - ceph_stat(abs_path.toString(), lstat); + ceph.ceph_stat(abs_path.toString(), lstat); debug("open:returned to Java", TRACE); long size = lstat.size; if (size < 0) { @@ -730,7 +581,7 @@ public class CephFileSystem extends FileSystem { Path abs_src = makeAbsolute(src); Path abs_dst = makeAbsolute(dst); debug("calling ceph_rename from Java", TRACE); - boolean result = ceph_rename(abs_src.toString(), abs_dst.toString()); + boolean result = ceph.ceph_rename(abs_src.toString(), abs_dst.toString()); debug("rename:exit with result: " + result, DEBUG); return result; } @@ -759,7 +610,7 @@ public class CephFileSystem extends FileSystem { //sanitize and get the filehandle Path abs_path = makeAbsolute(file.getPath()); debug("getFileBlockLocations:call ceph_open_for_read from Java", TRACE); - int fh = ceph_open_for_read(abs_path.toString()); + int fh = ceph.ceph_open_for_read(abs_path.toString()); debug("getFileBlockLocations:return from ceph_open_for_read to Java with fh " + fh, TRACE); if (fh < 0) { @@ -769,7 +620,7 @@ public class CephFileSystem extends FileSystem { } //get the block size debug("getFileBlockLocations:call ceph_getblocksize from Java", TRACE); - long blockSize = ceph_getblocksize(abs_path.toString()); + long blockSize = ceph.ceph_getblocksize(abs_path.toString()); debug("getFileBlockLocations:return from ceph_getblocksize", TRACE); BlockLocation[] locations = new BlockLocation[(int)Math.ceil(len/(float)blockSize)]; @@ -778,7 +629,7 @@ public class CephFileSystem extends FileSystem { offset = start + i*blockSize; debug("getFileBlockLocations:call ceph_hosts from Java on fh " + fh + " and offset " + offset, TRACE); - String host = ceph_hosts(fh, offset); + String host = ceph.ceph_hosts(fh, offset); debug("getFileBlockLocations:return from ceph_hosts to Java with host " + host, TRACE); String[] hostArray = new String[1]; @@ -789,7 +640,7 @@ public class CephFileSystem extends FileSystem { } debug("getFileBlockLocations:call ceph_close from Java on fh " + fh, TRACE); - ceph_close(fh); + ceph.ceph_close(fh); debug("getFileBlockLocations:return with " + locations.length + " locations", DEBUG); return locations; @@ -815,7 +666,7 @@ public class CephFileSystem extends FileSystem { //error-checking code. CephStat ceph_stat = new CephStat(); debug("getStatus:calling ceph_statfs from Java", TRACE); - int result = ceph_statfs(abs_path.toString(), ceph_stat); + int result = ceph.ceph_statfs(abs_path.toString(), ceph_stat); if (result!=0) throw new IOException("Somehow failed to statfs the Ceph filesystem. Error code: " + result); debug("getStatus:exit successfully", DEBUG); return new FsStatus(ceph_stat.capacity, @@ -849,7 +700,7 @@ public class CephFileSystem extends FileSystem { if (isFile(abs_path)) { debug("delete:calling ceph_unlink from Java with path " + abs_path, TRACE); - boolean result = ceph_unlink(abs_path.toString()); + boolean result = ceph.ceph_unlink(abs_path.toString()); if(!result) debug("delete: failed to delete file \"" + abs_path.toString() + "\".", ERROR); @@ -882,7 +733,7 @@ public class CephFileSystem extends FileSystem { } } //if we've come this far it's a now-empty directory, so delete it! - boolean result = ceph_rmdir(abs_path.toString()); + boolean result = ceph.ceph_rmdir(abs_path.toString()); if (!result) debug("delete: failed to delete \"" + abs_path.toString() + "\", BAILING", ERROR); @@ -925,7 +776,7 @@ public class CephFileSystem extends FileSystem { debug("makeAbsolute:exit with path " + path, NOLOG); return path; } - Path new_path = new Path(ceph_getcwd(), path); + Path new_path = new Path(ceph.ceph_getcwd(), path); debug("makeAbsolute:exit with path " + new_path, NOLOG); return new_path; } @@ -938,7 +789,7 @@ public class CephFileSystem extends FileSystem { // If it's a directory, get the listing. Otherwise, complain and give up. debug("calling ceph_getdir from Java with path " + abs_path, NOLOG); - dirlist = ceph_getdir(abs_path.toString()); + dirlist = ceph.ceph_getdir(abs_path.toString()); debug("returning from ceph_getdir to Java", NOLOG); if (dirlist == null) { @@ -981,7 +832,7 @@ public class CephFileSystem extends FileSystem { } } - private static class Stat { + static class Stat { public long size; public boolean is_dir; public long block_size; @@ -992,7 +843,7 @@ public class CephFileSystem extends FileSystem { public Stat(){} } - private static class CephStat { + static class CephStat { public long capacity; public long used; public long remaining; diff --git a/src/client/hadoop/ceph/CephTalker.java b/src/client/hadoop/ceph/CephTalker.java new file mode 100644 index 0000000000000..d9c32461eab63 --- /dev/null +++ b/src/client/hadoop/ceph/CephTalker.java @@ -0,0 +1,53 @@ +// -*- mode:Java; tab-width:2; c-basic-offset:2; indent-tabs-mode:t -*- +/** + * + * Licensed under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + * implied. See the License for the specific language governing + * permissions and limitations under the License. + * + * + * Wraps a number of native function calls to communicate with the Ceph + * filesystem. + */ +package org.apache.hadoop.fs.ceph; + +import org.apache.hadoop.conf.Configuration; + +class CephTalker extends CephFS { + //we write a constructor so we can load the libraries + public CephTalker(Configuration conf) { + System.load(conf.get("fs.ceph.libDir")+"/libhadoopcephfs.so"); + System.load(conf.get("fs.ceph.libDir")+"/libceph.so"); + } + protected native boolean ceph_initializeClient(String arguments, int block_size); + protected native String ceph_getcwd(); + protected native boolean ceph_setcwd(String path); + protected native boolean ceph_rmdir(String path); + protected native boolean ceph_unlink(String path); + protected native boolean ceph_rename(String old_path, String new_path); + protected native boolean ceph_exists(String path); + protected native long ceph_getblocksize(String path); + protected native boolean ceph_isdirectory(String path); + protected native boolean ceph_isfile(String path); + protected native String[] ceph_getdir(String path); + protected native int ceph_mkdirs(String path, int mode); + protected native int ceph_open_for_append(String path); + protected native int ceph_open_for_read(String path); + protected native int ceph_open_for_overwrite(String path, int mode); + protected native int ceph_close(int filehandle); + protected native boolean ceph_setPermission(String path, int mode); + protected native boolean ceph_kill_client(); + protected native boolean ceph_stat(String path, CephFileSystem.Stat fill); + protected native int ceph_statfs(String Path, CephFileSystem.CephStat fill); + protected native int ceph_replication(String path); + protected native String ceph_hosts(int fh, long offset); + protected native int ceph_setTimes(String path, long mtime, long atime); +} -- 2.39.5