From 2284e133afaaaff83dc80f5b25110a845ad23fa3 Mon Sep 17 00:00:00 2001 From: Sage Weil Date: Tue, 2 Jan 2018 15:35:44 -0600 Subject: [PATCH] osd: fast dispatch peering events (part 1) This is a big commit that lays out the infrastructure changes to fast dispatch the remaining peering events. It's hard to separate it all out so this probably doesn't quite build; it's just easier to review as a separate patch. - lock ordering for pg_map has changed: before: OSD::pg_map_lock PG::lock ShardData::lock after: PG::lock ShardData::lock OSD::pg_map_lock - queue items are now annotated with whether they can proceed without a pg at all (e.g., query) or can instantiate a pg (e.g., notify log etc). - There is some wonkiness around getting the initial Initialize event to a newly-created PG. I don't love it but it gets the job done for now. Signed-off-by: Sage Weil --- src/osd/OSD.cc | 416 ++++++++++++++++++++++++++------------- src/osd/OSD.h | 34 +++- src/osd/OpQueueItem.h | 19 ++ src/osd/PG.cc | 44 +---- src/osd/PG.h | 8 +- src/osd/PGPeeringEvent.h | 27 ++- 6 files changed, 362 insertions(+), 186 deletions(-) diff --git a/src/osd/OSD.cc b/src/osd/OSD.cc index 2df4556ceb7..cb346bff64a 100644 --- a/src/osd/OSD.cc +++ b/src/osd/OSD.cc @@ -3361,9 +3361,15 @@ int OSD::shutdown() // Shutdown PGs { - RWLock::RLocker l(pg_map_lock); - for (auto& p : pg_map) { - p.second->shutdown(); + set pgs; + { + RWLock::RLocker l(pg_map_lock); + for (auto& p : pg_map) { + pgs.insert(p.second); + } + } + for (auto pg : pgs) { + pg->shutdown(); } } @@ -3470,31 +3476,38 @@ int OSD::shutdown() #ifdef PG_DEBUG_REFS service.dump_live_pgids(); #endif - { - RWLock::RLocker l(pg_map_lock); - for (ceph::unordered_map::iterator p = pg_map.begin(); - p != pg_map.end(); - ++p) { - if (p->second->is_deleted()) { + while (true) { + set pgs; + { + RWLock::WLocker l(pg_map_lock); + for (auto& i : pg_map) { + pgs.insert(i.second); + } + pg_map.clear(); + } + if (pgs.empty()) { + break; + } + for (auto& pg : pgs) { + if (pg->is_deleted()) { continue; } - dout(20) << " kicking pg " << p->first << dendl; - p->second->lock(); - if (p->second->get_num_ref() != 1) { - derr << "pgid " << p->first << " has ref count of " - << p->second->get_num_ref() << dendl; + dout(20) << " kicking pg " << pg << dendl; + pg->lock(); + if (pg->get_num_ref() != 1) { + derr << "pgid " << pg->get_pgid() << " has ref count of " + << pg->get_num_ref() << dendl; #ifdef PG_DEBUG_REFS - p->second->dump_live_ids(); + pg->dump_live_ids(); #endif if (cct->_conf->osd_shutdown_pgref_assert) { ceph_abort(); } } - p->second->ch.reset(); - p->second->unlock(); - p->second->put("PGMap"); + pg->ch.reset(); + pg->unlock(); + pg->put("PGMap"); } - pg_map.clear(); } #ifdef PG_DEBUG_REFS service.dump_live_pgids(); @@ -3780,23 +3793,32 @@ void OSD::recursive_remove_collection(CephContext* cct, // ====================================================== // PG's -PG *OSD::_open_lock_pg( +PGRef OSD::_open_pg( OSDMapRef createmap, - spg_t pgid, bool no_lockdep_check) + OSDMapRef servicemap, + spg_t pgid) { - assert(osd_lock.is_locked()); - PG* pg = _make_pg(createmap, pgid); { RWLock::WLocker l(pg_map_lock); - pg->lock(no_lockdep_check); pg_map[pgid] = pg; pg->get("PGMap"); // because it's in pg_map service.pg_add_epoch(pg->pg_id, createmap->get_epoch()); + service.init_splits_between(pgid, createmap, servicemap); } return pg; } +PG *OSD::_open_lock_pg( + OSDMapRef createmap, + OSDMapRef servicemap, + spg_t pgid, bool no_lockdep_check) +{ + PGRef pg = _open_pg(createmap, servicemap, pgid); + 
pg->lock(); + return pg.get(); +} + PG* OSD::_make_pg( OSDMapRef createmap, spg_t pgid) @@ -3828,29 +3850,6 @@ PG* OSD::_make_pg( } -void OSD::add_newly_split_pg(PG *pg, PG::RecoveryCtx *rctx) -{ - epoch_t e(service.get_osdmap()->get_epoch()); - pg->get("PGMap"); // For pg_map - pg_map[pg->pg_id] = pg; - service.pg_add_epoch(pg->pg_id, pg->get_osdmap()->get_epoch()); - - dout(10) << "Adding newly split pg " << *pg << dendl; - pg->handle_loaded(rctx); - pg->queue_null(e, e); - map >::iterator to_wake = - peering_wait_for_split.find(pg->pg_id); - if (to_wake != peering_wait_for_split.end()) { - for (list::iterator i = - to_wake->second.begin(); - i != to_wake->second.end(); - ++i) { - enqueue_peering_evt(pg->get_pgid(), *i); - } - peering_wait_for_split.erase(to_wake); - } -} - PG *OSD::_create_lock_pg( OSDMapRef createmap, spg_t pgid, @@ -3863,12 +3862,10 @@ PG *OSD::_create_lock_pg( const PastIntervals& pi, ObjectStore::Transaction& t) { - assert(osd_lock.is_locked()); dout(20) << "_create_lock_pg pgid " << pgid << dendl; - PG *pg = _open_lock_pg(createmap, pgid, true); + PG *pg = _open_lock_pg(createmap, service.get_osdmap(), pgid, true); - service.init_splits_between(pgid, pg->get_osdmap(), service.get_osdmap()); pg->init( role, up, @@ -3886,7 +3883,7 @@ PG *OSD::_create_lock_pg( return pg; } -PG *OSD::_lookup_lock_pg(spg_t pgid) +PGRef OSD::_lookup_pg(spg_t pgid) { while (true) { { @@ -3895,12 +3892,10 @@ PG *OSD::_lookup_lock_pg(spg_t pgid) if (p == pg_map.end()) { return nullptr; } - PG *pg = p->second; - pg->lock(); + PGRef pg = p->second; if (!pg->is_deleted()) { return pg; } - pg->unlock(); } // try again, this time with a write lock { @@ -3909,19 +3904,31 @@ PG *OSD::_lookup_lock_pg(spg_t pgid) if (p == pg_map.end()) { return nullptr; } - PG *pg = p->second; - pg->lock(); + PGRef pg = p->second; if (!pg->is_deleted()) { return pg; } pg_map.erase(p); pg->put("PGMap"); - pg->unlock(); } return nullptr; } } +PG *OSD::_lookup_lock_pg(spg_t pgid) +{ + PGRef pg = _lookup_pg(pgid); + if (!pg) { + return nullptr; + } + pg->lock(); + if (!pg->is_deleted()) { + return pg.get(); + } + pg->unlock(); + return nullptr; +} + PG *OSD::lookup_lock_pg(spg_t pgid) { return _lookup_lock_pg(pgid); @@ -3994,9 +4001,9 @@ void OSD::load_pgs() assert(0 == "Missing map in load_pgs"); } } - pg = _open_lock_pg(pgosdmap, pgid); + pg = _open_lock_pg(pgosdmap, osdmap, pgid); } else { - pg = _open_lock_pg(osdmap, pgid); + pg = _open_lock_pg(osdmap, osdmap, pgid); } // there can be no waiters here, so we don't call wake_pg_waiters @@ -4035,6 +4042,69 @@ void OSD::load_pgs() } +PGRef OSD::handle_pg_create_info(OSDMapRef osdmap, const PGCreateInfo *info) +{ + spg_t pgid = info->pgid; + + int up_primary, acting_primary; + vector up, acting; + osdmap->pg_to_up_acting_osds( + pgid.pgid, &up, &up_primary, &acting, &acting_primary); + + /* + const bool is_mon_create = + evt->get_event().dynamic_type() == PG::NullEvt::static_type(); + if (maybe_wait_for_max_pg(pgid, is_mon_create)) { + return nullptr; + } + */ + + PG::RecoveryCtx rctx = create_context(); + + const pg_pool_t* pp = osdmap->get_pg_pool(pgid.pool()); + if (pp->has_flag(pg_pool_t::FLAG_EC_OVERWRITES) && + store->get_type() != "bluestore") { + clog->warn() << "pg " << pgid + << " is at risk of silent data corruption: " + << "the pool allows ec overwrites but is not stored in " + << "bluestore, so deep scrubbing will not detect bitrot"; + } + PG::_create(*rctx.transaction, pgid, pgid.get_split_bits(pp->get_pg_num())); + PG::_init(*rctx.transaction, pgid, pp); + 
+ int role = osdmap->calc_pg_role(whoami, acting, acting.size()); + if (!pp->is_replicated() && role != pgid.shard) { + role = -1; + } + + PGRef pg = _open_pg(get_map(info->epoch), osdmap, pgid); + + pg->lock(true); + + // we are holding the shard lock + assert(!pg->is_deleted()); + + pg->init( + role, + up, + up_primary, + acting, + acting_primary, + info->history, + info->past_intervals, + false, + rctx.transaction); + + pg->handle_initialize(&rctx); + pg->handle_activate_map(&rctx); + rctx.created_pgs.insert(pg); + + dispatch_context(rctx, pg.get(), osdmap, nullptr); + + dout(10) << *pg << " is new" << dendl; + return pg; +} + /* * look up a pg. if we have it, great. if not, consider creating it IF the pg mapping * hasn't changed since the given epoch and we are the primary. @@ -7842,32 +7912,49 @@ void OSD::check_osdmap_features() } } -struct C_CompleteSplits : public Context { +struct C_FinishSplits : public Context { OSD *osd; set pgs; - C_CompleteSplits(OSD *osd, const set &in) + C_FinishSplits(OSD *osd, const set &in) : osd(osd), pgs(in) {} void finish(int r) override { - Mutex::Locker l(osd->osd_lock); - if (osd->is_stopping()) - return; - PG::RecoveryCtx rctx = osd->create_context(); - for (set::iterator i = pgs.begin(); - i != pgs.end(); - ++i) { - osd->pg_map_lock.get_write(); - (*i)->lock(); - PG *pg = i->get(); - osd->add_newly_split_pg(pg, &rctx); - osd->service.complete_split((*i)->get_pgid()); - osd->pg_map_lock.put_write(); - osd->dispatch_context_transaction(rctx, pg); - osd->wake_pg_waiters(*i); - (*i)->unlock(); - } + osd->_finish_splits(pgs); + } +}; + +void OSD::_finish_splits(set& pgs) +{ + dout(10) << __func__ << " " << pgs << dendl; + Mutex::Locker l(osd_lock); + if (is_stopping()) + return; + PG::RecoveryCtx rctx = create_context(); + for (set::iterator i = pgs.begin(); + i != pgs.end(); + ++i) { + PG *pg = i->get(); + + pg->lock(); + dout(10) << __func__ << " " << *pg << dendl; + epoch_t e = pg->get_osdmap()->get_epoch(); + pg->unlock(); - osd->dispatch_context(rctx, 0, osd->service.get_osdmap()); + pg_map_lock.get_write(); + pg->get("PGMap"); // For pg_map + pg_map[pg->get_pgid()] = pg; + service.complete_split(pg->get_pgid()); + service.pg_add_epoch(pg->pg_id, e); + pg_map_lock.put_write(); + + pg->lock(); + pg->handle_initialize(&rctx); + pg->queue_null(e, e); + dispatch_context_transaction(rctx, pg); + wake_pg_waiters(pg); + pg->unlock(); } + + dispatch_context(rctx, 0, service.get_osdmap()); }; void OSD::advance_pg( @@ -7919,7 +8006,7 @@ void OSD::advance_pg( service.pg_update_epoch(pg->pg_id, lastmap->get_epoch()); if (!new_pgs.empty()) { - rctx->on_applied->add(new C_CompleteSplits(this, new_pgs)); + rctx->on_applied->add(new C_FinishSplits(this, new_pgs)); } } @@ -7940,26 +8027,27 @@ void OSD::consume_map() int num_pg_primary = 0, num_pg_replica = 0, num_pg_stray = 0; // scan pg's + vector pgids; { RWLock::RLocker l(pg_map_lock); for (ceph::unordered_map::iterator it = pg_map.begin(); it != pg_map.end(); ++it) { + pgids.push_back(it->first); PG *pg = it->second; if (pg->is_deleted()) { continue; } - pg->lock(); + service.init_splits_between(it->first, service.get_osdmap(), osdmap); + + // FIXME: this is lockless and racy, but we don't want to take pg lock + // here. 
if (pg->is_primary()) num_pg_primary++; else if (pg->is_replica()) num_pg_replica++; else num_pg_stray++; - - service.init_splits_between(it->first, service.get_osdmap(), osdmap); - - pg->unlock(); } [[gnu::unused]] auto&& pending_create_locker = guardedly_lock(pending_creates_lock); @@ -7991,23 +8079,16 @@ void OSD::consume_map() service.maybe_inject_dispatch_delay(); - // scan pg's - { - RWLock::RLocker l(pg_map_lock); - for (ceph::unordered_map::iterator it = pg_map.begin(); - it != pg_map.end(); - ++it) { - enqueue_peering_evt( - it->first, - PGPeeringEventRef( - std::make_shared( - osdmap->get_epoch(), - osdmap->get_epoch(), - NullEvt()))); - } - - logger->set(l_osd_pg, pg_map.size()); + for (auto pgid : pgids) { + enqueue_peering_evt( + pgid, + PGPeeringEventRef( + std::make_shared( + osdmap->get_epoch(), + osdmap->get_epoch(), + NullEvt()))); } + logger->set(l_osd_pg, pgids.size()); logger->set(l_osd_pg_primary, num_pg_primary); logger->set(l_osd_pg_replica, num_pg_replica); logger->set(l_osd_pg_stray, num_pg_stray); @@ -8350,6 +8431,7 @@ void OSD::dispatch_context_transaction(PG::RecoveryCtx &ctx, PG *pg, ctx.transaction = new ObjectStore::Transaction; ctx.on_applied = new C_Contexts(cct); ctx.on_safe = new C_Contexts(cct); + ctx.created_pgs.clear(); } } @@ -8392,6 +8474,7 @@ void OSD::dispatch_context(PG::RecoveryCtx &ctx, PG *pg, OSDMapRef curmap, ctx.created_pgs.clear(); delete (ctx.transaction); assert(tr == 0); + ctx.created_pgs.clear(); } } @@ -9049,35 +9132,45 @@ void OSD::dequeue_peering_evt( PGPeeringEventRef evt, ThreadPool::TPHandle& handle) { - auto curmap = service.get_osdmap(); PG::RecoveryCtx rctx = create_context(); - if (curmap->get_epoch() > pg->get_osdmap()->get_epoch()) { - advance_pg(curmap->get_epoch(), pg, handle, &rctx); - } - pg->do_peering_event(evt, &rctx); - auto need_up_thru = pg->get_need_up_thru(); - auto same_interval_since = pg->get_same_interval_since(); - dispatch_context_transaction(rctx, pg, &handle); - bool deleted = pg->is_deleted(); - pg->unlock(); - - if (deleted) { - RWLock::WLocker l(pg_map_lock); - auto p = pg_map.find(pg->get_pgid()); - if (p != pg_map.end() && - p->second == pg) { - dout(20) << __func__ << " removed pg " << pg << " from pg_map" << dendl; - pg_map.erase(p); - pg->put("PGMap"); + auto curmap = service.get_osdmap(); + epoch_t need_up_thru = 0, same_interval_since = 0; + if (!pg) { + if (const MQuery *q = dynamic_cast(evt->evt.get())) { + handle_pg_query_nopg(*q); } else { - dout(20) << __func__ << " failed to remove pg " << pg << " from pg_map" << dendl; + derr << __func__ << " unrecognized pg-less event " << evt->get_desc() << dendl; + ceph_abort(); + } + } else { + if (curmap->get_epoch() > pg->get_osdmap()->get_epoch()) { + advance_pg(curmap->get_epoch(), pg, handle, &rctx); + } + pg->do_peering_event(evt, &rctx); + dispatch_context_transaction(rctx, pg, &handle); + need_up_thru = pg->get_need_up_thru(); + same_interval_since = pg->get_same_interval_since(); + bool deleted = pg->is_deleted(); + pg->unlock(); + + if (deleted) { + RWLock::WLocker l(pg_map_lock); + auto p = pg_map.find(pg->get_pgid()); + if (p != pg_map.end() && + p->second == pg) { + dout(20) << __func__ << " removed pg " << pg << " from pg_map" << dendl; + pg_map.erase(p); + pg->put("PGMap"); + } else { + dout(20) << __func__ << " failed to remove pg " << pg << " from pg_map" << dendl; + } } } if (need_up_thru) { queue_want_up_thru(same_interval_since); } - dispatch_context(rctx, 0, curmap, &handle); + dispatch_context(rctx, pg, curmap, &handle); 
service.send_pg_temp(); } @@ -9537,6 +9630,7 @@ void OSD::ShardedOpWQ::wake_pg_waiters(spg_t pgid) void OSD::ShardedOpWQ::prune_pg_waiters(OSDMapRef osdmap, int whoami) { unsigned pushes_to_free = 0; + bool queued = false; for (auto sdata : shard_list) { Mutex::Locker l(sdata->sdata_op_ordering_lock); sdata->waiting_for_pg_osdmap = osdmap; @@ -9545,8 +9639,26 @@ void OSD::ShardedOpWQ::prune_pg_waiters(OSDMapRef osdmap, int whoami) ShardData::pg_slot& slot = p->second; if (!slot.to_process.empty() && slot.num_running == 0) { if (osdmap->is_up_acting_osd_shard(p->first, whoami)) { - dout(20) << __func__ << " " << p->first << " maps to us, keeping" - << dendl; + if (slot.pending_nopg) { + dout(20) << __func__ << " " << p->first << " maps to us, pending create," + << " requeuing" << dendl; + for (auto& q : slot.to_process) { + pushes_to_free += q.get_reserved_pushes(); + } + for (auto i = slot.to_process.rbegin(); + i != slot.to_process.rend(); + ++i) { + sdata->_enqueue_front(std::move(*i), osd->op_prio_cutoff); + } + slot.to_process.clear(); + slot.waiting_for_pg = false; + slot.pending_nopg = false; + ++slot.requeue_seq; + queued = true; + } else { + dout(20) << __func__ << " " << p->first << " maps to us, keeping" + << dendl; + } ++p; continue; } @@ -9571,6 +9683,11 @@ void OSD::ShardedOpWQ::prune_pg_waiters(OSDMapRef osdmap, int whoami) ++p; } } + if (queued) { + sdata->sdata_lock.Lock(); + sdata->sdata_cond.SignalOne(); + sdata->sdata_lock.Unlock(); + } } if (pushes_to_free > 0) { osd->service.release_reserved_pushes(pushes_to_free); @@ -9733,27 +9850,57 @@ void OSD::ShardedOpWQ::_process(uint32_t thread_index, heartbeat_handle_d *hb) return; } + ThreadPool::TPHandle tp_handle(osd->cct, hb, timeout_interval, + suicide_interval); + // take next item auto qi = std::move(slot.to_process.front()); slot.to_process.pop_front(); dout(20) << __func__ << " " << qi << " pg " << pg << dendl; + unsigned pushes_to_free = 0; - if (!pg) { + while (!pg) { // should this pg shard exist on this osd in this (or a later) epoch? OSDMapRef osdmap = sdata->waiting_for_pg_osdmap; - if (osdmap->is_up_acting_osd_shard(token, - osd->whoami)) { - dout(20) << __func__ << " " << token - << " no pg, should exist, will wait" << " on " << qi << dendl; - slot.to_process.push_front(std::move(qi)); - slot.waiting_for_pg = true; - } else if (qi.get_map_epoch() > osdmap->get_epoch()) { + const PGCreateInfo *create_info = qi.creates_pg(); + if (qi.get_map_epoch() > osdmap->get_epoch()) { dout(20) << __func__ << " " << token << " no pg, item epoch is " << qi.get_map_epoch() << " > " << osdmap->get_epoch() << ", will wait on " << qi << dendl; + if (!!create_info || !qi.requires_pg()) { + slot.pending_nopg = true; + } slot.to_process.push_front(std::move(qi)); slot.waiting_for_pg = true; + } else if (osdmap->is_up_acting_osd_shard(token, osd->whoami)) { + if (osd->service.splitting(token)) { + dout(20) << __func__ << " " << token + << " splitting, waiting on " << qi << dendl; + slot.to_process.push_front(std::move(qi)); + slot.waiting_for_pg = true; + } else if (create_info) { + dout(20) << __func__ << " " << token + << " no pg, should create on " << qi << dendl; + pg = osd->handle_pg_create_info(osdmap, create_info); + if (pg) { + // we created the pg! drop out and continue "normally"! 
+ _wake_pg_slot(token, sdata, slot, &pushes_to_free); + break; + } + dout(20) << __func__ << " ignored create on " << qi << dendl; + } else if (!qi.requires_pg()) { + // for pg-less events, we run them under the ordering lock, since + // we don't have the pg lock to keep them ordered. + qi.run(osd, pg, tp_handle); + sdata->sdata_op_ordering_lock.Unlock(); + return; + } else { + dout(20) << __func__ << " " << token + << " no pg, should exist, will wait on " << qi << dendl; + slot.to_process.push_front(std::move(qi)); + slot.waiting_for_pg = true; + } } else { dout(20) << __func__ << " " << token << " no pg, shouldn't exist," @@ -9779,6 +9926,9 @@ void OSD::ShardedOpWQ::_process(uint32_t thread_index, heartbeat_handle_d *hb) } sdata->sdata_op_ordering_lock.Unlock(); + if (pushes_to_free) { + osd->service.release_reserved_pushes(pushes_to_free); + } // osd_opwq_process marks the point at which an operation has been dequeued // and will begin to be handled by a worker thread. @@ -9802,8 +9952,6 @@ void OSD::ShardedOpWQ::_process(uint32_t thread_index, heartbeat_handle_d *hb) delete f; *_dout << dendl; - ThreadPool::TPHandle tp_handle(osd->cct, hb, timeout_interval, - suicide_interval); qi.run(osd, pg, tp_handle); { diff --git a/src/osd/OSD.h b/src/osd/OSD.h index b13d70245aa..544a7eaecdd 100644 --- a/src/osd/OSD.h +++ b/src/osd/OSD.h @@ -62,6 +62,15 @@ #define CEPH_OSD_PROTOCOL 10 /* cluster internal */ +/* + + lock ordering for pg map + + PG::lock + ShardData::lock + OSD::pg_map_lock + + */ enum { l_osd_first = 10000, @@ -233,7 +242,7 @@ class PrimaryLogPG; class AuthAuthorizeHandlerRegistry; class TestOpsSocketHook; -struct C_CompleteSplits; +struct C_FinishSplits; struct C_OpenPGs; class LogChannel; class CephContext; @@ -1513,7 +1522,7 @@ private: void test_ops(std::string command, std::string args, ostream& ss); friend class TestOpsSocketHook; TestOpsSocketHook *test_ops_hook; - friend struct C_CompleteSplits; + friend struct C_FinishSplits; friend struct C_OpenPGs; // -- op queue -- @@ -1572,6 +1581,9 @@ private: /// to_process. cleared by prune_pg_waiters. 
bool waiting_for_pg = false; + /// one or more queued items doesn't need a pg + bool pending_nopg = false; + /// incremented by wake_pg_waiters; indicates racing _process threads /// should bail out (their op has been requeued) uint64_t requeue_seq = 0; @@ -1843,6 +1855,7 @@ protected: map > peering_wait_for_split; PGRecoveryStats pg_recovery_stats; + PGRef _lookup_pg(spg_t pgid); PG *_lookup_lock_pg_with_map_lock_held(spg_t pgid); PG *_lookup_lock_pg(spg_t pgid); @@ -1857,10 +1870,15 @@ public: std::set get_mapped_pools(); protected: - PG *_open_lock_pg(OSDMapRef createmap, - spg_t pg, bool no_lockdep_check=false); - - PG *_create_lock_pg( + PGRef _open_pg( + OSDMapRef createmap, OSDMapRef servicemap, + spg_t pg); + PG *_open_lock_pg( + OSDMapRef createmap, + OSDMapRef servicemap, + spg_t pg, + bool no_lockdep_check=false); + PG *_create_lock_pg( OSDMapRef createmap, spg_t pgid, bool hold_map_lock, @@ -1873,8 +1891,6 @@ protected: ObjectStore::Transaction& t); PG* _make_pg(OSDMapRef createmap, spg_t pgid); - void add_newly_split_pg(PG *pg, - PG::RecoveryCtx *rctx); int handle_pg_peering_evt( spg_t pgid, @@ -1919,6 +1935,7 @@ protected: OSDMapRef curmap, OSDMapRef nextmap, PG::RecoveryCtx *rctx); + void _finish_splits(set& pgs); // == monitor interaction == Mutex mon_report_lock; @@ -2017,6 +2034,7 @@ protected: void handle_pg_log(OpRequestRef op); void handle_pg_info(OpRequestRef op); + PGRef handle_pg_create_info(OSDMapRef osdmap, const PGCreateInfo *info); void handle_force_recovery(Message *m); void handle_pg_remove(OpRequestRef op); diff --git a/src/osd/OpQueueItem.h b/src/osd/OpQueueItem.h index ac7401334e4..fe3f66296ad 100644 --- a/src/osd/OpQueueItem.h +++ b/src/osd/OpQueueItem.h @@ -65,6 +65,13 @@ public: return 0; } + virtual bool requires_pg() const { + return true; + } + virtual const PGCreateInfo *creates_pg() const { + return nullptr; + } + virtual ostream &print(ostream &rhs) const = 0; virtual void run(OSD *osd, PGRef& pg, ThreadPool::TPHandle &handle) = 0; @@ -141,6 +148,12 @@ public: epoch_t get_map_epoch() const { return map_epoch; } dmc::ReqParams get_qos_params() const { return qos_params; } void set_qos_params(dmc::ReqParams qparams) { qos_params = qparams; } + bool requires_pg() const { + return qitem->requires_pg(); + } + const PGCreateInfo *creates_pg() const { + return qitem->creates_pg(); + } friend ostream& operator<<(ostream& out, const OpQueueItem& item) { return out << "OpQueueItem(" @@ -212,6 +225,12 @@ public: return rhs << "PGPeeringEvent(" << evt->get_desc() << ")"; } void run(OSD *osd, PGRef& pg, ThreadPool::TPHandle &handle) override final; + bool requires_pg() const override { + return evt->requires_pg; + } + const PGCreateInfo *creates_pg() const override { + return evt->create_info.get(); + } }; class PGSnapTrim : public PGOpQueueable { diff --git a/src/osd/PG.cc b/src/osd/PG.cc index 7adab2ab4df..daef636424e 100644 --- a/src/osd/PG.cc +++ b/src/osd/PG.cc @@ -3772,7 +3772,9 @@ void PG::read_state(ObjectStore *store) } PG::RecoveryCtx rctx(0, 0, 0, 0, 0, new ObjectStore::Transaction); - handle_loaded(&rctx); + handle_initialize(&rctx); + // note: we don't activate here because we know the OSD will advance maps + // during boot. 
write_if_dirty(*rctx.transaction); store->queue_transaction(ch, std::move(*rctx.transaction)); delete rctx.transaction; @@ -6325,11 +6327,11 @@ void PG::do_peering_event(PGPeeringEventRef evt, RecoveryCtx *rctx) if (!have_same_or_newer_map(evt->get_epoch_sent())) { dout(10) << "deferring event " << evt->get_desc() << dendl; peering_waiters.push_back(evt); - return; + } else if (old_peering_evt(evt)) { + dout(10) << "discard old " << evt->get_desc() << dendl; + } else { + recovery_state.handle_event(evt, rctx); } - if (old_peering_evt(evt)) - return; - recovery_state.handle_event(evt, rctx); write_if_dirty(*rctx->transaction); } @@ -6445,27 +6447,11 @@ void PG::handle_activate_map(RecoveryCtx *rctx) write_if_dirty(*rctx->transaction); } -void PG::handle_loaded(RecoveryCtx *rctx) +void PG::handle_initialize(RecoveryCtx *rctx) { - dout(10) << "handle_loaded" << dendl; - Load evt; - recovery_state.handle_event(evt, rctx); - write_if_dirty(*rctx->transaction); -} - -void PG::handle_create(RecoveryCtx *rctx) -{ - dout(10) << "handle_create" << dendl; - rctx->created_pgs.insert(this); + dout(10) << __func__ << dendl; Initialize evt; recovery_state.handle_event(evt, rctx); - ActMap evt2; - recovery_state.handle_event(evt2, rctx); - write_if_dirty(*rctx->transaction); - - rctx->on_applied->add(make_lambda_context([this]() { - update_store_with_options(); - })); } void PG::handle_query_state(Formatter *f) @@ -6603,18 +6589,6 @@ PG::RecoveryState::Initial::Initial(my_context ctx) context< RecoveryMachine >().log_enter(state_name); } -boost::statechart::result PG::RecoveryState::Initial::react(const Load& l) -{ - PG *pg = context< RecoveryMachine >().pg; - - // do we tell someone we're here? - pg->send_notify = (!pg->is_primary()); - - pg->update_store_with_options(); - - return transit< Reset >(); -} - boost::statechart::result PG::RecoveryState::Initial::react(const MNotifyRec& notify) { PG *pg = context< RecoveryMachine >().pg; diff --git a/src/osd/PG.h b/src/osd/PG.h index 7518342a5fe..e060f3a303f 100644 --- a/src/osd/PG.h +++ b/src/osd/PG.h @@ -418,8 +418,7 @@ public: vector& newacting, int acting_primary, RecoveryCtx *rctx); void handle_activate_map(RecoveryCtx *rctx); - void handle_create(RecoveryCtx *rctx); - void handle_loaded(RecoveryCtx *rctx); + void handle_initialize(RecoveryCtx *rctx); void handle_query_state(Formatter *f); /** @@ -565,7 +564,7 @@ protected: bool deleting; // true while in removing or OSD is shutting down - bool deleted = false; + atomic deleted = {false}; ZTracer::Endpoint trace_endpoint; @@ -1851,7 +1850,6 @@ public: }; protected: TrivialEvent(Initialize) - TrivialEvent(Load) TrivialEvent(GotInfo) TrivialEvent(NeedUpThru) TrivialEvent(Backfilled) @@ -2018,12 +2016,10 @@ protected: typedef boost::mpl::list < boost::statechart::transition< Initialize, Reset >, - boost::statechart::custom_reaction< Load >, boost::statechart::custom_reaction< NullEvt >, boost::statechart::transition< boost::statechart::event_base, Crashed > > reactions; - boost::statechart::result react(const Load&); boost::statechart::result react(const MNotifyRec&); boost::statechart::result react(const MInfoRec&); boost::statechart::result react(const MLogRec&); diff --git a/src/osd/PGPeeringEvent.h b/src/osd/PGPeeringEvent.h index 597da5b729d..302e3028f2a 100644 --- a/src/osd/PGPeeringEvent.h +++ b/src/osd/PGPeeringEvent.h @@ -9,25 +9,46 @@ class MOSDPGLog; +/// what we need to instantiate a pg +struct PGCreateInfo { + spg_t pgid; + epoch_t epoch = 0; + pg_history_t history; + PastIntervals 
past_intervals; + PGCreateInfo(spg_t p, epoch_t e, + const pg_history_t& h, + const PastIntervals& pi) + : pgid(p), epoch(e), history(h), past_intervals(pi) {} +}; + class PGPeeringEvent { epoch_t epoch_sent; epoch_t epoch_requested; - boost::intrusive_ptr< const boost::statechart::event_base > evt; string desc; public: + boost::intrusive_ptr< const boost::statechart::event_base > evt; + bool requires_pg; + std::unique_ptr create_info; MEMPOOL_CLASS_HELPERS(); template PGPeeringEvent( epoch_t epoch_sent, epoch_t epoch_requested, - const T &evt_) + const T &evt_, + bool req = true, + PGCreateInfo *ci = 0) : epoch_sent(epoch_sent), epoch_requested(epoch_requested), - evt(evt_.intrusive_from_this()) { + evt(evt_.intrusive_from_this()), + requires_pg(req), + create_info(ci) { stringstream out; out << "epoch_sent: " << epoch_sent << " epoch_requested: " << epoch_requested << " "; evt_.print(&out); + if (create_info) { + out << " +create_info"; + } desc = out.str(); } epoch_t get_epoch_sent() { -- 2.39.5
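
The lock-ordering change called out in the commit message (PG::lock first, then ShardData::lock, with OSD::pg_map_lock now innermost and held only briefly) can be pictured with a minimal standalone sketch. The types and names below (OSDLike, Shard, add_pg, lookup) are illustrative stand-ins, not the Ceph classes; the point is only the acquisition order.

```cpp
// Sketch of the new lock ordering: PG::lock -> ShardData::lock -> OSD::pg_map_lock.
// Illustrative types only; not the actual Ceph classes.
#include <mutex>
#include <shared_mutex>
#include <unordered_map>

struct PG    { std::mutex lock; };
struct Shard { std::mutex ordering_lock; };

struct OSDLike {
  std::shared_mutex pg_map_lock;                 // now the innermost lock
  std::unordered_map<unsigned, PG*> pg_map;

  // Register a pg while already holding pg->lock and the shard lock:
  // pg_map_lock is acquired underneath them, never the other way around.
  void add_pg(unsigned pgid, PG* pg, Shard& shard) {
    std::lock_guard<std::mutex> pgl(pg->lock);             // 1. PG::lock
    std::lock_guard<std::mutex> sl(shard.ordering_lock);   // 2. ShardData::lock
    std::unique_lock<std::shared_mutex> ml(pg_map_lock);   // 3. OSD::pg_map_lock
    pg_map[pgid] = pg;
  }

  // Readers that only need the map take pg_map_lock alone, without pg->lock.
  PG* lookup(unsigned pgid) {
    std::shared_lock<std::shared_mutex> ml(pg_map_lock);
    auto it = pg_map.find(pgid);
    return it == pg_map.end() ? nullptr : it->second;
  }
};

int main() {
  OSDLike osd;
  Shard shard;
  PG pg;
  osd.add_pg(1, &pg, shard);
  return osd.lookup(1) ? 0 : 1;
}
```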
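
The second commit-message point, annotating queue items with whether they can proceed without a pg (e.g. query) or can instantiate one (e.g. notify/log), can be summarized with a simplified, self-contained sketch. QueueItem, CreateInfo, QueryEvent, NotifyEvent and dispatch below are illustrative stand-ins, not the OpQueueItem/PGCreateInfo types in the patch, and the decision tree only mirrors the general shape of the dequeue path in ShardedOpWQ::_process.

```cpp
// Sketch of the requires_pg()/creates_pg() annotation idea (illustrative only).
#include <iostream>

struct CreateInfo {          // stand-in for PGCreateInfo: enough state to instantiate a pg
  unsigned pgid;
};

struct QueueItem {
  virtual ~QueueItem() = default;
  // most items (e.g. client ops) still need an existing pg
  virtual bool requires_pg() const { return true; }
  // peering events that can instantiate the pg expose their create info
  virtual const CreateInfo* creates_pg() const { return nullptr; }
  virtual void run() = 0;
};

struct QueryEvent : QueueItem {              // e.g. a pg query: no pg needed
  bool requires_pg() const override { return false; }
  void run() override { std::cout << "answer query without a pg\n"; }
};

struct NotifyEvent : QueueItem {             // e.g. a notify/log: may create the pg
  CreateInfo ci{42};
  const CreateInfo* creates_pg() const override { return &ci; }
  void run() override { std::cout << "handle event on pg\n"; }
};

// Simplified dequeue decision when the pg slot is empty.
void dispatch(QueueItem& qi, bool have_pg) {
  if (have_pg) { qi.run(); return; }
  if (const CreateInfo* ci = qi.creates_pg()) {
    std::cout << "instantiate pg " << ci->pgid << ", then run\n";
    qi.run();
  } else if (!qi.requires_pg()) {
    qi.run();                                // pg-less event: run directly
  } else {
    std::cout << "no pg: requeue and wait\n";
  }
}

int main() {
  QueryEvent q;
  NotifyEvent n;
  dispatch(q, false);
  dispatch(n, false);
}
```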