]> git.apps.os.sepia.ceph.com Git - ceph-ci.git/commitdiff
osd: send and process lease[_ack] messages
authorSage Weil <sage@redhat.com>
Fri, 19 Jul 2019 21:37:04 +0000 (16:37 -0500)
committerSage Weil <sage@redhat.com>
Thu, 26 Sep 2019 17:21:53 +0000 (12:21 -0500)
Signed-off-by: Sage Weil <sage@redhat.com>
src/messages/MOSDPGLease.h [new file with mode: 0644]
src/messages/MOSDPGLeaseAck.h [new file with mode: 0644]
src/msg/Message.cc
src/msg/Message.h
src/osd/OSD.cc
src/osd/OSD.h
src/osd/PGPeeringEvent.h
src/osd/PeeringState.cc
src/osd/PeeringState.h

diff --git a/src/messages/MOSDPGLease.h b/src/messages/MOSDPGLease.h
new file mode 100644 (file)
index 0000000..9b8fcf3
--- /dev/null
@@ -0,0 +1,67 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
+#pragma once
+
+#include "msg/Message.h"
+#include "osd/osd_types.h"
+
+class MOSDPGLease : public MOSDPeeringOp {
+private:
+  static constexpr int HEAD_VERSION = 1;
+  static constexpr int COMPAT_VERSION = 1;
+
+  epoch_t epoch = 0;
+  spg_t spgid;
+  pg_lease_t lease;
+
+public:
+  spg_t get_spg() const {
+    return spgid;
+  }
+  epoch_t get_map_epoch() const {
+    return epoch;
+  }
+  epoch_t get_min_epoch() const {
+    return epoch;
+  }
+  PGPeeringEvent *get_event() override {
+    return new PGPeeringEvent(
+      epoch,
+      epoch,
+      MLease(epoch, get_source().num(), lease));
+  }
+
+  MOSDPGLease() : MOSDPeeringOp{MSG_OSD_PG_LEASE,
+                               HEAD_VERSION, COMPAT_VERSION} {}
+  MOSDPGLease(version_t mv, spg_t p, pg_lease_t lease) :
+    MOSDPeeringOp{MSG_OSD_PG_LEASE,
+                 HEAD_VERSION, COMPAT_VERSION},
+    epoch(mv),
+    spgid(p),
+    lease(lease) { }
+private:
+  ~MOSDPGLease() override {}
+
+public:
+  std::string_view get_type_name() const override { return "pg_lease"; }
+  void inner_print(ostream& out) const override {
+    out << lease;
+  }
+
+  void encode_payload(uint64_t features) override {
+    using ceph::encode;
+    encode(epoch, payload);
+    encode(spgid, payload);
+    encode(lease, payload);
+  }
+  void decode_payload() override {
+    auto p = payload.cbegin();
+    decode(epoch, p);
+    decode(spgid, p);
+    decode(lease, p);
+  }
+private:
+  template<class T, typename... Args>
+  friend boost::intrusive_ptr<T> ceph::make_message(Args&&... args);
+};
diff --git a/src/messages/MOSDPGLeaseAck.h b/src/messages/MOSDPGLeaseAck.h
new file mode 100644 (file)
index 0000000..8a4bd5c
--- /dev/null
@@ -0,0 +1,67 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
+#pragma once
+
+#include "msg/Message.h"
+#include "osd/osd_types.h"
+
+class MOSDPGLeaseAck : public MOSDPeeringOp {
+private:
+  static constexpr int HEAD_VERSION = 1;
+  static constexpr int COMPAT_VERSION = 1;
+
+  epoch_t epoch = 0;
+  spg_t spgid;
+  pg_lease_ack_t lease_ack;
+
+public:
+  spg_t get_spg() const {
+    return spgid;
+  }
+  epoch_t get_map_epoch() const {
+    return epoch;
+  }
+  epoch_t get_min_epoch() const {
+    return epoch;
+  }
+  PGPeeringEvent *get_event() override {
+    return new PGPeeringEvent(
+      epoch,
+      epoch,
+      MLeaseAck(epoch, get_source().num(), lease_ack));
+  }
+
+  MOSDPGLeaseAck() : MOSDPeeringOp{MSG_OSD_PG_LEASE_ACK,
+                                  HEAD_VERSION, COMPAT_VERSION} {}
+  MOSDPGLeaseAck(version_t mv, spg_t p, pg_lease_ack_t lease_ack) :
+    MOSDPeeringOp{MSG_OSD_PG_LEASE_ACK,
+                 HEAD_VERSION, COMPAT_VERSION},
+    epoch(mv),
+    spgid(p),
+    lease_ack(lease_ack) { }
+private:
+  ~MOSDPGLeaseAck() override {}
+
+public:
+  std::string_view get_type_name() const override { return "pg_lease_ack"; }
+  void inner_print(ostream& out) const override {
+    out << lease_ack;
+  }
+
+  void encode_payload(uint64_t features) override {
+    using ceph::encode;
+    encode(epoch, payload);
+    encode(spgid, payload);
+    encode(lease_ack, payload);
+  }
+  void decode_payload() override {
+    auto p = payload.cbegin();
+    decode(epoch, p);
+    decode(spgid, p);
+    decode(lease_ack, p);
+  }
+private:
+  template<class T, typename... Args>
+  friend boost::intrusive_ptr<T> ceph::make_message(Args&&... args);
+};
index 005aab53f86ed9b0ef37a61c470d0a4712b9e3ad..c57bc3bc1bd8767261b76fde22f0d0917bb0b97c 100644 (file)
@@ -86,6 +86,8 @@
 #include "messages/MOSDPGCreate.h"
 #include "messages/MOSDPGCreate2.h"
 #include "messages/MOSDPGTrim.h"
+#include "messages/MOSDPGLease.h"
+#include "messages/MOSDPGLeaseAck.h"
 #include "messages/MOSDScrub.h"
 #include "messages/MOSDScrub2.h"
 #include "messages/MOSDScrubReserve.h"
@@ -559,6 +561,12 @@ Message *decode_message(CephContext *cct,
   case MSG_OSD_PG_TRIM:
     m = make_message<MOSDPGTrim>();
     break;
+  case MSG_OSD_PG_LEASE:
+    m = make_message<MOSDPGLease>();
+    break;
+  case MSG_OSD_PG_LEASE_ACK:
+    m = make_message<MOSDPGLeaseAck>();
+    break;
 
   case MSG_OSD_SCRUB:
     m = make_message<MOSDScrub>();
index de100cc2794f44392dce4274a9abc0c5558df7fe..e9b0fec7cab9ff8a01ead5e1f6ead64b8d020adc 100644 (file)
 
 #define MSG_OSD_PG_READY_TO_MERGE 122
 
+#define MSG_OSD_PG_LEASE        133
+#define MSG_OSD_PG_LEASE_ACK    134
+
 // *** MDS ***
 
 #define MSG_MDS_BEACON             100  // to monitor
index 6f5ca81ba0ae0f65cbc8785a1d55a75f33c3cf20..0b18c96c3c4c9f785a2f6dfeed89d7b3f2ed5345 100644 (file)
@@ -7261,6 +7261,8 @@ void OSD::ms_fast_dispatch(Message *m)
   case MSG_OSD_PG_INFO2:
   case MSG_OSD_BACKFILL_RESERVE:
   case MSG_OSD_RECOVERY_RESERVE:
+  case MSG_OSD_PG_LEASE:
+  case MSG_OSD_PG_LEASE_ACK:
     {
       MOSDPeeringOp *pm = static_cast<MOSDPeeringOp*>(m);
       if (require_osd_peer(pm)) {
index 3d7c70e68455a1a1a8210bebab5a5541a040faae..efa7678c871170b2026d949eb0c86d833087c7fa 100644 (file)
@@ -2074,6 +2074,8 @@ private:
     case MSG_OSD_PG_UPDATE_LOG_MISSING_REPLY:
     case MSG_OSD_PG_RECOVERY_DELETE:
     case MSG_OSD_PG_RECOVERY_DELETE_REPLY:
+    case MSG_OSD_PG_LEASE:
+    case MSG_OSD_PG_LEASE_ACK:
       return true;
     default:
       return false;
index f38cb7750f8a8aff28c3f08570b40ce8cf8bb092..aaa8b0e939a2f3faa1e54ae1bc3726febfb7f9fe 100644 (file)
@@ -129,6 +129,29 @@ struct MTrim : boost::statechart::event<MTrim> {
   }
 };
 
+struct MLease : boost::statechart::event<MLease> {
+  epoch_t epoch;
+  int from;
+  pg_lease_t lease;
+  MLease(epoch_t epoch, int from, pg_lease_t l)
+    : epoch(epoch), from(from), lease(l) {}
+  void print(std::ostream *out) const {
+    *out << "MLease epoch " << epoch << " from osd." << from << " " << lease;
+  }
+};
+
+struct MLeaseAck : boost::statechart::event<MLeaseAck> {
+  epoch_t epoch;
+  int from;
+  pg_lease_ack_t lease_ack;
+  MLeaseAck(epoch_t epoch, int from, pg_lease_ack_t l)
+    : epoch(epoch), from(from), lease_ack(l) {}
+  void print(std::ostream *out) const {
+    *out << "MLeaseAck epoch " << epoch << " from osd." << from
+        << " " << lease_ack;
+  }
+};
+
 struct RequestBackfillPrio : boost::statechart::event< RequestBackfillPrio > {
   unsigned priority;
   int64_t primary_num_bytes;
index 876910f4421a28768fe347fa2ddf6a027fb67800..038eeb193825c5b5f3c435143933b7ee20fc82b0 100644 (file)
@@ -18,6 +18,8 @@
 #include "messages/MOSDPGNotify2.h"
 #include "messages/MOSDPGQuery.h"
 #include "messages/MOSDPGQuery2.h"
+#include "messages/MOSDPGLease.h"
+#include "messages/MOSDPGLeaseAck.h"
 
 #define dout_context cct
 #define dout_subsys ceph_subsys_osd
@@ -1080,6 +1082,62 @@ bool PeeringState::set_force_backfill(bool b)
   return did;
 }
 
+void PeeringState::proc_lease(const pg_lease_t& l)
+{
+  psdout(10) << __func__ << " " << l << dendl;
+  if (l.readable_until_ub > readable_until_ub_from_primary) {
+    readable_until_ub_from_primary = l.readable_until_ub;
+  }
+
+  ceph::signedspan ru;
+  if (hb_stamps[0]->peer_clock_delta_ub) {
+    ru = l.readable_until - *hb_stamps[0]->peer_clock_delta_ub;
+  } else {
+    ru = ceph::signedspan::zero();
+  }
+  if (ru > readable_until) {
+    readable_until = ru;
+    psdout(20) << __func__ << " readable_until now " << readable_until << dendl;
+#warning fixme: wake up replica?
+  }
+
+  ceph::signedspan ruub;
+  if (hb_stamps[0]->peer_clock_delta_lb) {
+    ruub = l.readable_until_ub - *hb_stamps[0]->peer_clock_delta_lb;
+  } else {
+    ruub = pl->get_mnow() + l.interval;
+  }
+  if (ruub > readable_until_ub) {
+    readable_until_ub = ruub;
+    psdout(20) << __func__ << " readable_until_ub now " << readable_until_ub
+              << dendl;
+  }
+}
+
+void PeeringState::proc_lease_ack(int from, const pg_lease_ack_t& a)
+{
+  auto now = pl->get_mnow();
+  bool was_min = false;
+  for (unsigned i = 0; i < acting.size(); ++i) {
+    if (from == acting[i]) {
+      // the lease_ack value is based on the primary's clock
+      if (a.readable_until_ub > acting_readable_until_ub[i]) {
+       if (acting_readable_until_ub[i] == readable_until) {
+         was_min = true;
+       }
+       acting_readable_until_ub[i] = a.readable_until_ub;
+       break;
+      }
+    }
+  }
+  if (was_min) {
+    recalc_readable_until();
+//    if (pl->is_laggy() && readable_until > now) {
+#warning fixme: wake up?
+    //}
+  }
+}
+
 void PeeringState::recalc_readable_until()
 {
   assert(is_primary());
@@ -5631,6 +5689,13 @@ boost::statechart::result PeeringState::Active::react(const AllReplicasActivated
   return discard_event();
 }
 
+boost::statechart::result PeeringState::Active::react(const MLeaseAck& la)
+{
+  DECLARE_LOCALS;
+  ps->proc_lease_ack(la.from, la.lease_ack);
+  return discard_event();
+}
+
 /*
  * update info.history.last_epoch_started ONLY after we and all
  * replicas have activated AND committed the activate transaction
@@ -5731,6 +5796,20 @@ boost::statechart::result PeeringState::ReplicaActive::react(
   return discard_event();
 }
 
+boost::statechart::result PeeringState::ReplicaActive::react(const MLease& l)
+{
+  DECLARE_LOCALS;
+  spg_t spgid = context< PeeringMachine >().spgid;
+  epoch_t epoch = pl->get_osdmap_epoch();
+
+  ps->proc_lease(l.lease);
+  pl->send_cluster_message(
+    ps->get_primary().osd,
+    new MOSDPGLeaseAck(epoch, spgid, ps->get_lease_ack()),
+    epoch);
+  return discard_event();
+}
+
 boost::statechart::result PeeringState::ReplicaActive::react(const MInfoRec& infoevt)
 {
   DECLARE_LOCALS;
index 9a40997c9f682d29dac9b46553d4074e6195ef92..e6a2408fbe850a4cdd746ea86beaf21f492b36dc 100644 (file)
@@ -812,7 +812,8 @@ public:
       boost::statechart::custom_reaction< UnfoundBackfill >,
       boost::statechart::custom_reaction< RemoteReservationRevokedTooFull>,
       boost::statechart::custom_reaction< RemoteReservationRevoked>,
-      boost::statechart::custom_reaction< DoRecovery>
+      boost::statechart::custom_reaction< DoRecovery>,
+      boost::statechart::custom_reaction< MLeaseAck>
       > reactions;
     boost::statechart::result react(const QueryState& q);
     boost::statechart::result react(const ActMap&);
@@ -826,6 +827,7 @@ public:
     }
     boost::statechart::result react(const ActivateCommitted&);
     boost::statechart::result react(const AllReplicasActivated&);
+    boost::statechart::result react(const MLeaseAck&);
     boost::statechart::result react(const DeferRecovery& evt) {
       return discard_event();
     }
@@ -978,7 +980,8 @@ public:
       boost::statechart::custom_reaction< RemoteBackfillPreempted >,
       boost::statechart::custom_reaction< RemoteRecoveryPreempted >,
       boost::statechart::custom_reaction< RecoveryDone >,
-      boost::statechart::transition<DeleteStart, ToDelete>
+      boost::statechart::transition<DeleteStart, ToDelete>,
+      boost::statechart::custom_reaction< MLease >
       > reactions;
     boost::statechart::result react(const QueryState& q);
     boost::statechart::result react(const MInfoRec& infoevt);
@@ -988,6 +991,7 @@ public:
     boost::statechart::result react(const MQuery&);
     boost::statechart::result react(const Activate&);
     boost::statechart::result react(const ActivateCommitted&);
+    boost::statechart::result react(const MLease&);
     boost::statechart::result react(const RecoveryDone&) {
       return discard_event();
     }
@@ -1946,10 +1950,19 @@ public:
     }
   }
 
+  void schedule_renew_lease();
+
   pg_lease_t get_lease() {
     return pg_lease_t(readable_until, readable_until_ub_sent, readable_interval);
   }
 
+  void proc_lease(const pg_lease_t& l);
+  void proc_lease_ack(int from, const pg_lease_ack_t& la);
+
+  pg_lease_ack_t get_lease_ack() {
+    return pg_lease_ack_t(readable_until_ub_from_primary);
+  }
+
   /// [primary] recalc readable_until[_ub] for the current interval
   void recalc_readable_until();