From 074936b5c2d8d62c3ca2afe6349b39815ec2cd62 Mon Sep 17 00:00:00 2001 From: Yehuda Sadeh Date: Wed, 5 Sep 2018 03:11:16 -0700 Subject: [PATCH] rgw: svc_notify: initial work Signed-off-by: Yehuda Sadeh --- src/rgw/CMakeLists.txt | 1 + src/rgw/rgw_rados.cc | 281 +--------------------- src/rgw/rgw_rados.h | 10 +- src/rgw/services/svc_notify.cc | 320 ++++++++++++++++++++++++++ src/rgw/services/svc_notify.h | 63 +++++ src/rgw/services/svc_rados.cc | 15 ++ src/rgw/services/svc_rados.h | 4 + src/rgw/services/svc_sys_obj.cc | 3 +- src/rgw/services/svc_sys_obj.h | 8 + src/rgw/services/svc_sys_obj_cache.cc | 51 ++-- src/rgw/services/svc_sys_obj_cache.h | 12 +- src/rgw/services/svc_sys_obj_core.cc | 12 +- src/rgw/services/svc_sys_obj_core.h | 4 + 13 files changed, 474 insertions(+), 310 deletions(-) create mode 100644 src/rgw/services/svc_notify.cc create mode 100644 src/rgw/services/svc_notify.h diff --git a/src/rgw/CMakeLists.txt b/src/rgw/CMakeLists.txt index 703375cad26..24046a12b42 100644 --- a/src/rgw/CMakeLists.txt +++ b/src/rgw/CMakeLists.txt @@ -39,6 +39,7 @@ function(gperf_generate input output) endfunction() set(librgw_common_srcs + services/svc_notify.cc services/svc_quota.cc services/svc_rados.cc services/svc_sys_obj.cc diff --git a/src/rgw/rgw_rados.cc b/src/rgw/rgw_rados.cc index f081cf59138..80a013dde69 100644 --- a/src/rgw/rgw_rados.cc +++ b/src/rgw/rgw_rados.cc @@ -605,176 +605,6 @@ void RGWObjVersionTracker::generate_new_write_ver(CephContext *cct) } -int RGWRados::watch(const string& oid, uint64_t *watch_handle, librados::WatchCtx2 *ctx) { - int r = control_pool_ctx.watch2(oid, watch_handle, ctx); - if (r < 0) - return r; - return 0; -} - -int RGWRados::aio_watch(const string& oid, uint64_t *watch_handle, librados::WatchCtx2 *ctx, librados::AioCompletion *c) { - int r = control_pool_ctx.aio_watch2(oid, c, watch_handle, ctx, 0); - if (r < 0) - return r; - return 0; -} - -int RGWRados::unwatch(uint64_t watch_handle) -{ - int r = control_pool_ctx.unwatch2(watch_handle); - if (r < 0) { - ldout(cct, 0) << "ERROR: rados->unwatch2() returned r=" << r << dendl; - return r; - } - r = rados[0].watch_flush(); - if (r < 0) { - ldout(cct, 0) << "ERROR: rados->watch_flush() returned r=" << r << dendl; - return r; - } - return 0; -} - -void RGWRados::add_watcher(int i) -{ - ldout(cct, 20) << "add_watcher() i=" << i << dendl; - Mutex::Locker l(watchers_lock); - watchers_set.insert(i); - if (watchers_set.size() == (size_t)num_watchers) { - ldout(cct, 2) << "all " << num_watchers << " watchers are set, enabling cache" << dendl; - set_cache_enabled(true); - } -} - -void RGWRados::remove_watcher(int i) -{ - ldout(cct, 20) << "remove_watcher() i=" << i << dendl; - Mutex::Locker l(watchers_lock); - size_t orig_size = watchers_set.size(); - watchers_set.erase(i); - if (orig_size == (size_t)num_watchers && - watchers_set.size() < orig_size) { /* actually removed */ - ldout(cct, 2) << "removed watcher, disabling cache" << dendl; - set_cache_enabled(false); - } -} - -class RGWWatcher : public librados::WatchCtx2 { - RGWRados *rados; - int index; - string oid; - uint64_t watch_handle; - int register_ret{0}; - librados::AioCompletion *register_completion{nullptr}; - - class C_ReinitWatch : public Context { - RGWWatcher *watcher; - public: - explicit C_ReinitWatch(RGWWatcher *_watcher) : watcher(_watcher) {} - void finish(int r) override { - watcher->reinit(); - } - }; -public: - RGWWatcher(RGWRados *r, int i, const string& o) : rados(r), index(i), oid(o), watch_handle(0) {} - void handle_notify(uint64_t notify_id, - uint64_t cookie, - uint64_t notifier_id, - bufferlist& bl) override { - ldout(rados->ctx(), 10) << "RGWWatcher::handle_notify() " - << " notify_id " << notify_id - << " cookie " << cookie - << " notifier " << notifier_id - << " bl.length()=" << bl.length() << dendl; - - if (unlikely(rados->inject_notify_timeout_probability == 1) || - (rados->inject_notify_timeout_probability > 0 && - (rados->inject_notify_timeout_probability > - ceph::util::generate_random_number(0.0, 1.0)))) { - ldout(rados->ctx(), 0) - << "RGWWatcher::handle_notify() dropping notification! " - << "If this isn't what you want, set " - << "rgw_inject_notify_timeout_probability to zero!" << dendl; - return; - } - - - - rados->watch_cb(notify_id, cookie, notifier_id, bl); - - bufferlist reply_bl; // empty reply payload - rados->control_pool_ctx.notify_ack(oid, notify_id, cookie, reply_bl); - } - void handle_error(uint64_t cookie, int err) override { - lderr(rados->ctx()) << "RGWWatcher::handle_error cookie " << cookie - << " err " << cpp_strerror(err) << dendl; - rados->remove_watcher(index); - rados->schedule_context(new C_ReinitWatch(this)); - } - - void reinit() { - int ret = unregister_watch(); - if (ret < 0) { - ldout(rados->ctx(), 0) << "ERROR: unregister_watch() returned ret=" << ret << dendl; - return; - } - ret = register_watch(); - if (ret < 0) { - ldout(rados->ctx(), 0) << "ERROR: register_watch() returned ret=" << ret << dendl; - return; - } - } - - int unregister_watch() { - int r = rados->unwatch(watch_handle); - if (r < 0) { - return r; - } - rados->remove_watcher(index); - return 0; - } - - int register_watch_async() { - if (register_completion) { - register_completion->release(); - register_completion = nullptr; - } - register_completion = librados::Rados::aio_create_completion(nullptr, nullptr, nullptr); - register_ret = rados->aio_watch(oid, &watch_handle, this, register_completion); - if (register_ret < 0) { - register_completion->release(); - return register_ret; - } - return 0; - } - - int register_watch_finish() { - if (register_ret < 0) { - return register_ret; - } - if (!register_completion) { - return -EINVAL; - } - register_completion->wait_for_safe(); - int r = register_completion->get_return_value(); - register_completion->release(); - register_completion = nullptr; - if (r < 0) { - return r; - } - rados->add_watcher(index); - return 0; - } - - int register_watch() { - int r = rados->watch(oid, &watch_handle, this); - if (r < 0) { - return r; - } - rados->add_watcher(index); - return 0; - } -}; - class RGWMetaNotifierManager : public RGWCoroutinesManager { RGWRados *store; RGWHTTPManager http_manager; @@ -1698,14 +1528,6 @@ int RGWRados::init_complete() period_history.reset(new RGWPeriodHistory(cct, period_puller.get(), svc.zone->get_current_period())); - if (need_watch_notify()) { - ret = init_watch(); - if (ret < 0) { - lderr(cct) << "ERROR: failed to initialize watch: " << cpp_strerror(-ret) << dendl; - return ret; - } - } - ret = open_root_pool_ctx(); if (ret < 0) return ret; @@ -1924,18 +1746,6 @@ int RGWRados::initialize() return init_complete(); } -void RGWRados::finalize_watch() -{ - for (int i = 0; i < num_watchers; i++) { - RGWWatcher *watcher = watchers[i]; - watcher->unregister_watch(); - delete watcher; - } - - delete[] notify_oids; - delete[] watchers; -} - void RGWRados::schedule_context(Context *c) { finisher->queue(c); } @@ -1991,80 +1801,6 @@ int RGWRados::open_reshard_pool_ctx() return rgw_init_ioctx(get_rados_handle(), svc.zone->get_zone_params().reshard_pool, reshard_pool_ctx, true); } -int RGWRados::init_watch() -{ -#warning needs to be part of the sysobj/cache service - int r = rgw_init_ioctx(&rados[0], svc.zone->get_zone_params().control_pool, control_pool_ctx, true); - if (r < 0) { - return r; - } - - num_watchers = cct->_conf->rgw_num_control_oids; - - bool compat_oid = (num_watchers == 0); - - if (num_watchers <= 0) - num_watchers = 1; - - notify_oids = new string[num_watchers]; - watchers = new RGWWatcher *[num_watchers]; - - int error = 0; - - for (int i=0; i < num_watchers; i++) { - string& notify_oid = notify_oids[i]; - notify_oid = notify_oid_prefix; - if (!compat_oid) { - char buf[16]; - snprintf(buf, sizeof(buf), ".%d", i); - notify_oid.append(buf); - } - r = control_pool_ctx.create(notify_oid, false); - if (r < 0 && r != -EEXIST) - return r; - - RGWWatcher *watcher = new RGWWatcher(this, i, notify_oid); - watchers[i] = watcher; - - r = watcher->register_watch_async(); - if (r < 0) { - ldout(cct, 0) << "WARNING: register_watch_aio() returned " << r << dendl; - error = r; - continue; - } - } - - for (int i = 0; i < num_watchers; ++i) { - int r = watchers[i]->register_watch_finish(); - if (r < 0) { - ldout(cct, 0) << "WARNING: async watch returned " << r << dendl; - error = r; - } - } - - if (error < 0) { - return error; - } - - watch_initialized = true; - - set_cache_enabled(true); - - return 0; -} - -void RGWRados::pick_control_oid(const string& key, string& notify_oid) -{ - uint32_t r = ceph_str_hash_linux(key.c_str(), key.size()); - - int i = r % num_watchers; - char buf[16]; - snprintf(buf, sizeof(buf), ".%d", i); - - notify_oid = notify_oid_prefix; - notify_oid.append(buf); -} - int RGWRados::open_pool_ctx(const rgw_pool& pool, librados::IoCtx& io_ctx) { constexpr bool create = true; // create the pool if it doesn't exist @@ -9264,29 +9000,20 @@ int RGWRados::append_async(rgw_raw_obj& obj, size_t size, bufferlist& bl) int RGWRados::distribute(const string& key, bufferlist& bl) { - /* - * we were called before watch was initialized. This can only happen if we're updating some system - * config object (e.g., zone info) during init. Don't try to distribute the cache info for these - * objects, they're currently only read on startup anyway. - */ - if (!watch_initialized) - return 0; - - string notify_oid; - pick_control_oid(key, notify_oid); + RGWSI_RADOS::Obj notify_obj = pick_control_obj(key); ldout(cct, 10) << "distributing notification oid=" << notify_oid << " bl.length()=" << bl.length() << dendl; return robust_notify(notify_oid, bl); } -int RGWRados::robust_notify(const string& notify_oid, bufferlist& bl) +int RGWSI_Notify::robust_notify(RGWSI_RADOS::Obj& notify_obj, bufferlist& bl) { // The reply of every machine that acks goes in here. boost::container::flat_set> acks; bufferlist rbl; // First, try to send, without being fancy about it. - auto r = control_pool_ctx.notify2(notify_oid, bl, 0, &rbl); + auto r = notify_obj.notify(bl, 0, &rbl); // If that doesn't work, get serious. if (r < 0) { @@ -9326,7 +9053,7 @@ int RGWRados::robust_notify(const string& notify_oid, bufferlist& bl) rbl.clear(); // Reset the timeouts, we're only concerned with new ones. timeouts.clear(); - r = control_pool_ctx.notify2(notify_oid, bl, 0, &rbl); + r = notify_obj.notify(bl, 0, &rbl); if (r < 0) { ldout(cct, 1) << "robust_notify: retry " << tries << " failed: " << cpp_strerror(-r) << dendl; diff --git a/src/rgw/rgw_rados.h b/src/rgw/rgw_rados.h index 761676817cc..ca43ae8ddd7 100644 --- a/src/rgw/rgw_rados.h +++ b/src/rgw/rgw_rados.h @@ -1216,7 +1216,6 @@ class RGWRados : public AdminSocketHook std::atomic max_req_id = { 0 }; Mutex lock; - Mutex watchers_lock; SafeTimer *timer; RGWGC *gc; @@ -1242,12 +1241,7 @@ class RGWRados : public AdminSocketHook Mutex meta_sync_thread_lock; Mutex data_sync_thread_lock; - int num_watchers; - RGWWatcher **watchers; - std::set watchers_set; librados::IoCtx root_pool_ctx; // .rgw - librados::IoCtx control_pool_ctx; // .rgw.control - bool watch_initialized; double inject_notify_timeout_probability = 0; unsigned max_notify_retries = 0; @@ -1312,13 +1306,11 @@ protected: RGWServiceRegistryRef svc_registry; RGWIndexCompletionManager *index_completion_manager{nullptr}; public: - RGWRados() : lock("rados_timer_lock"), watchers_lock("watchers_lock"), timer(NULL), + RGWRados() : lock("rados_timer_lock"), timer(NULL), gc(NULL), lc(NULL), obj_expirer(NULL), use_gc_thread(false), use_lc_thread(false), quota_threads(false), run_sync_thread(false), run_reshard_thread(false), async_rados(nullptr), meta_notifier(NULL), data_notifier(NULL), meta_sync_processor_thread(NULL), meta_sync_thread_lock("meta_sync_thread_lock"), data_sync_thread_lock("data_sync_thread_lock"), - num_watchers(0), watchers(NULL), - watch_initialized(false), bucket_id_lock("rados_bucket_id"), bucket_index_max_shards(0), max_bucket_id(0), cct(NULL), diff --git a/src/rgw/services/svc_notify.cc b/src/rgw/services/svc_notify.cc new file mode 100644 index 00000000000..9e1c3f3b259 --- /dev/null +++ b/src/rgw/services/svc_notify.cc @@ -0,0 +1,320 @@ +#include "include/random.h" +#include "common/errno.h" + +#include "svc_notify.h" +#include "svc_zone.h" +#include "svc_rados.h" + +#include "rgw/rgw_zone.h" + +#define dout_subsys ceph_subsys_rgw + +static string notify_oid_prefix = "notify"; + +class RGWWatcher : public librados::WatchCtx2 { + CephContext *cct; + RGWSI_Notify *svc; + int index; + RGWSI_RADOS::Obj obj; + uint64_t watch_handle; + int register_ret{0}; + librados::AioCompletion *register_completion{nullptr}; + + class C_ReinitWatch : public Context { + RGWWatcher *watcher; + public: + explicit C_ReinitWatch(RGWWatcher *_watcher) : watcher(_watcher) {} + void finish(int r) override { + watcher->reinit(); + } + }; +public: + RGWWatcher(CephContext *_cct, RGWSI_Notify *s, int i, RGWSI_RADOS::Obj& o) : cct(_cct), svc(s), index(i), obj(o), watch_handle(0) {} + void handle_notify(uint64_t notify_id, + uint64_t cookie, + uint64_t notifier_id, + bufferlist& bl) override { + ldout(cct, 10) << "RGWWatcher::handle_notify() " + << " notify_id " << notify_id + << " cookie " << cookie + << " notifier " << notifier_id + << " bl.length()=" << bl.length() << dendl; + + if (unlikely(svc->inject_notify_timeout_probability == 1) || + (svc->inject_notify_timeout_probability > 0 && + (svc->inject_notify_timeout_probability > + ceph::util::generate_random_number(0.0, 1.0)))) { + ldout(cct, 0) + << "RGWWatcher::handle_notify() dropping notification! " + << "If this isn't what you want, set " + << "rgw_inject_notify_timeout_probability to zero!" << dendl; + return; + } + + svc->watch_cb(notify_id, cookie, notifier_id, bl); + + bufferlist reply_bl; // empty reply payload + obj.notify_ack(notify_id, cookie, reply_bl); + } + void handle_error(uint64_t cookie, int err) override { + lderr(cct) << "RGWWatcher::handle_error cookie " << cookie + << " err " << cpp_strerror(err) << dendl; + svc->remove_watcher(index); + svc->schedule_context(new C_ReinitWatch(this)); + } + + void reinit() { + int ret = unregister_watch(); + if (ret < 0) { + ldout(cct, 0) << "ERROR: unregister_watch() returned ret=" << ret << dendl; + return; + } + ret = register_watch(); + if (ret < 0) { + ldout(cct, 0) << "ERROR: register_watch() returned ret=" << ret << dendl; + return; + } + } + + int unregister_watch() { + int r = svc->unwatch(obj, watch_handle); + if (r < 0) { + return r; + } + svc->remove_watcher(index); + return 0; + } + + int register_watch_async() { + if (register_completion) { + register_completion->release(); + register_completion = nullptr; + } + register_completion = librados::Rados::aio_create_completion(nullptr, nullptr, nullptr); + register_ret = obj.aio_watch(register_completion, &watch_handle, this); + if (register_ret < 0) { + register_completion->release(); + return register_ret; + } + return 0; + } + + int register_watch_finish() { + if (register_ret < 0) { + return register_ret; + } + if (!register_completion) { + return -EINVAL; + } + register_completion->wait_for_safe(); + int r = register_completion->get_return_value(); + register_completion->release(); + register_completion = nullptr; + if (r < 0) { + return r; + } + svc->add_watcher(index); + return 0; + } + + int register_watch() { + int r = obj.watch(&watch_handle, this); + if (r < 0) { + return r; + } + svc->add_watcher(index); + return 0; + } +}; + +int RGWS_Notify::create_instance(const string& conf, RGWServiceInstanceRef *instance) +{ + instance->reset(new RGWSI_Notify(this, cct)); + return 0; +} + +std::map RGWSI_Notify::get_deps() +{ + map deps; + deps["zone_dep"] = { .name = "zone", + .conf = "{}" }; + deps["rados_dep"] = { .name = "rados", + .conf = "{}" }; + return deps; +} + +int RGWSI_Notify::load(const string& conf, std::map& dep_refs) +{ + zone_svc = static_pointer_cast(dep_refs["zone_dep"]); + assert(zone_svc); + rados_svc = static_pointer_cast(dep_refs["rados_dep"]); + assert(rados_svc); + return 0; +} + +string RGWSI_Notify::get_control_oid(int i) +{ + char buf[notify_oid_prefix.size() + 16]; + snprintf(buf, sizeof(buf), "%s.%d", notify_oid_prefix.c_str(), i); + + return string(buf); +} + +RGWSI_RADOS::Obj RGWSI_Notify::pick_control_obj(const string& key) +{ + uint32_t r = ceph_str_hash_linux(key.c_str(), key.size()); + + int i = r % num_watchers; + return notify_objs[i]; +} + +int RGWSI_Notify::init_watch() +{ + num_watchers = cct->_conf->rgw_num_control_oids; + + bool compat_oid = (num_watchers == 0); + + if (num_watchers <= 0) + num_watchers = 1; + + watchers = new RGWWatcher *[num_watchers]; + + int error = 0; + + notify_objs.resize(num_watchers); + + for (int i=0; i < num_watchers; i++) { + string notify_oid; + + if (!compat_oid) { + notify_oid = get_control_oid(i); + } else { + notify_oid = notify_oid_prefix; + } + + notify_objs[i] = rados_svc->obj({control_pool, notify_oid}); + auto& notify_obj = notify_objs[i]; + + librados::ObjectWriteOperation op; + op.create(false); + int r = notify_obj.operate(&op); + if (r < 0 && r != -EEXIST) { + return r; + } + + RGWWatcher *watcher = new RGWWatcher(cct, this, i, notify_obj); + watchers[i] = watcher; + + r = watcher->register_watch_async(); + if (r < 0) { + ldout(cct, 0) << "WARNING: register_watch_aio() returned " << r << dendl; + error = r; + continue; + } + } + + for (int i = 0; i < num_watchers; ++i) { + int r = watchers[i]->register_watch_finish(); + if (r < 0) { + ldout(cct, 0) << "WARNING: async watch returned " << r << dendl; + error = r; + } + } + + if (error < 0) { + return error; + } + + return 0; +} + +void RGWSI_Notify::finalize_watch() +{ + for (int i = 0; i < num_watchers; i++) { + RGWWatcher *watcher = watchers[i]; + watcher->unregister_watch(); + delete watcher; + } + + delete[] watchers; +} + +int RGWSI_Notify::init() +{ + control_pool = zone_svc->get_zone_params().control_pool; + + int ret = init_watch(); + if (ret < 0) { + lderr(cct) << "ERROR: failed to initialize watch: " << cpp_strerror(-ret) << dendl; + return ret; + } + + return 0; +} + +void RGWSI_Notify::shutdown() +{ + finalize_watch(); +} + +int RGWSI_Notify::watch(RGWSI_RADOS::Obj& obj, uint64_t *watch_handle, librados::WatchCtx2 *ctx) +{ + int r = obj.watch(watch_handle, ctx); + if (r < 0) + return r; + return 0; +} + +int RGWSI_Notify::aio_watch(RGWSI_RADOS::Obj& obj, uint64_t *watch_handle, librados::WatchCtx2 *ctx, librados::AioCompletion *c) +{ + int r = obj.aio_watch(c, watch_handle, ctx, 0); + if (r < 0) + return r; + return 0; +} + +int RGWSI_Notify::unwatch(RGWSI_RADOS::Obj& obj, uint64_t watch_handle) +{ + int r = obj.unwatch(watch_handle); + if (r < 0) { + ldout(cct, 0) << "ERROR: rados->unwatch2() returned r=" << r << dendl; + return r; + } + r = rados[0].watch_flush(); + if (r < 0) { + ldout(cct, 0) << "ERROR: rados->watch_flush() returned r=" << r << dendl; + return r; + } + return 0; +} + +void RGWSI_Notify::add_watcher(int i) +{ + ldout(cct, 20) << "add_watcher() i=" << i << dendl; + Mutex::Locker l(watchers_lock); + watchers_set.insert(i); + if (watchers_set.size() == (size_t)num_watchers) { + ldout(cct, 2) << "all " << num_watchers << " watchers are set, enabling cache" << dendl; +#warning fixme +#if 0 + set_cache_enabled(true); +#endif + } +} + +void RGWSI_Notify::remove_watcher(int i) +{ + ldout(cct, 20) << "remove_watcher() i=" << i << dendl; + Mutex::Locker l(watchers_lock); + size_t orig_size = watchers_set.size(); + watchers_set.erase(i); + if (orig_size == (size_t)num_watchers && + watchers_set.size() < orig_size) { /* actually removed */ + ldout(cct, 2) << "removed watcher, disabling cache" << dendl; +#warning fixme +#if 0 + set_cache_enabled(false); +#endif + } +} + diff --git a/src/rgw/services/svc_notify.h b/src/rgw/services/svc_notify.h new file mode 100644 index 00000000000..d1118cbabb5 --- /dev/null +++ b/src/rgw/services/svc_notify.h @@ -0,0 +1,63 @@ +#ifndef CEPH_RGW_SERVICES_NOTIFY_H +#define CEPH_RGW_SERVICES_NOTIFY_H + + +#include "rgw/rgw_service.h" + +#include "svc_rados.h" + + +class RGWSI_Zone; + +class RGWWatcher; + +class RGWS_Notify : public RGWService +{ +public: + RGWS_Notify(CephContext *cct) : RGWService(cct, "quota") {} + + int create_instance(const std::string& conf, RGWServiceInstanceRef *instance) override; +}; + +class RGWSI_Notify : public RGWServiceInstance +{ + std::shared_ptr zone_svc; + std::shared_ptr rados_svc; + + std::map get_deps() override; + int load(const std::string& conf, std::map& dep_refs) override; + + Mutex watchers_lock{"watchers_lock"}; + rgw_pool control_pool; + + int num_watchers{0}; + RGWWatcher **watchers{nullptr}; + std::set watchers_set; + vector notify_objs; + + double inject_notify_timeout_probability{0}; + unsigned max_notify_retries{0}; + + friend class RGWWatcher; + + string get_control_oid(int i); + RGWSI_RADOS::Obj pick_control_obj(const string& key); + + int init_watch(); + void finalize_watch(); + + int init() override; + void shutdown() override; + + int watch(RGWSI_RADOS::Obj& obj, uint64_t *watch_handle, librados::WatchCtx2 *ctx); + int aio_watch(const string& oid, uint64_t *watch_handle, librados::WatchCtx2 *ctx, librados::AioCompletion *c); + int unwatch(RGWSI_RADOS::Obj& obj, uint64_t watch_handle); + void add_watcher(int i); + void remove_watcher(int i); +public: + RGWSI_Notify(RGWService *svc, CephContext *cct): RGWServiceInstance(svc, cct) {} + +}; + +#endif + diff --git a/src/rgw/services/svc_rados.cc b/src/rgw/services/svc_rados.cc index 9d4eb227105..192734e7e81 100644 --- a/src/rgw/services/svc_rados.cc +++ b/src/rgw/services/svc_rados.cc @@ -163,6 +163,21 @@ int RGWSI_RADOS::Obj::aio_operate(librados::AioCompletion *c, librados::ObjectWr return ref.ioctx.aio_operate(ref.oid, c, op); } +int RGWSI_RADOS::Obj::watch(uint64_t *handle, librados::WatchCtx2 *ctx) +{ + return ref.ioctx.watch2(ref.oid, handle, ctx); +} + +int RGWSI_RADOS::Obj::aio_watch(AioCompletion *c, uint64_t *handle, librados::WatchCtx2 *ctx) +{ + return ref.ioctx.aio_watch(ref.oid, c, handle, ctx); +} + +int RGWSI_RADOS::Obj::unwatch(uint64_t handle) +{ + return ref.ioctx.unwatch2(handle); +} + uint64_t RGWSI_RADOS::Obj::get_last_version() { return ref.ioctx.get_last_version(); diff --git a/src/rgw/services/svc_rados.h b/src/rgw/services/svc_rados.h index 7cafb786901..1949a3ad4b4 100644 --- a/src/rgw/services/svc_rados.h +++ b/src/rgw/services/svc_rados.h @@ -92,6 +92,10 @@ public: int operate(librados::ObjectReadOperation *op, bufferlist *pbl); int aio_operate(librados::AioCompletion *c, librados::ObjectWriteOperation *op); + int watch(uint64_t *handle, librados::WatchCtx2 *ctx); + int aio_watch(librados::AioCompletion *c, uint64_t *handle, librados::WatchCtx2 *ctx); + int unwatch(uint64_t handle); + uint64_t get_last_version(); }; diff --git a/src/rgw/services/svc_sys_obj.cc b/src/rgw/services/svc_sys_obj.cc index 722c43a46f1..e0b54fb508e 100644 --- a/src/rgw/services/svc_sys_obj.cc +++ b/src/rgw/services/svc_sys_obj.cc @@ -68,6 +68,7 @@ int RGWSI_SysObj::Obj::ROp::read(int64_t ofs, int64_t end, bufferlist *bl) objv_tracker, obj, bl, ofs, end, attrs, + cache_info, refresh_version); } @@ -111,7 +112,7 @@ int RGWSI_SysObj::Obj::WOp::write_attrs() RGWSI_SysObj_Core *svc = source.core_svc; rgw_raw_obj& obj = source.get_obj(); - return svc->set_attrs(obj, attrs, objv_tracker); + return svc->set_attrs(obj, attrs, nullptr, objv_tracker); } int RGWSI_SysObj::Pool::Op::list_prefixed_objs(const string& prefix, list *result) diff --git a/src/rgw/services/svc_sys_obj.h b/src/rgw/services/svc_sys_obj.h index c85b5b2607d..2a7432da33b 100644 --- a/src/rgw/services/svc_sys_obj.h +++ b/src/rgw/services/svc_sys_obj.h @@ -12,6 +12,8 @@ class RGWSI_Zone; class RGWSI_SysObj; class RGWSysObjectCtx; +struct rgw_cache_entry_info; + class RGWS_SysObj : public RGWService { public: @@ -59,6 +61,7 @@ public: boost::optional refresh_version{boost::none}; ceph::real_time *lastmod{nullptr}; uint64_t *obj_size{nullptr}; + rgw_cache_entry_info *cache_info{nullptr}; ROp& set_last_mod(ceph::real_time *_lastmod) { lastmod = _lastmod; @@ -80,6 +83,11 @@ public: return *this; } + ROp& set_cache_info(rgw_cache_entry_info *ci) { + cache_info = ci; + return *this; + } + ROp(Obj& _source) : source(_source) {} int stat(); diff --git a/src/rgw/services/svc_sys_obj_cache.cc b/src/rgw/services/svc_sys_obj_cache.cc index 7f2c55a2963..5acea09c0bd 100644 --- a/src/rgw/services/svc_sys_obj_cache.cc +++ b/src/rgw/services/svc_sys_obj_cache.cc @@ -1,4 +1,25 @@ #include "svc_sys_obj_cache.h" +#include "svc_zone.h" + +#define dout_subsys ceph_subsys_rgw + +static string normal_name(rgw_pool& pool, const std::string& oid) { + std::string buf; + buf.reserve(pool.name.size() + pool.ns.size() + oid.size() + 2); + buf.append(pool.name).append("+").append(pool.ns).append("+").append(oid); + return buf; +} + +void RGWSI_SysObj_Cache::normalize_pool_and_obj(rgw_pool& src_pool, const string& src_obj, rgw_pool& dst_pool, string& dst_obj) +{ + if (src_obj.size()) { + dst_pool = src_pool; + dst_obj = src_obj; + } else { + dst_pool = zone_svc->get_zone_params().domain_root; + dst_obj = src_pool.name; + } +} int RGWSI_SysObj_Cache::remove(RGWSysObjectCtxBase& obj_ctx, @@ -58,7 +79,7 @@ int RGWSI_SysObj_Cache::read(RGWSysObjectCtxBase& obj_ctx, obl->clear(); - i.copy_all(obl); + i.copy_all(*obl); if (objv_tracker) objv_tracker->read_version = info.version; if (attrs) @@ -101,6 +122,7 @@ int RGWSI_SysObj_Cache::read(RGWSysObjectCtxBase& obj_ctx, int RGWSI_SysObj_Cache::set_attrs(rgw_raw_obj& obj, map& attrs, + map *rmattrs, RGWObjVersionTracker *objv_tracker) { rgw_pool pool; @@ -108,6 +130,9 @@ int RGWSI_SysObj_Cache::set_attrs(rgw_raw_obj& obj, normalize_pool_and_obj(obj.pool, obj.oid, pool, oid); ObjectCacheInfo info; info.xattrs = attrs; + if (rmattrs) { + info.rm_xattrs = *rmattrs; + } info.status = 0; info.flags = CACHE_FLAG_MODIFY_XATTRS; if (objv_tracker) { @@ -120,7 +145,7 @@ int RGWSI_SysObj_Cache::set_attrs(rgw_raw_obj& obj, cache.put(name, info, NULL); int r = distribute_cache(name, obj, info, UPDATE_OBJ); if (r < 0) - mydout(0) << "ERROR: failed to distribute cache for " << obj << dendl; + ldout(cct, 0) << "ERROR: failed to distribute cache for " << obj << dendl; } else { cache.remove(name); } @@ -128,13 +153,13 @@ int RGWSI_SysObj_Cache::set_attrs(rgw_raw_obj& obj, return ret; } -int RGWSI_SysObjCache::write(rgw_raw_obj& obj, +int RGWSI_SysObj_Cache::write(rgw_raw_obj& obj, real_time *pmtime, map& attrs, bool exclusive, const bufferlist& data, RGWObjVersionTracker *objv_tracker, - real_time set_mtime) override; + real_time set_mtime) { rgw_pool pool; string oid; @@ -170,7 +195,7 @@ int RGWSI_SysObjCache::write(rgw_raw_obj& obj, if (!exclusive) { int r = distribute_cache(name, obj, info, UPDATE_OBJ); if (r < 0) - mydout(0) << "ERROR: failed to distribute cache for " << obj << dendl; + ldout(cct, 0) << "ERROR: failed to distribute cache for " << obj << dendl; } } else { cache.remove(name); @@ -180,7 +205,7 @@ int RGWSI_SysObjCache::write(rgw_raw_obj& obj, } int RGWSI_SysObj_Cache::write_data(rgw_raw_obj& obj, - const bufferlist& bl, + const bufferlist& data, bool exclusive, RGWObjVersionTracker *objv_tracker) { @@ -196,13 +221,13 @@ int RGWSI_SysObj_Cache::write_data(rgw_raw_obj& obj, info.version = objv_tracker->write_version; info.flags |= CACHE_FLAG_OBJV; } - int ret = RGWSI_SysObj_Core::write_data(obj, data, ofs, exclusive, objv_tracker); + int ret = RGWSI_SysObj_Core::write_data(obj, data, exclusive, objv_tracker); string name = normal_name(pool, oid); if (ret >= 0) { cache.put(name, info, NULL); int r = distribute_cache(name, obj, info, UPDATE_OBJ); if (r < 0) - mydout(0) << "ERROR: failed to distribute cache for " << obj << dendl; + ldout(cct, 0) << "ERROR: failed to distribute cache for " << obj << dendl; } else { cache.remove(name); } @@ -210,7 +235,7 @@ int RGWSI_SysObj_Cache::write_data(rgw_raw_obj& obj, return ret; } -int RGWSI_SysObj_Cache::raw_stat(rgw_raw_obj& obj, uint64_t *psize, real_time *pmtime, uint64_t *epoch, +int RGWSI_SysObj_Cache::raw_stat(rgw_raw_obj& obj, uint64_t *psize, real_time *pmtime, uint64_t *pepoch, map *attrs, bufferlist *first_chunk, RGWObjVersionTracker *objv_tracker) { @@ -294,10 +319,10 @@ int RGWSI_SysObj_Cache::watch_cb(uint64_t notify_id, auto iter = bl.cbegin(); decode(info, iter); } catch (buffer::end_of_buffer& err) { - mydout(0) << "ERROR: got bad notification" << dendl; + ldout(cct, 0) << "ERROR: got bad notification" << dendl; return -EIO; } catch (buffer::error& err) { - mydout(0) << "ERROR: buffer::error" << dendl; + ldout(cct, 0) << "ERROR: buffer::error" << dendl; return -EIO; } @@ -314,14 +339,14 @@ int RGWSI_SysObj_Cache::watch_cb(uint64_t notify_id, cache.remove(name); break; default: - mydout(0) << "WARNING: got unknown notification op: " << info.op << dendl; + ldout(cct, 0) << "WARNING: got unknown notification op: " << info.op << dendl; return -EINVAL; } return 0; } -int RGWSI_SysObj_Cache::call_list(const std::optional& filter, Formatter* f) +void RGWSI_SysObj_Cache::call_list(const std::optional& filter, Formatter* f) { cache.for_each( [this, &filter, f] (const string& name, const ObjectCacheEntry& entry) { diff --git a/src/rgw/services/svc_sys_obj_cache.h b/src/rgw/services/svc_sys_obj_cache.h index ece465d91f8..3cc1570e8dd 100644 --- a/src/rgw/services/svc_sys_obj_cache.h +++ b/src/rgw/services/svc_sys_obj_cache.h @@ -4,12 +4,16 @@ #include "rgw/rgw_service.h" +#include "rgw/rgw_cache.h" -#include "svc_rados.h" +#include "svc_sys_obj_core.h" class RGWSI_SysObj_Cache : public RGWSI_SysObj_Core { + ObjectCache cache; + + void normalize_pool_and_obj(rgw_pool& src_pool, const string& src_obj, rgw_pool& dst_pool, string& dst_obj); protected: std::map get_deps() override; int load(const std::string& conf, std::map& dep_refs) override; @@ -24,12 +28,14 @@ protected: rgw_raw_obj& obj, bufferlist *bl, off_t ofs, off_t end, map *attrs, + rgw_cache_entry_info *cache_info, boost::optional) override; int get_attr(rgw_raw_obj& obj, const char *name, bufferlist *dest) override; int set_attrs(rgw_raw_obj& obj, map& attrs, + map *rmattrs, RGWObjVersionTracker *objv_tracker); int remove(RGWSysObjectCtxBase& obj_ctx, @@ -57,9 +63,9 @@ protected: bufferlist& bl); public: - RGWSI_SysObj_Cache(RGWService *svc, CephContext *cct): RGW_SysObj_Core(svc, cct) {} + RGWSI_SysObj_Cache(RGWService *svc, CephContext *cct) : RGWSI_SysObj_Core(svc, cct) {} - int call_list(const std::optional& filter, Formatter* f); + void call_list(const std::optional& filter, Formatter* f); int call_inspect(const std::string& target, Formatter* f); int call_erase(const std::string& target); int call_zap(); diff --git a/src/rgw/services/svc_sys_obj_core.cc b/src/rgw/services/svc_sys_obj_core.cc index 39c88483e1c..742bfcc7352 100644 --- a/src/rgw/services/svc_sys_obj_core.cc +++ b/src/rgw/services/svc_sys_obj_core.cc @@ -202,6 +202,7 @@ int RGWSI_SysObj_Core::read(RGWSysObjectCtxBase& obj_ctx, rgw_raw_obj& obj, bufferlist *bl, off_t ofs, off_t end, map *attrs, + rgw_cache_entry_info *cache_info, boost::optional) { uint64_t len; @@ -281,6 +282,7 @@ int RGWSI_SysObj_Core::get_attr(rgw_raw_obj& obj, int RGWSI_SysObj_Core::set_attrs(rgw_raw_obj& obj, map& attrs, + map *rmattrs, RGWObjVersionTracker *objv_tracker) { RGWSI_RADOS::Obj rados_obj; @@ -290,7 +292,7 @@ int RGWSI_SysObj_Core::set_attrs(rgw_raw_obj& obj, return r; } - ObjectWriteOperation op; + librados::ObjectWriteOperation op; if (objv_tracker) { objv_tracker->prepare_op_for_write(&op); @@ -562,7 +564,7 @@ int RGWSI_SysObj_Core::write_data(rgw_raw_obj& obj, return r; } - ObjectWriteOperation op; + librados::ObjectWriteOperation op; if (exclusive) { op.create(true); @@ -571,11 +573,7 @@ int RGWSI_SysObj_Core::write_data(rgw_raw_obj& obj, if (objv_tracker) { objv_tracker->prepare_op_for_write(&op); } - if (ofs == -1) { - op.write_full(bl); - } else { - op.write(ofs, bl); - } + op.write_full(bl); r = rados_obj.operate(&op); if (r < 0) return r; diff --git a/src/rgw/services/svc_sys_obj_core.h b/src/rgw/services/svc_sys_obj_core.h index ec00f08a7d0..5ca9aed3102 100644 --- a/src/rgw/services/svc_sys_obj_core.h +++ b/src/rgw/services/svc_sys_obj_core.h @@ -9,6 +9,8 @@ class RGWSI_Zone; +struct rgw_cache_entry_info; + struct RGWSysObjState { rgw_raw_obj obj; bool has_attrs{false}; @@ -133,6 +135,7 @@ protected: rgw_raw_obj& obj, bufferlist *bl, off_t ofs, off_t end, map *attrs, + rgw_cache_entry_info *cache_info, boost::optional); virtual int remove(RGWSysObjectCtxBase& obj_ctx, @@ -156,6 +159,7 @@ protected: virtual int set_attrs(rgw_raw_obj& obj, map& attrs, + map *rmattrs, RGWObjVersionTracker *objv_tracker); virtual int omap_get_all(rgw_raw_obj& obj, std::map *m); -- 2.39.5