return hb_stamps[peer];
}
+// Deliver a RenewLease peering event to the given PG.  Invoked from the
+// mono_timer callback set up by PG::schedule_renew_lease(); the Active
+// state reacts by renewing and re-sending the lease.  `epoch` is the
+// last_peering_reset epoch captured at scheduling time (used for both
+// event epochs -- presumably so a stale event from a prior interval is
+// discarded by the normal peering-event epoch checks; confirm).
+void OSDService::queue_renew_lease(epoch_t epoch, spg_t spgid)
+{
+ osd->enqueue_peering_evt(
+ spgid,
+ PGPeeringEventRef(
+ std::make_shared<PGPeeringEvent>(
+ epoch, epoch,
+ RenewLease())));
+}
+
void OSDService::start_shutdown()
{
{
// Timer for readable leases
ceph::timer<ceph::mono_clock> mono_timer = ceph::timer<ceph::mono_clock>{ceph::construct_suspended};
+ void queue_renew_lease(epoch_t epoch, spg_t spgid);
+
// -- stopping --
ceph::mutex is_stopping_lock = ceph::make_mutex("OSDService::is_stopping_lock");
ceph::condition_variable is_stopping_cond;
return osd->get_hb_stamps(peer);
}
+// Arrange for a RenewLease event to reach this PG after `delay`, via the
+// OSD's monotonic-clock timer.  `lpr` is the last_peering_reset epoch at
+// scheduling time and is carried through queue_renew_lease().
+// NOTE(review): the lambda captures the raw OSDService pointer; this is
+// only safe if mono_timer (an OSDService member) is stopped/suspended
+// before the service is destroyed -- confirm shutdown ordering.
+void PG::schedule_renew_lease(epoch_t lpr, ceph::timespan delay)
+{
+ auto spgid = info.pgid;
+ auto o = osd;
+ osd->mono_timer.add_event(
+ delay,
+ [o, lpr, spgid]() {
+ o->queue_renew_lease(lpr, spgid);
+ });
+}
+
void PG::rebuild_missing_set_with_deletes(PGLog &pglog)
{
pglog.rebuild_missing_set_with_deletes(
ceph::signedspan get_mnow() override;
HeartbeatStampsRef get_hb_stamps(int peer) override;
+ void schedule_renew_lease(epoch_t lpr, ceph::timespan delay) override;
void rebuild_missing_set_with_deletes(PGLog &pglog) override;
*out << "DeferBackfill: delay " << delay;
}
};
+
+TrivialEvent(RenewLease)
acting_readable_until_ub.clear();
if (is_primary()) {
acting_readable_until_ub.resize(acting.size(), ceph::signedspan::zero());
-
- // start lease here, so that we get acks during peering
- renew_lease(pl->get_mnow());
}
pl->on_new_interval();
return did;
}
+// Schedule the next lease renewal halfway through the readable
+// interval, tagged with last_peering_reset so the resulting event can
+// be matched against the current interval when it fires.
+void PeeringState::schedule_renew_lease()
+{
+ pl->schedule_renew_lease(
+ last_peering_reset,
+ readable_interval / 2);
+}
+
+// Send our current lease (see get_lease()) to every other shard in the
+// acting set, addressed to each peer's own shard id.
+void PeeringState::send_lease()
+{
+ epoch_t epoch = pl->get_osdmap_epoch();
+ for (auto peer : actingset) {
+ if (peer == pg_whoami) {
+ continue; // no lease message to ourself
+ }
+ pl->send_cluster_message(
+ peer.osd,
+ new MOSDPGLease(epoch,
+ spg_t(spgid.pgid, peer.shard), // target the peer's shard, not ours
+ get_lease()),
+ epoch);
+ }
+}
+
void PeeringState::proc_lease(const pg_lease_t& l)
{
+ // Ignore the primary's lease if we have no role (role < 0 --
+ // presumably a stray not in the acting set; confirm): a lease only
+ // applies to members of the acting set.
+ if (get_role() < 0) {
+ return;
+ }
psdout(10) << __func__ << " " << l << dendl;
if (l.readable_until_ub > readable_until_ub_from_primary) {
readable_until_ub_from_primary = l.readable_until_ub;
}
}
+ renew_lease(pl->get_mnow());
+ send_lease();
+ schedule_renew_lease();
+
// Set up missing_loc
set<pg_shard_t> complete_shards;
for (set<pg_shard_t>::iterator i = acting_recovery_backfill.begin();
return discard_event();
}
+// Active state: the periodic renewal timer fired.  Refresh our lease at
+// the current monotonic time, push the updated lease to the replicas,
+// and schedule the next renewal (readable_interval / 2 later).
+boost::statechart::result PeeringState::Active::react(const RenewLease& rl)
+{
+ DECLARE_LOCALS;
+ ps->renew_lease(pl->get_mnow());
+ ps->send_lease();
+ ps->schedule_renew_lease();
+ return discard_event();
+}
+
boost::statechart::result PeeringState::Active::react(const MLeaseAck& la)
{
DECLARE_LOCALS;
ps->proc_lease(l.lease);
pl->send_cluster_message(
ps->get_primary().osd,
- new MOSDPGLeaseAck(epoch, spgid, ps->get_lease_ack()),
+ new MOSDPGLeaseAck(epoch,
+ spg_t(spgid.pgid, ps->get_primary().shard),
+ ps->get_lease_ack()),
epoch);
return discard_event();
}
virtual ceph::signedspan get_mnow() = 0;
virtual HeartbeatStampsRef get_hb_stamps(int peer) = 0;
+ virtual void schedule_renew_lease(epoch_t plr, ceph::timespan delay) = 0;
// ============ Flush state ==================
/**
boost::statechart::custom_reaction< RemoteReservationRevokedTooFull>,
boost::statechart::custom_reaction< RemoteReservationRevoked>,
boost::statechart::custom_reaction< DoRecovery>,
+ boost::statechart::custom_reaction< RenewLease>,
boost::statechart::custom_reaction< MLeaseAck>
> reactions;
boost::statechart::result react(const QueryState& q);
}
boost::statechart::result react(const ActivateCommitted&);
boost::statechart::result react(const AllReplicasActivated&);
+ boost::statechart::result react(const RenewLease&);
boost::statechart::result react(const MLeaseAck&);
boost::statechart::result react(const DeferRecovery& evt) {
return discard_event();
recalc_readable_until();
}
}
-
+ void send_lease();
void schedule_renew_lease();
pg_lease_t get_lease() {