]> git-server-git.apps.pok.os.sepia.ceph.com Git - ceph.git/commitdiff
osd/OSDMapMapping: add ParallelMapper
authorSage Weil <sage@redhat.com>
Mon, 30 Jan 2017 21:51:01 +0000 (16:51 -0500)
committerSage Weil <sage@redhat.com>
Thu, 16 Feb 2017 17:04:07 +0000 (12:04 -0500)
Calculate a mapping in parallel over a workqueue + threadpool.

Signed-off-by: Sage Weil <sage@redhat.com>
src/osd/OSDMapMapping.cc
src/osd/OSDMapMapping.h

index 7c022419d1fd91b5bdb09ff3b970f6376b5aae6f..7ba53b2e77f6a7581797cbf1ef88177ebef1997d 100644 (file)
@@ -4,6 +4,10 @@
 #include "OSDMapMapping.h"
 #include "OSDMap.h"
 
+#define dout_subsys ceph_subsys_mon
+
+#include "common/debug.h"
+
 // ensure that we have a PoolMappings for each pool and that
 // the dimensions (pg_num and size) match up.
 void OSDMapMapping::_init_mappings(const OSDMap& osdmap)
@@ -34,14 +38,19 @@ void OSDMapMapping::_init_mappings(const OSDMap& osdmap)
 
 void OSDMapMapping::update(const OSDMap& osdmap)
 {
-  _init_mappings(osdmap);
+  _start(osdmap);
   for (auto& p : osdmap.get_pools()) {
     _update_range(osdmap, p.first, 0, p.second.get_pg_num());
   }
-  _build_rmap(osdmap);
+  _finish(osdmap);
   //_dump();  // for debugging
 }
 
+void OSDMapMapping::update(const OSDMap& osdmap, pg_t pgid)
+{
+  _update_range(osdmap, pgid.pool(), pgid.ps(), pgid.ps() + 1);
+}
+
 void OSDMapMapping::_build_rmap(const OSDMap& osdmap)
 {
   acting_rmap.resize(osdmap.get_max_osd());
@@ -69,6 +78,12 @@ void OSDMapMapping::_build_rmap(const OSDMap& osdmap)
   }
 }
 
+void OSDMapMapping::_finish(const OSDMap& osdmap)
+{
+  _build_rmap(osdmap);
+  epoch = osdmap.get_epoch();
+}
+
 void OSDMapMapping::_dump()
 {
   for (auto& p : pools) {
@@ -101,3 +116,55 @@ void OSDMapMapping::_update_range(
                  std::move(acting), acting_primary);
   }
 }
+
+// ---------------------------
+
+void ParallelOSDMapper::Job::finish_one()
+{
+  Context *fin = nullptr;
+  {
+    Mutex::Locker l(lock);
+    if (--shards == 0) {
+      if (!aborted) {
+       mapping->_finish(*osdmap);
+      }
+      cond.Signal();
+      fin = onfinish;
+      onfinish = nullptr;
+    }
+  }
+  if (fin) {
+    fin->complete(0);
+  }
+}
+
+void ParallelOSDMapper::WQ::_process(
+  item *i,
+  ThreadPool::TPHandle &h)
+{
+  ldout(m->cct, 10) << __func__ << " " << i->osdmap << " " << i->pool << " ["
+                   << i->begin << "," << i->end << ")" << dendl;
+  i->mapping->_update_range(*i->osdmap, i->pool, i->begin, i->end);
+  i->job->finish_one();
+  delete i;
+}
+
+std::unique_ptr<ParallelOSDMapper::Job> ParallelOSDMapper::queue(
+  const OSDMap& osdmap,
+  OSDMapMapping *mapping,
+  unsigned pgs_per_item)
+{
+  std::unique_ptr<Job> job(new Job(&osdmap, mapping));
+  mapping->_start(osdmap);
+  for (auto& p : osdmap.get_pools()) {
+    for (unsigned ps = 0; ps < p.second.get_pg_num(); ps += pgs_per_item) {
+      unsigned ps_end = MIN(ps + pgs_per_item, p.second.get_pg_num());
+      job->start_one();
+      wq.queue(new item(job.get(), &osdmap, mapping, p.first, ps, ps_end));
+      ldout(cct, 20) << __func__ << " queue " << &osdmap << " "
+                    << p.first << " [" << ps
+                    << "," << ps_end << ")" << dendl;
+    }
+  }
+  return job;
+}
index ff8ed9c63465a1a5285d4b10cd3ef144b643fef6..bbc92794b46205fe270baaeb8bbfbb446a3746dd 100644 (file)
@@ -9,6 +9,7 @@
 #include <map>
 
 #include "osd/osd_types.h"
+#include "common/WorkQueue.h"
 
 class OSDMap;
 
@@ -39,8 +40,8 @@ class OSDMapMapping {
             std::vector<int> *up,
             int *up_primary,
             std::vector<int> *acting,
-            int *acting_primary) {
-      int32_t *row = &table[row_size() * ps];
+            int *acting_primary) const {
+      const int32_t *row = &table[row_size() * ps];
       if (acting_primary) {
        *acting_primary = row[0];
       }
@@ -83,6 +84,7 @@ class OSDMapMapping {
   std::map<int64_t,PoolMapping> pools;
   std::vector<std::vector<pg_t>> acting_rmap;  // osd -> pg
   std::vector<std::vector<pg_t>> up_rmap;  // osd -> pg
+  epoch_t epoch;
 
   void _init_mappings(const OSDMap& osdmap);
   void _update_range(
@@ -92,14 +94,21 @@ class OSDMapMapping {
 
   void _build_rmap(const OSDMap& osdmap);
 
+  void _start(const OSDMap& osdmap) {
+    _init_mappings(osdmap);
+  }
+  void _finish(const OSDMap& osdmap);
+
   void _dump();
 
+  friend class ParallelOSDMapper;
+
 public:
   void get(pg_t pgid,
           std::vector<int> *up,
           int *up_primary,
           std::vector<int> *acting,
-          int *acting_primary) {
+          int *acting_primary) const {
     auto p = pools.find(pgid.pool());
     assert(p != pools.end());
     p->second.get(pgid.ps(), up, up_primary, acting, acting_primary);
@@ -115,7 +124,149 @@ public:
   }
 
   void update(const OSDMap& map);
+  void update(const OSDMap& map, pg_t pgid);
+
+  epoch_t get_epoch() const {
+    return epoch;
+  }
 };
 
+/// thread pool to calculate mapping on multiple CPUs
+class ParallelOSDMapper {
+public:
+  struct Job {
+    unsigned shards = 0;
+    const OSDMap *osdmap;
+    OSDMapMapping *mapping;
+    bool aborted = false;
+    Context *onfinish = nullptr;
+
+    Mutex lock = {"ParallelOSDMapper::Job::lock"};
+    Cond cond;
+
+    Job(const OSDMap *om, OSDMapMapping *m) : osdmap(om), mapping(m) {}
+    ~Job() {
+      assert(shards == 0);
+    }
+
+    void set_finish_event(Context *fin) {
+      lock.Lock();
+      if (shards == 0) {
+       // already done.
+       lock.Unlock();
+       fin->complete(0);
+      } else {
+       // set finisher
+       onfinish = fin;
+       lock.Unlock();
+      }
+    }
+    bool is_done() {
+      Mutex::Locker l(lock);
+      return shards == 0;
+    }
+    void wait() {
+      Mutex::Locker l(lock);
+      while (shards > 0) {
+       cond.Wait(lock);
+      }
+    }
+    void abort() {
+      Context *fin = nullptr;
+      {
+       Mutex::Locker l(lock);
+       aborted = true;
+       fin = onfinish;
+       onfinish = nullptr;
+       while (shards > 0) {
+         cond.Wait(lock);
+       }
+      }
+      if (fin) {
+       fin->complete(-ECANCELED);
+      }
+    }
+
+    void start_one() {
+      Mutex::Locker l(lock);
+      ++shards;
+    }
+    void finish_one();
+  };
+
+protected:
+  CephContext *cct;
+
+  struct item {
+    Job *job;
+    const OSDMap *osdmap;
+    OSDMapMapping *mapping;
+    int64_t pool;
+    unsigned begin, end;
+
+    item(Job *j, const OSDMap *m, OSDMapMapping *mg,
+        int64_t p, unsigned b, unsigned e)
+      : job(j),
+       osdmap(m),
+       mapping(mg),
+       pool(p),
+       begin(b),
+       end(e) {}
+  };
+  std::deque<item*> q;
+
+  struct WQ : public ThreadPool::WorkQueue<item> {
+    ParallelOSDMapper *m;
+
+    WQ(ParallelOSDMapper *m_, ThreadPool *tp)
+      : ThreadPool::WorkQueue<item>("ParallelOSDMapper::WQ", 0, 0, tp),
+        m(m_) {}
+
+    bool _enqueue(item *i) override {
+      m->q.push_back(i);
+      return true;
+    }
+    void _dequeue(item *i) override {
+      ceph_abort();
+    }
+    item *_dequeue() override {
+      while (!m->q.empty()) {
+       item *i = m->q.front();
+       m->q.pop_front();
+       if (i->job->aborted) {
+         i->job->finish_one();
+         delete i;
+       } else {
+         return i;
+       }
+      }
+      return nullptr;
+    }
+
+    void _process(item *i, ThreadPool::TPHandle &h) override;
+
+    void _clear() override {
+      assert(_empty());
+    }
+
+    bool _empty() override {
+      return m->q.empty();
+    }
+  } wq;
+
+public:
+  ParallelOSDMapper(CephContext *cct, ThreadPool *tp)
+    : cct(cct),
+      wq(this, tp) {}
+
+  std::unique_ptr<Job> queue(
+    const OSDMap& osdmap,
+    OSDMapMapping *mapping,
+    unsigned pgs_per_item);
+
+  void drain() {
+    wq.drain();
+  }
+};
 
 #endif