From 2120f4bb6c5ba0f066d4541a51ce1d43c8ab6881 Mon Sep 17 00:00:00 2001
From: Samuel Just
Date: Mon, 4 Aug 2014 15:30:41 -0700
Subject: [PATCH] OSD: move waiting_for_pg into the session structures

Each message belongs to a session.  Further, no ordering is implied
between messages that arrive on different sessions.

Breaking the global waiting_for_pg structure into per-session structures
lets us avoid taking the global pg_map_lock for write in
get_pg_or_queue_for_pg.  The cost is some added complexity: each
session's waiting_for_pg structure must be updated when we receive a new
map (due to pg splits) or when we locally create a pg.

Signed-off-by: Samuel Just
---
 src/osd/OSD.cc | 169 ++++++++++++++++++++++++++++++++++++-------------
 src/osd/OSD.h  | 103 +++++++++++++++++++++++++++---
 2 files changed, 219 insertions(+), 53 deletions(-)

diff --git a/src/osd/OSD.cc b/src/osd/OSD.cc
index 9ac8481a09af2..5f0635b0ef060 100644
--- a/src/osd/OSD.cc
+++ b/src/osd/OSD.cc
@@ -2767,20 +2767,30 @@ PG *OSD::_create_lock_pg(
 
 PG *OSD::get_pg_or_queue_for_pg(const spg_t& pgid, OpRequestRef& op)
 {
-  {
-    RWLock::RLocker l(pg_map_lock);
-    ceph::unordered_map<spg_t, PG*>::iterator i = pg_map.find(pgid);
-    if (i != pg_map.end())
-      return i->second;
-  }
-  RWLock::WLocker l(pg_map_lock);
+  Session *session = static_cast<Session*>(
+    op->get_req()->get_connection()->get_priv());
+  assert(session);
+  // get_pg_or_queue_for_pg is only called from the fast_dispatch path where
+  // the session_dispatch_lock must already be held.
+  assert(session->session_dispatch_lock.is_locked());
+  RWLock::RLocker l(pg_map_lock);
+
   ceph::unordered_map<spg_t, PG*>::iterator i = pg_map.find(pgid);
-  if (i != pg_map.end()) {
-    return i->second;
+  if (i == pg_map.end())
+    session->waiting_for_pg[pgid];
+
+  map<spg_t, list<OpRequestRef> >::iterator wlistiter =
+    session->waiting_for_pg.find(pgid);
+
+  PG *out = NULL;
+  if (wlistiter == session->waiting_for_pg.end()) {
+    out = i->second;
   } else {
-    waiting_for_pg[pgid].push_back(op);
-    return NULL;
+    wlistiter->second.push_back(op);
+    register_session_waiting_on_pg(session, pgid);
   }
+  session->put();
+  return out;
 }
 
 bool OSD::_have_pg(spg_t pgid)
@@ -5399,6 +5409,8 @@ bool OSD::ms_dispatch(Message *m)
 
 void OSD::dispatch_session_waiting(Session *session, OSDMapRef osdmap)
 {
+  assert(session->session_dispatch_lock.is_locked());
+  assert(session->osdmap == osdmap);
   for (list<OpRequestRef>::iterator i = session->waiting_on_map.begin();
        i != session->waiting_on_map.end() && dispatch_op_fast(*i, osdmap);
        session->waiting_on_map.erase(i++));
@@ -5410,6 +5422,85 @@ void OSD::dispatch_session_waiting(Session *session, OSDMapRef osdmap)
   }
 }
 
+
+void OSD::update_waiting_for_pg(Session *session, OSDMapRef newmap)
+{
+  assert(session->session_dispatch_lock.is_locked());
+  if (!session->osdmap) {
+    session->osdmap = newmap;
+    return;
+  }
+
+  if (newmap->get_epoch() == session->osdmap->get_epoch())
+    return;
+
+  assert(newmap->get_epoch() > session->osdmap->get_epoch());
+
+  map<spg_t, list<OpRequestRef> > from;
+  from.swap(session->waiting_for_pg);
+
+  for (map<spg_t, list<OpRequestRef> >::iterator i = from.begin();
+       i != from.end();
+       from.erase(i++)) {
+    set<spg_t> children;
+    if (!newmap->have_pg_pool(i->first.pool())) {
+      // drop this wait list on the ground
+      i->second.clear();
+    } else {
+      assert(session->osdmap->have_pg_pool(i->first.pool()));
+      if (i->first.is_split(
+            session->osdmap->get_pg_num(i->first.pool()),
+            newmap->get_pg_num(i->first.pool()),
+            &children)) {
+        for (set<spg_t>::iterator child = children.begin();
+             child != children.end();
+             ++child) {
+          unsigned split_bits = child->get_split_bits(
+            newmap->get_pg_num(child->pool()));
+          list<OpRequestRef> child_ops;
+          OSD::split_list(&i->second, &child_ops, child->ps(), split_bits);
+          if (!child_ops.empty()) {
+            session->waiting_for_pg[*child].swap(child_ops);
+            register_session_waiting_on_pg(session, *child);
+          }
+        }
+      }
+    }
+    if (i->second.empty()) {
+      clear_session_waiting_on_pg(session, i->first);
+    } else {
+      session->waiting_for_pg[i->first].swap(i->second);
+    }
+  }
+
+  session->osdmap = newmap;
+}
+
+void OSD::session_notify_pg_create(
+  Session *session, OSDMapRef osdmap, spg_t pgid)
+{
+  assert(session->session_dispatch_lock.is_locked());
+  update_waiting_for_pg(session, osdmap);
+  map<spg_t, list<OpRequestRef> >::iterator i =
+    session->waiting_for_pg.find(pgid);
+  if (i != session->waiting_for_pg.end()) {
+    session->waiting_on_map.splice(
+      session->waiting_on_map.end(),
+      i->second);
+    session->waiting_for_pg.erase(i);
+  }
+  clear_session_waiting_on_pg(session, pgid);
+}
+
+void OSD::session_notify_pg_cleared(
+  Session *session, OSDMapRef osdmap, spg_t pgid)
+{
+  assert(session->session_dispatch_lock.is_locked());
+  update_waiting_for_pg(session, osdmap);
+  session->waiting_for_pg.erase(pgid);
+  clear_session_waiting_on_pg(session, pgid);
+}
+
 void OSD::ms_fast_dispatch(Message *m)
 {
   if (service.is_stopping()) {
@@ -5422,6 +5513,7 @@ void OSD::ms_fast_dispatch(Message *m)
   assert(session);
   {
     Mutex::Locker l(session->session_dispatch_lock);
+    update_waiting_for_pg(session, nextmap);
     session->waiting_on_map.push_back(op);
     dispatch_session_waiting(session, nextmap);
   }
@@ -6562,43 +6654,30 @@ void OSD::consume_map()
 
   dispatch_sessions_waiting_on_map();
 
-  // remove any PGs which we no longer host from the waiting_for_pg list
-  set<spg_t> pgs_to_delete;
-  {
-    RWLock::RLocker l(pg_map_lock);
-    map<spg_t, list<OpRequestRef> >::iterator p = waiting_for_pg.begin();
-    while (p != waiting_for_pg.end()) {
-      spg_t pgid = p->first;
-
-      vector<int> acting;
-      int nrep = osdmap->pg_to_acting_osds(pgid.pgid, acting);
-      int role = osdmap->calc_pg_role(whoami, acting, nrep);
-
-      if (role < 0) {
-        pgs_to_delete.insert(p->first);
-        /* we can delete list contents under the read lock because
-         * nobody will be adding to them -- everybody is now using a map
-         * new enough that they will simply drop ops instead of adding
-         * them to the list.
-         */
-        dout(10) << " discarding waiting ops for " << pgid << dendl;
-        while (!p->second.empty()) {
-          p->second.pop_front();
-        }
+  // remove any PGs which we no longer host from the session waiting_for_pg lists
+  set<spg_t> pgs_to_check;
+  get_pgs_with_waiting_sessions(&pgs_to_check);
+  for (set<spg_t>::iterator p = pgs_to_check.begin();
+       p != pgs_to_check.end();
+       ++p) {
+    vector<int> acting;
+    int nrep = osdmap->pg_to_acting_osds(p->pgid, acting);
+    int role = osdmap->calc_pg_role(whoami, acting, nrep);
+
+    if (role < 0) {
+      set<Session*> concerned_sessions;
+      get_sessions_possibly_interested_in_pg(*p, &concerned_sessions);
+      for (set<Session*>::iterator i = concerned_sessions.begin();
+           i != concerned_sessions.end();
+           ++i) {
+        {
+          Mutex::Locker l((*i)->session_dispatch_lock);
+          session_notify_pg_cleared(*i, osdmap, *p);
+        }
+        (*i)->put();
       }
-      ++p;
     }
   }
-  {
-    RWLock::WLocker l(pg_map_lock);
-    for (set<spg_t>::iterator i = pgs_to_delete.begin();
-         i != pgs_to_delete.end();
-         ++i) {
-      map<spg_t, list<OpRequestRef> >::iterator p = waiting_for_pg.find(*i);
-      assert(p->second.empty());
-      waiting_for_pg.erase(p);
-    }
-  }
-
   // scan pg's
   {
diff --git a/src/osd/OSD.h b/src/osd/OSD.h
index 1f9a14479d21f..1cd0f9128f620 100644
--- a/src/osd/OSD.h
+++ b/src/osd/OSD.h
@@ -1149,6 +1149,9 @@ public:
 
     Mutex session_dispatch_lock;
     list<OpRequestRef> waiting_on_map;
+    OSDMapRef osdmap;  /// Map as of which waiting_for_pg is current
+    map<spg_t, list<OpRequestRef> > waiting_for_pg;
+
     Mutex sent_epoch_lock;
     epoch_t last_sent_epoch;
     Mutex received_map_lock;
@@ -1161,11 +1164,17 @@ public:
       sent_epoch_lock("Session::sent_epoch_lock"), last_sent_epoch(0),
       received_map_lock("Session::received_map_lock"),
       received_map_epoch(0) {}
+
+
   };
+  void update_waiting_for_pg(Session *session, OSDMapRef osdmap);
+  void session_notify_pg_create(Session *session, OSDMapRef osdmap, spg_t pgid);
+  void session_notify_pg_cleared(Session *session, OSDMapRef osdmap, spg_t pgid);
   void dispatch_session_waiting(Session *session, OSDMapRef osdmap);
 
   Mutex session_waiting_lock;
   set<Session*> session_waiting_for_map;
+  map<spg_t, set<Session*> > session_waiting_for_pg;
 
   void clear_waiting_sessions() {
     Mutex::Locker l(session_waiting_lock);
@@ -1180,6 +1189,13 @@ public:
       }
     }
     session_waiting_for_pg.clear();
+
+    for (set<Session*>::iterator i = session_waiting_for_map.begin();
+         i != session_waiting_for_map.end();
+         ++i) {
+      (*i)->put();
+    }
+    session_waiting_for_map.clear();
   }
 
   /// Caller assumes refs for included Sessions
@@ -1209,14 +1225,81 @@ public:
          i != sessions_to_check.end();
          sessions_to_check.erase(i++)) {
       (*i)->session_dispatch_lock.Lock();
+      update_waiting_for_pg(*i, osdmap);
       dispatch_session_waiting(*i, osdmap);
       (*i)->session_dispatch_lock.Unlock();
       (*i)->put();
     }
   }
+  void clear_session_waiting_on_pg(Session *session, spg_t pgid) {
+    Mutex::Locker l(session_waiting_lock);
+    map<spg_t, set<Session*> >::iterator i = session_waiting_for_pg.find(pgid);
+    if (i == session_waiting_for_pg.end()) {
+      return;
+    }
+    set<Session*>::iterator j = i->second.find(session);
+    if (j != i->second.end()) {
+      (*j)->put();
+      i->second.erase(j);
+    }
+    if (i->second.empty()) {
+      session_waiting_for_pg.erase(i);
+    }
+  }
   void session_handle_reset(Session *session) {
     Mutex::Locker l(session->session_dispatch_lock);
     clear_session_waiting_on_map(session);
+    vector<spg_t> pgs_to_clear;
+    pgs_to_clear.reserve(session->waiting_for_pg.size());
+    for (map<spg_t, list<OpRequestRef> >::iterator i =
+           session->waiting_for_pg.begin();
+         i != session->waiting_for_pg.end();
+         ++i) {
+      pgs_to_clear.push_back(i->first);
+    }
+    for (vector<spg_t>::iterator i = pgs_to_clear.begin();
+         i != pgs_to_clear.end();
+         ++i) {
+      clear_session_waiting_on_pg(session, *i);
+    }
+  }
+  void register_session_waiting_on_pg(Session *session, spg_t pgid) {
+    Mutex::Locker l(session_waiting_lock);
+    set<Session*> &s = session_waiting_for_pg[pgid];
+    set<Session*>::iterator i = s.find(session);
+    if (i == s.end()) {
+      session->get();
+      s.insert(session);
+    }
+  }
+  void get_sessions_possibly_interested_in_pg(
+    spg_t pgid, set<Session*> *sessions) {
+    Mutex::Locker l(session_waiting_lock);
+    while (1) {
+      map<spg_t, set<Session*> >::iterator i = session_waiting_for_pg.find(pgid);
+      if (i != session_waiting_for_pg.end()) {
+        sessions->insert(i->second.begin(), i->second.end());
+      }
+      if (pgid.pgid.ps() == 0) {
+        break;
+      } else {
+        pgid = pgid.get_parent();
+      }
+    }
+    for (set<Session*>::iterator i = sessions->begin();
+         i != sessions->end();
+         ++i) {
+      (*i)->get();
+    }
+  }
+  void get_pgs_with_waiting_sessions(set<spg_t> *pgs) {
+    Mutex::Locker l(session_waiting_lock);
+    for (map<spg_t, set<Session*> >::iterator i =
+           session_waiting_for_pg.begin();
+         i != session_waiting_for_pg.end();
+         ++i) {
+      pgs->insert(i->first);
+    }
   }
 
 private:
@@ -1608,7 +1691,6 @@ protected:
 
   // -- placement groups --
   RWLock pg_map_lock; // this lock orders *above* individual PG _locks
  ceph::unordered_map<spg_t, PG*> pg_map;  // protected by pg_map lock
-  map<spg_t, list<OpRequestRef> > waiting_for_pg;  // protected by pg_map lock
   map<spg_t, list<PG::CephPeeringEvtRef> > peering_wait_for_split;
   PGRecoveryStats pg_recovery_stats;
@@ -1678,15 +1760,20 @@ protected:
     ); ///< @return false if there was a map gap between from and now
 
   void wake_pg_waiters(PG* pg, spg_t pgid) {
+    assert(osd_lock.is_locked());
     // Need write lock on pg_map_lock
-    map<spg_t, list<OpRequestRef> >::iterator i = waiting_for_pg.find(pgid);
-    if (i != waiting_for_pg.end()) {
-      for (list<OpRequestRef>::iterator j = i->second.begin();
-           j != i->second.end();
-           ++j) {
-        enqueue_op(pg, *j);
+    set<Session*> concerned_sessions;
+    get_sessions_possibly_interested_in_pg(pgid, &concerned_sessions);
+
+    for (set<Session*>::iterator i = concerned_sessions.begin();
+         i != concerned_sessions.end();
+         ++i) {
+      {
+        Mutex::Locker l((*i)->session_dispatch_lock);
+        session_notify_pg_create(*i, osdmap, pgid);
+        dispatch_session_waiting(*i, osdmap);
       }
-      waiting_for_pg.erase(i);
+      (*i)->put();
     }
   }
 
-- 
2.47.3
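
For readers skimming the diff, the following is a minimal, self-contained sketch of the
locking pattern the patch adopts. It is not Ceph code: the names (ToySession, PgId,
queue_op_for_pg, wake_pg_waiters_toy) are invented for illustration, std::shared_ptr
stands in for the Session get()/put() refcounting, and the osdmap-epoch/split handling
done by update_waiting_for_pg is omitted. The shape it shows is the one the patch aims
for: each session owns its waiting_for_pg map under its own dispatch lock, and a small
reverse index (pg -> sessions) under a separate lock lets PG creation find which
sessions to wake, so the op fast path never takes a global write lock.

```cpp
// Illustrative sketch only -- not Ceph code.
#include <list>
#include <map>
#include <memory>
#include <mutex>
#include <set>
#include <string>
#include <utility>

using Op   = std::string;   // stand-in for OpRequestRef
using PgId = int;           // stand-in for spg_t

struct ToySession {
  std::mutex dispatch_lock;                        // like session_dispatch_lock
  std::list<Op> waiting_on_map;                    // ops ready to dispatch
  std::map<PgId, std::list<Op>> waiting_for_pg;    // ops parked per missing PG
};
using SessionRef = std::shared_ptr<ToySession>;

std::mutex index_lock;                                        // like session_waiting_lock
std::map<PgId, std::set<SessionRef>> sessions_waiting_for_pg; // reverse index

// Fast path: park an op on its own session when the PG is not instantiated yet.
// The caller already holds s->dispatch_lock (mirrors get_pg_or_queue_for_pg).
void queue_op_for_pg(const SessionRef& s, PgId pgid, Op op) {
  s->waiting_for_pg[pgid].push_back(std::move(op));
  std::lock_guard<std::mutex> l(index_lock);   // only the small index is
  sessions_waiting_for_pg[pgid].insert(s);     // touched under a shared lock
}

// Slow path: a PG was created locally; wake every session parked on it
// (mirrors wake_pg_waiters / session_notify_pg_create).
void wake_pg_waiters_toy(PgId pgid) {
  std::set<SessionRef> interested;
  {
    std::lock_guard<std::mutex> l(index_lock);
    auto it = sessions_waiting_for_pg.find(pgid);
    if (it != sessions_waiting_for_pg.end()) {
      interested.swap(it->second);
      sessions_waiting_for_pg.erase(it);
    }
  }
  for (const auto& s : interested) {
    std::lock_guard<std::mutex> l(s->dispatch_lock);
    auto w = s->waiting_for_pg.find(pgid);
    if (w == s->waiting_for_pg.end())
      continue;
    // Move the parked ops back onto the session's dispatch queue. Ordering is
    // preserved within a session; no ordering is promised across sessions.
    s->waiting_on_map.splice(s->waiting_on_map.end(), w->second);
    s->waiting_for_pg.erase(w);
  }
}
```

The design point the sketch illustrates: the hot path locks only its own session plus
the small index, while the rare events (PG creation, map changes, PG no longer hosted)
pay the cost of walking the index and fixing up each affected session.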