From 19d770832b022f14401a0cc60cbdebc21a1a07d5 Mon Sep 17 00:00:00 2001 From: Sage Weil Date: Fri, 19 Jul 2019 16:37:04 -0500 Subject: [PATCH] osd: send and process lease[_ack] messages Signed-off-by: Sage Weil --- src/messages/MOSDPGLease.h | 67 +++++++++++++++++++++++++++++ src/messages/MOSDPGLeaseAck.h | 67 +++++++++++++++++++++++++++++ src/msg/Message.cc | 8 ++++ src/msg/Message.h | 3 ++ src/osd/OSD.cc | 2 + src/osd/OSD.h | 2 + src/osd/PGPeeringEvent.h | 23 ++++++++++ src/osd/PeeringState.cc | 79 +++++++++++++++++++++++++++++++++++ src/osd/PeeringState.h | 17 +++++++- 9 files changed, 266 insertions(+), 2 deletions(-) create mode 100644 src/messages/MOSDPGLease.h create mode 100644 src/messages/MOSDPGLeaseAck.h diff --git a/src/messages/MOSDPGLease.h b/src/messages/MOSDPGLease.h new file mode 100644 index 00000000000..9b8fcf3b2e0 --- /dev/null +++ b/src/messages/MOSDPGLease.h @@ -0,0 +1,67 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab + +#pragma once + +#include "msg/Message.h" +#include "osd/osd_types.h" + +class MOSDPGLease : public MOSDPeeringOp { +private: + static constexpr int HEAD_VERSION = 1; + static constexpr int COMPAT_VERSION = 1; + + epoch_t epoch = 0; + spg_t spgid; + pg_lease_t lease; + +public: + spg_t get_spg() const { + return spgid; + } + epoch_t get_map_epoch() const { + return epoch; + } + epoch_t get_min_epoch() const { + return epoch; + } + PGPeeringEvent *get_event() override { + return new PGPeeringEvent( + epoch, + epoch, + MLease(epoch, get_source().num(), lease)); + } + + MOSDPGLease() : MOSDPeeringOp{MSG_OSD_PG_LEASE, + HEAD_VERSION, COMPAT_VERSION} {} + MOSDPGLease(version_t mv, spg_t p, pg_lease_t lease) : + MOSDPeeringOp{MSG_OSD_PG_LEASE, + HEAD_VERSION, COMPAT_VERSION}, + epoch(mv), + spgid(p), + lease(lease) { } +private: + ~MOSDPGLease() override {} + +public: + std::string_view get_type_name() const override { return "pg_lease"; } + void inner_print(ostream& out) const 
override { + out << lease; + } + + void encode_payload(uint64_t features) override { + using ceph::encode; + encode(epoch, payload); + encode(spgid, payload); + encode(lease, payload); + } + void decode_payload() override { + auto p = payload.cbegin(); + decode(epoch, p); + decode(spgid, p); + decode(lease, p); + } +private: + template<class T, typename... Args> + friend boost::intrusive_ptr<T> ceph::make_message(Args&&... args); +}; diff --git a/src/messages/MOSDPGLeaseAck.h b/src/messages/MOSDPGLeaseAck.h new file mode 100644 index 00000000000..8a4bd5c028c --- /dev/null +++ b/src/messages/MOSDPGLeaseAck.h @@ -0,0 +1,67 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab + +#pragma once + +#include "msg/Message.h" +#include "osd/osd_types.h" + +class MOSDPGLeaseAck : public MOSDPeeringOp { +private: + static constexpr int HEAD_VERSION = 1; + static constexpr int COMPAT_VERSION = 1; + + epoch_t epoch = 0; + spg_t spgid; + pg_lease_ack_t lease_ack; + +public: + spg_t get_spg() const { + return spgid; + } + epoch_t get_map_epoch() const { + return epoch; + } + epoch_t get_min_epoch() const { + return epoch; + } + PGPeeringEvent *get_event() override { + return new PGPeeringEvent( + epoch, + epoch, + MLeaseAck(epoch, get_source().num(), lease_ack)); + } + + MOSDPGLeaseAck() : MOSDPeeringOp{MSG_OSD_PG_LEASE_ACK, + HEAD_VERSION, COMPAT_VERSION} {} + MOSDPGLeaseAck(version_t mv, spg_t p, pg_lease_ack_t lease_ack) : + MOSDPeeringOp{MSG_OSD_PG_LEASE_ACK, + HEAD_VERSION, COMPAT_VERSION}, + epoch(mv), + spgid(p), + lease_ack(lease_ack) { } +private: + ~MOSDPGLeaseAck() override {} + +public: + std::string_view get_type_name() const override { return "pg_lease_ack"; } + void inner_print(ostream& out) const override { + out << lease_ack; + } + + void encode_payload(uint64_t features) override { + using ceph::encode; + encode(epoch, payload); + encode(spgid, payload); + encode(lease_ack, payload); + } + void decode_payload() override { + auto p = 
payload.cbegin(); + decode(epoch, p); + decode(spgid, p); + decode(lease_ack, p); + } +private: + template<class T, typename... Args> + friend boost::intrusive_ptr<T> ceph::make_message(Args&&... args); +}; diff --git a/src/msg/Message.cc b/src/msg/Message.cc index 005aab53f86..c57bc3bc1bd 100644 --- a/src/msg/Message.cc +++ b/src/msg/Message.cc @@ -86,6 +86,8 @@ #include "messages/MOSDPGCreate.h" #include "messages/MOSDPGCreate2.h" #include "messages/MOSDPGTrim.h" +#include "messages/MOSDPGLease.h" +#include "messages/MOSDPGLeaseAck.h" #include "messages/MOSDScrub.h" #include "messages/MOSDScrub2.h" #include "messages/MOSDScrubReserve.h" @@ -559,6 +561,12 @@ Message *decode_message(CephContext *cct, case MSG_OSD_PG_TRIM: m = make_message<MOSDPGTrim>(); break; + case MSG_OSD_PG_LEASE: + m = make_message<MOSDPGLease>(); + break; + case MSG_OSD_PG_LEASE_ACK: + m = make_message<MOSDPGLeaseAck>(); + break; case MSG_OSD_SCRUB: m = make_message<MOSDScrub>(); diff --git a/src/msg/Message.h b/src/msg/Message.h index de100cc2794..e9b0fec7cab 100644 --- a/src/msg/Message.h +++ b/src/msg/Message.h @@ -140,6 +140,9 @@ #define MSG_OSD_PG_READY_TO_MERGE 122 +#define MSG_OSD_PG_LEASE 133 +#define MSG_OSD_PG_LEASE_ACK 134 + // *** MDS *** #define MSG_MDS_BEACON 100 // to monitor diff --git a/src/osd/OSD.cc b/src/osd/OSD.cc index 6f5ca81ba0a..0b18c96c3c4 100644 --- a/src/osd/OSD.cc +++ b/src/osd/OSD.cc @@ -7261,6 +7261,8 @@ void OSD::ms_fast_dispatch(Message *m) case MSG_OSD_PG_INFO2: case MSG_OSD_BACKFILL_RESERVE: case MSG_OSD_RECOVERY_RESERVE: + case MSG_OSD_PG_LEASE: + case MSG_OSD_PG_LEASE_ACK: { MOSDPeeringOp *pm = static_cast<MOSDPeeringOp*>(m); if (require_osd_peer(pm)) { diff --git a/src/osd/OSD.h b/src/osd/OSD.h index 3d7c70e6845..efa7678c871 100644 --- a/src/osd/OSD.h +++ b/src/osd/OSD.h @@ -2074,6 +2074,8 @@ private: case MSG_OSD_PG_UPDATE_LOG_MISSING_REPLY: case MSG_OSD_PG_RECOVERY_DELETE: case MSG_OSD_PG_RECOVERY_DELETE_REPLY: + case MSG_OSD_PG_LEASE: + case MSG_OSD_PG_LEASE_ACK: return true; default: return false; diff --git a/src/osd/PGPeeringEvent.h 
b/src/osd/PGPeeringEvent.h index f38cb7750f8..aaa8b0e939a 100644 --- a/src/osd/PGPeeringEvent.h +++ b/src/osd/PGPeeringEvent.h @@ -129,6 +129,29 @@ struct MTrim : boost::statechart::event<MTrim> { } }; +struct MLease : boost::statechart::event<MLease> { + epoch_t epoch; + int from; + pg_lease_t lease; + MLease(epoch_t epoch, int from, pg_lease_t l) + : epoch(epoch), from(from), lease(l) {} + void print(std::ostream *out) const { + *out << "MLease epoch " << epoch << " from osd." << from << " " << lease; + } +}; + +struct MLeaseAck : boost::statechart::event<MLeaseAck> { + epoch_t epoch; + int from; + pg_lease_ack_t lease_ack; + MLeaseAck(epoch_t epoch, int from, pg_lease_ack_t l) + : epoch(epoch), from(from), lease_ack(l) {} + void print(std::ostream *out) const { + *out << "MLeaseAck epoch " << epoch << " from osd." << from + << " " << lease_ack; + } +}; + struct RequestBackfillPrio : boost::statechart::event< RequestBackfillPrio > { unsigned priority; int64_t primary_num_bytes; diff --git a/src/osd/PeeringState.cc b/src/osd/PeeringState.cc index 876910f4421..038eeb19382 100644 --- a/src/osd/PeeringState.cc +++ b/src/osd/PeeringState.cc @@ -18,6 +18,8 @@ #include "messages/MOSDPGNotify2.h" #include "messages/MOSDPGQuery.h" #include "messages/MOSDPGQuery2.h" +#include "messages/MOSDPGLease.h" +#include "messages/MOSDPGLeaseAck.h" #define dout_context cct #define dout_subsys ceph_subsys_osd @@ -1080,6 +1082,62 @@ bool PeeringState::set_force_backfill(bool b) return did; } +void PeeringState::proc_lease(const pg_lease_t& l) +{ + psdout(10) << __func__ << " " << l << dendl; + if (l.readable_until_ub > readable_until_ub_from_primary) { + readable_until_ub_from_primary = l.readable_until_ub; + } + + ceph::signedspan ru; + if (hb_stamps[0]->peer_clock_delta_ub) { + ru = l.readable_until - *hb_stamps[0]->peer_clock_delta_ub; + } else { + ru = ceph::signedspan::zero(); + } + if (ru > readable_until) { + readable_until = ru; + psdout(20) << __func__ << " readable_until now " << readable_until << 
dendl; +#warning fixme: wake up replica? + } + + ceph::signedspan ruub; + if (hb_stamps[0]->peer_clock_delta_lb) { + ruub = l.readable_until_ub - *hb_stamps[0]->peer_clock_delta_lb; + } else { + ruub = pl->get_mnow() + l.interval; + } + if (ruub > readable_until_ub) { + readable_until_ub = ruub; + psdout(20) << __func__ << " readable_until_ub now " << readable_until_ub + << dendl; + } +} + +void PeeringState::proc_lease_ack(int from, const pg_lease_ack_t& a) +{ + auto now = pl->get_mnow(); + bool was_min = false; + for (unsigned i = 0; i < acting.size(); ++i) { + if (from == acting[i]) { + // the lease_ack value is based on the primary's clock + if (a.readable_until_ub > acting_readable_until_ub[i]) { + if (acting_readable_until_ub[i] == readable_until) { + was_min = true; + } + acting_readable_until_ub[i] = a.readable_until_ub; + break; + } + } + } + if (was_min) { + recalc_readable_until(); +// if (pl->is_laggy() && readable_until > now) { +#warning fixme: wake up? + //} + } +} + void PeeringState::recalc_readable_until() { assert(is_primary()); @@ -5631,6 +5689,13 @@ boost::statechart::result PeeringState::Active::react(const AllReplicasActivated return discard_event(); } +boost::statechart::result PeeringState::Active::react(const MLeaseAck& la) +{ + DECLARE_LOCALS; + ps->proc_lease_ack(la.from, la.lease_ack); + return discard_event(); +} + /* * update info.history.last_epoch_started ONLY after we and all * replicas have activated AND committed the activate transaction @@ -5731,6 +5796,20 @@ boost::statechart::result PeeringState::ReplicaActive::react( return discard_event(); } +boost::statechart::result PeeringState::ReplicaActive::react(const MLease& l) +{ + DECLARE_LOCALS; + spg_t spgid = context< PeeringMachine >().spgid; + epoch_t epoch = pl->get_osdmap_epoch(); + + ps->proc_lease(l.lease); + pl->send_cluster_message( + ps->get_primary().osd, + new MOSDPGLeaseAck(epoch, spgid, ps->get_lease_ack()), + epoch); + return discard_event(); +} + 
boost::statechart::result PeeringState::ReplicaActive::react(const MInfoRec& infoevt) { DECLARE_LOCALS; diff --git a/src/osd/PeeringState.h b/src/osd/PeeringState.h index 9a40997c9f6..e6a2408fbe8 100644 --- a/src/osd/PeeringState.h +++ b/src/osd/PeeringState.h @@ -812,7 +812,8 @@ public: boost::statechart::custom_reaction< UnfoundBackfill >, boost::statechart::custom_reaction< RemoteReservationRevokedTooFull>, boost::statechart::custom_reaction< RemoteReservationRevoked>, - boost::statechart::custom_reaction< DoRecovery> + boost::statechart::custom_reaction< DoRecovery>, + boost::statechart::custom_reaction< MLeaseAck> > reactions; boost::statechart::result react(const QueryState& q); boost::statechart::result react(const ActMap&); @@ -826,6 +827,7 @@ public: } boost::statechart::result react(const ActivateCommitted&); boost::statechart::result react(const AllReplicasActivated&); + boost::statechart::result react(const MLeaseAck&); boost::statechart::result react(const DeferRecovery& evt) { return discard_event(); } @@ -978,7 +980,8 @@ public: boost::statechart::custom_reaction< RemoteBackfillPreempted >, boost::statechart::custom_reaction< RemoteRecoveryPreempted >, boost::statechart::custom_reaction< RecoveryDone >, - boost::statechart::transition<DeleteStart, ToDelete> + boost::statechart::transition<DeleteStart, ToDelete>, + boost::statechart::custom_reaction< MLease > > reactions; boost::statechart::result react(const QueryState& q); boost::statechart::result react(const MInfoRec& infoevt); @@ -988,6 +991,7 @@ public: boost::statechart::result react(const MQuery&); boost::statechart::result react(const Activate&); boost::statechart::result react(const ActivateCommitted&); + boost::statechart::result react(const MLease&); boost::statechart::result react(const RecoveryDone&) { return discard_event(); } @@ -1946,10 +1950,19 @@ public: } } + void schedule_renew_lease(); + pg_lease_t get_lease() { return pg_lease_t(readable_until, readable_until_ub_sent, readable_interval); } + void proc_lease(const 
pg_lease_t& l); + void proc_lease_ack(int from, const pg_lease_ack_t& la); + + pg_lease_ack_t get_lease_ack() { + return pg_lease_ack_t(readable_until_ub_from_primary); + } + /// [primary] recalc readable_until[_ub] for the current interval void recalc_readable_until(); -- 2.39.5