From 541e208cdf3a2a20067e0cb96ee9cc79df70cf20 Mon Sep 17 00:00:00 2001 From: Sage Weil Date: Mon, 17 Jun 2019 13:55:56 -0500 Subject: [PATCH] osd: track clock delta between peer OSDs We need to keep track of the monotonic clock deltas between peer OSDs in order to be able to exchange timestamps across messages. We need an upper and lower bound on this delta, depending on the context where it is used. We can use the existing ping messages to get this by assuming that a ping message is sent instantly to get a bound, and to share our delta in a follow-up reply to share the other bound. The ping sender will get both bounds with a single ping + ping_reply exchange. The ping receiver will get the delta value from the next round's ping. Include up_from in the ping messages to ensure we don't mix up different instances of the same OSD. Signed-off-by: Sage Weil --- src/messages/MOSDPing.h | 52 ++++++++++++--- src/osd/OSD.cc | 141 ++++++++++++++++++++++++++++++---------- src/osd/OSD.h | 15 +++-- src/osd/PeeringState.h | 93 ++++++++++++++++++++++++++ src/osd/Session.h | 5 ++ 5 files changed, 260 insertions(+), 46 deletions(-) diff --git a/src/messages/MOSDPing.h b/src/messages/MOSDPing.h index 520f2416bebc0..f95fe0a613e71 100644 --- a/src/messages/MOSDPing.h +++ b/src/messages/MOSDPing.h @@ -34,7 +34,7 @@ class MOSDPing : public Message { private: - static constexpr int HEAD_VERSION = 4; + static constexpr int HEAD_VERSION = 5; static constexpr int COMPAT_VERSION = 4; public: @@ -61,12 +61,29 @@ private: uuid_d fsid; epoch_t map_epoch = 0; __u8 op = 0; - utime_t stamp; + utime_t ping_stamp; ///< when the PING was sent + ceph::signedspan mono_ping_stamp; ///< relative to sender's clock + ceph::signedspan mono_send_stamp; ///< replier's send stamp + std::optional delta_ub; ///< ping sender + epoch_t up_from = 0; + uint32_t min_message_size = 0; - MOSDPing(const uuid_d& f, epoch_t e, __u8 o, utime_t s, uint32_t min_message) + MOSDPing(const uuid_d& f, epoch_t e, __u8 o, + utime_t s, + ceph::signedspan ms, + ceph::signedspan mss, + epoch_t upf, + uint32_t min_message, + std::optional delta_ub = {}) : Message{MSG_OSD_PING, HEAD_VERSION, COMPAT_VERSION}, - fsid(f), map_epoch(e), op(o), stamp(s), min_message_size(min_message) + fsid(f), map_epoch(e), op(o), + ping_stamp(s), + mono_ping_stamp(ms), + mono_send_stamp(mss), + delta_ub(delta_ub), + up_from(upf), + min_message_size(min_message) { } MOSDPing() : Message{MSG_OSD_PING, HEAD_VERSION, COMPAT_VERSION} @@ -80,11 +97,19 @@ public: decode(fsid, p); decode(map_epoch, p); decode(op, p); - decode(stamp, p); + decode(ping_stamp, p); int payload_mid_length = p.get_off(); uint32_t size; decode(size, p); + + if (header.version >= 5) { + decode(up_from, p); + decode(mono_ping_stamp, p); + decode(mono_send_stamp, p); + decode(delta_ub, p); + } + p.advance(size); min_message_size = size + payload_mid_length; } @@ -93,13 +118,19 @@ public: encode(fsid, payload); encode(map_epoch, payload); encode(op, payload); - encode(stamp, payload); + encode(ping_stamp, payload); size_t s = 0; if (min_message_size > payload.length()) { s = min_message_size - payload.length(); } encode((uint32_t)s, payload); + + encode(up_from, payload); + encode(mono_ping_stamp, payload); + encode(mono_send_stamp, payload); + encode(delta_ub, payload); + if (s) { // this should be big enough for normal min_message padding sizes. since // we are targeting jumbo ethernet frames around 9000 bytes, 16k should @@ -120,8 +151,13 @@ public: void print(ostream& out) const override { out << "osd_ping(" << get_op_name(op) << " e" << map_epoch - << " stamp " << stamp - << ")"; + << " up_from " << up_from + << " ping_stamp " << ping_stamp << "/" << mono_ping_stamp + << " send_stamp " << mono_send_stamp; + if (delta_ub) { + out << " delta_ub " << *delta_ub; + } + out << ")"; } private: template diff --git a/src/osd/OSD.cc b/src/osd/OSD.cc index 83d2e50643b90..200021fa3a845 100644 --- a/src/osd/OSD.cc +++ b/src/osd/OSD.cc @@ -426,6 +426,18 @@ void OSDService::need_heartbeat_peer_update() osd->need_heartbeat_peer_update(); } +HeartbeatStampsRef OSDService::get_hb_stamps(unsigned peer) +{ + std::lock_guard l(hb_stamp_lock); + if (peer >= hb_stamps.size()) { + hb_stamps.resize(peer + 1); + } + if (!hb_stamps[peer]) { + hb_stamps[peer].reset(new HeartbeatStamps(peer)); + } + return hb_stamps[peer]; +} + void OSDService::start_shutdown() { { @@ -4337,24 +4349,31 @@ void OSD::_add_heartbeat_peer(int p) pair cons = service.get_con_osd_hb(p, osdmap->get_epoch()); if (!cons.first) return; + assert(cons.second); + hi = &heartbeat_peers[p]; hi->peer = p; - RefCountedPtr s{new HeartbeatSession{p}, false}; + + auto stamps = service.get_hb_stamps(p); + + Session *sb = new Session(cct, cons.first.get()); + sb->peer = p; + sb->stamps = stamps; + RefCountedPtr sbref{sb, false}; hi->con_back = cons.first.get(); - hi->con_back->set_priv(s); - if (cons.second) { - hi->con_front = cons.second.get(); - hi->con_front->set_priv(s); - dout(10) << "_add_heartbeat_peer: new peer osd." << p - << " " << hi->con_back->get_peer_addr() - << " " << hi->con_front->get_peer_addr() - << dendl; - } else { - hi->con_front.reset(NULL); - dout(10) << "_add_heartbeat_peer: new peer osd." << p - << " " << hi->con_back->get_peer_addr() - << dendl; - } + hi->con_back->set_priv(sbref); + + Session *sf = new Session(cct, cons.second.get()); + sf->peer = p; + sf->stamps = stamps; + RefCountedPtr sfref{sf, false}; + hi->con_front = cons.second.get(); + hi->con_front->set_priv(sfref); + + dout(10) << "_add_heartbeat_peer: new peer osd." << p + << " " << hi->con_back->get_peer_addr() + << " " << hi->con_front->get_peer_addr() + << dendl; } else { hi = &i->second; } @@ -4518,7 +4537,8 @@ void OSD::handle_osd_ping(MOSDPing *m) { if (superblock.cluster_fsid != m->fsid) { dout(20) << "handle_osd_ping from " << m->get_source_inst() - << " bad fsid " << m->fsid << " != " << superblock.cluster_fsid << dendl; + << " bad fsid " << m->fsid << " != " << superblock.cluster_fsid + << dendl; m->put(); return; } @@ -4533,6 +4553,7 @@ void OSD::handle_osd_ping(MOSDPing *m) } utime_t now = ceph_clock_now(); + auto mnow = service.get_mnow(); ConnectionRef con(m->get_connection()); OSDMapRef curmap = service.get_osdmap(); if (!curmap) { @@ -4541,6 +4562,18 @@ void OSD::handle_osd_ping(MOSDPing *m) return; } + auto sref = con->get_priv(); + Session *s = static_cast(sref.get()); + if (!s) { + heartbeat_lock.unlock(); + m->put(); + return; + } + if (!s->stamps) { + s->peer = from; + s->stamps = service.get_hb_stamps(from); + } + switch (m->op) { case MOSDPing::PING: @@ -4569,6 +4602,15 @@ void OSD::handle_osd_ping(MOSDPing *m) } } + ceph::signedspan sender_delta_ub; + s->stamps->got_ping( + m->up_from, + mnow, + m->mono_send_stamp, + m->delta_ub, + &sender_delta_ub); + dout(20) << __func__ << " new stamps " << *s->stamps << dendl; + if (!cct->get_heartbeat_map()->is_healthy()) { dout(10) << "internal heartbeat not healthy, dropping ping request" << dendl; @@ -4577,8 +4619,13 @@ void OSD::handle_osd_ping(MOSDPing *m) Message *r = new MOSDPing(monc->get_fsid(), curmap->get_epoch(), - MOSDPing::PING_REPLY, m->stamp, - cct->_conf->osd_heartbeat_min_size); + MOSDPing::PING_REPLY, + m->ping_stamp, + m->mono_ping_stamp, + mnow, + service.get_up_epoch(), + cct->_conf->osd_heartbeat_min_size, + sender_delta_ub); con->send_message(r); if (curmap->is_up(from)) { @@ -4595,7 +4642,10 @@ void OSD::handle_osd_ping(MOSDPing *m) Message *r = new MOSDPing(monc->get_fsid(), curmap->get_epoch(), MOSDPing::YOU_DIED, - m->stamp, + m->ping_stamp, + m->mono_ping_stamp, + mnow, + service.get_up_epoch(), cct->_conf->osd_heartbeat_min_size); con->send_message(r); } @@ -4606,7 +4656,7 @@ void OSD::handle_osd_ping(MOSDPing *m) { map::iterator i = heartbeat_peers.find(from); if (i != heartbeat_peers.end()) { - auto acked = i->second.ping_history.find(m->stamp); + auto acked = i->second.ping_history.find(m->ping_stamp); if (acked != i->second.ping_history.end()) { int &unacknowledged = acked->second.second; if (con == i->second.con_back) { @@ -4642,7 +4692,7 @@ void OSD::handle_osd_ping(MOSDPing *m) if (unacknowledged == 0) { // succeeded in getting all replies dout(25) << "handle_osd_ping got all replies from osd." << from - << " , erase pending ping(sent at " << m->stamp << ")" + << " , erase pending ping(sent at " << m->ping_stamp << ")" << " and older pending ping(s)" << dendl; i->second.ping_history.erase(i->second.ping_history.begin(), ++acked); @@ -4669,7 +4719,7 @@ void OSD::handle_osd_ping(MOSDPing *m) } } else { // old replies, deprecated by newly sent pings. - dout(10) << "handle_osd_ping no pending ping(sent at " << m->stamp + dout(10) << "handle_osd_ping no pending ping(sent at " << m->ping_stamp << ") is found, treat as covered by newly sent pings " << "and ignore" << dendl; @@ -4686,6 +4736,12 @@ void OSD::handle_osd_ping(MOSDPing *m) } } } + + s->stamps->got_ping_reply( + mnow, + m->mono_send_stamp, + m->delta_ub); + dout(20) << __func__ << " new stamps " << *s->stamps << dendl; } break; @@ -4807,6 +4863,7 @@ void OSD::heartbeat() service.check_full_status(ratio, pratio); utime_t now = ceph_clock_now(); + auto mnow = service.get_mnow(); utime_t deadline = now; deadline += cct->_conf->osd_heartbeat_grace; @@ -4815,22 +4872,40 @@ void OSD::heartbeat() i != heartbeat_peers.end(); ++i) { int peer = i->first; + dout(30) << "heartbeat sending ping to osd." << peer << dendl; + i->second.last_tx = now; if (i->second.first_tx == utime_t()) i->second.first_tx = now; i->second.ping_history[now] = make_pair(deadline, HeartbeatInfo::HEARTBEAT_MAX_CONN); - dout(30) << "heartbeat sending ping to osd." << peer << dendl; - i->second.con_back->send_message(new MOSDPing(monc->get_fsid(), - service.get_osdmap_epoch(), - MOSDPing::PING, now, - cct->_conf->osd_heartbeat_min_size)); + + Session *s = static_cast(i->second.con_back->get_priv().get()); + std::optional delta_ub; + s->stamps->sent_ping(&delta_ub); + + i->second.con_back->send_message( + new MOSDPing(monc->get_fsid(), + service.get_osdmap_epoch(), + MOSDPing::PING, + now, + mnow, + mnow, + service.get_up_epoch(), + cct->_conf->osd_heartbeat_min_size, + delta_ub)); if (i->second.con_front) - i->second.con_front->send_message(new MOSDPing(monc->get_fsid(), - service.get_osdmap_epoch(), - MOSDPing::PING, now, - cct->_conf->osd_heartbeat_min_size)); + i->second.con_front->send_message( + new MOSDPing(monc->get_fsid(), + service.get_osdmap_epoch(), + MOSDPing::PING, + now, + mnow, + mnow, + service.get_up_epoch(), + cct->_conf->osd_heartbeat_min_size, + delta_ub)); } logger->set(l_osd_hb_to, heartbeat_peers.size()); @@ -4857,8 +4932,8 @@ bool OSD::heartbeat_reset(Connection *con) if (is_stopping()) { return true; } - auto heartbeat_session = static_cast(s.get()); - auto p = heartbeat_peers.find(heartbeat_session->peer); + auto session = static_cast(s.get()); + auto p = heartbeat_peers.find(session->peer); if (p != heartbeat_peers.end() && (p->second.con_back == con || p->second.con_front == con)) { diff --git a/src/osd/OSD.h b/src/osd/OSD.h index 6b144885e5553..5b2a5f21078a2 100644 --- a/src/osd/OSD.h +++ b/src/osd/OSD.h @@ -850,6 +850,15 @@ public: void request_osdmap_update(epoch_t e); + // -- heartbeats -- + ceph::mutex hb_stamp_lock = ceph::make_mutex("OSDServce::hb_stamp_lock"); + + /// osd -> heartbeat stamps + vector hb_stamps; + + /// get or create a ref for a peer's HeartbeatStamps + HeartbeatStampsRef get_hb_stamps(unsigned osd); + // -- stopping -- ceph::mutex is_stopping_lock = ceph::make_mutex("OSDService::is_stopping_lock"); ceph::condition_variable is_stopping_cond; @@ -1436,11 +1445,7 @@ private: return !is_unhealthy(now); } }; - /// state attached to outgoing heartbeat connections - struct HeartbeatSession : public RefCountedObject { - int peer; - explicit HeartbeatSession(int p) : peer(p) {} - }; + ceph::mutex heartbeat_lock = ceph::make_mutex("OSD::heartbeat_lock"); map debug_heartbeat_drops_remaining; ceph::condition_variable heartbeat_cond; diff --git a/src/osd/PeeringState.h b/src/osd/PeeringState.h index 3093697572227..d28d3350b93bb 100644 --- a/src/osd/PeeringState.h +++ b/src/osd/PeeringState.h @@ -78,6 +78,99 @@ struct BufferedRecoveryMessages { } }; +struct HeartbeatStamps : public RefCountedObject { + mutable ceph::mutex lock = ceph::make_mutex("HeartbeatStamps::lock"); + + const int osd; + + // we maintain an upper and lower bound on the delta between our local + // mono_clock time (minus the startup_time) to the peer OSD's mono_clock + // time (minus its startup_time). + // + // delta is (remote_clock_time - local_clock_time), so that + // local_time + delta -> peer_time, and peer_time - delta -> local_time. + // + // we have an upper and lower bound value on this delta, meaning the + // value of the remote clock is somewhere between [my_time + lb, my_time + ub] + // + // conversely, if we have a remote timestamp T, then that is + // [T - ub, T - lb] in terms of the local clock. i.e., if you are + // substracting the delta, then take care that you swap the role of the + // lb and ub values. + + /// lower bound on peer clock - local clock + std::optional peer_clock_delta_lb; + + /// upper bound on peer clock - local clock + std::optional peer_clock_delta_ub; + + /// highest up_from we've seen from this rank + epoch_t up_from = 0; + + HeartbeatStamps(int o) + : RefCountedObject(NULL, 0), + osd(o) {} + + void print(ostream& out) const { + std::lock_guard l(lock); + out << "hbstamp(osd." << osd << " up_from " << up_from + << " peer_clock_delta ["; + if (peer_clock_delta_lb) { + out << *peer_clock_delta_lb; + } + out << ","; + if (peer_clock_delta_ub) { + out << *peer_clock_delta_ub; + } + out << "])"; + } + + void sent_ping(std::optional *delta_ub) { + std::lock_guard l(lock); + // the non-primaries need a lower bound on remote clock - local clock. if + // we assume the transit for the last ping_reply was + // instantaneous, that would be (the negative of) our last + // peer_clock_delta_lb value. + if (peer_clock_delta_lb) { + *delta_ub = - *peer_clock_delta_lb; + } + } + + void got_ping(epoch_t this_up_from, + ceph::signedspan now, + ceph::signedspan peer_send_stamp, + std::optional delta_ub, + ceph::signedspan *out_delta_ub) { + std::lock_guard l(lock); + if (this_up_from < up_from) { + return; + } + if (this_up_from > up_from) { + up_from = this_up_from; + } + peer_clock_delta_lb = peer_send_stamp - now; + peer_clock_delta_ub = delta_ub; + *out_delta_ub = - *peer_clock_delta_lb; + } + + void got_ping_reply(ceph::signedspan now, + ceph::signedspan peer_send_stamp, + std::optional delta_ub) { + std::lock_guard l(lock); + peer_clock_delta_lb = peer_send_stamp - now; + peer_clock_delta_ub = delta_ub; + } + +}; +typedef boost::intrusive_ptr HeartbeatStampsRef; + +inline ostream& operator<<(ostream& out, const HeartbeatStamps& hb) +{ + hb.print(out); + return out; +} + + struct PeeringCtx : BufferedRecoveryMessages { ObjectStore::Transaction transaction; HBHandle* handle = nullptr; diff --git a/src/osd/Session.h b/src/osd/Session.h index ec01e0018e28d..432d3ee87443e 100644 --- a/src/osd/Session.h +++ b/src/osd/Session.h @@ -22,6 +22,7 @@ #include "OSDCap.h" #include "Watch.h" #include "OSDMap.h" +#include "PeeringState.h" //#define PG_DEBUG_REFS @@ -148,6 +149,10 @@ struct Session : public RefCountedObject { std::atomic backoff_seq = {0}; + // for heartbeat connections only + int peer = -1; + HeartbeatStampsRef stamps; + explicit Session(CephContext *cct, Connection *con_) : RefCountedObject(cct), con(con_), -- 2.39.5