rgw_bucket_shard_sync_info& status;
RGWObjVersionTracker& objv_tracker;
- rgw_bucket_index_marker_info& info;
const BucketIndexShardsManager& marker_mgr;
bool exclusive;
public:
RGWInitBucketShardSyncStatusCoroutine(RGWDataSyncCtx *_sc,
const rgw_bucket_sync_pair_info& _sync_pair,
rgw_bucket_shard_sync_info& _status,
- rgw_bucket_index_marker_info& _info,
+ uint64_t latest_gen,
const BucketIndexShardsManager& _marker_mgr,
RGWObjVersionTracker& objv_tracker,
bool exclusive)
: RGWCoroutine(_sc->cct), sc(_sc), sync_env(_sc->env),
sync_pair(_sync_pair),
- sync_status_oid(RGWBucketPipeSyncStatusManager::inc_status_oid(sc->source_zone, _sync_pair, _info.latest_gen)),
- status(_status), objv_tracker(objv_tracker), info(_info), marker_mgr(_marker_mgr), exclusive(exclusive)
+ sync_status_oid(RGWBucketPipeSyncStatusManager::inc_status_oid(sc->source_zone, _sync_pair, latest_gen)),
+ status(_status), objv_tracker(objv_tracker), marker_mgr(_marker_mgr), exclusive(exclusive)
{}
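+ // note: syncstop handling moved up to the bucket-wide coroutine
+ // (RGWSyncBucketCR); this coroutine now always initializes the shard
+ // status to incremental sync for the given generation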
int operate(const DoutPrefixProvider *dpp) override {
reenter(this) {
yield {
- auto store = sync_env->store;
rgw_raw_obj obj(sync_env->svc->zone->get_zone_params().log_pool, sync_status_oid);
- const bool stopped = status.state == rgw_bucket_shard_sync_info::StateStopped;
- bool write_status = false;
- auto max_marker = marker_mgr.get(sync_pair.source_bs.shard_id, "");
-
- if (info.syncstopped) {
- if (stopped && !sync_env->sync_module->should_full_sync()) {
- // preserve our current incremental marker position
- write_status = true;
- }
- } else {
- // whether or not to do full sync, incremental sync will follow anyway
- if (sync_env->sync_module->should_full_sync()) {
- status.inc_marker.position = max_marker;
- }
- write_status = true;
- status.inc_marker.timestamp = ceph::real_clock::now();
- status.state = rgw_bucket_shard_sync_info::StateIncrementalSync;
- }
-
- if (write_status) {
- map<string, bufferlist> attrs;
- status.encode_all_attrs(attrs);
- call(new RGWSimpleRadosWriteAttrsCR(dpp, sync_env->async_rados, sync_env->svc->sysobj,
- obj, attrs, &objv_tracker, exclusive));
- } else {
- call(new RGWRadosRemoveCR(store, obj, &objv_tracker));
+ // whether or not to do full sync, incremental sync will follow anyway
+ if (sync_env->sync_module->should_full_sync()) {
+ const auto max_marker = marker_mgr.get(sync_pair.source_bs.shard_id, "");
+ status.inc_marker.position = max_marker;
}
+ status.inc_marker.timestamp = ceph::real_clock::now();
+ status.state = rgw_bucket_shard_sync_info::StateIncrementalSync;
+
+ map<string, bufferlist> attrs;
+ status.encode_all_attrs(attrs);
+ call(new RGWSimpleRadosWriteAttrsCR(dpp, sync_env->async_rados, sync_env->svc->sysobj,
+ obj, attrs, &objv_tracker, exclusive));
+ ldout(cct, 20) << "init marker position: " << status.inc_marker.position <<
+ ". written to shard status object: " << sync_status_oid << dendl;
}
- if (info.syncstopped) {
- retcode = -ENOENT;
- }
+
if (retcode < 0) {
return set_cr_error(retcode);
}
rgw_bucket_sync_pair_info pair;
rgw_bucket_shard_sync_info status;
RGWObjVersionTracker objv;
- rgw_bucket_index_marker_info& info;
+ const uint64_t latest_gen;
const BucketIndexShardsManager& marker_mgr;
int tries = 10; // retry on racing writes
public:
InitBucketShardStatusCR(RGWDataSyncCtx* sc,
const rgw_bucket_sync_pair_info& pair,
- rgw_bucket_index_marker_info& info,
+ uint64_t latest_gen,
const BucketIndexShardsManager& marker_mgr)
- : RGWCoroutine(sc->cct), sc(sc), pair(pair), info(info), marker_mgr(marker_mgr)
+ : RGWCoroutine(sc->cct), sc(sc), pair(pair), latest_gen(latest_gen), marker_mgr(marker_mgr)
{}
int operate(const DoutPrefixProvider *dpp) override {
reenter(this) {
// try exclusive create with empty status
objv.generate_new_write_ver(cct);
- yield call(new InitCR(sc, pair, status, info, marker_mgr, objv, exclusive));
+ yield call(new InitCR(sc, pair, status, latest_gen, marker_mgr, objv, exclusive));
if (retcode >= 0) {
return set_cr_done();
} else if (retcode != -EEXIST) {
while (--tries) {
objv.clear();
// read current status and objv
- yield call(new ReadCR(sc, pair, &status, &objv, info.latest_gen));
+ yield call(new ReadCR(sc, pair, &status, &objv, latest_gen));
if (retcode < 0) {
return set_cr_error(retcode);
}
- yield call(new InitCR(sc, pair, status, info, marker_mgr, objv, exclusive));
+ yield call(new InitCR(sc, pair, status, latest_gen, marker_mgr, objv, exclusive));
if (retcode >= 0) {
return set_cr_done();
} else if (retcode != -ECANCELED) {
static constexpr int max_concurrent_shards = 16;
RGWDataSyncCtx* sc;
rgw_bucket_sync_pair_info sync_pair;
- rgw_bucket_index_marker_info& info;
+ const uint64_t latest_gen;
const BucketIndexShardsManager& marker_mgr;
const int num_shards;
public:
InitBucketShardStatusCollectCR(RGWDataSyncCtx* sc,
const rgw_bucket_sync_pair_info& sync_pair,
- rgw_bucket_index_marker_info& info,
+ uint64_t latest_gen,
const BucketIndexShardsManager& marker_mgr,
int num_shards)
: RGWShardCollectCR(sc->cct, max_concurrent_shards),
- sc(sc), sync_pair(sync_pair), info(info), marker_mgr(marker_mgr), num_shards(num_shards)
+ sc(sc), sync_pair(sync_pair), latest_gen(latest_gen), marker_mgr(marker_mgr), num_shards(num_shards)
{}
bool spawn_next() override {
return false;
}
sync_pair.source_bs.shard_id = shard++;
- spawn(new InitBucketShardStatusCR(sc, sync_pair, info, marker_mgr), false);
+ spawn(new InitBucketShardStatusCR(sc, sync_pair, latest_gen, marker_mgr), false);
+ return true;
+ }
+};
+
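+// removes the per-shard incremental sync status object of a given generation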
+class RemoveBucketShardStatusCR : public RGWCoroutine {
+ RGWDataSyncCtx* const sc;
+ RGWDataSyncEnv* const sync_env;
+
+ rgw_bucket_sync_pair_info sync_pair;
+ rgw_raw_obj obj;
+ RGWObjVersionTracker objv;
+
+public:
+ RemoveBucketShardStatusCR(RGWDataSyncCtx* sc,
+ const rgw_bucket_sync_pair_info& sync_pair, uint64_t latest_gen)
+ : RGWCoroutine(sc->cct), sc(sc), sync_env(sc->env),
+ sync_pair(sync_pair),
+ obj(sync_env->svc->zone->get_zone_params().log_pool,
+ RGWBucketPipeSyncStatusManager::inc_status_oid(sc->source_zone, sync_pair, latest_gen))
+ {}
+
+ int operate(const DoutPrefixProvider *dpp) override {
+ reenter(this) {
+ yield call(new RGWRadosRemoveCR(sync_env->store, obj, &objv));
+ if (retcode < 0 && retcode != -ENOENT) {
+ ldout(cct, 20) << "ERROR: failed to remove bucket shard status for: " << sync_pair <<
+ ". with error: " << retcode << dendl;
+ return set_cr_error(retcode);
+ }
+ ldout(cct, 20) << "removed bucket shard status object: " << obj.oid << dendl;
+ return set_cr_done();
+ }
+ return 0;
+ }
+};
+
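+// spawns a RemoveBucketShardStatusCR per shard, up to max_concurrent_shards
+// at a time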
+class RemoveBucketShardStatusCollectCR : public RGWShardCollectCR {
+ static constexpr int max_concurrent_shards = 16;
+ RGWDataSyncCtx* const sc;
+ RGWDataSyncEnv* const sync_env;
+ rgw_bucket_sync_pair_info sync_pair;
+ const uint64_t latest_gen;
+
+ const int num_shards;
+ int shard = 0;
+
+ int handle_result(int r) override {
+ if (r < 0) {
+ ldout(cct, 4) << "failed to remove bucket shard status object: "
+ << cpp_strerror(r) << dendl;
+ }
+ return r;
+ }
+ public:
+ RemoveBucketShardStatusCollectCR(RGWDataSyncCtx* sc,
+ const rgw_bucket_sync_pair_info& sync_pair,
+ uint64_t latest_gen,
+ int num_shards)
+ : RGWShardCollectCR(sc->cct, max_concurrent_shards),
+ sc(sc), sync_env(sc->env), sync_pair(sync_pair), latest_gen(latest_gen), num_shards(num_shards)
+ {}
+
+ bool spawn_next() override {
+ if (shard >= num_shards || status < 0) { // stop spawning on any errors
+ return false;
+ }
+ sync_pair.source_bs.shard_id = shard++;
+ spawn(new RemoveBucketShardStatusCR(sc, sync_pair, latest_gen), false);
return true;
}
};
const int num_shards;
const bool check_compat;
- rgw_bucket_index_marker_info info;
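+ // remote bilog info, now read by the caller and passed in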
+ const rgw_bucket_index_marker_info& info;
BucketIndexShardsManager marker_mgr;
bool all_incremental = true;
const rgw_raw_obj& status_obj,
rgw_bucket_sync_status& status,
RGWObjVersionTracker& objv,
- int num_shards, bool check_compat)
+ int num_shards, bool check_compat,
+ const rgw_bucket_index_marker_info& info)
: RGWCoroutine(sc->cct), sc(sc), sync_env(sc->env),
sync_pair(sync_pair), status_obj(status_obj),
status(status), objv(objv), num_shards(num_shards),
- check_compat(check_compat)
+ check_compat(check_compat), info(info)
{}
int operate(const DoutPrefixProvider *dpp) override {
reenter(this) {
- yield call(new RGWReadRemoteBucketIndexLogInfoCR(sc, sync_pair.dest_bucket, &info));
- if (retcode < 0) {
- lderr(cct) << "failed to read remote bilog info: "
- << cpp_strerror(retcode) << dendl;
- return set_cr_error(retcode);
- }
-
retcode = marker_mgr.from_string(info.max_marker, -1);
if (retcode < 0) {
lderr(cct) << "failed to parse bilog shard markers: "
if (check_compat) {
// try to convert existing per-shard incremental status for backward compatibility
yield call(new CheckAllBucketShardStatusIsIncremental(sc, sync_pair, num_shards, &all_incremental));
+ ldout(cct, 20) << "check for 'all incremental' in compatibility mode" << dendl;
if (retcode < 0) {
return set_cr_error(retcode);
}
if (status.state != BucketSyncState::Incremental) {
// initialize all shard sync status. this will populate the log marker
// positions where incremental sync will resume after full sync
- yield call(new InitBucketShardStatusCollectCR(sc, sync_pair, info, marker_mgr, num_shards));
+ yield call(new InitBucketShardStatusCollectCR(sc, sync_pair, info.latest_gen, marker_mgr, num_shards));
if (retcode < 0) {
ldout(cct, 20) << "failed to init bucket shard status: "
<< cpp_strerror(retcode) << dendl;
status.shards_done_with_gen.resize(num_shards);
status.incremental_gen = info.latest_gen;
- ldout(cct, 20) << "writing bucket sync state=" << status.state << dendl;
+ ldout(cct, 20) << "writing bucket sync status during init. state=" << status.state << ". marker=" << status.full.position.to_str() << dendl;
// write bucket sync status
using CR = RGWSimpleRadosWriteCR<rgw_bucket_sync_status>;
}
};
-RGWCoroutine *RGWRemoteBucketManager::init_sync_status_cr(RGWObjVersionTracker& objv)
+RGWCoroutine *RGWRemoteBucketManager::init_sync_status_cr(RGWObjVersionTracker& objv, rgw_bucket_index_marker_info& info)
{
constexpr bool check_compat = false;
const int num_shards = num_pipes();
return new InitBucketFullSyncStatusCR(&sc, sync_pairs[0], full_status_obj,
- full_status, objv, num_shards, check_compat);
+ full_status, objv, num_shards, check_compat, info);
}
#define OMAP_READ_MAX_ENTRIES 10
for (; entries_iter != entries_end; ++entries_iter) {
auto e = *entries_iter;
if (e.op == RGWModifyOp::CLS_RGW_OP_SYNCSTOP) {
- ldpp_dout(dpp, 20) << "syncstop on " << e.timestamp << dendl;
+ ldpp_dout(dpp, 20) << "syncstop at: " << e.timestamp << ". marker: " << e.id << dendl;
syncstopped = true;
entries_end = std::next(entries_iter); // stop after this entry
break;
}
if (e.op == RGWModifyOp::CLS_RGW_OP_RESYNC) {
+ ldpp_dout(dpp, 20) << "syncstart at: " << e.timestamp << ". marker: " << e.id << dendl;
continue;
}
if (e.op == CLS_RGW_OP_CANCEL) {
reenter(this) {
yield call(new RGWGetBucketPeersCR(sync_env, target_bucket, sc->source_zone, source_bucket, &pipes, tn));
if (retcode < 0 && retcode != -ENOENT) {
- tn->log(0, "ERROR: failed to read sync status for bucket");
+ tn->log(0, SSTR("ERROR: failed to read sync status for bucket. error: " << retcode));
return set_cr_error(retcode);
}
[&](uint64_t stack_id, int ret) {
handle_complete_stack(stack_id);
if (ret < 0) {
- tn->log(10, "a sync operation returned error");
+ tn->log(10, SSTR("ERROR: a sync operation returned error: " << ret));
}
return ret;
});
drain_all_cb([&](uint64_t stack_id, int ret) {
handle_complete_stack(stack_id);
if (ret < 0) {
- tn->log(10, "a sync operation returned error");
+ tn->log(10, SSTR("a sync operation returned error: " << ret));
}
return ret;
});
boost::intrusive_ptr<const RGWContinuousLeaseCR> lease_cr;
rgw_bucket_sync_pair_info sync_pair;
rgw_bucket_sync_pipe& sync_pipe;
- BucketSyncState& bucket_state;
+ bool& bucket_stopped;
uint64_t generation;
ceph::real_time* progress;
boost::intrusive_ptr<const RGWContinuousLeaseCR> lease_cr,
const rgw_bucket_sync_pair_info& _sync_pair,
rgw_bucket_sync_pipe& sync_pipe,
- BucketSyncState& bucket_state,
+ bool& bucket_stopped,
uint64_t generation,
const RGWSyncTraceNodeRef& tn,
ceph::real_time* progress)
: RGWCoroutine(_sc->cct), sc(_sc), sync_env(_sc->env),
lease_cr(std::move(lease_cr)), sync_pair(_sync_pair),
- sync_pipe(sync_pipe), bucket_state(bucket_state), generation(generation), progress(progress),
+ sync_pipe(sync_pipe), bucket_stopped(bucket_stopped), generation(generation), progress(progress),
shard_status_oid(RGWBucketPipeSyncStatusManager::inc_status_oid(sc->source_zone, sync_pair, generation)),
bucket_status_obj(sc->env->svc->zone->get_zone_params().log_pool,
RGWBucketPipeSyncStatusManager::full_status_oid(sc->source_zone,
reenter(this) {
yield call(new RGWReadBucketPipeSyncStatusCoroutine(sc, sync_pair, &sync_status, &objv_tracker, generation));
if (retcode < 0 && retcode != -ENOENT) {
- tn->log(0, "ERROR: failed to read sync status for bucket");
+ tn->log(0, SSTR("ERROR: failed to read sync status for bucket. error: " << retcode));
return set_cr_error(retcode);
}
tn->log(5, SSTR("incremental sync on bucket failed, retcode=" << retcode));
return set_cr_error(retcode);
}
- // TODO: handle transition to StateStopped
+
+ if (sync_status.state == rgw_bucket_shard_sync_info::StateStopped) {
+ tn->log(20, SSTR("syncstopped indication for source bucket shard"));
+ bucket_stopped = true;
+ }
+
return set_cr_done();
}
const uint32_t lock_duration;
const rgw_raw_obj status_obj;
rgw_bucket_sync_status bucket_status;
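+ // set when a shard coroutine observes a syncstop indication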
+ bool bucket_stopped = false;
RGWObjVersionTracker objv;
bool init_check_compat = false;
+ rgw_bucket_index_marker_info info;
RGWSyncTraceNodeRef tn;
gen, tn, progress);
}
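+// release the given lease coroutine (if any) and drain all spawned stacks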
+#define RELEASE_LOCK(cr) \
+ do { if (cr) { cr->go_down(); drain_all(); cr.reset(); } } while (0)
+
int RGWSyncBucketCR::operate(const DoutPrefixProvider *dpp)
{
reenter(this) {
// read bucket sync status
using ReadCR = RGWSimpleRadosReadCR<rgw_bucket_sync_status>;
+ using WriteCR = RGWSimpleRadosWriteCR<rgw_bucket_sync_status>;
+
yield call(new ReadCR(dpp, env->async_rados, env->svc->sysobj,
status_obj, &bucket_status, false, &objv));
if (retcode == -ENOENT) {
// use exclusive create to set state=Init
objv.generate_new_write_ver(cct);
- using WriteCR = RGWSimpleRadosWriteCR<rgw_bucket_sync_status>;
yield call(new WriteCR(dpp, env->async_rados, env->svc->sysobj,
status_obj, bucket_status, &objv, true));
+ tn->log(20, "bucket status object does not exist, create a new one");
if (retcode == -EEXIST) {
// raced with another create, read its status
+ tn->log(20, "raced with another create, read its status");
yield call(new ReadCR(dpp, env->async_rados, env->svc->sysobj,
status_obj, &bucket_status, false, &objv));
}
}
if (retcode < 0) {
+ tn->log(20, SSTR("ERROR: failed to read bucket status object. error: " << retcode));
return set_cr_error(retcode);
}
do {
- tn->log(20, SSTR("sync status for source bucket: " << bucket_status.state));
+ tn->log(20, SSTR("sync status for source bucket: " << bucket_status.state <<
+ ". lease is: " << (bucket_lease_cr ? "taken" : "not taken") << ". stop indications is: " << bucket_stopped));
+
+ if (bucket_status.state == BucketSyncState::Init ||
+ bucket_status.state == BucketSyncState::Stopped ||
+ bucket_stopped) {
+ // if the state is Init or Stopped, or a stop was indicated, query the remote RGW for its syncstop state
+ yield call(new RGWReadRemoteBucketIndexLogInfoCR(sc, sync_pair.dest_bucket, &info));
+ if (retcode < 0) {
+ return set_cr_error(retcode);
+ }
+ if (info.syncstopped) {
+ // remote indicates stopped state
+ tn->log(20, "remote bilog indicates that sync was stopped");
+ if (!bucket_lease_cr) {
+ bucket_lease_cr.reset(new RGWContinuousLeaseCR(env->async_rados, env->store, status_obj,
+ lock_name, lock_duration, this));
+ yield spawn(bucket_lease_cr.get(), false);
+ while (!bucket_lease_cr->is_locked()) {
+ if (bucket_lease_cr->is_done()) {
+ tn->log(5, "ERROR: failed to take bucket lease");
+ set_status("lease lock failed, early abort");
+ drain_all();
+ return set_cr_error(bucket_lease_cr->get_ret_status());
+ }
+ tn->log(5, "waiting on bucket lease");
+ yield set_sleeping(true);
+ }
+ }
+
+ // check if local state is "stopped"
+ yield call(new ReadCR(dpp, env->async_rados, env->svc->sysobj,
+ status_obj, &bucket_status, false, &objv));
+ if (retcode < 0) {
+ tn->log(20, SSTR("ERROR: failed to read status before writing 'stopped'. error: " << retcode));
+ RELEASE_LOCK(bucket_lease_cr);
+ return set_cr_error(retcode);
+ }
+ if (bucket_status.state != BucketSyncState::Stopped) {
+ // make sure the state is changed to Stopped locally
+ bucket_status.state = BucketSyncState::Stopped;
+ yield call(new WriteCR(dpp, env->async_rados, env->svc->sysobj,
+ status_obj, bucket_status, &objv, false));
+ if (retcode < 0) {
+ tn->log(20, SSTR("ERROR: failed to write 'stopped' status. error: " << retcode));
+ RELEASE_LOCK(bucket_lease_cr);
+ return set_cr_error(retcode);
+ }
+ }
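+ // remove the per-shard status objects of the stopped generation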
+ yield {
+ const int num_shards = sync_pipe.dest_bucket_info.layout.current_index.layout.normal.num_shards;
+ call(new RemoveBucketShardStatusCollectCR(sc, sync_pair, info.latest_gen, num_shards));
+ }
+ RELEASE_LOCK(bucket_lease_cr);
+ return set_cr_done();
+ }
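+ // remote no longer reports syncstopped; clear the local stop indication
+ // and fall through to resume sync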
+ bucket_stopped = false;
+ }
- // if the state wasn't Incremental, take a bucket-wide lease to prevent
- // different shards from duplicating the init and full sync
if (bucket_status.state != BucketSyncState::Incremental) {
- assert(!bucket_lease_cr);
- bucket_lease_cr.reset(new RGWContinuousLeaseCR(env->async_rados, env->store, status_obj,
+ // if the state wasn't Incremental, take a bucket-wide lease to prevent
+ // different shards from duplicating the init and full sync
+ if (!bucket_lease_cr) {
+ bucket_lease_cr.reset(new RGWContinuousLeaseCR(env->async_rados, env->store, status_obj,
lock_name, lock_duration, this));
- yield spawn(bucket_lease_cr.get(), false);
- while (!bucket_lease_cr->is_locked()) {
- if (bucket_lease_cr->is_done()) {
- tn->log(5, "failed to take bucket lease");
- set_status("lease lock failed, early abort");
- drain_all();
- return set_cr_error(bucket_lease_cr->get_ret_status());
+ yield spawn(bucket_lease_cr.get(), false);
+ while (!bucket_lease_cr->is_locked()) {
+ if (bucket_lease_cr->is_done()) {
+ tn->log(5, "ERROR: failed to take bucket lease");
+ set_status("lease lock failed, early abort");
+ drain_all();
+ return set_cr_error(bucket_lease_cr->get_ret_status());
+ }
+ tn->log(5, "waiting on bucket lease");
+ yield set_sleeping(true);
}
- tn->log(5, "waiting on bucket lease");
- yield set_sleeping(true);
}
+
// reread the status after acquiring the lock
yield call(new ReadCR(dpp, env->async_rados, env->svc->sysobj,
- status_obj, &bucket_status, false, &objv));
+ status_obj, &bucket_status, false, &objv));
if (retcode < 0) {
- bucket_lease_cr->go_down();
- drain_all();
- bucket_lease_cr.reset();
+ RELEASE_LOCK(bucket_lease_cr);
+ tn->log(20, SSTR("ERROR: reading the status after acquiring the lock failed. error: " << retcode));
return set_cr_error(retcode);
}
- }
- if (bucket_status.state == BucketSyncState::Init ||
- bucket_status.state == BucketSyncState::Stopped) {
- assert(bucket_lease_cr);
// init sync status
yield {
init_check_compat = objv.read_version.ver <= 1; // newly-created
- int num_shards = sync_pipe.dest_bucket_info.layout.current_index.layout.normal.num_shards;
+ const int num_shards = sync_pipe.dest_bucket_info.layout.current_index.layout.normal.num_shards;
call(new InitBucketFullSyncStatusCR(sc, sync_pair, status_obj,
bucket_status, objv, num_shards,
- init_check_compat));
+ init_check_compat, info));
}
+
if (retcode < 0) {
- bucket_lease_cr->go_down();
- drain_all();
- bucket_lease_cr.reset();
+ tn->log(20, SSTR("ERROR: init full sync failed. error: " << retcode));
+ RELEASE_LOCK(bucket_lease_cr);
return set_cr_error(retcode);
}
}
+ assert(bucket_status.state == BucketSyncState::Incremental ||
+ bucket_status.state == BucketSyncState::Full);
+
if (bucket_status.state == BucketSyncState::Full) {
assert(bucket_lease_cr);
yield call(new RGWBucketFullSyncCR(sc, sync_pipe, status_obj,
bucket_lease_cr, bucket_status,
tn, objv));
if (retcode < 0) {
- bucket_lease_cr->go_down();
- drain_all();
- bucket_lease_cr.reset();
+ tn->log(20, SSTR("ERROR: full sync failed. error: " << retcode));
+ RELEASE_LOCK(bucket_lease_cr);
return set_cr_error(retcode);
}
}
if (bucket_status.state == BucketSyncState::Incremental) {
// lease not required for incremental sync
- if (bucket_lease_cr) {
- bucket_lease_cr->go_down();
- drain_all();
- bucket_lease_cr.reset();
- }
+ RELEASE_LOCK(bucket_lease_cr);
// if a specific gen was requested, compare that to the sync status
if (gen) {
const auto current_gen = bucket_status.incremental_gen;
if (*gen > current_gen) {
retcode = -EAGAIN;
- tn->log(10, SSTR("requested sync of future generation "
+ tn->log(10, SSTR("ERROR: requested sync of future generation "
<< *gen << " > " << current_gen
<< ", returning " << retcode << " for later retry"));
return set_cr_error(retcode);
} else if (*gen < current_gen) {
- tn->log(10, SSTR("requested sync of past generation "
+ tn->log(10, SSTR("WARNING: requested sync of past generation "
<< *gen << " < " << current_gen
<< ", returning success"));
return set_cr_done();
}
}
- if (size_t(sync_pair.source_bs.shard_id) >= bucket_status.shards_done_with_gen.size()) {
+ assert(sync_pair.source_bs.shard_id >= 0);
+ if (static_cast<std::size_t>(sync_pair.source_bs.shard_id) >= bucket_status.shards_done_with_gen.size()) {
tn->log(1, SSTR("bucket shard " << sync_pair.source_bs << " index out of bounds"));
return set_cr_done(); // return success so we don't retry
}
}
yield call(new RGWSyncBucketShardCR(sc, data_lease_cr, sync_pair,
- sync_pipe, bucket_status.state,
+ sync_pipe, bucket_stopped,
bucket_status.incremental_gen, tn, progress));
if (retcode < 0) {
+ tn->log(20, SSTR("ERROR: incremental sync failed. error: " << retcode));
return set_cr_error(retcode);
}
}
// loop back to previous states unless incremental sync returns normally
- } while (bucket_status.state != BucketSyncState::Incremental);
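+ // also loop back when a syncstop indication was observed, so the stopped
+ // state is handled and persisted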
+ } while (bucket_status.state != BucketSyncState::Incremental || bucket_stopped);
return set_cr_done();
}
list<RGWCoroutinesStack *> stacks;
// pass an empty objv tracker to each so that the version gets incremented
std::list<RGWObjVersionTracker> objvs;
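+ // likewise, each stack gets its own bilog info that must outlive it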
+ std::list<rgw_bucket_index_marker_info> infos;
for (auto& mgr : source_mgrs) {
RGWCoroutinesStack *stack = new RGWCoroutinesStack(store->ctx(), &cr_mgr);
objvs.emplace_back();
- stack->call(mgr->init_sync_status_cr(objvs.back()));
+ infos.emplace_back();
+ stack->call(mgr->init_sync_status_cr(objvs.back(), infos.back()));
stacks.push_back(stack);
}