]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
osdc/Objecter: fix resource management
authorJohn Spray <john.spray@redhat.com>
Fri, 15 Aug 2014 00:26:20 +0000 (01:26 +0100)
committerJohn Spray <john.spray@redhat.com>
Mon, 25 Aug 2014 00:34:19 +0000 (01:34 +0100)
The refactor introduced various reference leaks, and
lacked cleanup in shutdown.

Things done here:
 * Reinstate _recalc_linger_op_target, which was accidentally
   disabled and led to freezes in notify() (#9112)
 * Make reference counting on OSDSessions much more explicit, using
   put_session and get_session everywhere
 * Add assertions in ~OSDSession and ~Objecter that the various
   maps of operations have been emptied.
 * Reassign ops away from closing session to homeless session in
   close_session()
 * Delete/deref all the ops from the objecter-wide maps of operations
   in shutdown()

Signed-off-by: John Spray <john.spray@redhat.com>
src/common/RefCountedObj.h
src/osdc/Objecter.cc
src/osdc/Objecter.h

index b16e071238afdb6dbf59f983682262abc37a25b2..729bbb9fd4b87703c3b3ae30beb76d6e30e25503 100644 (file)
@@ -51,6 +51,10 @@ public:
   void set_cct(CephContext *c) {
     cct = c;
   }
+
+  uint64_t get_nref() {
+    return nref.read();
+  }
 };
 
 /**
index 8f326b0d1735992266fdac680ea173f390590a61..c3fdb6528d43c39705bf7130729ddfe368ab6cf5 100644 (file)
@@ -300,6 +300,74 @@ void Objecter::shutdown()
     close_session(p->second);
   }
 
+  while(!check_latest_map_lingers.empty()) {
+    map<uint64_t, LingerOp*>::iterator i = check_latest_map_lingers.begin();
+    i->second->put();
+    check_latest_map_lingers.erase(i->first);
+  }
+
+  while(!check_latest_map_ops.empty()) {
+    map<ceph_tid_t, Op*>::iterator i = check_latest_map_ops.begin();
+    i->second->put();
+    check_latest_map_ops.erase(i->first);
+  }
+
+  while(!check_latest_map_commands.empty()) {
+    map<ceph_tid_t, CommandOp*>::iterator i = check_latest_map_commands.begin();
+    i->second->put();
+    check_latest_map_commands.erase(i->first);
+  }
+
+  while(!poolstat_ops.empty()) {
+    map<ceph_tid_t,PoolStatOp*>::iterator i = poolstat_ops.begin();
+    delete i->second;
+    poolstat_ops.erase(i->first);
+  }
+
+  while(!statfs_ops.empty()) {
+    map<ceph_tid_t, StatfsOp*>::iterator i = statfs_ops.begin();
+    delete i->second;
+    statfs_ops.erase(i->first);
+  }
+
+  while(!pool_ops.empty()) {
+    map<ceph_tid_t, PoolOp*>::iterator i = pool_ops.begin();
+    delete i->second;
+    pool_ops.erase(i->first);
+  }
+
+  ldout(cct, 20) << __func__ << " clearing up homeless session..." << dendl;
+  while(!homeless_session->linger_ops.empty()) {
+    std::map<uint64_t, LingerOp*>::iterator i = homeless_session->linger_ops.begin();
+    ldout(cct, 10) << " linger_op " << i->first << dendl;
+    {
+      RWLock::WLocker wl(homeless_session->lock);
+      _session_linger_op_remove(homeless_session, i->second);
+    }
+    linger_ops.erase(i->second->linger_id);
+    i->second->put();
+  }
+
+  while(!homeless_session->ops.empty()) {
+    std::map<ceph_tid_t, Op*>::iterator i = homeless_session->ops.begin();
+    ldout(cct, 10) << " op " << i->first << dendl;
+    {
+      RWLock::WLocker wl(homeless_session->lock);
+      _session_op_remove(homeless_session, i->second);
+    }
+    i->second->put();
+  }
+
+  while(!homeless_session->command_ops.empty()) {
+    std::map<ceph_tid_t, CommandOp*>::iterator i = homeless_session->command_ops.begin();
+    ldout(cct, 10) << " command_op " << i->first << dendl;
+    {
+      RWLock::WLocker wl(homeless_session->lock);
+      _session_command_op_remove(homeless_session, i->second);
+    }
+    i->second->put();
+  }
+
   if (tick_event) {
     timer.cancel_event(tick_event);
     tick_event = NULL;
@@ -397,13 +465,16 @@ void Objecter::unregister_linger(uint64_t linger_id)
 void Objecter::_unregister_linger(uint64_t linger_id)
 {
   assert(rwlock.is_wlocked());
+  ldout(cct, 20) << __func__ << " linger_id=" << linger_id << dendl;
 
   map<uint64_t, LingerOp*>::iterator iter = linger_ops.find(linger_id);
   if (iter != linger_ops.end()) {
     LingerOp *info = iter->second;
-    info->session->lock.get_write();
-    info->session->linger_ops.erase(linger_id);
-    info->session->lock.unlock();
+    OSDSession *s = info->session;
+    s->lock.get_write();
+    _session_linger_op_remove(s, info);
+    s->lock.unlock();
+
     linger_ops.erase(iter);
     info->canceled = true;
     info->put();
@@ -470,26 +541,27 @@ void Objecter::_linger_submit(LingerOp *info)
   assert(rwlock.is_wlocked());
   RWLock::Context lc(rwlock, RWLock::Context::TakenForWrite);
 
+  // Acquire linger ID
   info->linger_id = ++max_linger_id;
   ldout(cct, 10) << __func__ << " info " << info
                 << " linger_id " << info->linger_id << dendl;
   linger_ops[info->linger_id] = info;
 
+  // Populate Op::target
   OSDSession *s = NULL;
   _calc_target(&info->target);
+
+  // Create LingerOp<->OSDSession relation
   int r = _get_session(info->target.osd, &s, lc);
   assert(r == 0);
-
-  info->session = s;
-
   s->lock.get_write();
-  s->linger_ops[info->linger_id] = info;
+  _session_linger_op_assign(s, info);
   s->lock.unlock();
+  put_session(s);
+
   _send_linger(info);
 }
 
-
-
 bool Objecter::ms_dispatch(Message *m)
 {
   ldout(cct, 10) << __func__ << " " << cct << " " << *m << dendl;
@@ -514,19 +586,16 @@ bool Objecter::ms_dispatch(Message *m)
     handle_pool_op_reply(static_cast<MPoolOpReply*>(m));
     return true;
 
+  case CEPH_MSG_STATFS_REPLY:
+    handle_fs_stats_reply(static_cast<MStatfsReply*>(m));
+    return true;
+
     // these we give others a chance to inspect
 
     // MDS, OSD
   case CEPH_MSG_OSD_MAP:
-    m->get();
     handle_osd_map(static_cast<MOSDMap*>(m));
     return false;
-
-    // Client
-  case CEPH_MSG_STATFS_REPLY:
-    m->get();
-    handle_fs_stats_reply(static_cast<MStatfsReply*>(m));
-    return false;
   }
   return false;
 }
@@ -553,16 +622,13 @@ void Objecter::_scan_requests(OSDSession *s,
     ++lp;   // check_linger_pool_dne() may touch linger_ops; prevent iterator invalidation
     ldout(cct, 10) << " checking linger op " << op->linger_id << dendl;
     bool unregister;
-    int r = _calc_target(&op->target);
+    int r = _recalc_linger_op_target(op, lc);
     switch (r) {
     case RECALC_OP_TARGET_NO_ACTION:
       if (!force_resend && !force_resend_writes)
        break;
       // -- fall-thru --
     case RECALC_OP_TARGET_NEED_RESEND:
-      if (op->session) {
-       _session_linger_op_remove(op);
-      }
       need_resend_linger.push_back(op);
       _linger_cancel_map_check(op);
       break;
@@ -591,7 +657,7 @@ void Objecter::_scan_requests(OSDSession *s,
       // -- fall-thru --
     case RECALC_OP_TARGET_NEED_RESEND:
       if (op->session) {
-       _session_op_remove(op);
+       _session_op_remove(op->session, op);
       }
       need_resend[op->tid] = op;
       _op_cancel_map_check(op);
@@ -618,7 +684,7 @@ void Objecter::_scan_requests(OSDSession *s,
     case RECALC_OP_TARGET_NEED_RESEND:
       need_resend_command[c->tid] = c;
       if (c->session) {
-        _session_command_op_remove(c);
+        _session_command_op_remove(c->session, c);
       }
       _command_cancel_map_check(c);
       break;
@@ -648,7 +714,6 @@ void Objecter::handle_osd_map(MOSDMap *m)
   if (m->fsid != monc->get_fsid()) {
     ldout(cct, 0) << "handle_osd_map fsid " << m->fsid
                  << " != " << monc->get_fsid() << dendl;
-    m->put();
     return;
   }
 
@@ -779,7 +844,8 @@ void Objecter::handle_osd_map(MOSDMap *m)
     }
     s->lock.get_write();
     if (mapped_session) {
-      _session_op_assign(op, s);
+      _session_op_assign(s, op);
+      put_session(s);
     }
     if (op->should_resend) {
       if (!op->session->is_homeless() && !op->target.paused) {
@@ -796,8 +862,12 @@ void Objecter::handle_osd_map(MOSDMap *m)
     LingerOp *op = *p;
     if (!op->session) {
       _calc_target(&op->target);
-      int r = _get_session(op->target.osd, &op->session, lc);
+      OSDSession *s = NULL;
+      int const r = _get_session(op->target.osd, &s, lc);
       assert(r == 0);
+      assert(s != NULL);
+      op->session = s;
+      put_session(s);
     }
     if (!op->session->is_homeless()) {
       logger->inc(l_osdc_linger_resend);
@@ -828,8 +898,6 @@ void Objecter::handle_osd_map(MOSDMap *m)
     waiting_for_map.erase(p++);
   }
 
-  m->put();
-
   monc->sub_got("osdmap", osdmap->get_epoch());
 
   if (!waiting_for_map.empty()) {
@@ -943,12 +1011,16 @@ void Objecter::_check_op_pool_dne(Op *op, bool session_locked)
       if (op->oncommit) {
        op->oncommit->complete(-ENOENT);
       }
+
+      OSDSession *s = op->session;
+      assert(s != NULL);
+
       if (!session_locked) {
-        op->session->lock.get_write();
+        s->lock.get_write();
       }
       _finish_op(op);
       if (!session_locked) {
-        op->session->lock.unlock();
+        s->lock.unlock();
       }
     }
   } else {
@@ -999,6 +1071,8 @@ void Objecter::C_Linger_Map_Latest::finish(int r)
 
   LingerOp *op = iter->second;
   objecter->check_latest_map_lingers.erase(iter);
+  assert(op->get_nref() > 1);  // something other than check_latest_map_lingers should
+                               // have a ref to this guy too
   op->put();
 
   if (op->map_dne_bound == 0)
@@ -1132,13 +1206,18 @@ void Objecter::_command_cancel_map_check(CommandOp *c)
 }
 
 
-
+/**
+ * Look up OSDSession by OSD id.
+ *
+ * @returns 0 on success, or -EAGAIN if the lock context requires promotion to write.
+ */
 int Objecter::_get_session(int osd, OSDSession **session, RWLock::Context& lc)
 {
   assert(rwlock.is_locked());
 
   if (osd < 0) {
     *session = homeless_session;
+    ldout(cct, 20) << __func__ << " osd=" << osd << " returning homeless" << dendl;
     return 0;
   }
 
@@ -1147,6 +1226,7 @@ int Objecter::_get_session(int osd, OSDSession **session, RWLock::Context& lc)
     OSDSession *s = p->second;
     s->get();
     *session = s;
+    ldout(cct, 20) << __func__ << " s=" << s << " osd=" << osd << " " << s->get_nref() << dendl;
     return 0;
   }
   if (!lc.is_wlocked()) {
@@ -1159,16 +1239,28 @@ int Objecter::_get_session(int osd, OSDSession **session, RWLock::Context& lc)
   logger->inc(l_osdc_osd_sessions, osd_sessions.size());
   s->get();
   *session = s;
+  ldout(cct, 20) << __func__ << " s=" << s << " osd=" << osd << " " << s->get_nref() << dendl;
   return 0;
 }
 
 void Objecter::put_session(Objecter::OSDSession *s)
 {
   if (s && !s->is_homeless()) {
+    ldout(cct, 20) << __func__ << " s=" << s << " osd=" << s->osd << " " << s->get_nref() << dendl;
     s->put();
   }
 }
 
+void Objecter::get_session(Objecter::OSDSession *s)
+{
+  assert(s != NULL);
+
+  if (!s->is_homeless()) {
+    ldout(cct, 20) << __func__ << " s=" << s << " osd=" << s->osd << " " << s->get_nref() << dendl;
+    s->get();
+  }
+}
+
 void Objecter::_reopen_session(OSDSession *s)
 {
   assert(s->lock.is_locked());
@@ -1192,12 +1284,41 @@ void Objecter::close_session(OSDSession *s)
     logger->inc(l_osdc_osd_session_close);
   }
   s->lock.get_write();
-  s->ops.clear();
-  s->linger_ops.clear();
-  s->command_ops.clear();
+
+  while(!s->linger_ops.empty()) {
+    std::map<uint64_t, LingerOp*>::iterator i = s->linger_ops.begin();
+    ldout(cct, 10) << " linger_op " << i->first << dendl;
+    _session_linger_op_remove(s, i->second);
+    {
+      RWLock::WLocker wl(homeless_session->lock);
+      _session_linger_op_assign(homeless_session, i->second);
+    }
+  }
+
+  while(!s->ops.empty()) {
+    std::map<ceph_tid_t, Op*>::iterator i = s->ops.begin();
+    ldout(cct, 10) << " op " << i->first << dendl;
+    _session_op_remove(s, i->second);
+    {
+      RWLock::WLocker wl(homeless_session->lock);
+      _session_op_assign(homeless_session, i->second);
+    }
+  }
+
+  while(!s->command_ops.empty()) {
+    std::map<ceph_tid_t, CommandOp*>::iterator i = s->command_ops.begin();
+    ldout(cct, 10) << " command_op " << i->first << dendl;
+    _session_command_op_remove(s, i->second);
+    {
+      RWLock::WLocker wl(homeless_session->lock);
+      _session_command_op_assign(homeless_session, i->second);
+    }
+  }
+
   osd_sessions.erase(s->osd);
   s->lock.unlock();
-  s->put();
+  assert(s->get_nref() == 1);  // We reassigned any/all ops, so should be last ref
+  put_session(s);
 
   logger->set(l_osdc_osd_sessions, osd_sessions.size());
 }
@@ -1353,6 +1474,7 @@ void Objecter::_kick_requests(OSDSession *session, map<uint64_t, LingerOp *>& lr
     LingerOp *op = j->second;
     op->get();
     logger->inc(l_osdc_linger_resend);
+    assert(lresend.count(j->first) == 0);
     lresend[j->first] = op;
   }
 
@@ -1565,24 +1687,26 @@ ceph_tid_t Objecter::_op_submit(Op *op, RWLock::Context& lc)
   ldout(cct, 10) << __func__ << " op " << op << dendl;
 
   // pick target
-  int r;
   assert(op->session == NULL);
   OSDSession *s = NULL;
 
-  bool check_for_latest_map;
+  bool const check_for_latest_map = _calc_target(&op->target) == RECALC_OP_TARGET_POOL_DNE;
 
-  while (true) {
-    r = _calc_target(&op->target);
-    check_for_latest_map = (r == RECALC_OP_TARGET_POOL_DNE);
-    if (_get_session(op->target.osd, &s, lc) == -EAGAIN ||
-        (check_for_latest_map && lc.is_rlocked())) {
-      lc.promote();
-      continue;
-    }
-    break;
+  // Try to get a session, including a retry if we need to take write lock
+  int r = _get_session(op->target.osd, &s, lc);
+  if (r == -EAGAIN) {
+    assert(s == NULL);
+    lc.promote();
+    r = _get_session(op->target.osd, &s, lc);
   }
+  assert(r == 0);
   assert(s);  // may be homeless
 
+  // We may need to take wlock if we will need to _set_op_map_check later.
+  if (check_for_latest_map && !lc.is_wlocked()) {
+    lc.promote();
+  }
+
   inflight_ops.inc();
 
   // add to gather set(s)
@@ -1673,25 +1797,34 @@ ceph_tid_t Objecter::_op_submit(Op *op, RWLock::Context& lc)
     _maybe_request_map();
   }
 
-  MOSDOp *m = _prepare_osd_op(op);
+  MOSDOp *m = NULL;
+  if (need_send) {
+    m = _prepare_osd_op(op);
+  }
 
   s->lock.get_write();
   if (op->tid == 0)
     op->tid = last_tid.inc();
-  _session_op_assign(op, s);
+  _session_op_assign(s, op);
 
   if (need_send) {
     _send_op(op, m);
   }
-  s->lock.unlock();
 
+  // Last chance to touch Op here, after giving up session lock it can be
+  // freed at any time by response handler.
+  ceph_tid_t tid = op->tid;
   if (check_for_latest_map) {
     _send_op_map_check(op);
   }
+  op = NULL;
+
+  s->lock.unlock();
+  put_session(s);
 
   ldout(cct, 5) << num_unacked.read() << " unacked, " << num_uncommitted.read() << " uncommitted" << dendl;
 
-  return op->tid;
+  return tid;
 }
 
 int Objecter::op_cancel(OSDSession *s, ceph_tid_t tid, int r)
@@ -1747,9 +1880,14 @@ start:
     s->lock.unlock();
   }
 
-  if (homeless_session->ops.find(tid) != homeless_session->ops.end()) {
-    ret = op_cancel(homeless_session, tid, r);
+  // Handle case where the op is in homeless session
+  {
+    RWLock::RLocker hs_lc(homeless_session->lock);
+    if (homeless_session->ops.find(tid) != homeless_session->ops.end()) {
+      ret = op_cancel(homeless_session, tid, r);
+    }
   }
+
   rwlock.unlock();
 
   return ret;
@@ -1939,55 +2077,102 @@ int Objecter::_map_session(op_target_t *target, OSDSession **s,
   return _get_session(target->osd, s, lc);
 }
 
-void Objecter::_session_op_assign(Op *op, OSDSession *to)
+void Objecter::_session_op_assign(OSDSession *to, Op *op)
 {
-  assert(rwlock.is_locked());
   assert(to->lock.is_locked());
-  assert(!op->session);
+  assert(op->session == NULL);
   assert(op->tid);
 
+  get_session(to);
   op->session = to;
   to->ops[op->tid] = op;
 
   if (to->is_homeless()) {
     num_homeless_ops.inc();
   }
+
+  ldout(cct, 15) << __func__ << " " << to->osd << " " << op->tid << dendl;
 }
 
-void Objecter::_session_op_remove(Op *op)
+void Objecter::_session_op_remove(OSDSession *from, Op *op)
 {
-  assert(rwlock.is_locked());
-  OSDSession *s = op->session;
-  assert(s);
-  assert(s->lock.is_locked());
+  assert(op->session == from);
+  assert(from->lock.is_locked());
 
-  if (s->is_homeless()) {
+  if (from->is_homeless()) {
     num_homeless_ops.dec();
   }
-  s->ops.erase(op->tid);
+
+  from->ops.erase(op->tid);
+  put_session(from);
   op->session = NULL;
+
+  ldout(cct, 15) << __func__ << " " << from->osd << " " << op->tid << dendl;
 }
 
-void Objecter::_session_linger_op_remove(LingerOp *info)
+void Objecter::_session_linger_op_assign(OSDSession *to, LingerOp *op)
 {
-  assert(rwlock.is_locked());
-  OSDSession *s = info->session;
-  assert(s);
-  assert(s->lock.is_locked());
+  assert(to->lock.is_wlocked());
+  assert(op->session == NULL);
+
+  if (to->is_homeless()) {
+    num_homeless_ops.inc();
+  }
 
-  s->linger_ops.erase(info->linger_id);
-  info->session = NULL;
+  get_session(to);
+  op->session = to;
+  to->linger_ops[op->linger_id] = op;
+
+  ldout(cct, 15) << __func__ << " " << to->osd << " " << op->linger_id << dendl;
 }
 
-void Objecter::_session_command_op_remove(CommandOp *op)
+void Objecter::_session_linger_op_remove(OSDSession *from, LingerOp *op)
 {
-  assert(rwlock.is_locked());
-  OSDSession *s = op->session;
-  assert(s);
-  assert(s->lock.is_locked());
+  assert(from == op->session);
+  assert(from->lock.is_locked());
 
-  s->command_ops.erase(op->tid);
+  if (from->is_homeless()) {
+    num_homeless_ops.dec();
+  }
+
+  from->linger_ops.erase(op->linger_id);
+  put_session(from);
   op->session = NULL;
+
+  ldout(cct, 15) << __func__ << " " << from->osd << " " << op->linger_id << dendl;
+}
+
+void Objecter::_session_command_op_remove(OSDSession *from, CommandOp *op)
+{
+  assert(from == op->session);
+  assert(from->lock.is_locked());
+
+  if (from->is_homeless()) {
+    num_homeless_ops.dec();
+  }
+
+  from->command_ops.erase(op->tid);
+  put_session(from);
+  op->session = NULL;
+
+  ldout(cct, 15) << __func__ << " " << from->osd << " " << op->tid << dendl;
+}
+
+void Objecter::_session_command_op_assign(OSDSession *to, CommandOp *op)
+{
+  assert(to->lock.is_locked());
+  assert(op->session == NULL);
+  assert(op->tid);
+
+  if (to->is_homeless()) {
+    num_homeless_ops.inc();
+  }
+
+  get_session(to);
+  op->session = to;
+  to->command_ops[op->tid] = op;
+
+  ldout(cct, 15) << __func__ << " " << to->osd << " " << op->tid << dendl;
 }
 
 int Objecter::_get_osd_session(int osd, RWLock::Context& lc, OSDSession **psession)
@@ -2022,7 +2207,7 @@ bool Objecter::_promote_lock_check_race(RWLock::Context& lc)
 
 int Objecter::_recalc_linger_op_target(LingerOp *linger_op, RWLock::Context& lc)
 {
-  assert(rwlock.is_locked());
+  assert(rwlock.is_wlocked());
 
   int r = _calc_target(&linger_op->target);
   if (r == RECALC_OP_TARGET_NEED_RESEND) {
@@ -2033,17 +2218,21 @@ int Objecter::_recalc_linger_op_target(LingerOp *linger_op, RWLock::Context& lc)
     OSDSession *s;
     r = _get_osd_session(linger_op->target.osd, lc, &s);
     if (r < 0) {
+      // We have no session for the new destination for the op, so leave it
+      // in this session to be handled again next time we scan requests
       return r;
     }
 
-    s->lock.get_write();
-
     if (linger_op->session != s) {
-      linger_op->session = s;
-      s->linger_ops[linger_op->register_tid] = linger_op;
+      // NB locking two sessions (s and linger_op->session) at the same time here
+      // is only safe because we are the only one that takes two, and we are
+      // holding rwlock for write.
+      s->lock.get_write();
+      _session_linger_op_remove(linger_op->session, linger_op);
+      _session_linger_op_assign(s, linger_op);
+      s->lock.unlock();
     }
 
-    s->lock.unlock();
     put_session(s);
     return RECALC_OP_TARGET_NEED_RESEND;
   }
@@ -2070,7 +2259,7 @@ void Objecter::_finish_op(Op *op)
   if (op->budgeted)
     put_op_budget(op);
 
-  op->session->ops.erase(op->tid);
+  _session_op_remove(op->session, op);
 
   logger->dec(l_osdc_op_active);
 
@@ -2080,8 +2269,6 @@ void Objecter::_finish_op(Op *op)
     timer.cancel_event(op->ontimeout);
   }
 
-  put_session(op->session);
-
   inflight_ops.dec();
 
   op->put();
@@ -2254,7 +2441,7 @@ void Objecter::handle_osd_op_reply(MOSDOpReply *m)
   }
 
   OSDSession *s = siter->second;
-  s->get();
+  get_session(s);
 
   s->lock.get_write();
 
@@ -2264,7 +2451,7 @@ void Objecter::handle_osd_op_reply(MOSDOpReply *m)
            << (m->is_ondisk() ? " ondisk":(m->is_onnvram() ? " onnvram":" ack"))
            << " ... stray" << dendl;
     s->lock.unlock();
-    s->put();
+    put_session(s);
     m->put();
     return;
   }
@@ -2285,7 +2472,7 @@ void Objecter::handle_osd_op_reply(MOSDOpReply *m)
                    << op->session->con->get_peer_addr() << dendl;
       m->put();
       s->lock.unlock();
-      s->put();
+      put_session(s);
       return;
     }
   } else {
@@ -2305,9 +2492,9 @@ void Objecter::handle_osd_op_reply(MOSDOpReply *m)
       num_unacked.dec();
     if (op->oncommit)
       num_uncommitted.dec();
-    _session_op_remove(op);
+    _session_op_remove(s, op);
     s->lock.unlock();
-    s->put();
+    put_session(s);
 
     // FIXME: two redirects could race and reorder
 
@@ -2329,6 +2516,7 @@ void Objecter::handle_osd_op_reply(MOSDOpReply *m)
 
     _send_op(op);
     s->lock.unlock();
+    put_session(s);
     m->put();
     return;
   }
@@ -2428,7 +2616,7 @@ void Objecter::handle_osd_op_reply(MOSDOpReply *m)
   }
 
   m->put();
-  s->put();
+  put_session(s);
 }
 
 
@@ -3052,7 +3240,6 @@ void Objecter::handle_fs_stats_reply(MStatfsReply *m)
     ldout(cct, 10) << "unknown request " << tid << dendl;
   }
   ldout(cct, 10) << "done" << dendl;
-  m->put();
 }
 
 int Objecter::statfs_op_cancel(ceph_tid_t tid, int r)
@@ -3514,9 +3701,12 @@ int Objecter::submit_command(CommandOp *c, ceph_tid_t *ptid)
   ceph_tid_t tid = last_tid.inc();
   ldout(cct, 10) << "_submit_command " << tid << " " << c->cmd << dendl;
   c->tid = tid;
-  homeless_session->command_ops[tid] = c;
-  num_homeless_ops.inc();
-  c->session = homeless_session;
+
+  {
+   RWLock::WLocker hs_wl(homeless_session->lock);
+  _session_command_op_assign(homeless_session, c);
+  }
+
   (void)_calc_command_target(c);
   _assign_command_session(c);
   if (osd_timeout > 0) {
@@ -3596,29 +3786,18 @@ void Objecter::_assign_command_session(CommandOp *c)
   assert(r != -EAGAIN); /* shouldn't happen as we're holding the write lock */
 
   if (c->session != s) {
-    ldout(cct, 10) << "_assign_command_session " << c->tid << " now " << c->session << dendl;
     if (c->session) {
-      if (c->session->is_homeless()) {
-        num_homeless_ops.dec();
-      }
-      if (c->tid) {
-        c->session->lock.get_write();
-        c->session->command_ops.erase(c->tid);
-        c->session->lock.unlock();
-      }
-      put_session(c->session);
+      OSDSession *cs = c->session;
+      cs->lock.get_write();
+      _session_command_op_remove(c->session, c);
+      cs->lock.unlock();
     }
-    c->session = s;
     s->lock.get_write();
-    if (c->tid) {
-      s->command_ops[c->tid] = c;
-    }
-    if (s->is_homeless())
-      num_homeless_ops.inc();
+    _session_command_op_assign(s, c);
     s->lock.unlock();
-  } else {
-    put_session(s);
   }
+
+  put_session(s);
 }
 
 void Objecter::_send_command(CommandOp *c)
@@ -3663,9 +3842,12 @@ void Objecter::_finish_command(CommandOp *c, int r, string rs)
     *c->prs = rs;
   if (c->onfinish)
     c->onfinish->complete(r);
-  c->session->lock.get_write();
-  c->session->command_ops.erase(c->tid);
-  c->session->lock.unlock();
+
+  OSDSession *s = c->session;
+  s->lock.get_write();
+  _session_command_op_remove(c->session, c);
+  s->lock.unlock();
+
   if (c->ontimeout) {
     timer.cancel_event(c->ontimeout);
   }
@@ -3673,3 +3855,43 @@ void Objecter::_finish_command(CommandOp *c, int r, string rs)
 
   logger->dec(l_osdc_command_active);
 }
+
+Objecter::OSDSession::~OSDSession()
+{
+  // Caller is responsible for re-assigning or
+  // destroying any ops that were assigned to us
+  assert(ops.empty());
+  assert(linger_ops.empty());
+  assert(command_ops.empty());
+
+  for (int i = 0; i < num_locks; i++) {
+    delete completion_locks[i];
+  }
+  delete[] completion_locks;
+}
+
+Objecter::~Objecter()
+{
+  delete osdmap;
+
+
+
+  assert(homeless_session->get_nref() == 1);
+  assert(num_homeless_ops.read() == 0);
+  homeless_session->put();
+
+  assert(osd_sessions.empty());
+  assert(poolstat_ops.empty());
+  assert(statfs_ops.empty());
+  assert(pool_ops.empty());
+  assert(waiting_for_map.empty());
+  assert(linger_ops.empty());
+  assert(check_latest_map_lingers.empty());
+  assert(check_latest_map_ops.empty());
+  assert(check_latest_map_commands.empty());
+
+  assert(!tick_event);
+  assert(!m_request_state_hook);
+  assert(!logger);
+}
+
index ab68e03ca1eed5aaf1244f7c16e8405a4183641d..fbeeb1a8dd4b558e5e18a4c36cea8954cd1ea408 100644 (file)
@@ -1458,12 +1458,7 @@ public:
       }
     }
 
-    ~OSDSession() {
-      for (int i = 0; i < num_locks; i++) {
-        delete completion_locks[i];
-      }
-      delete[] completion_locks;
-    }
+    ~OSDSession();
 
     bool is_homeless() { return (osd == -1); }
 
@@ -1516,10 +1511,14 @@ public:
   int _calc_target(op_target_t *t);
   int _map_session(op_target_t *op, OSDSession **s,
                   RWLock::Context& lc);
-  void _session_op_remove(Op *op);
-  void _session_linger_op_remove(LingerOp *info);
-  void _session_command_op_remove(CommandOp *op);
-  void _session_op_assign(Op *op, OSDSession *s);
+
+  void _session_op_assign(OSDSession *s, Op *op);
+  void _session_op_remove(OSDSession *s, Op *op);
+  void _session_linger_op_assign(OSDSession *to, LingerOp *op);
+  void _session_linger_op_remove(OSDSession *from, LingerOp *op);
+  void _session_command_op_assign(OSDSession *to, CommandOp *op);
+  void _session_command_op_remove(OSDSession *from, CommandOp *op);
+
   int _get_osd_session(int osd, RWLock::Context& lc, OSDSession **psession);
   int _assign_op_target_session(Op *op, RWLock::Context& lc, bool src_session_locked, bool dst_session_locked);
   int _get_op_target_session(Op *op, RWLock::Context& lc, OSDSession **psession);
@@ -1546,6 +1545,7 @@ public:
 
   int _get_session(int osd, OSDSession **session, RWLock::Context& lc);
   void put_session(OSDSession *s);
+  void get_session(OSDSession *s);
   void _reopen_session(OSDSession *session);
   void close_session(OSDSession *session);
   
@@ -1607,13 +1607,7 @@ public:
     op_throttle_bytes(cct, "objecter_bytes", cct->_conf->objecter_inflight_op_bytes),
     op_throttle_ops(cct, "objecter_ops", cct->_conf->objecter_inflight_ops)
   { }
-  ~Objecter() {
-    delete osdmap;
-    assert(!tick_event);
-    assert(!m_request_state_hook);
-    assert(!logger);
-    homeless_session->put();
-  }
+  ~Objecter();
 
   void init();
   void start();