]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
Revert "OSD: move waiting_for_pg into Session"
authorSage Weil <sage@inktank.com>
Mon, 7 Jul 2014 22:01:50 +0000 (15:01 -0700)
committerSage Weil <sage@inktank.com>
Mon, 7 Jul 2014 22:11:30 +0000 (15:11 -0700)
This reverts commit ecda2fef8ce982df3581a3b47ba74ae581d82479.

This leaves Session* refs indefinitely in the map.

This was one source of #7995.

Signed-off-by: Sage Weil <sage@inktank.com>
src/osd/OSD.cc
src/osd/OSD.h

index ce4cd6e1042c5fd980c3cff09184786d2025b2f1..7010cf04ed9bf9feb4e959116a2d634df2aa9a01 100644 (file)
@@ -933,7 +933,7 @@ OSD::OSD(CephContext *cct_, ObjectStore *store_,
   disk_tp(cct, "OSD::disk_tp", cct->_conf->osd_disk_threads, "osd_disk_threads"),
   command_tp(cct, "OSD::command_tp", 1),
   paused_recovery(false),
-  session_waiting_lock("OSD::session_waiting_lock"),
+  session_waiting_for_map_lock("OSD::session_waiting_for_map_lock"),
   heartbeat_lock("OSD::heartbeat_lock"),
   heartbeat_stop(false), heartbeat_update_lock("OSD::heartbeat_update_lock"),
   heartbeat_need_update(true), heartbeat_epoch(0),
@@ -1882,6 +1882,7 @@ PG *OSD::_open_lock_pg(
     pg_map[pgid] = pg;
     pg->get("PGMap");  // because it's in pg_map
     service.pg_add_epoch(pg->info.pgid, createmap->get_epoch());
+    wake_pg_waiters(pg, pgid);
   }
   wake_pg_waiters(pg, pgid);
   return pg;
@@ -1914,6 +1915,7 @@ void OSD::add_newly_split_pg(PG *pg, PG::RecoveryCtx *rctx)
   pg->get("PGMap");  // For pg_map
   pg_map[pg->info.pgid] = pg;
   service.pg_add_epoch(pg->info.pgid, pg->get_osdmap()->get_epoch());
+  wake_pg_waiters(pg, pg->info.pgid);
 
   dout(10) << "Adding newly split pg " << *pg << dendl;
   vector<int> up, acting;
@@ -2034,26 +2036,20 @@ PG *OSD::_create_lock_pg(
 
 PG *OSD::get_pg_or_queue_for_pg(spg_t pgid, OpRequestRef op)
 {
-  RWLock::RLocker l(pg_map_lock);
-  Session *session = static_cast<Session*>(
-    op->get_req()->get_connection()->get_priv());
-
+  {
+    RWLock::RLocker l(pg_map_lock);
+    ceph::unordered_map<spg_t, PG*>::iterator i = pg_map.find(pgid);
+    if (i != pg_map.end())
+      return i->second;
+  }
+  RWLock::WLocker l(pg_map_lock);
   ceph::unordered_map<spg_t, PG*>::iterator i = pg_map.find(pgid);
-  if (i == pg_map.end())
-    session->waiting_for_pg[pgid];
-    
-  map<spg_t, list<OpRequestRef> >::iterator wlistiter =
-    session->waiting_for_pg.find(pgid);
-
-  PG *out = NULL;
-  if (wlistiter == session->waiting_for_pg.end()) {
-    out = i->second;
+  if (i != pg_map.end()) {
+    return i->second;
   } else {
-    wlistiter->second.push_back(op);
-    register_session_waiting_on_pg(session, pgid);
+    waiting_for_pg[pgid].push_back(op);
+    return NULL;
   }
-  session->put();
-  return out;
 }
 
 bool OSD::_have_pg(spg_t pgid)
@@ -4968,8 +4964,6 @@ bool OSD::ms_dispatch(Message *m)
 
 void OSD::dispatch_session_waiting(Session *session, OSDMapRef osdmap)
 {
-  assert(session->session_dispatch_lock.is_locked());
-  assert(session->osdmap == osdmap);
   for (list<OpRequestRef>::iterator i = session->waiting_on_map.begin();
        i != session->waiting_on_map.end() && dispatch_op_fast(*i, osdmap);
        session->waiting_on_map.erase(i++));
@@ -4981,84 +4975,6 @@ void OSD::dispatch_session_waiting(Session *session, OSDMapRef osdmap)
   }
 }
 
-
-void OSD::update_waiting_for_pg(Session *session, OSDMapRef newmap)
-{
-  assert(session->session_dispatch_lock.is_locked());
-  if (!session->osdmap) {
-    session->osdmap = newmap;
-    return;
-  }
-
-  if (newmap->get_epoch() == session->osdmap->get_epoch())
-    return;
-
-  assert(newmap->get_epoch() > session->osdmap->get_epoch());
-    
-  map<spg_t, list<OpRequestRef> > from;
-  from.swap(session->waiting_for_pg);
-  
-  for (map<spg_t, list<OpRequestRef> >::iterator i = from.begin();
-       i != from.end();
-       from.erase(i++)) {
-    set<spg_t> children;
-    if (!newmap->have_pg_pool(i->first.pool())) {
-      // drop this wait list on the ground
-      continue;
-    }
-    assert(session->osdmap->have_pg_pool(i->first.pool()));
-    if (i->first.is_split(
-         session->osdmap->get_pg_num(i->first.pool()),
-         newmap->get_pg_num(i->first.pool()),
-         &children)) {
-      for (set<spg_t>::iterator child = children.begin();
-          child != children.end();
-          ++child) {
-       unsigned split_bits = child->get_split_bits(
-         newmap->get_pg_num(child->pool()));
-       list<OpRequestRef> child_ops;
-       OSD::split_list(&i->second, &child_ops, child->ps(), split_bits);
-       if (!child_ops.empty()) {
-         session->waiting_for_pg[*child].swap(child_ops);
-         register_session_waiting_on_pg(session, *child);
-       }
-      }
-    }
-    if (i->second.empty()) {
-      clear_session_waiting_on_pg(session, i->first);
-    } else {
-      session->waiting_for_pg[i->first].swap(i->second);
-    }
-  }
-
-  session->osdmap = newmap;
-}
-
-void OSD::session_notify_pg_create(
-  Session *session, OSDMapRef osdmap, spg_t pgid)
-{
-  assert(session->session_dispatch_lock.is_locked());
-  update_waiting_for_pg(session, osdmap);
-  map<spg_t, list<OpRequestRef> >::iterator i =
-    session->waiting_for_pg.find(pgid);
-  if (i != session->waiting_for_pg.end()) {
-    session->waiting_on_map.splice(
-      session->waiting_on_map.end(),
-      i->second);
-    session->waiting_for_pg.erase(i);
-  }
-  clear_session_waiting_on_pg(session, pgid);
-}
-
-void OSD::session_notify_pg_cleared(
-  Session *session, OSDMapRef osdmap, spg_t pgid)
-{
-  assert(session->session_dispatch_lock.is_locked());
-  update_waiting_for_pg(session, osdmap);
-  session->waiting_for_pg.erase(pgid);
-  clear_session_waiting_on_pg(session, pgid);
-}
-
 void OSD::ms_fast_dispatch(Message *m)
 {
   OpRequestRef op = op_tracker.create_request<OpRequest>(m);
@@ -5067,7 +4983,6 @@ void OSD::ms_fast_dispatch(Message *m)
   assert(session);
   {
     Mutex::Locker l(session->session_dispatch_lock);
-    update_waiting_for_pg(session, nextmap);
     session->waiting_on_map.push_back(op);
     dispatch_session_waiting(session, nextmap);
   }
@@ -6336,36 +6251,48 @@ void OSD::consume_map()
        i != sessions_to_check.end();
        sessions_to_check.erase(i++)) {
     (*i)->session_dispatch_lock.Lock();
-    update_waiting_for_pg(*i, osdmap);
     dispatch_session_waiting(*i, osdmap);
     (*i)->session_dispatch_lock.Unlock();
     (*i)->put();
   }
 
-  // remove any PGs which we no longer host from the session waiting_for_pg lists
-  set<spg_t> pgs_to_check;
-  get_pgs_with_waiting_sessions(&pgs_to_check);
-  for (set<spg_t>::iterator p = pgs_to_check.begin();
-       p != pgs_to_check.end();
-       ++p) {
-    vector<int> acting;
-    int nrep = osdmap->pg_to_acting_osds(p->pgid, acting);
-    int role = osdmap->calc_pg_role(whoami, acting, nrep);
-
-    if (role < 0) {
-      set<Session*> concerned_sessions;
-      get_sessions_possibly_interested_in_pg(*p, &concerned_sessions);
-      for (set<Session*>::iterator i = concerned_sessions.begin();
-          i != concerned_sessions.end();
-          ++i) {
-       {
-         Mutex::Locker l((*i)->session_dispatch_lock);
-         session_notify_pg_cleared(*i, osdmap, *p);
-       }
-       (*i)->put();
+  // remove any PGs which we no longer host from the waiting_for_pg list
+  set<spg_t> pgs_to_delete;
+  {
+    RWLock::RLocker l(pg_map_lock);
+    map<spg_t, list<OpRequestRef> >::iterator p = waiting_for_pg.begin();
+    while (p != waiting_for_pg.end()) {
+      spg_t pgid = p->first;
+
+      vector<int> acting;
+      int nrep = osdmap->pg_to_acting_osds(pgid.pgid, acting);
+      int role = osdmap->calc_pg_role(whoami, acting, nrep);
+
+      if (role < 0) {
+        pgs_to_delete.insert(p->first);
+        /* we can delete list contents under the read lock because
+         * nobody will be adding to them -- everybody is now using a map
+         * new enough that they will simply drop ops instead of adding
+         * them to the list. */
+        dout(10) << " discarding waiting ops for " << pgid << dendl;
+        while (!p->second.empty()) {
+          p->second.pop_front();
+        }
       }
+      ++p;
     }
   }
+  {
+    RWLock::WLocker l(pg_map_lock);
+    for (set<spg_t>::iterator i = pgs_to_delete.begin();
+        i != pgs_to_delete.end();
+        ++i) {
+      map<spg_t, list<OpRequestRef> >::iterator p = waiting_for_pg.find(*i);
+      assert(p->second.empty());
+      waiting_for_pg.erase(p);
+    }
+  }
+
 
   // scan pg's
   {
@@ -8282,7 +8209,6 @@ struct C_CompleteSplits : public Context {
         osd->service.complete_split(to_complete);
       }
       osd->pg_map_lock.put_write();
-      osd->wake_pg_waiters(&**i, (*i)->info.pgid);
       osd->dispatch_context_transaction(rctx, &**i);
        to_complete.insert((*i)->info.pgid);
       (*i)->unlock();
index 428f903bb619655038b6a3c8821623a59475629c..c52ab8455f814c964a11bc622b475b0d47cdd2f6 100644 (file)
@@ -1144,9 +1144,6 @@ public:
     Mutex session_dispatch_lock;
     list<OpRequestRef> waiting_on_map;
 
-    OSDMapRef osdmap;  /// Map as of which waiting_for_pg is current
-    map<spg_t, list<OpRequestRef> > waiting_for_pg;
-
     Mutex sent_epoch_lock;
     epoch_t last_sent_epoch;
     Mutex received_map_lock;
@@ -1158,89 +1155,28 @@ public:
       sent_epoch_lock("Session::sent_epoch_lock"), last_sent_epoch(0),
       received_map_lock("Session::received_map_lock"), received_map_epoch(0)
     {}
-
-    
   };
-  void update_waiting_for_pg(Session *session, OSDMapRef osdmap);
-  void session_notify_pg_create(Session *session, OSDMapRef osdmap, spg_t pgid);
-  void session_notify_pg_cleared(Session *session, OSDMapRef osdmap, spg_t pgid);
   void dispatch_session_waiting(Session *session, OSDMapRef osdmap);
-
-  Mutex session_waiting_lock;
+  Mutex session_waiting_for_map_lock;
   set<Session*> session_waiting_for_map;
-  map<spg_t, set<Session*> > session_waiting_for_pg;
-
   /// Caller assumes refs for included Sessions
   void get_sessions_waiting_for_map(set<Session*> *out) {
-    Mutex::Locker l(session_waiting_lock);
+    Mutex::Locker l(session_waiting_for_map_lock);
     out->swap(session_waiting_for_map);
   }
   void register_session_waiting_on_map(Session *session) {
-    Mutex::Locker l(session_waiting_lock);
+    Mutex::Locker l(session_waiting_for_map_lock);
     session->get();
     session_waiting_for_map.insert(session);
   }
   void clear_session_waiting_on_map(Session *session) {
-    Mutex::Locker l(session_waiting_lock);
+    Mutex::Locker l(session_waiting_for_map_lock);
     set<Session*>::iterator i = session_waiting_for_map.find(session);
     if (i != session_waiting_for_map.end()) {
       (*i)->put();
       session_waiting_for_map.erase(i);
     }
   }
-  void register_session_waiting_on_pg(Session *session, spg_t pgid) {
-    Mutex::Locker l(session_waiting_lock);
-    set<Session*> &s = session_waiting_for_pg[pgid];
-    set<Session*>::iterator i = s.find(session);
-    if (i == s.end()) {
-      session->get();
-      s.insert(session);
-    }
-  }
-  void clear_session_waiting_on_pg(Session *session, spg_t pgid) {
-    Mutex::Locker l(session_waiting_lock);
-    map<spg_t, set<Session*> >::iterator i = session_waiting_for_pg.find(pgid);
-    if (i == session_waiting_for_pg.end()) {
-      return;
-    }
-    set<Session*>::iterator j = i->second.find(session);
-    if (j != i->second.end()) {
-      (*j)->put();
-      i->second.erase(j);
-    }
-    if (i->second.empty()) {
-      session_waiting_for_pg.erase(i);
-    }
-  }
-  void get_sessions_possibly_interested_in_pg(
-    spg_t pgid, set<Session*> *sessions) {
-    Mutex::Locker l(session_waiting_lock);
-    while (1) {
-      map<spg_t, set<Session*> >::iterator i = session_waiting_for_pg.find(pgid);
-      if (i != session_waiting_for_pg.end()) {
-       sessions->insert(i->second.begin(), i->second.end());
-      }
-      if (pgid.pgid.ps() == 0) {
-       break;
-      } else {
-       pgid = pgid.get_parent();
-      }
-    }
-    for (set<Session*>::iterator i = sessions->begin();
-        i != sessions->end();
-        ++i) {
-      (*i)->get();
-    }
-  }
-  void get_pgs_with_waiting_sessions(set<spg_t> *pgs) {
-    Mutex::Locker l(session_waiting_lock);
-    for (map<spg_t, set<Session*> >::iterator i =
-          session_waiting_for_pg.begin();
-        i != session_waiting_for_pg.end();
-        ++i) {
-      pgs->insert(i->first);
-    }
-  }
 
 private:
   /**
@@ -1627,6 +1563,7 @@ protected:
   // -- placement groups --
   RWLock pg_map_lock; // this lock orders *above* individual PG _locks
   ceph::unordered_map<spg_t, PG*> pg_map; // protected by pg_map lock
+  map<spg_t, list<OpRequestRef> > waiting_for_pg; // protected by pg_map lock
 
   map<spg_t, list<PG::CephPeeringEvtRef> > peering_wait_for_split;
   PGRecoveryStats pg_recovery_stats;
@@ -1691,20 +1628,15 @@ protected:
     ); ///< @return false if there was a map gap between from and now
 
   void wake_pg_waiters(PG* pg, spg_t pgid) {
-    assert(osd_lock.is_locked());
     // Need write lock on pg_map_lock
-    set<Session*> concerned_sessions;
-    get_sessions_possibly_interested_in_pg(pgid, &concerned_sessions);
-
-    for (set<Session*>::iterator i = concerned_sessions.begin();
-        i != concerned_sessions.end();
-        ++i) {
-      {
-       Mutex::Locker l((*i)->session_dispatch_lock);
-       session_notify_pg_create(*i, osdmap, pgid);
-       dispatch_session_waiting(*i, osdmap);
+    map<spg_t, list<OpRequestRef> >::iterator i = waiting_for_pg.find(pgid);
+    if (i != waiting_for_pg.end()) {
+      for (list<OpRequestRef>::iterator j = i->second.begin();
+          j != i->second.end();
+          ++j) {
+       enqueue_op(pg, *j);
       }
-      (*i)->put();
+      waiting_for_pg.erase(i);
     }
   }