]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
rgw: move code around
authorYehuda Sadeh <yehuda@redhat.com>
Mon, 14 Sep 2015 21:19:34 +0000 (14:19 -0700)
committerYehuda Sadeh <yehuda@redhat.com>
Fri, 12 Feb 2016 00:12:47 +0000 (16:12 -0800)
move code from rgw_sync.cc to rgw_cr_rados.{h,cc}

Signed-off-by: Yehuda Sadeh <yehuda@redhat.com>
src/CMakeLists.txt
src/rgw/Makefile.am
src/rgw/rgw_cr_rados.cc [new file with mode: 0644]
src/rgw/rgw_cr_rados.h [new file with mode: 0644]
src/rgw/rgw_sync.cc

index cee8483dcf39a4dd2e27214dff2a76365ce200bb..f01f98dee9c4033a36e9b0d75b7227cdb17725d3 100644 (file)
@@ -1165,7 +1165,7 @@ if(${WITH_RADOSGW})
     rgw/rgw_sync.cc
     rgw/rgw_dencoder.cc
     rgw/rgw_coroutine.cc
-    rgw/rgw_object_expirer_core.cc
+    rgw/rgw_cr_rados.cc
     rgw/rgw_object_expirer_core.cc
     rgw/rgw_website.cc
     rgw/rgw_xml_enc.cc)
index 72441cb790c3fe17ca33d9d6f481e1d13070529d..b05a6499b97c0be48d2b7904e066685d666e80e3 100644 (file)
@@ -26,6 +26,7 @@ librgw_la_SOURCES =  \
        rgw/rgw_acl_swift.cc \
        rgw/rgw_client_io.cc \
        rgw/rgw_coroutine.cc \
+       rgw/rgw_cr_rados.cc \
        rgw/rgw_fcgi.cc \
        rgw/rgw_xml.cc \
        rgw/rgw_usage.cc \
@@ -143,6 +144,7 @@ noinst_HEADERS += \
        rgw/rgw_acl_swift.h \
        rgw/rgw_client_io.h \
        rgw/rgw_coroutine.h \
+       rgw/rgw_cr_rados.h \
        rgw/rgw_fcgi.h \
        rgw/rgw_xml.h \
        rgw/rgw_basic_types.h \
diff --git a/src/rgw/rgw_cr_rados.cc b/src/rgw/rgw_cr_rados.cc
new file mode 100644 (file)
index 0000000..d66265b
--- /dev/null
@@ -0,0 +1,349 @@
+#include "rgw_rados.h"
+#include "rgw_coroutine.h"
+#include "rgw_cr_rados.h"
+
+#include "cls/lock/cls_lock_client.h"
+
+#include <boost/asio/coroutine.hpp>
+#include <boost/asio/yield.hpp>
+
+
+#define dout_subsys ceph_subsys_rgw
+
+bool RGWAsyncRadosProcessor::RGWWQ::_enqueue(RGWAsyncRadosRequest *req) {
+  processor->m_req_queue.push_back(req);
+  dout(20) << "enqueued request req=" << hex << req << dec << dendl;
+  _dump_queue();
+  return true;
+}
+
+bool RGWAsyncRadosProcessor::RGWWQ::_empty() {
+  return processor->m_req_queue.empty();
+}
+
+RGWAsyncRadosRequest *RGWAsyncRadosProcessor::RGWWQ::_dequeue() {
+  if (processor->m_req_queue.empty())
+    return NULL;
+  RGWAsyncRadosRequest *req = processor->m_req_queue.front();
+  processor->m_req_queue.pop_front();
+  dout(20) << "dequeued request req=" << hex << req << dec << dendl;
+  _dump_queue();
+  return req;
+}
+
+void RGWAsyncRadosProcessor::RGWWQ::_process(RGWAsyncRadosRequest *req) {
+  processor->handle_request(req);
+  processor->req_throttle.put(1);
+}
+
+void RGWAsyncRadosProcessor::RGWWQ::_dump_queue() {
+  if (!g_conf->subsys.should_gather(ceph_subsys_rgw, 20)) {
+    return;
+  }
+  deque<RGWAsyncRadosRequest *>::iterator iter;
+  if (processor->m_req_queue.empty()) {
+    dout(20) << "RGWWQ: empty" << dendl;
+    return;
+  }
+  dout(20) << "RGWWQ:" << dendl;
+  for (iter = processor->m_req_queue.begin(); iter != processor->m_req_queue.end(); ++iter) {
+    dout(20) << "req: " << hex << *iter << dec << dendl;
+  }
+}
+
+RGWAsyncRadosProcessor::RGWAsyncRadosProcessor(RGWRados *_store, int num_threads)
+  : store(_store), m_tp(store->ctx(), "RGWAsyncRadosProcessor::m_tp", num_threads),
+  req_throttle(store->ctx(), "rgw_async_rados_ops", num_threads * 2),
+  req_wq(this, g_conf->rgw_op_thread_timeout,
+         g_conf->rgw_op_thread_suicide_timeout, &m_tp) {
+}
+
+void RGWAsyncRadosProcessor::start() {
+  m_tp.start();
+}
+
+void RGWAsyncRadosProcessor::stop() {
+  m_tp.drain(&req_wq);
+  m_tp.stop();
+}
+
+void RGWAsyncRadosProcessor::handle_request(RGWAsyncRadosRequest *req) {
+  req->send_request();
+}
+
+void RGWAsyncRadosProcessor::queue(RGWAsyncRadosRequest *req) {
+  req_throttle.get(1);
+  req_wq.queue(req);
+}
+
+int RGWAsyncGetSystemObj::_send_request()
+{
+  return store->get_system_obj(*obj_ctx, read_state, objv_tracker, obj, *pbl, ofs, end, NULL);
+}
+
+RGWAsyncGetSystemObj::RGWAsyncGetSystemObj(RGWAioCompletionNotifier *cn, RGWRados *_store, RGWObjectCtx *_obj_ctx,
+                       RGWObjVersionTracker *_objv_tracker, rgw_obj& _obj,
+                       bufferlist *_pbl, off_t _ofs, off_t _end) : RGWAsyncRadosRequest(cn), store(_store), obj_ctx(_obj_ctx),
+                                                                   objv_tracker(_objv_tracker), obj(_obj), pbl(_pbl),
+                                                                  ofs(_ofs), end(_end)
+{
+}
+
+
+int RGWAsyncPutSystemObj::_send_request()
+{
+  return store->put_system_obj(NULL, obj, bl.c_str(), bl.length(), exclusive,
+                               NULL, attrs, objv_tracker, mtime);
+}
+
+RGWAsyncPutSystemObj::RGWAsyncPutSystemObj(RGWAioCompletionNotifier *cn, RGWRados *_store,
+                     RGWObjVersionTracker *_objv_tracker, rgw_obj& _obj, bool _exclusive,
+                     bufferlist& _bl, time_t _mtime) : RGWAsyncRadosRequest(cn), store(_store),
+                                                       objv_tracker(_objv_tracker), obj(_obj), exclusive(_exclusive),
+                                                       bl(_bl), mtime(_mtime)
+{
+}
+
+
+RGWOmapAppend::RGWOmapAppend(RGWAsyncRadosProcessor *_async_rados, RGWRados *_store, rgw_bucket& _pool, const string& _oid)
+                      : RGWConsumerCR<string>(_store->ctx()), async_rados(_async_rados),
+                        store(_store), pool(_pool), oid(_oid), going_down(false), num_pending_entries(0)
+{
+}
+
+int RGWAsyncLockSystemObj::_send_request()
+{
+  librados::IoCtx ioctx;
+  librados::Rados *rados = store->get_rados_handle();
+  int r = rados->ioctx_create(obj.bucket.name.c_str(), ioctx); /* system object only! */
+  if (r < 0) {
+    lderr(store->ctx()) << "ERROR: failed to open pool (" << obj.bucket.name << ") ret=" << r << dendl;
+    return r;
+  }
+
+  rados::cls::lock::Lock l(lock_name);
+  utime_t duration(duration_secs, 0);
+  l.set_duration(duration);
+  l.set_cookie(cookie);
+
+  return l.lock_exclusive(&ioctx, obj.get_object());
+}
+
+RGWAsyncLockSystemObj::RGWAsyncLockSystemObj(RGWAioCompletionNotifier *cn, RGWRados *_store,
+                      RGWObjVersionTracker *_objv_tracker, rgw_obj& _obj,
+                       const string& _name, const string& _cookie, uint32_t _duration_secs) : RGWAsyncRadosRequest(cn), store(_store),
+                                                              obj(_obj),
+                                                              lock_name(_name),
+                                                              cookie(_cookie),
+                                                              duration_secs(_duration_secs)
+{
+}
+
+int RGWAsyncUnlockSystemObj::_send_request()
+{
+  librados::IoCtx ioctx;
+  librados::Rados *rados = store->get_rados_handle();
+  int r = rados->ioctx_create(obj.bucket.name.c_str(), ioctx); /* system object only! */
+  if (r < 0) {
+    lderr(store->ctx()) << "ERROR: failed to open pool (" << obj.bucket.name << ") ret=" << r << dendl;
+    return r;
+  }
+
+  rados::cls::lock::Lock l(lock_name);
+
+  l.set_cookie(cookie);
+
+  return l.unlock(&ioctx, obj.get_object());
+}
+
+RGWAsyncUnlockSystemObj::RGWAsyncUnlockSystemObj(RGWAioCompletionNotifier *cn, RGWRados *_store,
+                                                 RGWObjVersionTracker *_objv_tracker, rgw_obj& _obj,
+                                                 const string& _name, const string& _cookie) : RGWAsyncRadosRequest(cn), store(_store),
+  obj(_obj),
+  lock_name(_name), cookie(_cookie)
+{
+}
+
+
+RGWRadosSetOmapKeysCR::RGWRadosSetOmapKeysCR(RGWRados *_store,
+                      rgw_bucket& _pool, const string& _oid,
+                      map<string, bufferlist>& _entries) : RGWSimpleCoroutine(_store->ctx()),
+                                                store(_store),
+                                                entries(_entries),
+                                                pool(_pool), oid(_oid), cn(NULL)
+{
+}
+
+RGWRadosSetOmapKeysCR::~RGWRadosSetOmapKeysCR()
+{
+  cn->put();
+}
+
+int RGWRadosSetOmapKeysCR::send_request()
+{
+  librados::IoCtx ioctx;
+  librados::Rados *rados = store->get_rados_handle();
+  int r = rados->ioctx_create(pool.name.c_str(), ioctx); /* system object only! */
+  if (r < 0) {
+    lderr(store->ctx()) << "ERROR: failed to open pool (" << pool.name << ") ret=" << r << dendl;
+    return r;
+  }
+
+  librados::ObjectWriteOperation op;
+  op.omap_set(entries);
+
+  cn = stack->create_completion_notifier();
+  cn->get();
+  return ioctx.aio_operate(oid, cn->completion(), &op);
+}
+
+int RGWRadosSetOmapKeysCR::request_complete()
+{
+  return cn->completion()->get_return_value();
+}
+
+RGWRadosGetOmapKeysCR::RGWRadosGetOmapKeysCR(RGWRados *_store,
+                      rgw_bucket& _pool, const string& _oid,
+                      const string& _marker,
+                      map<string, bufferlist> *_entries, int _max_entries) : RGWSimpleCoroutine(_store->ctx()),
+                                                store(_store),
+                                                marker(_marker),
+                                                entries(_entries), max_entries(_max_entries), rval(0),
+                                                pool(_pool), oid(_oid), cn(NULL)
+{
+}
+
+RGWRadosGetOmapKeysCR::~RGWRadosGetOmapKeysCR()
+{
+}
+
+int RGWRadosGetOmapKeysCR::send_request() {
+  librados::Rados *rados = store->get_rados_handle();
+  int r = rados->ioctx_create(pool.name.c_str(), ioctx); /* system object only! */
+  if (r < 0) {
+    lderr(store->ctx()) << "ERROR: failed to open pool (" << pool.name << ") ret=" << r << dendl;
+    return r;
+  }
+
+  librados::ObjectReadOperation op;
+  op.omap_get_vals(marker, max_entries, entries, &rval);
+
+  cn = stack->create_completion_notifier();
+  return ioctx.aio_operate(oid, cn->completion(), &op, NULL);
+}
+
+RGWSimpleRadosLockCR::RGWSimpleRadosLockCR(RGWAsyncRadosProcessor *_async_rados, RGWRados *_store,
+                      rgw_bucket& _pool, const string& _oid, const string& _lock_name,
+                      const string& _cookie,
+                      uint32_t _duration) : RGWSimpleCoroutine(_store->ctx()),
+                                                async_rados(_async_rados),
+                                                store(_store),
+                                                lock_name(_lock_name),
+                                                cookie(_cookie),
+                                                duration(_duration),
+                                                pool(_pool), oid(_oid),
+                                                req(NULL)
+{
+}
+
+RGWSimpleRadosLockCR::~RGWSimpleRadosLockCR()
+{
+  delete req;
+}
+
+int RGWSimpleRadosLockCR::send_request()
+{
+  rgw_obj obj = rgw_obj(pool, oid);
+  req = new RGWAsyncLockSystemObj(stack->create_completion_notifier(),
+                                 store, NULL, obj, lock_name, cookie, duration);
+  async_rados->queue(req);
+  return 0;
+}
+
+int RGWSimpleRadosLockCR::request_complete()
+{
+  return req->get_ret_status();
+}
+
+RGWSimpleRadosUnlockCR::RGWSimpleRadosUnlockCR(RGWAsyncRadosProcessor *_async_rados, RGWRados *_store,
+                      rgw_bucket& _pool, const string& _oid, const string& _lock_name,
+                      const string& _cookie) : RGWSimpleCoroutine(_store->ctx()),
+                                                async_rados(_async_rados),
+                                                store(_store),
+                                                lock_name(_lock_name),
+                                                cookie(_cookie),
+                                                pool(_pool), oid(_oid),
+                                                req(NULL)
+{
+}
+
+RGWSimpleRadosUnlockCR::~RGWSimpleRadosUnlockCR()
+{
+  delete req;
+}
+
+int RGWSimpleRadosUnlockCR::send_request()
+{
+  rgw_obj obj = rgw_obj(pool, oid);
+  req = new RGWAsyncUnlockSystemObj(stack->create_completion_notifier(),
+                                 store, NULL, obj, lock_name, cookie);
+  async_rados->queue(req);
+  return 0;
+}
+
+int RGWSimpleRadosUnlockCR::request_complete()
+{
+  return req->get_ret_status();
+}
+
+
+#define OMAP_APPEND_MAX_ENTRIES 100
+int RGWOmapAppend::operate() {
+  reenter(this) {
+    for (;;) {
+      if (!has_product() && going_down) {
+        break;
+      }
+      yield wait_for_product();
+      yield {
+        string entry;
+        while (consume(&entry)) {
+          entries[entry] = bufferlist();
+          if (entries.size() >= OMAP_APPEND_MAX_ENTRIES) {
+            break;
+          }
+        }
+        if (entries.size() >= OMAP_APPEND_MAX_ENTRIES || going_down) {
+          call(new RGWRadosSetOmapKeysCR(store, pool, oid, entries));
+          entries.clear();
+        }
+      }
+      if (get_ret_status() < 0) {
+        return set_state(RGWCoroutine_Error);
+      }
+    }
+    /* done with coroutine */
+    return set_state(RGWCoroutine_Done);
+  }
+  return 0;
+}
+
+void RGWOmapAppend::flush_pending() {
+  receive(pending_entries);
+  num_pending_entries = 0;
+}
+
+void RGWOmapAppend::append(const string& s) {
+  pending_entries.push_back(s);
+  if (++num_pending_entries >= OMAP_APPEND_MAX_ENTRIES) {
+    flush_pending();
+  }
+}
+
+void RGWOmapAppend::finish() {
+  going_down = true;
+  flush_pending();
+  set_sleeping(false);
+}
+
+
diff --git a/src/rgw/rgw_cr_rados.h b/src/rgw/rgw_cr_rados.h
new file mode 100644 (file)
index 0000000..ab1844d
--- /dev/null
@@ -0,0 +1,352 @@
+#ifndef CEPH_RGW_CR_RADOS_H
+#define CEPH_RGW_CR_RADOS_H
+
+#include "rgw_coroutine.h"
+#include "common/WorkQueue.h"
+#include "common/Throttle.h"
+
+class RGWAsyncRadosRequest {
+  RGWAioCompletionNotifier *notifier;
+
+  void *user_info;
+  int retcode;
+
+protected:
+  virtual int _send_request() = 0;
+public:
+  RGWAsyncRadosRequest(RGWAioCompletionNotifier *_cn) : notifier(_cn) {}
+  virtual ~RGWAsyncRadosRequest() {}
+
+  void send_request() {
+    retcode = _send_request();
+    notifier->cb();
+  }
+
+  int get_ret_status() { return retcode; }
+};
+
+
+class RGWAsyncRadosProcessor {
+  deque<RGWAsyncRadosRequest *> m_req_queue;
+protected:
+  RGWRados *store;
+  ThreadPool m_tp;
+  Throttle req_throttle;
+
+  struct RGWWQ : public ThreadPool::WorkQueue<RGWAsyncRadosRequest> {
+    RGWAsyncRadosProcessor *processor;
+    RGWWQ(RGWAsyncRadosProcessor *p, time_t timeout, time_t suicide_timeout, ThreadPool *tp)
+      : ThreadPool::WorkQueue<RGWAsyncRadosRequest>("RGWWQ", timeout, suicide_timeout, tp), processor(p) {}
+
+    bool _enqueue(RGWAsyncRadosRequest *req);
+    void _dequeue(RGWAsyncRadosRequest *req) {
+      assert(0);
+    }
+    bool _empty();
+    RGWAsyncRadosRequest *_dequeue();
+    using ThreadPool::WorkQueue<RGWAsyncRadosRequest>::_process;
+    void _process(RGWAsyncRadosRequest *req);
+    void _dump_queue();
+    void _clear() {
+      assert(processor->m_req_queue.empty());
+    }
+  } req_wq;
+
+public:
+  RGWAsyncRadosProcessor(RGWRados *_store, int num_threads);
+  ~RGWAsyncRadosProcessor() {}
+  void start();
+  void stop();
+  void handle_request(RGWAsyncRadosRequest *req);
+  void queue(RGWAsyncRadosRequest *req);
+};
+
+
+class RGWAsyncGetSystemObj : public RGWAsyncRadosRequest {
+  RGWRados *store;
+  RGWObjectCtx *obj_ctx;
+  RGWRados::SystemObject::Read::GetObjState read_state;
+  RGWObjVersionTracker *objv_tracker;
+  rgw_obj obj;
+  bufferlist *pbl;
+  off_t ofs;
+  off_t end;
+protected:
+  int _send_request();
+public:
+  RGWAsyncGetSystemObj(RGWAioCompletionNotifier *cn, RGWRados *_store, RGWObjectCtx *_obj_ctx,
+                       RGWObjVersionTracker *_objv_tracker, rgw_obj& _obj,
+                       bufferlist *_pbl, off_t _ofs, off_t _end);
+};
+
+class RGWAsyncPutSystemObj : public RGWAsyncRadosRequest {
+  RGWRados *store;
+  RGWObjVersionTracker *objv_tracker;
+  rgw_obj obj;
+  bool exclusive;
+  bufferlist bl;
+  map<string, bufferlist> attrs;
+  time_t mtime;
+
+protected:
+  int _send_request();
+public:
+  RGWAsyncPutSystemObj(RGWAioCompletionNotifier *cn, RGWRados *_store,
+                       RGWObjVersionTracker *_objv_tracker, rgw_obj& _obj, bool _exclusive,
+                       bufferlist& _bl, time_t _mtime = 0);
+};
+
+class RGWAsyncLockSystemObj : public RGWAsyncRadosRequest {
+  RGWRados *store;
+  rgw_obj obj;
+  string lock_name;
+  string cookie;
+  uint32_t duration_secs;
+
+protected:
+  int _send_request();
+public:
+  RGWAsyncLockSystemObj(RGWAioCompletionNotifier *cn, RGWRados *_store,
+                        RGWObjVersionTracker *_objv_tracker, rgw_obj& _obj,
+                       const string& _name, const string& _cookie, uint32_t _duration_secs);
+};
+
+class RGWAsyncUnlockSystemObj : public RGWAsyncRadosRequest {
+  RGWRados *store;
+  rgw_obj obj;
+  string lock_name;
+  string cookie;
+
+protected:
+  int _send_request();
+public:
+  RGWAsyncUnlockSystemObj(RGWAioCompletionNotifier *cn, RGWRados *_store,
+                        RGWObjVersionTracker *_objv_tracker, rgw_obj& _obj,
+                       const string& _name, const string& _cookie);
+};
+
+
+template <class T>
+class RGWSimpleRadosReadCR : public RGWSimpleCoroutine {
+  RGWAsyncRadosProcessor *async_rados;
+  RGWRados *store;
+  RGWObjectCtx& obj_ctx;
+  bufferlist bl;
+
+  rgw_bucket pool;
+  string oid;
+
+  T *result;
+
+  RGWAsyncGetSystemObj *req;
+
+public:
+  RGWSimpleRadosReadCR(RGWAsyncRadosProcessor *_async_rados, RGWRados *_store,
+                     RGWObjectCtx& _obj_ctx,
+                     rgw_bucket& _pool, const string& _oid,
+                     T *_result) : RGWSimpleCoroutine(_store->ctx()),
+                                                async_rados(_async_rados), store(_store),
+                                                obj_ctx(_obj_ctx),
+                                               pool(_pool), oid(_oid),
+                                               result(_result),
+                                                req(NULL) { }
+                                                         
+  ~RGWSimpleRadosReadCR() {
+    delete req;
+  }
+
+  int send_request();
+  int request_complete();
+
+  virtual int handle_data(T& data) {
+    return 0;
+  }
+};
+
+template <class T>
+int RGWSimpleRadosReadCR<T>::send_request()
+{
+  rgw_obj obj = rgw_obj(pool, oid);
+  req = new RGWAsyncGetSystemObj(stack->create_completion_notifier(),
+                                store, &obj_ctx, NULL,
+                                obj,
+                                &bl, 0, -1);
+  async_rados->queue(req);
+  return 0;
+}
+
+template <class T>
+int RGWSimpleRadosReadCR<T>::request_complete()
+{
+  int ret = req->get_ret_status();
+  retcode = ret;
+  if (ret != -ENOENT) {
+    if (ret < 0) {
+      return ret;
+    }
+    bufferlist::iterator iter = bl.begin();
+    try {
+      ::decode(*result, iter);
+    } catch (buffer::error& err) {
+      return -EIO;
+    }
+  } else {
+    *result = T();
+  }
+
+  return handle_data(*result);
+}
+
+template <class T>
+class RGWSimpleRadosWriteCR : public RGWSimpleCoroutine {
+  RGWAsyncRadosProcessor *async_rados;
+  RGWRados *store;
+  bufferlist bl;
+
+  rgw_bucket pool;
+  string oid;
+
+  RGWAsyncPutSystemObj *req;
+
+public:
+  RGWSimpleRadosWriteCR(RGWAsyncRadosProcessor *_async_rados, RGWRados *_store,
+                     rgw_bucket& _pool, const string& _oid,
+                     const T& _data) : RGWSimpleCoroutine(_store->ctx()),
+                                                async_rados(_async_rados),
+                                               store(_store),
+                                               pool(_pool), oid(_oid),
+                                                req(NULL) {
+    ::encode(_data, bl);
+  }
+
+  ~RGWSimpleRadosWriteCR() {
+    delete req;
+  }
+
+  int send_request() {
+    rgw_obj obj = rgw_obj(pool, oid);
+    req = new RGWAsyncPutSystemObj(stack->create_completion_notifier(),
+                                  store, NULL, obj, false, bl);
+    async_rados->queue(req);
+    return 0;
+  }
+
+  int request_complete() {
+    return req->get_ret_status();
+  }
+};
+
+class RGWRadosSetOmapKeysCR : public RGWSimpleCoroutine {
+  RGWRados *store;
+  map<string, bufferlist> entries;
+
+  rgw_bucket pool;
+  string oid;
+
+  RGWAioCompletionNotifier *cn;
+
+public:
+  RGWRadosSetOmapKeysCR(RGWRados *_store,
+                     rgw_bucket& _pool, const string& _oid,
+                     map<string, bufferlist>& _entries);
+  ~RGWRadosSetOmapKeysCR();
+
+  int send_request();
+  int request_complete();
+};
+
+class RGWRadosGetOmapKeysCR : public RGWSimpleCoroutine {
+  RGWRados *store;
+
+  string marker;
+  map<string, bufferlist> *entries;
+  int max_entries;
+
+  int rval;
+  librados::IoCtx ioctx;
+
+  rgw_bucket pool;
+  string oid;
+
+  RGWAioCompletionNotifier *cn;
+
+public:
+  RGWRadosGetOmapKeysCR(RGWRados *_store,
+                     rgw_bucket& _pool, const string& _oid,
+                     const string& _marker,
+                     map<string, bufferlist> *_entries, int _max_entries);
+  ~RGWRadosGetOmapKeysCR();
+
+  int send_request();
+
+  int request_complete() {
+    return rval;
+  }
+};
+
+class RGWSimpleRadosLockCR : public RGWSimpleCoroutine {
+  RGWAsyncRadosProcessor *async_rados;
+  RGWRados *store;
+  string lock_name;
+  string cookie;
+  uint32_t duration;
+
+  rgw_bucket pool;
+  string oid;
+
+  RGWAsyncLockSystemObj *req;
+
+public:
+  RGWSimpleRadosLockCR(RGWAsyncRadosProcessor *_async_rados, RGWRados *_store,
+                     rgw_bucket& _pool, const string& _oid, const string& _lock_name,
+                     const string& _cookie,
+                     uint32_t _duration);
+  ~RGWSimpleRadosLockCR();
+
+  int send_request();
+  int request_complete();
+};
+
+class RGWSimpleRadosUnlockCR : public RGWSimpleCoroutine {
+  RGWAsyncRadosProcessor *async_rados;
+  RGWRados *store;
+  string lock_name;
+  string cookie;
+
+  rgw_bucket pool;
+  string oid;
+
+  RGWAsyncUnlockSystemObj *req;
+
+public:
+  RGWSimpleRadosUnlockCR(RGWAsyncRadosProcessor *_async_rados, RGWRados *_store,
+                     rgw_bucket& _pool, const string& _oid, const string& _lock_name,
+                     const string& _cookie);
+  ~RGWSimpleRadosUnlockCR();
+
+  int send_request();
+  int request_complete();
+};
+
+class RGWOmapAppend : public RGWConsumerCR<string> {
+  RGWAsyncRadosProcessor *async_rados;
+  RGWRados *store;
+
+  rgw_bucket pool;
+  string oid;
+
+  bool going_down;
+
+  int num_pending_entries;
+  list<string> pending_entries;
+
+  map<string, bufferlist> entries;
+public:
+  RGWOmapAppend(RGWAsyncRadosProcessor *_async_rados, RGWRados *_store, rgw_bucket& _pool, const string& _oid);
+  int operate();
+  void flush_pending();
+  void append(const string& s);
+  void finish();
+};
+
+#endif
index 26769f3bfb305e1586b60c438063a822b44bb548..15f72d97fc76fd135cf5c641a5127faec23eb653 100644 (file)
@@ -10,6 +10,7 @@
 #include "rgw_metadata.h"
 #include "rgw_rest_conn.h"
 #include "rgw_tools.h"
+#include "rgw_cr_rados.h"
 
 #include "cls/lock/cls_lock_client.h"
 
@@ -60,217 +61,6 @@ void rgw_mdlog_shard_data::decode_json(JSONObj *obj) {
   JSONDecoder::decode_json("entries", entries, obj);
 };
 
-class RGWAsyncRadosRequest {
-  RGWAioCompletionNotifier *notifier;
-
-  void *user_info;
-  int retcode;
-
-protected:
-  virtual int _send_request() = 0;
-public:
-  RGWAsyncRadosRequest(RGWAioCompletionNotifier *_cn) : notifier(_cn) {}
-  virtual ~RGWAsyncRadosRequest() {}
-
-  void send_request() {
-    retcode = _send_request();
-    notifier->cb();
-  }
-
-  int get_ret_status() { return retcode; }
-};
-
-
-class RGWAsyncGetSystemObj : public RGWAsyncRadosRequest {
-  RGWRados *store;
-  RGWObjectCtx *obj_ctx;
-  RGWRados::SystemObject::Read::GetObjState read_state;
-  RGWObjVersionTracker *objv_tracker;
-  rgw_obj obj;
-  bufferlist *pbl;
-  off_t ofs;
-  off_t end;
-protected:
-  int _send_request() {
-    return store->get_system_obj(*obj_ctx, read_state, objv_tracker, obj, *pbl, ofs, end, NULL);
-  }
-public:
-  RGWAsyncGetSystemObj(RGWAioCompletionNotifier *cn, RGWRados *_store, RGWObjectCtx *_obj_ctx,
-                       RGWObjVersionTracker *_objv_tracker, rgw_obj& _obj,
-                       bufferlist *_pbl, off_t _ofs, off_t _end) : RGWAsyncRadosRequest(cn), store(_store), obj_ctx(_obj_ctx),
-                                                                   objv_tracker(_objv_tracker), obj(_obj), pbl(_pbl),
-                                                                 ofs(_ofs), end(_end) {
-  }
-};
-
-class RGWAsyncPutSystemObj : public RGWAsyncRadosRequest {
-  RGWRados *store;
-  RGWObjVersionTracker *objv_tracker;
-  rgw_obj obj;
-  bool exclusive;
-  bufferlist bl;
-  map<string, bufferlist> attrs;
-  time_t mtime;
-
-protected:
-  int _send_request() {
-    return store->put_system_obj(NULL, obj, bl.c_str(), bl.length(), exclusive,
-                                 NULL, attrs, objv_tracker, mtime);
-  }
-public:
-  RGWAsyncPutSystemObj(RGWAioCompletionNotifier *cn, RGWRados *_store,
-                       RGWObjVersionTracker *_objv_tracker, rgw_obj& _obj, bool _exclusive,
-                       bufferlist& _bl, time_t _mtime = 0) : RGWAsyncRadosRequest(cn), store(_store),
-                                                                   objv_tracker(_objv_tracker), obj(_obj), exclusive(_exclusive),
-                                                                  bl(_bl), mtime(_mtime) {}
-};
-
-class RGWAsyncLockSystemObj : public RGWAsyncRadosRequest {
-  RGWRados *store;
-  rgw_obj obj;
-  string lock_name;
-  string cookie;
-  uint32_t duration_secs;
-
-protected:
-  int _send_request() {
-    librados::IoCtx ioctx;
-    librados::Rados *rados = store->get_rados_handle();
-    int r = rados->ioctx_create(obj.bucket.name.c_str(), ioctx); /* system object only! */
-    if (r < 0) {
-      lderr(store->ctx()) << "ERROR: failed to open pool (" << obj.bucket.name << ") ret=" << r << dendl;
-      return r;
-    }
-
-    rados::cls::lock::Lock l(lock_name);
-    utime_t duration(duration_secs, 0);
-    l.set_duration(duration);
-    l.set_cookie(cookie);
-
-    return l.lock_exclusive(&ioctx, obj.get_object());
-  }
-public:
-  RGWAsyncLockSystemObj(RGWAioCompletionNotifier *cn, RGWRados *_store,
-                        RGWObjVersionTracker *_objv_tracker, rgw_obj& _obj,
-                       const string& _name, const string& _cookie, uint32_t _duration_secs) : RGWAsyncRadosRequest(cn), store(_store),
-                                                                obj(_obj),
-                                                                lock_name(_name),
-                                                                cookie(_cookie),
-                                                               duration_secs(_duration_secs) {}
-};
-
-class RGWAsyncUnlockSystemObj : public RGWAsyncRadosRequest {
-  RGWRados *store;
-  rgw_obj obj;
-  string lock_name;
-  string cookie;
-
-protected:
-  int _send_request() {
-    librados::IoCtx ioctx;
-    librados::Rados *rados = store->get_rados_handle();
-    int r = rados->ioctx_create(obj.bucket.name.c_str(), ioctx); /* system object only! */
-    if (r < 0) {
-      lderr(store->ctx()) << "ERROR: failed to open pool (" << obj.bucket.name << ") ret=" << r << dendl;
-      return r;
-    }
-
-    rados::cls::lock::Lock l(lock_name);
-
-    l.set_cookie(cookie);
-
-    return l.unlock(&ioctx, obj.get_object());
-  }
-public:
-  RGWAsyncUnlockSystemObj(RGWAioCompletionNotifier *cn, RGWRados *_store,
-                        RGWObjVersionTracker *_objv_tracker, rgw_obj& _obj,
-                       const string& _name, const string& _cookie) : RGWAsyncRadosRequest(cn), store(_store),
-                                               obj(_obj),
-                                               lock_name(_name), cookie(_cookie) {}
-};
-
-
-class RGWAsyncRadosProcessor {
-  deque<RGWAsyncRadosRequest *> m_req_queue;
-protected:
-  RGWRados *store;
-  ThreadPool m_tp;
-  Throttle req_throttle;
-
-  struct RGWWQ : public ThreadPool::WorkQueue<RGWAsyncRadosRequest> {
-    RGWAsyncRadosProcessor *processor;
-    RGWWQ(RGWAsyncRadosProcessor *p, time_t timeout, time_t suicide_timeout, ThreadPool *tp)
-      : ThreadPool::WorkQueue<RGWAsyncRadosRequest>("RGWWQ", timeout, suicide_timeout, tp), processor(p) {}
-
-    bool _enqueue(RGWAsyncRadosRequest *req) {
-      processor->m_req_queue.push_back(req);
-      dout(20) << "enqueued request req=" << hex << req << dec << dendl;
-      _dump_queue();
-      return true;
-    }
-    void _dequeue(RGWAsyncRadosRequest *req) {
-      assert(0);
-    }
-    bool _empty() {
-      return processor->m_req_queue.empty();
-    }
-    RGWAsyncRadosRequest *_dequeue() {
-      if (processor->m_req_queue.empty())
-       return NULL;
-      RGWAsyncRadosRequest *req = processor->m_req_queue.front();
-      processor->m_req_queue.pop_front();
-      dout(20) << "dequeued request req=" << hex << req << dec << dendl;
-      _dump_queue();
-      return req;
-    }
-    using ThreadPool::WorkQueue<RGWAsyncRadosRequest>::_process;
-    void _process(RGWAsyncRadosRequest *req) {
-      processor->handle_request(req);
-      processor->req_throttle.put(1);
-    }
-    void _dump_queue() {
-      if (!g_conf->subsys.should_gather(ceph_subsys_rgw, 20)) {
-        return;
-      }
-      deque<RGWAsyncRadosRequest *>::iterator iter;
-      if (processor->m_req_queue.empty()) {
-        dout(20) << "RGWWQ: empty" << dendl;
-        return;
-      }
-      dout(20) << "RGWWQ:" << dendl;
-      for (iter = processor->m_req_queue.begin(); iter != processor->m_req_queue.end(); ++iter) {
-        dout(20) << "req: " << hex << *iter << dec << dendl;
-      }
-    }
-    void _clear() {
-      assert(processor->m_req_queue.empty());
-    }
-  } req_wq;
-
-public:
-  RGWAsyncRadosProcessor(RGWRados *_store, int num_threads)
-    : store(_store), m_tp(store->ctx(), "RGWAsyncRadosProcessor::m_tp", num_threads),
-      req_throttle(store->ctx(), "rgw_async_rados_ops", num_threads * 2),
-      req_wq(this, g_conf->rgw_op_thread_timeout,
-            g_conf->rgw_op_thread_suicide_timeout, &m_tp) {}
-  ~RGWAsyncRadosProcessor() {}
-  void start() {
-    m_tp.start();
-  }
-  void stop() {
-    m_tp.drain(&req_wq);
-    m_tp.stop();
-  }
-  void handle_request(RGWAsyncRadosRequest *req) {
-    req->send_request();
-  }
-
-  void queue(RGWAsyncRadosRequest *req) {
-    req_throttle.get(1);
-    req_wq.queue(req);
-  }
-};
-
 int RGWRemoteMetaLog::read_log_info(rgw_mdlog_info *log_info)
 {
   rgw_http_param_pair pairs[] = { { "type", "metadata" },
@@ -437,293 +227,6 @@ string RGWMetaSyncStatusManager::shard_obj_name(int shard_id)
   return string(buf);
 }
 
-template <class T>
-class RGWSimpleRadosReadCR : public RGWSimpleCoroutine {
-  RGWAsyncRadosProcessor *async_rados;
-  RGWRados *store;
-  RGWObjectCtx& obj_ctx;
-  bufferlist bl;
-
-  rgw_bucket pool;
-  string oid;
-
-  T *result;
-
-  RGWAsyncGetSystemObj *req;
-
-public:
-  RGWSimpleRadosReadCR(RGWAsyncRadosProcessor *_async_rados, RGWRados *_store,
-                     RGWObjectCtx& _obj_ctx,
-                     rgw_bucket& _pool, const string& _oid,
-                     T *_result) : RGWSimpleCoroutine(_store->ctx()),
-                                                async_rados(_async_rados), store(_store),
-                                                obj_ctx(_obj_ctx),
-                                               pool(_pool), oid(_oid),
-                                               result(_result),
-                                                req(NULL) { }
-                                                         
-  ~RGWSimpleRadosReadCR() {
-    delete req;
-  }
-
-  int send_request();
-  int request_complete();
-
-  virtual int handle_data(T& data) {
-    return 0;
-  }
-};
-
-template <class T>
-int RGWSimpleRadosReadCR<T>::send_request()
-{
-  rgw_obj obj = rgw_obj(pool, oid);
-  req = new RGWAsyncGetSystemObj(stack->create_completion_notifier(),
-                                store, &obj_ctx, NULL,
-                                obj,
-                                &bl, 0, -1);
-  async_rados->queue(req);
-  return 0;
-}
-
-template <class T>
-int RGWSimpleRadosReadCR<T>::request_complete()
-{
-  int ret = req->get_ret_status();
-  retcode = ret;
-  if (ret != -ENOENT) {
-    if (ret < 0) {
-      return ret;
-    }
-    bufferlist::iterator iter = bl.begin();
-    try {
-      ::decode(*result, iter);
-    } catch (buffer::error& err) {
-      ldout(store->ctx(), 0) << "ERROR: failed to decode global mdlog status" << dendl;
-    }
-  } else {
-    *result = T();
-  }
-
-  return handle_data(*result);
-}
-
-template <class T>
-class RGWSimpleRadosWriteCR : public RGWSimpleCoroutine {
-  RGWAsyncRadosProcessor *async_rados;
-  RGWRados *store;
-  bufferlist bl;
-
-  rgw_bucket pool;
-  string oid;
-
-  RGWAsyncPutSystemObj *req;
-
-public:
-  RGWSimpleRadosWriteCR(RGWAsyncRadosProcessor *_async_rados, RGWRados *_store,
-                     rgw_bucket& _pool, const string& _oid,
-                     const T& _data) : RGWSimpleCoroutine(_store->ctx()),
-                                                async_rados(_async_rados),
-                                               store(_store),
-                                               pool(_pool), oid(_oid),
-                                                req(NULL) {
-    ::encode(_data, bl);
-  }
-
-  ~RGWSimpleRadosWriteCR() {
-    delete req;
-  }
-
-  int send_request() {
-    rgw_obj obj = rgw_obj(pool, oid);
-    req = new RGWAsyncPutSystemObj(stack->create_completion_notifier(),
-                                  store, NULL, obj, false, bl);
-    async_rados->queue(req);
-    return 0;
-  }
-
-  int request_complete() {
-    return req->get_ret_status();
-  }
-};
-
-class RGWRadosSetOmapKeysCR : public RGWSimpleCoroutine {
-  RGWRados *store;
-  map<string, bufferlist> entries;
-
-  rgw_bucket pool;
-  string oid;
-
-  RGWAioCompletionNotifier *cn;
-
-public:
-  RGWRadosSetOmapKeysCR(RGWRados *_store,
-                     rgw_bucket& _pool, const string& _oid,
-                     map<string, bufferlist>& _entries) : RGWSimpleCoroutine(_store->ctx()),
-                                               store(_store),
-                                               entries(_entries),
-                                               pool(_pool), oid(_oid), cn(NULL) {
-  }
-
-  ~RGWRadosSetOmapKeysCR() {
-    cn->put();
-  }
-
-  int send_request() {
-    librados::IoCtx ioctx;
-    librados::Rados *rados = store->get_rados_handle();
-    int r = rados->ioctx_create(pool.name.c_str(), ioctx); /* system object only! */
-    if (r < 0) {
-      lderr(store->ctx()) << "ERROR: failed to open pool (" << pool.name << ") ret=" << r << dendl;
-      return r;
-    }
-
-    librados::ObjectWriteOperation op;
-    op.omap_set(entries);
-
-    cn = stack->create_completion_notifier();
-    cn->get();
-    return ioctx.aio_operate(oid, cn->completion(), &op);
-  }
-
-  int request_complete() {
-    return cn->completion()->get_return_value();
-
-  }
-};
-
-class RGWRadosGetOmapKeysCR : public RGWSimpleCoroutine {
-  RGWRados *store;
-
-  string marker;
-  map<string, bufferlist> *entries;
-  int max_entries;
-
-  int rval;
-  librados::IoCtx ioctx;
-
-  rgw_bucket pool;
-  string oid;
-
-  RGWAioCompletionNotifier *cn;
-
-public:
-  RGWRadosGetOmapKeysCR(RGWRados *_store,
-                     rgw_bucket& _pool, const string& _oid,
-                     const string& _marker,
-                     map<string, bufferlist> *_entries, int _max_entries) : RGWSimpleCoroutine(_store->ctx()),
-                                               store(_store),
-                                               marker(_marker),
-                                               entries(_entries), max_entries(_max_entries), rval(0),
-                                               pool(_pool), oid(_oid), cn(NULL) {
-  }
-
-  ~RGWRadosGetOmapKeysCR() {
-  }
-
-  int send_request() {
-    librados::Rados *rados = store->get_rados_handle();
-    int r = rados->ioctx_create(pool.name.c_str(), ioctx); /* system object only! */
-    if (r < 0) {
-      lderr(store->ctx()) << "ERROR: failed to open pool (" << pool.name << ") ret=" << r << dendl;
-      return r;
-    }
-
-    librados::ObjectReadOperation op;
-    op.omap_get_vals(marker, max_entries, entries, &rval);
-
-    cn = stack->create_completion_notifier();
-    return ioctx.aio_operate(oid, cn->completion(), &op, NULL);
-  }
-
-  int request_complete() {
-    return rval;
-  }
-};
-
-class RGWSimpleRadosLockCR : public RGWSimpleCoroutine {
-  RGWAsyncRadosProcessor *async_rados;
-  RGWRados *store;
-  string lock_name;
-  string cookie;
-  uint32_t duration;
-
-  rgw_bucket pool;
-  string oid;
-
-  RGWAsyncLockSystemObj *req;
-
-public:
-  RGWSimpleRadosLockCR(RGWAsyncRadosProcessor *_async_rados, RGWRados *_store,
-                     rgw_bucket& _pool, const string& _oid, const string& _lock_name,
-                     const string& _cookie,
-                     uint32_t _duration) : RGWSimpleCoroutine(_store->ctx()),
-                                                async_rados(_async_rados),
-                                               store(_store),
-                                               lock_name(_lock_name),
-                                               cookie(_cookie),
-                                               duration(_duration),
-                                               pool(_pool), oid(_oid),
-                                                req(NULL) {
-  }
-
-  ~RGWSimpleRadosLockCR() {
-    delete req;
-  }
-
-  int send_request() {
-    rgw_obj obj = rgw_obj(pool, oid);
-    req = new RGWAsyncLockSystemObj(stack->create_completion_notifier(),
-                                  store, NULL, obj, lock_name, cookie, duration);
-    async_rados->queue(req);
-    return 0;
-  }
-
-  int request_complete() {
-    return req->get_ret_status();
-  }
-};
-
-class RGWSimpleRadosUnlockCR : public RGWSimpleCoroutine {
-  RGWAsyncRadosProcessor *async_rados;
-  RGWRados *store;
-  string lock_name;
-  string cookie;
-
-  rgw_bucket pool;
-  string oid;
-
-  RGWAsyncUnlockSystemObj *req;
-
-public:
-  RGWSimpleRadosUnlockCR(RGWAsyncRadosProcessor *_async_rados, RGWRados *_store,
-                     rgw_bucket& _pool, const string& _oid, const string& _lock_name,
-                     const string& _cookie) : RGWSimpleCoroutine(_store->ctx()),
-                                                async_rados(_async_rados),
-                                               store(_store),
-                                               lock_name(_lock_name),
-                                               cookie(_cookie),
-                                               pool(_pool), oid(_oid),
-                                                req(NULL) {
-  }
-
-  ~RGWSimpleRadosUnlockCR() {
-    delete req;
-  }
-
-  int send_request() {
-    rgw_obj obj = rgw_obj(pool, oid);
-    req = new RGWAsyncUnlockSystemObj(stack->create_completion_notifier(),
-                                  store, NULL, obj, lock_name, cookie);
-    async_rados->queue(req);
-    return 0;
-  }
-
-  int request_complete() {
-    return req->get_ret_status();
-  }
-};
-
 class RGWReadMDLogShardInfo : public RGWSimpleCoroutine {
   RGWRados *store;
   RGWMetadataLog *mdlog;
@@ -1108,75 +611,6 @@ int RGWReadSyncStatusCoroutine::handle_data(rgw_meta_sync_info& data)
   return 0;
 }
 
-class RGWOmapAppend : public RGWConsumerCR<string> {
-  RGWAsyncRadosProcessor *async_rados;
-  RGWRados *store;
-
-  rgw_bucket pool;
-  string oid;
-
-  bool going_down;
-
-  int num_pending_entries;
-  list<string> pending_entries;
-
-  map<string, bufferlist> entries;
-public:
-
-  RGWOmapAppend(RGWAsyncRadosProcessor *_async_rados, RGWRados *_store, rgw_bucket& _pool, const string& _oid)
-                      : RGWConsumerCR<string>(_store->ctx()), async_rados(_async_rados),
-                       store(_store), pool(_pool), oid(_oid), going_down(false), num_pending_entries(0) {}
-
-#define OMAP_APPEND_MAX_ENTRIES 100
-  int operate() {
-    reenter(this) {
-      for (;;) {
-       if (!has_product() && going_down) {
-         break;
-       }
-        yield wait_for_product();
-        yield {
-         string entry;
-         while (consume(&entry)) {
-           entries[entry] = bufferlist();
-           if (entries.size() >= OMAP_APPEND_MAX_ENTRIES) {
-             break;
-           }
-         }
-         if (entries.size() >= OMAP_APPEND_MAX_ENTRIES || going_down) {
-           call(new RGWRadosSetOmapKeysCR(store, pool, oid, entries));
-           entries.clear();
-         }
-       }
-        if (get_ret_status() < 0) {
-         return set_state(RGWCoroutine_Error);
-        }
-      }
-      /* done with coroutine */
-      return set_state(RGWCoroutine_Done);
-    }
-    return 0;
-  }
-
-  void flush_pending() {
-    receive(pending_entries);
-    num_pending_entries = 0;
-  }
-
-  void append(const string& s) {
-    pending_entries.push_back(s);
-    if (++num_pending_entries >= OMAP_APPEND_MAX_ENTRIES) {
-      flush_pending();
-    }
-  }
-
-  void finish() {
-    going_down = true;
-    flush_pending();
-    set_sleeping(false);
-  }
-};
-
 class RGWShardedOmapCRManager {
   RGWAsyncRadosProcessor *async_rados;
   RGWRados *store;