From: Sage Weil Date: Mon, 4 Aug 2014 21:57:28 +0000 (-0700) Subject: osd: reorder OSDService methods under proper dout_prefix macro X-Git-Tag: v0.84~25^2~1 X-Git-Url: http://git-server-git.apps.pok.os.sepia.ceph.com/?a=commitdiff_plain;h=80829d7805b1113d322509345372bf9c7984559b;p=ceph.git osd: reorder OSDService methods under proper dout_prefix macro The dout_prefix for OSDService uses get_osdmap() to grab a shared_ptr for the epoch printout. The OSD one does not, and is not safe to run in all thread contexts. In particular, update_osd_stat() is run by the heartbeat thread and can race with the shared_ptr itself being updated with a new map. Ironically, if this were simply an OSDMap*, there would be no race since the pointer is a single word and updates atomically. Fix this, and any similar issues, by moving the OSDService methods up in OSD.cc so that they use the safe dout macro. Fixes: #8998 Backport: firefly (in a minimal form, I think!) Signed-off-by: Sage Weil --- diff --git a/src/osd/OSD.cc b/src/osd/OSD.cc index 08f2e1ad6b79..800732c8a714 100644 --- a/src/osd/OSD.cc +++ b/src/osd/OSD.cc @@ -569,6 +569,722 @@ void OSDService::agent_stop() agent_thread.join(); } +// ------------------------------------- + +float OSDService::get_full_ratio() +{ + float full_ratio = cct->_conf->osd_failsafe_full_ratio; + if (full_ratio > 1.0) full_ratio /= 100.0; + return full_ratio; +} + +float OSDService::get_nearfull_ratio() +{ + float nearfull_ratio = cct->_conf->osd_failsafe_nearfull_ratio; + if (nearfull_ratio > 1.0) nearfull_ratio /= 100.0; + return nearfull_ratio; +} + +void OSDService::check_nearfull_warning(const osd_stat_t &osd_stat) +{ + Mutex::Locker l(full_status_lock); + enum s_names new_state; + + time_t now = ceph_clock_gettime(NULL); + + // We base ratio on kb_avail rather than kb_used because they can + // differ significantly e.g. on btrfs volumes with a large number of + // chunks reserved for metadata, and for our purposes (avoiding + // completely filling the disk) it's far more important to know how + // much space is available to use than how much we've already used. + float ratio = ((float)(osd_stat.kb - osd_stat.kb_avail)) / ((float)osd_stat.kb); + float nearfull_ratio = get_nearfull_ratio(); + float full_ratio = get_full_ratio(); + cur_ratio = ratio; + + if (full_ratio > 0 && ratio > full_ratio) { + new_state = FULL; + } else if (nearfull_ratio > 0 && ratio > nearfull_ratio) { + new_state = NEAR; + } else { + cur_state = NONE; + return; + } + + if (cur_state != new_state) { + cur_state = new_state; + } else if (now - last_msg < cct->_conf->osd_op_complaint_time) { + return; + } + last_msg = now; + if (cur_state == FULL) + clog.error() << "OSD full dropping all updates " << (int)(ratio * 100) << "% full"; + else + clog.warn() << "OSD near full (" << (int)(ratio * 100) << "%)"; +} + +bool OSDService::check_failsafe_full() +{ + Mutex::Locker l(full_status_lock); + if (cur_state == FULL) + return true; + return false; +} + +bool OSDService::too_full_for_backfill(double *_ratio, double *_max_ratio) +{ + Mutex::Locker l(full_status_lock); + double max_ratio; + max_ratio = cct->_conf->osd_backfill_full_ratio; + if (_ratio) + *_ratio = cur_ratio; + if (_max_ratio) + *_max_ratio = max_ratio; + return cur_ratio >= max_ratio; +} + +void OSDService::update_osd_stat(vector& hb_peers) +{ + Mutex::Locker lock(stat_lock); + + // fill in osd stats too + struct statfs stbuf; + osd->store->statfs(&stbuf); + + uint64_t bytes = stbuf.f_blocks * stbuf.f_bsize; + uint64_t used = (stbuf.f_blocks - stbuf.f_bfree) * stbuf.f_bsize; + uint64_t avail = stbuf.f_bavail * stbuf.f_bsize; + + osd_stat.kb = bytes >> 10; + osd_stat.kb_used = used >> 10; + osd_stat.kb_avail = avail >> 10; + + osd->logger->set(l_osd_stat_bytes, bytes); + osd->logger->set(l_osd_stat_bytes_used, used); + osd->logger->set(l_osd_stat_bytes_avail, avail); + + osd_stat.hb_in.swap(hb_peers); + osd_stat.hb_out.clear(); + + check_nearfull_warning(osd_stat); + + osd->op_tracker.get_age_ms_histogram(&osd_stat.op_queue_age_hist); + + dout(20) << "update_osd_stat " << osd_stat << dendl; +} + +void OSDService::send_message_osd_cluster(int peer, Message *m, epoch_t from_epoch) +{ + OSDMapRef next_map = get_nextmap_reserved(); + // service map is always newer/newest + assert(from_epoch <= next_map->get_epoch()); + + if (next_map->is_down(peer) || + next_map->get_info(peer).up_from > from_epoch) { + m->put(); + release_map(next_map); + return; + } + const entity_inst_t& peer_inst = next_map->get_cluster_inst(peer); + Connection *peer_con = osd->cluster_messenger->get_connection(peer_inst).get(); + share_map_peer(peer, peer_con, next_map); + osd->cluster_messenger->send_message(m, peer_inst); + release_map(next_map); +} + +ConnectionRef OSDService::get_con_osd_cluster(int peer, epoch_t from_epoch) +{ + OSDMapRef next_map = get_nextmap_reserved(); + // service map is always newer/newest + assert(from_epoch <= next_map->get_epoch()); + + if (next_map->is_down(peer) || + next_map->get_info(peer).up_from > from_epoch) { + release_map(next_map); + return NULL; + } + ConnectionRef con = osd->cluster_messenger->get_connection(next_map->get_cluster_inst(peer)); + release_map(next_map); + return con; +} + +pair OSDService::get_con_osd_hb(int peer, epoch_t from_epoch) +{ + OSDMapRef next_map = get_nextmap_reserved(); + // service map is always newer/newest + assert(from_epoch <= next_map->get_epoch()); + + pair ret; + if (next_map->is_down(peer) || + next_map->get_info(peer).up_from > from_epoch) { + release_map(next_map); + return ret; + } + ret.first = osd->hbclient_messenger->get_connection(next_map->get_hb_back_inst(peer)); + if (next_map->get_hb_front_addr(peer) != entity_addr_t()) + ret.second = osd->hbclient_messenger->get_connection(next_map->get_hb_front_inst(peer)); + release_map(next_map); + return ret; +} + + +void OSDService::queue_want_pg_temp(pg_t pgid, vector& want) +{ + Mutex::Locker l(pg_temp_lock); + pg_temp_wanted[pgid] = want; +} + +void OSDService::send_pg_temp() +{ + Mutex::Locker l(pg_temp_lock); + if (pg_temp_wanted.empty()) + return; + dout(10) << "send_pg_temp " << pg_temp_wanted << dendl; + MOSDPGTemp *m = new MOSDPGTemp(osdmap->get_epoch()); + m->pg_temp = pg_temp_wanted; + monc->send_mon_message(m); +} + + +// -------------------------------------- +// dispatch + +epoch_t OSDService::get_peer_epoch(int peer) +{ + Mutex::Locker l(peer_map_epoch_lock); + map::iterator p = peer_map_epoch.find(peer); + if (p == peer_map_epoch.end()) + return 0; + return p->second; +} + +epoch_t OSDService::note_peer_epoch(int peer, epoch_t e) +{ + Mutex::Locker l(peer_map_epoch_lock); + map::iterator p = peer_map_epoch.find(peer); + if (p != peer_map_epoch.end()) { + if (p->second < e) { + dout(10) << "note_peer_epoch osd." << peer << " has " << e << dendl; + p->second = e; + } else { + dout(30) << "note_peer_epoch osd." << peer << " has " << p->second << " >= " << e << dendl; + } + return p->second; + } else { + dout(10) << "note_peer_epoch osd." << peer << " now has " << e << dendl; + peer_map_epoch[peer] = e; + return e; + } +} + +void OSDService::forget_peer_epoch(int peer, epoch_t as_of) +{ + Mutex::Locker l(peer_map_epoch_lock); + map::iterator p = peer_map_epoch.find(peer); + if (p != peer_map_epoch.end()) { + if (p->second <= as_of) { + dout(10) << "forget_peer_epoch osd." << peer << " as_of " << as_of + << " had " << p->second << dendl; + peer_map_epoch.erase(p); + } else { + dout(10) << "forget_peer_epoch osd." << peer << " as_of " << as_of + << " has " << p->second << " - not forgetting" << dendl; + } + } +} + +bool OSDService::should_share_map(entity_name_t name, Connection *con, + epoch_t epoch, OSDMapRef& osdmap, + const epoch_t *sent_epoch_p) +{ + bool should_send = false; + dout(20) << "should_share_map " + << name << " " << con->get_peer_addr() + << " " << epoch << dendl; + + // does client have old map? + if (name.is_client()) { + bool message_sendmap = epoch < osdmap->get_epoch(); + if (message_sendmap && sent_epoch_p) { + dout(20) << "client session last_sent_epoch: " + << *sent_epoch_p + << " versus osdmap epoch " << osdmap->get_epoch() << dendl; + if (*sent_epoch_p < osdmap->get_epoch()) { + should_send = true; + } // else we don't need to send it out again + } + } + + if (con->get_messenger() == osd->cluster_messenger && + con != osd->cluster_messenger->get_loopback_connection() && + osdmap->is_up(name.num()) && + (osdmap->get_cluster_addr(name.num()) == con->get_peer_addr() || + osdmap->get_hb_back_addr(name.num()) == con->get_peer_addr())) { + // remember + epoch_t has = MAX(get_peer_epoch(name.num()), epoch); + + // share? + if (has < osdmap->get_epoch()) { + dout(10) << name << " " << con->get_peer_addr() + << " has old map " << epoch << " < " + << osdmap->get_epoch() << dendl; + should_send = true; + } + } + + return should_send; +} + +void OSDService::share_map( + entity_name_t name, + Connection *con, + epoch_t epoch, + OSDMapRef& osdmap, + epoch_t *sent_epoch_p) +{ + dout(20) << "share_map " + << name << " " << con->get_peer_addr() + << " " << epoch << dendl; + + if ((!osd->is_active()) && (!osd->is_stopping())) { + /*It is safe not to proceed as OSD is not in healthy state*/ + return; + } + + bool want_shared = should_share_map(name, con, epoch, + osdmap, sent_epoch_p); + + if (want_shared){ + if (name.is_client()) { + dout(10) << name << " has old map " << epoch + << " < " << osdmap->get_epoch() << dendl; + // we know the Session is valid or we wouldn't be sending + if (sent_epoch_p) { + *sent_epoch_p = osdmap->get_epoch(); + } + send_incremental_map(epoch, con, osdmap); + } else if (con->get_messenger() == osd->cluster_messenger && + osdmap->is_up(name.num()) && + (osdmap->get_cluster_addr(name.num()) == con->get_peer_addr() || + osdmap->get_hb_back_addr(name.num()) == con->get_peer_addr())) { + dout(10) << name << " " << con->get_peer_addr() + << " has old map " << epoch << " < " + << osdmap->get_epoch() << dendl; + note_peer_epoch(name.num(), osdmap->get_epoch()); + send_incremental_map(epoch, con, osdmap); + } + } +} + + +void OSDService::share_map_peer(int peer, Connection *con, OSDMapRef map) +{ + if (!map) + map = get_osdmap(); + + // send map? + epoch_t pe = get_peer_epoch(peer); + if (pe) { + if (pe < map->get_epoch()) { + send_incremental_map(pe, con, map); + note_peer_epoch(peer, map->get_epoch()); + } else + dout(20) << "share_map_peer " << con << " already has epoch " << pe << dendl; + } else { + dout(20) << "share_map_peer " << con << " don't know epoch, doing nothing" << dendl; + // no idea about peer's epoch. + // ??? send recent ??? + // do nothing. + } +} + + +bool OSDService::inc_scrubs_pending() +{ + bool result = false; + + sched_scrub_lock.Lock(); + if (scrubs_pending + scrubs_active < cct->_conf->osd_max_scrubs) { + dout(20) << "inc_scrubs_pending " << scrubs_pending << " -> " << (scrubs_pending+1) + << " (max " << cct->_conf->osd_max_scrubs << ", active " << scrubs_active << ")" << dendl; + result = true; + ++scrubs_pending; + } else { + dout(20) << "inc_scrubs_pending " << scrubs_pending << " + " << scrubs_active << " active >= max " << cct->_conf->osd_max_scrubs << dendl; + } + sched_scrub_lock.Unlock(); + + return result; +} + +void OSDService::dec_scrubs_pending() +{ + sched_scrub_lock.Lock(); + dout(20) << "dec_scrubs_pending " << scrubs_pending << " -> " << (scrubs_pending-1) + << " (max " << cct->_conf->osd_max_scrubs << ", active " << scrubs_active << ")" << dendl; + --scrubs_pending; + assert(scrubs_pending >= 0); + sched_scrub_lock.Unlock(); +} + +void OSDService::inc_scrubs_active(bool reserved) +{ + sched_scrub_lock.Lock(); + ++(scrubs_active); + if (reserved) { + --(scrubs_pending); + dout(20) << "inc_scrubs_active " << (scrubs_active-1) << " -> " << scrubs_active + << " (max " << cct->_conf->osd_max_scrubs + << ", pending " << (scrubs_pending+1) << " -> " << scrubs_pending << ")" << dendl; + assert(scrubs_pending >= 0); + } else { + dout(20) << "inc_scrubs_active " << (scrubs_active-1) << " -> " << scrubs_active + << " (max " << cct->_conf->osd_max_scrubs + << ", pending " << scrubs_pending << ")" << dendl; + } + sched_scrub_lock.Unlock(); +} + +void OSDService::dec_scrubs_active() +{ + sched_scrub_lock.Lock(); + dout(20) << "dec_scrubs_active " << scrubs_active << " -> " << (scrubs_active-1) + << " (max " << cct->_conf->osd_max_scrubs << ", pending " << scrubs_pending << ")" << dendl; + --scrubs_active; + sched_scrub_lock.Unlock(); +} + +void OSDService::retrieve_epochs(epoch_t *_boot_epoch, epoch_t *_up_epoch, + epoch_t *_bind_epoch) const +{ + Mutex::Locker l(epoch_lock); + if (_boot_epoch) + *_boot_epoch = boot_epoch; + if (_up_epoch) + *_up_epoch = up_epoch; + if (_bind_epoch) + *_bind_epoch = bind_epoch; +} + +void OSDService::set_epochs(const epoch_t *_boot_epoch, const epoch_t *_up_epoch, + const epoch_t *_bind_epoch) +{ + Mutex::Locker l(epoch_lock); + if (_boot_epoch) { + assert(*_boot_epoch == 0 || *_boot_epoch >= boot_epoch); + boot_epoch = *_boot_epoch; + } + if (_up_epoch) { + assert(*_up_epoch == 0 || *_up_epoch >= up_epoch); + up_epoch = *_up_epoch; + } + if (_bind_epoch) { + assert(*_bind_epoch == 0 || *_bind_epoch >= bind_epoch); + bind_epoch = *_bind_epoch; + } +} + +bool OSDService::prepare_to_stop() +{ + Mutex::Locker l(is_stopping_lock); + if (state != NOT_STOPPING) + return false; + + OSDMapRef osdmap = get_osdmap(); + if (osdmap && osdmap->is_up(whoami)) { + dout(0) << __func__ << " telling mon we are shutting down" << dendl; + state = PREPARING_TO_STOP; + monc->send_mon_message(new MOSDMarkMeDown(monc->get_fsid(), + osdmap->get_inst(whoami), + osdmap->get_epoch(), + true // request ack + )); + utime_t now = ceph_clock_now(cct); + utime_t timeout; + timeout.set_from_double(now + cct->_conf->osd_mon_shutdown_timeout); + while ((ceph_clock_now(cct) < timeout) && + (state != STOPPING)) { + is_stopping_cond.WaitUntil(is_stopping_lock, timeout); + } + } + dout(0) << __func__ << " starting shutdown" << dendl; + state = STOPPING; + return true; +} + +void OSDService::got_stop_ack() +{ + Mutex::Locker l(is_stopping_lock); + if (state == PREPARING_TO_STOP) { + dout(0) << __func__ << " starting shutdown" << dendl; + state = STOPPING; + is_stopping_cond.Signal(); + } else { + dout(10) << __func__ << " ignoring msg" << dendl; + } +} + + +MOSDMap *OSDService::build_incremental_map_msg(epoch_t since, epoch_t to, + OSDSuperblock& sblock) +{ + MOSDMap *m = new MOSDMap(monc->get_fsid()); + m->oldest_map = sblock.oldest_map; + m->newest_map = sblock.newest_map; + + for (epoch_t e = to; e > since; e--) { + bufferlist bl; + if (e > m->oldest_map && get_inc_map_bl(e, bl)) { + m->incremental_maps[e].claim(bl); + } else if (get_map_bl(e, bl)) { + m->maps[e].claim(bl); + break; + } else { + derr << "since " << since << " to " << to + << " oldest " << m->oldest_map << " newest " << m->newest_map + << dendl; + m->put(); + m = NULL; + break; + } + } + return m; +} + +void OSDService::send_map(MOSDMap *m, Connection *con) +{ + Messenger *msgr = client_messenger; + if (entity_name_t::TYPE_OSD == con->get_peer_type()) + msgr = cluster_messenger; + msgr->send_message(m, con); +} + +void OSDService::send_incremental_map(epoch_t since, Connection *con, + OSDMapRef& osdmap) +{ + epoch_t to = osdmap->get_epoch(); + dout(10) << "send_incremental_map " << since << " -> " << to + << " to " << con << " " << con->get_peer_addr() << dendl; + + MOSDMap *m = NULL; + while (!m) { + OSDSuperblock sblock(get_superblock()); + if (since < sblock.oldest_map) { + // just send latest full map + MOSDMap *m = new MOSDMap(monc->get_fsid()); + m->oldest_map = sblock.oldest_map; + m->newest_map = sblock.newest_map; + get_map_bl(to, m->maps[to]); + send_map(m, con); + return; + } + + if (to > since && (int64_t)(to - since) > cct->_conf->osd_map_share_max_epochs) { + dout(10) << " " << (to - since) << " > max " << cct->_conf->osd_map_share_max_epochs + << ", only sending most recent" << dendl; + since = to - cct->_conf->osd_map_share_max_epochs; + } + + if (to - since > (epoch_t)cct->_conf->osd_map_message_max) + to = since + cct->_conf->osd_map_message_max; + m = build_incremental_map_msg(since, to, sblock); + } + send_map(m, con); +} + +bool OSDService::_get_map_bl(epoch_t e, bufferlist& bl) +{ + bool found = map_bl_cache.lookup(e, &bl); + if (found) + return true; + found = store->read( + coll_t::META_COLL, OSD::get_osdmap_pobject_name(e), 0, 0, bl) >= 0; + if (found) + _add_map_bl(e, bl); + return found; +} + +bool OSDService::get_inc_map_bl(epoch_t e, bufferlist& bl) +{ + Mutex::Locker l(map_cache_lock); + bool found = map_bl_inc_cache.lookup(e, &bl); + if (found) + return true; + found = store->read( + coll_t::META_COLL, OSD::get_inc_osdmap_pobject_name(e), 0, 0, bl) >= 0; + if (found) + _add_map_inc_bl(e, bl); + return found; +} + +void OSDService::_add_map_bl(epoch_t e, bufferlist& bl) +{ + dout(10) << "add_map_bl " << e << " " << bl.length() << " bytes" << dendl; + map_bl_cache.add(e, bl); +} + +void OSDService::_add_map_inc_bl(epoch_t e, bufferlist& bl) +{ + dout(10) << "add_map_inc_bl " << e << " " << bl.length() << " bytes" << dendl; + map_bl_inc_cache.add(e, bl); +} + +void OSDService::pin_map_inc_bl(epoch_t e, bufferlist &bl) +{ + Mutex::Locker l(map_cache_lock); + map_bl_inc_cache.pin(e, bl); +} + +void OSDService::pin_map_bl(epoch_t e, bufferlist &bl) +{ + Mutex::Locker l(map_cache_lock); + map_bl_cache.pin(e, bl); +} + +void OSDService::clear_map_bl_cache_pins(epoch_t e) +{ + Mutex::Locker l(map_cache_lock); + map_bl_inc_cache.clear_pinned(e); + map_bl_cache.clear_pinned(e); +} + +OSDMapRef OSDService::_add_map(OSDMap *o) +{ + epoch_t e = o->get_epoch(); + + if (cct->_conf->osd_map_dedup) { + // Dedup against an existing map at a nearby epoch + OSDMapRef for_dedup = map_cache.lower_bound(e); + if (for_dedup) { + OSDMap::dedup(for_dedup.get(), o); + } + } + OSDMapRef l = map_cache.add(e, o); + return l; +} + +OSDMapRef OSDService::try_get_map(epoch_t epoch) +{ + Mutex::Locker l(map_cache_lock); + OSDMapRef retval = map_cache.lookup(epoch); + if (retval) { + dout(30) << "get_map " << epoch << " -cached" << dendl; + return retval; + } + + OSDMap *map = new OSDMap; + if (epoch > 0) { + dout(20) << "get_map " << epoch << " - loading and decoding " << map << dendl; + bufferlist bl; + if (!_get_map_bl(epoch, bl)) { + delete map; + return OSDMapRef(); + } + map->decode(bl); + } else { + dout(20) << "get_map " << epoch << " - return initial " << map << dendl; + } + return _add_map(map); +} + +bool OSDService::queue_for_recovery(PG *pg) +{ + bool b = recovery_wq.queue(pg); + if (b) + dout(10) << "queue_for_recovery queued " << *pg << dendl; + else + dout(10) << "queue_for_recovery already queued " << *pg << dendl; + return b; +} + + +// ops + + +void OSDService::reply_op_error(OpRequestRef op, int err) +{ + reply_op_error(op, err, eversion_t(), 0); +} + +void OSDService::reply_op_error(OpRequestRef op, int err, eversion_t v, + version_t uv) +{ + MOSDOp *m = static_cast(op->get_req()); + assert(m->get_header().type == CEPH_MSG_OSD_OP); + int flags; + flags = m->get_flags() & (CEPH_OSD_FLAG_ACK|CEPH_OSD_FLAG_ONDISK); + + MOSDOpReply *reply = new MOSDOpReply(m, err, osdmap->get_epoch(), flags, + true); + reply->set_reply_versions(v, uv); + m->get_connection()->get_messenger()->send_message(reply, m->get_connection()); +} + +void OSDService::handle_misdirected_op(PG *pg, OpRequestRef op) +{ + MOSDOp *m = static_cast(op->get_req()); + assert(m->get_header().type == CEPH_MSG_OSD_OP); + + assert(m->get_map_epoch() >= pg->info.history.same_primary_since); + + if (pg->is_ec_pg()) { + /** + * OSD recomputes op target based on current OSDMap. With an EC pg, we + * can get this result: + * 1) client at map 512 sends an op to osd 3, pg_t 3.9 based on mapping + * [CRUSH_ITEM_NONE, 2, 3]/3 + * 2) OSD 3 at map 513 remaps op to osd 3, spg_t 3.9s0 based on mapping + * [3, 2, 3]/3 + * 3) PG 3.9s0 dequeues the op at epoch 512 and notices that it isn't primary + * -- misdirected op + * 4) client resends and this time PG 3.9s0 having caught up to 513 gets + * it and fulfils it + * + * We can't compute the op target based on the sending map epoch due to + * splitting. The simplest thing is to detect such cases here and drop + * them without an error (the client will resend anyway). + */ + OSDMapRef opmap = try_get_map(m->get_map_epoch()); + if (!opmap) { + dout(7) << __func__ << ": " << *pg << " no longer have map for " + << m->get_map_epoch() << ", dropping" << dendl; + return; + } + pg_t _pgid = m->get_pg(); + spg_t pgid; + if ((m->get_flags() & CEPH_OSD_FLAG_PGOP) == 0) + _pgid = opmap->raw_pg_to_pg(_pgid); + if (opmap->get_primary_shard(_pgid, &pgid) && + pgid.shard != pg->info.pgid.shard) { + dout(7) << __func__ << ": " << *pg << " primary changed since " + << m->get_map_epoch() << ", dropping" << dendl; + return; + } + } + + dout(7) << *pg << " misdirected op in " << m->get_map_epoch() << dendl; + clog.warn() << m->get_source_inst() << " misdirected " << m->get_reqid() + << " pg " << m->get_pg() + << " to osd." << whoami + << " not " << pg->acting + << " in e" << m->get_map_epoch() << "/" << osdmap->get_epoch() << "\n"; + reply_op_error(op, -ENXIO); +} + + +void OSDService::dequeue_pg(PG *pg, list *dequeued) +{ + osd->op_shardedwq.dequeue(pg, dequeued); +} + +void OSDService::queue_for_peering(PG *pg) +{ + peering_wq.queue(pg); +} + + +// ==================================================================== +// OSD #undef dout_prefix #define dout_prefix *_dout @@ -2649,192 +3365,90 @@ bool OSD::project_pg_history(spg_t pgid, pg_history_t& h, epoch_t from, { dout(15) << "project_pg_history " << pgid << " from " << from << " to " << osdmap->get_epoch() - << ", start " << h - << dendl; - - epoch_t e; - for (e = osdmap->get_epoch(); - e > from; - e--) { - // verify during intermediate epoch (e-1) - OSDMapRef oldmap = service.try_get_map(e-1); - if (!oldmap) { - dout(15) << __func__ << ": found map gap, returning false" << dendl; - return false; - } - assert(oldmap->have_pg_pool(pgid.pool())); - - int upprimary, actingprimary; - vector up, acting; - oldmap->pg_to_up_acting_osds( - pgid.pgid, - &up, - &upprimary, - &acting, - &actingprimary); - - // acting set change? - if ((actingprimary != currentactingprimary || - upprimary != currentupprimary || - acting != currentacting || - up != currentup) && e > h.same_interval_since) { - dout(15) << "project_pg_history " << pgid << " acting|up changed in " << e - << " from " << acting << "/" << up - << " " << actingprimary << "/" << upprimary - << " -> " << currentacting << "/" << currentup - << " " << currentactingprimary << "/" << currentupprimary - << dendl; - h.same_interval_since = e; - } - // split? - if (pgid.is_split(oldmap->get_pg_num(pgid.pool()), - osdmap->get_pg_num(pgid.pool()), - 0)) { - h.same_interval_since = e; - } - // up set change? - if ((up != currentup || upprimary != currentupprimary) - && e > h.same_up_since) { - dout(15) << "project_pg_history " << pgid << " up changed in " << e - << " from " << up << " " << upprimary - << " -> " << currentup << " " << currentupprimary << dendl; - h.same_up_since = e; - } - - // primary change? - if (OSDMap::primary_changed( - actingprimary, - acting, - currentactingprimary, - currentacting) && - e > h.same_primary_since) { - dout(15) << "project_pg_history " << pgid << " primary changed in " << e << dendl; - h.same_primary_since = e; - } - - if (h.same_interval_since >= e && h.same_up_since >= e && h.same_primary_since >= e) - break; - } - - // base case: these floors should be the creation epoch if we didn't - // find any changes. - if (e == h.epoch_created) { - if (!h.same_interval_since) - h.same_interval_since = e; - if (!h.same_up_since) - h.same_up_since = e; - if (!h.same_primary_since) - h.same_primary_since = e; - } - - dout(15) << "project_pg_history end " << h << dendl; - return true; -} - -// ------------------------------------- - -float OSDService::get_full_ratio() -{ - float full_ratio = cct->_conf->osd_failsafe_full_ratio; - if (full_ratio > 1.0) full_ratio /= 100.0; - return full_ratio; -} - -float OSDService::get_nearfull_ratio() -{ - float nearfull_ratio = cct->_conf->osd_failsafe_nearfull_ratio; - if (nearfull_ratio > 1.0) nearfull_ratio /= 100.0; - return nearfull_ratio; -} - -void OSDService::check_nearfull_warning(const osd_stat_t &osd_stat) -{ - Mutex::Locker l(full_status_lock); - enum s_names new_state; - - time_t now = ceph_clock_gettime(NULL); - - // We base ratio on kb_avail rather than kb_used because they can - // differ significantly e.g. on btrfs volumes with a large number of - // chunks reserved for metadata, and for our purposes (avoiding - // completely filling the disk) it's far more important to know how - // much space is available to use than how much we've already used. - float ratio = ((float)(osd_stat.kb - osd_stat.kb_avail)) / ((float)osd_stat.kb); - float nearfull_ratio = get_nearfull_ratio(); - float full_ratio = get_full_ratio(); - cur_ratio = ratio; - - if (full_ratio > 0 && ratio > full_ratio) { - new_state = FULL; - } else if (nearfull_ratio > 0 && ratio > nearfull_ratio) { - new_state = NEAR; - } else { - cur_state = NONE; - return; - } - - if (cur_state != new_state) { - cur_state = new_state; - } else if (now - last_msg < cct->_conf->osd_op_complaint_time) { - return; - } - last_msg = now; - if (cur_state == FULL) - clog.error() << "OSD full dropping all updates " << (int)(ratio * 100) << "% full"; - else - clog.warn() << "OSD near full (" << (int)(ratio * 100) << "%)"; -} - -bool OSDService::check_failsafe_full() -{ - Mutex::Locker l(full_status_lock); - if (cur_state == FULL) - return true; - return false; -} - -bool OSDService::too_full_for_backfill(double *_ratio, double *_max_ratio) -{ - Mutex::Locker l(full_status_lock); - double max_ratio; - max_ratio = cct->_conf->osd_backfill_full_ratio; - if (_ratio) - *_ratio = cur_ratio; - if (_max_ratio) - *_max_ratio = max_ratio; - return cur_ratio >= max_ratio; -} - -void OSDService::update_osd_stat(vector& hb_peers) -{ - Mutex::Locker lock(stat_lock); - - // fill in osd stats too - struct statfs stbuf; - osd->store->statfs(&stbuf); + << ", start " << h + << dendl; - uint64_t bytes = stbuf.f_blocks * stbuf.f_bsize; - uint64_t used = (stbuf.f_blocks - stbuf.f_bfree) * stbuf.f_bsize; - uint64_t avail = stbuf.f_bavail * stbuf.f_bsize; + epoch_t e; + for (e = osdmap->get_epoch(); + e > from; + e--) { + // verify during intermediate epoch (e-1) + OSDMapRef oldmap = service.try_get_map(e-1); + if (!oldmap) { + dout(15) << __func__ << ": found map gap, returning false" << dendl; + return false; + } + assert(oldmap->have_pg_pool(pgid.pool())); - osd_stat.kb = bytes >> 10; - osd_stat.kb_used = used >> 10; - osd_stat.kb_avail = avail >> 10; + int upprimary, actingprimary; + vector up, acting; + oldmap->pg_to_up_acting_osds( + pgid.pgid, + &up, + &upprimary, + &acting, + &actingprimary); - osd->logger->set(l_osd_stat_bytes, bytes); - osd->logger->set(l_osd_stat_bytes_used, used); - osd->logger->set(l_osd_stat_bytes_avail, avail); + // acting set change? + if ((actingprimary != currentactingprimary || + upprimary != currentupprimary || + acting != currentacting || + up != currentup) && e > h.same_interval_since) { + dout(15) << "project_pg_history " << pgid << " acting|up changed in " << e + << " from " << acting << "/" << up + << " " << actingprimary << "/" << upprimary + << " -> " << currentacting << "/" << currentup + << " " << currentactingprimary << "/" << currentupprimary + << dendl; + h.same_interval_since = e; + } + // split? + if (pgid.is_split(oldmap->get_pg_num(pgid.pool()), + osdmap->get_pg_num(pgid.pool()), + 0)) { + h.same_interval_since = e; + } + // up set change? + if ((up != currentup || upprimary != currentupprimary) + && e > h.same_up_since) { + dout(15) << "project_pg_history " << pgid << " up changed in " << e + << " from " << up << " " << upprimary + << " -> " << currentup << " " << currentupprimary << dendl; + h.same_up_since = e; + } - osd_stat.hb_in.swap(hb_peers); - osd_stat.hb_out.clear(); + // primary change? + if (OSDMap::primary_changed( + actingprimary, + acting, + currentactingprimary, + currentacting) && + e > h.same_primary_since) { + dout(15) << "project_pg_history " << pgid << " primary changed in " << e << dendl; + h.same_primary_since = e; + } - check_nearfull_warning(osd_stat); + if (h.same_interval_since >= e && h.same_up_since >= e && h.same_primary_since >= e) + break; + } - osd->op_tracker.get_age_ms_histogram(&osd_stat.op_queue_age_hist); + // base case: these floors should be the creation epoch if we didn't + // find any changes. + if (e == h.epoch_created) { + if (!h.same_interval_since) + h.same_interval_since = e; + if (!h.same_up_since) + h.same_up_since = e; + if (!h.same_primary_since) + h.same_primary_since = e; + } - dout(20) << "update_osd_stat " << osd_stat << dendl; + dout(15) << "project_pg_history end " << h << dendl; + return true; } + + void OSD::_add_heartbeat_peer(int p) { if (p == whoami) @@ -4065,76 +4679,6 @@ void OSD::send_alive() } } -void OSDService::send_message_osd_cluster(int peer, Message *m, epoch_t from_epoch) -{ - OSDMapRef next_map = get_nextmap_reserved(); - // service map is always newer/newest - assert(from_epoch <= next_map->get_epoch()); - - if (next_map->is_down(peer) || - next_map->get_info(peer).up_from > from_epoch) { - m->put(); - release_map(next_map); - return; - } - const entity_inst_t& peer_inst = next_map->get_cluster_inst(peer); - Connection *peer_con = osd->cluster_messenger->get_connection(peer_inst).get(); - share_map_peer(peer, peer_con, next_map); - osd->cluster_messenger->send_message(m, peer_inst); - release_map(next_map); -} - -ConnectionRef OSDService::get_con_osd_cluster(int peer, epoch_t from_epoch) -{ - OSDMapRef next_map = get_nextmap_reserved(); - // service map is always newer/newest - assert(from_epoch <= next_map->get_epoch()); - - if (next_map->is_down(peer) || - next_map->get_info(peer).up_from > from_epoch) { - release_map(next_map); - return NULL; - } - ConnectionRef con = osd->cluster_messenger->get_connection(next_map->get_cluster_inst(peer)); - release_map(next_map); - return con; -} - -pair OSDService::get_con_osd_hb(int peer, epoch_t from_epoch) -{ - OSDMapRef next_map = get_nextmap_reserved(); - // service map is always newer/newest - assert(from_epoch <= next_map->get_epoch()); - - pair ret; - if (next_map->is_down(peer) || - next_map->get_info(peer).up_from > from_epoch) { - release_map(next_map); - return ret; - } - ret.first = osd->hbclient_messenger->get_connection(next_map->get_hb_back_inst(peer)); - if (next_map->get_hb_front_addr(peer) != entity_addr_t()) - ret.second = osd->hbclient_messenger->get_connection(next_map->get_hb_front_inst(peer)); - release_map(next_map); - return ret; -} - -void OSDService::queue_want_pg_temp(pg_t pgid, vector& want) -{ - Mutex::Locker l(pg_temp_lock); - pg_temp_wanted[pgid] = want; -} - -void OSDService::send_pg_temp() -{ - Mutex::Locker l(pg_temp_lock); - if (pg_temp_wanted.empty()) - return; - dout(10) << "send_pg_temp " << pg_temp_wanted << dendl; - MOSDPGTemp *m = new MOSDPGTemp(osdmap->get_epoch()); - m->pg_temp = pg_temp_wanted; - monc->send_mon_message(m); -} void OSD::send_failures() { @@ -4712,210 +5256,58 @@ void OSD::do_command(Connection *con, ceph_tid_t tid, vector& cmd, buffe << r; goto out; } - cct->_conf->apply_changes(NULL); - ss << "kicking recovery queue. set osd_recovery_delay_start " - << "to " << cct->_conf->osd_recovery_delay_start; - defer_recovery_until = ceph_clock_now(cct); - defer_recovery_until += cct->_conf->osd_recovery_delay_start; - recovery_wq.wake(); - } - - else if (prefix == "cpu_profiler") { - string arg; - cmd_getval(cct, cmdmap, "arg", arg); - vector argvec; - get_str_vec(arg, argvec); - cpu_profiler_handle_command(argvec, ds); - } - - else if (prefix == "dump_pg_recovery_stats") { - stringstream s; - if (f) { - pg_recovery_stats.dump_formatted(f.get()); - f->flush(ds); - } else { - pg_recovery_stats.dump(s); - ds << "dump pg recovery stats: " << s.str(); - } - } - - else if (prefix == "reset_pg_recovery_stats") { - ss << "reset pg recovery stats"; - pg_recovery_stats.reset(); - } - - else { - ss << "unrecognized command! " << cmd; - r = -EINVAL; - } - - out: - rs = ss.str(); - odata.append(ds); - dout(0) << "do_command r=" << r << " " << rs << dendl; - clog.info() << rs << "\n"; - if (con) { - MCommandReply *reply = new MCommandReply(r, rs); - reply->set_tid(tid); - reply->set_data(odata); - client_messenger->send_message(reply, con); - } - return; -} - - - -// -------------------------------------- -// dispatch - -epoch_t OSDService::get_peer_epoch(int peer) -{ - Mutex::Locker l(peer_map_epoch_lock); - map::iterator p = peer_map_epoch.find(peer); - if (p == peer_map_epoch.end()) - return 0; - return p->second; -} - -epoch_t OSDService::note_peer_epoch(int peer, epoch_t e) -{ - Mutex::Locker l(peer_map_epoch_lock); - map::iterator p = peer_map_epoch.find(peer); - if (p != peer_map_epoch.end()) { - if (p->second < e) { - dout(10) << "note_peer_epoch osd." << peer << " has " << e << dendl; - p->second = e; - } else { - dout(30) << "note_peer_epoch osd." << peer << " has " << p->second << " >= " << e << dendl; - } - return p->second; - } else { - dout(10) << "note_peer_epoch osd." << peer << " now has " << e << dendl; - peer_map_epoch[peer] = e; - return e; - } -} - -void OSDService::forget_peer_epoch(int peer, epoch_t as_of) -{ - Mutex::Locker l(peer_map_epoch_lock); - map::iterator p = peer_map_epoch.find(peer); - if (p != peer_map_epoch.end()) { - if (p->second <= as_of) { - dout(10) << "forget_peer_epoch osd." << peer << " as_of " << as_of - << " had " << p->second << dendl; - peer_map_epoch.erase(p); - } else { - dout(10) << "forget_peer_epoch osd." << peer << " as_of " << as_of - << " has " << p->second << " - not forgetting" << dendl; - } + cct->_conf->apply_changes(NULL); + ss << "kicking recovery queue. set osd_recovery_delay_start " + << "to " << cct->_conf->osd_recovery_delay_start; + defer_recovery_until = ceph_clock_now(cct); + defer_recovery_until += cct->_conf->osd_recovery_delay_start; + recovery_wq.wake(); } -} - -bool OSDService::should_share_map(entity_name_t name, Connection *con, - epoch_t epoch, OSDMapRef& osdmap, - const epoch_t *sent_epoch_p) -{ - bool should_send = false; - dout(20) << "should_share_map " - << name << " " << con->get_peer_addr() - << " " << epoch << dendl; - // does client have old map? - if (name.is_client()) { - bool message_sendmap = epoch < osdmap->get_epoch(); - if (message_sendmap && sent_epoch_p) { - dout(20) << "client session last_sent_epoch: " - << *sent_epoch_p - << " versus osdmap epoch " << osdmap->get_epoch() << dendl; - if (*sent_epoch_p < osdmap->get_epoch()) { - should_send = true; - } // else we don't need to send it out again - } + else if (prefix == "cpu_profiler") { + string arg; + cmd_getval(cct, cmdmap, "arg", arg); + vector argvec; + get_str_vec(arg, argvec); + cpu_profiler_handle_command(argvec, ds); } - if (con->get_messenger() == osd->cluster_messenger && - con != osd->cluster_messenger->get_loopback_connection() && - osdmap->is_up(name.num()) && - (osdmap->get_cluster_addr(name.num()) == con->get_peer_addr() || - osdmap->get_hb_back_addr(name.num()) == con->get_peer_addr())) { - // remember - epoch_t has = MAX(get_peer_epoch(name.num()), epoch); - - // share? - if (has < osdmap->get_epoch()) { - dout(10) << name << " " << con->get_peer_addr() - << " has old map " << epoch << " < " - << osdmap->get_epoch() << dendl; - should_send = true; + else if (prefix == "dump_pg_recovery_stats") { + stringstream s; + if (f) { + pg_recovery_stats.dump_formatted(f.get()); + f->flush(ds); + } else { + pg_recovery_stats.dump(s); + ds << "dump pg recovery stats: " << s.str(); } } - return should_send; -} - -void OSDService::share_map( - entity_name_t name, - Connection *con, - epoch_t epoch, - OSDMapRef& osdmap, - epoch_t *sent_epoch_p) -{ - dout(20) << "share_map " - << name << " " << con->get_peer_addr() - << " " << epoch << dendl; - - if ((!osd->is_active()) && (!osd->is_stopping())) { - /*It is safe not to proceed as OSD is not in healthy state*/ - return; + else if (prefix == "reset_pg_recovery_stats") { + ss << "reset pg recovery stats"; + pg_recovery_stats.reset(); } - bool want_shared = should_share_map(name, con, epoch, - osdmap, sent_epoch_p); + else { + ss << "unrecognized command! " << cmd; + r = -EINVAL; + } - if (want_shared){ - if (name.is_client()) { - dout(10) << name << " has old map " << epoch - << " < " << osdmap->get_epoch() << dendl; - // we know the Session is valid or we wouldn't be sending - if (sent_epoch_p) { - *sent_epoch_p = osdmap->get_epoch(); - } - send_incremental_map(epoch, con, osdmap); - } else if (con->get_messenger() == osd->cluster_messenger && - osdmap->is_up(name.num()) && - (osdmap->get_cluster_addr(name.num()) == con->get_peer_addr() || - osdmap->get_hb_back_addr(name.num()) == con->get_peer_addr())) { - dout(10) << name << " " << con->get_peer_addr() - << " has old map " << epoch << " < " - << osdmap->get_epoch() << dendl; - note_peer_epoch(name.num(), osdmap->get_epoch()); - send_incremental_map(epoch, con, osdmap); - } + out: + rs = ss.str(); + odata.append(ds); + dout(0) << "do_command r=" << r << " " << rs << dendl; + clog.info() << rs << "\n"; + if (con) { + MCommandReply *reply = new MCommandReply(r, rs); + reply->set_tid(tid); + reply->set_data(odata); + client_messenger->send_message(reply, con); } + return; } -void OSDService::share_map_peer(int peer, Connection *con, OSDMapRef map) -{ - if (!map) - map = get_osdmap(); - - // send map? - epoch_t pe = get_peer_epoch(peer); - if (pe) { - if (pe < map->get_epoch()) { - send_incremental_map(pe, con, map); - note_peer_epoch(peer, map->get_epoch()); - } else - dout(20) << "share_map_peer " << con << " already has epoch " << pe << dendl; - } else { - dout(20) << "share_map_peer " << con << " don't know epoch, doing nothing" << dendl; - // no idea about peer's epoch. - // ??? send recent ??? - // do nothing. - } -} bool OSD::heartbeat_dispatch(Message *m) @@ -5535,131 +5927,7 @@ void OSD::sched_scrub() dout(20) << "sched_scrub done" << dendl; } -bool OSDService::inc_scrubs_pending() -{ - bool result = false; - - sched_scrub_lock.Lock(); - if (scrubs_pending + scrubs_active < cct->_conf->osd_max_scrubs) { - dout(20) << "inc_scrubs_pending " << scrubs_pending << " -> " << (scrubs_pending+1) - << " (max " << cct->_conf->osd_max_scrubs << ", active " << scrubs_active << ")" << dendl; - result = true; - ++scrubs_pending; - } else { - dout(20) << "inc_scrubs_pending " << scrubs_pending << " + " << scrubs_active << " active >= max " << cct->_conf->osd_max_scrubs << dendl; - } - sched_scrub_lock.Unlock(); - - return result; -} - -void OSDService::dec_scrubs_pending() -{ - sched_scrub_lock.Lock(); - dout(20) << "dec_scrubs_pending " << scrubs_pending << " -> " << (scrubs_pending-1) - << " (max " << cct->_conf->osd_max_scrubs << ", active " << scrubs_active << ")" << dendl; - --scrubs_pending; - assert(scrubs_pending >= 0); - sched_scrub_lock.Unlock(); -} - -void OSDService::inc_scrubs_active(bool reserved) -{ - sched_scrub_lock.Lock(); - ++(scrubs_active); - if (reserved) { - --(scrubs_pending); - dout(20) << "inc_scrubs_active " << (scrubs_active-1) << " -> " << scrubs_active - << " (max " << cct->_conf->osd_max_scrubs - << ", pending " << (scrubs_pending+1) << " -> " << scrubs_pending << ")" << dendl; - assert(scrubs_pending >= 0); - } else { - dout(20) << "inc_scrubs_active " << (scrubs_active-1) << " -> " << scrubs_active - << " (max " << cct->_conf->osd_max_scrubs - << ", pending " << scrubs_pending << ")" << dendl; - } - sched_scrub_lock.Unlock(); -} - -void OSDService::dec_scrubs_active() -{ - sched_scrub_lock.Lock(); - dout(20) << "dec_scrubs_active " << scrubs_active << " -> " << (scrubs_active-1) - << " (max " << cct->_conf->osd_max_scrubs << ", pending " << scrubs_pending << ")" << dendl; - --scrubs_active; - sched_scrub_lock.Unlock(); -} - -void OSDService::retrieve_epochs(epoch_t *_boot_epoch, epoch_t *_up_epoch, - epoch_t *_bind_epoch) const -{ - Mutex::Locker l(epoch_lock); - if (_boot_epoch) - *_boot_epoch = boot_epoch; - if (_up_epoch) - *_up_epoch = up_epoch; - if (_bind_epoch) - *_bind_epoch = bind_epoch; -} - -void OSDService::set_epochs(const epoch_t *_boot_epoch, const epoch_t *_up_epoch, - const epoch_t *_bind_epoch) -{ - Mutex::Locker l(epoch_lock); - if (_boot_epoch) { - assert(*_boot_epoch == 0 || *_boot_epoch >= boot_epoch); - boot_epoch = *_boot_epoch; - } - if (_up_epoch) { - assert(*_up_epoch == 0 || *_up_epoch >= up_epoch); - up_epoch = *_up_epoch; - } - if (_bind_epoch) { - assert(*_bind_epoch == 0 || *_bind_epoch >= bind_epoch); - bind_epoch = *_bind_epoch; - } -} - -bool OSDService::prepare_to_stop() -{ - Mutex::Locker l(is_stopping_lock); - if (state != NOT_STOPPING) - return false; - - OSDMapRef osdmap = get_osdmap(); - if (osdmap && osdmap->is_up(whoami)) { - dout(0) << __func__ << " telling mon we are shutting down" << dendl; - state = PREPARING_TO_STOP; - monc->send_mon_message(new MOSDMarkMeDown(monc->get_fsid(), - osdmap->get_inst(whoami), - osdmap->get_epoch(), - true // request ack - )); - utime_t now = ceph_clock_now(cct); - utime_t timeout; - timeout.set_from_double(now + cct->_conf->osd_mon_shutdown_timeout); - while ((ceph_clock_now(cct) < timeout) && - (state != STOPPING)) { - is_stopping_cond.WaitUntil(is_stopping_lock, timeout); - } - } - dout(0) << __func__ << " starting shutdown" << dendl; - state = STOPPING; - return true; -} - -void OSDService::got_stop_ack() -{ - Mutex::Locker l(is_stopping_lock); - if (state == PREPARING_TO_STOP) { - dout(0) << __func__ << " starting shutdown" << dendl; - state = STOPPING; - is_stopping_cond.Signal(); - } else { - dout(10) << __func__ << " ignoring msg" << dendl; - } -} - + // ===================================================== // MAP @@ -6391,169 +6659,6 @@ void OSD::activate_map() take_waiters(waiting_for_osdmap); } - -MOSDMap *OSDService::build_incremental_map_msg(epoch_t since, epoch_t to, - OSDSuperblock& sblock) -{ - MOSDMap *m = new MOSDMap(monc->get_fsid()); - m->oldest_map = sblock.oldest_map; - m->newest_map = sblock.newest_map; - - for (epoch_t e = to; e > since; e--) { - bufferlist bl; - if (e > m->oldest_map && get_inc_map_bl(e, bl)) { - m->incremental_maps[e].claim(bl); - } else if (get_map_bl(e, bl)) { - m->maps[e].claim(bl); - break; - } else { - derr << "since " << since << " to " << to - << " oldest " << m->oldest_map << " newest " << m->newest_map - << dendl; - m->put(); - m = NULL; - break; - } - } - return m; -} - -void OSDService::send_map(MOSDMap *m, Connection *con) -{ - Messenger *msgr = client_messenger; - if (entity_name_t::TYPE_OSD == con->get_peer_type()) - msgr = cluster_messenger; - msgr->send_message(m, con); -} - -void OSDService::send_incremental_map(epoch_t since, Connection *con, - OSDMapRef& osdmap) -{ - epoch_t to = osdmap->get_epoch(); - dout(10) << "send_incremental_map " << since << " -> " << to - << " to " << con << " " << con->get_peer_addr() << dendl; - - MOSDMap *m = NULL; - while (!m) { - OSDSuperblock sblock(get_superblock()); - if (since < sblock.oldest_map) { - // just send latest full map - MOSDMap *m = new MOSDMap(monc->get_fsid()); - m->oldest_map = sblock.oldest_map; - m->newest_map = sblock.newest_map; - get_map_bl(to, m->maps[to]); - send_map(m, con); - return; - } - - if (to > since && (int64_t)(to - since) > cct->_conf->osd_map_share_max_epochs) { - dout(10) << " " << (to - since) << " > max " << cct->_conf->osd_map_share_max_epochs - << ", only sending most recent" << dendl; - since = to - cct->_conf->osd_map_share_max_epochs; - } - - if (to - since > (epoch_t)cct->_conf->osd_map_message_max) - to = since + cct->_conf->osd_map_message_max; - m = build_incremental_map_msg(since, to, sblock); - } - send_map(m, con); -} - -bool OSDService::_get_map_bl(epoch_t e, bufferlist& bl) -{ - bool found = map_bl_cache.lookup(e, &bl); - if (found) - return true; - found = store->read( - coll_t::META_COLL, OSD::get_osdmap_pobject_name(e), 0, 0, bl) >= 0; - if (found) - _add_map_bl(e, bl); - return found; -} - -bool OSDService::get_inc_map_bl(epoch_t e, bufferlist& bl) -{ - Mutex::Locker l(map_cache_lock); - bool found = map_bl_inc_cache.lookup(e, &bl); - if (found) - return true; - found = store->read( - coll_t::META_COLL, OSD::get_inc_osdmap_pobject_name(e), 0, 0, bl) >= 0; - if (found) - _add_map_inc_bl(e, bl); - return found; -} - -void OSDService::_add_map_bl(epoch_t e, bufferlist& bl) -{ - dout(10) << "add_map_bl " << e << " " << bl.length() << " bytes" << dendl; - map_bl_cache.add(e, bl); -} - -void OSDService::_add_map_inc_bl(epoch_t e, bufferlist& bl) -{ - dout(10) << "add_map_inc_bl " << e << " " << bl.length() << " bytes" << dendl; - map_bl_inc_cache.add(e, bl); -} - -void OSDService::pin_map_inc_bl(epoch_t e, bufferlist &bl) -{ - Mutex::Locker l(map_cache_lock); - map_bl_inc_cache.pin(e, bl); -} - -void OSDService::pin_map_bl(epoch_t e, bufferlist &bl) -{ - Mutex::Locker l(map_cache_lock); - map_bl_cache.pin(e, bl); -} - -void OSDService::clear_map_bl_cache_pins(epoch_t e) -{ - Mutex::Locker l(map_cache_lock); - map_bl_inc_cache.clear_pinned(e); - map_bl_cache.clear_pinned(e); -} - -OSDMapRef OSDService::_add_map(OSDMap *o) -{ - epoch_t e = o->get_epoch(); - - if (cct->_conf->osd_map_dedup) { - // Dedup against an existing map at a nearby epoch - OSDMapRef for_dedup = map_cache.lower_bound(e); - if (for_dedup) { - OSDMap::dedup(for_dedup.get(), o); - } - } - OSDMapRef l = map_cache.add(e, o); - return l; -} - -OSDMapRef OSDService::try_get_map(epoch_t epoch) -{ - Mutex::Locker l(map_cache_lock); - OSDMapRef retval = map_cache.lookup(epoch); - if (retval) { - dout(30) << "get_map " << epoch << " -cached" << dendl; - return retval; - } - - OSDMap *map = new OSDMap; - if (epoch > 0) { - dout(20) << "get_map " << epoch << " - loading and decoding " << map << dendl; - bufferlist bl; - if (!_get_map_bl(epoch, bl)) { - delete map; - return OSDMapRef(); - } - map->decode(bl); - } else { - dout(20) << "get_map " << epoch << " - return initial " << map << dendl; - } - return _add_map(map); -} - bool OSD::require_mon_peer(Message *m) { if (!m->get_connection()->peer_is_mon()) { @@ -7587,17 +7692,6 @@ void OSD::check_replay_queue() } } - -bool OSDService::queue_for_recovery(PG *pg) -{ - bool b = recovery_wq.queue(pg); - if (b) - dout(10) << "queue_for_recovery queued " << *pg << dendl; - else - dout(10) << "queue_for_recovery already queued " << *pg << dendl; - return b; -} - bool OSD::_recover_now() { if (recovery_ops_active >= cct->_conf->osd_recovery_max_active) { @@ -7732,76 +7826,6 @@ void OSD::finish_recovery_op(PG *pg, const hobject_t& soid, bool dequeue) // ========================================================= // OPS -void OSDService::reply_op_error(OpRequestRef op, int err) -{ - reply_op_error(op, err, eversion_t(), 0); -} - -void OSDService::reply_op_error(OpRequestRef op, int err, eversion_t v, - version_t uv) -{ - MOSDOp *m = static_cast(op->get_req()); - assert(m->get_header().type == CEPH_MSG_OSD_OP); - int flags; - flags = m->get_flags() & (CEPH_OSD_FLAG_ACK|CEPH_OSD_FLAG_ONDISK); - - MOSDOpReply *reply = new MOSDOpReply(m, err, osdmap->get_epoch(), flags, - true); - reply->set_reply_versions(v, uv); - m->get_connection()->get_messenger()->send_message(reply, m->get_connection()); -} - -void OSDService::handle_misdirected_op(PG *pg, OpRequestRef op) -{ - MOSDOp *m = static_cast(op->get_req()); - assert(m->get_header().type == CEPH_MSG_OSD_OP); - - assert(m->get_map_epoch() >= pg->info.history.same_primary_since); - - if (pg->is_ec_pg()) { - /** - * OSD recomputes op target based on current OSDMap. With an EC pg, we - * can get this result: - * 1) client at map 512 sends an op to osd 3, pg_t 3.9 based on mapping - * [CRUSH_ITEM_NONE, 2, 3]/3 - * 2) OSD 3 at map 513 remaps op to osd 3, spg_t 3.9s0 based on mapping - * [3, 2, 3]/3 - * 3) PG 3.9s0 dequeues the op at epoch 512 and notices that it isn't primary - * -- misdirected op - * 4) client resends and this time PG 3.9s0 having caught up to 513 gets - * it and fulfils it - * - * We can't compute the op target based on the sending map epoch due to - * splitting. The simplest thing is to detect such cases here and drop - * them without an error (the client will resend anyway). - */ - OSDMapRef opmap = try_get_map(m->get_map_epoch()); - if (!opmap) { - dout(7) << __func__ << ": " << *pg << " no longer have map for " - << m->get_map_epoch() << ", dropping" << dendl; - return; - } - pg_t _pgid = m->get_pg(); - spg_t pgid; - if ((m->get_flags() & CEPH_OSD_FLAG_PGOP) == 0) - _pgid = opmap->raw_pg_to_pg(_pgid); - if (opmap->get_primary_shard(_pgid, &pgid) && - pgid.shard != pg->info.pgid.shard) { - dout(7) << __func__ << ": " << *pg << " primary changed since " - << m->get_map_epoch() << ", dropping" << dendl; - return; - } - } - - dout(7) << *pg << " misdirected op in " << m->get_map_epoch() << dendl; - clog.warn() << m->get_source_inst() << " misdirected " << m->get_reqid() - << " pg " << m->get_pg() - << " to osd." << whoami - << " not " << pg->acting - << " in e" << m->get_map_epoch() << "/" << osdmap->get_epoch() << "\n"; - reply_op_error(op, -ENXIO); -} - class C_SendMap : public GenContext { OSD *osd; entity_name_t name; @@ -8181,12 +8205,6 @@ void OSD::ShardedOpWQ::_enqueue_front(pair item) { } - -void OSDService::dequeue_pg(PG *pg, list *dequeued) -{ - osd->op_shardedwq.dequeue(pg, dequeued); -} - /* * NOTE: dequeue called in worker thread, with pg lock */ @@ -8234,11 +8252,6 @@ void OSD::dequeue_op( } -void OSDService::queue_for_peering(PG *pg) -{ - peering_wq.queue(pg); -} - struct C_CompleteSplits : public Context { OSD *osd; set > pgs;