uint32_t shard_index = pgid.hash_to_shard(num_shards);
auto sdata = shards[shard_index];
Mutex::Locker l(sdata->sdata_op_ordering_lock);
- auto r = sdata->pg_slots.emplace(pgid, make_unique<OSDShard::pg_slot>());
+ auto r = sdata->pg_slots.emplace(pgid, make_unique<OSDShardPGSlot>());
assert(r.second);
auto *slot = r.first->second.get();
dout(20) << __func__ << " " << pgid << " " << pg << dendl;
#undef dout_prefix
#define dout_prefix *_dout << "osd." << osd->get_nodeid() << ":" << shard_id << "." << __func__ << " "
-void OSDShard::_attach_pg(pg_slot *slot, PG *pg)
+void OSDShard::_attach_pg(OSDShardPGSlot *slot, PG *pg)
{
dout(10) << pg->pg_id << " " << pg << dendl;
slot->pg = pg;
++osd->num_pgs;
}
-void OSDShard::_detach_pg(pg_slot *slot)
+void OSDShard::_detach_pg(OSDShardPGSlot *slot)
{
dout(10) << slot->pg->pg_id << " " << slot->pg << dendl;
slot->pg->osd_shard = nullptr;
// check slots
auto p = pg_slots.begin();
while (p != pg_slots.end()) {
- OSDShard::pg_slot *slot = p->second.get();
+ OSDShardPGSlot *slot = p->second.get();
const spg_t& pgid = p->first;
dout(20) << __func__ << " " << pgid << dendl;
if (old_osdmap &&
void OSDShard::_wake_pg_slot(
spg_t pgid,
- OSDShard::pg_slot *slot)
+ OSDShardPGSlot *slot)
{
dout(20) << __func__ << " " << pgid
<< " to_process " << slot->to_process
while (p != pgids->end()) {
unsigned shard_index = p->hash_to_shard(osd->num_shards);
if (shard_index == shard_id) {
- auto r = pg_slots.emplace(*p, make_unique<OSDShard::pg_slot>());
+ auto r = pg_slots.emplace(*p, make_unique<OSDShardPGSlot>());
if (r.second) {
dout(10) << "priming slot " << *p << dendl;
r.first->second->waiting_for_split = true;
void OSD::ShardedOpWQ::_add_slot_waiter(
spg_t pgid,
- OSDShard::pg_slot *slot,
+ OSDShardPGSlot *slot,
OpQueueItem&& qi)
{
if (qi.is_peering()) {
uint64_t requeue_seq;
const auto token = item.get_ordering_token();
{
- auto r = sdata->pg_slots.emplace(token, make_unique<OSDShard::pg_slot>());
+ auto r = sdata->pg_slots.emplace(token, make_unique<OSDShardPGSlot>());
auto *slot = r.first->second.get();
dout(30) << __func__ << " " << token
<< (r.second ? " (new)" : "")
mclock_client,
};
+struct OSDShardPGSlot {
+ PGRef pg; ///< pg reference
+ deque<OpQueueItem> to_process; ///< order items for this slot
+ int num_running = 0; ///< _process threads doing pg lookup/lock
+
+ deque<OpQueueItem> waiting; ///< waiting for pg (or map + pg)
+
+ /// waiting for map (peering evt)
+ map<epoch_t,deque<OpQueueItem>> waiting_peering;
+
+ /// incremented by wake_pg_waiters; indicates racing _process threads
+ /// should bail out (their op has been requeued)
+ uint64_t requeue_seq = 0;
+
+ /// waiting for split child to materialize
+ bool waiting_for_split = false;
+};
+
struct OSDShard {
const unsigned shard_id;
CephContext *cct;
OSDMapRef osdmap;
- struct pg_slot {
- PGRef pg; ///< cached pg reference [optional]
- deque<OpQueueItem> to_process; ///< order items for this slot
- int num_running = 0; ///< _process threads doing pg lookup/lock
-
- deque<OpQueueItem> waiting; ///< waiting for pg (or map + pg)
-
- /// waiting for map (peering evt)
- map<epoch_t,deque<OpQueueItem>> waiting_peering;
-
- /// incremented by wake_pg_waiters; indicates racing _process threads
- /// should bail out (their op has been requeued)
- uint64_t requeue_seq = 0;
-
- /// waiting for split child to materialize
- bool waiting_for_split = false;
- };
-
/// map of slots for each spg_t. maintains ordering of items dequeued
/// from pqueue while _process thread drops shard lock to acquire the
- /// pg lock. slots are removed by consume_map.
- unordered_map<spg_t,unique_ptr<pg_slot>> pg_slots;
+ /// pg lock. stale slots are removed by consume_map.
+ unordered_map<spg_t,unique_ptr<OSDShardPGSlot>> pg_slots;
/// priority queue
std::unique_ptr<OpQueue<OpQueueItem, uint64_t>> pqueue;
priority, cost, std::move(item));
}
- void _attach_pg(pg_slot *slot, PG *pg);
- void _detach_pg(pg_slot *slot);
+ void _attach_pg(OSDShardPGSlot *slot, PG *pg);
+ void _detach_pg(OSDShardPGSlot *slot);
/// push osdmap into shard
void consume_map(
unsigned *pushes_to_free,
set<spg_t> *new_children);
- void _wake_pg_slot(spg_t pgid, OSDShard::pg_slot *slot);
+ void _wake_pg_slot(spg_t pgid, OSDShardPGSlot *slot);
void _prime_splits(set<spg_t> *pgids);
void prime_splits(OSDMapRef as_of_osdmap, set<spg_t> *pgids);
void _add_slot_waiter(
spg_t token,
- OSDShard::pg_slot *slot,
+ OSDShardPGSlot *slot,
OpQueueItem&& qi);
/// try to do some work