rgw/rgw_sync.cc
rgw/rgw_dencoder.cc
rgw/rgw_coroutine.cc
- rgw/rgw_object_expirer_core.cc
+ rgw/rgw_cr_rados.cc
rgw/rgw_object_expirer_core.cc
rgw/rgw_website.cc
rgw/rgw_xml_enc.cc)
rgw/rgw_acl_swift.cc \
rgw/rgw_client_io.cc \
rgw/rgw_coroutine.cc \
+ rgw/rgw_cr_rados.cc \
rgw/rgw_fcgi.cc \
rgw/rgw_xml.cc \
rgw/rgw_usage.cc \
rgw/rgw_acl_swift.h \
rgw/rgw_client_io.h \
rgw/rgw_coroutine.h \
+ rgw/rgw_cr_rados.h \
rgw/rgw_fcgi.h \
rgw/rgw_xml.h \
rgw/rgw_basic_types.h \
--- /dev/null
+#include "rgw_rados.h"
+#include "rgw_coroutine.h"
+#include "rgw_cr_rados.h"
+
+#include "cls/lock/cls_lock_client.h"
+
+#include <boost/asio/coroutine.hpp>
+#include <boost/asio/yield.hpp>
+
+
+#define dout_subsys ceph_subsys_rgw
+
+bool RGWAsyncRadosProcessor::RGWWQ::_enqueue(RGWAsyncRadosRequest *req) {
+ processor->m_req_queue.push_back(req);
+ dout(20) << "enqueued request req=" << hex << req << dec << dendl;
+ _dump_queue();
+ return true;
+}
+
+bool RGWAsyncRadosProcessor::RGWWQ::_empty() {
+ return processor->m_req_queue.empty();
+}
+
+RGWAsyncRadosRequest *RGWAsyncRadosProcessor::RGWWQ::_dequeue() {
+ if (processor->m_req_queue.empty())
+ return NULL;
+ RGWAsyncRadosRequest *req = processor->m_req_queue.front();
+ processor->m_req_queue.pop_front();
+ dout(20) << "dequeued request req=" << hex << req << dec << dendl;
+ _dump_queue();
+ return req;
+}
+
+void RGWAsyncRadosProcessor::RGWWQ::_process(RGWAsyncRadosRequest *req) {
+ processor->handle_request(req);
+ processor->req_throttle.put(1);
+}
+
+void RGWAsyncRadosProcessor::RGWWQ::_dump_queue() {
+ if (!g_conf->subsys.should_gather(ceph_subsys_rgw, 20)) {
+ return;
+ }
+ deque<RGWAsyncRadosRequest *>::iterator iter;
+ if (processor->m_req_queue.empty()) {
+ dout(20) << "RGWWQ: empty" << dendl;
+ return;
+ }
+ dout(20) << "RGWWQ:" << dendl;
+ for (iter = processor->m_req_queue.begin(); iter != processor->m_req_queue.end(); ++iter) {
+ dout(20) << "req: " << hex << *iter << dec << dendl;
+ }
+}
+
+RGWAsyncRadosProcessor::RGWAsyncRadosProcessor(RGWRados *_store, int num_threads)
+ : store(_store), m_tp(store->ctx(), "RGWAsyncRadosProcessor::m_tp", num_threads),
+ req_throttle(store->ctx(), "rgw_async_rados_ops", num_threads * 2),
+ req_wq(this, g_conf->rgw_op_thread_timeout,
+ g_conf->rgw_op_thread_suicide_timeout, &m_tp) {
+}
+
+void RGWAsyncRadosProcessor::start() {
+ m_tp.start();
+}
+
+void RGWAsyncRadosProcessor::stop() {
+ m_tp.drain(&req_wq);
+ m_tp.stop();
+}
+
+void RGWAsyncRadosProcessor::handle_request(RGWAsyncRadosRequest *req) {
+ req->send_request();
+}
+
+void RGWAsyncRadosProcessor::queue(RGWAsyncRadosRequest *req) {
+ req_throttle.get(1);
+ req_wq.queue(req);
+}
+
+int RGWAsyncGetSystemObj::_send_request()
+{
+ return store->get_system_obj(*obj_ctx, read_state, objv_tracker, obj, *pbl, ofs, end, NULL);
+}
+
+RGWAsyncGetSystemObj::RGWAsyncGetSystemObj(RGWAioCompletionNotifier *cn, RGWRados *_store, RGWObjectCtx *_obj_ctx,
+ RGWObjVersionTracker *_objv_tracker, rgw_obj& _obj,
+ bufferlist *_pbl, off_t _ofs, off_t _end) : RGWAsyncRadosRequest(cn), store(_store), obj_ctx(_obj_ctx),
+ objv_tracker(_objv_tracker), obj(_obj), pbl(_pbl),
+ ofs(_ofs), end(_end)
+{
+}
+
+
+int RGWAsyncPutSystemObj::_send_request()
+{
+ return store->put_system_obj(NULL, obj, bl.c_str(), bl.length(), exclusive,
+ NULL, attrs, objv_tracker, mtime);
+}
+
+RGWAsyncPutSystemObj::RGWAsyncPutSystemObj(RGWAioCompletionNotifier *cn, RGWRados *_store,
+ RGWObjVersionTracker *_objv_tracker, rgw_obj& _obj, bool _exclusive,
+ bufferlist& _bl, time_t _mtime) : RGWAsyncRadosRequest(cn), store(_store),
+ objv_tracker(_objv_tracker), obj(_obj), exclusive(_exclusive),
+ bl(_bl), mtime(_mtime)
+{
+}
+
+
+RGWOmapAppend::RGWOmapAppend(RGWAsyncRadosProcessor *_async_rados, RGWRados *_store, rgw_bucket& _pool, const string& _oid)
+ : RGWConsumerCR<string>(_store->ctx()), async_rados(_async_rados),
+ store(_store), pool(_pool), oid(_oid), going_down(false), num_pending_entries(0)
+{
+}
+
+int RGWAsyncLockSystemObj::_send_request()
+{
+ librados::IoCtx ioctx;
+ librados::Rados *rados = store->get_rados_handle();
+ int r = rados->ioctx_create(obj.bucket.name.c_str(), ioctx); /* system object only! */
+ if (r < 0) {
+ lderr(store->ctx()) << "ERROR: failed to open pool (" << obj.bucket.name << ") ret=" << r << dendl;
+ return r;
+ }
+
+ rados::cls::lock::Lock l(lock_name);
+ utime_t duration(duration_secs, 0);
+ l.set_duration(duration);
+ l.set_cookie(cookie);
+
+ return l.lock_exclusive(&ioctx, obj.get_object());
+}
+
+RGWAsyncLockSystemObj::RGWAsyncLockSystemObj(RGWAioCompletionNotifier *cn, RGWRados *_store,
+ RGWObjVersionTracker *_objv_tracker, rgw_obj& _obj,
+ const string& _name, const string& _cookie, uint32_t _duration_secs) : RGWAsyncRadosRequest(cn), store(_store),
+ obj(_obj),
+ lock_name(_name),
+ cookie(_cookie),
+ duration_secs(_duration_secs)
+{
+}
+
+int RGWAsyncUnlockSystemObj::_send_request()
+{
+ librados::IoCtx ioctx;
+ librados::Rados *rados = store->get_rados_handle();
+ int r = rados->ioctx_create(obj.bucket.name.c_str(), ioctx); /* system object only! */
+ if (r < 0) {
+ lderr(store->ctx()) << "ERROR: failed to open pool (" << obj.bucket.name << ") ret=" << r << dendl;
+ return r;
+ }
+
+ rados::cls::lock::Lock l(lock_name);
+
+ l.set_cookie(cookie);
+
+ return l.unlock(&ioctx, obj.get_object());
+}
+
+RGWAsyncUnlockSystemObj::RGWAsyncUnlockSystemObj(RGWAioCompletionNotifier *cn, RGWRados *_store,
+ RGWObjVersionTracker *_objv_tracker, rgw_obj& _obj,
+ const string& _name, const string& _cookie) : RGWAsyncRadosRequest(cn), store(_store),
+ obj(_obj),
+ lock_name(_name), cookie(_cookie)
+{
+}
+
+
+RGWRadosSetOmapKeysCR::RGWRadosSetOmapKeysCR(RGWRados *_store,
+ rgw_bucket& _pool, const string& _oid,
+ map<string, bufferlist>& _entries) : RGWSimpleCoroutine(_store->ctx()),
+ store(_store),
+ entries(_entries),
+ pool(_pool), oid(_oid), cn(NULL)
+{
+}
+
+RGWRadosSetOmapKeysCR::~RGWRadosSetOmapKeysCR()
+{
+ cn->put();
+}
+
+int RGWRadosSetOmapKeysCR::send_request()
+{
+ librados::IoCtx ioctx;
+ librados::Rados *rados = store->get_rados_handle();
+ int r = rados->ioctx_create(pool.name.c_str(), ioctx); /* system object only! */
+ if (r < 0) {
+ lderr(store->ctx()) << "ERROR: failed to open pool (" << pool.name << ") ret=" << r << dendl;
+ return r;
+ }
+
+ librados::ObjectWriteOperation op;
+ op.omap_set(entries);
+
+ cn = stack->create_completion_notifier();
+ cn->get();
+ return ioctx.aio_operate(oid, cn->completion(), &op);
+}
+
+int RGWRadosSetOmapKeysCR::request_complete()
+{
+ return cn->completion()->get_return_value();
+}
+
+RGWRadosGetOmapKeysCR::RGWRadosGetOmapKeysCR(RGWRados *_store,
+ rgw_bucket& _pool, const string& _oid,
+ const string& _marker,
+ map<string, bufferlist> *_entries, int _max_entries) : RGWSimpleCoroutine(_store->ctx()),
+ store(_store),
+ marker(_marker),
+ entries(_entries), max_entries(_max_entries), rval(0),
+ pool(_pool), oid(_oid), cn(NULL)
+{
+}
+
+RGWRadosGetOmapKeysCR::~RGWRadosGetOmapKeysCR()
+{
+}
+
+int RGWRadosGetOmapKeysCR::send_request() {
+ librados::Rados *rados = store->get_rados_handle();
+ int r = rados->ioctx_create(pool.name.c_str(), ioctx); /* system object only! */
+ if (r < 0) {
+ lderr(store->ctx()) << "ERROR: failed to open pool (" << pool.name << ") ret=" << r << dendl;
+ return r;
+ }
+
+ librados::ObjectReadOperation op;
+ op.omap_get_vals(marker, max_entries, entries, &rval);
+
+ cn = stack->create_completion_notifier();
+ return ioctx.aio_operate(oid, cn->completion(), &op, NULL);
+}
+
+RGWSimpleRadosLockCR::RGWSimpleRadosLockCR(RGWAsyncRadosProcessor *_async_rados, RGWRados *_store,
+ rgw_bucket& _pool, const string& _oid, const string& _lock_name,
+ const string& _cookie,
+ uint32_t _duration) : RGWSimpleCoroutine(_store->ctx()),
+ async_rados(_async_rados),
+ store(_store),
+ lock_name(_lock_name),
+ cookie(_cookie),
+ duration(_duration),
+ pool(_pool), oid(_oid),
+ req(NULL)
+{
+}
+
+RGWSimpleRadosLockCR::~RGWSimpleRadosLockCR()
+{
+ delete req;
+}
+
+int RGWSimpleRadosLockCR::send_request()
+{
+ rgw_obj obj = rgw_obj(pool, oid);
+ req = new RGWAsyncLockSystemObj(stack->create_completion_notifier(),
+ store, NULL, obj, lock_name, cookie, duration);
+ async_rados->queue(req);
+ return 0;
+}
+
+int RGWSimpleRadosLockCR::request_complete()
+{
+ return req->get_ret_status();
+}
+
+RGWSimpleRadosUnlockCR::RGWSimpleRadosUnlockCR(RGWAsyncRadosProcessor *_async_rados, RGWRados *_store,
+ rgw_bucket& _pool, const string& _oid, const string& _lock_name,
+ const string& _cookie) : RGWSimpleCoroutine(_store->ctx()),
+ async_rados(_async_rados),
+ store(_store),
+ lock_name(_lock_name),
+ cookie(_cookie),
+ pool(_pool), oid(_oid),
+ req(NULL)
+{
+}
+
+RGWSimpleRadosUnlockCR::~RGWSimpleRadosUnlockCR()
+{
+ delete req;
+}
+
+int RGWSimpleRadosUnlockCR::send_request()
+{
+ rgw_obj obj = rgw_obj(pool, oid);
+ req = new RGWAsyncUnlockSystemObj(stack->create_completion_notifier(),
+ store, NULL, obj, lock_name, cookie);
+ async_rados->queue(req);
+ return 0;
+}
+
+int RGWSimpleRadosUnlockCR::request_complete()
+{
+ return req->get_ret_status();
+}
+
+
+#define OMAP_APPEND_MAX_ENTRIES 100
+int RGWOmapAppend::operate() {
+ reenter(this) {
+ for (;;) {
+ if (!has_product() && going_down) {
+ break;
+ }
+ yield wait_for_product();
+ yield {
+ string entry;
+ while (consume(&entry)) {
+ entries[entry] = bufferlist();
+ if (entries.size() >= OMAP_APPEND_MAX_ENTRIES) {
+ break;
+ }
+ }
+ if (entries.size() >= OMAP_APPEND_MAX_ENTRIES || going_down) {
+ call(new RGWRadosSetOmapKeysCR(store, pool, oid, entries));
+ entries.clear();
+ }
+ }
+ if (get_ret_status() < 0) {
+ return set_state(RGWCoroutine_Error);
+ }
+ }
+ /* done with coroutine */
+ return set_state(RGWCoroutine_Done);
+ }
+ return 0;
+}
+
+void RGWOmapAppend::flush_pending() {
+ receive(pending_entries);
+ num_pending_entries = 0;
+}
+
+void RGWOmapAppend::append(const string& s) {
+ pending_entries.push_back(s);
+ if (++num_pending_entries >= OMAP_APPEND_MAX_ENTRIES) {
+ flush_pending();
+ }
+}
+
+void RGWOmapAppend::finish() {
+ going_down = true;
+ flush_pending();
+ set_sleeping(false);
+}
+
+
--- /dev/null
+#ifndef CEPH_RGW_CR_RADOS_H
+#define CEPH_RGW_CR_RADOS_H
+
+#include "rgw_coroutine.h"
+#include "common/WorkQueue.h"
+#include "common/Throttle.h"
+
+class RGWAsyncRadosRequest {
+ RGWAioCompletionNotifier *notifier;
+
+ void *user_info;
+ int retcode;
+
+protected:
+ virtual int _send_request() = 0;
+public:
+ RGWAsyncRadosRequest(RGWAioCompletionNotifier *_cn) : notifier(_cn) {}
+ virtual ~RGWAsyncRadosRequest() {}
+
+ void send_request() {
+ retcode = _send_request();
+ notifier->cb();
+ }
+
+ int get_ret_status() { return retcode; }
+};
+
+
+class RGWAsyncRadosProcessor {
+ deque<RGWAsyncRadosRequest *> m_req_queue;
+protected:
+ RGWRados *store;
+ ThreadPool m_tp;
+ Throttle req_throttle;
+
+ struct RGWWQ : public ThreadPool::WorkQueue<RGWAsyncRadosRequest> {
+ RGWAsyncRadosProcessor *processor;
+ RGWWQ(RGWAsyncRadosProcessor *p, time_t timeout, time_t suicide_timeout, ThreadPool *tp)
+ : ThreadPool::WorkQueue<RGWAsyncRadosRequest>("RGWWQ", timeout, suicide_timeout, tp), processor(p) {}
+
+ bool _enqueue(RGWAsyncRadosRequest *req);
+ void _dequeue(RGWAsyncRadosRequest *req) {
+ assert(0);
+ }
+ bool _empty();
+ RGWAsyncRadosRequest *_dequeue();
+ using ThreadPool::WorkQueue<RGWAsyncRadosRequest>::_process;
+ void _process(RGWAsyncRadosRequest *req);
+ void _dump_queue();
+ void _clear() {
+ assert(processor->m_req_queue.empty());
+ }
+ } req_wq;
+
+public:
+ RGWAsyncRadosProcessor(RGWRados *_store, int num_threads);
+ ~RGWAsyncRadosProcessor() {}
+ void start();
+ void stop();
+ void handle_request(RGWAsyncRadosRequest *req);
+ void queue(RGWAsyncRadosRequest *req);
+};
+
+
+class RGWAsyncGetSystemObj : public RGWAsyncRadosRequest {
+ RGWRados *store;
+ RGWObjectCtx *obj_ctx;
+ RGWRados::SystemObject::Read::GetObjState read_state;
+ RGWObjVersionTracker *objv_tracker;
+ rgw_obj obj;
+ bufferlist *pbl;
+ off_t ofs;
+ off_t end;
+protected:
+ int _send_request();
+public:
+ RGWAsyncGetSystemObj(RGWAioCompletionNotifier *cn, RGWRados *_store, RGWObjectCtx *_obj_ctx,
+ RGWObjVersionTracker *_objv_tracker, rgw_obj& _obj,
+ bufferlist *_pbl, off_t _ofs, off_t _end);
+};
+
+class RGWAsyncPutSystemObj : public RGWAsyncRadosRequest {
+ RGWRados *store;
+ RGWObjVersionTracker *objv_tracker;
+ rgw_obj obj;
+ bool exclusive;
+ bufferlist bl;
+ map<string, bufferlist> attrs;
+ time_t mtime;
+
+protected:
+ int _send_request();
+public:
+ RGWAsyncPutSystemObj(RGWAioCompletionNotifier *cn, RGWRados *_store,
+ RGWObjVersionTracker *_objv_tracker, rgw_obj& _obj, bool _exclusive,
+ bufferlist& _bl, time_t _mtime = 0);
+};
+
+class RGWAsyncLockSystemObj : public RGWAsyncRadosRequest {
+ RGWRados *store;
+ rgw_obj obj;
+ string lock_name;
+ string cookie;
+ uint32_t duration_secs;
+
+protected:
+ int _send_request();
+public:
+ RGWAsyncLockSystemObj(RGWAioCompletionNotifier *cn, RGWRados *_store,
+ RGWObjVersionTracker *_objv_tracker, rgw_obj& _obj,
+ const string& _name, const string& _cookie, uint32_t _duration_secs);
+};
+
+class RGWAsyncUnlockSystemObj : public RGWAsyncRadosRequest {
+ RGWRados *store;
+ rgw_obj obj;
+ string lock_name;
+ string cookie;
+
+protected:
+ int _send_request();
+public:
+ RGWAsyncUnlockSystemObj(RGWAioCompletionNotifier *cn, RGWRados *_store,
+ RGWObjVersionTracker *_objv_tracker, rgw_obj& _obj,
+ const string& _name, const string& _cookie);
+};
+
+
+template <class T>
+class RGWSimpleRadosReadCR : public RGWSimpleCoroutine {
+ RGWAsyncRadosProcessor *async_rados;
+ RGWRados *store;
+ RGWObjectCtx& obj_ctx;
+ bufferlist bl;
+
+ rgw_bucket pool;
+ string oid;
+
+ T *result;
+
+ RGWAsyncGetSystemObj *req;
+
+public:
+ RGWSimpleRadosReadCR(RGWAsyncRadosProcessor *_async_rados, RGWRados *_store,
+ RGWObjectCtx& _obj_ctx,
+ rgw_bucket& _pool, const string& _oid,
+ T *_result) : RGWSimpleCoroutine(_store->ctx()),
+ async_rados(_async_rados), store(_store),
+ obj_ctx(_obj_ctx),
+ pool(_pool), oid(_oid),
+ result(_result),
+ req(NULL) { }
+
+ ~RGWSimpleRadosReadCR() {
+ delete req;
+ }
+
+ int send_request();
+ int request_complete();
+
+ virtual int handle_data(T& data) {
+ return 0;
+ }
+};
+
+template <class T>
+int RGWSimpleRadosReadCR<T>::send_request()
+{
+ rgw_obj obj = rgw_obj(pool, oid);
+ req = new RGWAsyncGetSystemObj(stack->create_completion_notifier(),
+ store, &obj_ctx, NULL,
+ obj,
+ &bl, 0, -1);
+ async_rados->queue(req);
+ return 0;
+}
+
+template <class T>
+int RGWSimpleRadosReadCR<T>::request_complete()
+{
+ int ret = req->get_ret_status();
+ retcode = ret;
+ if (ret != -ENOENT) {
+ if (ret < 0) {
+ return ret;
+ }
+ bufferlist::iterator iter = bl.begin();
+ try {
+ ::decode(*result, iter);
+ } catch (buffer::error& err) {
+ return -EIO;
+ }
+ } else {
+ *result = T();
+ }
+
+ return handle_data(*result);
+}
+
+template <class T>
+class RGWSimpleRadosWriteCR : public RGWSimpleCoroutine {
+ RGWAsyncRadosProcessor *async_rados;
+ RGWRados *store;
+ bufferlist bl;
+
+ rgw_bucket pool;
+ string oid;
+
+ RGWAsyncPutSystemObj *req;
+
+public:
+ RGWSimpleRadosWriteCR(RGWAsyncRadosProcessor *_async_rados, RGWRados *_store,
+ rgw_bucket& _pool, const string& _oid,
+ const T& _data) : RGWSimpleCoroutine(_store->ctx()),
+ async_rados(_async_rados),
+ store(_store),
+ pool(_pool), oid(_oid),
+ req(NULL) {
+ ::encode(_data, bl);
+ }
+
+ ~RGWSimpleRadosWriteCR() {
+ delete req;
+ }
+
+ int send_request() {
+ rgw_obj obj = rgw_obj(pool, oid);
+ req = new RGWAsyncPutSystemObj(stack->create_completion_notifier(),
+ store, NULL, obj, false, bl);
+ async_rados->queue(req);
+ return 0;
+ }
+
+ int request_complete() {
+ return req->get_ret_status();
+ }
+};
+
+class RGWRadosSetOmapKeysCR : public RGWSimpleCoroutine {
+ RGWRados *store;
+ map<string, bufferlist> entries;
+
+ rgw_bucket pool;
+ string oid;
+
+ RGWAioCompletionNotifier *cn;
+
+public:
+ RGWRadosSetOmapKeysCR(RGWRados *_store,
+ rgw_bucket& _pool, const string& _oid,
+ map<string, bufferlist>& _entries);
+ ~RGWRadosSetOmapKeysCR();
+
+ int send_request();
+ int request_complete();
+};
+
+class RGWRadosGetOmapKeysCR : public RGWSimpleCoroutine {
+ RGWRados *store;
+
+ string marker;
+ map<string, bufferlist> *entries;
+ int max_entries;
+
+ int rval;
+ librados::IoCtx ioctx;
+
+ rgw_bucket pool;
+ string oid;
+
+ RGWAioCompletionNotifier *cn;
+
+public:
+ RGWRadosGetOmapKeysCR(RGWRados *_store,
+ rgw_bucket& _pool, const string& _oid,
+ const string& _marker,
+ map<string, bufferlist> *_entries, int _max_entries);
+ ~RGWRadosGetOmapKeysCR();
+
+ int send_request();
+
+ int request_complete() {
+ return rval;
+ }
+};
+
+class RGWSimpleRadosLockCR : public RGWSimpleCoroutine {
+ RGWAsyncRadosProcessor *async_rados;
+ RGWRados *store;
+ string lock_name;
+ string cookie;
+ uint32_t duration;
+
+ rgw_bucket pool;
+ string oid;
+
+ RGWAsyncLockSystemObj *req;
+
+public:
+ RGWSimpleRadosLockCR(RGWAsyncRadosProcessor *_async_rados, RGWRados *_store,
+ rgw_bucket& _pool, const string& _oid, const string& _lock_name,
+ const string& _cookie,
+ uint32_t _duration);
+ ~RGWSimpleRadosLockCR();
+
+ int send_request();
+ int request_complete();
+};
+
+class RGWSimpleRadosUnlockCR : public RGWSimpleCoroutine {
+ RGWAsyncRadosProcessor *async_rados;
+ RGWRados *store;
+ string lock_name;
+ string cookie;
+
+ rgw_bucket pool;
+ string oid;
+
+ RGWAsyncUnlockSystemObj *req;
+
+public:
+ RGWSimpleRadosUnlockCR(RGWAsyncRadosProcessor *_async_rados, RGWRados *_store,
+ rgw_bucket& _pool, const string& _oid, const string& _lock_name,
+ const string& _cookie);
+ ~RGWSimpleRadosUnlockCR();
+
+ int send_request();
+ int request_complete();
+};
+
+class RGWOmapAppend : public RGWConsumerCR<string> {
+ RGWAsyncRadosProcessor *async_rados;
+ RGWRados *store;
+
+ rgw_bucket pool;
+ string oid;
+
+ bool going_down;
+
+ int num_pending_entries;
+ list<string> pending_entries;
+
+ map<string, bufferlist> entries;
+public:
+ RGWOmapAppend(RGWAsyncRadosProcessor *_async_rados, RGWRados *_store, rgw_bucket& _pool, const string& _oid);
+ int operate();
+ void flush_pending();
+ void append(const string& s);
+ void finish();
+};
+
+#endif
#include "rgw_metadata.h"
#include "rgw_rest_conn.h"
#include "rgw_tools.h"
+#include "rgw_cr_rados.h"
#include "cls/lock/cls_lock_client.h"
JSONDecoder::decode_json("entries", entries, obj);
};
-class RGWAsyncRadosRequest {
- RGWAioCompletionNotifier *notifier;
-
- void *user_info;
- int retcode;
-
-protected:
- virtual int _send_request() = 0;
-public:
- RGWAsyncRadosRequest(RGWAioCompletionNotifier *_cn) : notifier(_cn) {}
- virtual ~RGWAsyncRadosRequest() {}
-
- void send_request() {
- retcode = _send_request();
- notifier->cb();
- }
-
- int get_ret_status() { return retcode; }
-};
-
-
-class RGWAsyncGetSystemObj : public RGWAsyncRadosRequest {
- RGWRados *store;
- RGWObjectCtx *obj_ctx;
- RGWRados::SystemObject::Read::GetObjState read_state;
- RGWObjVersionTracker *objv_tracker;
- rgw_obj obj;
- bufferlist *pbl;
- off_t ofs;
- off_t end;
-protected:
- int _send_request() {
- return store->get_system_obj(*obj_ctx, read_state, objv_tracker, obj, *pbl, ofs, end, NULL);
- }
-public:
- RGWAsyncGetSystemObj(RGWAioCompletionNotifier *cn, RGWRados *_store, RGWObjectCtx *_obj_ctx,
- RGWObjVersionTracker *_objv_tracker, rgw_obj& _obj,
- bufferlist *_pbl, off_t _ofs, off_t _end) : RGWAsyncRadosRequest(cn), store(_store), obj_ctx(_obj_ctx),
- objv_tracker(_objv_tracker), obj(_obj), pbl(_pbl),
- ofs(_ofs), end(_end) {
- }
-};
-
-class RGWAsyncPutSystemObj : public RGWAsyncRadosRequest {
- RGWRados *store;
- RGWObjVersionTracker *objv_tracker;
- rgw_obj obj;
- bool exclusive;
- bufferlist bl;
- map<string, bufferlist> attrs;
- time_t mtime;
-
-protected:
- int _send_request() {
- return store->put_system_obj(NULL, obj, bl.c_str(), bl.length(), exclusive,
- NULL, attrs, objv_tracker, mtime);
- }
-public:
- RGWAsyncPutSystemObj(RGWAioCompletionNotifier *cn, RGWRados *_store,
- RGWObjVersionTracker *_objv_tracker, rgw_obj& _obj, bool _exclusive,
- bufferlist& _bl, time_t _mtime = 0) : RGWAsyncRadosRequest(cn), store(_store),
- objv_tracker(_objv_tracker), obj(_obj), exclusive(_exclusive),
- bl(_bl), mtime(_mtime) {}
-};
-
-class RGWAsyncLockSystemObj : public RGWAsyncRadosRequest {
- RGWRados *store;
- rgw_obj obj;
- string lock_name;
- string cookie;
- uint32_t duration_secs;
-
-protected:
- int _send_request() {
- librados::IoCtx ioctx;
- librados::Rados *rados = store->get_rados_handle();
- int r = rados->ioctx_create(obj.bucket.name.c_str(), ioctx); /* system object only! */
- if (r < 0) {
- lderr(store->ctx()) << "ERROR: failed to open pool (" << obj.bucket.name << ") ret=" << r << dendl;
- return r;
- }
-
- rados::cls::lock::Lock l(lock_name);
- utime_t duration(duration_secs, 0);
- l.set_duration(duration);
- l.set_cookie(cookie);
-
- return l.lock_exclusive(&ioctx, obj.get_object());
- }
-public:
- RGWAsyncLockSystemObj(RGWAioCompletionNotifier *cn, RGWRados *_store,
- RGWObjVersionTracker *_objv_tracker, rgw_obj& _obj,
- const string& _name, const string& _cookie, uint32_t _duration_secs) : RGWAsyncRadosRequest(cn), store(_store),
- obj(_obj),
- lock_name(_name),
- cookie(_cookie),
- duration_secs(_duration_secs) {}
-};
-
-class RGWAsyncUnlockSystemObj : public RGWAsyncRadosRequest {
- RGWRados *store;
- rgw_obj obj;
- string lock_name;
- string cookie;
-
-protected:
- int _send_request() {
- librados::IoCtx ioctx;
- librados::Rados *rados = store->get_rados_handle();
- int r = rados->ioctx_create(obj.bucket.name.c_str(), ioctx); /* system object only! */
- if (r < 0) {
- lderr(store->ctx()) << "ERROR: failed to open pool (" << obj.bucket.name << ") ret=" << r << dendl;
- return r;
- }
-
- rados::cls::lock::Lock l(lock_name);
-
- l.set_cookie(cookie);
-
- return l.unlock(&ioctx, obj.get_object());
- }
-public:
- RGWAsyncUnlockSystemObj(RGWAioCompletionNotifier *cn, RGWRados *_store,
- RGWObjVersionTracker *_objv_tracker, rgw_obj& _obj,
- const string& _name, const string& _cookie) : RGWAsyncRadosRequest(cn), store(_store),
- obj(_obj),
- lock_name(_name), cookie(_cookie) {}
-};
-
-
-class RGWAsyncRadosProcessor {
- deque<RGWAsyncRadosRequest *> m_req_queue;
-protected:
- RGWRados *store;
- ThreadPool m_tp;
- Throttle req_throttle;
-
- struct RGWWQ : public ThreadPool::WorkQueue<RGWAsyncRadosRequest> {
- RGWAsyncRadosProcessor *processor;
- RGWWQ(RGWAsyncRadosProcessor *p, time_t timeout, time_t suicide_timeout, ThreadPool *tp)
- : ThreadPool::WorkQueue<RGWAsyncRadosRequest>("RGWWQ", timeout, suicide_timeout, tp), processor(p) {}
-
- bool _enqueue(RGWAsyncRadosRequest *req) {
- processor->m_req_queue.push_back(req);
- dout(20) << "enqueued request req=" << hex << req << dec << dendl;
- _dump_queue();
- return true;
- }
- void _dequeue(RGWAsyncRadosRequest *req) {
- assert(0);
- }
- bool _empty() {
- return processor->m_req_queue.empty();
- }
- RGWAsyncRadosRequest *_dequeue() {
- if (processor->m_req_queue.empty())
- return NULL;
- RGWAsyncRadosRequest *req = processor->m_req_queue.front();
- processor->m_req_queue.pop_front();
- dout(20) << "dequeued request req=" << hex << req << dec << dendl;
- _dump_queue();
- return req;
- }
- using ThreadPool::WorkQueue<RGWAsyncRadosRequest>::_process;
- void _process(RGWAsyncRadosRequest *req) {
- processor->handle_request(req);
- processor->req_throttle.put(1);
- }
- void _dump_queue() {
- if (!g_conf->subsys.should_gather(ceph_subsys_rgw, 20)) {
- return;
- }
- deque<RGWAsyncRadosRequest *>::iterator iter;
- if (processor->m_req_queue.empty()) {
- dout(20) << "RGWWQ: empty" << dendl;
- return;
- }
- dout(20) << "RGWWQ:" << dendl;
- for (iter = processor->m_req_queue.begin(); iter != processor->m_req_queue.end(); ++iter) {
- dout(20) << "req: " << hex << *iter << dec << dendl;
- }
- }
- void _clear() {
- assert(processor->m_req_queue.empty());
- }
- } req_wq;
-
-public:
- RGWAsyncRadosProcessor(RGWRados *_store, int num_threads)
- : store(_store), m_tp(store->ctx(), "RGWAsyncRadosProcessor::m_tp", num_threads),
- req_throttle(store->ctx(), "rgw_async_rados_ops", num_threads * 2),
- req_wq(this, g_conf->rgw_op_thread_timeout,
- g_conf->rgw_op_thread_suicide_timeout, &m_tp) {}
- ~RGWAsyncRadosProcessor() {}
- void start() {
- m_tp.start();
- }
- void stop() {
- m_tp.drain(&req_wq);
- m_tp.stop();
- }
- void handle_request(RGWAsyncRadosRequest *req) {
- req->send_request();
- }
-
- void queue(RGWAsyncRadosRequest *req) {
- req_throttle.get(1);
- req_wq.queue(req);
- }
-};
-
int RGWRemoteMetaLog::read_log_info(rgw_mdlog_info *log_info)
{
rgw_http_param_pair pairs[] = { { "type", "metadata" },
return string(buf);
}
-template <class T>
-class RGWSimpleRadosReadCR : public RGWSimpleCoroutine {
- RGWAsyncRadosProcessor *async_rados;
- RGWRados *store;
- RGWObjectCtx& obj_ctx;
- bufferlist bl;
-
- rgw_bucket pool;
- string oid;
-
- T *result;
-
- RGWAsyncGetSystemObj *req;
-
-public:
- RGWSimpleRadosReadCR(RGWAsyncRadosProcessor *_async_rados, RGWRados *_store,
- RGWObjectCtx& _obj_ctx,
- rgw_bucket& _pool, const string& _oid,
- T *_result) : RGWSimpleCoroutine(_store->ctx()),
- async_rados(_async_rados), store(_store),
- obj_ctx(_obj_ctx),
- pool(_pool), oid(_oid),
- result(_result),
- req(NULL) { }
-
- ~RGWSimpleRadosReadCR() {
- delete req;
- }
-
- int send_request();
- int request_complete();
-
- virtual int handle_data(T& data) {
- return 0;
- }
-};
-
-template <class T>
-int RGWSimpleRadosReadCR<T>::send_request()
-{
- rgw_obj obj = rgw_obj(pool, oid);
- req = new RGWAsyncGetSystemObj(stack->create_completion_notifier(),
- store, &obj_ctx, NULL,
- obj,
- &bl, 0, -1);
- async_rados->queue(req);
- return 0;
-}
-
-template <class T>
-int RGWSimpleRadosReadCR<T>::request_complete()
-{
- int ret = req->get_ret_status();
- retcode = ret;
- if (ret != -ENOENT) {
- if (ret < 0) {
- return ret;
- }
- bufferlist::iterator iter = bl.begin();
- try {
- ::decode(*result, iter);
- } catch (buffer::error& err) {
- ldout(store->ctx(), 0) << "ERROR: failed to decode global mdlog status" << dendl;
- }
- } else {
- *result = T();
- }
-
- return handle_data(*result);
-}
-
-template <class T>
-class RGWSimpleRadosWriteCR : public RGWSimpleCoroutine {
- RGWAsyncRadosProcessor *async_rados;
- RGWRados *store;
- bufferlist bl;
-
- rgw_bucket pool;
- string oid;
-
- RGWAsyncPutSystemObj *req;
-
-public:
- RGWSimpleRadosWriteCR(RGWAsyncRadosProcessor *_async_rados, RGWRados *_store,
- rgw_bucket& _pool, const string& _oid,
- const T& _data) : RGWSimpleCoroutine(_store->ctx()),
- async_rados(_async_rados),
- store(_store),
- pool(_pool), oid(_oid),
- req(NULL) {
- ::encode(_data, bl);
- }
-
- ~RGWSimpleRadosWriteCR() {
- delete req;
- }
-
- int send_request() {
- rgw_obj obj = rgw_obj(pool, oid);
- req = new RGWAsyncPutSystemObj(stack->create_completion_notifier(),
- store, NULL, obj, false, bl);
- async_rados->queue(req);
- return 0;
- }
-
- int request_complete() {
- return req->get_ret_status();
- }
-};
-
-class RGWRadosSetOmapKeysCR : public RGWSimpleCoroutine {
- RGWRados *store;
- map<string, bufferlist> entries;
-
- rgw_bucket pool;
- string oid;
-
- RGWAioCompletionNotifier *cn;
-
-public:
- RGWRadosSetOmapKeysCR(RGWRados *_store,
- rgw_bucket& _pool, const string& _oid,
- map<string, bufferlist>& _entries) : RGWSimpleCoroutine(_store->ctx()),
- store(_store),
- entries(_entries),
- pool(_pool), oid(_oid), cn(NULL) {
- }
-
- ~RGWRadosSetOmapKeysCR() {
- cn->put();
- }
-
- int send_request() {
- librados::IoCtx ioctx;
- librados::Rados *rados = store->get_rados_handle();
- int r = rados->ioctx_create(pool.name.c_str(), ioctx); /* system object only! */
- if (r < 0) {
- lderr(store->ctx()) << "ERROR: failed to open pool (" << pool.name << ") ret=" << r << dendl;
- return r;
- }
-
- librados::ObjectWriteOperation op;
- op.omap_set(entries);
-
- cn = stack->create_completion_notifier();
- cn->get();
- return ioctx.aio_operate(oid, cn->completion(), &op);
- }
-
- int request_complete() {
- return cn->completion()->get_return_value();
-
- }
-};
-
-class RGWRadosGetOmapKeysCR : public RGWSimpleCoroutine {
- RGWRados *store;
-
- string marker;
- map<string, bufferlist> *entries;
- int max_entries;
-
- int rval;
- librados::IoCtx ioctx;
-
- rgw_bucket pool;
- string oid;
-
- RGWAioCompletionNotifier *cn;
-
-public:
- RGWRadosGetOmapKeysCR(RGWRados *_store,
- rgw_bucket& _pool, const string& _oid,
- const string& _marker,
- map<string, bufferlist> *_entries, int _max_entries) : RGWSimpleCoroutine(_store->ctx()),
- store(_store),
- marker(_marker),
- entries(_entries), max_entries(_max_entries), rval(0),
- pool(_pool), oid(_oid), cn(NULL) {
- }
-
- ~RGWRadosGetOmapKeysCR() {
- }
-
- int send_request() {
- librados::Rados *rados = store->get_rados_handle();
- int r = rados->ioctx_create(pool.name.c_str(), ioctx); /* system object only! */
- if (r < 0) {
- lderr(store->ctx()) << "ERROR: failed to open pool (" << pool.name << ") ret=" << r << dendl;
- return r;
- }
-
- librados::ObjectReadOperation op;
- op.omap_get_vals(marker, max_entries, entries, &rval);
-
- cn = stack->create_completion_notifier();
- return ioctx.aio_operate(oid, cn->completion(), &op, NULL);
- }
-
- int request_complete() {
- return rval;
- }
-};
-
-class RGWSimpleRadosLockCR : public RGWSimpleCoroutine {
- RGWAsyncRadosProcessor *async_rados;
- RGWRados *store;
- string lock_name;
- string cookie;
- uint32_t duration;
-
- rgw_bucket pool;
- string oid;
-
- RGWAsyncLockSystemObj *req;
-
-public:
- RGWSimpleRadosLockCR(RGWAsyncRadosProcessor *_async_rados, RGWRados *_store,
- rgw_bucket& _pool, const string& _oid, const string& _lock_name,
- const string& _cookie,
- uint32_t _duration) : RGWSimpleCoroutine(_store->ctx()),
- async_rados(_async_rados),
- store(_store),
- lock_name(_lock_name),
- cookie(_cookie),
- duration(_duration),
- pool(_pool), oid(_oid),
- req(NULL) {
- }
-
- ~RGWSimpleRadosLockCR() {
- delete req;
- }
-
- int send_request() {
- rgw_obj obj = rgw_obj(pool, oid);
- req = new RGWAsyncLockSystemObj(stack->create_completion_notifier(),
- store, NULL, obj, lock_name, cookie, duration);
- async_rados->queue(req);
- return 0;
- }
-
- int request_complete() {
- return req->get_ret_status();
- }
-};
-
-class RGWSimpleRadosUnlockCR : public RGWSimpleCoroutine {
- RGWAsyncRadosProcessor *async_rados;
- RGWRados *store;
- string lock_name;
- string cookie;
-
- rgw_bucket pool;
- string oid;
-
- RGWAsyncUnlockSystemObj *req;
-
-public:
- RGWSimpleRadosUnlockCR(RGWAsyncRadosProcessor *_async_rados, RGWRados *_store,
- rgw_bucket& _pool, const string& _oid, const string& _lock_name,
- const string& _cookie) : RGWSimpleCoroutine(_store->ctx()),
- async_rados(_async_rados),
- store(_store),
- lock_name(_lock_name),
- cookie(_cookie),
- pool(_pool), oid(_oid),
- req(NULL) {
- }
-
- ~RGWSimpleRadosUnlockCR() {
- delete req;
- }
-
- int send_request() {
- rgw_obj obj = rgw_obj(pool, oid);
- req = new RGWAsyncUnlockSystemObj(stack->create_completion_notifier(),
- store, NULL, obj, lock_name, cookie);
- async_rados->queue(req);
- return 0;
- }
-
- int request_complete() {
- return req->get_ret_status();
- }
-};
-
class RGWReadMDLogShardInfo : public RGWSimpleCoroutine {
RGWRados *store;
RGWMetadataLog *mdlog;
return 0;
}
-class RGWOmapAppend : public RGWConsumerCR<string> {
- RGWAsyncRadosProcessor *async_rados;
- RGWRados *store;
-
- rgw_bucket pool;
- string oid;
-
- bool going_down;
-
- int num_pending_entries;
- list<string> pending_entries;
-
- map<string, bufferlist> entries;
-public:
-
- RGWOmapAppend(RGWAsyncRadosProcessor *_async_rados, RGWRados *_store, rgw_bucket& _pool, const string& _oid)
- : RGWConsumerCR<string>(_store->ctx()), async_rados(_async_rados),
- store(_store), pool(_pool), oid(_oid), going_down(false), num_pending_entries(0) {}
-
-#define OMAP_APPEND_MAX_ENTRIES 100
- int operate() {
- reenter(this) {
- for (;;) {
- if (!has_product() && going_down) {
- break;
- }
- yield wait_for_product();
- yield {
- string entry;
- while (consume(&entry)) {
- entries[entry] = bufferlist();
- if (entries.size() >= OMAP_APPEND_MAX_ENTRIES) {
- break;
- }
- }
- if (entries.size() >= OMAP_APPEND_MAX_ENTRIES || going_down) {
- call(new RGWRadosSetOmapKeysCR(store, pool, oid, entries));
- entries.clear();
- }
- }
- if (get_ret_status() < 0) {
- return set_state(RGWCoroutine_Error);
- }
- }
- /* done with coroutine */
- return set_state(RGWCoroutine_Done);
- }
- return 0;
- }
-
- void flush_pending() {
- receive(pending_entries);
- num_pending_entries = 0;
- }
-
- void append(const string& s) {
- pending_entries.push_back(s);
- if (++num_pending_entries >= OMAP_APPEND_MAX_ENTRIES) {
- flush_pending();
- }
- }
-
- void finish() {
- going_down = true;
- flush_pending();
- set_sleeping(false);
- }
-};
-
class RGWShardedOmapCRManager {
RGWAsyncRadosProcessor *async_rados;
RGWRados *store;