for (auto& s : shards) {
Mutex::Locker l(s->sdata_op_ordering_lock);
for (auto& j : s->pg_slots) {
- if (j.second.pg &&
- !j.second.pg->is_deleted()) {
- v->push_back(j.second.pg);
+ if (j.second->pg &&
+ !j.second->pg->is_deleted()) {
+ v->push_back(j.second->pg);
if (clear_too) {
- s->_detach_pg(j.second);
+ s->_detach_pg(j.second.get());
}
}
}
for (auto& s : shards) {
Mutex::Locker l(s->sdata_op_ordering_lock);
for (auto& j : s->pg_slots) {
- if (j.second.pg &&
- !j.second.pg->is_deleted()) {
+ if (j.second->pg &&
+ !j.second->pg->is_deleted()) {
v->push_back(j.first);
}
}
uint32_t shard_index = pgid.hash_to_shard(num_shards);
auto sdata = shards[shard_index];
Mutex::Locker l(sdata->sdata_op_ordering_lock);
- auto& slot = sdata->pg_slots[pgid];
- assert(!slot.pg);
+ auto r = sdata->pg_slots.emplace(pgid, make_unique<OSDShard::pg_slot>());
+ assert(r.second);
+ auto *slot = r.first->second.get();
dout(20) << __func__ << " " << pgid << " " << pg << dendl;
sdata->_attach_pg(slot, pg.get());
}
Mutex::Locker l(sdata->sdata_op_ordering_lock);
auto p = sdata->pg_slots.find(pg->pg_id);
if (p != sdata->pg_slots.end() &&
- p->second.pg) {
+ p->second->pg) {
dout(20) << __func__ << " " << pg->pg_id << " " << pg << dendl;
- sdata->_detach_pg(p->second);
+ sdata->_detach_pg(p->second.get());
} else {
dout(20) << __func__ << " " << pg->pg_id << " not found" << dendl;
}
if (p == sdata->pg_slots.end()) {
return nullptr;
}
- return p->second.pg;
+ return p->second->pg;
}
PG *OSD::_lookup_lock_pg(spg_t pgid)
#undef dout_prefix
#define dout_prefix *_dout << "osd." << osd->get_nodeid() << ":" << shard_id << "." << __func__ << " "
-void OSDShard::_attach_pg(pg_slot& slot, PG *pg)
+void OSDShard::_attach_pg(pg_slot *slot, PG *pg)
{
dout(10) << pg->pg_id << " " << pg << dendl;
- slot.pg = pg;
+ slot->pg = pg;
pg->osd_shard = this;
++osd->num_pgs;
}
-void OSDShard::_detach_pg(pg_slot& slot)
+void OSDShard::_detach_pg(pg_slot *slot)
{
- dout(10) << slot.pg->pg_id << " " << slot.pg << dendl;
- slot.pg->osd_shard = nullptr;
- slot.pg = nullptr;
+ dout(10) << slot->pg->pg_id << " " << slot->pg << dendl;
+ slot->pg->osd_shard = nullptr;
+ slot->pg = nullptr;
--osd->num_pgs;
}
// check slots
auto p = pg_slots.begin();
while (p != pg_slots.end()) {
- OSDShard::pg_slot& slot = p->second;
+ OSDShard::pg_slot *slot = p->second.get();
const spg_t& pgid = p->first;
dout(20) << __func__ << " " << pgid << dendl;
if (old_osdmap &&
osd->service.identify_split_children(old_osdmap, new_osdmap, pgid,
new_children);
}
- if (slot.waiting_for_split) {
+ if (slot->waiting_for_split) {
dout(20) << __func__ << " " << pgid
<< " waiting for split" << dendl;
++p;
continue;
}
- if (!slot.waiting_peering.empty()) {
- epoch_t first = slot.waiting_peering.begin()->first;
+ if (!slot->waiting_peering.empty()) {
+ epoch_t first = slot->waiting_peering.begin()->first;
if (first <= osdmap->get_epoch()) {
dout(20) << __func__ << " " << pgid
<< " pending_peering first epoch " << first
++p;
continue;
}
- if (!slot.waiting.empty()) {
+ if (!slot->waiting.empty()) {
if (osdmap->is_up_acting_osd_shard(pgid, osd->get_nodeid())) {
dout(20) << __func__ << " " << pgid << " maps to us, keeping"
<< dendl;
++p;
continue;
}
- while (!slot.waiting.empty() &&
- slot.waiting.front().get_map_epoch() <= osdmap->get_epoch()) {
- auto& qi = slot.waiting.front();
+ while (!slot->waiting.empty() &&
+ slot->waiting.front().get_map_epoch() <= osdmap->get_epoch()) {
+ auto& qi = slot->waiting.front();
dout(20) << __func__ << " " << pgid
<< " waiting item " << qi
<< " epoch " << qi.get_map_epoch()
<< " <= " << osdmap->get_epoch()
<< ", stale, dropping" << dendl;
*pushes_to_free += qi.get_reserved_pushes();
- slot.waiting.pop_front();
+ slot->waiting.pop_front();
}
- if (slot.waiting.empty() &&
- slot.num_running == 0 &&
- !slot.pg) {
+ if (slot->waiting.empty() &&
+ slot->num_running == 0 &&
+ !slot->pg) {
dout(20) << __func__ << " " << pgid << " empty, pruning" << dendl;
p = pg_slots.erase(p);
continue;
void OSDShard::_wake_pg_slot(
spg_t pgid,
- OSDShard::pg_slot& slot)
+ OSDShard::pg_slot *slot)
{
dout(20) << __func__ << " " << pgid
- << " to_process " << slot.to_process
- << " waiting " << slot.waiting
- << " waiting_peering " << slot.waiting_peering << dendl;
- for (auto i = slot.to_process.rbegin();
- i != slot.to_process.rend();
+ << " to_process " << slot->to_process
+ << " waiting " << slot->waiting
+ << " waiting_peering " << slot->waiting_peering << dendl;
+ for (auto i = slot->to_process.rbegin();
+ i != slot->to_process.rend();
++i) {
_enqueue_front(std::move(*i), osd->op_prio_cutoff);
}
- slot.to_process.clear();
- for (auto i = slot.waiting.rbegin();
- i != slot.waiting.rend();
+ slot->to_process.clear();
+ for (auto i = slot->waiting.rbegin();
+ i != slot->waiting.rend();
++i) {
_enqueue_front(std::move(*i), osd->op_prio_cutoff);
}
- slot.waiting.clear();
- for (auto i = slot.waiting_peering.rbegin();
- i != slot.waiting_peering.rend();
+ slot->waiting.clear();
+ for (auto i = slot->waiting_peering.rbegin();
+ i != slot->waiting_peering.rend();
++i) {
// this is overkill; we requeue everything, even if some of these items are
// waiting for maps we don't have yet. FIXME.
_enqueue_front(std::move(*j), osd->op_prio_cutoff);
}
}
- slot.waiting_peering.clear();
- slot.waiting_for_split = false;
- ++slot.requeue_seq;
+ slot->waiting_peering.clear();
+ slot->waiting_for_split = false;
+ ++slot->requeue_seq;
}
void OSDShard::prime_splits(OSDMapRef as_of_osdmap, set<spg_t> *pgids)
while (p != pgids->end()) {
unsigned shard_index = p->hash_to_shard(osd->num_shards);
if (shard_index == shard_id) {
- auto i = pg_slots.find(*p);
- if (i == pg_slots.end()) {
+ auto r = pg_slots.emplace(*p, make_unique<OSDShard::pg_slot>());
+ if (r.second) {
dout(10) << "priming slot " << *p << dendl;
- OSDShard::pg_slot& slot = pg_slots[*p];
- slot.waiting_for_split = true;
+ r.first->second->waiting_for_split = true;
} else {
auto q = pg_slots.find(*p);
assert(q != pg_slots.end());
dout(10) << pg->pg_id << " " << pg << dendl;
auto p = pg_slots.find(pg->pg_id);
assert(p != pg_slots.end());
- auto& slot = p->second;
- assert(!slot.pg);
- assert(slot.waiting_for_split);
+ auto *slot = p->second.get();
+ assert(!slot->pg);
+ assert(slot->waiting_for_split);
_attach_pg(slot, pg);
_wake_pg_slot(pg->pg_id, slot);
}
void OSD::ShardedOpWQ::_add_slot_waiter(
spg_t pgid,
- OSDShard::pg_slot& slot,
+ OSDShard::pg_slot *slot,
OpQueueItem&& qi)
{
if (qi.is_peering()) {
<< " peering, item epoch is "
<< qi.get_map_epoch()
<< ", will wait on " << qi << dendl;
- slot.waiting_peering[qi.get_map_epoch()].push_back(std::move(qi));
+ slot->waiting_peering[qi.get_map_epoch()].push_back(std::move(qi));
} else {
dout(20) << __func__ << " " << pgid
<< " item epoch is "
<< qi.get_map_epoch()
<< ", will wait on " << qi << dendl;
- slot.waiting.push_back(std::move(qi));
+ slot->waiting.push_back(std::move(qi));
}
}
uint64_t requeue_seq;
const auto token = item.get_ordering_token();
{
- auto& slot = sdata->pg_slots[token];
+ auto r = sdata->pg_slots.emplace(token, make_unique<OSDShard::pg_slot>());
+ auto *slot = r.first->second.get();
dout(30) << __func__ << " " << token
- << " to_process " << slot.to_process
- << " waiting " << slot.waiting
- << " waiting_peering " << slot.waiting_peering
+ << (r.second ? " (new)" : "")
+ << " to_process " << slot->to_process
+ << " waiting " << slot->waiting
+ << " waiting_peering " << slot->waiting_peering
<< dendl;
- if (slot.waiting_for_split
- || (item.is_peering() && !slot.waiting_peering.empty())
- || (!item.is_peering() && !slot.waiting.empty())) {
+ if (slot->waiting_for_split
+ || (item.is_peering() && !slot->waiting_peering.empty())
+ || (!item.is_peering() && !slot->waiting.empty())) {
dout(20) << __func__ << " " << token << " already waiting, adding " << item
<< dendl;
_add_slot_waiter(token, slot, std::move(item));
return;
}
// note the requeue seq now...
- requeue_seq = slot.requeue_seq;
- pg = slot.pg;
- slot.to_process.push_back(std::move(item));
- dout(20) << __func__ << " " << slot.to_process.back()
+ requeue_seq = slot->requeue_seq;
+ pg = slot->pg;
+ slot->to_process.push_back(std::move(item));
+ dout(20) << __func__ << " " << slot->to_process.back()
<< " queued" << dendl;
- ++slot.num_running;
+ ++slot->num_running;
}
sdata->sdata_op_ordering_lock.Unlock();
auto q = sdata->pg_slots.find(token);
assert(q != sdata->pg_slots.end());
- auto& slot = q->second;
- --slot.num_running;
+ auto *slot = q->second.get();
+ --slot->num_running;
- if (slot.to_process.empty()) {
+ if (slot->to_process.empty()) {
// raced with wake_pg_waiters or consume_map
dout(20) << __func__ << " " << token
<< " nothing queued" << dendl;
sdata->sdata_op_ordering_lock.Unlock();
return;
}
- if (requeue_seq != slot.requeue_seq) {
+ if (requeue_seq != slot->requeue_seq) {
dout(20) << __func__ << " " << token
- << " requeue_seq " << slot.requeue_seq << " > our "
+ << " requeue_seq " << slot->requeue_seq << " > our "
<< requeue_seq << ", we raced with wake_pg_waiters"
<< dendl;
if (pg) {
return;
}
dout(30) << __func__ << " " << token
- << " to_process " << slot.to_process
- << " waiting " << slot.waiting
- << " waiting_peering " << slot.waiting_peering << dendl;
+ << " to_process " << slot->to_process
+ << " waiting " << slot->waiting
+ << " waiting_peering " << slot->waiting_peering << dendl;
ThreadPool::TPHandle tp_handle(osd->cct, hb, timeout_interval,
suicide_interval);
// take next item
- auto qi = std::move(slot.to_process.front());
- slot.to_process.pop_front();
+ auto qi = std::move(slot->to_process.front());
+ slot->to_process.pop_front();
dout(20) << __func__ << " " << qi << " pg " << pg << dendl;
unsigned pushes_to_free = 0;
set<spg_t> new_children;
// should this pg shard exist on this osd in this (or a later) epoch?
osdmap = sdata->osdmap;
const PGCreateInfo *create_info = qi.creates_pg();
- if (slot.waiting_for_split) {
+ if (slot->waiting_for_split) {
dout(20) << __func__ << " " << token
<< " splitting" << dendl;
_add_slot_waiter(token, slot, std::move(qi));
assert(sdata);
sdata->sdata_op_ordering_lock.Lock();
auto p = sdata->pg_slots.find(item.get_ordering_token());
- if (p != sdata->pg_slots.end() && !p->second.to_process.empty()) {
+ if (p != sdata->pg_slots.end() &&
+ !p->second->to_process.empty()) {
// we may be racing with _process, which has dequeued a new item
// from pqueue, put it on to_process, and is now busy taking the
// pg lock. ensure this old requeued item is ordered before any
// such newer item in to_process.
- p->second.to_process.push_front(std::move(item));
- item = std::move(p->second.to_process.back());
- p->second.to_process.pop_back();
+ p->second->to_process.push_front(std::move(item));
+ item = std::move(p->second->to_process.back());
+ p->second->to_process.pop_back();
dout(20) << __func__
- << " " << p->second.to_process.front()
+ << " " << p->second->to_process.front()
<< " shuffled w/ " << item << dendl;
} else {
dout(20) << __func__ << " " << item << dendl;