From 0bf6ac893a9c3a0e81cb90520aa2e2a4d130debe Mon Sep 17 00:00:00 2001 From: Sage Weil Date: Mon, 5 Feb 2018 18:22:40 -0600 Subject: [PATCH] osd: move ShardedOpWQ::ShardData -> OSDShard Soon we will destroy pg_map! Signed-off-by: Sage Weil --- src/osd/OSD.cc | 101 ++++++++++++++--------- src/osd/OSD.h | 213 +++++++++++++++++++++++-------------------------- 2 files changed, 161 insertions(+), 153 deletions(-) diff --git a/src/osd/OSD.cc b/src/osd/OSD.cc index 7a3aa8ca36da3..e8a4b301df1e5 100644 --- a/src/osd/OSD.cc +++ b/src/osd/OSD.cc @@ -2036,7 +2036,6 @@ OSD::OSD(CephContext *cct_, ObjectStore *store_, op_queue(get_io_queue()), op_prio_cutoff(get_io_prio_cut()), op_shardedwq( - get_num_op_shards(), this, cct->_conf->osd_op_thread_timeout, cct->_conf->osd_op_thread_suicide_timeout, @@ -2067,10 +2066,28 @@ OSD::OSD(CephContext *cct_, ObjectStore *store_, ss << "osd." << whoami; trace_endpoint.copy_name(ss.str()); #endif + + // initialize shards + num_shards = get_num_op_shards(); + for (uint32_t i = 0; i < num_shards; i++) { + char lock_name[128] = {0}; + snprintf(lock_name, sizeof(lock_name), "OSDShard.%d::sdata_lock", i); + char order_lock[128] = {0}; + snprintf(order_lock, sizeof(order_lock), "OSDShard.%d::sdata_op_ordering_lock", i); + OSDShard *one_shard = new OSDShard( + lock_name, order_lock, + cct->_conf->osd_op_pq_max_tokens_per_priority, + cct->_conf->osd_op_pq_min_cost, cct, op_queue); + shards.push_back(one_shard); + } } OSD::~OSD() { + while (!shards.empty()) { + delete shards.back(); + shards.pop_back(); + } delete authorize_handler_cluster_registry; delete authorize_handler_service_registry; delete class_handler; @@ -3807,23 +3824,16 @@ void OSD::recursive_remove_collection(CephContext* cct, PGRef OSD::_open_pg( OSDMapRef createmap, - OSDMapRef servicemap, spg_t pgid) { - PG* pg = _make_pg(createmap, pgid); + PGRef pg = _make_pg(createmap, pgid); { RWLock::WLocker l(pg_map_lock); assert(pg_map.count(pgid) == 0); - pg_map[pgid] = pg; + pg_map[pgid] = pg.get(); pg_map_size = pg_map.size(); pg->get("PGMap"); // because it's in pg_map service.pg_add_epoch(pg->pg_id, createmap->get_epoch()); - - // make sure we register any splits that happened between when the pg - // was created and our latest map. - set new_children; - service.init_splits_between(pgid, createmap, servicemap, &new_children); - op_shardedwq.prime_splits(new_children); } return pg; } @@ -3957,7 +3967,7 @@ void OSD::load_pgs() continue; } - PG *pg = NULL; + PGRef pg; if (map_epoch > 0) { OSDMapRef pgosdmap = service.try_get_map(map_epoch); if (!pgosdmap) { @@ -3975,9 +3985,9 @@ void OSD::load_pgs() assert(0 == "Missing map in load_pgs"); } } - pg = _open_pg(pgosdmap, osdmap, pgid); + pg = _open_pg(pgosdmap, pgid); } else { - pg = _open_pg(osdmap, osdmap, pgid); + pg = _open_pg(osdmap, pgid); } // there can be no waiters here, so we don't call wake_pg_waiters @@ -4052,7 +4062,19 @@ PGRef OSD::handle_pg_create_info(OSDMapRef osdmap, const PGCreateInfo *info) role = -1; } - PGRef pg = _open_pg(createmap, osdmap, pgid); + PGRef pg = _open_pg(createmap, pgid); + + // We need to avoid racing with consume_map(). This should get + // redone as a structured waterfall of incoming osdmaps from osd -> + // shard, and something here that gets the to-be-split pg slots + // primed, even when they land on other shards. (I think it will be + // iterative to handle the case where it races with newer incoming + // maps.) For now, just cross our fingers. FIXME + { + set new_children; + service.init_splits_between(pg->pg_id, createmap, osdmap, &new_children); + op_shardedwq.prime_splits(new_children); + } pg->lock(true); @@ -4076,7 +4098,7 @@ PGRef OSD::handle_pg_create_info(OSDMapRef osdmap, const PGCreateInfo *info) dispatch_context(rctx, pg.get(), osdmap, nullptr); - dout(10) << *pg << " is new" << dendl; + dout(10) << __func__ << " new pg " << *pg << dendl; return pg; } @@ -9453,8 +9475,8 @@ int OSD::init_op_flags(OpRequestRef& op) void OSD::ShardedOpWQ::_wake_pg_slot( spg_t pgid, - ShardData *sdata, - ShardData::pg_slot& slot) + OSDShard *sdata, + OSDShard::pg_slot& slot) { dout(20) << __func__ << " " << pgid << " to_process " << slot.to_process @@ -9488,8 +9510,8 @@ void OSD::ShardedOpWQ::_wake_pg_slot( void OSD::ShardedOpWQ::wake_pg_split_waiters(spg_t pgid) { - uint32_t shard_index = pgid.hash_to_shard(shard_list.size()); - auto sdata = shard_list[shard_index]; + uint32_t shard_index = pgid.hash_to_shard(osd->shards.size()); + auto sdata = osd->shards[shard_index]; bool queued = false; { Mutex::Locker l(sdata->sdata_op_ordering_lock); @@ -9510,10 +9532,10 @@ void OSD::ShardedOpWQ::prime_splits(const set& pgs) { dout(20) << __func__ << " " << pgs << dendl; for (auto pgid : pgs) { - unsigned shard_index = pgid.hash_to_shard(shard_list.size()); - ShardData* sdata = shard_list[shard_index]; + unsigned shard_index = pgid.hash_to_shard(osd->shards.size()); + OSDShard* sdata = osd->shards[shard_index]; Mutex::Locker l(sdata->sdata_op_ordering_lock); - ShardData::pg_slot& slot = sdata->pg_slots[pgid]; + OSDShard::pg_slot& slot = sdata->pg_slots[pgid]; slot.waiting_for_split = true; } } @@ -9522,12 +9544,12 @@ void OSD::ShardedOpWQ::prune_or_wake_pg_waiters(OSDMapRef osdmap, int whoami) { unsigned pushes_to_free = 0; bool queued = false; - for (auto sdata : shard_list) { + for (auto sdata : osd->shards) { Mutex::Locker l(sdata->sdata_op_ordering_lock); sdata->waiting_for_pg_osdmap = osdmap; auto p = sdata->pg_slots.begin(); while (p != sdata->pg_slots.end()) { - ShardData::pg_slot& slot = p->second; + OSDShard::pg_slot& slot = p->second; if (slot.waiting_for_split) { dout(20) << __func__ << " " << p->first << " waiting for split" << dendl; @@ -9588,8 +9610,8 @@ void OSD::ShardedOpWQ::prune_or_wake_pg_waiters(OSDMapRef osdmap, int whoami) void OSD::ShardedOpWQ::clear_pg_pointer(PG *pg) { spg_t pgid = pg->get_pgid(); - uint32_t shard_index = pgid.hash_to_shard(shard_list.size()); - auto sdata = shard_list[shard_index]; + uint32_t shard_index = pgid.hash_to_shard(osd->shards.size()); + auto sdata = osd->shards[shard_index]; Mutex::Locker l(sdata->sdata_op_ordering_lock); auto p = sdata->pg_slots.find(pgid); if (p != sdata->pg_slots.end()) { @@ -9602,7 +9624,7 @@ void OSD::ShardedOpWQ::clear_pg_pointer(PG *pg) void OSD::ShardedOpWQ::clear_pg_slots() { - for (auto sdata : shard_list) { + for (auto sdata : osd->shards) { Mutex::Locker l(sdata->sdata_op_ordering_lock); sdata->pg_slots.clear(); sdata->waiting_for_pg_osdmap.reset(); @@ -9612,7 +9634,7 @@ void OSD::ShardedOpWQ::clear_pg_slots() void OSD::ShardedOpWQ::_add_slot_waiter( spg_t pgid, - OSD::ShardedOpWQ::ShardData::pg_slot& slot, + OSDShard::pg_slot& slot, OpQueueItem&& qi) { if (qi.is_peering()) { @@ -9635,8 +9657,8 @@ void OSD::ShardedOpWQ::_add_slot_waiter( void OSD::ShardedOpWQ::_process(uint32_t thread_index, heartbeat_handle_d *hb) { - uint32_t shard_index = thread_index % num_shards; - auto& sdata = shard_list[shard_index]; + uint32_t shard_index = thread_index % osd->num_shards; + auto& sdata = osd->shards[shard_index]; assert(sdata); // peek at spg_t sdata->sdata_op_ordering_lock.Lock(); @@ -9806,6 +9828,7 @@ void OSD::ShardedOpWQ::_process(uint32_t thread_index, heartbeat_handle_d *hb) pg = osd->handle_pg_create_info(osdmap, create_info); if (pg) { // we created the pg! drop out and continue "normally"! + slot.pg = pg; // install in shard slot _wake_pg_slot(token, sdata, slot); break; } @@ -9905,9 +9928,9 @@ void OSD::ShardedOpWQ::_process(uint32_t thread_index, heartbeat_handle_d *hb) void OSD::ShardedOpWQ::_enqueue(OpQueueItem&& item) { uint32_t shard_index = - item.get_ordering_token().hash_to_shard(shard_list.size()); + item.get_ordering_token().hash_to_shard(osd->shards.size()); - ShardData* sdata = shard_list[shard_index]; + OSDShard* sdata = osd->shards[shard_index]; assert (NULL != sdata); unsigned priority = item.get_priority(); unsigned cost = item.get_cost(); @@ -9930,8 +9953,8 @@ void OSD::ShardedOpWQ::_enqueue(OpQueueItem&& item) { void OSD::ShardedOpWQ::_enqueue_front(OpQueueItem&& item) { - auto shard_index = item.get_ordering_token().hash_to_shard(shard_list.size()); - auto& sdata = shard_list[shard_index]; + auto shard_index = item.get_ordering_token().hash_to_shard(osd->shards.size()); + auto& sdata = osd->shards[shard_index]; assert(sdata); sdata->sdata_op_ordering_lock.Lock(); auto p = sdata->pg_slots.find(item.get_ordering_token()); @@ -9984,18 +10007,18 @@ int heap(CephContext& cct, const cmdmap_t& cmdmap, Formatter& f, }} // namespace ceph::osd_cmds -std::ostream& operator<<(std::ostream& out, const OSD::io_queue& q) { +std::ostream& operator<<(std::ostream& out, const io_queue& q) { switch(q) { - case OSD::io_queue::prioritized: + case io_queue::prioritized: out << "prioritized"; break; - case OSD::io_queue::weightedpriority: + case io_queue::weightedpriority: out << "weightedpriority"; break; - case OSD::io_queue::mclock_opclass: + case io_queue::mclock_opclass: out << "mclock_opclass"; break; - case OSD::io_queue::mclock_client: + case io_queue::mclock_client: out << "mclock_client"; break; } diff --git a/src/osd/OSD.h b/src/osd/OSD.h index 30ab8eec00f27..6568edaba61da 100644 --- a/src/osd/OSD.h +++ b/src/osd/OSD.h @@ -1106,6 +1106,86 @@ public: ~OSDService(); }; + +enum class io_queue { + prioritized, + weightedpriority, + mclock_opclass, + mclock_client, +}; + +struct OSDShard { + Mutex sdata_lock; + Cond sdata_cond; + + Mutex sdata_op_ordering_lock; ///< protects all members below + + OSDMapRef waiting_for_pg_osdmap; + + struct pg_slot { + PGRef pg; ///< cached pg reference [optional] + deque to_process; ///< order items for this slot + int num_running = 0; ///< _process threads doing pg lookup/lock + + deque waiting; ///< waiting for pg (or map + pg) + + /// waiting for map (peering evt) + map> waiting_peering; + + /// incremented by wake_pg_waiters; indicates racing _process threads + /// should bail out (their op has been requeued) + uint64_t requeue_seq = 0; + + /// waiting for split child to materialize + bool waiting_for_split = false; + }; + + /// map of slots for each spg_t. maintains ordering of items dequeued + /// from pqueue while _process thread drops shard lock to acquire the + /// pg lock. slots are removed only by prune_or_wake_pg_waiters. + unordered_map pg_slots; + + /// priority queue + std::unique_ptr> pqueue; + + bool stop_waiting = false; + + void _enqueue_front(OpQueueItem&& item, unsigned cutoff) { + unsigned priority = item.get_priority(); + unsigned cost = item.get_cost(); + if (priority >= cutoff) + pqueue->enqueue_strict_front( + item.get_owner(), + priority, std::move(item)); + else + pqueue->enqueue_front( + item.get_owner(), + priority, cost, std::move(item)); + } + + OSDShard( + string lock_name, string ordering_lock, + uint64_t max_tok_per_prio, uint64_t min_cost, CephContext *cct, + io_queue opqueue) + : sdata_lock(lock_name.c_str(), false, true, false, cct), + sdata_op_ordering_lock(ordering_lock.c_str(), false, true, + false, cct) { + if (opqueue == io_queue::weightedpriority) { + pqueue = std::make_unique< + WeightedPriorityQueue>( + max_tok_per_prio, min_cost); + } else if (opqueue == io_queue::prioritized) { + pqueue = std::make_unique< + PrioritizedQueue>( + max_tok_per_prio, min_cost); + } else if (opqueue == io_queue::mclock_opclass) { + pqueue = std::make_unique(cct); + } else if (opqueue == io_queue::mclock_client) { + pqueue = std::make_unique(cct); + } + } +}; + class OSD : public Dispatcher, public md_config_obs_t { /** OSD **/ @@ -1527,13 +1607,7 @@ private: friend struct C_OpenPGs; // -- op queue -- - enum class io_queue { - prioritized, - weightedpriority, - mclock_opclass, - mclock_client, - }; - friend std::ostream& operator<<(std::ostream& out, const OSD::io_queue& q); + friend std::ostream& operator<<(std::ostream& out, const io_queue& q); const io_queue op_queue; const unsigned int op_prio_cutoff; @@ -1566,119 +1640,26 @@ private: class ShardedOpWQ : public ShardedThreadPool::ShardedWQ { - struct ShardData { - Mutex sdata_lock; - Cond sdata_cond; - - Mutex sdata_op_ordering_lock; ///< protects all members below - - OSDMapRef waiting_for_pg_osdmap; - struct pg_slot { - PGRef pg; ///< cached pg reference [optional] - deque to_process; ///< order items for this slot - int num_running = 0; ///< _process threads doing pg lookup/lock - - deque waiting; ///< waiting for pg (or map + pg) - - /// waiting for map (peering evt) - map> waiting_peering; - - /// incremented by wake_pg_waiters; indicates racing _process threads - /// should bail out (their op has been requeued) - uint64_t requeue_seq = 0; - - /// waiting for split child to materialize - bool waiting_for_split = false; - }; - - /// map of slots for each spg_t. maintains ordering of items dequeued - /// from pqueue while _process thread drops shard lock to acquire the - /// pg lock. slots are removed only by prune_or_wake_pg_waiters. - unordered_map pg_slots; - - /// priority queue - std::unique_ptr> pqueue; - - bool stop_waiting = false; - - void _enqueue_front(OpQueueItem&& item, unsigned cutoff) { - unsigned priority = item.get_priority(); - unsigned cost = item.get_cost(); - if (priority >= cutoff) - pqueue->enqueue_strict_front( - item.get_owner(), - priority, std::move(item)); - else - pqueue->enqueue_front( - item.get_owner(), - priority, cost, std::move(item)); - } - - ShardData( - string lock_name, string ordering_lock, - uint64_t max_tok_per_prio, uint64_t min_cost, CephContext *cct, - io_queue opqueue) - : sdata_lock(lock_name.c_str(), false, true, false, cct), - sdata_op_ordering_lock(ordering_lock.c_str(), false, true, - false, cct) { - if (opqueue == io_queue::weightedpriority) { - pqueue = std::make_unique< - WeightedPriorityQueue>( - max_tok_per_prio, min_cost); - } else if (opqueue == io_queue::prioritized) { - pqueue = std::make_unique< - PrioritizedQueue>( - max_tok_per_prio, min_cost); - } else if (opqueue == io_queue::mclock_opclass) { - pqueue = std::make_unique(cct); - } else if (opqueue == io_queue::mclock_client) { - pqueue = std::make_unique(cct); - } - } - }; // struct ShardData - - vector shard_list; OSD *osd; - uint32_t num_shards; public: - ShardedOpWQ(uint32_t pnum_shards, - OSD *o, + ShardedOpWQ(OSD *o, time_t ti, time_t si, ShardedThreadPool* tp) : ShardedThreadPool::ShardedWQ(ti, si, tp), - osd(o), - num_shards(pnum_shards) { - for (uint32_t i = 0; i < num_shards; i++) { - char lock_name[32] = {0}; - snprintf(lock_name, sizeof(lock_name), "%s.%d", "OSD:ShardedOpWQ:", i); - char order_lock[32] = {0}; - snprintf(order_lock, sizeof(order_lock), "%s.%d", - "OSD:ShardedOpWQ:order:", i); - ShardData* one_shard = new ShardData( - lock_name, order_lock, - osd->cct->_conf->osd_op_pq_max_tokens_per_priority, - osd->cct->_conf->osd_op_pq_min_cost, osd->cct, osd->op_queue); - shard_list.push_back(one_shard); - } - } - ~ShardedOpWQ() override { - while (!shard_list.empty()) { - delete shard_list.back(); - shard_list.pop_back(); - } + osd(o) { } void _add_slot_waiter( spg_t token, - ShardData::pg_slot& slot, + OSDShard::pg_slot& slot, OpQueueItem&& qi); /// wake any pg waiters after a PG is split void wake_pg_split_waiters(spg_t pgid); - void _wake_pg_slot(spg_t pgid, ShardData *sdata, ShardData::pg_slot& slot); + void _wake_pg_slot(spg_t pgid, OSDShard *sdata, OSDShard::pg_slot& slot); /// prime slots for splitting pgs void prime_splits(const set& pgs); @@ -1702,8 +1683,8 @@ private: void _enqueue_front(OpQueueItem&& item) override; void return_waiting_threads() override { - for(uint32_t i = 0; i < num_shards; i++) { - ShardData* sdata = shard_list[i]; + for(uint32_t i = 0; i < osd->num_shards; i++) { + OSDShard* sdata = osd->shards[i]; assert (NULL != sdata); sdata->sdata_lock.Lock(); sdata->stop_waiting = true; @@ -1713,8 +1694,8 @@ private: } void stop_return_waiting_threads() override { - for(uint32_t i = 0; i < num_shards; i++) { - ShardData* sdata = shard_list[i]; + for(uint32_t i = 0; i < osd->num_shards; i++) { + OSDShard* sdata = osd->shards[i]; assert (NULL != sdata); sdata->sdata_lock.Lock(); sdata->stop_waiting = false; @@ -1723,8 +1704,8 @@ private: } void dump(Formatter *f) { - for(uint32_t i = 0; i < num_shards; i++) { - auto &&sdata = shard_list[i]; + for(uint32_t i = 0; i < osd->num_shards; i++) { + auto &&sdata = osd->shards[i]; char queue_name[32] = {0}; snprintf(queue_name, sizeof(queue_name), "%s%d", "OSD:ShardedOpWQ:", i); @@ -1767,8 +1748,8 @@ private: }; bool is_shard_empty(uint32_t thread_index) override { - uint32_t shard_index = thread_index % num_shards; - auto &&sdata = shard_list[shard_index]; + uint32_t shard_index = thread_index % osd->num_shards; + auto &&sdata = osd->shards[shard_index]; assert(sdata); Mutex::Locker l(sdata->sdata_op_ordering_lock); return sdata->pqueue->empty(); @@ -1851,6 +1832,11 @@ private: return service.add_map_inc_bl(e, bl); } +public: + // -- shards -- + vector shards; + uint32_t num_shards = 0; + protected: // -- placement groups -- RWLock pg_map_lock; // this lock orders *above* individual PG _locks @@ -1880,7 +1866,6 @@ public: protected: PGRef _open_pg( OSDMapRef createmap, ///< map pg is created in - OSDMapRef servicemap, ///< latest service map spg_t pg); PG* _make_pg(OSDMapRef createmap, spg_t pgid); @@ -2265,7 +2250,7 @@ public: }; -std::ostream& operator<<(std::ostream& out, const OSD::io_queue& q); +std::ostream& operator<<(std::ostream& out, const io_queue& q); //compatibility of the executable -- 2.39.5