class MOSDPing : public Message {
private:
- static constexpr int HEAD_VERSION = 4;
+ static constexpr int HEAD_VERSION = 5;
static constexpr int COMPAT_VERSION = 4;
public:
uuid_d fsid;
epoch_t map_epoch = 0;
__u8 op = 0;
- utime_t stamp;
+ utime_t ping_stamp; ///< when the PING was sent
+ ceph::signedspan mono_ping_stamp; ///< relative to sender's clock
+ ceph::signedspan mono_send_stamp; ///< replier's send stamp
+ std::optional<ceph::time_detail::signedspan> delta_ub; ///< ping sender
+ epoch_t up_from = 0;
+
uint32_t min_message_size = 0;
- MOSDPing(const uuid_d& f, epoch_t e, __u8 o, utime_t s, uint32_t min_message)
+ MOSDPing(const uuid_d& f, epoch_t e, __u8 o,
+ utime_t s,
+ ceph::signedspan ms,
+ ceph::signedspan mss,
+ epoch_t upf,
+ uint32_t min_message,
+ std::optional<ceph::time_detail::signedspan> delta_ub = {})
: Message{MSG_OSD_PING, HEAD_VERSION, COMPAT_VERSION},
- fsid(f), map_epoch(e), op(o), stamp(s), min_message_size(min_message)
+ fsid(f), map_epoch(e), op(o),
+ ping_stamp(s),
+ mono_ping_stamp(ms),
+ mono_send_stamp(mss),
+ delta_ub(delta_ub),
+ up_from(upf),
+ min_message_size(min_message)
{ }
MOSDPing()
: Message{MSG_OSD_PING, HEAD_VERSION, COMPAT_VERSION}
decode(fsid, p);
decode(map_epoch, p);
decode(op, p);
- decode(stamp, p);
+ decode(ping_stamp, p);
int payload_mid_length = p.get_off();
uint32_t size;
decode(size, p);
+
+ if (header.version >= 5) {
+ decode(up_from, p);
+ decode(mono_ping_stamp, p);
+ decode(mono_send_stamp, p);
+ decode(delta_ub, p);
+ }
+
p.advance(size);
min_message_size = size + payload_mid_length;
}
encode(fsid, payload);
encode(map_epoch, payload);
encode(op, payload);
- encode(stamp, payload);
+ encode(ping_stamp, payload);
size_t s = 0;
if (min_message_size > payload.length()) {
s = min_message_size - payload.length();
}
encode((uint32_t)s, payload);
+
+ encode(up_from, payload);
+ encode(mono_ping_stamp, payload);
+ encode(mono_send_stamp, payload);
+ encode(delta_ub, payload);
+
if (s) {
// this should be big enough for normal min_message padding sizes. since
// we are targeting jumbo ethernet frames around 9000 bytes, 16k should
void print(ostream& out) const override {
out << "osd_ping(" << get_op_name(op)
<< " e" << map_epoch
- << " stamp " << stamp
- << ")";
+ << " up_from " << up_from
+ << " ping_stamp " << ping_stamp << "/" << mono_ping_stamp
+ << " send_stamp " << mono_send_stamp;
+ if (delta_ub) {
+ out << " delta_ub " << *delta_ub;
+ }
+ out << ")";
}
private:
template<class T, typename... Args>
osd->need_heartbeat_peer_update();
}
+HeartbeatStampsRef OSDService::get_hb_stamps(unsigned peer)
+{
+ std::lock_guard l(hb_stamp_lock);
+ if (peer >= hb_stamps.size()) {
+ hb_stamps.resize(peer + 1);
+ }
+ if (!hb_stamps[peer]) {
+ hb_stamps[peer].reset(new HeartbeatStamps(peer));
+ }
+ return hb_stamps[peer];
+}
+
void OSDService::start_shutdown()
{
{
pair<ConnectionRef,ConnectionRef> cons = service.get_con_osd_hb(p, osdmap->get_epoch());
if (!cons.first)
return;
+ assert(cons.second);
+
hi = &heartbeat_peers[p];
hi->peer = p;
- RefCountedPtr s{new HeartbeatSession{p}, false};
+
+ auto stamps = service.get_hb_stamps(p);
+
+ Session *sb = new Session(cct, cons.first.get());
+ sb->peer = p;
+ sb->stamps = stamps;
+ RefCountedPtr sbref{sb, false};
hi->con_back = cons.first.get();
- hi->con_back->set_priv(s);
- if (cons.second) {
- hi->con_front = cons.second.get();
- hi->con_front->set_priv(s);
- dout(10) << "_add_heartbeat_peer: new peer osd." << p
- << " " << hi->con_back->get_peer_addr()
- << " " << hi->con_front->get_peer_addr()
- << dendl;
- } else {
- hi->con_front.reset(NULL);
- dout(10) << "_add_heartbeat_peer: new peer osd." << p
- << " " << hi->con_back->get_peer_addr()
- << dendl;
- }
+ hi->con_back->set_priv(sbref);
+
+ Session *sf = new Session(cct, cons.second.get());
+ sf->peer = p;
+ sf->stamps = stamps;
+ RefCountedPtr sfref{sf, false};
+ hi->con_front = cons.second.get();
+ hi->con_front->set_priv(sfref);
+
+ dout(10) << "_add_heartbeat_peer: new peer osd." << p
+ << " " << hi->con_back->get_peer_addr()
+ << " " << hi->con_front->get_peer_addr()
+ << dendl;
} else {
hi = &i->second;
}
{
if (superblock.cluster_fsid != m->fsid) {
dout(20) << "handle_osd_ping from " << m->get_source_inst()
- << " bad fsid " << m->fsid << " != " << superblock.cluster_fsid << dendl;
+ << " bad fsid " << m->fsid << " != " << superblock.cluster_fsid
+ << dendl;
m->put();
return;
}
}
utime_t now = ceph_clock_now();
+ auto mnow = service.get_mnow();
ConnectionRef con(m->get_connection());
OSDMapRef curmap = service.get_osdmap();
if (!curmap) {
return;
}
+ auto sref = con->get_priv();
+ Session *s = static_cast<Session*>(sref.get());
+ if (!s) {
+ heartbeat_lock.unlock();
+ m->put();
+ return;
+ }
+ if (!s->stamps) {
+ s->peer = from;
+ s->stamps = service.get_hb_stamps(from);
+ }
+
switch (m->op) {
case MOSDPing::PING:
}
}
+ ceph::signedspan sender_delta_ub;
+ s->stamps->got_ping(
+ m->up_from,
+ mnow,
+ m->mono_send_stamp,
+ m->delta_ub,
+ &sender_delta_ub);
+ dout(20) << __func__ << " new stamps " << *s->stamps << dendl;
+
if (!cct->get_heartbeat_map()->is_healthy()) {
dout(10) << "internal heartbeat not healthy, dropping ping request"
<< dendl;
Message *r = new MOSDPing(monc->get_fsid(),
curmap->get_epoch(),
- MOSDPing::PING_REPLY, m->stamp,
- cct->_conf->osd_heartbeat_min_size);
+ MOSDPing::PING_REPLY,
+ m->ping_stamp,
+ m->mono_ping_stamp,
+ mnow,
+ service.get_up_epoch(),
+ cct->_conf->osd_heartbeat_min_size,
+ sender_delta_ub);
con->send_message(r);
if (curmap->is_up(from)) {
Message *r = new MOSDPing(monc->get_fsid(),
curmap->get_epoch(),
MOSDPing::YOU_DIED,
- m->stamp,
+ m->ping_stamp,
+ m->mono_ping_stamp,
+ mnow,
+ service.get_up_epoch(),
cct->_conf->osd_heartbeat_min_size);
con->send_message(r);
}
{
map<int,HeartbeatInfo>::iterator i = heartbeat_peers.find(from);
if (i != heartbeat_peers.end()) {
- auto acked = i->second.ping_history.find(m->stamp);
+ auto acked = i->second.ping_history.find(m->ping_stamp);
if (acked != i->second.ping_history.end()) {
int &unacknowledged = acked->second.second;
if (con == i->second.con_back) {
if (unacknowledged == 0) {
// succeeded in getting all replies
dout(25) << "handle_osd_ping got all replies from osd." << from
- << " , erase pending ping(sent at " << m->stamp << ")"
+ << " , erase pending ping(sent at " << m->ping_stamp << ")"
<< " and older pending ping(s)"
<< dendl;
i->second.ping_history.erase(i->second.ping_history.begin(), ++acked);
}
} else {
// old replies, deprecated by newly sent pings.
- dout(10) << "handle_osd_ping no pending ping(sent at " << m->stamp
+ dout(10) << "handle_osd_ping no pending ping(sent at " << m->ping_stamp
<< ") is found, treat as covered by newly sent pings "
<< "and ignore"
<< dendl;
}
}
}
+
+ s->stamps->got_ping_reply(
+ mnow,
+ m->mono_send_stamp,
+ m->delta_ub);
+ dout(20) << __func__ << " new stamps " << *s->stamps << dendl;
}
break;
service.check_full_status(ratio, pratio);
utime_t now = ceph_clock_now();
+ auto mnow = service.get_mnow();
utime_t deadline = now;
deadline += cct->_conf->osd_heartbeat_grace;
i != heartbeat_peers.end();
++i) {
int peer = i->first;
+ dout(30) << "heartbeat sending ping to osd." << peer << dendl;
+
i->second.last_tx = now;
if (i->second.first_tx == utime_t())
i->second.first_tx = now;
i->second.ping_history[now] = make_pair(deadline,
HeartbeatInfo::HEARTBEAT_MAX_CONN);
- dout(30) << "heartbeat sending ping to osd." << peer << dendl;
- i->second.con_back->send_message(new MOSDPing(monc->get_fsid(),
- service.get_osdmap_epoch(),
- MOSDPing::PING, now,
- cct->_conf->osd_heartbeat_min_size));
+
+ Session *s = static_cast<Session*>(i->second.con_back->get_priv().get());
+ std::optional<ceph::signedspan> delta_ub;
+ s->stamps->sent_ping(&delta_ub);
+
+ i->second.con_back->send_message(
+ new MOSDPing(monc->get_fsid(),
+ service.get_osdmap_epoch(),
+ MOSDPing::PING,
+ now,
+ mnow,
+ mnow,
+ service.get_up_epoch(),
+ cct->_conf->osd_heartbeat_min_size,
+ delta_ub));
if (i->second.con_front)
- i->second.con_front->send_message(new MOSDPing(monc->get_fsid(),
- service.get_osdmap_epoch(),
- MOSDPing::PING, now,
- cct->_conf->osd_heartbeat_min_size));
+ i->second.con_front->send_message(
+ new MOSDPing(monc->get_fsid(),
+ service.get_osdmap_epoch(),
+ MOSDPing::PING,
+ now,
+ mnow,
+ mnow,
+ service.get_up_epoch(),
+ cct->_conf->osd_heartbeat_min_size,
+ delta_ub));
}
logger->set(l_osd_hb_to, heartbeat_peers.size());
if (is_stopping()) {
return true;
}
- auto heartbeat_session = static_cast<HeartbeatSession*>(s.get());
- auto p = heartbeat_peers.find(heartbeat_session->peer);
+ auto session = static_cast<Session*>(s.get());
+ auto p = heartbeat_peers.find(session->peer);
if (p != heartbeat_peers.end() &&
(p->second.con_back == con ||
p->second.con_front == con)) {
void request_osdmap_update(epoch_t e);
+ // -- heartbeats --
+ ceph::mutex hb_stamp_lock = ceph::make_mutex("OSDServce::hb_stamp_lock");
+
+ /// osd -> heartbeat stamps
+ vector<HeartbeatStampsRef> hb_stamps;
+
+ /// get or create a ref for a peer's HeartbeatStamps
+ HeartbeatStampsRef get_hb_stamps(unsigned osd);
+
// -- stopping --
ceph::mutex is_stopping_lock = ceph::make_mutex("OSDService::is_stopping_lock");
ceph::condition_variable is_stopping_cond;
return !is_unhealthy(now);
}
};
- /// state attached to outgoing heartbeat connections
- struct HeartbeatSession : public RefCountedObject {
- int peer;
- explicit HeartbeatSession(int p) : peer(p) {}
- };
+
ceph::mutex heartbeat_lock = ceph::make_mutex("OSD::heartbeat_lock");
map<int, int> debug_heartbeat_drops_remaining;
ceph::condition_variable heartbeat_cond;
}
};
+struct HeartbeatStamps : public RefCountedObject {
+ mutable ceph::mutex lock = ceph::make_mutex("HeartbeatStamps::lock");
+
+ const int osd;
+
+ // we maintain an upper and lower bound on the delta between our local
+ // mono_clock time (minus the startup_time) to the peer OSD's mono_clock
+ // time (minus its startup_time).
+ //
+ // delta is (remote_clock_time - local_clock_time), so that
+ // local_time + delta -> peer_time, and peer_time - delta -> local_time.
+ //
+ // we have an upper and lower bound value on this delta, meaning the
+ // value of the remote clock is somewhere between [my_time + lb, my_time + ub]
+ //
+ // conversely, if we have a remote timestamp T, then that is
+ // [T - ub, T - lb] in terms of the local clock. i.e., if you are
+ // substracting the delta, then take care that you swap the role of the
+ // lb and ub values.
+
+ /// lower bound on peer clock - local clock
+ std::optional<ceph::signedspan> peer_clock_delta_lb;
+
+ /// upper bound on peer clock - local clock
+ std::optional<ceph::signedspan> peer_clock_delta_ub;
+
+ /// highest up_from we've seen from this rank
+ epoch_t up_from = 0;
+
+ HeartbeatStamps(int o)
+ : RefCountedObject(NULL, 0),
+ osd(o) {}
+
+ void print(ostream& out) const {
+ std::lock_guard l(lock);
+ out << "hbstamp(osd." << osd << " up_from " << up_from
+ << " peer_clock_delta [";
+ if (peer_clock_delta_lb) {
+ out << *peer_clock_delta_lb;
+ }
+ out << ",";
+ if (peer_clock_delta_ub) {
+ out << *peer_clock_delta_ub;
+ }
+ out << "])";
+ }
+
+ void sent_ping(std::optional<ceph::signedspan> *delta_ub) {
+ std::lock_guard l(lock);
+ // the non-primaries need a lower bound on remote clock - local clock. if
+ // we assume the transit for the last ping_reply was
+ // instantaneous, that would be (the negative of) our last
+ // peer_clock_delta_lb value.
+ if (peer_clock_delta_lb) {
+ *delta_ub = - *peer_clock_delta_lb;
+ }
+ }
+
+ void got_ping(epoch_t this_up_from,
+ ceph::signedspan now,
+ ceph::signedspan peer_send_stamp,
+ std::optional<ceph::signedspan> delta_ub,
+ ceph::signedspan *out_delta_ub) {
+ std::lock_guard l(lock);
+ if (this_up_from < up_from) {
+ return;
+ }
+ if (this_up_from > up_from) {
+ up_from = this_up_from;
+ }
+ peer_clock_delta_lb = peer_send_stamp - now;
+ peer_clock_delta_ub = delta_ub;
+ *out_delta_ub = - *peer_clock_delta_lb;
+ }
+
+ void got_ping_reply(ceph::signedspan now,
+ ceph::signedspan peer_send_stamp,
+ std::optional<ceph::signedspan> delta_ub) {
+ std::lock_guard l(lock);
+ peer_clock_delta_lb = peer_send_stamp - now;
+ peer_clock_delta_ub = delta_ub;
+ }
+
+};
+typedef boost::intrusive_ptr<HeartbeatStamps> HeartbeatStampsRef;
+
+inline ostream& operator<<(ostream& out, const HeartbeatStamps& hb)
+{
+ hb.print(out);
+ return out;
+}
+
+
struct PeeringCtx : BufferedRecoveryMessages {
ObjectStore::Transaction transaction;
HBHandle* handle = nullptr;
#include "OSDCap.h"
#include "Watch.h"
#include "OSDMap.h"
+#include "PeeringState.h"
//#define PG_DEBUG_REFS
std::atomic<uint64_t> backoff_seq = {0};
+ // for heartbeat connections only
+ int peer = -1;
+ HeartbeatStampsRef stamps;
+
explicit Session(CephContext *cct, Connection *con_) :
RefCountedObject(cct),
con(con_),