if (entry.written_shards.empty() && info->partial_writes_last_complete.empty()) {
return;
}
+ const pg_pool_t &pool = get_parent()->get_pool();
+ if (pool.is_nonprimary_shard(get_parent()->whoami_shard().shard)) {
+ // Don't update pwlc on nonprimary shards because they only
+ // observe writes that update their shard
+ return;
+ }
auto dpp = get_parent()->get_dpp();
ldpp_dout(dpp, 20) << __func__ << " version=" << entry.version
<< " written_shards=" << entry.written_shards
- << " pwlc=" << info->partial_writes_last_complete
+ << " pwlc=e" << info->partial_writes_last_complete_epoch
+ << ":" << info->partial_writes_last_complete
<< " previous_version=" << previous_version
<< dendl;
- const pg_pool_t &pool = get_parent()->get_pool();
for (shard_id_t shard : pool.nonprimary_shards) {
auto pwlc_iter = info->partial_writes_last_complete.find(shard);
if (!entry.is_written_shard(shard)) {
// 1st partial write since all logs were updated
info->partial_writes_last_complete[shard] =
std::pair(previous_version, entry.version);
-
+ info->partial_writes_last_complete_epoch = get_osdmap_epoch();
continue;
}
auto &&[old_v, new_v] = pwlc_iter->second;
// invalid
ldpp_dout(dpp, 20) << __func__ << " pwlc invalid " << shard
<< dendl;
- } else if (old_v.version >= entry.version.version) {
+ } else if (old_v >= entry.version) {
// Abnormal case - consider_adjusting_pwlc may advance pwlc
// during peering because all shards have updates but these
// have not been marked complete. At the end of peering
} else {
old_v = previous_version;
new_v = entry.version;
+ info->partial_writes_last_complete_epoch = get_osdmap_epoch();
}
} else if (new_v == previous_version) {
// Subsequent partial write, contiguous versions
new_v = entry.version;
+ info->partial_writes_last_complete_epoch = get_osdmap_epoch();
} else {
// Subsequent partial write, discontiguous versions
ldpp_dout(dpp, 20) << __func__ << " cannot update shard " << shard
// shard is backfilling or in async recovery, pwlc is invalid
ldpp_dout(dpp, 20) << __func__ << " pwlc invalid " << shard
<< dendl;
- } else if (old_v.version >= entry.version.version) {
+ } else if (old_v >= entry.version) {
// Abnormal case - see above
ldpp_dout(dpp, 20) << __func__ << " pwlc is ahead of entry " << shard
<< dendl;
} else {
old_v = new_v = entry.version;
+ info->partial_writes_last_complete_epoch = get_osdmap_epoch();
}
}
}
- ldpp_dout(dpp, 20) << __func__ << " after pwlc="
- << info->partial_writes_last_complete << dendl;
+ ldpp_dout(dpp, 20) << __func__ << " after pwlc=e"
+ << info->partial_writes_last_complete_epoch
+ << ":" << info->partial_writes_last_complete << dendl;
}
void PGBackend::remove(
// knowledge of partial_writes
const auto & [fromversion, toversion] = pwlc;
if (toversion > info.last_update) {
- if (fromversion.version <= info.last_update.version) {
+ if (fromversion <= info.last_update) {
if (info.last_complete == info.last_update) {
psdout(10) << "osd." << shard << " has last_complete"
<< "=last_update " << info.last_update
{
// Merge pwlc information from another shard into
// info.partial_writes_last_complete keeping the newest
- // updates
- if (!oinfo.partial_writes_last_complete.empty()) {
+ // updates. Ignore pwlc from nonprimary shards.
+ if (!oinfo.partial_writes_last_complete.empty()&&
+ !pool.info.is_nonprimary_shard(from.shard)) {
bool updated = false;
// oinfo includes partial_writes_last_complete data.
// Merge this with our copy keeping the most up to date versions
if (info.partial_writes_last_complete.contains(shard)) {
auto & [fromversion, toversion] =
info.partial_writes_last_complete[shard];
- // Prefer pwlc with a newer toversion, if toversion matches prefer an
- // older fromversion.
- if ((ofromversion.epoch > fromversion.epoch) ||
- ((ofromversion.epoch == fromversion.epoch) && (otoversion > toversion)) ||
- ((ofromversion.epoch == fromversion.epoch) && (otoversion == toversion) &&
- (ofromversion.version < fromversion.version))) {
+ // Prefer pwlc with a newer epoch, then pwlc with a newer
+ // toversion, then pwlc with an older fromversion.
+ bool newer_epoch = (oinfo.partial_writes_last_complete_epoch >
+ info.partial_writes_last_complete_epoch);
+ bool same_epoch = (oinfo.partial_writes_last_complete_epoch ==
+ info.partial_writes_last_complete_epoch);
+ if (newer_epoch ||
+ (same_epoch && (otoversion > toversion)) ||
+ (same_epoch && (otoversion == toversion) && (ofromversion < fromversion))) {
if (!updated) {
updated = true;
psdout(10) << "osd." << from
if (updated) {
psdout(10) << "pwlc=" << info.partial_writes_last_complete << dendl;
}
+ // Update last updated epoch
+ info.partial_writes_last_complete_epoch = std::max(
+ info.partial_writes_last_complete_epoch,
+ oinfo.partial_writes_last_complete_epoch);
}
// 3 cases:
// 1. This is the primary, from is the shard that sent the oinfo which may
tinfo.pgid.shard = pg_whoami.shard;
// add partial write from our info
tinfo.partial_writes_last_complete = info.partial_writes_last_complete;
+ tinfo.partial_writes_last_complete_epoch = info.partial_writes_last_complete_epoch;
if (info.partial_writes_last_complete.contains(from.shard)) {
apply_pwlc(info.partial_writes_last_complete[from.shard], from, tinfo);
}
if (!tinfo.partial_writes_last_complete.empty()) {
psdout(20) << "sending info to " << from
- << " pwlc=" << tinfo.partial_writes_last_complete
+ << " pwlc=e" << tinfo.partial_writes_last_complete_epoch
+ << ":" << tinfo.partial_writes_last_complete
<< " info=" << tinfo
<< dendl;
}
<< " is up to date, queueing in pending_activators" << dendl;
if (!info.partial_writes_last_complete.empty()) {
psdout(20) << "sending info to " << peer
- << " pwlc=" << info.partial_writes_last_complete
+ << " pwlc=e" << info.partial_writes_last_complete_epoch
+ << ":" << info.partial_writes_last_complete
<< " info=" << info
<< dendl;
}
<< " to " << info.last_update;
pi.partial_writes_last_complete = info.partial_writes_last_complete;
+ pi.partial_writes_last_complete_epoch = info.partial_writes_last_complete_epoch;
pi.last_update = info.last_update;
pi.last_complete = info.last_update;
pi.set_last_backfill(hobject_t());
psdout(10) << "shard " << shard << " pwlc rolled back to "
<< info.partial_writes_last_complete[shard] << dendl;
}
- // Always assign the current epoch to the version number so that
- // pwlc adjustments made by the whole proc_master_log process
- // are recognized as the newest updates
- info.partial_writes_last_complete[shard].first.epoch =
- get_osdmap_epoch();
}
+ // Update the epoch so that pwlc adjustments made by the whole
+ // proc_master_log process are recognized as the newest updates
+ info.partial_writes_last_complete_epoch = get_osdmap_epoch();
}
void PeeringState::proc_master_log(
// fix up pwlc - it may refer to log entries that are no longer in the log
child->info.partial_writes_last_complete = info.partial_writes_last_complete;
+ child->info.partial_writes_last_complete_epoch = info.partial_writes_last_complete_epoch;
pg_log.split_pwlc(info);
child->pg_log.split_pwlc(child->info);
info.partial_writes_last_complete) {
auto &&[old_v, new_v] = versionrange;
old_v = new_v = info.last_update;
- old_v.epoch = get_osdmap_epoch();
}
- psdout(10) << "merged pwlc=" << info.partial_writes_last_complete << dendl;
+ info.partial_writes_last_complete_epoch = get_osdmap_epoch();
+ psdout(10) << "merged pwlc=e" << info.partial_writes_last_complete_epoch
+ << ":" << info.partial_writes_last_complete << dendl;
}
}
fromversion.version = eversion_t::max().version;
toversion = fromversion;
}
+ info.partial_writes_last_complete_epoch = 0;
}
for (auto p = logv.begin(); p != logv.end(); ++p) {
i.history.last_epoch_started = evt.activation_epoch;
i.history.last_interval_started = i.history.same_interval_since;
if (!i.partial_writes_last_complete.empty()) {
- psdout(20) << "sending info to " << ps->get_primary() << " pwlc="
- << i.partial_writes_last_complete << " info=" << i << dendl;
+ psdout(20) << "sending info to " << ps->get_primary() << " pwlc=e"
+ << i.partial_writes_last_complete_epoch
+ << ":" << i.partial_writes_last_complete
+ << " info=" << i << dendl;
}
rctx.send_info(
ps->get_primary().osd,
psdout(20) << "info from osd." << infoevt.from
<< " last_update=" << infoevt.info.last_update
<< " last_complete=" << infoevt.info.last_complete
- << " pwlc=" << pwlc
+ << " pwlc=e" << infoevt.info.partial_writes_last_complete_epoch
+ << ":" << pwlc
<< " our last_update=" << ps->info.last_update << dendl;
// Our last update must be in the range described by partial write
// last_complete
- ceph_assert(ps->info.last_update.version >= pwlc.first.version);
+ ceph_assert(ps->info.last_update >= pwlc.first);
// Last complete must match the partial write last_update
ceph_assert(pwlc.second == infoevt.info.last_update);
} else {
void pg_info_t::encode(ceph::buffer::list &bl) const
{
- ENCODE_START(33, 26, bl);
+ ENCODE_START(34, 26, bl);
encode(pgid.pgid, bl);
encode(last_update, bl);
encode(last_complete, bl);
encode(true, bl); // was last_backfill_bitwise
encode(last_interval_started, bl);
encode(partial_writes_last_complete, bl);
+ encode(partial_writes_last_complete_epoch, bl);
ENCODE_FINISH(bl);
}
void pg_info_t::decode(ceph::buffer::list::const_iterator &bl)
{
- DECODE_START(33, bl);
+ DECODE_START(34, bl);
decode(pgid.pgid, bl);
decode(last_update, bl);
decode(last_complete, bl);
if (struct_v >= 33) {
decode(partial_writes_last_complete, bl);
}
+ if (struct_v >= 34) {
+ decode(partial_writes_last_complete_epoch, bl);
+ }
DECODE_FINISH(bl);
}
f->close_section();
}
f->close_section();
+ f->dump_stream("partial_writes_last_complete_epoch") << partial_writes_last_complete_epoch;
f->open_array_section("purged_snaps");
for (interval_set<snapid_t>::const_iterator i=purged_snaps.begin();
i != purged_snaps.end();
std::map<shard_id_t,std::pair<eversion_t, eversion_t>>
partial_writes_last_complete; ///< last_complete for shards not modified by a partial write
+ epoch_t partial_writes_last_complete_epoch; ///< epoch when pwlc was last updated
pg_stat_t stats;
l.last_backfill == r.last_backfill &&
l.purged_snaps == r.purged_snaps &&
l.partial_writes_last_complete == r.partial_writes_last_complete &&
+ l.partial_writes_last_complete_epoch == r.partial_writes_last_complete_epoch &&
l.stats == r.stats &&
l.history == r.history &&
l.hit_set == r.hit_set;
: last_epoch_started(0),
last_interval_started(0),
last_user_version(0),
- last_backfill(hobject_t::get_max())
+ last_backfill(hobject_t::get_max()),
+ partial_writes_last_complete_epoch(0)
{ }
// cppcheck-suppress noExplicitConstructor
pg_info_t(spg_t p)
last_epoch_started(0),
last_interval_started(0),
last_user_version(0),
- last_backfill(hobject_t::get_max())
+ last_backfill(hobject_t::get_max()),
+ partial_writes_last_complete_epoch(0)
{ }
void set_last_backfill(hobject_t pos) {
eversion_t last_complete;
version_t last_user_version;
std::map<shard_id_t,std::pair<eversion_t,eversion_t>> partial_writes_last_complete;
+ epoch_t partial_writes_last_complete_epoch;
struct { // pg_stat_t stats
eversion_t version;
version_t reported_seq;
last_complete = info.last_complete;
last_user_version = info.last_user_version;
partial_writes_last_complete = info.partial_writes_last_complete;
+ partial_writes_last_complete_epoch = info.partial_writes_last_complete_epoch;
stats.version = info.stats.version;
stats.reported_seq = info.stats.reported_seq;
stats.last_fresh = info.stats.last_fresh;
info->last_complete = last_complete;
info->last_user_version = last_user_version;
info->partial_writes_last_complete = partial_writes_last_complete;
+ info->partial_writes_last_complete_epoch = partial_writes_last_complete_epoch;
info->stats.version = stats.version;
info->stats.reported_seq = stats.reported_seq;
info->stats.last_fresh = stats.last_fresh;
}
void encode(ceph::buffer::list& bl) const {
- ENCODE_START(2, 1, bl);
+ ENCODE_START(3, 1, bl);
encode(last_update, bl);
encode(last_complete, bl);
encode(last_user_version, bl);
encode(stats.stats.sum.num_wr_kb, bl);
encode(stats.stats.sum.num_objects_dirty, bl);
encode(partial_writes_last_complete, bl);
+ encode(partial_writes_last_complete_epoch, bl);
ENCODE_FINISH(bl);
}
void decode(ceph::buffer::list::const_iterator& p) {
- DECODE_START(2, p);
+ DECODE_START(3, p);
decode(last_update, p);
decode(last_complete, p);
decode(last_user_version, p);
decode(stats.stats.sum.num_objects_dirty, p);
if (struct_v >= 2)
decode(partial_writes_last_complete, p);
+ if (struct_v >= 3)
+ decode(partial_writes_last_complete_epoch, p);
DECODE_FINISH(p);
}
void dump(ceph::Formatter *f) const {
f->close_section();
}
f->close_section();
+ f->dump_stream("partial_writes_last_complete_epoch") << partial_writes_last_complete_epoch;
f->open_object_section("stats");
f->dump_stream("version") << stats.version;
f->dump_unsigned("reported_seq", stats.reported_seq);