]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
objecter: a major refactoring
authorYehuda Sadeh <yehuda@inktank.com>
Wed, 4 Jun 2014 21:55:13 +0000 (14:55 -0700)
committerJohn Spray <john.spray@redhat.com>
Mon, 25 Aug 2014 00:33:59 +0000 (01:33 +0100)
Fixes: #7619
Removed the client_lock (that used to pass in as a param) and replaced
it with a read-write lock (completely controlled by the objecter). Also
added a per-session read-write lock. Adapt code to use the new locking
scheme, removed locking where not needed. Replaced various counters to
atomics instead of grabbing the lock for updates. Moved ops to live
under the session.

Signed-off-by: Yehuda Sadeh <yehuda@inktank.com>
Signed-off-by: Sage Weil <sage@inktank.com>
src/osdc/Objecter.cc
src/osdc/Objecter.h

index b8df730c5d49ca10aa4010cd2fced24c07f45d49..5b5e38d8f4350d63bb2e53860e94ae91402a98e3 100644 (file)
@@ -160,9 +160,9 @@ void Objecter::handle_conf_change(const struct md_config_t *conf,
 
 // messages ------------------------------
 
-void Objecter::init_unlocked()
+void Objecter::init()
 {
-  assert(!initialized);
+  assert(!initialized.read());
 
   if (!logger) {
     PerfCountersBuilder pcb(cct, "objecter", l_osdc_first, l_osdc_last);
@@ -255,25 +255,26 @@ void Objecter::init_unlocked()
     lderr(cct) << "error registering admin socket command: "
               << cpp_strerror(ret) << dendl;
   }
-}
 
-void Objecter::init_locked()
-{
-  assert(client_lock.is_locked());
-  assert(!initialized);
+  rwlock.get_read();
 
   schedule_tick();
-  if (osdmap->get_epoch() == 0)
-    maybe_request_map();
+  if (osdmap->get_epoch() == 0) {
+    int r = _maybe_request_map();
+    assert (r == 0 || osdmap->get_epoch() > 0);
+  }
 
-  initialized = true;
+  rwlock.unlock();
+
+  initialized.set(1);
 }
 
-void Objecter::shutdown_locked() 
+void Objecter::shutdown()
 {
-  assert(client_lock.is_locked());
-  assert(initialized);
-  initialized = false;
+  assert(initialized.read());
+  initialized.set(0);
+
+  RWLock::WLocker wl(rwlock);
 
   map<int,OSDSession*>::iterator p;
   while (!osd_sessions.empty()) {
@@ -285,10 +286,7 @@ void Objecter::shutdown_locked()
     timer.cancel_event(tick_event);
     tick_event = NULL;
   }
-}
 
-void Objecter::shutdown_unlocked()
-{
   if (m_request_state_hook) {
     AdminSocket* admin_socket = cct->get_admin_socket();
     admin_socket->unregister_command("objecter_requests");
@@ -303,11 +301,16 @@ void Objecter::shutdown_unlocked()
   }
 }
 
-void Objecter::send_linger(LingerOp *info)
+void Objecter::_send_linger(LingerOp *info)
 {
+  assert(rwlock.is_wlocked());
+
+  RWLock::Context lc(rwlock, RWLock::Context::TakenForWrite);
+
   ldout(cct, 15) << "send_linger " << info->linger_id << dendl;
   vector<OSDOp> opv = info->ops; // need to pass a copy to ops
-  Context *onack = (!info->registered && info->on_reg_ack) ? new C_Linger_Ack(this, info) : NULL;
+  Context *onack = (!info->registered && info->on_reg_ack) ?
+    new C_Linger_Ack(this, info) : NULL;
   Context *oncommit = new C_Linger_Commit(this, info);
   Op *o = new Op(info->target.base_oid, info->target.base_oloc,
                 opv, info->target.flags | CEPH_OSD_FLAG_READ,
@@ -317,38 +320,26 @@ void Objecter::send_linger(LingerOp *info)
   o->snapc = info->snapc;
   o->mtime = info->mtime;
 
+  o->target = info->target;
+  o->tid = last_tid.inc();
+
   // do not resend this; we will send a new op to reregister
   o->should_resend = false;
 
-  if (info->session) {
-    int r = recalc_op_target(o);
-    if (r == RECALC_OP_TARGET_POOL_DNE) {
-      _send_linger_map_check(info);
-    }
-  }
-
   if (info->register_tid) {
     // repeat send.  cancel old registeration op, if any.
-    if (ops.count(info->register_tid)) {
-      Op *o = ops[info->register_tid];
-      op_cancel_map_check(o);
-      cancel_linger_op(o);
+    info->session->lock.get_write();
+    if (info->session->ops.count(info->register_tid)) {
+      Op *o = info->session->ops[info->register_tid];
+      _op_cancel_map_check(o);
+      _cancel_linger_op(o);
     }
-    info->register_tid = _op_submit(o);
+    info->session->lock.unlock();
+
+    info->register_tid = _op_submit(o, lc);
   } else {
     // first send
-    // populate info->pgid and info->acting so we
-    // don't resend the linger op on the next osdmap update
-    recalc_linger_op_target(info);
-    info->register_tid = op_submit(o);
-  }
-
-  OSDSession *s = o->session;
-  if (info->session != s) {
-    info->session_item.remove_myself();
-    info->session = s;
-    if (info->session)
-      s->linger_ops.push_back(&info->session_item);
+    info->register_tid = _op_submit_with_budget(o, lc);
   }
 
   logger->inc(l_osdc_linger_send);
@@ -381,10 +372,13 @@ void Objecter::unregister_linger(uint64_t linger_id)
   map<uint64_t, LingerOp*>::iterator iter = linger_ops.find(linger_id);
   if (iter != linger_ops.end()) {
     LingerOp *info = iter->second;
-    info->session_item.remove_myself();
+    info->session->lock.get_write();
+    info->session->linger_ops.erase(linger_id);
+    info->session->lock.unlock();
     linger_ops.erase(iter);
     info->put();
-    logger->set(l_osdc_linger_active, linger_ops.size());
+
+    logger->dec(l_osdc_linger_active);
   }
 }
 
@@ -410,13 +404,9 @@ ceph_tid_t Objecter::linger_mutate(const object_t& oid, const object_locator_t&
   info->on_reg_ack = onack;
   info->on_reg_commit = oncommit;
 
-  info->linger_id = ++max_linger_id;
-  linger_ops[info->linger_id] = info;
-
-  logger->set(l_osdc_linger_active, linger_ops.size());
-
-  send_linger(info);
-
+  RWLock::WLocker wl(rwlock);
+  _linger_submit(info);
+  logger->inc(l_osdc_linger_active);
   return info->linger_id;
 }
 
@@ -432,23 +422,44 @@ ceph_tid_t Objecter::linger_read(const object_t& oid, const object_locator_t& ol
   if (info->target.base_oloc.key == oid)
     info->target.base_oloc.key.clear();
   info->snap = snap;
-  info->target.flags = flags;
+  info->target.flags = flags | CEPH_OSD_FLAG_READ;
   info->ops = op.ops;
   info->inbl = inbl;
   info->poutbl = poutbl;
   info->pobjver = objver;
   info->on_reg_commit = onfinish;
 
+  RWLock::WLocker wl(rwlock);
+  _linger_submit(info);
+  logger->inc(l_osdc_linger_active);
+  return info->linger_id;
+}
+
+void Objecter::_linger_submit(LingerOp *info)
+{
+  assert(rwlock.is_wlocked());
+  RWLock::Context lc(rwlock, RWLock::Context::TakenForWrite);
+
   info->linger_id = ++max_linger_id;
+  ldout(cct, 10) << __func__ << " info " << info
+                << " linger_id " << info->linger_id << dendl;
   linger_ops[info->linger_id] = info;
 
-  logger->set(l_osdc_linger_active, linger_ops.size());
+  OSDSession *s = NULL;
+  _calc_target(&info->target);
+  int r = _get_session(info->target.osd, &s, lc);
+  assert(r == 0);
 
-  send_linger(info);
+  info->session = s;
 
-  return info->linger_id;
+  s->lock.get_write();
+  s->linger_ops[info->linger_id] = info;
+  s->lock.unlock();
+  _send_linger(info);
 }
 
+
+
 void Objecter::dispatch(Message *m)
 {
   switch (m->get_type()) {
@@ -482,41 +493,50 @@ void Objecter::dispatch(Message *m)
   }
 }
 
-void Objecter::scan_requests(bool force_resend,
+void Objecter::_scan_requests(OSDSession *s,
+                             bool force_resend,
                             bool force_resend_writes,
                             map<ceph_tid_t, Op*>& need_resend,
                             list<LingerOp*>& need_resend_linger,
                             map<ceph_tid_t, CommandOp*>& need_resend_command)
 {
+  assert(rwlock.is_wlocked());
+
+  RWLock::Context lc(rwlock, RWLock::Context::TakenForWrite);
+  RWLock::WLocker wl(s->lock);
+
   // check for changed linger mappings (_before_ regular ops)
-  map<ceph_tid_t,LingerOp*>::iterator lp = linger_ops.begin();
-  while (lp != linger_ops.end()) {
+  map<ceph_tid_t,LingerOp*>::iterator lp = s->linger_ops.begin();
+  while (lp != s->linger_ops.end()) {
     LingerOp *op = lp->second;
     ++lp;   // check_linger_pool_dne() may touch linger_ops; prevent iterator invalidation
     ldout(cct, 10) << " checking linger op " << op->linger_id << dendl;
-    int r = recalc_linger_op_target(op);
+    int r = _calc_target(&op->target);
     switch (r) {
     case RECALC_OP_TARGET_NO_ACTION:
       if (!force_resend && !force_resend_writes)
        break;
       // -- fall-thru --
     case RECALC_OP_TARGET_NEED_RESEND:
+      if (op->session) {
+       _session_linger_op_remove(op);
+      }
       need_resend_linger.push_back(op);
-      linger_cancel_map_check(op);
+      _linger_cancel_map_check(op);
       break;
     case RECALC_OP_TARGET_POOL_DNE:
-      check_linger_pool_dne(op);
+      _check_linger_pool_dne(op);
       break;
     }
   }
 
   // check for changed request mappings
-  map<ceph_tid_t,Op*>::iterator p = ops.begin();
-  while (p != ops.end()) {
+  map<ceph_tid_t,Op*>::iterator p = s->ops.begin();
+  while (p != s->ops.end()) {
     Op *op = p->second;
     ++p;   // check_op_pool_dne() may touch ops; prevent iterator invalidation
     ldout(cct, 10) << " checking op " << op->tid << dendl;
-    int r = recalc_op_target(op);
+    int r = _calc_target(&op->target);
     switch (r) {
     case RECALC_OP_TARGET_NO_ACTION:
       if (!force_resend &&
@@ -524,22 +544,25 @@ void Objecter::scan_requests(bool force_resend,
        break;
       // -- fall-thru --
     case RECALC_OP_TARGET_NEED_RESEND:
+      if (op->session) {
+       _session_op_remove(op);
+      }
       need_resend[op->tid] = op;
-      op_cancel_map_check(op);
+      _op_cancel_map_check(op);
       break;
     case RECALC_OP_TARGET_POOL_DNE:
-      check_op_pool_dne(op);
+      _check_op_pool_dne(op);
       break;
     }
   }
 
   // commands
-  map<ceph_tid_t,CommandOp*>::iterator cp = command_ops.begin();
-  while (cp != command_ops.end()) {
+  map<ceph_tid_t,CommandOp*>::iterator cp = s->command_ops.begin();
+  while (cp != s->command_ops.end()) {
     CommandOp *c = cp->second;
     ++cp;
     ldout(cct, 10) << " checking command " << c->tid << dendl;
-    int r = recalc_command_target(c);
+    int r = _recalc_command_target(c);
     switch (r) {
     case RECALC_OP_TARGET_NO_ACTION:
       // resend if skipped map; otherwise do nothing.
@@ -548,12 +571,12 @@ void Objecter::scan_requests(bool force_resend,
       // -- fall-thru --
     case RECALC_OP_TARGET_NEED_RESEND:
       need_resend_command[c->tid] = c;
-      command_cancel_map_check(c);
+      _command_cancel_map_check(c);
       break;
     case RECALC_OP_TARGET_POOL_DNE:
     case RECALC_OP_TARGET_OSD_DNE:
     case RECALC_OP_TARGET_OSD_DOWN:
-      check_command_map_dne(c);
+      _check_command_map_dne(c);
       break;
     }     
   }
@@ -561,12 +584,15 @@ void Objecter::scan_requests(bool force_resend,
 
 void Objecter::handle_osd_map(MOSDMap *m)
 {
-  assert(client_lock.is_locked());
-  assert(initialized);
+  assert(initialized.read());
+
+  RWLock::WLocker wl(rwlock);
+
   assert(osdmap); 
 
   if (m->fsid != monc->get_fsid()) {
-    ldout(cct, 0) << "handle_osd_map fsid " << m->fsid << " != " << monc->get_fsid() << dendl;
+    ldout(cct, 0) << "handle_osd_map fsid " << m->fsid
+                 << " != " << monc->get_fsid() << dendl;
     m->put();
     return;
   }
@@ -583,8 +609,7 @@ void Objecter::handle_osd_map(MOSDMap *m)
     ldout(cct, 3) << "handle_osd_map ignoring epochs [" 
             << m->get_first() << "," << m->get_last() 
             << "] <= " << osdmap->get_epoch() << dendl;
-  } 
-  else {
+  } else {
     ldout(cct, 3) << "handle_osd_map got epochs [" 
             << m->get_first() << "," << m->get_last() 
             << "] > " << osdmap->get_epoch()
@@ -599,7 +624,8 @@ void Objecter::handle_osd_map(MOSDMap *m)
  
        if (osdmap->get_epoch() == e-1 &&
            m->incremental_maps.count(e)) {
-         ldout(cct, 3) << "handle_osd_map decoding incremental epoch " << e << dendl;
+         ldout(cct, 3) << "handle_osd_map decoding incremental epoch " << e
+                       << dendl;
          OSDMap::Incremental inc(m->incremental_maps[e]);
          osdmap->apply_incremental(inc);
          logger->inc(l_osdc_map_inc);
@@ -611,11 +637,14 @@ void Objecter::handle_osd_map(MOSDMap *m)
        }
        else {
          if (e && e > m->get_oldest()) {
-           ldout(cct, 3) << "handle_osd_map requesting missing epoch " << osdmap->get_epoch()+1 << dendl;
-           maybe_request_map();
+           ldout(cct, 3) << "handle_osd_map requesting missing epoch "
+                         << osdmap->get_epoch()+1 << dendl;
+           int r = _maybe_request_map();
+            assert(r == 0);
            break;
          }
-         ldout(cct, 3) << "handle_osd_map missing epoch " << osdmap->get_epoch()+1
+         ldout(cct, 3) << "handle_osd_map missing epoch "
+                       << osdmap->get_epoch()+1
                        << ", jumping to " << m->get_oldest() << dendl;
          e = m->get_oldest() - 1;
          skipped_map = true;
@@ -624,18 +653,21 @@ void Objecter::handle_osd_map(MOSDMap *m)
        logger->set(l_osdc_map_epoch, osdmap->get_epoch());
 
        was_full = was_full || osdmap_full_flag();
-       scan_requests(skipped_map, was_full, need_resend, need_resend_linger,
-                     need_resend_command);
+       _scan_requests(&homeless_session, skipped_map, was_full,
+                      need_resend, need_resend_linger,
+                      need_resend_command);
 
        // osd addr changes?
        for (map<int,OSDSession*>::iterator p = osd_sessions.begin();
             p != osd_sessions.end(); ) {
          OSDSession *s = p->second;
+         _scan_requests(s, skipped_map, was_full,
+                        need_resend, need_resend_linger,
+                        need_resend_command);
          ++p;
-         if (osdmap->is_up(s->osd)) {
-           if (s->con && s->con->get_peer_addr() != osdmap->get_inst(s->osd).addr)
-             close_session(s);
-         } else {
+         if (!osdmap->is_up(s->osd) ||
+             (s->con &&
+              s->con->get_peer_addr() != osdmap->get_inst(s->osd).addr)) {
            close_session(s);
          }
        }
@@ -646,13 +678,22 @@ void Objecter::handle_osd_map(MOSDMap *m)
     } else {
       // first map.  we want the full thing.
       if (m->maps.count(m->get_last())) {
-       ldout(cct, 3) << "handle_osd_map decoding full epoch " << m->get_last() << dendl;
+        for (map<int,OSDSession*>::iterator p = osd_sessions.begin();
+            p != osd_sessions.end(); ++p) {
+         OSDSession *s = p->second;
+         _scan_requests(s, false, false, need_resend, need_resend_linger,
+                        need_resend_command);
+        }
+       ldout(cct, 3) << "handle_osd_map decoding full epoch "
+                     << m->get_last() << dendl;
        osdmap->decode(m->maps[m->get_last()]);
 
-       scan_requests(false, false, need_resend, need_resend_linger,
-                     need_resend_command);
+       _scan_requests(&homeless_session, false, false,
+                      need_resend, need_resend_linger,
+                      need_resend_command);
       } else {
-       ldout(cct, 3) << "handle_osd_map hmm, i want a full map, requesting" << dendl;
+       ldout(cct, 3) << "handle_osd_map hmm, i want a full map, requesting"
+                     << dendl;
        monc->sub_want("osdmap", 0, CEPH_SUBSCRIBE_ONETIME);
        monc->renew_subs();
       }
@@ -663,36 +704,60 @@ void Objecter::handle_osd_map(MOSDMap *m)
   bool pausewr = osdmap->test_flag(CEPH_OSDMAP_PAUSEWR) || osdmap_full_flag();
 
   // was/is paused?
-  if (was_pauserd || was_pausewr || pauserd || pausewr)
-    maybe_request_map();
+  if (was_pauserd || was_pausewr || pauserd || pausewr) {
+    int r = _maybe_request_map();
+    assert(r == 0);
+  }
+
+  RWLock::Context lc(rwlock, RWLock::Context::TakenForWrite);
 
   // resend requests
-  for (map<ceph_tid_t, Op*>::iterator p = need_resend.begin(); p != need_resend.end(); ++p) {
+  for (map<ceph_tid_t, Op*>::iterator p = need_resend.begin();
+       p != need_resend.end(); ++p) {
     Op *op = p->second;
+    OSDSession *s = op->session;
+    bool mapped_session = false;
+    if (!s) {
+      int r = _map_session(&op->target, &s, lc);
+      assert(r == 0);
+      mapped_session = true;
+    }
+    s->lock.get_write();
+    if (mapped_session) {
+      _session_op_assign(op, s);
+    }
     if (op->should_resend) {
-      if (op->session && !op->target.paused) {
+      if (!op->session->is_homeless() && !op->target.paused) {
        logger->inc(l_osdc_op_resend);
-       send_op(op);
+       _send_op(op);
       }
     } else {
-      cancel_linger_op(op);
+      _cancel_linger_op(op);
     }
+    s->lock.unlock();
   }
-  for (list<LingerOp*>::iterator p = need_resend_linger.begin(); p != need_resend_linger.end(); ++p) {
+  for (list<LingerOp*>::iterator p = need_resend_linger.begin();
+       p != need_resend_linger.end(); ++p) {
     LingerOp *op = *p;
-    if (op->session) {
+    if (!op->session) {
+      _calc_target(&op->target);
+      int r = _get_session(op->target.osd, &op->session, lc);
+      assert(r == 0);
+    }
+    if (!op->session->is_homeless()) {
       logger->inc(l_osdc_linger_resend);
-      send_linger(op);
+      _send_linger(op);
     }
   }
-  for (map<ceph_tid_t,CommandOp*>::iterator p = need_resend_command.begin(); p != need_resend_command.end(); ++p) {
+  for (map<ceph_tid_t,CommandOp*>::iterator p = need_resend_command.begin();
+       p != need_resend_command.end(); ++p) {
     CommandOp *c = p->second;
     if (c->session) {
       _send_command(c);
     }
   }
 
-  dump_active();
+  _dump_active();
   
   // finish any Contexts that were waiting on a map update
   map<epoch_t,list< pair< Context*, int > > >::iterator p =
@@ -711,8 +776,10 @@ void Objecter::handle_osd_map(MOSDMap *m)
 
   monc->sub_got("osdmap", osdmap->get_epoch());
 
-  if (!waiting_for_map.empty())
-    maybe_request_map();
+  if (!waiting_for_map.empty()) {
+    int r = _maybe_request_map();
+    assert(r == 0);
+  }
 }
 
 // op pool check
@@ -725,7 +792,7 @@ void Objecter::C_Op_Map_Latest::finish(int r)
   lgeneric_subdout(objecter->cct, objecter, 10) << "op_map_latest r=" << r << " tid=" << tid
                                                << " latest " << latest << dendl;
 
-  Mutex::Locker l(objecter->client_lock);
+  RWLock::WLocker wl(objecter->rwlock);
 
   map<ceph_tid_t, Op*>::iterator iter =
     objecter->check_latest_map_ops.find(tid);
@@ -742,11 +809,66 @@ void Objecter::C_Op_Map_Latest::finish(int r)
   if (op->map_dne_bound == 0)
     op->map_dne_bound = latest;
 
-  objecter->check_op_pool_dne(op);
+  objecter->_check_op_pool_dne(op);
 }
 
-void Objecter::check_op_pool_dne(Op *op)
+int Objecter::pool_snap_by_name(int64_t poolid, const char *snap_name, snapid_t *snap)
 {
+  RWLock::RLocker rl(rwlock);
+
+  const map<int64_t, pg_pool_t>& pools = osdmap->get_pools();
+  map<int64_t, pg_pool_t>::const_iterator iter = pools.find(poolid);
+  if (iter == pools.end()) {
+    return -ENOENT;
+  }
+  const pg_pool_t& pg_pool = iter->second;
+  map<snapid_t, pool_snap_info_t>::const_iterator p;
+  for (p = pg_pool.snaps.begin();
+       p != pg_pool.snaps.end();
+       ++p) {
+    if (p->second.name == snap_name) {
+      *snap = p->first;
+      return 0;
+    }
+  }
+  return -ENOENT;
+}
+
+int Objecter::pool_snap_get_info(int64_t poolid, snapid_t snap, pool_snap_info_t *info)
+{
+  RWLock::RLocker rl(rwlock);
+
+  const map<int64_t, pg_pool_t>& pools = osdmap->get_pools();
+  map<int64_t, pg_pool_t>::const_iterator iter = pools.find(poolid);
+  if (iter == pools.end()) {
+    return -ENOENT;
+  }
+  const pg_pool_t& pg_pool = iter->second;
+  map<snapid_t,pool_snap_info_t>::const_iterator p = pg_pool.snaps.find(snap);
+  if (p == pg_pool.snaps.end())
+    return -ENOENT;
+  *info = p->second;
+
+  return 0;
+}
+
+int Objecter::pool_snap_list(int64_t poolid, vector<uint64_t> *snaps)
+{
+  RWLock::RLocker rl(rwlock);
+
+  const pg_pool_t *pi = osdmap->get_pg_pool(poolid);
+  for (map<snapid_t,pool_snap_info_t>::const_iterator p = pi->snaps.begin();
+       p != pi->snaps.end();
+       ++p) {
+    snaps->push_back(p->first);
+  }
+  return 0;
+}
+
+void Objecter::_check_op_pool_dne(Op *op)
+{
+  assert(rwlock.is_wlocked());
+
   ldout(cct, 10) << "check_op_pool_dne tid " << op->tid
                 << " current " << osdmap->get_epoch()
                 << " map_dne_bound " << op->map_dne_bound
@@ -763,9 +885,8 @@ void Objecter::check_op_pool_dne(Op *op)
       if (op->oncommit) {
        op->oncommit->complete(-ENOENT);
       }
-      op->session_item.remove_myself();
-      ops.erase(op->tid);
-      delete op;
+      RWLock::WLocker wl(op->session->lock);
+      _finish_op(op);
     }
   } else {
     _send_op_map_check(op);
@@ -774,7 +895,7 @@ void Objecter::check_op_pool_dne(Op *op)
 
 void Objecter::_send_op_map_check(Op *op)
 {
-  assert(client_lock.is_locked());
+  assert(rwlock.is_wlocked());
   // ask the monitor
   if (check_latest_map_ops.count(op->tid) == 0) {
     check_latest_map_ops[op->tid] = op;
@@ -783,9 +904,9 @@ void Objecter::_send_op_map_check(Op *op)
   }
 }
 
-void Objecter::op_cancel_map_check(Op *op)
+void Objecter::_op_cancel_map_check(Op *op)
 {
-  assert(client_lock.is_locked());
+  assert(rwlock.is_wlocked());
   map<ceph_tid_t, Op*>::iterator iter =
     check_latest_map_ops.find(op->tid);
   if (iter != check_latest_map_ops.end()) {
@@ -802,7 +923,7 @@ void Objecter::C_Linger_Map_Latest::finish(int r)
     return;
   }
 
-  Mutex::Locker l(objecter->client_lock);
+  RWLock::WLocker wl(objecter->rwlock);
 
   map<uint64_t, LingerOp*>::iterator iter =
     objecter->check_latest_map_lingers.find(linger_id);
@@ -817,12 +938,14 @@ void Objecter::C_Linger_Map_Latest::finish(int r)
   if (op->map_dne_bound == 0)
     op->map_dne_bound = latest;
 
-  objecter->check_linger_pool_dne(op);
+  objecter->_check_linger_pool_dne(op);
 }
 
-void Objecter::check_linger_pool_dne(LingerOp *op)
+void Objecter::_check_linger_pool_dne(LingerOp *op)
 {
-  ldout(cct, 10) << "check_linger_pool_dne linger_id " << op->linger_id
+  assert(rwlock.is_wlocked());
+
+  ldout(cct, 10) << "_check_linger_pool_dne linger_id " << op->linger_id
                 << " current " << osdmap->get_epoch()
                 << " map_dne_bound " << op->map_dne_bound
                 << dendl;
@@ -852,8 +975,10 @@ void Objecter::_send_linger_map_check(LingerOp *op)
   }
 }
 
-void Objecter::linger_cancel_map_check(LingerOp *op)
+void Objecter::_linger_cancel_map_check(LingerOp *op)
 {
+  assert(rwlock.is_wlocked());
+
   map<uint64_t, LingerOp*>::iterator iter =
     check_latest_map_lingers.find(op->linger_id);
   if (iter != check_latest_map_lingers.end()) {
@@ -872,7 +997,7 @@ void Objecter::C_Command_Map_Latest::finish(int r)
     return;
   }
 
-  Mutex::Locker l(objecter->client_lock);
+  RWLock::WLocker wl(objecter->rwlock);
 
   map<uint64_t, CommandOp*>::iterator iter =
     objecter->check_latest_map_commands.find(tid);
@@ -887,12 +1012,14 @@ void Objecter::C_Command_Map_Latest::finish(int r)
   if (c->map_dne_bound == 0)
     c->map_dne_bound = latest;
 
-  objecter->check_command_map_dne(c);
+  objecter->_check_command_map_dne(c);
 }
 
-void Objecter::check_command_map_dne(CommandOp *c)
+void Objecter::_check_command_map_dne(CommandOp *c)
 {
-  ldout(cct, 10) << "check_command_map_dne tid " << c->tid
+  assert(rwlock.is_wlocked());
+
+  ldout(cct, 10) << "_check_command_map_dne tid " << c->tid
                 << " current " << osdmap->get_epoch()
                 << " map_dne_bound " << c->map_dne_bound
                 << dendl;
@@ -907,6 +1034,8 @@ void Objecter::check_command_map_dne(CommandOp *c)
 
 void Objecter::_send_command_map_check(CommandOp *c)
 {
+  assert(rwlock.is_wlocked());
+
   // ask the monitor
   if (check_latest_map_commands.count(c->tid) == 0) {
     c->get();
@@ -916,8 +1045,10 @@ void Objecter::_send_command_map_check(CommandOp *c)
   }
 }
 
-void Objecter::command_cancel_map_check(CommandOp *c)
+void Objecter::_command_cancel_map_check(CommandOp *c)
 {
+  assert(rwlock.is_wlocked());
+
   map<uint64_t, CommandOp*>::iterator iter =
     check_latest_map_commands.find(c->tid);
   if (iter != check_latest_map_commands.end()) {
@@ -929,21 +1060,46 @@ void Objecter::command_cancel_map_check(CommandOp *c)
 
 
 
-Objecter::OSDSession *Objecter::get_session(int osd)
+int Objecter::_get_session(int osd, OSDSession **session, RWLock::Context& lc)
 {
+  assert(rwlock.is_locked());
+
+  if (osd < 0) {
+    *session = &homeless_session;
+    return 0;
+  }
+
   map<int,OSDSession*>::iterator p = osd_sessions.find(osd);
-  if (p != osd_sessions.end())
-    return p->second;
+  if (p != osd_sessions.end()) {
+    OSDSession *s = p->second;
+    s->get();
+    *session = s;
+    return 0;
+  }
+  if (!lc.is_wlocked()) {
+    return -EAGAIN;
+  }
   OSDSession *s = new OSDSession(osd);
   osd_sessions[osd] = s;
   s->con = messenger->get_connection(osdmap->get_inst(osd));
   logger->inc(l_osdc_osd_session_open);
   logger->inc(l_osdc_osd_sessions, osd_sessions.size());
-  return s;
+  s->get();
+  *session = s;
+  return 0;
 }
 
-void Objecter::reopen_session(OSDSession *s)
+void Objecter::put_session(Objecter::OSDSession *s)
 {
+  if (s && !s->is_homeless()) {
+    s->put();
+  }
+}
+
+void Objecter::_reopen_session(OSDSession *s)
+{
+  assert(s->lock.is_locked());
+
   entity_inst_t inst = osdmap->get_inst(s->osd);
   ldout(cct, 10) << "reopen_session osd." << s->osd << " session, addr now " << inst << dendl;
   if (s->con) {
@@ -962,11 +1118,13 @@ void Objecter::close_session(OSDSession *s)
     s->con->mark_down();
     logger->inc(l_osdc_osd_session_close);
   }
+  s->lock.get_write();
   s->ops.clear();
   s->linger_ops.clear();
   s->command_ops.clear();
   osd_sessions.erase(s->osd);
-  delete s;
+  s->lock.unlock();
+  s->put();
 
   logger->set(l_osdc_osd_sessions, osd_sessions.size());
 }
@@ -992,9 +1150,9 @@ struct C_Objecter_GetVersion : public Context {
   C_Objecter_GetVersion(Objecter *o, Context *c)
     : objecter(o), oldest(0), newest(0), fin(c) {}
   void finish(int r) {
-    if (r >= 0)
-      objecter->_get_latest_version(oldest, newest, fin);
-    else if (r == -EAGAIN) { // try again as instructed
+    if (r >= 0) {
+      objecter->get_latest_version(oldest, newest, fin);
+    else if (r == -EAGAIN) { // try again as instructed
       objecter->wait_for_latest_osdmap(fin);
     } else {
       // it doesn't return any other error codes!
@@ -1010,8 +1168,15 @@ void Objecter::wait_for_latest_osdmap(Context *fin)
   monc->get_version("osdmap", &c->newest, &c->oldest, c);
 }
 
+void Objecter::get_latest_version(epoch_t oldest, epoch_t newest, Context *fin)
+{
+  RWLock::WLocker wl(rwlock);
+  _get_latest_version(oldest, newest, fin);
+}
+
 void Objecter::_get_latest_version(epoch_t oldest, epoch_t newest, Context *fin)
 {
+  assert(rwlock.is_wlocked());
   if (osdmap->get_epoch() >= newest) {
   ldout(cct, 10) << __func__ << " latest " << newest << ", have it" << dendl;
     if (fin)
@@ -1020,72 +1185,106 @@ void Objecter::_get_latest_version(epoch_t oldest, epoch_t newest, Context *fin)
   }
 
   ldout(cct, 10) << __func__ << " latest " << newest << ", waiting" << dendl;
-  wait_for_new_map(fin, newest, 0);
+  _wait_for_new_map(fin, newest, 0);
 }
 
 void Objecter::maybe_request_map()
 {
+  RWLock::RLocker rl(rwlock);
+  int r;
+  do {
+    r = _maybe_request_map();
+  } while (r == -EAGAIN);
+}
+
+int Objecter::_maybe_request_map()
+{
+  assert(rwlock.is_locked());
   int flag = 0;
   if (osdmap_full_flag()) {
-    ldout(cct, 10) << "maybe_request_map subscribing (continuous) to next osd map (FULL flag is set)" << dendl;
+    ldout(cct, 10) << "_maybe_request_map subscribing (continuous) to next osd map (FULL flag is set)" << dendl;
   } else {
-    ldout(cct, 10) << "maybe_request_map subscribing (onetime) to next osd map" << dendl;
+    ldout(cct, 10) << "_maybe_request_map subscribing (onetime) to next osd map" << dendl;
     flag = CEPH_SUBSCRIBE_ONETIME;
   }
   epoch_t epoch = osdmap->get_epoch() ? osdmap->get_epoch()+1 : 0;
-  if (monc->sub_want("osdmap", epoch, flag))
+  if (monc->sub_want("osdmap", epoch, flag)) {
     monc->renew_subs();
+  }
+  return 0;
 }
 
-void Objecter::wait_for_new_map(Context *c, epoch_t epoch, int err)
+void Objecter::_wait_for_new_map(Context *c, epoch_t epoch, int err)
 {
+  assert(rwlock.is_wlocked());
   waiting_for_map[epoch].push_back(pair<Context *, int>(c, err));
-  maybe_request_map();
+  int r = _maybe_request_map();
+  assert(r == 0);
+}
+
+void Objecter::wait_for_new_map(Context *c, epoch_t epoch, int err)
+{
+  RWLock::WLocker wl(rwlock);
+  _wait_for_new_map(c, epoch, err);
 }
 
 void Objecter::kick_requests(OSDSession *session)
 {
   ldout(cct, 10) << "kick_requests for osd." << session->osd << dendl;
 
+  RWLock::WLocker wl(rwlock);
+
+  _kick_requests(session);
+}
+
+void Objecter::_kick_requests(OSDSession *session)
+{
+  assert(rwlock.is_locked());
+
+  session->lock.get_write();
+
   // resend ops
   map<ceph_tid_t,Op*> resend;  // resend in tid order
-  for (xlist<Op*>::iterator p = session->ops.begin(); !p.end();) {
-    Op *op = *p;
+  for (map<ceph_tid_t, Op*>::iterator p = session->ops.begin(); p != session->ops.end();) {
+    Op *op = p->second;
     ++p;
     logger->inc(l_osdc_op_resend);
     if (op->should_resend) {
       if (!op->target.paused)
        resend[op->tid] = op;
     } else {
-      cancel_linger_op(op);
+      _cancel_linger_op(op);
     }
   }
+
   while (!resend.empty()) {
-    send_op(resend.begin()->second);
+    _send_op(resend.begin()->second);
     resend.erase(resend.begin());
   }
 
   // resend lingers
   map<uint64_t, LingerOp*> lresend;  // resend in order
-  for (xlist<LingerOp*>::iterator j = session->linger_ops.begin(); !j.end(); ++j) {
+  for (map<ceph_tid_t, LingerOp*>::iterator j = session->linger_ops.begin(); j != session->linger_ops.end(); ++j) {
     logger->inc(l_osdc_linger_resend);
-    lresend[(*j)->linger_id] = *j;
-  }
-  while (!lresend.empty()) {
-    send_linger(lresend.begin()->second);
-    lresend.erase(lresend.begin());
+    lresend[j->first] = j->second;
   }
 
   // resend commands
   map<uint64_t,CommandOp*> cresend;  // resend in order
-  for (xlist<CommandOp*>::iterator k = session->command_ops.begin(); !k.end(); ++k) {
+  for (map<ceph_tid_t, CommandOp*>::iterator k = session->command_ops.begin(); k != session->command_ops.end(); ++k) {
     logger->inc(l_osdc_command_resend);
-    cresend[(*k)->tid] = *k;
+    cresend[k->first] = k->second;
   }
   while (!cresend.empty()) {
     _send_command(cresend.begin()->second);
     cresend.erase(cresend.begin());
   }
+  session->lock.unlock();
+
+  while (!lresend.empty()) {
+    _send_linger(lresend.begin()->second);
+    lresend.erase(lresend.begin());
+  }
 }
 
 void Objecter::schedule_tick()
@@ -1097,9 +1296,13 @@ void Objecter::schedule_tick()
 
 void Objecter::tick()
 {
+  if (!initialized.read())
+    return;
+
+  assert(rwlock.is_locked());
+
   ldout(cct, 10) << "tick" << dendl;
-  assert(client_lock.is_locked());
-  assert(initialized);
+  assert(initialized.read());
 
   // we are only called by C_Tick
   assert(tick_event);
@@ -1107,75 +1310,84 @@ void Objecter::tick()
 
   set<OSDSession*> toping;
 
+  int r = 0;
+
   // look for laggy requests
   utime_t cutoff = ceph_clock_now(cct);
   cutoff -= cct->_conf->objecter_timeout;  // timeout
 
-  unsigned laggy_ops = 0;
-  for (map<ceph_tid_t,Op*>::iterator p = ops.begin();
-       p != ops.end();
-       ++p) {
-    Op *op = p->second;
-    if (op->session && op->stamp < cutoff) {
-      ldout(cct, 2) << " tid " << p->first << " on osd." << op->session->osd << " is laggy" << dendl;
-      toping.insert(op->session);
-      ++laggy_ops;
-    }
-  }
-  for (map<uint64_t,LingerOp*>::iterator p = linger_ops.begin();
-       p != linger_ops.end();
-       ++p) {
-    LingerOp *op = p->second;
-    if (op->session) {
-      ldout(cct, 10) << " pinging osd that serves lingering tid " << p->first << " (osd." << op->session->osd << ")" << dendl;
-      toping.insert(op->session);
-    } else {
-      ldout(cct, 10) << " lingering tid " << p->first << " does not have session" << dendl;
+  unsigned laggy_ops;
+
+  do {
+    laggy_ops = 0;
+    for (map<int,OSDSession*>::iterator siter = osd_sessions.begin(); siter != osd_sessions.end(); ++siter) {
+      OSDSession *s = siter->second;
+      for (map<ceph_tid_t,Op*>::iterator p = s->ops.begin();
+           p != s->ops.end();
+           ++p) {
+        Op *op = p->second;
+        assert(op->session);
+        if (op->stamp < cutoff) {
+          ldout(cct, 2) << " tid " << p->first << " on osd." << op->session->osd << " is laggy" << dendl;
+          toping.insert(op->session);
+          ++laggy_ops;
+        }
+      }
+      for (map<uint64_t,LingerOp*>::iterator p = s->linger_ops.begin();
+           p != s->linger_ops.end();
+           ++p) {
+        LingerOp *op = p->second;
+        assert(op->session);
+        ldout(cct, 10) << " pinging osd that serves lingering tid " << p->first << " (osd." << op->session->osd << ")" << dendl;
+        toping.insert(op->session);
+      }
+      for (map<uint64_t,CommandOp*>::iterator p = s->command_ops.begin();
+           p != s->command_ops.end();
+           ++p) {
+        CommandOp *op = p->second;
+        assert(op->session);
+        ldout(cct, 10) << " pinging osd that serves command tid " << p->first << " (osd." << op->session->osd << ")" << dendl;
+        toping.insert(op->session);
+      }
     }
-  }
-  for (map<uint64_t,CommandOp*>::iterator p = command_ops.begin();
-       p != command_ops.end();
-       ++p) {
-    CommandOp *op = p->second;
-    if (op->session) {
-      ldout(cct, 10) << " pinging osd that serves command tid " << p->first << " (osd." << op->session->osd << ")" << dendl;
-      toping.insert(op->session);
-    } else {
-      ldout(cct, 10) << " command tid " << p->first << " does not have session" << dendl;
+    if (num_homeless_ops.read() || !toping.empty()) {
+      r = _maybe_request_map();
+      if (r == -EAGAIN) {
+        toping.clear();
+      }
     }
-  }
+  } while (r == -EAGAIN);
+
   logger->set(l_osdc_op_laggy, laggy_ops);
   logger->set(l_osdc_osd_laggy, toping.size());
 
-  if (num_homeless_ops || !toping.empty())
-    maybe_request_map();
-
   if (!toping.empty()) {
     // send a ping to these osds, to ensure we detect any session resets
     // (osd reply message policy is lossy)
-    for (set<OSDSession*>::iterator i = toping.begin();
+    for (set<OSDSession*>::const_iterator i = toping.begin();
         i != toping.end();
         ++i) {
       (*i)->con->send_message(new MPing);
     }
   }
-    
+
   // reschedule
   schedule_tick();
 }
 
 void Objecter::resend_mon_ops()
 {
-  assert(client_lock.is_locked());
+  RWLock::WLocker wl(rwlock);
+
   ldout(cct, 10) << "resend_mon_ops" << dendl;
 
   for (map<ceph_tid_t,PoolStatOp*>::iterator p = poolstat_ops.begin(); p!=poolstat_ops.end(); ++p) {
-    poolstat_submit(p->second);
+    _poolstat_submit(p->second);
     logger->inc(l_osdc_poolstat_resend);
   }
 
   for (map<ceph_tid_t,StatfsOp*>::iterator p = statfs_ops.begin(); p!=statfs_ops.end(); ++p) {
-    fs_stats_submit(p->second);
+    _fs_stats_submit(p->second);
     logger->inc(l_osdc_statfs_resend);
   }
 
@@ -1218,15 +1430,20 @@ public:
   C_CancelOp(Objecter::Op *op, Objecter *objecter) : op(op),
                                                     objecter(objecter) {}
   void finish(int r) {
-    // note that objecter lock == timer lock, and is already held
-    objecter->op_cancel(op->tid, -ETIMEDOUT);
+    objecter->op_cancel(op->session, op->tid, -ETIMEDOUT);
   }
 };
 
 ceph_tid_t Objecter::op_submit(Op *op)
 {
-  assert(client_lock.is_locked());
-  assert(initialized);
+  RWLock::RLocker rl(rwlock);
+  RWLock::Context lc(rwlock, RWLock::Context::TakenForRead);
+  return _op_submit_with_budget(op, lc);
+}
+
+ceph_tid_t Objecter::_op_submit_with_budget(Op *op, RWLock::Context& lc)
+{
+  assert(initialized.read());
 
   assert(op->ops.size() == op->out_bl.size());
   assert(op->ops.size() == op->out_rval.size());
@@ -1239,41 +1456,53 @@ ceph_tid_t Objecter::op_submit(Op *op)
 
   // throttle.  before we look at any state, because
   // take_op_budget() may drop our lock while it blocks.
-  take_op_budget(op);
+  _take_op_budget(op);
 
-  return _op_submit(op);
+  return _op_submit(op, lc);
 }
 
-ceph_tid_t Objecter::_op_submit(Op *op)
+ceph_tid_t Objecter::_op_submit(Op *op, RWLock::Context& lc)
 {
-  // pick tid if we haven't got one yet
-  if (op->tid == ceph_tid_t(0)) {
-    ceph_tid_t mytid = ++last_tid;
-    op->tid = mytid;
-  }
-  assert(client_inc >= 0);
+  assert(rwlock.is_locked());
+
+  ldout(cct, 10) << __func__ << " op " << op << dendl;
 
   // pick target
-  num_homeless_ops++;  // initially; recalc_op_target() will decrement if it finds a target
-  int r = recalc_op_target(op);
-  bool check_for_latest_map = (r == RECALC_OP_TARGET_POOL_DNE);
+  int r;
+  assert(op->session == NULL);
+  OSDSession *s = NULL;
+
+  bool check_for_latest_map;
+
+  while (true) {
+    r = _calc_target(&op->target);
+    check_for_latest_map = (r == RECALC_OP_TARGET_POOL_DNE);
+    if (_get_session(op->target.osd, &s, lc) == -EAGAIN ||
+        check_for_latest_map) {
+      lc.promote();
+      continue;
+    }
+    break;
+  }
+  assert(s);  // may be homeless
+
+  inflight_ops.inc();
 
   // add to gather set(s)
   if (op->onack) {
-    ++num_unacked;
+    num_unacked.inc();
   } else {
     ldout(cct, 20) << " note: not requesting ack" << dendl;
   }
   if (op->oncommit) {
-    ++num_uncommitted;
+    num_uncommitted.inc();
   } else {
     ldout(cct, 20) << " note: not requesting commit" << dendl;
   }
-  ops[op->tid] = op;
-
-  logger->set(l_osdc_op_active, ops.size());
 
+  logger->inc(l_osdc_op_active);
   logger->inc(l_osdc_op);
+
   if ((op->target.flags & (CEPH_OSD_FLAG_READ|CEPH_OSD_FLAG_WRITE)) == (CEPH_OSD_FLAG_READ|CEPH_OSD_FLAG_WRITE))
     logger->inc(l_osdc_op_rmw);
   else if (op->target.flags & CEPH_OSD_FLAG_WRITE)
@@ -1317,50 +1546,65 @@ ceph_tid_t Objecter::_op_submit(Op *op)
   }
 
   // send?
-  ldout(cct, 10) << "op_submit oid " << op->target.base_oid
+  ldout(cct, 10) << "_op_submit oid " << op->target.base_oid
            << " " << op->target.base_oloc << " " << op->target.target_oloc
           << " " << op->ops << " tid " << op->tid
-           << " osd." << (op->session ? op->session->osd : -1)
+           << " osd." << (!op->session->is_homeless() ? op->session->osd : -1)
            << dendl;
 
   assert(op->target.flags & (CEPH_OSD_FLAG_READ|CEPH_OSD_FLAG_WRITE));
 
+  bool need_send = false;
+
   if ((op->target.flags & CEPH_OSD_FLAG_WRITE) &&
       osdmap->test_flag(CEPH_OSDMAP_PAUSEWR)) {
-    ldout(cct, 10) << " paused modify " << op << " tid " << last_tid << dendl;
+    ldout(cct, 10) << " paused modify " << op << " tid " << last_tid.read() << dendl;
     op->target.paused = true;
-    maybe_request_map();
+    _maybe_request_map();
   } else if ((op->target.flags & CEPH_OSD_FLAG_READ) &&
             osdmap->test_flag(CEPH_OSDMAP_PAUSERD)) {
-    ldout(cct, 10) << " paused read " << op << " tid " << last_tid << dendl;
+    ldout(cct, 10) << " paused read " << op << " tid " << last_tid.read() << dendl;
     op->target.paused = true;
-    maybe_request_map();
+    _maybe_request_map();
   } else if ((op->target.flags & CEPH_OSD_FLAG_WRITE) && osdmap_full_flag()) {
-    ldout(cct, 0) << " FULL, paused modify " << op << " tid " << last_tid << dendl;
+    ldout(cct, 0) << " FULL, paused modify " << op << " tid " << last_tid.read() << dendl;
     op->target.paused = true;
-    maybe_request_map();
-  } else if (op->session) {
-    send_op(op);
+    _maybe_request_map();
+  } else if (!s->is_homeless()) {
+    need_send = true;
   } else {
-    maybe_request_map();
+    _maybe_request_map();
+  }
+
+  MOSDOp *m = _prepare_osd_op(op);
+
+  s->lock.get_write();
+  if (op->tid == 0)
+    op->tid = last_tid.inc();
+  _session_op_assign(op, s);
+
+  if (need_send) {
+    _send_op(op, m);
   }
+  s->lock.unlock();
 
   if (check_for_latest_map) {
     _send_op_map_check(op);
   }
 
-  ldout(cct, 5) << num_unacked << " unacked, " << num_uncommitted << " uncommitted" << dendl;
-  
+  ldout(cct, 5) << num_unacked.read() << " unacked, " << num_uncommitted.read() << " uncommitted" << dendl;
+
   return op->tid;
 }
 
-int Objecter::op_cancel(ceph_tid_t tid, int r)
+int Objecter::op_cancel(OSDSession *s, ceph_tid_t tid, int r)
 {
-  assert(client_lock.is_locked());
-  assert(initialized);
+  assert(initialized.read());
 
-  map<ceph_tid_t, Op*>::iterator p = ops.find(tid);
-  if (p == ops.end()) {
+  s->lock.get_write();
+
+  map<ceph_tid_t, Op*>::iterator p = s->ops.find(tid);
+  if (p == s->ops.end()) {
     ldout(cct, 10) << __func__ << " tid " << tid << " dne" << dendl;
     return -ENOENT;
   }
@@ -1375,11 +1619,45 @@ int Objecter::op_cancel(ceph_tid_t tid, int r)
     op->oncommit->complete(r);
     op->oncommit = NULL;
   }
-  op_cancel_map_check(op);
-  finish_op(op);
+  _op_cancel_map_check(op);
+  _finish_op(op);
+  s->lock.unlock();
+
   return 0;
 }
 
+int Objecter::op_cancel(ceph_tid_t tid, int r)
+{
+  int ret = 0;
+
+  rwlock.get_write();
+
+start:
+
+  for (map<int, OSDSession *>::iterator siter = osd_sessions.begin(); siter != osd_sessions.end(); ++siter) {
+    OSDSession *s = siter->second;
+    s->lock.get_read();
+    if (s->ops.find(tid) != s->ops.end()) {
+      s->lock.unlock();
+      ret = op_cancel(s, tid, r);
+      if (ret == -ENOENT) {
+        /* oh no! raced, maybe tid moved to another session, restarting */
+        goto start;
+      }
+      rwlock.unlock();
+      return ret;
+    }
+    s->lock.unlock();
+  }
+
+  if (homeless_session.ops.find(tid) != homeless_session.ops.end()) {
+    ret = op_cancel(&homeless_session, tid, r);
+  }
+  rwlock.unlock();
+
+  return ret;
+}
+
 bool Objecter::is_pg_changed(
   int oldprimary,
   const vector<int>& oldacting,
@@ -1437,8 +1715,10 @@ int64_t Objecter::get_object_pg_hash_position(int64_t pool, const string& key,
   return p->raw_hash_to_pg(p->hash_key(key, ns));
 }
 
-int Objecter::calc_target(op_target_t *t)
+int Objecter::_calc_target(op_target_t *t)
 {
+  assert(rwlock.is_locked());
+
   bool is_read = t->flags & CEPH_OSD_FLAG_READ;
   bool is_write = t->flags & CEPH_OSD_FLAG_WRITE;
 
@@ -1478,8 +1758,10 @@ int Objecter::calc_target(op_target_t *t)
   } else {
     int ret = osdmap->object_locator_to_pg(t->target_oid, t->target_oloc,
                                           pgid);
-    if (ret == -ENOENT)
+    if (ret == -ENOENT) {
+      t->osd = -1;
       return RECALC_OP_TARGET_POOL_DNE;
+    }
   }
   int primary;
   vector<int> acting;
@@ -1499,8 +1781,8 @@ int Objecter::calc_target(op_target_t *t)
     t->pgid = pgid;
     t->acting = acting;
     t->primary = primary;
-    ldout(cct, 10) << __func__ << " pgid " << pgid
-                  << " acting " << acting << dendl;
+    ldout(cct, 10) << __func__ << " "
+                  << " pgid " << pgid << " acting " << acting << dendl;
     t->used_replica = false;
     if (primary == -1) {
       t->osd = -1;
@@ -1550,48 +1832,140 @@ int Objecter::calc_target(op_target_t *t)
   return RECALC_OP_TARGET_NO_ACTION;
 }
 
-int Objecter::recalc_op_target(Op *op)
+int Objecter::_map_session(op_target_t *target, OSDSession **s,
+                          RWLock::Context& lc)
 {
-  int r = calc_target(&op->target);
-  if (r == RECALC_OP_TARGET_NEED_RESEND) {
-    OSDSession *s = NULL;
-    if (op->target.osd >= 0)
-      s = get_session(op->target.osd);
-    if (op->session != s) {
-      if (!op->session)
-       num_homeless_ops--;
-      op->session_item.remove_myself();
-      op->session = s;
-      if (s)
-       s->ops.push_back(&op->session_item);
-      else
-       num_homeless_ops++;
+  int r = _calc_target(target);
+  if (r < 0) {
+    return r;
+  }
+  return _get_session(target->osd, s, lc);
+}
+
+void Objecter::_session_op_assign(Op *op, OSDSession *to)
+{
+  assert(rwlock.is_locked());
+  assert(to->lock.is_locked());
+  assert(!op->session);
+  assert(op->tid);
+
+  op->session = to;
+  to->ops[op->tid] = op;
+
+  if (to->is_homeless()) {
+    num_homeless_ops.inc();
+  }
+}
+
+void Objecter::_session_op_remove(Op *op)
+{
+  assert(rwlock.is_locked());
+  OSDSession *s = op->session;
+  assert(s);
+  assert(s->lock.is_locked());
+
+  if (s->is_homeless()) {
+    num_homeless_ops.dec();
+  }
+  s->ops.erase(op->tid);
+  op->session = NULL;
+}
+
+void Objecter::_session_linger_op_remove(LingerOp *info)
+{
+  assert(rwlock.is_locked());
+  OSDSession *s = info->session;
+  assert(s);
+  assert(s->lock.is_locked());
+
+  s->linger_ops.erase(info->linger_id);
+  info->session = NULL;
+}
+
+int Objecter::_get_osd_session(int osd, RWLock::Context& lc, OSDSession **psession)
+{
+  int r;
+  do {
+    r = _get_session(osd, psession, lc);
+    if (r == -EAGAIN) {
+      assert(!lc.is_wlocked());
+
+      if (!_promote_lock_check_race(lc)) {
+        return r;
+      }
     }
+  } while (r == -EAGAIN);
+  assert(r == 0);
+
+  return 0;
+}
+
+int Objecter::_get_op_target_session(Op *op, RWLock::Context& lc, OSDSession **psession)
+{
+  return _get_osd_session(op->target.osd, lc, psession);
+}
+
+void Objecter::_session_op_validate(Op *op, RWLock::Context& lc, bool session_locked)
+{
+  assert(rwlock.is_locked());
+
+  if (op->session && op->session->osd != op->target.osd) {
+    OSDSession *orig_session = op->session;
+    _session_op_remove(op);
+    put_session(orig_session);
+  }
+}
+
+int Objecter::_recalc_op_target(Op *op, RWLock::Context& lc,
+                                bool src_session_locked)
+{
+  assert(rwlock.is_locked());
+
+  int r = _calc_target(&op->target);
+  if (r == RECALC_OP_TARGET_NEED_RESEND) {
+    _session_op_validate(op, lc, src_session_locked);
   }
   return r;
 }
 
-bool Objecter::recalc_linger_op_target(LingerOp *linger_op)
+bool Objecter::_promote_lock_check_race(RWLock::Context& lc)
+{
+  epoch_t epoch = osdmap->get_epoch();
+  lc.promote();
+  return (epoch == osdmap->get_epoch());
+}
+
+int Objecter::_recalc_linger_op_target(LingerOp *linger_op, RWLock::Context& lc)
 {
-  int r = calc_target(&linger_op->target);
+  assert(rwlock.is_locked());
+
+  int r = _calc_target(&linger_op->target);
   if (r == RECALC_OP_TARGET_NEED_RESEND) {
     ldout(cct, 10) << "recalc_linger_op_target tid " << linger_op->linger_id
                   << " pgid " << linger_op->target.pgid
                   << " acting " << linger_op->target.acting << dendl;
     
-    OSDSession *s = linger_op->target.osd != -1 ?
-      get_session(linger_op->target.osd) : NULL;
+    OSDSession *s;
+    r = _get_osd_session(linger_op->target.osd, lc, &s);
+    if (r < 0) {
+      return r;
+    }
+
+    s->lock.get_write();
+
     if (linger_op->session != s) {
-      linger_op->session_item.remove_myself();
       linger_op->session = s;
-      if (s)
-       s->linger_ops.push_back(&linger_op->session_item);
+      s->linger_ops[linger_op->register_tid] = linger_op;
     }
+
+    s->lock.unlock();
+    put_session(s);
+    return RECALC_OP_TARGET_NEED_RESEND;
   }
   return r;
 }
 
-void Objecter::cancel_linger_op(Op *op)
+void Objecter::_cancel_linger_op(Op *op)
 {
   ldout(cct, 15) << "cancel_op " << op->tid << dendl;
 
@@ -1599,30 +1973,54 @@ void Objecter::cancel_linger_op(Op *op)
   delete op->onack;
   delete op->oncommit;
 
-  finish_op(op);
+  _finish_op(op);
 }
 
-void Objecter::finish_op(Op *op)
+void Objecter::_finish_op(Op *op)
 {
   ldout(cct, 15) << "finish_op " << op->tid << dendl;
 
-  op->session_item.remove_myself();
   if (op->budgeted)
     put_op_budget(op);
 
-  ops.erase(op->tid);
-  logger->set(l_osdc_op_active, ops.size());
+  assert(op->session->lock.is_wlocked());
+
+  op->session->ops.erase(op->tid);
+
+  logger->dec(l_osdc_op_active);
+
   assert(check_latest_map_ops.find(op->tid) == check_latest_map_ops.end());
 
-  if (op->ontimeout)
+  if (op->ontimeout) {
     timer.cancel_event(op->ontimeout);
+  }
+
+  op->session->put();
+
+  inflight_ops.dec();
 
   delete op;
 }
 
-void Objecter::send_op(Op *op)
+void Objecter::finish_op(OSDSession *session, ceph_tid_t tid)
 {
-  ldout(cct, 15) << "send_op " << op->tid << " to osd." << op->session->osd << dendl;
+  ldout(cct, 15) << "finish_op " << tid << dendl;
+  RWLock::RLocker rl(rwlock);
+  
+  RWLock::WLocker wl(session->lock);
+
+  map<ceph_tid_t, Op *>::iterator iter = session->ops.find(tid);
+  if (iter == session->ops.end())
+    return;
+
+  Op *op = iter->second;
+
+  _finish_op(op);
+}
+
+MOSDOp *Objecter::_prepare_osd_op(Op *op)
+{
+  assert(rwlock.is_locked());
 
   int flags = op->target.flags;
   flags |= CEPH_OSD_FLAG_KNOWN_REDIR;
@@ -1631,24 +2029,10 @@ void Objecter::send_op(Op *op)
   if (op->onack)
     flags |= CEPH_OSD_FLAG_ACK;
 
-  assert(op->session->con);
-
-  // preallocated rx buffer?
-  if (op->con) {
-    ldout(cct, 20) << " revoking rx buffer for " << op->tid << " on " << op->con << dendl;
-    op->con->revoke_rx_buffer(op->tid);
-  }
-  if (op->outbl && op->outbl->length()) {
-    ldout(cct, 20) << " posting rx buffer for " << op->tid << " on " << op->session->con << dendl;
-    op->con = op->session->con;
-    op->con->post_rx_buffer(op->tid, *op->outbl);
-  }
-
   op->target.paused = false;
-  op->incarnation = op->session->incarnation;
   op->stamp = ceph_clock_now(cct);
 
-  MOSDOp *m = new MOSDOp(client_inc, op->tid, 
+  MOSDOp *m = new MOSDOp(client_inc.read(), op->tid, 
                         op->target.target_oid, op->target.target_oloc,
                         op->target.pgid,
                         osdmap->get_epoch(),
@@ -1673,6 +2057,39 @@ void Objecter::send_op(Op *op)
   logger->inc(l_osdc_op_send);
   logger->inc(l_osdc_op_send_bytes, m->get_data().length());
 
+  return m;
+}
+
+void Objecter::_send_op(Op *op, MOSDOp *m)
+{
+  assert(rwlock.is_locked());
+  assert(op->session->lock.is_locked());
+
+  if (!m) {
+    assert(op->tid > 0);
+    m = _prepare_osd_op(op);
+  }
+
+  ldout(cct, 15) << "_send_op " << op->tid << " to osd." << op->session->osd << dendl;
+
+  ConnectionRef con = op->session->con;
+  assert(con);
+
+  // preallocated rx buffer?
+  if (op->con) {
+    ldout(cct, 20) << " revoking rx buffer for " << op->tid << " on " << op->con << dendl;
+    op->con->revoke_rx_buffer(op->tid);
+  }
+  if (op->outbl && op->outbl->length()) {
+    ldout(cct, 20) << " posting rx buffer for " << op->tid << " on " << con << dendl;
+    op->con = con;
+    op->con->post_rx_buffer(op->tid, *op->outbl);
+  }
+
+  op->incarnation = op->session->incarnation;
+
+  m->set_tid(op->tid);
+
   op->session->con->send_message(m);
 }
 
@@ -1696,45 +2113,73 @@ int Objecter::calc_op_budget(Op *op)
   return op_budget;
 }
 
-void Objecter::throttle_op(Op *op, int op_budget)
+void Objecter::_throttle_op(Op *op, int op_budget)
 {
+  assert(rwlock.is_locked());
+
+  bool locked_for_write = rwlock.is_wlocked();
+
   if (!op_budget)
     op_budget = calc_op_budget(op);
   if (!op_throttle_bytes.get_or_fail(op_budget)) { //couldn't take right now
-    client_lock.Unlock();
+    rwlock.unlock();
     op_throttle_bytes.get(op_budget);
-    client_lock.Lock();
+    rwlock.get(locked_for_write);
   }
   if (!op_throttle_ops.get_or_fail(1)) { //couldn't take right now
-    client_lock.Unlock();
+    rwlock.unlock();
     op_throttle_ops.get(1);
-    client_lock.Lock();
+    rwlock.get(locked_for_write);
   }
 }
 
 void Objecter::unregister_op(Op *op)
 {
-  if (op->onack)
-    num_unacked--;
-  if (op->oncommit)
-    num_uncommitted--;
-  ops.erase(op->tid);
+  op->session->lock.get_write();
+  op->session->ops.erase(op->tid);
+  op->session->lock.unlock();
+  op->session->put();
+  op->session = NULL;
+
+  inflight_ops.dec();
 }
 
 /* This function DOES put the passed message before returning */
 void Objecter::handle_osd_op_reply(MOSDOpReply *m)
 {
-  assert(client_lock.is_locked());
-  assert(initialized);
+  assert(initialized.read());
   ldout(cct, 10) << "in handle_osd_op_reply" << dendl;
 
   // get pio
   ceph_tid_t tid = m->get_tid();
 
-  if (ops.count(tid) == 0) {
+  int osd_num = (int)m->get_source().num();
+
+  RWLock::RLocker l(rwlock);
+  RWLock::Context lc(rwlock, RWLock::Context::TakenForRead);
+
+  map<int, OSDSession *>::iterator siter = osd_sessions.find(osd_num);
+  if (siter == osd_sessions.end()) {
+    ldout(cct, 7) << "handle_osd_op_reply " << tid
+                 << (m->is_ondisk() ? " ondisk":(m->is_onnvram() ?
+                                                 " onnvram":" ack"))
+                 << " ... unknown osd" << dendl;
+    m->put();
+    return;
+  }
+
+  OSDSession *s = siter->second;
+  s->get();
+
+  s->lock.get_write();
+
+  map<ceph_tid_t, Op *>::iterator iter = s->ops.find(tid);
+  if (iter == s->ops.end()) {
     ldout(cct, 7) << "handle_osd_op_reply " << tid
            << (m->is_ondisk() ? " ondisk":(m->is_onnvram() ? " onnvram":" ack"))
            << " ... stray" << dendl;
+    s->lock.unlock();
+    s->put();
     m->put();
     return;
   }
@@ -1745,7 +2190,7 @@ void Objecter::handle_osd_op_reply(MOSDOpReply *m)
                << " in " << m->get_pg()
                << " attempt " << m->get_retry_attempt()
                << dendl;
-  Op *op = ops[tid];
+  Op *op = iter->second;
 
   if (m->get_retry_attempt() >= 0) {
     if (m->get_retry_attempt() != (op->attempts - 1)) {
@@ -1754,6 +2199,8 @@ void Objecter::handle_osd_op_reply(MOSDOpReply *m)
                    << "; last attempt " << (op->attempts - 1) << " sent to "
                    << op->session->con->get_peer_addr() << dendl;
       m->put();
+      s->lock.unlock();
+      s->put();
       return;
     }
   } else {
@@ -1769,23 +2216,41 @@ void Objecter::handle_osd_op_reply(MOSDOpReply *m)
 
   if (m->is_redirect_reply()) {
     ldout(cct, 5) << " got redirect reply; redirecting" << dendl;
-    unregister_op(op);
+    if (op->onack)
+      num_unacked.dec();
+    if (op->oncommit)
+      num_uncommitted.dec();
+    _session_op_remove(op);
+    s->lock.unlock();
+    s->put();
+
+    // FIXME: two redirects could race and reorder
+
+    op->tid = 0;
     m->get_redirect().combine_with_locator(op->target.target_oloc,
                                           op->target.target_oid.name);
     op->target.flags |= CEPH_OSD_FLAG_REDIRECTED;
-    _op_submit(op);
+    _op_submit(op, lc);
     m->put();
     return;
   }
 
   if (rc == -EAGAIN) {
     ldout(cct, 7) << " got -EAGAIN, resubmitting" << dendl;
-    unregister_op(op);
-    _op_submit(op);
+
+    // new tid
+    s->ops.erase(op->tid);
+    op->tid = last_tid.inc();
+
+    _send_op(op);
+    s->lock.unlock();
     m->put();
     return;
   }
 
+  l.unlock();
+  lc.set_state(RWLock::Context::Untaken);
+
   if (op->objver)
     *op->objver = m->get_user_version();
   if (op->reply_epoch)
@@ -1830,14 +2295,14 @@ void Objecter::handle_osd_op_reply(MOSDOpReply *m)
     op->replay_version = m->get_replay_version();
     onack = op->onack;
     op->onack = 0;  // only do callback once
-    num_unacked--;
+    num_unacked.dec();
     logger->inc(l_osdc_op_ack);
   }
   if (op->oncommit && (m->is_ondisk() || rc)) {
     ldout(cct, 15) << "handle_osd_op_reply safe" << dendl;
     oncommit = op->oncommit;
     op->oncommit = 0;
-    num_uncommitted--;
+    num_uncommitted.dec();
     logger->inc(l_osdc_op_commit);
   }
 
@@ -1852,10 +2317,14 @@ void Objecter::handle_osd_op_reply(MOSDOpReply *m)
   // done with this tid?
   if (!op->onack && !op->oncommit) {
     ldout(cct, 15) << "handle_osd_op_reply completed tid " << tid << dendl;
-    finish_op(op);
+    _finish_op(op);
   }
-  
-  ldout(cct, 5) << num_unacked << " unacked, " << num_uncommitted << " uncommitted" << dendl;
+
+  ldout(cct, 5) << num_unacked.read() << " unacked, " << num_uncommitted.read() << " uncommitted" << dendl;
+
+  // serialize completions
+  s->completion_lock.Lock();
+  s->lock.unlock();
 
   // do callbacks
   if (onack) {
@@ -1864,15 +2333,17 @@ void Objecter::handle_osd_op_reply(MOSDOpReply *m)
   if (oncommit) {
     oncommit->complete(rc);
   }
+  s->completion_lock.Unlock();
 
   m->put();
+  s->put();
 }
 
 
 uint32_t Objecter::list_objects_seek(ListContext *list_context,
                                     uint32_t pos)
 {
-  assert(client_lock.is_locked());
+  RWLock::RLocker rl(rwlock);
   pg_t actual = osdmap->raw_pg_to_pg(pg_t(pos, list_context->pool_id));
   ldout(cct, 10) << "list_objects_seek " << list_context
                 << " pos " << pos << " -> " << actual << dendl;
@@ -1886,7 +2357,6 @@ uint32_t Objecter::list_objects_seek(ListContext *list_context,
 
 void Objecter::list_objects(ListContext *list_context, Context *onfinish)
 {
-  assert(client_lock.is_locked());
   ldout(cct, 10) << "list_objects" << dendl;
   ldout(cct, 20) << " pool_id " << list_context->pool_id
           << " pool_snap_seq " << list_context->pool_snap_seq
@@ -1913,8 +2383,10 @@ void Objecter::list_objects(ListContext *list_context, Context *onfinish)
     return;
   }
 
+  rwlock.get_read();
   const pg_pool_t *pool = osdmap->get_pg_pool(list_context->pool_id);
   int pg_num = pool->get_pg_num();
+  rwlock.unlock();
 
   if (list_context->starting_pg_num == 0) {     // there can't be zero pgs!
     list_context->starting_pg_num = pg_num;
@@ -1936,6 +2408,7 @@ void Objecter::list_objects(ListContext *list_context, Context *onfinish)
   list_context->bl.clear();
   C_List *onack = new C_List(list_context, onfinish, this);
   object_locator_t oloc(list_context->pool_id, list_context->nspace);
+
   pg_read(list_context->current_pg, oloc, op,
          &list_context->bl, 0, onack, &onack->epoch);
 }
@@ -2008,7 +2481,7 @@ int Objecter::create_pool_snap(int64_t pool, string& snap_name, Context *onfinis
   PoolOp *op = new PoolOp;
   if (!op)
     return -ENOMEM;
-  op->tid = ++last_tid;
+  op->tid = last_tid.inc();
   op->pool = pool;
   op->name = snap_name;
   op->onfinish = onfinish;
@@ -2040,7 +2513,7 @@ int Objecter::allocate_selfmanaged_snap(int64_t pool, snapid_t *psnapid,
   ldout(cct, 10) << "allocate_selfmanaged_snap; pool: " << pool << dendl;
   PoolOp *op = new PoolOp;
   if (!op) return -ENOMEM;
-  op->tid = ++last_tid;
+  op->tid = last_tid.inc();
   op->pool = pool;
   C_SelfmanagedSnap *fin = new C_SelfmanagedSnap(psnapid, onfinish);
   op->onfinish = fin;
@@ -2065,7 +2538,7 @@ int Objecter::delete_pool_snap(int64_t pool, string& snap_name, Context *onfinis
   PoolOp *op = new PoolOp;
   if (!op)
     return -ENOMEM;
-  op->tid = ++last_tid;
+  op->tid = last_tid.inc();
   op->pool = pool;
   op->name = snap_name;
   op->onfinish = onfinish;
@@ -2083,7 +2556,7 @@ int Objecter::delete_selfmanaged_snap(int64_t pool, snapid_t snap,
           << snap << dendl;
   PoolOp *op = new PoolOp;
   if (!op) return -ENOMEM;
-  op->tid = ++last_tid;
+  op->tid = last_tid.inc();
   op->pool = pool;
   op->onfinish = onfinish;
   op->pool_op = POOL_OP_DELETE_UNMANAGED_SNAP;
@@ -2106,7 +2579,7 @@ int Objecter::create_pool(string& name, Context *onfinish, uint64_t auid,
   PoolOp *op = new PoolOp;
   if (!op)
     return -ENOMEM;
-  op->tid = ++last_tid;
+  op->tid = last_tid.inc();
   op->pool = 0;
   op->name = name;
   op->onfinish = onfinish;
@@ -2129,7 +2602,7 @@ int Objecter::delete_pool(int64_t pool, Context *onfinish)
 
   PoolOp *op = new PoolOp;
   if (!op) return -ENOMEM;
-  op->tid = ++last_tid;
+  op->tid = last_tid.inc();
   op->pool = pool;
   op->name = "delete";
   op->onfinish = onfinish;
@@ -2152,7 +2625,7 @@ int Objecter::change_pool_auid(int64_t pool, Context *onfinish, uint64_t auid)
   ldout(cct, 10) << "change_pool_auid " << pool << " to " << auid << dendl;
   PoolOp *op = new PoolOp;
   if (!op) return -ENOMEM;
-  op->tid = ++last_tid;
+  op->tid = last_tid.inc();
   op->pool = pool;
   op->name = "change_pool_auid";
   op->onfinish = onfinish;
@@ -2174,13 +2647,14 @@ public:
   C_CancelPoolOp(ceph_tid_t tid, Objecter *objecter) : tid(tid),
                                                       objecter(objecter) {}
   void finish(int r) {
-    // note that objecter lock == timer lock, and is already held
     objecter->pool_op_cancel(tid, -ETIMEDOUT);
   }
 };
 
 void Objecter::pool_op_submit(PoolOp *op)
 {
+  RWLock::WLocker wl(rwlock);
+
   if (mon_timeout > 0) {
     op->ontimeout = new C_CancelPoolOp(op->tid, this);
     timer.add_event_after(mon_timeout, op->ontimeout);
@@ -2190,6 +2664,8 @@ void Objecter::pool_op_submit(PoolOp *op)
 
 void Objecter::_pool_op_submit(PoolOp *op)
 {
+  assert(rwlock.is_wlocked());
+
   ldout(cct, 10) << "pool_op_submit " << op->tid << dendl;
   MPoolOp *m = new MPoolOp(monc->get_fsid(), op->tid, op->pool,
                           op->name, op->pool_op,
@@ -2211,37 +2687,54 @@ void Objecter::_pool_op_submit(PoolOp *op)
  */
 void Objecter::handle_pool_op_reply(MPoolOpReply *m)
 {
-  assert(client_lock.is_locked());
-  assert(initialized);
+  assert(initialized.read());
+
+  rwlock.get_read();
+
   ldout(cct, 10) << "handle_pool_op_reply " << *m << dendl;
   ceph_tid_t tid = m->get_tid();
-  if (pool_ops.count(tid)) {
-    PoolOp *op = pool_ops[tid];
+  map<ceph_tid_t, PoolOp *>::iterator iter = pool_ops.find(tid);
+  if (iter != pool_ops.end()) {
+    PoolOp *op = iter->second;
     ldout(cct, 10) << "have request " << tid << " at " << op << " Op: " << ceph_pool_op_name(op->pool_op) << dendl;
     if (op->blp)
       op->blp->claim(m->response_data);
     if (m->version > last_seen_osdmap_version)
       last_seen_osdmap_version = m->version;
     if (osdmap->get_epoch() < m->epoch) {
-      ldout(cct, 20) << "waiting for client to reach epoch " << m->epoch << " before calling back" << dendl;
-      wait_for_new_map(op->onfinish, m->epoch, m->replyCode);
+      rwlock.unlock();
+      rwlock.get_write();
+      if (osdmap->get_epoch() < m->epoch) {
+        ldout(cct, 20) << "waiting for client to reach epoch " << m->epoch << " before calling back" << dendl;
+        _wait_for_new_map(op->onfinish, m->epoch, m->replyCode);
+      }
     }
     else {
       op->onfinish->complete(m->replyCode);
     }
     op->onfinish = NULL;
-    finish_pool_op(op);
+    if (!rwlock.is_wlocked()) {
+      rwlock.unlock();
+      rwlock.get_write();
+    }
+    iter = pool_ops.find(tid);
+    if (iter != pool_ops.end()) {
+      _finish_pool_op(op);
+    }
   } else {
     ldout(cct, 10) << "unknown request " << tid << dendl;
   }
+  rwlock.unlock();
+
   ldout(cct, 10) << "done" << dendl;
   m->put();
 }
 
 int Objecter::pool_op_cancel(ceph_tid_t tid, int r)
 {
-  assert(client_lock.is_locked());
-  assert(initialized);
+  assert(initialized.read());
+
+  RWLock::WLocker wl(rwlock);
 
   map<ceph_tid_t, PoolOp*>::iterator it = pool_ops.find(tid);
   if (it == pool_ops.end()) {
@@ -2254,17 +2747,20 @@ int Objecter::pool_op_cancel(ceph_tid_t tid, int r)
   PoolOp *op = it->second;
   if (op->onfinish)
     op->onfinish->complete(r);
-  finish_pool_op(op);
+
+  _finish_pool_op(op);
   return 0;
 }
 
-void Objecter::finish_pool_op(PoolOp *op)
+void Objecter::_finish_pool_op(PoolOp *op)
 {
+  assert(rwlock.is_wlocked());
   pool_ops.erase(op->tid);
   logger->set(l_osdc_poolop_active, pool_ops.size());
 
-  if (op->ontimeout)
+  if (op->ontimeout) {
     timer.cancel_event(op->ontimeout);
+  }
 
   delete op;
 }
@@ -2290,7 +2786,7 @@ void Objecter::get_pool_stats(list<string>& pools, map<string,pool_stat_t> *resu
   ldout(cct, 10) << "get_pool_stats " << pools << dendl;
 
   PoolStatOp *op = new PoolStatOp;
-  op->tid = ++last_tid;
+  op->tid = last_tid.inc();
   op->pools = pools;
   op->pool_stats = result;
   op->onfinish = onfinish;
@@ -2299,16 +2795,19 @@ void Objecter::get_pool_stats(list<string>& pools, map<string,pool_stat_t> *resu
     op->ontimeout = new C_CancelPoolStatOp(op->tid, this);
     timer.add_event_after(mon_timeout, op->ontimeout);
   }
+
+  RWLock::WLocker wl(rwlock);
+
   poolstat_ops[op->tid] = op;
 
   logger->set(l_osdc_poolstat_active, poolstat_ops.size());
 
-  poolstat_submit(op);
+  _poolstat_submit(op);
 }
 
-void Objecter::poolstat_submit(PoolStatOp *op)
+void Objecter::_poolstat_submit(PoolStatOp *op)
 {
-  ldout(cct, 10) << "poolstat_submit " << op->tid << dendl;
+  ldout(cct, 10) << "_poolstat_submit " << op->tid << dendl;
   monc->send_mon_message(new MGetPoolStats(monc->get_fsid(), op->tid, op->pools, last_seen_pgmap_version));
   op->last_submit = ceph_clock_now(cct);
 
@@ -2317,19 +2816,21 @@ void Objecter::poolstat_submit(PoolStatOp *op)
 
 void Objecter::handle_get_pool_stats_reply(MGetPoolStatsReply *m)
 {
-  assert(client_lock.is_locked());
-  assert(initialized);
+  assert(initialized.read());
   ldout(cct, 10) << "handle_get_pool_stats_reply " << *m << dendl;
   ceph_tid_t tid = m->get_tid();
 
-  if (poolstat_ops.count(tid)) {
+  RWLock::WLocker wl(rwlock);
+  map<ceph_tid_t, PoolStatOp *>::iterator iter = poolstat_ops.find(tid);
+  if (iter != poolstat_ops.end()) {
     PoolStatOp *op = poolstat_ops[tid];
     ldout(cct, 10) << "have request " << tid << " at " << op << dendl;
     *op->pool_stats = m->pool_stats;
-    if (m->version > last_seen_pgmap_version)
+    if (m->version > last_seen_pgmap_version) {
       last_seen_pgmap_version = m->version;
+    }
     op->onfinish->complete(0);
-    finish_pool_stat_op(op);
+    _finish_pool_stat_op(op);
   } else {
     ldout(cct, 10) << "unknown request " << tid << dendl;
   } 
@@ -2339,8 +2840,9 @@ void Objecter::handle_get_pool_stats_reply(MGetPoolStatsReply *m)
 
 int Objecter::pool_stat_op_cancel(ceph_tid_t tid, int r)
 {
-  assert(client_lock.is_locked());
-  assert(initialized);
+  assert(initialized.read());
+
+  RWLock::WLocker wl(rwlock);
 
   map<ceph_tid_t, PoolStatOp*>::iterator it = poolstat_ops.find(tid);
   if (it == poolstat_ops.end()) {
@@ -2353,17 +2855,20 @@ int Objecter::pool_stat_op_cancel(ceph_tid_t tid, int r)
   PoolStatOp *op = it->second;
   if (op->onfinish)
     op->onfinish->complete(r);
-  finish_pool_stat_op(op);
+  _finish_pool_stat_op(op);
   return 0;
 }
 
-void Objecter::finish_pool_stat_op(PoolStatOp *op)
+void Objecter::_finish_pool_stat_op(PoolStatOp *op)
 {
+  assert(rwlock.is_wlocked());
+
   poolstat_ops.erase(op->tid);
   logger->set(l_osdc_poolstat_active, poolstat_ops.size());
 
-  if (op->ontimeout)
+  if (op->ontimeout) {
     timer.cancel_event(op->ontimeout);
+  }
 
   delete op;
 }
@@ -2376,7 +2881,6 @@ public:
   C_CancelStatfsOp(ceph_tid_t tid, Objecter *objecter) : tid(tid),
                                                         objecter(objecter) {}
   void finish(int r) {
-    // note that objecter lock == timer lock, and is already held
     objecter->statfs_op_cancel(tid, -ETIMEDOUT);
   }
 };
@@ -2384,9 +2888,10 @@ public:
 void Objecter::get_fs_stats(ceph_statfs& result, Context *onfinish)
 {
   ldout(cct, 10) << "get_fs_stats" << dendl;
+  RWLock::WLocker l(rwlock);
 
   StatfsOp *op = new StatfsOp;
-  op->tid = ++last_tid;
+  op->tid = last_tid.inc();
   op->stats = &result;
   op->onfinish = onfinish;
   op->ontimeout = NULL;
@@ -2398,11 +2903,13 @@ void Objecter::get_fs_stats(ceph_statfs& result, Context *onfinish)
 
   logger->set(l_osdc_statfs_active, statfs_ops.size());
 
-  fs_stats_submit(op);
+  _fs_stats_submit(op);
 }
 
-void Objecter::fs_stats_submit(StatfsOp *op)
+void Objecter::_fs_stats_submit(StatfsOp *op)
 {
+  assert(rwlock.is_wlocked());
+
   ldout(cct, 10) << "fs_stats_submit" << op->tid << dendl;
   monc->send_mon_message(new MStatfs(monc->get_fsid(), op->tid, last_seen_pgmap_version));
   op->last_submit = ceph_clock_now(cct);
@@ -2412,8 +2919,10 @@ void Objecter::fs_stats_submit(StatfsOp *op)
 
 void Objecter::handle_fs_stats_reply(MStatfsReply *m)
 {
-  assert(client_lock.is_locked());
-  assert(initialized);
+  assert(initialized.read());
+
+  RWLock::WLocker wl(rwlock);
+
   ldout(cct, 10) << "handle_fs_stats_reply " << *m << dendl;
   ceph_tid_t tid = m->get_tid();
 
@@ -2424,7 +2933,7 @@ void Objecter::handle_fs_stats_reply(MStatfsReply *m)
     if (m->h.version > last_seen_pgmap_version)
       last_seen_pgmap_version = m->h.version;
     op->onfinish->complete(0);
-    finish_statfs_op(op);
+    _finish_statfs_op(op);
   } else {
     ldout(cct, 10) << "unknown request " << tid << dendl;
   }
@@ -2434,8 +2943,9 @@ void Objecter::handle_fs_stats_reply(MStatfsReply *m)
 
 int Objecter::statfs_op_cancel(ceph_tid_t tid, int r)
 {
-  assert(client_lock.is_locked());
-  assert(initialized);
+  assert(initialized.read());
+
+  RWLock::WLocker wl(rwlock);
 
   map<ceph_tid_t, StatfsOp*>::iterator it = statfs_ops.find(tid);
   if (it == statfs_ops.end()) {
@@ -2448,17 +2958,20 @@ int Objecter::statfs_op_cancel(ceph_tid_t tid, int r)
   StatfsOp *op = it->second;
   if (op->onfinish)
     op->onfinish->complete(r);
-  finish_statfs_op(op);
+  _finish_statfs_op(op);
   return 0;
 }
 
-void Objecter::finish_statfs_op(StatfsOp *op)
+void Objecter::_finish_statfs_op(StatfsOp *op)
 {
+  assert(rwlock.is_wlocked());
+
   statfs_ops.erase(op->tid);
   logger->set(l_osdc_statfs_active, statfs_ops.size());
 
-  if (op->ontimeout)
+  if (op->ontimeout) {
     timer.cancel_event(op->ontimeout);
+  }
 
   delete op;
 }
@@ -2510,12 +3023,18 @@ void Objecter::ms_handle_reset(Connection *con)
     int osd = osdmap->identify_osd(con->get_peer_addr());
     if (osd >= 0) {
       ldout(cct, 1) << "ms_handle_reset on osd." << osd << dendl;
+      rwlock.get_read();
       map<int,OSDSession*>::iterator p = osd_sessions.find(osd);
       if (p != osd_sessions.end()) {
        OSDSession *session = p->second;
-       reopen_session(session);
-       kick_requests(session);
+        session->lock.get_write();
+       _reopen_session(session);
+       _kick_requests(session);
+        session->lock.unlock();
+        rwlock.unlock();
        maybe_request_map();
+      } else {
+        rwlock.unlock();
       }
     } else {
       ldout(cct, 10) << "ms_handle_reset on unknown osd addr " << con->get_peer_addr() << dendl;
@@ -2545,10 +3064,9 @@ void Objecter::op_target_t::dump(Formatter *f) const
   f->dump_int("precalc_pgid", (int)precalc_pgid);
 }
 
-void Objecter::dump_active()
+void Objecter::_dump_active(OSDSession *s)
 {
-  ldout(cct, 20) << "dump_active .. " << num_homeless_ops << " homeless" << dendl;
-  for (map<ceph_tid_t,Op*>::iterator p = ops.begin(); p != ops.end(); ++p) {
+  for (map<ceph_tid_t,Op*>::iterator p = s->ops.begin(); p != s->ops.end(); ++p) {
     Op *op = p->second;
     ldout(cct, 20) << op->tid << "\t" << op->target.pgid
                   << "\tosd." << (op->session ? op->session->osd : -1)
@@ -2557,10 +3075,27 @@ void Objecter::dump_active()
   }
 }
 
-void Objecter::dump_requests(Formatter *fmt) const
+void Objecter::_dump_active()
+{
+  ldout(cct, 20) << "dump_active .. " << num_homeless_ops.read() << " homeless" << dendl;
+  for (map<int, OSDSession *>::iterator siter = osd_sessions.begin(); siter != osd_sessions.end(); ++siter) {
+    OSDSession *s = siter->second;
+    s->lock.get_read();
+    _dump_active(s);
+    s->lock.unlock();
+  }
+  _dump_active(&homeless_session);
+}
+
+void Objecter::dump_active()
 {
-  assert(client_lock.is_locked());
+  rwlock.get_read();
+  _dump_active();
+  rwlock.unlock();
+}
 
+void Objecter::dump_requests(Formatter *fmt)
+{
   fmt->open_object_section("requests");
   dump_ops(fmt);
   dump_linger_ops(fmt);
@@ -2571,11 +3106,10 @@ void Objecter::dump_requests(Formatter *fmt) const
   fmt->close_section(); // requests object
 }
 
-void Objecter::dump_ops(Formatter *fmt) const
+void Objecter::_dump_ops(const OSDSession *s, Formatter *fmt)
 {
-  fmt->open_array_section("ops");
-  for (map<ceph_tid_t,Op*>::const_iterator p = ops.begin();
-       p != ops.end();
+  for (map<ceph_tid_t,Op*>::const_iterator p = s->ops.begin();
+       p != s->ops.end();
        ++p) {
     Op *op = p->second;
     fmt->open_object_section("op");
@@ -2597,14 +3131,27 @@ void Objecter::dump_ops(Formatter *fmt) const
 
     fmt->close_section(); // op object
   }
+}
+
+void Objecter::dump_ops(Formatter *fmt)
+{
+  fmt->open_array_section("ops");
+  rwlock.get_read();
+  for (map<int, OSDSession *>::const_iterator siter = osd_sessions.begin(); siter != osd_sessions.end(); ++siter) {
+    OSDSession *s = siter->second;
+    s->lock.get_read();
+    _dump_ops(s, fmt);
+    s->lock.unlock();
+  }
+  rwlock.unlock();
+  _dump_ops(&homeless_session, fmt);
   fmt->close_section(); // ops array
 }
 
-void Objecter::dump_linger_ops(Formatter *fmt) const
+void Objecter::_dump_linger_ops(const OSDSession *s, Formatter *fmt)
 {
-  fmt->open_array_section("linger_ops");
-  for (map<uint64_t, LingerOp*>::const_iterator p = linger_ops.begin();
-       p != linger_ops.end();
+  for (map<uint64_t, LingerOp*>::const_iterator p = s->linger_ops.begin();
+       p != s->linger_ops.end();
        ++p) {
     LingerOp *op = p->second;
     fmt->open_object_section("linger_op");
@@ -2614,14 +3161,27 @@ void Objecter::dump_linger_ops(Formatter *fmt) const
     fmt->dump_stream("registered") << op->registered;
     fmt->close_section(); // linger_op object
   }
+}
+
+void Objecter::dump_linger_ops(Formatter *fmt)
+{
+  fmt->open_array_section("linger_ops");
+  rwlock.get_read();
+  for (map<int, OSDSession *>::const_iterator siter = osd_sessions.begin(); siter != osd_sessions.end(); ++siter) {
+    OSDSession *s = siter->second;
+    s->lock.get_read();
+    _dump_linger_ops(s, fmt);
+    s->lock.unlock();
+  }
+  rwlock.unlock();
+  _dump_linger_ops(&homeless_session, fmt);
   fmt->close_section(); // linger_ops array
 }
 
-void Objecter::dump_command_ops(Formatter *fmt) const
+void Objecter::_dump_command_ops(const OSDSession *s, Formatter *fmt)
 {
-  fmt->open_array_section("command_ops");
-  for (map<uint64_t, CommandOp*>::const_iterator p = command_ops.begin();
-       p != command_ops.end();
+  for (map<uint64_t, CommandOp*>::const_iterator p = s->command_ops.begin();
+       p != s->command_ops.end();
        ++p) {
     CommandOp *op = p->second;
     fmt->open_object_section("command_op");
@@ -2637,6 +3197,20 @@ void Objecter::dump_command_ops(Formatter *fmt) const
       fmt->dump_stream("target_pg") << op->target_pg;
     fmt->close_section(); // command_op object
   }
+}
+
+void Objecter::dump_command_ops(Formatter *fmt)
+{
+  fmt->open_array_section("command_ops");
+  rwlock.get_read();
+  for (map<int, OSDSession *>::const_iterator siter = osd_sessions.begin(); siter != osd_sessions.end(); ++siter) {
+    OSDSession *s = siter->second;
+    s->lock.get_read();
+    _dump_command_ops(s, fmt);
+    s->lock.unlock();
+  }
+  rwlock.unlock();
+  _dump_command_ops(&homeless_session, fmt);
   fmt->close_section(); // command_ops array
 }
 
@@ -2711,9 +3285,8 @@ bool Objecter::RequestStateHook::call(std::string command, cmdmap_t& cmdmap,
   Formatter *f = new_formatter(format);
   if (!f)
     f = new_formatter("json-pretty");
-  m_objecter->client_lock.Lock();
+  RWLock::RLocker rl(m_objecter->rwlock);
   m_objecter->dump_requests(f);
-  m_objecter->client_lock.Unlock();
   f->flush(out);
   delete f;
   return true;
@@ -2743,10 +3316,25 @@ void Objecter::blacklist_self(bool set)
 
 void Objecter::handle_command_reply(MCommandReply *m)
 {
-  map<ceph_tid_t,CommandOp*>::iterator p = command_ops.find(m->get_tid());
-  if (p == command_ops.end()) {
+  int osd_num = (int)m->get_source().num();
+
+  RWLock::WLocker wl(rwlock);
+
+  map<int, OSDSession *>::iterator siter = osd_sessions.find(osd_num);
+  if (siter == osd_sessions.end()) {
+    ldout(cct, 10) << "handle_command_reply tid " << m->get_tid() << " osd not found" << dendl;
+    m->put();
+    return;
+  }
+
+  OSDSession *s = siter->second;
+
+  s->lock.get_read();
+  map<ceph_tid_t,CommandOp*>::iterator p = s->command_ops.find(m->get_tid());
+  if (p == s->command_ops.end()) {
     ldout(cct, 10) << "handle_command_reply tid " << m->get_tid() << " not found" << dendl;
     m->put();
+    s->lock.unlock();
     return;
   }
 
@@ -2756,55 +3344,70 @@ void Objecter::handle_command_reply(MCommandReply *m)
     ldout(cct, 10) << "handle_command_reply tid " << m->get_tid() << " got reply from wrong connection "
                   << m->get_connection() << " " << m->get_source_inst() << dendl;
     m->put();
+    s->lock.unlock();
     return;
   }
   if (c->poutbl)
     c->poutbl->claim(m->get_data());
+
+  s->lock.unlock();
+
+
   _finish_command(c, m->r, m->rs);
   m->put();
 }
 
 class C_CancelCommandOp : public Context
 {
+  Objecter::OSDSession *s;
   ceph_tid_t tid;
   Objecter *objecter;
 public:
-  C_CancelCommandOp(ceph_tid_t tid, Objecter *objecter) : tid(tid),
-                                                         objecter(objecter) {}
+  C_CancelCommandOp(Objecter::OSDSession *s, ceph_tid_t tid, Objecter *objecter) : s(s), tid(tid),
+                                                    objecter(objecter) {}
   void finish(int r) {
-    // note that objecter lock == timer lock, and is already held
-    objecter->command_op_cancel(tid, -ETIMEDOUT);
+    objecter->command_op_cancel(s, tid, -ETIMEDOUT);
   }
 };
 
-int Objecter::_submit_command(CommandOp *c, ceph_tid_t *ptid)
+int Objecter::submit_command(CommandOp *c, ceph_tid_t *ptid)
 {
-  ceph_tid_t tid = ++last_tid;
+  RWLock::WLocker wl(rwlock);
+
+  ceph_tid_t tid = last_tid.inc();
   ldout(cct, 10) << "_submit_command " << tid << " " << c->cmd << dendl;
   c->tid = tid;
+  homeless_session.command_ops[tid] = c;
+  num_homeless_ops.inc();
+  c->session = &homeless_session;
+  (void)_recalc_command_target(c);
   if (osd_timeout > 0) {
-    c->ontimeout = new C_CancelCommandOp(tid, this);
+    c->ontimeout = new C_CancelCommandOp(c->session, tid, this);
     timer.add_event_after(osd_timeout, c->ontimeout);
   }
-  command_ops[tid] = c;
-  num_homeless_ops++;
-  (void)recalc_command_target(c);
 
-  if (c->session)
+  if (!c->session->is_homeless()) {
     _send_command(c);
-  else
-    maybe_request_map();
+  } else {
+    int r = _maybe_request_map();
+    assert(r != -EAGAIN); /* because rwlock is already write-locked */
+  }
   if (c->map_check_error)
     _send_command_map_check(c);
   *ptid = tid;
 
-  logger->set(l_osdc_command_active, command_ops.size());
+  logger->inc(l_osdc_command_active);
+
   return 0;
 }
 
-int Objecter::recalc_command_target(CommandOp *c)
+int Objecter::_recalc_command_target(CommandOp *c)
 {
-  OSDSession *s = NULL;
+  assert(rwlock.is_wlocked());
+
+  RWLock::Context lc(rwlock, RWLock::Context::TakenForWrite);
+
+  OSDSession *s = &homeless_session;
   c->map_check_error = 0;
   if (c->target_osd >= 0) {
     if (!osdmap->exists(c->target_osd)) {
@@ -2817,7 +3420,8 @@ int Objecter::recalc_command_target(CommandOp *c)
       c->map_check_error_str = "osd down";
       return RECALC_OP_TARGET_OSD_DOWN;
     }
-    s = get_session(c->target_osd);
+    int r = _get_session(c->target_osd, &s, lc);
+    assert(r != -EAGAIN); /* shouldn't happen as we're holding the write lock */
   } else {
     if (!osdmap->have_pg_pool(c->target_pg.pool())) {
       c->map_check_error = -ENOENT;
@@ -2827,23 +3431,39 @@ int Objecter::recalc_command_target(CommandOp *c)
     int primary;
     vector<int> acting;
     osdmap->pg_to_acting_osds(c->target_pg, &acting, &primary);
-    if (primary != -1)
-      s = get_session(primary);
+    if (primary != -1) {
+      int r = _get_session(primary, &s, lc);
+      assert(r != -EAGAIN); /* shouldn't happen as we're holding the write lock */
+    }
   }
+
   if (c->session != s) {
-    ldout(cct, 10) << "recalc_command_target " << c->tid << " now " << c->session << dendl;
-    if (s) {
-      if (!c->session)
-       num_homeless_ops--;
-      c->session = s;
-      s->command_ops.push_back(&c->session_item);
-    } else {
-      c->session = NULL;
-      num_homeless_ops++;
+    ldout(cct, 10) << "_recalc_command_target " << c->tid << " now " << c->session << dendl;
+    if (c->session) {
+      if (c->session->is_homeless()) {
+        num_homeless_ops.dec();
+      }
+      if (c->tid) {
+        c->session->lock.get_write();
+        c->session->command_ops.erase(c->tid);
+        c->session->lock.unlock();
+      }
+      put_session(c->session);
+    }
+    c->session = s;
+    s->lock.get_write();
+    if (c->tid) {
+      s->command_ops[c->tid] = c;
     }
+    if (s->is_homeless())
+      num_homeless_ops.inc();
+    s->lock.unlock();
     return RECALC_OP_TARGET_NEED_RESEND;
+  } else {
+    put_session(s);
   }
-  ldout(cct, 20) << "recalc_command_target " << c->tid << " no change, " << c->session << dendl;
+
+  ldout(cct, 20) << "_recalc_command_target " << c->tid << " no change, " << c->session << dendl;
   return RECALC_OP_TARGET_NO_ACTION;
 }
 
@@ -2860,13 +3480,14 @@ void Objecter::_send_command(CommandOp *c)
   logger->inc(l_osdc_command_send);
 }
 
-int Objecter::command_op_cancel(ceph_tid_t tid, int r)
+int Objecter::command_op_cancel(OSDSession *s, ceph_tid_t tid, int r)
 {
-  assert(client_lock.is_locked());
-  assert(initialized);
+  assert(initialized.read());
+
+  RWLock::WLocker wl(rwlock);
 
-  map<ceph_tid_t, CommandOp*>::iterator it = command_ops.find(tid);
-  if (it == command_ops.end()) {
+  map<ceph_tid_t, CommandOp*>::iterator it = s->command_ops.find(tid);
+  if (it == s->command_ops.end()) {
     ldout(cct, 10) << __func__ << " tid " << tid << " dne" << dendl;
     return -ENOENT;
   }
@@ -2874,23 +3495,27 @@ int Objecter::command_op_cancel(ceph_tid_t tid, int r)
   ldout(cct, 10) << __func__ << " tid " << tid << dendl;
 
   CommandOp *op = it->second;
-  command_cancel_map_check(op);
+  _command_cancel_map_check(op);
   _finish_command(op, -ETIMEDOUT, "");
   return 0;
 }
 
 void Objecter::_finish_command(CommandOp *c, int r, string rs)
 {
+  assert(rwlock.is_wlocked());
+
   ldout(cct, 10) << "_finish_command " << c->tid << " = " << r << " " << rs << dendl;
-  c->session_item.remove_myself();
   if (c->prs)
     *c->prs = rs;
   if (c->onfinish)
     c->onfinish->complete(r);
-  command_ops.erase(c->tid);
-  if (c->ontimeout)
+  c->session->lock.get_write();
+  c->session->command_ops.erase(c->tid);
+  c->session->lock.unlock();
+  if (c->ontimeout) {
     timer.cancel_event(c->ontimeout);
+  }
   c->put();
 
-  logger->set(l_osdc_command_active, command_ops.size());
+  logger->dec(l_osdc_command_active);
 }
index 1e6fcf34b565286579d2fdd1527071fe178b645c..e6e6c06966d610a575974664474b6c3559dada30 100644 (file)
 
 #include "include/types.h"
 #include "include/buffer.h"
-#include "include/xlist.h"
 
 #include "osd/OSDMap.h"
 #include "messages/MOSDOp.h"
 
 #include "common/admin_socket.h"
 #include "common/Timer.h"
+#include "common/RWLock.h"
 #include "include/rados/rados_types.h"
 #include "include/rados/rados_types.hpp"
 
@@ -1010,15 +1010,16 @@ public:
   CephContext *cct;
   std::multimap<string,string> crush_location;
 
-  bool initialized;
+  atomic_t initialized;
 
 private:
-  ceph_tid_t last_tid;
-  int client_inc;
+  atomic64_t last_tid;
+  atomic_t inflight_ops;
+  atomic_t client_inc;
   uint64_t max_linger_id;
-  int num_unacked;
-  int num_uncommitted;
-  int global_op_flags; // flags which are applied to each IO op
+  atomic_t num_unacked;
+  atomic_t num_uncommitted;
+  atomic_t global_op_flags; // flags which are applied to each IO op
   bool keep_balanced_budget;
   bool honor_osdmap_full;
 
@@ -1026,11 +1027,13 @@ public:
   void maybe_request_map();
 private:
 
+  int _maybe_request_map();
+
   version_t last_seen_osdmap_version;
   version_t last_seen_pgmap_version;
 
-  Mutex &client_lock;
-  SafeTimer &timer;
+  RWLock rwlock;
+  RWTimer timer;
 
   PerfCounters *logger;
   
@@ -1096,7 +1099,6 @@ public:
 
   struct Op {
     OSDSession *session;
-    xlist<Op*>::item session_item;
     int incarnation;
 
     op_target_t target;
@@ -1135,7 +1137,7 @@ public:
 
     Op(const object_t& o, const object_locator_t& ol, vector<OSDOp>& op,
        int f, Context *ac, Context *co, version_t *ov) :
-      session(NULL), session_item(this), incarnation(0),
+      session(NULL), incarnation(0),
       target(o, ol, f),
       con(NULL),
       snapid(CEPH_NOSNAP),
@@ -1316,7 +1318,6 @@ public:
 
   // -- osd commands --
   struct CommandOp : public RefCountedObject {
-    xlist<CommandOp*>::item session_item;
     OSDSession *session;
     ceph_tid_t tid;
     vector<string> cmd;
@@ -1332,7 +1333,7 @@ public:
     utime_t last_submit;
 
     CommandOp()
-      : session_item(this), session(NULL),
+      : session(NULL),
        tid(0), poutbl(NULL), prs(NULL), target_osd(-1),
        map_dne_bound(0),
        map_check_error(0),
@@ -1340,10 +1341,10 @@ public:
        onfinish(NULL), ontimeout(NULL) {}
   };
 
-  int _submit_command(CommandOp *c, ceph_tid_t *ptid);
-  int recalc_command_target(CommandOp *c);
+  int submit_command(CommandOp *c, ceph_tid_t *ptid);
+  int _recalc_command_target(CommandOp *c);
   void _send_command(CommandOp *c);
-  int command_op_cancel(ceph_tid_t tid, int r);
+  int command_op_cancel(OSDSession *s, ceph_tid_t tid, int r);
   void _finish_command(CommandOp *c, int r, string rs);
   void handle_command_reply(MCommandReply *m);
 
@@ -1368,7 +1369,6 @@ public:
     Context *on_reg_ack, *on_reg_commit;
 
     OSDSession *session;
-    xlist<LingerOp*>::item session_item;
 
     ceph_tid_t register_tid;
     epoch_t map_dne_bound;
@@ -1379,7 +1379,7 @@ public:
                 poutbl(NULL), pobjver(NULL),
                 registered(false),
                 on_reg_ack(NULL), on_reg_commit(NULL),
-                session(NULL), session_item(this),
+                session(NULL),
                 register_tid(0),
                 map_dne_bound(0) {}
 
@@ -1428,28 +1428,35 @@ public:
   };
 
   // -- osd sessions --
-  struct OSDSession {
-    xlist<Op*> ops;
-    xlist<LingerOp*> linger_ops;
-    xlist<CommandOp*> command_ops;
+  struct OSDSession : public RefCountedObject {
+    RWLock lock;
+    Mutex completion_lock;
+
+    // pending ops
+    map<ceph_tid_t,Op*>            ops;
+    map<uint64_t, LingerOp*>  linger_ops;
+    map<ceph_tid_t,CommandOp*>     command_ops;
+
     int osd;
     int incarnation;
     ConnectionRef con;
 
-    OSDSession(int o) : osd(o), incarnation(0), con(NULL) {}
+    OSDSession(int o) : lock("OSDSession"), completion_lock("OSDSession::completion_lock"), osd(o), incarnation(0), con(NULL) {}
+
+    bool is_homeless() { return (osd == -1); }
   };
   map<int,OSDSession*> osd_sessions;
 
 
  private:
-  // pending ops
-  map<ceph_tid_t,Op*>       ops;
-  int                       num_homeless_ops;
   map<uint64_t, LingerOp*>  linger_ops;
+
   map<ceph_tid_t,PoolStatOp*>    poolstat_ops;
   map<ceph_tid_t,StatfsOp*>      statfs_ops;
   map<ceph_tid_t,PoolOp*>        pool_ops;
-  map<ceph_tid_t,CommandOp*>     command_ops;
+  atomic_t                  num_homeless_ops;
+
+  OSDSession homeless_session;
 
   // ops waiting for an osdmap with a new pool or confirmation that
   // the pool does not exist (may be expanded to other uses later)
@@ -1461,9 +1468,11 @@ public:
 
   double mon_timeout, osd_timeout;
 
-  void send_op(Op *op);
-  void cancel_linger_op(Op *op);
-  void finish_op(Op *op);
+  MOSDOp *_prepare_osd_op(Op *op);
+  void _send_op(Op *op, MOSDOp *m = NULL);
+  void _cancel_linger_op(Op *op);
+  void finish_op(OSDSession *session, ceph_tid_t tid);
+  void _finish_op(Op *op);
   static bool is_pg_changed(
     int oldprimary,
     const vector<int>& oldacting,
@@ -1478,30 +1487,42 @@ public:
     RECALC_OP_TARGET_OSD_DOWN,
   };
   bool osdmap_full_flag() const;
-  bool target_should_be_paused(op_target_t *op);
-
-  int calc_target(op_target_t *t);
-  int recalc_op_target(Op *op);
-  bool recalc_linger_op_target(LingerOp *op);
 
-  void send_linger(LingerOp *info);
+  bool target_should_be_paused(op_target_t *op);
+  int _calc_target(op_target_t *t);
+  int _map_session(op_target_t *op, OSDSession **s,
+                  RWLock::Context& lc);
+  void _session_op_remove(Op *op);
+  void _session_linger_op_remove(LingerOp *info);
+  void _session_op_assign(Op *op, OSDSession *s);
+  int _get_osd_session(int osd, RWLock::Context& lc, OSDSession **psession);
+  int _assign_op_target_session(Op *op, RWLock::Context& lc, bool src_session_locked, bool dst_session_locked);
+  int _get_op_target_session(Op *op, RWLock::Context& lc, OSDSession **psession);
+  void _session_op_validate(Op *op, RWLock::Context& lc, bool session_locked);
+  int _recalc_op_target(Op *op, RWLock::Context& lc, bool session_locked = false);
+  int _recalc_linger_op_target(LingerOp *op, RWLock::Context& lc);
+
+  void _linger_submit(LingerOp *info);
+  void _send_linger(LingerOp *info);
   void _linger_ack(LingerOp *info, int r);
   void _linger_commit(LingerOp *info, int r);
 
-  void check_op_pool_dne(Op *op);
+  void _check_op_pool_dne(Op *op);
   void _send_op_map_check(Op *op);
-  void op_cancel_map_check(Op *op);
-  void check_linger_pool_dne(LingerOp *op);
+  void _op_cancel_map_check(Op *op);
+  void _check_linger_pool_dne(LingerOp *op);
   void _send_linger_map_check(LingerOp *op);
-  void linger_cancel_map_check(LingerOp *op);
-  void check_command_map_dne(CommandOp *op);
+  void _linger_cancel_map_check(LingerOp *op);
+  void _check_command_map_dne(CommandOp *op);
   void _send_command_map_check(CommandOp *op);
-  void command_cancel_map_check(CommandOp *op);
+  void _command_cancel_map_check(CommandOp *op);
 
   void kick_requests(OSDSession *session);
+  void _kick_requests(OSDSession *session);
 
-  OSDSession *get_session(int osd);
-  void reopen_session(OSDSession *session);
+  int _get_session(int osd, OSDSession **session, RWLock::Context& lc);
+  void put_session(OSDSession *s);
+  void _reopen_session(OSDSession *session);
   void close_session(OSDSession *session);
   
   void _list_reply(ListContext *list_context, int r, Context *final_finish,
@@ -1516,11 +1537,12 @@ public:
    * If throttle_op needs to throttle it will unlock client_lock.
    */
   int calc_op_budget(Op *op);
-  void throttle_op(Op *op, int op_size=0);
-  void take_op_budget(Op *op) {
+  void _throttle_op(Op *op, int op_size=0);
+  void _take_op_budget(Op *op) {
+    assert(rwlock.is_locked());
     int op_budget = calc_op_budget(op);
     if (keep_balanced_budget) {
-      throttle_op(op, op_budget);
+      _throttle_op(op, op_budget);
     } else {
       op_throttle_bytes.take(op_budget);
       op_throttle_ops.take(1);
@@ -1537,7 +1559,7 @@ public:
 
  public:
   Objecter(CephContext *cct_, Messenger *m, MonClient *mc,
-          OSDMap *om, Mutex& l, SafeTimer& t, double mon_timeout,
+          OSDMap *om, double mon_timeout,
           double osd_timeout) :
     messenger(m), monc(mc), osdmap(om), cct(cct_),
     initialized(false),
@@ -1547,10 +1569,12 @@ public:
     keep_balanced_budget(false), honor_osdmap_full(true),
     last_seen_osdmap_version(0),
     last_seen_pgmap_version(0),
-    client_lock(l), timer(t),
+    rwlock("Objecter::rwlock"),
+    timer(cct, rwlock),
     logger(NULL), tick_event(NULL),
     m_request_state_hook(NULL),
     num_homeless_ops(0),
+    homeless_session(-1),
     mon_timeout(mon_timeout),
     osd_timeout(osd_timeout),
     op_throttle_bytes(cct, "objecter_bytes", cct->_conf->objecter_inflight_op_bytes),
@@ -1562,10 +1586,8 @@ public:
     assert(!logger);
   }
 
-  void init_unlocked();
-  void init_locked();
-  void shutdown_locked();
-  void shutdown_unlocked();
+  void init();
+  void shutdown();
 
   /**
    * Tell the objecter to throttle outgoing ops according to its
@@ -1580,7 +1602,8 @@ public:
   void set_honor_osdmap_full() { honor_osdmap_full = true; }
   void unset_honor_osdmap_full() { honor_osdmap_full = false; }
 
-  void scan_requests(bool force_resend,
+  void _scan_requests(OSDSession *s,
+                     bool force_resend,
                     bool force_resend_writes,
                     map<ceph_tid_t, Op*>& need_resend,
                     list<LingerOp*>& need_resend_linger,
@@ -1596,45 +1619,59 @@ public:
   void handle_osd_map(class MOSDMap *m);
   void wait_for_osd_map();
 
+  int pool_snap_by_name(int64_t poolid, const char *snap_name, snapid_t *snap);
+  int pool_snap_get_info(int64_t poolid, snapid_t snap, pool_snap_info_t *info);
+  int pool_snap_list(int64_t poolid, vector<uint64_t> *snaps);
 private:
+  bool _promote_lock_check_race(RWLock::Context& lc);
+
   // low-level
-  ceph_tid_t _op_submit(Op *op);
+  ceph_tid_t _op_submit(Op *op, RWLock::Context& lc);
+  ceph_tid_t _op_submit_with_budget(Op *op, RWLock::Context& lc);
   inline void unregister_op(Op *op);
 
   // public interface
 public:
   ceph_tid_t op_submit(Op *op);
   bool is_active() {
-    return !(ops.empty() && linger_ops.empty() && poolstat_ops.empty() && statfs_ops.empty());
+    return !((!inflight_ops.read()) && linger_ops.empty() && poolstat_ops.empty() && statfs_ops.empty());
   }
 
   /**
    * Output in-flight requests
    */
+  void _dump_active(OSDSession *s);
+  void _dump_active();
   void dump_active();
-  void dump_requests(Formatter *fmt) const;
-  void dump_ops(Formatter *fmt) const;
-  void dump_linger_ops(Formatter *fmt) const;
-  void dump_command_ops(Formatter *fmt) const;
+  void dump_requests(Formatter *fmt);
+  void _dump_ops(const OSDSession *s, Formatter *fmt);
+  void dump_ops(Formatter *fmt);
+  void _dump_linger_ops(const OSDSession *s, Formatter *fmt);
+  void dump_linger_ops(Formatter *fmt);
+  void _dump_command_ops(const OSDSession *s, Formatter *fmt);
+  void dump_command_ops(Formatter *fmt);
   void dump_pool_ops(Formatter *fmt) const;
   void dump_pool_stat_ops(Formatter *fmt) const;
   void dump_statfs_ops(Formatter *fmt) const;
 
-  int get_client_incarnation() const { return client_inc; }
-  void set_client_incarnation(int inc) { client_inc = inc; }
+  int get_client_incarnation() const { return client_inc.read(); }
+  void set_client_incarnation(int inc) { client_inc.set(inc); }
 
   void wait_for_new_map(Context *c, epoch_t epoch, int err=0);
+  void _wait_for_new_map(Context *c, epoch_t epoch, int err=0);
   void wait_for_latest_osdmap(Context *fin);
+  void get_latest_version(epoch_t oldest, epoch_t neweset, Context *fin);
   void _get_latest_version(epoch_t oldest, epoch_t neweset, Context *fin);
 
   /** Get the current set of global op flags */
-  int get_global_op_flags() { return global_op_flags; }
-  /** Add a flag to the global op flags */
-  void add_global_op_flags(int flag) { global_op_flags |= flag; }
-  /** Clear the passed flags from the global op flag set */
-  void clear_global_op_flag(int flags) { global_op_flags &= ~flags; }
+  int get_global_op_flags() { return global_op_flags.read(); }
+  /** Add a flag to the global op flags, not really atomic operation */
+  void add_global_op_flags(int flag) { global_op_flags.set(global_op_flags.read() | flag); } 
+  /** Clear the passed flags from the global op flag set, not really atomic operation */
+  void clear_global_op_flag(int flags) { global_op_flags.set(global_op_flags.read() & ~flags); }
 
   /// cancel an in-progress request with the given return code
+  int op_cancel(OSDSession *s, ceph_tid_t tid, int r);
   int op_cancel(ceph_tid_t tid, int r);
 
   // commands
@@ -1649,7 +1686,7 @@ public:
     c->prs = prs;
     c->onfinish = onfinish;
     c->target_osd = osd;
-    return _submit_command(c, ptid);
+    return submit_command(c, ptid);
   }
   int pg_command(pg_t pgid, vector<string>& cmd,
                 const bufferlist& inbl, ceph_tid_t *ptid,
@@ -1661,7 +1698,7 @@ public:
     c->prs = prs;
     c->onfinish = onfinish;
     c->target_pg = pgid;
-    return _submit_command(c, ptid);
+    return submit_command(c, ptid);
   }
 
   // mid-level helpers
@@ -1669,7 +1706,7 @@ public:
               ObjectOperation& op,
               const SnapContext& snapc, utime_t mtime, int flags,
               Context *onack, Context *oncommit, version_t *objver = NULL) {
-    Op *o = new Op(oid, oloc, op.ops, flags | global_op_flags | CEPH_OSD_FLAG_WRITE, onack, oncommit, objver);
+    Op *o = new Op(oid, oloc, op.ops, flags | global_op_flags.read() | CEPH_OSD_FLAG_WRITE, onack, oncommit, objver);
     o->priority = op.priority;
     o->mtime = mtime;
     o->snapc = snapc;
@@ -1687,7 +1724,7 @@ public:
             ObjectOperation& op,
             snapid_t snapid, bufferlist *pbl, int flags,
             Context *onack, version_t *objver = NULL) {
-    Op *o = new Op(oid, oloc, op.ops, flags | global_op_flags | CEPH_OSD_FLAG_READ, onack, NULL, objver);
+    Op *o = new Op(oid, oloc, op.ops, flags | global_op_flags.read() | CEPH_OSD_FLAG_READ, onack, NULL, objver);
     o->priority = op.priority;
     o->snapid = snapid;
     o->outbl = pbl;
@@ -1709,7 +1746,7 @@ public:
                Context *onack,
                epoch_t *reply_epoch) {
     Op *o = new Op(object_t(), oloc,
-                  op.ops, flags | global_op_flags | CEPH_OSD_FLAG_READ,
+                  op.ops, flags | global_op_flags.read() | CEPH_OSD_FLAG_READ,
                   onack, NULL, NULL);
     o->target.precalc_pgid = true;
     o->target.base_pgid = pg_t(hash, oloc.pool);
@@ -1771,7 +1808,7 @@ public:
     int i = init_ops(ops, 1, extra_ops);
     ops[i].op.op = CEPH_OSD_OP_STAT;
     C_Stat *fin = new C_Stat(psize, pmtime, onfinish);
-    Op *o = new Op(oid, oloc, ops, flags | global_op_flags | CEPH_OSD_FLAG_READ, fin, 0, objver);
+    Op *o = new Op(oid, oloc, ops, flags | global_op_flags.read() | CEPH_OSD_FLAG_READ, fin, 0, objver);
     o->snapid = snap;
     o->outbl = &fin->bl;
     return op_submit(o);
@@ -1788,7 +1825,7 @@ public:
     ops[i].op.extent.length = len;
     ops[i].op.extent.truncate_size = 0;
     ops[i].op.extent.truncate_seq = 0;
-    Op *o = new Op(oid, oloc, ops, flags | global_op_flags | CEPH_OSD_FLAG_READ, onfinish, 0, objver);
+    Op *o = new Op(oid, oloc, ops, flags | global_op_flags.read() | CEPH_OSD_FLAG_READ, onfinish, 0, objver);
     o->snapid = snap;
     o->outbl = pbl;
     return op_submit(o);
@@ -1806,7 +1843,7 @@ public:
     ops[i].op.extent.length = len;
     ops[i].op.extent.truncate_size = trunc_size;
     ops[i].op.extent.truncate_seq = trunc_seq;
-    Op *o = new Op(oid, oloc, ops, flags | global_op_flags | CEPH_OSD_FLAG_READ, onfinish, 0, objver);
+    Op *o = new Op(oid, oloc, ops, flags | global_op_flags.read() | CEPH_OSD_FLAG_READ, onfinish, 0, objver);
     o->snapid = snap;
     o->outbl = pbl;
     return op_submit(o);
@@ -1822,7 +1859,7 @@ public:
     ops[i].op.extent.length = len;
     ops[i].op.extent.truncate_size = 0;
     ops[i].op.extent.truncate_seq = 0;
-    Op *o = new Op(oid, oloc, ops, flags | global_op_flags | CEPH_OSD_FLAG_READ, onfinish, 0, objver);
+    Op *o = new Op(oid, oloc, ops, flags | global_op_flags.read() | CEPH_OSD_FLAG_READ, onfinish, 0, objver);
     o->snapid = snap;
     o->outbl = pbl;
     return op_submit(o);
@@ -1838,7 +1875,7 @@ public:
     ops[i].op.xattr.value_len = 0;
     if (name)
       ops[i].indata.append(name);
-    Op *o = new Op(oid, oloc, ops, flags | global_op_flags | CEPH_OSD_FLAG_READ, onfinish, 0, objver);
+    Op *o = new Op(oid, oloc, ops, flags | global_op_flags.read() | CEPH_OSD_FLAG_READ, onfinish, 0, objver);
     o->snapid = snap;
     o->outbl = pbl;
     return op_submit(o);
@@ -1852,7 +1889,7 @@ public:
     int i = init_ops(ops, 1, extra_ops);
     ops[i].op.op = CEPH_OSD_OP_GETXATTRS;
     C_GetAttrs *fin = new C_GetAttrs(attrset, onfinish);
-    Op *o = new Op(oid, oloc, ops, flags | global_op_flags | CEPH_OSD_FLAG_READ, fin, 0, objver);
+    Op *o = new Op(oid, oloc, ops, flags | global_op_flags.read() | CEPH_OSD_FLAG_READ, fin, 0, objver);
     o->snapid = snap;
     o->outbl = &fin->bl;
     return op_submit(o);
@@ -1862,7 +1899,7 @@ public:
                  snapid_t snap, bufferlist *pbl, int flags,
                  Context *onfinish,
                  version_t *objver = NULL, ObjectOperation *extra_ops = NULL) {
-    return read(oid, oloc, 0, 0, snap, pbl, flags | global_op_flags | CEPH_OSD_FLAG_READ, onfinish, objver);
+    return read(oid, oloc, 0, 0, snap, pbl, flags | global_op_flags.read() | CEPH_OSD_FLAG_READ, onfinish, objver);
   }
 
      
@@ -1872,7 +1909,7 @@ public:
                const SnapContext& snapc, int flags,
                Context *onack, Context *oncommit,
                version_t *objver = NULL) {
-    Op *o = new Op(oid, oloc, ops, flags | global_op_flags | CEPH_OSD_FLAG_WRITE, onack, oncommit, objver);
+    Op *o = new Op(oid, oloc, ops, flags | global_op_flags.read() | CEPH_OSD_FLAG_WRITE, onack, oncommit, objver);
     o->mtime = mtime;
     o->snapc = snapc;
     return op_submit(o);
@@ -1890,7 +1927,7 @@ public:
     ops[i].op.extent.truncate_size = 0;
     ops[i].op.extent.truncate_seq = 0;
     ops[i].indata = bl;
-    Op *o = new Op(oid, oloc, ops, flags | global_op_flags | CEPH_OSD_FLAG_WRITE, onack, oncommit, objver);
+    Op *o = new Op(oid, oloc, ops, flags | global_op_flags.read() | CEPH_OSD_FLAG_WRITE, onack, oncommit, objver);
     o->mtime = mtime;
     o->snapc = snapc;
     return op_submit(o);
@@ -1908,7 +1945,7 @@ public:
     ops[i].op.extent.truncate_size = 0;
     ops[i].op.extent.truncate_seq = 0;
     ops[i].indata = bl;
-    Op *o = new Op(oid, oloc, ops, flags | global_op_flags | CEPH_OSD_FLAG_WRITE, onack, oncommit, objver);
+    Op *o = new Op(oid, oloc, ops, flags | global_op_flags.read() | CEPH_OSD_FLAG_WRITE, onack, oncommit, objver);
     o->mtime = mtime;
     o->snapc = snapc;
     return op_submit(o);
@@ -1927,7 +1964,7 @@ public:
     ops[i].op.extent.truncate_size = trunc_size;
     ops[i].op.extent.truncate_seq = trunc_seq;
     ops[i].indata = bl;
-    Op *o = new Op(oid, oloc, ops, flags | global_op_flags | CEPH_OSD_FLAG_WRITE, onack, oncommit, objver);
+    Op *o = new Op(oid, oloc, ops, flags | global_op_flags.read() | CEPH_OSD_FLAG_WRITE, onack, oncommit, objver);
     o->mtime = mtime;
     o->snapc = snapc;
     return op_submit(o);
@@ -1942,7 +1979,7 @@ public:
     ops[i].op.extent.offset = 0;
     ops[i].op.extent.length = bl.length();
     ops[i].indata = bl;
-    Op *o = new Op(oid, oloc, ops, flags | global_op_flags | CEPH_OSD_FLAG_WRITE, onack, oncommit, objver);
+    Op *o = new Op(oid, oloc, ops, flags | global_op_flags.read() | CEPH_OSD_FLAG_WRITE, onack, oncommit, objver);
     o->mtime = mtime;
     o->snapc = snapc;
     return op_submit(o);
@@ -1959,7 +1996,7 @@ public:
     ops[i].op.extent.offset = trunc_size;
     ops[i].op.extent.truncate_size = trunc_size;
     ops[i].op.extent.truncate_seq = trunc_seq;
-    Op *o = new Op(oid, oloc, ops, flags | global_op_flags | CEPH_OSD_FLAG_WRITE, onack, oncommit, objver);
+    Op *o = new Op(oid, oloc, ops, flags | global_op_flags.read() | CEPH_OSD_FLAG_WRITE, onack, oncommit, objver);
     o->mtime = mtime;
     o->snapc = snapc;
     return op_submit(o);
@@ -1973,7 +2010,7 @@ public:
     ops[i].op.op = CEPH_OSD_OP_ZERO;
     ops[i].op.extent.offset = off;
     ops[i].op.extent.length = len;
-    Op *o = new Op(oid, oloc, ops, flags | global_op_flags | CEPH_OSD_FLAG_WRITE, onack, oncommit, objver);
+    Op *o = new Op(oid, oloc, ops, flags | global_op_flags.read() | CEPH_OSD_FLAG_WRITE, onack, oncommit, objver);
     o->mtime = mtime;
     o->snapc = snapc;
     return op_submit(o);
@@ -2000,7 +2037,7 @@ public:
     int i = init_ops(ops, 1, extra_ops);
     ops[i].op.op = CEPH_OSD_OP_CREATE;
     ops[i].op.flags = create_flags;
-    Op *o = new Op(oid, oloc, ops, global_flags | global_op_flags | CEPH_OSD_FLAG_WRITE, onack, oncommit, objver);
+    Op *o = new Op(oid, oloc, ops, global_flags | global_op_flags.read() | CEPH_OSD_FLAG_WRITE, onack, oncommit, objver);
     o->mtime = mtime;
     o->snapc = snapc;
     return op_submit(o);
@@ -2012,7 +2049,7 @@ public:
     vector<OSDOp> ops;
     int i = init_ops(ops, 1, extra_ops);
     ops[i].op.op = CEPH_OSD_OP_DELETE;
-    Op *o = new Op(oid, oloc, ops, flags | global_op_flags | CEPH_OSD_FLAG_WRITE, onack, oncommit, objver);
+    Op *o = new Op(oid, oloc, ops, flags | global_op_flags.read() | CEPH_OSD_FLAG_WRITE, onack, oncommit, objver);
     o->mtime = mtime;
     o->snapc = snapc;
     return op_submit(o);
@@ -2024,7 +2061,7 @@ public:
     vector<OSDOp> ops;
     int i = init_ops(ops, 1, extra_ops);
     ops[i].op.op = op;
-    Op *o = new Op(oid, oloc, ops, flags | global_op_flags | CEPH_OSD_FLAG_WRITE, onack, oncommit, objver);
+    Op *o = new Op(oid, oloc, ops, flags | global_op_flags.read() | CEPH_OSD_FLAG_WRITE, onack, oncommit, objver);
     o->snapc = snapc;
     return op_submit(o);
   }
@@ -2041,7 +2078,7 @@ public:
     if (name)
       ops[i].indata.append(name);
     ops[i].indata.append(bl);
-    Op *o = new Op(oid, oloc, ops, flags | global_op_flags | CEPH_OSD_FLAG_WRITE, onack, oncommit, objver);
+    Op *o = new Op(oid, oloc, ops, flags | global_op_flags.read() | CEPH_OSD_FLAG_WRITE, onack, oncommit, objver);
     o->mtime = mtime;
     o->snapc = snapc;
     return op_submit(o);
@@ -2058,7 +2095,7 @@ public:
     ops[i].op.xattr.value_len = 0;
     if (name)
       ops[i].indata.append(name);
-    Op *o = new Op(oid, oloc, ops, flags | global_op_flags | CEPH_OSD_FLAG_WRITE, onack, oncommit, objver);
+    Op *o = new Op(oid, oloc, ops, flags | global_op_flags.read() | CEPH_OSD_FLAG_WRITE, onack, oncommit, objver);
     o->mtime = mtime;
     o->snapc = snapc;
     return op_submit(o);
@@ -2085,28 +2122,28 @@ public:
 
   void handle_pool_op_reply(MPoolOpReply *m);
   int pool_op_cancel(ceph_tid_t tid, int r);
-  void finish_pool_op(PoolOp *op);
+  void _finish_pool_op(PoolOp *op);
 
   // --------------------------
   // pool stats
 private:
-  void poolstat_submit(PoolStatOp *op);
+  void _poolstat_submit(PoolStatOp *op);
 public:
   void handle_get_pool_stats_reply(MGetPoolStatsReply *m);
   void get_pool_stats(list<string>& pools, map<string,pool_stat_t> *result,
                      Context *onfinish);
   int pool_stat_op_cancel(ceph_tid_t tid, int r);
-  void finish_pool_stat_op(PoolStatOp *op);
+  void _finish_pool_stat_op(PoolStatOp *op);
 
   // ---------------------------
   // df stats
 private:
-  void fs_stats_submit(StatfsOp *op);
+  void _fs_stats_submit(StatfsOp *op);
 public:
   void handle_fs_stats_reply(MStatfsReply *m);
   void get_fs_stats(struct ceph_statfs& result, Context *onfinish);
   int statfs_op_cancel(ceph_tid_t tid, int r);
-  void finish_statfs_op(StatfsOp *op);
+  void _finish_statfs_op(StatfsOp *op);
 
   // ---------------------------
   // some scatter/gather hackery