// drain op queue again (in case PGs requeued something)
op_shardedwq.drain();
- {
- finished.clear(); // zap waiters (bleh, this is messy)
- waiting_for_osdmap.clear();
- }
// unregister commands
cct->get_admin_socket()->unregister_commands(asok_hook);
// =====================================================
// MAP
-
-void OSD::wait_for_new_map(OpRequestRef op)
-{
- // ask?
- if (waiting_for_osdmap.empty()) {
- osdmap_subscribe(get_osdmap_epoch() + 1, false);
- }
-
- logger->inc(l_osd_waiting_for_map);
- waiting_for_osdmap.push_back(op);
- op->mark_delayed("wait for new map");
-}
-
-
/** update_map
* assimilate new OSDMap(s). scan pgs, etc.
*/
}
service.activate_map();
-
- // process waiters
- take_waiters(waiting_for_osdmap);
}
bool OSD::require_mon_peer(const Message *m)
return true;
}
-bool OSD::require_self_aliveness(const Message *m, epoch_t epoch)
-{
- epoch_t up_epoch = service.get_up_epoch();
- if (epoch < up_epoch) {
- dout(7) << "from pre-up epoch " << epoch << " < " << up_epoch << dendl;
- return false;
- }
-
- if (!is_active()) {
- dout(7) << "still in boot state, dropping message " << *m << dendl;
- return false;
- }
-
- return true;
-}
-
-bool OSD::require_same_peer_instance(const Message *m, const OSDMapRef& map,
- bool is_fast_dispatch)
-{
- int from = m->get_source().num();
-
- if (map->is_down(from) ||
- (map->get_cluster_addrs(from) != m->get_source_addrs())) {
- dout(5) << "from dead osd." << from << ", marking down, "
- << " msg was " << m->get_source_inst().addr
- << " expected "
- << (map->is_up(from) ?
- map->get_cluster_addrs(from) : entity_addrvec_t())
- << dendl;
- ConnectionRef con = m->get_connection();
- con->mark_down();
- if (auto s = ceph::ref_cast<Session>(con->get_priv()); s) {
- if (!is_fast_dispatch)
- s->session_dispatch_lock.lock();
- clear_session_waiting_on_map(s);
- con->set_priv(nullptr); // break ref <-> session cycle, if any
- s->con.reset();
- if (!is_fast_dispatch)
- s->session_dispatch_lock.unlock();
- }
- return false;
- }
- return true;
-}
-
-
-/*
- * require that we have same (or newer) map, and that
- * the source is the pg primary.
- */
-bool OSD::require_same_or_newer_map(OpRequestRef& op, epoch_t epoch,
- bool is_fast_dispatch)
-{
- const Message *m = op->get_req();
- const auto osdmap = get_osdmap();
- dout(15) << "require_same_or_newer_map " << epoch
- << " (i am " << osdmap->get_epoch() << ") " << m << dendl;
-
- ceph_assert(ceph_mutex_is_locked(osd_lock));
-
- // do they have a newer map?
- if (epoch > osdmap->get_epoch()) {
- dout(7) << "waiting for newer map epoch " << epoch
- << " > my " << osdmap->get_epoch() << " with " << m << dendl;
- wait_for_new_map(op);
- return false;
- }
-
- if (!require_self_aliveness(op->get_req(), epoch)) {
- return false;
- }
-
- // ok, our map is same or newer.. do they still exist?
- if (m->get_connection()->get_messenger() == cluster_messenger &&
- !require_same_peer_instance(op->get_req(), osdmap, is_fast_dispatch)) {
- return false;
- }
-
- return true;
-}
-
-
-
-
-
// ----------------------------------------
// pg creation
} heartbeat_dispatcher;
private:
- // -- waiters --
- std::list<OpRequestRef> finished;
-
- void take_waiters(std::list<OpRequestRef>& ls) {
- ceph_assert(ceph_mutex_is_locked(osd_lock));
- finished.splice(finished.end(), ls);
- }
-
// -- op tracking --
OpTracker op_tracker;
void test_ops(std::string command, std::string args, std::ostream& ss);
pool_pg_num_history_t pg_num_history;
ceph::shared_mutex map_lock = ceph::make_shared_mutex("OSD::map_lock");
- std::list<OpRequestRef> waiting_for_osdmap;
std::deque<utime_t> osd_markdown_log;
friend struct send_map_on_destruct;
- void wait_for_new_map(OpRequestRef op);
void handle_osd_map(class MOSDMap *m);
void _committed_osd_maps(epoch_t first, epoch_t last, class MOSDMap *m);
void trim_maps(epoch_t oldest, int nreceived, bool skip_maps);
bool require_mon_peer(const Message *m);
bool require_mon_or_mgr_peer(const Message *m);
bool require_osd_peer(const Message *m);
- /***
- * Verifies that we were alive in the given epoch, and that
- * still are.
- */
- bool require_self_aliveness(const Message *m, epoch_t alive_since);
- /**
- * Verifies that the OSD who sent the given op has the same
- * address as in the given std::map.
- * @pre op was sent by an OSD using the cluster messenger
- */
- bool require_same_peer_instance(const Message *m, const OSDMapRef& map,
- bool is_fast_dispatch);
-
- bool require_same_or_newer_map(OpRequestRef& op, epoch_t e,
- bool is_fast_dispatch);
void handle_fast_pg_create(MOSDPGCreate2 *m);
void handle_pg_query_nopg(const MQuery& q);