-// -*- mode:Java; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// -*- mode:Java; tab-width:2; c-basic-offset:2; indent-tabs-mode:t -*-
/**
*
* Licensed under the Apache License, Version 2.0
String arguments = "CephFSInterface";
arguments += conf.get("fs.ceph.commandLine", "");
if (conf.get("fs.ceph.clientDebug") != null) {
- arguments += " --debug_client ";
- arguments += conf.get("fs.ceph.clientDebug");
+ arguments += " --debug_client ";
+ arguments += conf.get("fs.ceph.clientDebug");
}
if (conf.get("fs.ceph.messengerDebug") != null) {
- arguments += " --debug_ms ";
- arguments += conf.get("fs.ceph.messengerDebug");
+ arguments += " --debug_ms ";
+ arguments += conf.get("fs.ceph.messengerDebug");
}
if (conf.get("fs.ceph.monAddr") != null) {
- arguments += " -m ";
- arguments += conf.get("fs.ceph.monAddr");
+ arguments += " -m ";
+ arguments += conf.get("fs.ceph.monAddr");
}
- arguments += " --client-readahead-max-periods="
- + conf.get("fs.ceph.readahead", "1");
+ arguments += " --client-readahead-max-periods="
+ + conf.get("fs.ceph.readahead", "1");
//make sure they gave us a ceph monitor address or conf file
if ( (conf.get("fs.ceph.monAddr") == null) &&
- (arguments.indexOf("-m") == -1) &&
- (arguments.indexOf("-c") == -1) ) {
- if(debug) debug("You need to specify a Ceph monitor address.");
- throw new IOException("You must specify a Ceph monitor address or config file!");
+ (arguments.indexOf("-m") == -1) &&
+ (arguments.indexOf("-c") == -1) ) {
+ if(debug) debug("You need to specify a Ceph monitor address.");
+ throw new IOException("You must specify a Ceph monitor address or config file!");
}
// Initialize the client
if (!ceph_initializeClient(arguments,
- conf.getInt("fs.ceph.blockSize", 1<<26))) {
- if(debug) debug("Ceph initialization failed!");
- throw new IOException("Ceph initialization failed!");
+ conf.getInt("fs.ceph.blockSize", 1<<26))) {
+ if(debug) debug("Ceph initialization failed!");
+ throw new IOException("Ceph initialization failed!");
}
initialized = true;
if(debug) debug("Initialized client. Setting cwd to /");
*/
@Override
public void close() throws IOException {
- if (!initialized) throw new IOException ("You have to initialize the"
- +"CephFileSystem before calling other methods.");
+ if (!initialized) throw new IOException ("You have to initialize the "
+ +"CephFileSystem before calling other methods.");
if(debug) debug("close:enter");
super.close();//this method does stuff, make sure it's run!
ceph_kill_client();
* @throws IOException If initialize() hasn't been called or the file cannot be found or appended to.
*/
public FSDataOutputStream append (Path file, int bufferSize,
- Progressable progress) throws IOException {
- if (!initialized) throw new IOException ("You have to initialize the"
- +"CephFileSystem before calling other methods.");
+ Progressable progress) throws IOException {
+ if (!initialized) throw new IOException ("You have to initialize the "
+ +"CephFileSystem before calling other methods.");
if(debug) debug("append:enter with path " + file + " bufferSize " + bufferSize);
Path abs_path = makeAbsolute(file);
if (progress!=null) progress.progress();
if (progress!=null) progress.progress();
if( fd < 0 ) { //error in open
throw new IOException("append: Open for append failed on path \"" +
- abs_path.toString() + "\"");
+ abs_path.toString() + "\"");
}
CephOutputStream cephOStream = new CephOutputStream(getConf(), fd);
if(debug) debug("append:exit");
* @param dir The directory to change to.
*/
@Override
- public void setWorkingDirectory(Path dir) {
+ public void setWorkingDirectory(Path dir) {
if (!initialized) return;
if(debug) debug("setWorkingDirecty:enter with new working dir " + dir);
Path abs_path = makeAbsolute(dir);
* @throws IOException if initialize() hasn't been called.
*/
@Override
- public boolean exists(Path path) throws IOException {
- if (!initialized) throw new IOException ("You have to initialize the"
- +"CephFileSystem before calling other methods.");
+ public boolean exists(Path path) throws IOException {
+ if (!initialized) throw new IOException ("You have to initialize the "
+ +"CephFileSystem before calling other methods.");
if(debug) debug("exists:enter with path " + path);
boolean result;
Path abs_path = makeAbsolute(path);
}
else {
if(debug) debug("Calling ceph_exists from Java on path "
- + abs_path.toString() + ":");
+ + abs_path.toString() + ":");
result = ceph_exists(abs_path.toString());
if(debug) debug("Returned from ceph_exists to Java");
}
* @throws IOException if initialize() hasn't been called.
*/
public boolean mkdirs(Path path, FsPermission perms) throws IOException {
- if (!initialized) throw new IOException ("You have to initialize the"
- +"CephFileSystem before calling other methods.");
+ if (!initialized) throw new IOException ("You have to initialize the "
+ +"CephFileSystem before calling other methods.");
if(debug) debug("mkdirs:enter with path " + path);
Path abs_path = makeAbsolute(path);
if(debug) debug("calling ceph_mkdirs from Java");
* @throws IOException if initialize() hasn't been called.
*/
@Override
- public boolean isFile(Path path) throws IOException {
- if (!initialized) throw new IOException ("You have to initialize the"
- +"CephFileSystem before calling other methods.");
+ public boolean isFile(Path path) throws IOException {
+ if (!initialized) throw new IOException ("You have to initialize the "
+ +"CephFileSystem before calling other methods.");
if(debug) debug("isFile:enter with path " + path);
Path abs_path = makeAbsolute(path);
boolean result;
* @throws IOException if initialize() hasn't been called.
*/
@Override
- public boolean isDirectory(Path path) throws IOException {
- if (!initialized) throw new IOException ("You have to initialize the"
- +"CephFileSystem before calling other methods.");
+ public boolean isDirectory(Path path) throws IOException {
+ if (!initialized) throw new IOException ("You have to initialize the "
+ +"CephFileSystem before calling other methods.");
if(debug) debug("isDirectory:enter with path " + path);
Path abs_path = makeAbsolute(path);
boolean result;
* @throws FileNotFoundException if the path could not be resolved.
*/
public FileStatus getFileStatus(Path path) throws IOException {
- if (!initialized) throw new IOException ("You have to initialize the"
- +"CephFileSystem before calling other methods.");
+ if (!initialized) throw new IOException ("You have to initialize the "
+ +"CephFileSystem before calling other methods.");
if(debug) debug("getFileStatus:enter with path " + path);
Path abs_path = makeAbsolute(path);
//sadly, Ceph doesn't really do uids/gids just yet, but
Stat lstat = new Stat();
if(ceph_stat(abs_path.toString(), lstat)) {
status = new FileStatus(lstat.size, lstat.is_dir,
- ceph_replication(abs_path.toString()),
- lstat.block_size,
- lstat.mod_time,
- lstat.access_time,
- new FsPermission((short)lstat.mode),
- null,
- null,
- new Path(fs_default_name+abs_path.toString()));
+ ceph_replication(abs_path.toString()),
+ lstat.block_size,
+ lstat.mod_time,
+ lstat.access_time,
+ new FsPermission((short)lstat.mode),
+ null,
+ null,
+ new Path(fs_default_name+abs_path.toString()));
}
else { //fail out
- throw new FileNotFoundException("org.apache.hadoop.fs.ceph.CephFileSystem: File "
- + path + " does not exist or could not be accessed");
+ throw new FileNotFoundException("org.apache.hadoop.fs.ceph.CephFileSystem: File "
+ + path + " does not exist or could not be accessed");
}
if(debug) debug("getFileStatus:exit");
* @throws FileNotFoundException if the input path can't be found.
*/
public FileStatus[] listStatus(Path path) throws IOException {
- if (!initialized) throw new IOException ("You have to initialize the"
- +"CephFileSystem before calling other methods.");
+ if (!initialized) throw new IOException ("You have to initialize the "
+ +"CephFileSystem before calling other methods.");
if(debug) debug("listStatus:enter with path " + path);
Path abs_path = makeAbsolute(path);
Path[] paths = listPaths(abs_path);
if (paths != null) {
FileStatus[] statuses = new FileStatus[paths.length];
for (int i = 0; i < paths.length; ++i) {
- statuses[i] = getFileStatus(paths[i]);
+ statuses[i] = getFileStatus(paths[i]);
}
if(debug) debug("listStatus:exit");
return statuses;
}
@Override
- public void setPermission(Path p, FsPermission permission) throws IOException {
- if (!initialized) throw new IOException ("You have to initialize the"
- +"CephFileSystem before calling other methods.");
+ public void setPermission(Path p, FsPermission permission) throws IOException {
+ if (!initialized) throw new IOException ("You have to initialize the "
+ +"CephFileSystem before calling other methods.");
Path abs_path = makeAbsolute(p);
ceph_setPermission(abs_path.toString(), permission.toShort());
}
* @param mtime Set modification time in number of millis since Jan 1, 1970.
* @param atime Set access time in number of millis since Jan 1, 1970.
*/
-@Override
- public void setTimes(Path p, long mtime, long atime) throws IOException {
- if (!initialized) throw new IOException ("You have to initialize the"
- +"CephFileSystem before calling other methods.");
- Path abs_path = makeAbsolute(p);
- int r = ceph_setTimes(abs_path.toString(), mtime, atime);
- if (r<0) throw new IOException ("Failed to set times on path "
- + abs_path.toString() + " Error code: " + r);
-}
+ @Override
+ public void setTimes(Path p, long mtime, long atime) throws IOException {
+ if (!initialized) throw new IOException ("You have to initialize the "
+ +"CephFileSystem before calling other methods.");
+ Path abs_path = makeAbsolute(p);
+ int r = ceph_setTimes(abs_path.toString(), mtime, atime);
+ if (r<0) throw new IOException ("Failed to set times on path "
+ + abs_path.toString() + " Error code: " + r);
+ }
/**
* Create a new file and open an FSDataOutputStream that's connected to it.
* failure in attempting to open for append with Ceph.
*/
public FSDataOutputStream create(Path path,
- FsPermission permission,
- EnumSet<CreateFlag> flag,
- //boolean overwrite,
- int bufferSize,
- short replication,
- long blockSize,
- Progressable progress
- ) throws IOException {
- if (!initialized) throw new IOException ("You have to initialize the"
- +"CephFileSystem before calling other methods.");
+ FsPermission permission,
+ EnumSet<CreateFlag> flag,
+ //boolean overwrite,
+ int bufferSize,
+ short replication,
+ long blockSize,
+ Progressable progress
+ ) throws IOException {
+ if (!initialized) throw new IOException ("You have to initialize the "
+ +"CephFileSystem before calling other methods.");
if(debug) debug("create:enter with path " + path);
Path abs_path = makeAbsolute(path);
if (progress!=null) progress.progress();
boolean exists = exists(abs_path);
if (exists) {
if(isDirectory(abs_path))
- throw new IOException("create: Cannot overwrite existing directory \""
- + path.toString() + "\" with a file");
+ throw new IOException("create: Cannot overwrite existing directory \""
+ + path.toString() + "\" with a file");
//if (!overwrite)
if (!flag.contains(CreateFlag.OVERWRITE))
- throw new IOException("createRaw: Cannot open existing file \""
- + abs_path.toString()
- + "\" for writing without overwrite flag");
+ throw new IOException("createRaw: Cannot open existing file \""
+ + abs_path.toString()
+ + "\" for writing without overwrite flag");
}
if (progress!=null) progress.progress();
if (!exists) {
Path parent = abs_path.getParent();
if (parent != null) { // if parent is root, we're done
- int r = ceph_mkdirs(parent.toString(), permission.toShort());
- if (!(r==0 || r==-EEXIST))
- throw new IOException ("Error creating parent directory; code: " + r);
+ int r = ceph_mkdirs(parent.toString(), permission.toShort());
+ if (!(r==0 || r==-EEXIST))
+ throw new IOException ("Error creating parent directory; code: " + r);
}
if (progress!=null) progress.progress();
}
if(debug) debug("Returned from ceph_open_for_overwrite to Java with fh " + fh);
if (fh < 0) {
throw new IOException("create: Open for overwrite failed on path \"" +
- path.toString() + "\"");
+ path.toString() + "\"");
}
// Step 4: create the stream
OutputStream cephOStream = new CephOutputStream(getConf(), fh);
if(debug) debug("create:exit");
return new FSDataOutputStream(cephOStream, statistics);
- }
+ }
/**
* Open a Ceph file and attach the file handle to an FSDataInputStream.
* directory, or there is an error getting data to set up the FSDataInputStream.
*/
public FSDataInputStream open(Path path, int bufferSize) throws IOException {
- if (!initialized) throw new IOException ("You have to initialize the"
- +"CephFileSystem before calling other methods.");
+ if (!initialized) throw new IOException ("You have to initialize the "
+ +"CephFileSystem before calling other methods.");
if(debug) debug("open:enter with path " + path);
Path abs_path = makeAbsolute(path);
int fh = ceph_open_for_read(abs_path.toString());
if (fh < 0) { //uh-oh, something's bad!
if (fh == -ENOENT) //well that was a stupid open
- throw new IOException("open: absolute path \"" + abs_path.toString()
- + "\" does not exist");
+ throw new IOException("open: absolute path \"" + abs_path.toString()
+ + "\" does not exist");
else //hrm...the file exists but we can't open it :(
- throw new IOException("open: Failed to open file " + abs_path.toString());
+ throw new IOException("open: Failed to open file " + abs_path.toString());
}
if(isDirectory(abs_path)) { //yes, it is possible to open Ceph directories
//but that doesn't mean you should in Hadoop!
ceph_close(fh);
throw new IOException("open: absolute path \"" + abs_path.toString()
- + "\" is a directory!");
+ + "\" is a directory!");
}
Stat lstat = new Stat();
ceph_stat(abs_path.toString(), lstat);
long size = lstat.size;
if (size < 0) {
throw new IOException("Failed to get file size for file " + abs_path.toString() +
- " but succeeded in opening file. Something bizarre is going on.");
+ " but succeeded in opening file. Something bizarre is going on.");
}
FSInputStream cephIStream = new CephInputStream(getConf(), fh, size);
if(debug) debug("open:exit");
return new FSDataInputStream(cephIStream);
- }
+ }
/**
* Rename a file or directory.
* @throws IOException if initialize() hasn't been called.
*/
@Override
- public boolean rename(Path src, Path dst) throws IOException {
- if (!initialized) throw new IOException ("You have to initialize the"
- +"CephFileSystem before calling other methods.");
+ public boolean rename(Path src, Path dst) throws IOException {
+ if (!initialized) throw new IOException ("You have to initialize the "
+ +"CephFileSystem before calling other methods.");
if(debug) debug("rename:enter");
if(debug) debug("calling ceph_rename from Java");
Path abs_src = makeAbsolute(src);
* @throws IOException if initialize() hasn't been called.
*/
@Override
- public BlockLocation[] getFileBlockLocations(FileStatus file,
- long start, long len) throws IOException {
- if (!initialized) throw new IOException ("You have to initialize the"
- +"CephFileSystem before calling other methods.");
+ public BlockLocation[] getFileBlockLocations(FileStatus file,
+ long start, long len) throws IOException {
+ if (!initialized) throw new IOException ("You have to initialize the "
+ +"CephFileSystem before calling other methods.");
//sanitize and get the filehandle
Path abs_path = makeAbsolute(file.getPath());
int fh = ceph_open_for_read(abs_path.toString());
String[] hostArray = new String[1];
hostArray[0] = host;
locations[i] = new BlockLocation(hostArray, hostArray,
- start+i*blockSize, blockSize);
+ start+i*blockSize, blockSize);
}
ceph_close(fh);
return locations;
* stat somehow fails.
*/
@Override
- public FsStatus getStatus (Path path) throws IOException {
- if (!initialized) throw new IOException("You have to initialize the"
- + " CephFileSystem before calling other methods.");
+ public FsStatus getStatus (Path path) throws IOException {
+ if (!initialized) throw new IOException("You have to initialize the "
+ + " CephFileSystem before calling other methods.");
if(debug) debug("getStatus:enter");
Path abs_path = makeAbsolute(path);
if (result!=0) throw new IOException("Somehow failed to statfs the Ceph filesystem. Error code: " + result);
if(debug) debug("getStatus:exit");
return new FsStatus(ceph_stat.capacity,
- ceph_stat.used, ceph_stat.remaining);
+ ceph_stat.used, ceph_stat.remaining);
}
/**
* or you attempt to delete the root directory.
*/
public boolean delete(Path path, boolean recursive) throws IOException {
- if (!initialized) throw new IOException ("You have to initialize the"
- +"CephFileSystem before calling other methods.");
+ if (!initialized) throw new IOException ("You have to initialize the "
+ +"CephFileSystem before calling other methods.");
if(debug) debug("delete:enter");
Path abs_path = makeAbsolute(path);
if (isFile(abs_path)) {
boolean result = ceph_unlink(abs_path.toString());
if(!result)
- if(debug) debug("delete: failed to delete file \"" +
- abs_path.toString() + "\".");
+ if(debug) debug("delete: failed to delete file \"" +
+ abs_path.toString() + "\".");
if(debug) debug("delete:exit");
return result;
}
Path[] contents = listPaths(abs_path);
if (contents == null) {
if(debug) debug("delete: Failed to read contents of directory \"" +
- abs_path.toString() + "\" while trying to delete it");
+ abs_path.toString() + "\" while trying to delete it");
if(debug) debug("delete:exit");
return false;
}
// delete the entries
for (Path p : contents) {
if (!delete(p, true)) {
- if(debug) debug("delete: Failed to delete file \"" +
- p.toString() + "\" while recursively deleting \""
- + abs_path.toString() + "\"" );
- if(debug) debug("delete:exit");
- return false;
+ if(debug) debug("delete: Failed to delete file \"" +
+ p.toString() + "\" while recursively deleting \""
+ + abs_path.toString() + "\"" );
+ if(debug) debug("delete:exit");
+ return false;
}
}
//if we've come this far it's a now-empty directory, so delete it!
* by a separate Ceph configuration.
*/
@Override
- public short getDefaultReplication() {
+ public short getDefaultReplication() {
return 1;
}
* @return the default block size, in bytes, as a long.
*/
@Override
- public long getDefaultBlockSize() {
+ public long getDefaultBlockSize() {
return getConf().getInt("fs.ceph.blockSize", 1<<26);
}
Path[] paths = new Path[dirlist.length];
for (int i = 0; i < dirlist.length; ++i) {
if(debug) debug("Raw enumeration of paths in \"" + abs_path.toString() + "\": \"" +
- dirlist[i] + "\"");
+ dirlist[i] + "\"");
// convert each listing to an absolute path
Path raw_path = new Path(dirlist[i]);
if (raw_path.isAbsolute())
- paths[i] = raw_path;
+ paths[i] = raw_path;
else
- paths[i] = new Path(abs_path, raw_path);
+ paths[i] = new Path(abs_path, raw_path);
}
if(debug) debug("listPaths:exit");
return paths;
-// -*- mode:Java; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// -*- mode:Java; tab-width:2; c-basic-offset:2; indent-tabs-mode:t -*-
/**
*
* Licensed under the Apache License, Version 2.0
closed = false;
debug = ("true".equals(conf.get("fs.ceph.debug", "false")));
if(debug) debug("CephInputStream constructor: initializing stream with fh "
- + fh + " and file length " + flength);
+ + fh + " and file length " + flength);
}
/** Ceph likes things to be closed before it shuts down,
* Find the number of bytes remaining in the file.
*/
@Override
- public synchronized int available() throws IOException {
+ public synchronized int available() throws IOException {
return (int) (fileLength - getPos());
}
public synchronized void seek(long targetPos) throws IOException {
if(debug) debug("CephInputStream.seek: Seeking to position " + targetPos +
- " on fd " + fileHandle);
+ " on fd " + fileHandle);
if (targetPos > fileLength) {
throw new IOException("CephInputStream.seek: failed seeking to position " + targetPos +
- " on fd " + fileHandle + ": Cannot seek after EOF " + fileLength);
+ " on fd " + fileHandle + ": Cannot seek after EOF " + fileLength);
}
ceph_seek_from_start(fileHandle, targetPos);
}
* @return the next byte.
*/
@Override
- public synchronized int read() throws IOException {
+ public synchronized int read() throws IOException {
if(debug) debug("CephInputStream.read: Reading a single byte from fd " + fileHandle
- + " by calling general read function");
+ + " by calling general read function");
byte result[] = new byte[1];
if (getPos() >= fileLength) return -1;
* @return 0 if successful, otherwise an error code.
*/
@Override
- public synchronized int read(byte buf[], int off, int len) throws IOException {
+ public synchronized int read(byte buf[], int off, int len) throws IOException {
if(debug) debug("CephInputStream.read: Reading " + len + " bytes from fd " + fileHandle);
if (closed) {
- throw new IOException("CephInputStream.read: cannot read " + len +
- " bytes from fd " + fileHandle + ": stream closed");
+ throw new IOException("CephInputStream.read: cannot read " + len +
+ " bytes from fd " + fileHandle + ": stream closed");
}
if (null == buf) {
- throw new NullPointerException("Read buffer is null");
+ throw new NullPointerException("Read buffer is null");
}
// check for proper index bounds
if((off < 0) || (len < 0) || (off + len > buf.length)) {
- throw new IndexOutOfBoundsException("CephInputStream.read: Indices out of bounds for read: "
- + "read length is " + len + ", buffer offset is "
- + off +", and buffer size is " + buf.length);
+ throw new IndexOutOfBoundsException("CephInputStream.read: Indices out of bounds for read: "
+ + "read length is " + len + ", buffer offset is "
+ + off +", and buffer size is " + buf.length);
}
// ensure we're not past the end of the file
if (getPos() >= fileLength)
- {
- if(debug) debug("CephInputStream.read: cannot read " + len +
- " bytes from fd " + fileHandle + ": current position is " +
- getPos() + " and file length is " + fileLength);
+ {
+ if(debug) debug("CephInputStream.read: cannot read " + len +
+ " bytes from fd " + fileHandle + ": current position is " +
+ getPos() + " and file length is " + fileLength);
- return -1;
- }
+ return -1;
+ }
// actually do the read
int result = ceph_read(fileHandle, buf, off, len);
if (result < 0)
- if(debug) debug("CephInputStream.read: Reading " + len
- + " bytes from fd " + fileHandle + " failed.");
+ if(debug) debug("CephInputStream.read: Reading " + len
+ + " bytes from fd " + fileHandle + " failed.");
if(debug) debug("CephInputStream.read: Reading " + len + " bytes from fd "
- + fileHandle + ": succeeded in reading " + result + " bytes");
+ + fileHandle + ": succeeded in reading " + result + " bytes");
return result;
- }
+ }
/**
* Close the CephInputStream and release the associated filehandle.
*/
@Override
- public void close() throws IOException {
+ public void close() throws IOException {
if(debug) debug("CephOutputStream.close:enter");
if (closed) {
throw new IOException("Stream closed");
-// -*- mode:Java; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// -*- mode:Java; tab-width:2; c-basic-offset:2; indent-tabs-mode:t -*-
/**
*
* Licensed under the Apache License, Version 2.0
* write fails.
*/
@Override
- public synchronized void write(int b) throws IOException {
+ public synchronized void write(int b) throws IOException {
if(debug) debug("CephOutputStream.write: writing a single byte to fd " + fileHandle);
if (closed) {
- throw new IOException("CephOutputStream.write: cannot write " +
- "a byte to fd " + fileHandle + ": stream closed");
+ throw new IOException("CephOutputStream.write: cannot write " +
+ "a byte to fd " + fileHandle + ": stream closed");
}
// Stick the byte in a buffer and write it
byte buf[] = new byte[1];
buf[0] = (byte) b;
int result = ceph_write(fileHandle, buf, 0, 1);
if (1 != result)
- if(debug) debug("CephOutputStream.write: failed writing a single byte to fd "
- + fileHandle + ": Ceph write() result = " + result);
+ if(debug) debug("CephOutputStream.write: failed writing a single byte to fd "
+ + fileHandle + ": Ceph write() result = " + result);
return;
}
* @throws IndexOutOfBoundsException if len > buf.length.
*/
@Override
- public synchronized void write(byte buf[], int off, int len) throws IOException {
- if(debug) debug("CephOutputStream.write: writing " + len +
- " bytes to fd " + fileHandle);
+ public synchronized void write(byte buf[], int off, int len) throws IOException {
+ if(debug) debug("CephOutputStream.write: writing " + len +
+ " bytes to fd " + fileHandle);
// make sure stream is open
if (closed) {
- throw new IOException("CephOutputStream.write: cannot write " + len +
- "bytes to fd " + fileHandle + ": stream closed");
+ throw new IOException("CephOutputStream.write: cannot write " + len +
+ "bytes to fd " + fileHandle + ": stream closed");
}
// sanity check
if (null == buf) {
- throw new NullPointerException("CephOutputStream.write: cannot write " + len +
- "bytes to fd " + fileHandle + ": write buffer is null");
+ throw new NullPointerException("CephOutputStream.write: cannot write " + len +
+ "bytes to fd " + fileHandle + ": write buffer is null");
}
// check for proper index bounds
if((off < 0) || (len < 0) || (off + len > buf.length)) {
- throw new IndexOutOfBoundsException("CephOutputStream.write: Indices out of bounds for write: "
- + "write length is " + len + ", buffer offset is "
- + off +", and buffer size is " + buf.length);
+ throw new IndexOutOfBoundsException("CephOutputStream.write: Indices out of bounds for write: "
+ + "write length is " + len + ", buffer offset is "
+ + off +", and buffer size is " + buf.length);
}
// write!
int result = ceph_write(fileHandle, buf, off, len);
if (result < 0) {
- throw new IOException("CephOutputStream.write: Write of " + len +
- "bytes to fd " + fileHandle + " failed");
+ throw new IOException("CephOutputStream.write: Write of " + len +
+ "bytes to fd " + fileHandle + " failed");
}
if (result != len) {
- throw new IOException("CephOutputStream.write: Write of " + len +
- "bytes to fd " + fileHandle + "was incomplete: only "
- + result + " of " + len + " bytes were written.");
+ throw new IOException("CephOutputStream.write: Write of " + len +
+ "bytes to fd " + fileHandle + "was incomplete: only "
+ + result + " of " + len + " bytes were written.");
}
return;
}
* @throws IOException if you've closed the stream.
*/
@Override
- public synchronized void flush() throws IOException {
- if (closed) {
- throw new IOException("Stream closed");
- }
- return;
- }
+ public synchronized void flush() throws IOException {
+ if (closed) {
+ throw new IOException("Stream closed");
+ }
+ return;
+ }
/**
* Close the CephOutputStream.
* @throws IOException if Ceph somehow returns an error. In current code it can't.
*/
@Override
- public synchronized void close() throws IOException {
+ public synchronized void close() throws IOException {
if(debug) debug("CephOutputStream.close:enter");
if (closed) {
- throw new IOException("Stream closed");
+ throw new IOException("Stream closed");
}
int result = ceph_close(fileHandle);
if (result != 0) {
- throw new IOException("Close failed!");
+ throw new IOException("Close failed!");
}
closed = true;