agent_thread.join();
}
- osd->cluster_messenger->send_message(m, peer_inst);
+ // -------------------------------------
+
+ float OSDService::get_full_ratio()
+ {
+ float full_ratio = cct->_conf->osd_failsafe_full_ratio;
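+ // values above 1.0 are treated as percentages (e.g. 97 becomes 0.97)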
+ if (full_ratio > 1.0) full_ratio /= 100.0;
+ return full_ratio;
+ }
+
+ float OSDService::get_nearfull_ratio()
+ {
+ float nearfull_ratio = cct->_conf->osd_failsafe_nearfull_ratio;
+ if (nearfull_ratio > 1.0) nearfull_ratio /= 100.0;
+ return nearfull_ratio;
+ }
+
+ void OSDService::check_nearfull_warning(const osd_stat_t &osd_stat)
+ {
+ Mutex::Locker l(full_status_lock);
+ enum s_names new_state;
+
+ time_t now = ceph_clock_gettime(NULL);
+
+ // We base ratio on kb_avail rather than kb_used because they can
+ // differ significantly e.g. on btrfs volumes with a large number of
+ // chunks reserved for metadata, and for our purposes (avoiding
+ // completely filling the disk) it's far more important to know how
+ // much space is available to use than how much we've already used.
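+ // For example, kb = 1000000 with kb_avail = 150000 gives ratio = 0.85 (85% of the device consumed).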
+ float ratio = ((float)(osd_stat.kb - osd_stat.kb_avail)) / ((float)osd_stat.kb);
+ float nearfull_ratio = get_nearfull_ratio();
+ float full_ratio = get_full_ratio();
+ cur_ratio = ratio;
+
+ if (full_ratio > 0 && ratio > full_ratio) {
+ new_state = FULL;
+ } else if (nearfull_ratio > 0 && ratio > nearfull_ratio) {
+ new_state = NEAR;
+ } else {
+ cur_state = NONE;
+ return;
+ }
+
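+ // only log when the state changes, or once osd_op_complaint_time has elapsed since the last message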
+ if (cur_state != new_state) {
+ cur_state = new_state;
+ } else if (now - last_msg < cct->_conf->osd_op_complaint_time) {
+ return;
+ }
+ last_msg = now;
+ if (cur_state == FULL)
+ clog.error() << "OSD full, dropping all updates: " << (int)(ratio * 100) << "% full";
+ else
+ clog.warn() << "OSD near full (" << (int)(ratio * 100) << "%)";
+ }
+
+ bool OSDService::check_failsafe_full()
+ {
+ Mutex::Locker l(full_status_lock);
+ return cur_state == FULL;
+ }
+
+ bool OSDService::too_full_for_backfill(double *_ratio, double *_max_ratio)
+ {
+ Mutex::Locker l(full_status_lock);
+ double max_ratio;
+ max_ratio = cct->_conf->osd_backfill_full_ratio;
+ if (_ratio)
+ *_ratio = cur_ratio;
+ if (_max_ratio)
+ *_max_ratio = max_ratio;
+ return cur_ratio >= max_ratio;
+ }
+
+ void OSDService::update_osd_stat(vector<int>& hb_peers)
+ {
+ Mutex::Locker lock(stat_lock);
+
+ // fill in osd stats too
+ struct statfs stbuf;
+ osd->store->statfs(&stbuf);
+
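+ // convert statfs block counts (f_blocks/f_bfree/f_bavail) into byte totals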
+ uint64_t bytes = stbuf.f_blocks * stbuf.f_bsize;
+ uint64_t used = (stbuf.f_blocks - stbuf.f_bfree) * stbuf.f_bsize;
+ uint64_t avail = stbuf.f_bavail * stbuf.f_bsize;
+
+ osd_stat.kb = bytes >> 10;
+ osd_stat.kb_used = used >> 10;
+ osd_stat.kb_avail = avail >> 10;
+
+ osd->logger->set(l_osd_stat_bytes, bytes);
+ osd->logger->set(l_osd_stat_bytes_used, used);
+ osd->logger->set(l_osd_stat_bytes_avail, avail);
+
+ osd_stat.hb_in.swap(hb_peers);
+ osd_stat.hb_out.clear();
+
+ check_nearfull_warning(osd_stat);
+
+ osd->op_tracker.get_age_ms_histogram(&osd_stat.op_queue_age_hist);
+
+ dout(20) << "update_osd_stat " << osd_stat << dendl;
+ }
+
+ void OSDService::send_message_osd_cluster(int peer, Message *m, epoch_t from_epoch)
+ {
+ OSDMapRef next_map = get_nextmap_reserved();
+ // service map is always newer/newest
+ assert(from_epoch <= next_map->get_epoch());
+
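+ // drop the message if the peer is down, or has restarted since from_epoch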
+ if (next_map->is_down(peer) ||
+ next_map->get_info(peer).up_from > from_epoch) {
+ m->put();
+ release_map(next_map);
+ return;
+ }
+ const entity_inst_t& peer_inst = next_map->get_cluster_inst(peer);
+ Connection *peer_con = osd->cluster_messenger->get_connection(peer_inst).get();
+ share_map_peer(peer, peer_con, next_map);
- Messenger *msgr = client_messenger;
- if (entity_name_t::TYPE_OSD == con->get_peer_type())
- msgr = cluster_messenger;
- msgr->send_message(m, con);
++ peer_con->send_message(m);
+ release_map(next_map);
+ }
+
+ ConnectionRef OSDService::get_con_osd_cluster(int peer, epoch_t from_epoch)
+ {
+ OSDMapRef next_map = get_nextmap_reserved();
+ // service map is always newer/newest
+ assert(from_epoch <= next_map->get_epoch());
+
+ if (next_map->is_down(peer) ||
+ next_map->get_info(peer).up_from > from_epoch) {
+ release_map(next_map);
+ return NULL;
+ }
+ ConnectionRef con = osd->cluster_messenger->get_connection(next_map->get_cluster_inst(peer));
+ release_map(next_map);
+ return con;
+ }
+
+ pair<ConnectionRef,ConnectionRef> OSDService::get_con_osd_hb(int peer, epoch_t from_epoch)
+ {
+ OSDMapRef next_map = get_nextmap_reserved();
+ // service map is always newer/newest
+ assert(from_epoch <= next_map->get_epoch());
+
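+ // ret.first is the back heartbeat connection; ret.second is the front one, if the peer advertises a front address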
+ pair<ConnectionRef,ConnectionRef> ret;
+ if (next_map->is_down(peer) ||
+ next_map->get_info(peer).up_from > from_epoch) {
+ release_map(next_map);
+ return ret;
+ }
+ ret.first = osd->hbclient_messenger->get_connection(next_map->get_hb_back_inst(peer));
+ if (next_map->get_hb_front_addr(peer) != entity_addr_t())
+ ret.second = osd->hbclient_messenger->get_connection(next_map->get_hb_front_inst(peer));
+ release_map(next_map);
+ return ret;
+ }
+
+
+ void OSDService::queue_want_pg_temp(pg_t pgid, vector<int>& want)
+ {
+ Mutex::Locker l(pg_temp_lock);
+ pg_temp_wanted[pgid] = want;
+ }
+
+ void OSDService::send_pg_temp()
+ {
+ Mutex::Locker l(pg_temp_lock);
+ if (pg_temp_wanted.empty())
+ return;
+ dout(10) << "send_pg_temp " << pg_temp_wanted << dendl;
+ MOSDPGTemp *m = new MOSDPGTemp(osdmap->get_epoch());
+ m->pg_temp = pg_temp_wanted;
+ monc->send_mon_message(m);
+ }
+
+
+ // --------------------------------------
+ // dispatch
+
+ epoch_t OSDService::get_peer_epoch(int peer)
+ {
+ Mutex::Locker l(peer_map_epoch_lock);
+ map<int,epoch_t>::iterator p = peer_map_epoch.find(peer);
+ if (p == peer_map_epoch.end())
+ return 0;
+ return p->second;
+ }
+
+ epoch_t OSDService::note_peer_epoch(int peer, epoch_t e)
+ {
+ Mutex::Locker l(peer_map_epoch_lock);
+ map<int,epoch_t>::iterator p = peer_map_epoch.find(peer);
+ if (p != peer_map_epoch.end()) {
+ if (p->second < e) {
+ dout(10) << "note_peer_epoch osd." << peer << " has " << e << dendl;
+ p->second = e;
+ } else {
+ dout(30) << "note_peer_epoch osd." << peer << " has " << p->second << " >= " << e << dendl;
+ }
+ return p->second;
+ } else {
+ dout(10) << "note_peer_epoch osd." << peer << " now has " << e << dendl;
+ peer_map_epoch[peer] = e;
+ return e;
+ }
+ }
+
+ void OSDService::forget_peer_epoch(int peer, epoch_t as_of)
+ {
+ Mutex::Locker l(peer_map_epoch_lock);
+ map<int,epoch_t>::iterator p = peer_map_epoch.find(peer);
+ if (p != peer_map_epoch.end()) {
+ if (p->second <= as_of) {
+ dout(10) << "forget_peer_epoch osd." << peer << " as_of " << as_of
+ << " had " << p->second << dendl;
+ peer_map_epoch.erase(p);
+ } else {
+ dout(10) << "forget_peer_epoch osd." << peer << " as_of " << as_of
+ << " has " << p->second << " - not forgetting" << dendl;
+ }
+ }
+ }
+
+ bool OSDService::should_share_map(entity_name_t name, Connection *con,
+ epoch_t epoch, OSDMapRef& osdmap,
+ const epoch_t *sent_epoch_p)
+ {
+ bool should_send = false;
+ dout(20) << "should_share_map "
+ << name << " " << con->get_peer_addr()
+ << " " << epoch << dendl;
+
+ // does client have old map?
+ if (name.is_client()) {
+ bool message_sendmap = epoch < osdmap->get_epoch();
+ if (message_sendmap && sent_epoch_p) {
+ dout(20) << "client session last_sent_epoch: "
+ << *sent_epoch_p
+ << " versus osdmap epoch " << osdmap->get_epoch() << dendl;
+ if (*sent_epoch_p < osdmap->get_epoch()) {
+ should_send = true;
+ } // else we don't need to send it out again
+ }
+ }
+
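+ // for cluster-internal OSD peers, compare the best epoch we know the peer has against our own map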
+ if (con->get_messenger() == osd->cluster_messenger &&
+ con != osd->cluster_messenger->get_loopback_connection() &&
+ osdmap->is_up(name.num()) &&
+ (osdmap->get_cluster_addr(name.num()) == con->get_peer_addr() ||
+ osdmap->get_hb_back_addr(name.num()) == con->get_peer_addr())) {
+ // remember
+ epoch_t has = MAX(get_peer_epoch(name.num()), epoch);
+
+ // share?
+ if (has < osdmap->get_epoch()) {
+ dout(10) << name << " " << con->get_peer_addr()
+ << " has old map " << epoch << " < "
+ << osdmap->get_epoch() << dendl;
+ should_send = true;
+ }
+ }
+
+ return should_send;
+ }
+
+ void OSDService::share_map(
+ entity_name_t name,
+ Connection *con,
+ epoch_t epoch,
+ OSDMapRef& osdmap,
+ epoch_t *sent_epoch_p)
+ {
+ dout(20) << "share_map "
+ << name << " " << con->get_peer_addr()
+ << " " << epoch << dendl;
+
+ if ((!osd->is_active()) && (!osd->is_stopping())) {
+ /* it is safe not to proceed: the OSD is not in a healthy state */
+ return;
+ }
+
+ bool want_shared = should_share_map(name, con, epoch,
+ osdmap, sent_epoch_p);
+
+ if (want_shared) {
+ if (name.is_client()) {
+ dout(10) << name << " has old map " << epoch
+ << " < " << osdmap->get_epoch() << dendl;
+ // we know the Session is valid or we wouldn't be sending
+ if (sent_epoch_p) {
+ *sent_epoch_p = osdmap->get_epoch();
+ }
+ send_incremental_map(epoch, con, osdmap);
+ } else if (con->get_messenger() == osd->cluster_messenger &&
+ osdmap->is_up(name.num()) &&
+ (osdmap->get_cluster_addr(name.num()) == con->get_peer_addr() ||
+ osdmap->get_hb_back_addr(name.num()) == con->get_peer_addr())) {
+ dout(10) << name << " " << con->get_peer_addr()
+ << " has old map " << epoch << " < "
+ << osdmap->get_epoch() << dendl;
+ note_peer_epoch(name.num(), osdmap->get_epoch());
+ send_incremental_map(epoch, con, osdmap);
+ }
+ }
+ }
+
+
+ void OSDService::share_map_peer(int peer, Connection *con, OSDMapRef map)
+ {
+ if (!map)
+ map = get_osdmap();
+
+ // send map?
+ epoch_t pe = get_peer_epoch(peer);
+ if (pe) {
+ if (pe < map->get_epoch()) {
+ send_incremental_map(pe, con, map);
+ note_peer_epoch(peer, map->get_epoch());
+ } else
+ dout(20) << "share_map_peer " << con << " already has epoch " << pe << dendl;
+ } else {
+ dout(20) << "share_map_peer " << con << " don't know epoch, doing nothing" << dendl;
+ // no idea about peer's epoch.
+ // ??? send recent ???
+ // do nothing.
+ }
+ }
+
+
+ bool OSDService::inc_scrubs_pending()
+ {
+ bool result = false;
+
+ sched_scrub_lock.Lock();
+ if (scrubs_pending + scrubs_active < cct->_conf->osd_max_scrubs) {
+ dout(20) << "inc_scrubs_pending " << scrubs_pending << " -> " << (scrubs_pending+1)
+ << " (max " << cct->_conf->osd_max_scrubs << ", active " << scrubs_active << ")" << dendl;
+ result = true;
+ ++scrubs_pending;
+ } else {
+ dout(20) << "inc_scrubs_pending " << scrubs_pending << " + " << scrubs_active << " active >= max " << cct->_conf->osd_max_scrubs << dendl;
+ }
+ sched_scrub_lock.Unlock();
+
+ return result;
+ }
+
+ void OSDService::dec_scrubs_pending()
+ {
+ sched_scrub_lock.Lock();
+ dout(20) << "dec_scrubs_pending " << scrubs_pending << " -> " << (scrubs_pending-1)
+ << " (max " << cct->_conf->osd_max_scrubs << ", active " << scrubs_active << ")" << dendl;
+ --scrubs_pending;
+ assert(scrubs_pending >= 0);
+ sched_scrub_lock.Unlock();
+ }
+
+ void OSDService::inc_scrubs_active(bool reserved)
+ {
+ sched_scrub_lock.Lock();
+ ++(scrubs_active);
+ if (reserved) {
+ --(scrubs_pending);
+ dout(20) << "inc_scrubs_active " << (scrubs_active-1) << " -> " << scrubs_active
+ << " (max " << cct->_conf->osd_max_scrubs
+ << ", pending " << (scrubs_pending+1) << " -> " << scrubs_pending << ")" << dendl;
+ assert(scrubs_pending >= 0);
+ } else {
+ dout(20) << "inc_scrubs_active " << (scrubs_active-1) << " -> " << scrubs_active
+ << " (max " << cct->_conf->osd_max_scrubs
+ << ", pending " << scrubs_pending << ")" << dendl;
+ }
+ sched_scrub_lock.Unlock();
+ }
+
+ void OSDService::dec_scrubs_active()
+ {
+ sched_scrub_lock.Lock();
+ dout(20) << "dec_scrubs_active " << scrubs_active << " -> " << (scrubs_active-1)
+ << " (max " << cct->_conf->osd_max_scrubs << ", pending " << scrubs_pending << ")" << dendl;
+ --scrubs_active;
+ sched_scrub_lock.Unlock();
+ }
+
+ void OSDService::retrieve_epochs(epoch_t *_boot_epoch, epoch_t *_up_epoch,
+ epoch_t *_bind_epoch) const
+ {
+ Mutex::Locker l(epoch_lock);
+ if (_boot_epoch)
+ *_boot_epoch = boot_epoch;
+ if (_up_epoch)
+ *_up_epoch = up_epoch;
+ if (_bind_epoch)
+ *_bind_epoch = bind_epoch;
+ }
+
+ void OSDService::set_epochs(const epoch_t *_boot_epoch, const epoch_t *_up_epoch,
+ const epoch_t *_bind_epoch)
+ {
+ Mutex::Locker l(epoch_lock);
+ if (_boot_epoch) {
+ assert(*_boot_epoch == 0 || *_boot_epoch >= boot_epoch);
+ boot_epoch = *_boot_epoch;
+ }
+ if (_up_epoch) {
+ assert(*_up_epoch == 0 || *_up_epoch >= up_epoch);
+ up_epoch = *_up_epoch;
+ }
+ if (_bind_epoch) {
+ assert(*_bind_epoch == 0 || *_bind_epoch >= bind_epoch);
+ bind_epoch = *_bind_epoch;
+ }
+ }
+
+ bool OSDService::prepare_to_stop()
+ {
+ Mutex::Locker l(is_stopping_lock);
+ if (state != NOT_STOPPING)
+ return false;
+
+ OSDMapRef osdmap = get_osdmap();
+ if (osdmap && osdmap->is_up(whoami)) {
+ dout(0) << __func__ << " telling mon we are shutting down" << dendl;
+ state = PREPARING_TO_STOP;
+ monc->send_mon_message(new MOSDMarkMeDown(monc->get_fsid(),
+ osdmap->get_inst(whoami),
+ osdmap->get_epoch(),
+ true // request ack
+ ));
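+ // block until the monitor acks the mark-down (got_stop_ack sets state to STOPPING) or osd_mon_shutdown_timeout elapses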
+ utime_t now = ceph_clock_now(cct);
+ utime_t timeout;
+ timeout.set_from_double(now + cct->_conf->osd_mon_shutdown_timeout);
+ while ((ceph_clock_now(cct) < timeout) &&
+ (state != STOPPING)) {
+ is_stopping_cond.WaitUntil(is_stopping_lock, timeout);
+ }
+ }
+ dout(0) << __func__ << " starting shutdown" << dendl;
+ state = STOPPING;
+ return true;
+ }
+
+ void OSDService::got_stop_ack()
+ {
+ Mutex::Locker l(is_stopping_lock);
+ if (state == PREPARING_TO_STOP) {
+ dout(0) << __func__ << " starting shutdown" << dendl;
+ state = STOPPING;
+ is_stopping_cond.Signal();
+ } else {
+ dout(10) << __func__ << " ignoring msg" << dendl;
+ }
+ }
+
+
+ MOSDMap *OSDService::build_incremental_map_msg(epoch_t since, epoch_t to,
+ OSDSuperblock& sblock)
+ {
+ MOSDMap *m = new MOSDMap(monc->get_fsid());
+ m->oldest_map = sblock.oldest_map;
+ m->newest_map = sblock.newest_map;
+
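+ // walk backwards from 'to' toward 'since', preferring incrementals; fall back to a full map if an incremental is missing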
+ for (epoch_t e = to; e > since; e--) {
+ bufferlist bl;
+ if (e > m->oldest_map && get_inc_map_bl(e, bl)) {
+ m->incremental_maps[e].claim(bl);
+ } else if (get_map_bl(e, bl)) {
+ m->maps[e].claim(bl);
+ break;
+ } else {
+ derr << "since " << since << " to " << to
+ << " oldest " << m->oldest_map << " newest " << m->newest_map
+ << dendl;
+ m->put();
+ m = NULL;
+ break;
+ }
+ }
+ return m;
+ }
+
+ void OSDService::send_map(MOSDMap *m, Connection *con)
+ {
- m->get_connection()->get_messenger()->send_message(reply, m->get_connection());
++ con->send_message(m);
+ }
+
+ void OSDService::send_incremental_map(epoch_t since, Connection *con,
+ OSDMapRef& osdmap)
+ {
+ epoch_t to = osdmap->get_epoch();
+ dout(10) << "send_incremental_map " << since << " -> " << to
+ << " to " << con << " " << con->get_peer_addr() << dendl;
+
+ MOSDMap *m = NULL;
+ while (!m) {
+ OSDSuperblock sblock(get_superblock());
+ if (since < sblock.oldest_map) {
+ // just send latest full map
+ MOSDMap *full = new MOSDMap(monc->get_fsid());
+ full->oldest_map = sblock.oldest_map;
+ full->newest_map = sblock.newest_map;
+ get_map_bl(to, full->maps[to]);
+ send_map(full, con);
+ return;
+ }
+
+ if (to > since && (int64_t)(to - since) > cct->_conf->osd_map_share_max_epochs) {
+ dout(10) << " " << (to - since) << " > max " << cct->_conf->osd_map_share_max_epochs
+ << ", only sending most recent" << dendl;
+ since = to - cct->_conf->osd_map_share_max_epochs;
+ }
+
+ if (to - since > (epoch_t)cct->_conf->osd_map_message_max)
+ to = since + cct->_conf->osd_map_message_max;
+ m = build_incremental_map_msg(since, to, sblock);
+ }
+ send_map(m, con);
+ }
+
+ bool OSDService::_get_map_bl(epoch_t e, bufferlist& bl)
+ {
+ bool found = map_bl_cache.lookup(e, &bl);
+ if (found)
+ return true;
+ found = store->read(
+ coll_t::META_COLL, OSD::get_osdmap_pobject_name(e), 0, 0, bl) >= 0;
+ if (found)
+ _add_map_bl(e, bl);
+ return found;
+ }
+
+ bool OSDService::get_inc_map_bl(epoch_t e, bufferlist& bl)
+ {
+ Mutex::Locker l(map_cache_lock);
+ bool found = map_bl_inc_cache.lookup(e, &bl);
+ if (found)
+ return true;
+ found = store->read(
+ coll_t::META_COLL, OSD::get_inc_osdmap_pobject_name(e), 0, 0, bl) >= 0;
+ if (found)
+ _add_map_inc_bl(e, bl);
+ return found;
+ }
+
+ void OSDService::_add_map_bl(epoch_t e, bufferlist& bl)
+ {
+ dout(10) << "add_map_bl " << e << " " << bl.length() << " bytes" << dendl;
+ map_bl_cache.add(e, bl);
+ }
+
+ void OSDService::_add_map_inc_bl(epoch_t e, bufferlist& bl)
+ {
+ dout(10) << "add_map_inc_bl " << e << " " << bl.length() << " bytes" << dendl;
+ map_bl_inc_cache.add(e, bl);
+ }
+
+ void OSDService::pin_map_inc_bl(epoch_t e, bufferlist &bl)
+ {
+ Mutex::Locker l(map_cache_lock);
+ map_bl_inc_cache.pin(e, bl);
+ }
+
+ void OSDService::pin_map_bl(epoch_t e, bufferlist &bl)
+ {
+ Mutex::Locker l(map_cache_lock);
+ map_bl_cache.pin(e, bl);
+ }
+
+ void OSDService::clear_map_bl_cache_pins(epoch_t e)
+ {
+ Mutex::Locker l(map_cache_lock);
+ map_bl_inc_cache.clear_pinned(e);
+ map_bl_cache.clear_pinned(e);
+ }
+
+ OSDMapRef OSDService::_add_map(OSDMap *o)
+ {
+ epoch_t e = o->get_epoch();
+
+ if (cct->_conf->osd_map_dedup) {
+ // Dedup against an existing map at a nearby epoch
+ OSDMapRef for_dedup = map_cache.lower_bound(e);
+ if (for_dedup) {
+ OSDMap::dedup(for_dedup.get(), o);
+ }
+ }
+ OSDMapRef l = map_cache.add(e, o);
+ return l;
+ }
+
+ OSDMapRef OSDService::try_get_map(epoch_t epoch)
+ {
+ Mutex::Locker l(map_cache_lock);
+ OSDMapRef retval = map_cache.lookup(epoch);
+ if (retval) {
+ dout(30) << "get_map " << epoch << " -cached" << dendl;
+ return retval;
+ }
+
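+ // cache miss: load the map from the object store and decode it (epoch 0 yields an empty initial map)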
+ OSDMap *map = new OSDMap;
+ if (epoch > 0) {
+ dout(20) << "get_map " << epoch << " - loading and decoding " << map << dendl;
+ bufferlist bl;
+ if (!_get_map_bl(epoch, bl)) {
+ delete map;
+ return OSDMapRef();
+ }
+ map->decode(bl);
+ } else {
+ dout(20) << "get_map " << epoch << " - return initial " << map << dendl;
+ }
+ return _add_map(map);
+ }
+
+ bool OSDService::queue_for_recovery(PG *pg)
+ {
+ bool b = recovery_wq.queue(pg);
+ if (b)
+ dout(10) << "queue_for_recovery queued " << *pg << dendl;
+ else
+ dout(10) << "queue_for_recovery already queued " << *pg << dendl;
+ return b;
+ }
+
+
+ // ops
+
+
+ void OSDService::reply_op_error(OpRequestRef op, int err)
+ {
+ reply_op_error(op, err, eversion_t(), 0);
+ }
+
+ void OSDService::reply_op_error(OpRequestRef op, int err, eversion_t v,
+ version_t uv)
+ {
+ MOSDOp *m = static_cast<MOSDOp*>(op->get_req());
+ assert(m->get_header().type == CEPH_MSG_OSD_OP);
+ int flags = m->get_flags() & (CEPH_OSD_FLAG_ACK|CEPH_OSD_FLAG_ONDISK);
+
+ MOSDOpReply *reply = new MOSDOpReply(m, err, osdmap->get_epoch(), flags,
+ true);
+ reply->set_reply_versions(v, uv);
++ m->get_connection()->send_message(reply);
+ }
+
+ void OSDService::handle_misdirected_op(PG *pg, OpRequestRef op)
+ {
+ MOSDOp *m = static_cast<MOSDOp*>(op->get_req());
+ assert(m->get_header().type == CEPH_MSG_OSD_OP);
+
+ assert(m->get_map_epoch() >= pg->info.history.same_primary_since);
+
+ if (pg->is_ec_pg()) {
+ /**
+ * OSD recomputes op target based on current OSDMap. With an EC pg, we
+ * can get this result:
+ * 1) client at map 512 sends an op to osd 3, pg_t 3.9 based on mapping
+ * [CRUSH_ITEM_NONE, 2, 3]/3
+ * 2) OSD 3 at map 513 remaps op to osd 3, spg_t 3.9s0 based on mapping
+ * [3, 2, 3]/3
+ * 3) PG 3.9s0 dequeues the op at epoch 512 and notices that it isn't primary
+ * -- misdirected op
+ * 4) client resends and this time PG 3.9s0 having caught up to 513 gets
+ * it and fulfils it
+ *
+ * We can't compute the op target based on the sending map epoch due to
+ * splitting. The simplest thing is to detect such cases here and drop
+ * them without an error (the client will resend anyway).
+ */
+ OSDMapRef opmap = try_get_map(m->get_map_epoch());
+ if (!opmap) {
+ dout(7) << __func__ << ": " << *pg << " no longer have map for "
+ << m->get_map_epoch() << ", dropping" << dendl;
+ return;
+ }
+ pg_t _pgid = m->get_pg();
+ spg_t pgid;
+ if ((m->get_flags() & CEPH_OSD_FLAG_PGOP) == 0)
+ _pgid = opmap->raw_pg_to_pg(_pgid);
+ if (opmap->get_primary_shard(_pgid, &pgid) &&
+ pgid.shard != pg->info.pgid.shard) {
+ dout(7) << __func__ << ": " << *pg << " primary changed since "
+ << m->get_map_epoch() << ", dropping" << dendl;
+ return;
+ }
+ }
+
+ dout(7) << *pg << " misdirected op in " << m->get_map_epoch() << dendl;
+ clog.warn() << m->get_source_inst() << " misdirected " << m->get_reqid()
+ << " pg " << m->get_pg()
+ << " to osd." << whoami
+ << " not " << pg->acting
+ << " in e" << m->get_map_epoch() << "/" << osdmap->get_epoch() << "\n";
+ reply_op_error(op, -ENXIO);
+ }
+
+
+ void OSDService::dequeue_pg(PG *pg, list<OpRequestRef> *dequeued)
+ {
+ osd->op_shardedwq.dequeue(pg, dequeued);
+ }
+
+ void OSDService::queue_for_peering(PG *pg)
+ {
+ peering_wq.queue(pg);
+ }
+
+
+ // ====================================================================
+ // OSD
#undef dout_prefix
#define dout_prefix *_dout
}
}
- void OSDService::send_message_osd_cluster(int peer, Message *m, epoch_t from_epoch)
- {
- OSDMapRef next_map = get_nextmap_reserved();
- // service map is always newer/newest
- assert(from_epoch <= next_map->get_epoch());
-
- if (next_map->is_down(peer) ||
- next_map->get_info(peer).up_from > from_epoch) {
- m->put();
- release_map(next_map);
- return;
- }
- const entity_inst_t& peer_inst = next_map->get_cluster_inst(peer);
- Connection *peer_con = osd->cluster_messenger->get_connection(peer_inst).get();
- share_map_peer(peer, peer_con, next_map);
- peer_con->send_message(m);
- release_map(next_map);
- }
-
- ConnectionRef OSDService::get_con_osd_cluster(int peer, epoch_t from_epoch)
- {
- OSDMapRef next_map = get_nextmap_reserved();
- // service map is always newer/newest
- assert(from_epoch <= next_map->get_epoch());
-
- if (next_map->is_down(peer) ||
- next_map->get_info(peer).up_from > from_epoch) {
- release_map(next_map);
- return NULL;
- }
- ConnectionRef con = osd->cluster_messenger->get_connection(next_map->get_cluster_inst(peer));
- release_map(next_map);
- return con;
- }
-
- pair<ConnectionRef,ConnectionRef> OSDService::get_con_osd_hb(int peer, epoch_t from_epoch)
- {
- OSDMapRef next_map = get_nextmap_reserved();
- // service map is always newer/newest
- assert(from_epoch <= next_map->get_epoch());
-
- pair<ConnectionRef,ConnectionRef> ret;
- if (next_map->is_down(peer) ||
- next_map->get_info(peer).up_from > from_epoch) {
- release_map(next_map);
- return ret;
- }
- ret.first = osd->hbclient_messenger->get_connection(next_map->get_hb_back_inst(peer));
- if (next_map->get_hb_front_addr(peer) != entity_addr_t())
- ret.second = osd->hbclient_messenger->get_connection(next_map->get_hb_front_inst(peer));
- release_map(next_map);
- return ret;
- }
-
- void OSDService::queue_want_pg_temp(pg_t pgid, vector<int>& want)
- {
- Mutex::Locker l(pg_temp_lock);
- pg_temp_wanted[pgid] = want;
- }
-
- void OSDService::send_pg_temp()
- {
- Mutex::Locker l(pg_temp_lock);
- if (pg_temp_wanted.empty())
- return;
- dout(10) << "send_pg_temp " << pg_temp_wanted << dendl;
- MOSDPGTemp *m = new MOSDPGTemp(osdmap->get_epoch());
- m->pg_temp = pg_temp_wanted;
- monc->send_mon_message(m);
- }
--
void OSD::send_failures()
{
assert(osd_lock.is_locked());
if (mls.empty())
continue;
fout << "missing_loc: " << mls << std::endl;
- }
- pg->unlock();
- fout << std::endl;
- }
-
- fout.close();
- }
- else if (prefix == "debug kick_recovery_wq") {
- int64_t delay;
- cmd_getval(cct, cmdmap, "delay", delay);
- ostringstream oss;
- oss << delay;
- r = cct->_conf->set_val("osd_recovery_delay_start", oss.str().c_str());
- if (r != 0) {
- ss << "kick_recovery_wq: error setting "
- << "osd_recovery_delay_start to '" << delay << "': error "
- << r;
- goto out;
- }
- cct->_conf->apply_changes(NULL);
- ss << "kicking recovery queue. set osd_recovery_delay_start "
- << "to " << cct->_conf->osd_recovery_delay_start;
- defer_recovery_until = ceph_clock_now(cct);
- defer_recovery_until += cct->_conf->osd_recovery_delay_start;
- recovery_wq.wake();
- }
-
- else if (prefix == "cpu_profiler") {
- string arg;
- cmd_getval(cct, cmdmap, "arg", arg);
- vector<string> argvec;
- get_str_vec(arg, argvec);
- cpu_profiler_handle_command(argvec, ds);
- }
-
- else if (prefix == "dump_pg_recovery_stats") {
- stringstream s;
- if (f) {
- pg_recovery_stats.dump_formatted(f.get());
- f->flush(ds);
- } else {
- pg_recovery_stats.dump(s);
- ds << "dump pg recovery stats: " << s.str();
- }
- }
-
- else if (prefix == "reset_pg_recovery_stats") {
- ss << "reset pg recovery stats";
- pg_recovery_stats.reset();
- }
-
- else {
- ss << "unrecognized command! " << cmd;
- r = -EINVAL;
- }
-
- out:
- rs = ss.str();
- odata.append(ds);
- dout(0) << "do_command r=" << r << " " << rs << dendl;
- clog.info() << rs << "\n";
- if (con) {
- MCommandReply *reply = new MCommandReply(r, rs);
- reply->set_tid(tid);
- reply->set_data(odata);
- con->send_message(reply);
- }
- return;
- }
-
-
-
- // --------------------------------------
- // dispatch
-
- epoch_t OSDService::get_peer_epoch(int peer)
- {
- Mutex::Locker l(peer_map_epoch_lock);
- map<int,epoch_t>::iterator p = peer_map_epoch.find(peer);
- if (p == peer_map_epoch.end())
- return 0;
- return p->second;
- }
-
- epoch_t OSDService::note_peer_epoch(int peer, epoch_t e)
- {
- Mutex::Locker l(peer_map_epoch_lock);
- map<int,epoch_t>::iterator p = peer_map_epoch.find(peer);
- if (p != peer_map_epoch.end()) {
- if (p->second < e) {
- dout(10) << "note_peer_epoch osd." << peer << " has " << e << dendl;
- p->second = e;
- } else {
- dout(30) << "note_peer_epoch osd." << peer << " has " << p->second << " >= " << e << dendl;
- }
- return p->second;
- } else {
- dout(10) << "note_peer_epoch osd." << peer << " now has " << e << dendl;
- peer_map_epoch[peer] = e;
- return e;
- }
- }
-
- void OSDService::forget_peer_epoch(int peer, epoch_t as_of)
- {
- Mutex::Locker l(peer_map_epoch_lock);
- map<int,epoch_t>::iterator p = peer_map_epoch.find(peer);
- if (p != peer_map_epoch.end()) {
- if (p->second <= as_of) {
- dout(10) << "forget_peer_epoch osd." << peer << " as_of " << as_of
- << " had " << p->second << dendl;
- peer_map_epoch.erase(p);
- } else {
- dout(10) << "forget_peer_epoch osd." << peer << " as_of " << as_of
- << " has " << p->second << " - not forgetting" << dendl;
- }
- }
- }
-
- bool OSDService::should_share_map(entity_name_t name, Connection *con,
- epoch_t epoch, OSDMapRef& osdmap,
- const epoch_t *sent_epoch_p)
- {
- bool should_send = false;
- dout(20) << "should_share_map "
- << name << " " << con->get_peer_addr()
- << " " << epoch << dendl;
-
- // does client have old map?
- if (name.is_client()) {
- bool message_sendmap = epoch < osdmap->get_epoch();
- if (message_sendmap && sent_epoch_p) {
- dout(20) << "client session last_sent_epoch: "
- << *sent_epoch_p
- << " versus osdmap epoch " << osdmap->get_epoch() << dendl;
- if (*sent_epoch_p < osdmap->get_epoch()) {
- should_send = true;
- } // else we don't need to send it out again
+ }
+ pg->unlock();
+ fout << std::endl;
}
- }
-
- if (con->get_messenger() == osd->cluster_messenger &&
- con != osd->cluster_messenger->get_loopback_connection() &&
- osdmap->is_up(name.num()) &&
- (osdmap->get_cluster_addr(name.num()) == con->get_peer_addr() ||
- osdmap->get_hb_back_addr(name.num()) == con->get_peer_addr())) {
- // remember
- epoch_t has = MAX(get_peer_epoch(name.num()), epoch);
- // share?
- if (has < osdmap->get_epoch()) {
- dout(10) << name << " " << con->get_peer_addr()
- << " has old map " << epoch << " < "
- << osdmap->get_epoch() << dendl;
- should_send = true;
+ fout.close();
+ }
+ else if (prefix == "debug kick_recovery_wq") {
+ int64_t delay;
+ cmd_getval(cct, cmdmap, "delay", delay);
+ ostringstream oss;
+ oss << delay;
+ r = cct->_conf->set_val("osd_recovery_delay_start", oss.str().c_str());
+ if (r != 0) {
+ ss << "kick_recovery_wq: error setting "
+ << "osd_recovery_delay_start to '" << delay << "': error "
+ << r;
+ goto out;
}
+ cct->_conf->apply_changes(NULL);
+ ss << "kicking recovery queue. set osd_recovery_delay_start "
+ << "to " << cct->_conf->osd_recovery_delay_start;
+ defer_recovery_until = ceph_clock_now(cct);
+ defer_recovery_until += cct->_conf->osd_recovery_delay_start;
+ recovery_wq.wake();
}
- return should_send;
- }
-
- void OSDService::share_map(
- entity_name_t name,
- Connection *con,
- epoch_t epoch,
- OSDMapRef& osdmap,
- epoch_t *sent_epoch_p)
- {
- dout(20) << "share_map "
- << name << " " << con->get_peer_addr()
- << " " << epoch << dendl;
-
- if ((!osd->is_active()) && (!osd->is_stopping())) {
- /*It is safe not to proceed as OSD is not in healthy state*/
- return;
+ else if (prefix == "cpu_profiler") {
+ string arg;
+ cmd_getval(cct, cmdmap, "arg", arg);
+ vector<string> argvec;
+ get_str_vec(arg, argvec);
+ cpu_profiler_handle_command(argvec, ds);
}
- bool want_shared = should_share_map(name, con, epoch,
- osdmap, sent_epoch_p);
-
- if (want_shared){
- if (name.is_client()) {
- dout(10) << name << " has old map " << epoch
- << " < " << osdmap->get_epoch() << dendl;
- // we know the Session is valid or we wouldn't be sending
- if (sent_epoch_p) {
- *sent_epoch_p = osdmap->get_epoch();
- }
- send_incremental_map(epoch, con, osdmap);
- } else if (con->get_messenger() == osd->cluster_messenger &&
- osdmap->is_up(name.num()) &&
- (osdmap->get_cluster_addr(name.num()) == con->get_peer_addr() ||
- osdmap->get_hb_back_addr(name.num()) == con->get_peer_addr())) {
- dout(10) << name << " " << con->get_peer_addr()
- << " has old map " << epoch << " < "
- << osdmap->get_epoch() << dendl;
- note_peer_epoch(name.num(), osdmap->get_epoch());
- send_incremental_map(epoch, con, osdmap);
+ else if (prefix == "dump_pg_recovery_stats") {
+ stringstream s;
+ if (f) {
+ pg_recovery_stats.dump_formatted(f.get());
+ f->flush(ds);
+ } else {
+ pg_recovery_stats.dump(s);
+ ds << "dump pg recovery stats: " << s.str();
}
}
- }
+ else if (prefix == "reset_pg_recovery_stats") {
+ ss << "reset pg recovery stats";
+ pg_recovery_stats.reset();
+ }
- void OSDService::share_map_peer(int peer, Connection *con, OSDMapRef map)
- {
- if (!map)
- map = get_osdmap();
+ else {
+ ss << "unrecognized command! " << cmd;
+ r = -EINVAL;
+ }
- // send map?
- epoch_t pe = get_peer_epoch(peer);
- if (pe) {
- if (pe < map->get_epoch()) {
- send_incremental_map(pe, con, map);
- note_peer_epoch(peer, map->get_epoch());
- } else
- dout(20) << "share_map_peer " << con << " already has epoch " << pe << dendl;
- } else {
- dout(20) << "share_map_peer " << con << " don't know epoch, doing nothing" << dendl;
- // no idea about peer's epoch.
- // ??? send recent ???
- // do nothing.
+ out:
+ rs = ss.str();
+ odata.append(ds);
+ dout(0) << "do_command r=" << r << " " << rs << dendl;
+ clog.info() << rs << "\n";
+ if (con) {
+ MCommandReply *reply = new MCommandReply(r, rs);
+ reply->set_tid(tid);
+ reply->set_data(odata);
- client_messenger->send_message(reply, con);
++ con->send_message(reply);
}
+ return;
}