agent_lock.Unlock();
}
+void OSDService::request_osdmap_update(epoch_t e)
+{
+ osd->osdmap_subscribe(e, false);
+}
+
class AgentTimeoutCB : public Context {
PGRef pg;
public:
disk_tp(cct, "OSD::disk_tp", "tp_osd_disk", cct->_conf->osd_disk_threads, "osd_disk_threads"),
command_tp(cct, "OSD::command_tp", "tp_osd_cmd", 1),
session_waiting_lock("OSD::session_waiting_lock"),
+ osdmap_subscribe_lock("OSD::osdmap_subscribe_lock"),
heartbeat_lock("OSD::heartbeat_lock"),
heartbeat_stop(false),
heartbeat_need_update(true),
void OSD::osdmap_subscribe(version_t epoch, bool force_request)
{
- OSDMapRef osdmap = service.get_osdmap();
- if (osdmap->get_epoch() >= epoch)
+ Mutex::Locker l(osdmap_subscribe_lock);
+ if (latest_subscribed_epoch >= epoch && !force_request)
return;
+ latest_subscribed_epoch = MAX(epoch, latest_subscribed_epoch);
+
if (monc->sub_want_increment("osdmap", epoch, CEPH_SUBSCRIBE_ONETIME) ||
force_request) {
monc->renew_subs();
return ret;
}
+ void request_osdmap_update(epoch_t e);
+
// -- stopping --
Mutex is_stopping_lock;
Cond is_stopping_cond;
void osdmap_subscribe(version_t epoch, bool force_request);
/** @} monc helpers */
+ Mutex osdmap_subscribe_lock;
+ epoch_t latest_subscribed_epoch{0};
+
// -- heartbeat --
/// information about a heartbeat peer
struct HeartbeatInfo {
<< p->first << " not empty, queueing" << dendl;
p->second.push_back(op);
op->mark_delayed("waiting_for_map not empty");
+ osd->request_osdmap_update(op->min_epoch);
return;
}
if (!have_same_or_newer_map(op->min_epoch)) {