From 04437e416ed45cf5ed82619520289528491473fb Mon Sep 17 00:00:00 2001 From: Sage Weil Date: Tue, 10 Feb 2015 16:20:12 -0800 Subject: [PATCH] rgw: switch to new watch/notify API Note that we don't really use it fully, yet. The main semantic change here is that we have to explicitly ack the notify. However, still missing is re-registration of the watch if we see a failure, and ignoring the cache if watch_check tells us the watch is stale. Signed-off-by: Sage Weil --- src/rgw/rgw_cache.h | 10 ++++++++-- src/rgw/rgw_rados.cc | 33 ++++++++++++++++++++++++--------- src/rgw/rgw_rados.h | 7 ++++++- 3 files changed, 38 insertions(+), 12 deletions(-) diff --git a/src/rgw/rgw_cache.h b/src/rgw/rgw_cache.h index 4db884b41c73a..2446bf0729b9d 100644 --- a/src/rgw/rgw_cache.h +++ b/src/rgw/rgw_cache.h @@ -201,7 +201,10 @@ class RGWCache : public T } int distribute_cache(const string& normal_name, rgw_obj& obj, ObjectCacheInfo& obj_info, int op); - int watch_cb(int opcode, uint64_t ver, bufferlist& bl); + int watch_cb(uint64_t notify_id, + uint64_t cookie, + uint64_t notifier_id, + bufferlist& bl); public: RGWCache() {} @@ -554,7 +557,10 @@ int RGWCache::distribute_cache(const string& normal_name, rgw_obj& obj, Objec } template -int RGWCache::watch_cb(int opcode, uint64_t ver, bufferlist& bl) +int RGWCache::watch_cb(uint64_t notify_id, + uint64_t cookie, + uint64_t notifier_id, + bufferlist& bl) { RGWCacheNotifyInfo info; diff --git a/src/rgw/rgw_rados.cc b/src/rgw/rgw_rados.cc index 036204e4a9549..e9fc30a9d6e31 100644 --- a/src/rgw/rgw_rados.cc +++ b/src/rgw/rgw_rados.cc @@ -1201,13 +1201,28 @@ int RGWPutObjProcessor_Atomic::do_complete(string& etag, time_t *mtime, time_t s return 0; } -class RGWWatcher : public librados::WatchCtx { +class RGWWatcher : public librados::WatchCtx2 { RGWRados *rados; + string oid; public: - RGWWatcher(RGWRados *r) : rados(r) {} - void notify(uint8_t opcode, uint64_t ver, bufferlist& bl) { - ldout(rados->ctx(), 10) << "RGWWatcher::notify() opcode=" << (int)opcode << " ver=" << ver << " bl.length()=" << bl.length() << dendl; - rados->watch_cb(opcode, ver, bl); + RGWWatcher(RGWRados *r, const string& o) : rados(r), oid(o) {} + void handle_notify(uint64_t notify_id, + uint64_t cookie, + uint64_t notifier_id, + bufferlist& bl) { + ldout(rados->ctx(), 10) << "RGWWatcher::handle_notify() " + << " notify_id " << notify_id + << " cookie " << cookie + << " notifier " << notifier_id + << " bl.length()=" << bl.length() << dendl; + rados->watch_cb(notify_id, cookie, notifier_id, bl); + + bufferlist reply_bl; // empty reply payload + rados->control_pool_ctx.notify_ack(oid, notify_id, cookie, reply_bl); + } + void handle_error(uint64_t cookie, int err) { + lderr(rados->ctx()) << "RGWWatcher::handle_error cookie " << cookie + << " err " << cpp_strerror(err) << dendl; } }; @@ -1472,7 +1487,7 @@ void RGWRados::finalize_watch() if (notify_oid.empty()) continue; uint64_t watch_handle = watch_handles[i]; - control_pool_ctx.unwatch(notify_oid, watch_handle); + control_pool_ctx.unwatch2(watch_handle); RGWWatcher *watcher = watchers[i]; delete watcher; @@ -1604,10 +1619,10 @@ int RGWRados::init_watch() if (r < 0 && r != -EEXIST) return r; - RGWWatcher *watcher = new RGWWatcher(this); + RGWWatcher *watcher = new RGWWatcher(this, notify_oid); watchers[i] = watcher; - r = control_pool_ctx.watch(notify_oid, 0, &watch_handles[i], watcher); + r = control_pool_ctx.watch2(notify_oid, &watch_handles[i], watcher); if (r < 0) return r; } @@ -7018,7 +7033,7 @@ int RGWRados::distribute(const string& key, bufferlist& bl) pick_control_oid(key, notify_oid); ldout(cct, 10) << "distributing notification oid=" << notify_oid << " bl.length()=" << bl.length() << dendl; - int r = control_pool_ctx.notify(notify_oid, 0, bl); + int r = control_pool_ctx.notify2(notify_oid, bl, 0, NULL); return r; } diff --git a/src/rgw/rgw_rados.h b/src/rgw/rgw_rados.h index e6fe72b60d5f8..fa09f0617dd3a 100644 --- a/src/rgw/rgw_rados.h +++ b/src/rgw/rgw_rados.h @@ -1208,6 +1208,8 @@ class RGWRados librados::IoCtx control_pool_ctx; // .rgw.control bool watch_initialized; + friend class RGWWatcher; + Mutex bucket_id_lock; // This field represents the number of bucket index object shards @@ -1888,7 +1890,10 @@ public: virtual int init_watch(); virtual void finalize_watch(); virtual int distribute(const string& key, bufferlist& bl); - virtual int watch_cb(int opcode, uint64_t ver, bufferlist& bl) { return 0; } + virtual int watch_cb(uint64_t notify_id, + uint64_t cookie, + uint64_t notifier_id, + bufferlist& bl) { return 0; } void pick_control_oid(const string& key, string& notify_oid); void set_atomic(void *ctx, rgw_obj& obj) { -- 2.39.5