// per bucket-shard state cached by DataSyncShardCR
struct State {
// the source bucket shard to sync
- rgw_bucket_shard key;
+ std::pair<rgw_bucket_shard, std::optional<uint64_t>> key;
// current sync obligation being processed by DataSyncSingleEntry
std::optional<rgw_data_sync_obligation> obligation;
// incremented with each new obligation
// highest timestamp applied by all sources
ceph::real_time progress_timestamp;
- State(const rgw_bucket_shard& key) noexcept : key(key) {}
+ State(const std::pair<rgw_bucket_shard, std::optional<uint64_t>>& key ) noexcept
+ : key(key) {}
+ State(const rgw_bucket_shard& shard, std::optional<uint64_t> gen) noexcept
+ : key(shard, gen) {}
};
struct Entry;
class Handle;
using lru_config = ceph::common::intrusive_lru_config<
- rgw_bucket_shard, Entry, EntryToKey>;
+ std::pair<rgw_bucket_shard, std::optional<uint64_t>>, Entry, EntryToKey>;
// a recyclable cache entry
struct Entry : State, ceph::common::intrusive_lru_base<lru_config> {
};
struct EntryToKey {
- using type = rgw_bucket_shard;
+ using type = std::pair<rgw_bucket_shard, std::optional<uint64_t>>;
const type& operator()(const Entry& e) { return e.key; }
};
// find or create a cache entry for the given key, and return a Handle that
// keeps it lru-pinned until destruction
- Handle get(const rgw_bucket_shard& key);
+ Handle get(const rgw_bucket_shard& shard, std::optional<uint64_t> gen);
};
// a State handle that keeps the Cache referenced
State* operator->() const noexcept { return entry.get(); }
};
-inline Handle Cache::get(const rgw_bucket_shard& key)
+inline Handle Cache::get(const rgw_bucket_shard& shard, std::optional<uint64_t> gen)
{
- auto result = cache.get_or_create(key);
+ auto result = cache.get_or_create({ shard, gen });
return {this, std::move(result.first)};
}
obligation_counter = state->counter;
progress = ceph::real_time{};
- ldout(cct, 4) << "starting sync on " << bucket_shard_str{state->key}
+ ldout(cct, 4) << "starting sync on " << bucket_shard_str{state->key.first}
<< ' ' << *state->obligation << dendl;
yield call(new RGWRunBucketSourcesSyncCR(sc, lease_cr,
- state->key, tn,
+ state->key.first, tn,
state->obligation->gen,
&progress));
if (retcode < 0) {
complete = std::move(*state->obligation);
state->obligation.reset();
- tn->log(10, SSTR("sync finished on " << bucket_shard_str{state->key}
+ tn->log(10, SSTR("sync finished on " << bucket_shard_str{state->key.first}
<< " progress=" << progress << ' ' << complete << " r=" << retcode));
}
sync_status = retcode;
std::optional<uint64_t> gen,
const std::string& marker,
ceph::real_time timestamp, bool retry) {
- auto state = bucket_shard_cache->get(src);
+ auto state = bucket_shard_cache->get(src, gen);
auto obligation = rgw_data_sync_obligation{src, gen, marker, timestamp, retry};
return new RGWDataSyncSingleEntryCR(sc, std::move(state), std::move(obligation),
&*marker_tracker, error_repo,