From: Samuel Just <sjust@redhat.com>
Date: Fri, 29 May 2015 18:42:52 +0000 (-0700)
Subject: OSD: add PGQueueable
X-Git-Tag: v9.0.2~48^2~2
X-Git-Url: http://git-server-git.apps.pok.os.sepia.ceph.com/?a=commitdiff_plain;h=e8cddf8e6f5a13f2efc4823e4818291f9f032f55;p=ceph.git

OSD: add PGQueueable

Signed-off-by: Samuel Just <sjust@redhat.com>
---
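Notes (not part of the commit itself, ignored by git am): PGQueueable wraps each
queued work item in a boost::variant that currently holds only OpRequestRef and
dispatches it through a boost::static_visitor (RunVis), so the sharded op queue
no longer has to assume every item is a client op. The sketch below shows the
same variant/visitor pattern in isolation; it is a minimal illustration, not
Ceph code: QueueItem and FakeOp are hypothetical stand-ins for PGQueueable and
OpRequestRef, the PG/TPHandle arguments to run() are dropped, and the values in
main() are made up, so it builds against Boost alone.

  // sketch.cc -- the variant/visitor dispatch pattern used by PGQueueable,
  // with hypothetical stand-ins (FakeOp, QueueItem) instead of Ceph types.
  #include <boost/variant.hpp>
  #include <boost/optional.hpp>
  #include <iostream>
  #include <string>

  struct FakeOp { std::string reqid; };        // stand-in for OpRequestRef

  class QueueItem {
    // Only one alternative today; more work-item types can be added later.
    typedef boost::variant<FakeOp> QVariant;
    QVariant qvariant;
    unsigned priority;

    // One operator() overload per variant alternative.
    struct RunVis : public boost::static_visitor<> {
      void operator()(FakeOp &op) const {
        std::cout << "dequeue_op " << op.reqid << std::endl;
      }
    };

  public:
    QueueItem(FakeOp op, unsigned prio) : qvariant(op), priority(prio) {}

    // Returns the wrapped op if this item holds one, otherwise an empty optional.
    boost::optional<FakeOp> maybe_get_op() {
      FakeOp *op = boost::get<FakeOp>(&qvariant);
      return op ? *op : boost::optional<FakeOp>();
    }

    // Dispatches to the overload matching whatever the variant holds.
    void run() {
      RunVis v;
      boost::apply_visitor(v, qvariant);
    }

    unsigned get_priority() const { return priority; }
  };

  int main() {
    FakeOp fake;
    fake.reqid = "client.4123:17";             // made-up request id
    QueueItem item(fake, 63);
    if (boost::optional<FakeOp> op = item.maybe_get_op())
      std::cout << "priority " << item.get_priority()
                << " op " << op->reqid << std::endl;
    item.run();
    return 0;
  }

Adding another alternative to the variant later would only require a new
operator() overload on the visitor; callers like maybe_get_op() keep working
because boost::get returns NULL for items that do not hold an OpRequestRef.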
diff --git a/src/osd/OSD.cc b/src/osd/OSD.cc
index f6e941901934..246c4f71ae16 100644
--- a/src/osd/OSD.cc
+++ b/src/osd/OSD.cc
@@ -152,6 +152,10 @@ static ostream& _prefix(std::ostream* _dout, int whoami, epoch_t epoch) {
   return *_dout << "osd." << whoami << " " << epoch << " ";
 }
 
+void PGQueueable::RunVis::operator()(OpRequestRef &op) {
+  return osd->dequeue_op(pg, op, handle);
+}
+
 //Initial features in new superblock.
 //Features here are also automatically upgraded
 CompatSet OSD::get_osd_initial_compat_set() {
@@ -1272,7 +1276,10 @@ void OSDService::handle_misdirected_op(PG *pg, OpRequestRef op)
 
 void OSDService::dequeue_pg(PG *pg, list<OpRequestRef> *dequeued)
 {
-  osd->op_shardedwq.dequeue(pg, dequeued);
+  if (dequeued)
+    osd->op_shardedwq.dequeue_and_get_ops(pg, dequeued);
+  else
+    osd->op_shardedwq.dequeue(pg);
 }
 
 void OSDService::queue_for_peering(PG *pg)
@@ -8194,7 +8201,7 @@ void OSD::ShardedOpWQ::_process(uint32_t thread_index, heartbeat_handle_d *hb )
       return;
     }
   }
-  pair<PGRef, OpRequestRef> item = sdata->pqueue.dequeue();
+  pair<PGRef, PGQueueable> item = sdata->pqueue.dequeue();
   sdata->pg_for_processing[&*(item.first)].push_back(item.second);
   sdata->sdata_op_ordering_lock.Unlock();
   ThreadPool::TPHandle tp_handle(osd->cct, hb, timeout_interval,
@@ -8202,7 +8209,7 @@ void OSD::ShardedOpWQ::_process(uint32_t thread_index, heartbeat_handle_d *hb )
 
   (item.first)->lock_suspend_timeout(tp_handle);
 
-  OpRequestRef op;
+  boost::optional<PGQueueable> op;
   {
     Mutex::Locker l(sdata->sdata_op_ordering_lock);
     if (!sdata->pg_for_processing.count(&*(item.first))) {
@@ -8220,7 +8227,10 @@ void OSD::ShardedOpWQ::_process(uint32_t thread_index, heartbeat_handle_d *hb )
   // and will begin to be handled by a worker thread.
   {
 #ifdef WITH_LTTNG
-    osd_reqid_t reqid = op->get_reqid();
+    osd_reqid_t reqid;
+    if (boost::optional<OpRequestRef> _op = op->maybe_get_op()) {
+      reqid = (*_op)->get_reqid();
+    }
 #endif
     tracepoint(osd, opwq_process_start, reqid.name._type,
         reqid.name._num, reqid.tid, reqid.inc);
@@ -8235,11 +8245,14 @@ void OSD::ShardedOpWQ::_process(uint32_t thread_index, heartbeat_handle_d *hb )
   delete f;
   *_dout << dendl;
 
-  osd->dequeue_op(item.first, op, tp_handle);
+  op->run(osd, item.first, tp_handle);
 
   {
 #ifdef WITH_LTTNG
-    osd_reqid_t reqid = op->get_reqid();
+    osd_reqid_t reqid;
+    if (boost::optional<OpRequestRef> _op = op->maybe_get_op()) {
+      reqid = (*_op)->get_reqid();
+    }
 #endif
     tracepoint(osd, opwq_process_finish, reqid.name._type,
         reqid.name._num, reqid.tid, reqid.inc);
@@ -8248,21 +8261,22 @@ void OSD::ShardedOpWQ::_process(uint32_t thread_index, heartbeat_handle_d *hb )
   (item.first)->unlock();
 }
 
-void OSD::ShardedOpWQ::_enqueue(pair<PGRef, OpRequestRef> item) {
+void OSD::ShardedOpWQ::_enqueue(pair<PGRef, PGQueueable> item) {
 
   uint32_t shard_index = (((item.first)->get_pgid().ps())% shard_list.size());
 
   ShardData* sdata = shard_list[shard_index];
   assert (NULL != sdata);
-  unsigned priority = item.second->get_req()->get_priority();
-  unsigned cost = item.second->get_req()->get_cost();
+  unsigned priority = item.second.get_priority();
+  unsigned cost = item.second.get_cost();
   sdata->sdata_op_ordering_lock.Lock();
 
   if (priority >= CEPH_MSG_PRIO_LOW)
     sdata->pqueue.enqueue_strict(
-      item.second->get_req()->get_source_inst(), priority, item);
+      item.second.get_owner(), priority, item);
   else
-    sdata->pqueue.enqueue(item.second->get_req()->get_source_inst(),
+    sdata->pqueue.enqueue(
+      item.second.get_owner(),
       priority, cost, item);
   sdata->sdata_op_ordering_lock.Unlock();
@@ -8272,7 +8286,7 @@ void OSD::ShardedOpWQ::_enqueue(pair<PGRef, OpRequestRef> item) {
 
 }
 
-void OSD::ShardedOpWQ::_enqueue_front(pair<PGRef, OpRequestRef> item) {
+void OSD::ShardedOpWQ::_enqueue_front(pair<PGRef, PGQueueable> item) {
 
   uint32_t shard_index = (((item.first)->get_pgid().ps())% shard_list.size());
 
@@ -8284,13 +8298,15 @@ void OSD::ShardedOpWQ::_enqueue_front(pair<PGRef, OpRequestRef> item) {
     item.second = sdata->pg_for_processing[&*(item.first)].back();
     sdata->pg_for_processing[&*(item.first)].pop_back();
   }
-  unsigned priority = item.second->get_req()->get_priority();
-  unsigned cost = item.second->get_req()->get_cost();
+  unsigned priority = item.second.get_priority();
+  unsigned cost = item.second.get_cost();
   if (priority >= CEPH_MSG_PRIO_LOW)
     sdata->pqueue.enqueue_strict_front(
-      item.second->get_req()->get_source_inst(),priority, item);
+      item.second.get_owner(),
+      priority, item);
   else
-    sdata->pqueue.enqueue_front(item.second->get_req()->get_source_inst(),
+    sdata->pqueue.enqueue_front(
+      item.second.get_owner(),
       priority, cost, item);
   sdata->sdata_op_ordering_lock.Unlock();
diff --git a/src/osd/OSD.h b/src/osd/OSD.h
index 33c8e94617dc..78d33959d849 100644
--- a/src/osd/OSD.h
+++ b/src/osd/OSD.h
@@ -316,6 +316,44 @@ public:
 typedef ceph::shared_ptr<DeletingState> DeletingStateRef;
 
 class OSD;
+
+class PGQueueable {
+  typedef boost::variant<
+    OpRequestRef
+    > QVariant;
+  QVariant qvariant;
+  int cost;
+  unsigned priority;
+  utime_t start_time;
+  entity_inst_t owner;
+  struct RunVis : public boost::static_visitor<> {
+    OSD *osd;
+    PGRef &pg;
+    ThreadPool::TPHandle &handle;
+    RunVis(OSD *osd, PGRef &pg, ThreadPool::TPHandle &handle)
+      : osd(osd), pg(pg), handle(handle) {}
+    void operator()(OpRequestRef &op);
+  };
+public:
+  PGQueueable(OpRequestRef op)
+    : qvariant(op), cost(op->get_req()->get_cost()),
+      priority(op->get_req()->get_priority()),
+      start_time(op->get_req()->get_recv_stamp()),
+      owner(op->get_req()->get_source_inst())
+    {}
+  boost::optional<OpRequestRef> maybe_get_op() {
+    OpRequestRef *op = boost::get<OpRequestRef>(&qvariant);
+    return op ? *op : boost::optional<OpRequestRef>();
+  }
+  void run(OSD *osd, PGRef &pg, ThreadPool::TPHandle &handle) {
+    RunVis v(osd, pg, handle);
+    boost::apply_visitor(v, qvariant);
+  }
+  unsigned get_priority() const { return priority; }
+  int get_cost() const { return cost; }
+  utime_t get_start_time() const { return start_time; }
+  entity_inst_t get_owner() const { return owner; }
+};
+
 class OSDService {
 public:
   OSD *osd;
@@ -334,7 +372,7 @@ public:
   PerfCounters *&logger;
   PerfCounters *&recoverystate_perf;
   MonClient *&monc;
-  ShardedThreadPool::ShardedWQ < pair<PGRef, OpRequestRef> > &op_wq;
+  ShardedThreadPool::ShardedWQ < pair<PGRef, PGQueueable> > &op_wq;
   ThreadPool::BatchWorkQueue<PG> &peering_wq;
   ThreadPool::WorkQueue<PG> &recovery_wq;
   ThreadPool::WorkQueue<PG> &snap_trim_wq;
@@ -1456,15 +1494,15 @@ private:
 
   // -- op queue --
 
-
-  class ShardedOpWQ: public ShardedThreadPool::ShardedWQ < pair<PGRef, OpRequestRef> > {
+  friend class PGQueueable;
+  class ShardedOpWQ: public ShardedThreadPool::ShardedWQ < pair<PGRef, PGQueueable> > {
 
     struct ShardData {
       Mutex sdata_lock;
      Cond sdata_cond;
       Mutex sdata_op_ordering_lock;
-      map<PG*, list<OpRequestRef> > pg_for_processing;
-      PrioritizedQueue< pair<PGRef, OpRequestRef>, entity_inst_t> pqueue;
+      map<PG*, list<PGQueueable> > pg_for_processing;
+      PrioritizedQueue< pair<PGRef, PGQueueable>, entity_inst_t> pqueue;
       ShardData(
        string lock_name, string ordering_lock,
        uint64_t max_tok_per_prio, uint64_t min_cost)
@@ -1479,7 +1517,7 @@ private:
 
   public:
     ShardedOpWQ(uint32_t pnum_shards, OSD *o, time_t ti, time_t si, ShardedThreadPool* tp):
-      ShardedThreadPool::ShardedWQ < pair<PGRef, OpRequestRef> >(ti, si, tp),
+      ShardedThreadPool::ShardedWQ < pair<PGRef, PGQueueable> >(ti, si, tp),
       osd(o), num_shards(pnum_shards) {
       for(uint32_t i = 0; i < num_shards; i++) {
        char lock_name[32] = {0};
@@ -1504,8 +1542,8 @@ private:
     }
 
     void _process(uint32_t thread_index, heartbeat_handle_d *hb);
-    void _enqueue(pair<PGRef, OpRequestRef> item);
-    void _enqueue_front(pair<PGRef, OpRequestRef> item);
+    void _enqueue(pair<PGRef, PGQueueable> item);
+    void _enqueue_front(pair<PGRef, PGQueueable> item);
 
     void return_waiting_threads() {
      for(uint32_t i = 0; i < num_shards; i++) {
@@ -1534,38 +1572,52 @@ private:
     struct Pred {
       PG *pg;
       Pred(PG *pg) : pg(pg) {}
-      bool operator()(const pair<PGRef, OpRequestRef> &op) {
+      bool operator()(const pair<PGRef, PGQueueable> &op) {
        return op.first == pg;
       }
     };
-    void dequeue(PG *pg, list<OpRequestRef> *dequeued = 0) {
+    void dequeue(PG *pg) {
       ShardData* sdata = NULL;
       assert(pg != NULL);
       uint32_t shard_index = pg->get_pgid().ps()% shard_list.size();
       sdata = shard_list[shard_index];
       assert(sdata != NULL);
-      if (!dequeued) {
-        sdata->sdata_op_ordering_lock.Lock();
-        sdata->pqueue.remove_by_filter(Pred(pg));
-        sdata->pg_for_processing.erase(pg);
-        sdata->sdata_op_ordering_lock.Unlock();
-      } else {
-        list<pair<PGRef, OpRequestRef> > _dequeued;
-        sdata->sdata_op_ordering_lock.Lock();
-        sdata->pqueue.remove_by_filter(Pred(pg), &_dequeued);
-        for (list<pair<PGRef, OpRequestRef> >::iterator i = _dequeued.begin();
-             i != _dequeued.end(); ++i) {
-          dequeued->push_back(i->second);
-        }
-        if (sdata->pg_for_processing.count(pg)) {
-          dequeued->splice(
-            dequeued->begin(),
-            sdata->pg_for_processing[pg]);
-          sdata->pg_for_processing.erase(pg);
+      sdata->sdata_op_ordering_lock.Lock();
+      sdata->pqueue.remove_by_filter(Pred(pg));
+      sdata->pg_for_processing.erase(pg);
+      sdata->sdata_op_ordering_lock.Unlock();
+    }
+
+    void dequeue_and_get_ops(PG *pg, list<OpRequestRef> *dequeued) {
+      ShardData* sdata = NULL;
+      assert(pg != NULL);
+      uint32_t shard_index = pg->get_pgid().ps()% shard_list.size();
+      sdata = shard_list[shard_index];
+      assert(sdata != NULL);
+      assert(dequeued);
+      list<pair<PGRef, PGQueueable> > _dequeued;
+      sdata->sdata_op_ordering_lock.Lock();
+      sdata->pqueue.remove_by_filter(Pred(pg), &_dequeued);
+      for (list<pair<PGRef, PGQueueable> >::iterator i = _dequeued.begin();
+           i != _dequeued.end(); ++i) {
+        boost::optional<OpRequestRef> mop = i->second.maybe_get_op();
+        if (mop)
+          dequeued->push_back(*mop);
+      }
+      map<PG*, list<PGQueueable> >::iterator iter =
+        sdata->pg_for_processing.find(pg);
+      if (iter != sdata->pg_for_processing.end()) {
+        for (list<PGQueueable>::reverse_iterator i = iter->second.rbegin();
+             i != iter->second.rend();
+             ++i) {
+          boost::optional<OpRequestRef> mop = i->maybe_get_op();
+          if (mop)
+            dequeued->push_front(*mop);
        }
-        sdata->sdata_op_ordering_lock.Unlock();
+        sdata->pg_for_processing.erase(iter);
       }
+      sdata->sdata_op_ordering_lock.Unlock();
     }
 
     bool is_shard_empty(uint32_t thread_index) {