From 771eafc9733386a25764d28a38f2437d1509f231 Mon Sep 17 00:00:00 2001 From: Yehuda Sadeh Date: Thu, 6 Sep 2018 01:16:01 -0700 Subject: [PATCH] rgw: svc_cache: more work Signed-off-by: Yehuda Sadeh --- src/rgw/rgw_cache.h | 456 -------------------------- src/rgw/rgw_rados.cc | 138 ++------ src/rgw/rgw_rados.h | 28 -- src/rgw/services/svc_notify.cc | 166 ++++++++-- src/rgw/services/svc_notify.h | 30 +- src/rgw/services/svc_rados.cc | 46 ++- src/rgw/services/svc_rados.h | 61 +++- src/rgw/services/svc_sys_obj_cache.cc | 84 ++++- src/rgw/services/svc_sys_obj_cache.h | 18 +- 9 files changed, 373 insertions(+), 654 deletions(-) diff --git a/src/rgw/rgw_cache.h b/src/rgw/rgw_cache.h index 5d4230a41e7..498c86c2326 100644 --- a/src/rgw/rgw_cache.h +++ b/src/rgw/rgw_cache.h @@ -195,460 +195,4 @@ public: void invalidate_all(); }; -template -class RGWCache : public T -{ - ObjectCache cache; - - int list_objects_raw_init(rgw_pool& pool, RGWAccessHandle *handle) { - return T::list_objects_raw_init(pool, handle); - } - int list_objects_raw_next(rgw_bucket_dir_entry& obj, RGWAccessHandle *handle) { - return T::list_objects_raw_next(obj, handle); - } - - string normal_name(rgw_pool& pool, const std::string& oid) { - std::string buf; - buf.reserve(pool.name.size() + pool.ns.size() + oid.size() + 2); - buf.append(pool.name).append("+").append(pool.ns).append("+").append(oid); - return buf; - } - - void normalize_pool_and_obj(rgw_pool& src_pool, const string& src_obj, rgw_pool& dst_pool, string& dst_obj); - - int init_rados() override { - int ret; - cache.set_ctx(T::cct); - ret = T::init_rados(); - if (ret < 0) - return ret; - - return 0; - } - - bool need_watch_notify() override { - return true; - } - - int distribute_cache(const string& normal_name, rgw_raw_obj& obj, ObjectCacheInfo& obj_info, int op); - int watch_cb(uint64_t notify_id, - uint64_t cookie, - uint64_t notifier_id, - bufferlist& bl) override; - - void set_cache_enabled(bool state) override { - cache.set_enabled(state); 
- } -public: - RGWCache() {} - - void register_chained_cache(RGWChainedCache *cc) override { - cache.chain_cache(cc); - } - - int system_obj_set_attrs(void *ctx, rgw_raw_obj& obj, - map& attrs, - map* rmattrs, - RGWObjVersionTracker *objv_tracker); - int put_system_obj_impl(rgw_raw_obj& obj, uint64_t size, real_time *mtime, - map& attrs, int flags, - const bufferlist& data, - RGWObjVersionTracker *objv_tracker, - real_time set_mtime) override; - int put_system_obj_data(void *ctx, rgw_raw_obj& obj, const bufferlist& bl, off_t ofs, bool exclusive, - RGWObjVersionTracker *objv_tracker = nullptr) override; - - int get_system_obj(RGWObjectCtx& obj_ctx, RGWRados::SystemObject::Read::GetObjState& read_state, - RGWObjVersionTracker *objv_tracker, rgw_raw_obj& obj, - bufferlist& bl, off_t ofs, off_t end, - map *attrs, - rgw_cache_entry_info *cache_info, - boost::optional refresh_version = boost::none) override; - - int raw_obj_stat(rgw_raw_obj& obj, uint64_t *psize, real_time *pmtime, uint64_t *epoch, map *attrs, - bufferlist *first_chunk, RGWObjVersionTracker *objv_tracker) override; - - int delete_system_obj(rgw_raw_obj& obj, RGWObjVersionTracker *objv_tracker) override; - - bool chain_cache_entry(std::initializer_list cache_info_entries, RGWChainedCache::Entry *chained_entry) override { - return cache.chain_cache_entry(cache_info_entries, chained_entry); - } - void call_list(const std::optional& filter, - Formatter* format) override; - bool call_inspect(const std::string& target, Formatter* format) override; - bool call_erase(const std::string& target) override; - void call_zap() override; -}; - -template -void RGWCache::normalize_pool_and_obj(rgw_pool& src_pool, const string& src_obj, rgw_pool& dst_pool, string& dst_obj) -{ - if (src_obj.size()) { - dst_pool = src_pool; - dst_obj = src_obj; - } else { - dst_pool = T::get_zone_params().domain_root; - dst_obj = src_pool.name; - } -} - -template -int RGWCache::delete_system_obj(rgw_raw_obj& obj, RGWObjVersionTracker 
*objv_tracker) -{ - rgw_pool pool; - string oid; - normalize_pool_and_obj(obj.pool, obj.oid, pool, oid); - - string name = normal_name(pool, oid); - cache.remove(name); - - ObjectCacheInfo info; - distribute_cache(name, obj, info, REMOVE_OBJ); - - return T::delete_system_obj(obj, objv_tracker); -} - -template -int RGWCache::get_system_obj(RGWObjectCtx& obj_ctx, RGWRados::SystemObject::Read::GetObjState& read_state, - RGWObjVersionTracker *objv_tracker, rgw_raw_obj& obj, - bufferlist& obl, off_t ofs, off_t end, - map *attrs, - rgw_cache_entry_info *cache_info, - boost::optional refresh_version) -{ - rgw_pool pool; - string oid; - normalize_pool_and_obj(obj.pool, obj.oid, pool, oid); - if (ofs != 0) - return T::get_system_obj(obj_ctx, read_state, objv_tracker, obj, obl, ofs, end, attrs, cache_info); - - string name = normal_name(pool, oid); - - ObjectCacheInfo info; - - uint32_t flags = CACHE_FLAG_DATA; - if (objv_tracker) - flags |= CACHE_FLAG_OBJV; - if (attrs) - flags |= CACHE_FLAG_XATTRS; - - if ((cache.get(name, info, flags, cache_info) == 0) && - (!refresh_version || !info.version.compare(&(*refresh_version)))) { - if (info.status < 0) - return info.status; - - bufferlist& bl = info.data; - - bufferlist::iterator i = bl.begin(); - - obl.clear(); - - i.copy_all(obl); - if (objv_tracker) - objv_tracker->read_version = info.version; - if (attrs) - *attrs = info.xattrs; - return bl.length(); - } - int r = T::get_system_obj(obj_ctx, read_state, objv_tracker, obj, obl, ofs, end, attrs, cache_info); - if (r < 0) { - if (r == -ENOENT) { // only update ENOENT, we'd rather retry other errors - info.status = r; - cache.put(name, info, cache_info); - } - return r; - } - - if (obl.length() == end + 1) { - /* in this case, most likely object contains more data, we can't cache it */ - return r; - } - - bufferptr p(r); - bufferlist& bl = info.data; - bl.clear(); - bufferlist::iterator o = obl.begin(); - o.copy_all(bl); - info.status = 0; - info.flags = flags; - if 
(objv_tracker) { - info.version = objv_tracker->read_version; - } - if (attrs) { - info.xattrs = *attrs; - } - cache.put(name, info, cache_info); - return r; -} - -template -int RGWCache::system_obj_set_attrs(void *ctx, rgw_raw_obj& obj, - map& attrs, - map* rmattrs, - RGWObjVersionTracker *objv_tracker) -{ - rgw_pool pool; - string oid; - normalize_pool_and_obj(obj.pool, obj.oid, pool, oid); - ObjectCacheInfo info; - info.xattrs = attrs; - if (rmattrs) - info.rm_xattrs = *rmattrs; - info.status = 0; - info.flags = CACHE_FLAG_MODIFY_XATTRS; - if (objv_tracker) { - info.version = objv_tracker->write_version; - info.flags |= CACHE_FLAG_OBJV; - } - int ret = T::system_obj_set_attrs(ctx, obj, attrs, rmattrs, objv_tracker); - string name = normal_name(pool, oid); - if (ret >= 0) { - cache.put(name, info, NULL); - int r = distribute_cache(name, obj, info, UPDATE_OBJ); - if (r < 0) - mydout(0) << "ERROR: failed to distribute cache for " << obj << dendl; - } else { - cache.remove(name); - } - - return ret; -} - -template -int RGWCache::put_system_obj_impl(rgw_raw_obj& obj, uint64_t size, real_time *mtime, - map& attrs, int flags, - const bufferlist& data, - RGWObjVersionTracker *objv_tracker, - real_time set_mtime) -{ - rgw_pool pool; - string oid; - normalize_pool_and_obj(obj.pool, obj.oid, pool, oid); - ObjectCacheInfo info; - info.xattrs = attrs; - info.status = 0; - info.data = data; - info.flags = CACHE_FLAG_XATTRS | CACHE_FLAG_DATA | CACHE_FLAG_META; - if (objv_tracker) { - info.version = objv_tracker->write_version; - info.flags |= CACHE_FLAG_OBJV; - } - ceph::real_time result_mtime; - int ret = T::put_system_obj_impl(obj, size, &result_mtime, attrs, flags, data, - objv_tracker, set_mtime); - if (mtime) { - *mtime = result_mtime; - } - info.meta.mtime = result_mtime; - info.meta.size = size; - string name = normal_name(pool, oid); - if (ret >= 0) { - cache.put(name, info, NULL); - // Only distribute the cache information if we did not just create - // the object 
with the exclusive flag. Note: PUT_OBJ_EXCL implies - // PUT_OBJ_CREATE. Generally speaking, when successfully creating - // a system object with the exclusive flag it is not necessary to - // call distribute_cache, as a) it's unclear whether other RGWs - // will need that system object in the near-term and b) it - // generates additional network traffic. - if (!(flags & PUT_OBJ_EXCL)) { - int r = distribute_cache(name, obj, info, UPDATE_OBJ); - if (r < 0) - mydout(0) << "ERROR: failed to distribute cache for " << obj << dendl; - } - } else { - cache.remove(name); - } - - return ret; -} - -template -int RGWCache::put_system_obj_data(void *ctx, rgw_raw_obj& obj, const bufferlist& data, off_t ofs, bool exclusive, - RGWObjVersionTracker *objv_tracker) -{ - rgw_pool pool; - string oid; - normalize_pool_and_obj(obj.pool, obj.oid, pool, oid); - ObjectCacheInfo info; - bool cacheable = false; - if ((ofs == 0) || (ofs == -1)) { - cacheable = true; - info.data = data; - info.meta.size = data.length(); - info.status = 0; - info.flags = CACHE_FLAG_DATA; - } - if (objv_tracker) { - info.version = objv_tracker->write_version; - info.flags |= CACHE_FLAG_OBJV; - } - int ret = T::put_system_obj_data(ctx, obj, data, ofs, exclusive, objv_tracker); - if (cacheable) { - string name = normal_name(pool, oid); - if (ret >= 0) { - cache.put(name, info, NULL); - int r = distribute_cache(name, obj, info, UPDATE_OBJ); - if (r < 0) - mydout(0) << "ERROR: failed to distribute cache for " << obj << dendl; - } else { - cache.remove(name); - } - } - - return ret; -} - -template -int RGWCache::raw_obj_stat(rgw_raw_obj& obj, uint64_t *psize, real_time *pmtime, - uint64_t *pepoch, map *attrs, - bufferlist *first_chunk, RGWObjVersionTracker *objv_tracker) -{ - rgw_pool pool; - string oid; - normalize_pool_and_obj(obj.pool, obj.oid, pool, oid); - - string name = normal_name(pool, oid); - - uint64_t size; - real_time mtime; - uint64_t epoch; - - ObjectCacheInfo info; - uint32_t flags = CACHE_FLAG_META 
| CACHE_FLAG_XATTRS; - if (objv_tracker) - flags |= CACHE_FLAG_OBJV; - int r = cache.get(name, info, flags, NULL); - if (r == 0) { - if (info.status < 0) - return info.status; - - size = info.meta.size; - mtime = info.meta.mtime; - epoch = info.epoch; - if (objv_tracker) - objv_tracker->read_version = info.version; - goto done; - } - r = T::raw_obj_stat(obj, &size, &mtime, &epoch, &info.xattrs, first_chunk, objv_tracker); - if (r < 0) { - if (r == -ENOENT) { - info.status = r; - cache.put(name, info, NULL); - } - return r; - } - info.status = 0; - info.epoch = epoch; - info.meta.mtime = mtime; - info.meta.size = size; - info.flags = CACHE_FLAG_META | CACHE_FLAG_XATTRS; - if (objv_tracker) { - info.flags |= CACHE_FLAG_OBJV; - info.version = objv_tracker->read_version; - } - cache.put(name, info, NULL); -done: - if (psize) - *psize = size; - if (pmtime) - *pmtime = mtime; - if (pepoch) - *pepoch = epoch; - if (attrs) - *attrs = info.xattrs; - return 0; -} - -template -int RGWCache::distribute_cache(const string& normal_name, rgw_raw_obj& obj, ObjectCacheInfo& obj_info, int op) -{ - RGWCacheNotifyInfo info; - - info.op = op; - - info.obj_info = obj_info; - info.obj = obj; - bufferlist bl; - encode(info, bl); - return T::distribute(normal_name, bl); -} - -template -int RGWCache::watch_cb(uint64_t notify_id, - uint64_t cookie, - uint64_t notifier_id, - bufferlist& bl) -{ - RGWCacheNotifyInfo info; - - try { - auto iter = bl.cbegin(); - decode(info, iter); - } catch (buffer::end_of_buffer& err) { - mydout(0) << "ERROR: got bad notification" << dendl; - return -EIO; - } catch (buffer::error& err) { - mydout(0) << "ERROR: buffer::error" << dendl; - return -EIO; - } - - rgw_pool pool; - string oid; - normalize_pool_and_obj(info.obj.pool, info.obj.oid, pool, oid); - string name = normal_name(pool, oid); - - switch (info.op) { - case UPDATE_OBJ: - cache.put(name, info.obj_info, NULL); - break; - case REMOVE_OBJ: - cache.remove(name); - break; - default: - mydout(0) << 
"WARNING: got unknown notification op: " << info.op << dendl; - return -EINVAL; - } - - return 0; -} - -template -void RGWCache::call_list(const std::optional& filter, - Formatter* f) -{ - cache.for_each( - [this, &filter, f] (const string& name, const ObjectCacheEntry& entry) { - if (!filter || name.find(*filter) != name.npos) { - T::cache_list_dump_helper(f, name, entry.info.meta.mtime, - entry.info.meta.size); - } - }); -} - -template -bool RGWCache::call_inspect(const std::string& target, Formatter* f) -{ - if (const auto entry = cache.get(target)) { - f->open_object_section("cache_entry"); - f->dump_string("name", target.c_str()); - entry->dump(f); - f->close_section(); - return true; - } else { - return false; - } -} - -template -bool RGWCache::call_erase(const std::string& target) -{ - return cache.remove(target); -} - -template -void RGWCache::call_zap() -{ - cache.invalidate_all(); -} #endif diff --git a/src/rgw/rgw_rados.cc b/src/rgw/rgw_rados.cc index 80a013dde69..e140e8b9869 100644 --- a/src/rgw/rgw_rados.cc +++ b/src/rgw/rgw_rados.cc @@ -1361,9 +1361,6 @@ void RGWRados::finalize() if (finisher) { finisher->stop(); } - if (need_watch_notify()) { - finalize_watch(); - } if (finisher) { /* delete finisher only after cleaning up watches, as watch error path might call * into finisher. We stop finisher before finalizing watch to make sure we don't @@ -8998,113 +8995,6 @@ int RGWRados::append_async(rgw_raw_obj& obj, size_t size, bufferlist& bl) return r; } -int RGWRados::distribute(const string& key, bufferlist& bl) -{ - RGWSI_RADOS::Obj notify_obj = pick_control_obj(key); - - ldout(cct, 10) << "distributing notification oid=" << notify_oid << " bl.length()=" << bl.length() << dendl; - return robust_notify(notify_oid, bl); -} - -int RGWSI_Notify::robust_notify(RGWSI_RADOS::Obj& notify_obj, bufferlist& bl) -{ - // The reply of every machine that acks goes in here. 
- boost::container::flat_set> acks; - bufferlist rbl; - - // First, try to send, without being fancy about it. - auto r = notify_obj.notify(bl, 0, &rbl); - - // If that doesn't work, get serious. - if (r < 0) { - ldout(cct, 1) << "robust_notify: If at first you don't succeed: " - << cpp_strerror(-r) << dendl; - - - auto p = rbl.cbegin(); - // Gather up the replies to the first attempt. - try { - uint32_t num_acks; - decode(num_acks, p); - // Doing this ourselves since we don't care about the payload; - for (auto i = 0u; i < num_acks; ++i) { - std::pair id; - decode(id, p); - acks.insert(id); - ldout(cct, 20) << "robust_notify: acked by " << id << dendl; - uint32_t blen; - decode(blen, p); - p.advance(blen); - } - } catch (const buffer::error& e) { - ldout(cct, 0) << "robust_notify: notify response parse failed: " - << e.what() << dendl; - acks.clear(); // Throw away junk on failed parse. - } - - - // Every machine that fails to reply and hasn't acked a previous - // attempt goes in here. - boost::container::flat_set> timeouts; - - auto tries = 1u; - while (r < 0 && tries < max_notify_retries) { - ++tries; - rbl.clear(); - // Reset the timeouts, we're only concerned with new ones. - timeouts.clear(); - r = notify_obj.notify(bl, 0, &rbl); - if (r < 0) { - ldout(cct, 1) << "robust_notify: retry " << tries << " failed: " - << cpp_strerror(-r) << dendl; - p = rbl.begin(); - try { - uint32_t num_acks; - decode(num_acks, p); - // Not only do we not care about the payload, but we don't - // want to empty the container; we just want to augment it - // with any new members. 
-          for (auto i = 0u; i < num_acks; ++i) {
-            std::pair id;
-            decode(id, p);
-            auto ir = acks.insert(id);
-            if (ir.second) {
-              ldout(cct, 20) << "robust_notify: acked by " << id << dendl;
-            }
-            uint32_t blen;
-            decode(blen, p);
-            p.advance(blen);
-          }
-
-          uint32_t num_timeouts;
-          decode(num_timeouts, p);
-          for (auto i = 0u; i < num_timeouts; ++i) {
-            std::pair id;
-            decode(id, p);
-            // Only track timeouts from hosts that haven't acked previously.
-            if (acks.find(id) != acks.cend()) {
-              ldout(cct, 20) << "robust_notify: " << id << " timed out."
-                             << dendl;
-              timeouts.insert(id);
-            }
-          }
-        } catch (const buffer::error& e) {
-          ldout(cct, 0) << "robust_notify: notify response parse failed: "
-                        << e.what() << dendl;
-          continue;
-        }
-        // If we got a good parse and timeouts is empty, that means
-        // everyone who timed out in one call received the update in a
-        // previous one.
-        if (timeouts.empty()) {
-          r = 0;
-        }
-      }
-    }
-  }
-  return r;
-}
-
 int RGWRados::pool_iterate_begin(const rgw_pool& pool, RGWPoolIterCtx& ctx)
 {
   librados::IoCtx& io_ctx = ctx.io_ctx;
@@ -10751,23 +10641,35 @@ bool RGWRados::call(std::string_view command, const cmdmap_t& cmdmap,
   return false;
 }

-void RGWRados::call_list(const std::optional&,
-                         ceph::Formatter*)
+void RGWRados::call_list(const std::optional& s,
+                         ceph::Formatter *f)
 {
-  return;
+  if (!svc.cache) {  // no cache service available: nothing to list
+    return;
+  }
+  svc.cache->call_list(s, f);
 }

-bool RGWRados::call_inspect(const std::string&, Formatter*)
+bool RGWRados::call_inspect(const std::string& s, Formatter *f)
 {
-  return false;
+  if (!svc.cache) {  // no cache service available: report "not found"
+    return false;
+  }
+  return svc.cache->call_inspect(s, f);  // was misspelled call_inspec
 }

-bool RGWRados::call_erase(const std::string&) {
-  return false;
+bool RGWRados::call_erase(const std::string& s) {
+  if (!svc.cache) {  // no cache service available: nothing erased
+    return false;
+  }
+  return svc.cache->call_erase(s);
 }

 void RGWRados::call_zap() {
-  return;
+  if (!svc.cache) {  // guard was inverted: bail out only when there is NO cache
+    return;
+  }
+  svc.cache->call_zap();
 }

 string RGWRados::get_mfa_oid(const rgw_user& user)
diff --git a/src/rgw/rgw_rados.h
b/src/rgw/rgw_rados.h index ca43ae8ddd7..bb72f92fad8 100644 --- a/src/rgw/rgw_rados.h +++ b/src/rgw/rgw_rados.h @@ -2212,26 +2212,7 @@ public: int update_containers_stats(map& m); int append_async(rgw_raw_obj& obj, size_t size, bufferlist& bl); - int watch(const string& oid, uint64_t *watch_handle, librados::WatchCtx2 *ctx); - int aio_watch(const string& oid, uint64_t *watch_handle, librados::WatchCtx2 *ctx, librados::AioCompletion *c); - int unwatch(uint64_t watch_handle); - void add_watcher(int i); - void remove_watcher(int i); - virtual bool need_watch_notify() { return false; } - int init_watch(); - void finalize_watch(); - int distribute(const string& key, bufferlist& bl); -private: - int robust_notify(const string& notify_oid, bufferlist& bl); public: - virtual int watch_cb(uint64_t notify_id, - uint64_t cookie, - uint64_t notifier_id, - bufferlist& bl) { return 0; } - void pick_control_oid(const string& key, string& notify_oid); - - virtual void set_cache_enabled(bool state) {} - void set_atomic(void *ctx, rgw_obj& obj) { RGWObjectCtx *rctx = static_cast(ctx); rctx->obj.set_atomic(obj); @@ -2285,15 +2266,6 @@ public: bufferlist& out) override final; protected: - void cache_list_dump_helper(Formatter* f, - const std::string& name, - const ceph::real_time mtime, - const std::uint64_t size) { - f->dump_string("name", name); - f->dump_string("mtime", ceph::to_iso_8601(mtime)); - f->dump_unsigned("size", size); - } - // `call_list` must iterate over all cache entries and call // `cache_list_dump_helper` with the supplied Formatter on any that // include `filter` as a substring. 
diff --git a/src/rgw/services/svc_notify.cc b/src/rgw/services/svc_notify.cc index 9e1c3f3b259..8c468b91051 100644 --- a/src/rgw/services/svc_notify.cc +++ b/src/rgw/services/svc_notify.cc @@ -192,7 +192,7 @@ int RGWSI_Notify::init_watch() notify_oid = notify_oid_prefix; } - notify_objs[i] = rados_svc->obj({control_pool, notify_oid}); + notify_objs[i] = rados_svc->handle(0).obj({control_pool, notify_oid}); auto& notify_obj = notify_objs[i]; librados::ObjectWriteOperation op; @@ -257,22 +257,6 @@ void RGWSI_Notify::shutdown() finalize_watch(); } -int RGWSI_Notify::watch(RGWSI_RADOS::Obj& obj, uint64_t *watch_handle, librados::WatchCtx2 *ctx) -{ - int r = obj.watch(watch_handle, ctx); - if (r < 0) - return r; - return 0; -} - -int RGWSI_Notify::aio_watch(RGWSI_RADOS::Obj& obj, uint64_t *watch_handle, librados::WatchCtx2 *ctx, librados::AioCompletion *c) -{ - int r = obj.aio_watch(c, watch_handle, ctx, 0); - if (r < 0) - return r; - return 0; -} - int RGWSI_Notify::unwatch(RGWSI_RADOS::Obj& obj, uint64_t watch_handle) { int r = obj.unwatch(watch_handle); @@ -280,7 +264,7 @@ int RGWSI_Notify::unwatch(RGWSI_RADOS::Obj& obj, uint64_t watch_handle) ldout(cct, 0) << "ERROR: rados->unwatch2() returned r=" << r << dendl; return r; } - r = rados[0].watch_flush(); + r = rados_svc->handle(0).watch_flush(); if (r < 0) { ldout(cct, 0) << "ERROR: rados->watch_flush() returned r=" << r << dendl; return r; @@ -291,30 +275,156 @@ int RGWSI_Notify::unwatch(RGWSI_RADOS::Obj& obj, uint64_t watch_handle) void RGWSI_Notify::add_watcher(int i) { ldout(cct, 20) << "add_watcher() i=" << i << dendl; - Mutex::Locker l(watchers_lock); + RWLock::WLocker l(watchers_lock); watchers_set.insert(i); if (watchers_set.size() == (size_t)num_watchers) { ldout(cct, 2) << "all " << num_watchers << " watchers are set, enabling cache" << dendl; -#warning fixme -#if 0 - set_cache_enabled(true); -#endif + set_enabled(true); } } void RGWSI_Notify::remove_watcher(int i) { ldout(cct, 20) << "remove_watcher() i=" 
<< i << dendl;
-  Mutex::Locker l(watchers_lock);
+  RWLock::WLocker l(watchers_lock);
   size_t orig_size = watchers_set.size();
   watchers_set.erase(i);
   if (orig_size == (size_t)num_watchers &&
       watchers_set.size() < orig_size) { /* actually removed */
     ldout(cct, 2) << "removed watcher, disabling cache" << dendl;
-#warning fixme
-#if 0
-    set_cache_enabled(false);
-#endif
+    set_enabled(false);
+  }
+}
+
+int RGWSI_Notify::watch_cb(uint64_t notify_id,
+                           uint64_t cookie,
+                           uint64_t notifier_id,
+                           bufferlist& bl)
+{
+  RWLock::RLocker l(watchers_lock);
+  if (cb) {
+    return cb->watch_cb(notify_id, cookie, notifier_id, bl);
+  }
+  return 0;
+}
+
+void RGWSI_Notify::set_enabled(bool status)
+{
+  /* must be called with watchers_lock held: add_watcher()/remove_watcher() already hold it for write, so taking it again here would self-deadlock */
+  if (cb) {
+    cb->set_enabled(status);
   }
 }

+int RGWSI_Notify::distribute(const string& key, bufferlist& bl)
+{
+  RGWSI_RADOS::Obj notify_obj = pick_control_obj(key);
+
+  ldout(cct, 10) << "distributing notification oid=" << notify_obj.get_ref().oid << " bl.length()=" << bl.length() << dendl;
+  return robust_notify(notify_obj, bl);
+}
+
+int RGWSI_Notify::robust_notify(RGWSI_RADOS::Obj& notify_obj, bufferlist& bl)
+{
+  // The reply of every machine that acks goes in here.
+  boost::container::flat_set> acks;
+  bufferlist rbl;
+
+  // First, try to send, without being fancy about it.
+  auto r = notify_obj.notify(bl, 0, &rbl);
+
+  // If that doesn't work, get serious.
+  if (r < 0) {
+    ldout(cct, 1) << "robust_notify: If at first you don't succeed: "
+                  << cpp_strerror(-r) << dendl;
+
+
+    auto p = rbl.cbegin();
+    // Gather up the replies to the first attempt.
+    try {
+      uint32_t num_acks;
+      decode(num_acks, p);
+      // Doing this ourselves since we don't care about the payload;
+      for (auto i = 0u; i < num_acks; ++i) {
+        std::pair id;
+        decode(id, p);
+        acks.insert(id);
+        ldout(cct, 20) << "robust_notify: acked by " << id << dendl;
+        uint32_t blen;
+        decode(blen, p);
+        p.advance(blen);
+      }
+    } catch (const buffer::error& e) {
+      ldout(cct, 0) << "robust_notify: notify response parse failed: "
+                    << e.what() << dendl;
+      acks.clear(); // Throw away junk on failed parse.
+    }
+
+
+    // Every machine that fails to reply and hasn't acked a previous
+    // attempt goes in here.
+    boost::container::flat_set> timeouts;
+
+    auto tries = 1u;
+    while (r < 0 && tries < max_notify_retries) {
+      ++tries;
+      rbl.clear();
+      // Reset the timeouts, we're only concerned with new ones.
+      timeouts.clear();
+      r = notify_obj.notify(bl, 0, &rbl);
+      if (r < 0) {
+        ldout(cct, 1) << "robust_notify: retry " << tries << " failed: "
+                      << cpp_strerror(-r) << dendl;
+        p = rbl.begin();
+        try {
+          uint32_t num_acks;
+          decode(num_acks, p);
+          // Not only do we not care about the payload, but we don't
+          // want to empty the container; we just want to augment it
+          // with any new members.
+          for (auto i = 0u; i < num_acks; ++i) {
+            std::pair id;
+            decode(id, p);
+            auto ir = acks.insert(id);
+            if (ir.second) {
+              ldout(cct, 20) << "robust_notify: acked by " << id << dendl;
+            }
+            uint32_t blen;
+            decode(blen, p);
+            p.advance(blen);
+          }
+
+          uint32_t num_timeouts;
+          decode(num_timeouts, p);
+          for (auto i = 0u; i < num_timeouts; ++i) {
+            std::pair id;
+            decode(id, p);
+            // Only track timeouts from hosts that haven't acked previously.
+            if (acks.find(id) == acks.cend()) {
+              ldout(cct, 20) << "robust_notify: " << id << " timed out."
+ << dendl; + timeouts.insert(id); + } + } + } catch (const buffer::error& e) { + ldout(cct, 0) << "robust_notify: notify response parse failed: " + << e.what() << dendl; + continue; + } + // If we got a good parse and timeouts is empty, that means + // everyone who timed out in one call received the update in a + // previous one. + if (timeouts.empty()) { + r = 0; + } + } + } + } + return r; +} + +void RGWSI_Notify::register_watch_cb(CB *_cb) +{ + RWLock::WLocker l(watchers_lock); + cb = _cb; +} diff --git a/src/rgw/services/svc_notify.h b/src/rgw/services/svc_notify.h index d1118cbabb5..36adefa7168 100644 --- a/src/rgw/services/svc_notify.h +++ b/src/rgw/services/svc_notify.h @@ -21,13 +21,16 @@ public: class RGWSI_Notify : public RGWServiceInstance { +public: + class CB; +private: std::shared_ptr zone_svc; std::shared_ptr rados_svc; std::map get_deps() override; int load(const std::string& conf, std::map& dep_refs) override; - Mutex watchers_lock{"watchers_lock"}; + RWLock watchers_lock{"watchers_lock"}; rgw_pool control_pool; int num_watchers{0}; @@ -43,20 +46,41 @@ class RGWSI_Notify : public RGWServiceInstance string get_control_oid(int i); RGWSI_RADOS::Obj pick_control_obj(const string& key); + CB *cb{nullptr}; + int init_watch(); void finalize_watch(); int init() override; void shutdown() override; - int watch(RGWSI_RADOS::Obj& obj, uint64_t *watch_handle, librados::WatchCtx2 *ctx); - int aio_watch(const string& oid, uint64_t *watch_handle, librados::WatchCtx2 *ctx, librados::AioCompletion *c); int unwatch(RGWSI_RADOS::Obj& obj, uint64_t watch_handle); void add_watcher(int i); void remove_watcher(int i); + + int watch_cb(uint64_t notify_id, + uint64_t cookie, + uint64_t notifier_id, + bufferlist& bl); + void set_enabled(bool status); + + int robust_notify(RGWSI_RADOS::Obj& notify_obj, bufferlist& bl); public: RGWSI_Notify(RGWService *svc, CephContext *cct): RGWServiceInstance(svc, cct) {} + class CB { + public: + virtual ~CB() {} + virtual int 
watch_cb(uint64_t notify_id, + uint64_t cookie, + uint64_t notifier_id, + bufferlist& bl) = 0; + virtual void set_enabled(bool status) = 0; + }; + + int distribute(const string& key, bufferlist& bl); + + void register_watch_cb(CB *cb); }; #endif diff --git a/src/rgw/services/svc_rados.cc b/src/rgw/services/svc_rados.cc index 192734e7e81..270fcaed973 100644 --- a/src/rgw/services/svc_rados.cc +++ b/src/rgw/services/svc_rados.cc @@ -64,11 +64,19 @@ int RGWSI_RADOS::load(const string& conf, map& de return 0; } -librados::Rados* RGWSI_RADOS::get_rados_handle() +librados::Rados* RGWSI_RADOS::get_rados_handle(int rados_handle) { if (rados.size() == 1) { return &rados[0]; } + + if (rados_handle >= 0) { + if (rados_handle >= (int)rados.size()) { + rados_handle = 0; + } + return &rados[rados_handle]; + } + handle_lock.get_read(); pthread_t id = pthread_self(); std::map:: iterator it = rados_map.find(id); @@ -90,13 +98,13 @@ librados::Rados* RGWSI_RADOS::get_rados_handle() uint64_t RGWSI_RADOS::instance_id() { - return get_rados_handle()->get_instance_id(); + return get_rados_handle(-1)->get_instance_id(); } -int RGWSI_RADOS::open_pool_ctx(const rgw_pool& pool, librados::IoCtx& io_ctx) +int RGWSI_RADOS::open_pool_ctx(const rgw_pool& pool, librados::IoCtx& io_ctx, int rados_handle) { constexpr bool create = true; // create the pool if it doesn't exist - return init_ioctx(cct, get_rados_handle(), pool, io_ctx, create); + return init_ioctx(cct, get_rados_handle(rados_handle), pool, io_ctx, create); } int RGWSI_RADOS::pool_iterate(librados::IoCtx& io_ctx, @@ -138,7 +146,7 @@ void RGWSI_RADOS::Obj::init(const rgw_raw_obj& obj) int RGWSI_RADOS::Obj::open() { - int r = rados_svc->open_pool_ctx(ref.pool, ref.ioctx); + int r = rados_svc->open_pool_ctx(ref.pool, ref.ioctx, rados_handle); if (r < 0) { return r; } @@ -168,7 +176,7 @@ int RGWSI_RADOS::Obj::watch(uint64_t *handle, librados::WatchCtx2 *ctx) return ref.ioctx.watch2(ref.oid, handle, ctx); } -int 
RGWSI_RADOS::Obj::aio_watch(AioCompletion *c, uint64_t *handle, librados::WatchCtx2 *ctx) +int RGWSI_RADOS::Obj::aio_watch(librados::AioCompletion *c, uint64_t *handle, librados::WatchCtx2 *ctx) { return ref.ioctx.aio_watch(ref.oid, c, handle, ctx); } @@ -178,6 +186,20 @@ int RGWSI_RADOS::Obj::unwatch(uint64_t handle) return ref.ioctx.unwatch2(handle); } +int RGWSI_RADOS::Obj::notify(bufferlist& bl, + uint64_t timeout_ms, + bufferlist *pbl) +{ + return ref.ioctx.notify2(ref.oid, bl, timeout_ms, pbl); +} + +void RGWSI_RADOS::Obj::notify_ack(uint64_t notify_id, + uint64_t cookie, + bufferlist& bl) +{ + ref.ioctx.notify_ack(ref.oid, notify_id, cookie, bl); +} + uint64_t RGWSI_RADOS::Obj::get_last_version() { return ref.ioctx.get_last_version(); @@ -188,7 +210,7 @@ int RGWSI_RADOS::Pool::create(const vector& pools, vector *retcod vector completions; vector rets; - librados::Rados *rad = rados_svc->get_rados_handle(); + librados::Rados *rad = rados_svc->get_rados_handle(rados_handle); for (auto iter = pools.begin(); iter != pools.end(); ++iter) { librados::PoolAsyncCompletion *c = librados::Rados::pool_async_create_completion(); completions.push_back(c); @@ -264,7 +286,7 @@ int RGWSI_RADOS::Pool::create(const vector& pools, vector *retcod int RGWSI_RADOS::Pool::lookup(const rgw_pool& pool) { - librados::Rados *rad = rados_svc->get_rados_handle(); + librados::Rados *rad = rados_svc->get_rados_handle(rados_handle); int ret = rad->pool_lookup(pool.name.c_str()); if (ret < 0) { return ret; @@ -279,7 +301,7 @@ int RGWSI_RADOS::Pool::List::init(const string& marker, RGWAccessListFilter *fil return -EINVAL; } - int r = pool.rados_svc->open_pool_ctx(pool.pool, ctx.ioctx); + int r = pool.rados_svc->open_pool_ctx(pool.pool, ctx.ioctx, pool.rados_handle); if (r < 0) { return r; } @@ -321,3 +343,9 @@ int RGWSI_RADOS::Pool::List::get_next(int max, return oids->size(); } +int RGWSI_RADOS::Handle::watch_flush() +{ + librados::Rados *rad = rados_svc->get_rados_handle(rados_handle); + 
return rad->watch_flush(); +} + diff --git a/src/rgw/services/svc_rados.h b/src/rgw/services/svc_rados.h index 1949a3ad4b4..ed56dd04c55 100644 --- a/src/rgw/services/svc_rados.h +++ b/src/rgw/services/svc_rados.h @@ -46,8 +46,8 @@ class RGWSI_RADOS : public RGWServiceInstance int load(const string& conf, std::map& deps) override; - librados::Rados* get_rados_handle(); - int open_pool_ctx(const rgw_pool& pool, librados::IoCtx& io_ctx); + librados::Rados* get_rados_handle(int rados_handle); + int open_pool_ctx(const rgw_pool& pool, librados::IoCtx& io_ctx, int rados_handle); int pool_iterate(librados::IoCtx& ioctx, librados::NObjectIterator& iter, uint32_t num, vector& objs, @@ -60,28 +60,35 @@ public: uint64_t instance_id(); + class Handle; + class Obj { friend class RGWSI_RADOS; + friend class Handle; RGWSI_RADOS *rados_svc{nullptr}; + int rados_handle{-1}; rgw_rados_ref ref; void init(const rgw_raw_obj& obj); - Obj(RGWSI_RADOS *_rados_svc, const rgw_raw_obj& _obj) : rados_svc(_rados_svc) { + Obj(RGWSI_RADOS *_rados_svc, const rgw_raw_obj& _obj, int _rados_handle) : rados_svc(_rados_svc), rados_handle(_rados_handle) { init(_obj); } public: Obj() {} Obj(const Obj& o) : rados_svc(o.rados_svc), + rados_handle(o.rados_handle), ref(o.ref) {} Obj(Obj&& o) : rados_svc(o.rados_svc), + rados_handle(o.rados_handle), ref(std::move(o.ref)) {} Obj& operator=(Obj&& o) { rados_svc = o.rados_svc; + rados_handle = o.rados_handle; ref = std::move(o.ref); return *this; } @@ -95,23 +102,39 @@ public: int watch(uint64_t *handle, librados::WatchCtx2 *ctx); int aio_watch(librados::AioCompletion *c, uint64_t *handle, librados::WatchCtx2 *ctx); int unwatch(uint64_t handle); + int notify(bufferlist& bl, + uint64_t timeout_ms, + bufferlist *pbl); + void notify_ack(uint64_t notify_id, + uint64_t cookie, + bufferlist& bl); uint64_t get_last_version(); + + rgw_rados_ref& get_ref() { + return ref; + } }; class Pool { friend class RGWSI_RADOS; + friend class Handle; RGWSI_RADOS 
*rados_svc{nullptr}; + int rados_handle{-1}; rgw_pool pool; - Pool(RGWSI_RADOS *_rados_svc, const rgw_pool& _pool) : rados_svc(_rados_svc), - pool(_pool) {} + Pool(RGWSI_RADOS *_rados_svc, + const rgw_pool& _pool, + int _rados_handle) : rados_svc(_rados_svc), + rados_handle(_rados_handle), + pool(_pool) {} Pool(RGWSI_RADOS *_rados_svc) : rados_svc(_rados_svc) {} public: Pool() {} Pool(const Pool& p) : rados_svc(p.rados_svc), + rados_handle(p.rados_handle), pool(p.pool) {} int create(const std::vector& pools, std::vector *retcodes); @@ -142,8 +165,32 @@ public: friend class List; }; + class Handle { + friend class RGWSI_RADOS; + + RGWSI_RADOS *rados_svc{nullptr}; + int rados_handle{-1}; + + Handle(RGWSI_RADOS *_rados_svc, int _rados_handle) : rados_svc(_rados_svc), + rados_handle(_rados_handle) {} + public: + Obj obj(const rgw_raw_obj& o) { + return Obj(rados_svc, o, rados_handle); + } + + Pool pool(const rgw_pool& p) { + return Pool(rados_svc, p, rados_handle); + } + + int watch_flush(); + }; + + Handle handle(int rados_handle) { + return Handle(this, rados_handle); + } + Obj obj(const rgw_raw_obj& o) { - return Obj(this, o); + return Obj(this, o, -1); } Pool pool() { @@ -151,7 +198,7 @@ public: } Pool pool(const rgw_pool& p) { - return Pool(this, p); + return Pool(this, p, -1); } friend class Obj; diff --git a/src/rgw/services/svc_sys_obj_cache.cc b/src/rgw/services/svc_sys_obj_cache.cc index 5acea09c0bd..06399de3212 100644 --- a/src/rgw/services/svc_sys_obj_cache.cc +++ b/src/rgw/services/svc_sys_obj_cache.cc @@ -1,8 +1,52 @@ #include "svc_sys_obj_cache.h" #include "svc_zone.h" +#include "svc_notify.h" #define dout_subsys ceph_subsys_rgw +class RGWSI_SysObj_Cache_CB : public RGWSI_Notify::CB +{ + RGWSI_SysObj_Cache *svc; +public: + RGWSI_SysObj_Cache_CB(RGWSI_SysObj_Cache *_svc) : svc(_svc) {} + int watch_cb(uint64_t notify_id, + uint64_t cookie, + uint64_t notifier_id, + bufferlist& bl) { + return svc->watch_cb(notify_id, cookie, notifier_id, bl); + } + + void 
set_enabled(bool status) { + svc->set_enabled(status); + } +}; + +std::map RGWSI_SysObj_Cache::get_deps() +{ + map deps = RGWSI_SysObj_Core::get_deps(); + + deps["cache_notify_dep"] = { .name = "notify", + .conf = "{}" }; + return deps; +} + +int RGWSI_SysObj_Cache::load(const string& conf, std::map& dep_refs) +{ + int r = RGWSI_SysObj_Core::load(conf, dep_refs); + if (r < 0) { + return r; + } + + notify_svc = static_pointer_cast(dep_refs["cache_notify_dep"]); + assert(notify_svc); + + cb.reset(new RGWSI_SysObj_Cache_CB(this)); + + notify_svc->register_watch_cb(cb.get()); + + return 0; +} + static string normal_name(rgw_pool& pool, const std::string& oid) { std::string buf; buf.reserve(pool.name.size() + pool.ns.size() + oid.size() + 2); @@ -35,7 +79,10 @@ int RGWSI_SysObj_Cache::remove(RGWSysObjectCtxBase& obj_ctx, cache.remove(name); ObjectCacheInfo info; - distribute_cache(name, obj, info, REMOVE_OBJ); + int r = distribute_cache(name, obj, info, REMOVE_OBJ); + if (r < 0) { + ldout(cct, 0) << "ERROR: " << __func__ << "(): failed to distribute cache: r=" << r << dendl; + } return RGWSI_SysObj_Core::remove(obj_ctx, objv_tracker, obj); } @@ -212,11 +259,13 @@ int RGWSI_SysObj_Cache::write_data(rgw_raw_obj& obj, rgw_pool pool; string oid; normalize_pool_and_obj(obj.pool, obj.oid, pool, oid); + ObjectCacheInfo info; info.data = data; info.meta.size = data.length(); info.status = 0; info.flags = CACHE_FLAG_DATA; + if (objv_tracker) { info.version = objv_tracker->write_version; info.flags |= CACHE_FLAG_OBJV; @@ -305,7 +354,7 @@ int RGWSI_SysObj_Cache::distribute_cache(const string& normal_name, rgw_raw_obj& info.obj = obj; bufferlist bl; encode(info, bl); - return T::distribute(normal_name, bl); + return notify_svc->distribute(normal_name, bl); } int RGWSI_SysObj_Cache::watch_cb(uint64_t notify_id, @@ -346,13 +395,39 @@ int RGWSI_SysObj_Cache::watch_cb(uint64_t notify_id, return 0; } +void RGWSI_SysObj_Cache::set_enabled(bool status) +{ + cache.set_enabled(status); +} + 
+bool RGWSI_SysObj_Cache::chain_cache_entry(std::initializer_list cache_info_entries, + RGWChainedCache::Entry *chained_entry) +{ + return cache.chain_cache_entry(cache_info_entries, chained_entry); +} + +void RGWSI_SysObj_Cache::register_chained_cache(RGWChainedCache *cc) +{ + cache.chain_cache(cc); +} + +static void cache_list_dump_helper(Formatter* f, + const std::string& name, + const ceph::real_time mtime, + const std::uint64_t size) +{ + f->dump_string("name", name); + f->dump_string("mtime", ceph::to_iso_8601(mtime)); + f->dump_unsigned("size", size); +} + void RGWSI_SysObj_Cache::call_list(const std::optional& filter, Formatter* f) { cache.for_each( [this, &filter, f] (const string& name, const ObjectCacheEntry& entry) { if (!filter || name.find(*filter) != name.npos) { - T::cache_list_dump_helper(f, name, entry.info.meta.mtime, - entry.info.meta.size); + cache_list_dump_helper(f, name, entry.info.meta.mtime, + entry.info.meta.size); } }); } @@ -378,4 +453,5 @@ int RGWSI_SysObj_Cache::call_erase(const std::string& target) int RGWSI_SysObj_Cache::call_zap() { cache.invalidate_all(); + return 0; } diff --git a/src/rgw/services/svc_sys_obj_cache.h b/src/rgw/services/svc_sys_obj_cache.h index 3cc1570e8dd..e108ac0b5c0 100644 --- a/src/rgw/services/svc_sys_obj_cache.h +++ b/src/rgw/services/svc_sys_obj_cache.h @@ -8,11 +8,19 @@ #include "svc_sys_obj_core.h" +class RGWSI_Notify; + +class RGWSI_SysObj_Cache_CB; class RGWSI_SysObj_Cache : public RGWSI_SysObj_Core { + friend class RGWSI_SysObj_Cache_CB; + + std::shared_ptr notify_svc; ObjectCache cache; + std::shared_ptr cb; + void normalize_pool_and_obj(rgw_pool& src_pool, const string& src_obj, rgw_pool& dst_pool, string& dst_obj); protected: std::map get_deps() override; @@ -62,8 +70,16 @@ protected: uint64_t notifier_id, bufferlist& bl); + void set_enabled(bool status); + public: - RGWSI_SysObj_Cache(RGWService *svc, CephContext *cct) : RGWSI_SysObj_Core(svc, cct) {} + RGWSI_SysObj_Cache(RGWService *svc, 
CephContext *cct) : RGWSI_SysObj_Core(svc, cct) { + cache.set_ctx(cct); + } + + bool chain_cache_entry(std::initializer_list cache_info_entries, + RGWChainedCache::Entry *chained_entry); + void register_chained_cache(RGWChainedCache *cc); void call_list(const std::optional& filter, Formatter* f); int call_inspect(const std::string& target, Formatter* f); -- 2.39.5