publish_lock{ceph::make_mutex("OSDService::publish_lock")},
pre_publish_lock{ceph::make_mutex("OSDService::pre_publish_lock")},
max_oldest_map(0),
- peer_map_epoch_lock("OSDService::peer_map_epoch_lock"),
sched_scrub_lock("OSDService::sched_scrub_lock"), scrubs_pending(0),
scrubs_active(0),
agent_lock("OSDService::agent_lock"),
}
ConnectionRef peer_con = osd->cluster_messenger->connect_to_osd(
next_map->get_cluster_addrs(peer));
- share_map_peer(peer, peer_con.get(), next_map);
+ maybe_share_map(peer_con.get(), next_map);
peer_con->send_message(m);
release_map(next_map);
}
// --------------------------------------
// dispatch
-epoch_t OSDService::get_peer_epoch(int peer)
-{
- std::lock_guard l(peer_map_epoch_lock);
- map<int,epoch_t>::iterator p = peer_map_epoch.find(peer);
- if (p == peer_map_epoch.end())
- return 0;
- return p->second;
-}
-
-epoch_t OSDService::note_peer_epoch(int peer, epoch_t e)
-{
- std::lock_guard l(peer_map_epoch_lock);
- map<int,epoch_t>::iterator p = peer_map_epoch.find(peer);
- if (p != peer_map_epoch.end()) {
- if (p->second < e) {
- dout(10) << "note_peer_epoch osd." << peer << " has " << e << dendl;
- p->second = e;
- } else {
- dout(30) << "note_peer_epoch osd." << peer << " has " << p->second << " >= " << e << dendl;
- }
- return p->second;
- } else {
- dout(10) << "note_peer_epoch osd." << peer << " now has " << e << dendl;
- peer_map_epoch[peer] = e;
- return e;
- }
-}
-
-void OSDService::forget_peer_epoch(int peer, epoch_t as_of)
-{
- std::lock_guard l(peer_map_epoch_lock);
- map<int,epoch_t>::iterator p = peer_map_epoch.find(peer);
- if (p != peer_map_epoch.end()) {
- if (p->second <= as_of) {
- dout(10) << "forget_peer_epoch osd." << peer << " as_of " << as_of
- << " had " << p->second << dendl;
- peer_map_epoch.erase(p);
- } else {
- dout(10) << "forget_peer_epoch osd." << peer << " as_of " << as_of
- << " has " << p->second << " - not forgetting" << dendl;
- }
- }
-}
-
-bool OSDService::should_share_map(entity_name_t name, Connection *con,
- epoch_t epoch, const OSDMapRef& osdmap,
- const epoch_t *sent_epoch_p)
-{
- dout(20) << "should_share_map "
- << name << " " << con->get_peer_addr()
- << " " << epoch << dendl;
-
- // does client have old map?
- if (name.is_client()) {
- bool message_sendmap = epoch < osdmap->get_epoch();
- if (message_sendmap && sent_epoch_p) {
- dout(20) << "client session last_sent_epoch: "
- << *sent_epoch_p
- << " versus osdmap epoch " << osdmap->get_epoch() << dendl;
- if (*sent_epoch_p < osdmap->get_epoch()) {
- return true;
- } // else we don't need to send it out again
- }
- }
-
- if (con->get_messenger() == osd->cluster_messenger &&
- con != osd->cluster_messenger->get_loopback_connection() &&
- osdmap->is_up(name.num()) &&
- (osdmap->get_cluster_addrs(name.num()) == con->get_peer_addrs() ||
- osdmap->get_hb_back_addrs(name.num()) == con->get_peer_addrs())) {
- // remember
- epoch_t has = std::max(get_peer_epoch(name.num()), epoch);
-
- // share?
- if (has < osdmap->get_epoch()) {
- dout(10) << name << " " << con->get_peer_addr()
- << " has old map " << epoch << " < "
- << osdmap->get_epoch() << dendl;
- return true;
- }
- }
-
- return false;
-}
-
-void OSDService::share_map(
- entity_name_t name,
- Connection *con,
- epoch_t epoch,
- OSDMapRef& osdmap,
- epoch_t *sent_epoch_p)
-{
- dout(20) << "share_map "
- << name << " " << con->get_peer_addr()
- << " " << epoch << dendl;
-
- if (!osd->is_active()) {
- /*It is safe not to proceed as OSD is not in healthy state*/
- return;
- }
-
- bool want_shared = should_share_map(name, con, epoch,
- osdmap, sent_epoch_p);
-
- if (want_shared){
- if (name.is_client()) {
- dout(10) << name << " has old map " << epoch
- << " < " << osdmap->get_epoch() << dendl;
- // we know the Session is valid or we wouldn't be sending
- if (sent_epoch_p) {
- *sent_epoch_p = osdmap->get_epoch();
- }
- send_incremental_map(epoch, con, osdmap);
- } else if (con->get_messenger() == osd->cluster_messenger &&
- osdmap->is_up(name.num()) &&
- (osdmap->get_cluster_addrs(name.num()) == con->get_peer_addrs() ||
- osdmap->get_hb_back_addrs(name.num()) == con->get_peer_addrs())) {
- dout(10) << name << " " << con->get_peer_addrs()
- << " has old map " << epoch << " < "
- << osdmap->get_epoch() << dendl;
- note_peer_epoch(name.num(), osdmap->get_epoch());
- send_incremental_map(epoch, con, osdmap);
- }
- }
-}
-
-void OSDService::share_map_peer(int peer, Connection *con, OSDMapRef map)
-{
- if (!map)
- map = get_osdmap();
-
- // send map?
- epoch_t pe = get_peer_epoch(peer);
- if (pe) {
- if (pe < map->get_epoch()) {
- send_incremental_map(pe, con, map);
- note_peer_epoch(peer, map->get_epoch());
- } else
- dout(20) << "share_map_peer " << con << " already has epoch " << pe << dendl;
- } else {
- dout(20) << "share_map_peer " << con << " don't know epoch, doing nothing" << dendl;
- // no idea about peer's epoch.
- // ??? send recent ???
- // do nothing.
- }
-}
-
bool OSDService::can_inc_scrubs_pending()
{
bool can_inc = false;
}
void OSDService::send_incremental_map(epoch_t since, Connection *con,
- OSDMapRef& osdmap)
+ const OSDMapRef& osdmap)
{
epoch_t to = osdmap->get_epoch();
dout(10) << "send_incremental_map " << since << " -> " << to
m->get_connection()->send_message(r);
if (curmap->is_up(from)) {
- service.note_peer_epoch(from, m->map_epoch);
if (is_active()) {
ConnectionRef con = service.get_con_osd_cluster(from, curmap->get_epoch());
if (con) {
- service.share_map_peer(from, con.get());
+ service.maybe_share_map(con.get(), get_osdmap(), m->map_epoch);
}
}
} else if (!curmap->exists(from) ||
if (m->map_epoch &&
curmap->is_up(from)) {
- service.note_peer_epoch(from, m->map_epoch);
if (is_active()) {
ConnectionRef con = service.get_con_osd_cluster(from, curmap->get_epoch());
if (con) {
- service.share_map_peer(from, con.get());
+ service.maybe_share_map(con.get(), get_osdmap(), m->map_epoch);
}
}
}
return true;
}
-void OSD::maybe_share_map(
- Session *session,
- OpRequestRef op,
- OSDMapRef osdmap)
+void OSDService::maybe_share_map(
+ Connection *con,
+ const OSDMapRef& osdmap,
+ epoch_t peer_epoch_lb)
{
- if (!op->check_send_map) {
+ // NOTE: we assume the caller holds something that keeps the Connection
+ // itself pinned (e.g., an OpRequest's MessageRef).
+ auto priv = con->get_priv(); // kept for the whole call: session below is a raw pointer obtained from it
+ auto session = static_cast<Session*>(priv.get());
+ if (!session) { // no Session attached to this connection: nothing is tracked, nothing is shared
return;
}
- epoch_t last_sent_epoch = 0;
+ // assume the peer has the newer of the caller-supplied lower bound
+ // (peer_epoch_lb, e.g. an op's sent_epoch) and what we think we sent them.
session->sent_epoch_lock.lock();
- last_sent_epoch = session->last_sent_epoch;
+ if (peer_epoch_lb > session->last_sent_epoch) { // caller reports a newer epoch than we recorded
+ dout(10) << __func__ << " con " << con
+ << " " << con->get_peer_addr()
+ << " map epoch " << session->last_sent_epoch
+ << " -> " << peer_epoch_lb << " (as per caller)" << dendl;
+ session->last_sent_epoch = peer_epoch_lb; // raise the session's record to the caller's bound
+ }
+ epoch_t last_sent_epoch = session->last_sent_epoch; // local snapshot, taken while sent_epoch_lock is held
session->sent_epoch_lock.unlock();
- // assume the peer has the newer of the op's sent_epoch and what
- // we think we sent them.
- epoch_t from = std::max(last_sent_epoch, op->sent_epoch);
+ if (osdmap->get_epoch() <= last_sent_epoch) { // peer is already at or past this map's epoch
+ return;
+ }
- const Message *m = op->get_req();
- service.share_map(
- m->get_source(),
- m->get_connection().get(),
- from,
- osdmap,
- session ? &last_sent_epoch : NULL);
+ send_incremental_map(last_sent_epoch, con, osdmap); // since = last_sent_epoch; sent_epoch_lock is not held here
+ last_sent_epoch = osdmap->get_epoch(); // epoch just shared; stored below only if still newer than the session's record
session->sent_epoch_lock.lock();
if (session->last_sent_epoch < last_sent_epoch) {
+ dout(10) << __func__ << " con " << con
+ << " " << con->get_peer_addr()
+ << " map epoch " << session->last_sent_epoch
+ << " -> " << last_sent_epoch << " (shared)" << dendl;
session->last_sent_epoch = last_sent_epoch;
}
session->sent_epoch_lock.unlock();
-
- op->check_send_map = false;
}
void OSD::dispatch_session_waiting(SessionRef session, OSDMapRef osdmap)
void OSD::note_up_osd(int peer)
{
- service.forget_peer_epoch(peer, osdmap->get_epoch() - 1);
heartbeat_set_peers_need_update();
}
<< " (NULL con)" << dendl;
continue;
}
- service.share_map_peer(it->first, con.get(), curmap);
+ service.maybe_share_map(con.get(), curmap);
dout(7) << __func__ << " osd." << it->first
<< " on " << it->second.size() << " PGs" << dendl;
MOSDPGNotify *m = new MOSDPGNotify(curmap->get_epoch(),
<< " (NULL con)" << dendl;
continue;
}
- service.share_map_peer(who, con.get(), curmap);
+ service.maybe_share_map(con.get(), curmap);
dout(7) << __func__ << " querying osd." << who
<< " on " << pit->second.size() << " PGs" << dendl;
MOSDPGQuery *m = new MOSDPGQuery(curmap->get_epoch(),
<< " (NULL con)" << dendl;
continue;
}
- service.share_map_peer(p->first, con.get(), curmap);
+ service.maybe_share_map(con.get(), curmap);
MOSDPGInfo *m = new MOSDPGInfo(curmap->get_epoch());
m->pg_list = p->second;
con->send_message(m);
PastIntervals()));
m = new MOSDPGNotify(osdmap->get_epoch(), std::move(ls));
}
- service.share_map_peer(q.from.osd, con.get(), osdmap);
+ service.maybe_share_map(con.get(), osdmap);
con->send_message(m);
}
}
logger->tinc(l_osd_op_before_dequeue_op_lat, latency);
- auto priv = op->get_req()->get_connection()->get_priv();
- if (auto session = static_cast<Session *>(priv.get()); session) {
- maybe_share_map(session, op, pg->get_osdmap());
- }
+ service.maybe_share_map(m->get_connection().get(),
+ pg->get_osdmap(),
+ op->sent_epoch);
if (pg->is_deleting())
return;
<< ", dropping " << qi << dendl;
// share map with client?
if (boost::optional<OpRequestRef> _op = qi.maybe_get_op()) {
- auto priv = (*_op)->get_req()->get_connection()->get_priv();
- if (auto session = static_cast<Session *>(priv.get()); session) {
- osd->maybe_share_map(session, *_op, sdata->shard_osdmap);
- }
+ osd->service.maybe_share_map((*_op)->get_req()->get_connection().get(),
+ sdata->shard_osdmap,
+ (*_op)->sent_epoch);
}
unsigned pushes_to_free = qi.get_reserved_pushes();
if (pushes_to_free > 0) {