osd: restructure op_shardedwq 13343/head
author Sage Weil <sage@redhat.com>
Mon, 20 Feb 2017 16:49:08 +0000 (11:49 -0500)
committer Sage Weil <sage@redhat.com>
Sat, 25 Feb 2017 04:22:19 +0000 (23:22 -0500)
This is difficult to break into pieces, so one big fat commit it is.

A few trivial bits

- include epoch in PGQueueable.
- add a PGQueueable operator<<
- remove the op_wq ref from OSDService; use a simple set of queue methods instead

The big stuff:

- Fast dispatch now passes messages directly to the queue based on an
spg_t.  The exception is MOSDOps from legacy clients.  We add a
waiting_for_map mechanism on the front side that is similar to, but simpler
than, the previous one so that we can map those legacy requests to an
accurate spg_t (sketched below).
- The dequeue path now has a waiting_for_pg mechanism.  It also uses a
much simpler set of data structures that should make it much faster than
the previous incarnation.
- Shutdown works a bit differently; we drain the queue instead of trying
to remove work for individual PGs.  This lets us remove the dequeue_pg
machinery.
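
In outline, the new enqueue path (a condensed sketch of ms_fast_dispatch and
dispatch_session_waiting below, not verbatim code):

    // messages that carry an explicit spg_t go straight to the sharded queue
    if (m->get_connection()->has_features(CEPH_FEATUREMASK_RESEND_ON_SPLIT) ||
        m->get_type() != CEPH_MSG_OSD_OP) {
      enqueue_op(m->get_spg(), op, m->get_map_epoch());
    } else {
      // legacy MOSDOp: park it on the session; dispatch_session_waiting()
      // translates pg_t -> spg_t once a new-enough osdmap is available
      session->waiting_on_map.push_back(*op);
    }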

Signed-off-by: Sage Weil <sage@redhat.com>
src/common/config_opts.h
src/osd/OSD.cc
src/osd/OSD.h
src/osd/OSDMap.h
src/osd/OpRequest.cc
src/osd/OpRequest.h
src/osd/PG.cc
src/osd/PG.h
src/osd/PrimaryLogPG.cc
src/osd/Session.h

index 71d6dd6f6d7f678f13d834787e2bf8e222f6e7cc..c631a54e57d60c97902402493006e167c91e5052 100644 (file)
@@ -853,6 +853,8 @@ OPTION(osd_backoff_on_degraded, OPT_BOOL, false) // [mainly for debug?] object u
 OPTION(osd_backoff_on_down, OPT_BOOL, true)      // pg in down/incomplete state
 OPTION(osd_backoff_on_peering, OPT_BOOL, false)  // [debug] pg peering
 OPTION(osd_debug_crash_on_ignored_backoff, OPT_BOOL, false) // crash osd if client ignores a backoff; useful for debugging
+OPTION(osd_debug_inject_dispatch_delay_probability, OPT_DOUBLE, 0)
+OPTION(osd_debug_inject_dispatch_delay_duration, OPT_DOUBLE, .1)
 OPTION(osd_debug_drop_ping_probability, OPT_DOUBLE, 0)
 OPTION(osd_debug_drop_ping_duration, OPT_INT, 0)
 OPTION(osd_debug_op_order, OPT_BOOL, false)
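
These two options feed OSDService::maybe_inject_dispatch_delay(), added to
src/osd/OSD.h later in this diff; the check it performs is, in essence:

    // from OSDService::maybe_inject_dispatch_delay() (src/osd/OSD.h below)
    if (g_conf->osd_debug_inject_dispatch_delay_probability > 0 &&
        rand() % 10000 <
          g_conf->osd_debug_inject_dispatch_delay_probability * 10000) {
      utime_t t;
      t.set_from_double(g_conf->osd_debug_inject_dispatch_delay_duration);
      t.sleep();  // stall this thread to widen race windows while testing
    }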
index eba3a132824cb051e0d6788faaeaaf4954a4c545..537f96f6242600bb103a6e0ba993852d26948fe3 100644 (file)
@@ -223,7 +223,6 @@ OSDService::OSDService(OSD *osd) :
   logger(osd->logger),
   recoverystate_perf(osd->recoverystate_perf),
   monc(osd->monc),
-  op_wq(osd->op_shardedwq),
   peering_wq(osd->peering_wq),
   recovery_gen_wq("recovery_gen_wq", cct->_conf->osd_recovery_thread_timeout,
                  &osd->disk_tp),
@@ -1042,7 +1041,6 @@ void OSDService::share_map(
   }
 }
 
-
 void OSDService::share_map_peer(int peer, Connection *con, OSDMapRef map)
 {
   if (!map)
@@ -1459,14 +1457,14 @@ void OSDService::handle_misdirected_op(PG *pg, OpRequestRef op)
   }
 }
 
+void OSDService::enqueue_back(spg_t pgid, PGQueueable qi)
+{
+  osd->op_shardedwq.queue(make_pair(pgid, qi));
+}
 
-void OSDService::dequeue_pg(PG *pg, list<OpRequestRef> *dequeued)
+void OSDService::enqueue_front(spg_t pgid, PGQueueable qi)
 {
-  FUNCTRACE();
-  if (dequeued)
-    osd->op_shardedwq.dequeue_and_get_ops(pg, dequeued);
-  else
-    osd->op_shardedwq.dequeue(pg);
+  osd->op_shardedwq.queue_front(make_pair(pgid, qi));
 }
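
Callers that used to reach into op_wq (or use dequeue_pg) now go through these
two wrappers. A hedged sketch of a requeue from PG code, assuming the op and
the PG's current map are in hand:

    // hypothetical caller: put an op back at the head of its PG's queue
    osd->enqueue_front(
      info.pgid,
      PGQueueable(op, get_osdmap()->get_epoch()));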
 
 void OSDService::queue_for_peering(PG *pg)
@@ -1474,17 +1472,19 @@ void OSDService::queue_for_peering(PG *pg)
   peering_wq.queue(pg);
 }
 
-void OSDService::queue_for_snap_trim(PG *pg) {
+void OSDService::queue_for_snap_trim(PG *pg)
+{
   dout(10) << "queueing " << *pg << " for snaptrim" << dendl;
-  op_wq.queue(
+  osd->op_shardedwq.queue(
     make_pair(
-      pg,
+      pg->info.pgid,
       PGQueueable(
        PGSnapTrim(pg->get_osdmap()->get_epoch()),
        cct->_conf->osd_snap_trim_cost,
        cct->_conf->osd_snap_trim_priority,
        ceph_clock_now(),
-       entity_inst_t())));
+       entity_inst_t(),
+       pg->get_osdmap()->get_epoch())));
 }
 
 
@@ -2766,7 +2766,9 @@ int OSD::shutdown()
 
   service.start_shutdown();
 
-  clear_waiting_sessions();
+  // stop sending work to pgs.  this just prevents any new work in _process
+  // from racing with on_shutdown and potentially entering the pg after.
+  op_shardedwq.drain();
 
   // Shutdown PGs
   {
@@ -2783,12 +2785,14 @@ int OSD::shutdown()
   }
   clear_pg_stat_queue();
 
-  // finish ops
-  op_shardedwq.drain(); // should already be empty except for laggard PGs
+  // drain op queue again (in case PGs requeued something)
+  op_shardedwq.drain();
   {
     finished.clear(); // zap waiters (bleh, this is messy)
   }
 
+  op_shardedwq.clear_pg_slots();
+
   // unregister commands
   cct->get_admin_socket()->unregister_command("status");
   cct->get_admin_socket()->unregister_command("flush_journal");
@@ -3118,7 +3122,7 @@ void OSD::recursive_remove_collection(CephContext* cct,
   vector<ghobject_t> objects;
   store->collection_list(tmp, ghobject_t(), ghobject_t::get_max(),
                         INT_MAX, &objects, 0);
-
+  generic_dout(10) << __func__ << " " << objects << dendl;
   // delete them.
   int removed = 0;
   for (vector<ghobject_t>::iterator p = objects.begin();
@@ -3318,34 +3322,6 @@ PG *OSD::_create_lock_pg(
   return pg;
 }
 
-PGRef OSD::get_pg_or_queue_for_pg(const spg_t& pgid, OpRequestRef& op,
-                                 Session *session)
-{
-  if (!session) {
-    return PGRef();
-  }
-  // get_pg_or_queue_for_pg is only called from the fast_dispatch path where
-  // the session_dispatch_lock must already be held.
-  assert(session->session_dispatch_lock.is_locked());
-  RWLock::RLocker l(pg_map_lock);
-
-  ceph::unordered_map<spg_t, PG*>::iterator i = pg_map.find(pgid);
-  if (i == pg_map.end())
-    session->waiting_for_pg[pgid];
-
-  auto wlistiter = session->waiting_for_pg.find(pgid);
-
-  PG *out = NULL;
-  if (wlistiter == session->waiting_for_pg.end()) {
-    out = i->second;
-  } else {
-    op->get();
-    wlistiter->second.push_back(*op);
-    register_session_waiting_on_pg(session, pgid);
-  }
-  return PGRef(out);
-}
-
 PG *OSD::_lookup_lock_pg(spg_t pgid)
 {
   RWLock::RLocker l(pg_map_lock);
@@ -3750,12 +3726,12 @@ void OSD::handle_pg_peering_evt(
       dout(10) << *pg << " is new" << dendl;
 
       pg->queue_peering_event(evt);
+      wake_pg_waiters(pg);
       pg->unlock();
-      wake_pg_waiters(pgid);
       return;
     }
     case RES_SELF: {
-        old_pg_state->lock();
+      old_pg_state->lock();
       OSDMapRef old_osd_map = old_pg_state->get_osdmap();
       int old_role = old_pg_state->role;
       vector<int> old_up = old_pg_state->up;
@@ -3785,8 +3761,8 @@ void OSD::handle_pg_peering_evt(
       dout(10) << *pg << " is new (resurrected)" << dendl;
 
       pg->queue_peering_event(evt);
+      wake_pg_waiters(pg);
       pg->unlock();
-      wake_pg_waiters(resurrected);
       return;
     }
     case RES_PARENT: {
@@ -3826,8 +3802,8 @@ void OSD::handle_pg_peering_evt(
 
       //parent->queue_peering_event(evt);
       parent->queue_null(osdmap->get_epoch(), osdmap->get_epoch());
+      wake_pg_waiters(parent);
       parent->unlock();
-      wake_pg_waiters(resurrected);
       return;
     }
     }
@@ -4790,6 +4766,7 @@ bool remove_dir(
     store->get_ideal_list_max(),
     &olist,
     &next);
+  generic_dout(10) << __func__ << " " << olist << dendl;
   // default cont to true, this is safe because caller(OSD::RemoveWQ::_process()) 
   // will recheck the answer before it really goes on.
   bool cont = true;
@@ -6014,9 +5991,6 @@ void OSD::do_command(Connection *con, ceph_tid_t tid, vector<string>& cmd, buffe
   }
 }
 
-
-
-
 bool OSD::heartbeat_dispatch(Message *m)
 {
   dout(30) << "heartbeat_dispatch " << m << dendl;
@@ -6065,110 +6039,71 @@ bool OSD::ms_dispatch(Message *m)
   return true;
 }
 
-void OSD::dispatch_session_waiting(Session *session, OSDMapRef osdmap)
+void OSD::maybe_share_map(
+  Session *session,
+  OpRequestRef op,
+  OSDMapRef osdmap)
 {
-  assert(session->session_dispatch_lock.is_locked());
-  assert(session->osdmap == osdmap);
-
-  auto i = session->waiting_on_map.begin();
-  while (i != session->waiting_on_map.end()) {
-    OpRequest *op = &(*i);
-    session->waiting_on_map.erase(i++);
-    if (!dispatch_op_fast(op, osdmap)) {
-      session->waiting_on_map.push_front(*op);
-      break;
-    }
-    op->put();
+  if (!op->check_send_map) {
+    return;
   }
+  epoch_t last_sent_epoch = 0;
 
-  if (session->waiting_on_map.empty()) {
-    clear_session_waiting_on_map(session);
-  } else {
-    register_session_waiting_on_map(session);
-  }
-  session->maybe_reset_osdmap();
-}
+  session->sent_epoch_lock.lock();
+  last_sent_epoch = session->last_sent_epoch;
+  session->sent_epoch_lock.unlock();
 
+  const Message *m = op->get_req();
+  service.share_map(
+    m->get_source(),
+    m->get_connection().get(),
+    op->sent_epoch,
+    osdmap,
+    session ? &last_sent_epoch : NULL);
 
-void OSD::update_waiting_for_pg(Session *session, OSDMapRef newmap)
-{
-  assert(session->session_dispatch_lock.is_locked());
-  if (!session->osdmap) {
-    session->osdmap = newmap;
-    return;
+  session->sent_epoch_lock.lock();
+  if (session->last_sent_epoch < last_sent_epoch) {
+    session->last_sent_epoch = last_sent_epoch;
   }
+  session->sent_epoch_lock.unlock();
 
-  if (newmap->get_epoch() == session->osdmap->get_epoch())
-    return;
+  op->check_send_map = false;
+}
 
-  assert(newmap->get_epoch() > session->osdmap->get_epoch());
+void OSD::dispatch_session_waiting(Session *session, OSDMapRef osdmap)
+{
+  assert(session->session_dispatch_lock.is_locked());
 
-  map<spg_t, boost::intrusive::list<OpRequest> > from;
-  from.swap(session->waiting_for_pg);
+  auto i = session->waiting_on_map.begin();
+  while (i != session->waiting_on_map.end()) {
+    OpRequestRef op = &(*i);
+    assert(ms_can_fast_dispatch(op->get_req()));
+    const MOSDFastDispatchOp *m = static_cast<const MOSDFastDispatchOp*>(
+      op->get_req());
+    if (m->get_map_epoch() > osdmap->get_epoch()) {
+      break;
+    }
+    session->waiting_on_map.erase(i++);
+    op->put();
 
-  for (auto i = from.begin(); i != from.end(); from.erase(i++)) {
-    set<spg_t> children;
-    if (!newmap->have_pg_pool(i->first.pool())) {
-      // drop this wait list on the ground
-      i->second.clear_and_dispose(TrackedOp::Putter());
-    } else {
-      assert(session->osdmap->have_pg_pool(i->first.pool()));
-      if (i->first.is_split(
-           session->osdmap->get_pg_num(i->first.pool()),
-           newmap->get_pg_num(i->first.pool()),
-           &children)) {
-       for (set<spg_t>::iterator child = children.begin();
-            child != children.end();
-            ++child) {
-         unsigned split_bits = child->get_split_bits(
-           newmap->get_pg_num(child->pool()));
-         boost::intrusive::list<OpRequest> child_ops;
-         OSD::split_list(&i->second, &child_ops, child->ps(), split_bits);
-         if (!child_ops.empty()) {
-           session->waiting_for_pg[*child].swap(child_ops);
-           register_session_waiting_on_pg(session, *child);
-         }
-       }
+    spg_t pgid;
+    if (m->get_type() == CEPH_MSG_OSD_OP) {
+      pg_t actual_pgid = osdmap->raw_pg_to_pg(
+       static_cast<const MOSDOp*>(m)->get_pg());
+      if (!osdmap->get_primary_shard(actual_pgid, &pgid)) {
+       continue;
       }
-    }
-    if (i->second.empty()) {
-      clear_session_waiting_on_pg(session, i->first);
     } else {
-      session->waiting_for_pg[i->first].swap(i->second);
+      pgid = m->get_spg();
     }
+    enqueue_op(pgid, op, m->get_map_epoch());
   }
 
-  session->osdmap = newmap;
-}
-
-void OSD::session_notify_pg_create(
-  Session *session, OSDMapRef osdmap, spg_t pgid)
-{
-  assert(session->session_dispatch_lock.is_locked());
-  update_waiting_for_pg(session, osdmap);
-  auto i = session->waiting_for_pg.find(pgid);
-  if (i != session->waiting_for_pg.end()) {
-    session->waiting_on_map.splice(
-      session->waiting_on_map.begin(),
-      i->second);
-    assert(i->second.empty());
-    session->waiting_for_pg.erase(i);
-  }
-  clear_session_waiting_on_pg(session, pgid);
-}
-
-void OSD::session_notify_pg_cleared(
-  Session *session, OSDMapRef osdmap, spg_t pgid)
-{
-  assert(session->session_dispatch_lock.is_locked());
-  update_waiting_for_pg(session, osdmap);
-  auto i = session->waiting_for_pg.find(pgid);
-  if (i != session->waiting_for_pg.end()) {
-    i->second.clear_and_dispose(TrackedOp::Putter());
-    session->waiting_for_pg.erase(i);
+  if (session->waiting_on_map.empty()) {
+    clear_session_waiting_on_map(session);
+  } else {
+    register_session_waiting_on_map(session);
   }
-  session->maybe_reset_osdmap();
-  clear_session_waiting_on_pg(session, pgid);
 }
 
 void OSD::ms_fast_dispatch(Message *m)
@@ -6186,19 +6121,36 @@ void OSD::ms_fast_dispatch(Message *m)
     tracepoint(osd, ms_fast_dispatch, reqid.name._type,
         reqid.name._num, reqid.tid, reqid.inc);
   }
-  OSDMapRef nextmap = service.get_nextmap_reserved();
-  Session *session = static_cast<Session*>(m->get_connection()->get_priv());
-  if (session) {
-    {
-      Mutex::Locker l(session->session_dispatch_lock);
-      update_waiting_for_pg(session, nextmap);
-      op->get();
-      session->waiting_on_map.push_back(*op);
-      dispatch_session_waiting(session, nextmap);
+
+  // note sender epoch
+  op->sent_epoch = static_cast<MOSDFastDispatchOp*>(m)->get_map_epoch();
+
+  service.maybe_inject_dispatch_delay();
+
+  if (m->get_connection()->has_features(CEPH_FEATUREMASK_RESEND_ON_SPLIT) ||
+      m->get_type() != CEPH_MSG_OSD_OP) {
+    // queue it directly
+    enqueue_op(
+      static_cast<MOSDFastDispatchOp*>(m)->get_spg(),
+      op,
+      static_cast<MOSDFastDispatchOp*>(m)->get_map_epoch());
+  } else {
+    // legacy client, and this is an MOSDOp (the *only* fast dispatch
+    // message that didn't have an explicit spg_t); we need to map
+    // them to an spg_t while preserving delivery order.
+    Session *session = static_cast<Session*>(m->get_connection()->get_priv());
+    if (session) {
+      {
+       Mutex::Locker l(session->session_dispatch_lock);
+       op->get();
+       session->waiting_on_map.push_back(*op);
+       OSDMapRef nextmap = service.get_nextmap_reserved();
+       dispatch_session_waiting(session, nextmap);
+       service.release_map(nextmap);
+      }
+      session->put();
     }
-    session->put();
   }
-  service.release_map(nextmap);
   OID_EVENT_TRACE_WITH_MSG(m, "MS_FAST_DISPATCH_END", false); 
 }
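
For a legacy MOSDOp, the translation to an spg_t happens in
dispatch_session_waiting() above; condensed:

    // condensed from OSD::dispatch_session_waiting() above
    pg_t actual_pgid = osdmap->raw_pg_to_pg(
      static_cast<const MOSDOp*>(m)->get_pg());         // apply pg_num mask
    spg_t pgid;
    if (osdmap->get_primary_shard(actual_pgid, &pgid))  // shard-aware (EC) lookup
      enqueue_op(pgid, op, m->get_map_epoch());
    // else: pool is gone or the acting set is empty, so the op is dropped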
 
@@ -6320,73 +6272,6 @@ void OSD::do_waiters()
   dout(10) << "do_waiters -- finish" << dendl;
 }
 
-template<typename T, int MSGTYPE>
-epoch_t replica_op_required_epoch(OpRequestRef op)
-{
-  const T *m = static_cast<const T *>(op->get_req());
-  assert(m->get_type() == MSGTYPE);
-  return m->map_epoch;
-}
-
-epoch_t op_required_epoch(OpRequestRef op)
-{
-  switch (op->get_req()->get_type()) {
-  case CEPH_MSG_OSD_OP: {
-    const MOSDOp *m = static_cast<const MOSDOp*>(op->get_req());
-    return m->get_map_epoch();
-  }
-  case CEPH_MSG_OSD_BACKOFF: {
-    const MOSDBackoff *m = static_cast<const MOSDBackoff*>(op->get_req());
-    return m->map_epoch;
-  }
-  case MSG_OSD_SUBOP:
-    return replica_op_required_epoch<MOSDSubOp, MSG_OSD_SUBOP>(op);
-  case MSG_OSD_REPOP:
-    return replica_op_required_epoch<MOSDRepOp, MSG_OSD_REPOP>(op);
-  case MSG_OSD_SUBOPREPLY:
-    return replica_op_required_epoch<MOSDSubOpReply, MSG_OSD_SUBOPREPLY>(
-      op);
-  case MSG_OSD_REPOPREPLY:
-    return replica_op_required_epoch<MOSDRepOpReply, MSG_OSD_REPOPREPLY>(
-      op);
-  case MSG_OSD_PG_PUSH:
-    return replica_op_required_epoch<MOSDPGPush, MSG_OSD_PG_PUSH>(
-      op);
-  case MSG_OSD_PG_PULL:
-    return replica_op_required_epoch<MOSDPGPull, MSG_OSD_PG_PULL>(
-      op);
-  case MSG_OSD_PG_PUSH_REPLY:
-    return replica_op_required_epoch<MOSDPGPushReply, MSG_OSD_PG_PUSH_REPLY>(
-      op);
-  case MSG_OSD_PG_SCAN:
-    return replica_op_required_epoch<MOSDPGScan, MSG_OSD_PG_SCAN>(op);
-  case MSG_OSD_PG_BACKFILL:
-    return replica_op_required_epoch<MOSDPGBackfill, MSG_OSD_PG_BACKFILL>(
-      op);
-  case MSG_OSD_EC_WRITE:
-    return replica_op_required_epoch<MOSDECSubOpWrite, MSG_OSD_EC_WRITE>(op);
-  case MSG_OSD_EC_WRITE_REPLY:
-    return replica_op_required_epoch<MOSDECSubOpWriteReply, MSG_OSD_EC_WRITE_REPLY>(op);
-  case MSG_OSD_EC_READ:
-    return replica_op_required_epoch<MOSDECSubOpRead, MSG_OSD_EC_READ>(op);
-  case MSG_OSD_EC_READ_REPLY:
-    return replica_op_required_epoch<MOSDECSubOpReadReply, MSG_OSD_EC_READ_REPLY>(op);
-  case MSG_OSD_REP_SCRUB:
-    return replica_op_required_epoch<MOSDRepScrub, MSG_OSD_REP_SCRUB>(op);
-  case MSG_OSD_PG_UPDATE_LOG_MISSING:
-    return replica_op_required_epoch<
-      MOSDPGUpdateLogMissing, MSG_OSD_PG_UPDATE_LOG_MISSING>(
-      op);
-  case MSG_OSD_PG_UPDATE_LOG_MISSING_REPLY:
-    return replica_op_required_epoch<
-      MOSDPGUpdateLogMissingReply, MSG_OSD_PG_UPDATE_LOG_MISSING_REPLY>(
-      op);
-  default:
-    ceph_abort();
-    return 0;
-  }
-}
-
 void OSD::dispatch_op(OpRequestRef op)
 {
   switch (op->get_req()->get_type()) {
@@ -6421,95 +6306,6 @@ void OSD::dispatch_op(OpRequestRef op)
   }
 }
 
-bool OSD::dispatch_op_fast(OpRequestRef op, OSDMapRef& osdmap)
-{
-  if (is_stopping()) {
-    // we're shutting down, so drop the op
-    return true;
-  }
-
-  epoch_t msg_epoch(op_required_epoch(op));
-  if (msg_epoch > osdmap->get_epoch()) {
-    Session *s = static_cast<Session*>(op->get_req()->
-                                      get_connection()->get_priv());
-    if (s) {
-      s->received_map_lock.lock();
-      epoch_t received_epoch = s->received_map_epoch;
-      s->received_map_lock.unlock();
-      if (received_epoch < msg_epoch) {
-       osdmap_subscribe(msg_epoch, false);
-      }
-      s->put();
-    }
-    return false;
-  }
-
-  switch(op->get_req()->get_type()) {
-  // client ops
-  case CEPH_MSG_OSD_OP:
-    handle_op(op, osdmap);
-    break;
-  case CEPH_MSG_OSD_BACKOFF:
-    handle_backoff(op, osdmap);
-    break;
-    // for replication etc.
-  case MSG_OSD_SUBOP:
-    handle_replica_op<MOSDSubOp, MSG_OSD_SUBOP>(op, osdmap);
-    break;
-  case MSG_OSD_REPOP:
-    handle_replica_op<MOSDRepOp, MSG_OSD_REPOP>(op, osdmap);
-    break;
-  case MSG_OSD_SUBOPREPLY:
-    handle_replica_op<MOSDSubOpReply, MSG_OSD_SUBOPREPLY>(op, osdmap);
-    break;
-  case MSG_OSD_REPOPREPLY:
-    handle_replica_op<MOSDRepOpReply, MSG_OSD_REPOPREPLY>(op, osdmap);
-    break;
-  case MSG_OSD_PG_PUSH:
-    handle_replica_op<MOSDPGPush, MSG_OSD_PG_PUSH>(op, osdmap);
-    break;
-  case MSG_OSD_PG_PULL:
-    handle_replica_op<MOSDPGPull, MSG_OSD_PG_PULL>(op, osdmap);
-    break;
-  case MSG_OSD_PG_PUSH_REPLY:
-    handle_replica_op<MOSDPGPushReply, MSG_OSD_PG_PUSH_REPLY>(op, osdmap);
-    break;
-  case MSG_OSD_PG_SCAN:
-    handle_replica_op<MOSDPGScan, MSG_OSD_PG_SCAN>(op, osdmap);
-    break;
-  case MSG_OSD_PG_BACKFILL:
-    handle_replica_op<MOSDPGBackfill, MSG_OSD_PG_BACKFILL>(op, osdmap);
-    break;
-  case MSG_OSD_EC_WRITE:
-    handle_replica_op<MOSDECSubOpWrite, MSG_OSD_EC_WRITE>(op, osdmap);
-    break;
-  case MSG_OSD_EC_WRITE_REPLY:
-    handle_replica_op<MOSDECSubOpWriteReply, MSG_OSD_EC_WRITE_REPLY>(op, osdmap);
-    break;
-  case MSG_OSD_EC_READ:
-    handle_replica_op<MOSDECSubOpRead, MSG_OSD_EC_READ>(op, osdmap);
-    break;
-  case MSG_OSD_EC_READ_REPLY:
-    handle_replica_op<MOSDECSubOpReadReply, MSG_OSD_EC_READ_REPLY>(op, osdmap);
-    break;
-  case MSG_OSD_REP_SCRUB:
-    handle_replica_op<MOSDRepScrub, MSG_OSD_REP_SCRUB>(op, osdmap);
-    break;
-  case MSG_OSD_PG_UPDATE_LOG_MISSING:
-    handle_replica_op<MOSDPGUpdateLogMissing, MSG_OSD_PG_UPDATE_LOG_MISSING>(
-      op, osdmap);
-    break;
-  case MSG_OSD_PG_UPDATE_LOG_MISSING_REPLY:
-    handle_replica_op<MOSDPGUpdateLogMissingReply,
-                     MSG_OSD_PG_UPDATE_LOG_MISSING_REPLY>(
-      op, osdmap);
-    break;
-  default:
-    ceph_abort();
-  }
-  return true;
-}
-
 void OSD::_dispatch(Message *m)
 {
   assert(osd_lock.is_locked());
@@ -7557,28 +7353,17 @@ void OSD::consume_map()
   service.await_reserved_maps();
   service.publish_map(osdmap);
 
+  service.maybe_inject_dispatch_delay();
+
   dispatch_sessions_waiting_on_map();
 
+  service.maybe_inject_dispatch_delay();
+
   // remove any PGs which we no longer host from the session waiting_for_pg lists
-  set<spg_t> pgs_to_check;
-  get_pgs_with_waiting_sessions(&pgs_to_check);
-  for (set<spg_t>::iterator p = pgs_to_check.begin();
-       p != pgs_to_check.end();
-       ++p) {
-    if (!(osdmap->is_acting_osd_shard(spg_t(p->pgid, p->shard), whoami))) {
-      set<Session*> concerned_sessions;
-      get_sessions_possibly_interested_in_pg(*p, &concerned_sessions);
-      for (set<Session*>::iterator i = concerned_sessions.begin();
-          i != concerned_sessions.end();
-          ++i) {
-       {
-         Mutex::Locker l((*i)->session_dispatch_lock);
-         session_notify_pg_cleared(*i, osdmap, *p);
-       }
-       (*i)->put();
-      }
-    }
-  }
+  dout(20) << __func__ << " checking waiting_for_pg" << dendl;
+  op_shardedwq.prune_pg_waiters(osdmap, whoami);
+
+  service.maybe_inject_dispatch_delay();
 
   // scan pg's
   {
@@ -8550,6 +8335,9 @@ void OSD::_remove_pg(PG *pg)
 
   service.pg_remove_epoch(pg->info.pgid);
 
+  // dereference from op_wq
+  op_shardedwq.clear_pg_pointer(pg->info.pgid);
+
   // remove from map
   pg_map.erase(pg->info.pgid);
   pg->put("PGMap"); // since we've taken it out of map
@@ -8718,232 +8506,6 @@ bool OSDService::is_recovery_active()
 // =========================================================
 // OPS
 
-class C_SendMap : public GenContext<ThreadPool::TPHandle&> {
-  OSD *osd;
-  entity_name_t name;
-  ConnectionRef con;
-  OSDMapRef osdmap;
-  epoch_t map_epoch;
-
-public:
-  C_SendMap(OSD *osd, entity_name_t n, const ConnectionRef& con,
-            OSDMapRef& osdmap, epoch_t map_epoch) :
-    osd(osd), name(n), con(con), osdmap(osdmap), map_epoch(map_epoch) {
-  }
-
-  void finish(ThreadPool::TPHandle& tp) override {
-    Session *session = static_cast<Session *>(
-        con->get_priv());
-    epoch_t last_sent_epoch;
-    if (session) {
-      session->sent_epoch_lock.lock();
-      last_sent_epoch = session->last_sent_epoch;
-      session->sent_epoch_lock.unlock();
-    }
-    osd->service.share_map(
-       name,
-        con.get(),
-        map_epoch,
-        osdmap,
-        session ? &last_sent_epoch : NULL);
-    if (session) {
-      session->sent_epoch_lock.lock();
-      if (session->last_sent_epoch < last_sent_epoch) {
-       session->last_sent_epoch = last_sent_epoch;
-      }
-      session->sent_epoch_lock.unlock();
-      session->put();
-    }
-  }
-};
-
-struct send_map_on_destruct {
-  OSD *osd;
-  entity_name_t name;
-  ConnectionRef con;
-  OSDMapRef osdmap;
-  epoch_t map_epoch;
-  bool should_send;
-  send_map_on_destruct(OSD *osd, const Message *m,
-                       OSDMapRef& osdmap, epoch_t map_epoch)
-    : osd(osd), name(m->get_source()), con(m->get_connection()),
-      osdmap(osdmap), map_epoch(map_epoch),
-      should_send(true) { }
-  ~send_map_on_destruct() {
-    if (!should_send)
-      return;
-    osd->service.op_gen_wq.queue(new C_SendMap(osd, name, con,
-                                              osdmap, map_epoch));
-  }
-};
-
-void OSD::handle_op(OpRequestRef& op, OSDMapRef& osdmap)
-{
-  const MOSDOp *m = static_cast<const MOSDOp*>(op->get_req());
-  assert(m->get_type() == CEPH_MSG_OSD_OP);
-  if (op_is_discardable(m)) {
-    dout(10) << " discardable " << *m << dendl;
-    return;
-  }
-
-  // set up a map send if the Op gets blocked for some reason
-  send_map_on_destruct share_map(this, m, osdmap, m->get_map_epoch());
-  Session *client_session =
-      static_cast<Session*>(m->get_connection()->get_priv());
-  epoch_t last_sent_epoch;
-  if (client_session) {
-    client_session->sent_epoch_lock.lock();
-    last_sent_epoch = client_session->last_sent_epoch;
-    client_session->sent_epoch_lock.unlock();
-  }
-  share_map.should_send = service.should_share_map(
-      m->get_source(), m->get_connection().get(), m->get_map_epoch(),
-      osdmap, client_session ? &last_sent_epoch : NULL);
-  if (client_session) {
-    client_session->put();
-  }
-
-  // calc actual pgid
-  pg_t _pgid = m->get_raw_pg();
-  int64_t pool = _pgid.pool();
-
-  if ((m->get_flags() & CEPH_OSD_FLAG_PGOP) == 0 &&
-      osdmap->have_pg_pool(pool))
-    _pgid = osdmap->raw_pg_to_pg(_pgid);
-
-  spg_t pgid;
-  if (!osdmap->get_primary_shard(_pgid, &pgid)) {
-    // missing pool or acting set empty -- drop
-    return;
-  }
-
-  PGRef pg = get_pg_or_queue_for_pg(pgid, op, client_session);
-  if (pg) {
-    op->send_map_update = share_map.should_send;
-    op->sent_epoch = m->get_map_epoch();
-    enqueue_op(pg, op);
-    share_map.should_send = false;
-    return;
-  }
-
-  // ok, we didn't have the PG.
-  if (!cct->_conf->osd_debug_misdirected_ops) {
-    return;
-  }
-  // let's see if it's our fault or the client's.  note that this might
-  // involve loading an old OSDmap off disk, so it can be slow.
-
-  OSDMapRef send_map = service.try_get_map(m->get_map_epoch());
-  if (!send_map) {
-    dout(7) << "don't have sender's osdmap; assuming it was valid and that"
-           << " client will resend" << dendl;
-    return;
-  }
-  if (!send_map->have_pg_pool(pgid.pool())) {
-    dout(7) << "dropping request; pool did not exist" << dendl;
-    clog->warn() << m->get_source_inst() << " invalid " << m->get_reqid()
-                     << " pg " << m->get_raw_pg()
-                     << " to osd." << whoami
-                     << " in e" << osdmap->get_epoch()
-                     << ", client e" << m->get_map_epoch()
-                     << " when pool " << m->get_pg().pool() << " did not exist"
-                     << "\n";
-    return;
-  }
-  if (!send_map->osd_is_valid_op_target(pgid.pgid, whoami)) {
-    dout(7) << "we are invalid target" << dendl;
-    clog->warn() << m->get_source_inst() << " misdirected " << m->get_reqid()
-                << " pg " << m->get_raw_pg()
-                << " to osd." << whoami
-                << " in e" << osdmap->get_epoch()
-                << ", client e" << m->get_map_epoch()
-                << " pg " << pgid
-                << " features " << m->get_connection()->get_features()
-                << "\n";
-    if (g_conf->osd_enxio_on_misdirected_op) {
-      service.reply_op_error(op, -ENXIO);
-    }
-    return;
-  }
-
-  // check against current map too
-  if (!osdmap->have_pg_pool(pgid.pool()) ||
-      !osdmap->osd_is_valid_op_target(pgid.pgid, whoami)) {
-    dout(7) << "dropping; no longer have PG (or pool); client will retarget"
-           << dendl;
-    return;
-  }
-}
-
-void OSD::handle_backoff(OpRequestRef& op, OSDMapRef& osdmap)
-{
-  const MOSDBackoff *m = static_cast<const MOSDBackoff*>(op->get_req());
-  Session *s = static_cast<Session*>(m->get_connection()->get_priv());
-  dout(10) << __func__ << " " << *m << " session " << s << dendl;
-  assert(s);
-  s->put();
-
-  if (m->op != CEPH_OSD_BACKOFF_OP_ACK_BLOCK) {
-    dout(10) << __func__ << " unrecognized op, ignoring" << dendl;
-    return;
-  }
-
-  // map hobject range to PG(s)
-  PGRef pg = get_pg_or_queue_for_pg(m->pgid, op, s);
-  if (pg) {
-    enqueue_op(pg, op);
-  }
-}
-
-template<typename T, int MSGTYPE>
-void OSD::handle_replica_op(OpRequestRef& op, OSDMapRef& osdmap)
-{
-  const T *m = static_cast<const T *>(op->get_req());
-  assert(m->get_type() == MSGTYPE);
-
-  dout(10) << __func__ << " " << *m << " epoch " << m->map_epoch << dendl;
-  if (!require_self_aliveness(op->get_req(), m->map_epoch))
-    return;
-  if (!require_osd_peer(op->get_req()))
-    return;
-  if (osdmap->get_epoch() >= m->map_epoch &&
-      !require_same_peer_instance(op->get_req(), osdmap, true))
-    return;
-
-  // must be a rep op.
-  assert(m->get_source().is_osd());
-
-  // share our map with sender, if they're old
-  bool should_share_map = false;
-  Session *peer_session =
-      static_cast<Session*>(m->get_connection()->get_priv());
-  epoch_t last_sent_epoch;
-  if (peer_session) {
-    peer_session->sent_epoch_lock.lock();
-    last_sent_epoch = peer_session->last_sent_epoch;
-    peer_session->sent_epoch_lock.unlock();
-  }
-  should_share_map = service.should_share_map(
-      m->get_source(), m->get_connection().get(), m->map_epoch,
-      osdmap,
-      peer_session ? &last_sent_epoch : NULL);
-  if (peer_session) {
-    peer_session->put();
-  }
-
-  PGRef pg = get_pg_or_queue_for_pg(m->pgid, op, peer_session);
-  if (pg) {
-    op->send_map_update = should_share_map;
-    op->sent_epoch = m->map_epoch;
-    enqueue_op(pg, op);
-  } else if (should_share_map && m->get_connection()->is_connected()) {
-    C_SendMap *send_map = new C_SendMap(this, m->get_source(),
-                                       m->get_connection(),
-                                        osdmap, m->map_epoch);
-    service.op_gen_wq.queue(send_map);
-  }
-}
-
 bool OSD::op_is_discardable(const MOSDOp *op)
 {
   // drop client request if they are not connected and can't get the
@@ -8954,196 +8516,44 @@ bool OSD::op_is_discardable(const MOSDOp *op)
   return false;
 }
 
-void OSD::enqueue_op(PGRef pg, OpRequestRef& op)
+void OSD::enqueue_op(spg_t pg, OpRequestRef& op, epoch_t epoch)
 {
   utime_t latency = ceph_clock_now() - op->get_req()->get_recv_stamp();
   dout(15) << "enqueue_op " << op << " prio " << op->get_req()->get_priority()
           << " cost " << op->get_req()->get_cost()
           << " latency " << latency
+          << " epoch " << epoch
           << " " << *(op->get_req()) << dendl;
-  pg->queue_op(op);
+  op->mark_queued_for_pg();
+  op_shardedwq.queue(make_pair(pg, PGQueueable(op, epoch)));
 }
 
-void OSD::ShardedOpWQ::_process(uint32_t thread_index, heartbeat_handle_d *hb ) {
 
-  uint32_t shard_index = thread_index % num_shards;
 
-  ShardData* sdata = shard_list[shard_index];
-  assert(NULL != sdata);
-  sdata->sdata_op_ordering_lock.Lock();
-  if (sdata->pqueue->empty()) {
-    sdata->sdata_op_ordering_lock.Unlock();
-    osd->cct->get_heartbeat_map()->reset_timeout(hb,
-      osd->cct->_conf->threadpool_default_timeout, 0);
-    sdata->sdata_lock.Lock();
-    sdata->sdata_cond.WaitInterval(sdata->sdata_lock,
-      utime_t(osd->cct->_conf->threadpool_empty_queue_max_wait, 0));
-    sdata->sdata_lock.Unlock();
-    sdata->sdata_op_ordering_lock.Lock();
-    if(sdata->pqueue->empty()) {
-      sdata->sdata_op_ordering_lock.Unlock();
-      return;
-    }
-  }
-  pair<PGRef, PGQueueable> item = sdata->pqueue->dequeue();
-  sdata->pg_for_processing[&*(item.first)].push_back(item.second);
-  sdata->sdata_op_ordering_lock.Unlock();
-  ThreadPool::TPHandle tp_handle(osd->cct, hb, timeout_interval,
-    suicide_interval);
+/*
+ * NOTE: dequeue called in worker thread, with pg lock
+ */
+void OSD::dequeue_op(
+  PGRef pg, OpRequestRef op,
+  ThreadPool::TPHandle &handle)
+{
+  FUNCTRACE();
+  OID_EVENT_TRACE_WITH_MSG(op->get_req(), "DEQUEUE_OP_BEGIN", false);
 
-  (item.first)->lock_suspend_timeout(tp_handle);
+  utime_t now = ceph_clock_now();
+  op->set_dequeued_time(now);
+  utime_t latency = now - op->get_req()->get_recv_stamp();
+  dout(10) << "dequeue_op " << op << " prio " << op->get_req()->get_priority()
+          << " cost " << op->get_req()->get_cost()
+          << " latency " << latency
+          << " " << *(op->get_req())
+          << " pg " << *pg << dendl;
 
-  boost::optional<PGQueueable> op;
-  {
-    Mutex::Locker l(sdata->sdata_op_ordering_lock);
-    if (!sdata->pg_for_processing.count(&*(item.first))) {
-      (item.first)->unlock();
-      return;
-    }
-    assert(sdata->pg_for_processing[&*(item.first)].size());
-    op = sdata->pg_for_processing[&*(item.first)].front();
-    sdata->pg_for_processing[&*(item.first)].pop_front();
-    if (!(sdata->pg_for_processing[&*(item.first)].size()))
-      sdata->pg_for_processing.erase(&*(item.first));
-  }
-
-  // osd:opwq_process marks the point at which an operation has been dequeued
-  // and will begin to be handled by a worker thread.
-  {
-#ifdef WITH_LTTNG
-    osd_reqid_t reqid;
-    if (boost::optional<OpRequestRef> _op = op->maybe_get_op()) {
-      reqid = (*_op)->get_reqid();
-    }
-#endif
-    tracepoint(osd, opwq_process_start, reqid.name._type,
-        reqid.name._num, reqid.tid, reqid.inc);
-  }
-
-  lgeneric_subdout(osd->cct, osd, 30) << "dequeue status: ";
-  Formatter *f = Formatter::create("json");
-  f->open_object_section("q");
-  dump(f);
-  f->close_section();
-  f->flush(*_dout);
-  delete f;
-  *_dout << dendl;
-
-  op->run(osd, item.first, tp_handle);
-
-  {
-#ifdef WITH_LTTNG
-    osd_reqid_t reqid;
-    if (boost::optional<OpRequestRef> _op = op->maybe_get_op()) {
-      reqid = (*_op)->get_reqid();
-    }
-#endif
-    tracepoint(osd, opwq_process_finish, reqid.name._type,
-        reqid.name._num, reqid.tid, reqid.inc);
-  }
-
-  (item.first)->unlock();
-}
-
-void OSD::ShardedOpWQ::_enqueue(pair<PGRef, PGQueueable> item) {
-  uint32_t shard_index =
-    (item.first)->get_pgid().hash_to_shard(shard_list.size());
-
-  ShardData* sdata = shard_list[shard_index];
-  assert (NULL != sdata);
-  unsigned priority = item.second.get_priority();
-  unsigned cost = item.second.get_cost();
-  sdata->sdata_op_ordering_lock.Lock();
-  if (priority >= osd->op_prio_cutoff)
-    sdata->pqueue->enqueue_strict(
-      item.second.get_owner(), priority, item);
-  else
-    sdata->pqueue->enqueue(
-      item.second.get_owner(),
-      priority, cost, item);
-  sdata->sdata_op_ordering_lock.Unlock();
-
-  sdata->sdata_lock.Lock();
-  sdata->sdata_cond.SignalOne();
-  sdata->sdata_lock.Unlock();
-
-}
-
-void OSD::ShardedOpWQ::_enqueue_front(pair<PGRef, PGQueueable> item) {
-
-  uint32_t shard_index = (((item.first)->get_pgid().ps())% shard_list.size());
-
-  ShardData* sdata = shard_list[shard_index];
-  assert (NULL != sdata);
-  sdata->sdata_op_ordering_lock.Lock();
-  if (sdata->pg_for_processing.count(&*(item.first))) {
-    sdata->pg_for_processing[&*(item.first)].push_front(item.second);
-    item.second = sdata->pg_for_processing[&*(item.first)].back();
-    sdata->pg_for_processing[&*(item.first)].pop_back();
-  }
-  unsigned priority = item.second.get_priority();
-  unsigned cost = item.second.get_cost();
-  if (priority >= osd->op_prio_cutoff)
-    sdata->pqueue->enqueue_strict_front(
-      item.second.get_owner(),
-      priority, item);
-  else
-    sdata->pqueue->enqueue_front(
-      item.second.get_owner(),
-      priority, cost, item);
-
-  sdata->sdata_op_ordering_lock.Unlock();
-  sdata->sdata_lock.Lock();
-  sdata->sdata_cond.SignalOne();
-  sdata->sdata_lock.Unlock();
-
-}
-
-
-/*
- * NOTE: dequeue called in worker thread, with pg lock
- */
-void OSD::dequeue_op(
-  PGRef pg, OpRequestRef op,
-  ThreadPool::TPHandle &handle)
-{
-  FUNCTRACE();
-  OID_EVENT_TRACE_WITH_MSG(op->get_req(), "DEQUEUE_OP_BEGIN", false);
-
-  utime_t now = ceph_clock_now();
-  op->set_dequeued_time(now);
-  utime_t latency = now - op->get_req()->get_recv_stamp();
-  dout(10) << "dequeue_op " << op << " prio " << op->get_req()->get_priority()
-          << " cost " << op->get_req()->get_cost()
-          << " latency " << latency
-          << " " << *(op->get_req())
-          << " pg " << *pg << dendl;
-
-  // share our map with sender, if they're old
-  if (op->send_map_update) {
-    const Message *m = op->get_req();
-    Session *session = static_cast<Session *>(m->get_connection()->get_priv());
-    epoch_t last_sent_epoch;
-    if (session) {
-      session->sent_epoch_lock.lock();
-      last_sent_epoch = session->last_sent_epoch;
-      session->sent_epoch_lock.unlock();
-    }
-    service.share_map(
-        m->get_source(),
-        m->get_connection().get(),
-        op->sent_epoch,
-        osdmap,
-        session ? &last_sent_epoch : NULL);
-    if (session) {
-      session->sent_epoch_lock.lock();
-      if (session->last_sent_epoch < last_sent_epoch) {
-       session->last_sent_epoch = last_sent_epoch;
-      }
-      session->sent_epoch_lock.unlock();
-      session->put();
-    }
+  Session *session = static_cast<Session *>(
+    op->get_req()->get_connection()->get_priv());
+  if (session) {
+    maybe_share_map(session, op, pg->get_osdmap());
+    session->put();
   }
 
   if (pg->deleting)
@@ -9182,8 +8592,8 @@ struct C_CompleteSplits : public Context {
       }
       osd->pg_map_lock.put_write();
       osd->dispatch_context_transaction(rctx, &**i);
+      osd->wake_pg_waiters(*i);
       (*i)->unlock();
-      osd->wake_pg_waiters((*i)->info.pgid);
     }
 
     osd->dispatch_context(rctx, 0, osd->service.get_osdmap());
@@ -9621,3 +9031,371 @@ void OSD::PeeringWQ::_dequeue(list<PG*> *out) {
   }
   in_use.insert(out->begin(), out->end());
 }
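
The functions below all operate on a per-PG "slot" kept in each shard's
ShardData. The slot itself is declared in src/osd/OSD.h (not shown in this
hunk); a sketch of its shape, reconstructed from the uses below:

    // sketch of ShardData::pg_slot (see src/osd/OSD.h)
    struct pg_slot {
      PGRef pg;                      ///< cached PG ref; cleared by clear_pg_pointer()
      list<PGQueueable> to_process;  ///< ordered backlog for this PG
      int num_running = 0;           ///< _process() threads working on this slot
      bool waiting_for_pg = false;   ///< hold items until the PG exists here
      uint64_t requeue_seq = 0;      ///< bumped by wake_pg_waiters() to flag requeues
    };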
+
+
+
+// =============================================================
+
+#undef dout_context
+#define dout_context osd->cct
+#undef dout_prefix
+#define dout_prefix *_dout << "osd." << osd->whoami << " op_wq "
+
+void OSD::ShardedOpWQ::wake_pg_waiters(spg_t pgid)
+{
+  uint32_t shard_index = pgid.hash_to_shard(shard_list.size());
+  auto sdata = shard_list[shard_index];
+  bool queued = false;
+  unsigned pushes_to_free = 0;
+  {
+    Mutex::Locker l(sdata->sdata_op_ordering_lock);
+    auto p = sdata->pg_slots.find(pgid);
+    if (p != sdata->pg_slots.end()) {
+      dout(20) << __func__ << " " << pgid
+              << " to_process " << p->second.to_process
+              << " waiting_for_pg=" << (int)p->second.waiting_for_pg << dendl;
+      for (auto i = p->second.to_process.rbegin();
+          i != p->second.to_process.rend();
+          ++i) {
+       sdata->_enqueue_front(make_pair(pgid, *i), osd->op_prio_cutoff);
+      }
+      for (auto& q : p->second.to_process) {
+       pushes_to_free += q.get_reserved_pushes();
+      }
+      p->second.to_process.clear();
+      p->second.waiting_for_pg = false;
+      ++p->second.requeue_seq;
+      queued = true;
+    }
+  }
+  if (pushes_to_free > 0) {
+    osd->service.release_reserved_pushes(pushes_to_free);
+  }
+  if (queued) {
+    sdata->sdata_lock.Lock();
+    sdata->sdata_cond.SignalOne();
+    sdata->sdata_lock.Unlock();
+  }
+}
+
+void OSD::ShardedOpWQ::prune_pg_waiters(OSDMapRef osdmap, int whoami)
+{
+  unsigned pushes_to_free = 0;
+  for (auto sdata : shard_list) {
+    Mutex::Locker l(sdata->sdata_op_ordering_lock);
+    sdata->waiting_for_pg_osdmap = osdmap;
+    auto p = sdata->pg_slots.begin();
+    while (p != sdata->pg_slots.end()) {
+      ShardData::pg_slot& slot = p->second;
+      if (!slot.to_process.empty() && slot.num_running == 0) {
+       if (osdmap->is_up_acting_osd_shard(p->first, whoami)) {
+         dout(20) << __func__ << "  " << p->first << " maps to us, keeping"
+                  << dendl;
+         ++p;
+         continue;
+       }
+       while (!slot.to_process.empty() &&
+              slot.to_process.front().get_map_epoch() <= osdmap->get_epoch()) {
+         auto& qi = slot.to_process.front();
+         dout(20) << __func__ << "  " << p->first
+                  << " item " << qi
+                  << " epoch " << qi.get_map_epoch()
+                  << " <= " << osdmap->get_epoch()
+                  << ", stale, dropping" << dendl;
+         pushes_to_free += qi.get_reserved_pushes();
+         slot.to_process.pop_front();
+       }
+      }
+      if (slot.to_process.empty() &&
+         slot.num_running == 0 &&
+         !slot.pg) {
+       dout(20) << __func__ << "  " << p->first << " empty, pruning" << dendl;
+       p = sdata->pg_slots.erase(p);
+      } else {
+       ++p;
+      }
+    }
+  }
+  if (pushes_to_free > 0) {
+    osd->service.release_reserved_pushes(pushes_to_free);
+  }
+}
+
+void OSD::ShardedOpWQ::clear_pg_pointer(spg_t pgid)
+{
+  uint32_t shard_index = pgid.hash_to_shard(shard_list.size());
+  auto sdata = shard_list[shard_index];
+  Mutex::Locker l(sdata->sdata_op_ordering_lock);
+  auto p = sdata->pg_slots.find(pgid);
+  if (p != sdata->pg_slots.end()) {
+    auto& slot = p->second;
+    dout(20) << __func__ << " " << pgid << " pg " << slot.pg << dendl;
+    assert(!slot.pg || slot.pg->deleting);
+    slot.pg = nullptr;
+  }
+}
+
+void OSD::ShardedOpWQ::clear_pg_slots()
+{
+  for (auto sdata : shard_list) {
+    Mutex::Locker l(sdata->sdata_op_ordering_lock);
+    sdata->pg_slots.clear();
+    sdata->waiting_for_pg_osdmap.reset();
+    // don't bother with reserved pushes; we are shutting down
+  }
+}
+
+#undef dout_prefix
+#define dout_prefix *_dout << "osd." << osd->whoami << " op_wq(" << shard_index << ") "
+
+void OSD::ShardedOpWQ::_process(uint32_t thread_index, heartbeat_handle_d *hb)
+{
+  uint32_t shard_index = thread_index % num_shards;
+  ShardData *sdata = shard_list[shard_index];
+  assert(NULL != sdata);
+
+  // peek at spg_t
+  sdata->sdata_op_ordering_lock.Lock();
+  if (sdata->pqueue->empty()) {
+    dout(20) << __func__ << " empty q, waiting" << dendl;
+    // optimistically sleep a moment; maybe another work item will come along.
+    sdata->sdata_op_ordering_lock.Unlock();
+    osd->cct->get_heartbeat_map()->reset_timeout(hb,
+      osd->cct->_conf->threadpool_default_timeout, 0);
+    sdata->sdata_lock.Lock();
+    sdata->sdata_cond.WaitInterval(sdata->sdata_lock,
+      utime_t(osd->cct->_conf->threadpool_empty_queue_max_wait, 0));
+    sdata->sdata_lock.Unlock();
+    sdata->sdata_op_ordering_lock.Lock();
+    if (sdata->pqueue->empty()) {
+      sdata->sdata_op_ordering_lock.Unlock();
+      return;
+    }
+  }
+  pair<spg_t, PGQueueable> item = sdata->pqueue->dequeue();
+  if (osd->is_stopping()) {
+    sdata->sdata_op_ordering_lock.Unlock();
+    return;    // OSD shutdown, discard.
+  }
+  PGRef pg;
+  uint64_t requeue_seq;
+  {
+    auto& slot = sdata->pg_slots[item.first];
+    dout(30) << __func__ << " " << item.first
+            << " to_process " << slot.to_process
+            << " waiting_for_pg=" << (int)slot.waiting_for_pg << dendl;
+    slot.to_process.push_back(item.second);
+    // note the requeue seq now...
+    requeue_seq = slot.requeue_seq;
+    if (slot.waiting_for_pg) {
+      // save ourselves a bit of effort
+      dout(20) << __func__ << " " << item.first << " item " << item.second
+              << " queued, waiting_for_pg" << dendl;
+      sdata->sdata_op_ordering_lock.Unlock();
+      return;
+    }
+    pg = slot.pg;
+    dout(20) << __func__ << " " << item.first << " item " << item.second
+            << " queued" << dendl;
+    ++slot.num_running;
+  }
+  sdata->sdata_op_ordering_lock.Unlock();
+
+  osd->service.maybe_inject_dispatch_delay();
+
+  // [lookup +] lock pg (if we have it)
+  if (!pg) {
+    pg = osd->_lookup_lock_pg(item.first);
+  } else {
+    pg->lock();
+  }
+
+  osd->service.maybe_inject_dispatch_delay();
+
+  boost::optional<PGQueueable> qi;
+
+  // we don't use a Mutex::Locker here because of the
+  // osd->service.release_reserved_pushes() call below
+  sdata->sdata_op_ordering_lock.Lock();
+
+  auto q = sdata->pg_slots.find(item.first);
+  assert(q != sdata->pg_slots.end());
+  auto& slot = q->second;
+  --slot.num_running;
+
+  if (slot.to_process.empty()) {
+    // raced with wake_pg_waiters or prune_pg_waiters
+    dout(20) << __func__ << " " << item.first << " nothing queued" << dendl;
+    if (pg) {
+      pg->unlock();
+    }
+    sdata->sdata_op_ordering_lock.Unlock();
+    return;
+  }
+  if (requeue_seq != slot.requeue_seq) {
+    dout(20) << __func__ << " " << item.first
+            << " requeue_seq " << slot.requeue_seq << " > our "
+            << requeue_seq << ", we raced with wake_pg_waiters"
+            << dendl;
+    if (pg) {
+      pg->unlock();
+    }
+    sdata->sdata_op_ordering_lock.Unlock();
+    return;
+  }
+  if (pg && !slot.pg && !pg->deleting) {
+    dout(20) << __func__ << " " << item.first << " set pg to " << pg << dendl;
+    slot.pg = pg;
+  }
+  dout(30) << __func__ << " " << item.first << " to_process " << slot.to_process
+          << " waiting_for_pg=" << (int)slot.waiting_for_pg << dendl;
+
+  // make sure we're not already waiting for this pg
+  if (slot.waiting_for_pg) {
+    dout(20) << __func__ << " " << item.first << " item " << item.second
+            << " slot is waiting_for_pg" << dendl;
+    if (pg) {
+      pg->unlock();
+    }
+    sdata->sdata_op_ordering_lock.Unlock();
+    return;
+  }
+
+  // take next item
+  qi = slot.to_process.front();
+  slot.to_process.pop_front();
+  dout(20) << __func__ << " " << item.first << " item " << *qi
+          << " pg " << pg << dendl;
+
+  if (!pg) {
+    // should this pg shard exist on this osd in this (or a later) epoch?
+    OSDMapRef osdmap = sdata->waiting_for_pg_osdmap;
+    if (osdmap->is_up_acting_osd_shard(item.first, osd->whoami)) {
+      dout(20) << __func__ << " " << item.first
+              << " no pg, should exist, will wait" << " on " << *qi << dendl;
+      slot.to_process.push_front(*qi);
+      slot.waiting_for_pg = true;
+    } else if (qi->get_map_epoch() > osdmap->get_epoch()) {
+      dout(20) << __func__ << " " << item.first << " no pg, item epoch is "
+              << qi->get_map_epoch() << " > " << osdmap->get_epoch()
+              << ", will wait on " << *qi << dendl;
+      slot.to_process.push_front(*qi);
+      slot.waiting_for_pg = true;
+    } else {
+      dout(20) << __func__ << " " << item.first << " no pg, shouldn't exist,"
+              << " dropping " << *qi << dendl;
+      // share map with client?
+      if (boost::optional<OpRequestRef> _op = qi->maybe_get_op()) {
+       Session *session = static_cast<Session *>(
+         (*_op)->get_req()->get_connection()->get_priv());
+       if (session) {
+         osd->maybe_share_map(session, *_op, sdata->waiting_for_pg_osdmap);
+         session->put();
+       }
+      }
+      unsigned pushes_to_free = qi->get_reserved_pushes();
+      if (pushes_to_free > 0) {
+       sdata->sdata_op_ordering_lock.Unlock();
+       osd->service.release_reserved_pushes(pushes_to_free);
+       return;
+      }
+    }
+    sdata->sdata_op_ordering_lock.Unlock();
+    return;
+  }
+  sdata->sdata_op_ordering_lock.Unlock();
+
+
+  // osd_opwq_process marks the point at which an operation has been dequeued
+  // and will begin to be handled by a worker thread.
+  {
+#ifdef WITH_LTTNG
+    osd_reqid_t reqid;
+    if (boost::optional<OpRequestRef> _op = qi->maybe_get_op()) {
+      reqid = (*_op)->get_reqid();
+    }
+#endif
+    tracepoint(osd, opwq_process_start, reqid.name._type,
+        reqid.name._num, reqid.tid, reqid.inc);
+  }
+
+  lgeneric_subdout(osd->cct, osd, 30) << "dequeue status: ";
+  Formatter *f = Formatter::create("json");
+  f->open_object_section("q");
+  dump(f);
+  f->close_section();
+  f->flush(*_dout);
+  delete f;
+  *_dout << dendl;
+
+  ThreadPool::TPHandle tp_handle(osd->cct, hb, timeout_interval,
+                                suicide_interval);
+  qi->run(osd, pg, tp_handle);
+
+  {
+#ifdef WITH_LTTNG
+    osd_reqid_t reqid;
+    if (boost::optional<OpRequestRef> _op = qi->maybe_get_op()) {
+      reqid = (*_op)->get_reqid();
+    }
+#endif
+    tracepoint(osd, opwq_process_finish, reqid.name._type,
+        reqid.name._num, reqid.tid, reqid.inc);
+  }
+
+  pg->unlock();
+}
+
+void OSD::ShardedOpWQ::_enqueue(pair<spg_t, PGQueueable> item) {
+  uint32_t shard_index =
+    item.first.hash_to_shard(shard_list.size());
+
+  ShardData* sdata = shard_list[shard_index];
+  assert (NULL != sdata);
+  unsigned priority = item.second.get_priority();
+  unsigned cost = item.second.get_cost();
+  sdata->sdata_op_ordering_lock.Lock();
+
+  dout(20) << __func__ << " " << item.first << " " << item.second << dendl;
+  if (priority >= osd->op_prio_cutoff)
+    sdata->pqueue->enqueue_strict(
+      item.second.get_owner(), priority, item);
+  else
+    sdata->pqueue->enqueue(
+      item.second.get_owner(),
+      priority, cost, item);
+  sdata->sdata_op_ordering_lock.Unlock();
+
+  sdata->sdata_lock.Lock();
+  sdata->sdata_cond.SignalOne();
+  sdata->sdata_lock.Unlock();
+
+}
+
+void OSD::ShardedOpWQ::_enqueue_front(pair<spg_t, PGQueueable> item)
+{
+  uint32_t shard_index = item.first.hash_to_shard(shard_list.size());
+  ShardData* sdata = shard_list[shard_index];
+  assert (NULL != sdata);
+  sdata->sdata_op_ordering_lock.Lock();
+  auto p = sdata->pg_slots.find(item.first);
+  if (p != sdata->pg_slots.end() && !p->second.to_process.empty()) {
+    // we may be racing with _process, which has dequeued a new item
+    // from pqueue, put it on to_process, and is now busy taking the
+    // pg lock.  ensure this old requeued item is ordered before any
+    // such newer item in to_process.
+    p->second.to_process.push_front(item.second);
+    item.second = p->second.to_process.back();
+    p->second.to_process.pop_back();
+    dout(20) << __func__ << " " << item.first
+            << " " << p->second.to_process.front()
+            << " shuffled w/ " << item.second << dendl;
+  } else {
+    dout(20) << __func__ << " " << item.first << " " << item.second << dendl;
+  }
+  sdata->_enqueue_front(item, osd->op_prio_cutoff);
+  sdata->sdata_op_ordering_lock.Unlock();
+  sdata->sdata_lock.Lock();
+  sdata->sdata_cond.SignalOne();
+  sdata->sdata_lock.Unlock();
+}
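
One subtlety in _process() deserves a note: the worker snapshots requeue_seq
before trading the ordering lock for the PG lock, then re-checks it, because
wake_pg_waiters() may have requeued the slot's items in between. Condensed:

    // condensed from OSD::ShardedOpWQ::_process() above
    requeue_seq = slot.requeue_seq;         // snapshot under sdata_op_ordering_lock
    sdata->sdata_op_ordering_lock.Unlock();
    pg->lock();                             // may block; wake_pg_waiters can run now
    sdata->sdata_op_ordering_lock.Lock();
    if (requeue_seq != slot.requeue_seq) {  // raced: our item went back to pqueue
      pg->unlock();
      return;
    }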
index 8b4a2085517af5506805fa3bbd40428dcb00f704..9114a5d2e9d27492e36d06f067ff852660d81803 100644 (file)
@@ -359,7 +359,6 @@ struct PGRecovery {
   }
 };
 
-
 class PGQueueable {
   typedef boost::variant<
     OpRequestRef,
@@ -372,6 +371,8 @@ class PGQueueable {
   unsigned priority;
   utime_t start_time;
   entity_inst_t owner;
+  epoch_t map_epoch;    ///< an epoch we expect the PG to exist in
+
   struct RunVis : public boost::static_visitor<> {
     OSD *osd;
     PGRef &pg;
@@ -383,29 +384,52 @@ class PGQueueable {
     void operator()(const PGScrub &op);
     void operator()(const PGRecovery &op);
   };
+
+  struct StringifyVis : public boost::static_visitor<std::string> {
+    std::string operator()(const OpRequestRef &op) {
+      return stringify(op);
+    }
+    std::string operator()(const PGSnapTrim &op) {
+      return "PGSnapTrim";
+    }
+    std::string operator()(const PGScrub &op) {
+      return "PGScrub";
+    }
+    std::string operator()(const PGRecovery &op) {
+      return "PGRecovery";
+    }
+  };
+  friend ostream& operator<<(ostream& out, const PGQueueable& q) {
+    StringifyVis v;
+    return out << "PGQueueable(" << boost::apply_visitor(v, q.qvariant)
+              << " prio " << q.priority << " cost " << q.cost
+              << " e" << q.map_epoch << ")";
+  }
+
 public:
   // cppcheck-suppress noExplicitConstructor
-  PGQueueable(OpRequestRef op)
+  PGQueueable(OpRequestRef op, epoch_t e)
     : qvariant(op), cost(op->get_req()->get_cost()),
       priority(op->get_req()->get_priority()),
       start_time(op->get_req()->get_recv_stamp()),
-      owner(op->get_req()->get_source_inst())
+      owner(op->get_req()->get_source_inst()),
+      map_epoch(e)
     {}
   PGQueueable(
     const PGSnapTrim &op, int cost, unsigned priority, utime_t start_time,
-    const entity_inst_t &owner)
+    const entity_inst_t &owner, epoch_t e)
     : qvariant(op), cost(cost), priority(priority), start_time(start_time),
-      owner(owner) {}
+      owner(owner), map_epoch(e) {}
   PGQueueable(
     const PGScrub &op, int cost, unsigned priority, utime_t start_time,
-    const entity_inst_t &owner)
+    const entity_inst_t &owner, epoch_t e)
     : qvariant(op), cost(cost), priority(priority), start_time(start_time),
-      owner(owner) {}
+      owner(owner), map_epoch(e) {}
   PGQueueable(
     const PGRecovery &op, int cost, unsigned priority, utime_t start_time,
-    const entity_inst_t &owner)
+    const entity_inst_t &owner, epoch_t e)
     : qvariant(op), cost(cost), priority(priority), start_time(start_time),
-      owner(owner) {}
+      owner(owner), map_epoch(e)  {}
   const boost::optional<OpRequestRef> maybe_get_op() const {
     const OpRequestRef *op = boost::get<OpRequestRef>(&qvariant);
     return op ? OpRequestRef(*op) : boost::optional<OpRequestRef>();
@@ -422,6 +446,7 @@ public:
   int get_cost() const { return cost; }
   utime_t get_start_time() const { return start_time; }
   entity_inst_t get_owner() const { return owner; }
+  epoch_t get_map_epoch() const { return map_epoch; }
 };
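
Every PGQueueable constructor now takes the epoch the item is queued under; a
usage sketch matching enqueue_op() in src/osd/OSD.cc above:

    // wrap a client op with the epoch we expect the PG to exist in
    op_shardedwq.queue(make_pair(pgid, PGQueueable(op, m->get_map_epoch())));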
 
 class OSDService {
@@ -443,13 +468,24 @@ public:
   PerfCounters *&logger;
   PerfCounters *&recoverystate_perf;
   MonClient   *&monc;
-  ShardedThreadPool::ShardedWQ < pair <PGRef, PGQueueable> > &op_wq;
   ThreadPool::BatchWorkQueue<PG> &peering_wq;
   GenContextWQ recovery_gen_wq;
   GenContextWQ op_gen_wq;
   ClassHandler  *&class_handler;
 
-  void dequeue_pg(PG *pg, list<OpRequestRef> *dequeued);
+  void enqueue_back(spg_t pgid, PGQueueable qi);
+  void enqueue_front(spg_t pgid, PGQueueable qi);
+
+  void maybe_inject_dispatch_delay() {
+    if (g_conf->osd_debug_inject_dispatch_delay_probability > 0) {
+      if (rand() % 10000 <
+         g_conf->osd_debug_inject_dispatch_delay_probability * 10000) {
+       utime_t t;
+       t.set_from_double(g_conf->osd_debug_inject_dispatch_delay_duration);
+       t.sleep();
+      }
+    }
+  }
 
 private:
   // -- map epoch lower bound --
@@ -918,15 +954,15 @@ public:
   void queue_for_snap_trim(PG *pg);
 
   void queue_for_scrub(PG *pg) {
-    op_wq.queue(
-      make_pair(
-       pg,
-       PGQueueable(
-         PGScrub(pg->get_osdmap()->get_epoch()),
-         cct->_conf->osd_scrub_cost,
-         pg->get_scrub_priority(),
-         ceph_clock_now(),
-         entity_inst_t())));
+    enqueue_back(
+      pg->info.pgid,
+      PGQueueable(
+       PGScrub(pg->get_osdmap()->get_epoch()),
+       cct->_conf->osd_scrub_cost,
+       pg->get_scrub_priority(),
+       ceph_clock_now(),
+       entity_inst_t(),
+       pg->get_osdmap()->get_epoch()));
   }
 
 private:
@@ -946,15 +982,15 @@ private:
   void _queue_for_recovery(
     pair<epoch_t, PGRef> p, uint64_t reserved_pushes) {
     assert(recovery_lock.is_locked_by_me());
-    pair<PGRef, PGQueueable> to_queue = make_pair(
-      p.second,
+    enqueue_back(
+      p.second->info.pgid,
       PGQueueable(
        PGRecovery(p.first, reserved_pushes),
        cct->_conf->osd_recovery_cost,
        cct->_conf->osd_recovery_priority,
        ceph_clock_now(),
-       entity_inst_t()));
-    op_wq.queue(to_queue);
+       entity_inst_t(),
+       p.first));
   }
 public:
   void start_recovery_op(PG *pg, const hobject_t& soid);
@@ -1403,81 +1439,12 @@ private:
   void get_latest_osdmap();
 
   // -- sessions --
-public:
-
-
-  static bool split_request(OpRequestRef op, unsigned match, unsigned bits) {
-    unsigned mask = ~((~0)<<bits);
-    switch (op->get_req()->get_type()) {
-    case CEPH_MSG_OSD_OP:
-      return (static_cast<const MOSDOp*>(
-               op->get_req())->get_raw_pg().m_seed & mask) == match;
-    }
-    return false;
-  }
-
-  static void split_list(
-    boost::intrusive::list<OpRequest> *from,
-    boost::intrusive::list<OpRequest> *to,
-    unsigned match,
-    unsigned bits) {
-    for (auto i = from->begin(); i != from->end(); ) {
-      if (split_request(&(*i), match, bits)) {
-       OpRequest& o = *i;
-       i = from->erase(i);
-       to->push_back(o);
-      } else {
-       ++i;
-      }
-    }
-  }
-  static void split_list(
-    list<OpRequestRef> *from,
-    list<OpRequestRef> *to,
-    unsigned match,
-    unsigned bits) {
-    for (auto i = from->begin(); i != from->end(); ) {
-      if (split_request(*i, match, bits)) {
-       to->push_back(*i);
-       from->erase(i++);
-      } else {
-       ++i;
-      }
-    }
-  }
-
-
 private:
-  void update_waiting_for_pg(Session *session, OSDMapRef osdmap);
-  void session_notify_pg_create(Session *session, OSDMapRef osdmap, spg_t pgid);
-  void session_notify_pg_cleared(Session *session, OSDMapRef osdmap, spg_t pgid);
   void dispatch_session_waiting(Session *session, OSDMapRef osdmap);
+  void maybe_share_map(Session *session, OpRequestRef op, OSDMapRef osdmap);
 
   Mutex session_waiting_lock;
   set<Session*> session_waiting_for_map;
-  map<spg_t, set<Session*> > session_waiting_for_pg;
-
-  void clear_waiting_sessions() {
-    Mutex::Locker l(session_waiting_lock);
-    for (map<spg_t, set<Session*> >::const_iterator i =
-          session_waiting_for_pg.cbegin();
-        i != session_waiting_for_pg.cend();
-        ++i) {
-      for (set<Session*>::iterator j = i->second.begin();
-          j != i->second.end();
-          ++j) {
-       (*j)->put();
-      }
-    }
-    session_waiting_for_pg.clear();
-
-    for (set<Session*>::iterator i = session_waiting_for_map.begin();
-        i != session_waiting_for_map.end();
-        ++i) {
-      (*i)->put();
-    }
-    session_waiting_for_map.clear();
-  }
 
   /// Caller assumes refs for included Sessions
   void get_sessions_waiting_for_map(set<Session*> *out) {
@@ -1486,9 +1453,8 @@ private:
   }
   void register_session_waiting_on_map(Session *session) {
     Mutex::Locker l(session_waiting_lock);
-    if (session_waiting_for_map.count(session) == 0) {
+    if (session_waiting_for_map.insert(session).second) {
       session->get();
-      session_waiting_for_map.insert(session);
     }
   }
   void clear_session_waiting_on_map(Session *session) {
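
The register_session_waiting_on_map change above leans on the fact that std::set::insert returns a pair whose bool member is true only when a new element was actually inserted, so the count()-then-insert pair collapses into one call and the session ref is taken exactly once. A minimal illustration:

    #include <cassert>
    #include <set>

    int main() {
      std::set<int> s;
      bool first = s.insert(42).second;   // new element: take a ref
      bool again = s.insert(42).second;   // already present: no second ref
      assert(first && !again);
    }
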
@@ -1506,37 +1472,15 @@ private:
         i != sessions_to_check.end();
         sessions_to_check.erase(i++)) {
       (*i)->session_dispatch_lock.Lock();
-      update_waiting_for_pg(*i, osdmap);
       dispatch_session_waiting(*i, osdmap);
       (*i)->session_dispatch_lock.Unlock();
       (*i)->put();
     }
   }
-  void clear_session_waiting_on_pg(Session *session, const spg_t &pgid) {
-    Mutex::Locker l(session_waiting_lock);
-    map<spg_t, set<Session*> >::iterator i = session_waiting_for_pg.find(pgid);
-    if (i == session_waiting_for_pg.end()) {
-      return;
-    }
-    set<Session*>::iterator j = i->second.find(session);
-    if (j != i->second.end()) {
-      (*j)->put();
-      i->second.erase(j);
-    }
-    if (i->second.empty()) {
-      session_waiting_for_pg.erase(i);
-    }
-  }
   void session_handle_reset(Session *session) {
     Mutex::Locker l(session->session_dispatch_lock);
     clear_session_waiting_on_map(session);
 
-    for (auto i = session->waiting_for_pg.cbegin();
-        i != session->waiting_for_pg.cend();
-        ++i) {
-      clear_session_waiting_on_pg(session, i->first);
-    }    
-
     session->clear_backoffs();
 
     /* Messages have connection refs, we need to clear the
@@ -1545,49 +1489,6 @@ private:
      * Bug #12338
      */
     session->waiting_on_map.clear_and_dispose(TrackedOp::Putter());
-    for (auto& i : session->waiting_for_pg) {
-      i.second.clear_and_dispose(TrackedOp::Putter());
-    }
-    session->waiting_for_pg.clear();
-    session->osdmap.reset();
-  }
-  void register_session_waiting_on_pg(Session *session, spg_t pgid) {
-    Mutex::Locker l(session_waiting_lock);
-    set<Session*> &s = session_waiting_for_pg[pgid];
-    set<Session*>::const_iterator i = s.find(session);
-    if (i == s.cend()) {
-      session->get();
-      s.insert(session);
-    }
-  }
-  void get_sessions_possibly_interested_in_pg(
-    spg_t pgid, set<Session*> *sessions) {
-    Mutex::Locker l(session_waiting_lock);
-    while (1) {
-      map<spg_t, set<Session*> >::iterator i = session_waiting_for_pg.find(pgid);
-      if (i != session_waiting_for_pg.end()) {
-       sessions->insert(i->second.begin(), i->second.end());
-      }
-      if (pgid.pgid.ps() == 0) {
-       break;
-      } else {
-       pgid = pgid.get_parent();
-      }
-    }
-    for (set<Session*>::iterator i = sessions->begin();
-        i != sessions->end();
-        ++i) {
-      (*i)->get();
-    }
-  }
-  void get_pgs_with_waiting_sessions(set<spg_t> *pgs) {
-    Mutex::Locker l(session_waiting_lock);
-    for (map<spg_t, set<Session*> >::iterator i =
-          session_waiting_for_pg.begin();
-        i != session_waiting_for_pg.end();
-        ++i) {
-      pgs->insert(i->first);
-    }
   }
 
 private:
@@ -1746,54 +1647,117 @@ private:
   // -- op queue --
   enum io_queue {
     prioritized,
-    weightedpriority};
+    weightedpriority
+  };
   const io_queue op_queue;
   const unsigned int op_prio_cutoff;
 
+  /*
+   * The ordered op delivery chain is:
+   *
+   *   fast dispatch -> pqueue back
+   *                    pqueue front <-> to_process back
+   *                                     to_process front  -> RunVis(item)
+   *                                                      <- queue_front()
+   *
+   * The pqueue is per-shard, and to_process is per pg_slot.  Items can be
+   * pushed back up into to_process and/or pqueue while order is preserved.
+   *
+   * Multiple worker threads can operate on each shard.
+   *
+   * Under normal circumstances, num_running == to_process.size().  There are
+   * two times when that is not true: (1) when waiting_for_pg == true and
+   * to_process is accumulating requests that are waiting for the pg to be
+   * instantiated; in that case they will all get requeued together by
+   * wake_pg_waiters, and (2) just after wake_pg_waiters has run: it cleared
+   * waiting_for_pg and already requeued the items.
+   */
   friend class PGQueueable;
-  class ShardedOpWQ: public ShardedThreadPool::ShardedWQ < pair <PGRef, PGQueueable> > {
-
+  class ShardedOpWQ
+    : public ShardedThreadPool::ShardedWQ<pair<spg_t,PGQueueable>>
+  {
     struct ShardData {
       Mutex sdata_lock;
       Cond sdata_cond;
-      Mutex sdata_op_ordering_lock;
-      map<PG*, list<PGQueueable> > pg_for_processing;
-      std::unique_ptr<OpQueue< pair<PGRef, PGQueueable>, entity_inst_t>> pqueue;
+
+      Mutex sdata_op_ordering_lock;   ///< protects all members below
+
+      OSDMapRef waiting_for_pg_osdmap;
+      struct pg_slot {
+       PGRef pg;                     ///< cached pg reference [optional]
+       list<PGQueueable> to_process; ///< ordered items for this slot
+       int num_running = 0;          ///< _process threads doing pg lookup/lock
+
+       /// true if the pg does not (or did not) exist. if so, all new items
+       /// go directly to to_process.  cleared by prune_pg_waiters.
+       bool waiting_for_pg = false;
+
+       /// incremented by wake_pg_waiters; indicates racing _process threads
+       /// should bail out (their op has been requeued)
+       uint64_t requeue_seq = 0;
+      };
+
+      /// map of slots for each spg_t.  maintains ordering of items dequeued
+      /// from pqueue while _process thread drops shard lock to acquire the
+      /// pg lock.  slots are removed only by prune_pg_waiters.
+      unordered_map<spg_t,pg_slot> pg_slots;
+
+      /// priority queue
+      std::unique_ptr<OpQueue< pair<spg_t, PGQueueable>, entity_inst_t>> pqueue;
+
+      void _enqueue_front(pair<spg_t, PGQueueable> item, unsigned cutoff) {
+       unsigned priority = item.second.get_priority();
+       unsigned cost = item.second.get_cost();
+       if (priority >= cutoff)
+         pqueue->enqueue_strict_front(
+           item.second.get_owner(),
+           priority, item);
+       else
+         pqueue->enqueue_front(
+           item.second.get_owner(),
+           priority, cost, item);
+      }
+
       ShardData(
        string lock_name, string ordering_lock,
        uint64_t max_tok_per_prio, uint64_t min_cost, CephContext *cct,
        io_queue opqueue)
        : sdata_lock(lock_name.c_str(), false, true, false, cct),
-         sdata_op_ordering_lock(ordering_lock.c_str(), false, true, false, cct) {
-           if (opqueue == weightedpriority) {
-             pqueue = std::unique_ptr
-               <WeightedPriorityQueue< pair<PGRef, PGQueueable>, entity_inst_t>>(
-                 new WeightedPriorityQueue< pair<PGRef, PGQueueable>, entity_inst_t>(
-                   max_tok_per_prio, min_cost));
-           } else if (opqueue == prioritized) {
-             pqueue = std::unique_ptr
-               <PrioritizedQueue< pair<PGRef, PGQueueable>, entity_inst_t>>(
-                 new PrioritizedQueue< pair<PGRef, PGQueueable>, entity_inst_t>(
-                   max_tok_per_prio, min_cost));
-           }
-         }
+         sdata_op_ordering_lock(ordering_lock.c_str(), false, true,
+                                false, cct) {
+       if (opqueue == weightedpriority) {
+         pqueue = std::unique_ptr
+           <WeightedPriorityQueue<pair<spg_t,PGQueueable>,entity_inst_t>>(
+             new WeightedPriorityQueue<pair<spg_t,PGQueueable>,entity_inst_t>(
+               max_tok_per_prio, min_cost));
+       } else if (opqueue == prioritized) {
+         pqueue = std::unique_ptr
+           <PrioritizedQueue<pair<spg_t,PGQueueable>,entity_inst_t>>(
+             new PrioritizedQueue<pair<spg_t,PGQueueable>,entity_inst_t>(
+               max_tok_per_prio, min_cost));
+       }
+      }
     };
-    
+
     vector<ShardData*> shard_list;
     OSD *osd;
     uint32_t num_shards;
 
   public:
-    ShardedOpWQ(uint32_t pnum_shards, OSD *o, time_t ti, time_t si, ShardedThreadPool* tp):
-      ShardedThreadPool::ShardedWQ < pair <PGRef, PGQueueable> >(ti, si, tp),
-      osd(o), num_shards(pnum_shards) {
-      for(uint32_t i = 0; i < num_shards; i++) {
+    ShardedOpWQ(uint32_t pnum_shards,
+               OSD *o,
+               time_t ti,
+               time_t si,
+               ShardedThreadPool* tp)
+      : ShardedThreadPool::ShardedWQ<pair<spg_t,PGQueueable>>(ti, si, tp),
+        osd(o),
+        num_shards(pnum_shards) {
+      for (uint32_t i = 0; i < num_shards; i++) {
        char lock_name[32] = {0};
        snprintf(lock_name, sizeof(lock_name), "%s.%d", "OSD:ShardedOpWQ:", i);
        char order_lock[32] = {0};
-       snprintf(
-         order_lock, sizeof(order_lock), "%s.%d",
-         "OSD:ShardedOpWQ:order:", i);
+       snprintf(order_lock, sizeof(order_lock), "%s.%d",
+                "OSD:ShardedOpWQ:order:", i);
        ShardData* one_shard = new ShardData(
          lock_name, order_lock,
          osd->cct->_conf->osd_op_pq_max_tokens_per_priority, 
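
The comment block above describes the pg_slot bookkeeping in prose; the following stripped-down restatement with stand-in types (illustrative only, not the Ceph classes) makes the data relationships visible without the locking:

    #include <cstdint>
    #include <list>
    #include <memory>
    #include <unordered_map>

    struct PGQueueableSim {};  // stands in for PGQueueable
    struct PGSim {};           // stands in for PG

    struct pg_slot_sim {
      std::shared_ptr<PGSim> pg;             // cached pg ref (optional)
      std::list<PGQueueableSim> to_process;  // ordered items for this slot
      int num_running = 0;          // workers between shard unlock and pg lock
      bool waiting_for_pg = false;  // pg absent: items park here until
                                    // wake_pg_waiters requeues them
      uint64_t requeue_seq = 0;     // bumped on requeue so racing workers
                                    // notice their item was already moved
    };

    // one map per shard, guarded by that shard's sdata_op_ordering_lock
    using shard_slots_sim = std::unordered_map<uint64_t, pg_slot_sim>;

The requeue_seq counter is what lets a worker that raced with wake_pg_waiters detect that its item has already been moved and simply bail out.
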
@@ -1801,17 +1765,33 @@ private:
        shard_list.push_back(one_shard);
       }
     }
-    
     ~ShardedOpWQ() {
-      while(!shard_list.empty()) {
+      while (!shard_list.empty()) {
        delete shard_list.back();
        shard_list.pop_back();
       }
     }
 
+    /// wake any pg waiters after a PG is created/instantiated
+    void wake_pg_waiters(spg_t pgid);
+
+    /// prune ops (and possibly pg_slots) for pgs that shouldn't be here
+    void prune_pg_waiters(OSDMapRef osdmap, int whoami);
+
+    /// clear cached PGRef on pg deletion
+    void clear_pg_pointer(spg_t pgid);
+
+    /// clear pg_slots on shutdown
+    void clear_pg_slots();
+
+    /// try to do some work
     void _process(uint32_t thread_index, heartbeat_handle_d *hb);
-    void _enqueue(pair <PGRef, PGQueueable> item);
-    void _enqueue_front(pair <PGRef, PGQueueable> item);
+
+    /// enqueue a new item
+    void _enqueue(pair <spg_t, PGQueueable> item);
+
+    /// requeue an old item (at the front of the line)
+    void _enqueue_front(pair <spg_t, PGQueueable> item);
       
     void return_waiting_threads() {
       for(uint32_t i = 0; i < num_shards; i++) {
@@ -1839,11 +1819,11 @@ private:
 
     /// Must be called on ops queued back to front
     struct Pred {
-      PG *pg;
+      spg_t pgid;
       list<OpRequestRef> *out_ops;
       uint64_t reserved_pushes_to_free;
-      Pred(PG *pg, list<OpRequestRef> *out_ops = 0)
-       : pg(pg), out_ops(out_ops), reserved_pushes_to_free(0) {}
+      Pred(spg_t pg, list<OpRequestRef> *out_ops = 0)
+       : pgid(pg), out_ops(out_ops), reserved_pushes_to_free(0) {}
       void accumulate(const PGQueueable &op) {
        reserved_pushes_to_free += op.get_reserved_pushes();
        if (out_ops) {
@@ -1852,8 +1832,8 @@ private:
            out_ops->push_front(*mop);
        }
       }
-      bool operator()(const pair<PGRef, PGQueueable> &op) {
-       if (op.first == pg) {
+      bool operator()(const pair<spg_t, PGQueueable> &op) {
+       if (op.first == pgid) {
          accumulate(op.second);
          return true;
        } else {
@@ -1865,40 +1845,7 @@ private:
       }
     };
 
-    void dequeue(PG *pg) {
-      FUNCTRACE();
-      return dequeue_and_get_ops(pg, nullptr);
-    }
-
-    void dequeue_and_get_ops(PG *pg, list<OpRequestRef> *dequeued) {
-      ShardData* sdata = NULL;
-      assert(pg != NULL);
-      uint32_t shard_index = pg->get_pgid().ps()% shard_list.size();
-      sdata = shard_list[shard_index];
-      assert(sdata != NULL);
-      sdata->sdata_op_ordering_lock.Lock();
-
-      Pred f(pg, dequeued);
-
-      // items in pqueue are behind items in pg_for_processing
-      sdata->pqueue->remove_by_filter(f);
-
-      map<PG *, list<PGQueueable> >::const_iterator iter =
-       sdata->pg_for_processing.find(pg);
-      if (iter != sdata->pg_for_processing.cend()) {
-       for (auto i = iter->second.crbegin();
-            i != iter->second.crend();
-            ++i) {
-         f.accumulate(*i);
-       }
-       sdata->pg_for_processing.erase(iter);
-      }
-
-      sdata->sdata_op_ordering_lock.Unlock();
-      osd->service.release_reserved_pushes(f.get_reserved_pushes_to_free());
-    }
-    bool is_shard_empty(uint32_t thread_index) {
+    bool is_shard_empty(uint32_t thread_index) override {
       uint32_t shard_index = thread_index % num_shards; 
       ShardData* sdata = shard_list[shard_index];
       assert(NULL != sdata);
@@ -1908,7 +1855,7 @@ private:
   } op_shardedwq;
 
 
-  void enqueue_op(PGRef pg, OpRequestRef& op);
+  void enqueue_op(spg_t pg, OpRequestRef& op, epoch_t epoch);
   void dequeue_op(
     PGRef pg, OpRequestRef op,
     ThreadPool::TPHandle &handle);
@@ -2042,8 +1989,6 @@ protected:
 
   PGPool _get_pool(int id, OSDMapRef createmap);
 
-  PGRef get_pg_or_queue_for_pg(const spg_t& pgid, OpRequestRef& op,
-                              Session *session);
   PG   *_lookup_lock_pg_with_map_lock_held(spg_t pgid);
   PG   *_lookup_lock_pg(spg_t pgid);
   PG   *_open_lock_pg(OSDMapRef createmap,
@@ -2056,10 +2001,6 @@ protected:
   res_result _try_resurrect_pg(
     OSDMapRef curmap, spg_t pgid, spg_t *resurrected, PGRef *old_pg_state);
 
-  /**
-   * After unlocking the pg, the user must ensure that wake_pg_waiters
-   * is called.
-   */
   PG   *_create_lock_pg(
     OSDMapRef createmap,
     spg_t pgid,
@@ -2071,7 +2012,6 @@ protected:
     pg_history_t history,
     const pg_interval_map_t& pi,
     ObjectStore::Transaction& t);
-  PG   *_lookup_qlock_pg(spg_t pgid);
 
   PG* _make_pg(OSDMapRef createmap, spg_t pgid);
   void add_newly_split_pg(PG *pg,
@@ -2096,25 +2036,11 @@ protected:
     int lastactingprimary
     ); ///< @return false if there was a map gap between from and now
 
-  void wake_pg_waiters(spg_t pgid) {
-    assert(osd_lock.is_locked());
-    // Need write lock on pg_map_lock
-    set<Session*> concerned_sessions;
-    get_sessions_possibly_interested_in_pg(pgid, &concerned_sessions);
-
-    for (set<Session*>::iterator i = concerned_sessions.begin();
-        i != concerned_sessions.end();
-        ++i) {
-      {
-       Mutex::Locker l((*i)->session_dispatch_lock);
-       session_notify_pg_create(*i, osdmap, pgid);
-       dispatch_session_waiting(*i, osdmap);
-      }
-      (*i)->put();
-    }
+  // this must be called with pg->lock held on any pg addition to pg_map
+  void wake_pg_waiters(PGRef pg) {
+    assert(pg->is_locked());
+    op_shardedwq.wake_pg_waiters(pg->info.pgid);
   }
-
-
   epoch_t last_pg_create_epoch;
 
   void handle_pg_create(OpRequestRef op);
@@ -2473,11 +2399,6 @@ private:
   void handle_pg_scrub(struct MOSDScrub *m, PG* pg);
   void handle_scrub(struct MOSDScrub *m);
   void handle_osd_ping(class MOSDPing *m);
-  void handle_op(OpRequestRef& op, OSDMapRef& osdmap);
-  void handle_backoff(OpRequestRef& op, OSDMapRef& osdmap);
-
-  template <typename T, int MSGTYPE>
-  void handle_replica_op(OpRequestRef& op, OSDMapRef& osdmap);
 
   int init_op_flags(OpRequestRef& op);
 
index e01127dbb9535d663fb3a85308f8ecbff0231647..64d0580f7b371666dd5cb4c13421a45e18b6269a 100644 (file)
@@ -784,14 +784,20 @@ public:
   /*
    * check whether an spg_t maps to a particular osd
    */
-  bool is_acting_osd_shard(spg_t pg, int osd) const {
-    vector<int> acting;
-    _pg_to_up_acting_osds(pg.pgid, NULL, NULL, &acting, NULL, false);
-    if (pg.shard == shard_id_t::NO_SHARD)
-      return calc_pg_role(osd, acting, acting.size()) >= 0;
-    if (pg.shard >= (int)acting.size())
-      return false;
-    return acting[pg.shard] == osd;
+  bool is_up_acting_osd_shard(spg_t pg, int osd) const {
+    vector<int> up, acting;
+    _pg_to_up_acting_osds(pg.pgid, &up, NULL, &acting, NULL, false);
+    if (pg.shard == shard_id_t::NO_SHARD) {
+      if (calc_pg_role(osd, acting, acting.size()) >= 0 ||
+         calc_pg_role(osd, up, up.size()) >= 0)
+       return true;
+    } else {
+      if (pg.shard < (int)acting.size() && acting[pg.shard] == osd)
+       return true;
+      if (pg.shard < (int)up.size() && up[pg.shard] == osd)
+       return true;
+    }
+    return false;
   }
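
A simplified model of the widened membership test above: a replicated pg (NO_SHARD) matches if the osd appears anywhere in either vector, while an erasure-coded pg must see the osd in the exact shard slot. Stand-in types, and plain containment in place of calc_pg_role:

    #include <algorithm>
    #include <vector>

    constexpr int NO_SHARD = -1;

    bool is_up_acting_sim(const std::vector<int>& up,
                          const std::vector<int>& acting,
                          int shard, int osd) {
      auto contains = [&](const std::vector<int>& v) {
        return std::find(v.begin(), v.end(), osd) != v.end();
      };
      if (shard == NO_SHARD)
        return contains(acting) || contains(up);
      return (shard < (int)acting.size() && acting[shard] == osd) ||
             (shard < (int)up.size() && up[shard] == osd);
    }
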
 
 
index 3a161b65170f2f01f05096e351cf6ca81268764f..a71e35c90dfd91b729977eceedb9b3ff99ce964c 100644 (file)
@@ -27,7 +27,6 @@ OpRequest::OpRequest(Message *req, OpTracker *tracker) :
   TrackedOp(tracker, req->get_recv_stamp()),
   rmw_flags(0), request(req),
   hit_flag_points(0), latest_flag_point(0),
-  send_map_update(false), sent_epoch(0),
   hitset_inserted(false) {
   if (req->get_priority() < tracker->cct->_conf->osd_client_op_priority) {
     // don't warn as quickly for low priority ops
index dbcfe6773a3ebf029248265c23ab6f41c0c29e0a..9bd41830888f3a01f6d5bd645a7182c935357abe 100644 (file)
@@ -107,12 +107,22 @@ public:
   ~OpRequest() {
     request->put();
   }
-  bool send_map_update;
-  epoch_t sent_epoch;
+
+  bool check_send_map = true; ///< true until we check if sender needs a map
+  epoch_t sent_epoch = 0;     ///< client's map epoch
+
   bool hitset_inserted;
   const Message *get_req() const { return request; }
   Message *get_nonconst_req() { return request; }
 
+  entity_name_t get_source() {
+    if (request) {
+      return request->get_source();
+    } else {
+      return entity_name_t();
+    }
+  }
+
   const char *state_string() const {
     switch(latest_flag_point) {
     case flag_queued_for_pg: return "queued for pg";
index 963e75e4c8023394cf0c09a66d02620997a3aa6c..885ce69776950cf5efcadef09607bb885429a0b1 100644 (file)
@@ -221,7 +221,6 @@ PG::PG(OSDService *o, OSDMapRef curmap,
     p.get_split_bits(curmap->get_pg_num(_pool.id)),
     _pool.id,
     p.shard),
-  map_lock("PG::map_lock"),
   osdmap_ref(curmap), last_persisted_osdmap_ref(curmap), pool(_pool),
   _lock("PG::_lock"),
   #ifdef PG_DEBUG_REFS
@@ -1910,48 +1909,6 @@ bool PG::op_has_sufficient_caps(OpRequestRef& op)
   return cap;
 }
 
-void PG::take_op_map_waiters()
-{
-  Mutex::Locker l(map_lock);
-  for (list<OpRequestRef>::iterator i = waiting_for_map.begin();
-       i != waiting_for_map.end();
-       ) {
-    if (op_must_wait_for_map(get_osdmap_with_maplock()->get_epoch(), *i)) {
-      break;
-    } else {
-      (*i)->mark_queued_for_pg();
-      osd->op_wq.queue(make_pair(PGRef(this), *i));
-      waiting_for_map.erase(i++);
-    }
-  }
-}
-
-void PG::queue_op(OpRequestRef& op)
-{
-  Mutex::Locker l(map_lock);
-  if (!waiting_for_map.empty()) {
-    // preserve ordering
-    waiting_for_map.push_back(op);
-    op->mark_delayed("waiting_for_map not empty");
-    return;
-  }
-  if (op_must_wait_for_map(get_osdmap_with_maplock()->get_epoch(), op)) {
-    waiting_for_map.push_back(op);
-    op->mark_delayed("op must wait for map");
-    return;
-  }
-  op->mark_queued_for_pg();
-  osd->op_wq.queue(make_pair(PGRef(this), op));
-  {
-    // after queue() to include any locking costs
-#ifdef WITH_LTTNG
-    osd_reqid_t reqid = op->get_reqid();
-#endif
-    tracepoint(pg, queue_op, reqid.name._type,
-        reqid.name._num, reqid.tid, reqid.inc, op->rmw_flags);
-  }
-}
-
 void PG::_activate_committed(epoch_t epoch, epoch_t activation_epoch)
 {
   lock();
@@ -3457,20 +3414,55 @@ void PG::requeue_object_waiters(map<hobject_t, list<OpRequestRef>>& m)
 
 void PG::requeue_op(OpRequestRef op)
 {
-  osd->op_wq.queue_front(make_pair(PGRef(this), op));
+  auto p = waiting_for_map.find(op->get_source());
+  if (p != waiting_for_map.end()) {
+    dout(20) << __func__ << " " << op << " (waiting_for_map " << p->first << ")"
+            << dendl;
+    p->second.push_front(op);
+  } else {
+    dout(20) << __func__ << " " << op << dendl;
+    osd->enqueue_front(info.pgid, PGQueueable(op, get_osdmap()->get_epoch()));
+  }
 }
 
 void PG::requeue_ops(list<OpRequestRef> &ls)
 {
-  dout(15) << " requeue_ops " << ls << dendl;
   for (list<OpRequestRef>::reverse_iterator i = ls.rbegin();
        i != ls.rend();
        ++i) {
-    osd->op_wq.queue_front(make_pair(PGRef(this), *i));
+    auto p = waiting_for_map.find((*i)->get_source());
+    if (p != waiting_for_map.end()) {
+      dout(20) << __func__ << " " << *i << " (waiting_for_map " << p->first
+              << ")" << dendl;
+      p->second.push_front(*i);
+    } else {
+      dout(20) << __func__ << " " << *i << dendl;
+      osd->enqueue_front(info.pgid, PGQueueable(*i, get_osdmap()->get_epoch()));
+    }
   }
   ls.clear();
 }
 
+void PG::requeue_map_waiters()
+{
+  epoch_t epoch = get_osdmap()->get_epoch();
+  auto p = waiting_for_map.begin();
+  while (p != waiting_for_map.end()) {
+    if (op_must_wait_for_map(epoch, p->second.front())) {
+      dout(20) << __func__ << " " << p->first << " front op "
+              << p->second.front() << " must still wait, doing nothing"
+              << dendl;
+      ++p;
+    } else {
+      dout(20) << __func__ << " " << p->first << " " << p->second << dendl;
+      for (auto q = p->second.rbegin(); q != p->second.rend(); ++q) {
+       osd->enqueue_front(info.pgid, PGQueueable(*q, epoch));
+      }
+      p = waiting_for_map.erase(p);
+    }
+  }
+}
+
 
 // ==========================================================================================
 // SCRUB
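
requeue_ops and requeue_map_waiters above both walk their lists back to front because each enqueue_front pushes to the head of the queue; iterating in reverse therefore reproduces the original arrival order, ahead of anything already queued. A tiny demonstration of the invariant:

    #include <deque>
    #include <iostream>
    #include <list>

    int main() {
      std::list<int> blocked = {1, 2, 3};  // original arrival order
      std::deque<int> queue = {4, 5};      // items already in the queue
      for (auto i = blocked.rbegin(); i != blocked.rend(); ++i)
        queue.push_front(*i);              // push 3, then 2, then 1
      for (int v : queue)
        std::cout << v << ' ';             // prints: 1 2 3 4 5
    }
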
@@ -5626,7 +5618,7 @@ bool PG::op_must_wait_for_map(epoch_t cur_epoch, OpRequestRef& op)
 void PG::take_waiters()
 {
   dout(10) << "take_waiters" << dendl;
-  take_op_map_waiters();
+  requeue_map_waiters();
   for (list<CephPeeringEvtRef>::iterator i = peering_waiters.begin();
        i != peering_waiters.end();
        ++i) osd->queue_for_peering(this);
index 118a2ec7eb4f4f40aa26caab5351e8f657e3f9c9..dbbac2df8eea2a8302c52026952cfcde74bde2ce 100644 (file)
@@ -218,28 +218,17 @@ public:
     return get_pgbackend()->get_is_recoverable_predicate();
   }
 protected:
-  // Ops waiting for map, should be queued at back
-  Mutex map_lock;
-  list<OpRequestRef> waiting_for_map;
   OSDMapRef osdmap_ref;
   OSDMapRef last_persisted_osdmap_ref;
   PGPool pool;
 
-  void queue_op(OpRequestRef& op);
-  void take_op_map_waiters();
+  void requeue_map_waiters();
 
   void update_osdmap_ref(OSDMapRef newmap) {
     assert(_lock.is_locked_by_me());
-    Mutex::Locker l(map_lock);
     osdmap_ref = std::move(newmap);
   }
 
-  OSDMapRef get_osdmap_with_maplock() const {
-    assert(map_lock.is_locked());
-    assert(osdmap_ref);
-    return osdmap_ref;
-  }
-
 public:
   OSDMapRef get_osdmap() const {
     assert(is_locked());
@@ -813,10 +802,65 @@ public:
 
 protected:
 
+  /*
+   * blocked request wait hierarchy
+   *
+   * In order to preserve request ordering we need to be careful about the
+   * order in which blocked requests get requeued.  Generally speaking, we
+   * push the requests back up to the op_wq in reverse order (most recent
+   * request first) so that they come back out again in the original order.
+   * However, because there are multiple wait queues, we need to requeue
+   * the waitlists in order.  Generally speaking, we requeue the wait
+   * lists in the same order in which they are checked.
+   *
+   * Here are the various wait lists, in the order they are used during
+   * request processing, with notes:
+   *
+   *  - waiting_for_map
+   *    - may start or stop blocking at any time (depending on client epoch)
+   *  - waiting_for_peered
+   *    - !is_peered() or flushes_in_progress
+   *    - only starts blocking on interval change; never restarts
+   *  - waiting_for_active
+   *    - !is_active()
+   *    - only starts blocking on interval change; never restarts
+   *  - waiting_for_scrub
+   *    - starts and stops blocking for varying intervals during scrub
+   *  - waiting_for_unreadable_object
+   *    - never restarts once object is readable (* except for EIO?)
+   *  - waiting_for_degraded_object
+   *    - never restarts once object is writeable (* except for EIO?)
+   *  - waiting_for_blocked_object
+   *    - starts and stops based on proxied op activity
+   *  - obc rwlocks
+   *    - starts and stops based on read/write activity
+   *
+   * Notes:
+   *
+   *  1. During an interval change, we requeue *everything* in the above order.
+   *
+   *  2. When an obc rwlock is released, we check for a scrub block and requeue
+   *     the op there if it applies.  We ignore the unreadable/degraded/blocked
+   *     queues because we assume they cannot apply at that time (this is
+   *     probably mostly true).
+   *
+   *  3. The requeue_ops helper will push ops onto the waiting_for_map list if
+   *     it is non-empty.
+   *
+   * These three behaviors are generally sufficient to maintain ordering, with
+   * the possible exception of cases where we make an object degraded or
+   * unreadable that was previously okay, e.g. when scrub or op processing
+   *     encounters an unexpected error.  FIXME.
+   */
 
   // pg waiters
   unsigned flushes_in_progress;
 
+  // ops with newer maps than ours (or blocked behind them).
+  // track these by client, since cross-client ordering doesn't
+  // otherwise matter.
+  unordered_map<entity_name_t,list<OpRequestRef>> waiting_for_map;
+
   // ops waiting on peered
   list<OpRequestRef>            waiting_for_peered;
 
index 579fe7c1cc9771e1954f1dae2f40f6c39f600240..96dbaafaf61d9edf052e31429aaea4889485c126 100644 (file)
@@ -1552,7 +1552,24 @@ void PrimaryLogPG::do_request(
   OpRequestRef& op,
   ThreadPool::TPHandle &handle)
 {
-  assert(!op_must_wait_for_map(get_osdmap()->get_epoch(), op));
+  // make sure we have a new enough map
+  auto p = waiting_for_map.find(op->get_source());
+  if (p != waiting_for_map.end()) {
+    // preserve ordering
+    dout(20) << __func__ << " waiting_for_map "
+            << p->first << " not empty, queueing" << dendl;
+    p->second.push_back(op);
+    op->mark_delayed("waiting_for_map not empty");
+    return;
+  }
+  if (op_must_wait_for_map(get_osdmap()->get_epoch(), op)) {
+    dout(20) << __func__ << " queue on waiting_for_map "
+            << op->get_source() << dendl;
+    waiting_for_map[op->get_source()].push_back(op);
+    op->mark_delayed("op must wait for map");
+    return;
+  }
+
   if (can_discard_request(op)) {
     return;
   }
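
The two checks added to do_request above form a per-client gate: if an earlier op from the same source is already parked, the new op queues behind it regardless of its own epoch; otherwise it parks only if it needs a newer map. A hedged sketch with stand-in types:

    #include <list>
    #include <string>
    #include <unordered_map>

    struct op_sim { std::string source; unsigned min_epoch; };

    std::unordered_map<std::string, std::list<op_sim>> waiting_for_map_sim;

    // returns true if the op was parked and must not be processed yet
    bool gate_on_map(const op_sim& op, unsigned cur_epoch) {
      auto p = waiting_for_map_sim.find(op.source);
      if (p != waiting_for_map_sim.end()) {
        p->second.push_back(op);   // preserve per-client ordering
        return true;
      }
      if (op.min_epoch > cur_epoch) {
        waiting_for_map_sim[op.source].push_back(op);
        return true;
      }
      return false;                // safe to process now
    }
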
@@ -8272,10 +8289,9 @@ void PrimaryLogPG::op_applied(const eversion_t &applied_version)
     if (scrubber.active_rep_scrub) {
       if (last_update_applied == static_cast<const MOSDRepScrub*>(
            scrubber.active_rep_scrub->get_req())->scrub_to) {
-       osd->op_wq.queue(
-         make_pair(
-           this,
-           scrubber.active_rep_scrub));
+       osd->enqueue_back(
+         info.pgid,
+         PGQueueable(scrubber.active_rep_scrub, get_osdmap()->get_epoch()));
        scrubber.active_rep_scrub = OpRequestRef();
       }
     }
@@ -9503,10 +9519,9 @@ void PrimaryLogPG::_applied_recovered_object_replica()
   if (!deleting && active_pushes == 0 &&
       scrubber.active_rep_scrub && static_cast<const MOSDRepScrub*>(
        scrubber.active_rep_scrub->get_req())->chunky) {
-    osd->op_wq.queue(
-      make_pair(
-       this,
-       scrubber.active_rep_scrub));
+    osd->enqueue_back(
+      info.pgid,
+      PGQueueable(scrubber.active_rep_scrub, get_osdmap()->get_epoch()));
     scrubber.active_rep_scrub = OpRequestRef();
   }
 
@@ -9898,7 +9913,6 @@ void PrimaryLogPG::on_shutdown()
 
   // remove from queues
   osd->pg_stat_queue_dequeue(this);
-  osd->dequeue_pg(this, 0);
   osd->peering_wq.dequeue(this);
 
   // handles queue races
index 63316773711f9ad21a543a7c0670857d1fd58ed0..5abcb0da3d1291dfc58f83bdf0099dd97788d190 100644 (file)
@@ -130,9 +130,6 @@ struct Session : public RefCountedObject {
   Mutex session_dispatch_lock;
   boost::intrusive::list<OpRequest> waiting_on_map;
 
-  OSDMapRef osdmap;  /// Map as of which waiting_for_pg is current
-  map<spg_t, boost::intrusive::list<OpRequest> > waiting_for_pg;
-
   Spinlock sent_epoch_lock;
   epoch_t last_sent_epoch;
   Spinlock received_map_lock;
@@ -153,11 +150,6 @@ struct Session : public RefCountedObject {
     last_sent_epoch(0), received_map_epoch(0),
     backoff_lock("Session::backoff_lock")
     {}
-  void maybe_reset_osdmap() {
-    if (waiting_for_pg.empty()) {
-      osdmap.reset();
-    }
-  }
 
   void ack_backoff(
     CephContext *cct,