From 8f278647f4aac364c543059a039610d8a28a6b4d Mon Sep 17 00:00:00 2001 From: Josh Durgin Date: Wed, 29 Feb 2012 16:28:20 -0800 Subject: [PATCH] librados: split into separate files and remove unnecessary headers Signed-off-by: Josh Durgin --- src/Makefile.am | 8 +- src/librados.cc | 4418 ------------------------ src/librados/AioCompletionImpl.h | 133 + src/librados/IoCtxImpl.cc | 87 + src/librados/IoCtxImpl.h | 87 + src/librados/PoolAsyncCompletionImpl.h | 88 + src/librados/RadosClient.cc | 2014 +++++++++++ src/librados/RadosClient.h | 234 ++ src/librados/librados.cc | 2046 +++++++++++ 9 files changed, 4696 insertions(+), 4419 deletions(-) delete mode 100644 src/librados.cc create mode 100644 src/librados/AioCompletionImpl.h create mode 100644 src/librados/IoCtxImpl.cc create mode 100644 src/librados/IoCtxImpl.h create mode 100644 src/librados/PoolAsyncCompletionImpl.h create mode 100644 src/librados/RadosClient.cc create mode 100644 src/librados/RadosClient.h create mode 100644 src/librados/librados.cc diff --git a/src/Makefile.am b/src/Makefile.am index 08e7c38f5a4ae..f43a28fa3efd6 100644 --- a/src/Makefile.am +++ b/src/Makefile.am @@ -272,7 +272,9 @@ bin_DEBUGPROGRAMS += testsignal_handlers # librados librados_SOURCES = \ - librados.cc \ + librados/librados.cc \ + librados/RadosClient.cc \ + librados/IoCtxImpl.cc \ osdc/Objecter.cc librados_la_SOURCES = ${librados_SOURCES} librados_la_CFLAGS = ${CRYPTO_CFLAGS} ${AM_CFLAGS} @@ -1250,6 +1252,10 @@ noinst_HEADERS = \ include/rados/buffer.h\ include/rbd/librbd.h\ include/rbd/librbd.hpp\ + librados/AioCompletionImpl.h\ + librados/IoCtxImpl.h\ + librados/PoolAsyncCompletionImpl.h\ + librados/RadosClient.h\ logrotate.conf\ json_spirit/json_spirit.h\ json_spirit/json_spirit_error_position.h\ diff --git a/src/librados.cc b/src/librados.cc deleted file mode 100644 index 3b0183d6bc8be..0000000000000 --- a/src/librados.cc +++ /dev/null @@ -1,4418 +0,0 @@ -// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- -// vim: ts=8 sw=2 smarttab -/* - * Ceph - scalable distributed file system - * - * Copyright (C) 2004-2009 Sage Weil - * - * This is free software; you can redistribute it and/or - * modify it under the terms of the GNU Lesser General Public - * License version 2.1, as published by the Free Software - * Foundation. See file COPYING. - * - */ - -#include -#include -#include - -#include -#include -#include -#include -#include -using namespace std; - -#include "common/config.h" -#include "include/str_list.h" - -#include "mon/MonMap.h" -#include "mds/MDS.h" -#include "osd/OSDMap.h" - -#include "msg/SimpleMessenger.h" - -#include "common/ceph_argparse.h" -#include "common/Timer.h" -#include "common/common_init.h" - -#include "mon/MonClient.h" - -#include "osdc/Objecter.h" - -#include "include/rados/librados.h" -#include "include/rados/librados.hpp" - -#include "messages/MWatchNotify.h" -#include "common/errno.h" - -#define RADOS_LIST_MAX_ENTRIES 1024 -#define DOUT_SUBSYS rados -#undef dout_prefix -#define dout_prefix *_dout << "librados: " - - -static atomic_t rados_instance; - - -/* - * Structure of this file - * - * RadosClient and the related classes are the internal implementation of librados. - * Above that layer sits the C API, found in include/rados/librados.h, and - * the C++ API, found in include/rados/librados.hpp - * - * The C++ API sometimes implements things in terms of the C API. - * Both the C++ and C API rely on RadosClient. - * - * Visually: - * +--------------------------------------+ - * | C++ API | - * +--------------------+ | - * | C API | | - * +--------------------+-----------------+ - * | RadosClient | - * +--------------------------------------+ - */ - -///////////////////////////// RadosClient ////////////////////////////// -struct librados::IoCtxImpl { - atomic_t ref_cnt; - RadosClient *client; - int64_t poolid; - string pool_name; - snapid_t snap_seq; - ::SnapContext snapc; - uint64_t assert_ver; - map assert_src_version; - eversion_t last_objver; - uint32_t notify_timeout; - object_locator_t oloc; - - Mutex aio_write_list_lock; - tid_t aio_write_seq; - Cond aio_write_cond; - xlist aio_write_list; - - IoCtxImpl(); - IoCtxImpl(RadosClient *c, int pid, const char *pool_name_, snapid_t s); - - void dup(const IoCtxImpl& rhs) { - // Copy everything except the ref count - client = rhs.client; - poolid = rhs.poolid; - pool_name = rhs.pool_name; - snap_seq = rhs.snap_seq; - snapc = rhs.snapc; - assert_ver = rhs.assert_ver; - assert_src_version = rhs.assert_src_version; - last_objver = rhs.last_objver; - notify_timeout = rhs.notify_timeout; - oloc = rhs.oloc; - } - - void set_snap_read(snapid_t s); - int set_snap_write_context(snapid_t seq, vector& snaps); - - void get() { - ref_cnt.inc(); - } - - void put() { - if (ref_cnt.dec() == 0) - delete this; - } - - void queue_aio_write(struct AioCompletionImpl *c); - void complete_aio_write(struct AioCompletionImpl *c); - void flush_aio_writes(); - - int64_t get_id() { - return poolid; - } -}; - -size_t librados::ObjectOperation::size() -{ - ::ObjectOperation *o = (::ObjectOperation *)impl; - return o->size(); -} - -void librados::ObjectOperation::set_op_flags(ObjectOperationFlags flags) -{ - int rados_flags = 0; - if (flags & OP_EXCL) - rados_flags |= CEPH_OSD_OP_FLAG_EXCL; - if (flags & OP_FAILOK) - rados_flags |= CEPH_OSD_OP_FLAG_FAILOK; - - ::ObjectOperation *o = (::ObjectOperation *)impl; - o->set_last_op_flags(rados_flags); -} - -void librados::ObjectOperation::cmpxattr(const char *name, uint8_t op, const bufferlist& v) -{ - ::ObjectOperation *o = (::ObjectOperation *)impl; - o->cmpxattr(name, op, CEPH_OSD_CMPXATTR_MODE_STRING, v); -} - -void librados::ObjectOperation::cmpxattr(const char *name, uint8_t op, uint64_t v) -{ - ::ObjectOperation *o = (::ObjectOperation *)impl; - bufferlist bl; - ::encode(v, bl); - o->cmpxattr(name, op, CEPH_OSD_CMPXATTR_MODE_U64, bl); -} - -void librados::ObjectOperation::src_cmpxattr(const std::string& src_oid, - const char *name, int op, const bufferlist& v) -{ - ::ObjectOperation *o = (::ObjectOperation *)impl; - object_t oid(src_oid); - o->src_cmpxattr(oid, CEPH_NOSNAP, name, v, op, CEPH_OSD_CMPXATTR_MODE_STRING); -} - -void librados::ObjectOperation::src_cmpxattr(const std::string& src_oid, - const char *name, int op, uint64_t val) -{ - ::ObjectOperation *o = (::ObjectOperation *)impl; - object_t oid(src_oid); - bufferlist bl; - ::encode(val, bl); - o->src_cmpxattr(oid, CEPH_NOSNAP, name, bl, op, CEPH_OSD_CMPXATTR_MODE_U64); -} - -void librados::ObjectOperation::exec(const char *cls, const char *method, bufferlist& inbl) -{ - ::ObjectOperation *o = (::ObjectOperation *)impl; - o->call(cls, method, inbl); -} - -void librados::ObjectReadOperation::stat(uint64_t *psize, time_t *pmtime, int *prval) -{ - ::ObjectOperation *o = (::ObjectOperation *)impl; - o->stat(psize, pmtime, prval); -} - -void librados::ObjectReadOperation::read(size_t off, uint64_t len, bufferlist *pbl, int *prval) -{ - ::ObjectOperation *o = (::ObjectOperation *)impl; - o->read(off, len, pbl, prval); -} - -void librados::ObjectReadOperation::tmap_get(bufferlist *pbl, int *prval) -{ - ::ObjectOperation *o = (::ObjectOperation *)impl; - o->tmap_get(pbl, prval); -} - -void librados::ObjectReadOperation::getxattr(const char *name, bufferlist *pbl, int *prval) -{ - ::ObjectOperation *o = (::ObjectOperation *)impl; - o->getxattr(name, pbl, prval); -} - -void librados::ObjectReadOperation::omap_get_vals( - const std::string &start_after, - const std::string &filter_prefix, - uint64_t max_return, - std::map *out_vals, - int *prval) -{ - ::ObjectOperation *o = (::ObjectOperation *)impl; - o->omap_get_vals(start_after, filter_prefix, max_return, out_vals, prval); -} - -void librados::ObjectReadOperation::omap_get_vals( - const std::string &start_after, - uint64_t max_return, - std::map *out_vals, - int *prval) -{ - ::ObjectOperation *o = (::ObjectOperation *)impl; - o->omap_get_vals(start_after, "", max_return, out_vals, prval); -} - -void librados::ObjectReadOperation::omap_get_keys( - const std::string &start_after, - uint64_t max_return, - std::set *out_keys, - int *prval) -{ - ::ObjectOperation *o = (::ObjectOperation *)impl; - o->omap_get_keys(start_after, max_return, out_keys, prval); -} - -void librados::ObjectReadOperation::omap_get_header(bufferlist *bl, int *prval) -{ - ::ObjectOperation *o = (::ObjectOperation *)impl; - o->omap_get_header(bl, prval); -} - -void librados::ObjectReadOperation::omap_get_vals_by_keys( - const std::set &keys, - std::map *map, - int *prval) -{ - ::ObjectOperation *o = (::ObjectOperation *)impl; - o->omap_get_vals_by_keys(keys, map, prval); -} - -void librados::ObjectReadOperation::getxattrs(map *pattrs, int *prval) -{ - ::ObjectOperation *o = (::ObjectOperation *)impl; - o->getxattrs(pattrs, prval); -} - -void librados::ObjectWriteOperation::create(bool exclusive) -{ - ::ObjectOperation *o = (::ObjectOperation *)impl; - o->create(exclusive); -} - -void librados::ObjectWriteOperation::create(bool exclusive, const std::string& category) -{ - ::ObjectOperation *o = (::ObjectOperation *)impl; - o->create(exclusive, category); -} - -void librados::ObjectWriteOperation::write(uint64_t off, const bufferlist& bl) -{ - ::ObjectOperation *o = (::ObjectOperation *)impl; - bufferlist c = bl; - o->write(off, c); -} - -void librados::ObjectWriteOperation::write_full(const bufferlist& bl) -{ - ::ObjectOperation *o = (::ObjectOperation *)impl; - bufferlist c = bl; - o->write_full(c); -} - -void librados::ObjectWriteOperation::append(const bufferlist& bl) -{ - ::ObjectOperation *o = (::ObjectOperation *)impl; - bufferlist c = bl; - o->append(c); -} - -void librados::ObjectWriteOperation::remove() -{ - ::ObjectOperation *o = (::ObjectOperation *)impl; - o->remove(); -} - -void librados::ObjectWriteOperation::truncate(uint64_t off) -{ - ::ObjectOperation *o = (::ObjectOperation *)impl; - o->truncate(off); -} - -void librados::ObjectWriteOperation::zero(uint64_t off, uint64_t len) -{ - ::ObjectOperation *o = (::ObjectOperation *)impl; - o->zero(off, len); -} - -void librados::ObjectWriteOperation::rmxattr(const char *name) -{ - ::ObjectOperation *o = (::ObjectOperation *)impl; - o->rmxattr(name); -} - -void librados::ObjectWriteOperation::setxattr(const char *name, const bufferlist& v) -{ - ::ObjectOperation *o = (::ObjectOperation *)impl; - o->setxattr(name, v); -} - -void librados::ObjectWriteOperation::tmap_put(const bufferlist &bl) -{ - ::ObjectOperation *o = (::ObjectOperation *)impl; - bufferlist c = bl; - o->tmap_put(c); -} - -void librados::ObjectWriteOperation::tmap_update(const bufferlist& cmdbl) -{ - ::ObjectOperation *o = (::ObjectOperation *)impl; - bufferlist c = cmdbl; - o->tmap_update(c); -} - -void librados::ObjectWriteOperation::clone_range(uint64_t dst_off, - const std::string& src_oid, uint64_t src_off, - size_t len) -{ - ::ObjectOperation *o = (::ObjectOperation *)impl; - o->clone_range(src_oid, src_off, len, dst_off); -} - -void librados::ObjectWriteOperation::omap_set( - const map &map) -{ - ::ObjectOperation *o = (::ObjectOperation *)impl; - o->omap_set(map); -} - -void librados::ObjectWriteOperation::omap_set_header(const bufferlist &bl) -{ - bufferlist c = bl; - ::ObjectOperation *o = (::ObjectOperation *)impl; - o->omap_set_header(c); -} - -void librados::ObjectWriteOperation::omap_clear() -{ - ::ObjectOperation *o = (::ObjectOperation *)impl; - o->omap_clear(); -} - -void librados::ObjectWriteOperation::omap_rm_keys( - const std::set &to_rm) -{ - ::ObjectOperation *o = (::ObjectOperation *)impl; - o->omap_rm_keys(to_rm); -} - -librados::WatchCtx:: -~WatchCtx() -{ -} - -struct librados::AioCompletionImpl { - Mutex lock; - Cond cond; - int ref, rval; - bool released; - bool ack, safe; - eversion_t objver; - - rados_callback_t callback_complete, callback_safe; - void *callback_arg; - - // for read - bufferlist bl, *pbl; - char *buf; - unsigned maxlen; - - IoCtxImpl *io; - tid_t aio_write_seq; - xlist::item aio_write_list_item; - - AioCompletionImpl() : lock("AioCompletionImpl lock"), - ref(1), rval(0), released(false), ack(false), safe(false), - callback_complete(0), callback_safe(0), callback_arg(0), - pbl(0), buf(0), maxlen(0), - io(NULL), aio_write_seq(0), aio_write_list_item(this) { } - - int set_complete_callback(void *cb_arg, rados_callback_t cb) { - lock.Lock(); - callback_complete = cb; - callback_arg = cb_arg; - lock.Unlock(); - return 0; - } - int set_safe_callback(void *cb_arg, rados_callback_t cb) { - lock.Lock(); - callback_safe = cb; - callback_arg = cb_arg; - lock.Unlock(); - return 0; - } - int wait_for_complete() { - lock.Lock(); - while (!ack) - cond.Wait(lock); - lock.Unlock(); - return 0; - } - int wait_for_safe() { - lock.Lock(); - while (!safe) - cond.Wait(lock); - lock.Unlock(); - return 0; - } - int is_complete() { - lock.Lock(); - int r = ack; - lock.Unlock(); - return r; - } - int is_safe() { - lock.Lock(); - int r = safe; - lock.Unlock(); - return r; - } - int get_return_value() { - lock.Lock(); - int r = rval; - lock.Unlock(); - return r; - } - uint64_t get_version() { - lock.Lock(); - eversion_t v = objver; - lock.Unlock(); - return v.version; - } - - void get() { - lock.Lock(); - assert(ref > 0); - ref++; - lock.Unlock(); - } - void release() { - lock.Lock(); - assert(!released); - released = true; - put_unlock(); - } - void put() { - lock.Lock(); - put_unlock(); - } - void put_unlock() { - assert(ref > 0); - int n = --ref; - lock.Unlock(); - if (!n) - delete this; - } -}; - -void librados::IoCtxImpl::queue_aio_write(AioCompletionImpl *c) -{ - get(); - aio_write_list_lock.Lock(); - assert(!c->io); - c->io = this; - c->aio_write_seq = ++aio_write_seq; - aio_write_list.push_back(&c->aio_write_list_item); - aio_write_list_lock.Unlock(); -} - -void librados::IoCtxImpl::complete_aio_write(AioCompletionImpl *c) -{ - aio_write_list_lock.Lock(); - assert(c->io == this); - c->io = NULL; - c->aio_write_list_item.remove_myself(); - aio_write_cond.Signal(); - aio_write_list_lock.Unlock(); - put(); -} - -void librados::IoCtxImpl::flush_aio_writes() -{ - aio_write_list_lock.Lock(); - tid_t seq = aio_write_seq; - while (!aio_write_list.empty() && - aio_write_list.front()->aio_write_seq <= seq) - aio_write_cond.Wait(aio_write_list_lock); - aio_write_list_lock.Unlock(); -} - -struct librados::ObjListCtx { - librados::IoCtxImpl *ctx; - Objecter::ListContext *lc; - - ObjListCtx(IoCtxImpl *c, Objecter::ListContext *l) : ctx(c), lc(l) {} - ~ObjListCtx() { - delete lc; - } -}; - -struct librados::PoolAsyncCompletionImpl { - Mutex lock; - Cond cond; - int ref, rval; - bool released; - bool done; - - rados_callback_t callback; - void *callback_arg; - - PoolAsyncCompletionImpl() : lock("PoolAsyncCompletionImpl lock"), - ref(1), rval(0), released(false), done(false), - callback(0), callback_arg(0) {} - - int set_callback(void *cb_arg, rados_callback_t cb) { - lock.Lock(); - callback = cb; - callback_arg = cb_arg; - lock.Unlock(); - return 0; - } - int wait() { - lock.Lock(); - while (!done) - cond.Wait(lock); - lock.Unlock(); - return 0; - } - int is_complete() { - lock.Lock(); - int r = done; - lock.Unlock(); - return r; - } - int get_return_value() { - lock.Lock(); - int r = rval; - lock.Unlock(); - return r; - } - void get() { - lock.Lock(); - assert(ref > 0); - ref++; - lock.Unlock(); - } - void release() { - lock.Lock(); - assert(!released); - released = true; - put_unlock(); - } - void put() { - lock.Lock(); - put_unlock(); - } - void put_unlock() { - assert(ref > 0); - int n = --ref; - lock.Unlock(); - if (!n) - delete this; - } -}; - -class librados::RadosClient : public Dispatcher -{ -public: - CephContext *cct; - md_config_t *conf; -private: - enum { - DISCONNECTED, - CONNECTING, - CONNECTED, - } state; - - OSDMap osdmap; - MonClient monclient; - SimpleMessenger *messenger; - - bool _dispatch(Message *m); - bool ms_dispatch(Message *m); - - bool ms_get_authorizer(int dest_type, AuthAuthorizer **authorizer, bool force_new) { - //ldout(cct, 0) << "RadosClient::ms_get_authorizer type=" << dest_type << dendl; - /* monitor authorization is being handled on different layer */ - if (dest_type == CEPH_ENTITY_TYPE_MON) - return true; - *authorizer = monclient.auth->build_authorizer(dest_type); - return *authorizer != NULL; - } - void ms_handle_connect(Connection *con); - bool ms_handle_reset(Connection *con); - void ms_handle_remote_reset(Connection *con); - - - Objecter *objecter; - - Mutex lock; - Cond cond; - SafeTimer timer; - -public: - RadosClient(CephContext *cct_) : Dispatcher(cct_), - cct(cct_), conf(cct_->_conf), - state(DISCONNECTED), monclient(cct_), - messenger(NULL), objecter(NULL), - lock("radosclient"), timer(cct, lock), max_watch_cookie(0) - { - } - - ~RadosClient(); - int connect(); - void shutdown(); - - int64_t lookup_pool(const char *name) { - int64_t ret = osdmap.lookup_pg_pool_name(name); - if (ret < 0) - return -ENOENT; - return ret; - } - - const char *get_pool_name(int64_t poolid_) - { - return osdmap.get_pool_name(poolid_); - } - - ::ObjectOperation *prepare_assert_ops(IoCtxImpl *io, ::ObjectOperation *op); - - // snaps - int snap_list(IoCtxImpl *io, vector *snaps); - int snap_lookup(IoCtxImpl *io, const char *name, uint64_t *snapid); - int snap_get_name(IoCtxImpl *io, uint64_t snapid, std::string *s); - int snap_get_stamp(IoCtxImpl *io, uint64_t snapid, time_t *t); - int snap_create(rados_ioctx_t io, const char* snapname); - int selfmanaged_snap_create(rados_ioctx_t io, uint64_t *snapid); - int snap_remove(rados_ioctx_t io, const char* snapname); - int rollback(rados_ioctx_t io_, const object_t& oid, const char *snapName); - int selfmanaged_snap_remove(rados_ioctx_t io, uint64_t snapid); - int selfmanaged_snap_rollback_object(rados_ioctx_t io, const object_t& oid, - ::SnapContext& snapc, uint64_t snapid); - - // io - int create(IoCtxImpl& io, const object_t& oid, bool exclusive); - int create(IoCtxImpl& io, const object_t& oid, bool exclusive, const std::string& category); - int write(IoCtxImpl& io, const object_t& oid, bufferlist& bl, size_t len, uint64_t off); - int append(IoCtxImpl& io, const object_t& oid, bufferlist& bl, size_t len); - int write_full(IoCtxImpl& io, const object_t& oid, bufferlist& bl); - int clone_range(IoCtxImpl& io, const object_t& dst_oid, uint64_t dst_offset, - const object_t& src_oid, uint64_t src_offset, uint64_t len); - int read(IoCtxImpl& io, const object_t& oid, bufferlist& bl, size_t len, uint64_t off); - int mapext(IoCtxImpl& io, const object_t& oid, uint64_t off, size_t len, - std::map& m); - int sparse_read(IoCtxImpl& io, const object_t& oid, std::map& m, - bufferlist& bl, size_t len, uint64_t off); - int remove(IoCtxImpl& io, const object_t& oid); - int stat(IoCtxImpl& io, const object_t& oid, uint64_t *psize, time_t *pmtime); - int trunc(IoCtxImpl& io, const object_t& oid, uint64_t size); - - int tmap_update(IoCtxImpl& io, const object_t& oid, bufferlist& cmdbl); - int tmap_put(IoCtxImpl& io, const object_t& oid, bufferlist& bl); - int tmap_get(IoCtxImpl& io, const object_t& oid, bufferlist& bl); - - int exec(IoCtxImpl& io, const object_t& oid, const char *cls, const char *method, bufferlist& inbl, bufferlist& outbl); - - int getxattr(IoCtxImpl& io, const object_t& oid, const char *name, bufferlist& bl); - int setxattr(IoCtxImpl& io, const object_t& oid, const char *name, bufferlist& bl); - int getxattrs(IoCtxImpl& io, const object_t& oid, map& attrset); - int rmxattr(IoCtxImpl& io, const object_t& oid, const char *name); - - int pool_list(std::list& ls); - int get_pool_stats(std::list& ls, map& result); - int get_fs_stats(ceph_statfs& result); - - int pool_create(string& name, unsigned long long auid=0, __u8 crush_rule=0); - int pool_create_async(string& name, PoolAsyncCompletionImpl *c, unsigned long long auid=0, - __u8 crush_rule=0); - int pool_delete(const char *name); - int pool_change_auid(rados_ioctx_t io, unsigned long long auid); - int pool_get_auid(rados_ioctx_t io, unsigned long long *auid); - - int pool_delete_async(const char *name, PoolAsyncCompletionImpl *c); - int pool_change_auid_async(rados_ioctx_t io, unsigned long long auid, PoolAsyncCompletionImpl *c); - - int list(Objecter::ListContext *context, int max_entries); - - int operate(IoCtxImpl& io, const object_t& oid, ::ObjectOperation *o, time_t *pmtime); - int operate_read(IoCtxImpl& io, const object_t& oid, ::ObjectOperation *o, bufferlist *pbl); - int aio_operate(IoCtxImpl& io, const object_t& oid, ::ObjectOperation *o, AioCompletionImpl *c); - int aio_operate_read(IoCtxImpl& io, const object_t& oid, ::ObjectOperation *o, AioCompletionImpl *c, bufferlist *pbl); - - struct C_aio_Ack : public Context { - AioCompletionImpl *c; - void finish(int r) { - c->lock.Lock(); - c->rval = r; - c->ack = true; - c->cond.Signal(); - - if (c->buf && c->bl.length() > 0) { - unsigned l = MIN(c->bl.length(), c->maxlen); - c->bl.copy(0, l, c->buf); - c->rval = c->bl.length(); - } - if (c->pbl) { - *c->pbl = c->bl; - } - - if (c->callback_complete) { - rados_callback_t cb = c->callback_complete; - void *cb_arg = c->callback_arg; - c->lock.Unlock(); - cb(c, cb_arg); - c->lock.Lock(); - } - - c->put_unlock(); - } - C_aio_Ack(AioCompletionImpl *_c) : c(_c) { - c->get(); - } - }; - - struct C_aio_sparse_read_Ack : public Context { - AioCompletionImpl *c; - bufferlist *data_bl; - std::map *m; - - void finish(int r) { - c->lock.Lock(); - c->rval = r; - c->ack = true; - c->cond.Signal(); - - bufferlist::iterator iter = c->bl.begin(); - if (r >= 0) { - ::decode(*m, iter); - ::decode(*data_bl, iter); - } - - if (c->callback_complete) { - rados_callback_t cb = c->callback_complete; - void *cb_arg = c->callback_arg; - c->lock.Unlock(); - cb(c, cb_arg); - c->lock.Lock(); - } - - c->put_unlock(); - } - C_aio_sparse_read_Ack(AioCompletionImpl *_c) : c(_c) { - c->get(); - } - }; - - struct C_aio_Safe : public Context { - AioCompletionImpl *c; - void finish(int r) { - c->lock.Lock(); - if (!c->ack) { - c->rval = r; - c->ack = true; - } - c->safe = true; - c->cond.Signal(); - - if (c->callback_safe) { - rados_callback_t cb = c->callback_safe; - void *cb_arg = c->callback_arg; - c->lock.Unlock(); - cb(c, cb_arg); - c->lock.Lock(); - } - - c->io->complete_aio_write(c); - - c->put_unlock(); - } - C_aio_Safe(AioCompletionImpl *_c) : c(_c) { - c->get(); - } - }; - - int aio_read(IoCtxImpl& io, const object_t oid, AioCompletionImpl *c, - bufferlist *pbl, size_t len, uint64_t off); - int aio_read(IoCtxImpl& io, object_t oid, AioCompletionImpl *c, - char *buf, size_t len, uint64_t off); - int aio_sparse_read(IoCtxImpl& io, const object_t oid, - AioCompletionImpl *c, std::map *m, - bufferlist *data_bl, size_t len, uint64_t off); - int aio_write(IoCtxImpl& io, const object_t &oid, AioCompletionImpl *c, - const bufferlist& bl, size_t len, uint64_t off); - int aio_append(IoCtxImpl& io, const object_t &oid, AioCompletionImpl *c, - const bufferlist& bl, size_t len); - int aio_write_full(IoCtxImpl& io, const object_t &oid, AioCompletionImpl *c, - const bufferlist& bl); - int aio_exec(IoCtxImpl& io, const object_t& oid, AioCompletionImpl *c, - const char *cls, const char *method, bufferlist& inbl, bufferlist *outbl); - - struct C_PoolAsync_Safe : public Context { - PoolAsyncCompletionImpl *c; - void finish(int r) { - c->lock.Lock(); - c->rval = r; - c->done = true; - c->cond.Signal(); - - if (c->callback) { - rados_callback_t cb = c->callback; - void *cb_arg = c->callback_arg; - c->lock.Unlock(); - cb(c, cb_arg); - c->lock.Lock(); - } - - c->put_unlock(); - } - C_PoolAsync_Safe(PoolAsyncCompletionImpl *_c) : c(_c) { - c->get(); - } - }; - - static PoolAsyncCompletionImpl *pool_async_create_completion() { - return new PoolAsyncCompletionImpl; - } - static PoolAsyncCompletionImpl *pool_async_create_completion(void *cb_arg, rados_callback_t cb) { - PoolAsyncCompletionImpl *c = new PoolAsyncCompletionImpl; - if (cb) - c->set_callback(cb_arg, cb); - return c; - } - - static AioCompletionImpl *aio_create_completion() { - return new AioCompletionImpl; - } - static AioCompletionImpl *aio_create_completion(void *cb_arg, - rados_callback_t cb_complete, - rados_callback_t cb_safe) { - AioCompletionImpl *c = new AioCompletionImpl; - if (cb_complete) - c->set_complete_callback(cb_arg, cb_complete); - if (cb_safe) - c->set_safe_callback(cb_arg, cb_safe); - return c; - } - - // watch/notify - struct WatchContext { - IoCtxImpl *io_ctx_impl; - const object_t oid; - uint64_t cookie; - uint64_t ver; - librados::WatchCtx *ctx; - uint64_t linger_id; - - WatchContext(IoCtxImpl *io_ctx_impl_, const object_t& _oc, librados::WatchCtx *_ctx) - : io_ctx_impl(io_ctx_impl_), oid(_oc), ctx(_ctx), linger_id(0) { - io_ctx_impl->get(); - } - ~WatchContext() { - io_ctx_impl->put(); - } - void notify(RadosClient *client, MWatchNotify *m) { - ctx->notify(m->opcode, m->ver, m->bl); - if (m->opcode != WATCH_NOTIFY_COMPLETE) { - client->_notify_ack(*io_ctx_impl, oid, m->notify_id, m->ver); - } - } - }; - - struct C_NotifyComplete : public librados::WatchCtx { - Mutex *lock; - Cond *cond; - bool *done; - - C_NotifyComplete(Mutex *_l, Cond *_c, bool *_d) : lock(_l), cond(_c), done(_d) { - *done = false; - } - - void notify(uint8_t opcode, uint64_t ver, bufferlist& bl) { - *done = true; - cond->Signal(); - } - }; - - uint64_t max_watch_cookie; - map watchers; - - void set_sync_op_version(IoCtxImpl& io, eversion_t& ver) { - io.last_objver = ver; - } - - void register_watcher(IoCtxImpl& io, const object_t& oid, librados::WatchCtx *ctx, - uint64_t *cookie, WatchContext **pwc = NULL) { - assert(lock.is_locked()); - WatchContext *wc = new WatchContext(&io, oid, ctx); - *cookie = ++max_watch_cookie; - watchers[*cookie] = wc; - if (pwc) - *pwc = wc; - } - - void unregister_watcher(uint64_t cookie) { - assert(lock.is_locked()); - map::iterator iter = watchers.find(cookie); - if (iter != watchers.end()) { - WatchContext *ctx = iter->second; - if (ctx->linger_id) - objecter->unregister_linger(ctx->linger_id); - delete ctx; - watchers.erase(iter); - } - } - - void watch_notify(MWatchNotify *m); - - int watch(IoCtxImpl& io, const object_t& oid, uint64_t ver, uint64_t *cookie, librados::WatchCtx *ctx); - int unwatch(IoCtxImpl& io, const object_t& oid, uint64_t cookie); - int notify(IoCtxImpl& io, const object_t& oid, uint64_t ver, bufferlist& bl); - int _notify_ack(IoCtxImpl& io, const object_t& oid, uint64_t notify_id, uint64_t ver); - - eversion_t last_version(IoCtxImpl& io) { - return io.last_objver; - } - void set_assert_version(IoCtxImpl& io, uint64_t ver) { - io.assert_ver = ver; - } - void set_assert_src_version(IoCtxImpl& io, const object_t& oid, uint64_t ver) { - io.assert_src_version[oid] = ver; - } - - void set_notify_timeout(IoCtxImpl& io, uint32_t timeout) { - io.notify_timeout = timeout; - } -}; - -librados::IoCtxImpl::IoCtxImpl() - : aio_write_list_lock("librados::IoCtxImpl::aio_write_list_lock") -{ -} - -librados::IoCtxImpl::IoCtxImpl(RadosClient *c, int pid, const char *pool_name_, snapid_t s) - : ref_cnt(0), client(c), poolid(pid), - pool_name(pool_name_), snap_seq(s), assert_ver(0), - notify_timeout(c->cct->_conf->client_notify_timeout), oloc(pid), - aio_write_list_lock("librados::IoCtxImpl::aio_write_list_lock"), aio_write_seq(0) -{ -} - -void librados::IoCtxImpl::set_snap_read(snapid_t s) -{ - if (!s) - s = CEPH_NOSNAP; - ldout(client->cct, 10) << "set snap read " << snap_seq << " -> " << s << dendl; - snap_seq = s; -} - -int librados::IoCtxImpl::set_snap_write_context(snapid_t seq, vector& snaps) -{ - ::SnapContext n; - ldout(client->cct, 10) << "set snap write context: seq = " << seq << " and snaps = " << snaps << dendl; - n.seq = seq; - n.snaps = snaps; - if (!n.is_valid()) - return -EINVAL; - snapc = n; - return 0; -} - -int librados::RadosClient::connect() -{ - common_init_finish(cct); - - int err; - uint64_t nonce; - - // already connected? - if (state == CONNECTING) - return -EINPROGRESS; - if (state == CONNECTED) - return -EISCONN; - state = CONNECTING; - - // get monmap - err = monclient.build_initial_monmap(); - if (err < 0) - goto out; - - err = -ENOMEM; - nonce = getpid() + (1000000 * (uint64_t)rados_instance.inc()); - messenger = new SimpleMessenger(cct, entity_name_t::CLIENT(-1), nonce); - if (!messenger) - goto out; - - // require OSDREPLYMUX feature. this means we will fail to talk to - // old servers. this is necessary because otherwise we won't know - // how to decompose the reply data into its consituent pieces. - messenger->set_default_policy(Messenger::Policy::client(0, CEPH_FEATURE_OSDREPLYMUX)); - - ldout(cct, 1) << "starting msgr at " << messenger->get_myaddr() << dendl; - - ldout(cct, 1) << "starting objecter" << dendl; - - err = -ENOMEM; - objecter = new Objecter(cct, messenger, &monclient, &osdmap, lock, timer); - if (!objecter) - goto out; - objecter->set_balanced_budget(); - - monclient.set_messenger(messenger); - - messenger->add_dispatcher_head(this); - - messenger->start(); - - ldout(cct, 1) << "setting wanted keys" << dendl; - monclient.set_want_keys(CEPH_ENTITY_TYPE_MON | CEPH_ENTITY_TYPE_OSD); - ldout(cct, 1) << "calling monclient init" << dendl; - err = monclient.init(); - if (err) { - ldout(cct, 0) << conf->name << " initialization error " << cpp_strerror(-err) << dendl; - shutdown(); - goto out; - } - - err = monclient.authenticate(conf->client_mount_timeout); - if (err) { - ldout(cct, 0) << conf->name << " authentication error " << cpp_strerror(-err) << dendl; - shutdown(); - goto out; - } - messenger->set_myname(entity_name_t::CLIENT(monclient.get_global_id())); - - lock.Lock(); - - timer.init(); - - objecter->set_client_incarnation(0); - objecter->init(); - monclient.renew_subs(); - - while (osdmap.get_epoch() == 0) { - ldout(cct, 1) << "waiting for osdmap" << dendl; - cond.Wait(lock); - } - state = CONNECTED; - lock.Unlock(); - - ldout(cct, 1) << "init done" << dendl; - err = 0; - - out: - if (err) - state = DISCONNECTED; - return err; -} - -void librados::RadosClient::shutdown() -{ - lock.Lock(); - if (state == DISCONNECTED) { - lock.Unlock(); - return; - } - monclient.shutdown(); - if (objecter && state == CONNECTED) - objecter->shutdown(); - state = DISCONNECTED; - timer.shutdown(); // will drop+retake lock - lock.Unlock(); - if (messenger) { - messenger->shutdown(); - messenger->wait(); - } - ldout(cct, 1) << "shutdown" << dendl; -} - -librados::RadosClient::~RadosClient() -{ - if (messenger) - delete messenger; - if (objecter) - delete objecter; - common_destroy_context(cct); - cct = NULL; -} - - -bool librados::RadosClient::ms_dispatch(Message *m) -{ - bool ret; - - lock.Lock(); - if (state == DISCONNECTED) { - ldout(cct, 10) << "disconnected, discarding " << *m << dendl; - m->put(); - ret = true; - } else { - ret = _dispatch(m); - } - lock.Unlock(); - return ret; -} - -void librados::RadosClient::ms_handle_connect(Connection *con) -{ - Mutex::Locker l(lock); - objecter->ms_handle_connect(con); -} - -bool librados::RadosClient::ms_handle_reset(Connection *con) -{ - Mutex::Locker l(lock); - objecter->ms_handle_reset(con); - return false; -} - -void librados::RadosClient::ms_handle_remote_reset(Connection *con) -{ - Mutex::Locker l(lock); - objecter->ms_handle_remote_reset(con); -} - - -bool librados::RadosClient::_dispatch(Message *m) -{ - switch (m->get_type()) { - // OSD - case CEPH_MSG_OSD_OPREPLY: - objecter->handle_osd_op_reply((class MOSDOpReply*)m); - break; - case CEPH_MSG_OSD_MAP: - objecter->handle_osd_map((MOSDMap*)m); - cond.Signal(); - break; - case MSG_GETPOOLSTATSREPLY: - objecter->handle_get_pool_stats_reply((MGetPoolStatsReply*)m); - break; - - case CEPH_MSG_MDS_MAP: - break; - - case CEPH_MSG_STATFS_REPLY: - objecter->handle_fs_stats_reply((MStatfsReply*)m); - break; - - case CEPH_MSG_POOLOP_REPLY: - objecter->handle_pool_op_reply((MPoolOpReply*)m); - break; - - case CEPH_MSG_WATCH_NOTIFY: - watch_notify((MWatchNotify *)m); - break; - default: - return false; - } - - return true; -} - -int librados::RadosClient::pool_list(std::list& v) -{ - Mutex::Locker l(lock); - for (map::const_iterator p = osdmap.get_pools().begin(); - p != osdmap.get_pools().end(); - p++) - v.push_back(osdmap.get_pool_name(p->first)); - return 0; -} - -int librados::RadosClient::get_pool_stats(std::list& pools, - map& result) -{ - Mutex mylock("RadosClient::get_pool_stats::mylock"); - Cond cond; - bool done; - - lock.Lock(); - objecter->get_pool_stats(pools, &result, new C_SafeCond(&mylock, &cond, &done)); - lock.Unlock(); - - mylock.Lock(); - while (!done) - cond.Wait(mylock); - mylock.Unlock(); - - return 0; -} - -int librados::RadosClient::get_fs_stats(ceph_statfs& stats) -{ - Mutex mylock ("RadosClient::get_fs_stats::mylock"); - Cond cond; - bool done; - lock.Lock(); - objecter->get_fs_stats(stats, new C_SafeCond(&mylock, &cond, &done)); - lock.Unlock(); - - mylock.Lock(); - while (!done) cond.Wait(mylock); - mylock.Unlock(); - - return 0; -} - - -// SNAPS - -int librados::RadosClient::snap_create(rados_ioctx_t io, const char *snapName) -{ - int reply; - int64_t poolID = ((IoCtxImpl *)io)->poolid; - string sName(snapName); - - Mutex mylock ("RadosClient::snap_create::mylock"); - Cond cond; - bool done; - lock.Lock(); - objecter->create_pool_snap(poolID, - sName, - new C_SafeCond(&mylock, &cond, &done, &reply)); - lock.Unlock(); - - mylock.Lock(); - while(!done) cond.Wait(mylock); - mylock.Unlock(); - return reply; -} - -int librados::RadosClient::selfmanaged_snap_create(rados_ioctx_t io, uint64_t *psnapid) -{ - int reply; - int64_t poolID = ((IoCtxImpl *)io)->poolid; - - Mutex mylock("RadosClient::selfmanaged_snap_create::mylock"); - Cond cond; - bool done; - lock.Lock(); - snapid_t snapid; - objecter->allocate_selfmanaged_snap(poolID, &snapid, - new C_SafeCond(&mylock, &cond, &done, &reply)); - lock.Unlock(); - - mylock.Lock(); - while (!done) cond.Wait(mylock); - mylock.Unlock(); - if (reply == 0) - *psnapid = snapid; - return reply; -} - -int librados::RadosClient::snap_remove(rados_ioctx_t io, const char *snapName) -{ - int reply; - int64_t poolID = ((IoCtxImpl *)io)->poolid; - string sName(snapName); - - Mutex mylock ("RadosClient::snap_remove::mylock"); - Cond cond; - bool done; - lock.Lock(); - objecter->delete_pool_snap(poolID, - sName, - new C_SafeCond(&mylock, &cond, &done, &reply)); - lock.Unlock(); - - mylock.Lock(); - while(!done) cond.Wait(mylock); - mylock.Unlock(); - return reply; -} - -int librados::RadosClient::selfmanaged_snap_rollback_object(rados_ioctx_t io, - const object_t& oid, - ::SnapContext& snapc, - uint64_t snapid) -{ - int reply; - IoCtxImpl* ctx = (IoCtxImpl *) io; - - Mutex mylock("RadosClient::snap_rollback::mylock"); - Cond cond; - bool done; - Context *onack = new C_SafeCond(&mylock, &cond, &done, &reply); - - lock.Lock(); - objecter->rollback_object(oid, ctx->oloc, snapc, snapid, - ceph_clock_now(cct), onack, NULL); - lock.Unlock(); - - mylock.Lock(); - while (!done) cond.Wait(mylock); - mylock.Unlock(); - return reply; -} - -int librados::RadosClient::rollback(rados_ioctx_t io_, const object_t& oid, - const char *snapName) -{ - IoCtxImpl* io = (IoCtxImpl *) io_; - string sName(snapName); - - lock.Lock(); - snapid_t snap; - const map& pools = objecter->osdmap->get_pools(); - const pg_pool_t& pg_pool = pools.find(io->poolid)->second; - map::const_iterator p; - for (p = pg_pool.snaps.begin(); - p != pg_pool.snaps.end(); - ++p) { - if (p->second.name == snapName) { - snap = p->first; - break; - } - } - if (p == pg_pool.snaps.end()) { - lock.Unlock(); - return -ENOENT; - } - lock.Unlock(); - - return selfmanaged_snap_rollback_object(io_, oid, io->snapc, snap); -} - -int librados::RadosClient::selfmanaged_snap_remove(rados_ioctx_t io, uint64_t snapid) -{ - int reply; - int64_t poolID = ((IoCtxImpl *)io)->poolid; - - Mutex mylock("RadosClient::selfmanaged_snap_remove::mylock"); - Cond cond; - bool done; - lock.Lock(); - objecter->delete_selfmanaged_snap(poolID, snapid_t(snapid), - new C_SafeCond(&mylock, &cond, &done, &reply)); - lock.Unlock(); - - mylock.Lock(); - while (!done) cond.Wait(mylock); - mylock.Unlock(); - return (int)reply; -} - -int librados::RadosClient::pool_create(string& name, unsigned long long auid, - __u8 crush_rule) -{ - int reply; - - Mutex mylock ("RadosClient::pool_create::mylock"); - Cond cond; - bool done; - lock.Lock(); - objecter->create_pool(name, - new C_SafeCond(&mylock, &cond, &done, &reply), - auid, crush_rule); - lock.Unlock(); - - mylock.Lock(); - while(!done) - cond.Wait(mylock); - mylock.Unlock(); - return reply; -} - -int librados::RadosClient::pool_create_async(string& name, PoolAsyncCompletionImpl *c, - unsigned long long auid, - __u8 crush_rule) -{ - Mutex::Locker l(lock); - objecter->create_pool(name, - new C_PoolAsync_Safe(c), - auid, crush_rule); - return 0; -} - -int librados::RadosClient::pool_delete(const char *name) -{ - int tmp_pool_id = osdmap.lookup_pg_pool_name(name); - if (tmp_pool_id < 0) - return -ENOENT; - - Mutex mylock("RadosClient::pool_delete::mylock"); - Cond cond; - bool done; - lock.Lock(); - int reply = 0; - objecter->delete_pool(tmp_pool_id, new C_SafeCond(&mylock, &cond, &done, &reply)); - lock.Unlock(); - - mylock.Lock(); - while (!done) cond.Wait(mylock); - mylock.Unlock(); - return reply; -} - -int librados::RadosClient::pool_delete_async(const char *name, PoolAsyncCompletionImpl *c) -{ - int tmp_pool_id = osdmap.lookup_pg_pool_name(name); - if (tmp_pool_id < 0) - return -ENOENT; - - Mutex::Locker l(lock); - objecter->delete_pool(tmp_pool_id, new C_PoolAsync_Safe(c)); - - return 0; -} - -int librados::RadosClient::pool_change_auid(rados_ioctx_t io, unsigned long long auid) -{ - int reply; - - int64_t poolID = ((IoCtxImpl *)io)->poolid; - - Mutex mylock("RadosClient::pool_change_auid::mylock"); - Cond cond; - bool done; - lock.Lock(); - objecter->change_pool_auid(poolID, - new C_SafeCond(&mylock, &cond, &done, &reply), - auid); - lock.Unlock(); - - mylock.Lock(); - while (!done) cond.Wait(mylock); - mylock.Unlock(); - return reply; -} - -int librados::RadosClient::pool_change_auid_async(rados_ioctx_t io, unsigned long long auid, - PoolAsyncCompletionImpl *c) -{ - int64_t poolID = ((IoCtxImpl *)io)->poolid; - - Mutex::Locker l(lock); - objecter->change_pool_auid(poolID, - new C_PoolAsync_Safe(c), - auid); - return 0; -} - -int librados::RadosClient::pool_get_auid(rados_ioctx_t io, unsigned long long *auid) -{ - Mutex::Locker l(lock); - int64_t pool_id = ((IoCtxImpl *)io)->poolid; - const pg_pool_t *pg = osdmap.get_pg_pool(pool_id); - if (!pg) - return -ENOENT; - *auid = pg->auid; - return 0; -} - -int librados::RadosClient::snap_list(IoCtxImpl *io, vector *snaps) -{ - Mutex::Locker l(lock); - const pg_pool_t *pi = objecter->osdmap->get_pg_pool(io->poolid); - for (map::const_iterator p = pi->snaps.begin(); - p != pi->snaps.end(); - p++) - snaps->push_back(p->first); - return 0; -} - -int librados::RadosClient::snap_lookup(IoCtxImpl *io, const char *name, uint64_t *snapid) -{ - Mutex::Locker l(lock); - const pg_pool_t *pi = objecter->osdmap->get_pg_pool(io->poolid); - for (map::const_iterator p = pi->snaps.begin(); - p != pi->snaps.end(); - p++) { - if (p->second.name == name) { - *snapid = p->first; - return 0; - } - } - return -ENOENT; -} - -int librados::RadosClient::snap_get_name(IoCtxImpl *io, uint64_t snapid, std::string *s) -{ - Mutex::Locker l(lock); - const pg_pool_t *pi = objecter->osdmap->get_pg_pool(io->poolid); - map::const_iterator p = pi->snaps.find(snapid); - if (p == pi->snaps.end()) - return -ENOENT; - *s = p->second.name.c_str(); - return 0; -} - -int librados::RadosClient::snap_get_stamp(IoCtxImpl *io, uint64_t snapid, time_t *t) -{ - Mutex::Locker l(lock); - const pg_pool_t *pi = objecter->osdmap->get_pg_pool(io->poolid); - map::const_iterator p = pi->snaps.find(snapid); - if (p == pi->snaps.end()) - return -ENOENT; - *t = p->second.stamp.sec(); - return 0; -} - - -// IO - -int librados::RadosClient::list(Objecter::ListContext *context, int max_entries) -{ - Cond cond; - bool done; - int r = 0; - object_t oid; - Mutex mylock("RadosClient::list::mylock"); - - if (context->at_end) - return 0; - - context->max_entries = max_entries; - - lock.Lock(); - objecter->list_objects(context, new C_SafeCond(&mylock, &cond, &done, &r)); - lock.Unlock(); - - mylock.Lock(); - while(!done) - cond.Wait(mylock); - mylock.Unlock(); - - return r; -} - -int librados::RadosClient::create(IoCtxImpl& io, const object_t& oid, bool exclusive) -{ - utime_t ut = ceph_clock_now(cct); - - /* can't write to a snapshot */ - if (io.snap_seq != CEPH_NOSNAP) - return -EROFS; - - Mutex mylock("RadosClient::create::mylock"); - Cond cond; - bool done; - int r; - eversion_t ver; - - Context *onack = new C_SafeCond(&mylock, &cond, &done, &r); - - lock.Lock(); - objecter->create(oid, io.oloc, - io.snapc, ut, 0, (exclusive ? CEPH_OSD_OP_FLAG_EXCL : 0), - onack, NULL, &ver); - lock.Unlock(); - - mylock.Lock(); - while (!done) - cond.Wait(mylock); - mylock.Unlock(); - - set_sync_op_version(io, ver); - - return r; -} - -int librados::RadosClient::create(IoCtxImpl& io, const object_t& oid, bool exclusive, - const std::string& category) -{ - utime_t ut = ceph_clock_now(cct); - - /* can't write to a snapshot */ - if (io.snap_seq != CEPH_NOSNAP) - return -EROFS; - - Mutex mylock("RadosClient::create::mylock"); - Cond cond; - bool done; - int r; - eversion_t ver; - - Context *onack = new C_SafeCond(&mylock, &cond, &done, &r); - - ::ObjectOperation o; - o.create(exclusive ? CEPH_OSD_OP_FLAG_EXCL : 0, category); - - lock.Lock(); - objecter->mutate(oid, io.oloc, o, io.snapc, ut, 0, onack, NULL, &ver); - lock.Unlock(); - - mylock.Lock(); - while (!done) - cond.Wait(mylock); - mylock.Unlock(); - - set_sync_op_version(io, ver); - - return r; -} - -/* - * add any version assert operations that are appropriate given the - * stat in the IoCtx, either the target version assert or any src - * object asserts. these affect a single ioctx operation, so clear - * the ioctx state when we're doing. - * - * return a pointer to the ObjectOperation if we added any events; - * this is convenient for passing the extra_ops argument into Objecter - * methods. - */ -::ObjectOperation *librados::RadosClient::prepare_assert_ops(IoCtxImpl *io, ::ObjectOperation *op) -{ - ::ObjectOperation *pop = NULL; - if (io->assert_ver) { - op->assert_version(io->assert_ver); - io->assert_ver = 0; - pop = op; - } - while (!io->assert_src_version.empty()) { - map::iterator p = io->assert_src_version.begin(); - op->assert_src_version(p->first, CEPH_NOSNAP, p->second); - io->assert_src_version.erase(p); - pop = op; - } - return pop; -} - -int librados::RadosClient::write(IoCtxImpl& io, const object_t& oid, bufferlist& bl, - size_t len, uint64_t off) -{ - utime_t ut = ceph_clock_now(cct); - - /* can't write to a snapshot */ - if (io.snap_seq != CEPH_NOSNAP) - return -EROFS; - - Mutex mylock("RadosClient::write::mylock"); - Cond cond; - bool done; - int r; - - Context *onack = new C_SafeCond(&mylock, &cond, &done, &r); - eversion_t ver; - - // extra ops? - ::ObjectOperation op; - ::ObjectOperation *pop = prepare_assert_ops(&io, &op); - - lock.Lock(); - objecter->write(oid, io.oloc, - off, len, io.snapc, bl, ut, 0, - onack, NULL, &ver, pop); - lock.Unlock(); - - mylock.Lock(); - while (!done) - cond.Wait(mylock); - mylock.Unlock(); - - set_sync_op_version(io, ver); - - if (r < 0) - return r; - - return len; -} - -int librados::RadosClient::append(IoCtxImpl& io, const object_t& oid, bufferlist& bl, size_t len) -{ - utime_t ut = ceph_clock_now(cct); - - /* can't write to a snapshot */ - if (io.snap_seq != CEPH_NOSNAP) - return -EROFS; - - Mutex mylock("RadosClient::append::mylock"); - Cond cond; - bool done; - int r; - - Context *onack = new C_SafeCond(&mylock, &cond, &done, &r); - eversion_t ver; - - ::ObjectOperation op; - ::ObjectOperation *pop = prepare_assert_ops(&io, &op); - - lock.Lock(); - objecter->append(oid, io.oloc, - len, io.snapc, bl, ut, 0, - onack, NULL, &ver, pop); - lock.Unlock(); - - mylock.Lock(); - while (!done) - cond.Wait(mylock); - mylock.Unlock(); - - set_sync_op_version(io, ver); - - if (r < 0) - return r; - - return len; -} - -int librados::RadosClient::write_full(IoCtxImpl& io, const object_t& oid, bufferlist& bl) -{ - utime_t ut = ceph_clock_now(cct); - - /* can't write to a snapshot */ - if (io.snap_seq != CEPH_NOSNAP) - return -EROFS; - - Mutex mylock("RadosClient::write_full::mylock"); - Cond cond; - bool done; - int r; - - Context *onack = new C_SafeCond(&mylock, &cond, &done, &r); - - eversion_t ver; - - ::ObjectOperation op; - ::ObjectOperation *pop = prepare_assert_ops(&io, &op); - - lock.Lock(); - objecter->write_full(oid, io.oloc, - io.snapc, bl, ut, 0, - onack, NULL, &ver, pop); - lock.Unlock(); - - mylock.Lock(); - while (!done) - cond.Wait(mylock); - mylock.Unlock(); - - set_sync_op_version(io, ver); - - return r; -} - -int librados::RadosClient::clone_range(IoCtxImpl& io, - const object_t& dst_oid, uint64_t dst_offset, - const object_t& src_oid, uint64_t src_offset, - uint64_t len) -{ - utime_t ut = ceph_clock_now(cct); - - /* can't write to a snapshot */ - if (io.snap_seq != CEPH_NOSNAP) - return -EROFS; - - Mutex mylock("RadosClient::clone_range::mylock"); - Cond cond; - bool done; - int r; - Context *onack = new C_SafeCond(&mylock, &cond, &done, &r); - eversion_t ver; - - bufferlist outbl; - - lock.Lock(); - ::ObjectOperation wr; - prepare_assert_ops(&io, &wr); - wr.clone_range(src_oid, src_offset, len, dst_offset); - objecter->mutate(dst_oid, io.oloc, wr, io.snapc, ut, 0, onack, NULL, &ver); - lock.Unlock(); - - mylock.Lock(); - while (!done) - cond.Wait(mylock); - mylock.Unlock(); - - set_sync_op_version(io, ver); - - return r; -} - -int librados::RadosClient::operate(IoCtxImpl& io, const object_t& oid, - ::ObjectOperation *o, time_t *pmtime) -{ - utime_t ut; - if (pmtime) { - ut = utime_t(*pmtime, 0); - } else { - ut = ceph_clock_now(cct); - } - - /* can't write to a snapshot */ - if (io.snap_seq != CEPH_NOSNAP) - return -EROFS; - - if (!o->size()) - return 0; - - Mutex mylock("RadosClient::mutate::mylock"); - Cond cond; - bool done; - int r; - eversion_t ver; - - Context *onack = new C_SafeCond(&mylock, &cond, &done, &r); - - lock.Lock(); - objecter->mutate(oid, io.oloc, - *o, io.snapc, ut, 0, - onack, NULL, &ver); - lock.Unlock(); - - mylock.Lock(); - while (!done) - cond.Wait(mylock); - mylock.Unlock(); - - set_sync_op_version(io, ver); - - return r; -} - -int librados::RadosClient::operate_read(IoCtxImpl& io, const object_t& oid, - ::ObjectOperation *o, bufferlist *pbl) -{ - if (!o->size()) - return 0; - - Mutex mylock("RadosClient::mutate::mylock"); - Cond cond; - bool done; - int r; - eversion_t ver; - - Context *onack = new C_SafeCond(&mylock, &cond, &done, &r); - - lock.Lock(); - objecter->read(oid, io.oloc, - *o, io.snap_seq, pbl, 0, - onack, &ver); - lock.Unlock(); - - mylock.Lock(); - while (!done) - cond.Wait(mylock); - mylock.Unlock(); - - set_sync_op_version(io, ver); - - return r; -} - -int librados::RadosClient::aio_operate_read(IoCtxImpl& io, const object_t &oid, - ::ObjectOperation *o, - AioCompletionImpl *c, bufferlist *pbl) -{ - Context *onack = new C_aio_Ack(c); - - c->pbl = pbl; - - Mutex::Locker l(lock); - objecter->read(oid, io.oloc, - *o, io.snap_seq, pbl, 0, - onack, 0); - return 0; -} - -int librados::RadosClient::aio_operate(IoCtxImpl& io, const object_t& oid, - ::ObjectOperation *o, AioCompletionImpl *c) -{ - utime_t ut = ceph_clock_now(cct); - /* can't write to a snapshot */ - if (io.snap_seq != CEPH_NOSNAP) - return -EROFS; - - Context *onack = new C_aio_Ack(c); - Context *oncommit = new C_aio_Safe(c); - - io.queue_aio_write(c); - - Mutex::Locker l(lock); - objecter->mutate(oid, io.oloc, *o, io.snapc, ut, 0, onack, oncommit, &c->objver); - - return 0; -} - -int librados::RadosClient::aio_read(IoCtxImpl& io, const object_t oid, AioCompletionImpl *c, - bufferlist *pbl, size_t len, uint64_t off) -{ - - Context *onack = new C_aio_Ack(c); - eversion_t ver; - - c->pbl = pbl; - - Mutex::Locker l(lock); - objecter->read(oid, io.oloc, - off, len, io.snap_seq, &c->bl, 0, - onack, &c->objver); - return 0; -} - -int librados::RadosClient::aio_read(IoCtxImpl& io, const object_t oid, AioCompletionImpl *c, - char *buf, size_t len, uint64_t off) -{ - Context *onack = new C_aio_Ack(c); - - c->buf = buf; - c->maxlen = len; - - Mutex::Locker l(lock); - objecter->read(oid, io.oloc, - off, len, io.snap_seq, &c->bl, 0, - onack, &c->objver); - - return 0; -} - -int librados::RadosClient::aio_sparse_read(IoCtxImpl& io, const object_t oid, - AioCompletionImpl *c, std::map *m, - bufferlist *data_bl, size_t len, uint64_t off) -{ - - C_aio_sparse_read_Ack *onack = new C_aio_sparse_read_Ack(c); - onack->m = m; - onack->data_bl = data_bl; - eversion_t ver; - - c->pbl = NULL; - - Mutex::Locker l(lock); - objecter->sparse_read(oid, io.oloc, - off, len, io.snap_seq, &c->bl, 0, - onack); - return 0; -} - -int librados::RadosClient::aio_write(IoCtxImpl& io, const object_t &oid, AioCompletionImpl *c, - const bufferlist& bl, size_t len, uint64_t off) -{ - utime_t ut = ceph_clock_now(cct); - - /* can't write to a snapshot */ - if (io.snap_seq != CEPH_NOSNAP) - return -EROFS; - - io.queue_aio_write(c); - - Context *onack = new C_aio_Ack(c); - Context *onsafe = new C_aio_Safe(c); - - Mutex::Locker l(lock); - objecter->write(oid, io.oloc, - off, len, io.snapc, bl, ut, 0, - onack, onsafe, &c->objver); - - return 0; -} - -int librados::RadosClient::aio_append(IoCtxImpl& io, const object_t &oid, AioCompletionImpl *c, - const bufferlist& bl, size_t len) -{ - utime_t ut = ceph_clock_now(cct); - - /* can't write to a snapshot */ - if (io.snap_seq != CEPH_NOSNAP) - return -EROFS; - - io.queue_aio_write(c); - - Context *onack = new C_aio_Ack(c); - Context *onsafe = new C_aio_Safe(c); - - Mutex::Locker l(lock); - objecter->append(oid, io.oloc, - len, io.snapc, bl, ut, 0, - onack, onsafe, &c->objver); - - return 0; -} - -int librados::RadosClient::aio_write_full(IoCtxImpl& io, const object_t &oid, - AioCompletionImpl *c, const bufferlist& bl) -{ - utime_t ut = ceph_clock_now(cct); - - /* can't write to a snapshot */ - if (io.snap_seq != CEPH_NOSNAP) - return -EROFS; - - io.queue_aio_write(c); - - Context *onack = new C_aio_Ack(c); - Context *onsafe = new C_aio_Safe(c); - - Mutex::Locker l(lock); - objecter->write_full(oid, io.oloc, - io.snapc, bl, ut, 0, - onack, onsafe, &c->objver); - - return 0; -} - -int librados::RadosClient::remove(IoCtxImpl& io, const object_t& oid) -{ - utime_t ut = ceph_clock_now(cct); - - /* can't write to a snapshot */ - if (io.snap_seq != CEPH_NOSNAP) - return -EROFS; - - Mutex mylock("RadosClient::remove::mylock"); - Cond cond; - bool done; - int r; - Context *onack = new C_SafeCond(&mylock, &cond, &done, &r); - eversion_t ver; - - ::ObjectOperation op; - ::ObjectOperation *pop = prepare_assert_ops(&io, &op); - - lock.Lock(); - objecter->remove(oid, io.oloc, - io.snapc, ut, 0, - onack, NULL, &ver, pop); - lock.Unlock(); - - mylock.Lock(); - while (!done) - cond.Wait(mylock); - mylock.Unlock(); - - set_sync_op_version(io, ver); - - return r; -} - -int librados::RadosClient::trunc(IoCtxImpl& io, const object_t& oid, uint64_t size) -{ - utime_t ut = ceph_clock_now(cct); - - /* can't write to a snapshot */ - if (io.snap_seq != CEPH_NOSNAP) - return -EROFS; - - Mutex mylock("RadosClient::write_full::mylock"); - Cond cond; - bool done; - int r; - - Context *onack = new C_SafeCond(&mylock, &cond, &done, &r); - eversion_t ver; - - ::ObjectOperation op; - ::ObjectOperation *pop = prepare_assert_ops(&io, &op); - - lock.Lock(); - objecter->trunc(oid, io.oloc, - io.snapc, ut, 0, - size, 0, - onack, NULL, &ver, pop); - lock.Unlock(); - - mylock.Lock(); - while (!done) - cond.Wait(mylock); - mylock.Unlock(); - - set_sync_op_version(io, ver); - - return r; -} - -int librados::RadosClient::tmap_update(IoCtxImpl& io, const object_t& oid, bufferlist& cmdbl) -{ - utime_t ut = ceph_clock_now(cct); - - /* can't write to a snapshot */ - if (io.snap_seq != CEPH_NOSNAP) - return -EROFS; - - Mutex mylock("RadosClient::tmap_update::mylock"); - Cond cond; - bool done; - int r; - Context *onack = new C_SafeCond(&mylock, &cond, &done, &r); - eversion_t ver; - - bufferlist outbl; - - lock.Lock(); - ::ObjectOperation wr; - prepare_assert_ops(&io, &wr); - wr.tmap_update(cmdbl); - objecter->mutate(oid, io.oloc, wr, io.snapc, ut, 0, onack, NULL, &ver); - lock.Unlock(); - - mylock.Lock(); - while (!done) - cond.Wait(mylock); - mylock.Unlock(); - - set_sync_op_version(io, ver); - - return r; -} - -int librados::RadosClient::tmap_put(IoCtxImpl& io, const object_t& oid, bufferlist& bl) -{ - utime_t ut = ceph_clock_now(cct); - - /* can't write to a snapshot */ - if (io.snap_seq != CEPH_NOSNAP) - return -EROFS; - - Mutex mylock("RadosClient::tmap_put::mylock"); - Cond cond; - bool done; - int r; - Context *onack = new C_SafeCond(&mylock, &cond, &done, &r); - eversion_t ver; - - bufferlist outbl; - - lock.Lock(); - ::ObjectOperation wr; - prepare_assert_ops(&io, &wr); - wr.tmap_put(bl); - objecter->mutate(oid, io.oloc, wr, io.snapc, ut, 0, onack, NULL, &ver); - lock.Unlock(); - - mylock.Lock(); - while (!done) - cond.Wait(mylock); - mylock.Unlock(); - - set_sync_op_version(io, ver); - - return r; -} - -int librados::RadosClient::tmap_get(IoCtxImpl& io, const object_t& oid, bufferlist& bl) -{ - /* can't write to a snapshot */ - if (io.snap_seq != CEPH_NOSNAP) - return -EROFS; - - Mutex mylock("RadosClient::tmap_put::mylock"); - Cond cond; - bool done; - int r = 0; - Context *onack = new C_SafeCond(&mylock, &cond, &done, &r); - eversion_t ver; - - bufferlist outbl; - - lock.Lock(); - ::ObjectOperation rd; - prepare_assert_ops(&io, &rd); - rd.tmap_get(&bl, NULL); - objecter->read(oid, io.oloc, rd, io.snap_seq, 0, 0, onack, &ver); - lock.Unlock(); - - mylock.Lock(); - while (!done) - cond.Wait(mylock); - mylock.Unlock(); - - set_sync_op_version(io, ver); - - return r; -} - -int librados::RadosClient::exec(IoCtxImpl& io, const object_t& oid, - const char *cls, const char *method, - bufferlist& inbl, bufferlist& outbl) -{ - Mutex mylock("RadosClient::exec::mylock"); - Cond cond; - bool done; - int r; - Context *onack = new C_SafeCond(&mylock, &cond, &done, &r); - eversion_t ver; - - - lock.Lock(); - ::ObjectOperation rd; - prepare_assert_ops(&io, &rd); - rd.call(cls, method, inbl); - objecter->read(oid, io.oloc, rd, io.snap_seq, &outbl, 0, onack, &ver); - lock.Unlock(); - - mylock.Lock(); - while (!done) - cond.Wait(mylock); - mylock.Unlock(); - - set_sync_op_version(io, ver); - - return r; -} - -int librados::RadosClient::aio_exec(IoCtxImpl& io, const object_t& oid, AioCompletionImpl *c, - const char *cls, const char *method, - bufferlist& inbl, bufferlist *outbl) -{ - Context *onack = new C_aio_Ack(c); - - Mutex::Locker l(lock); - ::ObjectOperation rd; - prepare_assert_ops(&io, &rd); - rd.call(cls, method, inbl); - objecter->read(oid, io.oloc, rd, io.snap_seq, outbl, 0, onack, &c->objver); - - return 0; -} - -int librados::RadosClient::read(IoCtxImpl& io, const object_t& oid, - bufferlist& bl, size_t len, uint64_t off) -{ - CephContext *cct = io.client->cct; - Mutex mylock("RadosClient::read::mylock"); - Cond cond; - bool done; - int r; - Context *onack = new C_SafeCond(&mylock, &cond, &done, &r); - eversion_t ver; - - ::ObjectOperation op; - ::ObjectOperation *pop = prepare_assert_ops(&io, &op); - - lock.Lock(); - objecter->read(oid, io.oloc, - off, len, io.snap_seq, &bl, 0, - onack, &ver, pop); - lock.Unlock(); - - mylock.Lock(); - while (!done) - cond.Wait(mylock); - mylock.Unlock(); - ldout(cct, 10) << "Objecter returned from read r=" << r << dendl; - - set_sync_op_version(io, ver); - - if (r < 0) - return r; - - if (bl.length() < len) { - ldout(cct, 10) << "Returned length " << bl.length() - << " less than original length "<< len << dendl; - } - - return bl.length(); -} - -int librados::RadosClient::mapext(IoCtxImpl& io, const object_t& oid, - uint64_t off, size_t len, std::map& m) -{ - CephContext *cct = io.client->cct; - bufferlist bl; - - Mutex mylock("RadosClient::read::mylock"); - Cond cond; - bool done; - int r; - Context *onack = new C_SafeCond(&mylock, &cond, &done, &r); - - lock.Lock(); - objecter->mapext(oid, io.oloc, - off, len, io.snap_seq, &bl, 0, - onack); - lock.Unlock(); - - mylock.Lock(); - while (!done) - cond.Wait(mylock); - mylock.Unlock(); - ldout(cct, 10) << "Objecter returned from read r=" << r << dendl; - - if (r < 0) - return r; - - bufferlist::iterator iter = bl.begin(); - ::decode(m, iter); - - return m.size(); -} - -int librados::RadosClient::sparse_read(IoCtxImpl& io, const object_t& oid, - std::map& m, - bufferlist& data_bl, size_t len, uint64_t off) -{ - CephContext *cct = io.client->cct; - bufferlist bl; - - Mutex mylock("RadosClient::read::mylock"); - Cond cond; - bool done; - int r; - Context *onack = new C_SafeCond(&mylock, &cond, &done, &r); - - lock.Lock(); - objecter->sparse_read(oid, io.oloc, - off, len, io.snap_seq, &bl, 0, - onack); - lock.Unlock(); - - mylock.Lock(); - while (!done) - cond.Wait(mylock); - mylock.Unlock(); - ldout(cct, 10) << "Objecter returned from read r=" << r << dendl; - - if (r < 0) - return r; - - bufferlist::iterator iter = bl.begin(); - ::decode(m, iter); - ::decode(data_bl, iter); - - return m.size(); -} - -int librados::RadosClient::stat(IoCtxImpl& io, const object_t& oid, uint64_t *psize, time_t *pmtime) -{ - CephContext *cct = io.client->cct; - Mutex mylock("RadosClient::stat::mylock"); - Cond cond; - bool done; - int r; - Context *onack = new C_SafeCond(&mylock, &cond, &done, &r); - uint64_t size; - utime_t mtime; - eversion_t ver; - - if (!psize) - psize = &size; - - ::ObjectOperation op; - ::ObjectOperation *pop = prepare_assert_ops(&io, &op); - - lock.Lock(); - objecter->stat(oid, io.oloc, - io.snap_seq, psize, &mtime, 0, - onack, &ver, pop); - lock.Unlock(); - - mylock.Lock(); - while (!done) - cond.Wait(mylock); - mylock.Unlock(); - ldout(cct, 10) << "Objecter returned from stat" << dendl; - - if (r >= 0 && pmtime) { - *pmtime = mtime.sec(); - } - - set_sync_op_version(io, ver); - - return r; -} - -int librados::RadosClient::getxattr(IoCtxImpl& io, const object_t& oid, - const char *name, bufferlist& bl) -{ - CephContext *cct = io.client->cct; - Mutex mylock("RadosClient::getxattr::mylock"); - Cond cond; - bool done; - int r; - Context *onack = new C_SafeCond(&mylock, &cond, &done, &r); - eversion_t ver; - - ::ObjectOperation op; - ::ObjectOperation *pop = prepare_assert_ops(&io, &op); - - lock.Lock(); - objecter->getxattr(oid, io.oloc, - name, io.snap_seq, &bl, 0, - onack, &ver, pop); - lock.Unlock(); - - mylock.Lock(); - while (!done) - cond.Wait(mylock); - mylock.Unlock(); - ldout(cct, 10) << "Objecter returned from getxattr" << dendl; - - set_sync_op_version(io, ver); - - if (r < 0) - return r; - - return bl.length(); -} - -int librados::RadosClient::rmxattr(IoCtxImpl& io, const object_t& oid, const char *name) -{ - utime_t ut = ceph_clock_now(cct); - - /* can't write to a snapshot */ - if (io.snap_seq != CEPH_NOSNAP) - return -EROFS; - - Mutex mylock("RadosClient::rmxattr::mylock"); - Cond cond; - bool done; - int r; - - Context *onack = new C_SafeCond(&mylock, &cond, &done, &r); - eversion_t ver; - - ::ObjectOperation op; - ::ObjectOperation *pop = prepare_assert_ops(&io, &op); - - lock.Lock(); - objecter->removexattr(oid, io.oloc, name, - io.snapc, ut, 0, - onack, NULL, &ver, pop); - lock.Unlock(); - - mylock.Lock(); - while (!done) - cond.Wait(mylock); - mylock.Unlock(); - - set_sync_op_version(io, ver); - - if (r < 0) - return r; - - return 0; -} - -int librados::RadosClient::setxattr(IoCtxImpl& io, const object_t& oid, - const char *name, bufferlist& bl) -{ - utime_t ut = ceph_clock_now(cct); - - /* can't write to a snapshot */ - if (io.snap_seq != CEPH_NOSNAP) - return -EROFS; - - Mutex mylock("RadosClient::setxattr::mylock"); - Cond cond; - bool done; - int r; - - Context *onack = new C_SafeCond(&mylock, &cond, &done, &r); - eversion_t ver; - - ::ObjectOperation op; - ::ObjectOperation *pop = prepare_assert_ops(&io, &op); - - lock.Lock(); - objecter->setxattr(oid, io.oloc, name, - io.snapc, bl, ut, 0, - onack, NULL, &ver, pop); - lock.Unlock(); - - mylock.Lock(); - while (!done) - cond.Wait(mylock); - mylock.Unlock(); - - set_sync_op_version(io, ver); - - if (r < 0) - return r; - - return 0; -} - -int librados::RadosClient::getxattrs(IoCtxImpl& io, const object_t& oid, - map& attrset) -{ - CephContext *cct = io.client->cct; - Mutex mylock("RadosClient::getexattrs::mylock"); - Cond cond; - bool done; - int r; - eversion_t ver; - - ::ObjectOperation op; - ::ObjectOperation *pop = prepare_assert_ops(&io, &op); - - Context *onack = new C_SafeCond(&mylock, &cond, &done, &r); - - lock.Lock(); - map aset; - objecter->getxattrs(oid, io.oloc, io.snap_seq, - aset, - 0, onack, &ver, pop); - lock.Unlock(); - - attrset.clear(); - - - mylock.Lock(); - while (!done) - cond.Wait(mylock); - mylock.Unlock(); - - for (map::iterator p = aset.begin(); p != aset.end(); p++) { - ldout(cct, 10) << "RadosClient::getxattrs: xattr=" << p->first << dendl; - attrset[p->first.c_str()] = p->second; - } - - set_sync_op_version(io, ver); - - return r; -} - -void librados::RadosClient::watch_notify(MWatchNotify *m) -{ - assert(lock.is_locked()); - WatchContext *wc = NULL; - map::iterator iter = watchers.find(m->cookie); - if (iter != watchers.end()) - wc = iter->second; - - if (!wc) - return; - - wc->notify(this, m); - - m->put(); -} - -int librados::RadosClient::watch(IoCtxImpl& io, const object_t& oid, uint64_t ver, - uint64_t *cookie, librados::WatchCtx *ctx) -{ - ::ObjectOperation rd; - Mutex mylock("RadosClient::watch::mylock"); - Cond cond; - bool done; - int r; - Context *onfinish = new C_SafeCond(&mylock, &cond, &done, &r); - eversion_t objver; - - lock.Lock(); - - WatchContext *wc; - register_watcher(io, oid, ctx, cookie, &wc); - prepare_assert_ops(&io, &rd); - rd.watch(*cookie, ver, 1); - bufferlist bl; - wc->linger_id = objecter->linger(oid, io.oloc, rd, io.snap_seq, bl, NULL, 0, NULL, onfinish, &objver); - lock.Unlock(); - - mylock.Lock(); - while (!done) - cond.Wait(mylock); - mylock.Unlock(); - - set_sync_op_version(io, objver); - - if (r < 0) { - lock.Lock(); - unregister_watcher(*cookie); - lock.Unlock(); - } - - return r; -} - - -/* this is called with RadosClient::lock held */ -int librados::RadosClient::_notify_ack(IoCtxImpl& io, const object_t& oid, - uint64_t notify_id, uint64_t ver) -{ - Mutex mylock("RadosClient::watch::mylock"); - Cond cond; - eversion_t objver; - - ::ObjectOperation rd; - prepare_assert_ops(&io, &rd); - rd.notify_ack(notify_id, ver); - objecter->read(oid, io.oloc, rd, io.snap_seq, (bufferlist*)NULL, 0, 0, 0); - - return 0; -} - -int librados::RadosClient::unwatch(IoCtxImpl& io, const object_t& oid, uint64_t cookie) -{ - bufferlist inbl, outbl; - - Mutex mylock("RadosClient::watch::mylock"); - Cond cond; - bool done; - int r; - Context *onack = new C_SafeCond(&mylock, &cond, &done, &r); - eversion_t ver; - lock.Lock(); - - unregister_watcher(cookie); - - ::ObjectOperation rd; - prepare_assert_ops(&io, &rd); - rd.watch(cookie, 0, 0); - objecter->read(oid, io.oloc, rd, io.snap_seq, &outbl, 0, onack, &ver); - lock.Unlock(); - - mylock.Lock(); - while (!done) - cond.Wait(mylock); - mylock.Unlock(); - - set_sync_op_version(io, ver); - - return r; -}// --------------------------------------------- - -int librados::RadosClient::notify(IoCtxImpl& io, const object_t& oid, uint64_t ver, bufferlist& bl) -{ - bufferlist inbl, outbl; - - Mutex mylock("RadosClient::notify::mylock"); - Mutex mylock_all("RadosClient::notify::mylock_all"); - Cond cond, cond_all; - bool done, done_all; - int r; - Context *onack = new C_SafeCond(&mylock, &cond, &done, &r); - eversion_t objver; - uint64_t cookie; - C_NotifyComplete *ctx = new C_NotifyComplete(&mylock_all, &cond_all, &done_all); - - ::ObjectOperation rd; - prepare_assert_ops(&io, &rd); - - lock.Lock(); - WatchContext *wc; - register_watcher(io, oid, ctx, &cookie, &wc); - uint32_t prot_ver = 1; - uint32_t timeout = io.notify_timeout; - ::encode(prot_ver, inbl); - ::encode(timeout, inbl); - ::encode(bl, inbl); - rd.notify(cookie, ver, inbl); - wc->linger_id = objecter->linger(oid, io.oloc, rd, io.snap_seq, inbl, NULL, - 0, onack, NULL, &objver); - lock.Unlock(); - - mylock_all.Lock(); - mylock.Lock(); - while (!done) - cond.Wait(mylock); - mylock.Unlock(); - - if (r == 0) { - while (!done_all) - cond_all.Wait(mylock_all); - } - - mylock_all.Unlock(); - - lock.Lock(); - unregister_watcher(cookie); - lock.Unlock(); - - set_sync_op_version(io, objver); - delete ctx; - - return r; -} - -///////////////////////////// ObjectIterator ///////////////////////////// -librados::ObjectIterator::ObjectIterator(ObjListCtx *ctx_) - : ctx(ctx_) -{ -} - -librados::ObjectIterator::~ObjectIterator() -{ - ctx.reset(); -} - -bool librados::ObjectIterator::operator==(const librados::ObjectIterator& rhs) const { - return (ctx.get() == rhs.ctx.get()); -} - -bool librados::ObjectIterator::operator!=(const librados::ObjectIterator& rhs) const { - return (ctx.get() != rhs.ctx.get()); -} - -const pair& librados::ObjectIterator::operator*() const { - return cur_obj; -} - -const pair* librados::ObjectIterator::operator->() const { - return &cur_obj; -} - -librados::ObjectIterator& librados::ObjectIterator::operator++() -{ - get_next(); - return *this; -} - -librados::ObjectIterator librados::ObjectIterator::operator++(int) -{ - librados::ObjectIterator ret(*this); - get_next(); - return ret; -} - -void librados::ObjectIterator::get_next() -{ - const char *entry, *key; - int ret = rados_objects_list_next(ctx.get(), &entry, &key); - if (ret == -ENOENT) { - ctx.reset(); - *this = __EndObjectIterator; - return; - } - else if (ret) { - ostringstream oss; - oss << "rados returned " << cpp_strerror(ret); - throw std::runtime_error(oss.str()); - } - - cur_obj = make_pair(entry, key ? key : string()); -} - -const librados::ObjectIterator librados::ObjectIterator::__EndObjectIterator(NULL); - -///////////////////////////// PoolAsyncCompletion ////////////////////////////// -int librados::PoolAsyncCompletion::PoolAsyncCompletion::set_callback(void *cb_arg, - rados_callback_t cb) -{ - PoolAsyncCompletionImpl *c = (PoolAsyncCompletionImpl *)pc; - return c->set_callback(cb_arg, cb); -} - -int librados::PoolAsyncCompletion::PoolAsyncCompletion::wait() -{ - PoolAsyncCompletionImpl *c = (PoolAsyncCompletionImpl *)pc; - return c->wait(); -} - -bool librados::PoolAsyncCompletion::PoolAsyncCompletion::is_complete() -{ - PoolAsyncCompletionImpl *c = (PoolAsyncCompletionImpl *)pc; - return c->is_complete(); -} - -int librados::PoolAsyncCompletion::PoolAsyncCompletion::get_return_value() -{ - PoolAsyncCompletionImpl *c = (PoolAsyncCompletionImpl *)pc; - return c->get_return_value(); -} - -void librados::PoolAsyncCompletion::PoolAsyncCompletion::release() -{ - PoolAsyncCompletionImpl *c = (PoolAsyncCompletionImpl *)pc; - c->release(); - delete this; -} - -///////////////////////////// AioCompletion ////////////////////////////// -int librados::AioCompletion::AioCompletion::set_complete_callback(void *cb_arg, rados_callback_t cb) -{ - AioCompletionImpl *c = (AioCompletionImpl *)pc; - return c->set_complete_callback(cb_arg, cb); -} - -int librados::AioCompletion::AioCompletion::set_safe_callback(void *cb_arg, rados_callback_t cb) -{ - AioCompletionImpl *c = (AioCompletionImpl *)pc; - return c->set_safe_callback(cb_arg, cb); -} - -int librados::AioCompletion::AioCompletion::wait_for_complete() -{ - AioCompletionImpl *c = (AioCompletionImpl *)pc; - return c->wait_for_complete(); -} - -int librados::AioCompletion::AioCompletion::wait_for_safe() -{ - AioCompletionImpl *c = (AioCompletionImpl *)pc; - return c->wait_for_safe(); -} - -bool librados::AioCompletion::AioCompletion::is_complete() -{ - AioCompletionImpl *c = (AioCompletionImpl *)pc; - return c->is_complete(); -} - -bool librados::AioCompletion::AioCompletion::is_safe() -{ - AioCompletionImpl *c = (AioCompletionImpl *)pc; - return c->is_safe(); -} - -int librados::AioCompletion::AioCompletion::get_return_value() -{ - AioCompletionImpl *c = (AioCompletionImpl *)pc; - return c->get_return_value(); -} - -int librados::AioCompletion::AioCompletion::get_version() -{ - AioCompletionImpl *c = (AioCompletionImpl *)pc; - return c->get_version(); -} - -void librados::AioCompletion::AioCompletion::release() -{ - AioCompletionImpl *c = (AioCompletionImpl *)pc; - c->release(); - delete this; -} - -///////////////////////////// IoCtx ////////////////////////////// -librados::IoCtx::IoCtx() : io_ctx_impl(NULL) -{ -} - -void librados::IoCtx::from_rados_ioctx_t(rados_ioctx_t p, IoCtx &io) -{ - IoCtxImpl *io_ctx_impl = (IoCtxImpl*)p; - - io.io_ctx_impl = io_ctx_impl; - if (io_ctx_impl) { - io_ctx_impl->get(); - } -} - -librados::IoCtx::IoCtx(const IoCtx& rhs) -{ - io_ctx_impl = rhs.io_ctx_impl; - if (io_ctx_impl) { - io_ctx_impl->get(); - } -} - -librados::IoCtx& librados::IoCtx::operator=(const IoCtx& rhs) -{ - if (io_ctx_impl) - io_ctx_impl->put(); - io_ctx_impl = rhs.io_ctx_impl; - io_ctx_impl->get(); - return *this; -} - -librados::IoCtx::~IoCtx() -{ - close(); -} - -void librados::IoCtx::close() -{ - if (io_ctx_impl) - io_ctx_impl->put(); - io_ctx_impl = 0; -} - -void librados::IoCtx::dup(const IoCtx& rhs) -{ - if (io_ctx_impl) - io_ctx_impl->put(); - io_ctx_impl = new IoCtxImpl(); - io_ctx_impl->get(); - io_ctx_impl->dup(*rhs.io_ctx_impl); -} - -int librados::IoCtx::set_auid(uint64_t auid_) -{ - return io_ctx_impl->client->pool_change_auid(io_ctx_impl, auid_); -} - -int librados::IoCtx::set_auid_async(uint64_t auid_, PoolAsyncCompletion *c) -{ - return io_ctx_impl->client->pool_change_auid_async(io_ctx_impl, auid_, c->pc); -} - -int librados::IoCtx::get_auid(uint64_t *auid_) -{ - return io_ctx_impl->client->pool_get_auid(io_ctx_impl, (unsigned long long *)auid_); -} - -int librados::IoCtx::create(const std::string& oid, bool exclusive) -{ - object_t obj(oid); - return io_ctx_impl->client->create(*io_ctx_impl, obj, exclusive); -} - -int librados::IoCtx::create(const std::string& oid, bool exclusive, const std::string& category) -{ - object_t obj(oid); - return io_ctx_impl->client->create(*io_ctx_impl, obj, exclusive, category); -} - -int librados::IoCtx::write(const std::string& oid, bufferlist& bl, size_t len, uint64_t off) -{ - object_t obj(oid); - return io_ctx_impl->client->write(*io_ctx_impl, obj, bl, len, off); -} - -int librados::IoCtx::append(const std::string& oid, bufferlist& bl, size_t len) -{ - object_t obj(oid); - return io_ctx_impl->client->append(*io_ctx_impl, obj, bl, len); -} - -int librados::IoCtx::write_full(const std::string& oid, bufferlist& bl) -{ - object_t obj(oid); - return io_ctx_impl->client->write_full(*io_ctx_impl, obj, bl); -} - -int librados::IoCtx::clone_range(const std::string& dst_oid, uint64_t dst_off, - const std::string& src_oid, uint64_t src_off, - size_t len) -{ - object_t src(src_oid), dst(dst_oid); - return io_ctx_impl->client->clone_range(*io_ctx_impl, dst, dst_off, src, src_off, len); -} - -int librados::IoCtx::read(const std::string& oid, bufferlist& bl, size_t len, uint64_t off) -{ - object_t obj(oid); - return io_ctx_impl->client->read(*io_ctx_impl, obj, bl, len, off); -} - -int librados::IoCtx::remove(const std::string& oid) -{ - object_t obj(oid); - return io_ctx_impl->client->remove(*io_ctx_impl, obj); -} - -int librados::IoCtx::trunc(const std::string& oid, uint64_t size) -{ - object_t obj(oid); - return io_ctx_impl->client->trunc(*io_ctx_impl, obj, size); -} - -int librados::IoCtx::mapext(const std::string& oid, uint64_t off, size_t len, - std::map& m) -{ - object_t obj(oid); - return io_ctx_impl->client->mapext(*io_ctx_impl, oid, off, len, m); -} - -int librados::IoCtx::sparse_read(const std::string& oid, std::map& m, - bufferlist& bl, size_t len, uint64_t off) -{ - object_t obj(oid); - return io_ctx_impl->client->sparse_read(*io_ctx_impl, oid, m, bl, len, off); -} - -int librados::IoCtx::getxattr(const std::string& oid, const char *name, bufferlist& bl) -{ - object_t obj(oid); - return io_ctx_impl->client->getxattr(*io_ctx_impl, obj, name, bl); -} - -int librados::IoCtx::getxattrs(const std::string& oid, map& attrset) -{ - object_t obj(oid); - return io_ctx_impl->client->getxattrs(*io_ctx_impl, obj, attrset); -} - -int librados::IoCtx::setxattr(const std::string& oid, const char *name, bufferlist& bl) -{ - object_t obj(oid); - return io_ctx_impl->client->setxattr(*io_ctx_impl, obj, name, bl); -} - -int librados::IoCtx::rmxattr(const std::string& oid, const char *name) -{ - object_t obj(oid); - return io_ctx_impl->client->rmxattr(*io_ctx_impl, obj, name); -} - -int librados::IoCtx::stat(const std::string& oid, uint64_t *psize, time_t *pmtime) -{ - object_t obj(oid); - return io_ctx_impl->client->stat(*io_ctx_impl, oid, psize, pmtime); -} - -int librados::IoCtx::exec(const std::string& oid, const char *cls, const char *method, - bufferlist& inbl, bufferlist& outbl) -{ - object_t obj(oid); - return io_ctx_impl->client->exec(*io_ctx_impl, obj, cls, method, inbl, outbl); -} - -int librados::IoCtx::tmap_update(const std::string& oid, bufferlist& cmdbl) -{ - object_t obj(oid); - return io_ctx_impl->client->tmap_update(*io_ctx_impl, obj, cmdbl); -} - -int librados::IoCtx::tmap_put(const std::string& oid, bufferlist& bl) -{ - object_t obj(oid); - return io_ctx_impl->client->tmap_put(*io_ctx_impl, obj, bl); -} - -int librados::IoCtx::tmap_get(const std::string& oid, bufferlist& bl) -{ - object_t obj(oid); - return io_ctx_impl->client->tmap_get(*io_ctx_impl, obj, bl); -} - -int librados::IoCtx::omap_get_vals(const std::string& oid, - const std::string& start_after, - uint64_t max_return, - std::map *out_vals) -{ - ObjectReadOperation op; - int r; - op.omap_get_vals(start_after, max_return, out_vals, &r); - bufferlist bl; - int ret = operate(oid, &op, &bl); - if (ret < 0) - return ret; - - return r; -} - -int librados::IoCtx::omap_get_vals(const std::string& oid, - const std::string& start_after, - const std::string& filter_prefix, - uint64_t max_return, - std::map *out_vals) -{ - ObjectReadOperation op; - int r; - op.omap_get_vals(start_after, filter_prefix, max_return, out_vals, &r); - bufferlist bl; - int ret = operate(oid, &op, &bl); - if (ret < 0) - return ret; - - return r; -} - -int librados::IoCtx::omap_get_keys(const std::string& oid, - const std::string& start_after, - uint64_t max_return, - std::set *out_keys) -{ - ObjectReadOperation op; - int r; - op.omap_get_keys(start_after, max_return, out_keys, &r); - bufferlist bl; - int ret = operate(oid, &op, &bl); - if (ret < 0) - return ret; - - return r; -} - -int librados::IoCtx::omap_get_header(const std::string& oid, - bufferlist *bl) -{ - ObjectReadOperation op; - int r; - op.omap_get_header(bl, &r); - bufferlist b; - int ret = operate(oid, &op, &b); - if (ret < 0) - return ret; - - return r; -} - -int librados::IoCtx::omap_get_vals_by_keys(const std::string& oid, - const std::set& keys, - std::map *vals) -{ - ObjectReadOperation op; - int r; - bufferlist bl; - op.omap_get_vals_by_keys(keys, vals, &r); - int ret = operate(oid, &op, &bl); - if (ret < 0) - return ret; - - return r; -} - -int librados::IoCtx::omap_set(const std::string& oid, - const map& m) -{ - ObjectWriteOperation op; - op.omap_set(m); - return operate(oid, &op); -} - -int librados::IoCtx::omap_set_header(const std::string& oid, - const bufferlist& bl) -{ - ObjectWriteOperation op; - op.omap_set_header(bl); - return operate(oid, &op); -} - -int librados::IoCtx::omap_clear(const std::string& oid) -{ - ObjectWriteOperation op; - op.omap_clear(); - return operate(oid, &op); -} - -int librados::IoCtx::omap_rm_keys(const std::string& oid, - const std::set& keys) -{ - ObjectWriteOperation op; - op.omap_rm_keys(keys); - return operate(oid, &op); -} - -int librados::IoCtx::operate(const std::string& oid, librados::ObjectWriteOperation *o) -{ - object_t obj(oid); - return io_ctx_impl->client->operate(*io_ctx_impl, obj, (::ObjectOperation*)o->impl, o->pmtime); -} - -int librados::IoCtx::operate(const std::string& oid, librados::ObjectReadOperation *o, bufferlist *pbl) -{ - object_t obj(oid); - return io_ctx_impl->client->operate_read(*io_ctx_impl, obj, (::ObjectOperation*)o->impl, pbl); -} - -int librados::IoCtx::aio_operate(const std::string& oid, AioCompletion *c, librados::ObjectWriteOperation *o) -{ - object_t obj(oid); - return io_ctx_impl->client->aio_operate(*io_ctx_impl, obj, (::ObjectOperation*)o->impl, c->pc); -} - -int librados::IoCtx::aio_operate(const std::string& oid, AioCompletion *c, librados::ObjectReadOperation *o, bufferlist *pbl) -{ - object_t obj(oid); - return io_ctx_impl->client->aio_operate_read(*io_ctx_impl, obj, (::ObjectOperation*)o->impl, c->pc, pbl); -} - -void librados::IoCtx::snap_set_read(snap_t seq) -{ - io_ctx_impl->set_snap_read(seq); -} - -int librados::IoCtx::selfmanaged_snap_set_write_ctx(snap_t seq, vector& snaps) -{ - vector snv; - snv.resize(snaps.size()); - for (unsigned i=0; iset_snap_write_context(seq, snv); -} - -int librados::IoCtx::snap_create(const char *snapname) -{ - return io_ctx_impl->client->snap_create(io_ctx_impl, snapname); -} - -int librados::IoCtx::snap_lookup(const char *name, snap_t *snapid) -{ - return io_ctx_impl->client->snap_lookup(io_ctx_impl, name, snapid); -} - -int librados::IoCtx::snap_get_stamp(snap_t snapid, time_t *t) -{ - return io_ctx_impl->client->snap_get_stamp(io_ctx_impl, snapid, t); -} - -int librados::IoCtx::snap_get_name(snap_t snapid, std::string *s) -{ - return io_ctx_impl->client->snap_get_name(io_ctx_impl, snapid, s); -} - -int librados::IoCtx::snap_remove(const char *snapname) -{ - return io_ctx_impl->client->snap_remove(io_ctx_impl, snapname); -} - -int librados::IoCtx::snap_list(std::vector *snaps) -{ - return io_ctx_impl->client->snap_list(io_ctx_impl, snaps); -} - -int librados::IoCtx::rollback(const std::string& oid, const char *snapname) -{ - return io_ctx_impl->client->rollback(io_ctx_impl, oid, snapname); -} - -int librados::IoCtx::selfmanaged_snap_create(uint64_t *snapid) -{ - return io_ctx_impl->client->selfmanaged_snap_create(io_ctx_impl, snapid); -} - -int librados::IoCtx::selfmanaged_snap_remove(uint64_t snapid) -{ - return io_ctx_impl->client->selfmanaged_snap_remove(io_ctx_impl, snapid); -} - -int librados::IoCtx::selfmanaged_snap_rollback(const std::string& oid, uint64_t snapid) -{ - return io_ctx_impl->client->selfmanaged_snap_rollback_object(io_ctx_impl, - oid, - io_ctx_impl->snapc, - snapid); -} - -librados::ObjectIterator librados::IoCtx::objects_begin() -{ - rados_list_ctx_t listh; - rados_objects_list_open(io_ctx_impl, &listh); - ObjectIterator iter((ObjListCtx*)listh); - iter.get_next(); - return iter; -} - -const librados::ObjectIterator& librados::IoCtx::objects_end() const -{ - return ObjectIterator::__EndObjectIterator; -} - -uint64_t librados::IoCtx::get_last_version() -{ - eversion_t ver = io_ctx_impl->client->last_version(*io_ctx_impl); - return ver.version; -} - -int librados::IoCtx::aio_read(const std::string& oid, librados::AioCompletion *c, - bufferlist *pbl, size_t len, uint64_t off) -{ - return io_ctx_impl->client->aio_read(*io_ctx_impl, oid, c->pc, pbl, len, off); -} - -int librados::IoCtx::aio_exec(const std::string& oid, librados::AioCompletion *c, const char *cls, const char *method, - bufferlist& inbl, bufferlist *outbl) -{ - object_t obj(oid); - return io_ctx_impl->client->aio_exec(*io_ctx_impl, obj, c->pc, cls, method, inbl, outbl); -} - -int librados::IoCtx::aio_sparse_read(const std::string& oid, librados::AioCompletion *c, - std::map *m, bufferlist *data_bl, - size_t len, uint64_t off) -{ - return io_ctx_impl->client->aio_sparse_read(*io_ctx_impl, oid, c->pc, - m, data_bl, len, off); -} - -int librados::IoCtx::aio_write(const std::string& oid, librados::AioCompletion *c, - const bufferlist& bl, size_t len, uint64_t off) -{ - return io_ctx_impl->client->aio_write(*io_ctx_impl, oid, c->pc, bl, len, off); -} - -int librados::IoCtx::aio_append(const std::string& oid, librados::AioCompletion *c, - const bufferlist& bl, size_t len) -{ - return io_ctx_impl->client->aio_append(*io_ctx_impl, oid, c->pc, bl, len); -} - -int librados::IoCtx::aio_write_full(const std::string& oid, librados::AioCompletion *c, - const bufferlist& bl) -{ - object_t obj(oid); - return io_ctx_impl->client->aio_write_full(*io_ctx_impl, obj, c->pc, bl); -} - -int librados::IoCtx::aio_flush() -{ - io_ctx_impl->flush_aio_writes(); - return 0; -} - -int librados::IoCtx::watch(const string& oid, uint64_t ver, uint64_t *cookie, - librados::WatchCtx *ctx) -{ - object_t obj(oid); - return io_ctx_impl->client->watch(*io_ctx_impl, obj, ver, cookie, ctx); -} - -int librados::IoCtx::unwatch(const string& oid, uint64_t handle) -{ - uint64_t cookie = handle; - object_t obj(oid); - return io_ctx_impl->client->unwatch(*io_ctx_impl, obj, cookie); -} - -int librados::IoCtx::notify(const string& oid, uint64_t ver, bufferlist& bl) -{ - object_t obj(oid); - return io_ctx_impl->client->notify(*io_ctx_impl, obj, ver, bl); -} - -void librados::IoCtx::set_notify_timeout(uint32_t timeout) -{ - io_ctx_impl->client->set_notify_timeout(*io_ctx_impl, timeout); -} - -void librados::IoCtx::set_assert_version(uint64_t ver) -{ - io_ctx_impl->client->set_assert_version(*io_ctx_impl, ver); -} - -void librados::IoCtx::set_assert_src_version(const std::string& oid, uint64_t ver) -{ - object_t obj(oid); - io_ctx_impl->client->set_assert_src_version(*io_ctx_impl, obj, ver); -} - -const std::string& librados::IoCtx::get_pool_name() const -{ - return io_ctx_impl->pool_name; -} - -void librados::IoCtx::locator_set_key(const string& key) -{ - io_ctx_impl->oloc.key = key; -} - -int64_t librados::IoCtx::get_id() -{ - return io_ctx_impl->get_id(); -} - -librados::config_t librados::IoCtx::cct() -{ - return (config_t)io_ctx_impl->client->cct; -} - -librados::IoCtx::IoCtx(IoCtxImpl *io_ctx_impl_) - : io_ctx_impl(io_ctx_impl_) -{ -} - -///////////////////////////// Rados ////////////////////////////// -void librados::Rados::version(int *major, int *minor, int *extra) -{ - rados_version(major, minor, extra); -} - -librados::Rados::Rados() : client(NULL) -{ -} - -librados::Rados::~Rados() -{ - shutdown(); -} - -int librados::Rados::init(const char * const id) -{ - return rados_create((rados_t *)&client, id); -} - -int librados::Rados::init_with_context(config_t cct_) -{ - return rados_create_with_context((rados_t *)&client, (rados_config_t)cct_); -} - -int librados::Rados::connect() -{ - return client->connect(); -} - -librados::config_t librados::Rados::cct() -{ - return (config_t)client->cct; -} - -void librados::Rados::shutdown() -{ - if (!client) - return; - client->shutdown(); - delete client; - client = NULL; -} - -int librados::Rados::conf_read_file(const char * const path) const -{ - return rados_conf_read_file((rados_t)client, path); -} - -int librados::Rados::conf_parse_argv(int argc, const char ** argv) const -{ - return rados_conf_parse_argv((rados_t)client, argc, argv); -} - -int librados::Rados::conf_parse_env(const char *name) const -{ - return rados_conf_parse_env((rados_t)client, name); -} - -int librados::Rados::conf_set(const char *option, const char *value) -{ - return rados_conf_set((rados_t)client, option, value); -} - -int librados::Rados::conf_get(const char *option, std::string &val) -{ - char *str; - md_config_t *conf = client->cct->_conf; - int ret = conf->get_val(option, &str, -1); - if (ret) - return ret; - val = str; - free(str); - return 0; -} - -int librados::Rados::pool_create(const char *name) -{ - string str(name); - return client->pool_create(str); -} - -int librados::Rados::pool_create(const char *name, uint64_t auid) -{ - string str(name); - return client->pool_create(str, auid); -} - -int librados::Rados::pool_create(const char *name, uint64_t auid, __u8 crush_rule) -{ - string str(name); - return client->pool_create(str, auid, crush_rule); -} - -int librados::Rados::pool_create_async(const char *name, PoolAsyncCompletion *c) -{ - string str(name); - return client->pool_create_async(str, c->pc); -} - -int librados::Rados::pool_create_async(const char *name, uint64_t auid, PoolAsyncCompletion *c) -{ - string str(name); - return client->pool_create_async(str, c->pc, auid); -} - -int librados::Rados::pool_create_async(const char *name, uint64_t auid, __u8 crush_rule, - PoolAsyncCompletion *c) -{ - string str(name); - return client->pool_create_async(str, c->pc, auid, crush_rule); -} - -int librados::Rados::pool_delete(const char *name) -{ - return client->pool_delete(name); -} - -int librados::Rados::pool_delete_async(const char *name, PoolAsyncCompletion *c) -{ - return client->pool_delete_async(name, c->pc); -} - -int librados::Rados::pool_list(std::list& v) -{ - return client->pool_list(v); -} - -int64_t librados::Rados::pool_lookup(const char *name) -{ - return client->lookup_pool(name); -} - -int librados::Rados::ioctx_create(const char *name, IoCtx &io) -{ - rados_ioctx_t p; - int ret = rados_ioctx_create((rados_t)client, name, &p); - if (ret) - return ret; - io.io_ctx_impl = (IoCtxImpl*)p; - return 0; -} - -int librados::Rados::get_pool_stats(std::list& v, std::map& result) -{ - string category; - return get_pool_stats(v, category, result); -} - -int librados::Rados::get_pool_stats(std::list& v, string& category, - std::map& result) -{ - map rawresult; - int r = client->get_pool_stats(v, rawresult); - for (map::iterator p = rawresult.begin(); - p != rawresult.end(); - p++) { - stats_map& c = result[p->first]; - - string cat; - vector cats; - - if (!category.size()) { - cats.push_back(cat); - map::iterator iter; - for (iter = p->second.stats.cat_sum.begin(); iter != p->second.stats.cat_sum.end(); ++iter) { - cats.push_back(iter->first); - } - } else { - cats.push_back(category); - } - - vector::iterator cat_iter; - for (cat_iter = cats.begin(); cat_iter != cats.end(); ++cat_iter) { - string& cur_category = *cat_iter; - object_stat_sum_t *sum; - - if (!cur_category.size()) { - sum = &p->second.stats.sum; - } else { - map::iterator iter = p->second.stats.cat_sum.find(cur_category); - if (iter == p->second.stats.cat_sum.end()) - continue; - sum = &iter->second; - } - - pool_stat_t& pv = c[cur_category]; - pv.num_kb = SHIFT_ROUND_UP(sum->num_bytes, 10); - pv.num_bytes = sum->num_bytes; - pv.num_objects = sum->num_objects; - pv.num_object_clones = sum->num_object_clones; - pv.num_object_copies = sum->num_object_copies; - pv.num_objects_missing_on_primary = sum->num_objects_missing_on_primary; - pv.num_objects_unfound = sum->num_objects_unfound; - pv.num_objects_degraded = sum->num_objects_degraded; - pv.num_rd = sum->num_rd; - pv.num_rd_kb = sum->num_rd_kb; - pv.num_wr = sum->num_wr; - pv.num_wr_kb = sum->num_wr_kb; - } - } - return r; -} - -int librados::Rados::cluster_stat(cluster_stat_t& result) -{ - ceph_statfs stats; - int r = client->get_fs_stats(stats); - result.kb = stats.kb; - result.kb_used = stats.kb_used; - result.kb_avail = stats.kb_avail; - result.num_objects = stats.num_objects; - return r; -} - -librados::PoolAsyncCompletion *librados::Rados::pool_async_create_completion() -{ - PoolAsyncCompletionImpl *c = RadosClient::pool_async_create_completion(); - return new PoolAsyncCompletion(c); -} - -librados::AioCompletion *librados::Rados::aio_create_completion() -{ - AioCompletionImpl *c = RadosClient::aio_create_completion(); - return new AioCompletion(c); -} - -librados::AioCompletion *librados::Rados::aio_create_completion(void *cb_arg, - callback_t cb_complete, - callback_t cb_safe) -{ - AioCompletionImpl *c = RadosClient::aio_create_completion(cb_arg, cb_complete, cb_safe); - return new AioCompletion(c); -} - - -librados::ObjectOperation::ObjectOperation() -{ - impl = (ObjectOperationImpl *)new ::ObjectOperation; -} - -librados::ObjectOperation::~ObjectOperation() -{ - ::ObjectOperation *o = (::ObjectOperation *)impl; - delete o; -} - -///////////////////////////// C API ////////////////////////////// -extern "C" int rados_create(rados_t *pcluster, const char * const id) -{ - CephInitParameters iparams(CEPH_ENTITY_TYPE_CLIENT); - if (id) { - iparams.name.set(CEPH_ENTITY_TYPE_CLIENT, id); - } - - CephContext *cct = common_preinit(iparams, CODE_ENVIRONMENT_LIBRARY, 0); - cct->_conf->parse_env(); // environment variables override - cct->_conf->apply_changes(NULL); - - librados::RadosClient *radosp = new librados::RadosClient(cct); - *pcluster = (void *)radosp; - return 0; -} - -/* This function is intended for use by Ceph daemons. These daemons have - * already called global_init and want to use that particular configuration for - * their cluster. - */ -extern "C" int rados_create_with_context(rados_t *pcluster, rados_config_t cct_) -{ - CephContext *cct = (CephContext *)cct_; - librados::RadosClient *radosp = new librados::RadosClient(cct); - *pcluster = (void *)radosp; - return 0; -} - -extern "C" rados_config_t rados_cct(rados_t cluster) -{ - librados::RadosClient *client = (librados::RadosClient *)cluster; - return (rados_config_t)client->cct; -} - -extern "C" int rados_connect(rados_t cluster) -{ - librados::RadosClient *client = (librados::RadosClient *)cluster; - return client->connect(); -} - -extern "C" void rados_shutdown(rados_t cluster) -{ - librados::RadosClient *radosp = (librados::RadosClient *)cluster; - radosp->shutdown(); - delete radosp; -} - -extern "C" void rados_version(int *major, int *minor, int *extra) -{ - if (major) - *major = LIBRADOS_VER_MAJOR; - if (minor) - *minor = LIBRADOS_VER_MINOR; - if (extra) - *extra = LIBRADOS_VER_EXTRA; -} - - -// -- config -- -extern "C" int rados_conf_read_file(rados_t cluster, const char *path_list) -{ - librados::RadosClient *client = (librados::RadosClient *)cluster; - md_config_t *conf = client->cct->_conf; - std::deque parse_errors; - int ret = conf->parse_config_files(path_list, &parse_errors, 0); - if (ret) - return ret; - conf->parse_env(); // environment variables override - - conf->apply_changes(NULL); - complain_about_parse_errors(client->cct, &parse_errors); - return 0; -} - -extern "C" int rados_conf_parse_argv(rados_t cluster, int argc, const char **argv) -{ - librados::RadosClient *client = (librados::RadosClient *)cluster; - md_config_t *conf = client->cct->_conf; - vector args; - argv_to_vec(argc, argv, args); - int ret = conf->parse_argv(args); - if (ret) - return ret; - conf->apply_changes(NULL); - return 0; -} - -extern "C" int rados_conf_parse_env(rados_t cluster, const char *env) -{ - librados::RadosClient *client = (librados::RadosClient *)cluster; - md_config_t *conf = client->cct->_conf; - vector args; - env_to_vec(args, env); - int ret = conf->parse_argv(args); - if (ret) - return ret; - conf->apply_changes(NULL); - return 0; -} - -extern "C" int rados_conf_set(rados_t cluster, const char *option, const char *value) -{ - librados::RadosClient *client = (librados::RadosClient *)cluster; - md_config_t *conf = client->cct->_conf; - int ret = conf->set_val(option, value); - if (ret) - return ret; - conf->apply_changes(NULL); - return 0; -} - -/* cluster info */ -extern "C" int rados_cluster_stat(rados_t cluster, rados_cluster_stat_t *result) -{ - librados::RadosClient *client = (librados::RadosClient *)cluster; - - ceph_statfs stats; - int r = client->get_fs_stats(stats); - result->kb = stats.kb; - result->kb_used = stats.kb_used; - result->kb_avail = stats.kb_avail; - result->num_objects = stats.num_objects; - return r; -} - -extern "C" int rados_conf_get(rados_t cluster, const char *option, char *buf, size_t len) -{ - char *tmp = buf; - librados::RadosClient *client = (librados::RadosClient *)cluster; - md_config_t *conf = client->cct->_conf; - return conf->get_val(option, &tmp, len); -} - -extern "C" int64_t rados_pool_lookup(rados_t cluster, const char *name) -{ - librados::RadosClient *radosp = (librados::RadosClient *)cluster; - return radosp->lookup_pool(name); -} - -extern "C" int rados_pool_list(rados_t cluster, char *buf, size_t len) -{ - librados::RadosClient *client = (librados::RadosClient *)cluster; - std::list pools; - client->pool_list(pools); - - char *b = buf; - if (b) - memset(b, 0, len); - int needed = 0; - std::list::const_iterator i = pools.begin(); - std::list::const_iterator p_end = pools.end(); - for (; i != p_end; ++i) { - if (len <= 0) - break; - int rl = i->length() + 1; - strncat(b, i->c_str(), len - 2); // leave space for two NULLs - needed += rl; - len -= rl; - b += rl; - } - for (; i != p_end; ++i) { - int rl = i->length() + 1; - needed += rl; - } - return needed + 1; -} - -extern "C" int rados_ioctx_create(rados_t cluster, const char *name, rados_ioctx_t *io) -{ - librados::RadosClient *radosp = (librados::RadosClient *)cluster; - int64_t poolid = radosp->lookup_pool(name); - if (poolid < 0) - return (int)poolid; - - librados::IoCtxImpl *ctx = new librados::IoCtxImpl(radosp, poolid, name, CEPH_NOSNAP); - if (!ctx) - return -ENOMEM; - *io = ctx; - ctx->get(); - return 0; -} - -extern "C" void rados_ioctx_destroy(rados_ioctx_t io) -{ - librados::IoCtxImpl *ctx = (librados::IoCtxImpl *)io; - ctx->put(); -} - -extern "C" int rados_ioctx_pool_stat(rados_ioctx_t io, struct rados_pool_stat_t *stats) -{ - librados::IoCtxImpl *io_ctx_impl = (librados::IoCtxImpl *)io; - list ls; - ls.push_back(io_ctx_impl->pool_name); - map rawresult; - - int err = io_ctx_impl->client->get_pool_stats(ls, rawresult); - if (err) - return err; - - ::pool_stat_t& r = rawresult[io_ctx_impl->pool_name]; - stats->num_kb = SHIFT_ROUND_UP(r.stats.sum.num_bytes, 10); - stats->num_bytes = r.stats.sum.num_bytes; - stats->num_objects = r.stats.sum.num_objects; - stats->num_object_clones = r.stats.sum.num_object_clones; - stats->num_object_copies = r.stats.sum.num_object_copies; - stats->num_objects_missing_on_primary = r.stats.sum.num_objects_missing_on_primary; - stats->num_objects_unfound = r.stats.sum.num_objects_unfound; - stats->num_objects_degraded = r.stats.sum.num_objects_degraded; - stats->num_rd = r.stats.sum.num_rd; - stats->num_rd_kb = r.stats.sum.num_rd_kb; - stats->num_wr = r.stats.sum.num_wr; - stats->num_wr_kb = r.stats.sum.num_wr_kb; - return 0; -} - -extern "C" rados_config_t rados_ioctx_cct(rados_ioctx_t io) -{ - librados::IoCtxImpl *ctx = (librados::IoCtxImpl *)io; - return (rados_config_t)ctx->client->cct; -} - -extern "C" void rados_ioctx_snap_set_read(rados_ioctx_t io, rados_snap_t seq) -{ - librados::IoCtxImpl *ctx = (librados::IoCtxImpl *)io; - ctx->set_snap_read((snapid_t)seq); -} - -extern "C" int rados_ioctx_selfmanaged_snap_set_write_ctx(rados_ioctx_t io, - rados_snap_t seq, rados_snap_t *snaps, int num_snaps) -{ - librados::IoCtxImpl *ctx = (librados::IoCtxImpl *)io; - vector snv; - snv.resize(num_snaps); - for (int i=0; iset_snap_write_context((snapid_t)seq, snv); -} - -extern "C" int rados_write(rados_ioctx_t io, const char *o, const char *buf, size_t len, uint64_t off) -{ - librados::IoCtxImpl *ctx = (librados::IoCtxImpl *)io; - object_t oid(o); - bufferlist bl; - bl.append(buf, len); - return ctx->client->write(*ctx, oid, bl, len, off); -} - -extern "C" int rados_append(rados_ioctx_t io, const char *o, const char *buf, size_t len) -{ - librados::IoCtxImpl *ctx = (librados::IoCtxImpl *)io; - object_t oid(o); - bufferlist bl; - bl.append(buf, len); - return ctx->client->append(*ctx, oid, bl, len); -} - -extern "C" int rados_write_full(rados_ioctx_t io, const char *o, const char *buf, size_t len) -{ - librados::IoCtxImpl *ctx = (librados::IoCtxImpl *)io; - object_t oid(o); - bufferlist bl; - bl.append(buf, len); - return ctx->client->write_full(*ctx, oid, bl); -} - -extern "C" int rados_clone_range(rados_ioctx_t io, const char *dst, uint64_t dst_off, - const char *src, uint64_t src_off, size_t len) -{ - librados::IoCtxImpl *ctx = (librados::IoCtxImpl *)io; - object_t dst_oid(dst), src_oid(src); - return ctx->client->clone_range(*ctx, dst_oid, dst_off, src_oid, src_off, len); -} - -extern "C" int rados_trunc(rados_ioctx_t io, const char *o, uint64_t size) -{ - librados::IoCtxImpl *ctx = (librados::IoCtxImpl *)io; - object_t oid(o); - return ctx->client->trunc(*ctx, oid, size); -} - -extern "C" int rados_remove(rados_ioctx_t io, const char *o) -{ - librados::IoCtxImpl *ctx = (librados::IoCtxImpl *)io; - object_t oid(o); - return ctx->client->remove(*ctx, oid); -} - -extern "C" int rados_read(rados_ioctx_t io, const char *o, char *buf, size_t len, uint64_t off) -{ - librados::IoCtxImpl *ctx = (librados::IoCtxImpl *)io; - int ret; - object_t oid(o); - - bufferlist bl; - bufferptr bp = buffer::create_static(len, buf); - bl.push_back(bp); - - ret = ctx->client->read(*ctx, oid, bl, len, off); - if (ret >= 0) { - if (bl.length() > len) - return -ERANGE; - if (bl.c_str() != buf) - bl.copy(0, bl.length(), buf); - ret = bl.length(); // hrm :/ - } - - return ret; -} - -extern "C" uint64_t rados_get_last_version(rados_ioctx_t io) -{ - librados::IoCtxImpl *ctx = (librados::IoCtxImpl *)io; - eversion_t ver = ctx->client->last_version(*ctx); - return ver.version; -} - -extern "C" int rados_pool_create(rados_t cluster, const char *name) -{ - librados::RadosClient *radosp = (librados::RadosClient *)cluster; - string sname(name); - return radosp->pool_create(sname); -} - -extern "C" int rados_pool_create_with_auid(rados_t cluster, const char *name, - uint64_t auid) -{ - librados::RadosClient *radosp = (librados::RadosClient *)cluster; - string sname(name); - return radosp->pool_create(sname, auid); -} - -extern "C" int rados_pool_create_with_crush_rule(rados_t cluster, const char *name, - __u8 crush_rule_num) -{ - librados::RadosClient *radosp = (librados::RadosClient *)cluster; - string sname(name); - return radosp->pool_create(sname, 0, crush_rule_num); -} - -extern "C" int rados_pool_create_with_all(rados_t cluster, const char *name, - uint64_t auid, __u8 crush_rule_num) -{ - librados::RadosClient *radosp = (librados::RadosClient *)cluster; - string sname(name); - return radosp->pool_create(sname, auid, crush_rule_num); -} - -extern "C" int rados_pool_delete(rados_t cluster, const char *pool_name) -{ - librados::RadosClient *client = (librados::RadosClient *)cluster; - return client->pool_delete(pool_name); -} - -extern "C" int rados_ioctx_pool_set_auid(rados_ioctx_t io, uint64_t auid) -{ - librados::IoCtxImpl *ctx = (librados::IoCtxImpl *)io; - return ctx->client->pool_change_auid(ctx, auid); -} - -extern "C" int rados_ioctx_pool_get_auid(rados_ioctx_t io, uint64_t *auid) -{ - librados::IoCtxImpl *ctx = (librados::IoCtxImpl *)io; - return ctx->client->pool_get_auid(ctx, (unsigned long long *)auid); -} - -extern "C" void rados_ioctx_locator_set_key(rados_ioctx_t io, const char *key) -{ - librados::IoCtxImpl *ctx = (librados::IoCtxImpl *)io; - if (key) - ctx->oloc.key = key; - else - ctx->oloc.key = ""; -} - -extern "C" int64_t rados_ioctx_get_id(rados_ioctx_t io) -{ - librados::IoCtxImpl *ctx = (librados::IoCtxImpl *)io; - return ctx->get_id(); -} -// snaps - -extern "C" int rados_ioctx_snap_create(rados_ioctx_t io, const char *snapname) -{ - librados::IoCtxImpl *ctx = (librados::IoCtxImpl *)io; - return ctx->client->snap_create(ctx, snapname); -} - -extern "C" int rados_ioctx_snap_remove(rados_ioctx_t io, const char *snapname) -{ - librados::IoCtxImpl *ctx = (librados::IoCtxImpl *)io; - return ctx->client->snap_remove(ctx, snapname); -} - -extern "C" int rados_rollback(rados_ioctx_t io, const char *oid, - const char *snapname) -{ - librados::IoCtxImpl *ctx = (librados::IoCtxImpl *)io; - return ctx->client->rollback(ctx, oid, snapname); -} - -extern "C" int rados_ioctx_selfmanaged_snap_create(rados_ioctx_t io, - uint64_t *snapid) -{ - librados::IoCtxImpl *ctx = (librados::IoCtxImpl *)io; - return ctx->client->selfmanaged_snap_create(ctx, snapid); -} - -extern "C" int rados_ioctx_selfmanaged_snap_remove(rados_ioctx_t io, - uint64_t snapid) -{ - librados::IoCtxImpl *ctx = (librados::IoCtxImpl *)io; - return ctx->client->selfmanaged_snap_remove(ctx, snapid); -} - -extern "C" int rados_ioctx_selfmanaged_snap_rollback(rados_ioctx_t io, - const char *oid, - uint64_t snapid) -{ - librados::IoCtxImpl *ctx = (librados::IoCtxImpl *)io; - return ctx->client->selfmanaged_snap_rollback_object(ctx, oid, ctx->snapc, snapid); -} - -extern "C" int rados_ioctx_snap_list(rados_ioctx_t io, rados_snap_t *snaps, - int maxlen) -{ - librados::IoCtxImpl *ctx = (librados::IoCtxImpl *)io; - vector snapvec; - int r = ctx->client->snap_list(ctx, &snapvec); - if (r < 0) - return r; - if ((int)snapvec.size() <= maxlen) { - for (unsigned i=0; iclient->snap_lookup(ctx, name, (uint64_t *)id); -} - -extern "C" int rados_ioctx_snap_get_name(rados_ioctx_t io, rados_snap_t id, - char *name, int maxlen) -{ - librados::IoCtxImpl *ctx = (librados::IoCtxImpl *)io; - std::string sname; - int r = ctx->client->snap_get_name(ctx, id, &sname); - if (r < 0) - return r; - if ((int)sname.length() >= maxlen) - return -ERANGE; - strncpy(name, sname.c_str(), maxlen); - return 0; -} - -extern "C" int rados_ioctx_snap_get_stamp(rados_ioctx_t io, rados_snap_t id, time_t *t) -{ - librados::IoCtxImpl *ctx = (librados::IoCtxImpl *)io; - return ctx->client->snap_get_stamp(ctx, id, t); -} - -extern "C" int rados_getxattr(rados_ioctx_t io, const char *o, const char *name, - char *buf, size_t len) -{ - librados::IoCtxImpl *ctx = (librados::IoCtxImpl *)io; - int ret; - object_t oid(o); - bufferlist bl; - ret = ctx->client->getxattr(*ctx, oid, name, bl); - if (ret >= 0) { - if (bl.length() > len) - return -ERANGE; - bl.copy(0, bl.length(), buf); - ret = bl.length(); - } - - return ret; -} - -class RadosXattrsIter { -public: - RadosXattrsIter() - : val(NULL) - { - i = attrset.end(); - } - ~RadosXattrsIter() - { - free(val); - val = NULL; - } - std::map attrset; - std::map::iterator i; - char *val; -}; - -extern "C" int rados_getxattrs(rados_ioctx_t io, const char *oid, - rados_xattrs_iter_t *iter) -{ - RadosXattrsIter *it = new RadosXattrsIter(); - if (!it) - return -ENOMEM; - librados::IoCtxImpl *ctx = (librados::IoCtxImpl *)io; - object_t obj(oid); - int ret = ctx->client->getxattrs(*ctx, obj, it->attrset); - if (ret) { - delete it; - return ret; - } - it->i = it->attrset.begin(); - - RadosXattrsIter **iret = (RadosXattrsIter**)iter; - *iret = it; - *iter = it; - return 0; -} - -extern "C" int rados_getxattrs_next(rados_xattrs_iter_t iter, - const char **name, const char **val, size_t *len) -{ - RadosXattrsIter *it = (RadosXattrsIter*)iter; - if (it->i == it->attrset.end()) { - *name = NULL; - *val = NULL; - *len = 0; - return 0; - } - free(it->val); - const std::string &s(it->i->first); - *name = s.c_str(); - bufferlist &bl(it->i->second); - size_t bl_len = bl.length(); - it->val = (char*)malloc(bl_len); - if (!it->val) - return -ENOMEM; - memcpy(it->val, bl.c_str(), bl_len); - *val = it->val; - *len = bl_len; - ++it->i; - return 0; -} - -extern "C" void rados_getxattrs_end(rados_xattrs_iter_t iter) -{ - RadosXattrsIter *it = (RadosXattrsIter*)iter; - delete it; -} - -extern "C" int rados_setxattr(rados_ioctx_t io, const char *o, const char *name, const char *buf, size_t len) -{ - librados::IoCtxImpl *ctx = (librados::IoCtxImpl *)io; - object_t oid(o); - bufferlist bl; - bl.append(buf, len); - return ctx->client->setxattr(*ctx, oid, name, bl); -} - -extern "C" int rados_rmxattr(rados_ioctx_t io, const char *o, const char *name) -{ - librados::IoCtxImpl *ctx = (librados::IoCtxImpl *)io; - object_t oid(o); - return ctx->client->rmxattr(*ctx, oid, name); -} - -extern "C" int rados_stat(rados_ioctx_t io, const char *o, uint64_t *psize, time_t *pmtime) -{ - librados::IoCtxImpl *ctx = (librados::IoCtxImpl *)io; - object_t oid(o); - return ctx->client->stat(*ctx, oid, psize, pmtime); -} - -extern "C" int rados_tmap_update(rados_ioctx_t io, const char *o, const char *cmdbuf, size_t cmdbuflen) -{ - librados::IoCtxImpl *ctx = (librados::IoCtxImpl *)io; - object_t oid(o); - bufferlist cmdbl; - cmdbl.append(cmdbuf, cmdbuflen); - return ctx->client->tmap_update(*ctx, oid, cmdbl); -} - -extern "C" int rados_tmap_put(rados_ioctx_t io, const char *o, const char *buf, size_t buflen) -{ - librados::IoCtxImpl *ctx = (librados::IoCtxImpl *)io; - object_t oid(o); - bufferlist bl; - bl.append(buf, buflen); - return ctx->client->tmap_put(*ctx, oid, bl); -} - -extern "C" int rados_tmap_get(rados_ioctx_t io, const char *o, char *buf, size_t buflen) -{ - librados::IoCtxImpl *ctx = (librados::IoCtxImpl *)io; - object_t oid(o); - bufferlist bl; - int r = ctx->client->tmap_get(*ctx, oid, bl); - if (r < 0) - return r; - if (bl.length() > buflen) - return -ERANGE; - bl.copy(0, bl.length(), buf); - return bl.length(); -} - -extern "C" int rados_exec(rados_ioctx_t io, const char *o, const char *cls, const char *method, - const char *inbuf, size_t in_len, char *buf, size_t out_len) -{ - librados::IoCtxImpl *ctx = (librados::IoCtxImpl *)io; - object_t oid(o); - bufferlist inbl, outbl; - int ret; - inbl.append(inbuf, in_len); - ret = ctx->client->exec(*ctx, oid, cls, method, inbl, outbl); - if (ret >= 0) { - if (outbl.length()) { - if (outbl.length() > out_len) - return -ERANGE; - outbl.copy(0, outbl.length(), buf); - ret = outbl.length(); // hrm :/ - } - } - return ret; -} - -/* list objects */ - -extern "C" int rados_objects_list_open(rados_ioctx_t io, rados_list_ctx_t *listh) -{ - librados::IoCtxImpl *ctx = (librados::IoCtxImpl *)io; - Objecter::ListContext *h = new Objecter::ListContext; - h->pool_id = ctx->poolid; - h->pool_snap_seq = ctx->snap_seq; - *listh = (void *)new librados::ObjListCtx(ctx, h); - return 0; -} - -extern "C" void rados_objects_list_close(rados_list_ctx_t h) -{ - librados::ObjListCtx *lh = (librados::ObjListCtx *)h; - delete lh; -} - -extern "C" int rados_objects_list_next(rados_list_ctx_t listctx, const char **entry, const char **key) -{ - librados::ObjListCtx *lh = (librados::ObjListCtx *)listctx; - Objecter::ListContext *h = lh->lc; - int ret; - - // if the list is non-empty, this method has been called before - if (!h->list.empty()) - // so let's kill the previously-returned object - h->list.pop_front(); - - if (h->list.empty()) { - ret = lh->ctx->client->list(lh->lc, RADOS_LIST_MAX_ENTRIES); - if (ret < 0) - return ret; - if (h->list.empty()) - return -ENOENT; - } - - *entry = h->list.front().first.name.c_str(); - - if (key) { - if (h->list.front().second.size()) - *key = h->list.front().second.c_str(); - else - *key = NULL; - } - return 0; -} - - - -// ------------------------- -// aio - -extern "C" int rados_aio_create_completion(void *cb_arg, rados_callback_t cb_complete, - rados_callback_t cb_safe, rados_completion_t *pc) -{ - *pc = librados::RadosClient::aio_create_completion(cb_arg, cb_complete, cb_safe); - return 0; -} - -extern "C" int rados_aio_wait_for_complete(rados_completion_t c) -{ - return ((librados::AioCompletionImpl*)c)->wait_for_complete(); -} - -extern "C" int rados_aio_wait_for_safe(rados_completion_t c) -{ - return ((librados::AioCompletionImpl*)c)->wait_for_safe(); -} - -extern "C" int rados_aio_is_complete(rados_completion_t c) -{ - return ((librados::AioCompletionImpl*)c)->is_complete(); -} - -extern "C" int rados_aio_is_safe(rados_completion_t c) -{ - return ((librados::AioCompletionImpl*)c)->is_safe(); -} - -extern "C" int rados_aio_get_return_value(rados_completion_t c) -{ - return ((librados::AioCompletionImpl*)c)->get_return_value(); -} - -extern "C" uint64_t rados_aio_get_version(rados_completion_t c) -{ - return ((librados::AioCompletionImpl*)c)->get_version(); -} - -extern "C" void rados_aio_release(rados_completion_t c) -{ - ((librados::AioCompletionImpl*)c)->put(); -} - -extern "C" int rados_aio_read(rados_ioctx_t io, const char *o, - rados_completion_t completion, - char *buf, size_t len, uint64_t off) -{ - librados::IoCtxImpl *ctx = (librados::IoCtxImpl *)io; - object_t oid(o); - return ctx->client->aio_read(*ctx, oid, - (librados::AioCompletionImpl*)completion, buf, len, off); -} - -extern "C" int rados_aio_write(rados_ioctx_t io, const char *o, - rados_completion_t completion, - const char *buf, size_t len, uint64_t off) -{ - librados::IoCtxImpl *ctx = (librados::IoCtxImpl *)io; - object_t oid(o); - bufferlist bl; - bl.append(buf, len); - return ctx->client->aio_write(*ctx, oid, - (librados::AioCompletionImpl*)completion, bl, len, off); -} - -extern "C" int rados_aio_append(rados_ioctx_t io, const char *o, - rados_completion_t completion, - const char *buf, size_t len) -{ - librados::IoCtxImpl *ctx = (librados::IoCtxImpl *)io; - object_t oid(o); - bufferlist bl; - bl.append(buf, len); - return ctx->client->aio_append(*ctx, oid, - (librados::AioCompletionImpl*)completion, bl, len); -} - -extern "C" int rados_aio_write_full(rados_ioctx_t io, const char *o, - rados_completion_t completion, - const char *buf, size_t len) -{ - librados::IoCtxImpl *ctx = (librados::IoCtxImpl *)io; - object_t oid(o); - bufferlist bl; - bl.append(buf, len); - return ctx->client->aio_write_full(*ctx, oid, - (librados::AioCompletionImpl*)completion, bl); -} - -extern "C" int rados_aio_flush(rados_ioctx_t io) -{ - librados::IoCtxImpl *ctx = (librados::IoCtxImpl *)io; - ctx->flush_aio_writes(); - return 0; -} - -struct C_WatchCB : public librados::WatchCtx { - rados_watchcb_t wcb; - void *arg; - C_WatchCB(rados_watchcb_t _wcb, void *_arg) : wcb(_wcb), arg(_arg) {} - void notify(uint8_t opcode, uint64_t ver, bufferlist& bl) { - wcb(opcode, ver, arg); - } -}; - -int rados_watch(rados_ioctx_t io, const char *o, uint64_t ver, uint64_t *handle, - rados_watchcb_t watchcb, void *arg) -{ - uint64_t *cookie = handle; - librados::IoCtxImpl *ctx = (librados::IoCtxImpl *)io; - object_t oid(o); - C_WatchCB *wc = new C_WatchCB(watchcb, arg); - return ctx->client->watch(*ctx, oid, ver, cookie, wc); -} - -int rados_unwatch(rados_ioctx_t io, const char *o, uint64_t handle) -{ - uint64_t cookie = handle; - librados::IoCtxImpl *ctx = (librados::IoCtxImpl *)io; - object_t oid(o); - return ctx->client->unwatch(*ctx, oid, cookie); -} - -int rados_notify(rados_ioctx_t io, const char *o, uint64_t ver, const char *buf, int buf_len) -{ - librados::IoCtxImpl *ctx = (librados::IoCtxImpl *)io; - object_t oid(o); - bufferlist bl; - if (buf) { - bufferptr p = buffer::create(buf_len); - memcpy(p.c_str(), buf, buf_len); - bl.push_back(p); - } - return ctx->client->notify(*ctx, oid, ver, bl); -} diff --git a/src/librados/AioCompletionImpl.h b/src/librados/AioCompletionImpl.h new file mode 100644 index 0000000000000..02be2f3984d98 --- /dev/null +++ b/src/librados/AioCompletionImpl.h @@ -0,0 +1,133 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab +/* + * Ceph - scalable distributed file system + * + * Copyright (C) 2004-2012 Sage Weil + * + * This is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public + * License version 2.1, as published by the Free Software + * Foundation. See file COPYING. + * + */ + +#ifndef CEPH_LIBRADOS_AIOCOMPLETIONIMPL_H +#define CEPH_LIBRADOS_AIOCOMPLETIONIMPL_H + +#include "common/Cond.h" +#include "common/Mutex.h" + +#include "include/buffer.h" +#include "include/rados/librados.h" +#include "include/rados/librados.hpp" +#include "include/xlist.h" +#include "osd/osd_types.h" + +class IoCtxImpl; + +struct librados::AioCompletionImpl { + Mutex lock; + Cond cond; + int ref, rval; + bool released; + bool ack, safe; + eversion_t objver; + + rados_callback_t callback_complete, callback_safe; + void *callback_arg; + + // for read + bufferlist bl, *pbl; + char *buf; + unsigned maxlen; + + IoCtxImpl *io; + tid_t aio_write_seq; + xlist::item aio_write_list_item; + + AioCompletionImpl() : lock("AioCompletionImpl lock"), + ref(1), rval(0), released(false), ack(false), safe(false), + callback_complete(0), callback_safe(0), callback_arg(0), + pbl(0), buf(0), maxlen(0), + io(NULL), aio_write_seq(0), aio_write_list_item(this) { } + + int set_complete_callback(void *cb_arg, rados_callback_t cb) { + lock.Lock(); + callback_complete = cb; + callback_arg = cb_arg; + lock.Unlock(); + return 0; + } + int set_safe_callback(void *cb_arg, rados_callback_t cb) { + lock.Lock(); + callback_safe = cb; + callback_arg = cb_arg; + lock.Unlock(); + return 0; + } + int wait_for_complete() { + lock.Lock(); + while (!ack) + cond.Wait(lock); + lock.Unlock(); + return 0; + } + int wait_for_safe() { + lock.Lock(); + while (!safe) + cond.Wait(lock); + lock.Unlock(); + return 0; + } + int is_complete() { + lock.Lock(); + int r = ack; + lock.Unlock(); + return r; + } + int is_safe() { + lock.Lock(); + int r = safe; + lock.Unlock(); + return r; + } + int get_return_value() { + lock.Lock(); + int r = rval; + lock.Unlock(); + return r; + } + uint64_t get_version() { + lock.Lock(); + eversion_t v = objver; + lock.Unlock(); + return v.version; + } + + void get() { + lock.Lock(); + assert(ref > 0); + ref++; + lock.Unlock(); + } + void release() { + lock.Lock(); + assert(!released); + released = true; + put_unlock(); + } + void put() { + lock.Lock(); + put_unlock(); + } + void put_unlock() { + assert(ref > 0); + int n = --ref; + lock.Unlock(); + if (!n) + delete this; + } +}; + +#endif diff --git a/src/librados/IoCtxImpl.cc b/src/librados/IoCtxImpl.cc new file mode 100644 index 0000000000000..f32556d8a6d7b --- /dev/null +++ b/src/librados/IoCtxImpl.cc @@ -0,0 +1,87 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab +/* + * Ceph - scalable distributed file system + * + * Copyright (C) 2004-2012 Sage Weil + * + * This is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public + * License version 2.1, as published by the Free Software + * Foundation. See file COPYING. + * + */ + +#include "IoCtxImpl.h" + +#include "librados/AioCompletionImpl.h" +#include "librados/RadosClient.h" + +#define DOUT_SUBSYS rados +#undef dout_prefix +#define dout_prefix *_dout << "librados: " + +librados::IoCtxImpl::IoCtxImpl() + : aio_write_list_lock("librados::IoCtxImpl::aio_write_list_lock") +{ +} + +librados::IoCtxImpl::IoCtxImpl(RadosClient *c, int pid, const char *pool_name_, snapid_t s) + : ref_cnt(0), client(c), poolid(pid), + pool_name(pool_name_), snap_seq(s), assert_ver(0), + notify_timeout(c->cct->_conf->client_notify_timeout), oloc(pid), + aio_write_list_lock("librados::IoCtxImpl::aio_write_list_lock"), aio_write_seq(0) +{ +} + +void librados::IoCtxImpl::set_snap_read(snapid_t s) +{ + if (!s) + s = CEPH_NOSNAP; + ldout(client->cct, 10) << "set snap read " << snap_seq << " -> " << s << dendl; + snap_seq = s; +} + +int librados::IoCtxImpl::set_snap_write_context(snapid_t seq, vector& snaps) +{ + ::SnapContext n; + ldout(client->cct, 10) << "set snap write context: seq = " << seq << " and snaps = " << snaps << dendl; + n.seq = seq; + n.snaps = snaps; + if (!n.is_valid()) + return -EINVAL; + snapc = n; + return 0; +} + +void librados::IoCtxImpl::queue_aio_write(AioCompletionImpl *c) +{ + get(); + aio_write_list_lock.Lock(); + assert(!c->io); + c->io = this; + c->aio_write_seq = ++aio_write_seq; + aio_write_list.push_back(&c->aio_write_list_item); + aio_write_list_lock.Unlock(); +} + +void librados::IoCtxImpl::complete_aio_write(AioCompletionImpl *c) +{ + aio_write_list_lock.Lock(); + assert(c->io == this); + c->io = NULL; + c->aio_write_list_item.remove_myself(); + aio_write_cond.Signal(); + aio_write_list_lock.Unlock(); + put(); +} + +void librados::IoCtxImpl::flush_aio_writes() +{ + aio_write_list_lock.Lock(); + tid_t seq = aio_write_seq; + while (!aio_write_list.empty() && + aio_write_list.front()->aio_write_seq <= seq) + aio_write_cond.Wait(aio_write_list_lock); + aio_write_list_lock.Unlock(); +} diff --git a/src/librados/IoCtxImpl.h b/src/librados/IoCtxImpl.h new file mode 100644 index 0000000000000..950e165e2eb2c --- /dev/null +++ b/src/librados/IoCtxImpl.h @@ -0,0 +1,87 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab +/* + * Ceph - scalable distributed file system + * + * Copyright (C) 2004-2012 Sage Weil + * + * This is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public + * License version 2.1, as published by the Free Software + * Foundation. See file COPYING. + * + */ + +#ifndef CEPH_LIBRADOS_IOCTXIMPL_H +#define CEPH_LIBRADOS_IOCTXIMPL_H + +#include "common/Cond.h" +#include "common/Mutex.h" +#include "common/snap_types.h" +#include "include/atomic.h" +#include "include/rados.h" +#include "include/rados/librados.h" +#include "include/rados/librados.hpp" +#include "include/types.h" +#include "include/xlist.h" +#include "osd/osd_types.h" + +class RadosClient; + +struct librados::IoCtxImpl { + atomic_t ref_cnt; + RadosClient *client; + int64_t poolid; + string pool_name; + snapid_t snap_seq; + ::SnapContext snapc; + uint64_t assert_ver; + map assert_src_version; + eversion_t last_objver; + uint32_t notify_timeout; + object_locator_t oloc; + + Mutex aio_write_list_lock; + tid_t aio_write_seq; + Cond aio_write_cond; + xlist aio_write_list; + + IoCtxImpl(); + IoCtxImpl(RadosClient *c, int pid, const char *pool_name_, snapid_t s); + + void dup(const IoCtxImpl& rhs) { + // Copy everything except the ref count + client = rhs.client; + poolid = rhs.poolid; + pool_name = rhs.pool_name; + snap_seq = rhs.snap_seq; + snapc = rhs.snapc; + assert_ver = rhs.assert_ver; + assert_src_version = rhs.assert_src_version; + last_objver = rhs.last_objver; + notify_timeout = rhs.notify_timeout; + oloc = rhs.oloc; + } + + void set_snap_read(snapid_t s); + int set_snap_write_context(snapid_t seq, vector& snaps); + + void get() { + ref_cnt.inc(); + } + + void put() { + if (ref_cnt.dec() == 0) + delete this; + } + + void queue_aio_write(struct AioCompletionImpl *c); + void complete_aio_write(struct AioCompletionImpl *c); + void flush_aio_writes(); + + int64_t get_id() { + return poolid; + } +}; + +#endif diff --git a/src/librados/PoolAsyncCompletionImpl.h b/src/librados/PoolAsyncCompletionImpl.h new file mode 100644 index 0000000000000..e204aaf2a53b0 --- /dev/null +++ b/src/librados/PoolAsyncCompletionImpl.h @@ -0,0 +1,88 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab +/* + * Ceph - scalable distributed file system + * + * Copyright (C) 2004-2012 Sage Weil + * + * This is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public + * License version 2.1, as published by the Free Software + * Foundation. See file COPYING. + * + */ + +#ifndef CEPH_LIBRADOS_POOLASYNCCOMPLETIONIMPL_H +#define CEPH_LIBRADOS_POOLASYNCCOMPLETIONIMPL_H + +#include "common/Cond.h" +#include "common/Mutex.h" +#include "include/rados/librados.h" +#include "include/rados/librados.hpp" + +struct librados::PoolAsyncCompletionImpl { + Mutex lock; + Cond cond; + int ref, rval; + bool released; + bool done; + + rados_callback_t callback; + void *callback_arg; + + PoolAsyncCompletionImpl() : lock("PoolAsyncCompletionImpl lock"), + ref(1), rval(0), released(false), done(false), + callback(0), callback_arg(0) {} + + int set_callback(void *cb_arg, rados_callback_t cb) { + lock.Lock(); + callback = cb; + callback_arg = cb_arg; + lock.Unlock(); + return 0; + } + int wait() { + lock.Lock(); + while (!done) + cond.Wait(lock); + lock.Unlock(); + return 0; + } + int is_complete() { + lock.Lock(); + int r = done; + lock.Unlock(); + return r; + } + int get_return_value() { + lock.Lock(); + int r = rval; + lock.Unlock(); + return r; + } + void get() { + lock.Lock(); + assert(ref > 0); + ref++; + lock.Unlock(); + } + void release() { + lock.Lock(); + assert(!released); + released = true; + put_unlock(); + } + void put() { + lock.Lock(); + put_unlock(); + } + void put_unlock() { + assert(ref > 0); + int n = --ref; + lock.Unlock(); + if (!n) + delete this; + } +}; + +#endif diff --git a/src/librados/RadosClient.cc b/src/librados/RadosClient.cc new file mode 100644 index 0000000000000..b760534a2343f --- /dev/null +++ b/src/librados/RadosClient.cc @@ -0,0 +1,2014 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab +/* + * Ceph - scalable distributed file system + * + * Copyright (C) 2004-2012 Sage Weil + * + * This is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public + * License version 2.1, as published by the Free Software + * Foundation. See file COPYING. + * + */ + +#include +#include +#include + +#include +#include +#include +#include +#include + +#include "common/ceph_context.h" +#include "common/config.h" +#include "common/common_init.h" +#include "common/errno.h" +#include "include/buffer.h" + +#include "messages/MWatchNotify.h" +#include "msg/SimpleMessenger.h" + +#include "AioCompletionImpl.h" +#include "IoCtxImpl.h" +#include "PoolAsyncCompletionImpl.h" +#include "RadosClient.h" + +#define DOUT_SUBSYS rados +#undef dout_prefix +#define dout_prefix *_dout << "librados: " + +static atomic_t rados_instance; + +librados::PoolAsyncCompletionImpl *librados::RadosClient::pool_async_create_completion() +{ + return new librados::PoolAsyncCompletionImpl; +} + +librados::PoolAsyncCompletionImpl *librados::RadosClient::pool_async_create_completion(void *cb_arg, rados_callback_t cb) +{ + librados::PoolAsyncCompletionImpl *c = new librados::PoolAsyncCompletionImpl; + if (cb) + c->set_callback(cb_arg, cb); + return c; +} + +librados::AioCompletionImpl *librados::RadosClient::aio_create_completion() +{ + return new librados::AioCompletionImpl; +} + +librados::AioCompletionImpl *librados::RadosClient::aio_create_completion(void *cb_arg, + rados_callback_t cb_complete, + rados_callback_t cb_safe) +{ + librados::AioCompletionImpl *c = new librados::AioCompletionImpl; + if (cb_complete) + c->set_complete_callback(cb_arg, cb_complete); + if (cb_safe) + c->set_safe_callback(cb_arg, cb_safe); + return c; +} + +bool librados::RadosClient::ms_get_authorizer(int dest_type, + AuthAuthorizer **authorizer, + bool force_new) { + //ldout(cct, 0) << "RadosClient::ms_get_authorizer type=" << dest_type << dendl; + /* monitor authorization is being handled on different layer */ + if (dest_type == CEPH_ENTITY_TYPE_MON) + return true; + *authorizer = monclient.auth->build_authorizer(dest_type); + return *authorizer != NULL; +} + +librados::RadosClient::RadosClient(CephContext *cct_) : Dispatcher(cct_), + cct(cct_), + conf(cct_->_conf), + state(DISCONNECTED), + monclient(cct_), + messenger(NULL), + objecter(NULL), + lock("radosclient"), + timer(cct, lock), + max_watch_cookie(0) +{ +} + +int64_t librados::RadosClient::lookup_pool(const char *name) { + int64_t ret = osdmap.lookup_pg_pool_name(name); + if (ret < 0) + return -ENOENT; + return ret; +} + +const char *librados::RadosClient::get_pool_name(int64_t poolid_) +{ + return osdmap.get_pool_name(poolid_); +} + +int librados::RadosClient::connect() +{ + common_init_finish(cct); + + int err; + uint64_t nonce; + + // already connected? + if (state == CONNECTING) + return -EINPROGRESS; + if (state == CONNECTED) + return -EISCONN; + state = CONNECTING; + + // get monmap + err = monclient.build_initial_monmap(); + if (err < 0) + goto out; + + err = -ENOMEM; + nonce = getpid() + (1000000 * (uint64_t)rados_instance.inc()); + messenger = new SimpleMessenger(cct, entity_name_t::CLIENT(-1), nonce); + if (!messenger) + goto out; + + // require OSDREPLYMUX feature. this means we will fail to talk to + // old servers. this is necessary because otherwise we won't know + // how to decompose the reply data into its consituent pieces. + messenger->set_default_policy(Messenger::Policy::client(0, CEPH_FEATURE_OSDREPLYMUX)); + + ldout(cct, 1) << "starting msgr at " << messenger->get_myaddr() << dendl; + + ldout(cct, 1) << "starting objecter" << dendl; + + err = -ENOMEM; + objecter = new Objecter(cct, messenger, &monclient, &osdmap, lock, timer); + if (!objecter) + goto out; + objecter->set_balanced_budget(); + + monclient.set_messenger(messenger); + + messenger->add_dispatcher_head(this); + + messenger->start(); + + ldout(cct, 1) << "setting wanted keys" << dendl; + monclient.set_want_keys(CEPH_ENTITY_TYPE_MON | CEPH_ENTITY_TYPE_OSD); + ldout(cct, 1) << "calling monclient init" << dendl; + err = monclient.init(); + if (err) { + ldout(cct, 0) << conf->name << " initialization error " << cpp_strerror(-err) << dendl; + shutdown(); + goto out; + } + + err = monclient.authenticate(conf->client_mount_timeout); + if (err) { + ldout(cct, 0) << conf->name << " authentication error " << cpp_strerror(-err) << dendl; + shutdown(); + goto out; + } + messenger->set_myname(entity_name_t::CLIENT(monclient.get_global_id())); + + lock.Lock(); + + timer.init(); + + objecter->set_client_incarnation(0); + objecter->init(); + monclient.renew_subs(); + + while (osdmap.get_epoch() == 0) { + ldout(cct, 1) << "waiting for osdmap" << dendl; + cond.Wait(lock); + } + state = CONNECTED; + lock.Unlock(); + + ldout(cct, 1) << "init done" << dendl; + err = 0; + + out: + if (err) + state = DISCONNECTED; + return err; +} + +void librados::RadosClient::shutdown() +{ + lock.Lock(); + if (state == DISCONNECTED) { + lock.Unlock(); + return; + } + monclient.shutdown(); + if (objecter && state == CONNECTED) + objecter->shutdown(); + state = DISCONNECTED; + timer.shutdown(); // will drop+retake lock + lock.Unlock(); + if (messenger) { + messenger->shutdown(); + messenger->wait(); + } + ldout(cct, 1) << "shutdown" << dendl; +} + +librados::RadosClient::~RadosClient() +{ + if (messenger) + delete messenger; + if (objecter) + delete objecter; + common_destroy_context(cct); + cct = NULL; +} + + +bool librados::RadosClient::ms_dispatch(Message *m) +{ + bool ret; + + lock.Lock(); + if (state == DISCONNECTED) { + ldout(cct, 10) << "disconnected, discarding " << *m << dendl; + m->put(); + ret = true; + } else { + ret = _dispatch(m); + } + lock.Unlock(); + return ret; +} + +void librados::RadosClient::ms_handle_connect(Connection *con) +{ + Mutex::Locker l(lock); + objecter->ms_handle_connect(con); +} + +bool librados::RadosClient::ms_handle_reset(Connection *con) +{ + Mutex::Locker l(lock); + objecter->ms_handle_reset(con); + return false; +} + +void librados::RadosClient::ms_handle_remote_reset(Connection *con) +{ + Mutex::Locker l(lock); + objecter->ms_handle_remote_reset(con); +} + + +bool librados::RadosClient::_dispatch(Message *m) +{ + switch (m->get_type()) { + // OSD + case CEPH_MSG_OSD_OPREPLY: + objecter->handle_osd_op_reply((class MOSDOpReply*)m); + break; + case CEPH_MSG_OSD_MAP: + objecter->handle_osd_map((MOSDMap*)m); + cond.Signal(); + break; + case MSG_GETPOOLSTATSREPLY: + objecter->handle_get_pool_stats_reply((MGetPoolStatsReply*)m); + break; + + case CEPH_MSG_MDS_MAP: + break; + + case CEPH_MSG_STATFS_REPLY: + objecter->handle_fs_stats_reply((MStatfsReply*)m); + break; + + case CEPH_MSG_POOLOP_REPLY: + objecter->handle_pool_op_reply((MPoolOpReply*)m); + break; + + case CEPH_MSG_WATCH_NOTIFY: + watch_notify((MWatchNotify *)m); + break; + default: + return false; + } + + return true; +} + +int librados::RadosClient::pool_list(std::list& v) +{ + Mutex::Locker l(lock); + for (map::const_iterator p = osdmap.get_pools().begin(); + p != osdmap.get_pools().end(); + p++) + v.push_back(osdmap.get_pool_name(p->first)); + return 0; +} + +int librados::RadosClient::get_pool_stats(std::list& pools, + map& result) +{ + Mutex mylock("RadosClient::get_pool_stats::mylock"); + Cond cond; + bool done; + + lock.Lock(); + objecter->get_pool_stats(pools, &result, new C_SafeCond(&mylock, &cond, &done)); + lock.Unlock(); + + mylock.Lock(); + while (!done) + cond.Wait(mylock); + mylock.Unlock(); + + return 0; +} + +int librados::RadosClient::get_fs_stats(ceph_statfs& stats) +{ + Mutex mylock ("RadosClient::get_fs_stats::mylock"); + Cond cond; + bool done; + lock.Lock(); + objecter->get_fs_stats(stats, new C_SafeCond(&mylock, &cond, &done)); + lock.Unlock(); + + mylock.Lock(); + while (!done) cond.Wait(mylock); + mylock.Unlock(); + + return 0; +} + + +// SNAPS + +int librados::RadosClient::snap_create(rados_ioctx_t io, const char *snapName) +{ + int reply; + int64_t poolID = ((IoCtxImpl *)io)->poolid; + string sName(snapName); + + Mutex mylock ("RadosClient::snap_create::mylock"); + Cond cond; + bool done; + lock.Lock(); + objecter->create_pool_snap(poolID, + sName, + new C_SafeCond(&mylock, &cond, &done, &reply)); + lock.Unlock(); + + mylock.Lock(); + while(!done) cond.Wait(mylock); + mylock.Unlock(); + return reply; +} + +int librados::RadosClient::selfmanaged_snap_create(rados_ioctx_t io, uint64_t *psnapid) +{ + int reply; + int64_t poolID = ((IoCtxImpl *)io)->poolid; + + Mutex mylock("RadosClient::selfmanaged_snap_create::mylock"); + Cond cond; + bool done; + lock.Lock(); + snapid_t snapid; + objecter->allocate_selfmanaged_snap(poolID, &snapid, + new C_SafeCond(&mylock, &cond, &done, &reply)); + lock.Unlock(); + + mylock.Lock(); + while (!done) cond.Wait(mylock); + mylock.Unlock(); + if (reply == 0) + *psnapid = snapid; + return reply; +} + +int librados::RadosClient::snap_remove(rados_ioctx_t io, const char *snapName) +{ + int reply; + int64_t poolID = ((IoCtxImpl *)io)->poolid; + string sName(snapName); + + Mutex mylock ("RadosClient::snap_remove::mylock"); + Cond cond; + bool done; + lock.Lock(); + objecter->delete_pool_snap(poolID, + sName, + new C_SafeCond(&mylock, &cond, &done, &reply)); + lock.Unlock(); + + mylock.Lock(); + while(!done) cond.Wait(mylock); + mylock.Unlock(); + return reply; +} + +int librados::RadosClient::selfmanaged_snap_rollback_object(rados_ioctx_t io, + const object_t& oid, + ::SnapContext& snapc, + uint64_t snapid) +{ + int reply; + IoCtxImpl* ctx = (IoCtxImpl *) io; + + Mutex mylock("RadosClient::snap_rollback::mylock"); + Cond cond; + bool done; + Context *onack = new C_SafeCond(&mylock, &cond, &done, &reply); + + lock.Lock(); + objecter->rollback_object(oid, ctx->oloc, snapc, snapid, + ceph_clock_now(cct), onack, NULL); + lock.Unlock(); + + mylock.Lock(); + while (!done) cond.Wait(mylock); + mylock.Unlock(); + return reply; +} + +int librados::RadosClient::rollback(rados_ioctx_t io_, const object_t& oid, + const char *snapName) +{ + IoCtxImpl* io = (IoCtxImpl *) io_; + string sName(snapName); + + lock.Lock(); + snapid_t snap; + const map& pools = objecter->osdmap->get_pools(); + const pg_pool_t& pg_pool = pools.find(io->poolid)->second; + map::const_iterator p; + for (p = pg_pool.snaps.begin(); + p != pg_pool.snaps.end(); + ++p) { + if (p->second.name == snapName) { + snap = p->first; + break; + } + } + if (p == pg_pool.snaps.end()) { + lock.Unlock(); + return -ENOENT; + } + lock.Unlock(); + + return selfmanaged_snap_rollback_object(io_, oid, io->snapc, snap); +} + +int librados::RadosClient::selfmanaged_snap_remove(rados_ioctx_t io, uint64_t snapid) +{ + int reply; + int64_t poolID = ((IoCtxImpl *)io)->poolid; + + Mutex mylock("RadosClient::selfmanaged_snap_remove::mylock"); + Cond cond; + bool done; + lock.Lock(); + objecter->delete_selfmanaged_snap(poolID, snapid_t(snapid), + new C_SafeCond(&mylock, &cond, &done, &reply)); + lock.Unlock(); + + mylock.Lock(); + while (!done) cond.Wait(mylock); + mylock.Unlock(); + return (int)reply; +} + +int librados::RadosClient::pool_create(string& name, unsigned long long auid, + __u8 crush_rule) +{ + int reply; + + Mutex mylock ("RadosClient::pool_create::mylock"); + Cond cond; + bool done; + lock.Lock(); + objecter->create_pool(name, + new C_SafeCond(&mylock, &cond, &done, &reply), + auid, crush_rule); + lock.Unlock(); + + mylock.Lock(); + while(!done) + cond.Wait(mylock); + mylock.Unlock(); + return reply; +} + +int librados::RadosClient::pool_create_async(string& name, PoolAsyncCompletionImpl *c, + unsigned long long auid, + __u8 crush_rule) +{ + Mutex::Locker l(lock); + objecter->create_pool(name, + new C_PoolAsync_Safe(c), + auid, crush_rule); + return 0; +} + +int librados::RadosClient::pool_delete(const char *name) +{ + int tmp_pool_id = osdmap.lookup_pg_pool_name(name); + if (tmp_pool_id < 0) + return -ENOENT; + + Mutex mylock("RadosClient::pool_delete::mylock"); + Cond cond; + bool done; + lock.Lock(); + int reply = 0; + objecter->delete_pool(tmp_pool_id, new C_SafeCond(&mylock, &cond, &done, &reply)); + lock.Unlock(); + + mylock.Lock(); + while (!done) cond.Wait(mylock); + mylock.Unlock(); + return reply; +} + +int librados::RadosClient::pool_delete_async(const char *name, PoolAsyncCompletionImpl *c) +{ + int tmp_pool_id = osdmap.lookup_pg_pool_name(name); + if (tmp_pool_id < 0) + return -ENOENT; + + Mutex::Locker l(lock); + objecter->delete_pool(tmp_pool_id, new C_PoolAsync_Safe(c)); + + return 0; +} + +int librados::RadosClient::pool_change_auid(rados_ioctx_t io, unsigned long long auid) +{ + int reply; + + int64_t poolID = ((IoCtxImpl *)io)->poolid; + + Mutex mylock("RadosClient::pool_change_auid::mylock"); + Cond cond; + bool done; + lock.Lock(); + objecter->change_pool_auid(poolID, + new C_SafeCond(&mylock, &cond, &done, &reply), + auid); + lock.Unlock(); + + mylock.Lock(); + while (!done) cond.Wait(mylock); + mylock.Unlock(); + return reply; +} + +int librados::RadosClient::pool_change_auid_async(rados_ioctx_t io, unsigned long long auid, + PoolAsyncCompletionImpl *c) +{ + int64_t poolID = ((IoCtxImpl *)io)->poolid; + + Mutex::Locker l(lock); + objecter->change_pool_auid(poolID, + new C_PoolAsync_Safe(c), + auid); + return 0; +} + +int librados::RadosClient::pool_get_auid(rados_ioctx_t io, unsigned long long *auid) +{ + Mutex::Locker l(lock); + int64_t pool_id = ((IoCtxImpl *)io)->poolid; + const pg_pool_t *pg = osdmap.get_pg_pool(pool_id); + if (!pg) + return -ENOENT; + *auid = pg->auid; + return 0; +} + +int librados::RadosClient::snap_list(IoCtxImpl *io, vector *snaps) +{ + Mutex::Locker l(lock); + const pg_pool_t *pi = objecter->osdmap->get_pg_pool(io->poolid); + for (map::const_iterator p = pi->snaps.begin(); + p != pi->snaps.end(); + p++) + snaps->push_back(p->first); + return 0; +} + +int librados::RadosClient::snap_lookup(IoCtxImpl *io, const char *name, uint64_t *snapid) +{ + Mutex::Locker l(lock); + const pg_pool_t *pi = objecter->osdmap->get_pg_pool(io->poolid); + for (map::const_iterator p = pi->snaps.begin(); + p != pi->snaps.end(); + p++) { + if (p->second.name == name) { + *snapid = p->first; + return 0; + } + } + return -ENOENT; +} + +int librados::RadosClient::snap_get_name(IoCtxImpl *io, uint64_t snapid, std::string *s) +{ + Mutex::Locker l(lock); + const pg_pool_t *pi = objecter->osdmap->get_pg_pool(io->poolid); + map::const_iterator p = pi->snaps.find(snapid); + if (p == pi->snaps.end()) + return -ENOENT; + *s = p->second.name.c_str(); + return 0; +} + +int librados::RadosClient::snap_get_stamp(IoCtxImpl *io, uint64_t snapid, time_t *t) +{ + Mutex::Locker l(lock); + const pg_pool_t *pi = objecter->osdmap->get_pg_pool(io->poolid); + map::const_iterator p = pi->snaps.find(snapid); + if (p == pi->snaps.end()) + return -ENOENT; + *t = p->second.stamp.sec(); + return 0; +} + + +// IO + +int librados::RadosClient::list(Objecter::ListContext *context, int max_entries) +{ + Cond cond; + bool done; + int r = 0; + object_t oid; + Mutex mylock("RadosClient::list::mylock"); + + if (context->at_end) + return 0; + + context->max_entries = max_entries; + + lock.Lock(); + objecter->list_objects(context, new C_SafeCond(&mylock, &cond, &done, &r)); + lock.Unlock(); + + mylock.Lock(); + while(!done) + cond.Wait(mylock); + mylock.Unlock(); + + return r; +} + +int librados::RadosClient::create(IoCtxImpl& io, const object_t& oid, bool exclusive) +{ + utime_t ut = ceph_clock_now(cct); + + /* can't write to a snapshot */ + if (io.snap_seq != CEPH_NOSNAP) + return -EROFS; + + Mutex mylock("RadosClient::create::mylock"); + Cond cond; + bool done; + int r; + eversion_t ver; + + Context *onack = new C_SafeCond(&mylock, &cond, &done, &r); + + lock.Lock(); + objecter->create(oid, io.oloc, + io.snapc, ut, 0, (exclusive ? CEPH_OSD_OP_FLAG_EXCL : 0), + onack, NULL, &ver); + lock.Unlock(); + + mylock.Lock(); + while (!done) + cond.Wait(mylock); + mylock.Unlock(); + + set_sync_op_version(io, ver); + + return r; +} + +int librados::RadosClient::create(IoCtxImpl& io, const object_t& oid, bool exclusive, + const std::string& category) +{ + utime_t ut = ceph_clock_now(cct); + + /* can't write to a snapshot */ + if (io.snap_seq != CEPH_NOSNAP) + return -EROFS; + + Mutex mylock("RadosClient::create::mylock"); + Cond cond; + bool done; + int r; + eversion_t ver; + + Context *onack = new C_SafeCond(&mylock, &cond, &done, &r); + + ::ObjectOperation o; + o.create(exclusive ? CEPH_OSD_OP_FLAG_EXCL : 0, category); + + lock.Lock(); + objecter->mutate(oid, io.oloc, o, io.snapc, ut, 0, onack, NULL, &ver); + lock.Unlock(); + + mylock.Lock(); + while (!done) + cond.Wait(mylock); + mylock.Unlock(); + + set_sync_op_version(io, ver); + + return r; +} + +/* + * add any version assert operations that are appropriate given the + * stat in the IoCtx, either the target version assert or any src + * object asserts. these affect a single ioctx operation, so clear + * the ioctx state when we're doing. + * + * return a pointer to the ObjectOperation if we added any events; + * this is convenient for passing the extra_ops argument into Objecter + * methods. + */ +::ObjectOperation *librados::RadosClient::prepare_assert_ops(IoCtxImpl *io, ::ObjectOperation *op) +{ + ::ObjectOperation *pop = NULL; + if (io->assert_ver) { + op->assert_version(io->assert_ver); + io->assert_ver = 0; + pop = op; + } + while (!io->assert_src_version.empty()) { + map::iterator p = io->assert_src_version.begin(); + op->assert_src_version(p->first, CEPH_NOSNAP, p->second); + io->assert_src_version.erase(p); + pop = op; + } + return pop; +} + +int librados::RadosClient::write(IoCtxImpl& io, const object_t& oid, bufferlist& bl, + size_t len, uint64_t off) +{ + utime_t ut = ceph_clock_now(cct); + + /* can't write to a snapshot */ + if (io.snap_seq != CEPH_NOSNAP) + return -EROFS; + + Mutex mylock("RadosClient::write::mylock"); + Cond cond; + bool done; + int r; + + Context *onack = new C_SafeCond(&mylock, &cond, &done, &r); + eversion_t ver; + + // extra ops? + ::ObjectOperation op; + ::ObjectOperation *pop = prepare_assert_ops(&io, &op); + + lock.Lock(); + objecter->write(oid, io.oloc, + off, len, io.snapc, bl, ut, 0, + onack, NULL, &ver, pop); + lock.Unlock(); + + mylock.Lock(); + while (!done) + cond.Wait(mylock); + mylock.Unlock(); + + set_sync_op_version(io, ver); + + if (r < 0) + return r; + + return len; +} + +int librados::RadosClient::append(IoCtxImpl& io, const object_t& oid, bufferlist& bl, size_t len) +{ + utime_t ut = ceph_clock_now(cct); + + /* can't write to a snapshot */ + if (io.snap_seq != CEPH_NOSNAP) + return -EROFS; + + Mutex mylock("RadosClient::append::mylock"); + Cond cond; + bool done; + int r; + + Context *onack = new C_SafeCond(&mylock, &cond, &done, &r); + eversion_t ver; + + ::ObjectOperation op; + ::ObjectOperation *pop = prepare_assert_ops(&io, &op); + + lock.Lock(); + objecter->append(oid, io.oloc, + len, io.snapc, bl, ut, 0, + onack, NULL, &ver, pop); + lock.Unlock(); + + mylock.Lock(); + while (!done) + cond.Wait(mylock); + mylock.Unlock(); + + set_sync_op_version(io, ver); + + if (r < 0) + return r; + + return len; +} + +int librados::RadosClient::write_full(IoCtxImpl& io, const object_t& oid, bufferlist& bl) +{ + utime_t ut = ceph_clock_now(cct); + + /* can't write to a snapshot */ + if (io.snap_seq != CEPH_NOSNAP) + return -EROFS; + + Mutex mylock("RadosClient::write_full::mylock"); + Cond cond; + bool done; + int r; + + Context *onack = new C_SafeCond(&mylock, &cond, &done, &r); + + eversion_t ver; + + ::ObjectOperation op; + ::ObjectOperation *pop = prepare_assert_ops(&io, &op); + + lock.Lock(); + objecter->write_full(oid, io.oloc, + io.snapc, bl, ut, 0, + onack, NULL, &ver, pop); + lock.Unlock(); + + mylock.Lock(); + while (!done) + cond.Wait(mylock); + mylock.Unlock(); + + set_sync_op_version(io, ver); + + return r; +} + +int librados::RadosClient::clone_range(IoCtxImpl& io, + const object_t& dst_oid, uint64_t dst_offset, + const object_t& src_oid, uint64_t src_offset, + uint64_t len) +{ + utime_t ut = ceph_clock_now(cct); + + /* can't write to a snapshot */ + if (io.snap_seq != CEPH_NOSNAP) + return -EROFS; + + Mutex mylock("RadosClient::clone_range::mylock"); + Cond cond; + bool done; + int r; + Context *onack = new C_SafeCond(&mylock, &cond, &done, &r); + eversion_t ver; + + bufferlist outbl; + + lock.Lock(); + ::ObjectOperation wr; + prepare_assert_ops(&io, &wr); + wr.clone_range(src_oid, src_offset, len, dst_offset); + objecter->mutate(dst_oid, io.oloc, wr, io.snapc, ut, 0, onack, NULL, &ver); + lock.Unlock(); + + mylock.Lock(); + while (!done) + cond.Wait(mylock); + mylock.Unlock(); + + set_sync_op_version(io, ver); + + return r; +} + +int librados::RadosClient::operate(IoCtxImpl& io, const object_t& oid, + ::ObjectOperation *o, time_t *pmtime) +{ + utime_t ut; + if (pmtime) { + ut = utime_t(*pmtime, 0); + } else { + ut = ceph_clock_now(cct); + } + + /* can't write to a snapshot */ + if (io.snap_seq != CEPH_NOSNAP) + return -EROFS; + + if (!o->size()) + return 0; + + Mutex mylock("RadosClient::mutate::mylock"); + Cond cond; + bool done; + int r; + eversion_t ver; + + Context *onack = new C_SafeCond(&mylock, &cond, &done, &r); + + lock.Lock(); + objecter->mutate(oid, io.oloc, + *o, io.snapc, ut, 0, + onack, NULL, &ver); + lock.Unlock(); + + mylock.Lock(); + while (!done) + cond.Wait(mylock); + mylock.Unlock(); + + set_sync_op_version(io, ver); + + return r; +} + +int librados::RadosClient::operate_read(IoCtxImpl& io, const object_t& oid, + ::ObjectOperation *o, bufferlist *pbl) +{ + if (!o->size()) + return 0; + + Mutex mylock("RadosClient::mutate::mylock"); + Cond cond; + bool done; + int r; + eversion_t ver; + + Context *onack = new C_SafeCond(&mylock, &cond, &done, &r); + + lock.Lock(); + objecter->read(oid, io.oloc, + *o, io.snap_seq, pbl, 0, + onack, &ver); + lock.Unlock(); + + mylock.Lock(); + while (!done) + cond.Wait(mylock); + mylock.Unlock(); + + set_sync_op_version(io, ver); + + return r; +} + +int librados::RadosClient::aio_operate_read(IoCtxImpl& io, const object_t &oid, + ::ObjectOperation *o, + AioCompletionImpl *c, bufferlist *pbl) +{ + Context *onack = new C_aio_Ack(c); + + c->pbl = pbl; + + Mutex::Locker l(lock); + objecter->read(oid, io.oloc, + *o, io.snap_seq, pbl, 0, + onack, 0); + return 0; +} + +int librados::RadosClient::aio_operate(IoCtxImpl& io, const object_t& oid, + ::ObjectOperation *o, AioCompletionImpl *c) +{ + utime_t ut = ceph_clock_now(cct); + /* can't write to a snapshot */ + if (io.snap_seq != CEPH_NOSNAP) + return -EROFS; + + Context *onack = new C_aio_Ack(c); + Context *oncommit = new C_aio_Safe(c); + + io.queue_aio_write(c); + + Mutex::Locker l(lock); + objecter->mutate(oid, io.oloc, *o, io.snapc, ut, 0, onack, oncommit, &c->objver); + + return 0; +} + +int librados::RadosClient::aio_read(IoCtxImpl& io, const object_t oid, AioCompletionImpl *c, + bufferlist *pbl, size_t len, uint64_t off) +{ + + Context *onack = new C_aio_Ack(c); + eversion_t ver; + + c->pbl = pbl; + + Mutex::Locker l(lock); + objecter->read(oid, io.oloc, + off, len, io.snap_seq, &c->bl, 0, + onack, &c->objver); + return 0; +} + +int librados::RadosClient::aio_read(IoCtxImpl& io, const object_t oid, AioCompletionImpl *c, + char *buf, size_t len, uint64_t off) +{ + Context *onack = new C_aio_Ack(c); + + c->buf = buf; + c->maxlen = len; + + Mutex::Locker l(lock); + objecter->read(oid, io.oloc, + off, len, io.snap_seq, &c->bl, 0, + onack, &c->objver); + + return 0; +} + +int librados::RadosClient::aio_sparse_read(IoCtxImpl& io, const object_t oid, + AioCompletionImpl *c, std::map *m, + bufferlist *data_bl, size_t len, uint64_t off) +{ + + C_aio_sparse_read_Ack *onack = new C_aio_sparse_read_Ack(c); + onack->m = m; + onack->data_bl = data_bl; + eversion_t ver; + + c->pbl = NULL; + + Mutex::Locker l(lock); + objecter->sparse_read(oid, io.oloc, + off, len, io.snap_seq, &c->bl, 0, + onack); + return 0; +} + +int librados::RadosClient::aio_write(IoCtxImpl& io, const object_t &oid, AioCompletionImpl *c, + const bufferlist& bl, size_t len, uint64_t off) +{ + utime_t ut = ceph_clock_now(cct); + + /* can't write to a snapshot */ + if (io.snap_seq != CEPH_NOSNAP) + return -EROFS; + + io.queue_aio_write(c); + + Context *onack = new C_aio_Ack(c); + Context *onsafe = new C_aio_Safe(c); + + Mutex::Locker l(lock); + objecter->write(oid, io.oloc, + off, len, io.snapc, bl, ut, 0, + onack, onsafe, &c->objver); + + return 0; +} + +int librados::RadosClient::aio_append(IoCtxImpl& io, const object_t &oid, AioCompletionImpl *c, + const bufferlist& bl, size_t len) +{ + utime_t ut = ceph_clock_now(cct); + + /* can't write to a snapshot */ + if (io.snap_seq != CEPH_NOSNAP) + return -EROFS; + + io.queue_aio_write(c); + + Context *onack = new C_aio_Ack(c); + Context *onsafe = new C_aio_Safe(c); + + Mutex::Locker l(lock); + objecter->append(oid, io.oloc, + len, io.snapc, bl, ut, 0, + onack, onsafe, &c->objver); + + return 0; +} + +int librados::RadosClient::aio_write_full(IoCtxImpl& io, const object_t &oid, + AioCompletionImpl *c, const bufferlist& bl) +{ + utime_t ut = ceph_clock_now(cct); + + /* can't write to a snapshot */ + if (io.snap_seq != CEPH_NOSNAP) + return -EROFS; + + io.queue_aio_write(c); + + Context *onack = new C_aio_Ack(c); + Context *onsafe = new C_aio_Safe(c); + + Mutex::Locker l(lock); + objecter->write_full(oid, io.oloc, + io.snapc, bl, ut, 0, + onack, onsafe, &c->objver); + + return 0; +} + +int librados::RadosClient::remove(IoCtxImpl& io, const object_t& oid) +{ + utime_t ut = ceph_clock_now(cct); + + /* can't write to a snapshot */ + if (io.snap_seq != CEPH_NOSNAP) + return -EROFS; + + Mutex mylock("RadosClient::remove::mylock"); + Cond cond; + bool done; + int r; + Context *onack = new C_SafeCond(&mylock, &cond, &done, &r); + eversion_t ver; + + ::ObjectOperation op; + ::ObjectOperation *pop = prepare_assert_ops(&io, &op); + + lock.Lock(); + objecter->remove(oid, io.oloc, + io.snapc, ut, 0, + onack, NULL, &ver, pop); + lock.Unlock(); + + mylock.Lock(); + while (!done) + cond.Wait(mylock); + mylock.Unlock(); + + set_sync_op_version(io, ver); + + return r; +} + +int librados::RadosClient::trunc(IoCtxImpl& io, const object_t& oid, uint64_t size) +{ + utime_t ut = ceph_clock_now(cct); + + /* can't write to a snapshot */ + if (io.snap_seq != CEPH_NOSNAP) + return -EROFS; + + Mutex mylock("RadosClient::write_full::mylock"); + Cond cond; + bool done; + int r; + + Context *onack = new C_SafeCond(&mylock, &cond, &done, &r); + eversion_t ver; + + ::ObjectOperation op; + ::ObjectOperation *pop = prepare_assert_ops(&io, &op); + + lock.Lock(); + objecter->trunc(oid, io.oloc, + io.snapc, ut, 0, + size, 0, + onack, NULL, &ver, pop); + lock.Unlock(); + + mylock.Lock(); + while (!done) + cond.Wait(mylock); + mylock.Unlock(); + + set_sync_op_version(io, ver); + + return r; +} + +int librados::RadosClient::tmap_update(IoCtxImpl& io, const object_t& oid, bufferlist& cmdbl) +{ + utime_t ut = ceph_clock_now(cct); + + /* can't write to a snapshot */ + if (io.snap_seq != CEPH_NOSNAP) + return -EROFS; + + Mutex mylock("RadosClient::tmap_update::mylock"); + Cond cond; + bool done; + int r; + Context *onack = new C_SafeCond(&mylock, &cond, &done, &r); + eversion_t ver; + + bufferlist outbl; + + lock.Lock(); + ::ObjectOperation wr; + prepare_assert_ops(&io, &wr); + wr.tmap_update(cmdbl); + objecter->mutate(oid, io.oloc, wr, io.snapc, ut, 0, onack, NULL, &ver); + lock.Unlock(); + + mylock.Lock(); + while (!done) + cond.Wait(mylock); + mylock.Unlock(); + + set_sync_op_version(io, ver); + + return r; +} + +int librados::RadosClient::tmap_put(IoCtxImpl& io, const object_t& oid, bufferlist& bl) +{ + utime_t ut = ceph_clock_now(cct); + + /* can't write to a snapshot */ + if (io.snap_seq != CEPH_NOSNAP) + return -EROFS; + + Mutex mylock("RadosClient::tmap_put::mylock"); + Cond cond; + bool done; + int r; + Context *onack = new C_SafeCond(&mylock, &cond, &done, &r); + eversion_t ver; + + bufferlist outbl; + + lock.Lock(); + ::ObjectOperation wr; + prepare_assert_ops(&io, &wr); + wr.tmap_put(bl); + objecter->mutate(oid, io.oloc, wr, io.snapc, ut, 0, onack, NULL, &ver); + lock.Unlock(); + + mylock.Lock(); + while (!done) + cond.Wait(mylock); + mylock.Unlock(); + + set_sync_op_version(io, ver); + + return r; +} + +int librados::RadosClient::tmap_get(IoCtxImpl& io, const object_t& oid, bufferlist& bl) +{ + /* can't write to a snapshot */ + if (io.snap_seq != CEPH_NOSNAP) + return -EROFS; + + Mutex mylock("RadosClient::tmap_put::mylock"); + Cond cond; + bool done; + int r = 0; + Context *onack = new C_SafeCond(&mylock, &cond, &done, &r); + eversion_t ver; + + bufferlist outbl; + + lock.Lock(); + ::ObjectOperation rd; + prepare_assert_ops(&io, &rd); + rd.tmap_get(&bl, NULL); + objecter->read(oid, io.oloc, rd, io.snap_seq, 0, 0, onack, &ver); + lock.Unlock(); + + mylock.Lock(); + while (!done) + cond.Wait(mylock); + mylock.Unlock(); + + set_sync_op_version(io, ver); + + return r; +} + +int librados::RadosClient::exec(IoCtxImpl& io, const object_t& oid, + const char *cls, const char *method, + bufferlist& inbl, bufferlist& outbl) +{ + Mutex mylock("RadosClient::exec::mylock"); + Cond cond; + bool done; + int r; + Context *onack = new C_SafeCond(&mylock, &cond, &done, &r); + eversion_t ver; + + + lock.Lock(); + ::ObjectOperation rd; + prepare_assert_ops(&io, &rd); + rd.call(cls, method, inbl); + objecter->read(oid, io.oloc, rd, io.snap_seq, &outbl, 0, onack, &ver); + lock.Unlock(); + + mylock.Lock(); + while (!done) + cond.Wait(mylock); + mylock.Unlock(); + + set_sync_op_version(io, ver); + + return r; +} + +int librados::RadosClient::aio_exec(IoCtxImpl& io, const object_t& oid, AioCompletionImpl *c, + const char *cls, const char *method, + bufferlist& inbl, bufferlist *outbl) +{ + Context *onack = new C_aio_Ack(c); + + Mutex::Locker l(lock); + ::ObjectOperation rd; + prepare_assert_ops(&io, &rd); + rd.call(cls, method, inbl); + objecter->read(oid, io.oloc, rd, io.snap_seq, outbl, 0, onack, &c->objver); + + return 0; +} + +int librados::RadosClient::read(IoCtxImpl& io, const object_t& oid, + bufferlist& bl, size_t len, uint64_t off) +{ + CephContext *cct = io.client->cct; + Mutex mylock("RadosClient::read::mylock"); + Cond cond; + bool done; + int r; + Context *onack = new C_SafeCond(&mylock, &cond, &done, &r); + eversion_t ver; + + ::ObjectOperation op; + ::ObjectOperation *pop = prepare_assert_ops(&io, &op); + + lock.Lock(); + objecter->read(oid, io.oloc, + off, len, io.snap_seq, &bl, 0, + onack, &ver, pop); + lock.Unlock(); + + mylock.Lock(); + while (!done) + cond.Wait(mylock); + mylock.Unlock(); + ldout(cct, 10) << "Objecter returned from read r=" << r << dendl; + + set_sync_op_version(io, ver); + + if (r < 0) + return r; + + if (bl.length() < len) { + ldout(cct, 10) << "Returned length " << bl.length() + << " less than original length "<< len << dendl; + } + + return bl.length(); +} + +int librados::RadosClient::mapext(IoCtxImpl& io, const object_t& oid, + uint64_t off, size_t len, std::map& m) +{ + CephContext *cct = io.client->cct; + bufferlist bl; + + Mutex mylock("RadosClient::read::mylock"); + Cond cond; + bool done; + int r; + Context *onack = new C_SafeCond(&mylock, &cond, &done, &r); + + lock.Lock(); + objecter->mapext(oid, io.oloc, + off, len, io.snap_seq, &bl, 0, + onack); + lock.Unlock(); + + mylock.Lock(); + while (!done) + cond.Wait(mylock); + mylock.Unlock(); + ldout(cct, 10) << "Objecter returned from read r=" << r << dendl; + + if (r < 0) + return r; + + bufferlist::iterator iter = bl.begin(); + ::decode(m, iter); + + return m.size(); +} + +int librados::RadosClient::sparse_read(IoCtxImpl& io, const object_t& oid, + std::map& m, + bufferlist& data_bl, size_t len, uint64_t off) +{ + CephContext *cct = io.client->cct; + bufferlist bl; + + Mutex mylock("RadosClient::read::mylock"); + Cond cond; + bool done; + int r; + Context *onack = new C_SafeCond(&mylock, &cond, &done, &r); + + lock.Lock(); + objecter->sparse_read(oid, io.oloc, + off, len, io.snap_seq, &bl, 0, + onack); + lock.Unlock(); + + mylock.Lock(); + while (!done) + cond.Wait(mylock); + mylock.Unlock(); + ldout(cct, 10) << "Objecter returned from read r=" << r << dendl; + + if (r < 0) + return r; + + bufferlist::iterator iter = bl.begin(); + ::decode(m, iter); + ::decode(data_bl, iter); + + return m.size(); +} + +int librados::RadosClient::stat(IoCtxImpl& io, const object_t& oid, uint64_t *psize, time_t *pmtime) +{ + CephContext *cct = io.client->cct; + Mutex mylock("RadosClient::stat::mylock"); + Cond cond; + bool done; + int r; + Context *onack = new C_SafeCond(&mylock, &cond, &done, &r); + uint64_t size; + utime_t mtime; + eversion_t ver; + + if (!psize) + psize = &size; + + ::ObjectOperation op; + ::ObjectOperation *pop = prepare_assert_ops(&io, &op); + + lock.Lock(); + objecter->stat(oid, io.oloc, + io.snap_seq, psize, &mtime, 0, + onack, &ver, pop); + lock.Unlock(); + + mylock.Lock(); + while (!done) + cond.Wait(mylock); + mylock.Unlock(); + ldout(cct, 10) << "Objecter returned from stat" << dendl; + + if (r >= 0 && pmtime) { + *pmtime = mtime.sec(); + } + + set_sync_op_version(io, ver); + + return r; +} + +int librados::RadosClient::getxattr(IoCtxImpl& io, const object_t& oid, + const char *name, bufferlist& bl) +{ + CephContext *cct = io.client->cct; + Mutex mylock("RadosClient::getxattr::mylock"); + Cond cond; + bool done; + int r; + Context *onack = new C_SafeCond(&mylock, &cond, &done, &r); + eversion_t ver; + + ::ObjectOperation op; + ::ObjectOperation *pop = prepare_assert_ops(&io, &op); + + lock.Lock(); + objecter->getxattr(oid, io.oloc, + name, io.snap_seq, &bl, 0, + onack, &ver, pop); + lock.Unlock(); + + mylock.Lock(); + while (!done) + cond.Wait(mylock); + mylock.Unlock(); + ldout(cct, 10) << "Objecter returned from getxattr" << dendl; + + set_sync_op_version(io, ver); + + if (r < 0) + return r; + + return bl.length(); +} + +int librados::RadosClient::rmxattr(IoCtxImpl& io, const object_t& oid, const char *name) +{ + utime_t ut = ceph_clock_now(cct); + + /* can't write to a snapshot */ + if (io.snap_seq != CEPH_NOSNAP) + return -EROFS; + + Mutex mylock("RadosClient::rmxattr::mylock"); + Cond cond; + bool done; + int r; + + Context *onack = new C_SafeCond(&mylock, &cond, &done, &r); + eversion_t ver; + + ::ObjectOperation op; + ::ObjectOperation *pop = prepare_assert_ops(&io, &op); + + lock.Lock(); + objecter->removexattr(oid, io.oloc, name, + io.snapc, ut, 0, + onack, NULL, &ver, pop); + lock.Unlock(); + + mylock.Lock(); + while (!done) + cond.Wait(mylock); + mylock.Unlock(); + + set_sync_op_version(io, ver); + + if (r < 0) + return r; + + return 0; +} + +int librados::RadosClient::setxattr(IoCtxImpl& io, const object_t& oid, + const char *name, bufferlist& bl) +{ + utime_t ut = ceph_clock_now(cct); + + /* can't write to a snapshot */ + if (io.snap_seq != CEPH_NOSNAP) + return -EROFS; + + Mutex mylock("RadosClient::setxattr::mylock"); + Cond cond; + bool done; + int r; + + Context *onack = new C_SafeCond(&mylock, &cond, &done, &r); + eversion_t ver; + + ::ObjectOperation op; + ::ObjectOperation *pop = prepare_assert_ops(&io, &op); + + lock.Lock(); + objecter->setxattr(oid, io.oloc, name, + io.snapc, bl, ut, 0, + onack, NULL, &ver, pop); + lock.Unlock(); + + mylock.Lock(); + while (!done) + cond.Wait(mylock); + mylock.Unlock(); + + set_sync_op_version(io, ver); + + if (r < 0) + return r; + + return 0; +} + +int librados::RadosClient::getxattrs(IoCtxImpl& io, const object_t& oid, + map& attrset) +{ + CephContext *cct = io.client->cct; + Mutex mylock("RadosClient::getexattrs::mylock"); + Cond cond; + bool done; + int r; + eversion_t ver; + + ::ObjectOperation op; + ::ObjectOperation *pop = prepare_assert_ops(&io, &op); + + Context *onack = new C_SafeCond(&mylock, &cond, &done, &r); + + lock.Lock(); + map aset; + objecter->getxattrs(oid, io.oloc, io.snap_seq, + aset, + 0, onack, &ver, pop); + lock.Unlock(); + + attrset.clear(); + + + mylock.Lock(); + while (!done) + cond.Wait(mylock); + mylock.Unlock(); + + for (map::iterator p = aset.begin(); p != aset.end(); p++) { + ldout(cct, 10) << "RadosClient::getxattrs: xattr=" << p->first << dendl; + attrset[p->first.c_str()] = p->second; + } + + set_sync_op_version(io, ver); + + return r; +} + +void librados::RadosClient::watch_notify(MWatchNotify *m) +{ + assert(lock.is_locked()); + WatchContext *wc = NULL; + map::iterator iter = watchers.find(m->cookie); + if (iter != watchers.end()) + wc = iter->second; + + if (!wc) + return; + + wc->notify(this, m); + + m->put(); +} + +int librados::RadosClient::watch(IoCtxImpl& io, const object_t& oid, uint64_t ver, + uint64_t *cookie, librados::WatchCtx *ctx) +{ + ::ObjectOperation rd; + Mutex mylock("RadosClient::watch::mylock"); + Cond cond; + bool done; + int r; + Context *onfinish = new C_SafeCond(&mylock, &cond, &done, &r); + eversion_t objver; + + lock.Lock(); + + WatchContext *wc; + register_watcher(io, oid, ctx, cookie, &wc); + prepare_assert_ops(&io, &rd); + rd.watch(*cookie, ver, 1); + bufferlist bl; + wc->linger_id = objecter->linger(oid, io.oloc, rd, io.snap_seq, bl, NULL, 0, NULL, onfinish, &objver); + lock.Unlock(); + + mylock.Lock(); + while (!done) + cond.Wait(mylock); + mylock.Unlock(); + + set_sync_op_version(io, objver); + + if (r < 0) { + lock.Lock(); + unregister_watcher(*cookie); + lock.Unlock(); + } + + return r; +} + + +/* this is called with RadosClient::lock held */ +int librados::RadosClient::_notify_ack(IoCtxImpl& io, const object_t& oid, + uint64_t notify_id, uint64_t ver) +{ + Mutex mylock("RadosClient::watch::mylock"); + Cond cond; + eversion_t objver; + + ::ObjectOperation rd; + prepare_assert_ops(&io, &rd); + rd.notify_ack(notify_id, ver); + objecter->read(oid, io.oloc, rd, io.snap_seq, (bufferlist*)NULL, 0, 0, 0); + + return 0; +} + +int librados::RadosClient::unwatch(IoCtxImpl& io, const object_t& oid, uint64_t cookie) +{ + bufferlist inbl, outbl; + + Mutex mylock("RadosClient::watch::mylock"); + Cond cond; + bool done; + int r; + Context *onack = new C_SafeCond(&mylock, &cond, &done, &r); + eversion_t ver; + lock.Lock(); + + unregister_watcher(cookie); + + ::ObjectOperation rd; + prepare_assert_ops(&io, &rd); + rd.watch(cookie, 0, 0); + objecter->read(oid, io.oloc, rd, io.snap_seq, &outbl, 0, onack, &ver); + lock.Unlock(); + + mylock.Lock(); + while (!done) + cond.Wait(mylock); + mylock.Unlock(); + + set_sync_op_version(io, ver); + + return r; +} + +int librados::RadosClient::notify(IoCtxImpl& io, const object_t& oid, uint64_t ver, bufferlist& bl) +{ + bufferlist inbl, outbl; + + Mutex mylock("RadosClient::notify::mylock"); + Mutex mylock_all("RadosClient::notify::mylock_all"); + Cond cond, cond_all; + bool done, done_all; + int r; + Context *onack = new C_SafeCond(&mylock, &cond, &done, &r); + eversion_t objver; + uint64_t cookie; + C_NotifyComplete *ctx = new C_NotifyComplete(&mylock_all, &cond_all, &done_all); + + ::ObjectOperation rd; + prepare_assert_ops(&io, &rd); + + lock.Lock(); + WatchContext *wc; + register_watcher(io, oid, ctx, &cookie, &wc); + uint32_t prot_ver = 1; + uint32_t timeout = io.notify_timeout; + ::encode(prot_ver, inbl); + ::encode(timeout, inbl); + ::encode(bl, inbl); + rd.notify(cookie, ver, inbl); + wc->linger_id = objecter->linger(oid, io.oloc, rd, io.snap_seq, inbl, NULL, + 0, onack, NULL, &objver); + lock.Unlock(); + + mylock_all.Lock(); + mylock.Lock(); + while (!done) + cond.Wait(mylock); + mylock.Unlock(); + + if (r == 0) { + while (!done_all) + cond_all.Wait(mylock_all); + } + + mylock_all.Unlock(); + + lock.Lock(); + unregister_watcher(cookie); + lock.Unlock(); + + set_sync_op_version(io, objver); + delete ctx; + + return r; +} + +void librados::RadosClient::set_sync_op_version(IoCtxImpl& io, eversion_t& ver) +{ + io.last_objver = ver; +} + +void librados::RadosClient::register_watcher(IoCtxImpl& io, + const object_t& oid, + librados::WatchCtx *ctx, + uint64_t *cookie, + WatchContext **pwc) +{ + assert(lock.is_locked()); + WatchContext *wc = new WatchContext(&io, oid, ctx); + *cookie = ++max_watch_cookie; + watchers[*cookie] = wc; + if (pwc) + *pwc = wc; +} + +void librados::RadosClient::unregister_watcher(uint64_t cookie) +{ + assert(lock.is_locked()); + map::iterator iter = watchers.find(cookie); + if (iter != watchers.end()) { + WatchContext *ctx = iter->second; + if (ctx->linger_id) + objecter->unregister_linger(ctx->linger_id); + delete ctx; + watchers.erase(iter); + } +} + +eversion_t librados::RadosClient::last_version(IoCtxImpl& io) +{ + return io.last_objver; +} + +void librados::RadosClient::set_assert_version(IoCtxImpl& io, uint64_t ver) +{ + io.assert_ver = ver; +} +void librados::RadosClient::set_assert_src_version(IoCtxImpl& io, + const object_t& oid, + uint64_t ver) +{ + io.assert_src_version[oid] = ver; +} + +void librados::RadosClient::set_notify_timeout(IoCtxImpl& io, uint32_t timeout) +{ + io.notify_timeout = timeout; +} + +///////////////////////////// C_aio_Ack //////////////////////////////// + +librados::RadosClient::C_aio_Ack::C_aio_Ack(AioCompletionImpl *_c) : c(_c) +{ + c->get(); +} + +void librados::RadosClient::C_aio_Ack::finish(int r) +{ + c->lock.Lock(); + c->rval = r; + c->ack = true; + c->cond.Signal(); + + if (c->buf && c->bl.length() > 0) { + unsigned l = MIN(c->bl.length(), c->maxlen); + c->bl.copy(0, l, c->buf); + c->rval = c->bl.length(); + } + if (c->pbl) { + *c->pbl = c->bl; + } + + if (c->callback_complete) { + rados_callback_t cb = c->callback_complete; + void *cb_arg = c->callback_arg; + c->lock.Unlock(); + cb(c, cb_arg); + c->lock.Lock(); + } + + c->put_unlock(); +} + +/////////////////////// C_aio_sparse_read_Ack ////////////////////////// + +librados::RadosClient::C_aio_sparse_read_Ack::C_aio_sparse_read_Ack(AioCompletionImpl *_c) + : c(_c) +{ + c->get(); +} + +void librados::RadosClient::C_aio_sparse_read_Ack::finish(int r) +{ + c->lock.Lock(); + c->rval = r; + c->ack = true; + c->cond.Signal(); + + bufferlist::iterator iter = c->bl.begin(); + if (r >= 0) { + ::decode(*m, iter); + ::decode(*data_bl, iter); + } + + if (c->callback_complete) { + rados_callback_t cb = c->callback_complete; + void *cb_arg = c->callback_arg; + c->lock.Unlock(); + cb(c, cb_arg); + c->lock.Lock(); + } + + c->put_unlock(); +} + +//////////////////////////// C_aio_Safe //////////////////////////////// + +librados::RadosClient::C_aio_Safe::C_aio_Safe(AioCompletionImpl *_c) : c(_c) +{ + c->get(); +} + +void librados::RadosClient::C_aio_Safe::finish(int r) +{ + c->lock.Lock(); + if (!c->ack) { + c->rval = r; + c->ack = true; + } + c->safe = true; + c->cond.Signal(); + + if (c->callback_safe) { + rados_callback_t cb = c->callback_safe; + void *cb_arg = c->callback_arg; + c->lock.Unlock(); + cb(c, cb_arg); + c->lock.Lock(); + } + + c->io->complete_aio_write(c); + + c->put_unlock(); +} + +///////////////////////// C_PoolAsync_Safe ///////////////////////////// + +librados::RadosClient::C_PoolAsync_Safe::C_PoolAsync_Safe(PoolAsyncCompletionImpl *_c) + : c(_c) +{ + c->get(); +} + +void librados::RadosClient::C_PoolAsync_Safe::finish(int r) +{ + c->lock.Lock(); + c->rval = r; + c->done = true; + c->cond.Signal(); + + if (c->callback) { + rados_callback_t cb = c->callback; + void *cb_arg = c->callback_arg; + c->lock.Unlock(); + cb(c, cb_arg); + c->lock.Lock(); + } + + c->put_unlock(); +} + +///////////////////////// C_NotifyComplete ///////////////////////////// + +librados::RadosClient::C_NotifyComplete::C_NotifyComplete(Mutex *_l, + Cond *_c, + bool *_d) + : lock(_l), cond(_c), done(_d) +{ + *done = false; +} + +void librados::RadosClient::C_NotifyComplete::notify(uint8_t opcode, + uint64_t ver, + bufferlist& bl) +{ + *done = true; + cond->Signal(); +} + +/////////////////////////// WatchContext /////////////////////////////// + +librados::RadosClient::WatchContext::WatchContext(IoCtxImpl *io_ctx_impl_, + const object_t& _oc, + librados::WatchCtx *_ctx) + : io_ctx_impl(io_ctx_impl_), oid(_oc), ctx(_ctx), linger_id(0) +{ + io_ctx_impl->get(); +} + +librados::RadosClient::WatchContext::~WatchContext() +{ + io_ctx_impl->put(); +} + +void librados::RadosClient::WatchContext::notify(RadosClient *client, + MWatchNotify *m) +{ + ctx->notify(m->opcode, m->ver, m->bl); + if (m->opcode != WATCH_NOTIFY_COMPLETE) { + client->_notify_ack(*io_ctx_impl, oid, m->notify_id, m->ver); + } +} diff --git a/src/librados/RadosClient.h b/src/librados/RadosClient.h new file mode 100644 index 0000000000000..98082df30b7cc --- /dev/null +++ b/src/librados/RadosClient.h @@ -0,0 +1,234 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab +/* + * Ceph - scalable distributed file system + * + * Copyright (C) 2004-2012 Sage Weil + * + * This is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public + * License version 2.1, as published by the Free Software + * Foundation. See file COPYING. + * + */ +#ifndef CEPH_LIBRADOS_RADOSCLIENT_H +#define CEPH_LIBRADOS_RADOSCLIENT_H + +#include "common/Cond.h" +#include "common/Mutex.h" +#include "common/Timer.h" +#include "include/rados/librados.h" +#include "include/rados/librados.hpp" +#include "mon/MonClient.h" +#include "msg/Dispatcher.h" +#include "osd/OSDMap.h" +#include "osdc/Objecter.h" + +class AuthAuthorizer; +class CephContext; +class Connection; +struct md_config_t; +class Message; +class MWatchNotify; +class SimpleMessenger; + +class librados::RadosClient : public Dispatcher +{ +public: + CephContext *cct; + md_config_t *conf; +private: + enum { + DISCONNECTED, + CONNECTING, + CONNECTED, + } state; + + OSDMap osdmap; + MonClient monclient; + SimpleMessenger *messenger; + + bool _dispatch(Message *m); + bool ms_dispatch(Message *m); + + bool ms_get_authorizer(int dest_type, AuthAuthorizer **authorizer, bool force_new); + void ms_handle_connect(Connection *con); + bool ms_handle_reset(Connection *con); + void ms_handle_remote_reset(Connection *con); + + Objecter *objecter; + + Mutex lock; + Cond cond; + SafeTimer timer; + +public: + + RadosClient(CephContext *cct_); + ~RadosClient(); + int connect(); + void shutdown(); + + int64_t lookup_pool(const char *name); + const char *get_pool_name(int64_t poolid_); + ::ObjectOperation *prepare_assert_ops(IoCtxImpl *io, ::ObjectOperation *op); + + // snaps + int snap_list(IoCtxImpl *io, vector *snaps); + int snap_lookup(IoCtxImpl *io, const char *name, uint64_t *snapid); + int snap_get_name(IoCtxImpl *io, uint64_t snapid, std::string *s); + int snap_get_stamp(IoCtxImpl *io, uint64_t snapid, time_t *t); + int snap_create(rados_ioctx_t io, const char* snapname); + int selfmanaged_snap_create(rados_ioctx_t io, uint64_t *snapid); + int snap_remove(rados_ioctx_t io, const char* snapname); + int rollback(rados_ioctx_t io_, const object_t& oid, const char *snapName); + int selfmanaged_snap_remove(rados_ioctx_t io, uint64_t snapid); + int selfmanaged_snap_rollback_object(rados_ioctx_t io, const object_t& oid, + ::SnapContext& snapc, uint64_t snapid); + + // io + int create(IoCtxImpl& io, const object_t& oid, bool exclusive); + int create(IoCtxImpl& io, const object_t& oid, bool exclusive, const std::string& category); + int write(IoCtxImpl& io, const object_t& oid, bufferlist& bl, size_t len, uint64_t off); + int append(IoCtxImpl& io, const object_t& oid, bufferlist& bl, size_t len); + int write_full(IoCtxImpl& io, const object_t& oid, bufferlist& bl); + int clone_range(IoCtxImpl& io, const object_t& dst_oid, uint64_t dst_offset, + const object_t& src_oid, uint64_t src_offset, uint64_t len); + int read(IoCtxImpl& io, const object_t& oid, bufferlist& bl, size_t len, uint64_t off); + int mapext(IoCtxImpl& io, const object_t& oid, uint64_t off, size_t len, + std::map& m); + int sparse_read(IoCtxImpl& io, const object_t& oid, std::map& m, + bufferlist& bl, size_t len, uint64_t off); + int remove(IoCtxImpl& io, const object_t& oid); + int stat(IoCtxImpl& io, const object_t& oid, uint64_t *psize, time_t *pmtime); + int trunc(IoCtxImpl& io, const object_t& oid, uint64_t size); + + int tmap_update(IoCtxImpl& io, const object_t& oid, bufferlist& cmdbl); + int tmap_put(IoCtxImpl& io, const object_t& oid, bufferlist& bl); + int tmap_get(IoCtxImpl& io, const object_t& oid, bufferlist& bl); + + int exec(IoCtxImpl& io, const object_t& oid, const char *cls, const char *method, bufferlist& inbl, bufferlist& outbl); + + int getxattr(IoCtxImpl& io, const object_t& oid, const char *name, bufferlist& bl); + int setxattr(IoCtxImpl& io, const object_t& oid, const char *name, bufferlist& bl); + int getxattrs(IoCtxImpl& io, const object_t& oid, map& attrset); + int rmxattr(IoCtxImpl& io, const object_t& oid, const char *name); + + int pool_list(std::list& ls); + int get_pool_stats(std::list& ls, map& result); + int get_fs_stats(ceph_statfs& result); + + int pool_create(string& name, unsigned long long auid=0, __u8 crush_rule=0); + int pool_create_async(string& name, PoolAsyncCompletionImpl *c, unsigned long long auid=0, + __u8 crush_rule=0); + int pool_delete(const char *name); + int pool_change_auid(rados_ioctx_t io, unsigned long long auid); + int pool_get_auid(rados_ioctx_t io, unsigned long long *auid); + + int pool_delete_async(const char *name, PoolAsyncCompletionImpl *c); + int pool_change_auid_async(rados_ioctx_t io, unsigned long long auid, PoolAsyncCompletionImpl *c); + + int list(Objecter::ListContext *context, int max_entries); + + int operate(IoCtxImpl& io, const object_t& oid, ::ObjectOperation *o, time_t *pmtime); + int operate_read(IoCtxImpl& io, const object_t& oid, ::ObjectOperation *o, bufferlist *pbl); + int aio_operate(IoCtxImpl& io, const object_t& oid, ::ObjectOperation *o, AioCompletionImpl *c); + int aio_operate_read(IoCtxImpl& io, const object_t& oid, ::ObjectOperation *o, AioCompletionImpl *c, bufferlist *pbl); + + struct C_aio_Ack : public Context { + librados::AioCompletionImpl *c; + C_aio_Ack(AioCompletionImpl *_c); + void finish(int r); + }; + + struct C_aio_sparse_read_Ack : public Context { + AioCompletionImpl *c; + bufferlist *data_bl; + std::map *m; + C_aio_sparse_read_Ack(AioCompletionImpl *_c); + void finish(int r); + }; + + struct C_aio_Safe : public Context { + AioCompletionImpl *c; + C_aio_Safe(AioCompletionImpl *_c); + void finish(int r); + }; + + int aio_read(IoCtxImpl& io, const object_t oid, AioCompletionImpl *c, + bufferlist *pbl, size_t len, uint64_t off); + int aio_read(IoCtxImpl& io, object_t oid, AioCompletionImpl *c, + char *buf, size_t len, uint64_t off); + int aio_sparse_read(IoCtxImpl& io, const object_t oid, + AioCompletionImpl *c, std::map *m, + bufferlist *data_bl, size_t len, uint64_t off); + int aio_write(IoCtxImpl& io, const object_t &oid, AioCompletionImpl *c, + const bufferlist& bl, size_t len, uint64_t off); + int aio_append(IoCtxImpl& io, const object_t &oid, AioCompletionImpl *c, + const bufferlist& bl, size_t len); + int aio_write_full(IoCtxImpl& io, const object_t &oid, AioCompletionImpl *c, + const bufferlist& bl); + int aio_exec(IoCtxImpl& io, const object_t& oid, AioCompletionImpl *c, + const char *cls, const char *method, bufferlist& inbl, bufferlist *outbl); + + struct C_PoolAsync_Safe : public Context { + PoolAsyncCompletionImpl *c; + C_PoolAsync_Safe(PoolAsyncCompletionImpl *_c); + void finish(int r); + }; + + static PoolAsyncCompletionImpl *pool_async_create_completion(); + static PoolAsyncCompletionImpl *pool_async_create_completion(void *cb_arg, + rados_callback_t cb); + static AioCompletionImpl *aio_create_completion(); + static AioCompletionImpl *aio_create_completion(void *cb_arg, + rados_callback_t cb_complete, + rados_callback_t cb_safe); + + // watch/notify + struct WatchContext { + IoCtxImpl *io_ctx_impl; + const object_t oid; + uint64_t cookie; + uint64_t ver; + librados::WatchCtx *ctx; + uint64_t linger_id; + + WatchContext(IoCtxImpl *io_ctx_impl_, + const object_t& _oc, + librados::WatchCtx *_ctx); + ~WatchContext(); + void notify(RadosClient *client, MWatchNotify *m); + }; + + struct C_NotifyComplete : public librados::WatchCtx { + Mutex *lock; + Cond *cond; + bool *done; + + C_NotifyComplete(Mutex *_l, Cond *_c, bool *_d); + void notify(uint8_t opcode, uint64_t ver, bufferlist& bl); + }; + + uint64_t max_watch_cookie; + map watchers; + + void set_sync_op_version(IoCtxImpl& io, eversion_t& ver); + + void register_watcher(IoCtxImpl& io, const object_t& oid, + librados::WatchCtx *ctx, uint64_t *cookie, + WatchContext **pwc = NULL); + void unregister_watcher(uint64_t cookie); + void watch_notify(MWatchNotify *m); + int watch(IoCtxImpl& io, const object_t& oid, uint64_t ver, uint64_t *cookie, librados::WatchCtx *ctx); + int unwatch(IoCtxImpl& io, const object_t& oid, uint64_t cookie); + int notify(IoCtxImpl& io, const object_t& oid, uint64_t ver, bufferlist& bl); + int _notify_ack(IoCtxImpl& io, const object_t& oid, uint64_t notify_id, uint64_t ver); + + eversion_t last_version(IoCtxImpl& io); + void set_assert_version(IoCtxImpl& io, uint64_t ver); + void set_assert_src_version(IoCtxImpl& io, const object_t& oid, uint64_t ver); + void set_notify_timeout(IoCtxImpl& io, uint32_t timeout); +}; + +#endif diff --git a/src/librados/librados.cc b/src/librados/librados.cc new file mode 100644 index 0000000000000..48c988fccbbd3 --- /dev/null +++ b/src/librados/librados.cc @@ -0,0 +1,2046 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab +/* + * Ceph - scalable distributed file system + * + * Copyright (C) 2004-2012 Sage Weil + * + * This is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public + * License version 2.1, as published by the Free Software + * Foundation. See file COPYING. + * + */ + +using namespace std; + +#include "common/config.h" +#include "common/errno.h" +#include "common/ceph_argparse.h" +#include "common/common_init.h" +#include "include/rados/librados.h" +#include "include/rados/librados.hpp" +#include "include/types.h" + +#include "librados/AioCompletionImpl.h" +#include "librados/IoCtxImpl.h" +#include "librados/PoolAsyncCompletionImpl.h" +#include "librados/RadosClient.h" + +#define DOUT_SUBSYS rados +#undef dout_prefix +#define dout_prefix *_dout << "librados: " + +#define RADOS_LIST_MAX_ENTRIES 1024 + +/* + * Structure of this file + * + * RadosClient and the related classes are the internal implementation of librados. + * Above that layer sits the C API, found in include/rados/librados.h, and + * the C++ API, found in include/rados/librados.hpp + * + * The C++ API sometimes implements things in terms of the C API. + * Both the C++ and C API rely on RadosClient. + * + * Visually: + * +--------------------------------------+ + * | C++ API | + * +--------------------+ | + * | C API | | + * +--------------------+-----------------+ + * | RadosClient | + * +--------------------------------------+ + */ + +size_t librados::ObjectOperation::size() +{ + ::ObjectOperation *o = (::ObjectOperation *)impl; + return o->size(); +} + +void librados::ObjectOperation::set_op_flags(ObjectOperationFlags flags) +{ + int rados_flags = 0; + if (flags & OP_EXCL) + rados_flags |= CEPH_OSD_OP_FLAG_EXCL; + if (flags & OP_FAILOK) + rados_flags |= CEPH_OSD_OP_FLAG_FAILOK; + + ::ObjectOperation *o = (::ObjectOperation *)impl; + o->set_last_op_flags(rados_flags); +} + +void librados::ObjectOperation::cmpxattr(const char *name, uint8_t op, const bufferlist& v) +{ + ::ObjectOperation *o = (::ObjectOperation *)impl; + o->cmpxattr(name, op, CEPH_OSD_CMPXATTR_MODE_STRING, v); +} + +void librados::ObjectOperation::cmpxattr(const char *name, uint8_t op, uint64_t v) +{ + ::ObjectOperation *o = (::ObjectOperation *)impl; + bufferlist bl; + ::encode(v, bl); + o->cmpxattr(name, op, CEPH_OSD_CMPXATTR_MODE_U64, bl); +} + +void librados::ObjectOperation::src_cmpxattr(const std::string& src_oid, + const char *name, int op, const bufferlist& v) +{ + ::ObjectOperation *o = (::ObjectOperation *)impl; + object_t oid(src_oid); + o->src_cmpxattr(oid, CEPH_NOSNAP, name, v, op, CEPH_OSD_CMPXATTR_MODE_STRING); +} + +void librados::ObjectOperation::src_cmpxattr(const std::string& src_oid, + const char *name, int op, uint64_t val) +{ + ::ObjectOperation *o = (::ObjectOperation *)impl; + object_t oid(src_oid); + bufferlist bl; + ::encode(val, bl); + o->src_cmpxattr(oid, CEPH_NOSNAP, name, bl, op, CEPH_OSD_CMPXATTR_MODE_U64); +} + +void librados::ObjectOperation::exec(const char *cls, const char *method, bufferlist& inbl) +{ + ::ObjectOperation *o = (::ObjectOperation *)impl; + o->call(cls, method, inbl); +} + +void librados::ObjectReadOperation::stat(uint64_t *psize, time_t *pmtime, int *prval) +{ + ::ObjectOperation *o = (::ObjectOperation *)impl; + o->stat(psize, pmtime, prval); +} + +void librados::ObjectReadOperation::read(size_t off, uint64_t len, bufferlist *pbl, int *prval) +{ + ::ObjectOperation *o = (::ObjectOperation *)impl; + o->read(off, len, pbl, prval); +} + +void librados::ObjectReadOperation::tmap_get(bufferlist *pbl, int *prval) +{ + ::ObjectOperation *o = (::ObjectOperation *)impl; + o->tmap_get(pbl, prval); +} + +void librados::ObjectReadOperation::getxattr(const char *name, bufferlist *pbl, int *prval) +{ + ::ObjectOperation *o = (::ObjectOperation *)impl; + o->getxattr(name, pbl, prval); +} + +void librados::ObjectReadOperation::omap_get_vals( + const std::string &start_after, + const std::string &filter_prefix, + uint64_t max_return, + std::map *out_vals, + int *prval) +{ + ::ObjectOperation *o = (::ObjectOperation *)impl; + o->omap_get_vals(start_after, filter_prefix, max_return, out_vals, prval); +} + +void librados::ObjectReadOperation::omap_get_vals( + const std::string &start_after, + uint64_t max_return, + std::map *out_vals, + int *prval) +{ + ::ObjectOperation *o = (::ObjectOperation *)impl; + o->omap_get_vals(start_after, "", max_return, out_vals, prval); +} + +void librados::ObjectReadOperation::omap_get_keys( + const std::string &start_after, + uint64_t max_return, + std::set *out_keys, + int *prval) +{ + ::ObjectOperation *o = (::ObjectOperation *)impl; + o->omap_get_keys(start_after, max_return, out_keys, prval); +} + +void librados::ObjectReadOperation::omap_get_header(bufferlist *bl, int *prval) +{ + ::ObjectOperation *o = (::ObjectOperation *)impl; + o->omap_get_header(bl, prval); +} + +void librados::ObjectReadOperation::omap_get_vals_by_keys( + const std::set &keys, + std::map *map, + int *prval) +{ + ::ObjectOperation *o = (::ObjectOperation *)impl; + o->omap_get_vals_by_keys(keys, map, prval); +} + +int librados::IoCtx::omap_get_vals(const std::string& oid, + const std::string& start_after, + const std::string& filter_prefix, + uint64_t max_return, + std::map *out_vals) +{ + ObjectReadOperation op; + int r; + op.omap_get_vals(start_after, filter_prefix, max_return, out_vals, &r); + bufferlist bl; + int ret = operate(oid, &op, &bl); + if (ret < 0) + return ret; + + return r; +} + +void librados::ObjectReadOperation::getxattrs(map *pattrs, int *prval) +{ + ::ObjectOperation *o = (::ObjectOperation *)impl; + o->getxattrs(pattrs, prval); +} + +void librados::ObjectWriteOperation::create(bool exclusive) +{ + ::ObjectOperation *o = (::ObjectOperation *)impl; + o->create(exclusive); +} + +void librados::ObjectWriteOperation::create(bool exclusive, const std::string& category) +{ + ::ObjectOperation *o = (::ObjectOperation *)impl; + o->create(exclusive, category); +} + +void librados::ObjectWriteOperation::write(uint64_t off, const bufferlist& bl) +{ + ::ObjectOperation *o = (::ObjectOperation *)impl; + bufferlist c = bl; + o->write(off, c); +} + +void librados::ObjectWriteOperation::write_full(const bufferlist& bl) +{ + ::ObjectOperation *o = (::ObjectOperation *)impl; + bufferlist c = bl; + o->write_full(c); +} + +void librados::ObjectWriteOperation::append(const bufferlist& bl) +{ + ::ObjectOperation *o = (::ObjectOperation *)impl; + bufferlist c = bl; + o->append(c); +} + +void librados::ObjectWriteOperation::remove() +{ + ::ObjectOperation *o = (::ObjectOperation *)impl; + o->remove(); +} + +void librados::ObjectWriteOperation::truncate(uint64_t off) +{ + ::ObjectOperation *o = (::ObjectOperation *)impl; + o->truncate(off); +} + +void librados::ObjectWriteOperation::zero(uint64_t off, uint64_t len) +{ + ::ObjectOperation *o = (::ObjectOperation *)impl; + o->zero(off, len); +} + +void librados::ObjectWriteOperation::rmxattr(const char *name) +{ + ::ObjectOperation *o = (::ObjectOperation *)impl; + o->rmxattr(name); +} + +void librados::ObjectWriteOperation::setxattr(const char *name, const bufferlist& v) +{ + ::ObjectOperation *o = (::ObjectOperation *)impl; + o->setxattr(name, v); +} + +void librados::ObjectWriteOperation::omap_set( + const map &map) +{ + ::ObjectOperation *o = (::ObjectOperation *)impl; + o->omap_set(map); +} + +void librados::ObjectWriteOperation::omap_set_header(const bufferlist &bl) +{ + bufferlist c = bl; + ::ObjectOperation *o = (::ObjectOperation *)impl; + o->omap_set_header(c); +} + +void librados::ObjectWriteOperation::omap_clear() +{ + ::ObjectOperation *o = (::ObjectOperation *)impl; + o->omap_clear(); +} + +void librados::ObjectWriteOperation::omap_rm_keys( + const std::set &to_rm) +{ + ::ObjectOperation *o = (::ObjectOperation *)impl; + o->omap_rm_keys(to_rm); +} + +void librados::ObjectWriteOperation::tmap_put(const bufferlist &bl) +{ + ::ObjectOperation *o = (::ObjectOperation *)impl; + bufferlist c = bl; + o->tmap_put(c); +} + +void librados::ObjectWriteOperation::tmap_update(const bufferlist& cmdbl) +{ + ::ObjectOperation *o = (::ObjectOperation *)impl; + bufferlist c = cmdbl; + o->tmap_update(c); +} + +void librados::ObjectWriteOperation::clone_range(uint64_t dst_off, + const std::string& src_oid, uint64_t src_off, + size_t len) +{ + ::ObjectOperation *o = (::ObjectOperation *)impl; + o->clone_range(src_oid, src_off, len, dst_off); +} + +librados::WatchCtx:: +~WatchCtx() +{ +} + + +struct librados::ObjListCtx { + librados::IoCtxImpl *ctx; + Objecter::ListContext *lc; + + ObjListCtx(IoCtxImpl *c, Objecter::ListContext *l) : ctx(c), lc(l) {} + ~ObjListCtx() { + delete lc; + } +}; + +///////////////////////////// ObjectIterator ///////////////////////////// +librados::ObjectIterator::ObjectIterator(ObjListCtx *ctx_) + : ctx(ctx_) +{ +} + +librados::ObjectIterator::~ObjectIterator() +{ + ctx.reset(); +} + +bool librados::ObjectIterator::operator==(const librados::ObjectIterator& rhs) const { + return (ctx.get() == rhs.ctx.get()); +} + +bool librados::ObjectIterator::operator!=(const librados::ObjectIterator& rhs) const { + return (ctx.get() != rhs.ctx.get()); +} + +const pair& librados::ObjectIterator::operator*() const { + return cur_obj; +} + +const pair* librados::ObjectIterator::operator->() const { + return &cur_obj; +} + +librados::ObjectIterator& librados::ObjectIterator::operator++() +{ + get_next(); + return *this; +} + +librados::ObjectIterator librados::ObjectIterator::operator++(int) +{ + librados::ObjectIterator ret(*this); + get_next(); + return ret; +} + +void librados::ObjectIterator::get_next() +{ + const char *entry, *key; + int ret = rados_objects_list_next(ctx.get(), &entry, &key); + if (ret == -ENOENT) { + ctx.reset(); + *this = __EndObjectIterator; + return; + } + else if (ret) { + ostringstream oss; + oss << "rados returned " << cpp_strerror(ret); + throw std::runtime_error(oss.str()); + } + + cur_obj = make_pair(entry, key ? key : string()); +} + +const librados::ObjectIterator librados::ObjectIterator::__EndObjectIterator(NULL); + +///////////////////////////// PoolAsyncCompletion ////////////////////////////// +int librados::PoolAsyncCompletion::PoolAsyncCompletion::set_callback(void *cb_arg, + rados_callback_t cb) +{ + PoolAsyncCompletionImpl *c = (PoolAsyncCompletionImpl *)pc; + return c->set_callback(cb_arg, cb); +} + +int librados::PoolAsyncCompletion::PoolAsyncCompletion::wait() +{ + PoolAsyncCompletionImpl *c = (PoolAsyncCompletionImpl *)pc; + return c->wait(); +} + +bool librados::PoolAsyncCompletion::PoolAsyncCompletion::is_complete() +{ + PoolAsyncCompletionImpl *c = (PoolAsyncCompletionImpl *)pc; + return c->is_complete(); +} + +int librados::PoolAsyncCompletion::PoolAsyncCompletion::get_return_value() +{ + PoolAsyncCompletionImpl *c = (PoolAsyncCompletionImpl *)pc; + return c->get_return_value(); +} + +void librados::PoolAsyncCompletion::PoolAsyncCompletion::release() +{ + PoolAsyncCompletionImpl *c = (PoolAsyncCompletionImpl *)pc; + c->release(); + delete this; +} + +///////////////////////////// AioCompletion ////////////////////////////// +int librados::AioCompletion::AioCompletion::set_complete_callback(void *cb_arg, rados_callback_t cb) +{ + AioCompletionImpl *c = (AioCompletionImpl *)pc; + return c->set_complete_callback(cb_arg, cb); +} + +int librados::AioCompletion::AioCompletion::set_safe_callback(void *cb_arg, rados_callback_t cb) +{ + AioCompletionImpl *c = (AioCompletionImpl *)pc; + return c->set_safe_callback(cb_arg, cb); +} + +int librados::AioCompletion::AioCompletion::wait_for_complete() +{ + AioCompletionImpl *c = (AioCompletionImpl *)pc; + return c->wait_for_complete(); +} + +int librados::AioCompletion::AioCompletion::wait_for_safe() +{ + AioCompletionImpl *c = (AioCompletionImpl *)pc; + return c->wait_for_safe(); +} + +bool librados::AioCompletion::AioCompletion::is_complete() +{ + AioCompletionImpl *c = (AioCompletionImpl *)pc; + return c->is_complete(); +} + +bool librados::AioCompletion::AioCompletion::is_safe() +{ + AioCompletionImpl *c = (AioCompletionImpl *)pc; + return c->is_safe(); +} + +int librados::AioCompletion::AioCompletion::get_return_value() +{ + AioCompletionImpl *c = (AioCompletionImpl *)pc; + return c->get_return_value(); +} + +int librados::AioCompletion::AioCompletion::get_version() +{ + AioCompletionImpl *c = (AioCompletionImpl *)pc; + return c->get_version(); +} + +void librados::AioCompletion::AioCompletion::release() +{ + AioCompletionImpl *c = (AioCompletionImpl *)pc; + c->release(); + delete this; +} + +///////////////////////////// IoCtx ////////////////////////////// +librados::IoCtx::IoCtx() : io_ctx_impl(NULL) +{ +} + +void librados::IoCtx::from_rados_ioctx_t(rados_ioctx_t p, IoCtx &io) +{ + IoCtxImpl *io_ctx_impl = (IoCtxImpl*)p; + + io.io_ctx_impl = io_ctx_impl; + if (io_ctx_impl) { + io_ctx_impl->get(); + } +} + +librados::IoCtx::IoCtx(const IoCtx& rhs) +{ + io_ctx_impl = rhs.io_ctx_impl; + if (io_ctx_impl) { + io_ctx_impl->get(); + } +} + +librados::IoCtx& librados::IoCtx::operator=(const IoCtx& rhs) +{ + if (io_ctx_impl) + io_ctx_impl->put(); + io_ctx_impl = rhs.io_ctx_impl; + io_ctx_impl->get(); + return *this; +} + +librados::IoCtx::~IoCtx() +{ + close(); +} + +void librados::IoCtx::close() +{ + if (io_ctx_impl) + io_ctx_impl->put(); + io_ctx_impl = 0; +} + +void librados::IoCtx::dup(const IoCtx& rhs) +{ + if (io_ctx_impl) + io_ctx_impl->put(); + io_ctx_impl = new IoCtxImpl(); + io_ctx_impl->get(); + io_ctx_impl->dup(*rhs.io_ctx_impl); +} + +int librados::IoCtx::set_auid(uint64_t auid_) +{ + return io_ctx_impl->client->pool_change_auid(io_ctx_impl, auid_); +} + +int librados::IoCtx::set_auid_async(uint64_t auid_, PoolAsyncCompletion *c) +{ + return io_ctx_impl->client->pool_change_auid_async(io_ctx_impl, auid_, c->pc); +} + +int librados::IoCtx::get_auid(uint64_t *auid_) +{ + return io_ctx_impl->client->pool_get_auid(io_ctx_impl, (unsigned long long *)auid_); +} + +int librados::IoCtx::create(const std::string& oid, bool exclusive) +{ + object_t obj(oid); + return io_ctx_impl->client->create(*io_ctx_impl, obj, exclusive); +} + +int librados::IoCtx::create(const std::string& oid, bool exclusive, const std::string& category) +{ + object_t obj(oid); + return io_ctx_impl->client->create(*io_ctx_impl, obj, exclusive, category); +} + +int librados::IoCtx::write(const std::string& oid, bufferlist& bl, size_t len, uint64_t off) +{ + object_t obj(oid); + return io_ctx_impl->client->write(*io_ctx_impl, obj, bl, len, off); +} + +int librados::IoCtx::append(const std::string& oid, bufferlist& bl, size_t len) +{ + object_t obj(oid); + return io_ctx_impl->client->append(*io_ctx_impl, obj, bl, len); +} + +int librados::IoCtx::write_full(const std::string& oid, bufferlist& bl) +{ + object_t obj(oid); + return io_ctx_impl->client->write_full(*io_ctx_impl, obj, bl); +} + +int librados::IoCtx::clone_range(const std::string& dst_oid, uint64_t dst_off, + const std::string& src_oid, uint64_t src_off, + size_t len) +{ + object_t src(src_oid), dst(dst_oid); + return io_ctx_impl->client->clone_range(*io_ctx_impl, dst, dst_off, src, src_off, len); +} + +int librados::IoCtx::read(const std::string& oid, bufferlist& bl, size_t len, uint64_t off) +{ + object_t obj(oid); + return io_ctx_impl->client->read(*io_ctx_impl, obj, bl, len, off); +} + +int librados::IoCtx::remove(const std::string& oid) +{ + object_t obj(oid); + return io_ctx_impl->client->remove(*io_ctx_impl, obj); +} + +int librados::IoCtx::trunc(const std::string& oid, uint64_t size) +{ + object_t obj(oid); + return io_ctx_impl->client->trunc(*io_ctx_impl, obj, size); +} + +int librados::IoCtx::mapext(const std::string& oid, uint64_t off, size_t len, + std::map& m) +{ + object_t obj(oid); + return io_ctx_impl->client->mapext(*io_ctx_impl, oid, off, len, m); +} + +int librados::IoCtx::sparse_read(const std::string& oid, std::map& m, + bufferlist& bl, size_t len, uint64_t off) +{ + object_t obj(oid); + return io_ctx_impl->client->sparse_read(*io_ctx_impl, oid, m, bl, len, off); +} + +int librados::IoCtx::getxattr(const std::string& oid, const char *name, bufferlist& bl) +{ + object_t obj(oid); + return io_ctx_impl->client->getxattr(*io_ctx_impl, obj, name, bl); +} + +int librados::IoCtx::getxattrs(const std::string& oid, map& attrset) +{ + object_t obj(oid); + return io_ctx_impl->client->getxattrs(*io_ctx_impl, obj, attrset); +} + +int librados::IoCtx::setxattr(const std::string& oid, const char *name, bufferlist& bl) +{ + object_t obj(oid); + return io_ctx_impl->client->setxattr(*io_ctx_impl, obj, name, bl); +} + +int librados::IoCtx::rmxattr(const std::string& oid, const char *name) +{ + object_t obj(oid); + return io_ctx_impl->client->rmxattr(*io_ctx_impl, obj, name); +} + +int librados::IoCtx::stat(const std::string& oid, uint64_t *psize, time_t *pmtime) +{ + object_t obj(oid); + return io_ctx_impl->client->stat(*io_ctx_impl, oid, psize, pmtime); +} + +int librados::IoCtx::exec(const std::string& oid, const char *cls, const char *method, + bufferlist& inbl, bufferlist& outbl) +{ + object_t obj(oid); + return io_ctx_impl->client->exec(*io_ctx_impl, obj, cls, method, inbl, outbl); +} + +int librados::IoCtx::tmap_update(const std::string& oid, bufferlist& cmdbl) +{ + object_t obj(oid); + return io_ctx_impl->client->tmap_update(*io_ctx_impl, obj, cmdbl); +} + +int librados::IoCtx::tmap_put(const std::string& oid, bufferlist& bl) +{ + object_t obj(oid); + return io_ctx_impl->client->tmap_put(*io_ctx_impl, obj, bl); +} + +int librados::IoCtx::tmap_get(const std::string& oid, bufferlist& bl) +{ + object_t obj(oid); + return io_ctx_impl->client->tmap_get(*io_ctx_impl, obj, bl); +} + +int librados::IoCtx::omap_get_vals(const std::string& oid, + const std::string& start_after, + uint64_t max_return, + std::map *out_vals) +{ + ObjectReadOperation op; + int r; + op.omap_get_vals(start_after, max_return, out_vals, &r); + bufferlist bl; + int ret = operate(oid, &op, &bl); + if (ret < 0) + return ret; + + return r; +} + +int librados::IoCtx::omap_get_keys(const std::string& oid, + const std::string& start_after, + uint64_t max_return, + std::set *out_keys) +{ + ObjectReadOperation op; + int r; + op.omap_get_keys(start_after, max_return, out_keys, &r); + bufferlist bl; + int ret = operate(oid, &op, &bl); + if (ret < 0) + return ret; + + return r; +} + +int librados::IoCtx::omap_get_header(const std::string& oid, + bufferlist *bl) +{ + ObjectReadOperation op; + int r; + op.omap_get_header(bl, &r); + bufferlist b; + int ret = operate(oid, &op, &b); + if (ret < 0) + return ret; + + return r; +} + +int librados::IoCtx::omap_get_vals_by_keys(const std::string& oid, + const std::set& keys, + std::map *vals) +{ + ObjectReadOperation op; + int r; + bufferlist bl; + op.omap_get_vals_by_keys(keys, vals, &r); + int ret = operate(oid, &op, &bl); + if (ret < 0) + return ret; + + return r; +} + +int librados::IoCtx::omap_set(const std::string& oid, + const map& m) +{ + ObjectWriteOperation op; + op.omap_set(m); + return operate(oid, &op); +} + +int librados::IoCtx::omap_set_header(const std::string& oid, + const bufferlist& bl) +{ + ObjectWriteOperation op; + op.omap_set_header(bl); + return operate(oid, &op); +} + +int librados::IoCtx::omap_clear(const std::string& oid) +{ + ObjectWriteOperation op; + op.omap_clear(); + return operate(oid, &op); +} + +int librados::IoCtx::omap_rm_keys(const std::string& oid, + const std::set& keys) +{ + ObjectWriteOperation op; + op.omap_rm_keys(keys); + return operate(oid, &op); +} + +int librados::IoCtx::operate(const std::string& oid, librados::ObjectWriteOperation *o) +{ + object_t obj(oid); + return io_ctx_impl->client->operate(*io_ctx_impl, obj, (::ObjectOperation*)o->impl, o->pmtime); +} + +int librados::IoCtx::operate(const std::string& oid, librados::ObjectReadOperation *o, bufferlist *pbl) +{ + object_t obj(oid); + return io_ctx_impl->client->operate_read(*io_ctx_impl, obj, (::ObjectOperation*)o->impl, pbl); +} + +int librados::IoCtx::aio_operate(const std::string& oid, AioCompletion *c, librados::ObjectWriteOperation *o) +{ + object_t obj(oid); + return io_ctx_impl->client->aio_operate(*io_ctx_impl, obj, (::ObjectOperation*)o->impl, c->pc); +} + +int librados::IoCtx::aio_operate(const std::string& oid, AioCompletion *c, librados::ObjectReadOperation *o, bufferlist *pbl) +{ + object_t obj(oid); + return io_ctx_impl->client->aio_operate_read(*io_ctx_impl, obj, (::ObjectOperation*)o->impl, c->pc, pbl); +} + +void librados::IoCtx::snap_set_read(snap_t seq) +{ + io_ctx_impl->set_snap_read(seq); +} + +int librados::IoCtx::selfmanaged_snap_set_write_ctx(snap_t seq, vector& snaps) +{ + vector snv; + snv.resize(snaps.size()); + for (unsigned i=0; iset_snap_write_context(seq, snv); +} + +int librados::IoCtx::snap_create(const char *snapname) +{ + return io_ctx_impl->client->snap_create(io_ctx_impl, snapname); +} + +int librados::IoCtx::snap_lookup(const char *name, snap_t *snapid) +{ + return io_ctx_impl->client->snap_lookup(io_ctx_impl, name, snapid); +} + +int librados::IoCtx::snap_get_stamp(snap_t snapid, time_t *t) +{ + return io_ctx_impl->client->snap_get_stamp(io_ctx_impl, snapid, t); +} + +int librados::IoCtx::snap_get_name(snap_t snapid, std::string *s) +{ + return io_ctx_impl->client->snap_get_name(io_ctx_impl, snapid, s); +} + +int librados::IoCtx::snap_remove(const char *snapname) +{ + return io_ctx_impl->client->snap_remove(io_ctx_impl, snapname); +} + +int librados::IoCtx::snap_list(std::vector *snaps) +{ + return io_ctx_impl->client->snap_list(io_ctx_impl, snaps); +} + +int librados::IoCtx::rollback(const std::string& oid, const char *snapname) +{ + return io_ctx_impl->client->rollback(io_ctx_impl, oid, snapname); +} + +int librados::IoCtx::selfmanaged_snap_create(uint64_t *snapid) +{ + return io_ctx_impl->client->selfmanaged_snap_create(io_ctx_impl, snapid); +} + +int librados::IoCtx::selfmanaged_snap_remove(uint64_t snapid) +{ + return io_ctx_impl->client->selfmanaged_snap_remove(io_ctx_impl, snapid); +} + +int librados::IoCtx::selfmanaged_snap_rollback(const std::string& oid, uint64_t snapid) +{ + return io_ctx_impl->client->selfmanaged_snap_rollback_object(io_ctx_impl, + oid, + io_ctx_impl->snapc, + snapid); +} + +librados::ObjectIterator librados::IoCtx::objects_begin() +{ + rados_list_ctx_t listh; + rados_objects_list_open(io_ctx_impl, &listh); + ObjectIterator iter((ObjListCtx*)listh); + iter.get_next(); + return iter; +} + +const librados::ObjectIterator& librados::IoCtx::objects_end() const +{ + return ObjectIterator::__EndObjectIterator; +} + +uint64_t librados::IoCtx::get_last_version() +{ + eversion_t ver = io_ctx_impl->client->last_version(*io_ctx_impl); + return ver.version; +} + +int librados::IoCtx::aio_read(const std::string& oid, librados::AioCompletion *c, + bufferlist *pbl, size_t len, uint64_t off) +{ + return io_ctx_impl->client->aio_read(*io_ctx_impl, oid, c->pc, pbl, len, off); +} + +int librados::IoCtx::aio_exec(const std::string& oid, librados::AioCompletion *c, const char *cls, const char *method, + bufferlist& inbl, bufferlist *outbl) +{ + object_t obj(oid); + return io_ctx_impl->client->aio_exec(*io_ctx_impl, obj, c->pc, cls, method, inbl, outbl); +} + +int librados::IoCtx::aio_sparse_read(const std::string& oid, librados::AioCompletion *c, + std::map *m, bufferlist *data_bl, + size_t len, uint64_t off) +{ + return io_ctx_impl->client->aio_sparse_read(*io_ctx_impl, oid, c->pc, + m, data_bl, len, off); +} + +int librados::IoCtx::aio_write(const std::string& oid, librados::AioCompletion *c, + const bufferlist& bl, size_t len, uint64_t off) +{ + return io_ctx_impl->client->aio_write(*io_ctx_impl, oid, c->pc, bl, len, off); +} + +int librados::IoCtx::aio_append(const std::string& oid, librados::AioCompletion *c, + const bufferlist& bl, size_t len) +{ + return io_ctx_impl->client->aio_append(*io_ctx_impl, oid, c->pc, bl, len); +} + +int librados::IoCtx::aio_write_full(const std::string& oid, librados::AioCompletion *c, + const bufferlist& bl) +{ + object_t obj(oid); + return io_ctx_impl->client->aio_write_full(*io_ctx_impl, obj, c->pc, bl); +} + +int librados::IoCtx::aio_flush() +{ + io_ctx_impl->flush_aio_writes(); + return 0; +} + +int librados::IoCtx::watch(const string& oid, uint64_t ver, uint64_t *cookie, + librados::WatchCtx *ctx) +{ + object_t obj(oid); + return io_ctx_impl->client->watch(*io_ctx_impl, obj, ver, cookie, ctx); +} + +int librados::IoCtx::unwatch(const string& oid, uint64_t handle) +{ + uint64_t cookie = handle; + object_t obj(oid); + return io_ctx_impl->client->unwatch(*io_ctx_impl, obj, cookie); +} + +int librados::IoCtx::notify(const string& oid, uint64_t ver, bufferlist& bl) +{ + object_t obj(oid); + return io_ctx_impl->client->notify(*io_ctx_impl, obj, ver, bl); +} + +void librados::IoCtx::set_notify_timeout(uint32_t timeout) +{ + io_ctx_impl->client->set_notify_timeout(*io_ctx_impl, timeout); +} + +void librados::IoCtx::set_assert_version(uint64_t ver) +{ + io_ctx_impl->client->set_assert_version(*io_ctx_impl, ver); +} + +void librados::IoCtx::set_assert_src_version(const std::string& oid, uint64_t ver) +{ + object_t obj(oid); + io_ctx_impl->client->set_assert_src_version(*io_ctx_impl, obj, ver); +} + +const std::string& librados::IoCtx::get_pool_name() const +{ + return io_ctx_impl->pool_name; +} + +void librados::IoCtx::locator_set_key(const string& key) +{ + io_ctx_impl->oloc.key = key; +} + +int64_t librados::IoCtx::get_id() +{ + return io_ctx_impl->get_id(); +} + +librados::config_t librados::IoCtx::cct() +{ + return (config_t)io_ctx_impl->client->cct; +} + +librados::IoCtx::IoCtx(IoCtxImpl *io_ctx_impl_) + : io_ctx_impl(io_ctx_impl_) +{ +} + +///////////////////////////// Rados ////////////////////////////// +void librados::Rados::version(int *major, int *minor, int *extra) +{ + rados_version(major, minor, extra); +} + +librados::Rados::Rados() : client(NULL) +{ +} + +librados::Rados::~Rados() +{ + shutdown(); +} + +int librados::Rados::init(const char * const id) +{ + return rados_create((rados_t *)&client, id); +} + +int librados::Rados::init_with_context(config_t cct_) +{ + return rados_create_with_context((rados_t *)&client, (rados_config_t)cct_); +} + +int librados::Rados::connect() +{ + return client->connect(); +} + +librados::config_t librados::Rados::cct() +{ + return (config_t)client->cct; +} + +void librados::Rados::shutdown() +{ + if (!client) + return; + client->shutdown(); + delete client; + client = NULL; +} + +int librados::Rados::conf_read_file(const char * const path) const +{ + return rados_conf_read_file((rados_t)client, path); +} + +int librados::Rados::conf_parse_argv(int argc, const char ** argv) const +{ + return rados_conf_parse_argv((rados_t)client, argc, argv); +} + +int librados::Rados::conf_parse_env(const char *name) const +{ + return rados_conf_parse_env((rados_t)client, name); +} + +int librados::Rados::conf_set(const char *option, const char *value) +{ + return rados_conf_set((rados_t)client, option, value); +} + +int librados::Rados::conf_get(const char *option, std::string &val) +{ + char *str; + md_config_t *conf = client->cct->_conf; + int ret = conf->get_val(option, &str, -1); + if (ret) + return ret; + val = str; + free(str); + return 0; +} + +int librados::Rados::pool_create(const char *name) +{ + string str(name); + return client->pool_create(str); +} + +int librados::Rados::pool_create(const char *name, uint64_t auid) +{ + string str(name); + return client->pool_create(str, auid); +} + +int librados::Rados::pool_create(const char *name, uint64_t auid, __u8 crush_rule) +{ + string str(name); + return client->pool_create(str, auid, crush_rule); +} + +int librados::Rados::pool_create_async(const char *name, PoolAsyncCompletion *c) +{ + string str(name); + return client->pool_create_async(str, c->pc); +} + +int librados::Rados::pool_create_async(const char *name, uint64_t auid, PoolAsyncCompletion *c) +{ + string str(name); + return client->pool_create_async(str, c->pc, auid); +} + +int librados::Rados::pool_create_async(const char *name, uint64_t auid, __u8 crush_rule, + PoolAsyncCompletion *c) +{ + string str(name); + return client->pool_create_async(str, c->pc, auid, crush_rule); +} + +int librados::Rados::pool_delete(const char *name) +{ + return client->pool_delete(name); +} + +int librados::Rados::pool_delete_async(const char *name, PoolAsyncCompletion *c) +{ + return client->pool_delete_async(name, c->pc); +} + +int librados::Rados::pool_list(std::list& v) +{ + return client->pool_list(v); +} + +int64_t librados::Rados::pool_lookup(const char *name) +{ + return client->lookup_pool(name); +} + +int librados::Rados::ioctx_create(const char *name, IoCtx &io) +{ + rados_ioctx_t p; + int ret = rados_ioctx_create((rados_t)client, name, &p); + if (ret) + return ret; + io.io_ctx_impl = (IoCtxImpl*)p; + return 0; +} + +int librados::Rados::get_pool_stats(std::list& v, std::map& result) +{ + string category; + return get_pool_stats(v, category, result); +} + +int librados::Rados::get_pool_stats(std::list& v, string& category, + std::map& result) +{ + map rawresult; + int r = client->get_pool_stats(v, rawresult); + for (map::iterator p = rawresult.begin(); + p != rawresult.end(); + p++) { + stats_map& c = result[p->first]; + + string cat; + vector cats; + + if (!category.size()) { + cats.push_back(cat); + map::iterator iter; + for (iter = p->second.stats.cat_sum.begin(); iter != p->second.stats.cat_sum.end(); ++iter) { + cats.push_back(iter->first); + } + } else { + cats.push_back(category); + } + + vector::iterator cat_iter; + for (cat_iter = cats.begin(); cat_iter != cats.end(); ++cat_iter) { + string& cur_category = *cat_iter; + object_stat_sum_t *sum; + + if (!cur_category.size()) { + sum = &p->second.stats.sum; + } else { + map::iterator iter = p->second.stats.cat_sum.find(cur_category); + if (iter == p->second.stats.cat_sum.end()) + continue; + sum = &iter->second; + } + + pool_stat_t& pv = c[cur_category]; + pv.num_kb = SHIFT_ROUND_UP(sum->num_bytes, 10); + pv.num_bytes = sum->num_bytes; + pv.num_objects = sum->num_objects; + pv.num_object_clones = sum->num_object_clones; + pv.num_object_copies = sum->num_object_copies; + pv.num_objects_missing_on_primary = sum->num_objects_missing_on_primary; + pv.num_objects_unfound = sum->num_objects_unfound; + pv.num_objects_degraded = sum->num_objects_degraded; + pv.num_rd = sum->num_rd; + pv.num_rd_kb = sum->num_rd_kb; + pv.num_wr = sum->num_wr; + pv.num_wr_kb = sum->num_wr_kb; + } + } + return r; +} + +int librados::Rados::cluster_stat(cluster_stat_t& result) +{ + ceph_statfs stats; + int r = client->get_fs_stats(stats); + result.kb = stats.kb; + result.kb_used = stats.kb_used; + result.kb_avail = stats.kb_avail; + result.num_objects = stats.num_objects; + return r; +} + +librados::PoolAsyncCompletion *librados::Rados::pool_async_create_completion() +{ + PoolAsyncCompletionImpl *c = RadosClient::pool_async_create_completion(); + return new PoolAsyncCompletion(c); +} + +librados::AioCompletion *librados::Rados::aio_create_completion() +{ + AioCompletionImpl *c = RadosClient::aio_create_completion(); + return new AioCompletion(c); +} + +librados::AioCompletion *librados::Rados::aio_create_completion(void *cb_arg, + callback_t cb_complete, + callback_t cb_safe) +{ + AioCompletionImpl *c = RadosClient::aio_create_completion(cb_arg, cb_complete, cb_safe); + return new AioCompletion(c); +} + + +librados::ObjectOperation::ObjectOperation() +{ + impl = (ObjectOperationImpl *)new ::ObjectOperation; +} + +librados::ObjectOperation::~ObjectOperation() +{ + ::ObjectOperation *o = (::ObjectOperation *)impl; + delete o; +} + +///////////////////////////// C API ////////////////////////////// +extern "C" int rados_create(rados_t *pcluster, const char * const id) +{ + CephInitParameters iparams(CEPH_ENTITY_TYPE_CLIENT); + if (id) { + iparams.name.set(CEPH_ENTITY_TYPE_CLIENT, id); + } + + CephContext *cct = common_preinit(iparams, CODE_ENVIRONMENT_LIBRARY, 0); + cct->_conf->parse_env(); // environment variables override + cct->_conf->apply_changes(NULL); + + librados::RadosClient *radosp = new librados::RadosClient(cct); + *pcluster = (void *)radosp; + return 0; +} + +/* This function is intended for use by Ceph daemons. These daemons have + * already called global_init and want to use that particular configuration for + * their cluster. + */ +extern "C" int rados_create_with_context(rados_t *pcluster, rados_config_t cct_) +{ + CephContext *cct = (CephContext *)cct_; + librados::RadosClient *radosp = new librados::RadosClient(cct); + *pcluster = (void *)radosp; + return 0; +} + +extern "C" rados_config_t rados_cct(rados_t cluster) +{ + librados::RadosClient *client = (librados::RadosClient *)cluster; + return (rados_config_t)client->cct; +} + +extern "C" int rados_connect(rados_t cluster) +{ + librados::RadosClient *client = (librados::RadosClient *)cluster; + return client->connect(); +} + +extern "C" void rados_shutdown(rados_t cluster) +{ + librados::RadosClient *radosp = (librados::RadosClient *)cluster; + radosp->shutdown(); + delete radosp; +} + +extern "C" void rados_version(int *major, int *minor, int *extra) +{ + if (major) + *major = LIBRADOS_VER_MAJOR; + if (minor) + *minor = LIBRADOS_VER_MINOR; + if (extra) + *extra = LIBRADOS_VER_EXTRA; +} + + +// -- config -- +extern "C" int rados_conf_read_file(rados_t cluster, const char *path_list) +{ + librados::RadosClient *client = (librados::RadosClient *)cluster; + md_config_t *conf = client->cct->_conf; + std::deque parse_errors; + int ret = conf->parse_config_files(path_list, &parse_errors, 0); + if (ret) + return ret; + conf->parse_env(); // environment variables override + + conf->apply_changes(NULL); + complain_about_parse_errors(client->cct, &parse_errors); + return 0; +} + +extern "C" int rados_conf_parse_argv(rados_t cluster, int argc, const char **argv) +{ + librados::RadosClient *client = (librados::RadosClient *)cluster; + md_config_t *conf = client->cct->_conf; + vector args; + argv_to_vec(argc, argv, args); + int ret = conf->parse_argv(args); + if (ret) + return ret; + conf->apply_changes(NULL); + return 0; +} + +extern "C" int rados_conf_parse_env(rados_t cluster, const char *env) +{ + librados::RadosClient *client = (librados::RadosClient *)cluster; + md_config_t *conf = client->cct->_conf; + vector args; + env_to_vec(args, env); + int ret = conf->parse_argv(args); + if (ret) + return ret; + conf->apply_changes(NULL); + return 0; +} + +extern "C" int rados_conf_set(rados_t cluster, const char *option, const char *value) +{ + librados::RadosClient *client = (librados::RadosClient *)cluster; + md_config_t *conf = client->cct->_conf; + int ret = conf->set_val(option, value); + if (ret) + return ret; + conf->apply_changes(NULL); + return 0; +} + +/* cluster info */ +extern "C" int rados_cluster_stat(rados_t cluster, rados_cluster_stat_t *result) +{ + librados::RadosClient *client = (librados::RadosClient *)cluster; + + ceph_statfs stats; + int r = client->get_fs_stats(stats); + result->kb = stats.kb; + result->kb_used = stats.kb_used; + result->kb_avail = stats.kb_avail; + result->num_objects = stats.num_objects; + return r; +} + +extern "C" int rados_conf_get(rados_t cluster, const char *option, char *buf, size_t len) +{ + char *tmp = buf; + librados::RadosClient *client = (librados::RadosClient *)cluster; + md_config_t *conf = client->cct->_conf; + return conf->get_val(option, &tmp, len); +} + +extern "C" int64_t rados_pool_lookup(rados_t cluster, const char *name) +{ + librados::RadosClient *radosp = (librados::RadosClient *)cluster; + return radosp->lookup_pool(name); +} + +extern "C" int rados_pool_list(rados_t cluster, char *buf, size_t len) +{ + librados::RadosClient *client = (librados::RadosClient *)cluster; + std::list pools; + client->pool_list(pools); + + char *b = buf; + if (b) + memset(b, 0, len); + int needed = 0; + std::list::const_iterator i = pools.begin(); + std::list::const_iterator p_end = pools.end(); + for (; i != p_end; ++i) { + if (len <= 0) + break; + int rl = i->length() + 1; + strncat(b, i->c_str(), len - 2); // leave space for two NULLs + needed += rl; + len -= rl; + b += rl; + } + for (; i != p_end; ++i) { + int rl = i->length() + 1; + needed += rl; + } + return needed + 1; +} + +extern "C" int rados_ioctx_create(rados_t cluster, const char *name, rados_ioctx_t *io) +{ + librados::RadosClient *radosp = (librados::RadosClient *)cluster; + int64_t poolid = radosp->lookup_pool(name); + if (poolid < 0) + return (int)poolid; + + librados::IoCtxImpl *ctx = new librados::IoCtxImpl(radosp, poolid, name, CEPH_NOSNAP); + if (!ctx) + return -ENOMEM; + *io = ctx; + ctx->get(); + return 0; +} + +extern "C" void rados_ioctx_destroy(rados_ioctx_t io) +{ + librados::IoCtxImpl *ctx = (librados::IoCtxImpl *)io; + ctx->put(); +} + +extern "C" int rados_ioctx_pool_stat(rados_ioctx_t io, struct rados_pool_stat_t *stats) +{ + librados::IoCtxImpl *io_ctx_impl = (librados::IoCtxImpl *)io; + list ls; + ls.push_back(io_ctx_impl->pool_name); + map rawresult; + + int err = io_ctx_impl->client->get_pool_stats(ls, rawresult); + if (err) + return err; + + ::pool_stat_t& r = rawresult[io_ctx_impl->pool_name]; + stats->num_kb = SHIFT_ROUND_UP(r.stats.sum.num_bytes, 10); + stats->num_bytes = r.stats.sum.num_bytes; + stats->num_objects = r.stats.sum.num_objects; + stats->num_object_clones = r.stats.sum.num_object_clones; + stats->num_object_copies = r.stats.sum.num_object_copies; + stats->num_objects_missing_on_primary = r.stats.sum.num_objects_missing_on_primary; + stats->num_objects_unfound = r.stats.sum.num_objects_unfound; + stats->num_objects_degraded = r.stats.sum.num_objects_degraded; + stats->num_rd = r.stats.sum.num_rd; + stats->num_rd_kb = r.stats.sum.num_rd_kb; + stats->num_wr = r.stats.sum.num_wr; + stats->num_wr_kb = r.stats.sum.num_wr_kb; + return 0; +} + +extern "C" rados_config_t rados_ioctx_cct(rados_ioctx_t io) +{ + librados::IoCtxImpl *ctx = (librados::IoCtxImpl *)io; + return (rados_config_t)ctx->client->cct; +} + +extern "C" void rados_ioctx_snap_set_read(rados_ioctx_t io, rados_snap_t seq) +{ + librados::IoCtxImpl *ctx = (librados::IoCtxImpl *)io; + ctx->set_snap_read((snapid_t)seq); +} + +extern "C" int rados_ioctx_selfmanaged_snap_set_write_ctx(rados_ioctx_t io, + rados_snap_t seq, rados_snap_t *snaps, int num_snaps) +{ + librados::IoCtxImpl *ctx = (librados::IoCtxImpl *)io; + vector snv; + snv.resize(num_snaps); + for (int i=0; iset_snap_write_context((snapid_t)seq, snv); +} + +extern "C" int rados_write(rados_ioctx_t io, const char *o, const char *buf, size_t len, uint64_t off) +{ + librados::IoCtxImpl *ctx = (librados::IoCtxImpl *)io; + object_t oid(o); + bufferlist bl; + bl.append(buf, len); + return ctx->client->write(*ctx, oid, bl, len, off); +} + +extern "C" int rados_append(rados_ioctx_t io, const char *o, const char *buf, size_t len) +{ + librados::IoCtxImpl *ctx = (librados::IoCtxImpl *)io; + object_t oid(o); + bufferlist bl; + bl.append(buf, len); + return ctx->client->append(*ctx, oid, bl, len); +} + +extern "C" int rados_write_full(rados_ioctx_t io, const char *o, const char *buf, size_t len) +{ + librados::IoCtxImpl *ctx = (librados::IoCtxImpl *)io; + object_t oid(o); + bufferlist bl; + bl.append(buf, len); + return ctx->client->write_full(*ctx, oid, bl); +} + +extern "C" int rados_clone_range(rados_ioctx_t io, const char *dst, uint64_t dst_off, + const char *src, uint64_t src_off, size_t len) +{ + librados::IoCtxImpl *ctx = (librados::IoCtxImpl *)io; + object_t dst_oid(dst), src_oid(src); + return ctx->client->clone_range(*ctx, dst_oid, dst_off, src_oid, src_off, len); +} + +extern "C" int rados_trunc(rados_ioctx_t io, const char *o, uint64_t size) +{ + librados::IoCtxImpl *ctx = (librados::IoCtxImpl *)io; + object_t oid(o); + return ctx->client->trunc(*ctx, oid, size); +} + +extern "C" int rados_remove(rados_ioctx_t io, const char *o) +{ + librados::IoCtxImpl *ctx = (librados::IoCtxImpl *)io; + object_t oid(o); + return ctx->client->remove(*ctx, oid); +} + +extern "C" int rados_read(rados_ioctx_t io, const char *o, char *buf, size_t len, uint64_t off) +{ + librados::IoCtxImpl *ctx = (librados::IoCtxImpl *)io; + int ret; + object_t oid(o); + + bufferlist bl; + bufferptr bp = buffer::create_static(len, buf); + bl.push_back(bp); + + ret = ctx->client->read(*ctx, oid, bl, len, off); + if (ret >= 0) { + if (bl.length() > len) + return -ERANGE; + if (bl.c_str() != buf) + bl.copy(0, bl.length(), buf); + ret = bl.length(); // hrm :/ + } + + return ret; +} + +extern "C" uint64_t rados_get_last_version(rados_ioctx_t io) +{ + librados::IoCtxImpl *ctx = (librados::IoCtxImpl *)io; + eversion_t ver = ctx->client->last_version(*ctx); + return ver.version; +} + +extern "C" int rados_pool_create(rados_t cluster, const char *name) +{ + librados::RadosClient *radosp = (librados::RadosClient *)cluster; + string sname(name); + return radosp->pool_create(sname); +} + +extern "C" int rados_pool_create_with_auid(rados_t cluster, const char *name, + uint64_t auid) +{ + librados::RadosClient *radosp = (librados::RadosClient *)cluster; + string sname(name); + return radosp->pool_create(sname, auid); +} + +extern "C" int rados_pool_create_with_crush_rule(rados_t cluster, const char *name, + __u8 crush_rule_num) +{ + librados::RadosClient *radosp = (librados::RadosClient *)cluster; + string sname(name); + return radosp->pool_create(sname, 0, crush_rule_num); +} + +extern "C" int rados_pool_create_with_all(rados_t cluster, const char *name, + uint64_t auid, __u8 crush_rule_num) +{ + librados::RadosClient *radosp = (librados::RadosClient *)cluster; + string sname(name); + return radosp->pool_create(sname, auid, crush_rule_num); +} + +extern "C" int rados_pool_delete(rados_t cluster, const char *pool_name) +{ + librados::RadosClient *client = (librados::RadosClient *)cluster; + return client->pool_delete(pool_name); +} + +extern "C" int rados_ioctx_pool_set_auid(rados_ioctx_t io, uint64_t auid) +{ + librados::IoCtxImpl *ctx = (librados::IoCtxImpl *)io; + return ctx->client->pool_change_auid(ctx, auid); +} + +extern "C" int rados_ioctx_pool_get_auid(rados_ioctx_t io, uint64_t *auid) +{ + librados::IoCtxImpl *ctx = (librados::IoCtxImpl *)io; + return ctx->client->pool_get_auid(ctx, (unsigned long long *)auid); +} + +extern "C" void rados_ioctx_locator_set_key(rados_ioctx_t io, const char *key) +{ + librados::IoCtxImpl *ctx = (librados::IoCtxImpl *)io; + if (key) + ctx->oloc.key = key; + else + ctx->oloc.key = ""; +} + +extern "C" int64_t rados_ioctx_get_id(rados_ioctx_t io) +{ + librados::IoCtxImpl *ctx = (librados::IoCtxImpl *)io; + return ctx->get_id(); +} +// snaps + +extern "C" int rados_ioctx_snap_create(rados_ioctx_t io, const char *snapname) +{ + librados::IoCtxImpl *ctx = (librados::IoCtxImpl *)io; + return ctx->client->snap_create(ctx, snapname); +} + +extern "C" int rados_ioctx_snap_remove(rados_ioctx_t io, const char *snapname) +{ + librados::IoCtxImpl *ctx = (librados::IoCtxImpl *)io; + return ctx->client->snap_remove(ctx, snapname); +} + +extern "C" int rados_rollback(rados_ioctx_t io, const char *oid, + const char *snapname) +{ + librados::IoCtxImpl *ctx = (librados::IoCtxImpl *)io; + return ctx->client->rollback(ctx, oid, snapname); +} + +extern "C" int rados_ioctx_selfmanaged_snap_create(rados_ioctx_t io, + uint64_t *snapid) +{ + librados::IoCtxImpl *ctx = (librados::IoCtxImpl *)io; + return ctx->client->selfmanaged_snap_create(ctx, snapid); +} + +extern "C" int rados_ioctx_selfmanaged_snap_remove(rados_ioctx_t io, + uint64_t snapid) +{ + librados::IoCtxImpl *ctx = (librados::IoCtxImpl *)io; + return ctx->client->selfmanaged_snap_remove(ctx, snapid); +} + +extern "C" int rados_ioctx_selfmanaged_snap_rollback(rados_ioctx_t io, + const char *oid, + uint64_t snapid) +{ + librados::IoCtxImpl *ctx = (librados::IoCtxImpl *)io; + return ctx->client->selfmanaged_snap_rollback_object(ctx, oid, ctx->snapc, snapid); +} + +extern "C" int rados_ioctx_snap_list(rados_ioctx_t io, rados_snap_t *snaps, + int maxlen) +{ + librados::IoCtxImpl *ctx = (librados::IoCtxImpl *)io; + vector snapvec; + int r = ctx->client->snap_list(ctx, &snapvec); + if (r < 0) + return r; + if ((int)snapvec.size() <= maxlen) { + for (unsigned i=0; iclient->snap_lookup(ctx, name, (uint64_t *)id); +} + +extern "C" int rados_ioctx_snap_get_name(rados_ioctx_t io, rados_snap_t id, + char *name, int maxlen) +{ + librados::IoCtxImpl *ctx = (librados::IoCtxImpl *)io; + std::string sname; + int r = ctx->client->snap_get_name(ctx, id, &sname); + if (r < 0) + return r; + if ((int)sname.length() >= maxlen) + return -ERANGE; + strncpy(name, sname.c_str(), maxlen); + return 0; +} + +extern "C" int rados_ioctx_snap_get_stamp(rados_ioctx_t io, rados_snap_t id, time_t *t) +{ + librados::IoCtxImpl *ctx = (librados::IoCtxImpl *)io; + return ctx->client->snap_get_stamp(ctx, id, t); +} + +extern "C" int rados_getxattr(rados_ioctx_t io, const char *o, const char *name, + char *buf, size_t len) +{ + librados::IoCtxImpl *ctx = (librados::IoCtxImpl *)io; + int ret; + object_t oid(o); + bufferlist bl; + ret = ctx->client->getxattr(*ctx, oid, name, bl); + if (ret >= 0) { + if (bl.length() > len) + return -ERANGE; + bl.copy(0, bl.length(), buf); + ret = bl.length(); + } + + return ret; +} + +class RadosXattrsIter { +public: + RadosXattrsIter() + : val(NULL) + { + i = attrset.end(); + } + ~RadosXattrsIter() + { + free(val); + val = NULL; + } + std::map attrset; + std::map::iterator i; + char *val; +}; + +extern "C" int rados_getxattrs(rados_ioctx_t io, const char *oid, + rados_xattrs_iter_t *iter) +{ + RadosXattrsIter *it = new RadosXattrsIter(); + if (!it) + return -ENOMEM; + librados::IoCtxImpl *ctx = (librados::IoCtxImpl *)io; + object_t obj(oid); + int ret = ctx->client->getxattrs(*ctx, obj, it->attrset); + if (ret) { + delete it; + return ret; + } + it->i = it->attrset.begin(); + + RadosXattrsIter **iret = (RadosXattrsIter**)iter; + *iret = it; + *iter = it; + return 0; +} + +extern "C" int rados_getxattrs_next(rados_xattrs_iter_t iter, + const char **name, const char **val, size_t *len) +{ + RadosXattrsIter *it = (RadosXattrsIter*)iter; + if (it->i == it->attrset.end()) { + *name = NULL; + *val = NULL; + *len = 0; + return 0; + } + free(it->val); + const std::string &s(it->i->first); + *name = s.c_str(); + bufferlist &bl(it->i->second); + size_t bl_len = bl.length(); + it->val = (char*)malloc(bl_len); + if (!it->val) + return -ENOMEM; + memcpy(it->val, bl.c_str(), bl_len); + *val = it->val; + *len = bl_len; + ++it->i; + return 0; +} + +extern "C" void rados_getxattrs_end(rados_xattrs_iter_t iter) +{ + RadosXattrsIter *it = (RadosXattrsIter*)iter; + delete it; +} + +extern "C" int rados_setxattr(rados_ioctx_t io, const char *o, const char *name, const char *buf, size_t len) +{ + librados::IoCtxImpl *ctx = (librados::IoCtxImpl *)io; + object_t oid(o); + bufferlist bl; + bl.append(buf, len); + return ctx->client->setxattr(*ctx, oid, name, bl); +} + +extern "C" int rados_rmxattr(rados_ioctx_t io, const char *o, const char *name) +{ + librados::IoCtxImpl *ctx = (librados::IoCtxImpl *)io; + object_t oid(o); + return ctx->client->rmxattr(*ctx, oid, name); +} + +extern "C" int rados_stat(rados_ioctx_t io, const char *o, uint64_t *psize, time_t *pmtime) +{ + librados::IoCtxImpl *ctx = (librados::IoCtxImpl *)io; + object_t oid(o); + return ctx->client->stat(*ctx, oid, psize, pmtime); +} + +extern "C" int rados_tmap_update(rados_ioctx_t io, const char *o, const char *cmdbuf, size_t cmdbuflen) +{ + librados::IoCtxImpl *ctx = (librados::IoCtxImpl *)io; + object_t oid(o); + bufferlist cmdbl; + cmdbl.append(cmdbuf, cmdbuflen); + return ctx->client->tmap_update(*ctx, oid, cmdbl); +} + +extern "C" int rados_tmap_put(rados_ioctx_t io, const char *o, const char *buf, size_t buflen) +{ + librados::IoCtxImpl *ctx = (librados::IoCtxImpl *)io; + object_t oid(o); + bufferlist bl; + bl.append(buf, buflen); + return ctx->client->tmap_put(*ctx, oid, bl); +} + +extern "C" int rados_tmap_get(rados_ioctx_t io, const char *o, char *buf, size_t buflen) +{ + librados::IoCtxImpl *ctx = (librados::IoCtxImpl *)io; + object_t oid(o); + bufferlist bl; + int r = ctx->client->tmap_get(*ctx, oid, bl); + if (r < 0) + return r; + if (bl.length() > buflen) + return -ERANGE; + bl.copy(0, bl.length(), buf); + return bl.length(); +} + +extern "C" int rados_exec(rados_ioctx_t io, const char *o, const char *cls, const char *method, + const char *inbuf, size_t in_len, char *buf, size_t out_len) +{ + librados::IoCtxImpl *ctx = (librados::IoCtxImpl *)io; + object_t oid(o); + bufferlist inbl, outbl; + int ret; + inbl.append(inbuf, in_len); + ret = ctx->client->exec(*ctx, oid, cls, method, inbl, outbl); + if (ret >= 0) { + if (outbl.length()) { + if (outbl.length() > out_len) + return -ERANGE; + outbl.copy(0, outbl.length(), buf); + ret = outbl.length(); // hrm :/ + } + } + return ret; +} + +/* list objects */ + +extern "C" int rados_objects_list_open(rados_ioctx_t io, rados_list_ctx_t *listh) +{ + librados::IoCtxImpl *ctx = (librados::IoCtxImpl *)io; + Objecter::ListContext *h = new Objecter::ListContext; + h->pool_id = ctx->poolid; + h->pool_snap_seq = ctx->snap_seq; + *listh = (void *)new librados::ObjListCtx(ctx, h); + return 0; +} + +extern "C" void rados_objects_list_close(rados_list_ctx_t h) +{ + librados::ObjListCtx *lh = (librados::ObjListCtx *)h; + delete lh; +} + +extern "C" int rados_objects_list_next(rados_list_ctx_t listctx, const char **entry, const char **key) +{ + librados::ObjListCtx *lh = (librados::ObjListCtx *)listctx; + Objecter::ListContext *h = lh->lc; + int ret; + + // if the list is non-empty, this method has been called before + if (!h->list.empty()) + // so let's kill the previously-returned object + h->list.pop_front(); + + if (h->list.empty()) { + ret = lh->ctx->client->list(lh->lc, RADOS_LIST_MAX_ENTRIES); + if (ret < 0) + return ret; + if (h->list.empty()) + return -ENOENT; + } + + *entry = h->list.front().first.name.c_str(); + + if (key) { + if (h->list.front().second.size()) + *key = h->list.front().second.c_str(); + else + *key = NULL; + } + return 0; +} + + + +// ------------------------- +// aio + +extern "C" int rados_aio_create_completion(void *cb_arg, rados_callback_t cb_complete, + rados_callback_t cb_safe, rados_completion_t *pc) +{ + *pc = librados::RadosClient::aio_create_completion(cb_arg, cb_complete, cb_safe); + return 0; +} + +extern "C" int rados_aio_wait_for_complete(rados_completion_t c) +{ + return ((librados::AioCompletionImpl*)c)->wait_for_complete(); +} + +extern "C" int rados_aio_wait_for_safe(rados_completion_t c) +{ + return ((librados::AioCompletionImpl*)c)->wait_for_safe(); +} + +extern "C" int rados_aio_is_complete(rados_completion_t c) +{ + return ((librados::AioCompletionImpl*)c)->is_complete(); +} + +extern "C" int rados_aio_is_safe(rados_completion_t c) +{ + return ((librados::AioCompletionImpl*)c)->is_safe(); +} + +extern "C" int rados_aio_get_return_value(rados_completion_t c) +{ + return ((librados::AioCompletionImpl*)c)->get_return_value(); +} + +extern "C" uint64_t rados_aio_get_version(rados_completion_t c) +{ + return ((librados::AioCompletionImpl*)c)->get_version(); +} + +extern "C" void rados_aio_release(rados_completion_t c) +{ + ((librados::AioCompletionImpl*)c)->put(); +} + +extern "C" int rados_aio_read(rados_ioctx_t io, const char *o, + rados_completion_t completion, + char *buf, size_t len, uint64_t off) +{ + librados::IoCtxImpl *ctx = (librados::IoCtxImpl *)io; + object_t oid(o); + return ctx->client->aio_read(*ctx, oid, + (librados::AioCompletionImpl*)completion, buf, len, off); +} + +extern "C" int rados_aio_write(rados_ioctx_t io, const char *o, + rados_completion_t completion, + const char *buf, size_t len, uint64_t off) +{ + librados::IoCtxImpl *ctx = (librados::IoCtxImpl *)io; + object_t oid(o); + bufferlist bl; + bl.append(buf, len); + return ctx->client->aio_write(*ctx, oid, + (librados::AioCompletionImpl*)completion, bl, len, off); +} + +extern "C" int rados_aio_append(rados_ioctx_t io, const char *o, + rados_completion_t completion, + const char *buf, size_t len) +{ + librados::IoCtxImpl *ctx = (librados::IoCtxImpl *)io; + object_t oid(o); + bufferlist bl; + bl.append(buf, len); + return ctx->client->aio_append(*ctx, oid, + (librados::AioCompletionImpl*)completion, bl, len); +} + +extern "C" int rados_aio_write_full(rados_ioctx_t io, const char *o, + rados_completion_t completion, + const char *buf, size_t len) +{ + librados::IoCtxImpl *ctx = (librados::IoCtxImpl *)io; + object_t oid(o); + bufferlist bl; + bl.append(buf, len); + return ctx->client->aio_write_full(*ctx, oid, + (librados::AioCompletionImpl*)completion, bl); +} + +extern "C" int rados_aio_flush(rados_ioctx_t io) +{ + librados::IoCtxImpl *ctx = (librados::IoCtxImpl *)io; + ctx->flush_aio_writes(); + return 0; +} + +struct C_WatchCB : public librados::WatchCtx { + rados_watchcb_t wcb; + void *arg; + C_WatchCB(rados_watchcb_t _wcb, void *_arg) : wcb(_wcb), arg(_arg) {} + void notify(uint8_t opcode, uint64_t ver, bufferlist& bl) { + wcb(opcode, ver, arg); + } +}; + +int rados_watch(rados_ioctx_t io, const char *o, uint64_t ver, uint64_t *handle, + rados_watchcb_t watchcb, void *arg) +{ + uint64_t *cookie = handle; + librados::IoCtxImpl *ctx = (librados::IoCtxImpl *)io; + object_t oid(o); + C_WatchCB *wc = new C_WatchCB(watchcb, arg); + return ctx->client->watch(*ctx, oid, ver, cookie, wc); +} + +int rados_unwatch(rados_ioctx_t io, const char *o, uint64_t handle) +{ + uint64_t cookie = handle; + librados::IoCtxImpl *ctx = (librados::IoCtxImpl *)io; + object_t oid(o); + return ctx->client->unwatch(*ctx, oid, cookie); +} + +int rados_notify(rados_ioctx_t io, const char *o, uint64_t ver, const char *buf, int buf_len) +{ + librados::IoCtxImpl *ctx = (librados::IoCtxImpl *)io; + object_t oid(o); + bufferlist bl; + if (buf) { + bufferptr p = buffer::create(buf_len); + memcpy(p.c_str(), buf, buf_len); + bl.push_back(p); + } + return ctx->client->notify(*ctx, oid, ver, bl); +} -- 2.39.5