From 231fe1b685bfbd3db9c81709ca39a29d696b13ad Mon Sep 17 00:00:00 2001 From: Sage Weil Date: Mon, 7 Jul 2014 15:01:50 -0700 Subject: [PATCH] Revert "OSD: move waiting_for_pg into Session" This reverts commit ecda2fef8ce982df3581a3b47ba74ae581d82479. This leaves Session* refs indefinitely in the map. This was one source of #7995. Signed-off-by: Sage Weil --- src/osd/OSD.cc | 170 ++++++++++++++----------------------------------- src/osd/OSD.h | 92 ++++---------------------- 2 files changed, 60 insertions(+), 202 deletions(-) diff --git a/src/osd/OSD.cc b/src/osd/OSD.cc index ce4cd6e1042c5..7010cf04ed9bf 100644 --- a/src/osd/OSD.cc +++ b/src/osd/OSD.cc @@ -933,7 +933,7 @@ OSD::OSD(CephContext *cct_, ObjectStore *store_, disk_tp(cct, "OSD::disk_tp", cct->_conf->osd_disk_threads, "osd_disk_threads"), command_tp(cct, "OSD::command_tp", 1), paused_recovery(false), - session_waiting_lock("OSD::session_waiting_lock"), + session_waiting_for_map_lock("OSD::session_waiting_for_map_lock"), heartbeat_lock("OSD::heartbeat_lock"), heartbeat_stop(false), heartbeat_update_lock("OSD::heartbeat_update_lock"), heartbeat_need_update(true), heartbeat_epoch(0), @@ -1882,6 +1882,7 @@ PG *OSD::_open_lock_pg( pg_map[pgid] = pg; pg->get("PGMap"); // because it's in pg_map service.pg_add_epoch(pg->info.pgid, createmap->get_epoch()); + wake_pg_waiters(pg, pgid); } wake_pg_waiters(pg, pgid); return pg; @@ -1914,6 +1915,7 @@ void OSD::add_newly_split_pg(PG *pg, PG::RecoveryCtx *rctx) pg->get("PGMap"); // For pg_map pg_map[pg->info.pgid] = pg; service.pg_add_epoch(pg->info.pgid, pg->get_osdmap()->get_epoch()); + wake_pg_waiters(pg, pg->info.pgid); dout(10) << "Adding newly split pg " << *pg << dendl; vector up, acting; @@ -2034,26 +2036,20 @@ PG *OSD::_create_lock_pg( PG *OSD::get_pg_or_queue_for_pg(spg_t pgid, OpRequestRef op) { - RWLock::RLocker l(pg_map_lock); - Session *session = static_cast( - op->get_req()->get_connection()->get_priv()); - + { + RWLock::RLocker l(pg_map_lock); + ceph::unordered_map::iterator i = pg_map.find(pgid); + if (i != pg_map.end()) + return i->second; + } + RWLock::WLocker l(pg_map_lock); ceph::unordered_map::iterator i = pg_map.find(pgid); - if (i == pg_map.end()) - session->waiting_for_pg[pgid]; - - map >::iterator wlistiter = - session->waiting_for_pg.find(pgid); - - PG *out = NULL; - if (wlistiter == session->waiting_for_pg.end()) { - out = i->second; + if (i != pg_map.end()) { + return i->second; } else { - wlistiter->second.push_back(op); - register_session_waiting_on_pg(session, pgid); + waiting_for_pg[pgid].push_back(op); + return NULL; } - session->put(); - return out; } bool OSD::_have_pg(spg_t pgid) @@ -4968,8 +4964,6 @@ bool OSD::ms_dispatch(Message *m) void OSD::dispatch_session_waiting(Session *session, OSDMapRef osdmap) { - assert(session->session_dispatch_lock.is_locked()); - assert(session->osdmap == osdmap); for (list::iterator i = session->waiting_on_map.begin(); i != session->waiting_on_map.end() && dispatch_op_fast(*i, osdmap); session->waiting_on_map.erase(i++)); @@ -4981,84 +4975,6 @@ void OSD::dispatch_session_waiting(Session *session, OSDMapRef osdmap) } } - -void OSD::update_waiting_for_pg(Session *session, OSDMapRef newmap) -{ - assert(session->session_dispatch_lock.is_locked()); - if (!session->osdmap) { - session->osdmap = newmap; - return; - } - - if (newmap->get_epoch() == session->osdmap->get_epoch()) - return; - - assert(newmap->get_epoch() > session->osdmap->get_epoch()); - - map > from; - from.swap(session->waiting_for_pg); - - for (map >::iterator i = from.begin(); - i != from.end(); - from.erase(i++)) { - set children; - if (!newmap->have_pg_pool(i->first.pool())) { - // drop this wait list on the ground - continue; - } - assert(session->osdmap->have_pg_pool(i->first.pool())); - if (i->first.is_split( - session->osdmap->get_pg_num(i->first.pool()), - newmap->get_pg_num(i->first.pool()), - &children)) { - for (set::iterator child = children.begin(); - child != children.end(); - ++child) { - unsigned split_bits = child->get_split_bits( - newmap->get_pg_num(child->pool())); - list child_ops; - OSD::split_list(&i->second, &child_ops, child->ps(), split_bits); - if (!child_ops.empty()) { - session->waiting_for_pg[*child].swap(child_ops); - register_session_waiting_on_pg(session, *child); - } - } - } - if (i->second.empty()) { - clear_session_waiting_on_pg(session, i->first); - } else { - session->waiting_for_pg[i->first].swap(i->second); - } - } - - session->osdmap = newmap; -} - -void OSD::session_notify_pg_create( - Session *session, OSDMapRef osdmap, spg_t pgid) -{ - assert(session->session_dispatch_lock.is_locked()); - update_waiting_for_pg(session, osdmap); - map >::iterator i = - session->waiting_for_pg.find(pgid); - if (i != session->waiting_for_pg.end()) { - session->waiting_on_map.splice( - session->waiting_on_map.end(), - i->second); - session->waiting_for_pg.erase(i); - } - clear_session_waiting_on_pg(session, pgid); -} - -void OSD::session_notify_pg_cleared( - Session *session, OSDMapRef osdmap, spg_t pgid) -{ - assert(session->session_dispatch_lock.is_locked()); - update_waiting_for_pg(session, osdmap); - session->waiting_for_pg.erase(pgid); - clear_session_waiting_on_pg(session, pgid); -} - void OSD::ms_fast_dispatch(Message *m) { OpRequestRef op = op_tracker.create_request(m); @@ -5067,7 +4983,6 @@ void OSD::ms_fast_dispatch(Message *m) assert(session); { Mutex::Locker l(session->session_dispatch_lock); - update_waiting_for_pg(session, nextmap); session->waiting_on_map.push_back(op); dispatch_session_waiting(session, nextmap); } @@ -6336,36 +6251,48 @@ void OSD::consume_map() i != sessions_to_check.end(); sessions_to_check.erase(i++)) { (*i)->session_dispatch_lock.Lock(); - update_waiting_for_pg(*i, osdmap); dispatch_session_waiting(*i, osdmap); (*i)->session_dispatch_lock.Unlock(); (*i)->put(); } - // remove any PGs which we no longer host from the session waiting_for_pg lists - set pgs_to_check; - get_pgs_with_waiting_sessions(&pgs_to_check); - for (set::iterator p = pgs_to_check.begin(); - p != pgs_to_check.end(); - ++p) { - vector acting; - int nrep = osdmap->pg_to_acting_osds(p->pgid, acting); - int role = osdmap->calc_pg_role(whoami, acting, nrep); - - if (role < 0) { - set concerned_sessions; - get_sessions_possibly_interested_in_pg(*p, &concerned_sessions); - for (set::iterator i = concerned_sessions.begin(); - i != concerned_sessions.end(); - ++i) { - { - Mutex::Locker l((*i)->session_dispatch_lock); - session_notify_pg_cleared(*i, osdmap, *p); - } - (*i)->put(); + // remove any PGs which we no longer host from the waiting_for_pg list + set pgs_to_delete; + { + RWLock::RLocker l(pg_map_lock); + map >::iterator p = waiting_for_pg.begin(); + while (p != waiting_for_pg.end()) { + spg_t pgid = p->first; + + vector acting; + int nrep = osdmap->pg_to_acting_osds(pgid.pgid, acting); + int role = osdmap->calc_pg_role(whoami, acting, nrep); + + if (role < 0) { + pgs_to_delete.insert(p->first); + /* we can delete list contents under the read lock because + * nobody will be adding to them -- everybody is now using a map + * new enough that they will simply drop ops instead of adding + * them to the list. */ + dout(10) << " discarding waiting ops for " << pgid << dendl; + while (!p->second.empty()) { + p->second.pop_front(); + } } + ++p; } } + { + RWLock::WLocker l(pg_map_lock); + for (set::iterator i = pgs_to_delete.begin(); + i != pgs_to_delete.end(); + ++i) { + map >::iterator p = waiting_for_pg.find(*i); + assert(p->second.empty()); + waiting_for_pg.erase(p); + } + } + // scan pg's { @@ -8282,7 +8209,6 @@ struct C_CompleteSplits : public Context { osd->service.complete_split(to_complete); } osd->pg_map_lock.put_write(); - osd->wake_pg_waiters(&**i, (*i)->info.pgid); osd->dispatch_context_transaction(rctx, &**i); to_complete.insert((*i)->info.pgid); (*i)->unlock(); diff --git a/src/osd/OSD.h b/src/osd/OSD.h index 428f903bb6196..c52ab8455f814 100644 --- a/src/osd/OSD.h +++ b/src/osd/OSD.h @@ -1144,9 +1144,6 @@ public: Mutex session_dispatch_lock; list waiting_on_map; - OSDMapRef osdmap; /// Map as of which waiting_for_pg is current - map > waiting_for_pg; - Mutex sent_epoch_lock; epoch_t last_sent_epoch; Mutex received_map_lock; @@ -1158,89 +1155,28 @@ public: sent_epoch_lock("Session::sent_epoch_lock"), last_sent_epoch(0), received_map_lock("Session::received_map_lock"), received_map_epoch(0) {} - - }; - void update_waiting_for_pg(Session *session, OSDMapRef osdmap); - void session_notify_pg_create(Session *session, OSDMapRef osdmap, spg_t pgid); - void session_notify_pg_cleared(Session *session, OSDMapRef osdmap, spg_t pgid); void dispatch_session_waiting(Session *session, OSDMapRef osdmap); - - Mutex session_waiting_lock; + Mutex session_waiting_for_map_lock; set session_waiting_for_map; - map > session_waiting_for_pg; - /// Caller assumes refs for included Sessions void get_sessions_waiting_for_map(set *out) { - Mutex::Locker l(session_waiting_lock); + Mutex::Locker l(session_waiting_for_map_lock); out->swap(session_waiting_for_map); } void register_session_waiting_on_map(Session *session) { - Mutex::Locker l(session_waiting_lock); + Mutex::Locker l(session_waiting_for_map_lock); session->get(); session_waiting_for_map.insert(session); } void clear_session_waiting_on_map(Session *session) { - Mutex::Locker l(session_waiting_lock); + Mutex::Locker l(session_waiting_for_map_lock); set::iterator i = session_waiting_for_map.find(session); if (i != session_waiting_for_map.end()) { (*i)->put(); session_waiting_for_map.erase(i); } } - void register_session_waiting_on_pg(Session *session, spg_t pgid) { - Mutex::Locker l(session_waiting_lock); - set &s = session_waiting_for_pg[pgid]; - set::iterator i = s.find(session); - if (i == s.end()) { - session->get(); - s.insert(session); - } - } - void clear_session_waiting_on_pg(Session *session, spg_t pgid) { - Mutex::Locker l(session_waiting_lock); - map >::iterator i = session_waiting_for_pg.find(pgid); - if (i == session_waiting_for_pg.end()) { - return; - } - set::iterator j = i->second.find(session); - if (j != i->second.end()) { - (*j)->put(); - i->second.erase(j); - } - if (i->second.empty()) { - session_waiting_for_pg.erase(i); - } - } - void get_sessions_possibly_interested_in_pg( - spg_t pgid, set *sessions) { - Mutex::Locker l(session_waiting_lock); - while (1) { - map >::iterator i = session_waiting_for_pg.find(pgid); - if (i != session_waiting_for_pg.end()) { - sessions->insert(i->second.begin(), i->second.end()); - } - if (pgid.pgid.ps() == 0) { - break; - } else { - pgid = pgid.get_parent(); - } - } - for (set::iterator i = sessions->begin(); - i != sessions->end(); - ++i) { - (*i)->get(); - } - } - void get_pgs_with_waiting_sessions(set *pgs) { - Mutex::Locker l(session_waiting_lock); - for (map >::iterator i = - session_waiting_for_pg.begin(); - i != session_waiting_for_pg.end(); - ++i) { - pgs->insert(i->first); - } - } private: /** @@ -1627,6 +1563,7 @@ protected: // -- placement groups -- RWLock pg_map_lock; // this lock orders *above* individual PG _locks ceph::unordered_map pg_map; // protected by pg_map lock + map > waiting_for_pg; // protected by pg_map lock map > peering_wait_for_split; PGRecoveryStats pg_recovery_stats; @@ -1691,20 +1628,15 @@ protected: ); ///< @return false if there was a map gap between from and now void wake_pg_waiters(PG* pg, spg_t pgid) { - assert(osd_lock.is_locked()); // Need write lock on pg_map_lock - set concerned_sessions; - get_sessions_possibly_interested_in_pg(pgid, &concerned_sessions); - - for (set::iterator i = concerned_sessions.begin(); - i != concerned_sessions.end(); - ++i) { - { - Mutex::Locker l((*i)->session_dispatch_lock); - session_notify_pg_create(*i, osdmap, pgid); - dispatch_session_waiting(*i, osdmap); + map >::iterator i = waiting_for_pg.find(pgid); + if (i != waiting_for_pg.end()) { + for (list::iterator j = i->second.begin(); + j != i->second.end(); + ++j) { + enqueue_op(pg, *j); } - (*i)->put(); + waiting_for_pg.erase(i); } } -- 2.39.5