]> git.apps.os.sepia.ceph.com Git - ceph-ci.git/commitdiff
rgw: svc_cache: more work
authorYehuda Sadeh <yehuda@redhat.com>
Thu, 6 Sep 2018 08:16:01 +0000 (01:16 -0700)
committerYehuda Sadeh <yehuda@redhat.com>
Thu, 8 Nov 2018 17:19:29 +0000 (09:19 -0800)
Signed-off-by: Yehuda Sadeh <yehuda@redhat.com>
src/rgw/rgw_cache.h
src/rgw/rgw_rados.cc
src/rgw/rgw_rados.h
src/rgw/services/svc_notify.cc
src/rgw/services/svc_notify.h
src/rgw/services/svc_rados.cc
src/rgw/services/svc_rados.h
src/rgw/services/svc_sys_obj_cache.cc
src/rgw/services/svc_sys_obj_cache.h

index 5d4230a41e7ea093ac7e1c9cdbadc20963ca6b25..498c86c232677f4738d35a08407c84c43114e775 100644 (file)
@@ -195,460 +195,4 @@ public:
   void invalidate_all();
 };
 
-template <class T>
-class RGWCache  : public T
-{
-  ObjectCache cache;
-
-  int list_objects_raw_init(rgw_pool& pool, RGWAccessHandle *handle) {
-    return T::list_objects_raw_init(pool, handle);
-  }
-  int list_objects_raw_next(rgw_bucket_dir_entry& obj, RGWAccessHandle *handle) {
-    return T::list_objects_raw_next(obj, handle);
-  }
-
-  string normal_name(rgw_pool& pool, const std::string& oid) {
-    std::string buf;
-    buf.reserve(pool.name.size() + pool.ns.size() + oid.size() + 2);
-    buf.append(pool.name).append("+").append(pool.ns).append("+").append(oid);
-    return buf;
-  }
-
-  void normalize_pool_and_obj(rgw_pool& src_pool, const string& src_obj, rgw_pool& dst_pool, string& dst_obj);
-
-  int init_rados() override {
-    int ret;
-    cache.set_ctx(T::cct);
-    ret = T::init_rados();
-    if (ret < 0)
-      return ret;
-
-    return 0;
-  }
-
-  bool need_watch_notify() override {
-    return true;
-  }
-
-  int distribute_cache(const string& normal_name, rgw_raw_obj& obj, ObjectCacheInfo& obj_info, int op);
-  int watch_cb(uint64_t notify_id,
-              uint64_t cookie,
-              uint64_t notifier_id,
-              bufferlist& bl) override;
-
-  void set_cache_enabled(bool state) override {
-    cache.set_enabled(state);
-  }
-public:
-  RGWCache() {}
-
-  void register_chained_cache(RGWChainedCache *cc) override {
-    cache.chain_cache(cc);
-  }
-
-  int system_obj_set_attrs(void *ctx, rgw_raw_obj& obj, 
-                map<string, bufferlist>& attrs,
-                map<string, bufferlist>* rmattrs,
-                RGWObjVersionTracker *objv_tracker);
-  int put_system_obj_impl(rgw_raw_obj& obj, uint64_t size, real_time *mtime,
-              map<std::string, bufferlist>& attrs, int flags,
-              const bufferlist& data,
-              RGWObjVersionTracker *objv_tracker,
-              real_time set_mtime) override;
-  int put_system_obj_data(void *ctx, rgw_raw_obj& obj, const bufferlist& bl, off_t ofs, bool exclusive,
-                          RGWObjVersionTracker *objv_tracker = nullptr) override;
-
-  int get_system_obj(RGWObjectCtx& obj_ctx, RGWRados::SystemObject::Read::GetObjState& read_state,
-                     RGWObjVersionTracker *objv_tracker, rgw_raw_obj& obj,
-                     bufferlist& bl, off_t ofs, off_t end,
-                     map<string, bufferlist> *attrs,
-                     rgw_cache_entry_info *cache_info,
-                    boost::optional<obj_version> refresh_version = boost::none) override;
-
-  int raw_obj_stat(rgw_raw_obj& obj, uint64_t *psize, real_time *pmtime, uint64_t *epoch, map<string, bufferlist> *attrs,
-                   bufferlist *first_chunk, RGWObjVersionTracker *objv_tracker) override;
-
-  int delete_system_obj(rgw_raw_obj& obj, RGWObjVersionTracker *objv_tracker) override;
-
-  bool chain_cache_entry(std::initializer_list<rgw_cache_entry_info *> cache_info_entries, RGWChainedCache::Entry *chained_entry) override {
-    return cache.chain_cache_entry(cache_info_entries, chained_entry);
-  }
-  void call_list(const std::optional<std::string>& filter,
-                Formatter* format) override;
-  bool call_inspect(const std::string& target, Formatter* format) override;
-  bool call_erase(const std::string& target) override;
-  void call_zap() override;
-};
-
-template <class T>
-void RGWCache<T>::normalize_pool_and_obj(rgw_pool& src_pool, const string& src_obj, rgw_pool& dst_pool, string& dst_obj)
-{
-  if (src_obj.size()) {
-    dst_pool = src_pool;
-    dst_obj = src_obj;
-  } else {
-    dst_pool = T::get_zone_params().domain_root;
-    dst_obj = src_pool.name;
-  }
-}
-
-template <class T>
-int RGWCache<T>::delete_system_obj(rgw_raw_obj& obj, RGWObjVersionTracker *objv_tracker)
-{
-  rgw_pool pool;
-  string oid;
-  normalize_pool_and_obj(obj.pool, obj.oid, pool, oid);
-
-  string name = normal_name(pool, oid);
-  cache.remove(name);
-
-  ObjectCacheInfo info;
-  distribute_cache(name, obj, info, REMOVE_OBJ);
-
-  return T::delete_system_obj(obj, objv_tracker);
-}
-
-template <class T>
-int RGWCache<T>::get_system_obj(RGWObjectCtx& obj_ctx, RGWRados::SystemObject::Read::GetObjState& read_state,
-                     RGWObjVersionTracker *objv_tracker, rgw_raw_obj& obj,
-                     bufferlist& obl, off_t ofs, off_t end,
-                     map<string, bufferlist> *attrs,
-                    rgw_cache_entry_info *cache_info,
-                    boost::optional<obj_version> refresh_version)
-{
-  rgw_pool pool;
-  string oid;
-  normalize_pool_and_obj(obj.pool, obj.oid, pool, oid);
-  if (ofs != 0)
-    return T::get_system_obj(obj_ctx, read_state, objv_tracker, obj, obl, ofs, end, attrs, cache_info);
-
-  string name = normal_name(pool, oid);
-
-  ObjectCacheInfo info;
-
-  uint32_t flags = CACHE_FLAG_DATA;
-  if (objv_tracker)
-    flags |= CACHE_FLAG_OBJV;
-  if (attrs)
-    flags |= CACHE_FLAG_XATTRS;
-
-  if ((cache.get(name, info, flags, cache_info) == 0) &&
-      (!refresh_version || !info.version.compare(&(*refresh_version)))) {
-    if (info.status < 0)
-      return info.status;
-
-    bufferlist& bl = info.data;
-
-    bufferlist::iterator i = bl.begin();
-
-    obl.clear();
-
-    i.copy_all(obl);
-    if (objv_tracker)
-      objv_tracker->read_version = info.version;
-    if (attrs)
-      *attrs = info.xattrs;
-    return bl.length();
-  }
-  int r = T::get_system_obj(obj_ctx, read_state, objv_tracker, obj, obl, ofs, end, attrs, cache_info);
-  if (r < 0) {
-    if (r == -ENOENT) { // only update ENOENT, we'd rather retry other errors
-      info.status = r;
-      cache.put(name, info, cache_info);
-    }
-    return r;
-  }
-
-  if (obl.length() == end + 1) {
-    /* in this case, most likely object contains more data, we can't cache it */
-    return r;
-  }
-
-  bufferptr p(r);
-  bufferlist& bl = info.data;
-  bl.clear();
-  bufferlist::iterator o = obl.begin();
-  o.copy_all(bl);
-  info.status = 0;
-  info.flags = flags;
-  if (objv_tracker) {
-    info.version = objv_tracker->read_version;
-  }
-  if (attrs) {
-    info.xattrs = *attrs;
-  }
-  cache.put(name, info, cache_info);
-  return r;
-}
-
-template <class T>
-int RGWCache<T>::system_obj_set_attrs(void *ctx, rgw_raw_obj& obj, 
-                           map<string, bufferlist>& attrs,
-                           map<string, bufferlist>* rmattrs,
-                           RGWObjVersionTracker *objv_tracker) 
-{
-  rgw_pool pool;
-  string oid;
-  normalize_pool_and_obj(obj.pool, obj.oid, pool, oid);
-  ObjectCacheInfo info;
-  info.xattrs = attrs;
-  if (rmattrs)
-    info.rm_xattrs = *rmattrs;
-  info.status = 0;
-  info.flags = CACHE_FLAG_MODIFY_XATTRS;
-  if (objv_tracker) {
-    info.version = objv_tracker->write_version;
-    info.flags |= CACHE_FLAG_OBJV;
-  }
-  int ret = T::system_obj_set_attrs(ctx, obj, attrs, rmattrs, objv_tracker);
-  string name = normal_name(pool, oid);
-  if (ret >= 0) {
-    cache.put(name, info, NULL);
-    int r = distribute_cache(name, obj, info, UPDATE_OBJ);
-    if (r < 0)
-      mydout(0) << "ERROR: failed to distribute cache for " << obj << dendl;
-  } else {
-    cache.remove(name);
-  }
-
-  return ret;
-}
-
-template <class T>
-int RGWCache<T>::put_system_obj_impl(rgw_raw_obj& obj, uint64_t size, real_time *mtime,
-              map<std::string, bufferlist>& attrs, int flags,
-              const bufferlist& data,
-              RGWObjVersionTracker *objv_tracker,
-              real_time set_mtime)
-{
-  rgw_pool pool;
-  string oid;
-  normalize_pool_and_obj(obj.pool, obj.oid, pool, oid);
-  ObjectCacheInfo info;
-  info.xattrs = attrs;
-  info.status = 0;
-  info.data = data;
-  info.flags = CACHE_FLAG_XATTRS | CACHE_FLAG_DATA | CACHE_FLAG_META;
-  if (objv_tracker) {
-    info.version = objv_tracker->write_version;
-    info.flags |= CACHE_FLAG_OBJV;
-  }
-  ceph::real_time result_mtime;
-  int ret = T::put_system_obj_impl(obj, size, &result_mtime, attrs, flags, data,
-                                  objv_tracker, set_mtime);
-  if (mtime) {
-    *mtime = result_mtime;
-  }
-  info.meta.mtime = result_mtime;
-  info.meta.size = size;
-  string name = normal_name(pool, oid);
-  if (ret >= 0) {
-    cache.put(name, info, NULL);
-    // Only distribute the cache information if we did not just create
-    // the object with the exclusive flag. Note: PUT_OBJ_EXCL implies
-    // PUT_OBJ_CREATE. Generally speaking, when successfully creating
-    // a system object with the exclusive flag it is not necessary to
-    // call distribute_cache, as a) it's unclear whether other RGWs
-    // will need that system object in the near-term and b) it
-    // generates additional network traffic.
-    if (!(flags & PUT_OBJ_EXCL)) {
-      int r = distribute_cache(name, obj, info, UPDATE_OBJ);
-      if (r < 0)
-       mydout(0) << "ERROR: failed to distribute cache for " << obj << dendl;
-    }
-  } else {
-    cache.remove(name);
-  }
-
-  return ret;
-}
-
-template <class T>
-int RGWCache<T>::put_system_obj_data(void *ctx, rgw_raw_obj& obj, const bufferlist& data, off_t ofs, bool exclusive,
-                                     RGWObjVersionTracker *objv_tracker)
-{
-  rgw_pool pool;
-  string oid;
-  normalize_pool_and_obj(obj.pool, obj.oid, pool, oid);
-  ObjectCacheInfo info;
-  bool cacheable = false;
-  if ((ofs == 0) || (ofs == -1)) {
-    cacheable = true;
-    info.data = data;
-    info.meta.size = data.length();
-    info.status = 0;
-    info.flags = CACHE_FLAG_DATA;
-  }
-  if (objv_tracker) {
-    info.version = objv_tracker->write_version;
-    info.flags |= CACHE_FLAG_OBJV;
-  }
-  int ret = T::put_system_obj_data(ctx, obj, data, ofs, exclusive, objv_tracker);
-  if (cacheable) {
-    string name = normal_name(pool, oid);
-    if (ret >= 0) {
-      cache.put(name, info, NULL);
-      int r = distribute_cache(name, obj, info, UPDATE_OBJ);
-      if (r < 0)
-        mydout(0) << "ERROR: failed to distribute cache for " << obj << dendl;
-    } else {
-      cache.remove(name);
-    }
-  }
-
-  return ret;
-}
-
-template <class T>
-int RGWCache<T>::raw_obj_stat(rgw_raw_obj& obj, uint64_t *psize, real_time *pmtime,
-                          uint64_t *pepoch, map<string, bufferlist> *attrs,
-                          bufferlist *first_chunk, RGWObjVersionTracker *objv_tracker)
-{
-  rgw_pool pool;
-  string oid;
-  normalize_pool_and_obj(obj.pool, obj.oid, pool, oid);
-
-  string name = normal_name(pool, oid);
-
-  uint64_t size;
-  real_time mtime;
-  uint64_t epoch;
-
-  ObjectCacheInfo info;
-  uint32_t flags = CACHE_FLAG_META | CACHE_FLAG_XATTRS;
-  if (objv_tracker)
-    flags |= CACHE_FLAG_OBJV;
-  int r = cache.get(name, info, flags, NULL);
-  if (r == 0) {
-    if (info.status < 0)
-      return info.status;
-
-    size = info.meta.size;
-    mtime = info.meta.mtime;
-    epoch = info.epoch;
-    if (objv_tracker)
-      objv_tracker->read_version = info.version;
-    goto done;
-  }
-  r = T::raw_obj_stat(obj, &size, &mtime, &epoch, &info.xattrs, first_chunk, objv_tracker);
-  if (r < 0) {
-    if (r == -ENOENT) {
-      info.status = r;
-      cache.put(name, info, NULL);
-    }
-    return r;
-  }
-  info.status = 0;
-  info.epoch = epoch;
-  info.meta.mtime = mtime;
-  info.meta.size = size;
-  info.flags = CACHE_FLAG_META | CACHE_FLAG_XATTRS;
-  if (objv_tracker) {
-    info.flags |= CACHE_FLAG_OBJV;
-    info.version = objv_tracker->read_version;
-  }
-  cache.put(name, info, NULL);
-done:
-  if (psize)
-    *psize = size;
-  if (pmtime)
-    *pmtime = mtime;
-  if (pepoch)
-    *pepoch = epoch;
-  if (attrs)
-    *attrs = info.xattrs;
-  return 0;
-}
-
-template <class T>
-int RGWCache<T>::distribute_cache(const string& normal_name, rgw_raw_obj& obj, ObjectCacheInfo& obj_info, int op)
-{
-  RGWCacheNotifyInfo info;
-
-  info.op = op;
-
-  info.obj_info = obj_info;
-  info.obj = obj;
-  bufferlist bl;
-  encode(info, bl);
-  return T::distribute(normal_name, bl);
-}
-
-template <class T>
-int RGWCache<T>::watch_cb(uint64_t notify_id,
-                         uint64_t cookie,
-                         uint64_t notifier_id,
-                         bufferlist& bl)
-{
-  RGWCacheNotifyInfo info;
-
-  try {
-    auto iter = bl.cbegin();
-    decode(info, iter);
-  } catch (buffer::end_of_buffer& err) {
-    mydout(0) << "ERROR: got bad notification" << dendl;
-    return -EIO;
-  } catch (buffer::error& err) {
-    mydout(0) << "ERROR: buffer::error" << dendl;
-    return -EIO;
-  }
-
-  rgw_pool pool;
-  string oid;
-  normalize_pool_and_obj(info.obj.pool, info.obj.oid, pool, oid);
-  string name = normal_name(pool, oid);
-  
-  switch (info.op) {
-  case UPDATE_OBJ:
-    cache.put(name, info.obj_info, NULL);
-    break;
-  case REMOVE_OBJ:
-    cache.remove(name);
-    break;
-  default:
-    mydout(0) << "WARNING: got unknown notification op: " << info.op << dendl;
-    return -EINVAL;
-  }
-
-  return 0;
-}
-
-template<typename T>
-void RGWCache<T>::call_list(const std::optional<std::string>& filter,
-                           Formatter* f)
-{
-  cache.for_each(
-    [this, &filter, f] (const string& name, const ObjectCacheEntry& entry) {
-      if (!filter || name.find(*filter) != name.npos) {
-       T::cache_list_dump_helper(f, name, entry.info.meta.mtime,
-                                 entry.info.meta.size);
-      }
-    });
-}
-
-template<typename T>
-bool RGWCache<T>::call_inspect(const std::string& target, Formatter* f)
-{
-  if (const auto entry = cache.get(target)) {
-    f->open_object_section("cache_entry");
-    f->dump_string("name", target.c_str());
-    entry->dump(f);
-    f->close_section();
-    return true;
-  } else {
-    return false;
-  }
-}
-
-template<typename T>
-bool RGWCache<T>::call_erase(const std::string& target)
-{
-  return cache.remove(target);
-}
-
-template<typename T>
-void RGWCache<T>::call_zap()
-{
-  cache.invalidate_all();
-}
 #endif
index 80a013dde697970c58e36f87f9e4088d830853df..e140e8b98695c4d5516606de3bd06c6b449083f4 100644 (file)
@@ -1361,9 +1361,6 @@ void RGWRados::finalize()
   if (finisher) {
     finisher->stop();
   }
-  if (need_watch_notify()) {
-    finalize_watch();
-  }
   if (finisher) {
     /* delete finisher only after cleaning up watches, as watch error path might call
      * into finisher. We stop finisher before finalizing watch to make sure we don't
@@ -8998,113 +8995,6 @@ int RGWRados::append_async(rgw_raw_obj& obj, size_t size, bufferlist& bl)
   return r;
 }
 
-int RGWRados::distribute(const string& key, bufferlist& bl)
-{
-  RGWSI_RADOS::Obj notify_obj = pick_control_obj(key);
-
-  ldout(cct, 10) << "distributing notification oid=" << notify_oid << " bl.length()=" << bl.length() << dendl;
-  return robust_notify(notify_oid, bl);
-}
-
-int RGWSI_Notify::robust_notify(RGWSI_RADOS::Obj& notify_obj, bufferlist& bl)
-{
-  // The reply of every machine that acks goes in here.
-  boost::container::flat_set<std::pair<uint64_t, uint64_t>> acks;
-  bufferlist rbl;
-
-  // First, try to send, without being fancy about it.
-  auto r = notify_obj.notify(bl, 0, &rbl);
-
-  // If that doesn't work, get serious.
-  if (r < 0) {
-    ldout(cct, 1) << "robust_notify: If at first you don't succeed: "
-                 << cpp_strerror(-r) << dendl;
-
-
-    auto p = rbl.cbegin();
-    // Gather up the replies to the first attempt.
-    try {
-      uint32_t num_acks;
-      decode(num_acks, p);
-      // Doing this ourselves since we don't care about the payload;
-      for (auto i = 0u; i < num_acks; ++i) {
-       std::pair<uint64_t, uint64_t> id;
-       decode(id, p);
-       acks.insert(id);
-       ldout(cct, 20) << "robust_notify: acked by " << id << dendl;
-       uint32_t blen;
-       decode(blen, p);
-       p.advance(blen);
-      }
-    } catch (const buffer::error& e) {
-      ldout(cct, 0) << "robust_notify: notify response parse failed: "
-                   << e.what() << dendl;
-      acks.clear(); // Throw away junk on failed parse.
-    }
-
-
-    // Every machine that fails to reply and hasn't acked a previous
-    // attempt goes in here.
-    boost::container::flat_set<std::pair<uint64_t, uint64_t>> timeouts;
-
-    auto tries = 1u;
-    while (r < 0 && tries < max_notify_retries) {
-      ++tries;
-      rbl.clear();
-      // Reset the timeouts, we're only concerned with new ones.
-      timeouts.clear();
-      r = notify_obj.notify(bl, 0, &rbl);
-      if (r < 0) {
-       ldout(cct, 1) << "robust_notify: retry " << tries << " failed: "
-                     << cpp_strerror(-r) << dendl;
-       p = rbl.begin();
-       try {
-         uint32_t num_acks;
-         decode(num_acks, p);
-         // Not only do we not care about the payload, but we don't
-         // want to empty the container; we just want to augment it
-         // with any new members.
-         for (auto i = 0u; i < num_acks; ++i) {
-           std::pair<uint64_t, uint64_t> id;
-           decode(id, p);
-           auto ir = acks.insert(id);
-           if (ir.second) {
-             ldout(cct, 20) << "robust_notify: acked by " << id << dendl;
-           }
-           uint32_t blen;
-           decode(blen, p);
-           p.advance(blen);
-         }
-
-         uint32_t num_timeouts;
-         decode(num_timeouts, p);
-         for (auto i = 0u; i < num_timeouts; ++i) {
-           std::pair<uint64_t, uint64_t> id;
-           decode(id, p);
-           // Only track timeouts from hosts that haven't acked previously.
-           if (acks.find(id) != acks.cend()) {
-             ldout(cct, 20) << "robust_notify: " << id << " timed out."
-                            << dendl;
-             timeouts.insert(id);
-           }
-         }
-       } catch (const buffer::error& e) {
-         ldout(cct, 0) << "robust_notify: notify response parse failed: "
-                       << e.what() << dendl;
-         continue;
-       }
-       // If we got a good parse and timeouts is empty, that means
-       // everyone who timed out in one call received the update in a
-       // previous one.
-       if (timeouts.empty()) {
-         r = 0;
-       }
-      }
-    }
-  }
-  return r;
-}
-
 int RGWRados::pool_iterate_begin(const rgw_pool& pool, RGWPoolIterCtx& ctx)
 {
   librados::IoCtx& io_ctx = ctx.io_ctx;
@@ -10751,23 +10641,35 @@ bool RGWRados::call(std::string_view command, const cmdmap_t& cmdmap,
   return false;
 }
 
-void RGWRados::call_list(const std::optional<std::string>&,
-                         ceph::Formatter*)
+void RGWRados::call_list(const std::optional<std::string>& s,
+                         ceph::Formatter *f)
 {
-  return;
+  if (!svc.cache) {
+    return;
+  }
+  svc.cache->call_list(s, f);
 }
 
-bool RGWRados::call_inspect(const std::string&, Formatter*)
+bool RGWRados::call_inspect(const std::string& s, Formatter *f)
 {
-  return false;
+  if (!svc.cache) {
+    return false;
+  }
+  return svc.cache->call_inspec(s, f);
 }
 
-bool RGWRados::call_erase(const std::string&) {
-  return false;
+bool RGWRados::call_erase(const std::string& s) {
+  if (!svc.cache) {
+    return false;
+  }
+  return svc.cache->call_erase(s);
 }
 
 void RGWRados::call_zap() {
-  return;
+  if (svc.cache) {
+    return;
+  }
+  svc.cache->call_zap();
 }
 
 string RGWRados::get_mfa_oid(const rgw_user& user)
index ca43ae8ddd717c4c896f86691c47afbce053ffa0..bb72f92fad8cc5ea7e2544eb8047c8ad93ef03a4 100644 (file)
@@ -2212,26 +2212,7 @@ public:
   int update_containers_stats(map<string, RGWBucketEnt>& m);
   int append_async(rgw_raw_obj& obj, size_t size, bufferlist& bl);
 
-  int watch(const string& oid, uint64_t *watch_handle, librados::WatchCtx2 *ctx);
-  int aio_watch(const string& oid, uint64_t *watch_handle, librados::WatchCtx2 *ctx, librados::AioCompletion *c);
-  int unwatch(uint64_t watch_handle);
-  void add_watcher(int i);
-  void remove_watcher(int i);
-  virtual bool need_watch_notify() { return false; }
-  int init_watch();
-  void finalize_watch();
-  int distribute(const string& key, bufferlist& bl);
-private:
-  int robust_notify(const string& notify_oid, bufferlist& bl);
 public:
-  virtual int watch_cb(uint64_t notify_id,
-                      uint64_t cookie,
-                      uint64_t notifier_id,
-                      bufferlist& bl) { return 0; }
-  void pick_control_oid(const string& key, string& notify_oid);
-
-  virtual void set_cache_enabled(bool state) {}
-
   void set_atomic(void *ctx, rgw_obj& obj) {
     RGWObjectCtx *rctx = static_cast<RGWObjectCtx *>(ctx);
     rctx->obj.set_atomic(obj);
@@ -2285,15 +2266,6 @@ public:
            bufferlist& out) override final;
 
 protected:
-  void cache_list_dump_helper(Formatter* f,
-                             const std::string& name,
-                             const ceph::real_time mtime,
-                             const std::uint64_t size) {
-    f->dump_string("name", name);
-    f->dump_string("mtime", ceph::to_iso_8601(mtime));
-    f->dump_unsigned("size", size);
-  }
-
   // `call_list` must iterate over all cache entries and call
   // `cache_list_dump_helper` with the supplied Formatter on any that
   // include `filter` as a substring.
index 9e1c3f3b259672563db74e31d4d0de9a2e978676..8c468b91051c3ffd545cae0668c6b190bbad6d2d 100644 (file)
@@ -192,7 +192,7 @@ int RGWSI_Notify::init_watch()
       notify_oid = notify_oid_prefix;
     }
 
-    notify_objs[i] = rados_svc->obj({control_pool, notify_oid});
+    notify_objs[i] = rados_svc->handle(0).obj({control_pool, notify_oid});
     auto& notify_obj = notify_objs[i];
 
     librados::ObjectWriteOperation op;
@@ -257,22 +257,6 @@ void RGWSI_Notify::shutdown()
   finalize_watch();
 }
 
-int RGWSI_Notify::watch(RGWSI_RADOS::Obj& obj, uint64_t *watch_handle, librados::WatchCtx2 *ctx)
-{
-  int r = obj.watch(watch_handle, ctx);
-  if (r < 0)
-    return r;
-  return 0;
-}
-
-int RGWSI_Notify::aio_watch(RGWSI_RADOS::Obj& obj, uint64_t *watch_handle, librados::WatchCtx2 *ctx, librados::AioCompletion *c)
-{
-  int r = obj.aio_watch(c, watch_handle, ctx, 0);
-  if (r < 0)
-    return r;
-  return 0;
-}
-
 int RGWSI_Notify::unwatch(RGWSI_RADOS::Obj& obj, uint64_t watch_handle)
 {
   int r = obj.unwatch(watch_handle);
@@ -280,7 +264,7 @@ int RGWSI_Notify::unwatch(RGWSI_RADOS::Obj& obj, uint64_t watch_handle)
     ldout(cct, 0) << "ERROR: rados->unwatch2() returned r=" << r << dendl;
     return r;
   }
-  r = rados[0].watch_flush();
+  r = rados_svc->handle(0).watch_flush();
   if (r < 0) {
     ldout(cct, 0) << "ERROR: rados->watch_flush() returned r=" << r << dendl;
     return r;
@@ -291,30 +275,156 @@ int RGWSI_Notify::unwatch(RGWSI_RADOS::Obj& obj, uint64_t watch_handle)
 void RGWSI_Notify::add_watcher(int i)
 {
   ldout(cct, 20) << "add_watcher() i=" << i << dendl;
-  Mutex::Locker l(watchers_lock);
+  RWLock::WLocker l(watchers_lock);
   watchers_set.insert(i);
   if (watchers_set.size() ==  (size_t)num_watchers) {
     ldout(cct, 2) << "all " << num_watchers << " watchers are set, enabling cache" << dendl;
-#warning fixme
-#if 0
-    set_cache_enabled(true);
-#endif
+    set_enabled(true);
   }
 }
 
 void RGWSI_Notify::remove_watcher(int i)
 {
   ldout(cct, 20) << "remove_watcher() i=" << i << dendl;
-  Mutex::Locker l(watchers_lock);
+  RWLock::WLocker l(watchers_lock);
   size_t orig_size = watchers_set.size();
   watchers_set.erase(i);
   if (orig_size == (size_t)num_watchers &&
       watchers_set.size() < orig_size) { /* actually removed */
     ldout(cct, 2) << "removed watcher, disabling cache" << dendl;
-#warning fixme
-#if 0
-    set_cache_enabled(false);
-#endif
+    set_enabled(false);
+  }
+}
+
+int RGWSI_Notify::watch_cb(uint64_t notify_id,
+                           uint64_t cookie,
+                           uint64_t notifier_id,
+                           bufferlist& bl)
+{
+  RWLock::RLocker l(watchers_lock);
+  if (cb) {
+    return cb->watch_cb(notify_id, cookie, notifier_id, bl);
+  }
+  return 0;
+}
+
+void RGWSI_Notify::set_enabled(bool status)
+{
+  RWLock::RLocker l(watchers_lock);
+  if (cb) {
+    cb->set_enabled(status);
   }
 }
 
+int RGWSI_Notify::distribute(const string& key, bufferlist& bl)
+{
+  RGWSI_RADOS::Obj notify_obj = pick_control_obj(key);
+
+  ldout(cct, 10) << "distributing notification oid=" << notify_obj.get_ref().oid << " bl.length()=" << bl.length() << dendl;
+  return robust_notify(notify_obj, bl);
+}
+
+int RGWSI_Notify::robust_notify(RGWSI_RADOS::Obj& notify_obj, bufferlist& bl)
+{
+  // The reply of every machine that acks goes in here.
+  boost::container::flat_set<std::pair<uint64_t, uint64_t>> acks;
+  bufferlist rbl;
+
+  // First, try to send, without being fancy about it.
+  auto r = notify_obj.notify(bl, 0, &rbl);
+
+  // If that doesn't work, get serious.
+  if (r < 0) {
+    ldout(cct, 1) << "robust_notify: If at first you don't succeed: "
+                 << cpp_strerror(-r) << dendl;
+
+
+    auto p = rbl.cbegin();
+    // Gather up the replies to the first attempt.
+    try {
+      uint32_t num_acks;
+      decode(num_acks, p);
+      // Doing this ourselves since we don't care about the payload;
+      for (auto i = 0u; i < num_acks; ++i) {
+       std::pair<uint64_t, uint64_t> id;
+       decode(id, p);
+       acks.insert(id);
+       ldout(cct, 20) << "robust_notify: acked by " << id << dendl;
+       uint32_t blen;
+       decode(blen, p);
+       p.advance(blen);
+      }
+    } catch (const buffer::error& e) {
+      ldout(cct, 0) << "robust_notify: notify response parse failed: "
+                   << e.what() << dendl;
+      acks.clear(); // Throw away junk on failed parse.
+    }
+
+
+    // Every machine that fails to reply and hasn't acked a previous
+    // attempt goes in here.
+    boost::container::flat_set<std::pair<uint64_t, uint64_t>> timeouts;
+
+    auto tries = 1u;
+    while (r < 0 && tries < max_notify_retries) {
+      ++tries;
+      rbl.clear();
+      // Reset the timeouts, we're only concerned with new ones.
+      timeouts.clear();
+      r = notify_obj.notify(bl, 0, &rbl);
+      if (r < 0) {
+       ldout(cct, 1) << "robust_notify: retry " << tries << " failed: "
+                     << cpp_strerror(-r) << dendl;
+       p = rbl.begin();
+       try {
+         uint32_t num_acks;
+         decode(num_acks, p);
+         // Not only do we not care about the payload, but we don't
+         // want to empty the container; we just want to augment it
+         // with any new members.
+         for (auto i = 0u; i < num_acks; ++i) {
+           std::pair<uint64_t, uint64_t> id;
+           decode(id, p);
+           auto ir = acks.insert(id);
+           if (ir.second) {
+             ldout(cct, 20) << "robust_notify: acked by " << id << dendl;
+           }
+           uint32_t blen;
+           decode(blen, p);
+           p.advance(blen);
+         }
+
+         uint32_t num_timeouts;
+         decode(num_timeouts, p);
+         for (auto i = 0u; i < num_timeouts; ++i) {
+           std::pair<uint64_t, uint64_t> id;
+           decode(id, p);
+           // Only track timeouts from hosts that haven't acked previously.
+           if (acks.find(id) != acks.cend()) {
+             ldout(cct, 20) << "robust_notify: " << id << " timed out."
+                            << dendl;
+             timeouts.insert(id);
+           }
+         }
+       } catch (const buffer::error& e) {
+         ldout(cct, 0) << "robust_notify: notify response parse failed: "
+                       << e.what() << dendl;
+         continue;
+       }
+       // If we got a good parse and timeouts is empty, that means
+       // everyone who timed out in one call received the update in a
+       // previous one.
+       if (timeouts.empty()) {
+         r = 0;
+       }
+      }
+    }
+  }
+  return r;
+}
+
+void RGWSI_Notify::register_watch_cb(CB *_cb)
+{
+  RWLock::WLocker l(watchers_lock);
+  cb = _cb;
+}
index d1118cbabb550e1d74768ffc4a5bcfa620772818..36adefa716820240d32fd43ffb075a2f60795fb8 100644 (file)
@@ -21,13 +21,16 @@ public:
 
 class RGWSI_Notify : public RGWServiceInstance
 {
+public:
+  class CB;
+private:
   std::shared_ptr<RGWSI_Zone> zone_svc;
   std::shared_ptr<RGWSI_RADOS> rados_svc;
 
   std::map<std::string, RGWServiceInstance::dependency> get_deps() override;
   int load(const std::string& conf, std::map<std::string, RGWServiceInstanceRef>& dep_refs) override;
 
-  Mutex watchers_lock{"watchers_lock"};
+  RWLock watchers_lock{"watchers_lock"};
   rgw_pool control_pool;
 
   int num_watchers{0};
@@ -43,20 +46,41 @@ class RGWSI_Notify : public RGWServiceInstance
   string get_control_oid(int i);
   RGWSI_RADOS::Obj pick_control_obj(const string& key);
 
+  CB *cb{nullptr};
+
   int init_watch();
   void finalize_watch();
 
   int init() override;
   void shutdown() override;
 
-  int watch(RGWSI_RADOS::Obj& obj, uint64_t *watch_handle, librados::WatchCtx2 *ctx);
-  int aio_watch(const string& oid, uint64_t *watch_handle, librados::WatchCtx2 *ctx, librados::AioCompletion *c);
   int unwatch(RGWSI_RADOS::Obj& obj, uint64_t watch_handle);
   void add_watcher(int i);
   void remove_watcher(int i);
+
+  int watch_cb(uint64_t notify_id,
+               uint64_t cookie,
+               uint64_t notifier_id,
+               bufferlist& bl);
+  void set_enabled(bool status);
+
+  int robust_notify(RGWSI_RADOS::Obj& notify_obj, bufferlist& bl);
 public:
   RGWSI_Notify(RGWService *svc, CephContext *cct): RGWServiceInstance(svc, cct) {}
 
+  class CB {
+    public:
+      virtual ~CB() {}
+      virtual int watch_cb(uint64_t notify_id,
+                           uint64_t cookie,
+                           uint64_t notifier_id,
+                           bufferlist& bl) = 0;
+      virtual void set_enabled(bool status) = 0;
+  };
+
+  int distribute(const string& key, bufferlist& bl);
+
+  void register_watch_cb(CB *cb);
 };
 
 #endif
index 192734e7e818b87424dc1aeb9405a85411f4533b..270fcaed9734caa56c887b114a46daba01efc417 100644 (file)
@@ -64,11 +64,19 @@ int RGWSI_RADOS::load(const string& conf, map<string, RGWServiceInstanceRef>& de
   return 0;
 }
 
-librados::Rados* RGWSI_RADOS::get_rados_handle()
+librados::Rados* RGWSI_RADOS::get_rados_handle(int rados_handle)
 {
   if (rados.size() == 1) {
     return &rados[0];
   }
+
+  if (rados_handle >= 0) {
+    if (rados_handle >= (int)rados.size()) {
+      rados_handle = 0;
+    }
+    return &rados[rados_handle];
+  }
+
   handle_lock.get_read();
   pthread_t id = pthread_self();
   std::map<pthread_t, int>:: iterator it = rados_map.find(id);
@@ -90,13 +98,13 @@ librados::Rados* RGWSI_RADOS::get_rados_handle()
 
 uint64_t RGWSI_RADOS::instance_id()
 {
-  return get_rados_handle()->get_instance_id();
+  return get_rados_handle(-1)->get_instance_id();
 }
 
-int RGWSI_RADOS::open_pool_ctx(const rgw_pool& pool, librados::IoCtx& io_ctx)
+int RGWSI_RADOS::open_pool_ctx(const rgw_pool& pool, librados::IoCtx& io_ctx, int rados_handle)
 {
   constexpr bool create = true; // create the pool if it doesn't exist
-  return init_ioctx(cct, get_rados_handle(), pool, io_ctx, create);
+  return init_ioctx(cct, get_rados_handle(rados_handle), pool, io_ctx, create);
 }
 
 int RGWSI_RADOS::pool_iterate(librados::IoCtx& io_ctx,
@@ -138,7 +146,7 @@ void RGWSI_RADOS::Obj::init(const rgw_raw_obj& obj)
 
 int RGWSI_RADOS::Obj::open()
 {
-  int r = rados_svc->open_pool_ctx(ref.pool, ref.ioctx);
+  int r = rados_svc->open_pool_ctx(ref.pool, ref.ioctx, rados_handle);
   if (r < 0) {
     return r;
   }
@@ -168,7 +176,7 @@ int RGWSI_RADOS::Obj::watch(uint64_t *handle, librados::WatchCtx2 *ctx)
   return ref.ioctx.watch2(ref.oid, handle, ctx);
 }
 
-int RGWSI_RADOS::Obj::aio_watch(AioCompletion *c, uint64_t *handle, librados::WatchCtx2 *ctx)
+int RGWSI_RADOS::Obj::aio_watch(librados::AioCompletion *c, uint64_t *handle, librados::WatchCtx2 *ctx)
 {
   return ref.ioctx.aio_watch(ref.oid, c, handle, ctx);
 }
@@ -178,6 +186,20 @@ int RGWSI_RADOS::Obj::unwatch(uint64_t handle)
   return ref.ioctx.unwatch2(handle);
 }
 
+int RGWSI_RADOS::Obj::notify(bufferlist& bl,
+                             uint64_t timeout_ms,
+                             bufferlist *pbl)
+{
+  return ref.ioctx.notify2(ref.oid, bl, timeout_ms, pbl);
+}
+
+void RGWSI_RADOS::Obj::notify_ack(uint64_t notify_id,
+                                 uint64_t cookie,
+                                 bufferlist& bl)
+{
+  ref.ioctx.notify_ack(ref.oid, notify_id, cookie, bl);
+}
+
 uint64_t RGWSI_RADOS::Obj::get_last_version()
 {
   return ref.ioctx.get_last_version();
@@ -188,7 +210,7 @@ int RGWSI_RADOS::Pool::create(const vector<rgw_pool>& pools, vector<int> *retcod
   vector<librados::PoolAsyncCompletion *> completions;
   vector<int> rets;
 
-  librados::Rados *rad = rados_svc->get_rados_handle();
+  librados::Rados *rad = rados_svc->get_rados_handle(rados_handle);
   for (auto iter = pools.begin(); iter != pools.end(); ++iter) {
     librados::PoolAsyncCompletion *c = librados::Rados::pool_async_create_completion();
     completions.push_back(c);
@@ -264,7 +286,7 @@ int RGWSI_RADOS::Pool::create(const vector<rgw_pool>& pools, vector<int> *retcod
 
 int RGWSI_RADOS::Pool::lookup(const rgw_pool& pool)
 {
-  librados::Rados *rad = rados_svc->get_rados_handle();
+  librados::Rados *rad = rados_svc->get_rados_handle(rados_handle);
   int ret = rad->pool_lookup(pool.name.c_str());
   if (ret < 0) {
     return ret;
@@ -279,7 +301,7 @@ int RGWSI_RADOS::Pool::List::init(const string& marker, RGWAccessListFilter *fil
     return -EINVAL;
   }
 
-  int r = pool.rados_svc->open_pool_ctx(pool.pool, ctx.ioctx);
+  int r = pool.rados_svc->open_pool_ctx(pool.pool, ctx.ioctx, pool.rados_handle);
   if (r < 0) {
     return r;
   }
@@ -321,3 +343,9 @@ int RGWSI_RADOS::Pool::List::get_next(int max,
   return oids->size();
 }
 
+int RGWSI_RADOS::Handle::watch_flush()
+{
+  librados::Rados *rad = rados_svc->get_rados_handle(rados_handle);
+  return rad->watch_flush();
+}
+
index 1949a3ad4b4e1121262fec958beed48a7513bd04..ed56dd04c55bc892f4c6ddbb435999c48d96ba28 100644 (file)
@@ -46,8 +46,8 @@ class RGWSI_RADOS : public RGWServiceInstance
 
   int load(const string& conf, std::map<std::string, RGWServiceInstanceRef>& deps) override;
 
-  librados::Rados* get_rados_handle();
-  int open_pool_ctx(const rgw_pool& pool, librados::IoCtx& io_ctx);
+  librados::Rados* get_rados_handle(int rados_handle);
+  int open_pool_ctx(const rgw_pool& pool, librados::IoCtx& io_ctx, int rados_handle);
   int pool_iterate(librados::IoCtx& ioctx,
                    librados::NObjectIterator& iter,
                    uint32_t num, vector<rgw_bucket_dir_entry>& objs,
@@ -60,28 +60,35 @@ public:
 
   uint64_t instance_id();
 
+  class Handle;
+
   class Obj {
     friend class RGWSI_RADOS;
+    friend class Handle;
 
     RGWSI_RADOS *rados_svc{nullptr};
+    int rados_handle{-1};
     rgw_rados_ref ref;
 
     void init(const rgw_raw_obj& obj);
 
-    Obj(RGWSI_RADOS *_rados_svc, const rgw_raw_obj& _obj) : rados_svc(_rados_svc) {
+    Obj(RGWSI_RADOS *_rados_svc, const rgw_raw_obj& _obj, int _rados_handle) : rados_svc(_rados_svc), rados_handle(_rados_handle) {
       init(_obj);
     }
 
   public:
     Obj() {}
     Obj(const Obj& o) : rados_svc(o.rados_svc),
+                        rados_handle(o.rados_handle),
                         ref(o.ref) {}
 
     Obj(Obj&& o) : rados_svc(o.rados_svc),
+                   rados_handle(o.rados_handle),
                    ref(std::move(o.ref)) {}
 
     Obj& operator=(Obj&& o) {
       rados_svc = o.rados_svc;
+      rados_handle = o.rados_handle;
       ref = std::move(o.ref);
       return *this;
     }
@@ -95,23 +102,39 @@ public:
     int watch(uint64_t *handle, librados::WatchCtx2 *ctx);
     int aio_watch(librados::AioCompletion *c, uint64_t *handle, librados::WatchCtx2 *ctx);
     int unwatch(uint64_t handle);
+    int notify(bufferlist& bl,
+               uint64_t timeout_ms,
+               bufferlist *pbl);
+    void notify_ack(uint64_t notify_id,
+                    uint64_t cookie,
+                    bufferlist& bl);
 
     uint64_t get_last_version();
+
+    rgw_rados_ref& get_ref() {
+      return ref;
+    }
   };
 
   class Pool {
     friend class RGWSI_RADOS;
+    friend class Handle;
 
     RGWSI_RADOS *rados_svc{nullptr};
+    int rados_handle{-1};
     rgw_pool pool;
 
-    Pool(RGWSI_RADOS *_rados_svc, const rgw_pool& _pool) : rados_svc(_rados_svc),
-                                                           pool(_pool) {}
+    Pool(RGWSI_RADOS *_rados_svc,
+         const rgw_pool& _pool,
+         int _rados_handle) : rados_svc(_rados_svc),
+                              rados_handle(_rados_handle),
+                              pool(_pool) {}
 
     Pool(RGWSI_RADOS *_rados_svc) : rados_svc(_rados_svc) {}
   public:
     Pool() {}
     Pool(const Pool& p) : rados_svc(p.rados_svc),
+                          rados_handle(p.rados_handle),
                           pool(p.pool) {}
 
     int create(const std::vector<rgw_pool>& pools, std::vector<int> *retcodes);
@@ -142,8 +165,32 @@ public:
     friend class List;
   };
 
+  class Handle {
+    friend class RGWSI_RADOS;
+
+    RGWSI_RADOS *rados_svc{nullptr};
+    int rados_handle{-1};
+
+    Handle(RGWSI_RADOS *_rados_svc, int _rados_handle) : rados_svc(_rados_svc),
+                                                         rados_handle(_rados_handle) {}
+  public:
+    Obj obj(const rgw_raw_obj& o) {
+      return Obj(rados_svc, o, rados_handle);
+    }
+
+    Pool pool(const rgw_pool& p) {
+      return Pool(rados_svc, p, rados_handle);
+    }
+
+    int watch_flush();
+  };
+
+  Handle handle(int rados_handle) {
+    return Handle(this, rados_handle);
+  }
+
   Obj obj(const rgw_raw_obj& o) {
-    return Obj(this, o);
+    return Obj(this, o, -1);
   }
 
   Pool pool() {
@@ -151,7 +198,7 @@ public:
   }
 
   Pool pool(const rgw_pool& p) {
-    return Pool(this, p);
+    return Pool(this, p, -1);
   }
 
   friend class Obj;
index 5acea09c0bd9e3af5cba58a28d8d008375b177a5..06399de3212dce4ad1ddc86f5c119d13292b9694 100644 (file)
@@ -1,8 +1,52 @@
 #include "svc_sys_obj_cache.h"
 #include "svc_zone.h"
+#include "svc_notify.h"
 
 #define dout_subsys ceph_subsys_rgw
 
+class RGWSI_SysObj_Cache_CB : public RGWSI_Notify::CB
+{
+  RGWSI_SysObj_Cache *svc;
+public:
+  RGWSI_SysObj_Cache_CB(RGWSI_SysObj_Cache *_svc) : svc(_svc) {}
+  int watch_cb(uint64_t notify_id,
+               uint64_t cookie,
+               uint64_t notifier_id,
+               bufferlist& bl) {
+    return svc->watch_cb(notify_id, cookie, notifier_id, bl);
+  }
+
+  void set_enabled(bool status) {
+    svc->set_enabled(status);
+  }
+};
+
+std::map<string, RGWServiceInstance::dependency> RGWSI_SysObj_Cache::get_deps()
+{
+  map<string, RGWServiceInstance::dependency> deps = RGWSI_SysObj_Core::get_deps();
+
+  deps["cache_notify_dep"] = { .name = "notify",
+                               .conf = "{}" };
+  return deps;
+}
+
+int RGWSI_SysObj_Cache::load(const string& conf, std::map<std::string, RGWServiceInstanceRef>& dep_refs)
+{
+  int r = RGWSI_SysObj_Core::load(conf, dep_refs);
+  if (r < 0) {
+    return r;
+  }
+
+  notify_svc = static_pointer_cast<RGWSI_Notify>(dep_refs["cache_notify_dep"]);
+  assert(notify_svc);
+
+  cb.reset(new RGWSI_SysObj_Cache_CB(this));
+
+  notify_svc->register_watch_cb(cb.get());
+
+  return 0;
+}
+
 static string normal_name(rgw_pool& pool, const std::string& oid) {
   std::string buf;
   buf.reserve(pool.name.size() + pool.ns.size() + oid.size() + 2);
@@ -35,7 +79,10 @@ int RGWSI_SysObj_Cache::remove(RGWSysObjectCtxBase& obj_ctx,
   cache.remove(name);
 
   ObjectCacheInfo info;
-  distribute_cache(name, obj, info, REMOVE_OBJ);
+  int r = distribute_cache(name, obj, info, REMOVE_OBJ);
+  if (r < 0) {
+    ldout(cct, 0) << "ERROR: " << __func__ << "(): failed to distribute cache: r=" << r << dendl;
+  }
 
   return RGWSI_SysObj_Core::remove(obj_ctx, objv_tracker, obj);
 }
@@ -212,11 +259,13 @@ int RGWSI_SysObj_Cache::write_data(rgw_raw_obj& obj,
   rgw_pool pool;
   string oid;
   normalize_pool_and_obj(obj.pool, obj.oid, pool, oid);
+
   ObjectCacheInfo info;
   info.data = data;
   info.meta.size = data.length();
   info.status = 0;
   info.flags = CACHE_FLAG_DATA;
+
   if (objv_tracker) {
     info.version = objv_tracker->write_version;
     info.flags |= CACHE_FLAG_OBJV;
@@ -305,7 +354,7 @@ int RGWSI_SysObj_Cache::distribute_cache(const string& normal_name, rgw_raw_obj&
   info.obj = obj;
   bufferlist bl;
   encode(info, bl);
-  return T::distribute(normal_name, bl);
+  return notify_svc->distribute(normal_name, bl);
 }
 
 int RGWSI_SysObj_Cache::watch_cb(uint64_t notify_id,
@@ -346,13 +395,39 @@ int RGWSI_SysObj_Cache::watch_cb(uint64_t notify_id,
   return 0;
 }
 
+void RGWSI_SysObj_Cache::set_enabled(bool status)
+{
+  cache.set_enabled(status);
+}
+
+bool RGWSI_SysObj_Cache::chain_cache_entry(std::initializer_list<rgw_cache_entry_info *> cache_info_entries,
+                                           RGWChainedCache::Entry *chained_entry)
+{
+  return cache.chain_cache_entry(cache_info_entries, chained_entry);
+}
+
+void RGWSI_SysObj_Cache::register_chained_cache(RGWChainedCache *cc)
+{
+  cache.chain_cache(cc);
+}
+
+static void cache_list_dump_helper(Formatter* f,
+                                   const std::string& name,
+                                   const ceph::real_time mtime,
+                                   const std::uint64_t size)
+{
+  f->dump_string("name", name);
+  f->dump_string("mtime", ceph::to_iso_8601(mtime));
+  f->dump_unsigned("size", size);
+}
+
 void RGWSI_SysObj_Cache::call_list(const std::optional<std::string>& filter, Formatter* f)
 {
   cache.for_each(
     [this, &filter, f] (const string& name, const ObjectCacheEntry& entry) {
       if (!filter || name.find(*filter) != name.npos) {
-       T::cache_list_dump_helper(f, name, entry.info.meta.mtime,
-                                 entry.info.meta.size);
+       cache_list_dump_helper(f, name, entry.info.meta.mtime,
+                               entry.info.meta.size);
       }
     });
 }
@@ -378,4 +453,5 @@ int RGWSI_SysObj_Cache::call_erase(const std::string& target)
 int RGWSI_SysObj_Cache::call_zap()
 {
   cache.invalidate_all();
+  return 0;
 }
index 3cc1570e8dd024e1a9ee72947955e831d7561498..e108ac0b5c05e6417d08ade55d1a8072399ad80c 100644 (file)
@@ -8,11 +8,19 @@
 
 #include "svc_sys_obj_core.h"
 
+class RGWSI_Notify;
+
+class RGWSI_SysObj_Cache_CB;
 
 class RGWSI_SysObj_Cache : public RGWSI_SysObj_Core
 {
+  friend class RGWSI_SysObj_Cache_CB;
+
+  std::shared_ptr<RGWSI_Notify> notify_svc;
   ObjectCache cache;
 
+  std::shared_ptr<RGWSI_SysObj_Cache_CB> cb;
+
   void normalize_pool_and_obj(rgw_pool& src_pool, const string& src_obj, rgw_pool& dst_pool, string& dst_obj);
 protected:
   std::map<std::string, RGWServiceInstance::dependency> get_deps() override;
@@ -62,8 +70,16 @@ protected:
                uint64_t notifier_id,
                bufferlist& bl);
 
+  void set_enabled(bool status);
+
 public:
-  RGWSI_SysObj_Cache(RGWService *svc, CephContext *cct) : RGWSI_SysObj_Core(svc, cct) {}
+  RGWSI_SysObj_Cache(RGWService *svc, CephContext *cct) : RGWSI_SysObj_Core(svc, cct) {
+    cache.set_ctx(cct);
+  }
+
+  bool chain_cache_entry(std::initializer_list<rgw_cache_entry_info *> cache_info_entries,
+                         RGWChainedCache::Entry *chained_entry);
+  void register_chained_cache(RGWChainedCache *cc);
 
   void call_list(const std::optional<std::string>& filter, Formatter* f);
   int call_inspect(const std::string& target, Formatter* f);