]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
Merge remote-tracking branch 'upstream/next' into wip-sam-testing
authorSamuel Just <sam.just@inktank.com>
Tue, 5 Aug 2014 20:49:50 +0000 (13:49 -0700)
committerSamuel Just <sam.just@inktank.com>
Tue, 5 Aug 2014 21:06:23 +0000 (14:06 -0700)
Conflicts:
src/osd/OSD.cc

1  2 
src/osd/OSD.cc
src/osd/OSD.h

diff --cc src/osd/OSD.cc
index 09432821f950cd641d86602ee5c270b82bbbb645,307def8f5f78a6c4ff3f527902ea71e1b0811b90..cf63ba75d216955d75a80dedee7c71c1b5f69d30
@@@ -569,6 -567,722 +567,719 @@@ void OSDService::agent_stop(
    agent_thread.join();
  }
  
 -  osd->cluster_messenger->send_message(m, peer_inst);
+ // -------------------------------------
+ float OSDService::get_full_ratio()
+ {
+   float full_ratio = cct->_conf->osd_failsafe_full_ratio;
+   if (full_ratio > 1.0) full_ratio /= 100.0;
+   return full_ratio;
+ }
+ float OSDService::get_nearfull_ratio()
+ {
+   float nearfull_ratio = cct->_conf->osd_failsafe_nearfull_ratio;
+   if (nearfull_ratio > 1.0) nearfull_ratio /= 100.0;
+   return nearfull_ratio;
+ }
+ void OSDService::check_nearfull_warning(const osd_stat_t &osd_stat)
+ {
+   Mutex::Locker l(full_status_lock);
+   enum s_names new_state;
+   time_t now = ceph_clock_gettime(NULL);
+   // We base ratio on kb_avail rather than kb_used because they can
+   // differ significantly e.g. on btrfs volumes with a large number of
+   // chunks reserved for metadata, and for our purposes (avoiding
+   // completely filling the disk) it's far more important to know how
+   // much space is available to use than how much we've already used.
+   float ratio = ((float)(osd_stat.kb - osd_stat.kb_avail)) / ((float)osd_stat.kb);
+   float nearfull_ratio = get_nearfull_ratio();
+   float full_ratio = get_full_ratio();
+   cur_ratio = ratio;
+   if (full_ratio > 0 && ratio > full_ratio) {
+     new_state = FULL;
+   } else if (nearfull_ratio > 0 && ratio > nearfull_ratio) {
+     new_state = NEAR;
+   } else {
+     cur_state = NONE;
+     return;
+   }
+   if (cur_state != new_state) {
+     cur_state = new_state;
+   } else if (now - last_msg < cct->_conf->osd_op_complaint_time) {
+     return;
+   }
+   last_msg = now;
+   if (cur_state == FULL)
+     clog.error() << "OSD full dropping all updates " << (int)(ratio * 100) << "% full";
+   else
+     clog.warn() << "OSD near full (" << (int)(ratio * 100) << "%)";
+ }
+ bool OSDService::check_failsafe_full()
+ {
+   Mutex::Locker l(full_status_lock);
+   if (cur_state == FULL)
+     return true;
+   return false;
+ }
+ bool OSDService::too_full_for_backfill(double *_ratio, double *_max_ratio)
+ {
+   Mutex::Locker l(full_status_lock);
+   double max_ratio;
+   max_ratio = cct->_conf->osd_backfill_full_ratio;
+   if (_ratio)
+     *_ratio = cur_ratio;
+   if (_max_ratio)
+     *_max_ratio = max_ratio;
+   return cur_ratio >= max_ratio;
+ }
+ void OSDService::update_osd_stat(vector<int>& hb_peers)
+ {
+   Mutex::Locker lock(stat_lock);
+   // fill in osd stats too
+   struct statfs stbuf;
+   osd->store->statfs(&stbuf);
+   uint64_t bytes = stbuf.f_blocks * stbuf.f_bsize;
+   uint64_t used = (stbuf.f_blocks - stbuf.f_bfree) * stbuf.f_bsize;
+   uint64_t avail = stbuf.f_bavail * stbuf.f_bsize;
+   osd_stat.kb = bytes >> 10;
+   osd_stat.kb_used = used >> 10;
+   osd_stat.kb_avail = avail >> 10;
+   osd->logger->set(l_osd_stat_bytes, bytes);
+   osd->logger->set(l_osd_stat_bytes_used, used);
+   osd->logger->set(l_osd_stat_bytes_avail, avail);
+   osd_stat.hb_in.swap(hb_peers);
+   osd_stat.hb_out.clear();
+   check_nearfull_warning(osd_stat);
+   osd->op_tracker.get_age_ms_histogram(&osd_stat.op_queue_age_hist);
+   dout(20) << "update_osd_stat " << osd_stat << dendl;
+ }
+ void OSDService::send_message_osd_cluster(int peer, Message *m, epoch_t from_epoch)
+ {
+   OSDMapRef next_map = get_nextmap_reserved();
+   // service map is always newer/newest
+   assert(from_epoch <= next_map->get_epoch());
+   if (next_map->is_down(peer) ||
+       next_map->get_info(peer).up_from > from_epoch) {
+     m->put();
+     release_map(next_map);
+     return;
+   }
+   const entity_inst_t& peer_inst = next_map->get_cluster_inst(peer);
+   Connection *peer_con = osd->cluster_messenger->get_connection(peer_inst).get();
+   share_map_peer(peer, peer_con, next_map);
 -  Messenger *msgr = client_messenger;
 -  if (entity_name_t::TYPE_OSD == con->get_peer_type())
 -    msgr = cluster_messenger;
 -  msgr->send_message(m, con);
++  peer_con->send_message(m);
+   release_map(next_map);
+ }
+ ConnectionRef OSDService::get_con_osd_cluster(int peer, epoch_t from_epoch)
+ {
+   OSDMapRef next_map = get_nextmap_reserved();
+   // service map is always newer/newest
+   assert(from_epoch <= next_map->get_epoch());
+   if (next_map->is_down(peer) ||
+       next_map->get_info(peer).up_from > from_epoch) {
+     release_map(next_map);
+     return NULL;
+   }
+   ConnectionRef con = osd->cluster_messenger->get_connection(next_map->get_cluster_inst(peer));
+   release_map(next_map);
+   return con;
+ }
+ pair<ConnectionRef,ConnectionRef> OSDService::get_con_osd_hb(int peer, epoch_t from_epoch)
+ {
+   OSDMapRef next_map = get_nextmap_reserved();
+   // service map is always newer/newest
+   assert(from_epoch <= next_map->get_epoch());
+   pair<ConnectionRef,ConnectionRef> ret;
+   if (next_map->is_down(peer) ||
+       next_map->get_info(peer).up_from > from_epoch) {
+     release_map(next_map);
+     return ret;
+   }
+   ret.first = osd->hbclient_messenger->get_connection(next_map->get_hb_back_inst(peer));
+   if (next_map->get_hb_front_addr(peer) != entity_addr_t())
+     ret.second = osd->hbclient_messenger->get_connection(next_map->get_hb_front_inst(peer));
+   release_map(next_map);
+   return ret;
+ }
+ void OSDService::queue_want_pg_temp(pg_t pgid, vector<int>& want)
+ {
+   Mutex::Locker l(pg_temp_lock);
+   pg_temp_wanted[pgid] = want;
+ }
+ void OSDService::send_pg_temp()
+ {
+   Mutex::Locker l(pg_temp_lock);
+   if (pg_temp_wanted.empty())
+     return;
+   dout(10) << "send_pg_temp " << pg_temp_wanted << dendl;
+   MOSDPGTemp *m = new MOSDPGTemp(osdmap->get_epoch());
+   m->pg_temp = pg_temp_wanted;
+   monc->send_mon_message(m);
+ }
+ // --------------------------------------
+ // dispatch
+ epoch_t OSDService::get_peer_epoch(int peer)
+ {
+   Mutex::Locker l(peer_map_epoch_lock);
+   map<int,epoch_t>::iterator p = peer_map_epoch.find(peer);
+   if (p == peer_map_epoch.end())
+     return 0;
+   return p->second;
+ }
+ epoch_t OSDService::note_peer_epoch(int peer, epoch_t e)
+ {
+   Mutex::Locker l(peer_map_epoch_lock);
+   map<int,epoch_t>::iterator p = peer_map_epoch.find(peer);
+   if (p != peer_map_epoch.end()) {
+     if (p->second < e) {
+       dout(10) << "note_peer_epoch osd." << peer << " has " << e << dendl;
+       p->second = e;
+     } else {
+       dout(30) << "note_peer_epoch osd." << peer << " has " << p->second << " >= " << e << dendl;
+     }
+     return p->second;
+   } else {
+     dout(10) << "note_peer_epoch osd." << peer << " now has " << e << dendl;
+     peer_map_epoch[peer] = e;
+     return e;
+   }
+ }
+  
+ void OSDService::forget_peer_epoch(int peer, epoch_t as_of)
+ {
+   Mutex::Locker l(peer_map_epoch_lock);
+   map<int,epoch_t>::iterator p = peer_map_epoch.find(peer);
+   if (p != peer_map_epoch.end()) {
+     if (p->second <= as_of) {
+       dout(10) << "forget_peer_epoch osd." << peer << " as_of " << as_of
+              << " had " << p->second << dendl;
+       peer_map_epoch.erase(p);
+     } else {
+       dout(10) << "forget_peer_epoch osd." << peer << " as_of " << as_of
+              << " has " << p->second << " - not forgetting" << dendl;
+     }
+   }
+ }
+ bool OSDService::should_share_map(entity_name_t name, Connection *con,
+                                   epoch_t epoch, OSDMapRef& osdmap,
+                                   const epoch_t *sent_epoch_p)
+ {
+   bool should_send = false;
+   dout(20) << "should_share_map "
+            << name << " " << con->get_peer_addr()
+            << " " << epoch << dendl;
+   // does client have old map?
+   if (name.is_client()) {
+     bool message_sendmap = epoch < osdmap->get_epoch();
+     if (message_sendmap && sent_epoch_p) {
+       dout(20) << "client session last_sent_epoch: "
+                << *sent_epoch_p
+                << " versus osdmap epoch " << osdmap->get_epoch() << dendl;
+       if (*sent_epoch_p < osdmap->get_epoch()) {
+         should_send = true;
+       } // else we don't need to send it out again
+     }
+   }
+   if (con->get_messenger() == osd->cluster_messenger &&
+       con != osd->cluster_messenger->get_loopback_connection() &&
+       osdmap->is_up(name.num()) &&
+       (osdmap->get_cluster_addr(name.num()) == con->get_peer_addr() ||
+        osdmap->get_hb_back_addr(name.num()) == con->get_peer_addr())) {
+     // remember
+     epoch_t has = MAX(get_peer_epoch(name.num()), epoch);
+     // share?
+     if (has < osdmap->get_epoch()) {
+       dout(10) << name << " " << con->get_peer_addr()
+                << " has old map " << epoch << " < "
+                << osdmap->get_epoch() << dendl;
+       should_send = true;
+     }
+   }
+   return should_send;
+ }
+ void OSDService::share_map(
+     entity_name_t name,
+     Connection *con,
+     epoch_t epoch,
+     OSDMapRef& osdmap,
+     epoch_t *sent_epoch_p)
+ {
+   dout(20) << "share_map "
+          << name << " " << con->get_peer_addr()
+          << " " << epoch << dendl;
+   if ((!osd->is_active()) && (!osd->is_stopping())) {
+     /*It is safe not to proceed as OSD is not in healthy state*/
+     return;
+   }
+   bool want_shared = should_share_map(name, con, epoch,
+                                       osdmap, sent_epoch_p);
+   if (want_shared){
+     if (name.is_client()) {
+       dout(10) << name << " has old map " << epoch
+           << " < " << osdmap->get_epoch() << dendl;
+       // we know the Session is valid or we wouldn't be sending
+       if (sent_epoch_p) {
+       *sent_epoch_p = osdmap->get_epoch();
+       }
+       send_incremental_map(epoch, con, osdmap);
+     } else if (con->get_messenger() == osd->cluster_messenger &&
+         osdmap->is_up(name.num()) &&
+         (osdmap->get_cluster_addr(name.num()) == con->get_peer_addr() ||
+             osdmap->get_hb_back_addr(name.num()) == con->get_peer_addr())) {
+       dout(10) << name << " " << con->get_peer_addr()
+                      << " has old map " << epoch << " < "
+                      << osdmap->get_epoch() << dendl;
+       note_peer_epoch(name.num(), osdmap->get_epoch());
+       send_incremental_map(epoch, con, osdmap);
+     }
+   }
+ }
+ void OSDService::share_map_peer(int peer, Connection *con, OSDMapRef map)
+ {
+   if (!map)
+     map = get_osdmap();
+   // send map?
+   epoch_t pe = get_peer_epoch(peer);
+   if (pe) {
+     if (pe < map->get_epoch()) {
+       send_incremental_map(pe, con, map);
+       note_peer_epoch(peer, map->get_epoch());
+     } else
+       dout(20) << "share_map_peer " << con << " already has epoch " << pe << dendl;
+   } else {
+     dout(20) << "share_map_peer " << con << " don't know epoch, doing nothing" << dendl;
+     // no idea about peer's epoch.
+     // ??? send recent ???
+     // do nothing.
+   }
+ }
+ bool OSDService::inc_scrubs_pending()
+ {
+   bool result = false;
+   sched_scrub_lock.Lock();
+   if (scrubs_pending + scrubs_active < cct->_conf->osd_max_scrubs) {
+     dout(20) << "inc_scrubs_pending " << scrubs_pending << " -> " << (scrubs_pending+1)
+            << " (max " << cct->_conf->osd_max_scrubs << ", active " << scrubs_active << ")" << dendl;
+     result = true;
+     ++scrubs_pending;
+   } else {
+     dout(20) << "inc_scrubs_pending " << scrubs_pending << " + " << scrubs_active << " active >= max " << cct->_conf->osd_max_scrubs << dendl;
+   }
+   sched_scrub_lock.Unlock();
+   return result;
+ }
+ void OSDService::dec_scrubs_pending()
+ {
+   sched_scrub_lock.Lock();
+   dout(20) << "dec_scrubs_pending " << scrubs_pending << " -> " << (scrubs_pending-1)
+          << " (max " << cct->_conf->osd_max_scrubs << ", active " << scrubs_active << ")" << dendl;
+   --scrubs_pending;
+   assert(scrubs_pending >= 0);
+   sched_scrub_lock.Unlock();
+ }
+ void OSDService::inc_scrubs_active(bool reserved)
+ {
+   sched_scrub_lock.Lock();
+   ++(scrubs_active);
+   if (reserved) {
+     --(scrubs_pending);
+     dout(20) << "inc_scrubs_active " << (scrubs_active-1) << " -> " << scrubs_active
+            << " (max " << cct->_conf->osd_max_scrubs
+            << ", pending " << (scrubs_pending+1) << " -> " << scrubs_pending << ")" << dendl;
+     assert(scrubs_pending >= 0);
+   } else {
+     dout(20) << "inc_scrubs_active " << (scrubs_active-1) << " -> " << scrubs_active
+            << " (max " << cct->_conf->osd_max_scrubs
+            << ", pending " << scrubs_pending << ")" << dendl;
+   }
+   sched_scrub_lock.Unlock();
+ }
+ void OSDService::dec_scrubs_active()
+ {
+   sched_scrub_lock.Lock();
+   dout(20) << "dec_scrubs_active " << scrubs_active << " -> " << (scrubs_active-1)
+          << " (max " << cct->_conf->osd_max_scrubs << ", pending " << scrubs_pending << ")" << dendl;
+   --scrubs_active;
+   sched_scrub_lock.Unlock();
+ }
+ void OSDService::retrieve_epochs(epoch_t *_boot_epoch, epoch_t *_up_epoch,
+                                  epoch_t *_bind_epoch) const
+ {
+   Mutex::Locker l(epoch_lock);
+   if (_boot_epoch)
+     *_boot_epoch = boot_epoch;
+   if (_up_epoch)
+     *_up_epoch = up_epoch;
+   if (_bind_epoch)
+     *_bind_epoch = bind_epoch;
+ }
+ void OSDService::set_epochs(const epoch_t *_boot_epoch, const epoch_t *_up_epoch,
+                             const epoch_t *_bind_epoch)
+ {
+   Mutex::Locker l(epoch_lock);
+   if (_boot_epoch) {
+     assert(*_boot_epoch == 0 || *_boot_epoch >= boot_epoch);
+     boot_epoch = *_boot_epoch;
+   }
+   if (_up_epoch) {
+     assert(*_up_epoch == 0 || *_up_epoch >= up_epoch);
+     up_epoch = *_up_epoch;
+   }
+   if (_bind_epoch) {
+     assert(*_bind_epoch == 0 || *_bind_epoch >= bind_epoch);
+     bind_epoch = *_bind_epoch;
+   }
+ }
+ bool OSDService::prepare_to_stop()
+ {
+   Mutex::Locker l(is_stopping_lock);
+   if (state != NOT_STOPPING)
+     return false;
+   OSDMapRef osdmap = get_osdmap();
+   if (osdmap && osdmap->is_up(whoami)) {
+     dout(0) << __func__ << " telling mon we are shutting down" << dendl;
+     state = PREPARING_TO_STOP;
+     monc->send_mon_message(new MOSDMarkMeDown(monc->get_fsid(),
+                                             osdmap->get_inst(whoami),
+                                             osdmap->get_epoch(),
+                                             true  // request ack
+                                             ));
+     utime_t now = ceph_clock_now(cct);
+     utime_t timeout;
+     timeout.set_from_double(now + cct->_conf->osd_mon_shutdown_timeout);
+     while ((ceph_clock_now(cct) < timeout) &&
+          (state != STOPPING)) {
+       is_stopping_cond.WaitUntil(is_stopping_lock, timeout);
+     }
+   }
+   dout(0) << __func__ << " starting shutdown" << dendl;
+   state = STOPPING;
+   return true;
+ }
+ void OSDService::got_stop_ack()
+ {
+   Mutex::Locker l(is_stopping_lock);
+   if (state == PREPARING_TO_STOP) {
+     dout(0) << __func__ << " starting shutdown" << dendl;
+     state = STOPPING;
+     is_stopping_cond.Signal();
+   } else {
+     dout(10) << __func__ << " ignoring msg" << dendl;
+   }
+ }
+ MOSDMap *OSDService::build_incremental_map_msg(epoch_t since, epoch_t to,
+                                                OSDSuperblock& sblock)
+ {
+   MOSDMap *m = new MOSDMap(monc->get_fsid());
+   m->oldest_map = sblock.oldest_map;
+   m->newest_map = sblock.newest_map;
+   
+   for (epoch_t e = to; e > since; e--) {
+     bufferlist bl;
+     if (e > m->oldest_map && get_inc_map_bl(e, bl)) {
+       m->incremental_maps[e].claim(bl);
+     } else if (get_map_bl(e, bl)) {
+       m->maps[e].claim(bl);
+       break;
+     } else {
+       derr << "since " << since << " to " << to
+          << " oldest " << m->oldest_map << " newest " << m->newest_map
+          << dendl;
+       m->put();
+       m = NULL;
+       break;
+     }
+   }
+   return m;
+ }
+ void OSDService::send_map(MOSDMap *m, Connection *con)
+ {
 -  m->get_connection()->get_messenger()->send_message(reply, m->get_connection());
++  con->send_message(m);
+ }
+ void OSDService::send_incremental_map(epoch_t since, Connection *con,
+                                       OSDMapRef& osdmap)
+ {
+   epoch_t to = osdmap->get_epoch();
+   dout(10) << "send_incremental_map " << since << " -> " << to
+            << " to " << con << " " << con->get_peer_addr() << dendl;
+   MOSDMap *m = NULL;
+   while (!m) {
+     OSDSuperblock sblock(get_superblock());
+     if (since < sblock.oldest_map) {
+       // just send latest full map
+       MOSDMap *m = new MOSDMap(monc->get_fsid());
+       m->oldest_map = sblock.oldest_map;
+       m->newest_map = sblock.newest_map;
+       get_map_bl(to, m->maps[to]);
+       send_map(m, con);
+       return;
+     }
+     
+     if (to > since && (int64_t)(to - since) > cct->_conf->osd_map_share_max_epochs) {
+       dout(10) << "  " << (to - since) << " > max " << cct->_conf->osd_map_share_max_epochs
+              << ", only sending most recent" << dendl;
+       since = to - cct->_conf->osd_map_share_max_epochs;
+     }
+     
+     if (to - since > (epoch_t)cct->_conf->osd_map_message_max)
+       to = since + cct->_conf->osd_map_message_max;
+     m = build_incremental_map_msg(since, to, sblock);
+   }
+   send_map(m, con);
+ }
+ bool OSDService::_get_map_bl(epoch_t e, bufferlist& bl)
+ {
+   bool found = map_bl_cache.lookup(e, &bl);
+   if (found)
+     return true;
+   found = store->read(
+     coll_t::META_COLL, OSD::get_osdmap_pobject_name(e), 0, 0, bl) >= 0;
+   if (found)
+     _add_map_bl(e, bl);
+   return found;
+ }
+ bool OSDService::get_inc_map_bl(epoch_t e, bufferlist& bl)
+ {
+   Mutex::Locker l(map_cache_lock);
+   bool found = map_bl_inc_cache.lookup(e, &bl);
+   if (found)
+     return true;
+   found = store->read(
+     coll_t::META_COLL, OSD::get_inc_osdmap_pobject_name(e), 0, 0, bl) >= 0;
+   if (found)
+     _add_map_inc_bl(e, bl);
+   return found;
+ }
+ void OSDService::_add_map_bl(epoch_t e, bufferlist& bl)
+ {
+   dout(10) << "add_map_bl " << e << " " << bl.length() << " bytes" << dendl;
+   map_bl_cache.add(e, bl);
+ }
+ void OSDService::_add_map_inc_bl(epoch_t e, bufferlist& bl)
+ {
+   dout(10) << "add_map_inc_bl " << e << " " << bl.length() << " bytes" << dendl;
+   map_bl_inc_cache.add(e, bl);
+ }
+ void OSDService::pin_map_inc_bl(epoch_t e, bufferlist &bl)
+ {
+   Mutex::Locker l(map_cache_lock);
+   map_bl_inc_cache.pin(e, bl);
+ }
+ void OSDService::pin_map_bl(epoch_t e, bufferlist &bl)
+ {
+   Mutex::Locker l(map_cache_lock);
+   map_bl_cache.pin(e, bl);
+ }
+ void OSDService::clear_map_bl_cache_pins(epoch_t e)
+ {
+   Mutex::Locker l(map_cache_lock);
+   map_bl_inc_cache.clear_pinned(e);
+   map_bl_cache.clear_pinned(e);
+ }
+ OSDMapRef OSDService::_add_map(OSDMap *o)
+ {
+   epoch_t e = o->get_epoch();
+   if (cct->_conf->osd_map_dedup) {
+     // Dedup against an existing map at a nearby epoch
+     OSDMapRef for_dedup = map_cache.lower_bound(e);
+     if (for_dedup) {
+       OSDMap::dedup(for_dedup.get(), o);
+     }
+   }
+   OSDMapRef l = map_cache.add(e, o);
+   return l;
+ }
+ OSDMapRef OSDService::try_get_map(epoch_t epoch)
+ {
+   Mutex::Locker l(map_cache_lock);
+   OSDMapRef retval = map_cache.lookup(epoch);
+   if (retval) {
+     dout(30) << "get_map " << epoch << " -cached" << dendl;
+     return retval;
+   }
+   OSDMap *map = new OSDMap;
+   if (epoch > 0) {
+     dout(20) << "get_map " << epoch << " - loading and decoding " << map << dendl;
+     bufferlist bl;
+     if (!_get_map_bl(epoch, bl)) {
+       delete map;
+       return OSDMapRef();
+     }
+     map->decode(bl);
+   } else {
+     dout(20) << "get_map " << epoch << " - return initial " << map << dendl;
+   }
+   return _add_map(map);
+ }
+ bool OSDService::queue_for_recovery(PG *pg)
+ {
+   bool b = recovery_wq.queue(pg);
+   if (b)
+     dout(10) << "queue_for_recovery queued " << *pg << dendl;
+   else
+     dout(10) << "queue_for_recovery already queued " << *pg << dendl;
+   return b;
+ }
+ // ops
+ void OSDService::reply_op_error(OpRequestRef op, int err)
+ {
+   reply_op_error(op, err, eversion_t(), 0);
+ }
+ void OSDService::reply_op_error(OpRequestRef op, int err, eversion_t v,
+                                 version_t uv)
+ {
+   MOSDOp *m = static_cast<MOSDOp*>(op->get_req());
+   assert(m->get_header().type == CEPH_MSG_OSD_OP);
+   int flags;
+   flags = m->get_flags() & (CEPH_OSD_FLAG_ACK|CEPH_OSD_FLAG_ONDISK);
+   MOSDOpReply *reply = new MOSDOpReply(m, err, osdmap->get_epoch(), flags,
+                                      true);
+   reply->set_reply_versions(v, uv);
++  m->get_connection()->send_message(reply);
+ }
+ void OSDService::handle_misdirected_op(PG *pg, OpRequestRef op)
+ {
+   MOSDOp *m = static_cast<MOSDOp*>(op->get_req());
+   assert(m->get_header().type == CEPH_MSG_OSD_OP);
+   assert(m->get_map_epoch() >= pg->info.history.same_primary_since);
+   if (pg->is_ec_pg()) {
+     /**
+        * OSD recomputes op target based on current OSDMap. With an EC pg, we
+        * can get this result:
+        * 1) client at map 512 sends an op to osd 3, pg_t 3.9 based on mapping
+        *    [CRUSH_ITEM_NONE, 2, 3]/3
+        * 2) OSD 3 at map 513 remaps op to osd 3, spg_t 3.9s0 based on mapping
+        *    [3, 2, 3]/3
+        * 3) PG 3.9s0 dequeues the op at epoch 512 and notices that it isn't primary
+        *    -- misdirected op
+        * 4) client resends and this time PG 3.9s0 having caught up to 513 gets
+        *    it and fulfils it
+        *
+        * We can't compute the op target based on the sending map epoch due to
+        * splitting.  The simplest thing is to detect such cases here and drop
+        * them without an error (the client will resend anyway).
+        */
+     OSDMapRef opmap = try_get_map(m->get_map_epoch());
+     if (!opmap) {
+       dout(7) << __func__ << ": " << *pg << " no longer have map for "
+             << m->get_map_epoch() << ", dropping" << dendl;
+       return;
+     }
+     pg_t _pgid = m->get_pg();
+     spg_t pgid;
+     if ((m->get_flags() & CEPH_OSD_FLAG_PGOP) == 0)
+       _pgid = opmap->raw_pg_to_pg(_pgid);
+     if (opmap->get_primary_shard(_pgid, &pgid) &&
+       pgid.shard != pg->info.pgid.shard) {
+       dout(7) << __func__ << ": " << *pg << " primary changed since "
+             << m->get_map_epoch() << ", dropping" << dendl;
+       return;
+     }
+   }
+   dout(7) << *pg << " misdirected op in " << m->get_map_epoch() << dendl;
+   clog.warn() << m->get_source_inst() << " misdirected " << m->get_reqid()
+             << " pg " << m->get_pg()
+             << " to osd." << whoami
+             << " not " << pg->acting
+             << " in e" << m->get_map_epoch() << "/" << osdmap->get_epoch() << "\n";
+   reply_op_error(op, -ENXIO);
+ }
+ void OSDService::dequeue_pg(PG *pg, list<OpRequestRef> *dequeued)
+ {
+   osd->op_shardedwq.dequeue(pg, dequeued);
+ }
+ void OSDService::queue_for_peering(PG *pg)
+ {
+   peering_wq.queue(pg);
+ }
+ // ====================================================================
+ // OSD
  
  #undef dout_prefix
  #define dout_prefix *_dout
@@@ -4064,77 -4677,7 +4673,6 @@@ void OSD::send_alive(
    }
  }
  
- void OSDService::send_message_osd_cluster(int peer, Message *m, epoch_t from_epoch)
- {
-   OSDMapRef next_map = get_nextmap_reserved();
-   // service map is always newer/newest
-   assert(from_epoch <= next_map->get_epoch());
-   if (next_map->is_down(peer) ||
-       next_map->get_info(peer).up_from > from_epoch) {
-     m->put();
-     release_map(next_map);
-     return;
-   }
-   const entity_inst_t& peer_inst = next_map->get_cluster_inst(peer);
-   Connection *peer_con = osd->cluster_messenger->get_connection(peer_inst).get();
-   share_map_peer(peer, peer_con, next_map);
-   peer_con->send_message(m);
-   release_map(next_map);
- }
- ConnectionRef OSDService::get_con_osd_cluster(int peer, epoch_t from_epoch)
- {
-   OSDMapRef next_map = get_nextmap_reserved();
-   // service map is always newer/newest
-   assert(from_epoch <= next_map->get_epoch());
-   if (next_map->is_down(peer) ||
-       next_map->get_info(peer).up_from > from_epoch) {
-     release_map(next_map);
-     return NULL;
-   }
-   ConnectionRef con = osd->cluster_messenger->get_connection(next_map->get_cluster_inst(peer));
-   release_map(next_map);
-   return con;
- }
- pair<ConnectionRef,ConnectionRef> OSDService::get_con_osd_hb(int peer, epoch_t from_epoch)
- {
-   OSDMapRef next_map = get_nextmap_reserved();
-   // service map is always newer/newest
-   assert(from_epoch <= next_map->get_epoch());
-   pair<ConnectionRef,ConnectionRef> ret;
-   if (next_map->is_down(peer) ||
-       next_map->get_info(peer).up_from > from_epoch) {
-     release_map(next_map);
-     return ret;
-   }
-   ret.first = osd->hbclient_messenger->get_connection(next_map->get_hb_back_inst(peer));
-   if (next_map->get_hb_front_addr(peer) != entity_addr_t())
-     ret.second = osd->hbclient_messenger->get_connection(next_map->get_hb_front_inst(peer));
-   release_map(next_map);
-   return ret;
- }
- void OSDService::queue_want_pg_temp(pg_t pgid, vector<int>& want)
- {
-   Mutex::Locker l(pg_temp_lock);
-   pg_temp_wanted[pgid] = want;
- }
- void OSDService::send_pg_temp()
- {
-   Mutex::Locker l(pg_temp_lock);
-   if (pg_temp_wanted.empty())
-     return;
-   dout(10) << "send_pg_temp " << pg_temp_wanted << dendl;
-   MOSDPGTemp *m = new MOSDPGTemp(osdmap->get_epoch());
-   m->pg_temp = pg_temp_wanted;
-   monc->send_mon_message(m);
- }
--
  void OSD::send_failures()
  {
    assert(osd_lock.is_locked());
@@@ -4692,228 -5235,74 +5230,74 @@@ void OSD::do_command(Connection *con, c
        if (mls.empty())
          continue;
        fout << "missing_loc: " << mls << std::endl;
-       }
-       pg->unlock();
-       fout << std::endl;
-     }
-     fout.close();
-   }
-   else if (prefix == "debug kick_recovery_wq") {
-     int64_t delay;
-     cmd_getval(cct, cmdmap, "delay", delay);
-     ostringstream oss;
-     oss << delay;
-     r = cct->_conf->set_val("osd_recovery_delay_start", oss.str().c_str());
-     if (r != 0) {
-       ss << "kick_recovery_wq: error setting "
-        << "osd_recovery_delay_start to '" << delay << "': error "
-        << r;
-       goto out;
-     }
-     cct->_conf->apply_changes(NULL);
-     ss << "kicking recovery queue. set osd_recovery_delay_start "
-        << "to " << cct->_conf->osd_recovery_delay_start;
-     defer_recovery_until = ceph_clock_now(cct);
-     defer_recovery_until += cct->_conf->osd_recovery_delay_start;
-     recovery_wq.wake();
-   }
-   else if (prefix == "cpu_profiler") {
-     string arg;
-     cmd_getval(cct, cmdmap, "arg", arg);
-     vector<string> argvec;
-     get_str_vec(arg, argvec);
-     cpu_profiler_handle_command(argvec, ds);
-   }
-   else if (prefix == "dump_pg_recovery_stats") {
-     stringstream s;
-     if (f) {
-       pg_recovery_stats.dump_formatted(f.get());
-       f->flush(ds);
-     } else {
-       pg_recovery_stats.dump(s);
-       ds << "dump pg recovery stats: " << s.str();
-     }
-   }
-   else if (prefix == "reset_pg_recovery_stats") {
-     ss << "reset pg recovery stats";
-     pg_recovery_stats.reset();
-   }
-   else {
-     ss << "unrecognized command! " << cmd;
-     r = -EINVAL;
-   }
-  out:
-   rs = ss.str();
-   odata.append(ds);
-   dout(0) << "do_command r=" << r << " " << rs << dendl;
-   clog.info() << rs << "\n";
-   if (con) {
-     MCommandReply *reply = new MCommandReply(r, rs);
-     reply->set_tid(tid);
-     reply->set_data(odata);
-     con->send_message(reply);
-   }
-   return;
- }
- // --------------------------------------
- // dispatch
- epoch_t OSDService::get_peer_epoch(int peer)
- {
-   Mutex::Locker l(peer_map_epoch_lock);
-   map<int,epoch_t>::iterator p = peer_map_epoch.find(peer);
-   if (p == peer_map_epoch.end())
-     return 0;
-   return p->second;
- }
- epoch_t OSDService::note_peer_epoch(int peer, epoch_t e)
- {
-   Mutex::Locker l(peer_map_epoch_lock);
-   map<int,epoch_t>::iterator p = peer_map_epoch.find(peer);
-   if (p != peer_map_epoch.end()) {
-     if (p->second < e) {
-       dout(10) << "note_peer_epoch osd." << peer << " has " << e << dendl;
-       p->second = e;
-     } else {
-       dout(30) << "note_peer_epoch osd." << peer << " has " << p->second << " >= " << e << dendl;
-     }
-     return p->second;
-   } else {
-     dout(10) << "note_peer_epoch osd." << peer << " now has " << e << dendl;
-     peer_map_epoch[peer] = e;
-     return e;
-   }
- }
-  
- void OSDService::forget_peer_epoch(int peer, epoch_t as_of)
- {
-   Mutex::Locker l(peer_map_epoch_lock);
-   map<int,epoch_t>::iterator p = peer_map_epoch.find(peer);
-   if (p != peer_map_epoch.end()) {
-     if (p->second <= as_of) {
-       dout(10) << "forget_peer_epoch osd." << peer << " as_of " << as_of
-              << " had " << p->second << dendl;
-       peer_map_epoch.erase(p);
-     } else {
-       dout(10) << "forget_peer_epoch osd." << peer << " as_of " << as_of
-              << " has " << p->second << " - not forgetting" << dendl;
-     }
-   }
- }
- bool OSDService::should_share_map(entity_name_t name, Connection *con,
-                                   epoch_t epoch, OSDMapRef& osdmap,
-                                   const epoch_t *sent_epoch_p)
- {
-   bool should_send = false;
-   dout(20) << "should_share_map "
-            << name << " " << con->get_peer_addr()
-            << " " << epoch << dendl;
-   // does client have old map?
-   if (name.is_client()) {
-     bool message_sendmap = epoch < osdmap->get_epoch();
-     if (message_sendmap && sent_epoch_p) {
-       dout(20) << "client session last_sent_epoch: "
-                << *sent_epoch_p
-                << " versus osdmap epoch " << osdmap->get_epoch() << dendl;
-       if (*sent_epoch_p < osdmap->get_epoch()) {
-         should_send = true;
-       } // else we don't need to send it out again
+       }
+       pg->unlock();
+       fout << std::endl;
      }
-   }
-   if (con->get_messenger() == osd->cluster_messenger &&
-       con != osd->cluster_messenger->get_loopback_connection() &&
-       osdmap->is_up(name.num()) &&
-       (osdmap->get_cluster_addr(name.num()) == con->get_peer_addr() ||
-        osdmap->get_hb_back_addr(name.num()) == con->get_peer_addr())) {
-     // remember
-     epoch_t has = MAX(get_peer_epoch(name.num()), epoch);
  
-     // share?
-     if (has < osdmap->get_epoch()) {
-       dout(10) << name << " " << con->get_peer_addr()
-                << " has old map " << epoch << " < "
-                << osdmap->get_epoch() << dendl;
-       should_send = true;
+     fout.close();
+   }
+   else if (prefix == "debug kick_recovery_wq") {
+     int64_t delay;
+     cmd_getval(cct, cmdmap, "delay", delay);
+     ostringstream oss;
+     oss << delay;
+     r = cct->_conf->set_val("osd_recovery_delay_start", oss.str().c_str());
+     if (r != 0) {
+       ss << "kick_recovery_wq: error setting "
+        << "osd_recovery_delay_start to '" << delay << "': error "
+        << r;
+       goto out;
      }
+     cct->_conf->apply_changes(NULL);
+     ss << "kicking recovery queue. set osd_recovery_delay_start "
+        << "to " << cct->_conf->osd_recovery_delay_start;
+     defer_recovery_until = ceph_clock_now(cct);
+     defer_recovery_until += cct->_conf->osd_recovery_delay_start;
+     recovery_wq.wake();
    }
  
-   return should_send;
- }
- void OSDService::share_map(
-     entity_name_t name,
-     Connection *con,
-     epoch_t epoch,
-     OSDMapRef& osdmap,
-     epoch_t *sent_epoch_p)
- {
-   dout(20) << "share_map "
-          << name << " " << con->get_peer_addr()
-          << " " << epoch << dendl;
-   if ((!osd->is_active()) && (!osd->is_stopping())) {
-     /*It is safe not to proceed as OSD is not in healthy state*/
-     return;
+   else if (prefix == "cpu_profiler") {
+     string arg;
+     cmd_getval(cct, cmdmap, "arg", arg);
+     vector<string> argvec;
+     get_str_vec(arg, argvec);
+     cpu_profiler_handle_command(argvec, ds);
    }
  
-   bool want_shared = should_share_map(name, con, epoch,
-                                       osdmap, sent_epoch_p);
-   if (want_shared){
-     if (name.is_client()) {
-       dout(10) << name << " has old map " << epoch
-           << " < " << osdmap->get_epoch() << dendl;
-       // we know the Session is valid or we wouldn't be sending
-       if (sent_epoch_p) {
-       *sent_epoch_p = osdmap->get_epoch();
-       }
-       send_incremental_map(epoch, con, osdmap);
-     } else if (con->get_messenger() == osd->cluster_messenger &&
-         osdmap->is_up(name.num()) &&
-         (osdmap->get_cluster_addr(name.num()) == con->get_peer_addr() ||
-             osdmap->get_hb_back_addr(name.num()) == con->get_peer_addr())) {
-       dout(10) << name << " " << con->get_peer_addr()
-                      << " has old map " << epoch << " < "
-                      << osdmap->get_epoch() << dendl;
-       note_peer_epoch(name.num(), osdmap->get_epoch());
-       send_incremental_map(epoch, con, osdmap);
+   else if (prefix == "dump_pg_recovery_stats") {
+     stringstream s;
+     if (f) {
+       pg_recovery_stats.dump_formatted(f.get());
+       f->flush(ds);
+     } else {
+       pg_recovery_stats.dump(s);
+       ds << "dump pg recovery stats: " << s.str();
      }
    }
- }
  
+   else if (prefix == "reset_pg_recovery_stats") {
+     ss << "reset pg recovery stats";
+     pg_recovery_stats.reset();
+   }
  
- void OSDService::share_map_peer(int peer, Connection *con, OSDMapRef map)
- {
-   if (!map)
-     map = get_osdmap();
+   else {
+     ss << "unrecognized command! " << cmd;
+     r = -EINVAL;
+   }
  
-   // send map?
-   epoch_t pe = get_peer_epoch(peer);
-   if (pe) {
-     if (pe < map->get_epoch()) {
-       send_incremental_map(pe, con, map);
-       note_peer_epoch(peer, map->get_epoch());
-     } else
-       dout(20) << "share_map_peer " << con << " already has epoch " << pe << dendl;
-   } else {
-     dout(20) << "share_map_peer " << con << " don't know epoch, doing nothing" << dendl;
-     // no idea about peer's epoch.
-     // ??? send recent ???
-     // do nothing.
+  out:
+   rs = ss.str();
+   odata.append(ds);
+   dout(0) << "do_command r=" << r << " " << rs << dendl;
+   clog.info() << rs << "\n";
+   if (con) {
+     MCommandReply *reply = new MCommandReply(r, rs);
+     reply->set_tid(tid);
+     reply->set_data(odata);
 -    client_messenger->send_message(reply, con);
++    con->send_message(reply);
    }
+   return;
  }
  
  
diff --cc src/osd/OSD.h
Simple merge