From b679abcfc555dad0e490c6f6a47b1a5fb17dbc69 Mon Sep 17 00:00:00 2001 From: Kefu Chai Date: Wed, 12 Jul 2017 15:19:13 +0800 Subject: [PATCH] osd: move pgid into OpQueueItem this pave the road to generalize OpWQ We're going to want to be able to queue things that are not ordered by the PG lock. To that end, this patch genearlizes OSD::ShardedOpWQ to use a type which can specify an ordering token and locking structure other than a PG. There is a lot of collateral damage which I didn't feel was worth separating out into other commits. The code in ShardedOpWQ itself got some superficial cleanup. Also, the item being queued has been switched to not use a boost::variant. It was a cute way before to make the type easily copyable, but adding more visitors for the locking support would have been annoying. Instead, the variant is a unique_ptr to an interface. This makes the queue item type no longer copyable, which is just as well since we don't really want to be copying queue items anyway (duplicates would be most likely a bug) Signed-off-by: Samuel Just Signed-off-by: Kefu Chai Signed-off-by: Myoungwon Oh --- src/osd/OSD.cc | 250 ++++++++++++++++++++++++---------- src/osd/OSD.h | 92 +++++-------- src/osd/OpQueueItem.cc | 20 +-- src/osd/OpQueueItem.h | 217 +++++++++++++++++++---------- src/osd/PG.cc | 28 ++-- src/osd/PrimaryLogPG.cc | 22 ++- src/osd/mClockClientQueue.cc | 25 ++-- src/osd/mClockClientQueue.h | 4 +- src/osd/mClockOpClassQueue.cc | 25 ++-- src/osd/mClockOpClassQueue.h | 4 +- 10 files changed, 419 insertions(+), 268 deletions(-) diff --git a/src/osd/OSD.cc b/src/osd/OSD.cc index 211cdfa0d82..81f71d41be0 100644 --- a/src/osd/OSD.cc +++ b/src/osd/OSD.cc @@ -1674,14 +1674,14 @@ void OSDService::handle_misdirected_op(PG *pg, OpRequestRef op) << " in e" << m->get_map_epoch() << "/" << osdmap->get_epoch(); } -void OSDService::enqueue_back(spg_t pgid, OpQueueItem&& qi) +void OSDService::enqueue_back(OpQueueItem&& qi) { - osd->op_shardedwq.queue(make_pair(pgid, std::move(qi))); + osd->op_shardedwq.queue(std::move(qi)); } -void OSDService::enqueue_front(spg_t pgid, OpQueueItem&& qi) +void OSDService::enqueue_front(OpQueueItem&& qi) { - osd->op_shardedwq.queue_front(make_pair(pgid, std::move(qi))); + osd->op_shardedwq.queue_front(std::move(qi)); } void OSDService::queue_for_peering(PG *pg) @@ -1691,19 +1691,119 @@ void OSDService::queue_for_peering(PG *pg) void OSDService::queue_for_snap_trim(PG *pg) { + class PGSnapTrim : public PGOpQueueable { + epoch_t epoch_queued; + public: + PGSnapTrim( + spg_t pg, + epoch_t epoch_queued) + : PGOpQueueable(pg), epoch_queued(epoch_queued) {} + op_type_t get_op_type() const override final { + return op_type_t::bg_snaptrim; + } + ostream &print(ostream &rhs) const override final { + return rhs << "PGSnapTrim(pgid=" << get_pgid() + << "epoch_queued=" << epoch_queued + << ")"; + } + void run( + OSD *osd, PGRef& pg, ThreadPool::TPHandle &handle) override final { + pg->snap_trimmer(epoch_queued); + } + }; dout(10) << "queueing " << *pg << " for snaptrim" << dendl; - osd->op_shardedwq.queue( - make_pair( - pg->pg_id, - OpQueueItem( - PGSnapTrim(pg->get_osdmap()->get_epoch()), - cct->_conf->osd_snap_trim_cost, - cct->_conf->osd_snap_trim_priority, - ceph_clock_now(), - 0, - pg->get_osdmap()->get_epoch()))); + enqueue_back( + OpQueueItem( + unique_ptr( + new PGSnapTrim(pg->pg_id, pg->get_osdmap()->get_epoch())), + cct->_conf->osd_snap_trim_cost, + cct->_conf->osd_snap_trim_priority, + ceph_clock_now(), + 0, + pg->get_osdmap()->get_epoch())); } +void OSDService::queue_for_scrub(PG *pg, bool with_high_priority) +{ + class PGScrub : public PGOpQueueable { + epoch_t epoch_queued; + public: + PGScrub( + spg_t pg, + epoch_t epoch_queued) + : PGOpQueueable(pg), epoch_queued(epoch_queued) {} + op_type_t get_op_type() const override final { + return op_type_t::bg_scrub; + } + ostream &print(ostream &rhs) const override final { + return rhs << "PGScrub(pgid=" << get_pgid() + << "epoch_queued=" << epoch_queued + << ")"; + } + void run( + OSD *osd, PGRef& pg, ThreadPool::TPHandle &handle) override final { + pg->scrub(epoch_queued, handle); + } + }; + unsigned scrub_queue_priority = pg->scrubber.priority; + if (with_high_priority && scrub_queue_priority < cct->_conf->osd_client_op_priority) { + scrub_queue_priority = cct->_conf->osd_client_op_priority; + } + const auto epoch = pg->get_osdmap()->get_epoch(); + enqueue_back( + OpQueueItem( + unique_ptr(new PGScrub(pg->info.pgid, epoch)), + cct->_conf->osd_snap_trim_cost, + scrub_queue_priority, + ceph_clock_now(), + 0, + epoch)); +} + +void OSDService::_queue_for_recovery( + std::pair p, + uint64_t reserved_pushes) +{ + class PGRecovery : public PGOpQueueable { + epoch_t epoch_queued; + uint64_t reserved_pushes; + public: + PGRecovery( + spg_t pg, + epoch_t epoch_queued, + uint64_t reserved_pushes) + : PGOpQueueable(pg), + epoch_queued(epoch_queued), + reserved_pushes(reserved_pushes) {} + op_type_t get_op_type() const override final { + return op_type_t::bg_recovery; + } + virtual ostream &print(ostream &rhs) const override final { + return rhs << "PGRecovery(pgid=" << get_pgid() + << "epoch_queued=" << epoch_queued + << "reserved_pushes=" << reserved_pushes + << ")"; + } + virtual uint64_t get_reserved_pushes() const override final { + return reserved_pushes; + } + virtual void run( + OSD *osd, PGRef& pg, ThreadPool::TPHandle &handle) override final { + osd->do_recovery(pg.get(), epoch_queued, reserved_pushes, handle); + } + }; + assert(recovery_lock.is_locked_by_me()); + enqueue_back( + OpQueueItem( + unique_ptr( + new PGRecovery( + p.second->info.pgid, p.first, reserved_pushes)), + cct->_conf->osd_recovery_cost, + cct->_conf->osd_recovery_priority, + ceph_clock_now(), + 0, + p.first)); +} // ==================================================================== // OSD @@ -8909,11 +9009,16 @@ void OSD::enqueue_op(spg_t pg, OpRequestRef& op, epoch_t epoch) op->osd_trace.keyval("cost", op->get_req()->get_cost()); op->mark_queued_for_pg(); logger->tinc(l_osd_op_before_queue_op_lat, latency); - op_shardedwq.queue(make_pair(pg, OpQueueItem(op, epoch))); + op_shardedwq.queue( + OpQueueItem( + unique_ptr(new PGOpItem(pg, op)), + op->get_req()->get_cost(), + op->get_req()->get_priority(), + op->get_req()->get_recv_stamp(), + op->get_req()->get_source().num(), + epoch)); } - - /* * NOTE: dequeue called in worker thread, with pg lock */ @@ -9454,7 +9559,7 @@ void OSD::ShardedOpWQ::wake_pg_waiters(spg_t pgid) for (auto i = p->second.to_process.rbegin(); i != p->second.to_process.rend(); ++i) { - sdata->_enqueue_front(make_pair(pgid, *i), osd->op_prio_cutoff); + sdata->_enqueue_front(std::move(*i), osd->op_prio_cutoff); } for (auto& q : p->second.to_process) { pushes_to_free += q.get_reserved_pushes(); @@ -9548,9 +9653,8 @@ void OSD::ShardedOpWQ::clear_pg_slots() void OSD::ShardedOpWQ::_process(uint32_t thread_index, heartbeat_handle_d *hb) { uint32_t shard_index = thread_index % num_shards; - ShardData *sdata = shard_list[shard_index]; - assert(NULL != sdata); - + auto& sdata = shard_list[shard_index]; + assert(sdata); // peek at spg_t sdata->sdata_op_ordering_lock.Lock(); if (sdata->pqueue->empty()) { @@ -9569,30 +9673,31 @@ void OSD::ShardedOpWQ::_process(uint32_t thread_index, heartbeat_handle_d *hb) return; } } - pair item = sdata->pqueue->dequeue(); + OpQueueItem item = sdata->pqueue->dequeue(); if (osd->is_stopping()) { sdata->sdata_op_ordering_lock.Unlock(); return; // OSD shutdown, discard. } PGRef pg; uint64_t requeue_seq; + const auto token = item.get_ordering_token(); { - auto& slot = sdata->pg_slots[item.first]; - dout(30) << __func__ << " " << item.first + auto& slot = sdata->pg_slots[token]; + dout(30) << __func__ << " " << token << " to_process " << slot.to_process << " waiting_for_pg=" << (int)slot.waiting_for_pg << dendl; - slot.to_process.push_back(item.second); + slot.to_process.push_back(std::move(item)); // note the requeue seq now... requeue_seq = slot.requeue_seq; if (slot.waiting_for_pg) { // save ourselves a bit of effort - dout(20) << __func__ << " " << item.first << " item " << item.second + dout(20) << __func__ << slot.to_process.back() << " queued, waiting_for_pg" << dendl; sdata->sdata_op_ordering_lock.Unlock(); return; } pg = slot.pg; - dout(20) << __func__ << " " << item.first << " item " << item.second + dout(20) << __func__ << " " << slot.to_process.back() << " queued" << dendl; ++slot.num_running; } @@ -9602,27 +9707,26 @@ void OSD::ShardedOpWQ::_process(uint32_t thread_index, heartbeat_handle_d *hb) // [lookup +] lock pg (if we have it) if (!pg) { - pg = osd->_lookup_lock_pg(item.first); + pg = osd->_lookup_lock_pg(token); } else { pg->lock(); } osd->service.maybe_inject_dispatch_delay(); - boost::optional qi; - // we don't use a Mutex::Locker here because of the // osd->service.release_reserved_pushes() call below sdata->sdata_op_ordering_lock.Lock(); - auto q = sdata->pg_slots.find(item.first); + auto q = sdata->pg_slots.find(token); assert(q != sdata->pg_slots.end()); auto& slot = q->second; --slot.num_running; if (slot.to_process.empty()) { // raced with wake_pg_waiters or prune_pg_waiters - dout(20) << __func__ << " " << item.first << " nothing queued" << dendl; + dout(20) << __func__ << " " << token + << " nothing queued" << dendl; if (pg) { pg->unlock(); } @@ -9630,7 +9734,7 @@ void OSD::ShardedOpWQ::_process(uint32_t thread_index, heartbeat_handle_d *hb) return; } if (requeue_seq != slot.requeue_seq) { - dout(20) << __func__ << " " << item.first + dout(20) << __func__ << " " << token << " requeue_seq " << slot.requeue_seq << " > our " << requeue_seq << ", we raced with wake_pg_waiters" << dendl; @@ -9644,12 +9748,13 @@ void OSD::ShardedOpWQ::_process(uint32_t thread_index, heartbeat_handle_d *hb) dout(20) << __func__ << " " << item.first << " set pg to " << pg << dendl; slot.pg = pg; } - dout(30) << __func__ << " " << item.first << " to_process " << slot.to_process + dout(30) << __func__ << " " << token + << " to_process " << slot.to_process << " waiting_for_pg=" << (int)slot.waiting_for_pg << dendl; // make sure we're not already waiting for this pg if (slot.waiting_for_pg) { - dout(20) << __func__ << " " << item.first << " item " << item.second + dout(20) << __func__ << " " << token << " slot is waiting_for_pg" << dendl; if (pg) { pg->unlock(); @@ -9659,30 +9764,32 @@ void OSD::ShardedOpWQ::_process(uint32_t thread_index, heartbeat_handle_d *hb) } // take next item - qi = slot.to_process.front(); + auto qi = std::move(slot.to_process.front()); slot.to_process.pop_front(); - dout(20) << __func__ << " " << item.first << " item " << *qi - << " pg " << pg << dendl; + dout(20) << __func__ << " " << qi << " pg " << pg << dendl; if (!pg) { // should this pg shard exist on this osd in this (or a later) epoch? OSDMapRef osdmap = sdata->waiting_for_pg_osdmap; - if (osdmap->is_up_acting_osd_shard(item.first, osd->whoami)) { - dout(20) << __func__ << " " << item.first - << " no pg, should exist, will wait on " << *qi << dendl; - slot.to_process.push_front(*qi); + if (osdmap->is_up_acting_osd_shard(token, + osd->whoami)) { + dout(20) << __func__ << " " << token + << " no pg, should exist, will wait" << " on " << qi << dendl; + slot.to_process.push_front(std::move(qi)); slot.waiting_for_pg = true; - } else if (qi->get_map_epoch() > osdmap->get_epoch()) { - dout(20) << __func__ << " " << item.first << " no pg, item epoch is " - << qi->get_map_epoch() << " > " << osdmap->get_epoch() - << ", will wait on " << *qi << dendl; - slot.to_process.push_front(*qi); + } else if (qi.get_map_epoch() > osdmap->get_epoch()) { + dout(20) << __func__ << " " << token + << " no pg, item epoch is " + << qi.get_map_epoch() << " > " << osdmap->get_epoch() + << ", will wait on " << qi << dendl; + slot.to_process.push_front(std::move(qi)); slot.waiting_for_pg = true; } else { - dout(20) << __func__ << " " << item.first << " no pg, shouldn't exist," - << " dropping " << *qi << dendl; + dout(20) << __func__ << " " << token + << " no pg, shouldn't exist," + << " dropping " << qi << dendl; // share map with client? - if (boost::optional _op = qi->maybe_get_op()) { + if (boost::optional _op = qi.maybe_get_op()) { Session *session = static_cast( (*_op)->get_req()->get_connection()->get_priv()); if (session) { @@ -9690,7 +9797,7 @@ void OSD::ShardedOpWQ::_process(uint32_t thread_index, heartbeat_handle_d *hb) session->put(); } } - unsigned pushes_to_free = qi->get_reserved_pushes(); + unsigned pushes_to_free = qi.get_reserved_pushes(); if (pushes_to_free > 0) { sdata->sdata_op_ordering_lock.Unlock(); osd->service.release_reserved_pushes(pushes_to_free); @@ -9708,7 +9815,7 @@ void OSD::ShardedOpWQ::_process(uint32_t thread_index, heartbeat_handle_d *hb) { #ifdef WITH_LTTNG osd_reqid_t reqid; - if (boost::optional _op = qi->maybe_get_op()) { + if (boost::optional _op = qi.maybe_get_op()) { reqid = (*_op)->get_reqid(); } #endif @@ -9727,12 +9834,12 @@ void OSD::ShardedOpWQ::_process(uint32_t thread_index, heartbeat_handle_d *hb) ThreadPool::TPHandle tp_handle(osd->cct, hb, timeout_interval, suicide_interval); - qi->run(osd, pg, tp_handle); + qi.run(osd, pg, tp_handle); { #ifdef WITH_LTTNG osd_reqid_t reqid; - if (boost::optional _op = qi->maybe_get_op()) { + if (boost::optional _op = qi.maybe_get_op()) { reqid = (*_op)->get_reqid(); } #endif @@ -9743,24 +9850,23 @@ void OSD::ShardedOpWQ::_process(uint32_t thread_index, heartbeat_handle_d *hb) pg->unlock(); } -void OSD::ShardedOpWQ::_enqueue(pair&& item) { +void OSD::ShardedOpWQ::_enqueue(OpQueueItem&& item) { uint32_t shard_index = - item.first.hash_to_shard(shard_list.size()); + item.get_ordering_token().hash_to_shard(shard_list.size()); ShardData* sdata = shard_list[shard_index]; assert (NULL != sdata); - unsigned priority = item.second.get_priority(); - unsigned cost = item.second.get_cost(); + unsigned priority = item.get_priority(); + unsigned cost = item.get_cost(); sdata->sdata_op_ordering_lock.Lock(); - dout(20) << __func__ << " " << item.first << " " << item.second << dendl; + dout(20) << __func__ << " " << item << dendl; if (priority >= osd->op_prio_cutoff) sdata->pqueue->enqueue_strict( - item.second.get_owner(), priority, std::move(item)); + item.get_owner(), priority, std::move(item)); else sdata->pqueue->enqueue( - item.second.get_owner(), - priority, cost, std::move(item)); + item.get_owner(), priority, cost, std::move(item)); sdata->sdata_op_ordering_lock.Unlock(); sdata->sdata_lock.Lock(); @@ -9769,26 +9875,26 @@ void OSD::ShardedOpWQ::_enqueue(pair&& item) { } -void OSD::ShardedOpWQ::_enqueue_front(pair&& item) +void OSD::ShardedOpWQ::_enqueue_front(OpQueueItem&& item) { - uint32_t shard_index = item.first.hash_to_shard(shard_list.size()); - ShardData* sdata = shard_list[shard_index]; - assert (NULL != sdata); + auto shard_index = item.get_ordering_token().hash_to_shard(shard_list.size()); + auto& sdata = shard_list[shard_index]; + assert(sdata); sdata->sdata_op_ordering_lock.Lock(); - auto p = sdata->pg_slots.find(item.first); + auto p = sdata->pg_slots.find(item.get_ordering_token()); if (p != sdata->pg_slots.end() && !p->second.to_process.empty()) { // we may be racing with _process, which has dequeued a new item // from pqueue, put it on to_process, and is now busy taking the // pg lock. ensure this old requeued item is ordered before any // such newer item in to_process. - p->second.to_process.push_front(item.second); - item.second = p->second.to_process.back(); + p->second.to_process.push_front(std::move(item)); + item = std::move(p->second.to_process.back()); p->second.to_process.pop_back(); - dout(20) << __func__ << " " << item.first + dout(20) << __func__ << " " << p->second.to_process.front() - << " shuffled w/ " << item.second << dendl; + << " shuffled w/ " << item << dendl; } else { - dout(20) << __func__ << " " << item.first << " " << item.second << dendl; + dout(20) << __func__ << " " << item << dendl; } sdata->_enqueue_front(std::move(item), osd->op_prio_cutoff); sdata->sdata_op_ordering_lock.Unlock(); diff --git a/src/osd/OSD.h b/src/osd/OSD.h index dfb4d0b11f5..2df4ad6e87e 100644 --- a/src/osd/OSD.h +++ b/src/osd/OSD.h @@ -367,8 +367,8 @@ public: GenContextWQ recovery_gen_wq; ClassHandler *&class_handler; - void enqueue_back(spg_t pgid, OpQueueItem&& qi); - void enqueue_front(spg_t pgid, OpQueueItem&& qi); + void enqueue_back(OpQueueItem&& qi); + void enqueue_front(OpQueueItem&& qi); void maybe_inject_dispatch_delay() { if (g_conf->osd_debug_inject_dispatch_delay_probability > 0) { @@ -859,22 +859,7 @@ public: AsyncReserver snap_reserver; void queue_for_snap_trim(PG *pg); - - void queue_for_scrub(PG *pg, bool with_high_priority) { - unsigned scrub_queue_priority = pg->scrubber.priority; - if (with_high_priority && scrub_queue_priority < cct->_conf->osd_client_op_priority) { - scrub_queue_priority = cct->_conf->osd_client_op_priority; - } - enqueue_back( - pg->get_pgid(), - OpQueueItem( - PGScrub(pg->get_osdmap()->get_epoch()), - cct->_conf->osd_scrub_cost, - scrub_queue_priority, - ceph_clock_now(), - 0, - pg->get_osdmap()->get_epoch())); - } + void queue_for_scrub(PG *pg, bool with_high_priority); private: // -- pg recovery and associated throttling -- @@ -891,18 +876,7 @@ private: bool _recover_now(uint64_t *available_pushes); void _maybe_queue_recovery(); void _queue_for_recovery( - pair p, uint64_t reserved_pushes) { - assert(recovery_lock.is_locked_by_me()); - enqueue_back( - p.second->get_pgid(), - OpQueueItem( - PGRecovery(p.first, reserved_pushes), - cct->_conf->osd_recovery_cost, - cct->_conf->osd_recovery_priority, - ceph_clock_now(), - 0, - p.first)); - } + pair p, uint64_t reserved_pushes); public: void start_recovery_op(PG *pg, const hobject_t& soid); void finish_recovery_op(PG *pg, const hobject_t& soid, bool dequeue); @@ -1618,10 +1592,10 @@ private: * wake_pg_waiters, and (2) when wake_pg_waiters just ran, waiting_for_pg * and already requeued the items. */ - friend class OpQueueItem; + friend class PGOpItem; class ShardedOpWQ - : public ShardedThreadPool::ShardedWQ> + : public ShardedThreadPool::ShardedWQ { struct ShardData { Mutex sdata_lock; @@ -1650,18 +1624,18 @@ private: unordered_map pg_slots; /// priority queue - std::unique_ptr, uint64_t>> pqueue; + std::unique_ptr> pqueue; - void _enqueue_front(pair&& item, unsigned cutoff) { - unsigned priority = item.second.get_priority(); - unsigned cost = item.second.get_cost(); + void _enqueue_front(OpQueueItem&& item, unsigned cutoff) { + unsigned priority = item.get_priority(); + unsigned cost = item.get_cost(); if (priority >= cutoff) pqueue->enqueue_strict_front( - item.second.get_owner(), + item.get_owner(), priority, std::move(item)); else pqueue->enqueue_front( - item.second.get_owner(), + item.get_owner(), priority, cost, std::move(item)); } @@ -1673,15 +1647,13 @@ private: sdata_op_ordering_lock(ordering_lock.c_str(), false, true, false, cct) { if (opqueue == io_queue::weightedpriority) { - pqueue = std::unique_ptr - ,uint64_t>>( - new WeightedPriorityQueue,uint64_t>( - max_tok_per_prio, min_cost)); + pqueue = ceph::make_unique< + WeightedPriorityQueue>( + max_tok_per_prio, min_cost); } else if (opqueue == io_queue::prioritized) { - pqueue = std::unique_ptr - ,uint64_t>>( - new PrioritizedQueue,uint64_t>( - max_tok_per_prio, min_cost)); + pqueue = ceph::make_unique< + PrioritizedQueue>( + max_tok_per_prio, min_cost); } else if (opqueue == io_queue::mclock_opclass) { pqueue = ceph::make_unique(cct); } else if (opqueue == io_queue::mclock_client) { @@ -1700,7 +1672,7 @@ private: time_t ti, time_t si, ShardedThreadPool* tp) - : ShardedThreadPool::ShardedWQ>(ti, si, tp), + : ShardedThreadPool::ShardedWQ(ti, si, tp), osd(o), num_shards(pnum_shards) { for (uint32_t i = 0; i < num_shards; i++) { @@ -1739,10 +1711,10 @@ private: void _process(uint32_t thread_index, heartbeat_handle_d *hb) override; /// enqueue a new item - void _enqueue(pair && item) override; + void _enqueue(OpQueueItem&& item) override; /// requeue an old item (at the front of the line) - void _enqueue_front(pair && item) override; + void _enqueue_front(OpQueueItem&& item) override; void return_waiting_threads() override { for(uint32_t i = 0; i < num_shards; i++) { @@ -1756,12 +1728,14 @@ private: void dump(Formatter *f) { for(uint32_t i = 0; i < num_shards; i++) { - ShardData* sdata = shard_list[i]; - char lock_name[32] = {0}; - snprintf(lock_name, sizeof(lock_name), "%s%d", "OSD:ShardedOpWQ:", i); - assert (NULL != sdata); + auto &&sdata = shard_list[i]; + + char queue_name[32] = {0}; + snprintf(queue_name, sizeof(queue_name), "%s%d", "OSD:ShardedOpWQ:", i); + assert(NULL != sdata); + sdata->sdata_op_ordering_lock.Lock(); - f->open_object_section(lock_name); + f->open_object_section(queue_name); sdata->pqueue->dump(f); f->close_section(); sdata->sdata_op_ordering_lock.Unlock(); @@ -1783,9 +1757,9 @@ private: out_ops->push_front(*mop); } } - bool operator()(const pair &op) { - if (op.first == pgid) { - accumulate(op.second); + bool operator()(const OpQueueItem &op) { + if (op.get_ordering_token() == pgid) { + accumulate(op); return true; } else { return false; @@ -1798,8 +1772,8 @@ private: bool is_shard_empty(uint32_t thread_index) override { uint32_t shard_index = thread_index % num_shards; - ShardData* sdata = shard_list[shard_index]; - assert(NULL != sdata); + auto &&sdata = shard_list[shard_index]; + assert(sdata); Mutex::Locker l(sdata->sdata_op_ordering_lock); return sdata->pqueue->empty(); } diff --git a/src/osd/OpQueueItem.cc b/src/osd/OpQueueItem.cc index bf944161866..02bd1672771 100644 --- a/src/osd/OpQueueItem.cc +++ b/src/osd/OpQueueItem.cc @@ -12,24 +12,12 @@ * */ - -#include "PG.h" #include "OpQueueItem.h" #include "OSD.h" - -void OpQueueItem::RunVis::operator()(const OpRequestRef &op) { +void PGOpItem::run(OSD *osd, + PGRef& pg, + ThreadPool::TPHandle &handle) +{ osd->dequeue_op(pg, op, handle); } - -void OpQueueItem::RunVis::operator()(const PGSnapTrim &op) { - pg->snap_trimmer(op.epoch_queued); -} - -void OpQueueItem::RunVis::operator()(const PGScrub &op) { - pg->scrub(op.epoch_queued, handle); -} - -void OpQueueItem::RunVis::operator()(const PGRecovery &op) { - osd->do_recovery(pg.get(), op.epoch_queued, op.reserved_pushes, handle); -} diff --git a/src/osd/OpQueueItem.h b/src/osd/OpQueueItem.h index 97c4c5ae22f..212be71c96a 100644 --- a/src/osd/OpQueueItem.h +++ b/src/osd/OpQueueItem.h @@ -53,96 +53,163 @@ struct PGRecovery { } }; - class OpQueueItem { - typedef boost::variant< - OpRequestRef, - PGSnapTrim, - PGScrub, - PGRecovery - > QVariant; - QVariant qvariant; - int cost; - unsigned priority; - utime_t start_time; - uint64_t owner; ///< global id (e.g., client.XXX) - epoch_t map_epoch; ///< an epoch we expect the PG to exist in - - struct RunVis : public boost::static_visitor<> { - OSD *osd; - PGRef &pg; - ThreadPool::TPHandle &handle; - RunVis(OSD *osd, PGRef &pg, ThreadPool::TPHandle &handle) - : osd(osd), pg(pg), handle(handle) {} - void operator()(const OpRequestRef &op); - void operator()(const PGSnapTrim &op); - void operator()(const PGScrub &op); - void operator()(const PGRecovery &op); - }; // struct RunVis - - struct StringifyVis : public boost::static_visitor { - std::string operator()(const OpRequestRef &op) { - return stringify(op); - } - std::string operator()(const PGSnapTrim &op) { - return "PGSnapTrim"; +public: + class OrderLocker { + public: + using Ref = unique_ptr; + virtual void lock() = 0; + virtual void unlock() = 0; + virtual ~OrderLocker() {} + }; + // Abstraction for operations queueable in the op queue + class OpQueueable { + public: + enum class op_type_t { + client_op, + osd_subop, + bg_snaptrim, + bg_recovery, + bg_scrub + }; + using Ref = std::unique_ptr; + + /// Items with the same queue token will end up in the same shard + virtual uint32_t get_queue_token() const = 0; + + /* Items will be dequeued and locked atomically w.r.t. other items with the + * same ordering token */ + virtual const spg_t& get_ordering_token() const = 0; + virtual OrderLocker::Ref get_order_locker(PGRef pg) = 0; + virtual op_type_t get_op_type() const = 0; + virtual boost::optional maybe_get_op() const { + return boost::none; } - std::string operator()(const PGScrub &op) { - return "PGScrub"; + + virtual uint64_t get_reserved_pushes() const { + return 0; } - std::string operator()(const PGRecovery &op) { - return "PGRecovery"; + + virtual ostream &print(ostream &rhs) const = 0; + + virtual void run(OSD *osd, PGRef& pg, ThreadPool::TPHandle &handle) = 0; + virtual ~OpQueueable() {} + friend ostream& operator<<(ostream& out, const OpQueueable& q) { + return q.print(out); } }; - friend ostream& operator<<(ostream& out, const OpQueueItem& q) { - StringifyVis v; - return out << "OpQueueItem(" << boost::apply_visitor(v, q.qvariant) - << " prio " << q.priority << " cost " << q.cost - << " e" << q.map_epoch << ")"; - } +private: + OpQueueable::Ref qitem; + int cost; + unsigned priority; + utime_t start_time; + uint64_t owner; ///< global id (e.g., client.XXX) + epoch_t map_epoch; ///< an epoch we expect the PG to exist in public: - - OpQueueItem(OpRequestRef op, epoch_t e) - : qvariant(op), cost(op->get_req()->get_cost()), - priority(op->get_req()->get_priority()), - start_time(op->get_req()->get_recv_stamp()), - owner(op->get_req()->get_source().num()), - map_epoch(e) - {} OpQueueItem( - const PGSnapTrim &op, int cost, unsigned priority, utime_t start_time, - uint64_t owner, epoch_t e) - : qvariant(op), cost(cost), priority(priority), start_time(start_time), - owner(owner), map_epoch(e) {} - OpQueueItem( - const PGScrub &op, int cost, unsigned priority, utime_t start_time, - uint64_t owner, epoch_t e) - : qvariant(op), cost(cost), priority(priority), start_time(start_time), - owner(owner), map_epoch(e) {} - OpQueueItem( - const PGRecovery &op, int cost, unsigned priority, utime_t start_time, - uint64_t owner, epoch_t e) - : qvariant(op), cost(cost), priority(priority), start_time(start_time), - owner(owner), map_epoch(e) {} - - const boost::optional maybe_get_op() const { - const OpRequestRef *op = boost::get(&qvariant); - return op ? OpRequestRef(*op) : boost::optional(); + OpQueueable::Ref &&item, + int cost, + unsigned priority, + utime_t start_time, + uint64_t owner, + epoch_t e) + : qitem(std::move(item)), + cost(cost), + priority(priority), + start_time(start_time), + owner(owner), + map_epoch(e) + {} + OpQueueItem(OpQueueItem &&) = default; + OpQueueItem(const OpQueueItem &) = delete; + OpQueueItem &operator=(OpQueueItem &&) = default; + OpQueueItem &operator=(const OpQueueItem &) = delete; + + OrderLocker::Ref get_order_locker(PGRef pg) { + return qitem->get_order_locker(pg); + } + uint32_t get_queue_token() const { + return qitem->get_queue_token(); + } + const spg_t& get_ordering_token() const { + return qitem->get_ordering_token(); + } + using op_type_t = OpQueueable::op_type_t; + OpQueueable::op_type_t get_op_type() const { + return qitem->get_op_type(); + } + boost::optional maybe_get_op() const { + return qitem->maybe_get_op(); } uint64_t get_reserved_pushes() const { - const PGRecovery *op = boost::get(&qvariant); - return op ? op->reserved_pushes : 0; + return qitem->get_reserved_pushes(); } - void run(OSD *osd, PGRef &pg, ThreadPool::TPHandle &handle) { - RunVis v(osd, pg, handle); - boost::apply_visitor(v, qvariant); + void run(OSD *osd, PGRef& pg, ThreadPool::TPHandle &handle) { + qitem->run(osd, pg, handle); } unsigned get_priority() const { return priority; } int get_cost() const { return cost; } utime_t get_start_time() const { return start_time; } uint64_t get_owner() const { return owner; } epoch_t get_map_epoch() const { return map_epoch; } - const QVariant& get_variant() const { return qvariant; } -}; // struct OpQueueItem + + friend ostream& operator<<(ostream& out, const OpQueueItem& item) { + return out << "OpQueueItem(" + << item.get_ordering_token() << " " << *item.qitem + << " prio " << item.get_priority() + << " cost " << item.get_cost() + << " e" << item.get_map_epoch() << ")"; + } +}; + +/// Implements boilerplate for operations queued for the pg lock +class PGOpQueueable : public OpQueueItem::OpQueueable { + spg_t pgid; +protected: + const spg_t& get_pgid() const { + return pgid; + } +public: + PGOpQueueable(spg_t pg) : pgid(pg) {} + uint32_t get_queue_token() const override final { + return get_pgid().ps(); + } + + const spg_t& get_ordering_token() const override final { + return get_pgid(); + } + + OpQueueItem::OrderLocker::Ref get_order_locker(PGRef pg) override final { + class Locker : public OpQueueItem::OrderLocker { + PGRef pg; + public: + Locker(PGRef pg) : pg(pg) {} + void lock() override final { + pg->lock(); + } + void unlock() override final { + pg->unlock(); + } + }; + return OpQueueItem::OrderLocker::Ref( + new Locker(pg)); + } +}; + +class PGOpItem : public PGOpQueueable { + OpRequestRef op; +public: + PGOpItem(spg_t pg, OpRequestRef op) : PGOpQueueable(pg), op(op) {} + op_type_t get_op_type() const override final { + return op_type_t::client_op; + } + ostream &print(ostream &rhs) const override final { + return rhs << "PGOpItem(op=" << *(op->get_req()) << ")"; + } + boost::optional maybe_get_op() const override final { + return op; + } + void run(OSD *osd, PGRef& pg, ThreadPool::TPHandle &handle) override final; +}; diff --git a/src/osd/PG.cc b/src/osd/PG.cc index 509537d9953..e2c4028f88c 100644 --- a/src/osd/PG.cc +++ b/src/osd/PG.cc @@ -3457,7 +3457,14 @@ void PG::requeue_op(OpRequestRef op) p->second.push_front(op); } else { dout(20) << __func__ << " " << op << dendl; - osd->enqueue_front(info.pgid, OpQueueItem(op, get_osdmap()->get_epoch())); + osd->enqueue_front( + OpQueueItem( + unique_ptr(new PGOpItem(info.pgid, op)), + op->get_req()->get_cost(), + op->get_req()->get_priority(), + op->get_req()->get_recv_stamp(), + op->get_req()->get_source_inst(), + get_osdmap()->get_epoch())); } } @@ -3466,15 +3473,7 @@ void PG::requeue_ops(list &ls) for (list::reverse_iterator i = ls.rbegin(); i != ls.rend(); ++i) { - auto p = waiting_for_map.find((*i)->get_source()); - if (p != waiting_for_map.end()) { - dout(20) << __func__ << " " << *i << " (waiting_for_map " << p->first - << ")" << dendl; - p->second.push_front(*i); - } else { - dout(20) << __func__ << " " << *i << dendl; - osd->enqueue_front(info.pgid, OpQueueItem(*i, get_osdmap()->get_epoch())); - } + requeue_op(*i); } ls.clear(); } @@ -3492,7 +3491,14 @@ void PG::requeue_map_waiters() } else { dout(20) << __func__ << " " << p->first << " " << p->second << dendl; for (auto q = p->second.rbegin(); q != p->second.rend(); ++q) { - osd->enqueue_front(info.pgid, OpQueueItem(*q, epoch)); + auto req = *q; + osd->enqueue_front(OpQueueItem( + unique_ptr(new PGOpItem(info.pgid, req)), + req->get_req()->get_cost(), + req->get_req()->get_priority(), + req->get_req()->get_recv_stamp(), + req->get_req()->get_source_inst(), + epoch)); } p = waiting_for_map.erase(p); } diff --git a/src/osd/PrimaryLogPG.cc b/src/osd/PrimaryLogPG.cc index 15231f2c101..6abb40fe443 100644 --- a/src/osd/PrimaryLogPG.cc +++ b/src/osd/PrimaryLogPG.cc @@ -9060,9 +9060,15 @@ void PrimaryLogPG::op_applied(const eversion_t &applied_version) if (scrubber.active_rep_scrub) { if (last_update_applied >= static_cast( scrubber.active_rep_scrub->get_req())->scrub_to) { + auto& op = scrubber.active_rep_scrub; osd->enqueue_back( - info.pgid, - OpQueueItem(scrubber.active_rep_scrub, get_osdmap()->get_epoch())); + OpQueueItem( + unique_ptr(new PGOpItem(info.pgid, op)), + op->get_req()->get_cost(), + op->get_req()->get_priority(), + op->get_req()->get_recv_stamp(), + op->get_req()->get_source_inst(), + get_osdmap()->get_epoch())); scrubber.active_rep_scrub = OpRequestRef(); } } @@ -10297,10 +10303,16 @@ void PrimaryLogPG::_applied_recovered_object_replica() if (!deleting && active_pushes == 0 && scrubber.active_rep_scrub && static_cast( scrubber.active_rep_scrub->get_req())->chunky) { + auto& op = scrubber.active_rep_scrub; osd->enqueue_back( - info.pgid, - OpQueueItem(scrubber.active_rep_scrub, get_osdmap()->get_epoch())); - scrubber.active_rep_scrub = OpRequestRef(); + OpQueueItem( + unique_ptr(new PGOpItem(info.pgid, op)), + op->get_req()->get_cost(), + op->get_req()->get_priority(), + op->get_req()->get_recv_stamp(), + op->get_req()->get_source_inst(), + get_osdmap()->get_epoch())); + scrubber.active_rep_scrub.reset(); } unlock(); } diff --git a/src/osd/mClockClientQueue.cc b/src/osd/mClockClientQueue.cc index 5393792a4aa..31ce375e354 100644 --- a/src/osd/mClockClientQueue.cc +++ b/src/osd/mClockClientQueue.cc @@ -101,22 +101,21 @@ namespace ceph { mClockClientQueue::osd_op_type_t mClockClientQueue::get_osd_op_type(const Request& request) { - osd_op_type_t type = - boost::apply_visitor(pg_queueable_visitor, request.second.get_variant()); - + switch (request.get_op_type()) { // if we got client_op back then we need to distinguish between // a client op and an osd subop. - - if (osd_op_type_t::client_op != type) { - return type; - /* fixme: this should match REPOP and probably others - } else if (MSG_OSD_SUBOP == - boost::get( - request.second.get_variant())->get_req()->get_header().type) { - return osd_op_type_t::osd_subop; - */ - } else { + case OpQueueItem::op_type_t::client_op: return osd_op_type_t::client_op; + case OpQueueItem::op_type_t::osd_subop: + return osd_op_type_t::osd_subop; + case OpQueueItem::op_type_t::bg_snaptrim: + return osd_op_type_t::bg_snaptrim; + case OpQueueItem::op_type_t::bg_recovery: + return osd_op_type_t::bg_recovery; + case OpQueueItem::op_type_t::bg_scrub: + return osd_op_type_t::bg_scrub; + default: + assert(0); } } diff --git a/src/osd/mClockClientQueue.h b/src/osd/mClockClientQueue.h index a08ee0e0ec6..42bb175f7a0 100644 --- a/src/osd/mClockClientQueue.h +++ b/src/osd/mClockClientQueue.h @@ -28,7 +28,7 @@ namespace ceph { - using Request = std::pair; + using Request = OpQueueItem; using Client = uint64_t; // This class exists to bridge the ceph code, which treats the class @@ -74,7 +74,7 @@ namespace ceph { std::list *out) override final { queue.remove_by_filter( [&cl, out] (Request&& r) -> bool { - if (cl == r.second.get_owner()) { + if (cl == r.get_owner()) { out->push_front(std::move(r)); return true; } else { diff --git a/src/osd/mClockOpClassQueue.cc b/src/osd/mClockOpClassQueue.cc index f825511f67d..743c7e36725 100644 --- a/src/osd/mClockOpClassQueue.cc +++ b/src/osd/mClockOpClassQueue.cc @@ -98,22 +98,21 @@ namespace ceph { mClockOpClassQueue::osd_op_type_t mClockOpClassQueue::get_osd_op_type(const Request& request) { - osd_op_type_t type = - boost::apply_visitor(pg_queueable_visitor, request.second.get_variant()); - + switch (request.get_op_type()) { // if we got client_op back then we need to distinguish between // a client op and an osd subop. - - if (osd_op_type_t::client_op != type) { - return type; - /* fixme: this should match REPOP and probably others - } else if (MSG_OSD_SUBOP == - boost::get( - request.second.get_variant())->get_req()->get_header().type) { - return osd_op_type_t::osd_subop; - */ - } else { + case OpQueueItem::op_type_t::client_op: return osd_op_type_t::client_op; + case OpQueueItem::op_type_t::osd_subop: + return osd_op_type_t::osd_subop; + case OpQueueItem::op_type_t::bg_snaptrim: + return osd_op_type_t::bg_snaptrim; + case OpQueueItem::op_type_t::bg_recovery: + return osd_op_type_t::bg_recovery; + case OpQueueItem::op_type_t::bg_scrub: + return osd_op_type_t::bg_scrub; + default: + assert(0); } } diff --git a/src/osd/mClockOpClassQueue.h b/src/osd/mClockOpClassQueue.h index 957b7232a9d..de9757f6354 100644 --- a/src/osd/mClockOpClassQueue.h +++ b/src/osd/mClockOpClassQueue.h @@ -28,7 +28,7 @@ namespace ceph { - using Request = std::pair; + using Request = OpQueueItem; using Client = uint64_t; // This class exists to bridge the ceph code, which treats the class @@ -72,7 +72,7 @@ namespace ceph { std::list *out) override final { queue.remove_by_filter( [&cl, out] (Request&& r) -> bool { - if (cl == r.second.get_owner()) { + if (cl == r.get_owner()) { out->push_front(std::move(r)); return true; } else { -- 2.39.5