]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
osd: reorder OSDService methods under proper dout_prefix macro
authorSage Weil <sage@redhat.com>
Mon, 4 Aug 2014 21:57:28 +0000 (14:57 -0700)
committerSage Weil <sage@redhat.com>
Mon, 4 Aug 2014 21:57:28 +0000 (14:57 -0700)
The dout_prefix for OSDService uses get_osdmap() to grab a shared_ptr for
the epoch printout.  The OSD one does not, and is not safe to run in all
thread contexts.

In particular, update_osd_stat() is run by the heartbeat thread and can
race with the shared_ptr itself being updated with a new map.

Ironically, if this were simply an OSDMap*, there would be no race since
the pointer is a single word and updates atomically.

Fix this, and any similar issues, by moving the OSDService methods up in
OSD.cc so that they use the safe dout macro.

Fixes: #8998
Backport: firefly (in a minimal form, I think!)
Signed-off-by: Sage Weil <sage@redhat.com>
src/osd/OSD.cc

index 08f2e1ad6b79eb56f47146710441a786aea6e314..800732c8a714f374234f6b646dd46e29fcd95433 100644 (file)
@@ -569,6 +569,722 @@ void OSDService::agent_stop()
   agent_thread.join();
 }
 
+// -------------------------------------
+
+float OSDService::get_full_ratio()
+{
+  float full_ratio = cct->_conf->osd_failsafe_full_ratio;
+  if (full_ratio > 1.0) full_ratio /= 100.0;
+  return full_ratio;
+}
+
+float OSDService::get_nearfull_ratio()
+{
+  float nearfull_ratio = cct->_conf->osd_failsafe_nearfull_ratio;
+  if (nearfull_ratio > 1.0) nearfull_ratio /= 100.0;
+  return nearfull_ratio;
+}
+
+void OSDService::check_nearfull_warning(const osd_stat_t &osd_stat)
+{
+  Mutex::Locker l(full_status_lock);
+  enum s_names new_state;
+
+  time_t now = ceph_clock_gettime(NULL);
+
+  // We base ratio on kb_avail rather than kb_used because they can
+  // differ significantly e.g. on btrfs volumes with a large number of
+  // chunks reserved for metadata, and for our purposes (avoiding
+  // completely filling the disk) it's far more important to know how
+  // much space is available to use than how much we've already used.
+  float ratio = ((float)(osd_stat.kb - osd_stat.kb_avail)) / ((float)osd_stat.kb);
+  float nearfull_ratio = get_nearfull_ratio();
+  float full_ratio = get_full_ratio();
+  cur_ratio = ratio;
+
+  if (full_ratio > 0 && ratio > full_ratio) {
+    new_state = FULL;
+  } else if (nearfull_ratio > 0 && ratio > nearfull_ratio) {
+    new_state = NEAR;
+  } else {
+    cur_state = NONE;
+    return;
+  }
+
+  if (cur_state != new_state) {
+    cur_state = new_state;
+  } else if (now - last_msg < cct->_conf->osd_op_complaint_time) {
+    return;
+  }
+  last_msg = now;
+  if (cur_state == FULL)
+    clog.error() << "OSD full dropping all updates " << (int)(ratio * 100) << "% full";
+  else
+    clog.warn() << "OSD near full (" << (int)(ratio * 100) << "%)";
+}
+
+bool OSDService::check_failsafe_full()
+{
+  Mutex::Locker l(full_status_lock);
+  if (cur_state == FULL)
+    return true;
+  return false;
+}
+
+bool OSDService::too_full_for_backfill(double *_ratio, double *_max_ratio)
+{
+  Mutex::Locker l(full_status_lock);
+  double max_ratio;
+  max_ratio = cct->_conf->osd_backfill_full_ratio;
+  if (_ratio)
+    *_ratio = cur_ratio;
+  if (_max_ratio)
+    *_max_ratio = max_ratio;
+  return cur_ratio >= max_ratio;
+}
+
+void OSDService::update_osd_stat(vector<int>& hb_peers)
+{
+  Mutex::Locker lock(stat_lock);
+
+  // fill in osd stats too
+  struct statfs stbuf;
+  osd->store->statfs(&stbuf);
+
+  uint64_t bytes = stbuf.f_blocks * stbuf.f_bsize;
+  uint64_t used = (stbuf.f_blocks - stbuf.f_bfree) * stbuf.f_bsize;
+  uint64_t avail = stbuf.f_bavail * stbuf.f_bsize;
+
+  osd_stat.kb = bytes >> 10;
+  osd_stat.kb_used = used >> 10;
+  osd_stat.kb_avail = avail >> 10;
+
+  osd->logger->set(l_osd_stat_bytes, bytes);
+  osd->logger->set(l_osd_stat_bytes_used, used);
+  osd->logger->set(l_osd_stat_bytes_avail, avail);
+
+  osd_stat.hb_in.swap(hb_peers);
+  osd_stat.hb_out.clear();
+
+  check_nearfull_warning(osd_stat);
+
+  osd->op_tracker.get_age_ms_histogram(&osd_stat.op_queue_age_hist);
+
+  dout(20) << "update_osd_stat " << osd_stat << dendl;
+}
+
+void OSDService::send_message_osd_cluster(int peer, Message *m, epoch_t from_epoch)
+{
+  OSDMapRef next_map = get_nextmap_reserved();
+  // service map is always newer/newest
+  assert(from_epoch <= next_map->get_epoch());
+
+  if (next_map->is_down(peer) ||
+      next_map->get_info(peer).up_from > from_epoch) {
+    m->put();
+    release_map(next_map);
+    return;
+  }
+  const entity_inst_t& peer_inst = next_map->get_cluster_inst(peer);
+  Connection *peer_con = osd->cluster_messenger->get_connection(peer_inst).get();
+  share_map_peer(peer, peer_con, next_map);
+  osd->cluster_messenger->send_message(m, peer_inst);
+  release_map(next_map);
+}
+
+ConnectionRef OSDService::get_con_osd_cluster(int peer, epoch_t from_epoch)
+{
+  OSDMapRef next_map = get_nextmap_reserved();
+  // service map is always newer/newest
+  assert(from_epoch <= next_map->get_epoch());
+
+  if (next_map->is_down(peer) ||
+      next_map->get_info(peer).up_from > from_epoch) {
+    release_map(next_map);
+    return NULL;
+  }
+  ConnectionRef con = osd->cluster_messenger->get_connection(next_map->get_cluster_inst(peer));
+  release_map(next_map);
+  return con;
+}
+
+pair<ConnectionRef,ConnectionRef> OSDService::get_con_osd_hb(int peer, epoch_t from_epoch)
+{
+  OSDMapRef next_map = get_nextmap_reserved();
+  // service map is always newer/newest
+  assert(from_epoch <= next_map->get_epoch());
+
+  pair<ConnectionRef,ConnectionRef> ret;
+  if (next_map->is_down(peer) ||
+      next_map->get_info(peer).up_from > from_epoch) {
+    release_map(next_map);
+    return ret;
+  }
+  ret.first = osd->hbclient_messenger->get_connection(next_map->get_hb_back_inst(peer));
+  if (next_map->get_hb_front_addr(peer) != entity_addr_t())
+    ret.second = osd->hbclient_messenger->get_connection(next_map->get_hb_front_inst(peer));
+  release_map(next_map);
+  return ret;
+}
+
+
+void OSDService::queue_want_pg_temp(pg_t pgid, vector<int>& want)
+{
+  Mutex::Locker l(pg_temp_lock);
+  pg_temp_wanted[pgid] = want;
+}
+
+void OSDService::send_pg_temp()
+{
+  Mutex::Locker l(pg_temp_lock);
+  if (pg_temp_wanted.empty())
+    return;
+  dout(10) << "send_pg_temp " << pg_temp_wanted << dendl;
+  MOSDPGTemp *m = new MOSDPGTemp(osdmap->get_epoch());
+  m->pg_temp = pg_temp_wanted;
+  monc->send_mon_message(m);
+}
+
+
+// --------------------------------------
+// dispatch
+
+epoch_t OSDService::get_peer_epoch(int peer)
+{
+  Mutex::Locker l(peer_map_epoch_lock);
+  map<int,epoch_t>::iterator p = peer_map_epoch.find(peer);
+  if (p == peer_map_epoch.end())
+    return 0;
+  return p->second;
+}
+
+epoch_t OSDService::note_peer_epoch(int peer, epoch_t e)
+{
+  Mutex::Locker l(peer_map_epoch_lock);
+  map<int,epoch_t>::iterator p = peer_map_epoch.find(peer);
+  if (p != peer_map_epoch.end()) {
+    if (p->second < e) {
+      dout(10) << "note_peer_epoch osd." << peer << " has " << e << dendl;
+      p->second = e;
+    } else {
+      dout(30) << "note_peer_epoch osd." << peer << " has " << p->second << " >= " << e << dendl;
+    }
+    return p->second;
+  } else {
+    dout(10) << "note_peer_epoch osd." << peer << " now has " << e << dendl;
+    peer_map_epoch[peer] = e;
+    return e;
+  }
+}
+void OSDService::forget_peer_epoch(int peer, epoch_t as_of)
+{
+  Mutex::Locker l(peer_map_epoch_lock);
+  map<int,epoch_t>::iterator p = peer_map_epoch.find(peer);
+  if (p != peer_map_epoch.end()) {
+    if (p->second <= as_of) {
+      dout(10) << "forget_peer_epoch osd." << peer << " as_of " << as_of
+              << " had " << p->second << dendl;
+      peer_map_epoch.erase(p);
+    } else {
+      dout(10) << "forget_peer_epoch osd." << peer << " as_of " << as_of
+              << " has " << p->second << " - not forgetting" << dendl;
+    }
+  }
+}
+
+bool OSDService::should_share_map(entity_name_t name, Connection *con,
+                                  epoch_t epoch, OSDMapRef& osdmap,
+                                  const epoch_t *sent_epoch_p)
+{
+  bool should_send = false;
+  dout(20) << "should_share_map "
+           << name << " " << con->get_peer_addr()
+           << " " << epoch << dendl;
+
+  // does client have old map?
+  if (name.is_client()) {
+    bool message_sendmap = epoch < osdmap->get_epoch();
+    if (message_sendmap && sent_epoch_p) {
+      dout(20) << "client session last_sent_epoch: "
+               << *sent_epoch_p
+               << " versus osdmap epoch " << osdmap->get_epoch() << dendl;
+      if (*sent_epoch_p < osdmap->get_epoch()) {
+        should_send = true;
+      } // else we don't need to send it out again
+    }
+  }
+
+  if (con->get_messenger() == osd->cluster_messenger &&
+      con != osd->cluster_messenger->get_loopback_connection() &&
+      osdmap->is_up(name.num()) &&
+      (osdmap->get_cluster_addr(name.num()) == con->get_peer_addr() ||
+       osdmap->get_hb_back_addr(name.num()) == con->get_peer_addr())) {
+    // remember
+    epoch_t has = MAX(get_peer_epoch(name.num()), epoch);
+
+    // share?
+    if (has < osdmap->get_epoch()) {
+      dout(10) << name << " " << con->get_peer_addr()
+               << " has old map " << epoch << " < "
+               << osdmap->get_epoch() << dendl;
+      should_send = true;
+    }
+  }
+
+  return should_send;
+}
+
+void OSDService::share_map(
+    entity_name_t name,
+    Connection *con,
+    epoch_t epoch,
+    OSDMapRef& osdmap,
+    epoch_t *sent_epoch_p)
+{
+  dout(20) << "share_map "
+          << name << " " << con->get_peer_addr()
+          << " " << epoch << dendl;
+
+  if ((!osd->is_active()) && (!osd->is_stopping())) {
+    /*It is safe not to proceed as OSD is not in healthy state*/
+    return;
+  }
+
+  bool want_shared = should_share_map(name, con, epoch,
+                                      osdmap, sent_epoch_p);
+
+  if (want_shared){
+    if (name.is_client()) {
+      dout(10) << name << " has old map " << epoch
+          << " < " << osdmap->get_epoch() << dendl;
+      // we know the Session is valid or we wouldn't be sending
+      if (sent_epoch_p) {
+       *sent_epoch_p = osdmap->get_epoch();
+      }
+      send_incremental_map(epoch, con, osdmap);
+    } else if (con->get_messenger() == osd->cluster_messenger &&
+        osdmap->is_up(name.num()) &&
+        (osdmap->get_cluster_addr(name.num()) == con->get_peer_addr() ||
+            osdmap->get_hb_back_addr(name.num()) == con->get_peer_addr())) {
+      dout(10) << name << " " << con->get_peer_addr()
+                      << " has old map " << epoch << " < "
+                      << osdmap->get_epoch() << dendl;
+      note_peer_epoch(name.num(), osdmap->get_epoch());
+      send_incremental_map(epoch, con, osdmap);
+    }
+  }
+}
+
+
+void OSDService::share_map_peer(int peer, Connection *con, OSDMapRef map)
+{
+  if (!map)
+    map = get_osdmap();
+
+  // send map?
+  epoch_t pe = get_peer_epoch(peer);
+  if (pe) {
+    if (pe < map->get_epoch()) {
+      send_incremental_map(pe, con, map);
+      note_peer_epoch(peer, map->get_epoch());
+    } else
+      dout(20) << "share_map_peer " << con << " already has epoch " << pe << dendl;
+  } else {
+    dout(20) << "share_map_peer " << con << " don't know epoch, doing nothing" << dendl;
+    // no idea about peer's epoch.
+    // ??? send recent ???
+    // do nothing.
+  }
+}
+
+
+bool OSDService::inc_scrubs_pending()
+{
+  bool result = false;
+
+  sched_scrub_lock.Lock();
+  if (scrubs_pending + scrubs_active < cct->_conf->osd_max_scrubs) {
+    dout(20) << "inc_scrubs_pending " << scrubs_pending << " -> " << (scrubs_pending+1)
+            << " (max " << cct->_conf->osd_max_scrubs << ", active " << scrubs_active << ")" << dendl;
+    result = true;
+    ++scrubs_pending;
+  } else {
+    dout(20) << "inc_scrubs_pending " << scrubs_pending << " + " << scrubs_active << " active >= max " << cct->_conf->osd_max_scrubs << dendl;
+  }
+  sched_scrub_lock.Unlock();
+
+  return result;
+}
+
+void OSDService::dec_scrubs_pending()
+{
+  sched_scrub_lock.Lock();
+  dout(20) << "dec_scrubs_pending " << scrubs_pending << " -> " << (scrubs_pending-1)
+          << " (max " << cct->_conf->osd_max_scrubs << ", active " << scrubs_active << ")" << dendl;
+  --scrubs_pending;
+  assert(scrubs_pending >= 0);
+  sched_scrub_lock.Unlock();
+}
+
+void OSDService::inc_scrubs_active(bool reserved)
+{
+  sched_scrub_lock.Lock();
+  ++(scrubs_active);
+  if (reserved) {
+    --(scrubs_pending);
+    dout(20) << "inc_scrubs_active " << (scrubs_active-1) << " -> " << scrubs_active
+            << " (max " << cct->_conf->osd_max_scrubs
+            << ", pending " << (scrubs_pending+1) << " -> " << scrubs_pending << ")" << dendl;
+    assert(scrubs_pending >= 0);
+  } else {
+    dout(20) << "inc_scrubs_active " << (scrubs_active-1) << " -> " << scrubs_active
+            << " (max " << cct->_conf->osd_max_scrubs
+            << ", pending " << scrubs_pending << ")" << dendl;
+  }
+  sched_scrub_lock.Unlock();
+}
+
+void OSDService::dec_scrubs_active()
+{
+  sched_scrub_lock.Lock();
+  dout(20) << "dec_scrubs_active " << scrubs_active << " -> " << (scrubs_active-1)
+          << " (max " << cct->_conf->osd_max_scrubs << ", pending " << scrubs_pending << ")" << dendl;
+  --scrubs_active;
+  sched_scrub_lock.Unlock();
+}
+
+void OSDService::retrieve_epochs(epoch_t *_boot_epoch, epoch_t *_up_epoch,
+                                 epoch_t *_bind_epoch) const
+{
+  Mutex::Locker l(epoch_lock);
+  if (_boot_epoch)
+    *_boot_epoch = boot_epoch;
+  if (_up_epoch)
+    *_up_epoch = up_epoch;
+  if (_bind_epoch)
+    *_bind_epoch = bind_epoch;
+}
+
+void OSDService::set_epochs(const epoch_t *_boot_epoch, const epoch_t *_up_epoch,
+                            const epoch_t *_bind_epoch)
+{
+  Mutex::Locker l(epoch_lock);
+  if (_boot_epoch) {
+    assert(*_boot_epoch == 0 || *_boot_epoch >= boot_epoch);
+    boot_epoch = *_boot_epoch;
+  }
+  if (_up_epoch) {
+    assert(*_up_epoch == 0 || *_up_epoch >= up_epoch);
+    up_epoch = *_up_epoch;
+  }
+  if (_bind_epoch) {
+    assert(*_bind_epoch == 0 || *_bind_epoch >= bind_epoch);
+    bind_epoch = *_bind_epoch;
+  }
+}
+
+bool OSDService::prepare_to_stop()
+{
+  Mutex::Locker l(is_stopping_lock);
+  if (state != NOT_STOPPING)
+    return false;
+
+  OSDMapRef osdmap = get_osdmap();
+  if (osdmap && osdmap->is_up(whoami)) {
+    dout(0) << __func__ << " telling mon we are shutting down" << dendl;
+    state = PREPARING_TO_STOP;
+    monc->send_mon_message(new MOSDMarkMeDown(monc->get_fsid(),
+                                             osdmap->get_inst(whoami),
+                                             osdmap->get_epoch(),
+                                             true  // request ack
+                                             ));
+    utime_t now = ceph_clock_now(cct);
+    utime_t timeout;
+    timeout.set_from_double(now + cct->_conf->osd_mon_shutdown_timeout);
+    while ((ceph_clock_now(cct) < timeout) &&
+          (state != STOPPING)) {
+      is_stopping_cond.WaitUntil(is_stopping_lock, timeout);
+    }
+  }
+  dout(0) << __func__ << " starting shutdown" << dendl;
+  state = STOPPING;
+  return true;
+}
+
+void OSDService::got_stop_ack()
+{
+  Mutex::Locker l(is_stopping_lock);
+  if (state == PREPARING_TO_STOP) {
+    dout(0) << __func__ << " starting shutdown" << dendl;
+    state = STOPPING;
+    is_stopping_cond.Signal();
+  } else {
+    dout(10) << __func__ << " ignoring msg" << dendl;
+  }
+}
+
+
+MOSDMap *OSDService::build_incremental_map_msg(epoch_t since, epoch_t to,
+                                               OSDSuperblock& sblock)
+{
+  MOSDMap *m = new MOSDMap(monc->get_fsid());
+  m->oldest_map = sblock.oldest_map;
+  m->newest_map = sblock.newest_map;
+  
+  for (epoch_t e = to; e > since; e--) {
+    bufferlist bl;
+    if (e > m->oldest_map && get_inc_map_bl(e, bl)) {
+      m->incremental_maps[e].claim(bl);
+    } else if (get_map_bl(e, bl)) {
+      m->maps[e].claim(bl);
+      break;
+    } else {
+      derr << "since " << since << " to " << to
+          << " oldest " << m->oldest_map << " newest " << m->newest_map
+          << dendl;
+      m->put();
+      m = NULL;
+      break;
+    }
+  }
+  return m;
+}
+
+void OSDService::send_map(MOSDMap *m, Connection *con)
+{
+  Messenger *msgr = client_messenger;
+  if (entity_name_t::TYPE_OSD == con->get_peer_type())
+    msgr = cluster_messenger;
+  msgr->send_message(m, con);
+}
+
+void OSDService::send_incremental_map(epoch_t since, Connection *con,
+                                      OSDMapRef& osdmap)
+{
+  epoch_t to = osdmap->get_epoch();
+  dout(10) << "send_incremental_map " << since << " -> " << to
+           << " to " << con << " " << con->get_peer_addr() << dendl;
+
+  MOSDMap *m = NULL;
+  while (!m) {
+    OSDSuperblock sblock(get_superblock());
+    if (since < sblock.oldest_map) {
+      // just send latest full map
+      MOSDMap *m = new MOSDMap(monc->get_fsid());
+      m->oldest_map = sblock.oldest_map;
+      m->newest_map = sblock.newest_map;
+      get_map_bl(to, m->maps[to]);
+      send_map(m, con);
+      return;
+    }
+    
+    if (to > since && (int64_t)(to - since) > cct->_conf->osd_map_share_max_epochs) {
+      dout(10) << "  " << (to - since) << " > max " << cct->_conf->osd_map_share_max_epochs
+              << ", only sending most recent" << dendl;
+      since = to - cct->_conf->osd_map_share_max_epochs;
+    }
+    
+    if (to - since > (epoch_t)cct->_conf->osd_map_message_max)
+      to = since + cct->_conf->osd_map_message_max;
+    m = build_incremental_map_msg(since, to, sblock);
+  }
+  send_map(m, con);
+}
+
+bool OSDService::_get_map_bl(epoch_t e, bufferlist& bl)
+{
+  bool found = map_bl_cache.lookup(e, &bl);
+  if (found)
+    return true;
+  found = store->read(
+    coll_t::META_COLL, OSD::get_osdmap_pobject_name(e), 0, 0, bl) >= 0;
+  if (found)
+    _add_map_bl(e, bl);
+  return found;
+}
+
+bool OSDService::get_inc_map_bl(epoch_t e, bufferlist& bl)
+{
+  Mutex::Locker l(map_cache_lock);
+  bool found = map_bl_inc_cache.lookup(e, &bl);
+  if (found)
+    return true;
+  found = store->read(
+    coll_t::META_COLL, OSD::get_inc_osdmap_pobject_name(e), 0, 0, bl) >= 0;
+  if (found)
+    _add_map_inc_bl(e, bl);
+  return found;
+}
+
+void OSDService::_add_map_bl(epoch_t e, bufferlist& bl)
+{
+  dout(10) << "add_map_bl " << e << " " << bl.length() << " bytes" << dendl;
+  map_bl_cache.add(e, bl);
+}
+
+void OSDService::_add_map_inc_bl(epoch_t e, bufferlist& bl)
+{
+  dout(10) << "add_map_inc_bl " << e << " " << bl.length() << " bytes" << dendl;
+  map_bl_inc_cache.add(e, bl);
+}
+
+void OSDService::pin_map_inc_bl(epoch_t e, bufferlist &bl)
+{
+  Mutex::Locker l(map_cache_lock);
+  map_bl_inc_cache.pin(e, bl);
+}
+
+void OSDService::pin_map_bl(epoch_t e, bufferlist &bl)
+{
+  Mutex::Locker l(map_cache_lock);
+  map_bl_cache.pin(e, bl);
+}
+
+void OSDService::clear_map_bl_cache_pins(epoch_t e)
+{
+  Mutex::Locker l(map_cache_lock);
+  map_bl_inc_cache.clear_pinned(e);
+  map_bl_cache.clear_pinned(e);
+}
+
+OSDMapRef OSDService::_add_map(OSDMap *o)
+{
+  epoch_t e = o->get_epoch();
+
+  if (cct->_conf->osd_map_dedup) {
+    // Dedup against an existing map at a nearby epoch
+    OSDMapRef for_dedup = map_cache.lower_bound(e);
+    if (for_dedup) {
+      OSDMap::dedup(for_dedup.get(), o);
+    }
+  }
+  OSDMapRef l = map_cache.add(e, o);
+  return l;
+}
+
+OSDMapRef OSDService::try_get_map(epoch_t epoch)
+{
+  Mutex::Locker l(map_cache_lock);
+  OSDMapRef retval = map_cache.lookup(epoch);
+  if (retval) {
+    dout(30) << "get_map " << epoch << " -cached" << dendl;
+    return retval;
+  }
+
+  OSDMap *map = new OSDMap;
+  if (epoch > 0) {
+    dout(20) << "get_map " << epoch << " - loading and decoding " << map << dendl;
+    bufferlist bl;
+    if (!_get_map_bl(epoch, bl)) {
+      delete map;
+      return OSDMapRef();
+    }
+    map->decode(bl);
+  } else {
+    dout(20) << "get_map " << epoch << " - return initial " << map << dendl;
+  }
+  return _add_map(map);
+}
+
+bool OSDService::queue_for_recovery(PG *pg)
+{
+  bool b = recovery_wq.queue(pg);
+  if (b)
+    dout(10) << "queue_for_recovery queued " << *pg << dendl;
+  else
+    dout(10) << "queue_for_recovery already queued " << *pg << dendl;
+  return b;
+}
+
+
+// ops
+
+
+void OSDService::reply_op_error(OpRequestRef op, int err)
+{
+  reply_op_error(op, err, eversion_t(), 0);
+}
+
+void OSDService::reply_op_error(OpRequestRef op, int err, eversion_t v,
+                                version_t uv)
+{
+  MOSDOp *m = static_cast<MOSDOp*>(op->get_req());
+  assert(m->get_header().type == CEPH_MSG_OSD_OP);
+  int flags;
+  flags = m->get_flags() & (CEPH_OSD_FLAG_ACK|CEPH_OSD_FLAG_ONDISK);
+
+  MOSDOpReply *reply = new MOSDOpReply(m, err, osdmap->get_epoch(), flags,
+                                      true);
+  reply->set_reply_versions(v, uv);
+  m->get_connection()->get_messenger()->send_message(reply, m->get_connection());
+}
+
+void OSDService::handle_misdirected_op(PG *pg, OpRequestRef op)
+{
+  MOSDOp *m = static_cast<MOSDOp*>(op->get_req());
+  assert(m->get_header().type == CEPH_MSG_OSD_OP);
+
+  assert(m->get_map_epoch() >= pg->info.history.same_primary_since);
+
+  if (pg->is_ec_pg()) {
+    /**
+       * OSD recomputes op target based on current OSDMap. With an EC pg, we
+       * can get this result:
+       * 1) client at map 512 sends an op to osd 3, pg_t 3.9 based on mapping
+       *    [CRUSH_ITEM_NONE, 2, 3]/3
+       * 2) OSD 3 at map 513 remaps op to osd 3, spg_t 3.9s0 based on mapping
+       *    [3, 2, 3]/3
+       * 3) PG 3.9s0 dequeues the op at epoch 512 and notices that it isn't primary
+       *    -- misdirected op
+       * 4) client resends and this time PG 3.9s0 having caught up to 513 gets
+       *    it and fulfils it
+       *
+       * We can't compute the op target based on the sending map epoch due to
+       * splitting.  The simplest thing is to detect such cases here and drop
+       * them without an error (the client will resend anyway).
+       */
+    OSDMapRef opmap = try_get_map(m->get_map_epoch());
+    if (!opmap) {
+      dout(7) << __func__ << ": " << *pg << " no longer have map for "
+             << m->get_map_epoch() << ", dropping" << dendl;
+      return;
+    }
+    pg_t _pgid = m->get_pg();
+    spg_t pgid;
+    if ((m->get_flags() & CEPH_OSD_FLAG_PGOP) == 0)
+      _pgid = opmap->raw_pg_to_pg(_pgid);
+    if (opmap->get_primary_shard(_pgid, &pgid) &&
+       pgid.shard != pg->info.pgid.shard) {
+      dout(7) << __func__ << ": " << *pg << " primary changed since "
+             << m->get_map_epoch() << ", dropping" << dendl;
+      return;
+    }
+  }
+
+  dout(7) << *pg << " misdirected op in " << m->get_map_epoch() << dendl;
+  clog.warn() << m->get_source_inst() << " misdirected " << m->get_reqid()
+             << " pg " << m->get_pg()
+             << " to osd." << whoami
+             << " not " << pg->acting
+             << " in e" << m->get_map_epoch() << "/" << osdmap->get_epoch() << "\n";
+  reply_op_error(op, -ENXIO);
+}
+
+
+void OSDService::dequeue_pg(PG *pg, list<OpRequestRef> *dequeued)
+{
+  osd->op_shardedwq.dequeue(pg, dequeued);
+}
+
+void OSDService::queue_for_peering(PG *pg)
+{
+  peering_wq.queue(pg);
+}
+
+
+// ====================================================================
+// OSD
 
 #undef dout_prefix
 #define dout_prefix *_dout
@@ -2649,192 +3365,90 @@ bool OSD::project_pg_history(spg_t pgid, pg_history_t& h, epoch_t from,
 {
   dout(15) << "project_pg_history " << pgid
            << " from " << from << " to " << osdmap->get_epoch()
-           << ", start " << h
-           << dendl;
-
-  epoch_t e;
-  for (e = osdmap->get_epoch();
-       e > from;
-       e--) {
-    // verify during intermediate epoch (e-1)
-    OSDMapRef oldmap = service.try_get_map(e-1);
-    if (!oldmap) {
-      dout(15) << __func__ << ": found map gap, returning false" << dendl;
-      return false;
-    }
-    assert(oldmap->have_pg_pool(pgid.pool()));
-
-    int upprimary, actingprimary;
-    vector<int> up, acting;
-    oldmap->pg_to_up_acting_osds(
-      pgid.pgid,
-      &up,
-      &upprimary,
-      &acting,
-      &actingprimary);
-
-    // acting set change?
-    if ((actingprimary != currentactingprimary ||
-        upprimary != currentupprimary ||
-        acting != currentacting ||
-        up != currentup) && e > h.same_interval_since) {
-      dout(15) << "project_pg_history " << pgid << " acting|up changed in " << e 
-              << " from " << acting << "/" << up
-              << " " << actingprimary << "/" << upprimary
-              << " -> " << currentacting << "/" << currentup
-              << " " << currentactingprimary << "/" << currentupprimary 
-              << dendl;
-      h.same_interval_since = e;
-    }
-    // split?
-    if (pgid.is_split(oldmap->get_pg_num(pgid.pool()),
-                     osdmap->get_pg_num(pgid.pool()),
-                     0)) {
-      h.same_interval_since = e;
-    }
-    // up set change?
-    if ((up != currentup || upprimary != currentupprimary)
-       && e > h.same_up_since) {
-      dout(15) << "project_pg_history " << pgid << " up changed in " << e 
-              << " from " << up << " " << upprimary
-              << " -> " << currentup << " " << currentupprimary << dendl;
-      h.same_up_since = e;
-    }
-
-    // primary change?
-    if (OSDMap::primary_changed(
-         actingprimary,
-         acting,
-         currentactingprimary,
-         currentacting) &&
-        e > h.same_primary_since) {
-      dout(15) << "project_pg_history " << pgid << " primary changed in " << e << dendl;
-      h.same_primary_since = e;
-    }
-
-    if (h.same_interval_since >= e && h.same_up_since >= e && h.same_primary_since >= e)
-      break;
-  }
-
-  // base case: these floors should be the creation epoch if we didn't
-  // find any changes.
-  if (e == h.epoch_created) {
-    if (!h.same_interval_since)
-      h.same_interval_since = e;
-    if (!h.same_up_since)
-      h.same_up_since = e;
-    if (!h.same_primary_since)
-      h.same_primary_since = e;
-  }
-
-  dout(15) << "project_pg_history end " << h << dendl;
-  return true;
-}
-
-// -------------------------------------
-
-float OSDService::get_full_ratio()
-{
-  float full_ratio = cct->_conf->osd_failsafe_full_ratio;
-  if (full_ratio > 1.0) full_ratio /= 100.0;
-  return full_ratio;
-}
-
-float OSDService::get_nearfull_ratio()
-{
-  float nearfull_ratio = cct->_conf->osd_failsafe_nearfull_ratio;
-  if (nearfull_ratio > 1.0) nearfull_ratio /= 100.0;
-  return nearfull_ratio;
-}
-
-void OSDService::check_nearfull_warning(const osd_stat_t &osd_stat)
-{
-  Mutex::Locker l(full_status_lock);
-  enum s_names new_state;
-
-  time_t now = ceph_clock_gettime(NULL);
-
-  // We base ratio on kb_avail rather than kb_used because they can
-  // differ significantly e.g. on btrfs volumes with a large number of
-  // chunks reserved for metadata, and for our purposes (avoiding
-  // completely filling the disk) it's far more important to know how
-  // much space is available to use than how much we've already used.
-  float ratio = ((float)(osd_stat.kb - osd_stat.kb_avail)) / ((float)osd_stat.kb);
-  float nearfull_ratio = get_nearfull_ratio();
-  float full_ratio = get_full_ratio();
-  cur_ratio = ratio;
-
-  if (full_ratio > 0 && ratio > full_ratio) {
-    new_state = FULL;
-  } else if (nearfull_ratio > 0 && ratio > nearfull_ratio) {
-    new_state = NEAR;
-  } else {
-    cur_state = NONE;
-    return;
-  }
-
-  if (cur_state != new_state) {
-    cur_state = new_state;
-  } else if (now - last_msg < cct->_conf->osd_op_complaint_time) {
-    return;
-  }
-  last_msg = now;
-  if (cur_state == FULL)
-    clog.error() << "OSD full dropping all updates " << (int)(ratio * 100) << "% full";
-  else
-    clog.warn() << "OSD near full (" << (int)(ratio * 100) << "%)";
-}
-
-bool OSDService::check_failsafe_full()
-{
-  Mutex::Locker l(full_status_lock);
-  if (cur_state == FULL)
-    return true;
-  return false;
-}
-
-bool OSDService::too_full_for_backfill(double *_ratio, double *_max_ratio)
-{
-  Mutex::Locker l(full_status_lock);
-  double max_ratio;
-  max_ratio = cct->_conf->osd_backfill_full_ratio;
-  if (_ratio)
-    *_ratio = cur_ratio;
-  if (_max_ratio)
-    *_max_ratio = max_ratio;
-  return cur_ratio >= max_ratio;
-}
-
-void OSDService::update_osd_stat(vector<int>& hb_peers)
-{
-  Mutex::Locker lock(stat_lock);
-
-  // fill in osd stats too
-  struct statfs stbuf;
-  osd->store->statfs(&stbuf);
+           << ", start " << h
+           << dendl;
 
-  uint64_t bytes = stbuf.f_blocks * stbuf.f_bsize;
-  uint64_t used = (stbuf.f_blocks - stbuf.f_bfree) * stbuf.f_bsize;
-  uint64_t avail = stbuf.f_bavail * stbuf.f_bsize;
+  epoch_t e;
+  for (e = osdmap->get_epoch();
+       e > from;
+       e--) {
+    // verify during intermediate epoch (e-1)
+    OSDMapRef oldmap = service.try_get_map(e-1);
+    if (!oldmap) {
+      dout(15) << __func__ << ": found map gap, returning false" << dendl;
+      return false;
+    }
+    assert(oldmap->have_pg_pool(pgid.pool()));
 
-  osd_stat.kb = bytes >> 10;
-  osd_stat.kb_used = used >> 10;
-  osd_stat.kb_avail = avail >> 10;
+    int upprimary, actingprimary;
+    vector<int> up, acting;
+    oldmap->pg_to_up_acting_osds(
+      pgid.pgid,
+      &up,
+      &upprimary,
+      &acting,
+      &actingprimary);
 
-  osd->logger->set(l_osd_stat_bytes, bytes);
-  osd->logger->set(l_osd_stat_bytes_used, used);
-  osd->logger->set(l_osd_stat_bytes_avail, avail);
+    // acting set change?
+    if ((actingprimary != currentactingprimary ||
+        upprimary != currentupprimary ||
+        acting != currentacting ||
+        up != currentup) && e > h.same_interval_since) {
+      dout(15) << "project_pg_history " << pgid << " acting|up changed in " << e 
+              << " from " << acting << "/" << up
+              << " " << actingprimary << "/" << upprimary
+              << " -> " << currentacting << "/" << currentup
+              << " " << currentactingprimary << "/" << currentupprimary 
+              << dendl;
+      h.same_interval_since = e;
+    }
+    // split?
+    if (pgid.is_split(oldmap->get_pg_num(pgid.pool()),
+                     osdmap->get_pg_num(pgid.pool()),
+                     0)) {
+      h.same_interval_since = e;
+    }
+    // up set change?
+    if ((up != currentup || upprimary != currentupprimary)
+       && e > h.same_up_since) {
+      dout(15) << "project_pg_history " << pgid << " up changed in " << e 
+              << " from " << up << " " << upprimary
+              << " -> " << currentup << " " << currentupprimary << dendl;
+      h.same_up_since = e;
+    }
 
-  osd_stat.hb_in.swap(hb_peers);
-  osd_stat.hb_out.clear();
+    // primary change?
+    if (OSDMap::primary_changed(
+         actingprimary,
+         acting,
+         currentactingprimary,
+         currentacting) &&
+        e > h.same_primary_since) {
+      dout(15) << "project_pg_history " << pgid << " primary changed in " << e << dendl;
+      h.same_primary_since = e;
+    }
 
-  check_nearfull_warning(osd_stat);
+    if (h.same_interval_since >= e && h.same_up_since >= e && h.same_primary_since >= e)
+      break;
+  }
 
-  osd->op_tracker.get_age_ms_histogram(&osd_stat.op_queue_age_hist);
+  // base case: these floors should be the creation epoch if we didn't
+  // find any changes.
+  if (e == h.epoch_created) {
+    if (!h.same_interval_since)
+      h.same_interval_since = e;
+    if (!h.same_up_since)
+      h.same_up_since = e;
+    if (!h.same_primary_since)
+      h.same_primary_since = e;
+  }
 
-  dout(20) << "update_osd_stat " << osd_stat << dendl;
+  dout(15) << "project_pg_history end " << h << dendl;
+  return true;
 }
 
+
+
 void OSD::_add_heartbeat_peer(int p)
 {
   if (p == whoami)
@@ -4065,76 +4679,6 @@ void OSD::send_alive()
   }
 }
 
-void OSDService::send_message_osd_cluster(int peer, Message *m, epoch_t from_epoch)
-{
-  OSDMapRef next_map = get_nextmap_reserved();
-  // service map is always newer/newest
-  assert(from_epoch <= next_map->get_epoch());
-
-  if (next_map->is_down(peer) ||
-      next_map->get_info(peer).up_from > from_epoch) {
-    m->put();
-    release_map(next_map);
-    return;
-  }
-  const entity_inst_t& peer_inst = next_map->get_cluster_inst(peer);
-  Connection *peer_con = osd->cluster_messenger->get_connection(peer_inst).get();
-  share_map_peer(peer, peer_con, next_map);
-  osd->cluster_messenger->send_message(m, peer_inst);
-  release_map(next_map);
-}
-
-ConnectionRef OSDService::get_con_osd_cluster(int peer, epoch_t from_epoch)
-{
-  OSDMapRef next_map = get_nextmap_reserved();
-  // service map is always newer/newest
-  assert(from_epoch <= next_map->get_epoch());
-
-  if (next_map->is_down(peer) ||
-      next_map->get_info(peer).up_from > from_epoch) {
-    release_map(next_map);
-    return NULL;
-  }
-  ConnectionRef con = osd->cluster_messenger->get_connection(next_map->get_cluster_inst(peer));
-  release_map(next_map);
-  return con;
-}
-
-pair<ConnectionRef,ConnectionRef> OSDService::get_con_osd_hb(int peer, epoch_t from_epoch)
-{
-  OSDMapRef next_map = get_nextmap_reserved();
-  // service map is always newer/newest
-  assert(from_epoch <= next_map->get_epoch());
-
-  pair<ConnectionRef,ConnectionRef> ret;
-  if (next_map->is_down(peer) ||
-      next_map->get_info(peer).up_from > from_epoch) {
-    release_map(next_map);
-    return ret;
-  }
-  ret.first = osd->hbclient_messenger->get_connection(next_map->get_hb_back_inst(peer));
-  if (next_map->get_hb_front_addr(peer) != entity_addr_t())
-    ret.second = osd->hbclient_messenger->get_connection(next_map->get_hb_front_inst(peer));
-  release_map(next_map);
-  return ret;
-}
-
-void OSDService::queue_want_pg_temp(pg_t pgid, vector<int>& want)
-{
-  Mutex::Locker l(pg_temp_lock);
-  pg_temp_wanted[pgid] = want;
-}
-
-void OSDService::send_pg_temp()
-{
-  Mutex::Locker l(pg_temp_lock);
-  if (pg_temp_wanted.empty())
-    return;
-  dout(10) << "send_pg_temp " << pg_temp_wanted << dendl;
-  MOSDPGTemp *m = new MOSDPGTemp(osdmap->get_epoch());
-  m->pg_temp = pg_temp_wanted;
-  monc->send_mon_message(m);
-}
 
 void OSD::send_failures()
 {
@@ -4712,210 +5256,58 @@ void OSD::do_command(Connection *con, ceph_tid_t tid, vector<string>& cmd, buffe
         << r;
       goto out;
     }
-    cct->_conf->apply_changes(NULL);
-    ss << "kicking recovery queue. set osd_recovery_delay_start "
-       << "to " << cct->_conf->osd_recovery_delay_start;
-    defer_recovery_until = ceph_clock_now(cct);
-    defer_recovery_until += cct->_conf->osd_recovery_delay_start;
-    recovery_wq.wake();
-  }
-
-  else if (prefix == "cpu_profiler") {
-    string arg;
-    cmd_getval(cct, cmdmap, "arg", arg);
-    vector<string> argvec;
-    get_str_vec(arg, argvec);
-    cpu_profiler_handle_command(argvec, ds);
-  }
-
-  else if (prefix == "dump_pg_recovery_stats") {
-    stringstream s;
-    if (f) {
-      pg_recovery_stats.dump_formatted(f.get());
-      f->flush(ds);
-    } else {
-      pg_recovery_stats.dump(s);
-      ds << "dump pg recovery stats: " << s.str();
-    }
-  }
-
-  else if (prefix == "reset_pg_recovery_stats") {
-    ss << "reset pg recovery stats";
-    pg_recovery_stats.reset();
-  }
-
-  else {
-    ss << "unrecognized command! " << cmd;
-    r = -EINVAL;
-  }
-
- out:
-  rs = ss.str();
-  odata.append(ds);
-  dout(0) << "do_command r=" << r << " " << rs << dendl;
-  clog.info() << rs << "\n";
-  if (con) {
-    MCommandReply *reply = new MCommandReply(r, rs);
-    reply->set_tid(tid);
-    reply->set_data(odata);
-    client_messenger->send_message(reply, con);
-  }
-  return;
-}
-
-
-
-// --------------------------------------
-// dispatch
-
-epoch_t OSDService::get_peer_epoch(int peer)
-{
-  Mutex::Locker l(peer_map_epoch_lock);
-  map<int,epoch_t>::iterator p = peer_map_epoch.find(peer);
-  if (p == peer_map_epoch.end())
-    return 0;
-  return p->second;
-}
-
-epoch_t OSDService::note_peer_epoch(int peer, epoch_t e)
-{
-  Mutex::Locker l(peer_map_epoch_lock);
-  map<int,epoch_t>::iterator p = peer_map_epoch.find(peer);
-  if (p != peer_map_epoch.end()) {
-    if (p->second < e) {
-      dout(10) << "note_peer_epoch osd." << peer << " has " << e << dendl;
-      p->second = e;
-    } else {
-      dout(30) << "note_peer_epoch osd." << peer << " has " << p->second << " >= " << e << dendl;
-    }
-    return p->second;
-  } else {
-    dout(10) << "note_peer_epoch osd." << peer << " now has " << e << dendl;
-    peer_map_epoch[peer] = e;
-    return e;
-  }
-}
-void OSDService::forget_peer_epoch(int peer, epoch_t as_of)
-{
-  Mutex::Locker l(peer_map_epoch_lock);
-  map<int,epoch_t>::iterator p = peer_map_epoch.find(peer);
-  if (p != peer_map_epoch.end()) {
-    if (p->second <= as_of) {
-      dout(10) << "forget_peer_epoch osd." << peer << " as_of " << as_of
-              << " had " << p->second << dendl;
-      peer_map_epoch.erase(p);
-    } else {
-      dout(10) << "forget_peer_epoch osd." << peer << " as_of " << as_of
-              << " has " << p->second << " - not forgetting" << dendl;
-    }
+    cct->_conf->apply_changes(NULL);
+    ss << "kicking recovery queue. set osd_recovery_delay_start "
+       << "to " << cct->_conf->osd_recovery_delay_start;
+    defer_recovery_until = ceph_clock_now(cct);
+    defer_recovery_until += cct->_conf->osd_recovery_delay_start;
+    recovery_wq.wake();
   }
-}
-
-bool OSDService::should_share_map(entity_name_t name, Connection *con,
-                                  epoch_t epoch, OSDMapRef& osdmap,
-                                  const epoch_t *sent_epoch_p)
-{
-  bool should_send = false;
-  dout(20) << "should_share_map "
-           << name << " " << con->get_peer_addr()
-           << " " << epoch << dendl;
 
-  // does client have old map?
-  if (name.is_client()) {
-    bool message_sendmap = epoch < osdmap->get_epoch();
-    if (message_sendmap && sent_epoch_p) {
-      dout(20) << "client session last_sent_epoch: "
-               << *sent_epoch_p
-               << " versus osdmap epoch " << osdmap->get_epoch() << dendl;
-      if (*sent_epoch_p < osdmap->get_epoch()) {
-        should_send = true;
-      } // else we don't need to send it out again
-    }
+  else if (prefix == "cpu_profiler") {
+    string arg;
+    cmd_getval(cct, cmdmap, "arg", arg);
+    vector<string> argvec;
+    get_str_vec(arg, argvec);
+    cpu_profiler_handle_command(argvec, ds);
   }
 
-  if (con->get_messenger() == osd->cluster_messenger &&
-      con != osd->cluster_messenger->get_loopback_connection() &&
-      osdmap->is_up(name.num()) &&
-      (osdmap->get_cluster_addr(name.num()) == con->get_peer_addr() ||
-       osdmap->get_hb_back_addr(name.num()) == con->get_peer_addr())) {
-    // remember
-    epoch_t has = MAX(get_peer_epoch(name.num()), epoch);
-
-    // share?
-    if (has < osdmap->get_epoch()) {
-      dout(10) << name << " " << con->get_peer_addr()
-               << " has old map " << epoch << " < "
-               << osdmap->get_epoch() << dendl;
-      should_send = true;
+  else if (prefix == "dump_pg_recovery_stats") {
+    stringstream s;
+    if (f) {
+      pg_recovery_stats.dump_formatted(f.get());
+      f->flush(ds);
+    } else {
+      pg_recovery_stats.dump(s);
+      ds << "dump pg recovery stats: " << s.str();
     }
   }
 
-  return should_send;
-}
-
-void OSDService::share_map(
-    entity_name_t name,
-    Connection *con,
-    epoch_t epoch,
-    OSDMapRef& osdmap,
-    epoch_t *sent_epoch_p)
-{
-  dout(20) << "share_map "
-          << name << " " << con->get_peer_addr()
-          << " " << epoch << dendl;
-
-  if ((!osd->is_active()) && (!osd->is_stopping())) {
-    /*It is safe not to proceed as OSD is not in healthy state*/
-    return;
+  else if (prefix == "reset_pg_recovery_stats") {
+    ss << "reset pg recovery stats";
+    pg_recovery_stats.reset();
   }
 
-  bool want_shared = should_share_map(name, con, epoch,
-                                      osdmap, sent_epoch_p);
+  else {
+    ss << "unrecognized command! " << cmd;
+    r = -EINVAL;
+  }
 
-  if (want_shared){
-    if (name.is_client()) {
-      dout(10) << name << " has old map " << epoch
-          << " < " << osdmap->get_epoch() << dendl;
-      // we know the Session is valid or we wouldn't be sending
-      if (sent_epoch_p) {
-       *sent_epoch_p = osdmap->get_epoch();
-      }
-      send_incremental_map(epoch, con, osdmap);
-    } else if (con->get_messenger() == osd->cluster_messenger &&
-        osdmap->is_up(name.num()) &&
-        (osdmap->get_cluster_addr(name.num()) == con->get_peer_addr() ||
-            osdmap->get_hb_back_addr(name.num()) == con->get_peer_addr())) {
-      dout(10) << name << " " << con->get_peer_addr()
-                      << " has old map " << epoch << " < "
-                      << osdmap->get_epoch() << dendl;
-      note_peer_epoch(name.num(), osdmap->get_epoch());
-      send_incremental_map(epoch, con, osdmap);
-    }
+ out:
+  rs = ss.str();
+  odata.append(ds);
+  dout(0) << "do_command r=" << r << " " << rs << dendl;
+  clog.info() << rs << "\n";
+  if (con) {
+    MCommandReply *reply = new MCommandReply(r, rs);
+    reply->set_tid(tid);
+    reply->set_data(odata);
+    client_messenger->send_message(reply, con);
   }
+  return;
 }
 
 
-void OSDService::share_map_peer(int peer, Connection *con, OSDMapRef map)
-{
-  if (!map)
-    map = get_osdmap();
-
-  // send map?
-  epoch_t pe = get_peer_epoch(peer);
-  if (pe) {
-    if (pe < map->get_epoch()) {
-      send_incremental_map(pe, con, map);
-      note_peer_epoch(peer, map->get_epoch());
-    } else
-      dout(20) << "share_map_peer " << con << " already has epoch " << pe << dendl;
-  } else {
-    dout(20) << "share_map_peer " << con << " don't know epoch, doing nothing" << dendl;
-    // no idea about peer's epoch.
-    // ??? send recent ???
-    // do nothing.
-  }
-}
 
 
 bool OSD::heartbeat_dispatch(Message *m)
@@ -5535,131 +5927,7 @@ void OSD::sched_scrub()
   dout(20) << "sched_scrub done" << dendl;
 }
 
-bool OSDService::inc_scrubs_pending()
-{
-  bool result = false;
-
-  sched_scrub_lock.Lock();
-  if (scrubs_pending + scrubs_active < cct->_conf->osd_max_scrubs) {
-    dout(20) << "inc_scrubs_pending " << scrubs_pending << " -> " << (scrubs_pending+1)
-            << " (max " << cct->_conf->osd_max_scrubs << ", active " << scrubs_active << ")" << dendl;
-    result = true;
-    ++scrubs_pending;
-  } else {
-    dout(20) << "inc_scrubs_pending " << scrubs_pending << " + " << scrubs_active << " active >= max " << cct->_conf->osd_max_scrubs << dendl;
-  }
-  sched_scrub_lock.Unlock();
-
-  return result;
-}
-
-void OSDService::dec_scrubs_pending()
-{
-  sched_scrub_lock.Lock();
-  dout(20) << "dec_scrubs_pending " << scrubs_pending << " -> " << (scrubs_pending-1)
-          << " (max " << cct->_conf->osd_max_scrubs << ", active " << scrubs_active << ")" << dendl;
-  --scrubs_pending;
-  assert(scrubs_pending >= 0);
-  sched_scrub_lock.Unlock();
-}
-
-void OSDService::inc_scrubs_active(bool reserved)
-{
-  sched_scrub_lock.Lock();
-  ++(scrubs_active);
-  if (reserved) {
-    --(scrubs_pending);
-    dout(20) << "inc_scrubs_active " << (scrubs_active-1) << " -> " << scrubs_active
-            << " (max " << cct->_conf->osd_max_scrubs
-            << ", pending " << (scrubs_pending+1) << " -> " << scrubs_pending << ")" << dendl;
-    assert(scrubs_pending >= 0);
-  } else {
-    dout(20) << "inc_scrubs_active " << (scrubs_active-1) << " -> " << scrubs_active
-            << " (max " << cct->_conf->osd_max_scrubs
-            << ", pending " << scrubs_pending << ")" << dendl;
-  }
-  sched_scrub_lock.Unlock();
-}
-
-void OSDService::dec_scrubs_active()
-{
-  sched_scrub_lock.Lock();
-  dout(20) << "dec_scrubs_active " << scrubs_active << " -> " << (scrubs_active-1)
-          << " (max " << cct->_conf->osd_max_scrubs << ", pending " << scrubs_pending << ")" << dendl;
-  --scrubs_active;
-  sched_scrub_lock.Unlock();
-}
-
-void OSDService::retrieve_epochs(epoch_t *_boot_epoch, epoch_t *_up_epoch,
-                                 epoch_t *_bind_epoch) const
-{
-  Mutex::Locker l(epoch_lock);
-  if (_boot_epoch)
-    *_boot_epoch = boot_epoch;
-  if (_up_epoch)
-    *_up_epoch = up_epoch;
-  if (_bind_epoch)
-    *_bind_epoch = bind_epoch;
-}
-
-void OSDService::set_epochs(const epoch_t *_boot_epoch, const epoch_t *_up_epoch,
-                            const epoch_t *_bind_epoch)
-{
-  Mutex::Locker l(epoch_lock);
-  if (_boot_epoch) {
-    assert(*_boot_epoch == 0 || *_boot_epoch >= boot_epoch);
-    boot_epoch = *_boot_epoch;
-  }
-  if (_up_epoch) {
-    assert(*_up_epoch == 0 || *_up_epoch >= up_epoch);
-    up_epoch = *_up_epoch;
-  }
-  if (_bind_epoch) {
-    assert(*_bind_epoch == 0 || *_bind_epoch >= bind_epoch);
-    bind_epoch = *_bind_epoch;
-  }
-}
-
-bool OSDService::prepare_to_stop()
-{
-  Mutex::Locker l(is_stopping_lock);
-  if (state != NOT_STOPPING)
-    return false;
-
-  OSDMapRef osdmap = get_osdmap();
-  if (osdmap && osdmap->is_up(whoami)) {
-    dout(0) << __func__ << " telling mon we are shutting down" << dendl;
-    state = PREPARING_TO_STOP;
-    monc->send_mon_message(new MOSDMarkMeDown(monc->get_fsid(),
-                                             osdmap->get_inst(whoami),
-                                             osdmap->get_epoch(),
-                                             true  // request ack
-                                             ));
-    utime_t now = ceph_clock_now(cct);
-    utime_t timeout;
-    timeout.set_from_double(now + cct->_conf->osd_mon_shutdown_timeout);
-    while ((ceph_clock_now(cct) < timeout) &&
-          (state != STOPPING)) {
-      is_stopping_cond.WaitUntil(is_stopping_lock, timeout);
-    }
-  }
-  dout(0) << __func__ << " starting shutdown" << dendl;
-  state = STOPPING;
-  return true;
-}
-
-void OSDService::got_stop_ack()
-{
-  Mutex::Locker l(is_stopping_lock);
-  if (state == PREPARING_TO_STOP) {
-    dout(0) << __func__ << " starting shutdown" << dendl;
-    state = STOPPING;
-    is_stopping_cond.Signal();
-  } else {
-    dout(10) << __func__ << " ignoring msg" << dendl;
-  }
-}
-
+
 
 // =====================================================
 // MAP
@@ -6391,169 +6659,6 @@ void OSD::activate_map()
   take_waiters(waiting_for_osdmap);
 }
 
-
-MOSDMap *OSDService::build_incremental_map_msg(epoch_t since, epoch_t to,
-                                               OSDSuperblock& sblock)
-{
-  MOSDMap *m = new MOSDMap(monc->get_fsid());
-  m->oldest_map = sblock.oldest_map;
-  m->newest_map = sblock.newest_map;
-  
-  for (epoch_t e = to; e > since; e--) {
-    bufferlist bl;
-    if (e > m->oldest_map && get_inc_map_bl(e, bl)) {
-      m->incremental_maps[e].claim(bl);
-    } else if (get_map_bl(e, bl)) {
-      m->maps[e].claim(bl);
-      break;
-    } else {
-      derr << "since " << since << " to " << to
-          << " oldest " << m->oldest_map << " newest " << m->newest_map
-          << dendl;
-      m->put();
-      m = NULL;
-      break;
-    }
-  }
-  return m;
-}
-
-void OSDService::send_map(MOSDMap *m, Connection *con)
-{
-  Messenger *msgr = client_messenger;
-  if (entity_name_t::TYPE_OSD == con->get_peer_type())
-    msgr = cluster_messenger;
-  msgr->send_message(m, con);
-}
-
-void OSDService::send_incremental_map(epoch_t since, Connection *con,
-                                      OSDMapRef& osdmap)
-{
-  epoch_t to = osdmap->get_epoch();
-  dout(10) << "send_incremental_map " << since << " -> " << to
-           << " to " << con << " " << con->get_peer_addr() << dendl;
-
-  MOSDMap *m = NULL;
-  while (!m) {
-    OSDSuperblock sblock(get_superblock());
-    if (since < sblock.oldest_map) {
-      // just send latest full map
-      MOSDMap *m = new MOSDMap(monc->get_fsid());
-      m->oldest_map = sblock.oldest_map;
-      m->newest_map = sblock.newest_map;
-      get_map_bl(to, m->maps[to]);
-      send_map(m, con);
-      return;
-    }
-    
-    if (to > since && (int64_t)(to - since) > cct->_conf->osd_map_share_max_epochs) {
-      dout(10) << "  " << (to - since) << " > max " << cct->_conf->osd_map_share_max_epochs
-              << ", only sending most recent" << dendl;
-      since = to - cct->_conf->osd_map_share_max_epochs;
-    }
-    
-    if (to - since > (epoch_t)cct->_conf->osd_map_message_max)
-      to = since + cct->_conf->osd_map_message_max;
-    m = build_incremental_map_msg(since, to, sblock);
-  }
-  send_map(m, con);
-}
-
-bool OSDService::_get_map_bl(epoch_t e, bufferlist& bl)
-{
-  bool found = map_bl_cache.lookup(e, &bl);
-  if (found)
-    return true;
-  found = store->read(
-    coll_t::META_COLL, OSD::get_osdmap_pobject_name(e), 0, 0, bl) >= 0;
-  if (found)
-    _add_map_bl(e, bl);
-  return found;
-}
-
-bool OSDService::get_inc_map_bl(epoch_t e, bufferlist& bl)
-{
-  Mutex::Locker l(map_cache_lock);
-  bool found = map_bl_inc_cache.lookup(e, &bl);
-  if (found)
-    return true;
-  found = store->read(
-    coll_t::META_COLL, OSD::get_inc_osdmap_pobject_name(e), 0, 0, bl) >= 0;
-  if (found)
-    _add_map_inc_bl(e, bl);
-  return found;
-}
-
-void OSDService::_add_map_bl(epoch_t e, bufferlist& bl)
-{
-  dout(10) << "add_map_bl " << e << " " << bl.length() << " bytes" << dendl;
-  map_bl_cache.add(e, bl);
-}
-
-void OSDService::_add_map_inc_bl(epoch_t e, bufferlist& bl)
-{
-  dout(10) << "add_map_inc_bl " << e << " " << bl.length() << " bytes" << dendl;
-  map_bl_inc_cache.add(e, bl);
-}
-
-void OSDService::pin_map_inc_bl(epoch_t e, bufferlist &bl)
-{
-  Mutex::Locker l(map_cache_lock);
-  map_bl_inc_cache.pin(e, bl);
-}
-
-void OSDService::pin_map_bl(epoch_t e, bufferlist &bl)
-{
-  Mutex::Locker l(map_cache_lock);
-  map_bl_cache.pin(e, bl);
-}
-
-void OSDService::clear_map_bl_cache_pins(epoch_t e)
-{
-  Mutex::Locker l(map_cache_lock);
-  map_bl_inc_cache.clear_pinned(e);
-  map_bl_cache.clear_pinned(e);
-}
-
-OSDMapRef OSDService::_add_map(OSDMap *o)
-{
-  epoch_t e = o->get_epoch();
-
-  if (cct->_conf->osd_map_dedup) {
-    // Dedup against an existing map at a nearby epoch
-    OSDMapRef for_dedup = map_cache.lower_bound(e);
-    if (for_dedup) {
-      OSDMap::dedup(for_dedup.get(), o);
-    }
-  }
-  OSDMapRef l = map_cache.add(e, o);
-  return l;
-}
-
-OSDMapRef OSDService::try_get_map(epoch_t epoch)
-{
-  Mutex::Locker l(map_cache_lock);
-  OSDMapRef retval = map_cache.lookup(epoch);
-  if (retval) {
-    dout(30) << "get_map " << epoch << " -cached" << dendl;
-    return retval;
-  }
-
-  OSDMap *map = new OSDMap;
-  if (epoch > 0) {
-    dout(20) << "get_map " << epoch << " - loading and decoding " << map << dendl;
-    bufferlist bl;
-    if (!_get_map_bl(epoch, bl)) {
-      delete map;
-      return OSDMapRef();
-    }
-    map->decode(bl);
-  } else {
-    dout(20) << "get_map " << epoch << " - return initial " << map << dendl;
-  }
-  return _add_map(map);
-}
-
 bool OSD::require_mon_peer(Message *m)
 {
   if (!m->get_connection()->peer_is_mon()) {
@@ -7587,17 +7692,6 @@ void OSD::check_replay_queue()
   }
 }
 
-
-bool OSDService::queue_for_recovery(PG *pg)
-{
-  bool b = recovery_wq.queue(pg);
-  if (b)
-    dout(10) << "queue_for_recovery queued " << *pg << dendl;
-  else
-    dout(10) << "queue_for_recovery already queued " << *pg << dendl;
-  return b;
-}
-
 bool OSD::_recover_now()
 {
   if (recovery_ops_active >= cct->_conf->osd_recovery_max_active) {
@@ -7732,76 +7826,6 @@ void OSD::finish_recovery_op(PG *pg, const hobject_t& soid, bool dequeue)
 // =========================================================
 // OPS
 
-void OSDService::reply_op_error(OpRequestRef op, int err)
-{
-  reply_op_error(op, err, eversion_t(), 0);
-}
-
-void OSDService::reply_op_error(OpRequestRef op, int err, eversion_t v,
-                                version_t uv)
-{
-  MOSDOp *m = static_cast<MOSDOp*>(op->get_req());
-  assert(m->get_header().type == CEPH_MSG_OSD_OP);
-  int flags;
-  flags = m->get_flags() & (CEPH_OSD_FLAG_ACK|CEPH_OSD_FLAG_ONDISK);
-
-  MOSDOpReply *reply = new MOSDOpReply(m, err, osdmap->get_epoch(), flags,
-                                      true);
-  reply->set_reply_versions(v, uv);
-  m->get_connection()->get_messenger()->send_message(reply, m->get_connection());
-}
-
-void OSDService::handle_misdirected_op(PG *pg, OpRequestRef op)
-{
-  MOSDOp *m = static_cast<MOSDOp*>(op->get_req());
-  assert(m->get_header().type == CEPH_MSG_OSD_OP);
-
-  assert(m->get_map_epoch() >= pg->info.history.same_primary_since);
-
-  if (pg->is_ec_pg()) {
-    /**
-       * OSD recomputes op target based on current OSDMap. With an EC pg, we
-       * can get this result:
-       * 1) client at map 512 sends an op to osd 3, pg_t 3.9 based on mapping
-       *    [CRUSH_ITEM_NONE, 2, 3]/3
-       * 2) OSD 3 at map 513 remaps op to osd 3, spg_t 3.9s0 based on mapping
-       *    [3, 2, 3]/3
-       * 3) PG 3.9s0 dequeues the op at epoch 512 and notices that it isn't primary
-       *    -- misdirected op
-       * 4) client resends and this time PG 3.9s0 having caught up to 513 gets
-       *    it and fulfils it
-       *
-       * We can't compute the op target based on the sending map epoch due to
-       * splitting.  The simplest thing is to detect such cases here and drop
-       * them without an error (the client will resend anyway).
-       */
-    OSDMapRef opmap = try_get_map(m->get_map_epoch());
-    if (!opmap) {
-      dout(7) << __func__ << ": " << *pg << " no longer have map for "
-             << m->get_map_epoch() << ", dropping" << dendl;
-      return;
-    }
-    pg_t _pgid = m->get_pg();
-    spg_t pgid;
-    if ((m->get_flags() & CEPH_OSD_FLAG_PGOP) == 0)
-      _pgid = opmap->raw_pg_to_pg(_pgid);
-    if (opmap->get_primary_shard(_pgid, &pgid) &&
-       pgid.shard != pg->info.pgid.shard) {
-      dout(7) << __func__ << ": " << *pg << " primary changed since "
-             << m->get_map_epoch() << ", dropping" << dendl;
-      return;
-    }
-  }
-
-  dout(7) << *pg << " misdirected op in " << m->get_map_epoch() << dendl;
-  clog.warn() << m->get_source_inst() << " misdirected " << m->get_reqid()
-             << " pg " << m->get_pg()
-             << " to osd." << whoami
-             << " not " << pg->acting
-             << " in e" << m->get_map_epoch() << "/" << osdmap->get_epoch() << "\n";
-  reply_op_error(op, -ENXIO);
-}
-
 class C_SendMap : public GenContext<ThreadPool::TPHandle&> {
   OSD *osd;
   entity_name_t name;
@@ -8181,12 +8205,6 @@ void OSD::ShardedOpWQ::_enqueue_front(pair<PGRef, OpRequestRef> item) {
 }
 
 
-
-void OSDService::dequeue_pg(PG *pg, list<OpRequestRef> *dequeued)
-{
-  osd->op_shardedwq.dequeue(pg, dequeued);
-}
-
 /*
  * NOTE: dequeue called in worker thread, with pg lock
  */
@@ -8234,11 +8252,6 @@ void OSD::dequeue_op(
 }
 
 
-void OSDService::queue_for_peering(PG *pg)
-{
-  peering_wq.queue(pg);
-}
-
 struct C_CompleteSplits : public Context {
   OSD *osd;
   set<boost::intrusive_ptr<PG> > pgs;