git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
compiles now [wip-rgw-sip-new-3]
author     Yehuda Sadeh <yehuda@redhat.com>
           Mon, 20 Feb 2023 20:59:01 +0000 (15:59 -0500)
committer  Yehuda Sadeh <yehuda@redhat.com>
           Mon, 20 Feb 2023 20:59:01 +0000 (15:59 -0500)
src/rgw/driver/rados/rgw_data_sync.cc
src/rgw/driver/rados/rgw_sync.h
src/rgw/driver/rados/rgw_trim_bilog.cc
src/rgw/driver/rados/rgw_trim_mdlog.cc
src/rgw/rgw_admin.cc

diff --git a/src/rgw/driver/rados/rgw_data_sync.cc b/src/rgw/driver/rados/rgw_data_sync.cc
index 540abcbb4a1a5f72e5197f27fa44963289b9275f..c52582c3772fa60d13c41844b99f40adad0c3899 100644
@@ -867,7 +867,7 @@ int RGWRemoteDataLog::read_sync_status(const DoutPrefixProvider *dpp, rgw_data_s
   int ret = local_call(dpp, [&](RGWCoroutinesManager& crs, RGWDataSyncCtx& sc_local) {
 
     return crs.run(dpp, new RGWReadDataSyncStatusCoroutine(&sc_local, sync_status,
-                                                           &objv, &shard_objvs));
+                                                           &objv, shard_objvs));
   });
 
   return ret;
@@ -944,7 +944,7 @@ public:
       }
       tn->log(5, "acquired data sync status lease");
       objv_tracker.generate_new_write_ver(sc->cct);
-      yield call(new RGWInitDataSyncStatusCoroutine(sc, num_shards, instance_id,
+      yield call(new RGWInitDataSyncStatusCoroutine(sc, instance_id,
                                                    tn, sync_status, lease_cr,
                                                    objv_tracker, objvs));
       lease_cr->go_down();
@@ -2238,7 +2238,10 @@ public:
   }
 };
 
-RGWCoroutine* data_sync_single_entry(RGWDataSyncCtx *sc, const rgw_bucket_shard& src,
+RGWCoroutine* data_sync_single_entry(RGWDataSyncCtx *sc,
+                               std::shared_ptr<RGWDataSyncInfoCRHandler>& sip,
+                               int shard_id,
+                               const rgw_bucket_shard& src,
                                 std::optional<uint64_t> gen,
                                 const std::string marker,
                                 ceph::real_time timestamp,
@@ -2250,7 +2253,7 @@ RGWCoroutine* data_sync_single_entry(RGWDataSyncCtx *sc, const rgw_bucket_shard&
                                 bool retry) {
   auto state = bucket_shard_cache->get(src, gen);
   auto obligation = rgw_data_sync_obligation{src, gen, marker, timestamp, retry};
-  return new RGWDataSyncSingleEntryCR(sc, std::move(state), std::move(obligation),
+  return new RGWDataSyncSingleEntryCR(sc, sip, shard_id, std::move(state), std::move(obligation),
                                       &*marker_tracker, error_repo,
                                       lease_cr.get(), tn);
 }
@@ -2287,6 +2290,8 @@ class RGWDataFullSyncSingleEntryCR : public RGWCoroutine {
   bool first_shard = true;
   bool error_inject;
 
+  std::shared_ptr<RGWDataSyncInfoCRHandler> no_sip; /* nullptr */
+
 public:
   RGWDataFullSyncSingleEntryCR(RGWDataSyncCtx *_sc, const rgw_pool& _pool, const rgw_bucket_shard& _source_bs,
                       const std::string& _key, const rgw_data_sync_status& sync_status, const rgw_raw_obj& _error_repo,
@@ -2341,7 +2346,7 @@ public:
                 rgw::error_repo::encode_key(source_bs, each->gen),
                timestamp), sc->lcc.adj_concurrency(cct->_conf->rgw_data_sync_spawn_window), std::nullopt);
           } else {
-          shard_cr = data_sync_single_entry(sc, source_bs, each->gen, key, timestamp,
+         shard_cr = data_sync_single_entry(sc, no_sip, 0, source_bs, each->gen, key, timestamp,
                       lease_cr, bucket_shard_cache, nullptr, error_repo, tn, false);
           tn->log(10, SSTR("full sync: syncing shard_id " << sid << " of gen " << each->gen));
           if (first_shard) {
@@ -2470,20 +2475,20 @@ public:
           retcode = parse_bucket_key(iter->first, source_bs);
           if (retcode < 0) {
             tn->log(1, SSTR("failed to parse bucket shard: " << iter->first));
-            marker_tracker->try_update_high_marker(iter->first, std::nullopt, 0,
+            marker_tracker->try_update_high_marker(iter->first, 0,
                                                   entry_timestamp);
             continue;
           }
           tn->log(20, SSTR("full sync: " << iter->first));
           total_entries++;
-          if (!marker_tracker->start(iter->first, std::nullopt, total_entries,
+          if (!marker_tracker->start(iter->first, total_entries,
                                     entry_timestamp)) {
             tn->log(0, SSTR("ERROR: cannot start syncing " << iter->first
                            << ". Duplicate entry?"));
           } else {
             tn->log(10, SSTR("timestamp for " << iter->first << " is :" << entry_timestamp));
             yield_spawn_window(new RGWDataFullSyncSingleEntryCR(
-                                no_sip, source_bs, iter->first, sync_status,
+                                sc, pool, source_bs, iter->first, sync_status,
                                 error_repo, entry_timestamp, lease_cr,
                                 bucket_shard_cache, &*marker_tracker, tn),
                               sc->lcc.adj_concurrency(cct->_conf->rgw_data_sync_spawn_window),
@@ -2616,7 +2621,8 @@ public:
           }
           tn->log(20, SSTR("received async update notification: "
                           << modified_iter->key));
-          spawn(data_sync_single_entry(sc, source_bs, modified_iter->gen, {},
+          spawn(data_sync_single_entry(sc, sc->dsi.inc, shard_id,
+                                      source_bs, modified_iter->gen, {},
                                       ceph::real_time{}, lease_cr,
                                       bucket_shard_cache, &*marker_tracker,
                                       error_repo, tn, false), false);
@@ -2666,7 +2672,8 @@ public:
               tn->log(20, SSTR("handle error entry key="
                               << to_string(source_bs, gen)
                               << " timestamp=" << entry_timestamp));
-              spawn(data_sync_single_entry(sc, source_bs, gen, "",
+              spawn(data_sync_single_entry(sc, sc->dsi.inc, shard_id,
+                                          source_bs, gen, "",
                                           entry_timestamp, lease_cr,
                                           bucket_shard_cache, &*marker_tracker,
                                           error_repo, tn, true), false);
@@ -2725,7 +2732,7 @@ public:
                            << ". Duplicate entry?"));
           } else {
             tn->log(1, SSTR("incremental sync on " << log_iter->entry.key  << "shard: " << shard_id << "on gen " << log_iter->entry.gen));
-            yield_spawn_window(data_sync_single_entry(sc, source_bs, log_iter->entry.gen, log_iter->log_id,
+            yield_spawn_window(data_sync_single_entry(sc, sc->dsi.inc, shard_id, source_bs, log_iter->entry.gen, log_iter->log_id,
                                                  log_iter->log_timestamp, lease_cr,bucket_shard_cache,
                                                  &*marker_tracker, error_repo, tn, false),
                                sc->lcc.adj_concurrency(cct->_conf->rgw_data_sync_spawn_window),
@@ -3838,7 +3845,7 @@ int RGWDataSyncStatusManager::init(const DoutPrefixProvider *dpp)
     sync_module = driver->get_sync_module();
   }
 
-  auto opt_conns = store->ctl()->remote->zone_conns(source_zone);
+  auto opt_conns = driver->ctl()->remote->zone_conns(source_zone);
   if (!opt_conns) {
     ldpp_dout(this, 0) << "connection object to zone " << source_zone << " does not exist" << dendl;
     return -EINVAL;
@@ -4759,7 +4766,7 @@ public:
   {}
 
 
-  RGWCoroutine *store_marker(const rgw_obj_key& new_marker, const rgw_obj_key& key,
+  RGWCoroutine *store_marker(const rgw_obj_key& new_marker,
                              uint64_t index_pos, const real_time& timestamp) override {
     sync_status.full.position = new_marker;
     sync_status.full.count = index_pos;
@@ -4861,7 +4868,7 @@ public:
 
   const rgw_raw_obj& get_obj() const { return obj; }
 
-  RGWCoroutine* store_marker(const string& new_marker, const rgw_obj_key& key,
+  RGWCoroutine* store_marker(const string& new_marker,
                              uint64_t index_pos, const real_time& timestamp) override {
     sync_marker.position = new_marker;
     sync_marker.timestamp = timestamp;
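
Note: the data_sync_single_entry() change above threads a sync-info-provider (SIP) handler and a shard id through to RGWDataSyncSingleEntryCR. A consolidated sketch of the two call patterns, taken from the hunks above (nothing here beyond what the diff already shows; no_sip is the null handler member added to RGWDataFullSyncSingleEntryCR):

    // full sync: no SIP handler yet, so pass the null handler and shard 0
    std::shared_ptr<RGWDataSyncInfoCRHandler> no_sip; /* nullptr */
    shard_cr = data_sync_single_entry(sc, no_sip, 0, source_bs, each->gen,
                                      key, timestamp, lease_cr,
                                      bucket_shard_cache, nullptr,
                                      error_repo, tn, false);

    // incremental sync: pass the datalog's inc SIP handler and the real shard id
    spawn(data_sync_single_entry(sc, sc->dsi.inc, shard_id, source_bs, gen,
                                 "", entry_timestamp, lease_cr,
                                 bucket_shard_cache, &*marker_tracker,
                                 error_repo, tn, true), false);
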
diff --git a/src/rgw/driver/rados/rgw_sync.h b/src/rgw/driver/rados/rgw_sync.h
index 02e542c0d3655f640cab6f923bb8360d3ff3ca7f..7cf78bd6e5e6e499f964178c5f2af2bd9b1af03b 100644
@@ -341,18 +341,15 @@ class RGWSyncShardMarkerTrack {
   struct marker_entry {
     uint64_t tracker_pos;
     T marker;
-    K key;
     uint64_t pos;
     real_time timestamp;
 
     marker_entry() : pos(0) {}
     marker_entry(uint64_t _tracker_pos,
                  const T& _marker,
-                 std::optional<K> _key,
                  uint64_t _p, const
                  real_time& _ts) : tracker_pos(_tracker_pos),
                                    marker(_marker),
-                                   key(_key.value_or(K())),
                                    pos(_p),
                                    timestamp(_ts) {}
   };
@@ -374,7 +371,7 @@ class RGWSyncShardMarkerTrack {
 protected:
   typename std::set<K> need_retry_set;
 
-  virtual RGWCoroutine *store_marker(const DoutPrefixProvider *dpp, const T& new_marker, const K& key, uint64_t index_pos, const real_time& timestamp) = 0;
+  virtual RGWCoroutine *store_marker(const T& new_marker, uint64_t index_pos, const real_time& timestamp) = 0;
   virtual RGWOrderCallCR *allocate_order_control_cr() = 0;
   virtual void handle_finish(const T& marker) { }
 
@@ -388,22 +385,22 @@ public:
     }
   }
 
-  bool start(const T& marker, std::optional<K> key, int index_pos, const real_time& timestamp) {
+  bool start(const T& marker, int index_pos, const real_time& timestamp) {
     if (markers.find(marker) != markers.end()) {
       return false;
     }
     auto i = ++max_keys;
-    pending.push_back(marker_entry(i, marker, key, index_pos, timestamp));
+    pending.push_back(marker_entry(i, marker, index_pos, timestamp));
     markers[marker] = std::prev(pending.end());
     return true;
   }
 
-  void try_update_high_marker(const T& marker, std::optional<K> key, int index_pos, const real_time& timestamp) {
+  void try_update_high_marker(const T& marker, int index_pos, const real_time& timestamp) {
     auto i = ++max_keys;
-    finish_markers[i] = marker_entry(i, marker, key, index_pos, timestamp);
+    finish_markers[i] = marker_entry(i, marker, index_pos, timestamp);
   }
 
-  RGWCoroutine *finish(const DoutPrefixProvider *dpp, const T& marker) {
+  RGWCoroutine *finish(const T& marker) {
     if (pending.empty()) {
       /* can happen, due to a bug that ended up with multiple objects with the same name and version
       * -- which can happen when versioning is enabled and the version is 'null'.
@@ -437,12 +434,12 @@ public:
 
 
     if (is_first && (updates_since_flush >= window_size || pending.empty())) {
-      return flush(dpp);
+      return flush();
     }
     return nullptr;
   }
 
-  RGWCoroutine *flush(const DoutPrefixProvider *dpp) {
+  RGWCoroutine *flush() {
     if (finish_markers.empty()) {
       return NULL;
     }
@@ -463,7 +460,7 @@ public:
     --i;
     high_entry = i->second;
     high_entry_exists = true;
-    RGWCoroutine *cr = order(store_marker(dpp, high_entry.marker, high_entry.key, high_entry.pos, high_entry.timestamp));
+    RGWCoroutine *cr = order(store_marker(high_entry.marker, high_entry.pos, high_entry.timestamp));
     finish_markers.erase(finish_markers.begin(), last);
     return cr;
   }
@@ -563,7 +560,7 @@ class RGWShardCollectCR : public RGWCoroutine {
     : RGWCoroutine(_cct), max_concurrent(_max_concurrent)
   {}
 
-  virtual bool spawn_next(const DoutPrefixProvider *dpp) = 0;
+  virtual bool spawn_next() = 0;
   int operate(const DoutPrefixProvider *dpp) override;
 };
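
The rgw_sync.h hunks above drop both the per-entry key (K) and the DoutPrefixProvider from RGWSyncShardMarkerTrack. A minimal sketch of a tracker under the new interface, assuming the class's usual window-size constructor; the class name ExampleMarkerTrack is hypothetical, the store_marker() body mirrors the overrides shown in rgw_data_sync.cc above, and the actual write coroutine is elided:

    // hypothetical subclass; T = std::string marker, K = std::string key
    class ExampleMarkerTrack : public RGWSyncShardMarkerTrack<std::string, std::string> {
      CephContext *cct;
      rgw_data_sync_marker& sync_marker;
    public:
      ExampleMarkerTrack(CephContext *cct, rgw_data_sync_marker& m, int window_size)
        : RGWSyncShardMarkerTrack<std::string, std::string>(window_size),
          cct(cct), sync_marker(m) {}

      // new signature: no per-entry key, no DoutPrefixProvider
      RGWCoroutine *store_marker(const std::string& new_marker,
                                 uint64_t index_pos,
                                 const ceph::real_time& timestamp) override {
        sync_marker.position = new_marker;
        sync_marker.timestamp = timestamp;
        return nullptr; // a real tracker returns its write coroutine here
      }

      RGWOrderCallCR *allocate_order_control_cr() override {
        return new RGWLastCallerWinsCR(cct); // as existing trackers do
      }
    };

Call sites shrink to match: start(marker, index_pos, timestamp), try_update_high_marker(marker, index_pos, timestamp), finish(marker), and flush() no longer carry a key or a dpp.
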
 
diff --git a/src/rgw/driver/rados/rgw_trim_bilog.cc b/src/rgw/driver/rados/rgw_trim_bilog.cc
index ff6f6f1e7657a35d2f959f9983918019655f0a5b..44ff62ee9cf9a43e8092cf0e4bc2e021d407ac4e 100644
@@ -413,10 +413,10 @@ class BucketTrimShardCollectCR : public RGWShardCollectCR {
       generation(generation), markers(markers),
       sip_mgr(_sip_mgr)
   {}
-  bool spawn_next(const DoutPrefixProvider *dpp) override;
+  bool spawn_next() override;
 };
 
-bool BucketTrimShardCollectCR::spawn_next(const DoutPrefixProvider *dpp)
+bool BucketTrimShardCollectCR::spawn_next()
 {
   while (i < markers.size()) {
     const auto& opt_marker = markers[i];
@@ -472,7 +472,7 @@ class BucketCleanIndexCollectCR : public RGWShardCollectCR {
       dpp(dpp), store(store), bucket_info(bucket_info),
       index(index)
   {}
-  bool spawn_next(const DoutPrefixProvider *dpp) override {
+  bool spawn_next() override {
     if (shard < num_shards) {
       RGWRados::BucketShard bs(store->getRados());
       bs.init(dpp, bucket_info, index, shard);
@@ -926,10 +926,10 @@ class BucketTrimInstanceCollectCR : public RGWShardCollectCR {
       bucket(buckets.begin()), end(buckets.end()),
       dpp(dpp)
   {}
-  bool spawn_next(const DoutPrefixProvider *dpp) override;
+  bool spawn_next() override;
 };
 
-bool BucketTrimInstanceCollectCR::spawn_next(const DoutPrefixProvider *dpp)
+bool BucketTrimInstanceCollectCR::spawn_next()
 {
   if (bucket == end) {
     return false;
diff --git a/src/rgw/driver/rados/rgw_trim_mdlog.cc b/src/rgw/driver/rados/rgw_trim_mdlog.cc
index ed47a7b5512b844ac9f3446d1cd41dab3b8a6d54..d8e19594aeaa2e3535ab1a1eed699dedc04245ad 100644
@@ -46,7 +46,7 @@ class PurgeLogShardsCR : public RGWShardCollectCR {
       store(store), mdlog(mdlog), num_shards(num_shards), obj(pool, "")
   {}
 
-  bool spawn_next(const DoutPrefixProvider *dpp) override {
+  bool spawn_next() override {
     if (i == num_shards) {
       return false;
     }
@@ -289,10 +289,10 @@ class MetaMasterTrimShardCollectCR : public RGWShardCollectCR {
       env(env), mdlog(mdlog), sync_status(sync_status)
   {}
 
-  bool spawn_next(const DoutPrefixProvider *dpp) override;
+  bool spawn_next() override;
 };
 
-bool MetaMasterTrimShardCollectCR::spawn_next(const DoutPrefixProvider *dpp)
+bool MetaMasterTrimShardCollectCR::spawn_next()
 {
   while (shard_id < env.num_shards) {
     auto m = sync_status.sync_markers.find(shard_id);
@@ -350,7 +350,7 @@ class MetaMasterStatusCollectCR : public RGWShardCollectCR {
       env(env), c(env.connections.begin()), s(env.peer_status.begin())
   {}
 
-  bool spawn_next(const DoutPrefixProvider *dpp) override {
+  bool spawn_next() override {
     if (c == env.connections.end()) {
       return false;
     }
@@ -569,10 +569,10 @@ class MetaPeerTrimShardCollectCR : public RGWShardCollectCR {
                   env.store->getRados()->get_sync_tracer());
   }
 
-  bool spawn_next(const DoutPrefixProvider *dpp) override;
+  bool spawn_next() override;
 };
 
-bool MetaPeerTrimShardCollectCR::spawn_next(const DoutPrefixProvider *dpp)
+bool MetaPeerTrimShardCollectCR::spawn_next()
 {
   if (shard_id >= env.num_shards) {
     return false;
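
Both trim files apply the same mechanical change: RGWShardCollectCR::spawn_next() loses its DoutPrefixProvider parameter, so subclasses that still need one capture it at construction time, as BucketCleanIndexCollectCR in rgw_trim_bilog.cc already does. A hedged sketch of the pattern; the class and the per-shard coroutine named in the comment are illustrative, not part of this commit:

    class ExampleShardCollectCR : public RGWShardCollectCR {
      const DoutPrefixProvider *dpp; // captured; no longer passed per call
      const int num_shards;
      int shard = 0;
    public:
      ExampleShardCollectCR(CephContext *cct, const DoutPrefixProvider *dpp,
                            int num_shards, int max_concurrent)
        : RGWShardCollectCR(cct, max_concurrent),
          dpp(dpp), num_shards(num_shards) {}

      bool spawn_next() override {
        if (shard >= num_shards) {
          return false; // no more work to spawn
        }
        // spawn the per-shard coroutine using the captured dpp, e.g.:
        //   spawn(new ExampleTrimShardCR(dpp, shard, ...), false);
        ++shard;
        return true;
      }
    };
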
diff --git a/src/rgw/rgw_admin.cc b/src/rgw/rgw_admin.cc
index 0f4d766eabf3dbe1c5f4bf1fe8d54482443bb2af..885600145c52cfbeee43ab3267a02d9db2fc2aa7 100644
@@ -2864,7 +2864,7 @@ static int bucket_sync_info(rgw::sal::Driver* driver, const RGWBucketInfo& info,
 
 static int bucket_sync_status(rgw::sal::Driver* driver, const RGWBucketInfo& info,
                               const rgw_zone_id& source_zone_id,
-                             std::optional<rgw_bucket>& opt_source_bucket,
+                              std::optional<rgw_bucket>& opt_source_bucket,
                               std::ostream& out)
 {
   const rgw::sal::ZoneGroup& zonegroup = driver->get_zone()->get_zonegroup();
@@ -2874,10 +2874,85 @@ static int bucket_sync_status(rgw::sal::Driver* driver, const RGWBucketInfo& inf
   out << indented{width, "realm"} << zone->get_realm_id() << " (" << zone->get_realm_name() << ")\n";
   out << indented{width, "zonegroup"} << zonegroup.get_id() << " (" << zonegroup.get_name() << ")\n";
   out << indented{width, "zone"} << zone->get_id() << " (" << zone->get_name() << ")\n";
-  out << indented{width, "bucket"} << info.bucket << "\n";
-  out << indented{width, "current time"}
-    << to_iso_8601(ceph::real_clock::now(), iso_8601_format::YMDhms) << "\n\n";
+  out << indented{width, "bucket"} << info.bucket << "\n\n";
+
+  if (!static_cast<rgw::sal::RadosStore*>(driver)->ctl()->bucket->bucket_imports_data(info.bucket, null_yield, dpp())) {
+    out << "Sync is disabled for bucket " << info.bucket.name << " or bucket has no sync sources" << std::endl;
+    return 0;
+  }
+
+  RGWBucketSyncPolicyHandlerRef handler;
+
+  int r = driver->get_sync_policy_handler(dpp(), std::nullopt, info.bucket, &handler, null_yield);
+  if (r < 0) {
+    ldpp_dout(dpp(), -1) << "ERROR: failed to get policy handler for bucket (" << info.bucket << "): r=" << r << ": " << cpp_strerror(-r) << dendl;
+    return r;
+  }
 
+  auto sources = handler->get_all_sources();
+    
+  set<rgw_zone_id> zone_ids;
+
+  const auto& rzg =
+    static_cast<const rgw::sal::RadosZoneGroup&>(zonegroup).get_group();
+
+  if (!source_zone_id.empty()) {
+    auto z = rzg.combined_zones.find(source_zone_id);
+    if (z == rzg.combined_zones.end()) {
+      ldpp_dout(dpp(), -1) << "Source zone not found in zonegroup "
+          << zonegroup.get_name() << dendl;
+      return -EINVAL;
+    }
+    auto c = static_cast<rgw::sal::RadosStore*>(driver)->ctl()->remote->zone_conns(source_zone_id);
+    if (!c) {
+      lderr(driver->ctx()) << "No connection to zone " << z->second << dendl;
+      return -EINVAL; 
+    }
+    zone_ids.insert(source_zone_id);
+  } else {
+    for (const auto& entry : rzg.combined_zones) {
+      zone_ids.insert(entry.first);
+    }
+  } 
+
+  for (auto& zone_id : zone_ids) {
+    auto z = rzg.combined_zones.find(zone_id.id);
+    if (z == rzg.combined_zones.end()) { /* shouldn't happen */
+      continue;
+    }
+  
+    for (auto& entry : sources) {
+      auto& pipe = entry.second;
+      if (opt_source_bucket &&
+          pipe.source.bucket != opt_source_bucket) {
+        continue;
+      }
+      if (pipe.source.zone.value_or(rgw_zone_id()) == z->first) {
+        bucket_source_sync_status(dpp(), static_cast<rgw::sal::RadosStore*>(driver), static_cast<rgw::sal::RadosStore*>(driver)->svc()->zone->get_zone(),
+                                  z->first, z->second,
+                                  info, pipe,
+                                  width, out);
+      }
+    }
+  }
+
+  return 0;
+}
+
+#if 0
+static int bucket_sync_status(rgw::sal::Driver* driver, const RGWBucketInfo& info,
+                              const rgw_zone_id& source_zone_id,
+                              std::optional<rgw_bucket>& opt_source_bucket,
+                              std::ostream& out)
+{
+  const rgw::sal::ZoneGroup& zonegroup = driver->get_zone()->get_zonegroup();
+  rgw::sal::Zone* zone = driver->get_zone();
+  constexpr int width = 15;
+
+  out << indented{width, "realm"} << zone->get_realm_id() << " (" << zone->get_realm_name() << ")\n";
+  out << indented{width, "zonegroup"} << zonegroup.get_id() << " (" << zonegroup.get_name() << ")\n";
+  out << indented{width, "zone"} << zone->get_id() << " (" << zone->get_name() << ")\n";
+  out << indented{width, "bucket"} << info.bucket << "\n\n";
 
   if (!static_cast<rgw::sal::RadosStore*>(driver)->ctl()->bucket->bucket_imports_data(info.bucket, null_yield, dpp())) {
     out << "Sync is disabled for bucket " << info.bucket.name << " or bucket has no sync sources" << std::endl;
@@ -2894,7 +2969,7 @@ static int bucket_sync_status(rgw::sal::Driver* driver, const RGWBucketInfo& inf
 
   auto sources = handler->get_all_sources();
 
-  auto& zone_conn_map = static_cast<rgw::sal::RadosStore*>(driver)->svc()->zone->get_zone_conn_map();
+  auto& source_zones = static_cast<rgw::sal::RadosStore*>(driver)->svc()->zone->get_data_sync_source_zones();
   set<rgw_zone_id> zone_ids;
 
   if (!source_zone_id.empty()) {
@@ -2902,12 +2977,12 @@ static int bucket_sync_status(rgw::sal::Driver* driver, const RGWBucketInfo& inf
     int ret = driver->get_zone()->get_zonegroup().get_zone_by_id(source_zone_id.id, &zone);
     if (ret < 0) {
       ldpp_dout(dpp(), -1) << "Source zone not found in zonegroup "
-          << zonegroup.get_name() << dendl;
+        << zonegroup.get_name() << dendl;
       return -EINVAL;
     }
-    auto c = zone_conn_map.find(source_zone_id);
-    if (c == zone_conn_map.end()) {
-      ldpp_dout(dpp(), -1) << "No connection to zone " << zone->get_name() << dendl;
+    auto c = static_cast<rgw::sal::RadosStore*>(driver)->ctl()->remote->zone_conns(source_zone_id);
+    if (!c) {
+      lderr(driver->ctx()) << "No connection to zone " << zone->get_name() << dendl;
       return -EINVAL;
     }
     zone_ids.insert(source_zone_id);
@@ -2916,7 +2991,7 @@ static int bucket_sync_status(rgw::sal::Driver* driver, const RGWBucketInfo& inf
     int ret = driver->get_zone()->get_zonegroup().list_zones(ids);
     if (ret == 0) {
       for (const auto& entry : ids) {
-       zone_ids.insert(entry);
+        zone_ids.insert(entry);
       }
     }
   }
@@ -2926,28 +3001,29 @@ static int bucket_sync_status(rgw::sal::Driver* driver, const RGWBucketInfo& inf
    if (z == static_cast<rgw::sal::RadosStore*>(driver)->svc()->zone->get_zonegroup().zones.end()) { /* shouldn't happen */
       continue;
     }
-    auto c = zone_conn_map.find(zone_id.id);
-    if (c == zone_conn_map.end()) { /* should't happen */
+    auto c = static_cast<rgw::sal::RadosStore*>(driver)->ctl()->remote->zone_conns(zone_id.id);
+    if (!c) { /* shouldn't happen */
       continue;
     }
 
     for (auto& entry : sources) {
       auto& pipe = entry.second;
       if (opt_source_bucket &&
-         pipe.source.bucket != opt_source_bucket) {
-       continue;
+          pipe.source.bucket != opt_source_bucket) {
+        continue;
       }
       if (pipe.source.zone.value_or(rgw_zone_id()) == z->second.id) {
-       bucket_source_sync_status(dpp(), static_cast<rgw::sal::RadosStore*>(driver), static_cast<rgw::sal::RadosStore*>(driver)->svc()->zone->get_zone(), z->second,
-                                 c->second,
-                                 info, pipe,
-                                 width, out);
+        bucket_source_sync_status(dpp(), static_cast<rgw::sal::RadosStore*>(driver), static_cast<rgw::sal::RadosStore*>(driver)->svc()->zone->get_zone(), z->second,
+                                  c->second,
+                                  info, pipe,
+                                  width, out);
       }
     }
   }
 
   return 0;
 }
+#endif
 
 static void parse_tier_config_param(const string& s, map<string, string, ltstr_nocase>& out)
 {
@@ -3356,13 +3432,14 @@ public:
   }
 };
 
-void init_realm_param(CephContext *cct, string& var, std::optional<string>& opt_var, const string& conf_name)
-{
+template <class T>
+void init_realm_param(CephContext *cct, string& var, std::optional<T>& opt_var, const string& conf_name)
+{ 
   var = cct->_conf.get_val<string>(conf_name);
   if (!var.empty()) {
     opt_var = var;
   }
-}
+} 
 
 class SIPCRMgr : public RGWCoroutinesManager {
   CephContext *cct;
@@ -3386,9 +3463,9 @@ public:
   SIProviderCRMgr *create(std::optional<rgw_zone_id> zid) {
     if (!zid) {
       return new SIProviderCRMgr_Local(cct,
-                                       static_cast<rgw::sal::RadosStore*>(store)->svc()->sip_marker,
-                                       static_cast<rgw::sal::RadosStore*>(store)->ctl()->si.mgr,
-                                       static_cast<rgw::sal::RadosStore*>(store)->svc()->rados->get_async_processor());
+                                       static_cast<rgw::sal::RadosStore*>(driver)->svc()->sip_marker,
+                                       static_cast<rgw::sal::RadosStore*>(driver)->ctl()->si.mgr,
+                                       static_cast<rgw::sal::RadosStore*>(driver)->svc()->rados->get_async_processor());
     }
 
     auto c = ctl.remote->zone_conns(*zid);
@@ -3543,9 +3620,9 @@ int find_sip_provider(std::optional<string> opt_sip,
   }
 
   if (opt_zone_id) {
-    sip_rest_mgr.emplace(static_cast<rgw::sal::RadosStore*>(store)->ctx(),
-                         static_cast<rgw::sal::RadosStore*>(store)->ctl()->remote,
-                         static_cast<rgw::sal::RadosStore*>(store)->getRados()->get_cr_registry());
+    sip_rest_mgr.emplace(static_cast<rgw::sal::RadosStore*>(driver)->ctx(),
+                         static_cast<rgw::sal::RadosStore*>(driver)->ctl()->remote,
+                         static_cast<rgw::sal::RadosStore*>(driver)->getRados()->get_cr_registry());
     auto sip_type_handler = get_sip_type_handler(opt_sip, opt_sip_data_type);
     if (!sip_type_handler) {
       cerr << "ERROR: unknown sip type: " << (opt_sip ? *opt_sip : *opt_sip_data_type) << std::endl;
@@ -3559,9 +3636,9 @@ int find_sip_provider(std::optional<string> opt_sip,
                                          sip_type_handler));
   } else {
     if (opt_sip) {
-      *provider = static_cast<rgw::sal::RadosStore*>(store)->ctl()->si.mgr->find_sip(dpp(), *opt_sip, opt_sip_instance);
+      *provider = static_cast<rgw::sal::RadosStore*>(driver)->ctl()->si.mgr->find_sip(dpp(), *opt_sip, opt_sip_instance);
     } else {
-      *provider = static_cast<rgw::sal::RadosStore*>(store)->ctl()->si.mgr->find_sip_by_type(dpp(),
+      *provider = static_cast<rgw::sal::RadosStore*>(driver)->ctl()->si.mgr->find_sip_by_type(dpp(),
                                                          *opt_sip_data_type,
                                                          *opt_sip_stage_type,
                                                          opt_sip_instance);
@@ -6710,7 +6787,7 @@ int main(int argc, const char **argv)
         data_access_conf.secret = opt_secret;
 
        RGWZoneGroup zonegroup(zonegroup_id, zonegroup_name);
-       int ret = zonegroup.init(dpp(), g_ceph_context, static_cast<rgw::sal::RadosStore*>(store)->svc()->sysobj, null_yield);
+       int ret = zonegroup.init(dpp(), g_ceph_context, static_cast<rgw::sal::RadosStore*>(driver)->svc()->sysobj, null_yield);
        if (ret < 0) {
          cerr << "failed to init zonegroup: " << cpp_strerror(-ret) << std::endl;
          return -ret;
@@ -8721,8 +8798,8 @@ next:
     rgw_obj source_object(sal_source_bucket->get_key(), *opt_source_object);
     rgw_obj dest_object(sal_dest_bucket->get_key(), *opt_dest_object);
 
-    auto conn  = get_source_conn(store->ctx(),
-                                 static_cast<rgw::sal::RadosStore*>(store)->ctl()->remote,
+    auto conn  = get_source_conn(driver->ctx(),
+                                 static_cast<rgw::sal::RadosStore*>(driver)->ctl()->remote,
                                  opt_source_zone_id,
                                  opt_endpoint,
                                  opt_region,
@@ -8740,13 +8817,13 @@ next:
       }
     }
 
-    rgw::sal::RadosObject sal_source_object(static_cast<rgw::sal::RadosStore*>(store), source_object.key, sal_source_bucket.get());
-    rgw::sal::RadosObject sal_dest_object(static_cast<rgw::sal::RadosStore*>(store), dest_object.key, sal_dest_bucket.get());
+    rgw::sal::RadosObject sal_source_object(static_cast<rgw::sal::RadosStore*>(driver), source_object.key, sal_source_bucket.get());
+    rgw::sal::RadosObject sal_dest_object(static_cast<rgw::sal::RadosStore*>(driver), dest_object.key, sal_dest_bucket.get());
 
     RGWRados::FetchRemoteObjParams params;
 
-    RGWObjectCtx obj_ctx(store);
-    ret = static_cast<rgw::sal::RadosStore*>(store)->getRados()->fetch_remote_obj(dpp(), obj_ctx,
+    RGWObjectCtx obj_ctx(driver);
+    ret = static_cast<rgw::sal::RadosStore*>(driver)->getRados()->fetch_remote_obj(dpp(), obj_ctx,
                                               conn,
                                               false, /* foreign source */
                                               *opt_dest_owner,
@@ -10432,8 +10509,7 @@ next:
     }
 
     auto num_shards = g_conf()->rgw_data_log_num_shards;
-    std::vector<std::string> markers(num_shards);
-    ret = crs.run(dpp(), create_admin_data_log_trim_cr(dpp(), static_cast<rgw::sal::RadosStore*>(driver), &http, num_shards, markers));
+    ret = crs.run(dpp(), create_admin_data_log_trim_cr(dpp(), static_cast<rgw::sal::RadosStore*>(driver), &http, num_shards));
     if (ret < 0) {
       cerr << "automated datalog trim failed with " << cpp_strerror(ret) << std::endl;
       return -ret;
@@ -11058,8 +11134,8 @@ next:
 
  if (opt_cmd == OPT::SI_PROVIDER_LIST) {
    SIPCRMgr sip_cr_mgr(cct.get(),
-                       static_cast<rgw::sal::RadosStore*>(store)->ctl()->remote,
-                       static_cast<rgw::sal::RadosStore*>(store)->getRados()->get_cr_registry());
+                       static_cast<rgw::sal::RadosStore*>(driver)->ctl()->remote,
+                       static_cast<rgw::sal::RadosStore*>(driver)->getRados()->get_cr_registry());
 
    auto mgr = sip_cr_mgr.create(opt_zone_id);
    if (!mgr) {
@@ -11311,7 +11387,7 @@ next:
        return EINVAL;
      }
    }
-   auto marker_handler = static_cast<rgw::sal::RadosStore*>(store)->svc()->sip_marker->get_handler(provider);
+   auto marker_handler = static_cast<rgw::sal::RadosStore*>(driver)->svc()->sip_marker->get_handler(provider);
    if (!marker_handler) {
      cerr << "ERROR: can't get sip marker handler" << std::endl;
      return EIO;
@@ -11385,7 +11461,7 @@ next:
        return EINVAL;
      }
    }
-   auto marker_handler = static_cast<rgw::sal::RadosStore*>(store)->svc()->sip_marker->get_handler(provider);
+   auto marker_handler = static_cast<rgw::sal::RadosStore*>(driver)->svc()->sip_marker->get_handler(provider);
    if (!marker_handler) {
      cerr << "ERROR: can't get sip marker handler" << std::endl;
      return EIO;
@@ -11452,7 +11528,7 @@ next:
        return EINVAL;
      }
    }
-   auto marker_handler = static_cast<rgw::sal::RadosStore*>(store)->svc()->sip_marker->get_handler(provider);
+   auto marker_handler = static_cast<rgw::sal::RadosStore*>(driver)->svc()->sip_marker->get_handler(provider);
    if (!marker_handler) {
      cerr << "ERROR: can't get sip marker handler" << std::endl;
      return EIO;