epoch_t epoch_sent;
epoch_t min_epoch;
pg_info_t info;
+ std::optional<pg_lease_t> lease;
+ std::optional<pg_lease_ack_t> lease_ack;
spg_t get_spg() const override {
return spgid;
MInfoRec(
pg_shard_t(get_source().num(), info.pgid.shard),
info,
- epoch_sent));
+ epoch_sent,
+ lease,
+ lease_ack));
}
MOSDPGInfo2() : MOSDPeeringOp{MSG_OSD_PG_INFO2,
spg_t s,
pg_info_t q,
epoch_t sent,
- epoch_t min)
+ epoch_t min,
+ std::optional<pg_lease_t> l,
+ std::optional<pg_lease_ack_t> la)
: MOSDPeeringOp{MSG_OSD_PG_INFO2, HEAD_VERSION, COMPAT_VERSION},
spgid(s),
epoch_sent(sent),
min_epoch(min),
- info(q) {
+ info(q),
+ lease(l),
+ lease_ack(la) {
set_priority(CEPH_MSG_PRIO_HIGH);
}
encode(epoch_sent, payload);
encode(min_epoch, payload);
encode(info, payload);
+ encode(lease, payload);
+ encode(lease_ack, payload);
}
void decode_payload() override {
using ceph::decode;
decode(epoch_sent, p);
decode(min_epoch, p);
decode(info, p);
+ decode(lease, p);
+ decode(lease_ack, p);
}
private:
template<class T, typename... Args>
pg_shard_t from;
pg_info_t info;
epoch_t msg_epoch;
- MInfoRec(pg_shard_t from, const pg_info_t &info, epoch_t msg_epoch) :
- from(from), info(info), msg_epoch(msg_epoch) {}
+ std::optional<pg_lease_t> lease;
+ std::optional<pg_lease_ack_t> lease_ack;
+ MInfoRec(pg_shard_t from, const pg_info_t &info, epoch_t msg_epoch,
+ std::optional<pg_lease_t> l = {},
+ std::optional<pg_lease_ack_t> la = {})
+ : from(from), info(info), msg_epoch(msg_epoch),
+ lease(l), lease_ack(la) {}
void print(std::ostream *out) const {
*out << "MInfoRec from " << from << " info: " << info;
+ if (lease) {
+ *out << " " << *lease;
+ }
+ if (lease_ack) {
+ *out << " " << *lease_ack;
+ }
}
};
spg_t to_spgid,
epoch_t min_epoch,
epoch_t cur_epoch,
- const pg_info_t &info)
+ const pg_info_t &info,
+ std::optional<pg_lease_t> lease,
+ std::optional<pg_lease_ack_t> lease_ack)
{
if (require_osd_release >= ceph_release_t::octopus) {
send_osd_message(
to_spgid,
info,
cur_epoch,
- min_epoch)
+ min_epoch,
+ lease,
+ lease_ack)
);
} else {
send_osd_message(
}
readable_until = min;
readable_until_ub = min;
- dout(20) << __func__ << " readable_until[_ub] " << readable_until << dendl;
+ dout(20) << __func__ << " readable_until[_ub] " << readable_until
+ << " (sent " << readable_until_ub_sent << ")" << dendl;
}
bool PeeringState::check_prior_readable_down_osds(const OSDMapRef& map)
spg_t(info.pgid.pgid, peer.shard),
get_osdmap_epoch(), // fixme: use lower epoch?
get_osdmap_epoch(),
- info);
+ info,
+ get_lease());
} else {
psdout(10) << "activate peer osd." << peer
<< " is up to date, but sending pg_log anyway" << dendl;
ceph_assert(ps->is_primary());
ceph_assert(!ps->acting_recovery_backfill.empty());
+ if (infoevt.lease_ack) {
+ ps->proc_lease_ack(infoevt.from.osd, *infoevt.lease_ack);
+ }
// don't update history (yet) if we are active and primary; the replica
// may be telling us they have activated (and committed) but we can't
// share that until _everyone_ does the same.
if (ps->is_acting_recovery_backfill(infoevt.from) &&
ps->peer_activated.count(infoevt.from) == 0) {
psdout(10) << " peer osd." << infoevt.from
- << " activated and committed" << dendl;
+ << " activated and committed" << dendl;
ps->peer_activated.insert(infoevt.from);
ps->blocked_by.erase(infoevt.from.shard);
pl->publish_stats_to_osd();
ceph_assert(!ps->acting_recovery_backfill.empty());
ceph_assert(ps->blocked_by.empty());
- ps->send_lease();
-
// Degraded?
ps->update_calc_stats();
if (ps->info.stats.stats.sum.num_objects_degraded) {
spg_t(ps->info.pgid.pgid, ps->get_primary().shard),
epoch,
epoch,
- i);
+ i,
+ {}, /* lease */
+ ps->get_lease_ack());
if (ps->acting.size() >= ps->pool.info.min_size) {
ps->state_set(PG_STATE_ACTIVE);
ps->info.hit_set = infoevt.info.hit_set;
}
+ if (infoevt.lease) {
+ ps->proc_lease(*infoevt.lease);
+ }
+
ceph_assert(infoevt.info.last_update == ps->info.last_update);
ceph_assert(ps->pg_log.get_head() == ps->info.last_update);
void send_query(int to, spg_t spgid, const pg_query_t &q);
void send_info(int to, spg_t to_spgid,
epoch_t min_epoch, epoch_t cur_epoch,
- const pg_info_t &info);
+ const pg_info_t &info,
+ std::optional<pg_lease_t> lease = {},
+ std::optional<pg_lease_ack_t> lease_ack = {});
};
struct HeartbeatStamps : public RefCountedObject {
}
void send_info(int to, spg_t to_spgid,
epoch_t min_epoch, epoch_t cur_epoch,
- const pg_info_t &info) {
- msgs.send_info(to, to_spgid, min_epoch, cur_epoch, info);
+ const pg_info_t &info,
+ std::optional<pg_lease_t> lease = {},
+ std::optional<pg_lease_ack_t> lease_ack = {}) {
+ msgs.send_info(to, to_spgid, min_epoch, cur_epoch, info,
+ lease, lease_ack);
}
};