From 6b830d8b8f9854fb9040eaaaba3de3f3dde92f4a Mon Sep 17 00:00:00 2001 From: Colin Patrick McCabe Date: Fri, 22 Apr 2011 16:07:45 -0700 Subject: [PATCH] hypertable: update bindings to use new libceph API Signed-off-by: Colin McCabe --- src/client/hypertable/CephBroker.cc | 246 +++++++++++++++++----------- src/client/hypertable/CephBroker.h | 48 ++++-- 2 files changed, 180 insertions(+), 114 deletions(-) diff --git a/src/client/hypertable/CephBroker.cc b/src/client/hypertable/CephBroker.cc index c484e8607c759..d412a384733a9 100644 --- a/src/client/hypertable/CephBroker.cc +++ b/src/client/hypertable/CephBroker.cc @@ -1,67 +1,110 @@ -// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- -// vim: ts=8 sw=2 smarttab -/* - * Ceph - scalable distributed file system +/** -*- C++ -*- + * Copyright (C) 2009-2011 New Dream Network * - * Copyright (C) 2004-2006 Sage Weil + * This file is part of Hypertable. * - * This is free software; you can redistribute it and/or - * modify it under the terms of the GNU Lesser General Public - * License version 2.1, as published by the Free Software - * Foundation. See file COPYING. - * + * Hypertable is free software; you can redistribute it and/or + * modify it under the terms of the GNU General Public License + * as published by the Free Software Foundation; either version 2 + * of the License, or any later version. + * + * Hypertable is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with Hypertable. If not, see + * + * Authors: + * Gregory Farnum + * Colin McCabe */ #include "Common/Compat.h" -#include -#include -extern "C" { +#include "CephBroker.h" +#include "Common/Error.h" +#include "Common/FileUtils.h" +#include "Common/Filesystem.h" +#include "Common/System.h" + +#include +#include +#include #include #include +#include #include #include #include -} - -#include "Common/FileUtils.h" -#include "Common/System.h" -#include "CephBroker.h" using namespace Hypertable; atomic_t CephBroker::ms_next_fd = ATOMIC_INIT(0); -CephBroker::CephBroker(PropertiesPtr& cfg) { +/* A thread-safe version of strerror */ +static std::string cpp_strerror(int err) +{ + char buf[128]; + if (err < 0) + err = -err; + std::ostringstream oss; + oss << strerror_r(err, buf, sizeof(buf)); + return oss.str(); +} + +OpenFileDataCeph::OpenFileDataCeph(ceph_mount_t *cmount_, const String& fname, + int _fd, int _flags) + : cmount(cmount_), fd(_fd), flags(_flags), filename(fname) +{ +} + +OpenFileDataCeph::~OpenFileDataCeph() { + ceph_close(cmount, fd); +} + +CephBroker::CephBroker(PropertiesPtr& cfg) + : cmount(NULL) +{ + int ret; + String id(cfg->get_str("CephBroker.Id")); m_verbose = cfg->get_bool("Hypertable.Verbose"); - m_root_dir = ""; - //construct an arguments array - const char *argv[10]; - int argc = 0; - argv[argc++] = "cephBroker"; - argv[argc++] = "-m"; - argv[argc++] = (cfg->get_str("CephBroker.MonAddr").c_str()); - /* + m_root_dir = cfg->get_str("CephBroker.RootDir"); + String mon_addr(cfg->get_str("CephBroker.MonAddr")); + + HT_INFO("Calling ceph_create"); + ret = ceph_create(&cmount, id.empty() ? NULL : id.c_str()); + if (ret) { + throw Hypertable::Exception(ret, "ceph_create failed"); + } + ret = ceph_conf_set(cmount, "mon_addr", mon_addr.c_str()); + if (ret) { + ceph_shutdown(cmount); + throw Hypertable::Exception(ret, "ceph_conf_set(mon_addr) failed"); + } + // For Ceph debugging, uncomment these lines - argv[argc++] = "--debug_client"; - argv[argc++] = "0"; - argv[argc++] = "--debug_ms"; - argv[argc++] = "0"; - argv[argc++] = "--lockdep"; - argv[argc++] = "0"; */ - - HT_INFO("Calling ceph_initialize"); - ceph_initialize(argc, argv); + //ceph_conf_set(cmount, "debug_client", "1"); + //ceph_conf_set(cmount, "debug_ms", "1"); + HT_INFO("Calling ceph_mount"); - ceph_mount(); - HT_INFO("Returning from constructor"); + ret = ceph_mount(cmount, m_root_dir.empty() ? NULL : m_root_dir.c_str()); + if (ret) { + ceph_shutdown(cmount); + throw Hypertable::Exception(ret, "ceph_mount failed"); + } + HT_INFO("Mounted Ceph filesystem."); } -CephBroker::~CephBroker() { - ceph_deinitialize(); +CephBroker::~CephBroker() +{ + ceph_shutdown(cmount); + cmount = NULL; } -void CephBroker::open(ResponseCallbackOpen *cb, const char *fname, uint32_t bufsz) { +void CephBroker::open(ResponseCallbackOpen *cb, const char *fname, + uint32_t flags, uint32_t bufsz) { int fd, ceph_fd; String abspath; HT_DEBUGF("open file='%s' bufsz=%d", fname, bufsz); @@ -70,7 +113,7 @@ void CephBroker::open(ResponseCallbackOpen *cb, const char *fname, uint32_t bufs fd = atomic_inc_return(&ms_next_fd); - if ((ceph_fd = ceph_open(abspath.c_str(), O_RDONLY)) < 0) { + if ((ceph_fd = ceph_open(cmount, abspath.c_str(), O_RDONLY, 0)) < 0) { report_error(cb, -ceph_fd); return; } @@ -78,7 +121,7 @@ void CephBroker::open(ResponseCallbackOpen *cb, const char *fname, uint32_t bufs { struct sockaddr_in addr; - OpenFileDataCephPtr fdata (new OpenFileDataCeph(abspath, ceph_fd, O_RDONLY)); + OpenFileDataCephPtr fdata(new OpenFileDataCeph(cmount, abspath, ceph_fd, O_RDONLY)); cb->get_address(addr); @@ -88,36 +131,37 @@ void CephBroker::open(ResponseCallbackOpen *cb, const char *fname, uint32_t bufs } } -void CephBroker::create(ResponseCallbackOpen *cb, const char *fname, bool overwrite, +void CephBroker::create(ResponseCallbackOpen *cb, const char *fname, uint32_t flags, int32_t bufsz, int16_t replication, int64_t blksz){ int fd, ceph_fd; - int flags; + int oflags; String abspath; make_abs_path(fname, abspath); - HT_DEBUGF("create file='%s' overwrite=%d bufsz=%d replication=%d blksz=%lld", - fname, (int)overwrite, bufsz, (int)replication, (Lld)blksz); + HT_DEBUGF("create file='%s' flags=%u bufsz=%d replication=%d blksz=%lld", + fname, flags, bufsz, (int)replication, (Lld)blksz); fd = atomic_inc_return(&ms_next_fd); - if (overwrite) - flags = O_WRONLY | O_CREAT | O_TRUNC; + if (flags & Filesystem::OPEN_FLAG_OVERWRITE) + oflags = O_WRONLY | O_CREAT | O_TRUNC; else - flags = O_WRONLY | O_CREAT | O_APPEND; + oflags = O_WRONLY | O_CREAT | O_APPEND; //make sure the directories in the path exist String directory = abspath.substr(0, abspath.rfind('/')); int r; HT_INFOF("Calling mkdirs on %s", directory.c_str()); - if((r=ceph_mkdirs(directory.c_str(), 0644)) < 0 && r!=-EEXIST) { + if((r=ceph_mkdirs(cmount, directory.c_str(), 0644)) < 0 && r!=-EEXIST) { HT_ERRORF("create failed on mkdirs: dname='%s' - %d", directory.c_str(), -r); report_error(cb, -r); return; } //create file - if ((ceph_fd = ceph_open(abspath.c_str(), flags, 0644)) < 0) { - HT_ERRORF("open failed: file=%s - %s", abspath.c_str(), strerror(-ceph_fd)); + if ((ceph_fd = ceph_open(cmount, abspath.c_str(), oflags, 0644)) < 0) { + std::string errs(cpp_strerror(-ceph_fd)); + HT_ERRORF("open failed: file=%s - %s", abspath.c_str(), errs.c_str()); report_error(cb, ceph_fd); return; } @@ -126,7 +170,7 @@ void CephBroker::create(ResponseCallbackOpen *cb, const char *fname, bool overwr { struct sockaddr_in addr; - OpenFileDataCephPtr fdata (new OpenFileDataCeph(fname, ceph_fd, O_WRONLY)); + OpenFileDataCephPtr fdata (new OpenFileDataCeph(cmount, fname, ceph_fd, O_WRONLY)); cb->get_address(addr); @@ -156,19 +200,21 @@ void CephBroker::read(ResponseCallbackRead *cb, uint32_t fd, uint32_t amount) { if (!m_open_file_map.get(fd, fdata)) { char errbuf[32]; - snprintf(errbuf, sizeof(errbuf), "%d", fd); + sprintf(errbuf, "%d", fd); cb->error(Error::DFSBROKER_BAD_FILE_HANDLE, errbuf); HT_ERRORF("bad file handle: %d", fd); return; } - if ((offset = ceph_lseek(fdata->fd, 0, SEEK_CUR)) < 0) { - HT_ERRORF("lseek failed: fd=%d ceph_fd=%d offset=0 SEEK_CUR - %s", fd, fdata->fd, strerror(-offset)); + if ((offset = ceph_lseek(cmount, fdata->fd, 0, SEEK_CUR)) < 0) { + std::string errs(cpp_strerror((int)-offset)); + HT_ERRORF("lseek failed: fd=%d ceph_fd=%d offset=0 SEEK_CUR - %s", + fd, fdata->fd, errs.c_str()); report_error(cb, offset); return; } - if ((nread = ceph_read(fdata->fd, (char *)buf.base, amount)) < 0 ) { + if ((nread = ceph_read(cmount, fdata->fd, (char *)buf.base, amount, 0)) < 0 ) { HT_ERRORF("read failed: fd=%d ceph_fd=%d amount=%d", fd, fdata->fd, amount); report_error(cb, -nread); return; @@ -190,28 +236,31 @@ void CephBroker::append(ResponseCallbackAppend *cb, uint32_t fd, if (!m_open_file_map.get(fd, fdata)) { char errbuf[32]; - snprintf(errbuf, sizeof(errbuf), "%d", fd); + sprintf(errbuf, "%d", fd); cb->error(Error::DFSBROKER_BAD_FILE_HANDLE, errbuf); return; } - if ((offset = (uint64_t)ceph_lseek(fdata->fd, 0, SEEK_CUR)) < 0) { + if ((offset = (uint64_t)ceph_lseek(cmount, fdata->fd, 0, SEEK_CUR)) < 0) { + std::string errs(cpp_strerror((int)-offset)); HT_ERRORF("lseek failed: fd=%d ceph_fd=%d offset=0 SEEK_CUR - %s", fd, fdata->fd, - strerror(-offset)); + errs.c_str()); report_error(cb, offset); return; } - if ((nwritten = ceph_write(fdata->fd, (const char *)data, amount)) < 0) { - HT_ERRORF("write failed: fd=%d ceph_fd=%d amount=%d - %s", fd, fdata->fd, amount, - strerror(-nwritten)); + if ((nwritten = ceph_write(cmount, fdata->fd, (const char *)data, amount, 0)) < 0) { + std::string errs(cpp_strerror(nwritten)); + HT_ERRORF("write failed: fd=%d ceph_fd=%d amount=%d - %s", + fd, fdata->fd, amount, errs.c_str()); report_error(cb, -nwritten); return; } int r; - if (sync && ((r = ceph_fsync(fdata->fd, true)) != 0)) { - HT_ERRORF("flush failed: fd=%d ceph_fd=%d - %s", fd, fdata->fd, strerror(errno)); + if (sync && ((r = ceph_fsync(cmount, fdata->fd, true)) != 0)) { + std::string errs(cpp_strerror(errno)); + HT_ERRORF("flush failed: fd=%d ceph_fd=%d - %s", fd, fdata->fd, errs.c_str()); report_error(cb, r); return; } @@ -226,14 +275,15 @@ void CephBroker::seek(ResponseCallback *cb, uint32_t fd, uint64_t offset) { if (!m_open_file_map.get(fd, fdata)) { char errbuf[32]; - snprintf(errbuf, sizeof(errbuf), "%d", fd); + sprintf(errbuf, "%d", fd); cb->error(Error::DFSBROKER_BAD_FILE_HANDLE, errbuf); return; } - int r; - if ((r = (uint64_t)ceph_lseek(fdata->fd, offset, SEEK_SET)) < 0) { - HT_ERRORF("lseek failed: fd=%d ceph_fd=%d offset=%llu - %s", fd, fdata->fd, - (Llu)offset, strerror(-r)); + loff_t res = ceph_lseek(cmount, fdata->fd, offset, SEEK_SET); + if (res < 0) { + std::string errs(cpp_strerror((int)res)); + HT_ERRORF("lseek failed: fd=%d ceph_fd=%d offset=%llu - %s", + fd, fdata->fd, (Llu)offset, errs.c_str()); report_error(cb, offset); return; } @@ -249,8 +299,9 @@ void CephBroker::remove(ResponseCallback *cb, const char *fname) { make_abs_path(fname, abspath); int r; - if ((r = ceph_unlink(abspath.c_str())) < 0) { - HT_ERRORF("unlink failed: file='%s' - %s", abspath.c_str(), strerror(-r)); + if ((r = ceph_unlink(cmount, abspath.c_str())) < 0) { + std::string errs(cpp_strerror(r)); + HT_ERRORF("unlink failed: file='%s' - %s", abspath.c_str(), errs.c_str()); report_error(cb, r); return; } @@ -263,10 +314,11 @@ void CephBroker::length(ResponseCallbackLength *cb, const char *fname) { HT_DEBUGF("length file='%s'", fname); - if ((r = ceph_lstat(fname, &statbuf)) < 0) { + if ((r = ceph_lstat(cmount, fname, &statbuf)) < 0) { String abspath; make_abs_path(fname, abspath); - HT_ERRORF("length (stat) failed: file='%s' - %s", abspath.c_str(), strerror(-r)); + std::string errs(cpp_strerror(r)); + HT_ERRORF("length (stat) failed: file='%s' - %s", abspath.c_str(), errs.c_str()); report_error(cb,- r); return; } @@ -283,14 +335,15 @@ void CephBroker::pread(ResponseCallbackRead *cb, uint32_t fd, uint64_t offset, if (!m_open_file_map.get(fd, fdata)) { char errbuf[32]; - snprintf(errbuf, sizeof(errbuf), "%d", fd); + sprintf(errbuf, "%d", fd); cb->error(Error::DFSBROKER_BAD_FILE_HANDLE, errbuf); return; } - if ((nread = ceph_read(fdata->fd, (char *)buf.base, amount, offset)) < 0) { + if ((nread = ceph_read(cmount, fdata->fd, (char *)buf.base, amount, offset)) < 0) { + std::string errs(cpp_strerror(nread)); HT_ERRORF("pread failed: fd=%d ceph_fd=%d amount=%d offset=%llu - %s", fd, fdata->fd, - amount, (Llu)offset, strerror(-nread)); + amount, (Llu)offset, errs.c_str()); report_error(cb, nread); return; } @@ -307,7 +360,7 @@ void CephBroker::mkdirs(ResponseCallback *cb, const char *dname) { make_abs_path(dname, absdir); int r; - if((r=ceph_mkdirs(absdir.c_str(), 0644)) < 0 && r!=-EEXIST) { + if((r=ceph_mkdirs(cmount, absdir.c_str(), 0644)) < 0 && r!=-EEXIST) { HT_ERRORF("mkdirs failed: dname='%s' - %d", absdir.c_str(), -r); report_error(cb, -r); return; @@ -329,13 +382,13 @@ void CephBroker::rmdir(ResponseCallback *cb, const char *dname) { } int CephBroker::rmdir_recursive(const char *directory) { - DIR *dirp; + ceph_dir_result_t *dirp; struct dirent de; struct stat st; int r; - if ((r = ceph_opendir(directory, &dirp) < 0)) + if ((r = ceph_opendir(cmount, directory, &dirp) < 0)) return r; //failed to open - while ((r = ceph_readdirplus_r(dirp, &de, &st, 0)) > 0) { + while ((r = ceph_readdirplus_r(cmount, dirp, &de, &st, 0)) > 0) { String new_dir = de.d_name; if(!(new_dir.compare(".")==0 || new_dir.compare("..")==0)) { new_dir = directory; @@ -344,13 +397,13 @@ int CephBroker::rmdir_recursive(const char *directory) { if (S_ISDIR(st.st_mode)) { //it's a dir, clear it out... if((r=rmdir_recursive(new_dir.c_str())) < 0) return r; } else { //delete this file - if((r=ceph_unlink(new_dir.c_str())) < 0) return r; + if((r=ceph_unlink(cmount, new_dir.c_str())) < 0) return r; } } } if (r < 0) return r; //we got an error - if ((r = ceph_closedir(dirp)) < 0) return r; - return ceph_rmdir(directory); + if ((r = ceph_closedir(cmount, dirp)) < 0) return r; + return ceph_rmdir(cmount, directory); } void CephBroker::flush(ResponseCallback *cb, uint32_t fd) { @@ -360,14 +413,15 @@ void CephBroker::flush(ResponseCallback *cb, uint32_t fd) { if (!m_open_file_map.get(fd, fdata)) { char errbuf[32]; - snprintf(errbuf, sizeof(errbuf), "%d", fd); + sprintf(errbuf, "%d", fd); cb->error(Error::DFSBROKER_BAD_FILE_HANDLE, errbuf); return; } int r; - if ((r = ceph_fsync(fdata->fd, true)) != 0) { - HT_ERRORF("flush failed: fd=%d ceph_fd=%d - %s", fd, fdata->fd, strerror(-r)); + if ((r = ceph_fsync(cmount, fdata->fd, true)) != 0) { + std::string errs(cpp_strerror(r)); + HT_ERRORF("flush failed: fd=%d ceph_fd=%d - %s", fd, fdata->fd, errs.c_str()); report_error(cb, -r); return; } @@ -396,17 +450,17 @@ void CephBroker::readdir(ResponseCallbackReaddir *cb, const char *dname) { //get from ceph in a buffer make_abs_path(dname, absdir); - DIR *dirp; - ceph_opendir(absdir.c_str(), &dirp); + ceph_dir_result_t *dirp; + ceph_opendir(cmount, absdir.c_str(), &dirp); int r; int buflen = 100; //good default? char *buf = new char[buflen]; String *ent; int bufpos; while (1) { - r = ceph_getdnames(dirp, buf, buflen); + r = ceph_getdnames(cmount, dirp, buf, buflen); if (r==-ERANGE) { //expand the buffer - delete [] buf; + delete buf; buflen *= 2; buf = new char[buflen]; continue; @@ -423,8 +477,8 @@ void CephBroker::readdir(ResponseCallbackReaddir *cb, const char *dname) { delete ent; } } - delete [] buf; - ceph_closedir(dirp); + delete buf; + ceph_closedir(cmount, dirp); if (r < 0) report_error(cb, -r); //Ceph shouldn't return r<0 on getdnames //(except for ERANGE) so if it happens this is bad @@ -437,7 +491,7 @@ void CephBroker::exists(ResponseCallbackExists *cb, const char *fname) { HT_DEBUGF("exists file='%s'", fname); make_abs_path(fname, abspath); - cb->response(ceph_lstat(abspath.c_str(), &statbuf) == 0); + cb->response(ceph_lstat(cmount, abspath.c_str(), &statbuf) == 0); } void CephBroker::rename(ResponseCallback *cb, const char *src, const char *dst) { @@ -447,7 +501,7 @@ void CephBroker::rename(ResponseCallback *cb, const char *src, const char *dst) make_abs_path(src, src_abs); make_abs_path(dst, dest_abs); - if ((r = ceph_rename(src_abs.c_str(), dest_abs.c_str())) <0 ) { + if ((r = ceph_rename(cmount, src_abs.c_str(), dest_abs.c_str())) <0 ) { report_error(cb, r); return; } diff --git a/src/client/hypertable/CephBroker.h b/src/client/hypertable/CephBroker.h index 7de3f438010a7..a64e8296a0117 100644 --- a/src/client/hypertable/CephBroker.h +++ b/src/client/hypertable/CephBroker.h @@ -1,31 +1,41 @@ -// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- -// vim: ts=8 sw=2 smarttab -/* - * Ceph - scalable distributed file system +/** -*- C++ -*- + * Copyright (C) 2009-2011 New Dream Network * - * Copyright (C) 2004-2006 Sage Weil + * This file is part of Hypertable. * - * This is free software; you can redistribute it and/or - * modify it under the terms of the GNU Lesser General Public - * License version 2.1, as published by the Free Software - * Foundation. See file COPYING. - * + * Hypertable is free software; you can redistribute it and/or + * modify it under the terms of the GNU General Public License + * as published by the Free Software Foundation; either version 2 + * of the License, or any later version. + * + * Hypertable is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with Hypertable. If not, see + * + * Authors: + * Gregory Farnum + * Colin McCabe */ -#ifndef CEPH_HYPERTABLE_BROKER_H -#define CEPH_HYPERTABLE_BROKER_H +#ifndef HYPERTABLE_CEPHBROKER_H +#define HYPERTABLE_CEPHBROKER_H extern "C" { #include } -#include "libceph.h" #include "Common/String.h" #include "Common/atomic.h" #include "Common/Properties.h" #include "DfsBroker/Lib/Broker.h" +class ceph_mount_t; + namespace Hypertable { using namespace DfsBroker; /** @@ -33,9 +43,10 @@ namespace Hypertable { */ class OpenFileDataCeph : public OpenFileData { public: - OpenFileDataCeph(const String& fname, int _fd, int _flags) : - fd(_fd), flags(_flags), filename(fname) {} - virtual ~OpenFileDataCeph() { ceph_close(fd); } + OpenFileDataCeph(ceph_mount_t *cmount_, const String& fname, + int _fd, int _flags); + virtual ~OpenFileDataCeph(); + ceph_mount_t *cmount; int fd; int flags; String filename; @@ -60,9 +71,9 @@ namespace Hypertable { virtual ~CephBroker(); virtual void open(ResponseCallbackOpen *cb, const char *fname, - uint32_t bufsz); + uint32_t flags, uint32_t bufsz); virtual void - create(ResponseCallbackOpen *cb, const char *fname, bool overwrite, + create(ResponseCallbackOpen *cb, const char *fname, uint32_t flags, int32_t bufsz, int16_t replication, int64_t blksz); virtual void close(ResponseCallback *cb, uint32_t fd); virtual void read(ResponseCallbackRead *cb, uint32_t fd, uint32_t amount); @@ -85,6 +96,7 @@ namespace Hypertable { StaticBuffer &serialized_parameters); private: + ceph_mount_t *cmount; static atomic_t ms_next_fd; virtual void report_error(ResponseCallback *cb, int error); -- 2.39.5