From f2eb0d758c052fe0c95e57a0a67452e91c568378 Mon Sep 17 00:00:00 2001 From: Yehuda Sadeh Date: Mon, 14 Sep 2015 14:19:34 -0700 Subject: [PATCH] rgw: move code around move code from rgw_sync.cc to rgw_cr_rados.{h,cc} Signed-off-by: Yehuda Sadeh --- src/CMakeLists.txt | 2 +- src/rgw/Makefile.am | 2 + src/rgw/rgw_cr_rados.cc | 349 ++++++++++++++++++++++++ src/rgw/rgw_cr_rados.h | 352 +++++++++++++++++++++++++ src/rgw/rgw_sync.cc | 568 +--------------------------------------- 5 files changed, 705 insertions(+), 568 deletions(-) create mode 100644 src/rgw/rgw_cr_rados.cc create mode 100644 src/rgw/rgw_cr_rados.h diff --git a/src/CMakeLists.txt b/src/CMakeLists.txt index cee8483dcf39a..f01f98dee9c40 100644 --- a/src/CMakeLists.txt +++ b/src/CMakeLists.txt @@ -1165,7 +1165,7 @@ if(${WITH_RADOSGW}) rgw/rgw_sync.cc rgw/rgw_dencoder.cc rgw/rgw_coroutine.cc - rgw/rgw_object_expirer_core.cc + rgw/rgw_cr_rados.cc rgw/rgw_object_expirer_core.cc rgw/rgw_website.cc rgw/rgw_xml_enc.cc) diff --git a/src/rgw/Makefile.am b/src/rgw/Makefile.am index 72441cb790c3f..b05a6499b97c0 100644 --- a/src/rgw/Makefile.am +++ b/src/rgw/Makefile.am @@ -26,6 +26,7 @@ librgw_la_SOURCES = \ rgw/rgw_acl_swift.cc \ rgw/rgw_client_io.cc \ rgw/rgw_coroutine.cc \ + rgw/rgw_cr_rados.cc \ rgw/rgw_fcgi.cc \ rgw/rgw_xml.cc \ rgw/rgw_usage.cc \ @@ -143,6 +144,7 @@ noinst_HEADERS += \ rgw/rgw_acl_swift.h \ rgw/rgw_client_io.h \ rgw/rgw_coroutine.h \ + rgw/rgw_cr_rados.h \ rgw/rgw_fcgi.h \ rgw/rgw_xml.h \ rgw/rgw_basic_types.h \ diff --git a/src/rgw/rgw_cr_rados.cc b/src/rgw/rgw_cr_rados.cc new file mode 100644 index 0000000000000..d66265b6d6e43 --- /dev/null +++ b/src/rgw/rgw_cr_rados.cc @@ -0,0 +1,349 @@ +#include "rgw_rados.h" +#include "rgw_coroutine.h" +#include "rgw_cr_rados.h" + +#include "cls/lock/cls_lock_client.h" + +#include +#include + + +#define dout_subsys ceph_subsys_rgw + +bool RGWAsyncRadosProcessor::RGWWQ::_enqueue(RGWAsyncRadosRequest *req) { + processor->m_req_queue.push_back(req); + dout(20) << "enqueued request req=" << hex << req << dec << dendl; + _dump_queue(); + return true; +} + +bool RGWAsyncRadosProcessor::RGWWQ::_empty() { + return processor->m_req_queue.empty(); +} + +RGWAsyncRadosRequest *RGWAsyncRadosProcessor::RGWWQ::_dequeue() { + if (processor->m_req_queue.empty()) + return NULL; + RGWAsyncRadosRequest *req = processor->m_req_queue.front(); + processor->m_req_queue.pop_front(); + dout(20) << "dequeued request req=" << hex << req << dec << dendl; + _dump_queue(); + return req; +} + +void RGWAsyncRadosProcessor::RGWWQ::_process(RGWAsyncRadosRequest *req) { + processor->handle_request(req); + processor->req_throttle.put(1); +} + +void RGWAsyncRadosProcessor::RGWWQ::_dump_queue() { + if (!g_conf->subsys.should_gather(ceph_subsys_rgw, 20)) { + return; + } + deque::iterator iter; + if (processor->m_req_queue.empty()) { + dout(20) << "RGWWQ: empty" << dendl; + return; + } + dout(20) << "RGWWQ:" << dendl; + for (iter = processor->m_req_queue.begin(); iter != processor->m_req_queue.end(); ++iter) { + dout(20) << "req: " << hex << *iter << dec << dendl; + } +} + +RGWAsyncRadosProcessor::RGWAsyncRadosProcessor(RGWRados *_store, int num_threads) + : store(_store), m_tp(store->ctx(), "RGWAsyncRadosProcessor::m_tp", num_threads), + req_throttle(store->ctx(), "rgw_async_rados_ops", num_threads * 2), + req_wq(this, g_conf->rgw_op_thread_timeout, + g_conf->rgw_op_thread_suicide_timeout, &m_tp) { +} + +void RGWAsyncRadosProcessor::start() { + m_tp.start(); +} + +void RGWAsyncRadosProcessor::stop() { + m_tp.drain(&req_wq); + m_tp.stop(); +} + +void RGWAsyncRadosProcessor::handle_request(RGWAsyncRadosRequest *req) { + req->send_request(); +} + +void RGWAsyncRadosProcessor::queue(RGWAsyncRadosRequest *req) { + req_throttle.get(1); + req_wq.queue(req); +} + +int RGWAsyncGetSystemObj::_send_request() +{ + return store->get_system_obj(*obj_ctx, read_state, objv_tracker, obj, *pbl, ofs, end, NULL); +} + +RGWAsyncGetSystemObj::RGWAsyncGetSystemObj(RGWAioCompletionNotifier *cn, RGWRados *_store, RGWObjectCtx *_obj_ctx, + RGWObjVersionTracker *_objv_tracker, rgw_obj& _obj, + bufferlist *_pbl, off_t _ofs, off_t _end) : RGWAsyncRadosRequest(cn), store(_store), obj_ctx(_obj_ctx), + objv_tracker(_objv_tracker), obj(_obj), pbl(_pbl), + ofs(_ofs), end(_end) +{ +} + + +int RGWAsyncPutSystemObj::_send_request() +{ + return store->put_system_obj(NULL, obj, bl.c_str(), bl.length(), exclusive, + NULL, attrs, objv_tracker, mtime); +} + +RGWAsyncPutSystemObj::RGWAsyncPutSystemObj(RGWAioCompletionNotifier *cn, RGWRados *_store, + RGWObjVersionTracker *_objv_tracker, rgw_obj& _obj, bool _exclusive, + bufferlist& _bl, time_t _mtime) : RGWAsyncRadosRequest(cn), store(_store), + objv_tracker(_objv_tracker), obj(_obj), exclusive(_exclusive), + bl(_bl), mtime(_mtime) +{ +} + + +RGWOmapAppend::RGWOmapAppend(RGWAsyncRadosProcessor *_async_rados, RGWRados *_store, rgw_bucket& _pool, const string& _oid) + : RGWConsumerCR(_store->ctx()), async_rados(_async_rados), + store(_store), pool(_pool), oid(_oid), going_down(false), num_pending_entries(0) +{ +} + +int RGWAsyncLockSystemObj::_send_request() +{ + librados::IoCtx ioctx; + librados::Rados *rados = store->get_rados_handle(); + int r = rados->ioctx_create(obj.bucket.name.c_str(), ioctx); /* system object only! */ + if (r < 0) { + lderr(store->ctx()) << "ERROR: failed to open pool (" << obj.bucket.name << ") ret=" << r << dendl; + return r; + } + + rados::cls::lock::Lock l(lock_name); + utime_t duration(duration_secs, 0); + l.set_duration(duration); + l.set_cookie(cookie); + + return l.lock_exclusive(&ioctx, obj.get_object()); +} + +RGWAsyncLockSystemObj::RGWAsyncLockSystemObj(RGWAioCompletionNotifier *cn, RGWRados *_store, + RGWObjVersionTracker *_objv_tracker, rgw_obj& _obj, + const string& _name, const string& _cookie, uint32_t _duration_secs) : RGWAsyncRadosRequest(cn), store(_store), + obj(_obj), + lock_name(_name), + cookie(_cookie), + duration_secs(_duration_secs) +{ +} + +int RGWAsyncUnlockSystemObj::_send_request() +{ + librados::IoCtx ioctx; + librados::Rados *rados = store->get_rados_handle(); + int r = rados->ioctx_create(obj.bucket.name.c_str(), ioctx); /* system object only! */ + if (r < 0) { + lderr(store->ctx()) << "ERROR: failed to open pool (" << obj.bucket.name << ") ret=" << r << dendl; + return r; + } + + rados::cls::lock::Lock l(lock_name); + + l.set_cookie(cookie); + + return l.unlock(&ioctx, obj.get_object()); +} + +RGWAsyncUnlockSystemObj::RGWAsyncUnlockSystemObj(RGWAioCompletionNotifier *cn, RGWRados *_store, + RGWObjVersionTracker *_objv_tracker, rgw_obj& _obj, + const string& _name, const string& _cookie) : RGWAsyncRadosRequest(cn), store(_store), + obj(_obj), + lock_name(_name), cookie(_cookie) +{ +} + + +RGWRadosSetOmapKeysCR::RGWRadosSetOmapKeysCR(RGWRados *_store, + rgw_bucket& _pool, const string& _oid, + map& _entries) : RGWSimpleCoroutine(_store->ctx()), + store(_store), + entries(_entries), + pool(_pool), oid(_oid), cn(NULL) +{ +} + +RGWRadosSetOmapKeysCR::~RGWRadosSetOmapKeysCR() +{ + cn->put(); +} + +int RGWRadosSetOmapKeysCR::send_request() +{ + librados::IoCtx ioctx; + librados::Rados *rados = store->get_rados_handle(); + int r = rados->ioctx_create(pool.name.c_str(), ioctx); /* system object only! */ + if (r < 0) { + lderr(store->ctx()) << "ERROR: failed to open pool (" << pool.name << ") ret=" << r << dendl; + return r; + } + + librados::ObjectWriteOperation op; + op.omap_set(entries); + + cn = stack->create_completion_notifier(); + cn->get(); + return ioctx.aio_operate(oid, cn->completion(), &op); +} + +int RGWRadosSetOmapKeysCR::request_complete() +{ + return cn->completion()->get_return_value(); +} + +RGWRadosGetOmapKeysCR::RGWRadosGetOmapKeysCR(RGWRados *_store, + rgw_bucket& _pool, const string& _oid, + const string& _marker, + map *_entries, int _max_entries) : RGWSimpleCoroutine(_store->ctx()), + store(_store), + marker(_marker), + entries(_entries), max_entries(_max_entries), rval(0), + pool(_pool), oid(_oid), cn(NULL) +{ +} + +RGWRadosGetOmapKeysCR::~RGWRadosGetOmapKeysCR() +{ +} + +int RGWRadosGetOmapKeysCR::send_request() { + librados::Rados *rados = store->get_rados_handle(); + int r = rados->ioctx_create(pool.name.c_str(), ioctx); /* system object only! */ + if (r < 0) { + lderr(store->ctx()) << "ERROR: failed to open pool (" << pool.name << ") ret=" << r << dendl; + return r; + } + + librados::ObjectReadOperation op; + op.omap_get_vals(marker, max_entries, entries, &rval); + + cn = stack->create_completion_notifier(); + return ioctx.aio_operate(oid, cn->completion(), &op, NULL); +} + +RGWSimpleRadosLockCR::RGWSimpleRadosLockCR(RGWAsyncRadosProcessor *_async_rados, RGWRados *_store, + rgw_bucket& _pool, const string& _oid, const string& _lock_name, + const string& _cookie, + uint32_t _duration) : RGWSimpleCoroutine(_store->ctx()), + async_rados(_async_rados), + store(_store), + lock_name(_lock_name), + cookie(_cookie), + duration(_duration), + pool(_pool), oid(_oid), + req(NULL) +{ +} + +RGWSimpleRadosLockCR::~RGWSimpleRadosLockCR() +{ + delete req; +} + +int RGWSimpleRadosLockCR::send_request() +{ + rgw_obj obj = rgw_obj(pool, oid); + req = new RGWAsyncLockSystemObj(stack->create_completion_notifier(), + store, NULL, obj, lock_name, cookie, duration); + async_rados->queue(req); + return 0; +} + +int RGWSimpleRadosLockCR::request_complete() +{ + return req->get_ret_status(); +} + +RGWSimpleRadosUnlockCR::RGWSimpleRadosUnlockCR(RGWAsyncRadosProcessor *_async_rados, RGWRados *_store, + rgw_bucket& _pool, const string& _oid, const string& _lock_name, + const string& _cookie) : RGWSimpleCoroutine(_store->ctx()), + async_rados(_async_rados), + store(_store), + lock_name(_lock_name), + cookie(_cookie), + pool(_pool), oid(_oid), + req(NULL) +{ +} + +RGWSimpleRadosUnlockCR::~RGWSimpleRadosUnlockCR() +{ + delete req; +} + +int RGWSimpleRadosUnlockCR::send_request() +{ + rgw_obj obj = rgw_obj(pool, oid); + req = new RGWAsyncUnlockSystemObj(stack->create_completion_notifier(), + store, NULL, obj, lock_name, cookie); + async_rados->queue(req); + return 0; +} + +int RGWSimpleRadosUnlockCR::request_complete() +{ + return req->get_ret_status(); +} + + +#define OMAP_APPEND_MAX_ENTRIES 100 +int RGWOmapAppend::operate() { + reenter(this) { + for (;;) { + if (!has_product() && going_down) { + break; + } + yield wait_for_product(); + yield { + string entry; + while (consume(&entry)) { + entries[entry] = bufferlist(); + if (entries.size() >= OMAP_APPEND_MAX_ENTRIES) { + break; + } + } + if (entries.size() >= OMAP_APPEND_MAX_ENTRIES || going_down) { + call(new RGWRadosSetOmapKeysCR(store, pool, oid, entries)); + entries.clear(); + } + } + if (get_ret_status() < 0) { + return set_state(RGWCoroutine_Error); + } + } + /* done with coroutine */ + return set_state(RGWCoroutine_Done); + } + return 0; +} + +void RGWOmapAppend::flush_pending() { + receive(pending_entries); + num_pending_entries = 0; +} + +void RGWOmapAppend::append(const string& s) { + pending_entries.push_back(s); + if (++num_pending_entries >= OMAP_APPEND_MAX_ENTRIES) { + flush_pending(); + } +} + +void RGWOmapAppend::finish() { + going_down = true; + flush_pending(); + set_sleeping(false); +} + + diff --git a/src/rgw/rgw_cr_rados.h b/src/rgw/rgw_cr_rados.h new file mode 100644 index 0000000000000..ab1844da61c62 --- /dev/null +++ b/src/rgw/rgw_cr_rados.h @@ -0,0 +1,352 @@ +#ifndef CEPH_RGW_CR_RADOS_H +#define CEPH_RGW_CR_RADOS_H + +#include "rgw_coroutine.h" +#include "common/WorkQueue.h" +#include "common/Throttle.h" + +class RGWAsyncRadosRequest { + RGWAioCompletionNotifier *notifier; + + void *user_info; + int retcode; + +protected: + virtual int _send_request() = 0; +public: + RGWAsyncRadosRequest(RGWAioCompletionNotifier *_cn) : notifier(_cn) {} + virtual ~RGWAsyncRadosRequest() {} + + void send_request() { + retcode = _send_request(); + notifier->cb(); + } + + int get_ret_status() { return retcode; } +}; + + +class RGWAsyncRadosProcessor { + deque m_req_queue; +protected: + RGWRados *store; + ThreadPool m_tp; + Throttle req_throttle; + + struct RGWWQ : public ThreadPool::WorkQueue { + RGWAsyncRadosProcessor *processor; + RGWWQ(RGWAsyncRadosProcessor *p, time_t timeout, time_t suicide_timeout, ThreadPool *tp) + : ThreadPool::WorkQueue("RGWWQ", timeout, suicide_timeout, tp), processor(p) {} + + bool _enqueue(RGWAsyncRadosRequest *req); + void _dequeue(RGWAsyncRadosRequest *req) { + assert(0); + } + bool _empty(); + RGWAsyncRadosRequest *_dequeue(); + using ThreadPool::WorkQueue::_process; + void _process(RGWAsyncRadosRequest *req); + void _dump_queue(); + void _clear() { + assert(processor->m_req_queue.empty()); + } + } req_wq; + +public: + RGWAsyncRadosProcessor(RGWRados *_store, int num_threads); + ~RGWAsyncRadosProcessor() {} + void start(); + void stop(); + void handle_request(RGWAsyncRadosRequest *req); + void queue(RGWAsyncRadosRequest *req); +}; + + +class RGWAsyncGetSystemObj : public RGWAsyncRadosRequest { + RGWRados *store; + RGWObjectCtx *obj_ctx; + RGWRados::SystemObject::Read::GetObjState read_state; + RGWObjVersionTracker *objv_tracker; + rgw_obj obj; + bufferlist *pbl; + off_t ofs; + off_t end; +protected: + int _send_request(); +public: + RGWAsyncGetSystemObj(RGWAioCompletionNotifier *cn, RGWRados *_store, RGWObjectCtx *_obj_ctx, + RGWObjVersionTracker *_objv_tracker, rgw_obj& _obj, + bufferlist *_pbl, off_t _ofs, off_t _end); +}; + +class RGWAsyncPutSystemObj : public RGWAsyncRadosRequest { + RGWRados *store; + RGWObjVersionTracker *objv_tracker; + rgw_obj obj; + bool exclusive; + bufferlist bl; + map attrs; + time_t mtime; + +protected: + int _send_request(); +public: + RGWAsyncPutSystemObj(RGWAioCompletionNotifier *cn, RGWRados *_store, + RGWObjVersionTracker *_objv_tracker, rgw_obj& _obj, bool _exclusive, + bufferlist& _bl, time_t _mtime = 0); +}; + +class RGWAsyncLockSystemObj : public RGWAsyncRadosRequest { + RGWRados *store; + rgw_obj obj; + string lock_name; + string cookie; + uint32_t duration_secs; + +protected: + int _send_request(); +public: + RGWAsyncLockSystemObj(RGWAioCompletionNotifier *cn, RGWRados *_store, + RGWObjVersionTracker *_objv_tracker, rgw_obj& _obj, + const string& _name, const string& _cookie, uint32_t _duration_secs); +}; + +class RGWAsyncUnlockSystemObj : public RGWAsyncRadosRequest { + RGWRados *store; + rgw_obj obj; + string lock_name; + string cookie; + +protected: + int _send_request(); +public: + RGWAsyncUnlockSystemObj(RGWAioCompletionNotifier *cn, RGWRados *_store, + RGWObjVersionTracker *_objv_tracker, rgw_obj& _obj, + const string& _name, const string& _cookie); +}; + + +template +class RGWSimpleRadosReadCR : public RGWSimpleCoroutine { + RGWAsyncRadosProcessor *async_rados; + RGWRados *store; + RGWObjectCtx& obj_ctx; + bufferlist bl; + + rgw_bucket pool; + string oid; + + T *result; + + RGWAsyncGetSystemObj *req; + +public: + RGWSimpleRadosReadCR(RGWAsyncRadosProcessor *_async_rados, RGWRados *_store, + RGWObjectCtx& _obj_ctx, + rgw_bucket& _pool, const string& _oid, + T *_result) : RGWSimpleCoroutine(_store->ctx()), + async_rados(_async_rados), store(_store), + obj_ctx(_obj_ctx), + pool(_pool), oid(_oid), + result(_result), + req(NULL) { } + + ~RGWSimpleRadosReadCR() { + delete req; + } + + int send_request(); + int request_complete(); + + virtual int handle_data(T& data) { + return 0; + } +}; + +template +int RGWSimpleRadosReadCR::send_request() +{ + rgw_obj obj = rgw_obj(pool, oid); + req = new RGWAsyncGetSystemObj(stack->create_completion_notifier(), + store, &obj_ctx, NULL, + obj, + &bl, 0, -1); + async_rados->queue(req); + return 0; +} + +template +int RGWSimpleRadosReadCR::request_complete() +{ + int ret = req->get_ret_status(); + retcode = ret; + if (ret != -ENOENT) { + if (ret < 0) { + return ret; + } + bufferlist::iterator iter = bl.begin(); + try { + ::decode(*result, iter); + } catch (buffer::error& err) { + return -EIO; + } + } else { + *result = T(); + } + + return handle_data(*result); +} + +template +class RGWSimpleRadosWriteCR : public RGWSimpleCoroutine { + RGWAsyncRadosProcessor *async_rados; + RGWRados *store; + bufferlist bl; + + rgw_bucket pool; + string oid; + + RGWAsyncPutSystemObj *req; + +public: + RGWSimpleRadosWriteCR(RGWAsyncRadosProcessor *_async_rados, RGWRados *_store, + rgw_bucket& _pool, const string& _oid, + const T& _data) : RGWSimpleCoroutine(_store->ctx()), + async_rados(_async_rados), + store(_store), + pool(_pool), oid(_oid), + req(NULL) { + ::encode(_data, bl); + } + + ~RGWSimpleRadosWriteCR() { + delete req; + } + + int send_request() { + rgw_obj obj = rgw_obj(pool, oid); + req = new RGWAsyncPutSystemObj(stack->create_completion_notifier(), + store, NULL, obj, false, bl); + async_rados->queue(req); + return 0; + } + + int request_complete() { + return req->get_ret_status(); + } +}; + +class RGWRadosSetOmapKeysCR : public RGWSimpleCoroutine { + RGWRados *store; + map entries; + + rgw_bucket pool; + string oid; + + RGWAioCompletionNotifier *cn; + +public: + RGWRadosSetOmapKeysCR(RGWRados *_store, + rgw_bucket& _pool, const string& _oid, + map& _entries); + ~RGWRadosSetOmapKeysCR(); + + int send_request(); + int request_complete(); +}; + +class RGWRadosGetOmapKeysCR : public RGWSimpleCoroutine { + RGWRados *store; + + string marker; + map *entries; + int max_entries; + + int rval; + librados::IoCtx ioctx; + + rgw_bucket pool; + string oid; + + RGWAioCompletionNotifier *cn; + +public: + RGWRadosGetOmapKeysCR(RGWRados *_store, + rgw_bucket& _pool, const string& _oid, + const string& _marker, + map *_entries, int _max_entries); + ~RGWRadosGetOmapKeysCR(); + + int send_request(); + + int request_complete() { + return rval; + } +}; + +class RGWSimpleRadosLockCR : public RGWSimpleCoroutine { + RGWAsyncRadosProcessor *async_rados; + RGWRados *store; + string lock_name; + string cookie; + uint32_t duration; + + rgw_bucket pool; + string oid; + + RGWAsyncLockSystemObj *req; + +public: + RGWSimpleRadosLockCR(RGWAsyncRadosProcessor *_async_rados, RGWRados *_store, + rgw_bucket& _pool, const string& _oid, const string& _lock_name, + const string& _cookie, + uint32_t _duration); + ~RGWSimpleRadosLockCR(); + + int send_request(); + int request_complete(); +}; + +class RGWSimpleRadosUnlockCR : public RGWSimpleCoroutine { + RGWAsyncRadosProcessor *async_rados; + RGWRados *store; + string lock_name; + string cookie; + + rgw_bucket pool; + string oid; + + RGWAsyncUnlockSystemObj *req; + +public: + RGWSimpleRadosUnlockCR(RGWAsyncRadosProcessor *_async_rados, RGWRados *_store, + rgw_bucket& _pool, const string& _oid, const string& _lock_name, + const string& _cookie); + ~RGWSimpleRadosUnlockCR(); + + int send_request(); + int request_complete(); +}; + +class RGWOmapAppend : public RGWConsumerCR { + RGWAsyncRadosProcessor *async_rados; + RGWRados *store; + + rgw_bucket pool; + string oid; + + bool going_down; + + int num_pending_entries; + list pending_entries; + + map entries; +public: + RGWOmapAppend(RGWAsyncRadosProcessor *_async_rados, RGWRados *_store, rgw_bucket& _pool, const string& _oid); + int operate(); + void flush_pending(); + void append(const string& s); + void finish(); +}; + +#endif diff --git a/src/rgw/rgw_sync.cc b/src/rgw/rgw_sync.cc index 26769f3bfb305..15f72d97fc76f 100644 --- a/src/rgw/rgw_sync.cc +++ b/src/rgw/rgw_sync.cc @@ -10,6 +10,7 @@ #include "rgw_metadata.h" #include "rgw_rest_conn.h" #include "rgw_tools.h" +#include "rgw_cr_rados.h" #include "cls/lock/cls_lock_client.h" @@ -60,217 +61,6 @@ void rgw_mdlog_shard_data::decode_json(JSONObj *obj) { JSONDecoder::decode_json("entries", entries, obj); }; -class RGWAsyncRadosRequest { - RGWAioCompletionNotifier *notifier; - - void *user_info; - int retcode; - -protected: - virtual int _send_request() = 0; -public: - RGWAsyncRadosRequest(RGWAioCompletionNotifier *_cn) : notifier(_cn) {} - virtual ~RGWAsyncRadosRequest() {} - - void send_request() { - retcode = _send_request(); - notifier->cb(); - } - - int get_ret_status() { return retcode; } -}; - - -class RGWAsyncGetSystemObj : public RGWAsyncRadosRequest { - RGWRados *store; - RGWObjectCtx *obj_ctx; - RGWRados::SystemObject::Read::GetObjState read_state; - RGWObjVersionTracker *objv_tracker; - rgw_obj obj; - bufferlist *pbl; - off_t ofs; - off_t end; -protected: - int _send_request() { - return store->get_system_obj(*obj_ctx, read_state, objv_tracker, obj, *pbl, ofs, end, NULL); - } -public: - RGWAsyncGetSystemObj(RGWAioCompletionNotifier *cn, RGWRados *_store, RGWObjectCtx *_obj_ctx, - RGWObjVersionTracker *_objv_tracker, rgw_obj& _obj, - bufferlist *_pbl, off_t _ofs, off_t _end) : RGWAsyncRadosRequest(cn), store(_store), obj_ctx(_obj_ctx), - objv_tracker(_objv_tracker), obj(_obj), pbl(_pbl), - ofs(_ofs), end(_end) { - } -}; - -class RGWAsyncPutSystemObj : public RGWAsyncRadosRequest { - RGWRados *store; - RGWObjVersionTracker *objv_tracker; - rgw_obj obj; - bool exclusive; - bufferlist bl; - map attrs; - time_t mtime; - -protected: - int _send_request() { - return store->put_system_obj(NULL, obj, bl.c_str(), bl.length(), exclusive, - NULL, attrs, objv_tracker, mtime); - } -public: - RGWAsyncPutSystemObj(RGWAioCompletionNotifier *cn, RGWRados *_store, - RGWObjVersionTracker *_objv_tracker, rgw_obj& _obj, bool _exclusive, - bufferlist& _bl, time_t _mtime = 0) : RGWAsyncRadosRequest(cn), store(_store), - objv_tracker(_objv_tracker), obj(_obj), exclusive(_exclusive), - bl(_bl), mtime(_mtime) {} -}; - -class RGWAsyncLockSystemObj : public RGWAsyncRadosRequest { - RGWRados *store; - rgw_obj obj; - string lock_name; - string cookie; - uint32_t duration_secs; - -protected: - int _send_request() { - librados::IoCtx ioctx; - librados::Rados *rados = store->get_rados_handle(); - int r = rados->ioctx_create(obj.bucket.name.c_str(), ioctx); /* system object only! */ - if (r < 0) { - lderr(store->ctx()) << "ERROR: failed to open pool (" << obj.bucket.name << ") ret=" << r << dendl; - return r; - } - - rados::cls::lock::Lock l(lock_name); - utime_t duration(duration_secs, 0); - l.set_duration(duration); - l.set_cookie(cookie); - - return l.lock_exclusive(&ioctx, obj.get_object()); - } -public: - RGWAsyncLockSystemObj(RGWAioCompletionNotifier *cn, RGWRados *_store, - RGWObjVersionTracker *_objv_tracker, rgw_obj& _obj, - const string& _name, const string& _cookie, uint32_t _duration_secs) : RGWAsyncRadosRequest(cn), store(_store), - obj(_obj), - lock_name(_name), - cookie(_cookie), - duration_secs(_duration_secs) {} -}; - -class RGWAsyncUnlockSystemObj : public RGWAsyncRadosRequest { - RGWRados *store; - rgw_obj obj; - string lock_name; - string cookie; - -protected: - int _send_request() { - librados::IoCtx ioctx; - librados::Rados *rados = store->get_rados_handle(); - int r = rados->ioctx_create(obj.bucket.name.c_str(), ioctx); /* system object only! */ - if (r < 0) { - lderr(store->ctx()) << "ERROR: failed to open pool (" << obj.bucket.name << ") ret=" << r << dendl; - return r; - } - - rados::cls::lock::Lock l(lock_name); - - l.set_cookie(cookie); - - return l.unlock(&ioctx, obj.get_object()); - } -public: - RGWAsyncUnlockSystemObj(RGWAioCompletionNotifier *cn, RGWRados *_store, - RGWObjVersionTracker *_objv_tracker, rgw_obj& _obj, - const string& _name, const string& _cookie) : RGWAsyncRadosRequest(cn), store(_store), - obj(_obj), - lock_name(_name), cookie(_cookie) {} -}; - - -class RGWAsyncRadosProcessor { - deque m_req_queue; -protected: - RGWRados *store; - ThreadPool m_tp; - Throttle req_throttle; - - struct RGWWQ : public ThreadPool::WorkQueue { - RGWAsyncRadosProcessor *processor; - RGWWQ(RGWAsyncRadosProcessor *p, time_t timeout, time_t suicide_timeout, ThreadPool *tp) - : ThreadPool::WorkQueue("RGWWQ", timeout, suicide_timeout, tp), processor(p) {} - - bool _enqueue(RGWAsyncRadosRequest *req) { - processor->m_req_queue.push_back(req); - dout(20) << "enqueued request req=" << hex << req << dec << dendl; - _dump_queue(); - return true; - } - void _dequeue(RGWAsyncRadosRequest *req) { - assert(0); - } - bool _empty() { - return processor->m_req_queue.empty(); - } - RGWAsyncRadosRequest *_dequeue() { - if (processor->m_req_queue.empty()) - return NULL; - RGWAsyncRadosRequest *req = processor->m_req_queue.front(); - processor->m_req_queue.pop_front(); - dout(20) << "dequeued request req=" << hex << req << dec << dendl; - _dump_queue(); - return req; - } - using ThreadPool::WorkQueue::_process; - void _process(RGWAsyncRadosRequest *req) { - processor->handle_request(req); - processor->req_throttle.put(1); - } - void _dump_queue() { - if (!g_conf->subsys.should_gather(ceph_subsys_rgw, 20)) { - return; - } - deque::iterator iter; - if (processor->m_req_queue.empty()) { - dout(20) << "RGWWQ: empty" << dendl; - return; - } - dout(20) << "RGWWQ:" << dendl; - for (iter = processor->m_req_queue.begin(); iter != processor->m_req_queue.end(); ++iter) { - dout(20) << "req: " << hex << *iter << dec << dendl; - } - } - void _clear() { - assert(processor->m_req_queue.empty()); - } - } req_wq; - -public: - RGWAsyncRadosProcessor(RGWRados *_store, int num_threads) - : store(_store), m_tp(store->ctx(), "RGWAsyncRadosProcessor::m_tp", num_threads), - req_throttle(store->ctx(), "rgw_async_rados_ops", num_threads * 2), - req_wq(this, g_conf->rgw_op_thread_timeout, - g_conf->rgw_op_thread_suicide_timeout, &m_tp) {} - ~RGWAsyncRadosProcessor() {} - void start() { - m_tp.start(); - } - void stop() { - m_tp.drain(&req_wq); - m_tp.stop(); - } - void handle_request(RGWAsyncRadosRequest *req) { - req->send_request(); - } - - void queue(RGWAsyncRadosRequest *req) { - req_throttle.get(1); - req_wq.queue(req); - } -}; - int RGWRemoteMetaLog::read_log_info(rgw_mdlog_info *log_info) { rgw_http_param_pair pairs[] = { { "type", "metadata" }, @@ -437,293 +227,6 @@ string RGWMetaSyncStatusManager::shard_obj_name(int shard_id) return string(buf); } -template -class RGWSimpleRadosReadCR : public RGWSimpleCoroutine { - RGWAsyncRadosProcessor *async_rados; - RGWRados *store; - RGWObjectCtx& obj_ctx; - bufferlist bl; - - rgw_bucket pool; - string oid; - - T *result; - - RGWAsyncGetSystemObj *req; - -public: - RGWSimpleRadosReadCR(RGWAsyncRadosProcessor *_async_rados, RGWRados *_store, - RGWObjectCtx& _obj_ctx, - rgw_bucket& _pool, const string& _oid, - T *_result) : RGWSimpleCoroutine(_store->ctx()), - async_rados(_async_rados), store(_store), - obj_ctx(_obj_ctx), - pool(_pool), oid(_oid), - result(_result), - req(NULL) { } - - ~RGWSimpleRadosReadCR() { - delete req; - } - - int send_request(); - int request_complete(); - - virtual int handle_data(T& data) { - return 0; - } -}; - -template -int RGWSimpleRadosReadCR::send_request() -{ - rgw_obj obj = rgw_obj(pool, oid); - req = new RGWAsyncGetSystemObj(stack->create_completion_notifier(), - store, &obj_ctx, NULL, - obj, - &bl, 0, -1); - async_rados->queue(req); - return 0; -} - -template -int RGWSimpleRadosReadCR::request_complete() -{ - int ret = req->get_ret_status(); - retcode = ret; - if (ret != -ENOENT) { - if (ret < 0) { - return ret; - } - bufferlist::iterator iter = bl.begin(); - try { - ::decode(*result, iter); - } catch (buffer::error& err) { - ldout(store->ctx(), 0) << "ERROR: failed to decode global mdlog status" << dendl; - } - } else { - *result = T(); - } - - return handle_data(*result); -} - -template -class RGWSimpleRadosWriteCR : public RGWSimpleCoroutine { - RGWAsyncRadosProcessor *async_rados; - RGWRados *store; - bufferlist bl; - - rgw_bucket pool; - string oid; - - RGWAsyncPutSystemObj *req; - -public: - RGWSimpleRadosWriteCR(RGWAsyncRadosProcessor *_async_rados, RGWRados *_store, - rgw_bucket& _pool, const string& _oid, - const T& _data) : RGWSimpleCoroutine(_store->ctx()), - async_rados(_async_rados), - store(_store), - pool(_pool), oid(_oid), - req(NULL) { - ::encode(_data, bl); - } - - ~RGWSimpleRadosWriteCR() { - delete req; - } - - int send_request() { - rgw_obj obj = rgw_obj(pool, oid); - req = new RGWAsyncPutSystemObj(stack->create_completion_notifier(), - store, NULL, obj, false, bl); - async_rados->queue(req); - return 0; - } - - int request_complete() { - return req->get_ret_status(); - } -}; - -class RGWRadosSetOmapKeysCR : public RGWSimpleCoroutine { - RGWRados *store; - map entries; - - rgw_bucket pool; - string oid; - - RGWAioCompletionNotifier *cn; - -public: - RGWRadosSetOmapKeysCR(RGWRados *_store, - rgw_bucket& _pool, const string& _oid, - map& _entries) : RGWSimpleCoroutine(_store->ctx()), - store(_store), - entries(_entries), - pool(_pool), oid(_oid), cn(NULL) { - } - - ~RGWRadosSetOmapKeysCR() { - cn->put(); - } - - int send_request() { - librados::IoCtx ioctx; - librados::Rados *rados = store->get_rados_handle(); - int r = rados->ioctx_create(pool.name.c_str(), ioctx); /* system object only! */ - if (r < 0) { - lderr(store->ctx()) << "ERROR: failed to open pool (" << pool.name << ") ret=" << r << dendl; - return r; - } - - librados::ObjectWriteOperation op; - op.omap_set(entries); - - cn = stack->create_completion_notifier(); - cn->get(); - return ioctx.aio_operate(oid, cn->completion(), &op); - } - - int request_complete() { - return cn->completion()->get_return_value(); - - } -}; - -class RGWRadosGetOmapKeysCR : public RGWSimpleCoroutine { - RGWRados *store; - - string marker; - map *entries; - int max_entries; - - int rval; - librados::IoCtx ioctx; - - rgw_bucket pool; - string oid; - - RGWAioCompletionNotifier *cn; - -public: - RGWRadosGetOmapKeysCR(RGWRados *_store, - rgw_bucket& _pool, const string& _oid, - const string& _marker, - map *_entries, int _max_entries) : RGWSimpleCoroutine(_store->ctx()), - store(_store), - marker(_marker), - entries(_entries), max_entries(_max_entries), rval(0), - pool(_pool), oid(_oid), cn(NULL) { - } - - ~RGWRadosGetOmapKeysCR() { - } - - int send_request() { - librados::Rados *rados = store->get_rados_handle(); - int r = rados->ioctx_create(pool.name.c_str(), ioctx); /* system object only! */ - if (r < 0) { - lderr(store->ctx()) << "ERROR: failed to open pool (" << pool.name << ") ret=" << r << dendl; - return r; - } - - librados::ObjectReadOperation op; - op.omap_get_vals(marker, max_entries, entries, &rval); - - cn = stack->create_completion_notifier(); - return ioctx.aio_operate(oid, cn->completion(), &op, NULL); - } - - int request_complete() { - return rval; - } -}; - -class RGWSimpleRadosLockCR : public RGWSimpleCoroutine { - RGWAsyncRadosProcessor *async_rados; - RGWRados *store; - string lock_name; - string cookie; - uint32_t duration; - - rgw_bucket pool; - string oid; - - RGWAsyncLockSystemObj *req; - -public: - RGWSimpleRadosLockCR(RGWAsyncRadosProcessor *_async_rados, RGWRados *_store, - rgw_bucket& _pool, const string& _oid, const string& _lock_name, - const string& _cookie, - uint32_t _duration) : RGWSimpleCoroutine(_store->ctx()), - async_rados(_async_rados), - store(_store), - lock_name(_lock_name), - cookie(_cookie), - duration(_duration), - pool(_pool), oid(_oid), - req(NULL) { - } - - ~RGWSimpleRadosLockCR() { - delete req; - } - - int send_request() { - rgw_obj obj = rgw_obj(pool, oid); - req = new RGWAsyncLockSystemObj(stack->create_completion_notifier(), - store, NULL, obj, lock_name, cookie, duration); - async_rados->queue(req); - return 0; - } - - int request_complete() { - return req->get_ret_status(); - } -}; - -class RGWSimpleRadosUnlockCR : public RGWSimpleCoroutine { - RGWAsyncRadosProcessor *async_rados; - RGWRados *store; - string lock_name; - string cookie; - - rgw_bucket pool; - string oid; - - RGWAsyncUnlockSystemObj *req; - -public: - RGWSimpleRadosUnlockCR(RGWAsyncRadosProcessor *_async_rados, RGWRados *_store, - rgw_bucket& _pool, const string& _oid, const string& _lock_name, - const string& _cookie) : RGWSimpleCoroutine(_store->ctx()), - async_rados(_async_rados), - store(_store), - lock_name(_lock_name), - cookie(_cookie), - pool(_pool), oid(_oid), - req(NULL) { - } - - ~RGWSimpleRadosUnlockCR() { - delete req; - } - - int send_request() { - rgw_obj obj = rgw_obj(pool, oid); - req = new RGWAsyncUnlockSystemObj(stack->create_completion_notifier(), - store, NULL, obj, lock_name, cookie); - async_rados->queue(req); - return 0; - } - - int request_complete() { - return req->get_ret_status(); - } -}; - class RGWReadMDLogShardInfo : public RGWSimpleCoroutine { RGWRados *store; RGWMetadataLog *mdlog; @@ -1108,75 +611,6 @@ int RGWReadSyncStatusCoroutine::handle_data(rgw_meta_sync_info& data) return 0; } -class RGWOmapAppend : public RGWConsumerCR { - RGWAsyncRadosProcessor *async_rados; - RGWRados *store; - - rgw_bucket pool; - string oid; - - bool going_down; - - int num_pending_entries; - list pending_entries; - - map entries; -public: - - RGWOmapAppend(RGWAsyncRadosProcessor *_async_rados, RGWRados *_store, rgw_bucket& _pool, const string& _oid) - : RGWConsumerCR(_store->ctx()), async_rados(_async_rados), - store(_store), pool(_pool), oid(_oid), going_down(false), num_pending_entries(0) {} - -#define OMAP_APPEND_MAX_ENTRIES 100 - int operate() { - reenter(this) { - for (;;) { - if (!has_product() && going_down) { - break; - } - yield wait_for_product(); - yield { - string entry; - while (consume(&entry)) { - entries[entry] = bufferlist(); - if (entries.size() >= OMAP_APPEND_MAX_ENTRIES) { - break; - } - } - if (entries.size() >= OMAP_APPEND_MAX_ENTRIES || going_down) { - call(new RGWRadosSetOmapKeysCR(store, pool, oid, entries)); - entries.clear(); - } - } - if (get_ret_status() < 0) { - return set_state(RGWCoroutine_Error); - } - } - /* done with coroutine */ - return set_state(RGWCoroutine_Done); - } - return 0; - } - - void flush_pending() { - receive(pending_entries); - num_pending_entries = 0; - } - - void append(const string& s) { - pending_entries.push_back(s); - if (++num_pending_entries >= OMAP_APPEND_MAX_ENTRIES) { - flush_pending(); - } - } - - void finish() { - going_down = true; - flush_pending(); - set_sleeping(false); - } -}; - class RGWShardedOmapCRManager { RGWAsyncRadosProcessor *async_rados; RGWRados *store; -- 2.39.5