]> git-server-git.apps.pok.os.sepia.ceph.com Git - ceph.git/commitdiff
Hypertable:Completed initial Cephbroker. Requires testing.
authorGreg Farnum <gregf@hq.newdream.net>
Thu, 9 Jul 2009 18:16:21 +0000 (11:16 -0700)
committerGreg Farnum <gregf@hq.newdream.net>
Thu, 9 Jul 2009 18:40:39 +0000 (11:40 -0700)
src/client/hypertable/CephBroker.cc
src/client/hypertable/CephBroker.h

index d3c49982311355886c3af4d92448663f76981ea4..0229e865319fad4b4e79580a9162b6c6fb7d8d38 100644 (file)
@@ -14,6 +14,7 @@
 
 #include "Common/Compat.h"
 #include <cerrno>
+#include <string>
 
 extern "C" {
 #include <fcntl.h>
@@ -33,88 +34,375 @@ atomic_t CephBroker::ms_next_fd = ATOMIC_INIT(0);
 
 CephBroker::CephBroker(PropertiesPtr &cfg) {
   m_verbose = cfg->get_bool("Hypertable.Verbose");
+  ceph_initialize(0, NULL);
+  ceph_mount();
   /* do other stuff */
 }
 
-CephBroker::~CephBroker() { /* destroy client? */}
+CephBroker::~CephBroker(){
+  ceph_deinitialize();
+}
 
-void CephBroker::open(ResponseCallbackOpen *cb, const char *fname, uint32_t bufsz)
-{
+void CephBroker::open(ResponseCallbackOpen *cb, const char *fname, uint32_t bufsz){
+  int fd, ceph_fd;
+  String abspath;
+  HT_DEBUGF("open file='%s' bufsz=%d", fname, bufsz);
+
+  make_abs_path(fname, abspath);
+
+  fd = atomic_inc_return(&ms_next_fd);
+
+  if ((ceph_fd = ceph_open(abspath, O_RDONLY)) < 0) {
+    report_error(cb, ceph_fd);
+    return;
+  }
+  HT_INFOF("open (%s) fd=%d ceph_fd=%d", fname, fd, ceph_fd);
+
+  {
+    struct sockaddr_in addr;
+    OpenFileDataCephPtr fdata (new OpenFileDataCeph(ceph_fd, O_RDONLY));
 
+    cb->get_address(addr);
+
+    m_open_file_map.create(fd, addr, fdata);
+
+    cb->response(fd);
+  }
 }
 
 void CephBroker::create(ResponseCallbackOpen *cb, const char *fname, bool overwrite,
        int32_t bufsz, int16_t replication, int64_t blksz){
+  int fd, ceph_fd;
+  int flags;
+  String abspath;
 
-}
+  HT_DEBUGF("create file='%s' overwrite=%d bufsz=%d replication=%d blksz=%lld",
+            fname, (int)overwrite, bufsz, (int)replication, (Lld)blksz);
+
+  fd = atomic_inc_return(&ms_next_fd);
+
+  if(overwrite)
+    flags = O_WRONLY | O_CREAT | O_TRUNC;
+  else
+    flags = O_WRONLY | O_CREAT | O_APPEND;
+
+  if ((ceph_fd = ceph_open(abspath.c_str(), flags, 0644)) < 0) {
+    HT_ERROR("open failed: file='%s' - %s", abspath.c_str(), strerror(errno));
+    report_error(cb, ceph_fd);
+    return;
+  }
 
-void CephBroker::close(ResponseCallback *cb, uint32_t fd){
+  HT_INFOF("create( % s ) = %d", fname, ceph_fd);
 
+  {
+    struct sockaddr_in addr;
+    OpenFileDataCephPtr fdata (new OpenFileDataCeph(fname, ceph_fd, O_WRONLY));
+
+    cb->get_address(addr);
+
+    m_open_file_map.create(fd, addr, fdata);
+
+    cb->response(fd);
+  }
 }
 
-void CephBroker::read(ResponseCallbackRead *cb, uint32_t fd, uint32_t amount){
+void CephBroker::close(ResponseCallback *cb, uint32_t fd) {
+  if (m_verbose) {
+    HT_INFOF("close fd=%d", fd);
+  }
+  OpenFileDataCephPtr fdata;
+  m_open_file_map.get(fd, fdata);
+  ceph_close( fdata->fd); //BEWARE: the other fs'es don't do this...
+  m_open_file_map.remove(fd);
+  cb->response_ok();
+}
 
+void CephBroker::read(ResponseCallbackRead *cb, uint32_t fd, uint32_t amount){
+  OpenFileDataCephPtr fdata;
+  ssize_t nread;
+  uint64t offset;
+  StaticBuffer buf(new uint8_t [amount], amount);
+
+  HT_DEBUGF("read fd=%d amount = %d", fd, amount);
+
+  if(!m_open_file_map.get(fd, fdata)) {
+    char errbuf[32];
+    sprintf(errbuf, "%d", fd);
+    cb->error(Error::DFSBROKER_BAD_FILE_HANDLE, errbuf);
+    HT_ERRORF("bad file handle: %d", fd);
+    return;
+  }
+
+  if((offset = ceph_lseek(fdata->fd, 0, SEEK_CUR)) < 0) {
+    HT_ERRORF("lseek failed: fd=%d ceph_fd=%d offset=0 SEEK_CUR - %s", fd, fdata->fd, strerror(errno));
+    report_error(cb, offset);
+    return;
+  }
+
+  if ((nread=ceph_read(fdata->fd, (char *)buf.base, amount)) < 0 ) {
+    HT_ERRORF("read failed: fd=%d ceph_fd=%d amount=%d - %s", fd, fdata->fd, amount, errmsg.c_str());
+    report_error(cb, nread);
+    return;
+  }
+
+  buf.size = nread;
+  cb->response(offset, buf);
 }
 
 void CephBroker::append(ResponseCallbackAppend *cb, uint32_t fd,
                    uint32_t amount, const void CephBroker::*data, bool sync)
 {
-
+  OpenFileDataCephPtr fdata;
+  ssize_t nwritten;
+  uint64_t offset;
+
+  HT_DEBUG_OUT <<"append fd="<< fd <<" amount="<< amount <<" data='"
+      << format_bytes(20, data, amount) <<" sync="<< sync << HT_END;
+
+  if (!m_open_file_map.get(fd, fdata)) {
+    char errbuf[32];
+    sprintf(errbuf, "%d", fd);
+    cb->error(Error::DFSBROKER_BAD_FILE_HANDLE, errbuf);
+    return;
+  }
+
+  if ((offset = (uint64_t)ceph_lseek(fdata->fd, 0, SEEK_CUR)) < 0) {
+    HT_ERRORF("lseek failed: fd=%d ceph_fd=%d offset=0 SEEK_CUR - %s", fd, fdata->fd,
+              strerror(errno));
+    report_error(cb, offset);
+    return;
+  }
+
+  if ((nwritten = ceph_write(fdata->fd, data, amount)) < 0) {
+    HT_ERRORF("write failed: fd=%d ceph_fd=%d amount=%d - %s", fd, fdata->fd, amount,
+              strerror(errno));
+    report_error(cb, nwritten);
+    return;
+  }
+
+  int r;
+  if (sync &&( (r=ceph_fsync(fdata->fd, true)) != 0)) {
+    HT_ERRORF("flush failed: fd=%d ceph_fd=%d - %s", fd, fdata->fd, strerror(errno));
+    report_error(cb, r);
+    return;
+  }
+
+  cb->response(offset, nwritten);
 }
 void CephBroker::seek(ResponseCallback *cb, uint32_t fd, uint64_t offset){
+  OpenFileDataLocalPtr fdata;
 
-}
+  HT_DEBUGF("seek fd=%lu offset=%llu", (Lu)fd, (Llu)offset);
+
+  if (!m_open_file_map.get(fd, fdata)) {
+    char errbuf[32];
+    sprintf(errbuf, "%d", fd);
+    cb->error(Error::DFSBROKER_BAD_FILE_HANDLE, errbuf);
+    return;
+  }
 
-void CephBroker::remove(ResponseCallback *cb, const char *fname){
+  if ((offset = (uint64_t)ceph_lseek(fdata->fd, offset, SEEK_SET)) < 0) {
+    HT_ERRORF("lseek failed: fd=%d ceph_fd=%d offset=%llu - %s", fd, fdata->fd, (Llu)offset,
+              strerror(errno));
+    report_error(cb, offset);
+    return;
+  }
 
+  cb->response_ok();
 }
 
-void CephBroker::length(ResponseCallbackLength *cb, const char *fname){
+void CephBroker::remove(ResponseCallback *cb, const char *fname) {
+  String abspath;
+  
+  HT_DEBUGF("remove file='%s'", fname);
+  
+  make_abs_path(fname, abspath);
+  
+  int r;
+  if((r=ceph_unlink(abspath.c_str())) < 0) {
+    HT_ERRORF("unlink failed: file='%s' - %s", abspath.c_str(), strerror(errno));
+    report_error(cb, r);
+    return;
+  }
+  cb->response_ok();
+}
 
+void CephBroker::length(ResponseCallbackLength *cb, const char *fname){
+  int res;
+  struct stat statbuf;
+
+  HT_DEBUGF("length file='%s'", fname);
+
+  if (!m_open_file_map.get(fd, fdata)) {
+    char errbuf[32];
+    sprintf(errbuf, "%d", fd);
+    cb->error(Error::DFSBROKER_BAD_FILE_HANDLE, errbuf);
+    return;
+  }
+
+  if ((res = ceph_fstat(fdata->fd, &statbuf)) < 0) {
+    String abspath;
+    make_abs_path(fname, abspath);
+    HT_ERRORF("length (stat) failed: file='%s' - %s", abspath.c_str(), strerror(errno));
+    report_error(cb, res);
+    return;
+  }
+  cb->response(statbuf.st_size);
 }
 
 void CephBroker::pread(ResponseCallbackRead *cb, uint32_t fd, uint64_t offset,
                   uint32_t amount){
+  OpenFileDataCephPtr fdata;
+  ssize_t nread;
+  StaticBuffer buf(new unit8_t [amount], amount);
 
+  HT_DEBUGF("pread fd=%d offset=%llu amount=%d", fd, (Llu)offset, amount);
+
+  if (!m_open_file_map.get(fd, fdata)) {
+    char errbuf[32];
+    sprintf(errbuf, "%d", fd);
+    cb->error(Error::DFSBROKER_BAD_FILE_HANDLE, errbuf);
+    return;
+  }
+
+  if ((nread = ceph_read(fdata->fd, (char *)buf.base, amount, offset)) < 0) {
+    HT_ERRORF("pread failed: fd=%d ceph_fd=%d amount=%d offset=%llu - %s", fd, fdata->fd,
+              amount, (Llu)offset, strerror(errno));
+    report_error(cb, nread);
+    return;
+  }
+
+  buf.size = nread;
+
+  cb->response(offset, buf);
 }
 
 void CephBroker::mkdirs(ResponseCallback *cb, const char *dname){
-
+  String absdir;
+
+  HT_DEBUGF("mkdirs dir='%s'", dname);
+
+  make_abs_path(dname, absdir);
+  int r;
+  if((r=ceph_mkdir(absdir.c_str(), 0644)) < 0) {
+    HT_ERRORF("mkdirs failed: dname='%s' - %s", absdir.c_str(), strerror(errno));
+    report_error(cb, r);
+    return;
+  }
+  cb->response_ok();
 }
 
 void CephBroker::rmdir(ResponseCallback *cb, const char *dname){
+  String absdir;
+
+  make_abs_path(dname, absdir);
+  int r;
+  if((r=ceph_rmdir(absdir.c_str())) < 0) {
+    HT_ERRORF("failed to remove dir %s", absdir) report_error(cb, r);
+    return;
+  }
 
+  cb->response_ok();
 }
 
 void CephBroker::flush(ResponseCallback *cb, uint32_t fd){
+  OpenFileDataCephPtr fdata;
 
+  HT_DEBUGF("flush fd=%d", fd);
+
+  if(!m_open_file_map.get(fd, fdata)) {
+    char errbuf[32];
+    sprintf(errbuf, "%d", fd);
+    cb->error(Error::DFSBROKER_BAD_FILE_HANDLE, errbuf);
+    return;
+  }
+
+  int r;
+  if((r=ceph_fsync(fdata->fd, true)) != 0) {
+    HT_ERRORF("flush failed: fd=%d  ceph_fd=%d - %s", fd, fdata->fd, strerror(errno));
+    report_error(cb, r);
+    return;
+  }
+
+  cb->response_ok();
 }
 
 void CephBroker::status(ResponseCallback *cb){
-
+  cb->response_ok();
+  /*perhaps a total cheat, but both the local and Kosmos brokers
+    included in Hypertable also do this. */
 }
 
+/* I have no idea if this is correct; it's what local and kosmos brokers do. Check the contract!
+ */
 void CephBroker::shutdown(ResponseCallback *cb){
-
+  m_open_file_map.remove_all();
+  cb->response_ok();
+  poll(0, 0, 2000);
 }
 
 void CephBroker::readdir(ResponseCallbackReaddir *cb, const char *dname){
+  std::vector<String> listing;
+  String absdir;
 
+  HT_DEBUGF("Readdir dir='%s'", dname);
+
+  //get from ceph in list<string>
+  make_abs_path(dname, absdir);
+  list<string> dir_con;
+  ceph_getdir(absdir, dir_con);
+
+  //convert to vector<String>
+  for (list<string>::iterator i = dir_con.begin(); i!=dir_con.end(); ++i) {
+    listing.push_back(*i); //BEWARE: Assumes getdir doesn't include . and ..
+  }
+  cb->response(listing);
 }
 
 void CephBroker::exists(ResponseCallbackExists *cb, const char *fname){
+    String abspath;
+
+    HT_DEBUGF("exists file='%s'", fname);
 
+    make_abs_path(fname, abspath);
+
+    cb->response(ceph_fstat(abspath) == 0);
 }
 
 void CephBroker::rename(ResponseCallback *cb, const char *src, const char *dst){
-
+  String src_abs;
+  String dest_abs;
+  make_abs_path(src, src_abs);
+  make_abs_path(dst, dest_abs);
+
+  int r;
+  if((r=ceph_rename(src_abs.c_str(), dest_abs.c_str())) <0 ) {
+    report_error(cb, r);
+    return;
+  }
+  cb->response_ok();
 }
 
 void CephBroker::debug(ResponseCallback *, int32_t command,
                   StaticBuffer &serialized_parameters){
-
+  HT_ERRORF("debug commands not implemented!");
+  cb->error(Error::NOT_IMPLEMENTED, format("Debug commands not supported"));
 }
 
 void CephBroker::report_error(ResponseCallback *cb, int error) {
+  char errbuf[128];
+  errbuf[0] = 0;
 
+  strerror_t(error, errbuf, 128);
+
+  cb->error(Error::DFSBROKER_IO_ERROR, errbuf);
+}
+
+
+inline void make_abs_path(const char *fname, String& abs)
+{
+  if (fname[0] == '/')
+    abs = fname;
+  else
+    abs = m_rootdir + "/" + fname;
 }
index 38d17e06d64cab13cdc209007484a430c108cd26..925768f6830bb4cd647246a80b67c8f04f6a3e14 100644 (file)
@@ -87,6 +87,8 @@ namespace Hypertable {
 
     virtual void report_error(ResponseCallback *cb, int error);
 
+    inline void make_abs_path(const char *fname, String& abs);
+
     bool m_verbose;
     String m_root_dir;
   };