// Shutdown PGs
{
- RWLock::RLocker l(pg_map_lock);
- for (auto& p : pg_map) {
- p.second->shutdown();
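+ // snapshot pg_map under the read lock, then shut each PG down
+ // without holding pg_map_lock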
+ set<PGRef> pgs;
+ {
+ RWLock::RLocker l(pg_map_lock);
+ for (auto& p : pg_map) {
+ pgs.insert(p.second);
+ }
+ }
+ for (auto pg : pgs) {
+ pg->shutdown();
}
}
#ifdef PG_DEBUG_REFS
service.dump_live_pgids();
#endif
- {
- RWLock::RLocker l(pg_map_lock);
- for (ceph::unordered_map<spg_t, PG*>::iterator p = pg_map.begin();
- p != pg_map.end();
- ++p) {
- if (p->second->is_deleted()) {
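+ // drain pg_map in batches, dropping each PG's "PGMap" reference,
+ // until the map is empty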
+ while (true) {
+ set<PGRef> pgs;
+ {
+ RWLock::WLocker l(pg_map_lock);
+ for (auto& i : pg_map) {
+ pgs.insert(i.second);
+ }
+ pg_map.clear();
+ }
+ if (pgs.empty()) {
+ break;
+ }
+ for (auto& pg : pgs) {
+ if (pg->is_deleted()) {
continue;
}
- dout(20) << " kicking pg " << p->first << dendl;
- p->second->lock();
- if (p->second->get_num_ref() != 1) {
- derr << "pgid " << p->first << " has ref count of "
- << p->second->get_num_ref() << dendl;
+ dout(20) << " kicking pg " << pg << dendl;
+ pg->lock();
+ if (pg->get_num_ref() != 1) {
+ derr << "pgid " << pg->get_pgid() << " has ref count of "
+ << pg->get_num_ref() << dendl;
#ifdef PG_DEBUG_REFS
- p->second->dump_live_ids();
+ pg->dump_live_ids();
#endif
if (cct->_conf->osd_shutdown_pgref_assert) {
ceph_abort();
}
}
- p->second->ch.reset();
- p->second->unlock();
- p->second->put("PGMap");
+ pg->ch.reset();
+ pg->unlock();
+ pg->put("PGMap");
}
- pg_map.clear();
}
#ifdef PG_DEBUG_REFS
service.dump_live_pgids();
#endif
// ======================================================
// PG's
-PG *OSD::_open_lock_pg(
+PGRef OSD::_open_pg(
OSDMapRef createmap,
- spg_t pgid, bool no_lockdep_check)
+ OSDMapRef servicemap,
+ spg_t pgid)
{
- assert(osd_lock.is_locked());
-
PG* pg = _make_pg(createmap, pgid);
{
RWLock::WLocker l(pg_map_lock);
- pg->lock(no_lockdep_check);
pg_map[pgid] = pg;
pg->get("PGMap"); // because it's in pg_map
service.pg_add_epoch(pg->pg_id, createmap->get_epoch());
+ service.init_splits_between(pgid, createmap, servicemap);
}
return pg;
}
+PG *OSD::_open_lock_pg(
+ OSDMapRef createmap,
+ OSDMapRef servicemap,
+ spg_t pgid, bool no_lockdep_check)
+{
+ PGRef pg = _open_pg(createmap, servicemap, pgid);
+ pg->lock(no_lockdep_check);
+ return pg.get();
+}
+
PG* OSD::_make_pg(
OSDMapRef createmap,
spg_t pgid)
}
-void OSD::add_newly_split_pg(PG *pg, PG::RecoveryCtx *rctx)
-{
- epoch_t e(service.get_osdmap()->get_epoch());
- pg->get("PGMap"); // For pg_map
- pg_map[pg->pg_id] = pg;
- service.pg_add_epoch(pg->pg_id, pg->get_osdmap()->get_epoch());
-
- dout(10) << "Adding newly split pg " << *pg << dendl;
- pg->handle_loaded(rctx);
- pg->queue_null(e, e);
- map<spg_t, list<PGPeeringEventRef> >::iterator to_wake =
- peering_wait_for_split.find(pg->pg_id);
- if (to_wake != peering_wait_for_split.end()) {
- for (list<PGPeeringEventRef>::iterator i =
- to_wake->second.begin();
- i != to_wake->second.end();
- ++i) {
- enqueue_peering_evt(pg->get_pgid(), *i);
- }
- peering_wait_for_split.erase(to_wake);
- }
-}
-
PG *OSD::_create_lock_pg(
OSDMapRef createmap,
spg_t pgid,
const PastIntervals& pi,
ObjectStore::Transaction& t)
{
- assert(osd_lock.is_locked());
dout(20) << "_create_lock_pg pgid " << pgid << dendl;
- PG *pg = _open_lock_pg(createmap, pgid, true);
+ PG *pg = _open_lock_pg(createmap, service.get_osdmap(), pgid, true);
- service.init_splits_between(pgid, pg->get_osdmap(), service.get_osdmap());
pg->init(
role,
up,
return pg;
}
-PG *OSD::_lookup_lock_pg(spg_t pgid)
+PGRef OSD::_lookup_pg(spg_t pgid)
{
while (true) {
{
if (p == pg_map.end()) {
return nullptr;
}
- PG *pg = p->second;
- pg->lock();
+ PGRef pg = p->second;
if (!pg->is_deleted()) {
return pg;
}
- pg->unlock();
}
// try again, this time with a write lock
{
if (p == pg_map.end()) {
return nullptr;
}
- PG *pg = p->second;
- pg->lock();
+ PGRef pg = p->second;
if (!pg->is_deleted()) {
return pg;
}
pg_map.erase(p);
pg->put("PGMap");
- pg->unlock();
}
return nullptr;
}
}
+PG *OSD::_lookup_lock_pg(spg_t pgid)
+{
+ PGRef pg = _lookup_pg(pgid);
+ if (!pg) {
+ return nullptr;
+ }
+ pg->lock();
+ if (!pg->is_deleted()) {
+ return pg.get();
+ }
+ pg->unlock();
+ return nullptr;
+}
+
PG *OSD::lookup_lock_pg(spg_t pgid)
{
return _lookup_lock_pg(pgid);
assert(0 == "Missing map in load_pgs");
}
}
- pg = _open_lock_pg(pgosdmap, pgid);
+ pg = _open_lock_pg(pgosdmap, osdmap, pgid);
} else {
- pg = _open_lock_pg(osdmap, pgid);
+ pg = _open_lock_pg(osdmap, osdmap, pgid);
}
// there can be no waiters here, so we don't call wake_pg_waiters
}
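+// instantiate and register a new PG described by a PGCreateInfo,
+// initialize it and kick off peering; returns the PG with its lock held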
+PGRef OSD::handle_pg_create_info(OSDMapRef osdmap, const PGCreateInfo *info)
+{
+ spg_t pgid = info->pgid;
+
+ int up_primary, acting_primary;
+ vector<int> up, acting;
+ osdmap->pg_to_up_acting_osds(
+ pgid.pgid, &up, &up_primary, &acting, &acting_primary);
+
+ /*
+ const bool is_mon_create =
+ evt->get_event().dynamic_type() == PG::NullEvt::static_type();
+ if (maybe_wait_for_max_pg(pgid, is_mon_create)) {
+ return nullptr;
+ }
+ */
+
+ PG::RecoveryCtx rctx = create_context();
+
+ const pg_pool_t* pp = osdmap->get_pg_pool(pgid.pool());
+ if (pp->has_flag(pg_pool_t::FLAG_EC_OVERWRITES) &&
+ store->get_type() != "bluestore") {
+ clog->warn() << "pg " << pgid
+ << " is at risk of silent data corruption: "
+ << "the pool allows ec overwrites but is not stored in "
+ << "bluestore, so deep scrubbing will not detect bitrot";
+ }
+ PG::_create(*rctx.transaction, pgid, pgid.get_split_bits(pp->get_pg_num()));
+ PG::_init(*rctx.transaction, pgid, pp);
+
+ int role = osdmap->calc_pg_role(whoami, acting, acting.size());
+ if (!pp->is_replicated() && role != pgid.shard) {
+ role = -1;
+ }
+
+ PGRef pg = _open_pg(get_map(info->epoch), osdmap, pgid);
+
+ pg->lock(true);
+
+ // we are holding the shard lock
+ assert(!pg->is_deleted());
+
+ pg->init(
+ role,
+ up,
+ up_primary,
+ acting,
+ acting_primary,
+ info->history,
+ info->past_intervals,
+ false,
+ rctx.transaction);
+
+ pg->handle_initialize(&rctx);
+ pg->handle_activate_map(&rctx);
+ rctx.created_pgs.insert(pg);
+
+ dispatch_context(rctx, pg.get(), osdmap, nullptr);
+
+ dout(10) << *pg << " is new" << dendl;
+ return pg;
+}
+
/*
* look up a pg. if we have it, great. if not, consider creating it IF the pg mapping
 * hasn't changed since the given epoch and we are the primary.
 */
}
}
-struct C_CompleteSplits : public Context {
+struct C_FinishSplits : public Context {
OSD *osd;
set<PGRef> pgs;
- C_CompleteSplits(OSD *osd, const set<PGRef> &in)
+ C_FinishSplits(OSD *osd, const set<PGRef> &in)
: osd(osd), pgs(in) {}
void finish(int r) override {
- Mutex::Locker l(osd->osd_lock);
- if (osd->is_stopping())
- return;
- PG::RecoveryCtx rctx = osd->create_context();
- for (set<PGRef>::iterator i = pgs.begin();
- i != pgs.end();
- ++i) {
- osd->pg_map_lock.get_write();
- (*i)->lock();
- PG *pg = i->get();
- osd->add_newly_split_pg(pg, &rctx);
- osd->service.complete_split((*i)->get_pgid());
- osd->pg_map_lock.put_write();
- osd->dispatch_context_transaction(rctx, pg);
- osd->wake_pg_waiters(*i);
- (*i)->unlock();
- }
+ osd->_finish_splits(pgs);
+ }
+};
+
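+// install freshly split child PGs in pg_map, mark their splits complete,
+// and queue null peering events to get them started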
+void OSD::_finish_splits(set<PGRef>& pgs)
+{
+ dout(10) << __func__ << " " << pgs << dendl;
+ Mutex::Locker l(osd_lock);
+ if (is_stopping())
+ return;
+ PG::RecoveryCtx rctx = create_context();
+ for (set<PGRef>::iterator i = pgs.begin();
+ i != pgs.end();
+ ++i) {
+ PG *pg = i->get();
+
+ pg->lock();
+ dout(10) << __func__ << " " << *pg << dendl;
+ epoch_t e = pg->get_osdmap()->get_epoch();
+ pg->unlock();
- osd->dispatch_context(rctx, 0, osd->service.get_osdmap());
+ pg_map_lock.get_write();
+ pg->get("PGMap"); // For pg_map
+ pg_map[pg->get_pgid()] = pg;
+ service.complete_split(pg->get_pgid());
+ service.pg_add_epoch(pg->pg_id, e);
+ pg_map_lock.put_write();
+
+ pg->lock();
+ pg->handle_initialize(&rctx);
+ pg->queue_null(e, e);
+ dispatch_context_transaction(rctx, pg);
+ wake_pg_waiters(pg);
+ pg->unlock();
}
+
+ dispatch_context(rctx, 0, service.get_osdmap());
};
void OSD::advance_pg(
service.pg_update_epoch(pg->pg_id, lastmap->get_epoch());
if (!new_pgs.empty()) {
- rctx->on_applied->add(new C_CompleteSplits(this, new_pgs));
+ rctx->on_applied->add(new C_FinishSplits(this, new_pgs));
}
}
int num_pg_primary = 0, num_pg_replica = 0, num_pg_stray = 0;
// scan pg's
+ vector<spg_t> pgids;
{
RWLock::RLocker l(pg_map_lock);
for (ceph::unordered_map<spg_t,PG*>::iterator it = pg_map.begin();
it != pg_map.end();
++it) {
+ pgids.push_back(it->first);
PG *pg = it->second;
if (pg->is_deleted()) {
continue;
}
- pg->lock();
+ service.init_splits_between(it->first, service.get_osdmap(), osdmap);
+
+ // FIXME: this is lockless and racy, but we don't want to take pg lock
+ // here.
if (pg->is_primary())
num_pg_primary++;
else if (pg->is_replica())
num_pg_replica++;
else
num_pg_stray++;
-
- service.init_splits_between(it->first, service.get_osdmap(), osdmap);
-
- pg->unlock();
}
[[gnu::unused]] auto&& pending_create_locker = guardedly_lock(pending_creates_lock);
service.maybe_inject_dispatch_delay();
- // scan pg's
- {
- RWLock::RLocker l(pg_map_lock);
- for (ceph::unordered_map<spg_t,PG*>::iterator it = pg_map.begin();
- it != pg_map.end();
- ++it) {
- enqueue_peering_evt(
- it->first,
- PGPeeringEventRef(
- std::make_shared<PGPeeringEvent>(
- osdmap->get_epoch(),
- osdmap->get_epoch(),
- NullEvt())));
- }
-
- logger->set(l_osd_pg, pg_map.size());
+ for (auto pgid : pgids) {
+ enqueue_peering_evt(
+ pgid,
+ PGPeeringEventRef(
+ std::make_shared<PGPeeringEvent>(
+ osdmap->get_epoch(),
+ osdmap->get_epoch(),
+ NullEvt())));
}
+ logger->set(l_osd_pg, pgids.size());
logger->set(l_osd_pg_primary, num_pg_primary);
logger->set(l_osd_pg_replica, num_pg_replica);
logger->set(l_osd_pg_stray, num_pg_stray);
ctx.transaction = new ObjectStore::Transaction;
ctx.on_applied = new C_Contexts(cct);
ctx.on_safe = new C_Contexts(cct);
+ ctx.created_pgs.clear();
}
}
ctx.created_pgs.clear();
delete (ctx.transaction);
assert(tr == 0);
+ ctx.created_pgs.clear();
}
}
PGPeeringEventRef evt,
ThreadPool::TPHandle& handle)
{
- auto curmap = service.get_osdmap();
PG::RecoveryCtx rctx = create_context();
- if (curmap->get_epoch() > pg->get_osdmap()->get_epoch()) {
- advance_pg(curmap->get_epoch(), pg, handle, &rctx);
- }
- pg->do_peering_event(evt, &rctx);
- auto need_up_thru = pg->get_need_up_thru();
- auto same_interval_since = pg->get_same_interval_since();
- dispatch_context_transaction(rctx, pg, &handle);
- bool deleted = pg->is_deleted();
- pg->unlock();
-
- if (deleted) {
- RWLock::WLocker l(pg_map_lock);
- auto p = pg_map.find(pg->get_pgid());
- if (p != pg_map.end() &&
- p->second == pg) {
- dout(20) << __func__ << " removed pg " << pg << " from pg_map" << dendl;
- pg_map.erase(p);
- pg->put("PGMap");
+ auto curmap = service.get_osdmap();
+ epoch_t need_up_thru = 0, same_interval_since = 0;
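+ // peering events may now target a pgid with no PG instantiated;
+ // the only pg-less event we handle here is a pg query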
+ if (!pg) {
+ if (const MQuery *q = dynamic_cast<const MQuery*>(evt->evt.get())) {
+ handle_pg_query_nopg(*q);
} else {
- dout(20) << __func__ << " failed to remove pg " << pg << " from pg_map" << dendl;
+ derr << __func__ << " unrecognized pg-less event " << evt->get_desc() << dendl;
+ ceph_abort();
+ }
+ } else {
+ if (curmap->get_epoch() > pg->get_osdmap()->get_epoch()) {
+ advance_pg(curmap->get_epoch(), pg, handle, &rctx);
+ }
+ pg->do_peering_event(evt, &rctx);
+ dispatch_context_transaction(rctx, pg, &handle);
+ need_up_thru = pg->get_need_up_thru();
+ same_interval_since = pg->get_same_interval_since();
+ bool deleted = pg->is_deleted();
+ pg->unlock();
+
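+ // if the event left the PG marked deleted, drop it from pg_map
+ // and release the map's reference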
+ if (deleted) {
+ RWLock::WLocker l(pg_map_lock);
+ auto p = pg_map.find(pg->get_pgid());
+ if (p != pg_map.end() &&
+ p->second == pg) {
+ dout(20) << __func__ << " removed pg " << pg << " from pg_map" << dendl;
+ pg_map.erase(p);
+ pg->put("PGMap");
+ } else {
+ dout(20) << __func__ << " failed to remove pg " << pg << " from pg_map" << dendl;
+ }
}
}
if (need_up_thru) {
queue_want_up_thru(same_interval_since);
}
- dispatch_context(rctx, 0, curmap, &handle);
+ dispatch_context(rctx, pg, curmap, &handle);
service.send_pg_temp();
}
void OSD::ShardedOpWQ::prune_pg_waiters(OSDMapRef osdmap, int whoami)
{
unsigned pushes_to_free = 0;
+ bool queued = false;
for (auto sdata : shard_list) {
Mutex::Locker l(sdata->sdata_op_ordering_lock);
sdata->waiting_for_pg_osdmap = osdmap;
ShardData::pg_slot& slot = p->second;
if (!slot.to_process.empty() && slot.num_running == 0) {
if (osdmap->is_up_acting_osd_shard(p->first, whoami)) {
- dout(20) << __func__ << " " << p->first << " maps to us, keeping"
- << dendl;
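+ // if a queued item can create the pg or run without one, requeue the
+ // whole slot so it is reprocessed against the new map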
+ if (slot.pending_nopg) {
+ dout(20) << __func__ << " " << p->first << " maps to us, pending create,"
+ << " requeuing" << dendl;
+ for (auto& q : slot.to_process) {
+ pushes_to_free += q.get_reserved_pushes();
+ }
+ for (auto i = slot.to_process.rbegin();
+ i != slot.to_process.rend();
+ ++i) {
+ sdata->_enqueue_front(std::move(*i), osd->op_prio_cutoff);
+ }
+ slot.to_process.clear();
+ slot.waiting_for_pg = false;
+ slot.pending_nopg = false;
+ ++slot.requeue_seq;
+ queued = true;
+ } else {
+ dout(20) << __func__ << " " << p->first << " maps to us, keeping"
+ << dendl;
+ }
++p;
continue;
}
++p;
}
}
+ if (queued) {
+ sdata->sdata_lock.Lock();
+ sdata->sdata_cond.SignalOne();
+ sdata->sdata_lock.Unlock();
+ }
}
if (pushes_to_free > 0) {
osd->service.release_reserved_pushes(pushes_to_free);
return;
}
+ ThreadPool::TPHandle tp_handle(osd->cct, hb, timeout_interval,
+ suicide_interval);
+
// take next item
auto qi = std::move(slot.to_process.front());
slot.to_process.pop_front();
dout(20) << __func__ << " " << qi << " pg " << pg << dendl;
+ unsigned pushes_to_free = 0;
- if (!pg) {
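+ // no PG for this slot yet: depending on the map and the item, wait,
+ // create the PG now, run the item without a PG, or discard it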
+ while (!pg) {
// should this pg shard exist on this osd in this (or a later) epoch?
OSDMapRef osdmap = sdata->waiting_for_pg_osdmap;
- if (osdmap->is_up_acting_osd_shard(token,
- osd->whoami)) {
- dout(20) << __func__ << " " << token
- << " no pg, should exist, will wait" << " on " << qi << dendl;
- slot.to_process.push_front(std::move(qi));
- slot.waiting_for_pg = true;
- } else if (qi.get_map_epoch() > osdmap->get_epoch()) {
+ const PGCreateInfo *create_info = qi.creates_pg();
+ if (qi.get_map_epoch() > osdmap->get_epoch()) {
dout(20) << __func__ << " " << token
<< " no pg, item epoch is "
<< qi.get_map_epoch() << " > " << osdmap->get_epoch()
<< ", will wait on " << qi << dendl;
+ if (!!create_info || !qi.requires_pg()) {
+ slot.pending_nopg = true;
+ }
slot.to_process.push_front(std::move(qi));
slot.waiting_for_pg = true;
+ } else if (osdmap->is_up_acting_osd_shard(token, osd->whoami)) {
+ if (osd->service.splitting(token)) {
+ dout(20) << __func__ << " " << token
+ << " splitting, waiting on " << qi << dendl;
+ slot.to_process.push_front(std::move(qi));
+ slot.waiting_for_pg = true;
+ } else if (create_info) {
+ dout(20) << __func__ << " " << token
+ << " no pg, should create on " << qi << dendl;
+ pg = osd->handle_pg_create_info(osdmap, create_info);
+ if (pg) {
+ // we created the pg! drop out and continue "normally"!
+ _wake_pg_slot(token, sdata, slot, &pushes_to_free);
+ break;
+ }
+ dout(20) << __func__ << " ignored create on " << qi << dendl;
+ } else if (!qi.requires_pg()) {
+ // for pg-less events, we run them under the ordering lock, since
+ // we don't have the pg lock to keep them ordered.
+ qi.run(osd, pg, tp_handle);
+ sdata->sdata_op_ordering_lock.Unlock();
+ return;
+ } else {
+ dout(20) << __func__ << " " << token
+ << " no pg, should exist, will wait on " << qi << dendl;
+ slot.to_process.push_front(std::move(qi));
+ slot.waiting_for_pg = true;
+ }
} else {
dout(20) << __func__ << " " << token
<< " no pg, shouldn't exist,"
}
sdata->sdata_op_ordering_lock.Unlock();
+ if (pushes_to_free) {
+ osd->service.release_reserved_pushes(pushes_to_free);
+ }
// osd_opwq_process marks the point at which an operation has been dequeued
// and will begin to be handled by a worker thread.
delete f;
*_dout << dendl;
- ThreadPool::TPHandle tp_handle(osd->cct, hb, timeout_interval,
- suicide_interval);
qi.run(osd, pg, tp_handle);
{