class MOSDPGLog : public MOSDPeeringOp {
private:
- static constexpr int HEAD_VERSION = 5;
+ static constexpr int HEAD_VERSION = 6;
static constexpr int COMPAT_VERSION = 5;
epoch_t epoch = 0;
pg_log_t log;
pg_missing_t missing;
PastIntervals past_intervals;
+ std::optional<pg_lease_t> lease;
epoch_t get_epoch() const { return epoch; }
spg_t get_pgid() const { return spg_t(info.pgid.pgid, to); }
// swapped out by OSD code.
out << "log " << log
<< " pi " << past_intervals;
+ if (lease) {
+ out << " " << *lease;
+ }
}
void encode_payload(uint64_t features) override {
encode(past_intervals, payload);
encode(to, payload);
encode(from, payload);
+ encode(lease, payload);
}
void decode_payload() override {
using ceph::decode;
decode(past_intervals, p);
decode(to, p);
decode(from, p);
+ if (header.version >= 6) {
+ decode(lease, p);
+ }
}
private:
template<class T, typename... Args>
MEMPOOL_DEFINE_OBJECT_FACTORY(PGPeeringEvent, pg_peering_evt, osd);
-MLogRec::MLogRec(pg_shard_t from, MOSDPGLog *msg) :
- from(from), msg(msg) {}
+MLogRec::MLogRec(pg_shard_t from, MOSDPGLog *msg)
+ : from(from), msg(msg) {}
+
+void MLogRec::print(std::ostream *out) const
+{
+ *out << "MLogRec from " << from << " ";
+ msg->inner_print(*out);
+}
pg_shard_t from;
boost::intrusive_ptr<MOSDPGLog> msg;
MLogRec(pg_shard_t from, MOSDPGLog *msg);
- void print(std::ostream *out) const {
- *out << "MLogRec from " << from;
- }
+ void print(std::ostream *out) const;
};
struct MNotifyRec : boost::statechart::event< MNotifyRec > {
}
if (m) {
- dout(10) << "activate peer osd." << peer << " sending " << m->log << dendl;
- //m->log.print(cout);
+ dout(10) << "activate peer osd." << peer << " sending " << m->log
+ << dendl;
+ m->lease = get_lease();
pl->send_cluster_message(peer.osd, m, get_osdmap_epoch());
}
ObjectStore::Transaction &t = context<PeeringMachine>().get_cur_transaction();
ps->merge_log(t, logevt.msg->info, logevt.msg->log, logevt.from);
ceph_assert(ps->pg_log.get_head() == ps->info.last_update);
+ if (logevt.msg->lease) {
+ ps->proc_lease(*logevt.msg->lease);
+ }
return discard_event();
}
} else {
ps->merge_log(t, msg->info, msg->log, logevt.from);
}
+ if (logevt.msg->lease) {
+ ps->proc_lease(*logevt.msg->lease);
+ }
ceph_assert(ps->pg_log.get_head() == ps->info.last_update);