Beacon::Beacon(CephContext *cct_, MonClient *monc_, std::string name_) :
- Dispatcher(cct_), lock("Beacon"), monc(monc_), timer(g_ceph_context, lock), name(name_)
+ Dispatcher(cct_), lock("Beacon"), monc(monc_), timer(g_ceph_context, lock),
+ name(name_), awaiting_seq(-1)
{
want_state = MDSMap::STATE_NULL;
last_seq = 0;
while (!seq_stamp.empty() &&
seq_stamp.begin()->first <= seq)
seq_stamp.erase(seq_stamp.begin());
+
+ // Wake a waiter up if present
+ if (awaiting_seq == seq) {
+ waiting_cond.Signal();
+ }
} else {
dout(10) << "handle_mds_beacon " << ceph_mds_state_name(m->get_state())
<< " seq " << m->get_seq() << " dne" << dendl;
}
+void Beacon::send_and_wait(const double duration)
+{
+ Mutex::Locker l(lock);
+ _send();
+ awaiting_seq = last_seq;
+ dout(20) << __func__ << ": awaiting " << awaiting_seq
+ << " for up to " << duration << "s" << dendl;
+
+ utime_t timeout;
+ timeout.set_from_double(ceph_clock_now(cct) + duration);
+ while ((!seq_stamp.empty() && seq_stamp.begin()->first <= awaiting_seq)
+ && ceph_clock_now(cct) < timeout) {
+ waiting_cond.WaitUntil(lock, timeout);
+ }
+
+ awaiting_seq = -1;
+}
+
+
/**
* Call periodically, or when you have updated the desired state
*/
void _notify_mdsmap(MDSMap const *mdsmap);
void _send();
+ version_t awaiting_seq;
+ Cond waiting_cond;
+
public:
Beacon(CephContext *cct_, MonClient *monc_, std::string name);
~Beacon();
void handle_mds_beacon(MMDSBeacon *m);
void send();
+ /**
+ * Send a beacon, and block until the ack is received from the mon
+ * or `duration` seconds pass, whichever happens sooner. Useful
+ * for emitting a last message on shutdown.
+ */
+ void send_and_wait(const double duration);
+
bool is_laggy();
utime_t get_laggy_until() const;
};