From: Sage Weil
Date: Mon, 30 Jan 2017 21:51:01 +0000 (-0500)
Subject: osd/OSDMapMapping: add ParallelMapper
X-Git-Tag: v12.0.1~343^2~13
X-Git-Url: http://git-server-git.apps.pok.os.sepia.ceph.com/?a=commitdiff_plain;h=e7bfc65b91bc096aafb5fdcc134cf27bc58e756e;p=ceph.git

osd/OSDMapMapping: add ParallelMapper

Calculate a mapping in parallel over a workqueue + threadpool.

Signed-off-by: Sage Weil
---

diff --git a/src/osd/OSDMapMapping.cc b/src/osd/OSDMapMapping.cc
index 7c022419d1fd..7ba53b2e77f6 100644
--- a/src/osd/OSDMapMapping.cc
+++ b/src/osd/OSDMapMapping.cc
@@ -4,6 +4,10 @@
 #include "OSDMapMapping.h"
 #include "OSDMap.h"
 
+#define dout_subsys ceph_subsys_mon
+
+#include "common/debug.h"
+
 // ensure that we have a PoolMappings for each pool and that
 // the dimensions (pg_num and size) match up.
 void OSDMapMapping::_init_mappings(const OSDMap& osdmap)
@@ -34,14 +38,19 @@ void OSDMapMapping::_init_mappings(const OSDMap& osdmap)
 
 void OSDMapMapping::update(const OSDMap& osdmap)
 {
-  _init_mappings(osdmap);
+  _start(osdmap);
   for (auto& p : osdmap.get_pools()) {
     _update_range(osdmap, p.first, 0, p.second.get_pg_num());
   }
-  _build_rmap(osdmap);
+  _finish(osdmap);
   //_dump();  // for debugging
 }
 
+void OSDMapMapping::update(const OSDMap& osdmap, pg_t pgid)
+{
+  _update_range(osdmap, pgid.pool(), pgid.ps(), pgid.ps() + 1);
+}
+
 void OSDMapMapping::_build_rmap(const OSDMap& osdmap)
 {
   acting_rmap.resize(osdmap.get_max_osd());
@@ -69,6 +78,12 @@ void OSDMapMapping::_build_rmap(const OSDMap& osdmap)
   }
 }
 
+void OSDMapMapping::_finish(const OSDMap& osdmap)
+{
+  _build_rmap(osdmap);
+  epoch = osdmap.get_epoch();
+}
+
 void OSDMapMapping::_dump()
 {
   for (auto& p : pools) {
@@ -101,3 +116,55 @@ void OSDMapMapping::_update_range(
 				  std::move(acting), acting_primary);
   }
 }
+
+// ---------------------------
+
+void ParallelOSDMapper::Job::finish_one()
+{
+  Context *fin = nullptr;
+  {
+    Mutex::Locker l(lock);
+    if (--shards == 0) {
+      if (!aborted) {
+        mapping->_finish(*osdmap);
+      }
+      cond.Signal();
+      fin = onfinish;
+      onfinish = nullptr;
+    }
+  }
+  if (fin) {
+    fin->complete(0);
+  }
+}
+
+void ParallelOSDMapper::WQ::_process(
+  item *i,
+  ThreadPool::TPHandle &h)
+{
+  ldout(m->cct, 10) << __func__ << " " << i->osdmap << " " << i->pool << " ["
+                    << i->begin << "," << i->end << ")" << dendl;
+  i->mapping->_update_range(*i->osdmap, i->pool, i->begin, i->end);
+  i->job->finish_one();
+  delete i;
+}
+
+std::unique_ptr<ParallelOSDMapper::Job> ParallelOSDMapper::queue(
+  const OSDMap& osdmap,
+  OSDMapMapping *mapping,
+  unsigned pgs_per_item)
+{
+  std::unique_ptr<Job> job(new Job(&osdmap, mapping));
+  mapping->_start(osdmap);
+  for (auto& p : osdmap.get_pools()) {
+    for (unsigned ps = 0; ps < p.second.get_pg_num(); ps += pgs_per_item) {
+      unsigned ps_end = MIN(ps + pgs_per_item, p.second.get_pg_num());
+      job->start_one();
+      wq.queue(new item(job.get(), &osdmap, mapping, p.first, ps, ps_end));
+      ldout(cct, 20) << __func__ << " queue " << &osdmap << " "
+                     << p.first << " [" << ps
+                     << "," << ps_end << ")" << dendl;
+    }
+  }
+  return job;
+}
diff --git a/src/osd/OSDMapMapping.h b/src/osd/OSDMapMapping.h
index ff8ed9c63465..bbc92794b462 100644
--- a/src/osd/OSDMapMapping.h
+++ b/src/osd/OSDMapMapping.h
@@ -9,6 +9,7 @@
 #include <vector>
 
 #include "osd/osd_types.h"
+#include "common/WorkQueue.h"
 
 class OSDMap;
 
@@ -39,8 +40,8 @@ class OSDMapMapping {
 	     std::vector<int> *up,
 	     int *up_primary,
 	     std::vector<int> *acting,
-	     int *acting_primary) {
-      int32_t *row = &table[row_size() * ps];
+	     int *acting_primary) const {
+      const int32_t *row = &table[row_size() * ps];
       if (acting_primary) {
 	*acting_primary = row[0];
       }
@@ -83,6 +84,7 @@ class OSDMapMapping {
   std::map<int64_t,PoolMapping> pools;
   std::vector<std::vector<pg_t>> acting_rmap;  // osd -> pg
   std::vector<std::vector<pg_t>> up_rmap;  // osd -> pg
+  epoch_t epoch;
 
   void _init_mappings(const OSDMap& osdmap);
   void _update_range(
@@ -92,14 +94,21 @@ class OSDMapMapping {
 
   void _build_rmap(const OSDMap& osdmap);
 
+  void _start(const OSDMap& osdmap) {
+    _init_mappings(osdmap);
+  }
+  void _finish(const OSDMap& osdmap);
+
   void _dump();
 
+  friend class ParallelOSDMapper;
+
 public:
   void get(pg_t pgid,
 	   std::vector<int> *up,
 	   int *up_primary,
 	   std::vector<int> *acting,
-	   int *acting_primary) {
+	   int *acting_primary) const {
     auto p = pools.find(pgid.pool());
     assert(p != pools.end());
     p->second.get(pgid.ps(), up, up_primary, acting, acting_primary);
@@ -115,7 +124,149 @@ public:
   }
 
   void update(const OSDMap& map);
+  void update(const OSDMap& map, pg_t pgid);
+
+  epoch_t get_epoch() const {
+    return epoch;
+  }
 };
 
+/// thread pool to calculate mapping on multiple CPUs
+class ParallelOSDMapper {
+public:
+  struct Job {
+    unsigned shards = 0;
+    const OSDMap *osdmap;
+    OSDMapMapping *mapping;
+    bool aborted = false;
+    Context *onfinish = nullptr;
+
+    Mutex lock = {"ParallelOSDMapper::Job::lock"};
+    Cond cond;
+
+    Job(const OSDMap *om, OSDMapMapping *m) : osdmap(om), mapping(m) {}
+    ~Job() {
+      assert(shards == 0);
+    }
+
+    void set_finish_event(Context *fin) {
+      lock.Lock();
+      if (shards == 0) {
+	// already done.
+	lock.Unlock();
+	fin->complete(0);
+      } else {
+	// set finisher
+	onfinish = fin;
+	lock.Unlock();
+      }
+    }
+    bool is_done() {
+      Mutex::Locker l(lock);
+      return shards == 0;
+    }
+    void wait() {
+      Mutex::Locker l(lock);
+      while (shards > 0) {
+	cond.Wait(lock);
+      }
+    }
+    void abort() {
+      Context *fin = nullptr;
+      {
+	Mutex::Locker l(lock);
+	aborted = true;
+	fin = onfinish;
+	onfinish = nullptr;
+	while (shards > 0) {
+	  cond.Wait(lock);
+	}
+      }
+      if (fin) {
+	fin->complete(-ECANCELED);
+      }
+    }
+
+    void start_one() {
+      Mutex::Locker l(lock);
+      ++shards;
+    }
+    void finish_one();
+  };
+
+protected:
+  CephContext *cct;
+
+  struct item {
+    Job *job;
+    const OSDMap *osdmap;
+    OSDMapMapping *mapping;
+    int64_t pool;
+    unsigned begin, end;
+
+    item(Job *j, const OSDMap *m, OSDMapMapping *mg,
+	 int64_t p, unsigned b, unsigned e)
+      : job(j),
+	osdmap(m),
+	mapping(mg),
+	pool(p),
+	begin(b),
+	end(e) {}
+  };
+  std::deque<item*> q;
+
+  struct WQ : public ThreadPool::WorkQueue<item> {
+    ParallelOSDMapper *m;
+
+    WQ(ParallelOSDMapper *m_, ThreadPool *tp)
+      : ThreadPool::WorkQueue<item>("ParallelOSDMapper::WQ", 0, 0, tp),
+	m(m_) {}
+
+    bool _enqueue(item *i) override {
+      m->q.push_back(i);
+      return true;
+    }
+    void _dequeue(item *i) override {
+      ceph_abort();
+    }
+    item *_dequeue() override {
+      while (!m->q.empty()) {
+	item *i = m->q.front();
+	m->q.pop_front();
+	if (i->job->aborted) {
+	  i->job->finish_one();
+	  delete i;
+	} else {
+	  return i;
+	}
+      }
+      return nullptr;
+    }
+
+    void _process(item *i, ThreadPool::TPHandle &h) override;
+
+    void _clear() override {
+      assert(_empty());
+    }
+
+    bool _empty() override {
+      return m->q.empty();
+    }
+  } wq;
+
+public:
+  ParallelOSDMapper(CephContext *cct, ThreadPool *tp)
+    : cct(cct),
+      wq(this, tp) {}
+
+  std::unique_ptr<Job> queue(
+    const OSDMap& osdmap,
+    OSDMapMapping *mapping,
+    unsigned pgs_per_item);
+
+  void drain() {
+    wq.drain();
+  }
+};
 
 #endif
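
For context, a minimal sketch of how a caller might drive the new ParallelOSDMapper. This is not part of the commit: the helper name remap_all_pgs, the ThreadPool constructor arguments, the thread count, and the pgs_per_item value of 128 are all illustrative assumptions, not code from this change.

    // Illustrative only -- not part of this commit. Assumes a ThreadPool can
    // be constructed and started as below; adjust to the WorkQueue.h in your
    // tree.
    void remap_all_pgs(CephContext *cct, const OSDMap& osdmap)
    {
      ThreadPool tp(cct, "remap_tp", "tp_remap", 4);  // example: 4 worker threads
      tp.start();

      OSDMapMapping mapping;
      ParallelOSDMapper mapper(cct, &tp);

      // Shard the PG space into items of ~128 PGs each; each item is handled
      // by ParallelOSDMapper::WQ::_process on a worker thread.
      std::unique_ptr<ParallelOSDMapper::Job> job =
        mapper.queue(osdmap, &mapping, 128);

      job->wait();  // or job->set_finish_event(...) for asynchronous completion
      // mapping is now complete: mapping.get_epoch() == osdmap.get_epoch()

      tp.stop();
    }

Per the diff, the last shard to call Job::finish_one() runs OSDMapMapping::_finish(), which builds the reverse (osd -> pg) maps and records the epoch, so the mapping only becomes usable once the job reports done.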