static string datalog_sync_full_sync_index_prefix = "data.full-sync.index";
static string bucket_status_oid_prefix = "bucket.sync-status";
static string object_status_oid_prefix = "bucket.sync-status";
+static string bucket_sync_sources_oid_prefix = "bucket.sync-sources";
void rgw_datalog_info::decode_json(JSONObj *obj) {
return false;
}
using CR = RGWSimpleRadosReadCR<rgw_data_sync_marker>;
- spawn(new CR(env->async_rados, env->store->svc()->sysobj,
- rgw_raw_obj(env->store->svc()->zone->get_zone_params().log_pool, RGWDataSyncStatusManager::shard_obj_name(env->source_zone, shard_id)),
+ spawn(new CR(env->async_rados, env->svc.sysobj,
+ rgw_raw_obj(env->svc.zone->get_zone_params().log_pool, RGWDataSyncStatusManager::shard_obj_name(env->source_zone, shard_id)),
&markers[shard_id]),
false);
shard_id++;
string error_oid = RGWDataSyncStatusManager::shard_obj_name(env->source_zone, shard_id) + ".retry";
auto& shard_keys = omapkeys[shard_id];
shard_keys = std::make_shared<RGWRadosGetOmapKeysCR::Result>();
- spawn(new RGWRadosGetOmapKeysCR(env->store, rgw_raw_obj(env->store->svc()->zone->get_zone_params().log_pool, error_oid),
+ spawn(new RGWRadosGetOmapKeysCR(env->store, rgw_raw_obj(env->svc.zone->get_zone_params().log_pool, error_oid),
marker, max_entries, shard_keys), false);
++shard_id;
using ReadInfoCR = RGWSimpleRadosReadCR<rgw_data_sync_info>;
yield {
bool empty_on_enoent = false; // fail on ENOENT
- call(new ReadInfoCR(sync_env->async_rados, sync_env->store->svc()->sysobj,
- rgw_raw_obj(sync_env->store->svc()->zone->get_zone_params().log_pool, RGWDataSyncStatusManager::sync_status_oid(sync_env->source_zone)),
+ call(new ReadInfoCR(sync_env->async_rados, sync_env->svc.sysobj,
+ rgw_raw_obj(sync_env->svc.zone->get_zone_params().log_pool, RGWDataSyncStatusManager::sync_status_oid(sync_env->source_zone)),
&sync_status->sync_info, empty_on_enoent));
}
if (retcode < 0) {
RGWListRemoteDataLogShardCR(RGWDataSyncEnv *env, int _shard_id,
const string& _marker, uint32_t _max_entries,
rgw_datalog_shard_data *_result)
- : RGWSimpleCoroutine(env->store->ctx()), sync_env(env), http_op(NULL),
+ : RGWSimpleCoroutine(env->cct), sync_env(env), http_op(NULL),
shard_id(_shard_id), marker(_marker), max_entries(_max_entries), result(_result) {}
int send_request() override {
int ret = http_op->aio_read();
if (ret < 0) {
- ldout(sync_env->store->ctx(), 0) << "ERROR: failed to read from " << p << dendl;
+ ldout(sync_env->cct, 0) << "ERROR: failed to read from " << p << dendl;
log_error() << "failed to send http operation: " << http_op->to_str() << " ret=" << ret << std::endl;
http_op->put();
return ret;
int ret = http_op->wait(result, null_yield);
http_op->put();
if (ret < 0 && ret != -ENOENT) {
- ldout(sync_env->store->ctx(), 0) << "ERROR: failed to list remote datalog shard, ret=" << ret << dendl;
+ ldout(sync_env->cct, 0) << "ERROR: failed to list remote datalog shard, ret=" << ret << dendl;
return ret;
}
return 0;
RGWSyncTraceNodeRef& _tn_parent,
rgw_data_sync_status *status)
: RGWCoroutine(_sync_env->cct), sync_env(_sync_env), store(sync_env->store),
- pool(store->svc()->zone->get_zone_params().log_pool),
+ pool(sync_env->svc.zone->get_zone_params().log_pool),
num_shards(num_shards), status(status),
tn(sync_env->sync_tracer->add_node(_tn_parent, "init_data_sync_status")) {
lock_name = "sync_lock";
return set_cr_error(retcode);
}
using WriteInfoCR = RGWSimpleRadosWriteCR<rgw_data_sync_info>;
- yield call(new WriteInfoCR(sync_env->async_rados, store->svc()->sysobj,
+ yield call(new WriteInfoCR(sync_env->async_rados, sync_env->svc.sysobj,
rgw_raw_obj{pool, sync_status_oid},
status->sync_info));
if (retcode < 0) {
/* fetch current position in logs */
yield {
- RGWRESTConn *conn = store->svc()->zone->get_zone_conn_by_id(sync_env->source_zone);
+ RGWRESTConn *conn = sync_env->svc.zone->get_zone_conn_by_id(sync_env->source_zone);
if (!conn) {
tn->log(0, SSTR("ERROR: connection to zone " << sync_env->source_zone << " does not exist!"));
return set_cr_error(-EIO);
marker.timestamp = info.last_update;
const auto& oid = RGWDataSyncStatusManager::shard_obj_name(sync_env->source_zone, i);
using WriteMarkerCR = RGWSimpleRadosWriteCR<rgw_data_sync_marker>;
- spawn(new WriteMarkerCR(sync_env->async_rados, store->svc()->sysobj,
+ spawn(new WriteMarkerCR(sync_env->async_rados, sync_env->svc.sysobj,
rgw_raw_obj{pool, oid}, marker), true);
}
}
}
status->sync_info.state = rgw_data_sync_info::StateBuildingFullSyncMaps;
- yield call(new WriteInfoCR(sync_env->async_rados, store->svc()->sysobj,
+ yield call(new WriteInfoCR(sync_env->async_rados, sync_env->svc.sysobj,
rgw_raw_obj{pool, sync_status_oid},
status->sync_info));
if (retcode < 0) {
}
};
-RGWRemoteDataLog::RGWRemoteDataLog(const DoutPrefixProvider *dpp, rgw::sal::RGWRadosStore *_store,
+RGWRemoteDataLog::RGWRemoteDataLog(const DoutPrefixProvider *dpp,
+ CephContext *_cct,
+ RGWCoroutinesManagerRegistry *_cr_registry,
RGWAsyncRadosProcessor *async_rados)
- : RGWCoroutinesManager(_store->ctx(), _store->getRados()->get_cr_registry()),
- dpp(dpp), store(_store), async_rados(async_rados),
- http_manager(store->ctx(), completion_mgr),
+ : RGWCoroutinesManager(_cct, _cr_registry),
+ dpp(dpp), cct(_cct), cr_registr(_cr_registry),
+ async_rados(async_rados),
+ http_manager(_cct, completion_mgr),
data_sync_cr(NULL),
initialized(false)
{
RGWSyncTraceManager *_sync_tracer, RGWSyncModuleInstanceRef& _sync_module,
PerfCounters* counters)
{
- sync_env.init(dpp, store->ctx(), store, _conn, async_rados, &http_manager, _error_logger,
+ sync_env.init(dpp, cct, store, _conn, async_rados, &http_manager, _error_logger,
_sync_tracer, _source_zone, _sync_module, counters);
if (initialized) {
int RGWRemoteDataLog::read_sync_status(rgw_data_sync_status *sync_status)
{
// cannot run concurrently with run_sync(), so run in a separate manager
- RGWCoroutinesManager crs(store->ctx(), store->getRados()->get_cr_registry());
- RGWHTTPManager http_manager(store->ctx(), crs.get_completion_mgr());
+ RGWCoroutinesManager crs(cct, cr_registry);
+ RGWHTTPManager http_manager(cct, crs.get_completion_mgr());
int ret = http_manager.start();
if (ret < 0) {
ldpp_dout(dpp, 0) << "failed in http_manager.start() ret=" << ret << dendl;
int RGWRemoteDataLog::read_recovering_shards(const int num_shards, set<int>& recovering_shards)
{
// cannot run concurrently with run_sync(), so run in a separate manager
- RGWCoroutinesManager crs(store->ctx(), store->getRados()->get_cr_registry());
- RGWHTTPManager http_manager(store->ctx(), crs.get_completion_mgr());
+ RGWCoroutinesManager crs(cct, cr_registry);
+ RGWHTTPManager http_manager(cct, crs.get_completion_mgr());
int ret = http_manager.start();
if (ret < 0) {
ldpp_dout(dpp, 0) << "failed in http_manager.start() ret=" << ret << dendl;
rgw_data_sync_status sync_status;
sync_status.sync_info.num_shards = num_shards;
- RGWCoroutinesManager crs(store->ctx(), store->getRados()->get_cr_registry());
- RGWHTTPManager http_manager(store->ctx(), crs.get_completion_mgr());
+ RGWCoroutinesManager crs(cct, cr_registry);
+ RGWHTTPManager http_manager(cct, crs.get_completion_mgr());
int ret = http_manager.start();
if (ret < 0) {
ldpp_dout(dpp, 0) << "failed in http_manager.start() ret=" << ret << dendl;
int operate() override {
reenter(this) {
entries_index = new RGWShardedOmapCRManager(sync_env->async_rados, store, this, num_shards,
- store->svc()->zone->get_zone_params().log_pool,
+ synv_env->svc.zone->get_zone_params().log_pool,
oid_prefix);
yield; // yield so OmapAppendCRs can start
{"marker", result.marker.c_str()},
{NULL, NULL}};
- call(new RGWReadRESTResourceCR<read_metadata_list>(store->ctx(), sync_env->conn, sync_env->http_manager,
+ call(new RGWReadRESTResourceCR<read_metadata_list>(sync_env->cct, sync_env->conn, sync_env->http_manager,
entrypoint, pairs, &result));
}
if (retcode < 0) {
rgw_http_param_pair pairs[] = {{"key", key.c_str()},
{NULL, NULL}};
- call(new RGWReadRESTResourceCR<bucket_instance_meta_info>(store->ctx(), sync_env->conn, sync_env->http_manager, path, pairs, &meta_info));
+ call(new RGWReadRESTResourceCR<bucket_instance_meta_info>(sync_env->cct, sync_env->conn, sync_env->http_manager, path, pairs, &meta_info));
}
num_shards = meta_info.data.get_bucket_info().num_shards;
char buf[16];
snprintf(buf, sizeof(buf), ":%d", i);
s = key + buf;
- yield entries_index->append(s, store->svc()->datalog_rados->get_log_shard_id(meta_info.data.get_bucket_info().bucket, i));
+ yield entries_index->append(s, synv_env->svc.datalog_rados->get_log_shard_id(meta_info.data.get_bucket_info().bucket, i));
}
} else {
- yield entries_index->append(key, store->svc()->datalog_rados->get_log_shard_id(meta_info.data.get_bucket_info().bucket, -1));
+ yield entries_index->append(key, synv_env->svc.datalog_rados->get_log_shard_id(meta_info.data.get_bucket_info().bucket, -1));
}
}
truncated = result.truncated;
int shard_id = (int)iter->first;
rgw_data_sync_marker& marker = iter->second;
marker.total_entries = entries_index->get_total_entries(shard_id);
- spawn(new RGWSimpleRadosWriteCR<rgw_data_sync_marker>(sync_env->async_rados, store->svc()->sysobj,
- rgw_raw_obj(store->svc()->zone->get_zone_params().log_pool, RGWDataSyncStatusManager::shard_obj_name(sync_env->source_zone, shard_id)),
+ spawn(new RGWSimpleRadosWriteCR<rgw_data_sync_marker>(sync_env->async_rados, synv_env->svc.sysobj,
+ rgw_raw_obj(synv_env->svc.zone->get_zone_params().log_pool, RGWDataSyncStatusManager::shard_obj_name(sync_env->source_zone, shard_id)),
marker),
true);
}
tn->log(20, SSTR("updating marker marker_oid=" << marker_oid << " marker=" << new_marker));
RGWRados *rados = sync_env->store->getRados();
- return new RGWSimpleRadosWriteCR<rgw_data_sync_marker>(sync_env->async_rados, rados->svc.sysobj,
- rgw_raw_obj(rados->svc.zone->get_zone_params().log_pool, marker_oid),
+ return new RGWSimpleRadosWriteCR<rgw_data_sync_marker>(sync_env->async_rados, synv_env->svc.sysobj,
+ rgw_raw_obj(synv_env->svc.zone->get_zone_params().log_pool, marker_oid),
sync_marker);
}
if (lease_cr) {
lease_cr->abort();
}
- lease_cr.reset(new RGWContinuousLeaseCR(sync_env->async_rados, sync_env->store,
- rgw_raw_obj(sync_env->store->svc()->zone->get_zone_params().log_pool, status_oid),
+ auto store = sync_env->store;
+ lease_cr.reset(new RGWContinuousLeaseCR(sync_env->async_rados, store,
+ rgw_raw_obj(synv_env->svc.zone->get_zone_params().log_pool, status_oid),
lock_name, lock_duration, this));
lease_stack.reset(spawn(lease_cr.get(), false));
}
sync_marker.state = rgw_data_sync_marker::IncrementalSync;
sync_marker.marker = sync_marker.next_step_marker;
sync_marker.next_step_marker.clear();
- RGWRados *rados = sync_env->store->getRados();
- call(new RGWSimpleRadosWriteCR<rgw_data_sync_marker>(sync_env->async_rados, rados->svc.sysobj,
- rgw_raw_obj(rados->svc.zone->get_zone_params().log_pool, status_oid),
+ RGWRados *store = sync_env->store;
+ call(new RGWSimpleRadosWriteCR<rgw_data_sync_marker>(sync_env->async_rados, synv_env->svc.sysobj,
+ rgw_raw_obj(synv_env->svc.zone->get_zone_params().log_pool, status_oid),
sync_marker));
}
if (retcode < 0) {
}
RGWCoroutine *alloc_finisher_cr() override {
- RGWRados *rados = sync_env->store->getRados();
- return new RGWSimpleRadosReadCR<rgw_data_sync_marker>(sync_env->async_rados, rados->svc.sysobj,
- rgw_raw_obj(rados->svc.zone->get_zone_params().log_pool, RGWDataSyncStatusManager::shard_obj_name(sync_env->source_zone, shard_id)),
+ RGWRados *store = sync_env->store;
+ return new RGWSimpleRadosReadCR<rgw_data_sync_marker>(sync_env->async_rados, synv_env->svc.sysobj,
+ rgw_raw_obj(synv_env->svc.zone->get_zone_params().log_pool, RGWDataSyncStatusManager::shard_obj_name(sync_env->source_zone, shard_id)),
&sync_marker);
}
tn->log(10, SSTR("spawning " << num_shards << " shards sync"));
for (map<uint32_t, rgw_data_sync_marker>::iterator iter = sync_status.sync_markers.begin();
iter != sync_status.sync_markers.end(); ++iter) {
- RGWDataSyncShardControlCR *cr = new RGWDataSyncShardControlCR(sync_env, sync_env->store->svc()->zone->get_zone_params().log_pool,
+ RGWDataSyncShardControlCR *cr = new RGWDataSyncShardControlCR(sync_env, sync_env->synv_env->svc.zone->get_zone_params().log_pool,
iter->first, iter->second, tn);
cr->get();
shard_crs_lock.lock();
}
RGWCoroutine *set_sync_info_cr() {
- RGWRados *rados = sync_env->store->getRados();
- return new RGWSimpleRadosWriteCR<rgw_data_sync_info>(sync_env->async_rados, rados->svc.sysobj,
- rgw_raw_obj(rados->svc.zone->get_zone_params().log_pool, RGWDataSyncStatusManager::sync_status_oid(sync_env->source_zone)),
+ return new RGWSimpleRadosWriteCR<rgw_data_sync_info>(sync_env->async_rados, synv_env->svc.sysobj,
+ rgw_raw_obj(synv_env->svc.zone->get_zone_params().log_pool, RGWDataSyncStatusManager::sync_status_oid(sync_env->source_zone)),
sync_status.sync_info);
}
{
RGWZone *zone_def;
- if (!store->svc()->zone->find_zone_by_id(source_zone, &zone_def)) {
+ if (!svc.zone->find_zone_by_id(source_zone, &zone_def)) {
ldpp_dout(this, 0) << "ERROR: failed to find zone config info for zone=" << source_zone << dendl;
return -EIO;
}
- if (!store->svc()->sync_modules->get_manager()->supports_data_export(zone_def->tier_type)) {
+ if (!svc.sync_modules->get_manager()->supports_data_export(zone_def->tier_type)) {
return -ENOTSUP;
}
- const RGWZoneParams& zone_params = store->svc()->zone->get_zone_params();
+ const RGWZoneParams& zone_params = svc.zone->get_zone_params();
if (sync_module == nullptr) {
sync_module = store->getRados()->get_sync_module();
}
- conn = store->svc()->zone->get_zone_conn_by_id(source_zone);
+ conn = svc.zone->get_zone_conn_by_id(source_zone);
if (!conn) {
ldpp_dout(this, 0) << "connection object to zone " << source_zone << " does not exist" << dendl;
return -EINVAL;
}
yield {
auto store = sync_env->store;
- rgw_raw_obj obj(store->svc()->zone->get_zone_params().log_pool, sync_status_oid);
+ rgw_raw_obj obj(sync_env->svc.zone->get_zone_params().log_pool, sync_status_oid);
if (info.syncstopped) {
call(new RGWRadosRemoveCR(store, obj));
}
map<string, bufferlist> attrs;
status.encode_all_attrs(attrs);
- call(new RGWSimpleRadosWriteAttrsCR(sync_env->async_rados, store->svc()->sysobj, obj, attrs));
+ call(new RGWSimpleRadosWriteAttrsCR(sync_env->async_rados, sync_env->svc.sysobj, obj, attrs));
}
}
if (info.syncstopped) {
int RGWReadBucketPipeSyncStatusCoroutine::operate()
{
reenter(this) {
- yield call(new RGWSimpleRadosReadAttrsCR(sync_env->async_rados, sync_env->store->svc()->sysobj,
- rgw_raw_obj(sync_env->store->svc()->zone->get_zone_params().log_pool, oid),
+ yield call(new RGWSimpleRadosReadAttrsCR(sync_env->async_rados, sync_env->svc.sysobj,
+ rgw_raw_obj(sync_env->svc.zone->get_zone_params().log_pool, oid),
&attrs, true));
if (retcode == -ENOENT) {
*status = rgw_bucket_shard_sync_info();
count = 0;
do {
omapkeys = std::make_shared<RGWRadosGetOmapKeysCR::Result>();
- yield call(new RGWRadosGetOmapKeysCR(store, rgw_raw_obj(store->svc()->zone->get_zone_params().log_pool, error_oid),
+ yield call(new RGWRadosGetOmapKeysCR(store, rgw_raw_obj(sync_env->svc.zone->get_zone_params().log_pool, error_oid),
marker, max_omap_entries, omapkeys));
if (retcode == -ENOENT) {
reenter(this){
//read sync status marker
using CR = RGWSimpleRadosReadCR<rgw_data_sync_marker>;
- yield call(new CR(sync_env->async_rados, store->svc()->sysobj,
- rgw_raw_obj(store->svc()->zone->get_zone_params().log_pool, status_oid),
+ yield call(new CR(sync_env->async_rados, sync_env->svc.sysobj,
+ rgw_raw_obj(sync_env->svc.zone->get_zone_params().log_pool, status_oid),
sync_marker));
if (retcode < 0) {
ldout(sync_env->cct,0) << "failed to read sync status marker with "
RGWRados *rados = sync_env->store->getRados();
tn->log(20, SSTR("updating marker marker_oid=" << marker_oid << " marker=" << new_marker));
- return new RGWSimpleRadosWriteAttrsCR(sync_env->async_rados, rados->svc.sysobj,
- rgw_raw_obj(rados->svc.zone->get_zone_params().log_pool, marker_oid),
+ return new RGWSimpleRadosWriteAttrsCR(sync_env->async_rados, sync_env->svc.sysobj,
+ rgw_raw_obj(sync_env->svc.zone->get_zone_params().log_pool, marker_oid),
attrs);
}
map<string, bufferlist> attrs;
sync_marker.encode_attr(attrs);
- RGWRados *rados = sync_env->store->getRados();
-
tn->log(20, SSTR("updating marker marker_oid=" << marker_oid << " marker=" << new_marker));
return new RGWSimpleRadosWriteAttrsCR(sync_env->async_rados,
- rados->svc.sysobj,
- rgw_raw_obj(rados->svc.zone->get_zone_params().log_pool, marker_oid),
+ sync_env->svc.sysobj,
+ rgw_raw_obj(sync_env->svc.zone->get_zone_params().log_pool, marker_oid),
attrs);
}
data_sync_module = sync_env->sync_module->get_data_handler();
zones_trace = _zones_trace;
- zones_trace.insert(sync_env->store->svc()->zone->get_zone().id);
+ zones_trace.insert(sync_env->svc.zone->get_zone().id);
}
int operate() override {
sync_info.state = rgw_bucket_shard_sync_info::StateIncrementalSync;
map<string, bufferlist> attrs;
sync_info.encode_state_attr(attrs);
- RGWRados *rados = sync_env->store->getRados();
- call(new RGWSimpleRadosWriteAttrsCR(sync_env->async_rados, rados->svc.sysobj,
- rgw_raw_obj(rados->svc.zone->get_zone_params().log_pool, status_oid),
+ RGWRados *store = sync_env->store;
+ call(new RGWSimpleRadosWriteAttrsCR(sync_env->async_rados, sync_env->svc.sysobj,
+ rgw_raw_obj(sync_env->svc.zone->get_zone_params().log_pool, status_oid),
attrs));
}
} else {
sync_pipe(_sync_pipe), bs(_sync_pipe.source_bs),
lease_cr(lease_cr), sync_info(sync_info),
marker_tracker(sync_env, status_oid, sync_info.inc_marker),
- status_oid(status_oid), zone_id(_sync_env->store->svc()->zone->get_zone().id),
+ status_oid(status_oid), zone_id(_sync_env->svc.zone->get_zone().id),
tn(sync_env->sync_tracer->add_node(_tn_parent, "inc_sync",
SSTR(bucket_shard_str{bs})))
{
reenter(this) {
yield {
set_status("acquiring sync lock");
- lease_cr.reset(new RGWContinuousLeaseCR(sync_env->async_rados, sync_env->store,
- rgw_raw_obj(sync_env->store->svc()->zone->get_zone_params().log_pool, status_oid),
+ auto store = sync_env->store;
+ lease_cr.reset(new RGWContinuousLeaseCR(sync_env->async_rados, store,
+ rgw_raw_obj(sync_env->svc.zone->get_zone_params().log_pool, status_oid),
"sync_lock",
cct->_conf->rgw_sync_lease_period,
this));
tn->log(10, SSTR("no local info for bucket:" << ": fetching metadata"));
string raw_key = string("bucket.instance:") + sync_pipe.source_bs.bucket.get_key();
- meta_sync_env.init(sync_env->dpp, cct, sync_env->store, sync_env->store->svc()->zone->get_master_conn(), sync_env->async_rados,
+ meta_sync_env.init(sync_env->dpp, cct, sync_env->store, sync_env->svc.zone->get_master_conn(), sync_env->async_rados,
sync_env->http_manager, sync_env->error_logger, sync_env->sync_tracer);
call(new RGWMetaSyncSingleEntryCR(&meta_sync_env, raw_key,
int RGWBucketPipeSyncStatusManager::init()
{
- conn = store->svc()->zone->get_zone_conn_by_id(source_zone);
+ conn = sync_env->svc.zone->get_zone_conn_by_id(source_zone);
if (!conn) {
ldpp_dout(this, 0) << "connection object to zone " << source_zone << " does not exist" << dendl;
return -EINVAL;
int effective_num_shards = (num_shards ? num_shards : 1);
- auto async_rados = store->svc()->rados->get_async_processor();
+ auto async_rados = sync_env->svc.rados->get_async_processor();
for (int i = 0; i < effective_num_shards; i++) {
RGWRemoteBucketLog *l = new RGWRemoteBucketLog(this, store, async_rados, &http_manager);
RGWCollectBucketSyncStatusCR(rgw::sal::RGWRadosStore *store, RGWDataSyncEnv *env,
int num_shards, const rgw_bucket& bucket,
Vector *status)
- : RGWShardCollectCR(store->ctx(), max_concurrent_shards),
+ : RGWShardCollectCR(env->cct, max_concurrent_shards),
store(store), env(env), num_shards(num_shards),
bs(bucket, num_shards > 0 ? 0 : -1), // start at shard 0 or -1
i(status->begin()), end(status->end())
RGWDataSyncEnv env;
RGWSyncModuleInstanceRef module; // null sync module
- env.init(dpp, store->ctx(), store, nullptr, store->svc()->rados->get_async_processor(),
+ env.init(dpp, store->ctx(), store, nullptr, sync_env->svc.rados->get_async_processor(),
nullptr, nullptr, nullptr, source_zone, module, nullptr);
RGWCoroutinesManager crs(store->ctx(), store->getRados()->get_cr_registry());