]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
hypertable: update bindings to use new libceph API
authorColin Patrick McCabe <cmccabe@alumni.cmu.edu>
Fri, 22 Apr 2011 23:07:45 +0000 (16:07 -0700)
committerColin Patrick McCabe <cmccabe@alumni.cmu.edu>
Mon, 25 Apr 2011 18:05:44 +0000 (11:05 -0700)
Signed-off-by: Colin McCabe <colin.mccabe@dreamhost.com>
src/client/hypertable/CephBroker.cc
src/client/hypertable/CephBroker.h

index c484e8607c759427187935ffca3f6c715b9b8ebe..d412a384733a9d5ed12940d96a4077dadab40518 100644 (file)
-// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- 
-// vim: ts=8 sw=2 smarttab
-/*
- * Ceph - scalable distributed file system
+/** -*- C++ -*-
+ * Copyright (C) 2009-2011 New Dream Network
  *
- * Copyright (C) 2004-2006 Sage Weil <sage@newdream.net>
+ * This file is part of Hypertable.
  *
- * This is free software; you can redistribute it and/or
- * modify it under the terms of the GNU Lesser General Public
- * License version 2.1, as published by the Free Software 
- * Foundation.  See file COPYING.
- * 
+ * Hypertable is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU General Public License
+ * as published by the Free Software Foundation; either version 2
+ * of the License, or any later version.
+ *
+ * Hypertable is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with Hypertable. If not, see <http://www.gnu.org/licenses/>
+ *
+ * Authors:
+ * Gregory Farnum <gfarnum@gmail.com>
+ * Colin McCabe <cmccabe@alumni.cmu.edu>
  */
 
 #include "Common/Compat.h"
-#include <cerrno>
-#include <string>
 
-extern "C" {
+#include "CephBroker.h"
+#include "Common/Error.h"
+#include "Common/FileUtils.h"
+#include "Common/Filesystem.h"
+#include "Common/System.h"
+
+#include <ceph/libceph.h>
+#include <dirent.h>
+#include <errno.h>
 #include <fcntl.h>
 #include <poll.h>
+#include <string>
 #include <sys/types.h>
 #include <sys/uio.h>
 #include <unistd.h>
-}
-
-#include "Common/FileUtils.h"
-#include "Common/System.h"
-#include "CephBroker.h"
 
 using namespace Hypertable;
 
 atomic_t CephBroker::ms_next_fd = ATOMIC_INIT(0);
 
-CephBroker::CephBroker(PropertiesPtr& cfg) {
+/* A thread-safe version of strerror */
+static std::string cpp_strerror(int err)
+{
+  char buf[128];
+  if (err < 0)
+    err = -err;
+  std::ostringstream oss;
+  oss << strerror_r(err, buf, sizeof(buf));
+  return oss.str();
+}
+
+OpenFileDataCeph::OpenFileDataCeph(ceph_mount_t *cmount_, const String& fname,
+                                  int _fd, int _flags) 
+  : cmount(cmount_), fd(_fd), flags(_flags), filename(fname)
+{
+}
+
+OpenFileDataCeph::~OpenFileDataCeph() {
+  ceph_close(cmount, fd);
+}
+
+CephBroker::CephBroker(PropertiesPtr& cfg)
+  : cmount(NULL)
+{
+  int ret;
+  String id(cfg->get_str("CephBroker.Id"));
   m_verbose = cfg->get_bool("Hypertable.Verbose");
-  m_root_dir = "";
-  //construct an arguments array
-  const char *argv[10];
-  int argc = 0;
-  argv[argc++] = "cephBroker";
-  argv[argc++] = "-m";
-  argv[argc++] = (cfg->get_str("CephBroker.MonAddr").c_str());
-  /*
+  m_root_dir = cfg->get_str("CephBroker.RootDir");
+  String mon_addr(cfg->get_str("CephBroker.MonAddr"));
+
+  HT_INFO("Calling ceph_create");
+  ret = ceph_create(&cmount, id.empty() ? NULL : id.c_str());
+  if (ret) {
+    throw Hypertable::Exception(ret, "ceph_create failed");
+  }
+  ret = ceph_conf_set(cmount, "mon_addr", mon_addr.c_str());
+  if (ret) {
+    ceph_shutdown(cmount);
+    throw Hypertable::Exception(ret, "ceph_conf_set(mon_addr) failed");
+  }
+
   // For Ceph debugging, uncomment these lines
-  argv[argc++] = "--debug_client";
-  argv[argc++] = "0";
-  argv[argc++] = "--debug_ms";
-  argv[argc++] = "0";
-  argv[argc++] = "--lockdep";
-  argv[argc++] = "0"; */
-
-  HT_INFO("Calling ceph_initialize");
-  ceph_initialize(argc, argv);
+  //ceph_conf_set(cmount, "debug_client", "1");
+  //ceph_conf_set(cmount, "debug_ms", "1");
+
   HT_INFO("Calling ceph_mount");
-  ceph_mount();
-  HT_INFO("Returning from constructor");
+  ret = ceph_mount(cmount, m_root_dir.empty() ? NULL : m_root_dir.c_str());
+  if (ret) {
+    ceph_shutdown(cmount);
+    throw Hypertable::Exception(ret, "ceph_mount failed");
+  }
+  HT_INFO("Mounted Ceph filesystem.");
 }
 
-CephBroker::~CephBroker() {
-  ceph_deinitialize();
+CephBroker::~CephBroker()
+{
+  ceph_shutdown(cmount);
+  cmount = NULL;
 }
 
-void CephBroker::open(ResponseCallbackOpen *cb, const char *fname, uint32_t bufsz) {
+void CephBroker::open(ResponseCallbackOpen *cb, const char *fname,
+                     uint32_t flags, uint32_t bufsz) {
   int fd, ceph_fd;
   String abspath;
   HT_DEBUGF("open file='%s' bufsz=%d", fname, bufsz);
@@ -70,7 +113,7 @@ void CephBroker::open(ResponseCallbackOpen *cb, const char *fname, uint32_t bufs
 
   fd = atomic_inc_return(&ms_next_fd);
 
-  if ((ceph_fd = ceph_open(abspath.c_str(), O_RDONLY)) < 0) {
+  if ((ceph_fd = ceph_open(cmount, abspath.c_str(), O_RDONLY, 0)) < 0) {
     report_error(cb, -ceph_fd);
     return;
   }
@@ -78,7 +121,7 @@ void CephBroker::open(ResponseCallbackOpen *cb, const char *fname, uint32_t bufs
 
   {
     struct sockaddr_in addr;
-    OpenFileDataCephPtr fdata (new OpenFileDataCeph(abspath, ceph_fd, O_RDONLY));
+    OpenFileDataCephPtr fdata(new OpenFileDataCeph(cmount, abspath, ceph_fd, O_RDONLY));
 
     cb->get_address(addr);
 
@@ -88,36 +131,37 @@ void CephBroker::open(ResponseCallbackOpen *cb, const char *fname, uint32_t bufs
   }
 }
 
-void CephBroker::create(ResponseCallbackOpen *cb, const char *fname, bool overwrite,
+void CephBroker::create(ResponseCallbackOpen *cb, const char *fname, uint32_t flags,
                        int32_t bufsz, int16_t replication, int64_t blksz){
   int fd, ceph_fd;
-  int flags;
+  int oflags;
   String abspath;
 
   make_abs_path(fname, abspath);
-  HT_DEBUGF("create file='%s' overwrite=%d bufsz=%d replication=%d blksz=%lld",
-            fname, (int)overwrite, bufsz, (int)replication, (Lld)blksz);
+  HT_DEBUGF("create file='%s' flags=%u bufsz=%d replication=%d blksz=%lld",
+            fname, flags, bufsz, (int)replication, (Lld)blksz);
 
   fd = atomic_inc_return(&ms_next_fd);
 
-  if (overwrite)
-    flags = O_WRONLY | O_CREAT | O_TRUNC;
+  if (flags & Filesystem::OPEN_FLAG_OVERWRITE)
+    oflags = O_WRONLY | O_CREAT | O_TRUNC;
   else
-    flags = O_WRONLY | O_CREAT | O_APPEND;
+    oflags = O_WRONLY | O_CREAT | O_APPEND;
 
   //make sure the directories in the path exist
   String directory = abspath.substr(0, abspath.rfind('/'));
   int r;
   HT_INFOF("Calling mkdirs on %s", directory.c_str());
-  if((r=ceph_mkdirs(directory.c_str(), 0644)) < 0 && r!=-EEXIST) {
+  if((r=ceph_mkdirs(cmount, directory.c_str(), 0644)) < 0 && r!=-EEXIST) {
     HT_ERRORF("create failed on mkdirs: dname='%s' - %d", directory.c_str(), -r);
     report_error(cb, -r);
     return;
   }
 
   //create file
-  if ((ceph_fd = ceph_open(abspath.c_str(), flags, 0644)) < 0) {
-    HT_ERRORF("open failed: file=%s - %s",  abspath.c_str(), strerror(-ceph_fd));
+  if ((ceph_fd = ceph_open(cmount, abspath.c_str(), oflags, 0644)) < 0) {
+    std::string errs(cpp_strerror(-ceph_fd));
+    HT_ERRORF("open failed: file=%s - %s",  abspath.c_str(), errs.c_str());
     report_error(cb, ceph_fd);
     return;
   }
@@ -126,7 +170,7 @@ void CephBroker::create(ResponseCallbackOpen *cb, const char *fname, bool overwr
 
   {
     struct sockaddr_in addr;
-    OpenFileDataCephPtr fdata (new OpenFileDataCeph(fname, ceph_fd, O_WRONLY));
+    OpenFileDataCephPtr fdata (new OpenFileDataCeph(cmount, fname, ceph_fd, O_WRONLY));
 
     cb->get_address(addr);
 
@@ -156,19 +200,21 @@ void CephBroker::read(ResponseCallbackRead *cb, uint32_t fd, uint32_t amount) {
 
   if (!m_open_file_map.get(fd, fdata)) {
     char errbuf[32];
-    snprintf(errbuf, sizeof(errbuf), "%d", fd);
+    sprintf(errbuf, "%d", fd);
     cb->error(Error::DFSBROKER_BAD_FILE_HANDLE, errbuf);
     HT_ERRORF("bad file handle: %d", fd);
     return;
   }
 
-  if ((offset = ceph_lseek(fdata->fd, 0, SEEK_CUR)) < 0) {
-    HT_ERRORF("lseek failed: fd=%d ceph_fd=%d offset=0 SEEK_CUR - %s", fd, fdata->fd, strerror(-offset));
+  if ((offset = ceph_lseek(cmount, fdata->fd, 0, SEEK_CUR)) < 0) {
+    std::string errs(cpp_strerror((int)-offset));
+    HT_ERRORF("lseek failed: fd=%d ceph_fd=%d offset=0 SEEK_CUR - %s",
+             fd, fdata->fd, errs.c_str());
     report_error(cb, offset);
     return;
   }
 
-  if ((nread = ceph_read(fdata->fd, (char *)buf.base, amount)) < 0 ) {
+  if ((nread = ceph_read(cmount, fdata->fd, (char *)buf.base, amount, 0)) < 0 ) {
     HT_ERRORF("read failed: fd=%d ceph_fd=%d amount=%d", fd, fdata->fd, amount);
     report_error(cb, -nread);
     return;
@@ -190,28 +236,31 @@ void CephBroker::append(ResponseCallbackAppend *cb, uint32_t fd,
 
   if (!m_open_file_map.get(fd, fdata)) {
     char errbuf[32];
-    snprintf(errbuf, sizeof(errbuf), "%d", fd);
+    sprintf(errbuf, "%d", fd);
     cb->error(Error::DFSBROKER_BAD_FILE_HANDLE, errbuf);
     return;
   }
 
-  if ((offset = (uint64_t)ceph_lseek(fdata->fd, 0, SEEK_CUR)) < 0) {
+  if ((offset = (uint64_t)ceph_lseek(cmount, fdata->fd, 0, SEEK_CUR)) < 0) {
+    std::string errs(cpp_strerror((int)-offset));
     HT_ERRORF("lseek failed: fd=%d ceph_fd=%d offset=0 SEEK_CUR - %s", fd, fdata->fd,
-              strerror(-offset));
+              errs.c_str());
     report_error(cb, offset);
     return;
   }
 
-  if ((nwritten = ceph_write(fdata->fd, (const char *)data, amount)) < 0) {
-    HT_ERRORF("write failed: fd=%d ceph_fd=%d amount=%d - %s", fd, fdata->fd, amount,
-              strerror(-nwritten));
+  if ((nwritten = ceph_write(cmount, fdata->fd, (const char *)data, amount, 0)) < 0) {
+    std::string errs(cpp_strerror(nwritten));
+    HT_ERRORF("write failed: fd=%d ceph_fd=%d amount=%d - %s",
+             fd, fdata->fd, amount, errs.c_str());
     report_error(cb, -nwritten);
     return;
   }
 
   int r;
-  if (sync && ((r = ceph_fsync(fdata->fd, true)) != 0)) {
-    HT_ERRORF("flush failed: fd=%d ceph_fd=%d - %s", fd, fdata->fd, strerror(errno));
+  if (sync && ((r = ceph_fsync(cmount, fdata->fd, true)) != 0)) {
+    std::string errs(cpp_strerror(errno));
+    HT_ERRORF("flush failed: fd=%d ceph_fd=%d - %s", fd, fdata->fd, errs.c_str());
     report_error(cb, r);
     return;
   }
@@ -226,14 +275,15 @@ void CephBroker::seek(ResponseCallback *cb, uint32_t fd, uint64_t offset) {
 
   if (!m_open_file_map.get(fd, fdata)) {
     char errbuf[32];
-    snprintf(errbuf, sizeof(errbuf), "%d", fd);
+    sprintf(errbuf, "%d", fd);
     cb->error(Error::DFSBROKER_BAD_FILE_HANDLE, errbuf);
     return;
   }
-  int r;
-  if ((r = (uint64_t)ceph_lseek(fdata->fd, offset, SEEK_SET)) < 0) {
-    HT_ERRORF("lseek failed: fd=%d ceph_fd=%d offset=%llu - %s", fd, fdata->fd,
-             (Llu)offset, strerror(-r));
+  loff_t res = ceph_lseek(cmount, fdata->fd, offset, SEEK_SET);
+  if (res < 0) {
+    std::string errs(cpp_strerror((int)res));
+    HT_ERRORF("lseek failed: fd=%d ceph_fd=%d offset=%llu - %s",
+             fd, fdata->fd, (Llu)offset, errs.c_str());
     report_error(cb, offset);
     return;
   }
@@ -249,8 +299,9 @@ void CephBroker::remove(ResponseCallback *cb, const char *fname) {
   make_abs_path(fname, abspath);
   
   int r;
-  if ((r = ceph_unlink(abspath.c_str())) < 0) {
-    HT_ERRORF("unlink failed: file='%s' - %s", abspath.c_str(), strerror(-r));
+  if ((r = ceph_unlink(cmount, abspath.c_str())) < 0) {
+    std::string errs(cpp_strerror(r));
+    HT_ERRORF("unlink failed: file='%s' - %s", abspath.c_str(), errs.c_str());
     report_error(cb, r);
     return;
   }
@@ -263,10 +314,11 @@ void CephBroker::length(ResponseCallbackLength *cb, const char *fname) {
 
   HT_DEBUGF("length file='%s'", fname);
 
-  if ((r = ceph_lstat(fname, &statbuf)) < 0) {
+  if ((r = ceph_lstat(cmount, fname, &statbuf)) < 0) {
     String abspath;
     make_abs_path(fname, abspath);
-    HT_ERRORF("length (stat) failed: file='%s' - %s", abspath.c_str(), strerror(-r));
+    std::string errs(cpp_strerror(r));
+    HT_ERRORF("length (stat) failed: file='%s' - %s", abspath.c_str(), errs.c_str());
     report_error(cb,- r);
     return;
   }
@@ -283,14 +335,15 @@ void CephBroker::pread(ResponseCallbackRead *cb, uint32_t fd, uint64_t offset,
 
   if (!m_open_file_map.get(fd, fdata)) {
     char errbuf[32];
-    snprintf(errbuf, sizeof(errbuf), "%d", fd);
+    sprintf(errbuf, "%d", fd);
     cb->error(Error::DFSBROKER_BAD_FILE_HANDLE, errbuf);
     return;
   }
 
-  if ((nread = ceph_read(fdata->fd, (char *)buf.base, amount, offset)) < 0) {
+  if ((nread = ceph_read(cmount, fdata->fd, (char *)buf.base, amount, offset)) < 0) {
+    std::string errs(cpp_strerror(nread));
     HT_ERRORF("pread failed: fd=%d ceph_fd=%d amount=%d offset=%llu - %s", fd, fdata->fd,
-              amount, (Llu)offset, strerror(-nread));
+              amount, (Llu)offset, errs.c_str());
     report_error(cb, nread);
     return;
   }
@@ -307,7 +360,7 @@ void CephBroker::mkdirs(ResponseCallback *cb, const char *dname) {
 
   make_abs_path(dname, absdir);
   int r;
-  if((r=ceph_mkdirs(absdir.c_str(), 0644)) < 0 && r!=-EEXIST) {
+  if((r=ceph_mkdirs(cmount, absdir.c_str(), 0644)) < 0 && r!=-EEXIST) {
     HT_ERRORF("mkdirs failed: dname='%s' - %d", absdir.c_str(), -r);
     report_error(cb, -r);
     return;
@@ -329,13 +382,13 @@ void CephBroker::rmdir(ResponseCallback *cb, const char *dname) {
 }
 
 int CephBroker::rmdir_recursive(const char *directory) {
-  DIR *dirp;
+  ceph_dir_result_t *dirp;
   struct dirent de;
   struct stat st;
   int r;
-  if ((r = ceph_opendir(directory, &dirp) < 0))
+  if ((r = ceph_opendir(cmount, directory, &dirp) < 0))
     return r; //failed to open
-  while ((r = ceph_readdirplus_r(dirp, &de, &st, 0)) > 0) {
+  while ((r = ceph_readdirplus_r(cmount, dirp, &de, &st, 0)) > 0) {
     String new_dir = de.d_name;
     if(!(new_dir.compare(".")==0 || new_dir.compare("..")==0)) {
       new_dir = directory;
@@ -344,13 +397,13 @@ int CephBroker::rmdir_recursive(const char *directory) {
       if (S_ISDIR(st.st_mode)) { //it's a dir, clear it out...
        if((r=rmdir_recursive(new_dir.c_str())) < 0) return r;
       } else { //delete this file
-       if((r=ceph_unlink(new_dir.c_str())) < 0) return r;
+       if((r=ceph_unlink(cmount, new_dir.c_str())) < 0) return r;
       }
     }
   }
   if (r < 0) return r; //we got an error
-  if ((r = ceph_closedir(dirp)) < 0) return r;
-  return ceph_rmdir(directory);
+  if ((r = ceph_closedir(cmount, dirp)) < 0) return r;
+  return ceph_rmdir(cmount, directory);
 }
 
 void CephBroker::flush(ResponseCallback *cb, uint32_t fd) {
@@ -360,14 +413,15 @@ void CephBroker::flush(ResponseCallback *cb, uint32_t fd) {
 
   if (!m_open_file_map.get(fd, fdata)) {
     char errbuf[32];
-    snprintf(errbuf, sizeof(errbuf), "%d", fd);
+    sprintf(errbuf, "%d", fd);
     cb->error(Error::DFSBROKER_BAD_FILE_HANDLE, errbuf);
     return;
   }
 
   int r;
-  if ((r = ceph_fsync(fdata->fd, true)) != 0) {
-    HT_ERRORF("flush failed: fd=%d  ceph_fd=%d - %s", fd, fdata->fd, strerror(-r));
+  if ((r = ceph_fsync(cmount, fdata->fd, true)) != 0) {
+    std::string errs(cpp_strerror(r));
+    HT_ERRORF("flush failed: fd=%d  ceph_fd=%d - %s", fd, fdata->fd, errs.c_str());
     report_error(cb, -r);
     return;
   }
@@ -396,17 +450,17 @@ void CephBroker::readdir(ResponseCallbackReaddir *cb, const char *dname) {
   //get from ceph in a buffer
   make_abs_path(dname, absdir);
 
-  DIR *dirp;
-  ceph_opendir(absdir.c_str(), &dirp);
+  ceph_dir_result_t *dirp;
+  ceph_opendir(cmount, absdir.c_str(), &dirp);
   int r;
   int buflen = 100; //good default?
   char *buf = new char[buflen];
   String *ent;
   int bufpos;
   while (1) {
-    r = ceph_getdnames(dirp, buf, buflen);
+    r = ceph_getdnames(cmount, dirp, buf, buflen);
     if (r==-ERANGE) { //expand the buffer
-      delete [] buf;
+      delete buf;
       buflen *= 2;
       buf = new char[buflen];
       continue;
@@ -423,8 +477,8 @@ void CephBroker::readdir(ResponseCallbackReaddir *cb, const char *dname) {
       delete ent;
     }
   }
-  delete [] buf;
-  ceph_closedir(dirp);
+  delete buf;
+  ceph_closedir(cmount, dirp);
 
   if (r < 0) report_error(cb, -r); //Ceph shouldn't return r<0 on getdnames
   //(except for ERANGE) so if it happens this is bad
@@ -437,7 +491,7 @@ void CephBroker::exists(ResponseCallbackExists *cb, const char *fname) {
   
   HT_DEBUGF("exists file='%s'", fname);
   make_abs_path(fname, abspath);
-  cb->response(ceph_lstat(abspath.c_str(), &statbuf) == 0);
+  cb->response(ceph_lstat(cmount, abspath.c_str(), &statbuf) == 0);
 }
 
 void CephBroker::rename(ResponseCallback *cb, const char *src, const char *dst) {
@@ -447,7 +501,7 @@ void CephBroker::rename(ResponseCallback *cb, const char *src, const char *dst)
 
   make_abs_path(src, src_abs);
   make_abs_path(dst, dest_abs);
-  if ((r = ceph_rename(src_abs.c_str(), dest_abs.c_str())) <0 ) {
+  if ((r = ceph_rename(cmount, src_abs.c_str(), dest_abs.c_str())) <0 ) {
     report_error(cb, r);
     return;
   }
index 7de3f438010a700764ee39a3a3f30f219660fd60..a64e8296a01176dfb4a37c7754171493c304ca8e 100644 (file)
@@ -1,31 +1,41 @@
-// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- 
-// vim: ts=8 sw=2 smarttab
-/*
- * Ceph - scalable distributed file system
+/** -*- C++ -*-
+ * Copyright (C) 2009-2011 New Dream Network
  *
- * Copyright (C) 2004-2006 Sage Weil <sage@newdream.net>
+ * This file is part of Hypertable.
  *
- * This is free software; you can redistribute it and/or
- * modify it under the terms of the GNU Lesser General Public
- * License version 2.1, as published by the Free Software 
- * Foundation.  See file COPYING.
- * 
+ * Hypertable is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU General Public License
+ * as published by the Free Software Foundation; either version 2
+ * of the License, or any later version.
+ *
+ * Hypertable is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with Hypertable. If not, see <http://www.gnu.org/licenses/>
+ *
+ * Authors:
+ * Gregory Farnum <gfarnum@gmail.com>
+ * Colin McCabe <cmccabe@alumni.cmu.edu>
  */
 
-#ifndef CEPH_HYPERTABLE_BROKER_H
-#define CEPH_HYPERTABLE_BROKER_H
+#ifndef HYPERTABLE_CEPHBROKER_H
+#define HYPERTABLE_CEPHBROKER_H
 
 extern "C" {
 #include <unistd.h>
 }
 
-#include "libceph.h"
 #include "Common/String.h"
 #include "Common/atomic.h"
 #include "Common/Properties.h"
 
 #include "DfsBroker/Lib/Broker.h"
 
+class ceph_mount_t;
+
 namespace Hypertable {
   using namespace DfsBroker;
   /**
@@ -33,9 +43,10 @@ namespace Hypertable {
    */
   class OpenFileDataCeph : public OpenFileData {
   public:
-    OpenFileDataCeph(const String& fname, int _fd, int _flags) :
-      fd(_fd), flags(_flags), filename(fname) {}
-    virtual ~OpenFileDataCeph() { ceph_close(fd); }
+    OpenFileDataCeph(ceph_mount_t *cmount_, const String& fname,
+                    int _fd, int _flags);
+    virtual ~OpenFileDataCeph();
+    ceph_mount_t *cmount;
     int fd;
     int flags;
     String filename;
@@ -60,9 +71,9 @@ namespace Hypertable {
     virtual ~CephBroker();
 
     virtual void open(ResponseCallbackOpen *cb, const char *fname,
-                      uint32_t bufsz);
+                      uint32_t flags, uint32_t bufsz);
     virtual void
-    create(ResponseCallbackOpen *cb, const char *fname, bool overwrite,
+    create(ResponseCallbackOpen *cb, const char *fname, uint32_t flags,
            int32_t bufsz, int16_t replication, int64_t blksz);
     virtual void close(ResponseCallback *cb, uint32_t fd);
     virtual void read(ResponseCallbackRead *cb, uint32_t fd, uint32_t amount);
@@ -85,6 +96,7 @@ namespace Hypertable {
                        StaticBuffer &serialized_parameters);
 
   private:
+    ceph_mount_t *cmount;
     static atomic_t ms_next_fd;
 
     virtual void report_error(ResponseCallback *cb, int error);