agent_thread.join();
}
+// -------------------------------------
+
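+// Config ratios may be given either as a fraction (e.g. 0.97) or as a
+// percentage (e.g. 97); values above 1.0 are normalized to a fraction.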
+float OSDService::get_full_ratio()
+{
+ float full_ratio = cct->_conf->osd_failsafe_full_ratio;
+ if (full_ratio > 1.0) full_ratio /= 100.0;
+ return full_ratio;
+}
+
+float OSDService::get_nearfull_ratio()
+{
+ float nearfull_ratio = cct->_conf->osd_failsafe_nearfull_ratio;
+ if (nearfull_ratio > 1.0) nearfull_ratio /= 100.0;
+ return nearfull_ratio;
+}
+
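+// Recompute cur_ratio/cur_state from the given stats; log to the cluster log
+// on a state change, and at most once per osd_op_complaint_time otherwise.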
+void OSDService::check_nearfull_warning(const osd_stat_t &osd_stat)
+{
+ Mutex::Locker l(full_status_lock);
+ enum s_names new_state;
+
+ time_t now = ceph_clock_gettime(NULL);
+
+ // We base ratio on kb_avail rather than kb_used because they can
+ // differ significantly e.g. on btrfs volumes with a large number of
+ // chunks reserved for metadata, and for our purposes (avoiding
+ // completely filling the disk) it's far more important to know how
+ // much space is available to use than how much we've already used.
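+ // For example, with kb = 1000 and kb_avail = 100 the ratio is 0.90, even
+ // if kb_used reports only 850 because of space reserved for metadata.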
+ float ratio = ((float)(osd_stat.kb - osd_stat.kb_avail)) / ((float)osd_stat.kb);
+ float nearfull_ratio = get_nearfull_ratio();
+ float full_ratio = get_full_ratio();
+ cur_ratio = ratio;
+
+ if (full_ratio > 0 && ratio > full_ratio) {
+ new_state = FULL;
+ } else if (nearfull_ratio > 0 && ratio > nearfull_ratio) {
+ new_state = NEAR;
+ } else {
+ cur_state = NONE;
+ return;
+ }
+
+ if (cur_state != new_state) {
+ cur_state = new_state;
+ } else if (now - last_msg < cct->_conf->osd_op_complaint_time) {
+ return;
+ }
+ last_msg = now;
+ if (cur_state == FULL)
+ clog.error() << "OSD full dropping all updates " << (int)(ratio * 100) << "% full";
+ else
+ clog.warn() << "OSD near full (" << (int)(ratio * 100) << "%)";
+}
+
+bool OSDService::check_failsafe_full()
+{
+ Mutex::Locker l(full_status_lock);
+ return cur_state == FULL;
+}
+
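+// True when the OSD is too full to act as a backfill target, i.e. the current
+// usage ratio is at or above osd_backfill_full_ratio.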
+bool OSDService::too_full_for_backfill(double *_ratio, double *_max_ratio)
+{
+ Mutex::Locker l(full_status_lock);
+ double max_ratio = cct->_conf->osd_backfill_full_ratio;
+ if (_ratio)
+ *_ratio = cur_ratio;
+ if (_max_ratio)
+ *_max_ratio = max_ratio;
+ return cur_ratio >= max_ratio;
+}
+
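+// Refresh osd_stat from the store's statfs results and the current heartbeat
+// peers, update the perf counters, and re-check the full/nearfull state.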
+void OSDService::update_osd_stat(vector<int>& hb_peers)
+{
+ Mutex::Locker lock(stat_lock);
+
+ // fill in disk usage stats
+ struct statfs stbuf;
+ osd->store->statfs(&stbuf);
+
+ uint64_t bytes = stbuf.f_blocks * stbuf.f_bsize;
+ uint64_t used = (stbuf.f_blocks - stbuf.f_bfree) * stbuf.f_bsize;
+ uint64_t avail = stbuf.f_bavail * stbuf.f_bsize;
+
+ osd_stat.kb = bytes >> 10;
+ osd_stat.kb_used = used >> 10;
+ osd_stat.kb_avail = avail >> 10;
+
+ osd->logger->set(l_osd_stat_bytes, bytes);
+ osd->logger->set(l_osd_stat_bytes_used, used);
+ osd->logger->set(l_osd_stat_bytes_avail, avail);
+
+ osd_stat.hb_in.swap(hb_peers);
+ osd_stat.hb_out.clear();
+
+ check_nearfull_warning(osd_stat);
+
+ osd->op_tracker.get_age_ms_histogram(&osd_stat.op_queue_age_hist);
+
+ dout(20) << "update_osd_stat " << osd_stat << dendl;
+}
+
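+// Send a message to a peer OSD over the cluster messenger, but only if the
+// peer is still up in the reserved next map and has not restarted since
+// from_epoch; otherwise the message is dropped.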
+void OSDService::send_message_osd_cluster(int peer, Message *m, epoch_t from_epoch)
+{
+ OSDMapRef next_map = get_nextmap_reserved();
+ // service map is always newer/newest
+ assert(from_epoch <= next_map->get_epoch());
+
+ if (next_map->is_down(peer) ||
+ next_map->get_info(peer).up_from > from_epoch) {
+ m->put();
+ release_map(next_map);
+ return;
+ }
+ const entity_inst_t& peer_inst = next_map->get_cluster_inst(peer);
+ Connection *peer_con = osd->cluster_messenger->get_connection(peer_inst).get();
+ share_map_peer(peer, peer_con, next_map);
+ osd->cluster_messenger->send_message(m, peer_inst);
+ release_map(next_map);
+}
+
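+// As above, but just return the cluster connection (or, in get_con_osd_hb
+// below, the heartbeat back/front pair) after the same liveness checks.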
+ConnectionRef OSDService::get_con_osd_cluster(int peer, epoch_t from_epoch)
+{
+ OSDMapRef next_map = get_nextmap_reserved();
+ // service map is always newer/newest
+ assert(from_epoch <= next_map->get_epoch());
+
+ if (next_map->is_down(peer) ||
+ next_map->get_info(peer).up_from > from_epoch) {
+ release_map(next_map);
+ return NULL;
+ }
+ ConnectionRef con = osd->cluster_messenger->get_connection(next_map->get_cluster_inst(peer));
+ release_map(next_map);
+ return con;
+}
+
+pair<ConnectionRef,ConnectionRef> OSDService::get_con_osd_hb(int peer, epoch_t from_epoch)
+{
+ OSDMapRef next_map = get_nextmap_reserved();
+ // service map is always newer/newest
+ assert(from_epoch <= next_map->get_epoch());
+
+ pair<ConnectionRef,ConnectionRef> ret;
+ if (next_map->is_down(peer) ||
+ next_map->get_info(peer).up_from > from_epoch) {
+ release_map(next_map);
+ return ret;
+ }
+ ret.first = osd->hbclient_messenger->get_connection(next_map->get_hb_back_inst(peer));
+ if (next_map->get_hb_front_addr(peer) != entity_addr_t())
+ ret.second = osd->hbclient_messenger->get_connection(next_map->get_hb_front_inst(peer));
+ release_map(next_map);
+ return ret;
+}
+
+
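+// Queue a pg_temp mapping request; send_pg_temp flushes everything queued so
+// far to the monitor in a single MOSDPGTemp message.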
+void OSDService::queue_want_pg_temp(pg_t pgid, vector<int>& want)
+{
+ Mutex::Locker l(pg_temp_lock);
+ pg_temp_wanted[pgid] = want;
+}
+
+void OSDService::send_pg_temp()
+{
+ Mutex::Locker l(pg_temp_lock);
+ if (pg_temp_wanted.empty())
+ return;
+ dout(10) << "send_pg_temp " << pg_temp_wanted << dendl;
+ MOSDPGTemp *m = new MOSDPGTemp(osdmap->get_epoch());
+ m->pg_temp = pg_temp_wanted;
+ monc->send_mon_message(m);
+}
+
+
+// --------------------------------------
+// dispatch
+
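+// peer_map_epoch tracks the newest OSDMap epoch each peer OSD is known to
+// have; a peer we know nothing about is reported as epoch 0.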
+epoch_t OSDService::get_peer_epoch(int peer)
+{
+ Mutex::Locker l(peer_map_epoch_lock);
+ map<int,epoch_t>::iterator p = peer_map_epoch.find(peer);
+ if (p == peer_map_epoch.end())
+ return 0;
+ return p->second;
+}
+
+epoch_t OSDService::note_peer_epoch(int peer, epoch_t e)
+{
+ Mutex::Locker l(peer_map_epoch_lock);
+ map<int,epoch_t>::iterator p = peer_map_epoch.find(peer);
+ if (p != peer_map_epoch.end()) {
+ if (p->second < e) {
+ dout(10) << "note_peer_epoch osd." << peer << " has " << e << dendl;
+ p->second = e;
+ } else {
+ dout(30) << "note_peer_epoch osd." << peer << " has " << p->second << " >= " << e << dendl;
+ }
+ return p->second;
+ } else {
+ dout(10) << "note_peer_epoch osd." << peer << " now has " << e << dendl;
+ peer_map_epoch[peer] = e;
+ return e;
+ }
+}
+
+void OSDService::forget_peer_epoch(int peer, epoch_t as_of)
+{
+ Mutex::Locker l(peer_map_epoch_lock);
+ map<int,epoch_t>::iterator p = peer_map_epoch.find(peer);
+ if (p != peer_map_epoch.end()) {
+ if (p->second <= as_of) {
+ dout(10) << "forget_peer_epoch osd." << peer << " as_of " << as_of
+ << " had " << p->second << dendl;
+ peer_map_epoch.erase(p);
+ } else {
+ dout(10) << "forget_peer_epoch osd." << peer << " as_of " << as_of
+ << " has " << p->second << " - not forgetting" << dendl;
+ }
+ }
+}
+
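+// Decide whether to share our OSDMap with the sender: either a client whose
+// session epoch is behind ours, or an up cluster peer whose last known epoch
+// is older than ours.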
+bool OSDService::should_share_map(entity_name_t name, Connection *con,
+ epoch_t epoch, OSDMapRef& osdmap,
+ const epoch_t *sent_epoch_p)
+{
+ bool should_send = false;
+ dout(20) << "should_share_map "
+ << name << " " << con->get_peer_addr()
+ << " " << epoch << dendl;
+
+ // does client have old map?
+ if (name.is_client()) {
+ bool message_sendmap = epoch < osdmap->get_epoch();
+ if (message_sendmap && sent_epoch_p) {
+ dout(20) << "client session last_sent_epoch: "
+ << *sent_epoch_p
+ << " versus osdmap epoch " << osdmap->get_epoch() << dendl;
+ if (*sent_epoch_p < osdmap->get_epoch()) {
+ should_send = true;
+ } // else we don't need to send it out again
+ }
+ }
+
+ if (con->get_messenger() == osd->cluster_messenger &&
+ con != osd->cluster_messenger->get_loopback_connection() &&
+ osdmap->is_up(name.num()) &&
+ (osdmap->get_cluster_addr(name.num()) == con->get_peer_addr() ||
+ osdmap->get_hb_back_addr(name.num()) == con->get_peer_addr())) {
+ // remember
+ epoch_t has = MAX(get_peer_epoch(name.num()), epoch);
+
+ // share?
+ if (has < osdmap->get_epoch()) {
+ dout(10) << name << " " << con->get_peer_addr()
+ << " has old map " << epoch << " < "
+ << osdmap->get_epoch() << dendl;
+ should_send = true;
+ }
+ }
+
+ return should_send;
+}
+
+void OSDService::share_map(
+ entity_name_t name,
+ Connection *con,
+ epoch_t epoch,
+ OSDMapRef& osdmap,
+ epoch_t *sent_epoch_p)
+{
+ dout(20) << "share_map "
+ << name << " " << con->get_peer_addr()
+ << " " << epoch << dendl;
+
+ if ((!osd->is_active()) && (!osd->is_stopping())) {
+ /* It is safe not to proceed: the OSD is not in a healthy state. */
+ return;
+ }
+
+ bool want_shared = should_share_map(name, con, epoch,
+ osdmap, sent_epoch_p);
+
+ if (want_shared) {
+ if (name.is_client()) {
+ dout(10) << name << " has old map " << epoch
+ << " < " << osdmap->get_epoch() << dendl;
+ // we know the Session is valid or we wouldn't be sending
+ if (sent_epoch_p) {
+ *sent_epoch_p = osdmap->get_epoch();
+ }
+ send_incremental_map(epoch, con, osdmap);
+ } else if (con->get_messenger() == osd->cluster_messenger &&
+ osdmap->is_up(name.num()) &&
+ (osdmap->get_cluster_addr(name.num()) == con->get_peer_addr() ||
+ osdmap->get_hb_back_addr(name.num()) == con->get_peer_addr())) {
+ dout(10) << name << " " << con->get_peer_addr()
+ << " has old map " << epoch << " < "
+ << osdmap->get_epoch() << dendl;
+ note_peer_epoch(name.num(), osdmap->get_epoch());
+ send_incremental_map(epoch, con, osdmap);
+ }
+ }
+}
+
+
+void OSDService::share_map_peer(int peer, Connection *con, OSDMapRef map)
+{
+ if (!map)
+ map = get_osdmap();
+
+ // send map?
+ epoch_t pe = get_peer_epoch(peer);
+ if (pe) {
+ if (pe < map->get_epoch()) {
+ send_incremental_map(pe, con, map);
+ note_peer_epoch(peer, map->get_epoch());
+ } else
+ dout(20) << "share_map_peer " << con << " already has epoch " << pe << dendl;
+ } else {
+ dout(20) << "share_map_peer " << con << " don't know epoch, doing nothing" << dendl;
+ // We have no idea about the peer's epoch. Should we send a recent
+ // map? For now, do nothing.
+ }
+}
+
+
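+// scrubs_pending + scrubs_active are together bounded by osd_max_scrubs; a
+// pending reservation is either promoted to active (inc_scrubs_active(true))
+// or released again (dec_scrubs_pending).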
+bool OSDService::inc_scrubs_pending()
+{
+ bool result = false;
+
+ sched_scrub_lock.Lock();
+ if (scrubs_pending + scrubs_active < cct->_conf->osd_max_scrubs) {
+ dout(20) << "inc_scrubs_pending " << scrubs_pending << " -> " << (scrubs_pending+1)
+ << " (max " << cct->_conf->osd_max_scrubs << ", active " << scrubs_active << ")" << dendl;
+ result = true;
+ ++scrubs_pending;
+ } else {
+ dout(20) << "inc_scrubs_pending " << scrubs_pending << " + " << scrubs_active << " active >= max " << cct->_conf->osd_max_scrubs << dendl;
+ }
+ sched_scrub_lock.Unlock();
+
+ return result;
+}
+
+void OSDService::dec_scrubs_pending()
+{
+ sched_scrub_lock.Lock();
+ dout(20) << "dec_scrubs_pending " << scrubs_pending << " -> " << (scrubs_pending-1)
+ << " (max " << cct->_conf->osd_max_scrubs << ", active " << scrubs_active << ")" << dendl;
+ --scrubs_pending;
+ assert(scrubs_pending >= 0);
+ sched_scrub_lock.Unlock();
+}
+
+void OSDService::inc_scrubs_active(bool reserved)
+{
+ sched_scrub_lock.Lock();
+ ++(scrubs_active);
+ if (reserved) {
+ --(scrubs_pending);
+ dout(20) << "inc_scrubs_active " << (scrubs_active-1) << " -> " << scrubs_active
+ << " (max " << cct->_conf->osd_max_scrubs
+ << ", pending " << (scrubs_pending+1) << " -> " << scrubs_pending << ")" << dendl;
+ assert(scrubs_pending >= 0);
+ } else {
+ dout(20) << "inc_scrubs_active " << (scrubs_active-1) << " -> " << scrubs_active
+ << " (max " << cct->_conf->osd_max_scrubs
+ << ", pending " << scrubs_pending << ")" << dendl;
+ }
+ sched_scrub_lock.Unlock();
+}
+
+void OSDService::dec_scrubs_active()
+{
+ sched_scrub_lock.Lock();
+ dout(20) << "dec_scrubs_active " << scrubs_active << " -> " << (scrubs_active-1)
+ << " (max " << cct->_conf->osd_max_scrubs << ", pending " << scrubs_pending << ")" << dendl;
+ --scrubs_active;
+ sched_scrub_lock.Unlock();
+}
+
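+// The boot/up/bind epochs only ever move forward (or reset to 0); the asserts
+// in set_epochs enforce this.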
+void OSDService::retrieve_epochs(epoch_t *_boot_epoch, epoch_t *_up_epoch,
+ epoch_t *_bind_epoch) const
+{
+ Mutex::Locker l(epoch_lock);
+ if (_boot_epoch)
+ *_boot_epoch = boot_epoch;
+ if (_up_epoch)
+ *_up_epoch = up_epoch;
+ if (_bind_epoch)
+ *_bind_epoch = bind_epoch;
+}
+
+void OSDService::set_epochs(const epoch_t *_boot_epoch, const epoch_t *_up_epoch,
+ const epoch_t *_bind_epoch)
+{
+ Mutex::Locker l(epoch_lock);
+ if (_boot_epoch) {
+ assert(*_boot_epoch == 0 || *_boot_epoch >= boot_epoch);
+ boot_epoch = *_boot_epoch;
+ }
+ if (_up_epoch) {
+ assert(*_up_epoch == 0 || *_up_epoch >= up_epoch);
+ up_epoch = *_up_epoch;
+ }
+ if (_bind_epoch) {
+ assert(*_bind_epoch == 0 || *_bind_epoch >= bind_epoch);
+ bind_epoch = *_bind_epoch;
+ }
+}
+
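+// Ask the monitor to mark us down before we shut down, and wait up to
+// osd_mon_shutdown_timeout seconds for the ack (delivered via got_stop_ack).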
+bool OSDService::prepare_to_stop()
+{
+ Mutex::Locker l(is_stopping_lock);
+ if (state != NOT_STOPPING)
+ return false;
+
+ OSDMapRef osdmap = get_osdmap();
+ if (osdmap && osdmap->is_up(whoami)) {
+ dout(0) << __func__ << " telling mon we are shutting down" << dendl;
+ state = PREPARING_TO_STOP;
+ monc->send_mon_message(new MOSDMarkMeDown(monc->get_fsid(),
+ osdmap->get_inst(whoami),
+ osdmap->get_epoch(),
+ true // request ack
+ ));
+ utime_t now = ceph_clock_now(cct);
+ utime_t timeout;
+ timeout.set_from_double(now + cct->_conf->osd_mon_shutdown_timeout);
+ while ((ceph_clock_now(cct) < timeout) &&
+ (state != STOPPING)) {
+ is_stopping_cond.WaitUntil(is_stopping_lock, timeout);
+ }
+ }
+ dout(0) << __func__ << " starting shutdown" << dendl;
+ state = STOPPING;
+ return true;
+}
+
+void OSDService::got_stop_ack()
+{
+ Mutex::Locker l(is_stopping_lock);
+ if (state == PREPARING_TO_STOP) {
+ dout(0) << __func__ << " starting shutdown" << dendl;
+ state = STOPPING;
+ is_stopping_cond.Signal();
+ } else {
+ dout(10) << __func__ << " ignoring msg" << dendl;
+ }
+}
+
+
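+// Build an MOSDMap covering (since, to], walking backwards and preferring
+// incrementals; fall back to a full map where an incremental is unavailable,
+// and return NULL if an epoch is missing entirely.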
+MOSDMap *OSDService::build_incremental_map_msg(epoch_t since, epoch_t to,
+ OSDSuperblock& sblock)
+{
+ MOSDMap *m = new MOSDMap(monc->get_fsid());
+ m->oldest_map = sblock.oldest_map;
+ m->newest_map = sblock.newest_map;
+
+ for (epoch_t e = to; e > since; e--) {
+ bufferlist bl;
+ if (e > m->oldest_map && get_inc_map_bl(e, bl)) {
+ m->incremental_maps[e].claim(bl);
+ } else if (get_map_bl(e, bl)) {
+ m->maps[e].claim(bl);
+ break;
+ } else {
+ derr << "since " << since << " to " << to
+ << " oldest " << m->oldest_map << " newest " << m->newest_map
+ << dendl;
+ m->put();
+ m = NULL;
+ break;
+ }
+ }
+ return m;
+}
+
+void OSDService::send_map(MOSDMap *m, Connection *con)
+{
+ Messenger *msgr = client_messenger;
+ if (entity_name_t::TYPE_OSD == con->get_peer_type())
+ msgr = cluster_messenger;
+ msgr->send_message(m, con);
+}
+
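+// Send whatever maps bring the peer from 'since' up to our current epoch,
+// capped by osd_map_share_max_epochs and osd_map_message_max; if 'since'
+// predates our oldest stored map, just send the latest full map.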
+void OSDService::send_incremental_map(epoch_t since, Connection *con,
+ OSDMapRef& osdmap)
+{
+ epoch_t to = osdmap->get_epoch();
+ dout(10) << "send_incremental_map " << since << " -> " << to
+ << " to " << con << " " << con->get_peer_addr() << dendl;
+
+ MOSDMap *m = NULL;
+ while (!m) {
+ OSDSuperblock sblock(get_superblock());
+ if (since < sblock.oldest_map) {
+ // just send latest full map
+ MOSDMap *full = new MOSDMap(monc->get_fsid());
+ full->oldest_map = sblock.oldest_map;
+ full->newest_map = sblock.newest_map;
+ get_map_bl(to, full->maps[to]);
+ send_map(full, con);
+ return;
+ }
+
+ if (to > since && (int64_t)(to - since) > cct->_conf->osd_map_share_max_epochs) {
+ dout(10) << " " << (to - since) << " > max " << cct->_conf->osd_map_share_max_epochs
+ << ", only sending most recent" << dendl;
+ since = to - cct->_conf->osd_map_share_max_epochs;
+ }
+
+ if (to - since > (epoch_t)cct->_conf->osd_map_message_max)
+ to = since + cct->_conf->osd_map_message_max;
+ m = build_incremental_map_msg(since, to, sblock);
+ }
+ send_map(m, con);
+}
+
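+// The _-prefixed variant assumes the caller already holds map_cache_lock (as
+// try_get_map does); check the cache first, then fall back to the store.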
+bool OSDService::_get_map_bl(epoch_t e, bufferlist& bl)
+{
+ bool found = map_bl_cache.lookup(e, &bl);
+ if (found)
+ return true;
+ found = store->read(
+ coll_t::META_COLL, OSD::get_osdmap_pobject_name(e), 0, 0, bl) >= 0;
+ if (found)
+ _add_map_bl(e, bl);
+ return found;
+}
+
+bool OSDService::get_inc_map_bl(epoch_t e, bufferlist& bl)
+{
+ Mutex::Locker l(map_cache_lock);
+ bool found = map_bl_inc_cache.lookup(e, &bl);
+ if (found)
+ return true;
+ found = store->read(
+ coll_t::META_COLL, OSD::get_inc_osdmap_pobject_name(e), 0, 0, bl) >= 0;
+ if (found)
+ _add_map_inc_bl(e, bl);
+ return found;
+}
+
+void OSDService::_add_map_bl(epoch_t e, bufferlist& bl)
+{
+ dout(10) << "add_map_bl " << e << " " << bl.length() << " bytes" << dendl;
+ map_bl_cache.add(e, bl);
+}
+
+void OSDService::_add_map_inc_bl(epoch_t e, bufferlist& bl)
+{
+ dout(10) << "add_map_inc_bl " << e << " " << bl.length() << " bytes" << dendl;
+ map_bl_inc_cache.add(e, bl);
+}
+
+void OSDService::pin_map_inc_bl(epoch_t e, bufferlist &bl)
+{
+ Mutex::Locker l(map_cache_lock);
+ map_bl_inc_cache.pin(e, bl);
+}
+
+void OSDService::pin_map_bl(epoch_t e, bufferlist &bl)
+{
+ Mutex::Locker l(map_cache_lock);
+ map_bl_cache.pin(e, bl);
+}
+
+void OSDService::clear_map_bl_cache_pins(epoch_t e)
+{
+ Mutex::Locker l(map_cache_lock);
+ map_bl_inc_cache.clear_pinned(e);
+ map_bl_cache.clear_pinned(e);
+}
+
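+// Insert a decoded map into the cache (map_cache_lock held by the caller),
+// deduping shared structures against a nearby epoch when osd_map_dedup is set.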
+OSDMapRef OSDService::_add_map(OSDMap *o)
+{
+ epoch_t e = o->get_epoch();
+
+ if (cct->_conf->osd_map_dedup) {
+ // Dedup against an existing map at a nearby epoch
+ OSDMapRef for_dedup = map_cache.lower_bound(e);
+ if (for_dedup) {
+ OSDMap::dedup(for_dedup.get(), o);
+ }
+ }
+ OSDMapRef l = map_cache.add(e, o);
+ return l;
+}
+
+OSDMapRef OSDService::try_get_map(epoch_t epoch)
+{
+ Mutex::Locker l(map_cache_lock);
+ OSDMapRef retval = map_cache.lookup(epoch);
+ if (retval) {
+ dout(30) << "get_map " << epoch << " -cached" << dendl;
+ return retval;
+ }
+
+ OSDMap *map = new OSDMap;
+ if (epoch > 0) {
+ dout(20) << "get_map " << epoch << " - loading and decoding " << map << dendl;
+ bufferlist bl;
+ if (!_get_map_bl(epoch, bl)) {
+ delete map;
+ return OSDMapRef();
+ }
+ map->decode(bl);
+ } else {
+ dout(20) << "get_map " << epoch << " - return initial " << map << dendl;
+ }
+ return _add_map(map);
+}
+
+bool OSDService::queue_for_recovery(PG *pg)
+{
+ bool b = recovery_wq.queue(pg);
+ if (b)
+ dout(10) << "queue_for_recovery queued " << *pg << dendl;
+ else
+ dout(10) << "queue_for_recovery already queued " << *pg << dendl;
+ return b;
+}
+
+
+// ops
+
+
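+// Reply to an op with an error code, echoing back whichever ACK/ONDISK flags
+// the client requested.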
+void OSDService::reply_op_error(OpRequestRef op, int err)
+{
+ reply_op_error(op, err, eversion_t(), 0);
+}
+
+void OSDService::reply_op_error(OpRequestRef op, int err, eversion_t v,
+ version_t uv)
+{
+ MOSDOp *m = static_cast<MOSDOp*>(op->get_req());
+ assert(m->get_header().type == CEPH_MSG_OSD_OP);
+ int flags = m->get_flags() & (CEPH_OSD_FLAG_ACK|CEPH_OSD_FLAG_ONDISK);
+
+ MOSDOpReply *reply = new MOSDOpReply(m, err, osdmap->get_epoch(), flags,
+ true);
+ reply->set_reply_versions(v, uv);
+ m->get_connection()->get_messenger()->send_message(reply, m->get_connection());
+}
+
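+// Called when an op reaches a PG that is not primary for it. For EC pools
+// certain races (see below) are dropped silently; otherwise we warn and reply
+// with -ENXIO.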
+void OSDService::handle_misdirected_op(PG *pg, OpRequestRef op)
+{
+ MOSDOp *m = static_cast<MOSDOp*>(op->get_req());
+ assert(m->get_header().type == CEPH_MSG_OSD_OP);
+
+ assert(m->get_map_epoch() >= pg->info.history.same_primary_since);
+
+ if (pg->is_ec_pg()) {
+ /**
+ * OSD recomputes op target based on current OSDMap. With an EC pg, we
+ * can get this result:
+ * 1) client at map 512 sends an op to osd 3, pg_t 3.9 based on mapping
+ * [CRUSH_ITEM_NONE, 2, 3]/3
+ * 2) OSD 3 at map 513 remaps op to osd 3, spg_t 3.9s0 based on mapping
+ * [3, 2, 3]/3
+ * 3) PG 3.9s0 dequeues the op at epoch 512 and notices that it isn't primary
+ * -- misdirected op
+ * 4) client resends and this time PG 3.9s0 having caught up to 513 gets
+ * it and fulfils it
+ *
+ * We can't compute the op target based on the sending map epoch due to
+ * splitting. The simplest thing is to detect such cases here and drop
+ * them without an error (the client will resend anyway).
+ */
+ OSDMapRef opmap = try_get_map(m->get_map_epoch());
+ if (!opmap) {
+ dout(7) << __func__ << ": " << *pg << " no longer have map for "
+ << m->get_map_epoch() << ", dropping" << dendl;
+ return;
+ }
+ pg_t _pgid = m->get_pg();
+ spg_t pgid;
+ if ((m->get_flags() & CEPH_OSD_FLAG_PGOP) == 0)
+ _pgid = opmap->raw_pg_to_pg(_pgid);
+ if (opmap->get_primary_shard(_pgid, &pgid) &&
+ pgid.shard != pg->info.pgid.shard) {
+ dout(7) << __func__ << ": " << *pg << " primary changed since "
+ << m->get_map_epoch() << ", dropping" << dendl;
+ return;
+ }
+ }
+
+ dout(7) << *pg << " misdirected op in " << m->get_map_epoch() << dendl;
+ clog.warn() << m->get_source_inst() << " misdirected " << m->get_reqid()
+ << " pg " << m->get_pg()
+ << " to osd." << whoami
+ << " not " << pg->acting
+ << " in e" << m->get_map_epoch() << "/" << osdmap->get_epoch() << "\n";
+ reply_op_error(op, -ENXIO);
+}
+
+
+void OSDService::dequeue_pg(PG *pg, list<OpRequestRef> *dequeued)
+{
+ osd->op_shardedwq.dequeue(pg, dequeued);
+}
+
+void OSDService::queue_for_peering(PG *pg)
+{
+ peering_wq.queue(pg);
+}
+
+
+// ====================================================================
+// OSD
#undef dout_prefix
#define dout_prefix *_dout
{
dout(15) << "project_pg_history " << pgid
<< " from " << from << " to " << osdmap->get_epoch()
- << ", start " << h
- << dendl;
-
- epoch_t e;
- for (e = osdmap->get_epoch();
- e > from;
- e--) {
- // verify during intermediate epoch (e-1)
- OSDMapRef oldmap = service.try_get_map(e-1);
- if (!oldmap) {
- dout(15) << __func__ << ": found map gap, returning false" << dendl;
- return false;
- }
- assert(oldmap->have_pg_pool(pgid.pool()));
-
- int upprimary, actingprimary;
- vector<int> up, acting;
- oldmap->pg_to_up_acting_osds(
- pgid.pgid,
- &up,
- &upprimary,
- &acting,
- &actingprimary);
-
- // acting set change?
- if ((actingprimary != currentactingprimary ||
- upprimary != currentupprimary ||
- acting != currentacting ||
- up != currentup) && e > h.same_interval_since) {
- dout(15) << "project_pg_history " << pgid << " acting|up changed in " << e
- << " from " << acting << "/" << up
- << " " << actingprimary << "/" << upprimary
- << " -> " << currentacting << "/" << currentup
- << " " << currentactingprimary << "/" << currentupprimary
- << dendl;
- h.same_interval_since = e;
- }
- // split?
- if (pgid.is_split(oldmap->get_pg_num(pgid.pool()),
- osdmap->get_pg_num(pgid.pool()),
- 0)) {
- h.same_interval_since = e;
- }
- // up set change?
- if ((up != currentup || upprimary != currentupprimary)
- && e > h.same_up_since) {
- dout(15) << "project_pg_history " << pgid << " up changed in " << e
- << " from " << up << " " << upprimary
- << " -> " << currentup << " " << currentupprimary << dendl;
- h.same_up_since = e;
- }
-
- // primary change?
- if (OSDMap::primary_changed(
- actingprimary,
- acting,
- currentactingprimary,
- currentacting) &&
- e > h.same_primary_since) {
- dout(15) << "project_pg_history " << pgid << " primary changed in " << e << dendl;
- h.same_primary_since = e;
- }
-
- if (h.same_interval_since >= e && h.same_up_since >= e && h.same_primary_since >= e)
- break;
- }
-
- // base case: these floors should be the creation epoch if we didn't
- // find any changes.
- if (e == h.epoch_created) {
- if (!h.same_interval_since)
- h.same_interval_since = e;
- if (!h.same_up_since)
- h.same_up_since = e;
- if (!h.same_primary_since)
- h.same_primary_since = e;
- }
-
- dout(15) << "project_pg_history end " << h << dendl;
- return true;
-}
-
-// -------------------------------------
-
-float OSDService::get_full_ratio()
-{
- float full_ratio = cct->_conf->osd_failsafe_full_ratio;
- if (full_ratio > 1.0) full_ratio /= 100.0;
- return full_ratio;
-}
-
-float OSDService::get_nearfull_ratio()
-{
- float nearfull_ratio = cct->_conf->osd_failsafe_nearfull_ratio;
- if (nearfull_ratio > 1.0) nearfull_ratio /= 100.0;
- return nearfull_ratio;
-}
-
-void OSDService::check_nearfull_warning(const osd_stat_t &osd_stat)
-{
- Mutex::Locker l(full_status_lock);
- enum s_names new_state;
-
- time_t now = ceph_clock_gettime(NULL);
-
- // We base ratio on kb_avail rather than kb_used because they can
- // differ significantly e.g. on btrfs volumes with a large number of
- // chunks reserved for metadata, and for our purposes (avoiding
- // completely filling the disk) it's far more important to know how
- // much space is available to use than how much we've already used.
- float ratio = ((float)(osd_stat.kb - osd_stat.kb_avail)) / ((float)osd_stat.kb);
- float nearfull_ratio = get_nearfull_ratio();
- float full_ratio = get_full_ratio();
- cur_ratio = ratio;
-
- if (full_ratio > 0 && ratio > full_ratio) {
- new_state = FULL;
- } else if (nearfull_ratio > 0 && ratio > nearfull_ratio) {
- new_state = NEAR;
- } else {
- cur_state = NONE;
- return;
- }
-
- if (cur_state != new_state) {
- cur_state = new_state;
- } else if (now - last_msg < cct->_conf->osd_op_complaint_time) {
- return;
- }
- last_msg = now;
- if (cur_state == FULL)
- clog.error() << "OSD full dropping all updates " << (int)(ratio * 100) << "% full";
- else
- clog.warn() << "OSD near full (" << (int)(ratio * 100) << "%)";
-}
-
-bool OSDService::check_failsafe_full()
-{
- Mutex::Locker l(full_status_lock);
- if (cur_state == FULL)
- return true;
- return false;
-}
-
-bool OSDService::too_full_for_backfill(double *_ratio, double *_max_ratio)
-{
- Mutex::Locker l(full_status_lock);
- double max_ratio;
- max_ratio = cct->_conf->osd_backfill_full_ratio;
- if (_ratio)
- *_ratio = cur_ratio;
- if (_max_ratio)
- *_max_ratio = max_ratio;
- return cur_ratio >= max_ratio;
-}
-
-void OSDService::update_osd_stat(vector<int>& hb_peers)
-{
- Mutex::Locker lock(stat_lock);
-
- // fill in osd stats too
- struct statfs stbuf;
- osd->store->statfs(&stbuf);
+ << ", start " << h
+ << dendl;
- uint64_t bytes = stbuf.f_blocks * stbuf.f_bsize;
- uint64_t used = (stbuf.f_blocks - stbuf.f_bfree) * stbuf.f_bsize;
- uint64_t avail = stbuf.f_bavail * stbuf.f_bsize;
+ epoch_t e;
+ for (e = osdmap->get_epoch();
+ e > from;
+ e--) {
+ // verify during intermediate epoch (e-1)
+ OSDMapRef oldmap = service.try_get_map(e-1);
+ if (!oldmap) {
+ dout(15) << __func__ << ": found map gap, returning false" << dendl;
+ return false;
+ }
+ assert(oldmap->have_pg_pool(pgid.pool()));
- osd_stat.kb = bytes >> 10;
- osd_stat.kb_used = used >> 10;
- osd_stat.kb_avail = avail >> 10;
+ int upprimary, actingprimary;
+ vector<int> up, acting;
+ oldmap->pg_to_up_acting_osds(
+ pgid.pgid,
+ &up,
+ &upprimary,
+ &acting,
+ &actingprimary);
- osd->logger->set(l_osd_stat_bytes, bytes);
- osd->logger->set(l_osd_stat_bytes_used, used);
- osd->logger->set(l_osd_stat_bytes_avail, avail);
+ // acting set change?
+ if ((actingprimary != currentactingprimary ||
+ upprimary != currentupprimary ||
+ acting != currentacting ||
+ up != currentup) && e > h.same_interval_since) {
+ dout(15) << "project_pg_history " << pgid << " acting|up changed in " << e
+ << " from " << acting << "/" << up
+ << " " << actingprimary << "/" << upprimary
+ << " -> " << currentacting << "/" << currentup
+ << " " << currentactingprimary << "/" << currentupprimary
+ << dendl;
+ h.same_interval_since = e;
+ }
+ // split?
+ if (pgid.is_split(oldmap->get_pg_num(pgid.pool()),
+ osdmap->get_pg_num(pgid.pool()),
+ 0)) {
+ h.same_interval_since = e;
+ }
+ // up set change?
+ if ((up != currentup || upprimary != currentupprimary)
+ && e > h.same_up_since) {
+ dout(15) << "project_pg_history " << pgid << " up changed in " << e
+ << " from " << up << " " << upprimary
+ << " -> " << currentup << " " << currentupprimary << dendl;
+ h.same_up_since = e;
+ }
- osd_stat.hb_in.swap(hb_peers);
- osd_stat.hb_out.clear();
+ // primary change?
+ if (OSDMap::primary_changed(
+ actingprimary,
+ acting,
+ currentactingprimary,
+ currentacting) &&
+ e > h.same_primary_since) {
+ dout(15) << "project_pg_history " << pgid << " primary changed in " << e << dendl;
+ h.same_primary_since = e;
+ }
- check_nearfull_warning(osd_stat);
+ if (h.same_interval_since >= e && h.same_up_since >= e && h.same_primary_since >= e)
+ break;
+ }
- osd->op_tracker.get_age_ms_histogram(&osd_stat.op_queue_age_hist);
+ // base case: these floors should be the creation epoch if we didn't
+ // find any changes.
+ if (e == h.epoch_created) {
+ if (!h.same_interval_since)
+ h.same_interval_since = e;
+ if (!h.same_up_since)
+ h.same_up_since = e;
+ if (!h.same_primary_since)
+ h.same_primary_since = e;
+ }
- dout(20) << "update_osd_stat " << osd_stat << dendl;
+ dout(15) << "project_pg_history end " << h << dendl;
+ return true;
}
+
+
void OSD::_add_heartbeat_peer(int p)
{
if (p == whoami)
}
}
-void OSDService::send_message_osd_cluster(int peer, Message *m, epoch_t from_epoch)
-{
- OSDMapRef next_map = get_nextmap_reserved();
- // service map is always newer/newest
- assert(from_epoch <= next_map->get_epoch());
-
- if (next_map->is_down(peer) ||
- next_map->get_info(peer).up_from > from_epoch) {
- m->put();
- release_map(next_map);
- return;
- }
- const entity_inst_t& peer_inst = next_map->get_cluster_inst(peer);
- Connection *peer_con = osd->cluster_messenger->get_connection(peer_inst).get();
- share_map_peer(peer, peer_con, next_map);
- osd->cluster_messenger->send_message(m, peer_inst);
- release_map(next_map);
-}
-
-ConnectionRef OSDService::get_con_osd_cluster(int peer, epoch_t from_epoch)
-{
- OSDMapRef next_map = get_nextmap_reserved();
- // service map is always newer/newest
- assert(from_epoch <= next_map->get_epoch());
-
- if (next_map->is_down(peer) ||
- next_map->get_info(peer).up_from > from_epoch) {
- release_map(next_map);
- return NULL;
- }
- ConnectionRef con = osd->cluster_messenger->get_connection(next_map->get_cluster_inst(peer));
- release_map(next_map);
- return con;
-}
-
-pair<ConnectionRef,ConnectionRef> OSDService::get_con_osd_hb(int peer, epoch_t from_epoch)
-{
- OSDMapRef next_map = get_nextmap_reserved();
- // service map is always newer/newest
- assert(from_epoch <= next_map->get_epoch());
-
- pair<ConnectionRef,ConnectionRef> ret;
- if (next_map->is_down(peer) ||
- next_map->get_info(peer).up_from > from_epoch) {
- release_map(next_map);
- return ret;
- }
- ret.first = osd->hbclient_messenger->get_connection(next_map->get_hb_back_inst(peer));
- if (next_map->get_hb_front_addr(peer) != entity_addr_t())
- ret.second = osd->hbclient_messenger->get_connection(next_map->get_hb_front_inst(peer));
- release_map(next_map);
- return ret;
-}
-
-void OSDService::queue_want_pg_temp(pg_t pgid, vector<int>& want)
-{
- Mutex::Locker l(pg_temp_lock);
- pg_temp_wanted[pgid] = want;
-}
-
-void OSDService::send_pg_temp()
-{
- Mutex::Locker l(pg_temp_lock);
- if (pg_temp_wanted.empty())
- return;
- dout(10) << "send_pg_temp " << pg_temp_wanted << dendl;
- MOSDPGTemp *m = new MOSDPGTemp(osdmap->get_epoch());
- m->pg_temp = pg_temp_wanted;
- monc->send_mon_message(m);
-}
void OSD::send_failures()
{
<< r;
goto out;
}
- cct->_conf->apply_changes(NULL);
- ss << "kicking recovery queue. set osd_recovery_delay_start "
- << "to " << cct->_conf->osd_recovery_delay_start;
- defer_recovery_until = ceph_clock_now(cct);
- defer_recovery_until += cct->_conf->osd_recovery_delay_start;
- recovery_wq.wake();
- }
-
- else if (prefix == "cpu_profiler") {
- string arg;
- cmd_getval(cct, cmdmap, "arg", arg);
- vector<string> argvec;
- get_str_vec(arg, argvec);
- cpu_profiler_handle_command(argvec, ds);
- }
-
- else if (prefix == "dump_pg_recovery_stats") {
- stringstream s;
- if (f) {
- pg_recovery_stats.dump_formatted(f.get());
- f->flush(ds);
- } else {
- pg_recovery_stats.dump(s);
- ds << "dump pg recovery stats: " << s.str();
- }
- }
-
- else if (prefix == "reset_pg_recovery_stats") {
- ss << "reset pg recovery stats";
- pg_recovery_stats.reset();
- }
-
- else {
- ss << "unrecognized command! " << cmd;
- r = -EINVAL;
- }
-
- out:
- rs = ss.str();
- odata.append(ds);
- dout(0) << "do_command r=" << r << " " << rs << dendl;
- clog.info() << rs << "\n";
- if (con) {
- MCommandReply *reply = new MCommandReply(r, rs);
- reply->set_tid(tid);
- reply->set_data(odata);
- client_messenger->send_message(reply, con);
- }
- return;
-}
-
-
-
-// --------------------------------------
-// dispatch
-
-epoch_t OSDService::get_peer_epoch(int peer)
-{
- Mutex::Locker l(peer_map_epoch_lock);
- map<int,epoch_t>::iterator p = peer_map_epoch.find(peer);
- if (p == peer_map_epoch.end())
- return 0;
- return p->second;
-}
-
-epoch_t OSDService::note_peer_epoch(int peer, epoch_t e)
-{
- Mutex::Locker l(peer_map_epoch_lock);
- map<int,epoch_t>::iterator p = peer_map_epoch.find(peer);
- if (p != peer_map_epoch.end()) {
- if (p->second < e) {
- dout(10) << "note_peer_epoch osd." << peer << " has " << e << dendl;
- p->second = e;
- } else {
- dout(30) << "note_peer_epoch osd." << peer << " has " << p->second << " >= " << e << dendl;
- }
- return p->second;
- } else {
- dout(10) << "note_peer_epoch osd." << peer << " now has " << e << dendl;
- peer_map_epoch[peer] = e;
- return e;
- }
-}
-
-void OSDService::forget_peer_epoch(int peer, epoch_t as_of)
-{
- Mutex::Locker l(peer_map_epoch_lock);
- map<int,epoch_t>::iterator p = peer_map_epoch.find(peer);
- if (p != peer_map_epoch.end()) {
- if (p->second <= as_of) {
- dout(10) << "forget_peer_epoch osd." << peer << " as_of " << as_of
- << " had " << p->second << dendl;
- peer_map_epoch.erase(p);
- } else {
- dout(10) << "forget_peer_epoch osd." << peer << " as_of " << as_of
- << " has " << p->second << " - not forgetting" << dendl;
- }
+ cct->_conf->apply_changes(NULL);
+ ss << "kicking recovery queue. set osd_recovery_delay_start "
+ << "to " << cct->_conf->osd_recovery_delay_start;
+ defer_recovery_until = ceph_clock_now(cct);
+ defer_recovery_until += cct->_conf->osd_recovery_delay_start;
+ recovery_wq.wake();
}
-}
-
-bool OSDService::should_share_map(entity_name_t name, Connection *con,
- epoch_t epoch, OSDMapRef& osdmap,
- const epoch_t *sent_epoch_p)
-{
- bool should_send = false;
- dout(20) << "should_share_map "
- << name << " " << con->get_peer_addr()
- << " " << epoch << dendl;
- // does client have old map?
- if (name.is_client()) {
- bool message_sendmap = epoch < osdmap->get_epoch();
- if (message_sendmap && sent_epoch_p) {
- dout(20) << "client session last_sent_epoch: "
- << *sent_epoch_p
- << " versus osdmap epoch " << osdmap->get_epoch() << dendl;
- if (*sent_epoch_p < osdmap->get_epoch()) {
- should_send = true;
- } // else we don't need to send it out again
- }
+ else if (prefix == "cpu_profiler") {
+ string arg;
+ cmd_getval(cct, cmdmap, "arg", arg);
+ vector<string> argvec;
+ get_str_vec(arg, argvec);
+ cpu_profiler_handle_command(argvec, ds);
}
- if (con->get_messenger() == osd->cluster_messenger &&
- con != osd->cluster_messenger->get_loopback_connection() &&
- osdmap->is_up(name.num()) &&
- (osdmap->get_cluster_addr(name.num()) == con->get_peer_addr() ||
- osdmap->get_hb_back_addr(name.num()) == con->get_peer_addr())) {
- // remember
- epoch_t has = MAX(get_peer_epoch(name.num()), epoch);
-
- // share?
- if (has < osdmap->get_epoch()) {
- dout(10) << name << " " << con->get_peer_addr()
- << " has old map " << epoch << " < "
- << osdmap->get_epoch() << dendl;
- should_send = true;
+ else if (prefix == "dump_pg_recovery_stats") {
+ stringstream s;
+ if (f) {
+ pg_recovery_stats.dump_formatted(f.get());
+ f->flush(ds);
+ } else {
+ pg_recovery_stats.dump(s);
+ ds << "dump pg recovery stats: " << s.str();
}
}
- return should_send;
-}
-
-void OSDService::share_map(
- entity_name_t name,
- Connection *con,
- epoch_t epoch,
- OSDMapRef& osdmap,
- epoch_t *sent_epoch_p)
-{
- dout(20) << "share_map "
- << name << " " << con->get_peer_addr()
- << " " << epoch << dendl;
-
- if ((!osd->is_active()) && (!osd->is_stopping())) {
- /*It is safe not to proceed as OSD is not in healthy state*/
- return;
+ else if (prefix == "reset_pg_recovery_stats") {
+ ss << "reset pg recovery stats";
+ pg_recovery_stats.reset();
}
- bool want_shared = should_share_map(name, con, epoch,
- osdmap, sent_epoch_p);
+ else {
+ ss << "unrecognized command! " << cmd;
+ r = -EINVAL;
+ }
- if (want_shared){
- if (name.is_client()) {
- dout(10) << name << " has old map " << epoch
- << " < " << osdmap->get_epoch() << dendl;
- // we know the Session is valid or we wouldn't be sending
- if (sent_epoch_p) {
- *sent_epoch_p = osdmap->get_epoch();
- }
- send_incremental_map(epoch, con, osdmap);
- } else if (con->get_messenger() == osd->cluster_messenger &&
- osdmap->is_up(name.num()) &&
- (osdmap->get_cluster_addr(name.num()) == con->get_peer_addr() ||
- osdmap->get_hb_back_addr(name.num()) == con->get_peer_addr())) {
- dout(10) << name << " " << con->get_peer_addr()
- << " has old map " << epoch << " < "
- << osdmap->get_epoch() << dendl;
- note_peer_epoch(name.num(), osdmap->get_epoch());
- send_incremental_map(epoch, con, osdmap);
- }
+ out:
+ rs = ss.str();
+ odata.append(ds);
+ dout(0) << "do_command r=" << r << " " << rs << dendl;
+ clog.info() << rs << "\n";
+ if (con) {
+ MCommandReply *reply = new MCommandReply(r, rs);
+ reply->set_tid(tid);
+ reply->set_data(odata);
+ client_messenger->send_message(reply, con);
}
+ return;
}
-void OSDService::share_map_peer(int peer, Connection *con, OSDMapRef map)
-{
- if (!map)
- map = get_osdmap();
-
- // send map?
- epoch_t pe = get_peer_epoch(peer);
- if (pe) {
- if (pe < map->get_epoch()) {
- send_incremental_map(pe, con, map);
- note_peer_epoch(peer, map->get_epoch());
- } else
- dout(20) << "share_map_peer " << con << " already has epoch " << pe << dendl;
- } else {
- dout(20) << "share_map_peer " << con << " don't know epoch, doing nothing" << dendl;
- // no idea about peer's epoch.
- // ??? send recent ???
- // do nothing.
- }
-}
bool OSD::heartbeat_dispatch(Message *m)
dout(20) << "sched_scrub done" << dendl;
}
-bool OSDService::inc_scrubs_pending()
-{
- bool result = false;
-
- sched_scrub_lock.Lock();
- if (scrubs_pending + scrubs_active < cct->_conf->osd_max_scrubs) {
- dout(20) << "inc_scrubs_pending " << scrubs_pending << " -> " << (scrubs_pending+1)
- << " (max " << cct->_conf->osd_max_scrubs << ", active " << scrubs_active << ")" << dendl;
- result = true;
- ++scrubs_pending;
- } else {
- dout(20) << "inc_scrubs_pending " << scrubs_pending << " + " << scrubs_active << " active >= max " << cct->_conf->osd_max_scrubs << dendl;
- }
- sched_scrub_lock.Unlock();
-
- return result;
-}
-
-void OSDService::dec_scrubs_pending()
-{
- sched_scrub_lock.Lock();
- dout(20) << "dec_scrubs_pending " << scrubs_pending << " -> " << (scrubs_pending-1)
- << " (max " << cct->_conf->osd_max_scrubs << ", active " << scrubs_active << ")" << dendl;
- --scrubs_pending;
- assert(scrubs_pending >= 0);
- sched_scrub_lock.Unlock();
-}
-
-void OSDService::inc_scrubs_active(bool reserved)
-{
- sched_scrub_lock.Lock();
- ++(scrubs_active);
- if (reserved) {
- --(scrubs_pending);
- dout(20) << "inc_scrubs_active " << (scrubs_active-1) << " -> " << scrubs_active
- << " (max " << cct->_conf->osd_max_scrubs
- << ", pending " << (scrubs_pending+1) << " -> " << scrubs_pending << ")" << dendl;
- assert(scrubs_pending >= 0);
- } else {
- dout(20) << "inc_scrubs_active " << (scrubs_active-1) << " -> " << scrubs_active
- << " (max " << cct->_conf->osd_max_scrubs
- << ", pending " << scrubs_pending << ")" << dendl;
- }
- sched_scrub_lock.Unlock();
-}
-
-void OSDService::dec_scrubs_active()
-{
- sched_scrub_lock.Lock();
- dout(20) << "dec_scrubs_active " << scrubs_active << " -> " << (scrubs_active-1)
- << " (max " << cct->_conf->osd_max_scrubs << ", pending " << scrubs_pending << ")" << dendl;
- --scrubs_active;
- sched_scrub_lock.Unlock();
-}
-
-void OSDService::retrieve_epochs(epoch_t *_boot_epoch, epoch_t *_up_epoch,
- epoch_t *_bind_epoch) const
-{
- Mutex::Locker l(epoch_lock);
- if (_boot_epoch)
- *_boot_epoch = boot_epoch;
- if (_up_epoch)
- *_up_epoch = up_epoch;
- if (_bind_epoch)
- *_bind_epoch = bind_epoch;
-}
-
-void OSDService::set_epochs(const epoch_t *_boot_epoch, const epoch_t *_up_epoch,
- const epoch_t *_bind_epoch)
-{
- Mutex::Locker l(epoch_lock);
- if (_boot_epoch) {
- assert(*_boot_epoch == 0 || *_boot_epoch >= boot_epoch);
- boot_epoch = *_boot_epoch;
- }
- if (_up_epoch) {
- assert(*_up_epoch == 0 || *_up_epoch >= up_epoch);
- up_epoch = *_up_epoch;
- }
- if (_bind_epoch) {
- assert(*_bind_epoch == 0 || *_bind_epoch >= bind_epoch);
- bind_epoch = *_bind_epoch;
- }
-}
-
-bool OSDService::prepare_to_stop()
-{
- Mutex::Locker l(is_stopping_lock);
- if (state != NOT_STOPPING)
- return false;
-
- OSDMapRef osdmap = get_osdmap();
- if (osdmap && osdmap->is_up(whoami)) {
- dout(0) << __func__ << " telling mon we are shutting down" << dendl;
- state = PREPARING_TO_STOP;
- monc->send_mon_message(new MOSDMarkMeDown(monc->get_fsid(),
- osdmap->get_inst(whoami),
- osdmap->get_epoch(),
- true // request ack
- ));
- utime_t now = ceph_clock_now(cct);
- utime_t timeout;
- timeout.set_from_double(now + cct->_conf->osd_mon_shutdown_timeout);
- while ((ceph_clock_now(cct) < timeout) &&
- (state != STOPPING)) {
- is_stopping_cond.WaitUntil(is_stopping_lock, timeout);
- }
- }
- dout(0) << __func__ << " starting shutdown" << dendl;
- state = STOPPING;
- return true;
-}
-
-void OSDService::got_stop_ack()
-{
- Mutex::Locker l(is_stopping_lock);
- if (state == PREPARING_TO_STOP) {
- dout(0) << __func__ << " starting shutdown" << dendl;
- state = STOPPING;
- is_stopping_cond.Signal();
- } else {
- dout(10) << __func__ << " ignoring msg" << dendl;
- }
-}
-
+
// =====================================================
// MAP
take_waiters(waiting_for_osdmap);
}
-
-MOSDMap *OSDService::build_incremental_map_msg(epoch_t since, epoch_t to,
- OSDSuperblock& sblock)
-{
- MOSDMap *m = new MOSDMap(monc->get_fsid());
- m->oldest_map = sblock.oldest_map;
- m->newest_map = sblock.newest_map;
-
- for (epoch_t e = to; e > since; e--) {
- bufferlist bl;
- if (e > m->oldest_map && get_inc_map_bl(e, bl)) {
- m->incremental_maps[e].claim(bl);
- } else if (get_map_bl(e, bl)) {
- m->maps[e].claim(bl);
- break;
- } else {
- derr << "since " << since << " to " << to
- << " oldest " << m->oldest_map << " newest " << m->newest_map
- << dendl;
- m->put();
- m = NULL;
- break;
- }
- }
- return m;
-}
-
-void OSDService::send_map(MOSDMap *m, Connection *con)
-{
- Messenger *msgr = client_messenger;
- if (entity_name_t::TYPE_OSD == con->get_peer_type())
- msgr = cluster_messenger;
- msgr->send_message(m, con);
-}
-
-void OSDService::send_incremental_map(epoch_t since, Connection *con,
- OSDMapRef& osdmap)
-{
- epoch_t to = osdmap->get_epoch();
- dout(10) << "send_incremental_map " << since << " -> " << to
- << " to " << con << " " << con->get_peer_addr() << dendl;
-
- MOSDMap *m = NULL;
- while (!m) {
- OSDSuperblock sblock(get_superblock());
- if (since < sblock.oldest_map) {
- // just send latest full map
- MOSDMap *m = new MOSDMap(monc->get_fsid());
- m->oldest_map = sblock.oldest_map;
- m->newest_map = sblock.newest_map;
- get_map_bl(to, m->maps[to]);
- send_map(m, con);
- return;
- }
-
- if (to > since && (int64_t)(to - since) > cct->_conf->osd_map_share_max_epochs) {
- dout(10) << " " << (to - since) << " > max " << cct->_conf->osd_map_share_max_epochs
- << ", only sending most recent" << dendl;
- since = to - cct->_conf->osd_map_share_max_epochs;
- }
-
- if (to - since > (epoch_t)cct->_conf->osd_map_message_max)
- to = since + cct->_conf->osd_map_message_max;
- m = build_incremental_map_msg(since, to, sblock);
- }
- send_map(m, con);
-}
-
-bool OSDService::_get_map_bl(epoch_t e, bufferlist& bl)
-{
- bool found = map_bl_cache.lookup(e, &bl);
- if (found)
- return true;
- found = store->read(
- coll_t::META_COLL, OSD::get_osdmap_pobject_name(e), 0, 0, bl) >= 0;
- if (found)
- _add_map_bl(e, bl);
- return found;
-}
-
-bool OSDService::get_inc_map_bl(epoch_t e, bufferlist& bl)
-{
- Mutex::Locker l(map_cache_lock);
- bool found = map_bl_inc_cache.lookup(e, &bl);
- if (found)
- return true;
- found = store->read(
- coll_t::META_COLL, OSD::get_inc_osdmap_pobject_name(e), 0, 0, bl) >= 0;
- if (found)
- _add_map_inc_bl(e, bl);
- return found;
-}
-
-void OSDService::_add_map_bl(epoch_t e, bufferlist& bl)
-{
- dout(10) << "add_map_bl " << e << " " << bl.length() << " bytes" << dendl;
- map_bl_cache.add(e, bl);
-}
-
-void OSDService::_add_map_inc_bl(epoch_t e, bufferlist& bl)
-{
- dout(10) << "add_map_inc_bl " << e << " " << bl.length() << " bytes" << dendl;
- map_bl_inc_cache.add(e, bl);
-}
-
-void OSDService::pin_map_inc_bl(epoch_t e, bufferlist &bl)
-{
- Mutex::Locker l(map_cache_lock);
- map_bl_inc_cache.pin(e, bl);
-}
-
-void OSDService::pin_map_bl(epoch_t e, bufferlist &bl)
-{
- Mutex::Locker l(map_cache_lock);
- map_bl_cache.pin(e, bl);
-}
-
-void OSDService::clear_map_bl_cache_pins(epoch_t e)
-{
- Mutex::Locker l(map_cache_lock);
- map_bl_inc_cache.clear_pinned(e);
- map_bl_cache.clear_pinned(e);
-}
-
-OSDMapRef OSDService::_add_map(OSDMap *o)
-{
- epoch_t e = o->get_epoch();
-
- if (cct->_conf->osd_map_dedup) {
- // Dedup against an existing map at a nearby epoch
- OSDMapRef for_dedup = map_cache.lower_bound(e);
- if (for_dedup) {
- OSDMap::dedup(for_dedup.get(), o);
- }
- }
- OSDMapRef l = map_cache.add(e, o);
- return l;
-}
-
-OSDMapRef OSDService::try_get_map(epoch_t epoch)
-{
- Mutex::Locker l(map_cache_lock);
- OSDMapRef retval = map_cache.lookup(epoch);
- if (retval) {
- dout(30) << "get_map " << epoch << " -cached" << dendl;
- return retval;
- }
-
- OSDMap *map = new OSDMap;
- if (epoch > 0) {
- dout(20) << "get_map " << epoch << " - loading and decoding " << map << dendl;
- bufferlist bl;
- if (!_get_map_bl(epoch, bl)) {
- delete map;
- return OSDMapRef();
- }
- map->decode(bl);
- } else {
- dout(20) << "get_map " << epoch << " - return initial " << map << dendl;
- }
- return _add_map(map);
-}
-
bool OSD::require_mon_peer(Message *m)
{
if (!m->get_connection()->peer_is_mon()) {
}
}
-
-bool OSDService::queue_for_recovery(PG *pg)
-{
- bool b = recovery_wq.queue(pg);
- if (b)
- dout(10) << "queue_for_recovery queued " << *pg << dendl;
- else
- dout(10) << "queue_for_recovery already queued " << *pg << dendl;
- return b;
-}
-
bool OSD::_recover_now()
{
if (recovery_ops_active >= cct->_conf->osd_recovery_max_active) {
// =========================================================
// OPS
-void OSDService::reply_op_error(OpRequestRef op, int err)
-{
- reply_op_error(op, err, eversion_t(), 0);
-}
-
-void OSDService::reply_op_error(OpRequestRef op, int err, eversion_t v,
- version_t uv)
-{
- MOSDOp *m = static_cast<MOSDOp*>(op->get_req());
- assert(m->get_header().type == CEPH_MSG_OSD_OP);
- int flags;
- flags = m->get_flags() & (CEPH_OSD_FLAG_ACK|CEPH_OSD_FLAG_ONDISK);
-
- MOSDOpReply *reply = new MOSDOpReply(m, err, osdmap->get_epoch(), flags,
- true);
- reply->set_reply_versions(v, uv);
- m->get_connection()->get_messenger()->send_message(reply, m->get_connection());
-}
-
-void OSDService::handle_misdirected_op(PG *pg, OpRequestRef op)
-{
- MOSDOp *m = static_cast<MOSDOp*>(op->get_req());
- assert(m->get_header().type == CEPH_MSG_OSD_OP);
-
- assert(m->get_map_epoch() >= pg->info.history.same_primary_since);
-
- if (pg->is_ec_pg()) {
- /**
- * OSD recomputes op target based on current OSDMap. With an EC pg, we
- * can get this result:
- * 1) client at map 512 sends an op to osd 3, pg_t 3.9 based on mapping
- * [CRUSH_ITEM_NONE, 2, 3]/3
- * 2) OSD 3 at map 513 remaps op to osd 3, spg_t 3.9s0 based on mapping
- * [3, 2, 3]/3
- * 3) PG 3.9s0 dequeues the op at epoch 512 and notices that it isn't primary
- * -- misdirected op
- * 4) client resends and this time PG 3.9s0 having caught up to 513 gets
- * it and fulfils it
- *
- * We can't compute the op target based on the sending map epoch due to
- * splitting. The simplest thing is to detect such cases here and drop
- * them without an error (the client will resend anyway).
- */
- OSDMapRef opmap = try_get_map(m->get_map_epoch());
- if (!opmap) {
- dout(7) << __func__ << ": " << *pg << " no longer have map for "
- << m->get_map_epoch() << ", dropping" << dendl;
- return;
- }
- pg_t _pgid = m->get_pg();
- spg_t pgid;
- if ((m->get_flags() & CEPH_OSD_FLAG_PGOP) == 0)
- _pgid = opmap->raw_pg_to_pg(_pgid);
- if (opmap->get_primary_shard(_pgid, &pgid) &&
- pgid.shard != pg->info.pgid.shard) {
- dout(7) << __func__ << ": " << *pg << " primary changed since "
- << m->get_map_epoch() << ", dropping" << dendl;
- return;
- }
- }
-
- dout(7) << *pg << " misdirected op in " << m->get_map_epoch() << dendl;
- clog.warn() << m->get_source_inst() << " misdirected " << m->get_reqid()
- << " pg " << m->get_pg()
- << " to osd." << whoami
- << " not " << pg->acting
- << " in e" << m->get_map_epoch() << "/" << osdmap->get_epoch() << "\n";
- reply_op_error(op, -ENXIO);
-}
-
class C_SendMap : public GenContext<ThreadPool::TPHandle&> {
OSD *osd;
entity_name_t name;
}
-
-void OSDService::dequeue_pg(PG *pg, list<OpRequestRef> *dequeued)
-{
- osd->op_shardedwq.dequeue(pg, dequeued);
-}
-
/*
* NOTE: dequeue called in worker thread, with pg lock
*/
}
-void OSDService::queue_for_peering(PG *pg)
-{
- peering_wq.queue(pg);
-}
-
struct C_CompleteSplits : public Context {
OSD *osd;
set<boost::intrusive_ptr<PG> > pgs;