From: Samuel Just
Date: Tue, 5 Aug 2014 20:49:50 +0000 (-0700)
Subject: Merge remote-tracking branch 'upstream/next' into wip-sam-testing
X-Git-Tag: v0.85~93
X-Git-Url: http://git-server-git.apps.pok.os.sepia.ceph.com/?a=commitdiff_plain;h=3f9f135f375886d22a0ee432bc41f61523db3737;p=ceph.git

Merge remote-tracking branch 'upstream/next' into wip-sam-testing

Conflicts:
	src/osd/OSD.cc
---

3f9f135f375886d22a0ee432bc41f61523db3737
diff --cc src/osd/OSD.cc
index 09432821f950c,307def8f5f78a..cf63ba75d2169
--- a/src/osd/OSD.cc
+++ b/src/osd/OSD.cc
@@@ -569,6 -567,722 +567,719 @@@ void OSDService::agent_stop(
    agent_thread.join();
  }
  
+ // -------------------------------------
+ 
+ float OSDService::get_full_ratio()
+ {
+   float full_ratio = cct->_conf->osd_failsafe_full_ratio;
+   if (full_ratio > 1.0) full_ratio /= 100.0;
+   return full_ratio;
+ }
+ 
+ float OSDService::get_nearfull_ratio()
+ {
+   float nearfull_ratio = cct->_conf->osd_failsafe_nearfull_ratio;
+   if (nearfull_ratio > 1.0) nearfull_ratio /= 100.0;
+   return nearfull_ratio;
+ }
+ 
+ void OSDService::check_nearfull_warning(const osd_stat_t &osd_stat)
+ {
+   Mutex::Locker l(full_status_lock);
+   enum s_names new_state;
+ 
+   time_t now = ceph_clock_gettime(NULL);
+ 
+   // We base ratio on kb_avail rather than kb_used because they can
+   // differ significantly e.g. on btrfs volumes with a large number of
+   // chunks reserved for metadata, and for our purposes (avoiding
+   // completely filling the disk) it's far more important to know how
+   // much space is available to use than how much we've already used.
+   float ratio = ((float)(osd_stat.kb - osd_stat.kb_avail)) / ((float)osd_stat.kb);
+   float nearfull_ratio = get_nearfull_ratio();
+   float full_ratio = get_full_ratio();
+   cur_ratio = ratio;
+ 
+   if (full_ratio > 0 && ratio > full_ratio) {
+     new_state = FULL;
+   } else if (nearfull_ratio > 0 && ratio > nearfull_ratio) {
+     new_state = NEAR;
+   } else {
+     cur_state = NONE;
+     return;
+   }
+ 
+   if (cur_state != new_state) {
+     cur_state = new_state;
+   } else if (now - last_msg < cct->_conf->osd_op_complaint_time) {
+     return;
+   }
+   last_msg = now;
+   if (cur_state == FULL)
+     clog.error() << "OSD full dropping all updates " << (int)(ratio * 100) << "% full";
+   else
+     clog.warn() << "OSD near full (" << (int)(ratio * 100) << "%)";
+ }
+ 
+ bool OSDService::check_failsafe_full()
+ {
+   Mutex::Locker l(full_status_lock);
+   if (cur_state == FULL)
+     return true;
+   return false;
+ }
+ 
+ bool OSDService::too_full_for_backfill(double *_ratio, double *_max_ratio)
+ {
+   Mutex::Locker l(full_status_lock);
+   double max_ratio;
+   max_ratio = cct->_conf->osd_backfill_full_ratio;
+   if (_ratio)
+     *_ratio = cur_ratio;
+   if (_max_ratio)
+     *_max_ratio = max_ratio;
+   return cur_ratio >= max_ratio;
+ }
+ 
+ void OSDService::update_osd_stat(vector<int>& hb_peers)
+ {
+   Mutex::Locker lock(stat_lock);
+ 
+   // fill in osd stats too
+   struct statfs stbuf;
+   osd->store->statfs(&stbuf);
+ 
+   uint64_t bytes = stbuf.f_blocks * stbuf.f_bsize;
+   uint64_t used = (stbuf.f_blocks - stbuf.f_bfree) * stbuf.f_bsize;
+   uint64_t avail = stbuf.f_bavail * stbuf.f_bsize;
+ 
+   osd_stat.kb = bytes >> 10;
+   osd_stat.kb_used = used >> 10;
+   osd_stat.kb_avail = avail >> 10;
+ 
+   osd->logger->set(l_osd_stat_bytes, bytes);
+   osd->logger->set(l_osd_stat_bytes_used, used);
+   osd->logger->set(l_osd_stat_bytes_avail, avail);
+ 
+   osd_stat.hb_in.swap(hb_peers);
+   osd_stat.hb_out.clear();
+ 
+   check_nearfull_warning(osd_stat);
+ 
+   osd->op_tracker.get_age_ms_histogram(&osd_stat.op_queue_age_hist);
+ 
+   dout(20) << "update_osd_stat " << osd_stat << dendl;
+ }
+ 
+ void OSDService::send_message_osd_cluster(int peer, Message *m, epoch_t from_epoch)
+ {
+   OSDMapRef next_map = get_nextmap_reserved();
+   // service map is always newer/newest
+   assert(from_epoch <= next_map->get_epoch());
+ 
+   if (next_map->is_down(peer) ||
+       next_map->get_info(peer).up_from > from_epoch) {
+     m->put();
+     release_map(next_map);
+     return;
+   }
+   const entity_inst_t& peer_inst = next_map->get_cluster_inst(peer);
+   Connection *peer_con = osd->cluster_messenger->get_connection(peer_inst).get();
+   share_map_peer(peer, peer_con, next_map);
-   osd->cluster_messenger->send_message(m, peer_inst);
++  peer_con->send_message(m);
+   release_map(next_map);
+ }
+ 
+ ConnectionRef OSDService::get_con_osd_cluster(int peer, epoch_t from_epoch)
+ {
+   OSDMapRef next_map = get_nextmap_reserved();
+   // service map is always newer/newest
+   assert(from_epoch <= next_map->get_epoch());
+ 
+   if (next_map->is_down(peer) ||
+       next_map->get_info(peer).up_from > from_epoch) {
+     release_map(next_map);
+     return NULL;
+   }
+   ConnectionRef con = osd->cluster_messenger->get_connection(next_map->get_cluster_inst(peer));
+   release_map(next_map);
+   return con;
+ }
+ 
+ pair<ConnectionRef,ConnectionRef> OSDService::get_con_osd_hb(int peer, epoch_t from_epoch)
+ {
+   OSDMapRef next_map = get_nextmap_reserved();
+   // service map is always newer/newest
+   assert(from_epoch <= next_map->get_epoch());
+ 
+   pair<ConnectionRef,ConnectionRef> ret;
+   if (next_map->is_down(peer) ||
+       next_map->get_info(peer).up_from > from_epoch) {
+     release_map(next_map);
+     return ret;
+   }
+   ret.first = osd->hbclient_messenger->get_connection(next_map->get_hb_back_inst(peer));
+   if (next_map->get_hb_front_addr(peer) != entity_addr_t())
+     ret.second = osd->hbclient_messenger->get_connection(next_map->get_hb_front_inst(peer));
+   release_map(next_map);
+   return ret;
+ }
+ 
+ 
+ void OSDService::queue_want_pg_temp(pg_t pgid, vector<int>& want)
+ {
+   Mutex::Locker l(pg_temp_lock);
+   pg_temp_wanted[pgid] = want;
+ }
+ 
+ void OSDService::send_pg_temp()
+ {
+   Mutex::Locker l(pg_temp_lock);
+   if (pg_temp_wanted.empty())
+     return;
+   dout(10) << "send_pg_temp " << pg_temp_wanted << dendl;
+   MOSDPGTemp *m = new MOSDPGTemp(osdmap->get_epoch());
+   m->pg_temp = pg_temp_wanted;
+   monc->send_mon_message(m);
+ }
+ 
+ 
+ // --------------------------------------
+ // dispatch
+ 
+ epoch_t OSDService::get_peer_epoch(int peer)
+ {
+   Mutex::Locker l(peer_map_epoch_lock);
+   map<int,epoch_t>::iterator p = peer_map_epoch.find(peer);
+   if (p == peer_map_epoch.end())
+     return 0;
+   return p->second;
+ }
+ 
+ epoch_t OSDService::note_peer_epoch(int peer, epoch_t e)
+ {
+   Mutex::Locker l(peer_map_epoch_lock);
+   map<int,epoch_t>::iterator p = peer_map_epoch.find(peer);
+   if (p != peer_map_epoch.end()) {
+     if (p->second < e) {
+       dout(10) << "note_peer_epoch osd." << peer << " has " << e << dendl;
+       p->second = e;
+     } else {
+       dout(30) << "note_peer_epoch osd." << peer << " has " << p->second << " >= " << e << dendl;
+     }
+     return p->second;
+   } else {
+     dout(10) << "note_peer_epoch osd." << peer << " now has " << e << dendl;
+     peer_map_epoch[peer] = e;
+     return e;
+   }
+ }
+ 
+ void OSDService::forget_peer_epoch(int peer, epoch_t as_of)
+ {
+   Mutex::Locker l(peer_map_epoch_lock);
+   map<int,epoch_t>::iterator p = peer_map_epoch.find(peer);
+   if (p != peer_map_epoch.end()) {
+     if (p->second <= as_of) {
+       dout(10) << "forget_peer_epoch osd." << peer << " as_of " << as_of
+ 	       << " had " << p->second << dendl;
+       peer_map_epoch.erase(p);
+     } else {
+       dout(10) << "forget_peer_epoch osd." << peer << " as_of " << as_of
<< peer << " as_of " << as_of + << " has " << p->second << " - not forgetting" << dendl; + } + } + } + + bool OSDService::should_share_map(entity_name_t name, Connection *con, + epoch_t epoch, OSDMapRef& osdmap, + const epoch_t *sent_epoch_p) + { + bool should_send = false; + dout(20) << "should_share_map " + << name << " " << con->get_peer_addr() + << " " << epoch << dendl; + + // does client have old map? + if (name.is_client()) { + bool message_sendmap = epoch < osdmap->get_epoch(); + if (message_sendmap && sent_epoch_p) { + dout(20) << "client session last_sent_epoch: " + << *sent_epoch_p + << " versus osdmap epoch " << osdmap->get_epoch() << dendl; + if (*sent_epoch_p < osdmap->get_epoch()) { + should_send = true; + } // else we don't need to send it out again + } + } + + if (con->get_messenger() == osd->cluster_messenger && + con != osd->cluster_messenger->get_loopback_connection() && + osdmap->is_up(name.num()) && + (osdmap->get_cluster_addr(name.num()) == con->get_peer_addr() || + osdmap->get_hb_back_addr(name.num()) == con->get_peer_addr())) { + // remember + epoch_t has = MAX(get_peer_epoch(name.num()), epoch); + + // share? + if (has < osdmap->get_epoch()) { + dout(10) << name << " " << con->get_peer_addr() + << " has old map " << epoch << " < " + << osdmap->get_epoch() << dendl; + should_send = true; + } + } + + return should_send; + } + + void OSDService::share_map( + entity_name_t name, + Connection *con, + epoch_t epoch, + OSDMapRef& osdmap, + epoch_t *sent_epoch_p) + { + dout(20) << "share_map " + << name << " " << con->get_peer_addr() + << " " << epoch << dendl; + + if ((!osd->is_active()) && (!osd->is_stopping())) { + /*It is safe not to proceed as OSD is not in healthy state*/ + return; + } + + bool want_shared = should_share_map(name, con, epoch, + osdmap, sent_epoch_p); + + if (want_shared){ + if (name.is_client()) { + dout(10) << name << " has old map " << epoch + << " < " << osdmap->get_epoch() << dendl; + // we know the Session is valid or we wouldn't be sending + if (sent_epoch_p) { + *sent_epoch_p = osdmap->get_epoch(); + } + send_incremental_map(epoch, con, osdmap); + } else if (con->get_messenger() == osd->cluster_messenger && + osdmap->is_up(name.num()) && + (osdmap->get_cluster_addr(name.num()) == con->get_peer_addr() || + osdmap->get_hb_back_addr(name.num()) == con->get_peer_addr())) { + dout(10) << name << " " << con->get_peer_addr() + << " has old map " << epoch << " < " + << osdmap->get_epoch() << dendl; + note_peer_epoch(name.num(), osdmap->get_epoch()); + send_incremental_map(epoch, con, osdmap); + } + } + } + + + void OSDService::share_map_peer(int peer, Connection *con, OSDMapRef map) + { + if (!map) + map = get_osdmap(); + + // send map? + epoch_t pe = get_peer_epoch(peer); + if (pe) { + if (pe < map->get_epoch()) { + send_incremental_map(pe, con, map); + note_peer_epoch(peer, map->get_epoch()); + } else + dout(20) << "share_map_peer " << con << " already has epoch " << pe << dendl; + } else { + dout(20) << "share_map_peer " << con << " don't know epoch, doing nothing" << dendl; + // no idea about peer's epoch. + // ??? send recent ??? + // do nothing. 
+   }
+ }
+ 
+ 
+ bool OSDService::inc_scrubs_pending()
+ {
+   bool result = false;
+ 
+   sched_scrub_lock.Lock();
+   if (scrubs_pending + scrubs_active < cct->_conf->osd_max_scrubs) {
+     dout(20) << "inc_scrubs_pending " << scrubs_pending << " -> " << (scrubs_pending+1)
+ 	     << " (max " << cct->_conf->osd_max_scrubs << ", active " << scrubs_active << ")" << dendl;
+     result = true;
+     ++scrubs_pending;
+   } else {
+     dout(20) << "inc_scrubs_pending " << scrubs_pending << " + " << scrubs_active << " active >= max " << cct->_conf->osd_max_scrubs << dendl;
+   }
+   sched_scrub_lock.Unlock();
+ 
+   return result;
+ }
+ 
+ void OSDService::dec_scrubs_pending()
+ {
+   sched_scrub_lock.Lock();
+   dout(20) << "dec_scrubs_pending " << scrubs_pending << " -> " << (scrubs_pending-1)
+ 	   << " (max " << cct->_conf->osd_max_scrubs << ", active " << scrubs_active << ")" << dendl;
+   --scrubs_pending;
+   assert(scrubs_pending >= 0);
+   sched_scrub_lock.Unlock();
+ }
+ 
+ void OSDService::inc_scrubs_active(bool reserved)
+ {
+   sched_scrub_lock.Lock();
+   ++(scrubs_active);
+   if (reserved) {
+     --(scrubs_pending);
+     dout(20) << "inc_scrubs_active " << (scrubs_active-1) << " -> " << scrubs_active
+ 	     << " (max " << cct->_conf->osd_max_scrubs
+ 	     << ", pending " << (scrubs_pending+1) << " -> " << scrubs_pending << ")" << dendl;
+     assert(scrubs_pending >= 0);
+   } else {
+     dout(20) << "inc_scrubs_active " << (scrubs_active-1) << " -> " << scrubs_active
+ 	     << " (max " << cct->_conf->osd_max_scrubs
+ 	     << ", pending " << scrubs_pending << ")" << dendl;
+   }
+   sched_scrub_lock.Unlock();
+ }
+ 
+ void OSDService::dec_scrubs_active()
+ {
+   sched_scrub_lock.Lock();
+   dout(20) << "dec_scrubs_active " << scrubs_active << " -> " << (scrubs_active-1)
+ 	   << " (max " << cct->_conf->osd_max_scrubs << ", pending " << scrubs_pending << ")" << dendl;
+   --scrubs_active;
+   sched_scrub_lock.Unlock();
+ }
+ 
+ void OSDService::retrieve_epochs(epoch_t *_boot_epoch, epoch_t *_up_epoch,
+                                  epoch_t *_bind_epoch) const
+ {
+   Mutex::Locker l(epoch_lock);
+   if (_boot_epoch)
+     *_boot_epoch = boot_epoch;
+   if (_up_epoch)
+     *_up_epoch = up_epoch;
+   if (_bind_epoch)
+     *_bind_epoch = bind_epoch;
+ }
+ 
+ void OSDService::set_epochs(const epoch_t *_boot_epoch, const epoch_t *_up_epoch,
+                             const epoch_t *_bind_epoch)
+ {
+   Mutex::Locker l(epoch_lock);
+   if (_boot_epoch) {
+     assert(*_boot_epoch == 0 || *_boot_epoch >= boot_epoch);
+     boot_epoch = *_boot_epoch;
+   }
+   if (_up_epoch) {
+     assert(*_up_epoch == 0 || *_up_epoch >= up_epoch);
+     up_epoch = *_up_epoch;
+   }
+   if (_bind_epoch) {
+     assert(*_bind_epoch == 0 || *_bind_epoch >= bind_epoch);
+     bind_epoch = *_bind_epoch;
+   }
+ }
+ 
+ bool OSDService::prepare_to_stop()
+ {
+   Mutex::Locker l(is_stopping_lock);
+   if (state != NOT_STOPPING)
+     return false;
+ 
+   OSDMapRef osdmap = get_osdmap();
+   if (osdmap && osdmap->is_up(whoami)) {
+     dout(0) << __func__ << " telling mon we are shutting down" << dendl;
+     state = PREPARING_TO_STOP;
+     monc->send_mon_message(new MOSDMarkMeDown(monc->get_fsid(),
+                                               osdmap->get_inst(whoami),
+                                               osdmap->get_epoch(),
+                                               true  // request ack
+                                               ));
+     utime_t now = ceph_clock_now(cct);
+     utime_t timeout;
+     timeout.set_from_double(now + cct->_conf->osd_mon_shutdown_timeout);
+     while ((ceph_clock_now(cct) < timeout) &&
+            (state != STOPPING)) {
+       is_stopping_cond.WaitUntil(is_stopping_lock, timeout);
+     }
+   }
+   dout(0) << __func__ << " starting shutdown" << dendl;
+   state = STOPPING;
+   return true;
+ }
+ 
+ void OSDService::got_stop_ack()
+ {
+   Mutex::Locker l(is_stopping_lock);
+   if (state == PREPARING_TO_STOP) {
+     dout(0) << __func__ << " starting shutdown" << dendl;
+     state = STOPPING;
+     is_stopping_cond.Signal();
+   } else {
+     dout(10) << __func__ << " ignoring msg" << dendl;
+   }
+ }
+ 
+ 
+ MOSDMap *OSDService::build_incremental_map_msg(epoch_t since, epoch_t to,
+                                                OSDSuperblock& sblock)
+ {
+   MOSDMap *m = new MOSDMap(monc->get_fsid());
+   m->oldest_map = sblock.oldest_map;
+   m->newest_map = sblock.newest_map;
+ 
+   for (epoch_t e = to; e > since; e--) {
+     bufferlist bl;
+     if (e > m->oldest_map && get_inc_map_bl(e, bl)) {
+       m->incremental_maps[e].claim(bl);
+     } else if (get_map_bl(e, bl)) {
+       m->maps[e].claim(bl);
+       break;
+     } else {
+       derr << "since " << since << " to " << to
+            << " oldest " << m->oldest_map << " newest " << m->newest_map
+            << dendl;
+       m->put();
+       m = NULL;
+       break;
+     }
+   }
+   return m;
+ }
+ 
+ void OSDService::send_map(MOSDMap *m, Connection *con)
+ {
-   Messenger *msgr = client_messenger;
-   if (entity_name_t::TYPE_OSD == con->get_peer_type())
-     msgr = cluster_messenger;
-   msgr->send_message(m, con);
++  con->send_message(m);
+ }
+ 
+ void OSDService::send_incremental_map(epoch_t since, Connection *con,
+                                       OSDMapRef& osdmap)
+ {
+   epoch_t to = osdmap->get_epoch();
+   dout(10) << "send_incremental_map " << since << " -> " << to
+            << " to " << con << " " << con->get_peer_addr() << dendl;
+ 
+   MOSDMap *m = NULL;
+   while (!m) {
+     OSDSuperblock sblock(get_superblock());
+     if (since < sblock.oldest_map) {
+       // just send latest full map
+       MOSDMap *m = new MOSDMap(monc->get_fsid());
+       m->oldest_map = sblock.oldest_map;
+       m->newest_map = sblock.newest_map;
+       get_map_bl(to, m->maps[to]);
+       send_map(m, con);
+       return;
+     }
+ 
+     if (to > since && (int64_t)(to - since) > cct->_conf->osd_map_share_max_epochs) {
+       dout(10) << " " << (to - since) << " > max " << cct->_conf->osd_map_share_max_epochs
+                << ", only sending most recent" << dendl;
+       since = to - cct->_conf->osd_map_share_max_epochs;
+     }
+ 
+     if (to - since > (epoch_t)cct->_conf->osd_map_message_max)
+       to = since + cct->_conf->osd_map_message_max;
+     m = build_incremental_map_msg(since, to, sblock);
+   }
+   send_map(m, con);
+ }
+ 
+ bool OSDService::_get_map_bl(epoch_t e, bufferlist& bl)
+ {
+   bool found = map_bl_cache.lookup(e, &bl);
+   if (found)
+     return true;
+   found = store->read(
+     coll_t::META_COLL, OSD::get_osdmap_pobject_name(e), 0, 0, bl) >= 0;
+   if (found)
+     _add_map_bl(e, bl);
+   return found;
+ }
+ 
+ bool OSDService::get_inc_map_bl(epoch_t e, bufferlist& bl)
+ {
+   Mutex::Locker l(map_cache_lock);
+   bool found = map_bl_inc_cache.lookup(e, &bl);
+   if (found)
+     return true;
+   found = store->read(
+     coll_t::META_COLL, OSD::get_inc_osdmap_pobject_name(e), 0, 0, bl) >= 0;
+   if (found)
+     _add_map_inc_bl(e, bl);
+   return found;
+ }
+ 
+ void OSDService::_add_map_bl(epoch_t e, bufferlist& bl)
+ {
+   dout(10) << "add_map_bl " << e << " " << bl.length() << " bytes" << dendl;
+   map_bl_cache.add(e, bl);
+ }
+ 
+ void OSDService::_add_map_inc_bl(epoch_t e, bufferlist& bl)
+ {
+   dout(10) << "add_map_inc_bl " << e << " " << bl.length() << " bytes" << dendl;
+   map_bl_inc_cache.add(e, bl);
+ }
+ 
+ void OSDService::pin_map_inc_bl(epoch_t e, bufferlist &bl)
+ {
+   Mutex::Locker l(map_cache_lock);
+   map_bl_inc_cache.pin(e, bl);
+ }
+ 
+ void OSDService::pin_map_bl(epoch_t e, bufferlist &bl)
+ {
+   Mutex::Locker l(map_cache_lock);
+   map_bl_cache.pin(e, bl);
+ }
+ 
+ void OSDService::clear_map_bl_cache_pins(epoch_t e)
+ {
+   Mutex::Locker l(map_cache_lock);
+   map_bl_inc_cache.clear_pinned(e);
+   map_bl_cache.clear_pinned(e);
+ }
+ 
+ OSDMapRef OSDService::_add_map(OSDMap *o)
+ {
+   epoch_t e = o->get_epoch();
+ 
+   if (cct->_conf->osd_map_dedup) {
+     // Dedup against an existing map at a nearby epoch
+     OSDMapRef for_dedup = map_cache.lower_bound(e);
+     if (for_dedup) {
+       OSDMap::dedup(for_dedup.get(), o);
+     }
+   }
+   OSDMapRef l = map_cache.add(e, o);
+   return l;
+ }
+ 
+ OSDMapRef OSDService::try_get_map(epoch_t epoch)
+ {
+   Mutex::Locker l(map_cache_lock);
+   OSDMapRef retval = map_cache.lookup(epoch);
+   if (retval) {
+     dout(30) << "get_map " << epoch << " -cached" << dendl;
+     return retval;
+   }
+ 
+   OSDMap *map = new OSDMap;
+   if (epoch > 0) {
+     dout(20) << "get_map " << epoch << " - loading and decoding " << map << dendl;
+     bufferlist bl;
+     if (!_get_map_bl(epoch, bl)) {
+       delete map;
+       return OSDMapRef();
+     }
+     map->decode(bl);
+   } else {
+     dout(20) << "get_map " << epoch << " - return initial " << map << dendl;
+   }
+   return _add_map(map);
+ }
+ 
+ bool OSDService::queue_for_recovery(PG *pg)
+ {
+   bool b = recovery_wq.queue(pg);
+   if (b)
+     dout(10) << "queue_for_recovery queued " << *pg << dendl;
+   else
+     dout(10) << "queue_for_recovery already queued " << *pg << dendl;
+   return b;
+ }
+ 
+ 
+ // ops
+ 
+ 
+ void OSDService::reply_op_error(OpRequestRef op, int err)
+ {
+   reply_op_error(op, err, eversion_t(), 0);
+ }
+ 
+ void OSDService::reply_op_error(OpRequestRef op, int err, eversion_t v,
+                                 version_t uv)
+ {
+   MOSDOp *m = static_cast<MOSDOp*>(op->get_req());
+   assert(m->get_header().type == CEPH_MSG_OSD_OP);
+   int flags;
+   flags = m->get_flags() & (CEPH_OSD_FLAG_ACK|CEPH_OSD_FLAG_ONDISK);
+ 
+   MOSDOpReply *reply = new MOSDOpReply(m, err, osdmap->get_epoch(), flags,
+                                        true);
+   reply->set_reply_versions(v, uv);
-   m->get_connection()->get_messenger()->send_message(reply, m->get_connection());
++  m->get_connection()->send_message(reply);
+ }
+ 
+ void OSDService::handle_misdirected_op(PG *pg, OpRequestRef op)
+ {
+   MOSDOp *m = static_cast<MOSDOp*>(op->get_req());
+   assert(m->get_header().type == CEPH_MSG_OSD_OP);
+ 
+   assert(m->get_map_epoch() >= pg->info.history.same_primary_since);
+ 
+   if (pg->is_ec_pg()) {
+     /**
+      * OSD recomputes op target based on current OSDMap. With an EC pg, we
+      * can get this result:
+      * 1) client at map 512 sends an op to osd 3, pg_t 3.9 based on mapping
+      *    [CRUSH_ITEM_NONE, 2, 3]/3
+      * 2) OSD 3 at map 513 remaps op to osd 3, spg_t 3.9s0 based on mapping
+      *    [3, 2, 3]/3
+      * 3) PG 3.9s0 dequeues the op at epoch 512 and notices that it isn't primary
+      *    -- misdirected op
+      * 4) client resends and this time PG 3.9s0 having caught up to 513 gets
+      *    it and fulfils it
+      *
+      * We can't compute the op target based on the sending map epoch due to
+      * splitting. The simplest thing is to detect such cases here and drop
+      * them without an error (the client will resend anyway).
+      */
+     OSDMapRef opmap = try_get_map(m->get_map_epoch());
+     if (!opmap) {
+       dout(7) << __func__ << ": " << *pg << " no longer have map for "
+               << m->get_map_epoch() << ", dropping" << dendl;
+       return;
+     }
+     pg_t _pgid = m->get_pg();
+     spg_t pgid;
+     if ((m->get_flags() & CEPH_OSD_FLAG_PGOP) == 0)
+       _pgid = opmap->raw_pg_to_pg(_pgid);
+     if (opmap->get_primary_shard(_pgid, &pgid) &&
+         pgid.shard != pg->info.pgid.shard) {
+       dout(7) << __func__ << ": " << *pg << " primary changed since "
+               << m->get_map_epoch() << ", dropping" << dendl;
+       return;
+     }
+   }
+ 
+   dout(7) << *pg << " misdirected op in " << m->get_map_epoch() << dendl;
+   clog.warn() << m->get_source_inst() << " misdirected " << m->get_reqid()
+               << " pg " << m->get_pg()
+               << " to osd." << whoami
+               << " not " << pg->acting
+               << " in e" << m->get_map_epoch() << "/" << osdmap->get_epoch() << "\n";
+   reply_op_error(op, -ENXIO);
+ }
+ 
+ 
+ void OSDService::dequeue_pg(PG *pg, list<OpRequestRef> *dequeued)
+ {
+   osd->op_shardedwq.dequeue(pg, dequeued);
+ }
+ 
+ void OSDService::queue_for_peering(PG *pg)
+ {
+   peering_wq.queue(pg);
+ }
+ 
+ 
+ // ====================================================================
+ // OSD
 
 #undef dout_prefix
 #define dout_prefix *_dout
 
@@@ -4064,77 -4677,7 +4673,6 @@@ void OSD::send_alive(
    }
  }
  
- void OSDService::send_message_osd_cluster(int peer, Message *m, epoch_t from_epoch)
- {
-   OSDMapRef next_map = get_nextmap_reserved();
-   // service map is always newer/newest
-   assert(from_epoch <= next_map->get_epoch());
- 
-   if (next_map->is_down(peer) ||
-       next_map->get_info(peer).up_from > from_epoch) {
-     m->put();
-     release_map(next_map);
-     return;
-   }
-   const entity_inst_t& peer_inst = next_map->get_cluster_inst(peer);
-   Connection *peer_con = osd->cluster_messenger->get_connection(peer_inst).get();
-   share_map_peer(peer, peer_con, next_map);
-   peer_con->send_message(m);
-   release_map(next_map);
- }
- 
- ConnectionRef OSDService::get_con_osd_cluster(int peer, epoch_t from_epoch)
- {
-   OSDMapRef next_map = get_nextmap_reserved();
-   // service map is always newer/newest
-   assert(from_epoch <= next_map->get_epoch());
- 
-   if (next_map->is_down(peer) ||
-       next_map->get_info(peer).up_from > from_epoch) {
-     release_map(next_map);
-     return NULL;
-   }
-   ConnectionRef con = osd->cluster_messenger->get_connection(next_map->get_cluster_inst(peer));
-   release_map(next_map);
-   return con;
- }
- 
- pair<ConnectionRef,ConnectionRef> OSDService::get_con_osd_hb(int peer, epoch_t from_epoch)
- {
-   OSDMapRef next_map = get_nextmap_reserved();
-   // service map is always newer/newest
-   assert(from_epoch <= next_map->get_epoch());
- 
-   pair<ConnectionRef,ConnectionRef> ret;
-   if (next_map->is_down(peer) ||
-       next_map->get_info(peer).up_from > from_epoch) {
-     release_map(next_map);
-     return ret;
-   }
-   ret.first = osd->hbclient_messenger->get_connection(next_map->get_hb_back_inst(peer));
-   if (next_map->get_hb_front_addr(peer) != entity_addr_t())
-     ret.second = osd->hbclient_messenger->get_connection(next_map->get_hb_front_inst(peer));
-   release_map(next_map);
-   return ret;
- }
- 
- void OSDService::queue_want_pg_temp(pg_t pgid, vector<int>& want)
- {
-   Mutex::Locker l(pg_temp_lock);
-   pg_temp_wanted[pgid] = want;
- }
- 
- void OSDService::send_pg_temp()
- {
-   Mutex::Locker l(pg_temp_lock);
-   if (pg_temp_wanted.empty())
-     return;
-   dout(10) << "send_pg_temp " << pg_temp_wanted << dendl;
-   MOSDPGTemp *m = new MOSDPGTemp(osdmap->get_epoch());
-   m->pg_temp = pg_temp_wanted;
-   monc->send_mon_message(m);
- }
--
  void OSD::send_failures()
  {
    assert(osd_lock.is_locked());
@@@ -4692,228 -5235,74 +5230,74 @@@ void OSD::do_command(Connection *con, c
  	if (mls.empty())
  	  continue;
  	fout << "missing_loc: " << mls << std::endl;
-       }
-       pg->unlock();
-       fout << std::endl;
-     }
- 
-     fout.close();
-   }
-   else if (prefix == "debug kick_recovery_wq") {
-     int64_t delay;
-     cmd_getval(cct, cmdmap, "delay", delay);
-     ostringstream oss;
-     oss << delay;
-     r = cct->_conf->set_val("osd_recovery_delay_start", oss.str().c_str());
-     if (r != 0) {
-       ss << "kick_recovery_wq: error setting "
- 	 << "osd_recovery_delay_start to '" << delay << "': error "
- 	 << r;
-       goto out;
-     }
-     cct->_conf->apply_changes(NULL);
-     ss << "kicking recovery queue. set osd_recovery_delay_start "
- 	 << "to " << cct->_conf->osd_recovery_delay_start;
-     defer_recovery_until = ceph_clock_now(cct);
-     defer_recovery_until += cct->_conf->osd_recovery_delay_start;
-     recovery_wq.wake();
-   }
- 
-   else if (prefix == "cpu_profiler") {
-     string arg;
-     cmd_getval(cct, cmdmap, "arg", arg);
-     vector<string> argvec;
-     get_str_vec(arg, argvec);
-     cpu_profiler_handle_command(argvec, ds);
-   }
- 
-   else if (prefix == "dump_pg_recovery_stats") {
-     stringstream s;
-     if (f) {
-       pg_recovery_stats.dump_formatted(f.get());
-       f->flush(ds);
-     } else {
-       pg_recovery_stats.dump(s);
-       ds << "dump pg recovery stats: " << s.str();
-     }
-   }
- 
-   else if (prefix == "reset_pg_recovery_stats") {
-     ss << "reset pg recovery stats";
-     pg_recovery_stats.reset();
-   }
- 
-   else {
-     ss << "unrecognized command! " << cmd;
-     r = -EINVAL;
-   }
- 
-  out:
-   rs = ss.str();
-   odata.append(ds);
-   dout(0) << "do_command r=" << r << " " << rs << dendl;
-   clog.info() << rs << "\n";
-   if (con) {
-     MCommandReply *reply = new MCommandReply(r, rs);
-     reply->set_tid(tid);
-     reply->set_data(odata);
-     con->send_message(reply);
-   }
-   return;
- }
- 
- 
- 
- // --------------------------------------
- // dispatch
- 
- epoch_t OSDService::get_peer_epoch(int peer)
- {
-   Mutex::Locker l(peer_map_epoch_lock);
-   map<int,epoch_t>::iterator p = peer_map_epoch.find(peer);
-   if (p == peer_map_epoch.end())
-     return 0;
-   return p->second;
- }
- 
- epoch_t OSDService::note_peer_epoch(int peer, epoch_t e)
- {
-   Mutex::Locker l(peer_map_epoch_lock);
-   map<int,epoch_t>::iterator p = peer_map_epoch.find(peer);
-   if (p != peer_map_epoch.end()) {
-     if (p->second < e) {
-       dout(10) << "note_peer_epoch osd." << peer << " has " << e << dendl;
-       p->second = e;
-     } else {
-       dout(30) << "note_peer_epoch osd." << peer << " has " << p->second << " >= " << e << dendl;
-     }
-     return p->second;
-   } else {
-     dout(10) << "note_peer_epoch osd." << peer << " now has " << e << dendl;
-     peer_map_epoch[peer] = e;
-     return e;
-   }
- }
- 
- void OSDService::forget_peer_epoch(int peer, epoch_t as_of)
- {
-   Mutex::Locker l(peer_map_epoch_lock);
-   map<int,epoch_t>::iterator p = peer_map_epoch.find(peer);
-   if (p != peer_map_epoch.end()) {
-     if (p->second <= as_of) {
-       dout(10) << "forget_peer_epoch osd." << peer << " as_of " << as_of
- 	       << " had " << p->second << dendl;
-       peer_map_epoch.erase(p);
-     } else {
-       dout(10) << "forget_peer_epoch osd." << peer << " as_of " << as_of
- 	       << " has " << p->second << " - not forgetting" << dendl;
-     }
-   }
- }
- 
- bool OSDService::should_share_map(entity_name_t name, Connection *con,
-                                   epoch_t epoch, OSDMapRef& osdmap,
-                                   const epoch_t *sent_epoch_p)
- {
-   bool should_send = false;
-   dout(20) << "should_share_map "
-            << name << " " << con->get_peer_addr()
-            << " " << epoch << dendl;
- 
-   // does client have old map?
-   if (name.is_client()) {
-     bool message_sendmap = epoch < osdmap->get_epoch();
-     if (message_sendmap && sent_epoch_p) {
-       dout(20) << "client session last_sent_epoch: "
-                << *sent_epoch_p
-                << " versus osdmap epoch " << osdmap->get_epoch() << dendl;
-       if (*sent_epoch_p < osdmap->get_epoch()) {
-         should_send = true;
-       } // else we don't need to send it out again
+       }
+       pg->unlock();
+       fout << std::endl;
      }
-   }
- 
-   if (con->get_messenger() == osd->cluster_messenger &&
-       con != osd->cluster_messenger->get_loopback_connection() &&
-       osdmap->is_up(name.num()) &&
-       (osdmap->get_cluster_addr(name.num()) == con->get_peer_addr() ||
-        osdmap->get_hb_back_addr(name.num()) == con->get_peer_addr())) {
-     // remember
-     epoch_t has = MAX(get_peer_epoch(name.num()), epoch);
-     // share?
-     if (has < osdmap->get_epoch()) {
-       dout(10) << name << " " << con->get_peer_addr()
-                << " has old map " << epoch << " < "
-                << osdmap->get_epoch() << dendl;
-       should_send = true;
+     fout.close();
+   }
+   else if (prefix == "debug kick_recovery_wq") {
+     int64_t delay;
+     cmd_getval(cct, cmdmap, "delay", delay);
+     ostringstream oss;
+     oss << delay;
+     r = cct->_conf->set_val("osd_recovery_delay_start", oss.str().c_str());
+     if (r != 0) {
+       ss << "kick_recovery_wq: error setting "
+ 	 << "osd_recovery_delay_start to '" << delay << "': error "
+ 	 << r;
+       goto out;
      }
+     cct->_conf->apply_changes(NULL);
+     ss << "kicking recovery queue. set osd_recovery_delay_start "
+        << "to " << cct->_conf->osd_recovery_delay_start;
+     defer_recovery_until = ceph_clock_now(cct);
+     defer_recovery_until += cct->_conf->osd_recovery_delay_start;
+     recovery_wq.wake();
    }
-   return should_send;
- }
- 
- void OSDService::share_map(
-     entity_name_t name,
-     Connection *con,
-     epoch_t epoch,
-     OSDMapRef& osdmap,
-     epoch_t *sent_epoch_p)
- {
-   dout(20) << "share_map "
-            << name << " " << con->get_peer_addr()
-            << " " << epoch << dendl;
- 
-   if ((!osd->is_active()) && (!osd->is_stopping())) {
-     /*It is safe not to proceed as OSD is not in healthy state*/
-     return;
+   else if (prefix == "cpu_profiler") {
+     string arg;
+     cmd_getval(cct, cmdmap, "arg", arg);
+     vector<string> argvec;
+     get_str_vec(arg, argvec);
+     cpu_profiler_handle_command(argvec, ds);
    }
- 
-   bool want_shared = should_share_map(name, con, epoch,
-                                       osdmap, sent_epoch_p);
- 
-   if (want_shared){
-     if (name.is_client()) {
-       dout(10) << name << " has old map " << epoch
-                << " < " << osdmap->get_epoch() << dendl;
-       // we know the Session is valid or we wouldn't be sending
-       if (sent_epoch_p) {
-         *sent_epoch_p = osdmap->get_epoch();
-       }
-       send_incremental_map(epoch, con, osdmap);
-     } else if (con->get_messenger() == osd->cluster_messenger &&
-                osdmap->is_up(name.num()) &&
-                (osdmap->get_cluster_addr(name.num()) == con->get_peer_addr() ||
-                 osdmap->get_hb_back_addr(name.num()) == con->get_peer_addr())) {
-       dout(10) << name << " " << con->get_peer_addr()
-                << " has old map " << epoch << " < "
-                << osdmap->get_epoch() << dendl;
-       note_peer_epoch(name.num(), osdmap->get_epoch());
-       send_incremental_map(epoch, con, osdmap);
+   else if (prefix == "dump_pg_recovery_stats") {
+     stringstream s;
+     if (f) {
+       pg_recovery_stats.dump_formatted(f.get());
+       f->flush(ds);
+     } else {
+       pg_recovery_stats.dump(s);
+       ds << "dump pg recovery stats: " << s.str();
      }
    }
- }
+   else if (prefix == "reset_pg_recovery_stats") {
+     ss << "reset pg recovery stats";
+     pg_recovery_stats.reset();
+   }
- 
- void OSDService::share_map_peer(int peer, Connection *con, OSDMapRef map)
- {
-   if (!map)
-     map = get_osdmap();
+   else {
+     ss << "unrecognized command! " << cmd;
+     r = -EINVAL;
+   }
- 
-   // send map?
-   epoch_t pe = get_peer_epoch(peer);
-   if (pe) {
-     if (pe < map->get_epoch()) {
-       send_incremental_map(pe, con, map);
-       note_peer_epoch(peer, map->get_epoch());
-     } else
-       dout(20) << "share_map_peer " << con << " already has epoch " << pe << dendl;
-   } else {
-     dout(20) << "share_map_peer " << con << " don't know epoch, doing nothing" << dendl;
-     // no idea about peer's epoch.
-     // ??? send recent ???
-     // do nothing.
+  out:
+   rs = ss.str();
+   odata.append(ds);
+   dout(0) << "do_command r=" << r << " " << rs << dendl;
+   clog.info() << rs << "\n";
+   if (con) {
+     MCommandReply *reply = new MCommandReply(r, rs);
+     reply->set_tid(tid);
+     reply->set_data(odata);
-     client_messenger->send_message(reply, con);
++    con->send_message(reply);
    }
+   return;
  }
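
Every conflict in this merge resolves the same way: the old pattern of picking a Messenger by peer type and calling Messenger::send_message(Message*, Connection*) (the `-` lines) gives way to calling send_message() directly on the Connection the peer is reachable through (the `++` lines), so call sites no longer need to know which messenger owns the link. A minimal sketch of that call-site change, using hypothetical stand-in types rather than the real Ceph Message/Connection API:

	#include <iostream>
	#include <memory>
	#include <string>

	// Hypothetical stand-ins for Ceph's Message and Connection types; names
	// and members here are illustrative only, not the actual msgr interfaces.
	struct Message {
	  std::string type;
	};

	struct Connection {
	  std::string peer_addr;

	  // New style kept by the merge (the '++' lines): the connection itself
	  // delivers the message, so the caller needs no Messenger lookup.
	  void send_message(std::unique_ptr<Message> m) const {
	    std::cout << "send " << m->type << " -> " << peer_addr << "\n";
	  }
	};

	int main() {
	  Connection con{"10.0.0.2:6800"};

	  // Old style removed by the merge (the '-' lines), roughly:
	  //   Messenger *msgr = client_messenger;
	  //   if (peer is an OSD) msgr = cluster_messenger;
	  //   msgr->send_message(reply, con);
	  // New style: reply on the originating connection directly.
	  con.send_message(std::make_unique<Message>(Message{"MOSDOpReply"}));
	  return 0;
	}

The same substitution appears in all three conflict hunks above (send_message_osd_cluster, send_map, and reply_op_error/do_command), each dropping the messenger indirection in favor of the peer's Connection.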