op_queue(get_io_queue()),
op_prio_cutoff(get_io_prio_cut()),
op_shardedwq(
- get_num_op_shards(),
this,
cct->_conf->osd_op_thread_timeout,
cct->_conf->osd_op_thread_suicide_timeout,
ss << "osd." << whoami;
trace_endpoint.copy_name(ss.str());
#endif
+
+ // initialize shards
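+ // the OSD owns the shards; ShardedOpWQ worker threads reach them via osd->shards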
+ num_shards = get_num_op_shards();
+ for (uint32_t i = 0; i < num_shards; i++) {
+ char lock_name[128] = {0};
+ snprintf(lock_name, sizeof(lock_name), "OSDShard.%u::sdata_lock", i);
+ char order_lock[128] = {0};
+ snprintf(order_lock, sizeof(order_lock), "OSDShard.%u::sdata_op_ordering_lock", i);
+ OSDShard *one_shard = new OSDShard(
+ lock_name, order_lock,
+ cct->_conf->osd_op_pq_max_tokens_per_priority,
+ cct->_conf->osd_op_pq_min_cost, cct, op_queue);
+ shards.push_back(one_shard);
+ }
}
OSD::~OSD()
{
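+ // tear down the shards created in the OSD constructor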
+ while (!shards.empty()) {
+ delete shards.back();
+ shards.pop_back();
+ }
delete authorize_handler_cluster_registry;
delete authorize_handler_service_registry;
delete class_handler;
PGRef OSD::_open_pg(
OSDMapRef createmap,
- OSDMapRef servicemap,
spg_t pgid)
{
- PG* pg = _make_pg(createmap, pgid);
+ PGRef pg = _make_pg(createmap, pgid);
{
RWLock::WLocker l(pg_map_lock);
assert(pg_map.count(pgid) == 0);
- pg_map[pgid] = pg;
+ pg_map[pgid] = pg.get();
pg_map_size = pg_map.size();
pg->get("PGMap"); // because it's in pg_map
service.pg_add_epoch(pg->pg_id, createmap->get_epoch());
-
- // make sure we register any splits that happened between when the pg
- // was created and our latest map.
- set<spg_t> new_children;
- service.init_splits_between(pgid, createmap, servicemap, &new_children);
- op_shardedwq.prime_splits(new_children);
}
return pg;
}
continue;
}
- PG *pg = NULL;
+ PGRef pg;
if (map_epoch > 0) {
OSDMapRef pgosdmap = service.try_get_map(map_epoch);
if (!pgosdmap) {
assert(0 == "Missing map in load_pgs");
}
}
- pg = _open_pg(pgosdmap, osdmap, pgid);
+ pg = _open_pg(pgosdmap, pgid);
} else {
- pg = _open_pg(osdmap, osdmap, pgid);
+ pg = _open_pg(osdmap, pgid);
}
// there can be no waiters here, so we don't call wake_pg_waiters
role = -1;
}
- PGRef pg = _open_pg(createmap, osdmap, pgid);
+ PGRef pg = _open_pg(createmap, pgid);
+
+ // We need to avoid racing with consume_map(). This should be
+ // redone as a structured waterfall of incoming osdmaps from osd ->
+ // shard, plus something here that primes the to-be-split pg slots,
+ // even when they land on other shards. (The priming will likely need
+ // to be iterative to handle races with newer incoming maps.) For
+ // now, just cross our fingers. FIXME
+ {
+ set<spg_t> new_children;
+ service.init_splits_between(pg->pg_id, createmap, osdmap, &new_children);
+ op_shardedwq.prime_splits(new_children);
+ }
pg->lock(true);
dispatch_context(rctx, pg.get(), osdmap, nullptr);
- dout(10) << *pg << " is new" << dendl;
+ dout(10) << __func__ << " new pg " << *pg << dendl;
return pg;
}
void OSD::ShardedOpWQ::_wake_pg_slot(
spg_t pgid,
- ShardData *sdata,
- ShardData::pg_slot& slot)
+ OSDShard *sdata,
+ OSDShard::pg_slot& slot)
{
dout(20) << __func__ << " " << pgid
<< " to_process " << slot.to_process
void OSD::ShardedOpWQ::wake_pg_split_waiters(spg_t pgid)
{
- uint32_t shard_index = pgid.hash_to_shard(shard_list.size());
- auto sdata = shard_list[shard_index];
+ uint32_t shard_index = pgid.hash_to_shard(osd->shards.size());
+ auto sdata = osd->shards[shard_index];
bool queued = false;
{
Mutex::Locker l(sdata->sdata_op_ordering_lock);
{
dout(20) << __func__ << " " << pgs << dendl;
for (auto pgid : pgs) {
- unsigned shard_index = pgid.hash_to_shard(shard_list.size());
- ShardData* sdata = shard_list[shard_index];
+ unsigned shard_index = pgid.hash_to_shard(osd->shards.size());
+ OSDShard* sdata = osd->shards[shard_index];
Mutex::Locker l(sdata->sdata_op_ordering_lock);
- ShardData::pg_slot& slot = sdata->pg_slots[pgid];
+ OSDShard::pg_slot& slot = sdata->pg_slots[pgid];
slot.waiting_for_split = true;
}
}
{
unsigned pushes_to_free = 0;
bool queued = false;
- for (auto sdata : shard_list) {
+ for (auto sdata : osd->shards) {
Mutex::Locker l(sdata->sdata_op_ordering_lock);
sdata->waiting_for_pg_osdmap = osdmap;
auto p = sdata->pg_slots.begin();
while (p != sdata->pg_slots.end()) {
- ShardData::pg_slot& slot = p->second;
+ OSDShard::pg_slot& slot = p->second;
if (slot.waiting_for_split) {
dout(20) << __func__ << " " << p->first
<< " waiting for split" << dendl;
void OSD::ShardedOpWQ::clear_pg_pointer(PG *pg)
{
spg_t pgid = pg->get_pgid();
- uint32_t shard_index = pgid.hash_to_shard(shard_list.size());
- auto sdata = shard_list[shard_index];
+ uint32_t shard_index = pgid.hash_to_shard(osd->shards.size());
+ auto sdata = osd->shards[shard_index];
Mutex::Locker l(sdata->sdata_op_ordering_lock);
auto p = sdata->pg_slots.find(pgid);
if (p != sdata->pg_slots.end()) {
void OSD::ShardedOpWQ::clear_pg_slots()
{
- for (auto sdata : shard_list) {
+ for (auto sdata : osd->shards) {
Mutex::Locker l(sdata->sdata_op_ordering_lock);
sdata->pg_slots.clear();
sdata->waiting_for_pg_osdmap.reset();
void OSD::ShardedOpWQ::_add_slot_waiter(
spg_t pgid,
- OSD::ShardedOpWQ::ShardData::pg_slot& slot,
+ OSDShard::pg_slot& slot,
OpQueueItem&& qi)
{
if (qi.is_peering()) {
void OSD::ShardedOpWQ::_process(uint32_t thread_index, heartbeat_handle_d *hb)
{
- uint32_t shard_index = thread_index % num_shards;
- auto& sdata = shard_list[shard_index];
+ uint32_t shard_index = thread_index % osd->num_shards;
+ auto& sdata = osd->shards[shard_index];
assert(sdata);
// peek at spg_t
sdata->sdata_op_ordering_lock.Lock();
pg = osd->handle_pg_create_info(osdmap, create_info);
if (pg) {
// we created the pg! drop out and continue "normally"!
+ slot.pg = pg; // install in shard slot
_wake_pg_slot(token, sdata, slot);
break;
}
void OSD::ShardedOpWQ::_enqueue(OpQueueItem&& item) {
uint32_t shard_index =
- item.get_ordering_token().hash_to_shard(shard_list.size());
+ item.get_ordering_token().hash_to_shard(osd->shards.size());
- ShardData* sdata = shard_list[shard_index];
+ OSDShard* sdata = osd->shards[shard_index];
assert (NULL != sdata);
unsigned priority = item.get_priority();
unsigned cost = item.get_cost();
void OSD::ShardedOpWQ::_enqueue_front(OpQueueItem&& item)
{
- auto shard_index = item.get_ordering_token().hash_to_shard(shard_list.size());
- auto& sdata = shard_list[shard_index];
+ auto shard_index = item.get_ordering_token().hash_to_shard(osd->shards.size());
+ auto& sdata = osd->shards[shard_index];
assert(sdata);
sdata->sdata_op_ordering_lock.Lock();
auto p = sdata->pg_slots.find(item.get_ordering_token());
}} // namespace ceph::osd_cmds
-std::ostream& operator<<(std::ostream& out, const OSD::io_queue& q) {
+std::ostream& operator<<(std::ostream& out, const io_queue& q) {
switch(q) {
- case OSD::io_queue::prioritized:
+ case io_queue::prioritized:
out << "prioritized";
break;
- case OSD::io_queue::weightedpriority:
+ case io_queue::weightedpriority:
out << "weightedpriority";
break;
- case OSD::io_queue::mclock_opclass:
+ case io_queue::mclock_opclass:
out << "mclock_opclass";
break;
- case OSD::io_queue::mclock_client:
+ case io_queue::mclock_client:
out << "mclock_client";
break;
}
~OSDService();
};
+
+enum class io_queue {
+ prioritized,
+ weightedpriority,
+ mclock_opclass,
+ mclock_client,
+};
+
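+/// A single op shard: its own locks, pg slot map, and priority queue.
+/// ShardedOpWQ worker threads map onto shards via thread_index % num_shards
+/// (see ShardedOpWQ::_process).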
+struct OSDShard {
+ Mutex sdata_lock;
+ Cond sdata_cond;
+
+ Mutex sdata_op_ordering_lock; ///< protects all members below
+
+ OSDMapRef waiting_for_pg_osdmap;
+
+ struct pg_slot {
+ PGRef pg; ///< cached pg reference [optional]
+ deque<OpQueueItem> to_process; ///< ordered items for this slot
+ int num_running = 0; ///< _process threads doing pg lookup/lock
+
+ deque<OpQueueItem> waiting; ///< waiting for pg (or map + pg)
+
+ /// waiting for map (peering evt)
+ map<epoch_t,deque<OpQueueItem>> waiting_peering;
+
+ /// incremented by wake_pg_waiters; indicates racing _process threads
+ /// should bail out (their op has been requeued)
+ uint64_t requeue_seq = 0;
+
+ /// waiting for split child to materialize
+ bool waiting_for_split = false;
+ };
+
+ /// map of slots for each spg_t. maintains ordering of items dequeued
+ /// from pqueue while _process thread drops shard lock to acquire the
+ /// pg lock. slots are removed only by prune_or_wake_pg_waiters.
+ unordered_map<spg_t,pg_slot> pg_slots;
+
+ /// priority queue
+ std::unique_ptr<OpQueue<OpQueueItem, uint64_t>> pqueue;
+
+ bool stop_waiting = false;
+
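+ /// requeue at the front of this shard's queue; items with priority >= cutoff
+ /// go to the strict subqueue, the rest are enqueued by priority and cost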
+ void _enqueue_front(OpQueueItem&& item, unsigned cutoff) {
+ unsigned priority = item.get_priority();
+ unsigned cost = item.get_cost();
+ if (priority >= cutoff)
+ pqueue->enqueue_strict_front(
+ item.get_owner(),
+ priority, std::move(item));
+ else
+ pqueue->enqueue_front(
+ item.get_owner(),
+ priority, cost, std::move(item));
+ }
+
+ OSDShard(
+ string lock_name, string ordering_lock,
+ uint64_t max_tok_per_prio, uint64_t min_cost, CephContext *cct,
+ io_queue opqueue)
+ : sdata_lock(lock_name.c_str(), false, true, false, cct),
+ sdata_op_ordering_lock(ordering_lock.c_str(), false, true,
+ false, cct) {
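+ // pick this shard's queue implementation from the configured io_queue type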
+ if (opqueue == io_queue::weightedpriority) {
+ pqueue = std::make_unique<
+ WeightedPriorityQueue<OpQueueItem,uint64_t>>(
+ max_tok_per_prio, min_cost);
+ } else if (opqueue == io_queue::prioritized) {
+ pqueue = std::make_unique<
+ PrioritizedQueue<OpQueueItem,uint64_t>>(
+ max_tok_per_prio, min_cost);
+ } else if (opqueue == io_queue::mclock_opclass) {
+ pqueue = std::make_unique<ceph::mClockOpClassQueue>(cct);
+ } else if (opqueue == io_queue::mclock_client) {
+ pqueue = std::make_unique<ceph::mClockClientQueue>(cct);
+ }
+ }
+};
+
class OSD : public Dispatcher,
public md_config_obs_t {
/** OSD **/
friend struct C_OpenPGs;
// -- op queue --
- enum class io_queue {
- prioritized,
- weightedpriority,
- mclock_opclass,
- mclock_client,
- };
- friend std::ostream& operator<<(std::ostream& out, const OSD::io_queue& q);
+ friend std::ostream& operator<<(std::ostream& out, const io_queue& q);
const io_queue op_queue;
const unsigned int op_prio_cutoff;
class ShardedOpWQ
: public ShardedThreadPool::ShardedWQ<OpQueueItem>
{
- struct ShardData {
- Mutex sdata_lock;
- Cond sdata_cond;
-
- Mutex sdata_op_ordering_lock; ///< protects all members below
-
- OSDMapRef waiting_for_pg_osdmap;
- struct pg_slot {
- PGRef pg; ///< cached pg reference [optional]
- deque<OpQueueItem> to_process; ///< order items for this slot
- int num_running = 0; ///< _process threads doing pg lookup/lock
-
- deque<OpQueueItem> waiting; ///< waiting for pg (or map + pg)
-
- /// waiting for map (peering evt)
- map<epoch_t,deque<OpQueueItem>> waiting_peering;
-
- /// incremented by wake_pg_waiters; indicates racing _process threads
- /// should bail out (their op has been requeued)
- uint64_t requeue_seq = 0;
-
- /// waiting for split child to materialize
- bool waiting_for_split = false;
- };
-
- /// map of slots for each spg_t. maintains ordering of items dequeued
- /// from pqueue while _process thread drops shard lock to acquire the
- /// pg lock. slots are removed only by prune_or_wake_pg_waiters.
- unordered_map<spg_t,pg_slot> pg_slots;
-
- /// priority queue
- std::unique_ptr<OpQueue<OpQueueItem, uint64_t>> pqueue;
-
- bool stop_waiting = false;
-
- void _enqueue_front(OpQueueItem&& item, unsigned cutoff) {
- unsigned priority = item.get_priority();
- unsigned cost = item.get_cost();
- if (priority >= cutoff)
- pqueue->enqueue_strict_front(
- item.get_owner(),
- priority, std::move(item));
- else
- pqueue->enqueue_front(
- item.get_owner(),
- priority, cost, std::move(item));
- }
-
- ShardData(
- string lock_name, string ordering_lock,
- uint64_t max_tok_per_prio, uint64_t min_cost, CephContext *cct,
- io_queue opqueue)
- : sdata_lock(lock_name.c_str(), false, true, false, cct),
- sdata_op_ordering_lock(ordering_lock.c_str(), false, true,
- false, cct) {
- if (opqueue == io_queue::weightedpriority) {
- pqueue = std::make_unique<
- WeightedPriorityQueue<OpQueueItem,uint64_t>>(
- max_tok_per_prio, min_cost);
- } else if (opqueue == io_queue::prioritized) {
- pqueue = std::make_unique<
- PrioritizedQueue<OpQueueItem,uint64_t>>(
- max_tok_per_prio, min_cost);
- } else if (opqueue == io_queue::mclock_opclass) {
- pqueue = std::make_unique<ceph::mClockOpClassQueue>(cct);
- } else if (opqueue == io_queue::mclock_client) {
- pqueue = std::make_unique<ceph::mClockClientQueue>(cct);
- }
- }
- }; // struct ShardData
-
- vector<ShardData*> shard_list;
OSD *osd;
- uint32_t num_shards;
public:
- ShardedOpWQ(uint32_t pnum_shards,
- OSD *o,
+ ShardedOpWQ(OSD *o,
time_t ti,
time_t si,
ShardedThreadPool* tp)
: ShardedThreadPool::ShardedWQ<OpQueueItem>(ti, si, tp),
- osd(o),
- num_shards(pnum_shards) {
- for (uint32_t i = 0; i < num_shards; i++) {
- char lock_name[32] = {0};
- snprintf(lock_name, sizeof(lock_name), "%s.%d", "OSD:ShardedOpWQ:", i);
- char order_lock[32] = {0};
- snprintf(order_lock, sizeof(order_lock), "%s.%d",
- "OSD:ShardedOpWQ:order:", i);
- ShardData* one_shard = new ShardData(
- lock_name, order_lock,
- osd->cct->_conf->osd_op_pq_max_tokens_per_priority,
- osd->cct->_conf->osd_op_pq_min_cost, osd->cct, osd->op_queue);
- shard_list.push_back(one_shard);
- }
- }
- ~ShardedOpWQ() override {
- while (!shard_list.empty()) {
- delete shard_list.back();
- shard_list.pop_back();
- }
+ osd(o) {
}
void _add_slot_waiter(
spg_t token,
- ShardData::pg_slot& slot,
+ OSDShard::pg_slot& slot,
OpQueueItem&& qi);
/// wake any pg waiters after a PG is split
void wake_pg_split_waiters(spg_t pgid);
- void _wake_pg_slot(spg_t pgid, ShardData *sdata, ShardData::pg_slot& slot);
+ void _wake_pg_slot(spg_t pgid, OSDShard *sdata, OSDShard::pg_slot& slot);
/// prime slots for splitting pgs
void prime_splits(const set<spg_t>& pgs);
void _enqueue_front(OpQueueItem&& item) override;
void return_waiting_threads() override {
- for(uint32_t i = 0; i < num_shards; i++) {
- ShardData* sdata = shard_list[i];
+ for(uint32_t i = 0; i < osd->num_shards; i++) {
+ OSDShard* sdata = osd->shards[i];
assert (NULL != sdata);
sdata->sdata_lock.Lock();
sdata->stop_waiting = true;
}
void stop_return_waiting_threads() override {
- for(uint32_t i = 0; i < num_shards; i++) {
- ShardData* sdata = shard_list[i];
+ for(uint32_t i = 0; i < osd->num_shards; i++) {
+ OSDShard* sdata = osd->shards[i];
assert (NULL != sdata);
sdata->sdata_lock.Lock();
sdata->stop_waiting = false;
}
void dump(Formatter *f) {
- for(uint32_t i = 0; i < num_shards; i++) {
- auto &&sdata = shard_list[i];
+ for(uint32_t i = 0; i < osd->num_shards; i++) {
+ auto &&sdata = osd->shards[i];
char queue_name[32] = {0};
snprintf(queue_name, sizeof(queue_name), "%s%d", "OSD:ShardedOpWQ:", i);
};
bool is_shard_empty(uint32_t thread_index) override {
- uint32_t shard_index = thread_index % num_shards;
- auto &&sdata = shard_list[shard_index];
+ uint32_t shard_index = thread_index % osd->num_shards;
+ auto &&sdata = osd->shards[shard_index];
assert(sdata);
Mutex::Locker l(sdata->sdata_op_ordering_lock);
return sdata->pqueue->empty();
return service.add_map_inc_bl(e, bl);
}
+public:
+ // -- shards --
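+ /// op shards; allocated in the OSD constructor and freed in ~OSD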
+ vector<OSDShard*> shards;
+ uint32_t num_shards = 0;
+
protected:
// -- placement groups --
RWLock pg_map_lock; // this lock orders *above* individual PG _locks
protected:
PGRef _open_pg(
OSDMapRef createmap, ///< map pg is created in
- OSDMapRef servicemap, ///< latest service map
spg_t pg);
PG* _make_pg(OSDMapRef createmap, spg_t pgid);
};
-std::ostream& operator<<(std::ostream& out, const OSD::io_queue& q);
+std::ostream& operator<<(std::ostream& out, const io_queue& q);
//compatibility of the executable