m_verbose = cfg->get_bool("Hypertable.Verbose");
m_root_dir = "";
//construct an arguments array
- const char *argv[3];
- argv[0] = "cephBroker";
- argv[1] = "-m";
- argv[2] = (cfg->get_str("CephBroker.MonAddr").c_str());
+ const char *argv[10];
+ int argc = 0;
+ argv[argc++] = "cephBroker";
+ argv[argc++] = "-m";
+ argv[argc++] = (cfg->get_str("CephBroker.MonAddr").c_str());
/*
-For Ceph debugging, uncomment these lines, fix argv/c sizes,
-and set debugging to desired level.
- argv[3] = "--debug_client";
- argv[4] = "0";
- argv[5] = "--debug_ms";
- argv[6] = "0";
- argv[7] = "--lockdep";
- argv[8] = "0"; */
+ // For Ceph debugging, uncomment these lines
+ argv[argc++] = "--debug_client";
+ argv[argc++] = "0";
+ argv[argc++] = "--debug_ms";
+ argv[argc++] = "0";
+ argv[argc++] = "--lockdep";
+ argv[argc++] = "0"; */
HT_INFO("Calling ceph_initialize");
- ceph_initialize(3, argv);
+ ceph_initialize(argc++, argv);
HT_INFO("Calling ceph_mount");
ceph_mount();
HT_INFO("Returning from constructor");
}
-CephBroker::~CephBroker(){
+CephBroker::~CephBroker() {
ceph_deinitialize();
}
-void CephBroker::open(ResponseCallbackOpen *cb, const char *fname, uint32_t bufsz){
+void CephBroker::open(ResponseCallbackOpen *cb, const char *fname, uint32_t bufsz) {
int fd, ceph_fd;
String abspath;
HT_DEBUGF("open file='%s' bufsz=%d", fname, bufsz);
}
void CephBroker::create(ResponseCallbackOpen *cb, const char *fname, bool overwrite,
- int32_t bufsz, int16_t replication, int64_t blksz){
+ int32_t bufsz, int16_t replication, int64_t blksz){
int fd, ceph_fd;
int flags;
String abspath;
fd = atomic_inc_return(&ms_next_fd);
- if(overwrite)
+ if (overwrite)
flags = O_WRONLY | O_CREAT | O_TRUNC;
else
flags = O_WRONLY | O_CREAT | O_APPEND;
cb->response_ok();
}
-void CephBroker::read(ResponseCallbackRead *cb, uint32_t fd, uint32_t amount){
+void CephBroker::read(ResponseCallbackRead *cb, uint32_t fd, uint32_t amount) {
OpenFileDataCephPtr fdata;
ssize_t nread;
uint64_t offset;
HT_DEBUGF("read fd=%d amount = %d", fd, amount);
- if(!m_open_file_map.get(fd, fdata)) {
+ if (!m_open_file_map.get(fd, fdata)) {
char errbuf[32];
sprintf(errbuf, "%d", fd);
cb->error(Error::DFSBROKER_BAD_FILE_HANDLE, errbuf);
return;
}
- if((offset = ceph_lseek(fdata->fd, 0, SEEK_CUR)) < 0) {
+ if ((offset = ceph_lseek(fdata->fd, 0, SEEK_CUR)) < 0) {
HT_ERRORF("lseek failed: fd=%d ceph_fd=%d offset=0 SEEK_CUR - %s", fd, fdata->fd, strerror(-offset));
report_error(cb, offset);
return;
}
- if ((nread=ceph_read(fdata->fd, (char *)buf.base, amount)) < 0 ) {
+ if ((nread = ceph_read(fdata->fd, (char *)buf.base, amount)) < 0 ) {
HT_ERRORF("read failed: fd=%d ceph_fd=%d amount=%d", fd, fdata->fd, amount);
report_error(cb, -nread);
return;
}
void CephBroker::append(ResponseCallbackAppend *cb, uint32_t fd,
- uint32_t amount, const void *data, bool sync)
+ uint32_t amount, const void *data, bool sync)
{
OpenFileDataCephPtr fdata;
ssize_t nwritten;
uint64_t offset;
- HT_DEBUG_OUT <<"append fd="<< fd <<" amount="<< amount <<" data='"
- << format_bytes(20, data, amount) <<" sync="<< sync << HT_END;
+ HT_DEBUG_OUT << "append fd="<< fd <<" amount="<< amount <<" data='"
+ << format_bytes(20, data, amount) <<" sync="<< sync << HT_END;
if (!m_open_file_map.get(fd, fdata)) {
char errbuf[32];
}
int r;
- if (sync &&( (r=ceph_fsync(fdata->fd, true)) != 0)) {
+ if (sync && ((r = ceph_fsync(fdata->fd, true)) != 0)) {
HT_ERRORF("flush failed: fd=%d ceph_fd=%d - %s", fd, fdata->fd, strerror(errno));
report_error(cb, r);
return;
cb->response(offset, nwritten);
}
-void CephBroker::seek(ResponseCallback *cb, uint32_t fd, uint64_t offset){
+
+void CephBroker::seek(ResponseCallback *cb, uint32_t fd, uint64_t offset) {
OpenFileDataCephPtr fdata;
HT_DEBUGF("seek fd=%lu offset=%llu", (Lu)fd, (Llu)offset);
make_abs_path(fname, abspath);
int r;
- if((r=ceph_unlink(abspath.c_str())) < 0) {
+ if ((r = ceph_unlink(abspath.c_str())) < 0) {
HT_ERRORF("unlink failed: file='%s' - %s", abspath.c_str(), strerror(-r));
report_error(cb, r);
return;
cb->response_ok();
}
-void CephBroker::length(ResponseCallbackLength *cb, const char *fname){
+void CephBroker::length(ResponseCallbackLength *cb, const char *fname) {
int r;
struct stat statbuf;
}
void CephBroker::pread(ResponseCallbackRead *cb, uint32_t fd, uint64_t offset,
- uint32_t amount){
+ uint32_t amount) {
OpenFileDataCephPtr fdata;
ssize_t nread;
StaticBuffer buf(new uint8_t [amount], amount);
cb->response(offset, buf);
}
-void CephBroker::mkdirs(ResponseCallback *cb, const char *dname){
+void CephBroker::mkdirs(ResponseCallback *cb, const char *dname) {
String absdir;
HT_DEBUGF("mkdirs dir='%s'", dname);
cb->response_ok();
}
-void CephBroker::rmdir(ResponseCallback *cb, const char *dname){
+void CephBroker::rmdir(ResponseCallback *cb, const char *dname) {
String absdir;
-
- make_abs_path(dname, absdir);
-
int r;
- if((r=rmdir_recursive(absdir.c_str())) < 0) {
+ make_abs_path(dname, absdir);
+ if((r = rmdir_recursive(absdir.c_str())) < 0) {
HT_ERRORF("failed to remove dir %s, got error %d", absdir.c_str(), r);
report_error(cb, -r);
return;
struct dirent de;
struct stat st;
int r;
- if((r = ceph_opendir(directory, &dirp) < 0)) return r; //failed to open
+ if ((r = ceph_opendir(directory, &dirp) < 0))
+ return r; //failed to open
while (r = ceph_readdirplus_r(dirp, &de, &st, 0) > 0) {
String new_dir = de.d_name;
if(!(new_dir.compare(".")==0 || new_dir.compare("..")==0)) {
}
}
if (r < 0) return r; //we got an error
- if((r = ceph_closedir(dirp)) < 0) return r;
+ if ((r = ceph_closedir(dirp)) < 0) return r;
return ceph_rmdir(directory);
}
-void CephBroker::flush(ResponseCallback *cb, uint32_t fd){
+void CephBroker::flush(ResponseCallback *cb, uint32_t fd) {
OpenFileDataCephPtr fdata;
HT_DEBUGF("flush fd=%d", fd);
- if(!m_open_file_map.get(fd, fdata)) {
+ if (!m_open_file_map.get(fd, fdata)) {
char errbuf[32];
sprintf(errbuf, "%d", fd);
cb->error(Error::DFSBROKER_BAD_FILE_HANDLE, errbuf);
}
int r;
- if((r=ceph_fsync(fdata->fd, true)) != 0) {
+ if ((r = ceph_fsync(fdata->fd, true)) != 0) {
HT_ERRORF("flush failed: fd=%d ceph_fd=%d - %s", fd, fdata->fd, strerror(-r));
report_error(cb, -r);
return;
cb->response_ok();
}
-void CephBroker::status(ResponseCallback *cb){
+void CephBroker::status(ResponseCallback *cb) {
cb->response_ok();
/*perhaps a total cheat, but both the local and Kosmos brokers
included in Hypertable also do this. */
/* I have no idea if this is correct; it's what local and kosmos brokers do. Check the contract!
*/
-void CephBroker::shutdown(ResponseCallback *cb){
+void CephBroker::shutdown(ResponseCallback *cb) {
m_open_file_map.remove_all();
cb->response_ok();
poll(0, 0, 2000);
}
-void CephBroker::readdir(ResponseCallbackReaddir *cb, const char *dname){
+void CephBroker::readdir(ResponseCallbackReaddir *cb, const char *dname) {
std::vector<String> listing;
String absdir;
cb->response(listing);
}
-void CephBroker::exists(ResponseCallbackExists *cb, const char *fname){
- String abspath;
- struct stat statbuf;
-
- HT_DEBUGF("exists file='%s'", fname);
-
- make_abs_path(fname, abspath);
-
- cb->response(ceph_lstat(abspath.c_str(), &statbuf) == 0);
+void CephBroker::exists(ResponseCallbackExists *cb, const char *fname) {
+ String abspath;
+ struct stat statbuf;
+
+ HT_DEBUGF("exists file='%s'", fname);
+ make_abs_path(fname, abspath);
+ cb->response(ceph_lstat(abspath.c_str(), &statbuf) == 0);
}
-void CephBroker::rename(ResponseCallback *cb, const char *src, const char *dst){
+void CephBroker::rename(ResponseCallback *cb, const char *src, const char *dst) {
String src_abs;
String dest_abs;
+ int r;
+
make_abs_path(src, src_abs);
make_abs_path(dst, dest_abs);
-
- int r;
- if((r=ceph_rename(src_abs.c_str(), dest_abs.c_str())) <0 ) {
+ if ((r = ceph_rename(src_abs.c_str(), dest_abs.c_str())) <0 ) {
report_error(cb, r);
return;
}
}
void CephBroker::debug(ResponseCallback *cb, int32_t command,
- StaticBuffer &serialized_parameters ){
+ StaticBuffer &serialized_parameters) {
HT_ERROR("debug commands not implemented!");
cb->error(Error::NOT_IMPLEMENTED, format("Debug commands not supported"));
}
}
-inline void CephBroker::make_abs_path(const char *fname, String& abs)
-{
- if (fname[0] == '/')
- abs = fname;
- else
- abs = m_root_dir + "/" + fname;
-}