]> git.apps.os.sepia.ceph.com Git - ceph-ci.git/commitdiff
rgw: svc_notify: initial work
author Yehuda Sadeh <yehuda@redhat.com>
Wed, 5 Sep 2018 10:11:16 +0000 (03:11 -0700)
committer Yehuda Sadeh <yehuda@redhat.com>
Thu, 8 Nov 2018 17:19:29 +0000 (09:19 -0800)
Signed-off-by: Yehuda Sadeh <yehuda@redhat.com>
13 files changed:
src/rgw/CMakeLists.txt
src/rgw/rgw_rados.cc
src/rgw/rgw_rados.h
src/rgw/services/svc_notify.cc [new file with mode: 0644]
src/rgw/services/svc_notify.h [new file with mode: 0644]
src/rgw/services/svc_rados.cc
src/rgw/services/svc_rados.h
src/rgw/services/svc_sys_obj.cc
src/rgw/services/svc_sys_obj.h
src/rgw/services/svc_sys_obj_cache.cc
src/rgw/services/svc_sys_obj_cache.h
src/rgw/services/svc_sys_obj_core.cc
src/rgw/services/svc_sys_obj_core.h

index 703375cad266f14cfedf6198604cf6b3e2c10edb..24046a12b4289f524d6f6d6d8b3a35dd856a4d16 100644 (file)
@@ -39,6 +39,7 @@ function(gperf_generate input output)
 endfunction()
 
 set(librgw_common_srcs
+  services/svc_notify.cc
   services/svc_quota.cc
   services/svc_rados.cc
   services/svc_sys_obj.cc
index f081cf591380064c99e7049604cb47bbb1cd2855..80a013dde697970c58e36f87f9e4088d830853df 100644 (file)
@@ -605,176 +605,6 @@ void RGWObjVersionTracker::generate_new_write_ver(CephContext *cct)
 }
 
 
-int RGWRados::watch(const string& oid, uint64_t *watch_handle, librados::WatchCtx2 *ctx) {
-  int r = control_pool_ctx.watch2(oid, watch_handle, ctx);
-  if (r < 0)
-    return r;
-  return 0;
-}
-
-int RGWRados::aio_watch(const string& oid, uint64_t *watch_handle, librados::WatchCtx2 *ctx, librados::AioCompletion *c) {
-  int r = control_pool_ctx.aio_watch2(oid, c, watch_handle, ctx, 0);
-  if (r < 0)
-    return r;
-  return 0;
-}
-
-int RGWRados::unwatch(uint64_t watch_handle)
-{
-  int r = control_pool_ctx.unwatch2(watch_handle);
-  if (r < 0) {
-    ldout(cct, 0) << "ERROR: rados->unwatch2() returned r=" << r << dendl;
-    return r;
-  }
-  r = rados[0].watch_flush();
-  if (r < 0) {
-    ldout(cct, 0) << "ERROR: rados->watch_flush() returned r=" << r << dendl;
-    return r;
-  }
-  return 0;
-}
-
-void RGWRados::add_watcher(int i)
-{
-  ldout(cct, 20) << "add_watcher() i=" << i << dendl;
-  Mutex::Locker l(watchers_lock);
-  watchers_set.insert(i);
-  if (watchers_set.size() ==  (size_t)num_watchers) {
-    ldout(cct, 2) << "all " << num_watchers << " watchers are set, enabling cache" << dendl;
-    set_cache_enabled(true);
-  }
-}
-
-void RGWRados::remove_watcher(int i)
-{
-  ldout(cct, 20) << "remove_watcher() i=" << i << dendl;
-  Mutex::Locker l(watchers_lock);
-  size_t orig_size = watchers_set.size();
-  watchers_set.erase(i);
-  if (orig_size == (size_t)num_watchers &&
-      watchers_set.size() < orig_size) { /* actually removed */
-    ldout(cct, 2) << "removed watcher, disabling cache" << dendl;
-    set_cache_enabled(false);
-  }
-}
-
-class RGWWatcher : public librados::WatchCtx2 {
-  RGWRados *rados;
-  int index;
-  string oid;
-  uint64_t watch_handle;
-  int register_ret{0};
-  librados::AioCompletion *register_completion{nullptr};
-
-  class C_ReinitWatch : public Context {
-    RGWWatcher *watcher;
-    public:
-      explicit C_ReinitWatch(RGWWatcher *_watcher) : watcher(_watcher) {}
-      void finish(int r) override {
-        watcher->reinit();
-      }
-  };
-public:
-  RGWWatcher(RGWRados *r, int i, const string& o) : rados(r), index(i), oid(o), watch_handle(0) {}
-  void handle_notify(uint64_t notify_id,
-                    uint64_t cookie,
-                    uint64_t notifier_id,
-                    bufferlist& bl) override {
-    ldout(rados->ctx(), 10) << "RGWWatcher::handle_notify() "
-                           << " notify_id " << notify_id
-                           << " cookie " << cookie
-                           << " notifier " << notifier_id
-                           << " bl.length()=" << bl.length() << dendl;
-
-    if (unlikely(rados->inject_notify_timeout_probability == 1) ||
-       (rados->inject_notify_timeout_probability > 0 &&
-        (rados->inject_notify_timeout_probability >
-         ceph::util::generate_random_number(0.0, 1.0)))) {
-      ldout(rados->ctx(), 0)
-       << "RGWWatcher::handle_notify() dropping notification! "
-       << "If this isn't what you want, set "
-       << "rgw_inject_notify_timeout_probability to zero!" << dendl;
-      return;
-    }
-
-
-
-    rados->watch_cb(notify_id, cookie, notifier_id, bl);
-
-    bufferlist reply_bl; // empty reply payload
-    rados->control_pool_ctx.notify_ack(oid, notify_id, cookie, reply_bl);
-  }
-  void handle_error(uint64_t cookie, int err) override {
-    lderr(rados->ctx()) << "RGWWatcher::handle_error cookie " << cookie
-                       << " err " << cpp_strerror(err) << dendl;
-    rados->remove_watcher(index);
-    rados->schedule_context(new C_ReinitWatch(this));
-  }
-
-  void reinit() {
-    int ret = unregister_watch();
-    if (ret < 0) {
-      ldout(rados->ctx(), 0) << "ERROR: unregister_watch() returned ret=" << ret << dendl;
-      return;
-    }
-    ret = register_watch();
-    if (ret < 0) {
-      ldout(rados->ctx(), 0) << "ERROR: register_watch() returned ret=" << ret << dendl;
-      return;
-    }
-  }
-
-  int unregister_watch() {
-    int r = rados->unwatch(watch_handle);
-    if (r < 0) {
-      return r;
-    }
-    rados->remove_watcher(index);
-    return 0;
-  }
-
-  int register_watch_async() {
-    if (register_completion) {
-      register_completion->release();
-      register_completion = nullptr;
-    }
-    register_completion = librados::Rados::aio_create_completion(nullptr, nullptr, nullptr);
-    register_ret = rados->aio_watch(oid, &watch_handle, this, register_completion);
-    if (register_ret < 0) {
-      register_completion->release();
-      return register_ret;
-    }
-    return 0;
-  }
-
-  int register_watch_finish() {
-    if (register_ret < 0) {
-      return register_ret;
-    }
-    if (!register_completion) {
-      return -EINVAL;
-    }
-    register_completion->wait_for_safe();
-    int r = register_completion->get_return_value();
-    register_completion->release();
-    register_completion = nullptr;
-    if (r < 0) {
-      return r;
-    }
-    rados->add_watcher(index);
-    return 0;
-  }
-
-  int register_watch() {
-    int r = rados->watch(oid, &watch_handle, this);
-    if (r < 0) {
-      return r;
-    }
-    rados->add_watcher(index);
-    return 0;
-  }
-};
-
 class RGWMetaNotifierManager : public RGWCoroutinesManager {
   RGWRados *store;
   RGWHTTPManager http_manager;
@@ -1698,14 +1528,6 @@ int RGWRados::init_complete()
   period_history.reset(new RGWPeriodHistory(cct, period_puller.get(),
                                             svc.zone->get_current_period()));
 
-  if (need_watch_notify()) {
-    ret = init_watch();
-    if (ret < 0) {
-      lderr(cct) << "ERROR: failed to initialize watch: " << cpp_strerror(-ret) << dendl;
-      return ret;
-    }
-  }
-
   ret = open_root_pool_ctx();
   if (ret < 0)
     return ret;
@@ -1924,18 +1746,6 @@ int RGWRados::initialize()
   return init_complete();
 }
 
-void RGWRados::finalize_watch()
-{
-  for (int i = 0; i < num_watchers; i++) {
-    RGWWatcher *watcher = watchers[i];
-    watcher->unregister_watch();
-    delete watcher;
-  }
-
-  delete[] notify_oids;
-  delete[] watchers;
-}
-
 void RGWRados::schedule_context(Context *c) {
   finisher->queue(c);
 }
@@ -1991,80 +1801,6 @@ int RGWRados::open_reshard_pool_ctx()
   return rgw_init_ioctx(get_rados_handle(), svc.zone->get_zone_params().reshard_pool, reshard_pool_ctx, true);
 }
 
-int RGWRados::init_watch()
-{
-#warning needs to be part of the sysobj/cache service
-  int r = rgw_init_ioctx(&rados[0], svc.zone->get_zone_params().control_pool, control_pool_ctx, true);
-  if (r < 0) {
-    return r;
-  }
-
-  num_watchers = cct->_conf->rgw_num_control_oids;
-
-  bool compat_oid = (num_watchers == 0);
-
-  if (num_watchers <= 0)
-    num_watchers = 1;
-
-  notify_oids = new string[num_watchers];
-  watchers = new RGWWatcher *[num_watchers];
-
-  int error = 0;
-
-  for (int i=0; i < num_watchers; i++) {
-    string& notify_oid = notify_oids[i];
-    notify_oid = notify_oid_prefix;
-    if (!compat_oid) {
-      char buf[16];
-      snprintf(buf, sizeof(buf), ".%d", i);
-      notify_oid.append(buf);
-    }
-    r = control_pool_ctx.create(notify_oid, false);
-    if (r < 0 && r != -EEXIST)
-      return r;
-
-    RGWWatcher *watcher = new RGWWatcher(this, i, notify_oid);
-    watchers[i] = watcher;
-
-    r = watcher->register_watch_async();
-    if (r < 0) {
-      ldout(cct, 0) << "WARNING: register_watch_aio() returned " << r << dendl;
-      error = r;
-      continue;
-    }
-  }
-
-  for (int i = 0; i < num_watchers; ++i) {
-    int r = watchers[i]->register_watch_finish();
-    if (r < 0) {
-      ldout(cct, 0) << "WARNING: async watch returned " << r << dendl;
-      error = r;
-    }
-  }
-
-  if (error < 0) {
-    return error;
-  }
-
-  watch_initialized = true;
-
-  set_cache_enabled(true);
-
-  return 0;
-}
-
-void RGWRados::pick_control_oid(const string& key, string& notify_oid)
-{
-  uint32_t r = ceph_str_hash_linux(key.c_str(), key.size());
-
-  int i = r % num_watchers;
-  char buf[16];
-  snprintf(buf, sizeof(buf), ".%d", i);
-
-  notify_oid = notify_oid_prefix;
-  notify_oid.append(buf);
-}
-
 int RGWRados::open_pool_ctx(const rgw_pool& pool, librados::IoCtx& io_ctx)
 {
   constexpr bool create = true; // create the pool if it doesn't exist
@@ -9264,29 +9000,20 @@ int RGWRados::append_async(rgw_raw_obj& obj, size_t size, bufferlist& bl)
 
 int RGWRados::distribute(const string& key, bufferlist& bl)
 {
-  /*
-   * we were called before watch was initialized. This can only happen if we're updating some system
-   * config object (e.g., zone info) during init. Don't try to distribute the cache info for these
-   * objects, they're currently only read on startup anyway.
-   */
-  if (!watch_initialized)
-    return 0;
-
-  string notify_oid;
-  pick_control_oid(key, notify_oid);
+  RGWSI_RADOS::Obj notify_obj = pick_control_obj(key);
 
   ldout(cct, 10) << "distributing notification oid=" << notify_oid << " bl.length()=" << bl.length() << dendl;
   return robust_notify(notify_oid, bl);
 }
 
-int RGWRados::robust_notify(const string& notify_oid, bufferlist& bl)
+int RGWSI_Notify::robust_notify(RGWSI_RADOS::Obj& notify_obj, bufferlist& bl)
 {
   // The reply of every machine that acks goes in here.
   boost::container::flat_set<std::pair<uint64_t, uint64_t>> acks;
   bufferlist rbl;
 
   // First, try to send, without being fancy about it.
-  auto r = control_pool_ctx.notify2(notify_oid, bl, 0, &rbl);
+  auto r = notify_obj.notify(bl, 0, &rbl);
 
   // If that doesn't work, get serious.
   if (r < 0) {
@@ -9326,7 +9053,7 @@ int RGWRados::robust_notify(const string& notify_oid, bufferlist& bl)
       rbl.clear();
       // Reset the timeouts, we're only concerned with new ones.
       timeouts.clear();
-      r = control_pool_ctx.notify2(notify_oid, bl, 0, &rbl);
+      r = notify_obj.notify(bl, 0, &rbl);
       if (r < 0) {
        ldout(cct, 1) << "robust_notify: retry " << tries << " failed: "
                      << cpp_strerror(-r) << dendl;
index 761676817cc8aa2c968aea4ed2619be2461e9b24..ca43ae8ddd717c4c896f86691c47afbce053ffa0 100644 (file)
@@ -1216,7 +1216,6 @@ class RGWRados : public AdminSocketHook
 
   std::atomic<int64_t> max_req_id = { 0 };
   Mutex lock;
-  Mutex watchers_lock;
   SafeTimer *timer;
 
   RGWGC *gc;
@@ -1242,12 +1241,7 @@ class RGWRados : public AdminSocketHook
   Mutex meta_sync_thread_lock;
   Mutex data_sync_thread_lock;
 
-  int num_watchers;
-  RGWWatcher **watchers;
-  std::set<int> watchers_set;
   librados::IoCtx root_pool_ctx;      // .rgw
-  librados::IoCtx control_pool_ctx;   // .rgw.control
-  bool watch_initialized;
 
   double inject_notify_timeout_probability = 0;
   unsigned max_notify_retries = 0;
@@ -1312,13 +1306,11 @@ protected:
   RGWServiceRegistryRef svc_registry;
   RGWIndexCompletionManager *index_completion_manager{nullptr};
 public:
-  RGWRados() : lock("rados_timer_lock"), watchers_lock("watchers_lock"), timer(NULL),
+  RGWRados() : lock("rados_timer_lock"), timer(NULL),
                gc(NULL), lc(NULL), obj_expirer(NULL), use_gc_thread(false), use_lc_thread(false), quota_threads(false),
                run_sync_thread(false), run_reshard_thread(false), async_rados(nullptr), meta_notifier(NULL),
                data_notifier(NULL), meta_sync_processor_thread(NULL),
                meta_sync_thread_lock("meta_sync_thread_lock"), data_sync_thread_lock("data_sync_thread_lock"),
-               num_watchers(0), watchers(NULL),
-               watch_initialized(false),
                bucket_id_lock("rados_bucket_id"),
                bucket_index_max_shards(0),
                max_bucket_id(0), cct(NULL),
diff --git a/src/rgw/services/svc_notify.cc b/src/rgw/services/svc_notify.cc
new file mode 100644 (file)
index 0000000..9e1c3f3
--- /dev/null
@@ -0,0 +1,320 @@
+#include "include/random.h"
+#include "common/errno.h"
+
+#include "svc_notify.h"
+#include "svc_zone.h"
+#include "svc_rados.h"
+
+#include "rgw/rgw_zone.h"
+
+#define dout_subsys ceph_subsys_rgw
+
+static string notify_oid_prefix = "notify";
+
+/*
+ * Watch context for a single control object. Notifications received on the
+ * object are handed to the owning RGWSI_Notify and then acked; on a watch
+ * error the watcher removes itself from the service's active set and
+ * schedules a re-registration.
+ */
+class RGWWatcher : public librados::WatchCtx2 {
+  CephContext *cct;
+  RGWSI_Notify *svc;
+  int index;                 // slot reported to add_watcher()/remove_watcher()
+  RGWSI_RADOS::Obj obj;      // the control object being watched
+  uint64_t watch_handle;     // filled in by watch()/aio_watch(), passed to unwatch()
+  int register_ret{0};       // result of the last aio_watch() submission
+  librados::AioCompletion *register_completion{nullptr};  // pending async registration, if any
+
+  /* Context queued by handle_error() that re-establishes the watch. */
+  class C_ReinitWatch : public Context {
+    RGWWatcher *watcher;
+    public:
+      explicit C_ReinitWatch(RGWWatcher *_watcher) : watcher(_watcher) {}
+      void finish(int r) override {
+        watcher->reinit();
+      }
+  };
+public:
+  RGWWatcher(CephContext *_cct, RGWSI_Notify *s, int i, RGWSI_RADOS::Obj& o) : cct(_cct), svc(s), index(i), obj(o), watch_handle(0) {}
+
+  /*
+   * Notification callback: passes the payload to svc->watch_cb() and acks
+   * with an empty reply. When notify-timeout injection triggers, the
+   * notification is dropped and no ack is sent.
+   */
+  void handle_notify(uint64_t notify_id,
+                     uint64_t cookie,
+                     uint64_t notifier_id,
+                     bufferlist& bl) override {
+    ldout(cct, 10) << "RGWWatcher::handle_notify() "
+                   << " notify_id " << notify_id
+                   << " cookie " << cookie
+                   << " notifier " << notifier_id
+                   << " bl.length()=" << bl.length() << dendl;
+
+    // A probability of exactly 1 always drops; otherwise drop when the
+    // configured probability exceeds a random draw from [0.0, 1.0].
+    if (unlikely(svc->inject_notify_timeout_probability == 1) ||
+        (svc->inject_notify_timeout_probability > 0 &&
+         (svc->inject_notify_timeout_probability >
+          ceph::util::generate_random_number(0.0, 1.0)))) {
+      ldout(cct, 0)
+        << "RGWWatcher::handle_notify() dropping notification! "
+        << "If this isn't what you want, set "
+        << "rgw_inject_notify_timeout_probability to zero!" << dendl;
+      return;
+    }
+
+    svc->watch_cb(notify_id, cookie, notifier_id, bl);
+
+    bufferlist reply_bl; // empty reply payload
+    obj.notify_ack(notify_id, cookie, reply_bl);
+  }
+
+  /*
+   * Watch error callback: takes this watcher out of the service's active set
+   * and queues a C_ReinitWatch through svc->schedule_context().
+   */
+  void handle_error(uint64_t cookie, int err) override {
+    lderr(cct) << "RGWWatcher::handle_error cookie " << cookie
+               << " err " << cpp_strerror(err) << dendl;
+    svc->remove_watcher(index);
+    svc->schedule_context(new C_ReinitWatch(this));
+  }
+
+  /*
+   * Drops the current watch and registers a new one. Failures are logged;
+   * if unregistering fails, no new registration is attempted.
+   */
+  void reinit() {
+    int ret = unregister_watch();
+    if (ret < 0) {
+      ldout(cct, 0) << "ERROR: unregister_watch() returned ret=" << ret << dendl;
+      return;
+    }
+    ret = register_watch();
+    if (ret < 0) {
+      ldout(cct, 0) << "ERROR: register_watch() returned ret=" << ret << dendl;
+      return;
+    }
+  }
+
+  /* Removes the watch; on success also removes this watcher from the active set. */
+  int unregister_watch() {
+    int r = svc->unwatch(obj, watch_handle);
+    if (r < 0) {
+      return r;
+    }
+    svc->remove_watcher(index);
+    return 0;
+  }
+
+  /*
+   * Submits an asynchronous watch registration. The outcome is collected
+   * later by register_watch_finish(). Returns 0 if the request was
+   * submitted, or the negative error from aio_watch().
+   */
+  int register_watch_async() {
+    if (register_completion) {
+      register_completion->release();
+      register_completion = nullptr;
+    }
+    register_completion = librados::Rados::aio_create_completion(nullptr, nullptr, nullptr);
+    register_ret = obj.aio_watch(register_completion, &watch_handle, this);
+    if (register_ret < 0) {
+      register_completion->release();
+      // Clear the pointer so a later register_watch_async() call does not
+      // release the same completion a second time.
+      register_completion = nullptr;
+      return register_ret;
+    }
+    return 0;
+  }
+
+  /*
+   * Waits for the registration started by register_watch_async() and, on
+   * success, adds this watcher to the service's active set. Returns the
+   * submission error if there was one, -EINVAL if nothing is pending, or the
+   * completion's return value if it is negative.
+   */
+  int register_watch_finish() {
+    if (register_ret < 0) {
+      return register_ret;
+    }
+    if (!register_completion) {
+      return -EINVAL;
+    }
+    register_completion->wait_for_safe();
+    int r = register_completion->get_return_value();
+    register_completion->release();
+    register_completion = nullptr;
+    if (r < 0) {
+      return r;
+    }
+    svc->add_watcher(index);
+    return 0;
+  }
+
+  /* Registers the watch with a blocking call; on success adds this watcher to the active set. */
+  int register_watch() {
+    int r = obj.watch(&watch_handle, this);
+    if (r < 0) {
+      return r;
+    }
+    svc->add_watcher(index);
+    return 0;
+  }
+};
+
+/* Creates a new RGWSI_Notify bound to this service and stores it in *instance; always returns 0. */
+int RGWS_Notify::create_instance(const string& conf, RGWServiceInstanceRef *instance)
+{
+  RGWSI_Notify *notify_instance = new RGWSI_Notify(this, cct);
+  instance->reset(notify_instance);  // conf is not consulted here
+  return 0;
+}
+
+/* Lists the services this instance depends on: "zone" and "rados", each with an empty ("{}") conf. */
+std::map<string, RGWServiceInstance::dependency> RGWSI_Notify::get_deps()
+{
+  RGWServiceInstance::dependency zone_dep = { .name = "zone",
+                                              .conf = "{}" };
+  RGWServiceInstance::dependency rados_dep = { .name = "rados",
+                                               .conf = "{}" };
+
+  // Keys must match the ones looked up in load().
+  map<string, RGWServiceInstance::dependency> result;
+  result["zone_dep"] = zone_dep;
+  result["rados_dep"] = rados_dep;
+  return result;
+}
+
+/* Resolves the zone and rados service handles from dep_refs (keys as declared in get_deps()). */
+int RGWSI_Notify::load(const string& conf, std::map<std::string, RGWServiceInstanceRef>& dep_refs)
+{
+  RGWServiceInstanceRef& zone_ref = dep_refs["zone_dep"];
+  zone_svc = static_pointer_cast<RGWSI_Zone>(zone_ref);
+  assert(zone_svc);
+
+  RGWServiceInstanceRef& rados_ref = dep_refs["rados_dep"];
+  rados_svc = static_pointer_cast<RGWSI_RADOS>(rados_ref);
+  assert(rados_svc);
+
+  return 0;
+}
+
+/* Returns the name of the i-th control object: "<notify_oid_prefix>.<i>". */
+string RGWSI_Notify::get_control_oid(int i)
+{
+  // String concatenation yields the same text as snprintf("%s.%d") did,
+  // without the runtime-sized stack array, which is a compiler extension
+  // rather than standard C++.
+  return notify_oid_prefix + "." + std::to_string(i);
+}
+
+/* Maps key onto one of the control objects by hashing it modulo num_watchers. */
+RGWSI_RADOS::Obj RGWSI_Notify::pick_control_obj(const string& key)
+{
+  const uint32_t key_hash = ceph_str_hash_linux(key.c_str(), key.size());
+  const int slot = key_hash % num_watchers;
+
+  return notify_objs[slot];
+}
+
+int RGWSI_Notify::init_watch() // creates the control objects and registers one watcher per object; returns 0 or a negative error
+{
+  num_watchers = cct->_conf->rgw_num_control_oids;
+
+  bool compat_oid = (num_watchers == 0); // with 0 configured oids the single object is named by the bare prefix
+
+  if (num_watchers <= 0)
+    num_watchers = 1;
+
+  watchers = new RGWWatcher *[num_watchers]; // NOTE(review): elements stay uninitialized until the loop below assigns them
+
+  int error = 0; // last error seen while registering; reported after both passes
+
+  notify_objs.resize(num_watchers);
+
+  for (int i=0; i < num_watchers; i++) { // pass 1: create each object and submit its watch asynchronously
+    string notify_oid;
+
+    if (!compat_oid) {
+      notify_oid = get_control_oid(i);
+    } else {
+      notify_oid = notify_oid_prefix;
+    }
+
+    notify_objs[i] = rados_svc->obj({control_pool, notify_oid});
+    auto& notify_obj = notify_objs[i];
+
+    librados::ObjectWriteOperation op;
+    op.create(false);
+    int r = notify_obj.operate(&op);
+    if (r < 0 && r != -EEXIST) { // an already existing object is not an error
+      return r; // NOTE(review): returns with watchers[i..] unassigned and earlier async registrations not yet finished
+    }
+
+    RGWWatcher *watcher = new RGWWatcher(cct, this, i, notify_obj);
+    watchers[i] = watcher;
+
+    r = watcher->register_watch_async();
+    if (r < 0) {
+      ldout(cct, 0) << "WARNING: register_watch_aio() returned " << r << dendl;
+      error = r; // remember the failure but keep submitting the remaining watchers
+      continue;
+    }
+  }
+
+  for (int i = 0; i < num_watchers; ++i) { // pass 2: wait for every submitted registration
+    int r = watchers[i]->register_watch_finish();
+    if (r < 0) {
+      ldout(cct, 0) << "WARNING: async watch returned " << r << dendl;
+      error = r;
+    }
+  }
+
+  if (error < 0) {
+    return error;
+  }
+
+  return 0;
+}
+
+/*
+ * Unregisters and destroys every watcher, then frees the watcher array.
+ * Does nothing if the array was never allocated or was already freed.
+ */
+void RGWSI_Notify::finalize_watch()
+{
+  if (!watchers) {
+    return;
+  }
+
+  for (int i = 0; i < num_watchers; i++) {
+    RGWWatcher *watcher = watchers[i];
+    watcher->unregister_watch();  // return value is not checked
+    delete watcher;
+  }
+
+  delete[] watchers;
+  // Reset the pointer so a repeated call cannot free the array (or its
+  // elements) a second time.
+  watchers = nullptr;
+}
+
+/* Picks up the control pool from the zone params and sets up the watches; returns 0 or init_watch()'s error. */
+int RGWSI_Notify::init()
+{
+  control_pool = zone_svc->get_zone_params().control_pool;
+
+  const int watch_ret = init_watch();
+  if (watch_ret >= 0) {
+    return 0;
+  }
+
+  lderr(cct) << "ERROR: failed to initialize watch: " << cpp_strerror(-watch_ret) << dendl;
+  return watch_ret;
+}
+
+void RGWSI_Notify::shutdown() // declared as an override in svc_notify.h; tears down all watchers
+{
+  finalize_watch(); // unregisters and deletes each watcher, then frees the watcher array
+}
+
+/* Registers ctx as a watcher on obj; returns 0 on success or the negative error from obj.watch(). */
+int RGWSI_Notify::watch(RGWSI_RADOS::Obj& obj, uint64_t *watch_handle, librados::WatchCtx2 *ctx)
+{
+  const int rc = obj.watch(watch_handle, ctx);
+  return (rc < 0) ? rc : 0;
+}
+
+/*
+ * Submits an asynchronous watch on obj with completion c. Returns 0 if the
+ * request was submitted, or the negative error from obj.aio_watch().
+ */
+int RGWSI_Notify::aio_watch(RGWSI_RADOS::Obj& obj, uint64_t *watch_handle, librados::WatchCtx2 *ctx, librados::AioCompletion *c)
+{
+  // RGWSI_RADOS::Obj::aio_watch() is declared as (completion, handle, ctx);
+  // the extra trailing 0 passed previously does not match that signature.
+  int r = obj.aio_watch(c, watch_handle, ctx);
+  if (r < 0)
+    return r;
+  return 0;
+}
+
+int RGWSI_Notify::unwatch(RGWSI_RADOS::Obj& obj, uint64_t watch_handle) // removes the watch identified by watch_handle, then calls watch_flush(); returns 0 or a negative error
+{
+  int r = obj.unwatch(watch_handle);
+  if (r < 0) {
+    ldout(cct, 0) << "ERROR: rados->unwatch2() returned r=" << r << dendl;
+    return r;
+  }
+  r = rados[0].watch_flush(); // NOTE(review): RGWSI_Notify declares no `rados` member in svc_notify.h; this looks carried over from RGWRados::unwatch() -- confirm which handle should be flushed
+  if (r < 0) {
+    ldout(cct, 0) << "ERROR: rados->watch_flush() returned r=" << r << dendl;
+    return r;
+  }
+  return 0;
+}
+
+/* Records watcher i as active; once all num_watchers are active, logs that the cache is being enabled. */
+void RGWSI_Notify::add_watcher(int i)
+{
+  ldout(cct, 20) << "add_watcher() i=" << i << dendl;
+  Mutex::Locker guard(watchers_lock);
+  watchers_set.insert(i);
+
+  const size_t expected = (size_t)num_watchers;
+  if (watchers_set.size() != expected) {
+    return;
+  }
+
+  ldout(cct, 2) << "all " << num_watchers << " watchers are set, enabling cache" << dendl;
+  // The actual cache toggle is still compiled out.
+#warning fixme
+#if 0
+  set_cache_enabled(true);
+#endif
+}
+
+/*
+ * Drops watcher i from the active set. If the set was complete beforehand
+ * and i was really a member, logs that the cache is being disabled.
+ */
+void RGWSI_Notify::remove_watcher(int i)
+{
+  ldout(cct, 20) << "remove_watcher() i=" << i << dendl;
+  Mutex::Locker guard(watchers_lock);
+
+  const bool was_complete = (watchers_set.size() == (size_t)num_watchers);
+  const bool erased = (watchers_set.erase(i) > 0); /* actually removed */
+  if (!was_complete || !erased) {
+    return;
+  }
+
+  ldout(cct, 2) << "removed watcher, disabling cache" << dendl;
+  // The actual cache toggle is still compiled out.
+#warning fixme
+#if 0
+  set_cache_enabled(false);
+#endif
+}
+
diff --git a/src/rgw/services/svc_notify.h b/src/rgw/services/svc_notify.h
new file mode 100644 (file)
index 0000000..d1118cb
--- /dev/null
@@ -0,0 +1,63 @@
+#ifndef CEPH_RGW_SERVICES_NOTIFY_H
+#define CEPH_RGW_SERVICES_NOTIFY_H
+
+
+#include "rgw/rgw_service.h"
+
+#include "svc_rados.h"
+
+
+class RGWSI_Zone;
+
+class RGWWatcher;
+
+/* Service type whose instances are RGWSI_Notify objects (see create_instance()). */
+class RGWS_Notify : public RGWService
+{
+public:
+  // Registered under its own name; "quota" here was a copy/paste slip that
+  // gave this service the same name string as the quota service.
+  RGWS_Notify(CephContext *cct) : RGWService(cct, "notify") {}
+
+  int create_instance(const std::string& conf, RGWServiceInstanceRef *instance) override;
+};
+
+/*
+ * Service instance that keeps one RGWWatcher registered on each notify
+ * object in the zone's control pool. Depends on the "zone" and "rados"
+ * services.
+ */
+class RGWSI_Notify : public RGWServiceInstance
+{
+  std::shared_ptr<RGWSI_Zone> zone_svc;    // resolved in load() from "zone_dep"
+  std::shared_ptr<RGWSI_RADOS> rados_svc;  // resolved in load() from "rados_dep"
+
+  std::map<std::string, RGWServiceInstance::dependency> get_deps() override;
+  int load(const std::string& conf, std::map<std::string, RGWServiceInstanceRef>& dep_refs) override;
+
+  Mutex watchers_lock{"watchers_lock"};    // held while watchers_set is modified
+  rgw_pool control_pool;                   // set in init() from the zone params
+
+  int num_watchers{0};
+  RGWWatcher **watchers{nullptr};          // array of num_watchers watchers, freed in finalize_watch()
+  std::set<int> watchers_set;              // indexes of the watchers that are currently registered
+  vector<RGWSI_RADOS::Obj> notify_objs;    // control objects, indexed like watchers
+
+  double inject_notify_timeout_probability{0};  // read by RGWWatcher::handle_notify() to drop notifications
+  unsigned max_notify_retries{0};
+
+  friend class RGWWatcher;
+
+  string get_control_oid(int i);
+  RGWSI_RADOS::Obj pick_control_obj(const string& key);
+
+  int init_watch();
+  void finalize_watch();
+
+  int init() override;
+  void shutdown() override;
+
+  int watch(RGWSI_RADOS::Obj& obj, uint64_t *watch_handle, librados::WatchCtx2 *ctx);
+  // Takes the object itself, matching the definition in svc_notify.cc and
+  // the sibling watch()/unwatch() declarations.
+  int aio_watch(RGWSI_RADOS::Obj& obj, uint64_t *watch_handle, librados::WatchCtx2 *ctx, librados::AioCompletion *c);
+  int unwatch(RGWSI_RADOS::Obj& obj, uint64_t watch_handle);
+  void add_watcher(int i);
+  void remove_watcher(int i);
+public:
+  RGWSI_Notify(RGWService *svc, CephContext *cct): RGWServiceInstance(svc, cct) {}
+
+};
+
+#endif
+
index 9d4eb227105fae94c9235a6ad9879319c986a00b..192734e7e818b87424dc1aeb9405a85411f4533b 100644 (file)
@@ -163,6 +163,21 @@ int RGWSI_RADOS::Obj::aio_operate(librados::AioCompletion *c, librados::ObjectWr
   return ref.ioctx.aio_operate(ref.oid, c, op);
 }
 
+int RGWSI_RADOS::Obj::watch(uint64_t *handle, librados::WatchCtx2 *ctx) // registers ctx as a watcher on this object (non-aio variant)
+{
+  return ref.ioctx.watch2(ref.oid, handle, ctx); // forwards to IoCtx::watch2() for ref.oid; its result is returned unchanged
+}
+
+/*
+ * Submits an asynchronous watch on this object by forwarding c, handle and
+ * ctx to IoCtx::aio_watch() for ref.oid; its result is returned unchanged.
+ */
+int RGWSI_RADOS::Obj::aio_watch(librados::AioCompletion *c, uint64_t *handle, librados::WatchCtx2 *ctx)
+{
+  // The completion type is spelled with its namespace, as in the header
+  // declaration and the neighbouring aio_operate().
+  return ref.ioctx.aio_watch(ref.oid, c, handle, ctx);
+}
+
+int RGWSI_RADOS::Obj::unwatch(uint64_t handle) // removes the watch identified by handle
+{
+  return ref.ioctx.unwatch2(handle); // forwards to IoCtx::unwatch2(); its result is returned unchanged
+}
+
 uint64_t RGWSI_RADOS::Obj::get_last_version()
 {
   return ref.ioctx.get_last_version();
index 7cafb78690124afce1d8d3a748b5b0387310732c..1949a3ad4b4e1121262fec958beed48a7513bd04 100644 (file)
@@ -92,6 +92,10 @@ public:
     int operate(librados::ObjectReadOperation *op, bufferlist *pbl);
     int aio_operate(librados::AioCompletion *c, librados::ObjectWriteOperation *op);
 
+    int watch(uint64_t *handle, librados::WatchCtx2 *ctx);
+    int aio_watch(librados::AioCompletion *c, uint64_t *handle, librados::WatchCtx2 *ctx);
+    int unwatch(uint64_t handle);
+
     uint64_t get_last_version();
   };
 
index 722c43a46f1fba96319928bba5a3ae1148a89bdd..e0b54fb508e21ee9fcbef14d694e7d0adc6272b2 100644 (file)
@@ -68,6 +68,7 @@ int RGWSI_SysObj::Obj::ROp::read(int64_t ofs, int64_t end, bufferlist *bl)
                    objv_tracker,
                    obj, bl, ofs, end,
                    attrs,
+                   cache_info,
                    refresh_version);
 }
 
@@ -111,7 +112,7 @@ int RGWSI_SysObj::Obj::WOp::write_attrs()
   RGWSI_SysObj_Core *svc = source.core_svc;
   rgw_raw_obj& obj = source.get_obj();
 
-  return svc->set_attrs(obj, attrs, objv_tracker);
+  return svc->set_attrs(obj, attrs, nullptr, objv_tracker);
 }
 
 int RGWSI_SysObj::Pool::Op::list_prefixed_objs(const string& prefix, list<string> *result)
index c85b5b2607d83dd4a37d941838ec20a28e1db5fa..2a7432da33b849f928b69516e59473c74914383d 100644 (file)
@@ -12,6 +12,8 @@ class RGWSI_Zone;
 class RGWSI_SysObj;
 class RGWSysObjectCtx;
 
+struct rgw_cache_entry_info;
+
 class RGWS_SysObj : public RGWService
 {
 public:
@@ -59,6 +61,7 @@ public:
       boost::optional<obj_version> refresh_version{boost::none};
       ceph::real_time *lastmod{nullptr};
       uint64_t *obj_size{nullptr};
+      rgw_cache_entry_info *cache_info{nullptr};
 
       ROp& set_last_mod(ceph::real_time *_lastmod) {
         lastmod = _lastmod;
@@ -80,6 +83,11 @@ public:
         return *this;
       }
 
+      ROp& set_cache_info(rgw_cache_entry_info *ci) { // stores ci in cache_info, which ROp::read() passes on to the core service's read()
+        cache_info = ci;
+        return *this; // returns the op itself, like the neighbouring set_* methods
+      }
+
       ROp(Obj& _source) : source(_source) {}
 
       int stat();
index 7f2c55a2963dc963b48665d19583606f90e6bc30..5acea09c0bd9e3af5cba58a28d8d008375b177a5 100644 (file)
@@ -1,4 +1,25 @@
 #include "svc_sys_obj_cache.h"
+#include "svc_zone.h"
+
+#define dout_subsys ceph_subsys_rgw
+
+/* Builds the cache key "<pool name>+<pool namespace>+<oid>". */
+static string normal_name(rgw_pool& pool, const std::string& oid) {
+  // Two extra characters for the '+' separators.
+  const size_t total = pool.name.size() + pool.ns.size() + oid.size() + 2;
+
+  std::string key;
+  key.reserve(total);
+  key += pool.name;
+  key += "+";
+  key += pool.ns;
+  key += "+";
+  key += oid;
+  return key;
+}
+
+/*
+ * Resolves the (pool, object) pair to operate on. A non-empty src_obj is
+ * used as given; an empty one means src_pool.name is taken as the object
+ * name inside the zone's domain_root pool.
+ */
+void RGWSI_SysObj_Cache::normalize_pool_and_obj(rgw_pool& src_pool, const string& src_obj, rgw_pool& dst_pool, string& dst_obj)
+{
+  if (src_obj.empty()) {
+    dst_pool = zone_svc->get_zone_params().domain_root;
+    dst_obj = src_pool.name;
+    return;
+  }
+
+  dst_pool = src_pool;
+  dst_obj = src_obj;
+}
 
 
 int RGWSI_SysObj_Cache::remove(RGWSysObjectCtxBase& obj_ctx,
@@ -58,7 +79,7 @@ int RGWSI_SysObj_Cache::read(RGWSysObjectCtxBase& obj_ctx,
 
     obl->clear();
 
-    i.copy_all(obl);
+    i.copy_all(*obl);
     if (objv_tracker)
       objv_tracker->read_version = info.version;
     if (attrs)
@@ -101,6 +122,7 @@ int RGWSI_SysObj_Cache::read(RGWSysObjectCtxBase& obj_ctx,
 
 int RGWSI_SysObj_Cache::set_attrs(rgw_raw_obj& obj, 
                                   map<string, bufferlist>& attrs,
+                                  map<string, bufferlist> *rmattrs,
                                   RGWObjVersionTracker *objv_tracker) 
 {
   rgw_pool pool;
@@ -108,6 +130,9 @@ int RGWSI_SysObj_Cache::set_attrs(rgw_raw_obj& obj,
   normalize_pool_and_obj(obj.pool, obj.oid, pool, oid);
   ObjectCacheInfo info;
   info.xattrs = attrs;
+  if (rmattrs) {
+    info.rm_xattrs = *rmattrs;
+  }
   info.status = 0;
   info.flags = CACHE_FLAG_MODIFY_XATTRS;
   if (objv_tracker) {
@@ -120,7 +145,7 @@ int RGWSI_SysObj_Cache::set_attrs(rgw_raw_obj& obj,
     cache.put(name, info, NULL);
     int r = distribute_cache(name, obj, info, UPDATE_OBJ);
     if (r < 0)
-      mydout(0) << "ERROR: failed to distribute cache for " << obj << dendl;
+      ldout(cct, 0) << "ERROR: failed to distribute cache for " << obj << dendl;
   } else {
     cache.remove(name);
   }
@@ -128,13 +153,13 @@ int RGWSI_SysObj_Cache::set_attrs(rgw_raw_obj& obj,
   return ret;
 }
 
-int RGWSI_SysObjCache::write(rgw_raw_obj& obj,
+int RGWSI_SysObj_Cache::write(rgw_raw_obj& obj,
                              real_time *pmtime,
                              map<std::string, bufferlist>& attrs,
                              bool exclusive,
                              const bufferlist& data,
                              RGWObjVersionTracker *objv_tracker,
-                             real_time set_mtime) override;
+                             real_time set_mtime)
 {
   rgw_pool pool;
   string oid;
@@ -170,7 +195,7 @@ int RGWSI_SysObjCache::write(rgw_raw_obj& obj,
     if (!exclusive) {
       int r = distribute_cache(name, obj, info, UPDATE_OBJ);
       if (r < 0)
-       mydout(0) << "ERROR: failed to distribute cache for " << obj << dendl;
+       ldout(cct, 0) << "ERROR: failed to distribute cache for " << obj << dendl;
     }
   } else {
     cache.remove(name);
@@ -180,7 +205,7 @@ int RGWSI_SysObjCache::write(rgw_raw_obj& obj,
 }
 
 int RGWSI_SysObj_Cache::write_data(rgw_raw_obj& obj,
-                                   const bufferlist& bl,
+                                   const bufferlist& data,
                                    bool exclusive,
                                    RGWObjVersionTracker *objv_tracker)
 {
@@ -196,13 +221,13 @@ int RGWSI_SysObj_Cache::write_data(rgw_raw_obj& obj,
     info.version = objv_tracker->write_version;
     info.flags |= CACHE_FLAG_OBJV;
   }
-  int ret = RGWSI_SysObj_Core::write_data(obj, data, ofs, exclusive, objv_tracker);
+  int ret = RGWSI_SysObj_Core::write_data(obj, data, exclusive, objv_tracker);
   string name = normal_name(pool, oid);
   if (ret >= 0) {
     cache.put(name, info, NULL);
     int r = distribute_cache(name, obj, info, UPDATE_OBJ);
     if (r < 0)
-      mydout(0) << "ERROR: failed to distribute cache for " << obj << dendl;
+      ldout(cct, 0) << "ERROR: failed to distribute cache for " << obj << dendl;
   } else {
     cache.remove(name);
   }
@@ -210,7 +235,7 @@ int RGWSI_SysObj_Cache::write_data(rgw_raw_obj& obj,
   return ret;
 }
 
-int RGWSI_SysObj_Cache::raw_stat(rgw_raw_obj& obj, uint64_t *psize, real_time *pmtime, uint64_t *epoch,
+int RGWSI_SysObj_Cache::raw_stat(rgw_raw_obj& obj, uint64_t *psize, real_time *pmtime, uint64_t *pepoch,
                                  map<string, bufferlist> *attrs, bufferlist *first_chunk,
                                  RGWObjVersionTracker *objv_tracker)
 {
@@ -294,10 +319,10 @@ int RGWSI_SysObj_Cache::watch_cb(uint64_t notify_id,
     auto iter = bl.cbegin();
     decode(info, iter);
   } catch (buffer::end_of_buffer& err) {
-    mydout(0) << "ERROR: got bad notification" << dendl;
+    ldout(cct, 0) << "ERROR: got bad notification" << dendl;
     return -EIO;
   } catch (buffer::error& err) {
-    mydout(0) << "ERROR: buffer::error" << dendl;
+    ldout(cct, 0) << "ERROR: buffer::error" << dendl;
     return -EIO;
   }
 
@@ -314,14 +339,14 @@ int RGWSI_SysObj_Cache::watch_cb(uint64_t notify_id,
     cache.remove(name);
     break;
   default:
-    mydout(0) << "WARNING: got unknown notification op: " << info.op << dendl;
+    ldout(cct, 0) << "WARNING: got unknown notification op: " << info.op << dendl;
     return -EINVAL;
   }
 
   return 0;
 }
 
-int RGWSI_SysObj_Cache::call_list(const std::optional<std::string>& filter, Formatter* f)
+void RGWSI_SysObj_Cache::call_list(const std::optional<std::string>& filter, Formatter* f)
 {
   cache.for_each(
     [this, &filter, f] (const string& name, const ObjectCacheEntry& entry) {
index ece465d91f864e87a2091bd6240a49f734b88c47..3cc1570e8dd024e1a9ee72947955e831d7561498 100644 (file)
@@ -4,12 +4,16 @@
 
 
 #include "rgw/rgw_service.h"
+#include "rgw/rgw_cache.h"
 
-#include "svc_rados.h"
+#include "svc_sys_obj_core.h"
 
 
 class RGWSI_SysObj_Cache : public RGWSI_SysObj_Core
 {
+  ObjectCache cache;
+
+  void normalize_pool_and_obj(rgw_pool& src_pool, const string& src_obj, rgw_pool& dst_pool, string& dst_obj);
 protected:
   std::map<std::string, RGWServiceInstance::dependency> get_deps() override;
   int load(const std::string& conf, std::map<std::string, RGWServiceInstanceRef>& dep_refs) override;
@@ -24,12 +28,14 @@ protected:
            rgw_raw_obj& obj,
            bufferlist *bl, off_t ofs, off_t end,
            map<string, bufferlist> *attrs,
+           rgw_cache_entry_info *cache_info,
            boost::optional<obj_version>) override;
 
   int get_attr(rgw_raw_obj& obj, const char *name, bufferlist *dest) override;
 
   int set_attrs(rgw_raw_obj& obj, 
                 map<string, bufferlist>& attrs,
+                map<string, bufferlist> *rmattrs,
                 RGWObjVersionTracker *objv_tracker);
 
   int remove(RGWSysObjectCtxBase& obj_ctx,
@@ -57,9 +63,9 @@ protected:
                bufferlist& bl);
 
 public:
-  RGWSI_SysObj_Cache(RGWService *svc, CephContext *cct): RGW_SysObj_Core(svc, cct) {}
+  RGWSI_SysObj_Cache(RGWService *svc, CephContext *cct) : RGWSI_SysObj_Core(svc, cct) {}
 
-  int call_list(const std::optional<std::string>& filter, Formatter* f);
+  void call_list(const std::optional<std::string>& filter, Formatter* f);
   int call_inspect(const std::string& target, Formatter* f);
   int call_erase(const std::string& target);
   int call_zap();
index 39c88483e1ceab547391df37a6971aca07ec3aa6..742bfcc73523b04f27352a65d91544c89d3b3a68 100644 (file)
@@ -202,6 +202,7 @@ int RGWSI_SysObj_Core::read(RGWSysObjectCtxBase& obj_ctx,
                             rgw_raw_obj& obj,
                             bufferlist *bl, off_t ofs, off_t end,
                             map<string, bufferlist> *attrs,
+                            rgw_cache_entry_info *cache_info,
                             boost::optional<obj_version>)
 {
   uint64_t len;
@@ -281,6 +282,7 @@ int RGWSI_SysObj_Core::get_attr(rgw_raw_obj& obj,
 
 int RGWSI_SysObj_Core::set_attrs(rgw_raw_obj& obj, 
                                  map<string, bufferlist>& attrs,
+                                 map<string, bufferlist> *rmattrs,
                                  RGWObjVersionTracker *objv_tracker) 
 {
   RGWSI_RADOS::Obj rados_obj;
@@ -290,7 +292,7 @@ int RGWSI_SysObj_Core::set_attrs(rgw_raw_obj& obj,
     return r;
   }
 
-  ObjectWriteOperation op;
+  librados::ObjectWriteOperation op;
 
   if (objv_tracker) {
     objv_tracker->prepare_op_for_write(&op);
@@ -562,7 +564,7 @@ int RGWSI_SysObj_Core::write_data(rgw_raw_obj& obj,
     return r;
   }
 
-  ObjectWriteOperation op;
+  librados::ObjectWriteOperation op;
 
   if (exclusive) {
     op.create(true);
@@ -571,11 +573,7 @@ int RGWSI_SysObj_Core::write_data(rgw_raw_obj& obj,
   if (objv_tracker) {
     objv_tracker->prepare_op_for_write(&op);
   }
-  if (ofs == -1) {
-    op.write_full(bl);
-  } else {
-    op.write(ofs, bl);
-  }
+  op.write_full(bl);
   r = rados_obj.operate(&op);
   if (r < 0)
     return r;
index ec00f08a7d0c454557f1bf2d546a44a69d695c17..5ca9aed31022effb25c61bfcd0c1717fed1cc176 100644 (file)
@@ -9,6 +9,8 @@
 
 class RGWSI_Zone;
 
+struct rgw_cache_entry_info;
+
 struct RGWSysObjState {
   rgw_raw_obj obj;
   bool has_attrs{false};
@@ -133,6 +135,7 @@ protected:
                    rgw_raw_obj& obj,
                    bufferlist *bl, off_t ofs, off_t end,
                    map<string, bufferlist> *attrs,
+                   rgw_cache_entry_info *cache_info,
                    boost::optional<obj_version>);
 
   virtual int remove(RGWSysObjectCtxBase& obj_ctx,
@@ -156,6 +159,7 @@ protected:
 
   virtual int set_attrs(rgw_raw_obj& obj, 
                         map<string, bufferlist>& attrs,
+                        map<string, bufferlist> *rmattrs,
                         RGWObjVersionTracker *objv_tracker);
 
   virtual int omap_get_all(rgw_raw_obj& obj, std::map<string, bufferlist> *m);