}
yield call(data_sync_module->start_sync(sync_env));
-
+ if (retcode < 0) {
+ tn->log(0, SSTR("ERROR: failed to start sync, retcode=" << retcode));
+ return set_cr_error(retcode);
+ }
+
yield {
if ((rgw_data_sync_info::SyncState)sync_status.sync_info.state == rgw_data_sync_info::StateSync) {
tn->log(10, SSTR("spawning " << num_shards << " shards sync"));
void init(RGWDataSyncEnv *sync_env, uint64_t instance_id) override {
conf->init_instance(sync_env->store->svc()->zone->get_realm(), instance_id);
- // try to get elastic search version
- RGWCoroutinesManager crs(sync_env->store->ctx(), sync_env->store->getRados()->get_cr_registry());
- RGWHTTPManager http_manager(sync_env->store->ctx(), crs.get_completion_mgr());
- int ret = http_manager.start();
- if (ret < 0) {
- return;
- }
- ret = crs.run(new RGWReadRESTResourceCR<ESInfo>(sync_env->cct,
- conf->conn.get(),
- &http_manager,
- "/", nullptr,
- &(conf->default_headers),
- &(conf->es_info)));
- http_manager.stop();
- if (ret < 0) {
- ldout(sync_env->cct, 1) << conf->id << ": fetch elastic info failed: " << ret << dendl;
- } else {
- ldout(sync_env->cct, 5) << conf->id << ": got elastic version=" << conf->es_info.get_version_str() << dendl;
- }
}
RGWCoroutine *init_sync(RGWDataSyncEnv *sync_env) override {
ldout(sync_env->cct, 5) << conf->id << ": init" << dendl;
return new RGWElasticInitConfigCBCR(sync_env, conf);
}
+
+ RGWCoroutine *start_sync(RGWDataSyncEnv *sync_env) override {
+ ldout(sync_env->cct, 5) << conf->id << ": start_sync" << dendl;
+ // try to get elastic search version
+ return new RGWReadRESTResourceCR<ESInfo>(sync_env->cct,
+ conf->conn.get(),
+ sync_env->http_manager,
+ "/", nullptr,
+ &(conf->default_headers),
+ &(conf->es_info));
+ }
+
RGWCoroutine *sync_object(RGWDataSyncEnv *sync_env, RGWBucketInfo& bucket_info, rgw_obj_key& key, std::optional<uint64_t> versioned_epoch, rgw_zone_set *zones_trace) override {
ldout(sync_env->cct, 10) << conf->id << ": sync_object: b=" << bucket_info.bucket << " k=" << key << " versioned_epoch=" << versioned_epoch.value_or(0) << dendl;
if (!conf->should_handle_operation(bucket_info)) {