void invalidate_all();
};
-template <class T>
-class RGWCache : public T
-{
- ObjectCache cache;
-
- int list_objects_raw_init(rgw_pool& pool, RGWAccessHandle *handle) {
- return T::list_objects_raw_init(pool, handle);
- }
- int list_objects_raw_next(rgw_bucket_dir_entry& obj, RGWAccessHandle *handle) {
- return T::list_objects_raw_next(obj, handle);
- }
-
- string normal_name(rgw_pool& pool, const std::string& oid) {
- std::string buf;
- buf.reserve(pool.name.size() + pool.ns.size() + oid.size() + 2);
- buf.append(pool.name).append("+").append(pool.ns).append("+").append(oid);
- return buf;
- }
-
- void normalize_pool_and_obj(rgw_pool& src_pool, const string& src_obj, rgw_pool& dst_pool, string& dst_obj);
-
- int init_rados() override {
- int ret;
- cache.set_ctx(T::cct);
- ret = T::init_rados();
- if (ret < 0)
- return ret;
-
- return 0;
- }
-
- bool need_watch_notify() override {
- return true;
- }
-
- int distribute_cache(const string& normal_name, rgw_raw_obj& obj, ObjectCacheInfo& obj_info, int op);
- int watch_cb(uint64_t notify_id,
- uint64_t cookie,
- uint64_t notifier_id,
- bufferlist& bl) override;
-
- void set_cache_enabled(bool state) override {
- cache.set_enabled(state);
- }
-public:
- RGWCache() {}
-
- void register_chained_cache(RGWChainedCache *cc) override {
- cache.chain_cache(cc);
- }
-
- int system_obj_set_attrs(void *ctx, rgw_raw_obj& obj,
- map<string, bufferlist>& attrs,
- map<string, bufferlist>* rmattrs,
- RGWObjVersionTracker *objv_tracker);
- int put_system_obj_impl(rgw_raw_obj& obj, uint64_t size, real_time *mtime,
- map<std::string, bufferlist>& attrs, int flags,
- const bufferlist& data,
- RGWObjVersionTracker *objv_tracker,
- real_time set_mtime) override;
- int put_system_obj_data(void *ctx, rgw_raw_obj& obj, const bufferlist& bl, off_t ofs, bool exclusive,
- RGWObjVersionTracker *objv_tracker = nullptr) override;
-
- int get_system_obj(RGWObjectCtx& obj_ctx, RGWRados::SystemObject::Read::GetObjState& read_state,
- RGWObjVersionTracker *objv_tracker, rgw_raw_obj& obj,
- bufferlist& bl, off_t ofs, off_t end,
- map<string, bufferlist> *attrs,
- rgw_cache_entry_info *cache_info,
- boost::optional<obj_version> refresh_version = boost::none) override;
-
- int raw_obj_stat(rgw_raw_obj& obj, uint64_t *psize, real_time *pmtime, uint64_t *epoch, map<string, bufferlist> *attrs,
- bufferlist *first_chunk, RGWObjVersionTracker *objv_tracker) override;
-
- int delete_system_obj(rgw_raw_obj& obj, RGWObjVersionTracker *objv_tracker) override;
-
- bool chain_cache_entry(std::initializer_list<rgw_cache_entry_info *> cache_info_entries, RGWChainedCache::Entry *chained_entry) override {
- return cache.chain_cache_entry(cache_info_entries, chained_entry);
- }
- void call_list(const std::optional<std::string>& filter,
- Formatter* format) override;
- bool call_inspect(const std::string& target, Formatter* format) override;
- bool call_erase(const std::string& target) override;
- void call_zap() override;
-};
-
-template <class T>
-void RGWCache<T>::normalize_pool_and_obj(rgw_pool& src_pool, const string& src_obj, rgw_pool& dst_pool, string& dst_obj)
-{
- if (src_obj.size()) {
- dst_pool = src_pool;
- dst_obj = src_obj;
- } else {
- dst_pool = T::get_zone_params().domain_root;
- dst_obj = src_pool.name;
- }
-}
-
-template <class T>
-int RGWCache<T>::delete_system_obj(rgw_raw_obj& obj, RGWObjVersionTracker *objv_tracker)
-{
- rgw_pool pool;
- string oid;
- normalize_pool_and_obj(obj.pool, obj.oid, pool, oid);
-
- string name = normal_name(pool, oid);
- cache.remove(name);
-
- ObjectCacheInfo info;
- distribute_cache(name, obj, info, REMOVE_OBJ);
-
- return T::delete_system_obj(obj, objv_tracker);
-}
-
-template <class T>
-int RGWCache<T>::get_system_obj(RGWObjectCtx& obj_ctx, RGWRados::SystemObject::Read::GetObjState& read_state,
- RGWObjVersionTracker *objv_tracker, rgw_raw_obj& obj,
- bufferlist& obl, off_t ofs, off_t end,
- map<string, bufferlist> *attrs,
- rgw_cache_entry_info *cache_info,
- boost::optional<obj_version> refresh_version)
-{
- rgw_pool pool;
- string oid;
- normalize_pool_and_obj(obj.pool, obj.oid, pool, oid);
- if (ofs != 0)
- return T::get_system_obj(obj_ctx, read_state, objv_tracker, obj, obl, ofs, end, attrs, cache_info);
-
- string name = normal_name(pool, oid);
-
- ObjectCacheInfo info;
-
- uint32_t flags = CACHE_FLAG_DATA;
- if (objv_tracker)
- flags |= CACHE_FLAG_OBJV;
- if (attrs)
- flags |= CACHE_FLAG_XATTRS;
-
- if ((cache.get(name, info, flags, cache_info) == 0) &&
- (!refresh_version || !info.version.compare(&(*refresh_version)))) {
- if (info.status < 0)
- return info.status;
-
- bufferlist& bl = info.data;
-
- bufferlist::iterator i = bl.begin();
-
- obl.clear();
-
- i.copy_all(obl);
- if (objv_tracker)
- objv_tracker->read_version = info.version;
- if (attrs)
- *attrs = info.xattrs;
- return bl.length();
- }
- int r = T::get_system_obj(obj_ctx, read_state, objv_tracker, obj, obl, ofs, end, attrs, cache_info);
- if (r < 0) {
- if (r == -ENOENT) { // only update ENOENT, we'd rather retry other errors
- info.status = r;
- cache.put(name, info, cache_info);
- }
- return r;
- }
-
- if (obl.length() == end + 1) {
- /* in this case, most likely object contains more data, we can't cache it */
- return r;
- }
-
- bufferptr p(r);
- bufferlist& bl = info.data;
- bl.clear();
- bufferlist::iterator o = obl.begin();
- o.copy_all(bl);
- info.status = 0;
- info.flags = flags;
- if (objv_tracker) {
- info.version = objv_tracker->read_version;
- }
- if (attrs) {
- info.xattrs = *attrs;
- }
- cache.put(name, info, cache_info);
- return r;
-}
-
-template <class T>
-int RGWCache<T>::system_obj_set_attrs(void *ctx, rgw_raw_obj& obj,
- map<string, bufferlist>& attrs,
- map<string, bufferlist>* rmattrs,
- RGWObjVersionTracker *objv_tracker)
-{
- rgw_pool pool;
- string oid;
- normalize_pool_and_obj(obj.pool, obj.oid, pool, oid);
- ObjectCacheInfo info;
- info.xattrs = attrs;
- if (rmattrs)
- info.rm_xattrs = *rmattrs;
- info.status = 0;
- info.flags = CACHE_FLAG_MODIFY_XATTRS;
- if (objv_tracker) {
- info.version = objv_tracker->write_version;
- info.flags |= CACHE_FLAG_OBJV;
- }
- int ret = T::system_obj_set_attrs(ctx, obj, attrs, rmattrs, objv_tracker);
- string name = normal_name(pool, oid);
- if (ret >= 0) {
- cache.put(name, info, NULL);
- int r = distribute_cache(name, obj, info, UPDATE_OBJ);
- if (r < 0)
- mydout(0) << "ERROR: failed to distribute cache for " << obj << dendl;
- } else {
- cache.remove(name);
- }
-
- return ret;
-}
-
-template <class T>
-int RGWCache<T>::put_system_obj_impl(rgw_raw_obj& obj, uint64_t size, real_time *mtime,
- map<std::string, bufferlist>& attrs, int flags,
- const bufferlist& data,
- RGWObjVersionTracker *objv_tracker,
- real_time set_mtime)
-{
- rgw_pool pool;
- string oid;
- normalize_pool_and_obj(obj.pool, obj.oid, pool, oid);
- ObjectCacheInfo info;
- info.xattrs = attrs;
- info.status = 0;
- info.data = data;
- info.flags = CACHE_FLAG_XATTRS | CACHE_FLAG_DATA | CACHE_FLAG_META;
- if (objv_tracker) {
- info.version = objv_tracker->write_version;
- info.flags |= CACHE_FLAG_OBJV;
- }
- ceph::real_time result_mtime;
- int ret = T::put_system_obj_impl(obj, size, &result_mtime, attrs, flags, data,
- objv_tracker, set_mtime);
- if (mtime) {
- *mtime = result_mtime;
- }
- info.meta.mtime = result_mtime;
- info.meta.size = size;
- string name = normal_name(pool, oid);
- if (ret >= 0) {
- cache.put(name, info, NULL);
- // Only distribute the cache information if we did not just create
- // the object with the exclusive flag. Note: PUT_OBJ_EXCL implies
- // PUT_OBJ_CREATE. Generally speaking, when successfully creating
- // a system object with the exclusive flag it is not necessary to
- // call distribute_cache, as a) it's unclear whether other RGWs
- // will need that system object in the near-term and b) it
- // generates additional network traffic.
- if (!(flags & PUT_OBJ_EXCL)) {
- int r = distribute_cache(name, obj, info, UPDATE_OBJ);
- if (r < 0)
- mydout(0) << "ERROR: failed to distribute cache for " << obj << dendl;
- }
- } else {
- cache.remove(name);
- }
-
- return ret;
-}
-
-template <class T>
-int RGWCache<T>::put_system_obj_data(void *ctx, rgw_raw_obj& obj, const bufferlist& data, off_t ofs, bool exclusive,
- RGWObjVersionTracker *objv_tracker)
-{
- rgw_pool pool;
- string oid;
- normalize_pool_and_obj(obj.pool, obj.oid, pool, oid);
- ObjectCacheInfo info;
- bool cacheable = false;
- if ((ofs == 0) || (ofs == -1)) {
- cacheable = true;
- info.data = data;
- info.meta.size = data.length();
- info.status = 0;
- info.flags = CACHE_FLAG_DATA;
- }
- if (objv_tracker) {
- info.version = objv_tracker->write_version;
- info.flags |= CACHE_FLAG_OBJV;
- }
- int ret = T::put_system_obj_data(ctx, obj, data, ofs, exclusive, objv_tracker);
- if (cacheable) {
- string name = normal_name(pool, oid);
- if (ret >= 0) {
- cache.put(name, info, NULL);
- int r = distribute_cache(name, obj, info, UPDATE_OBJ);
- if (r < 0)
- mydout(0) << "ERROR: failed to distribute cache for " << obj << dendl;
- } else {
- cache.remove(name);
- }
- }
-
- return ret;
-}
-
-template <class T>
-int RGWCache<T>::raw_obj_stat(rgw_raw_obj& obj, uint64_t *psize, real_time *pmtime,
- uint64_t *pepoch, map<string, bufferlist> *attrs,
- bufferlist *first_chunk, RGWObjVersionTracker *objv_tracker)
-{
- rgw_pool pool;
- string oid;
- normalize_pool_and_obj(obj.pool, obj.oid, pool, oid);
-
- string name = normal_name(pool, oid);
-
- uint64_t size;
- real_time mtime;
- uint64_t epoch;
-
- ObjectCacheInfo info;
- uint32_t flags = CACHE_FLAG_META | CACHE_FLAG_XATTRS;
- if (objv_tracker)
- flags |= CACHE_FLAG_OBJV;
- int r = cache.get(name, info, flags, NULL);
- if (r == 0) {
- if (info.status < 0)
- return info.status;
-
- size = info.meta.size;
- mtime = info.meta.mtime;
- epoch = info.epoch;
- if (objv_tracker)
- objv_tracker->read_version = info.version;
- goto done;
- }
- r = T::raw_obj_stat(obj, &size, &mtime, &epoch, &info.xattrs, first_chunk, objv_tracker);
- if (r < 0) {
- if (r == -ENOENT) {
- info.status = r;
- cache.put(name, info, NULL);
- }
- return r;
- }
- info.status = 0;
- info.epoch = epoch;
- info.meta.mtime = mtime;
- info.meta.size = size;
- info.flags = CACHE_FLAG_META | CACHE_FLAG_XATTRS;
- if (objv_tracker) {
- info.flags |= CACHE_FLAG_OBJV;
- info.version = objv_tracker->read_version;
- }
- cache.put(name, info, NULL);
-done:
- if (psize)
- *psize = size;
- if (pmtime)
- *pmtime = mtime;
- if (pepoch)
- *pepoch = epoch;
- if (attrs)
- *attrs = info.xattrs;
- return 0;
-}
-
-template <class T>
-int RGWCache<T>::distribute_cache(const string& normal_name, rgw_raw_obj& obj, ObjectCacheInfo& obj_info, int op)
-{
- RGWCacheNotifyInfo info;
-
- info.op = op;
-
- info.obj_info = obj_info;
- info.obj = obj;
- bufferlist bl;
- encode(info, bl);
- return T::distribute(normal_name, bl);
-}
-
-template <class T>
-int RGWCache<T>::watch_cb(uint64_t notify_id,
- uint64_t cookie,
- uint64_t notifier_id,
- bufferlist& bl)
-{
- RGWCacheNotifyInfo info;
-
- try {
- auto iter = bl.cbegin();
- decode(info, iter);
- } catch (buffer::end_of_buffer& err) {
- mydout(0) << "ERROR: got bad notification" << dendl;
- return -EIO;
- } catch (buffer::error& err) {
- mydout(0) << "ERROR: buffer::error" << dendl;
- return -EIO;
- }
-
- rgw_pool pool;
- string oid;
- normalize_pool_and_obj(info.obj.pool, info.obj.oid, pool, oid);
- string name = normal_name(pool, oid);
-
- switch (info.op) {
- case UPDATE_OBJ:
- cache.put(name, info.obj_info, NULL);
- break;
- case REMOVE_OBJ:
- cache.remove(name);
- break;
- default:
- mydout(0) << "WARNING: got unknown notification op: " << info.op << dendl;
- return -EINVAL;
- }
-
- return 0;
-}
-
-template<typename T>
-void RGWCache<T>::call_list(const std::optional<std::string>& filter,
- Formatter* f)
-{
- cache.for_each(
- [this, &filter, f] (const string& name, const ObjectCacheEntry& entry) {
- if (!filter || name.find(*filter) != name.npos) {
- T::cache_list_dump_helper(f, name, entry.info.meta.mtime,
- entry.info.meta.size);
- }
- });
-}
-
-template<typename T>
-bool RGWCache<T>::call_inspect(const std::string& target, Formatter* f)
-{
- if (const auto entry = cache.get(target)) {
- f->open_object_section("cache_entry");
- f->dump_string("name", target.c_str());
- entry->dump(f);
- f->close_section();
- return true;
- } else {
- return false;
- }
-}
-
-template<typename T>
-bool RGWCache<T>::call_erase(const std::string& target)
-{
- return cache.remove(target);
-}
-
-template<typename T>
-void RGWCache<T>::call_zap()
-{
- cache.invalidate_all();
-}
#endif
if (finisher) {
finisher->stop();
}
- if (need_watch_notify()) {
- finalize_watch();
- }
if (finisher) {
/* delete finisher only after cleaning up watches, as watch error path might call
* into finisher. We stop finisher before finalizing watch to make sure we don't
return r;
}
-int RGWRados::distribute(const string& key, bufferlist& bl)
-{
- RGWSI_RADOS::Obj notify_obj = pick_control_obj(key);
-
- ldout(cct, 10) << "distributing notification oid=" << notify_oid << " bl.length()=" << bl.length() << dendl;
- return robust_notify(notify_oid, bl);
-}
-
-int RGWSI_Notify::robust_notify(RGWSI_RADOS::Obj& notify_obj, bufferlist& bl)
-{
- // The reply of every machine that acks goes in here.
- boost::container::flat_set<std::pair<uint64_t, uint64_t>> acks;
- bufferlist rbl;
-
- // First, try to send, without being fancy about it.
- auto r = notify_obj.notify(bl, 0, &rbl);
-
- // If that doesn't work, get serious.
- if (r < 0) {
- ldout(cct, 1) << "robust_notify: If at first you don't succeed: "
- << cpp_strerror(-r) << dendl;
-
-
- auto p = rbl.cbegin();
- // Gather up the replies to the first attempt.
- try {
- uint32_t num_acks;
- decode(num_acks, p);
- // Doing this ourselves since we don't care about the payload;
- for (auto i = 0u; i < num_acks; ++i) {
- std::pair<uint64_t, uint64_t> id;
- decode(id, p);
- acks.insert(id);
- ldout(cct, 20) << "robust_notify: acked by " << id << dendl;
- uint32_t blen;
- decode(blen, p);
- p.advance(blen);
- }
- } catch (const buffer::error& e) {
- ldout(cct, 0) << "robust_notify: notify response parse failed: "
- << e.what() << dendl;
- acks.clear(); // Throw away junk on failed parse.
- }
-
-
- // Every machine that fails to reply and hasn't acked a previous
- // attempt goes in here.
- boost::container::flat_set<std::pair<uint64_t, uint64_t>> timeouts;
-
- auto tries = 1u;
- while (r < 0 && tries < max_notify_retries) {
- ++tries;
- rbl.clear();
- // Reset the timeouts, we're only concerned with new ones.
- timeouts.clear();
- r = notify_obj.notify(bl, 0, &rbl);
- if (r < 0) {
- ldout(cct, 1) << "robust_notify: retry " << tries << " failed: "
- << cpp_strerror(-r) << dendl;
- p = rbl.begin();
- try {
- uint32_t num_acks;
- decode(num_acks, p);
- // Not only do we not care about the payload, but we don't
- // want to empty the container; we just want to augment it
- // with any new members.
- for (auto i = 0u; i < num_acks; ++i) {
- std::pair<uint64_t, uint64_t> id;
- decode(id, p);
- auto ir = acks.insert(id);
- if (ir.second) {
- ldout(cct, 20) << "robust_notify: acked by " << id << dendl;
- }
- uint32_t blen;
- decode(blen, p);
- p.advance(blen);
- }
-
- uint32_t num_timeouts;
- decode(num_timeouts, p);
- for (auto i = 0u; i < num_timeouts; ++i) {
- std::pair<uint64_t, uint64_t> id;
- decode(id, p);
- // Only track timeouts from hosts that haven't acked previously.
- if (acks.find(id) != acks.cend()) {
- ldout(cct, 20) << "robust_notify: " << id << " timed out."
- << dendl;
- timeouts.insert(id);
- }
- }
- } catch (const buffer::error& e) {
- ldout(cct, 0) << "robust_notify: notify response parse failed: "
- << e.what() << dendl;
- continue;
- }
- // If we got a good parse and timeouts is empty, that means
- // everyone who timed out in one call received the update in a
- // previous one.
- if (timeouts.empty()) {
- r = 0;
- }
- }
- }
- }
- return r;
-}
-
int RGWRados::pool_iterate_begin(const rgw_pool& pool, RGWPoolIterCtx& ctx)
{
librados::IoCtx& io_ctx = ctx.io_ctx;
return false;
}
-void RGWRados::call_list(const std::optional<std::string>&,
- ceph::Formatter*)
+// Forward a cache listing request to svc.cache, passing the optional
+// filter `s` and the formatter `f` through unchanged. Does nothing when
+// svc.cache is not set.
+void RGWRados::call_list(const std::optional<std::string>& s,
+                         ceph::Formatter *f)
{
- return;
+  if (svc.cache) {
+    svc.cache->call_list(s, f);
+  }
}
-bool RGWRados::call_inspect(const std::string&, Formatter*)
+// Forward a cache inspection request for entry `s` to svc.cache, dumping
+// into formatter `f`. Returns false when svc.cache is not set; otherwise
+// returns the result of svc.cache->call_inspect() converted to bool.
+bool RGWRados::call_inspect(const std::string& s, Formatter *f)
{
- return false;
+  if (!svc.cache) {
+    return false;
+  }
+  return svc.cache->call_inspect(s, f);
}
-bool RGWRados::call_erase(const std::string&) {
- return false;
+// Forward a cache erase request for entry `s` to svc.cache. Yields false
+// when svc.cache is not set; the right-hand call is only evaluated when
+// the cache service exists.
+bool RGWRados::call_erase(const std::string& s) {
+  return svc.cache && svc.cache->call_erase(s);
}
void RGWRados::call_zap() {
- return;
+  // Forward the zap request to svc.cache. Bail out only when the cache
+  // service is missing: dereferencing a null svc.cache would crash, and
+  // returning when it is present would make the command a silent no-op.
+  if (!svc.cache) {
+    return;
+  }
+  svc.cache->call_zap();
}
string RGWRados::get_mfa_oid(const rgw_user& user)
int update_containers_stats(map<string, RGWBucketEnt>& m);
int append_async(rgw_raw_obj& obj, size_t size, bufferlist& bl);
- int watch(const string& oid, uint64_t *watch_handle, librados::WatchCtx2 *ctx);
- int aio_watch(const string& oid, uint64_t *watch_handle, librados::WatchCtx2 *ctx, librados::AioCompletion *c);
- int unwatch(uint64_t watch_handle);
- void add_watcher(int i);
- void remove_watcher(int i);
- virtual bool need_watch_notify() { return false; }
- int init_watch();
- void finalize_watch();
- int distribute(const string& key, bufferlist& bl);
-private:
- int robust_notify(const string& notify_oid, bufferlist& bl);
public:
- virtual int watch_cb(uint64_t notify_id,
- uint64_t cookie,
- uint64_t notifier_id,
- bufferlist& bl) { return 0; }
- void pick_control_oid(const string& key, string& notify_oid);
-
- virtual void set_cache_enabled(bool state) {}
-
void set_atomic(void *ctx, rgw_obj& obj) {
RGWObjectCtx *rctx = static_cast<RGWObjectCtx *>(ctx);
rctx->obj.set_atomic(obj);
bufferlist& out) override final;
protected:
- void cache_list_dump_helper(Formatter* f,
- const std::string& name,
- const ceph::real_time mtime,
- const std::uint64_t size) {
- f->dump_string("name", name);
- f->dump_string("mtime", ceph::to_iso_8601(mtime));
- f->dump_unsigned("size", size);
- }
-
// `call_list` must iterate over all cache entries and call
// `cache_list_dump_helper` with the supplied Formatter on any that
// include `filter` as a substring.
notify_oid = notify_oid_prefix;
}
- notify_objs[i] = rados_svc->obj({control_pool, notify_oid});
+ notify_objs[i] = rados_svc->handle(0).obj({control_pool, notify_oid});
auto& notify_obj = notify_objs[i];
librados::ObjectWriteOperation op;
finalize_watch();
}
-int RGWSI_Notify::watch(RGWSI_RADOS::Obj& obj, uint64_t *watch_handle, librados::WatchCtx2 *ctx)
-{
- int r = obj.watch(watch_handle, ctx);
- if (r < 0)
- return r;
- return 0;
-}
-
-int RGWSI_Notify::aio_watch(RGWSI_RADOS::Obj& obj, uint64_t *watch_handle, librados::WatchCtx2 *ctx, librados::AioCompletion *c)
-{
- int r = obj.aio_watch(c, watch_handle, ctx, 0);
- if (r < 0)
- return r;
- return 0;
-}
-
int RGWSI_Notify::unwatch(RGWSI_RADOS::Obj& obj, uint64_t watch_handle)
{
int r = obj.unwatch(watch_handle);
ldout(cct, 0) << "ERROR: rados->unwatch2() returned r=" << r << dendl;
return r;
}
- r = rados[0].watch_flush();
+ r = rados_svc->handle(0).watch_flush();
if (r < 0) {
ldout(cct, 0) << "ERROR: rados->watch_flush() returned r=" << r << dendl;
return r;
void RGWSI_Notify::add_watcher(int i)
{
ldout(cct, 20) << "add_watcher() i=" << i << dendl;
- Mutex::Locker l(watchers_lock);
+ RWLock::WLocker l(watchers_lock);
watchers_set.insert(i);
if (watchers_set.size() == (size_t)num_watchers) {
ldout(cct, 2) << "all " << num_watchers << " watchers are set, enabling cache" << dendl;
-#warning fixme
-#if 0
- set_cache_enabled(true);
-#endif
+ set_enabled(true);
}
}
void RGWSI_Notify::remove_watcher(int i)
{
ldout(cct, 20) << "remove_watcher() i=" << i << dendl;
- Mutex::Locker l(watchers_lock);
+ RWLock::WLocker l(watchers_lock);
size_t orig_size = watchers_set.size();
watchers_set.erase(i);
if (orig_size == (size_t)num_watchers &&
watchers_set.size() < orig_size) { /* actually removed */
ldout(cct, 2) << "removed watcher, disabling cache" << dendl;
-#warning fixme
-#if 0
- set_cache_enabled(false);
-#endif
+ set_enabled(false);
+ }
+}
+
+// Dispatch an incoming watch notification to the registered callback.
+// watchers_lock is held for reading for the duration of the call.
+// Returns the callback's result, or 0 when no callback is registered.
+int RGWSI_Notify::watch_cb(uint64_t notify_id,
+                           uint64_t cookie,
+                           uint64_t notifier_id,
+                           bufferlist& bl)
+{
+  RWLock::RLocker rl(watchers_lock);
+  return cb ? cb->watch_cb(notify_id, cookie, notifier_id, bl) : 0;
+}
+
+// Propagate the enabled/disabled state to the registered callback, if any.
+//
+// Locking: the caller must already hold watchers_lock. add_watcher() and
+// remove_watcher() invoke this while holding the write lock, so taking
+// the lock again here (even for reading) would make the calling thread
+// block on itself. The callers' write lock also protects the read of `cb`.
+void RGWSI_Notify::set_enabled(bool status)
+{
+  if (cb) {
+    cb->set_enabled(status);
 }
}
+// Send the notification payload `bl` to the control object selected for
+// `key` by pick_control_obj(). Returns the result of robust_notify().
+int RGWSI_Notify::distribute(const string& key, bufferlist& bl)
+{
+  auto notify_obj = pick_control_obj(key);
+
+  ldout(cct, 10) << "distributing notification oid="
+                 << notify_obj.get_ref().oid
+                 << " bl.length()=" << bl.length() << dendl;
+
+  return robust_notify(notify_obj, bl);
+}
+
+/**
+ * Send `bl` as a notification on `notify_obj`, retrying on failure.
+ *
+ * The first attempt is sent as-is. If it fails, the reply buffer is
+ * parsed for the peers that acked, and the notification is re-sent until
+ * it succeeds or `tries` reaches max_notify_retries. After each failed
+ * retry the reply is parsed again: new acks are merged into `acks`, and
+ * peers reported as timed out are recorded only if they have never acked
+ * any attempt. When a retry's reply parses cleanly and no such peer
+ * remains, every peer has received the update at least once and the
+ * call is treated as a success.
+ *
+ * Reply layout as decoded here: uint32 ack count, then per ack an
+ * (uint64, uint64) id followed by a uint32 payload length and that many
+ * payload bytes (skipped); then a uint32 timeout count followed by one
+ * (uint64, uint64) id per timeout.
+ *
+ * @param notify_obj  object to notify on
+ * @param bl          notification payload
+ * @return 0 on success, otherwise the negative error of the last attempt
+ */
+int RGWSI_Notify::robust_notify(RGWSI_RADOS::Obj& notify_obj, bufferlist& bl)
+{
+  // The reply of every machine that acks goes in here.
+  boost::container::flat_set<std::pair<uint64_t, uint64_t>> acks;
+  bufferlist rbl;
+
+  // First, try to send, without being fancy about it.
+  auto r = notify_obj.notify(bl, 0, &rbl);
+
+  // If that doesn't work, get serious.
+  if (r < 0) {
+    ldout(cct, 1) << "robust_notify: If at first you don't succeed: "
+                  << cpp_strerror(-r) << dendl;
+
+    auto p = rbl.cbegin();
+    // Gather up the replies to the first attempt.
+    try {
+      uint32_t num_acks;
+      decode(num_acks, p);
+      // Doing this ourselves since we don't care about the payload;
+      for (auto i = 0u; i < num_acks; ++i) {
+        std::pair<uint64_t, uint64_t> id;
+        decode(id, p);
+        acks.insert(id);
+        ldout(cct, 20) << "robust_notify: acked by " << id << dendl;
+        uint32_t blen;
+        decode(blen, p);
+        p.advance(blen);
+      }
+    } catch (const buffer::error& e) {
+      ldout(cct, 0) << "robust_notify: notify response parse failed: "
+                    << e.what() << dendl;
+      acks.clear(); // Throw away junk on failed parse.
+    }
+
+    // Every machine that fails to reply and hasn't acked a previous
+    // attempt goes in here.
+    boost::container::flat_set<std::pair<uint64_t, uint64_t>> timeouts;
+
+    auto tries = 1u;
+    while (r < 0 && tries < max_notify_retries) {
+      ++tries;
+      rbl.clear();
+      // Reset the timeouts, we're only concerned with new ones.
+      timeouts.clear();
+      r = notify_obj.notify(bl, 0, &rbl);
+      if (r < 0) {
+        ldout(cct, 1) << "robust_notify: retry " << tries << " failed: "
+                      << cpp_strerror(-r) << dendl;
+        p = rbl.cbegin();
+        try {
+          uint32_t num_acks;
+          decode(num_acks, p);
+          // Not only do we not care about the payload, but we don't
+          // want to empty the container; we just want to augment it
+          // with any new members.
+          for (auto i = 0u; i < num_acks; ++i) {
+            std::pair<uint64_t, uint64_t> id;
+            decode(id, p);
+            auto ir = acks.insert(id);
+            if (ir.second) {
+              ldout(cct, 20) << "robust_notify: acked by " << id << dendl;
+            }
+            uint32_t blen;
+            decode(blen, p);
+            p.advance(blen);
+          }
+
+          uint32_t num_timeouts;
+          decode(num_timeouts, p);
+          for (auto i = 0u; i < num_timeouts; ++i) {
+            std::pair<uint64_t, uint64_t> id;
+            decode(id, p);
+            // Only track timeouts from hosts that haven't acked previously:
+            // a host already present in `acks` has received the update, so
+            // its timeout on this attempt does not matter.
+            if (acks.find(id) == acks.cend()) {
+              ldout(cct, 20) << "robust_notify: " << id << " timed out."
+                             << dendl;
+              timeouts.insert(id);
+            }
+          }
+        } catch (const buffer::error& e) {
+          ldout(cct, 0) << "robust_notify: notify response parse failed: "
+                        << e.what() << dendl;
+          continue;
+        }
+        // If we got a good parse and timeouts is empty, that means
+        // everyone who timed out in one call received the update in a
+        // previous one.
+        if (timeouts.empty()) {
+          r = 0;
+        }
+      }
+    }
+  }
+  return r;
+}
+
+// Install `_cb` as the callback used by watch_cb() and set_enabled().
+// The pointer is stored as-is (no ownership is taken here) and replaces
+// any previously registered callback; the swap happens under the write
+// side of watchers_lock.
+void RGWSI_Notify::register_watch_cb(CB *_cb)
+{
+  RWLock::WLocker wl(watchers_lock);
+  cb = _cb;
+}
class RGWSI_Notify : public RGWServiceInstance
{
+public:
+ class CB;
+private:
std::shared_ptr<RGWSI_Zone> zone_svc;
std::shared_ptr<RGWSI_RADOS> rados_svc;
std::map<std::string, RGWServiceInstance::dependency> get_deps() override;
int load(const std::string& conf, std::map<std::string, RGWServiceInstanceRef>& dep_refs) override;
- Mutex watchers_lock{"watchers_lock"};
+ RWLock watchers_lock{"watchers_lock"};
rgw_pool control_pool;
int num_watchers{0};
string get_control_oid(int i);
RGWSI_RADOS::Obj pick_control_obj(const string& key);
+ CB *cb{nullptr};
+
int init_watch();
void finalize_watch();
int init() override;
void shutdown() override;
- int watch(RGWSI_RADOS::Obj& obj, uint64_t *watch_handle, librados::WatchCtx2 *ctx);
- int aio_watch(const string& oid, uint64_t *watch_handle, librados::WatchCtx2 *ctx, librados::AioCompletion *c);
int unwatch(RGWSI_RADOS::Obj& obj, uint64_t watch_handle);
void add_watcher(int i);
void remove_watcher(int i);
+
+ int watch_cb(uint64_t notify_id,
+ uint64_t cookie,
+ uint64_t notifier_id,
+ bufferlist& bl);
+ void set_enabled(bool status);
+
+ int robust_notify(RGWSI_RADOS::Obj& notify_obj, bufferlist& bl);
public:
RGWSI_Notify(RGWService *svc, CephContext *cct): RGWServiceInstance(svc, cct) {}
+ class CB {
+ public:
+ virtual ~CB() {}
+ virtual int watch_cb(uint64_t notify_id,
+ uint64_t cookie,
+ uint64_t notifier_id,
+ bufferlist& bl) = 0;
+ virtual void set_enabled(bool status) = 0;
+ };
+
+ int distribute(const string& key, bufferlist& bl);
+
+ void register_watch_cb(CB *cb);
};
#endif
return 0;
}
-librados::Rados* RGWSI_RADOS::get_rados_handle()
+librados::Rados* RGWSI_RADOS::get_rados_handle(int rados_handle)
{
if (rados.size() == 1) {
return &rados[0];
}
+
+ if (rados_handle >= 0) {
+ if (rados_handle >= (int)rados.size()) {
+ rados_handle = 0;
+ }
+ return &rados[rados_handle];
+ }
+
handle_lock.get_read();
pthread_t id = pthread_self();
std::map<pthread_t, int>:: iterator it = rados_map.find(id);
uint64_t RGWSI_RADOS::instance_id()
{
- return get_rados_handle()->get_instance_id();
+ return get_rados_handle(-1)->get_instance_id();
}
-int RGWSI_RADOS::open_pool_ctx(const rgw_pool& pool, librados::IoCtx& io_ctx)
+int RGWSI_RADOS::open_pool_ctx(const rgw_pool& pool, librados::IoCtx& io_ctx, int rados_handle)
{
constexpr bool create = true; // create the pool if it doesn't exist
- return init_ioctx(cct, get_rados_handle(), pool, io_ctx, create);
+ return init_ioctx(cct, get_rados_handle(rados_handle), pool, io_ctx, create);
}
int RGWSI_RADOS::pool_iterate(librados::IoCtx& io_ctx,
int RGWSI_RADOS::Obj::open()
{
- int r = rados_svc->open_pool_ctx(ref.pool, ref.ioctx);
+ int r = rados_svc->open_pool_ctx(ref.pool, ref.ioctx, rados_handle);
if (r < 0) {
return r;
}
return ref.ioctx.watch2(ref.oid, handle, ctx);
}
-int RGWSI_RADOS::Obj::aio_watch(AioCompletion *c, uint64_t *handle, librados::WatchCtx2 *ctx)
+int RGWSI_RADOS::Obj::aio_watch(librados::AioCompletion *c, uint64_t *handle, librados::WatchCtx2 *ctx)
{
return ref.ioctx.aio_watch(ref.oid, c, handle, ctx);
}
return ref.ioctx.unwatch2(handle);
}
+// Send notification payload `bl` on this object via ioctx.notify2(),
+// passing `timeout_ms` through and collecting the reply into `pbl`.
+// Returns whatever notify2() returns.
+int RGWSI_RADOS::Obj::notify(bufferlist& bl,
+                             uint64_t timeout_ms,
+                             bufferlist *pbl)
+{
+  return ref.ioctx.notify2(ref.oid, bl, timeout_ms, pbl);
+}
+
+// Acknowledge notification `notify_id` received through the watch
+// identified by `cookie`, attaching `bl` as the reply payload. Forwards
+// directly to ioctx.notify_ack() for this object's oid.
+void RGWSI_RADOS::Obj::notify_ack(uint64_t notify_id,
+                                 uint64_t cookie,
+                                 bufferlist& bl)
+{
+  ref.ioctx.notify_ack(ref.oid, notify_id, cookie, bl);
+}
+
uint64_t RGWSI_RADOS::Obj::get_last_version()
{
return ref.ioctx.get_last_version();
vector<librados::PoolAsyncCompletion *> completions;
vector<int> rets;
- librados::Rados *rad = rados_svc->get_rados_handle();
+ librados::Rados *rad = rados_svc->get_rados_handle(rados_handle);
for (auto iter = pools.begin(); iter != pools.end(); ++iter) {
librados::PoolAsyncCompletion *c = librados::Rados::pool_async_create_completion();
completions.push_back(c);
int RGWSI_RADOS::Pool::lookup(const rgw_pool& pool)
{
- librados::Rados *rad = rados_svc->get_rados_handle();
+ librados::Rados *rad = rados_svc->get_rados_handle(rados_handle);
int ret = rad->pool_lookup(pool.name.c_str());
if (ret < 0) {
return ret;
return -EINVAL;
}
- int r = pool.rados_svc->open_pool_ctx(pool.pool, ctx.ioctx);
+ int r = pool.rados_svc->open_pool_ctx(pool.pool, ctx.ioctx, pool.rados_handle);
if (r < 0) {
return r;
}
return oids->size();
}
+// Call watch_flush() on the librados handle selected by this Handle's
+// rados_handle index and return its result.
+int RGWSI_RADOS::Handle::watch_flush()
+{
+  return rados_svc->get_rados_handle(rados_handle)->watch_flush();
+}
+
int load(const string& conf, std::map<std::string, RGWServiceInstanceRef>& deps) override;
- librados::Rados* get_rados_handle();
- int open_pool_ctx(const rgw_pool& pool, librados::IoCtx& io_ctx);
+ librados::Rados* get_rados_handle(int rados_handle);
+ int open_pool_ctx(const rgw_pool& pool, librados::IoCtx& io_ctx, int rados_handle);
int pool_iterate(librados::IoCtx& ioctx,
librados::NObjectIterator& iter,
uint32_t num, vector<rgw_bucket_dir_entry>& objs,
uint64_t instance_id();
+ class Handle;
+
class Obj {
friend class RGWSI_RADOS;
+ friend class Handle;
RGWSI_RADOS *rados_svc{nullptr};
+ int rados_handle{-1};
rgw_rados_ref ref;
void init(const rgw_raw_obj& obj);
- Obj(RGWSI_RADOS *_rados_svc, const rgw_raw_obj& _obj) : rados_svc(_rados_svc) {
+ Obj(RGWSI_RADOS *_rados_svc, const rgw_raw_obj& _obj, int _rados_handle) : rados_svc(_rados_svc), rados_handle(_rados_handle) {
init(_obj);
}
public:
Obj() {}
Obj(const Obj& o) : rados_svc(o.rados_svc),
+ rados_handle(o.rados_handle),
ref(o.ref) {}
Obj(Obj&& o) : rados_svc(o.rados_svc),
+ rados_handle(o.rados_handle),
ref(std::move(o.ref)) {}
Obj& operator=(Obj&& o) {
rados_svc = o.rados_svc;
+ rados_handle = o.rados_handle;
ref = std::move(o.ref);
return *this;
}
int watch(uint64_t *handle, librados::WatchCtx2 *ctx);
int aio_watch(librados::AioCompletion *c, uint64_t *handle, librados::WatchCtx2 *ctx);
int unwatch(uint64_t handle);
+ int notify(bufferlist& bl,
+ uint64_t timeout_ms,
+ bufferlist *pbl);
+ void notify_ack(uint64_t notify_id,
+ uint64_t cookie,
+ bufferlist& bl);
uint64_t get_last_version();
+
+ rgw_rados_ref& get_ref() {
+ return ref;
+ }
};
class Pool {
friend class RGWSI_RADOS;
+ friend class Handle;
RGWSI_RADOS *rados_svc{nullptr};
+ int rados_handle{-1};
rgw_pool pool;
- Pool(RGWSI_RADOS *_rados_svc, const rgw_pool& _pool) : rados_svc(_rados_svc),
- pool(_pool) {}
+ Pool(RGWSI_RADOS *_rados_svc,
+ const rgw_pool& _pool,
+ int _rados_handle) : rados_svc(_rados_svc),
+ rados_handle(_rados_handle),
+ pool(_pool) {}
Pool(RGWSI_RADOS *_rados_svc) : rados_svc(_rados_svc) {}
public:
Pool() {}
Pool(const Pool& p) : rados_svc(p.rados_svc),
+ rados_handle(p.rados_handle),
pool(p.pool) {}
int create(const std::vector<rgw_pool>& pools, std::vector<int> *retcodes);
friend class List;
};
+  // Factory bound to one rados handle index: the Obj and Pool instances
+  // it creates carry that index, so their operations are passed the same
+  // rados_handle value. Only RGWSI_RADOS can construct a Handle (private
+  // constructor + friend); see RGWSI_RADOS::handle().
+  class Handle {
+    friend class RGWSI_RADOS;
+
+    RGWSI_RADOS *rados_svc{nullptr};
+    int rados_handle{-1};
+
+    Handle(RGWSI_RADOS *_rados_svc, int _rados_handle) : rados_svc(_rados_svc),
+                                                         rados_handle(_rados_handle) {}
+  public:
+    // Build an Obj for raw object `o` bound to this handle's index.
+    Obj obj(const rgw_raw_obj& o) {
+      return Obj(rados_svc, o, rados_handle);
+    }
+
+    // Build a Pool for `p` bound to this handle's index.
+    Pool pool(const rgw_pool& p) {
+      return Pool(rados_svc, p, rados_handle);
+    }
+
+    // Flush watches on the rados handle selected by this index.
+    int watch_flush();
+  };
+
+  // Return a Handle bound to index `rados_handle`; objects and pools
+  // created through it carry that index instead of -1.
+  Handle handle(int rados_handle) {
+    return Handle(this, rados_handle);
+  }
+
Obj obj(const rgw_raw_obj& o) {
- return Obj(this, o);
+ return Obj(this, o, -1);
}
Pool pool() {
}
Pool pool(const rgw_pool& p) {
- return Pool(this, p);
+ return Pool(this, p, -1);
}
friend class Obj;
#include "svc_sys_obj_cache.h"
#include "svc_zone.h"
+#include "svc_notify.h"
#define dout_subsys ceph_subsys_rgw
+// Adapter that lets RGWSI_Notify call back into RGWSI_SysObj_Cache: both
+// CB hooks are forwarded to the cache service's methods of the same name.
+// Holds a non-owning pointer to the cache service.
+class RGWSI_SysObj_Cache_CB : public RGWSI_Notify::CB
+{
+  RGWSI_SysObj_Cache *svc;
+public:
+  // explicit: a cache-service pointer should never silently convert
+  // into a callback object.
+  explicit RGWSI_SysObj_Cache_CB(RGWSI_SysObj_Cache *_svc) : svc(_svc) {}
+
+  // Forward an incoming notification to the cache service.
+  int watch_cb(uint64_t notify_id,
+               uint64_t cookie,
+               uint64_t notifier_id,
+               bufferlist& bl) override {
+    return svc->watch_cb(notify_id, cookie, notifier_id, bl);
+  }
+
+  // Forward the enable/disable request to the cache service.
+  void set_enabled(bool status) override {
+    svc->set_enabled(status);
+  }
+};
+
+// Dependencies of the cache service: everything RGWSI_SysObj_Core needs,
+// plus the "notify" service registered under the key "cache_notify_dep"
+// (the same key load() looks up).
+std::map<string, RGWServiceInstance::dependency> RGWSI_SysObj_Cache::get_deps()
+{
+  auto deps = RGWSI_SysObj_Core::get_deps();
+  deps["cache_notify_dep"] = { .name = "notify", .conf = "{}" };
+  return deps;
+}
+
+// Load the core sys-obj service, then bind to the notify service found
+// under "cache_notify_dep" and register this cache as its watch callback.
+// Returns the core load error (< 0) unchanged, or 0 on success.
+int RGWSI_SysObj_Cache::load(const string& conf, std::map<std::string, RGWServiceInstanceRef>& dep_refs)
+{
+  if (int r = RGWSI_SysObj_Core::load(conf, dep_refs); r < 0) {
+    return r;
+  }
+
+  notify_svc = static_pointer_cast<RGWSI_Notify>(dep_refs["cache_notify_dep"]);
+  assert(notify_svc);
+
+  // The adapter is owned by `cb`; the notify service only receives the
+  // raw pointer.
+  cb.reset(new RGWSI_SysObj_Cache_CB(this));
+  notify_svc->register_watch_cb(cb.get());
+
+  return 0;
+}
+
static string normal_name(rgw_pool& pool, const std::string& oid) {
std::string buf;
buf.reserve(pool.name.size() + pool.ns.size() + oid.size() + 2);
cache.remove(name);
ObjectCacheInfo info;
- distribute_cache(name, obj, info, REMOVE_OBJ);
+ int r = distribute_cache(name, obj, info, REMOVE_OBJ);
+ if (r < 0) {
+ ldout(cct, 0) << "ERROR: " << __func__ << "(): failed to distribute cache: r=" << r << dendl;
+ }
return RGWSI_SysObj_Core::remove(obj_ctx, objv_tracker, obj);
}
rgw_pool pool;
string oid;
normalize_pool_and_obj(obj.pool, obj.oid, pool, oid);
+
ObjectCacheInfo info;
info.data = data;
info.meta.size = data.length();
info.status = 0;
info.flags = CACHE_FLAG_DATA;
+
if (objv_tracker) {
info.version = objv_tracker->write_version;
info.flags |= CACHE_FLAG_OBJV;
info.obj = obj;
bufferlist bl;
encode(info, bl);
- return T::distribute(normal_name, bl);
+ return notify_svc->distribute(normal_name, bl);
}
int RGWSI_SysObj_Cache::watch_cb(uint64_t notify_id,
return 0;
}
+// Enable or disable the underlying ObjectCache according to `status`.
+void RGWSI_SysObj_Cache::set_enabled(bool status)
+{
+  cache.set_enabled(status);
+}
+
+// Forward to ObjectCache::chain_cache_entry() with the given cache-info
+// entries and chained entry; returns its result unchanged.
+bool RGWSI_SysObj_Cache::chain_cache_entry(std::initializer_list<rgw_cache_entry_info *> cache_info_entries,
+                                           RGWChainedCache::Entry *chained_entry)
+{
+  return cache.chain_cache_entry(cache_info_entries, chained_entry);
+}
+
+// Register chained cache `cc` with the underlying ObjectCache.
+void RGWSI_SysObj_Cache::register_chained_cache(RGWChainedCache *cc)
+{
+  cache.chain_cache(cc);
+}
+
+// Dump one cache entry's summary into formatter `f` as three fields, in
+// this order: "name", "mtime" (ISO 8601 string) and "size" (unsigned).
+// File-local helper used by call_list().
+static void cache_list_dump_helper(Formatter* f,
+                                   const std::string& name,
+                                   const ceph::real_time mtime,
+                                   const std::uint64_t size)
+{
+  f->dump_string("name", name);
+  f->dump_string("mtime", ceph::to_iso_8601(mtime));
+  f->dump_unsigned("size", size);
+}
+
void RGWSI_SysObj_Cache::call_list(const std::optional<std::string>& filter, Formatter* f)
{
cache.for_each(
[this, &filter, f] (const string& name, const ObjectCacheEntry& entry) {
if (!filter || name.find(*filter) != name.npos) {
- T::cache_list_dump_helper(f, name, entry.info.meta.mtime,
- entry.info.meta.size);
+ cache_list_dump_helper(f, name, entry.info.meta.mtime,
+ entry.info.meta.size);
}
});
}
int RGWSI_SysObj_Cache::call_zap()
{
cache.invalidate_all();
+ return 0;
}
#include "svc_sys_obj_core.h"
+class RGWSI_Notify;
+
+class RGWSI_SysObj_Cache_CB;
class RGWSI_SysObj_Cache : public RGWSI_SysObj_Core
{
+ friend class RGWSI_SysObj_Cache_CB;
+
+ std::shared_ptr<RGWSI_Notify> notify_svc;
ObjectCache cache;
+ std::shared_ptr<RGWSI_SysObj_Cache_CB> cb;
+
void normalize_pool_and_obj(rgw_pool& src_pool, const string& src_obj, rgw_pool& dst_pool, string& dst_obj);
protected:
std::map<std::string, RGWServiceInstance::dependency> get_deps() override;
uint64_t notifier_id,
bufferlist& bl);
+ void set_enabled(bool status);
+
public:
- RGWSI_SysObj_Cache(RGWService *svc, CephContext *cct) : RGWSI_SysObj_Core(svc, cct) {}
+ RGWSI_SysObj_Cache(RGWService *svc, CephContext *cct) : RGWSI_SysObj_Core(svc, cct) {
+ cache.set_ctx(cct);
+ }
+
+ bool chain_cache_entry(std::initializer_list<rgw_cache_entry_info *> cache_info_entries,
+ RGWChainedCache::Entry *chained_entry);
+ void register_chained_cache(RGWChainedCache *cc);
void call_list(const std::optional<std::string>& filter, Formatter* f);
int call_inspect(const std::string& target, Formatter* f);