--- /dev/null
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
+#pragma once
+
+#include "msg/Message.h"
+#include "osd/osd_types.h"
+
+/**
+ * MOSDPGLease - peering message that carries a pg_lease_t for one PG.
+ *
+ * The payload is (epoch, spgid, lease), encoded in that order.  A received
+ * message is converted into an MLease peering event by get_event().
+ */
+class MOSDPGLease : public MOSDPeeringOp {
+private:
+  static constexpr int HEAD_VERSION = 1;
+  static constexpr int COMPAT_VERSION = 1;
+
+  epoch_t epoch = 0;  ///< reported as both the map epoch and the min epoch
+  spg_t spgid;        ///< PG this message is addressed to
+  pg_lease_t lease;   ///< lease payload
+
+public:
+  /// PG this message targets.
+  spg_t get_spg() const {
+    return spgid;
+  }
+  /// Epoch stored in the message.
+  epoch_t get_map_epoch() const {
+    return epoch;
+  }
+  /// Same value as get_map_epoch(); this message has a single epoch field.
+  epoch_t get_min_epoch() const {
+    return epoch;
+  }
+  /// Build a heap-allocated peering event wrapping an MLease.  The sender id
+  /// is taken from the message source (get_source().num()).
+  PGPeeringEvent *get_event() override {
+    return new PGPeeringEvent(
+      epoch,
+      epoch,
+      MLease(epoch, get_source().num(), lease));
+  }
+
+  /// Default constructor (no payload set); the message factory uses this
+  /// form via make_message<MOSDPGLease>().
+  MOSDPGLease() : MOSDPeeringOp{MSG_OSD_PG_LEASE,
+                                HEAD_VERSION, COMPAT_VERSION} {}
+  /// @param mv    stored as the message epoch
+  /// @param p     target PG
+  /// @param lease lease payload to send
+  MOSDPGLease(version_t mv, spg_t p, pg_lease_t lease) :
+    MOSDPeeringOp{MSG_OSD_PG_LEASE,
+                  HEAD_VERSION, COMPAT_VERSION},
+    epoch(mv),
+    spgid(p),
+    lease(lease) { }
+private:
+  ~MOSDPGLease() override {}
+
+public:
+  std::string_view get_type_name() const override { return "pg_lease"; }
+  /// Print only the lease itself.
+  void inner_print(std::ostream& out) const override {
+    out << lease;
+  }
+
+  /// Serialize epoch, spgid and lease (in that order) into the payload.
+  void encode_payload(uint64_t features) override {
+    using ceph::encode;
+    encode(epoch, payload);
+    encode(spgid, payload);
+    encode(lease, payload);
+  }
+  /// Deserialize the payload; field order must mirror encode_payload().
+  void decode_payload() override {
+    // Name ceph::decode explicitly, mirroring encode_payload(), so the
+    // lookup does not depend on a using-directive pulled in elsewhere.
+    using ceph::decode;
+    auto p = payload.cbegin();
+    decode(epoch, p);
+    decode(spgid, p);
+    decode(lease, p);
+  }
+private:
+  // Grants ceph::make_message access to the private members of this class
+  // (e.g. the private destructor).
+  template<class T, typename... Args>
+  friend boost::intrusive_ptr<T> ceph::make_message(Args&&... args);
+};
--- /dev/null
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
+#pragma once
+
+#include "msg/Message.h"
+#include "osd/osd_types.h"
+
+/**
+ * MOSDPGLeaseAck - peering message that carries a pg_lease_ack_t for one PG.
+ *
+ * The payload is (epoch, spgid, lease_ack), encoded in that order.  A
+ * received message is converted into an MLeaseAck peering event by
+ * get_event().
+ */
+class MOSDPGLeaseAck : public MOSDPeeringOp {
+private:
+  static constexpr int HEAD_VERSION = 1;
+  static constexpr int COMPAT_VERSION = 1;
+
+  epoch_t epoch = 0;         ///< reported as both map epoch and min epoch
+  spg_t spgid;               ///< PG this message is addressed to
+  pg_lease_ack_t lease_ack;  ///< acknowledgement payload
+
+public:
+  /// PG this message targets.
+  spg_t get_spg() const {
+    return spgid;
+  }
+  /// Epoch stored in the message.
+  epoch_t get_map_epoch() const {
+    return epoch;
+  }
+  /// Same value as get_map_epoch(); this message has a single epoch field.
+  epoch_t get_min_epoch() const {
+    return epoch;
+  }
+  /// Build a heap-allocated peering event wrapping an MLeaseAck.  The sender
+  /// id is taken from the message source (get_source().num()).
+  PGPeeringEvent *get_event() override {
+    return new PGPeeringEvent(
+      epoch,
+      epoch,
+      MLeaseAck(epoch, get_source().num(), lease_ack));
+  }
+
+  /// Default constructor (no payload set); the message factory uses this
+  /// form via make_message<MOSDPGLeaseAck>().
+  MOSDPGLeaseAck() : MOSDPeeringOp{MSG_OSD_PG_LEASE_ACK,
+                                   HEAD_VERSION, COMPAT_VERSION} {}
+  /// @param mv        stored as the message epoch
+  /// @param p         target PG
+  /// @param lease_ack acknowledgement payload to send
+  MOSDPGLeaseAck(version_t mv, spg_t p, pg_lease_ack_t lease_ack) :
+    MOSDPeeringOp{MSG_OSD_PG_LEASE_ACK,
+                  HEAD_VERSION, COMPAT_VERSION},
+    epoch(mv),
+    spgid(p),
+    lease_ack(lease_ack) { }
+private:
+  ~MOSDPGLeaseAck() override {}
+
+public:
+  std::string_view get_type_name() const override { return "pg_lease_ack"; }
+  /// Print only the acknowledgement itself.
+  void inner_print(std::ostream& out) const override {
+    out << lease_ack;
+  }
+
+  /// Serialize epoch, spgid and lease_ack (in that order) into the payload.
+  void encode_payload(uint64_t features) override {
+    using ceph::encode;
+    encode(epoch, payload);
+    encode(spgid, payload);
+    encode(lease_ack, payload);
+  }
+  /// Deserialize the payload; field order must mirror encode_payload().
+  void decode_payload() override {
+    // Name ceph::decode explicitly, mirroring encode_payload(), so the
+    // lookup does not depend on a using-directive pulled in elsewhere.
+    using ceph::decode;
+    auto p = payload.cbegin();
+    decode(epoch, p);
+    decode(spgid, p);
+    decode(lease_ack, p);
+  }
+private:
+  // Grants ceph::make_message access to the private members of this class
+  // (e.g. the private destructor).
+  template<class T, typename... Args>
+  friend boost::intrusive_ptr<T> ceph::make_message(Args&&... args);
+};
#include "messages/MOSDPGCreate.h"
#include "messages/MOSDPGCreate2.h"
#include "messages/MOSDPGTrim.h"
+#include "messages/MOSDPGLease.h"
+#include "messages/MOSDPGLeaseAck.h"
#include "messages/MOSDScrub.h"
#include "messages/MOSDScrub2.h"
#include "messages/MOSDScrubReserve.h"
case MSG_OSD_PG_TRIM:
m = make_message<MOSDPGTrim>();
break;
+ case MSG_OSD_PG_LEASE:
+ m = make_message<MOSDPGLease>();
+ break;
+ case MSG_OSD_PG_LEASE_ACK:
+ m = make_message<MOSDPGLeaseAck>();
+ break;
case MSG_OSD_SCRUB:
m = make_message<MOSDScrub>();
#define MSG_OSD_PG_READY_TO_MERGE 122
+#define MSG_OSD_PG_LEASE 133
+#define MSG_OSD_PG_LEASE_ACK 134
+
// *** MDS ***
#define MSG_MDS_BEACON 100 // to monitor
case MSG_OSD_PG_INFO2:
case MSG_OSD_BACKFILL_RESERVE:
case MSG_OSD_RECOVERY_RESERVE:
+ case MSG_OSD_PG_LEASE:
+ case MSG_OSD_PG_LEASE_ACK:
{
MOSDPeeringOp *pm = static_cast<MOSDPeeringOp*>(m);
if (require_osd_peer(pm)) {
case MSG_OSD_PG_UPDATE_LOG_MISSING_REPLY:
case MSG_OSD_PG_RECOVERY_DELETE:
case MSG_OSD_PG_RECOVERY_DELETE_REPLY:
+ case MSG_OSD_PG_LEASE:
+ case MSG_OSD_PG_LEASE_ACK:
return true;
default:
return false;
}
};
+/// Statechart event carrying a pg_lease_t together with the epoch and the
+/// osd id it came from.
+struct MLease : boost::statechart::event<MLease> {
+  epoch_t epoch;     ///< epoch associated with the lease
+  int from;          ///< osd id of the sender
+  pg_lease_t lease;  ///< lease payload
+
+  MLease(epoch_t epoch, int from, pg_lease_t l)
+    : epoch(epoch),
+      from(from),
+      lease(l)
+  {}
+
+  /// Write a one-line, human-readable description of the event to *out.
+  void print(std::ostream *out) const {
+    std::ostream& os = *out;
+    os << "MLease epoch " << epoch;
+    os << " from osd." << from;
+    os << " " << lease;
+  }
+};
+
+/// Statechart event carrying a pg_lease_ack_t together with the epoch and
+/// the osd id it came from.
+struct MLeaseAck : boost::statechart::event<MLeaseAck> {
+  epoch_t epoch;             ///< epoch associated with the ack
+  int from;                  ///< osd id of the sender
+  pg_lease_ack_t lease_ack;  ///< acknowledgement payload
+
+  MLeaseAck(epoch_t epoch, int from, pg_lease_ack_t l)
+    : epoch(epoch),
+      from(from),
+      lease_ack(l)
+  {}
+
+  /// Write a one-line, human-readable description of the event to *out.
+  void print(std::ostream *out) const {
+    std::ostream& os = *out;
+    os << "MLeaseAck epoch " << epoch;
+    os << " from osd." << from;
+    os << " " << lease_ack;
+  }
+};
+
struct RequestBackfillPrio : boost::statechart::event< RequestBackfillPrio > {
unsigned priority;
int64_t primary_num_bytes;
#include "messages/MOSDPGNotify2.h"
#include "messages/MOSDPGQuery.h"
#include "messages/MOSDPGQuery2.h"
+#include "messages/MOSDPGLease.h"
+#include "messages/MOSDPGLeaseAck.h"
#define dout_context cct
#define dout_subsys ceph_subsys_osd
return did;
}
+/**
+ * Fold a received lease into this PG's local readable_until state.
+ *
+ * Three values are advanced, each only ever forward:
+ *  - readable_until_ub_from_primary: the raw upper bound from the lease;
+ *  - readable_until: the lease's readable_until shifted by the peer clock
+ *    delta upper bound (zero when that delta is unknown);
+ *  - readable_until_ub: the lease's upper bound shifted by the peer clock
+ *    delta lower bound, or now + lease interval when that delta is unknown.
+ *
+ * NOTE(review): hb_stamps[0] is presumably the heartbeat stamp record for
+ * the primary, and is assumed to exist - confirm against callers.
+ */
+void PeeringState::proc_lease(const pg_lease_t& l)
+{
+  psdout(10) << __func__ << " " << l << dendl;
+
+  // track the largest upper bound the sender has reported so far
+  if (l.readable_until_ub > readable_until_ub_from_primary) {
+    readable_until_ub_from_primary = l.readable_until_ub;
+  }
+
+  const auto& stamps = hb_stamps[0];
+
+  // without a clock delta upper bound we cannot translate readable_until,
+  // so the candidate stays at zero
+  ceph::signedspan ru = ceph::signedspan::zero();
+  if (stamps->peer_clock_delta_ub) {
+    ru = l.readable_until - *stamps->peer_clock_delta_ub;
+  }
+  if (ru > readable_until) {
+    readable_until = ru;
+    psdout(20) << __func__ << " readable_until now " << readable_until << dendl;
+#warning fixme: wake up replica?
+  }
+
+  // upper bound: translate with the delta lower bound when known, else
+  // fall back to the local monotonic clock plus the lease interval
+  ceph::signedspan ruub;
+  if (stamps->peer_clock_delta_lb) {
+    ruub = l.readable_until_ub - *stamps->peer_clock_delta_lb;
+  } else {
+    ruub = pl->get_mnow() + l.interval;
+  }
+  if (ruub > readable_until_ub) {
+    readable_until_ub = ruub;
+    psdout(20) << __func__ << " readable_until_ub now " << readable_until_ub
+               << dendl;
+  }
+}
+
+/**
+ * Record a lease acknowledgement from an acting-set member.
+ *
+ * If the ack carries a newer readable_until_ub than the one stored for
+ * osd @p from, the stored value is replaced.  When the replaced value was
+ * equal to the current readable_until, readable_until is recomputed via
+ * recalc_readable_until().
+ *
+ * @param from osd id that sent the ack
+ * @param a    acknowledgement payload
+ */
+void PeeringState::proc_lease_ack(int from, const pg_lease_ack_t& a)
+{
+  // NOTE: the previous unused local `now = pl->get_mnow()` was removed; it
+  // was only referenced from commented-out code.
+  bool was_min = false;
+  for (unsigned i = 0; i < acting.size(); ++i) {
+    if (from == acting[i]) {
+      // the lease_ack value is based on the primary's clock
+      if (a.readable_until_ub > acting_readable_until_ub[i]) {
+        // this slot was pinning readable_until, so it must be recomputed
+        if (acting_readable_until_ub[i] == readable_until) {
+          was_min = true;
+        }
+        acting_readable_until_ub[i] = a.readable_until_ub;
+        break;
+      }
+    }
+  }
+  if (was_min) {
+    recalc_readable_until();
+    // TODO: once wake-up is implemented, do it only when
+    //   pl->is_laggy() && readable_until > pl->get_mnow()
+#warning fixme: wake up?
+  }
+}
+
void PeeringState::recalc_readable_until()
{
assert(is_primary());
return discard_event();
}
+/// Active state: hand an incoming lease ack to PeeringState and consume
+/// the event.
+boost::statechart::result PeeringState::Active::react(const MLeaseAck& ack_evt)
+{
+  DECLARE_LOCALS;
+  const int sender = ack_evt.from;
+  const pg_lease_ack_t& ack = ack_evt.lease_ack;
+  ps->proc_lease_ack(sender, ack);
+  return discard_event();
+}
+
/*
* update info.history.last_epoch_started ONLY after we and all
* replicas have activated AND committed the activate transaction
return discard_event();
}
+/// ReplicaActive state: apply an incoming lease, then reply to the primary
+/// with a MOSDPGLeaseAck stamped with the current osdmap epoch.
+boost::statechart::result PeeringState::ReplicaActive::react(const MLease& lease_evt)
+{
+  DECLARE_LOCALS;
+  // capture pg id and epoch before processing, as the original did
+  const spg_t my_pgid = context< PeeringMachine >().spgid;
+  const epoch_t map_epoch = pl->get_osdmap_epoch();
+
+  ps->proc_lease(lease_evt.lease);
+
+  // the ack is built after proc_lease() so it reflects the updated state
+  auto *ack_msg = new MOSDPGLeaseAck(map_epoch, my_pgid, ps->get_lease_ack());
+  pl->send_cluster_message(ps->get_primary().osd, ack_msg, map_epoch);
+  return discard_event();
+}
+
boost::statechart::result PeeringState::ReplicaActive::react(const MInfoRec& infoevt)
{
DECLARE_LOCALS;
boost::statechart::custom_reaction< UnfoundBackfill >,
boost::statechart::custom_reaction< RemoteReservationRevokedTooFull>,
boost::statechart::custom_reaction< RemoteReservationRevoked>,
- boost::statechart::custom_reaction< DoRecovery>
+ boost::statechart::custom_reaction< DoRecovery>,
+ boost::statechart::custom_reaction< MLeaseAck>
> reactions;
boost::statechart::result react(const QueryState& q);
boost::statechart::result react(const ActMap&);
}
boost::statechart::result react(const ActivateCommitted&);
boost::statechart::result react(const AllReplicasActivated&);
+ boost::statechart::result react(const MLeaseAck&);
boost::statechart::result react(const DeferRecovery& evt) {
return discard_event();
}
boost::statechart::custom_reaction< RemoteBackfillPreempted >,
boost::statechart::custom_reaction< RemoteRecoveryPreempted >,
boost::statechart::custom_reaction< RecoveryDone >,
- boost::statechart::transition<DeleteStart, ToDelete>
+ boost::statechart::transition<DeleteStart, ToDelete>,
+ boost::statechart::custom_reaction< MLease >
> reactions;
boost::statechart::result react(const QueryState& q);
boost::statechart::result react(const MInfoRec& infoevt);
boost::statechart::result react(const MQuery&);
boost::statechart::result react(const Activate&);
boost::statechart::result react(const ActivateCommitted&);
+ boost::statechart::result react(const MLease&);
boost::statechart::result react(const RecoveryDone&) {
return discard_event();
}
}
}
+ void schedule_renew_lease();
+
pg_lease_t get_lease() {
return pg_lease_t(readable_until, readable_until_ub_sent, readable_interval);
}
+ void proc_lease(const pg_lease_t& l);
+ void proc_lease_ack(int from, const pg_lease_ack_t& la);
+
+ /// Build a lease ack from readable_until_ub_from_primary.
+ pg_lease_ack_t get_lease_ack() {
+   pg_lease_ack_t ack(readable_until_ub_from_primary);
+   return ack;
+ }
+
/// [primary] recalc readable_until[_ub] for the current interval
void recalc_readable_until();