]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
rgw: adding generation number to async notification
authorShilpa Jagannath <smanjara@redhat.com>
Mon, 15 Feb 2021 14:46:29 +0000 (20:16 +0530)
committerAdam C. Emerson <aemerson@redhat.com>
Mon, 13 Sep 2021 16:27:50 +0000 (12:27 -0400)
Signed-off-by: Shilpa Jagannath <smanjara@redhat.com>
15 files changed:
src/rgw/rgw_cr_rados.cc
src/rgw/rgw_cr_rados.h
src/rgw/rgw_data_sync.cc
src/rgw/rgw_data_sync.h
src/rgw/rgw_datalog.cc
src/rgw/rgw_datalog.h
src/rgw/rgw_http_errors.h
src/rgw/rgw_rados.cc
src/rgw/rgw_rados.h
src/rgw/rgw_rest_log.cc
src/rgw/rgw_rest_log.h
src/rgw/rgw_sal.h
src/rgw/rgw_sal_dbstore.h
src/rgw/rgw_sal_rados.h
src/rgw/rgw_sync_error_repo.cc

index 52519600141b1ce28bf0aa597230455e6b9bf5fb..1c18d3ea1aa24d5e9f6c48c803b6195e5a5354fc 100644 (file)
@@ -8,6 +8,10 @@
 #include "rgw_cr_rados.h"
 #include "rgw_sync_counters.h"
 #include "rgw_bucket.h"
+#include "rgw_datalog_notify.h"
+#include "rgw_cr_rest.h"
+#include "rgw_rest_conn.h"
+#include "rgw_rados.h"
 
 #include "services/svc_zone.h"
 #include "services/svc_zone_utils.h"
@@ -18,6 +22,7 @@
 #include "cls/rgw/cls_rgw_client.h"
 
 #include <boost/asio/yield.hpp>
+#include <boost/container/flat_set.hpp>
 
 #define dout_context g_ceph_context
 #define dout_subsys ceph_subsys_rgw
@@ -982,3 +987,34 @@ int RGWRadosNotifyCR::request_complete()
 
   return r;
 }
+
+
+int RGWDataPostNotifyCR::operate(const DoutPrefixProvider* dpp)
+{
+  reenter(this) {
+    using PostNotify2 = RGWPostRESTResourceCR<bc::flat_map<int, bc::flat_set<rgw_data_notify_entry>>, int>;
+    yield {
+      rgw_http_param_pair pairs[] = { { "type", "data" },
+                                      { "notify2", NULL },
+                                      { "source-zone", source_zone },
+                                      { NULL, NULL } };
+      call(new PostNotify2(store->ctx(), conn, &http_manager, "/admin/log", pairs, shards, nullptr));
+    }
+    if (retcode == -ERR_METHOD_NOT_ALLOWED) {
+      using PostNotify1 = RGWPostRESTResourceCR<rgw_data_notify_v1_encoder, int>;
+      yield {
+        rgw_http_param_pair pairs[] = { { "type", "data" },
+                                        { "notify", NULL },
+                                        { "source-zone", source_zone },
+                                        { NULL, NULL } };
+        auto encoder = rgw_data_notify_v1_encoder{shards};
+        call(new PostNotify1(store->ctx(), conn, &http_manager, "/admin/log", pairs, encoder, nullptr));
+      }
+    }
+    if (retcode < 0) {
+      return set_cr_error(retcode);
+    }
+    return set_cr_done();
+  }
+  return 0;
+}
index f88b3b556cc066d3f773836ed85a4efa93f9a28d..8dc5a9d17d270100821fcd42f4325523e416f027 100644 (file)
@@ -19,6 +19,9 @@
 
 #define dout_subsys ceph_subsys_rgw
 
+struct rgw_http_param_pair;
+class RGWRESTConn;
+
 class RGWAsyncRadosRequest : public RefCountedObject {
   RGWCoroutine *caller;
   RGWAioCompletionNotifier *notifier;
@@ -1467,4 +1470,20 @@ public:
   int request_complete() override;
 };
 
+class RGWDataPostNotifyCR : public RGWCoroutine {
+  RGWRados *store;
+  RGWHTTPManager& http_manager;
+  bc::flat_map<int, bc::flat_set<rgw_data_notify_entry> >& shards;
+  const char *source_zone;
+  RGWRESTConn *conn;
+
+public:
+  RGWDataPostNotifyCR(RGWRados *_store, RGWHTTPManager& _http_manager, bc::flat_map<int,
+                    bc::flat_set<rgw_data_notify_entry> >& _shards, const char *_zone, RGWRESTConn *_conn)
+                    : RGWCoroutine(_store->ctx()), store(_store), http_manager(_http_manager),
+                      shards(_shards), source_zone(_zone), conn(_conn) {}
+
+  int operate(const DoutPrefixProvider* dpp) override;
+};
+
 #endif
index bc3aa812288518049299cc5175ebf95e277cfd4d..c783dcbc19a8b2ca02ee0059d49fd91d1c2b2048 100644 (file)
@@ -1430,10 +1430,10 @@ class RGWDataSyncShardCR : public RGWCoroutine {
   boost::asio::coroutine full_cr;
 
 
-  set<string> modified_shards;
-  set<string> current_modified;
+  bc::flat_set<rgw_data_notify_entry> modified_shards;
+  bc::flat_set<rgw_data_notify_entry> current_modified;
 
-  set<string>::iterator modified_iter;
+  bc::flat_set<rgw_data_notify_entry>::iterator modified_iter;
 
   uint64_t total_entries = 0;
   static constexpr int spawn_window = BUCKET_SHARD_SYNC_SPAWN_WINDOW;
@@ -1493,9 +1493,9 @@ public:
     }
   }
 
-  void append_modified_shards(set<string>& keys) {
+  void append_modified_shards(bc::flat_set<rgw_data_notify_entry>& entries) {
     std::lock_guard l{inc_lock};
-    modified_shards.insert(keys.begin(), keys.end());
+    modified_shards.insert(entries.begin(), entries.end());
   }
 
   int operate(const DoutPrefixProvider *dpp) override {
@@ -1671,13 +1671,13 @@ public:
         }
         /* process out of band updates */
         for (modified_iter = current_modified.begin(); modified_iter != current_modified.end(); ++modified_iter) {
-          retcode = parse_bucket_key(*modified_iter, source_bs);
+          retcode = parse_bucket_key(modified_iter->key, source_bs);
           if (retcode < 0) {
-            tn->log(1, SSTR("failed to parse bucket shard: " << *modified_iter));
+            tn->log(1, SSTR("failed to parse bucket shard: " << modified_iter->key));
             continue;
           }
-          tn->log(20, SSTR("received async update notification: " << *modified_iter));
-          spawn(sync_single_entry(source_bs, std::nullopt, string(),
+          tn->log(20, SSTR("received async update notification: " << modified_iter->key));
+          spawn(sync_single_entry(source_bs, modified_iter->gen, string(),
                                   ceph::real_time{}, false), false);
         }
 
@@ -1816,7 +1816,7 @@ public:
                                                           &sync_marker);
   }
 
-  void append_modified_shards(set<string>& keys) {
+  void append_modified_shards(bc::flat_set<rgw_data_notify_entry>& keys) {
     std::lock_guard l{cr_lock()};
 
     RGWDataSyncShardCR *cr = static_cast<RGWDataSyncShardCR *>(get_cr());
@@ -1949,13 +1949,13 @@ public:
                                                          sync_status.sync_info);
   }
 
-  void wakeup(int shard_id, set<string>& keys) {
+  void wakeup(int shard_id, bc::flat_set<rgw_data_notify_entry>& entries) {
     std::lock_guard l{shard_crs_lock};
     map<int, RGWDataSyncShardControlCR *>::iterator iter = shard_crs.find(shard_id);
     if (iter == shard_crs.end()) {
       return;
     }
-    iter->second->append_modified_shards(keys);
+    iter->second->append_modified_shards(entries);
     iter->second->wakeup();
   }
 };
@@ -2571,7 +2571,7 @@ public:
     return new RGWDataSyncCR(sc, num_shards, tn, backoff_ptr());
   }
 
-  void wakeup(int shard_id, set<string>& keys) {
+  void wakeup(int shard_id, bc::flat_set<rgw_data_notify_entry>& entries) {
     ceph::mutex& m = cr_lock();
 
     m.lock();
@@ -2585,20 +2585,19 @@ public:
     m.unlock();
 
     if (cr) {
-      tn->log(20, SSTR("notify shard=" << shard_id << " keys=" << keys));
-      cr->wakeup(shard_id, keys);
+      cr->wakeup(shard_id, entries);
     }
 
     cr->put();
   }
 };
 
-void RGWRemoteDataLog::wakeup(int shard_id, set<string>& keys) {
+void RGWRemoteDataLog::wakeup(int shard_id, bc::flat_set<rgw_data_notify_entry>& entries) {
   std::shared_lock rl{lock};
   if (!data_sync_cr) {
     return;
   }
-  data_sync_cr->wakeup(shard_id, keys);
+  data_sync_cr->wakeup(shard_id, entries);
 }
 
 int RGWRemoteDataLog::run_sync(const DoutPrefixProvider *dpp, int num_shards)
index 4c97c0d8d27957c0d07bf19e3ee4c0556a31d4f1..97533b4d82c8e9bf03474ea48551983396082745 100644 (file)
@@ -389,7 +389,7 @@ public:
   int init_sync_status(const DoutPrefixProvider *dpp, int num_shards);
   int run_sync(const DoutPrefixProvider *dpp, int num_shards);
 
-  void wakeup(int shard_id, std::set<std::string>& keys);
+  void wakeup(int shard_id, bc::flat_set<rgw_data_notify_entry>& entries);
 };
 
 class RGWDataSyncStatusManager : public DoutPrefixProvider {
@@ -456,7 +456,8 @@ public:
 
   int run(const DoutPrefixProvider *dpp) { return source_log.run_sync(dpp, num_shards); }
 
-  void wakeup(int shard_id, std::set<std::string>& keys) { return source_log.wakeup(shard_id, keys); }
+  void wakeup(int shard_id, bc::flat_set<rgw_data_notify_entry>& entries) { return source_log.wakeup(shard_id, entries); }
+
   void stop() {
     source_log.finish();
   }
index 7d9e03a663e4a9e90ae476aa06cecbfb91842794..0b381313ee56220ab718022efd97555ec030e50d 100644 (file)
@@ -76,6 +76,17 @@ void rgw_data_change_log_entry::decode_json(JSONObj *obj) {
   JSONDecoder::decode_json("entry", entry, obj);
 }
 
+void rgw_data_notify_entry::dump(Formatter *f) const
+{
+  encode_json("key", key, f);
+  encode_json("gen", gen, f);
+}
+
+void rgw_data_notify_entry::decode_json(JSONObj *obj) {
+  JSONDecoder::decode_json("key", key, obj);
+  JSONDecoder::decode_json("gen", gen, obj);
+}
+
 class RGWDataChangesOmap final : public RGWDataChangesBE {
   using centries = std::list<cls_log_entry>;
   std::vector<std::string> oids;
@@ -621,7 +632,8 @@ int RGWDataChangesLog::add_entry(const DoutPrefixProvider *dpp,
   rgw_bucket_shard bs(bucket, shard_id);
 
   int index = choose_oid(bs);
-  mark_modified(index, bs);
+
+  mark_modified(index, bs, gen.gen);
 
   std::unique_lock l(lock);
 
@@ -998,19 +1010,19 @@ void RGWDataChangesLog::renew_stop()
   renew_cond.notify_all();
 }
 
-void RGWDataChangesLog::mark_modified(int shard_id, const rgw_bucket_shard& bs)
+void RGWDataChangesLog::mark_modified(int shard_id, const rgw_bucket_shard& bs, uint64_t gen)
 {
   auto key = bs.get_key();
   {
     std::shared_lock rl{modified_lock}; // read lock to check for existence
     auto shard = modified_shards.find(shard_id);
-    if (shard != modified_shards.end() && shard->second.count(key)) {
+    if (shard != modified_shards.end() && shard->second.count({key, gen})) {
       return;
     }
   }
 
   std::unique_lock wl{modified_lock}; // write lock for insertion
-  modified_shards[shard_id].insert(key);
+  modified_shards[shard_id].insert(rgw_data_notify_entry{key, gen});
 }
 
 std::string RGWDataChangesLog::max_marker() const {
index 35822e1c7f42ff8130ca37644626b51fbc39e40a..423316bee0d34181778e567efe0ae2d85bfd2252 100644 (file)
@@ -13,6 +13,7 @@
 #include <vector>
 
 #include <boost/container/flat_map.hpp>
+#include <boost/container/flat_set.hpp>
 #include <boost/smart_ptr/intrusive_ptr.hpp>
 #include <boost/smart_ptr/intrusive_ref_counter.hpp>
 
@@ -138,7 +139,7 @@ struct rgw_data_notify_entry {
 
   rgw_data_notify_entry& operator=(const rgw_data_notify_entry&) = default;
 
-  bool operator<(const rgw_data_notify_entry& d) const {
+  bool operator <(const rgw_data_notify_entry& d) const {
     if (key < d.key) {
       return true;
     }
@@ -147,6 +148,10 @@ struct rgw_data_notify_entry {
     }
     return gen < d.gen;
   }
+  friend std::ostream& operator <<(std::ostream& m,
+                                  const rgw_data_notify_entry& e) {
+    return m << "[key: " << e.key << ", gen: " << e.gen << "]";
+  }
 };
 
 class RGWDataChangesBE;
@@ -214,7 +219,7 @@ class RGWDataChangesLog {
   ceph::mutex lock = ceph::make_mutex("RGWDataChangesLog::lock");
   ceph::shared_mutex modified_lock =
     ceph::make_shared_mutex("RGWDataChangesLog::modified_lock");
-  bc::flat_map<int, bc::flat_set<std::string>> modified_shards;
+  bc::flat_map<int, bc::flat_set<rgw_data_notify_entry>> modified_shards;
 
   std::atomic<bool> down_flag = { false };
 
@@ -276,7 +281,7 @@ public:
                   std::vector<rgw_data_change_log_entry>& entries,
                   LogMarker& marker, bool* ptruncated);
 
-  void mark_modified(int shard_id, const rgw_bucket_shard& bs);
+  void mark_modified(int shard_id, const rgw_bucket_shard& bs, uint64_t gen);
   auto read_clear_modified() {
     std::unique_lock wl{modified_lock};
     decltype(modified_shards) modified;
index a06d20529ade419edbe5ac486cb769b1c0d75448..d8674552ab6a3291c802ddf0325dd2edecf7c24b 100644 (file)
@@ -31,6 +31,8 @@ static inline int rgw_http_error_to_errno(int http_err)
         return -EACCES;
     case 404:
         return -ENOENT;
+    case 405:
+        return -ERR_METHOD_NOT_ALLOWED;
     case 409:
         return -ENOTEMPTY;
     case 503:
index 96518eab01c83b990d4416984bc61e0c11eff71c..cac073308f7b073d13792ffe4556534c0a77ba34 100644 (file)
@@ -47,6 +47,7 @@
 #include "rgw_etag_verifier.h"
 #include "rgw_worker.h"
 #include "rgw_notify.h"
+#include "rgw_http_errors.h"
 
 #undef fork // fails to compile RGWPeriod::fork() below
 
@@ -74,6 +75,7 @@ using namespace librados;
 #include "rgw_data_sync.h"
 #include "rgw_realm_watcher.h"
 #include "rgw_reshard.h"
+#include "rgw_cr_rados.h"
 
 #include "services/svc_zone.h"
 #include "services/svc_zone_utils.h"
@@ -329,20 +331,17 @@ public:
   }
 
   int notify_all(const DoutPrefixProvider *dpp, map<rgw_zone_id, RGWRESTConn *>& conn_map,
-                bc::flat_map<int, bc::flat_set<string> >& shards) {
-    rgw_http_param_pair pairs[] = { { "type", "data" },
-                                    { "notify", NULL },
-                                    { "source-zone", store->svc.zone->get_zone_params().get_id().c_str() },
-                                    { NULL, NULL } };
+               bc::flat_map<int, bc::flat_set<rgw_data_notify_entry> >& shards) {
 
     list<RGWCoroutinesStack *> stacks;
+    const char *source_zone = store->svc.zone->get_zone_params().get_id().c_str();
     for (auto iter = conn_map.begin(); iter != conn_map.end(); ++iter) {
       RGWRESTConn *conn = iter->second;
       RGWCoroutinesStack *stack = new RGWCoroutinesStack(store->ctx(), this);
-      stack->call(new RGWPostRESTResourceCR<bc::flat_map<int, bc::flat_set<string> >, int>(store->ctx(), conn, &http_manager, "/admin/log", pairs, shards, NULL));
-
+      stack->call(new RGWDataPostNotifyCR(store, http_manager, shards, source_zone, conn));
       stacks.push_back(stack);
     }
+
     return run(dpp, stacks);
   }
 };
@@ -441,6 +440,7 @@ int RGWMetaNotifier::process(const DoutPrefixProvider *dpp)
 
 class RGWDataNotifier : public RGWRadosThread {
   RGWDataNotifierManager notify_mgr;
+  bc::flat_set<rgw_data_notify_entry> entry;
 
   uint64_t interval_msec() override {
     return cct->_conf.get_val<int64_t>("rgw_data_notify_interval_msec");
@@ -467,9 +467,12 @@ int RGWDataNotifier::process(const DoutPrefixProvider *dpp)
     return 0;
   }
 
-  for (const auto& [shard_id, keys] : shards) {
-    ldpp_dout(dpp, 20) << __func__ << "(): notifying datalog change, shard_id="
-                  << shard_id << ": " << keys << dendl;
+  for (const auto& [shard_id, entries] : shards) {
+    bc::flat_set<rgw_data_notify_entry>::iterator it;
+    for (const auto& entry : entries) {
+      ldpp_dout(dpp, 20) << __func__ << "(): notifying datalog change, shard_id="
+        << shard_id << ":" << entry.gen << ":" << entry.key << dendl;
+    }
   }
 
   notify_mgr.notify_all(dpp, store->svc.zone->get_zone_data_notify_to_map(), shards);
@@ -547,11 +550,12 @@ public:
       sync(_store, async_rados, source_zone->id, counters.get()),
       initialized(false) {}
 
-  void wakeup_sync_shards(map<int, set<string> >& shard_ids) {
-    for (map<int, set<string> >::iterator iter = shard_ids.begin(); iter != shard_ids.end(); ++iter) {
+  void wakeup_sync_shards(bc::flat_map<int, bc::flat_set<rgw_data_notify_entry> >& entries) {
+    for (bc::flat_map<int, bc::flat_set<rgw_data_notify_entry> >::iterator iter = entries.begin(); iter != entries.end(); ++iter) {
       sync.wakeup(iter->first, iter->second);
     }
   }
+
   RGWDataSyncStatusManager* get_manager() { return &sync; }
 
   int init(const DoutPrefixProvider *dpp) override {
@@ -645,9 +649,18 @@ void RGWRados::wakeup_meta_sync_shards(set<int>& shard_ids)
   }
 }
 
-void RGWRados::wakeup_data_sync_shards(const DoutPrefixProvider *dpp, const rgw_zone_id& source_zone, map<int, set<string> >& shard_ids)
+void RGWRados::wakeup_data_sync_shards(const DoutPrefixProvider *dpp, const rgw_zone_id& source_zone, bc::flat_map<int, bc::flat_set<rgw_data_notify_entry> >& entries)
 {
-  ldpp_dout(dpp, 20) << __func__ << ": source_zone=" << source_zone << ", shard_ids=" << shard_ids << dendl;
+  ldpp_dout(dpp, 20) << __func__ << ": source_zone=" << source_zone << ", entries=" << entries << dendl;
+  for (bc::flat_map<int, bc::flat_set<rgw_data_notify_entry> >::iterator iter = entries.begin(); iter != entries.end(); ++iter) {
+    ldpp_dout(dpp, 20) << __func__ << "(): updated shard=" << iter->first << dendl;
+    bc::flat_set<rgw_data_notify_entry>& entries = iter->second;
+    for (const auto& [key, gen] : entries) {
+      ldpp_dout(dpp, 20) << __func__ << ": source_zone=" << source_zone << ", key=" << key
+                        << ", gen=" << gen << dendl;
+    }
+  }
+
   std::lock_guard l{data_sync_thread_lock};
   auto iter = data_sync_processor_threads.find(source_zone);
   if (iter == data_sync_processor_threads.end()) {
@@ -657,7 +670,7 @@ void RGWRados::wakeup_data_sync_shards(const DoutPrefixProvider *dpp, const rgw_
 
   RGWDataSyncProcessorThread *thread = iter->second;
   ceph_assert(thread);
-  thread->wakeup_sync_shards(shard_ids);
+  thread->wakeup_sync_shards(entries);
 }
 
 RGWMetaSyncStatusManager* RGWRados::get_meta_sync_manager()
index 532ea398128be939b6c0e65bb2c37d7b34dcb7a5..73d0203e543fd4f3412b9f65ee0546dd724f4397 100644 (file)
@@ -6,6 +6,7 @@
 
 #include <functional>
 #include <boost/container/flat_map.hpp>
+#include <boost/container/flat_set.hpp>
 
 #include "include/rados/librados.hpp"
 #include "include/Context.h"
@@ -1234,9 +1235,8 @@ public:
   int delete_bucket(RGWBucketInfo& bucket_info, RGWObjVersionTracker& objv_tracker, optional_yield y, const DoutPrefixProvider *dpp, bool check_empty = true);
 
   void wakeup_meta_sync_shards(std::set<int>& shard_ids);
-  void wakeup_data_sync_shards(const DoutPrefixProvider *dpp,
-                              const rgw_zone_id& source_zone,
-                              std::map<int, std::set<std::string> >& shard_ids);
+
+  void wakeup_data_sync_shards(const DoutPrefixProvider *dpp, const rgw_zone_id& source_zone, bc::flat_map<int, bc::flat_set<rgw_data_notify_entry> >& entries);
 
   RGWMetaSyncStatusManager* get_meta_sync_manager();
   RGWDataSyncStatusManager* get_data_sync_manager(const rgw_zone_id& source_zone);
index 1554a3a8243efca778895b01abe2f519311f33c2..c7f672f8724ece7e5b55a6530d12c7ea353abb7e 100644 (file)
@@ -25,6 +25,7 @@
 #include "rgw_common.h"
 #include "rgw_zone.h"
 #include "rgw_mdlog.h"
+#include "rgw_datalog_notify.h"
 
 #include "services/svc_zone.h"
 #include "services/svc_mdlog.h"
@@ -761,7 +762,56 @@ void RGWOp_DATALog_Notify::execute(optional_yield y) {
     return;
   }
 
-  map<int, set<string> > updated_shards;
+  bc::flat_map<int, bc::flat_set<rgw_data_notify_entry>> updated_shards;
+  try {
+    auto decoder = rgw_data_notify_v1_decoder{updated_shards};
+    decode_json_obj(decoder, &p);
+  } catch (JSONDecoder::err& err) {
+    ldpp_dout(this, 0) << "ERROR: failed to decode JSON" << dendl;
+    op_ret = -EINVAL;
+    return;
+  }
+
+  if (store->ctx()->_conf->subsys.should_gather<ceph_subsys_rgw, 20>()) {
+    for (bc::flat_map<int, bc::flat_set<rgw_data_notify_entry> >::iterator iter = updated_shards.begin(); iter != updated_shards.end(); ++iter) {
+      ldpp_dout(this, 20) << __func__ << "(): updated shard=" << iter->first << dendl;
+      bc::flat_set<rgw_data_notify_entry>& entries = iter->second;
+      for (const auto& [key, gen] : entries) {
+        ldpp_dout(this, 20) << __func__ << "(): modified key=" << key
+        << " of gen=" << gen << dendl;
+      }
+    }
+  }
+
+  store->wakeup_data_sync_shards(this, source_zone, updated_shards);
+
+  op_ret = 0;
+}
+
+void RGWOp_DATALog_Notify2::execute(optional_yield y) {
+  string  source_zone = s->info.args.get("source-zone");
+#define LARGE_ENOUGH_BUF (128 * 1024)
+
+  int r = 0;
+  bufferlist data;
+  std::tie(r, data) = rgw_rest_read_all_input(s, LARGE_ENOUGH_BUF);
+  if (r < 0) {
+    op_ret = r;
+    return;
+  }
+
+  char* buf = data.c_str();
+  ldout(s->cct, 20) << __func__ << "(): read data: " << buf << dendl;
+
+  JSONParser p;
+  r = p.parse(buf, data.length());
+  if (r < 0) {
+    ldout(s->cct, 0) << "ERROR: failed to parse JSON" << dendl;
+    op_ret = r;
+    return;
+  }
+
+  bc::flat_map<int, bc::flat_set<rgw_data_notify_entry> > updated_shards;
   try {
     decode_json_obj(updated_shards, &p);
   } catch (JSONDecoder::err& err) {
@@ -771,11 +821,13 @@ void RGWOp_DATALog_Notify::execute(optional_yield y) {
   }
 
   if (store->ctx()->_conf->subsys.should_gather<ceph_subsys_rgw, 20>()) {
-    for (map<int, set<string> >::iterator iter = updated_shards.begin(); iter != updated_shards.end(); ++iter) {
+    for (bc::flat_map<int, bc::flat_set<rgw_data_notify_entry> >::iterator iter =
+        updated_shards.begin(); iter != updated_shards.end(); ++iter) {
       ldpp_dout(this, 20) << __func__ << "(): updated shard=" << iter->first << dendl;
-      set<string>& keys = iter->second;
-      for (set<string>::iterator kiter = keys.begin(); kiter != keys.end(); ++kiter) {
-      ldpp_dout(this, 20) << __func__ << "(): modified key=" << *kiter << dendl;
+      bc::flat_set<rgw_data_notify_entry>& entries = iter->second;
+      for (const auto& [key, gen] : entries) {
+        ldpp_dout(this, 20) << __func__ << "(): modified key=" << key <<
+        " of generation=" << gen << dendl;
       }
     }
   }
@@ -1145,8 +1197,11 @@ RGWOp *RGWHandler_Log::op_post() {
     else if (s->info.args.exists("notify"))
       return new RGWOp_MDLog_Notify;
   } else if (type.compare("data") == 0) {
-    if (s->info.args.exists("notify"))
+    if (s->info.args.exists("notify")) {
       return new RGWOp_DATALog_Notify;
+    } else if (s->info.args.exists("notify2")) {
+      return new RGWOp_DATALog_Notify2;
+    }
   }
   return NULL;
 }
index 577d799ba56bb33d9a8f76baba6d575a0faf7477..f01fcf857e93fef6f9aaec9e9c80bf4aa6244b82 100644 (file)
@@ -277,6 +277,21 @@ public:
   RGWOpType get_type() override { return RGW_OP_SYNC_DATALOG_NOTIFY; }
 };
 
+class RGWOp_DATALog_Notify2 : public RGWRESTOp {
+  rgw_data_notify_entry data_notify;
+public:
+  RGWOp_DATALog_Notify2() {}
+  ~RGWOp_DATALog_Notify2() override {}
+
+  int check_caps(const RGWUserCaps& caps) override {
+    return caps.check_cap("datalog", RGW_CAP_WRITE);
+  }
+  void execute(optional_yield y) override;
+  const char* name() const override {
+    return "datalog_notify2";
+  }
+};
+
 class RGWOp_DATALog_Delete : public RGWRESTOp {
 public:
   RGWOp_DATALog_Delete() {}
index 472745cd7bf8153211f30fcbc1235a87274079d2..e7421b8cdc869e53a8031944505fbd3223e2b458 100644 (file)
@@ -17,6 +17,7 @@
 
 #include "rgw_user.h"
 #include "rgw_notify_event_type.h"
+#include "rgw_datalog_notify.h"
 
 class RGWGetDataCB;
 struct RGWObjState;
@@ -25,7 +26,7 @@ class RGWLC;
 class RGWObjManifest;
 struct RGWZoneGroup;
 struct RGWZoneParams;
-struct RGWRealm;
+class RGWRealm;
 struct RGWCtl;
 struct rgw_user_bucket;
 class RGWUsageBatch;
@@ -226,7 +227,11 @@ class Store {
                                        optional_yield y) = 0;
     virtual RGWDataSyncStatusManager* get_data_sync_manager(const rgw_zone_id& source_zone) = 0;
     virtual void wakeup_meta_sync_shards(std::set<int>& shard_ids) = 0;
-    virtual void wakeup_data_sync_shards(const DoutPrefixProvider *dpp, const rgw_zone_id& source_zone, std::map<int, std::set<std::string> >& shard_ids) = 0;
+    virtual void wakeup_data_sync_shards(const DoutPrefixProvider *dpp,
+                                        const rgw_zone_id& source_zone,
+                                        boost::container::flat_map<
+                                          int,
+                                          boost::container::flat_set<rgw_data_notify_entry>>& shard_ids) = 0;
     virtual int clear_usage(const DoutPrefixProvider *dpp) = 0;
     virtual int read_all_usage(const DoutPrefixProvider *dpp, uint64_t start_epoch, uint64_t end_epoch,
                               uint32_t max_entries, bool* is_truncated,
index 69ad2a3f61c5ed8365e26d190988b860b9a8c5a3..8d01546ff7f80e4351cac3b69da50f1034e8726f 100644 (file)
@@ -319,8 +319,12 @@ namespace rgw { namespace sal {
           RGWBucketSyncPolicyHandlerRef *phandler,
           optional_yield y) override;
       virtual RGWDataSyncStatusManager* get_data_sync_manager(const rgw_zone_id& source_zone) override;
-      virtual void wakeup_meta_sync_shards(set<int>& shard_ids) override { return; }
-      virtual void wakeup_data_sync_shards(const DoutPrefixProvider *dpp, const rgw_zone_id& source_zone, map<int, set<string> >& shard_ids) override { return; }
+      virtual void wakeup_meta_sync_shards(std::set<int>& shard_ids) override { return; }
+      virtual void wakeup_data_sync_shards(const DoutPrefixProvider *dpp,
+                                          const rgw_zone_id& source_zone,
+                                          boost::container::flat_map<
+                                            int,
+                                          boost::container::flat_set<rgw_data_notify_entry>>& shard_ids) override { return; }
       virtual int clear_usage(const DoutPrefixProvider *dpp) override { return 0; }
       virtual int read_all_usage(const DoutPrefixProvider *dpp, uint64_t start_epoch, uint64_t end_epoch,
           uint32_t max_entries, bool *is_truncated,
index 0731d2add38260c9a3adf4cd78bd0a61fec26424..bf8185b05a9ffb8a79168bceab6394819d14e759 100644 (file)
@@ -414,7 +414,11 @@ class RadosStore : public Store {
                                        optional_yield y) override;
     virtual RGWDataSyncStatusManager* get_data_sync_manager(const rgw_zone_id& source_zone) override;
     virtual void wakeup_meta_sync_shards(std::set<int>& shard_ids) override { rados->wakeup_meta_sync_shards(shard_ids); }
-    virtual void wakeup_data_sync_shards(const DoutPrefixProvider *dpp, const rgw_zone_id& source_zone, std::map<int, std::set<std::string> >& shard_ids) override { rados->wakeup_data_sync_shards(dpp, source_zone, shard_ids); }
+    virtual void wakeup_data_sync_shards(const DoutPrefixProvider *dpp,
+                                         const rgw_zone_id& source_zone,
+                                         bc::flat_map<int, bc::flat_set<rgw_data_notify_entry> >& shard_ids) override {
+      rados->wakeup_data_sync_shards(dpp, source_zone, shard_ids);
+    }
     virtual int clear_usage(const DoutPrefixProvider *dpp) override { return rados->clear_usage(dpp); }
     virtual int read_all_usage(const DoutPrefixProvider *dpp, uint64_t start_epoch, uint64_t end_epoch,
                               uint32_t max_entries, bool* is_truncated,
index 75f552de93134ff0e97bd9de605a3557140eaa91..44305b60b6b21f19e5aebfd161e9096cafac314c 100644 (file)
@@ -102,7 +102,7 @@ int write(librados::ObjectWriteOperation& op,
 {
   // overwrite the existing timestamp if value is greater
   const uint64_t value = timestamp.time_since_epoch().count();
-  using namespace cls::cmpomap;
+  using namespace ::cls::cmpomap;
   const bufferlist zero = u64_buffer(0); // compare against 0 for missing keys
   return cmp_set_vals(op, Mode::U64, Op::GT, {{key, u64_buffer(value)}}, zero);
 }
@@ -113,7 +113,7 @@ int remove(librados::ObjectWriteOperation& op,
 {
   // remove the omap key if value >= existing
   const uint64_t value = timestamp.time_since_epoch().count();
-  using namespace cls::cmpomap;
+  using namespace ::cls::cmpomap;
   return cmp_rm_keys(op, Mode::U64, Op::GTE, {{key, u64_buffer(value)}});
 }