From: Sage Weil
Date: Tue, 31 Jan 2017 19:59:51 +0000 (-0500)
Subject: osd/OSDMapMapping: restructure mapper as generic PG work running
X-Git-Tag: v12.0.1~343^2~4
X-Git-Url: http://git-server-git.apps.pok.os.sepia.ceph.com/?a=commitdiff_plain;h=b1b09e177dac1b3e45dafe5fc6bd2b5c5286df40;p=ceph.git

osd/OSDMapMapping: restructure mapper as generic PG work running

Make Job a parent class that can be specialized for any per-PG batch work.

Signed-off-by: Sage Weil
---

diff --git a/src/mon/OSDMonitor.cc b/src/mon/OSDMonitor.cc
index ee4b1e877f1..63664f04d0b 100644
--- a/src/mon/OSDMonitor.cc
+++ b/src/mon/OSDMonitor.cc
@@ -382,8 +382,8 @@ void OSDMonitor::on_active()
   }
   C_PrintTime *fin = new C_PrintTime(osdmap.get_epoch());
   mapping.reset(new OSDMapMapping);
-  mapping_job = mapper.queue(osdmap, mapping.get(),
-                             g_conf->mon_osd_mapping_pgs_per_chunk);
+  mapping_job = mapping->start_update(osdmap, mapper,
+                                      g_conf->mon_osd_mapping_pgs_per_chunk);
   dout(10) << __func__ << " started mapping job " << mapping_job.get()
            << " at " << fin->start << dendl;
   mapping_job->set_finish_event(fin);
diff --git a/src/mon/OSDMonitor.h b/src/mon/OSDMonitor.h
index ef1cb58f186..289b4a35e97 100644
--- a/src/mon/OSDMonitor.h
+++ b/src/mon/OSDMonitor.h
@@ -112,13 +112,13 @@ struct failure_info_t {
 
 class OSDMonitor : public PaxosService {
   CephContext *cct;
+
+  ParallelPGMapper mapper;                        ///< for background pg work
+  unique_ptr<OSDMapMapping> mapping;              ///< pg <-> osd mappings
+  unique_ptr<ParallelPGMapper::Job> mapping_job;  ///< background mapping job
+
 public:
   OSDMap osdmap;
 
-  unique_ptr<OSDMapMapping> mapping;
-  unique_ptr<ParallelOSDMapper::Job> mapping_job;
-
-private:
-  ParallelOSDMapper mapper;
-
   // [leader]
   OSDMap::Incremental pending_inc;
diff --git a/src/osd/OSDMapMapping.cc b/src/osd/OSDMapMapping.cc
index 9edd23ebe2a..5a4c84b54a8 100644
--- a/src/osd/OSDMapMapping.cc
+++ b/src/osd/OSDMapMapping.cc
@@ -121,14 +121,15 @@ void OSDMapMapping::_update_range(
 
 // ---------------------------
 
-void ParallelOSDMapper::Job::finish_one()
+void ParallelPGMapper::Job::finish_one()
 {
   Context *fin = nullptr;
   {
     Mutex::Locker l(lock);
     if (--shards == 0) {
       if (!aborted) {
-        mapping->_finish(*osdmap);
+        finish = ceph_clock_now();
+        complete();
       }
       cond.Signal();
       fin = onfinish;
@@ -140,33 +141,26 @@ void ParallelOSDMapper::Job::finish_one()
   }
 }
 
-void ParallelOSDMapper::WQ::_process(
-  item *i,
-  ThreadPool::TPHandle &h)
+void ParallelPGMapper::WQ::_process(Item *i, ThreadPool::TPHandle &h)
 {
-  ldout(m->cct, 10) << __func__ << " " << i->osdmap << " " << i->pool << " ["
-                    << i->begin << "," << i->end << ")" << dendl;
-  i->mapping->_update_range(*i->osdmap, i->pool, i->begin, i->end);
+  ldout(m->cct, 20) << __func__ << " " << i->job << " " << i->pool
+                    << " [" << i->begin << "," << i->end << ")" << dendl;
+  i->job->process(i->pool, i->begin, i->end);
   i->job->finish_one();
   delete i;
 }
 
-std::unique_ptr<ParallelOSDMapper::Job> ParallelOSDMapper::queue(
-  const OSDMap& osdmap,
-  OSDMapMapping *mapping,
+void ParallelPGMapper::queue(
+  Job *job,
   unsigned pgs_per_item)
 {
-  std::unique_ptr<Job> job(new Job(&osdmap, mapping));
-  mapping->_start(osdmap);
-  for (auto& p : osdmap.get_pools()) {
+  for (auto& p : job->osdmap->get_pools()) {
     for (unsigned ps = 0; ps < p.second.get_pg_num(); ps += pgs_per_item) {
       unsigned ps_end = MIN(ps + pgs_per_item, p.second.get_pg_num());
       job->start_one();
-      wq.queue(new item(job.get(), &osdmap, mapping, p.first, ps, ps_end));
-      ldout(cct, 20) << __func__ << " queue " << &osdmap << " "
-                     << p.first << " [" << ps
+      wq.queue(new Item(job, p.first, ps, ps_end));
+      ldout(cct, 20) << __func__ << " " << job << " " << p.first << " [" << ps
                      << "," << ps_end << ")" << dendl;
     }
   }
-  return job;
 }
diff --git a/src/osd/OSDMapMapping.h b/src/osd/OSDMapMapping.h
index 95fed453f9c..e18b5749314 100644
--- a/src/osd/OSDMapMapping.h
+++ b/src/osd/OSDMapMapping.h
@@ -13,6 +13,158 @@
 
 class OSDMap;
 
+/// work queue to perform work on batches of pgids on multiple CPUs
+class ParallelPGMapper {
+public:
+  struct Job {
+    utime_t start, finish;
+    unsigned shards = 0;
+    const OSDMap *osdmap;
+    bool aborted = false;
+    Context *onfinish = nullptr;
+
+    Mutex lock = {"ParallelPGMapper::Job::lock"};
+    Cond cond;
+
+    Job(const OSDMap *om) : start(ceph_clock_now()), osdmap(om) {}
+    virtual ~Job() {
+      assert(shards == 0);
+    }
+
+    // child must implement this
+    virtual void process(int64_t poolid, unsigned ps_begin, unsigned ps_end) = 0;
+    virtual void complete() = 0;
+
+    void set_finish_event(Context *fin) {
+      lock.Lock();
+      if (shards == 0) {
+        // already done.
+        lock.Unlock();
+        fin->complete(0);
+      } else {
+        // set finisher
+        onfinish = fin;
+        lock.Unlock();
+      }
+    }
+    bool is_done() {
+      Mutex::Locker l(lock);
+      return shards == 0;
+    }
+    utime_t get_duration() {
+      return finish - start;
+    }
+    void wait() {
+      Mutex::Locker l(lock);
+      while (shards > 0) {
+        cond.Wait(lock);
+      }
+    }
+    bool wait_for(double duration) {
+      utime_t until = start;
+      until += duration;
+      Mutex::Locker l(lock);
+      while (shards > 0) {
+        if (ceph_clock_now() >= until) {
+          return false;
+        }
+        cond.Wait(lock);
+      }
+      return true;
+    }
+    void abort() {
+      Context *fin = nullptr;
+      {
+        Mutex::Locker l(lock);
+        aborted = true;
+        fin = onfinish;
+        onfinish = nullptr;
+        while (shards > 0) {
+          cond.Wait(lock);
+        }
+      }
+      if (fin) {
+        fin->complete(-ECANCELED);
+      }
+    }
+
+    void start_one() {
+      Mutex::Locker l(lock);
+      ++shards;
+    }
+    void finish_one();
+  };
+
+protected:
+  CephContext *cct;
+
+  struct Item {
+    Job *job;
+    int64_t pool;
+    unsigned begin, end;
+
+    Item(Job *j, int64_t p, unsigned b, unsigned e)
+      : job(j),
+        pool(p),
+        begin(b),
+        end(e) {}
+  };
+  std::deque<Item*> q;
+
+  struct WQ : public ThreadPool::WorkQueue<Item> {
+    ParallelPGMapper *m;
+
+    WQ(ParallelPGMapper *m_, ThreadPool *tp)
+      : ThreadPool::WorkQueue<Item>("ParallelPGMapper::WQ", 0, 0, tp),
+        m(m_) {}
+
+    bool _enqueue(Item *i) override {
+      m->q.push_back(i);
+      return true;
+    }
+    void _dequeue(Item *i) override {
+      ceph_abort();
+    }
+    Item *_dequeue() override {
+      while (!m->q.empty()) {
+        Item *i = m->q.front();
+        m->q.pop_front();
+        if (i->job->aborted) {
+          i->job->finish_one();
+          delete i;
+        } else {
+          return i;
+        }
+      }
+      return nullptr;
+    }
+
+    void _process(Item *i, ThreadPool::TPHandle &h) override;
+
+    void _clear() override {
+      assert(_empty());
+    }
+
+    bool _empty() override {
+      return m->q.empty();
+    }
+  } wq;
+
+public:
+  ParallelPGMapper(CephContext *cct, ThreadPool *tp)
+    : cct(cct),
+      wq(this, tp) {}
+
+  void queue(
+    Job *job,
+    unsigned pgs_per_item);
+
+  void drain() {
+    wq.drain();
+  }
+};
+
+
 /// a precalculated mapping of every PG for a given OSDMap
 class OSDMapMapping {
   struct PoolMapping {
@@ -102,7 +254,21 @@ class OSDMapMapping {
 
   void _dump();
 
-  friend class ParallelOSDMapper;
+  friend class ParallelPGMapper;
+
+  struct MappingJob : public ParallelPGMapper::Job {
+    OSDMapMapping *mapping;
+    MappingJob(const OSDMap *osdmap, OSDMapMapping *m)
+      : Job(osdmap), mapping(m) {
+      mapping->_start(*osdmap);
+    }
+    void process(int64_t pool, unsigned ps_begin, unsigned ps_end) {
+      mapping->_update_range(*osdmap, pool, ps_begin, ps_end);
+    }
+    void complete() {
+      mapping->_finish(*osdmap);
+    }
+  };
 
 public:
   void get(pg_t pgid,
@@ -129,6 +295,15 @@ public:
 
   void update(const OSDMap& map);
   void update(const OSDMap& map, pg_t pgid);
+  std::unique_ptr<MappingJob> start_update(
+    const OSDMap& map,
+    ParallelPGMapper& mapper,
+    unsigned pgs_per_item) {
+    std::unique_ptr<MappingJob> job(new MappingJob(&map, this));
+    mapper.queue(job.get(), pgs_per_item);
+    return job;
+  }
+
   epoch_t get_epoch() const {
     return epoch;
   }
@@ -138,142 +313,5 @@ public:
   }
 };
 
-/// thread pool to calculate mapping on multiple CPUs
-class ParallelOSDMapper {
-public:
-  struct Job {
-    unsigned shards = 0;
-    const OSDMap *osdmap;
-    OSDMapMapping *mapping;
-    bool aborted = false;
-    Context *onfinish = nullptr;
-
-    Mutex lock = {"ParallelOSDMapper::Job::lock"};
-    Cond cond;
-
-    Job(const OSDMap *om, OSDMapMapping *m) : osdmap(om), mapping(m) {}
-    ~Job() {
-      assert(shards == 0);
-    }
-
-    void set_finish_event(Context *fin) {
-      lock.Lock();
-      if (shards == 0) {
-        // already done.
-        lock.Unlock();
-        fin->complete(0);
-      } else {
-        // set finisher
-        onfinish = fin;
-        lock.Unlock();
-      }
-    }
-    bool is_done() {
-      Mutex::Locker l(lock);
-      return shards == 0;
-    }
-    void wait() {
-      Mutex::Locker l(lock);
-      while (shards > 0) {
-        cond.Wait(lock);
-      }
-    }
-    void abort() {
-      Context *fin = nullptr;
-      {
-        Mutex::Locker l(lock);
-        aborted = true;
-        fin = onfinish;
-        onfinish = nullptr;
-        while (shards > 0) {
-          cond.Wait(lock);
-        }
-      }
-      if (fin) {
-        fin->complete(-ECANCELED);
-      }
-    }
-
-    void start_one() {
-      Mutex::Locker l(lock);
-      ++shards;
-    }
-    void finish_one();
-  };
-
-protected:
-  CephContext *cct;
-
-  struct item {
-    Job *job;
-    const OSDMap *osdmap;
-    OSDMapMapping *mapping;
-    int64_t pool;
-    unsigned begin, end;
-
-    item(Job *j, const OSDMap *m, OSDMapMapping *mg,
-         int64_t p, unsigned b, unsigned e)
-      : job(j),
-        osdmap(m),
-        mapping(mg),
-        pool(p),
-        begin(b),
-        end(e) {}
-  };
-  std::deque<item*> q;
-
-  struct WQ : public ThreadPool::WorkQueue<item> {
-    ParallelOSDMapper *m;
-
-    WQ(ParallelOSDMapper *m_, ThreadPool *tp)
-      : ThreadPool::WorkQueue<item>("ParallelOSDMapper::WQ", 0, 0, tp),
-        m(m_) {}
-
-    bool _enqueue(item *i) override {
-      m->q.push_back(i);
-      return true;
-    }
-    void _dequeue(item *i) override {
-      ceph_abort();
-    }
-    item *_dequeue() override {
-      while (!m->q.empty()) {
-        item *i = m->q.front();
-        m->q.pop_front();
-        if (i->job->aborted) {
-          i->job->finish_one();
-          delete i;
-        } else {
-          return i;
-        }
-      }
-      return nullptr;
-    }
-
-    void _process(item *i, ThreadPool::TPHandle &h) override;
-
-    void _clear() override {
-      assert(_empty());
-    }
-
-    bool _empty() override {
-      return m->q.empty();
-    }
-  } wq;
-
-public:
-  ParallelOSDMapper(CephContext *cct, ThreadPool *tp)
-    : cct(cct),
-      wq(this, tp) {}
-
-  std::unique_ptr<Job> queue(
-    const OSDMap& osdmap,
-    OSDMapMapping *mapping,
-    unsigned pgs_per_item);
-
-  void drain() {
-    wq.drain();
-  }
-};
 
 #endif
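
For illustration only (not part of the commit): with Job now a generic parent class, other per-PG batch work can run on the same thread pool by subclassing it, just as OSDMapMapping::MappingJob does in the diff above. The sketch below assumes only the ParallelPGMapper API added by this patch; PGCountJob, its counter, and the usage snippet are hypothetical.

// Hypothetical example (not part of this commit): count PGs per pool range
// by specializing the generic ParallelPGMapper::Job interface.
#include <atomic>
#include <cstdint>

struct PGCountJob : public ParallelPGMapper::Job {
  std::atomic<uint64_t> pgs{0};          // accumulated across worker shards

  explicit PGCountJob(const OSDMap *om) : Job(om) {}

  // called by worker threads, once per [ps_begin, ps_end) chunk of a pool
  void process(int64_t poolid, unsigned ps_begin, unsigned ps_end) override {
    pgs += ps_end - ps_begin;
  }

  // called exactly once, by whichever worker finishes the last shard
  void complete() override {
    // consume or publish the aggregated result here
  }
};

// usage sketch:
//   PGCountJob job(&osdmap);
//   mapper.queue(&job, g_conf->mon_osd_mapping_pgs_per_chunk);
//   job.wait();   // or job.set_finish_event(...) / job.abort()
//   // job.pgs and job.get_duration() are now valid

The point of the restructuring is that the queueing, sharding, abort, and completion plumbing lives entirely in the Job base class and ParallelPGMapper; a specialization only supplies process() for a [ps_begin, ps_end) range of one pool and complete() for the final aggregation step.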