]> git-server-git.apps.pok.os.sepia.ceph.com Git - ceph.git/commitdiff
osd/OSDMapMapping: restructure mapper as generic PG work running
authorSage Weil <sage@redhat.com>
Tue, 31 Jan 2017 19:59:51 +0000 (14:59 -0500)
committerSage Weil <sage@redhat.com>
Thu, 16 Feb 2017 17:04:35 +0000 (12:04 -0500)
Make Job a parent class that can be specialized for any per-PG batch work.

Signed-off-by: Sage Weil <sage@redhat.com>
src/mon/OSDMonitor.cc
src/mon/OSDMonitor.h
src/osd/OSDMapMapping.cc
src/osd/OSDMapMapping.h

index ee4b1e877f1dbcdb1dedbc9b96022051444580bc..63664f04d0bf7069101fec5ca36cd4f8d45e0663 100644 (file)
@@ -382,8 +382,8 @@ void OSDMonitor::on_active()
   }
   C_PrintTime *fin = new C_PrintTime(osdmap.get_epoch());
   mapping.reset(new OSDMapMapping);
-  mapping_job = mapper.queue(osdmap, mapping.get(),
-                            g_conf->mon_osd_mapping_pgs_per_chunk);
+  mapping_job = mapping->start_update(osdmap, mapper,
+                                     g_conf->mon_osd_mapping_pgs_per_chunk);
   dout(10) << __func__ << " started mapping job " << mapping_job.get()
           << " at " << fin->start << dendl;
   mapping_job->set_finish_event(fin);
index ef1cb58f186e8f087eb698150570776445499cd7..289b4a35e97296b3a093226f2ad7f571efdfc582 100644 (file)
@@ -112,13 +112,13 @@ struct failure_info_t {
 
 class OSDMonitor : public PaxosService {
   CephContext *cct;
+
+  ParallelPGMapper mapper;                        ///< for background pg work
+  unique_ptr<OSDMapMapping> mapping;              ///< pg <-> osd mappings
+  unique_ptr<ParallelPGMapper::Job> mapping_job;  ///< background mapping job
+
 public:
   OSDMap osdmap;
-  unique_ptr<OSDMapMapping> mapping;
-  unique_ptr<ParallelOSDMapper::Job> mapping_job;
-
-private:
-  ParallelOSDMapper mapper;
 
   // [leader]
   OSDMap::Incremental pending_inc;
index 9edd23ebe2a0e4bf208a5cb01a5ead5f357d732d..5a4c84b54a8ba0561a3d0315f7a944c417a6e93d 100644 (file)
@@ -121,14 +121,15 @@ void OSDMapMapping::_update_range(
 
 // ---------------------------
 
-void ParallelOSDMapper::Job::finish_one()
+void ParallelPGMapper::Job::finish_one()
 {
   Context *fin = nullptr;
   {
     Mutex::Locker l(lock);
     if (--shards == 0) {
       if (!aborted) {
-       mapping->_finish(*osdmap);
+       finish = ceph_clock_now();
+       complete();
       }
       cond.Signal();
       fin = onfinish;
@@ -140,33 +141,26 @@ void ParallelOSDMapper::Job::finish_one()
   }
 }
 
-void ParallelOSDMapper::WQ::_process(
-  item *i,
-  ThreadPool::TPHandle &h)
+void ParallelPGMapper::WQ::_process(Item *i, ThreadPool::TPHandle &h)
 {
-  ldout(m->cct, 10) << __func__ << " " << i->osdmap << " " << i->pool << " ["
-                   << i->begin << "," << i->end << ")" << dendl;
-  i->mapping->_update_range(*i->osdmap, i->pool, i->begin, i->end);
+  ldout(m->cct, 20) << __func__ << " " << i->job << " " << i->pool
+                   << " [" << i->begin << "," << i->end << ")" << dendl;
+  i->job->process(i->pool, i->begin, i->end);
   i->job->finish_one();
   delete i;
 }
 
-std::unique_ptr<ParallelOSDMapper::Job> ParallelOSDMapper::queue(
-  const OSDMap& osdmap,
-  OSDMapMapping *mapping,
+void ParallelPGMapper::queue(
+  Job *job,
   unsigned pgs_per_item)
 {
-  std::unique_ptr<Job> job(new Job(&osdmap, mapping));
-  mapping->_start(osdmap);
-  for (auto& p : osdmap.get_pools()) {
+  for (auto& p : job->osdmap->get_pools()) {
     for (unsigned ps = 0; ps < p.second.get_pg_num(); ps += pgs_per_item) {
       unsigned ps_end = MIN(ps + pgs_per_item, p.second.get_pg_num());
       job->start_one();
-      wq.queue(new item(job.get(), &osdmap, mapping, p.first, ps, ps_end));
-      ldout(cct, 20) << __func__ << " queue " << &osdmap << " "
-                    << p.first << " [" << ps
+      wq.queue(new Item(job, p.first, ps, ps_end));
+      ldout(cct, 20) << __func__ << " " << job << " " << p.first << " [" << ps
                     << "," << ps_end << ")" << dendl;
     }
   }
-  return job;
 }
index 95fed453f9cf24843e66bab8d53202a6c786d6e8..e18b57493144d9af838b63e07b20a88eb46d28d6 100644 (file)
 
 class OSDMap;
 
+/// work queue to perform work on batches of pgids on multiple CPUs
+class ParallelPGMapper {
+public:
+  struct Job {
+    utime_t start, finish;
+    unsigned shards = 0;
+    const OSDMap *osdmap;
+    bool aborted = false;
+    Context *onfinish = nullptr;
+
+    Mutex lock = {"ParallelPGMapper::Job::lock"};
+    Cond cond;
+
+    Job(const OSDMap *om) : start(ceph_clock_now()), osdmap(om) {}
+    virtual ~Job() {
+      assert(shards == 0);
+    }
+
+    // child must implement this
+    virtual void process(int64_t poolid, unsigned ps_begin, unsigned ps_end) = 0;
+    virtual void complete() = 0;
+
+    void set_finish_event(Context *fin) {
+      lock.Lock();
+      if (shards == 0) {
+       // already done.
+       lock.Unlock();
+       fin->complete(0);
+      } else {
+       // set finisher
+       onfinish = fin;
+       lock.Unlock();
+      }
+    }
+    bool is_done() {
+      Mutex::Locker l(lock);
+      return shards == 0;
+    }
+    utime_t get_duration() {
+      return finish - start;
+    }
+    void wait() {
+      Mutex::Locker l(lock);
+      while (shards > 0) {
+       cond.Wait(lock);
+      }
+    }
+    bool wait_for(double duration) {
+      utime_t until = start;
+      until += duration;
+      Mutex::Locker l(lock);
+      while (shards > 0) {
+       if (ceph_clock_now() >= until) {
+         return false;
+       }
+       cond.Wait(lock);
+      }
+      return true;
+    }
+    void abort() {
+      Context *fin = nullptr;
+      {
+       Mutex::Locker l(lock);
+       aborted = true;
+       fin = onfinish;
+       onfinish = nullptr;
+       while (shards > 0) {
+         cond.Wait(lock);
+       }
+      }
+      if (fin) {
+       fin->complete(-ECANCELED);
+      }
+    }
+
+    void start_one() {
+      Mutex::Locker l(lock);
+      ++shards;
+    }
+    void finish_one();
+  };
+
+protected:
+  CephContext *cct;
+
+  struct Item {
+    Job *job;
+    int64_t pool;
+    unsigned begin, end;
+
+    Item(Job *j, int64_t p, unsigned b, unsigned e)
+      : job(j),
+       pool(p),
+       begin(b),
+       end(e) {}
+  };
+  std::deque<Item*> q;
+
+  struct WQ : public ThreadPool::WorkQueue<Item> {
+    ParallelPGMapper *m;
+
+    WQ(ParallelPGMapper *m_, ThreadPool *tp)
+      : ThreadPool::WorkQueue<Item>("ParallelPGMapper::WQ", 0, 0, tp),
+        m(m_) {}
+
+    bool _enqueue(Item *i) override {
+      m->q.push_back(i);
+      return true;
+    }
+    void _dequeue(Item *i) override {
+      ceph_abort();
+    }
+    Item *_dequeue() override {
+      while (!m->q.empty()) {
+       Item *i = m->q.front();
+       m->q.pop_front();
+       if (i->job->aborted) {
+         i->job->finish_one();
+         delete i;
+       } else {
+         return i;
+       }
+      }
+      return nullptr;
+    }
+
+    void _process(Item *i, ThreadPool::TPHandle &h) override;
+
+    void _clear() override {
+      assert(_empty());
+    }
+
+    bool _empty() override {
+      return m->q.empty();
+    }
+  } wq;
+
+public:
+  ParallelPGMapper(CephContext *cct, ThreadPool *tp)
+    : cct(cct),
+      wq(this, tp) {}
+
+  void queue(
+    Job *job,
+    unsigned pgs_per_item);
+
+  void drain() {
+    wq.drain();
+  }
+};
+
+
 /// a precalculated mapping of every PG for a given OSDMap
 class OSDMapMapping {
   struct PoolMapping {
@@ -102,7 +254,21 @@ class OSDMapMapping {
 
   void _dump();
 
-  friend class ParallelOSDMapper;
+  friend class ParallelPGMapper;
+
+  struct MappingJob : public ParallelPGMapper::Job {
+    OSDMapMapping *mapping;
+    MappingJob(const OSDMap *osdmap, OSDMapMapping *m)
+      : Job(osdmap), mapping(m) {
+      mapping->_start(*osdmap);
+    }
+    void process(int64_t pool, unsigned ps_begin, unsigned ps_end) {
+      mapping->_update_range(*osdmap, pool, ps_begin, ps_end);
+    }
+    void complete() {
+      mapping->_finish(*osdmap);
+    }
+  };
 
 public:
   void get(pg_t pgid,
@@ -129,6 +295,15 @@ public:
   void update(const OSDMap& map);
   void update(const OSDMap& map, pg_t pgid);
 
+  std::unique_ptr<MappingJob> start_update(
+    const OSDMap& map,
+    ParallelPGMapper& mapper,
+    unsigned pgs_per_item) {
+    std::unique_ptr<MappingJob> job(new MappingJob(&map, this));
+    mapper.queue(job.get(), pgs_per_item);
+    return job;
+  }
+
   epoch_t get_epoch() const {
     return epoch;
   }
@@ -138,142 +313,5 @@ public:
   }
 };
 
-/// thread pool to calculate mapping on multiple CPUs
-class ParallelOSDMapper {
-public:
-  struct Job {
-    unsigned shards = 0;
-    const OSDMap *osdmap;
-    OSDMapMapping *mapping;
-    bool aborted = false;
-    Context *onfinish = nullptr;
-
-    Mutex lock = {"ParallelOSDMapper::Job::lock"};
-    Cond cond;
-
-    Job(const OSDMap *om, OSDMapMapping *m) : osdmap(om), mapping(m) {}
-    ~Job() {
-      assert(shards == 0);
-    }
-
-    void set_finish_event(Context *fin) {
-      lock.Lock();
-      if (shards == 0) {
-       // already done.
-       lock.Unlock();
-       fin->complete(0);
-      } else {
-       // set finisher
-       onfinish = fin;
-       lock.Unlock();
-      }
-    }
-    bool is_done() {
-      Mutex::Locker l(lock);
-      return shards == 0;
-    }
-    void wait() {
-      Mutex::Locker l(lock);
-      while (shards > 0) {
-       cond.Wait(lock);
-      }
-    }
-    void abort() {
-      Context *fin = nullptr;
-      {
-       Mutex::Locker l(lock);
-       aborted = true;
-       fin = onfinish;
-       onfinish = nullptr;
-       while (shards > 0) {
-         cond.Wait(lock);
-       }
-      }
-      if (fin) {
-       fin->complete(-ECANCELED);
-      }
-    }
-
-    void start_one() {
-      Mutex::Locker l(lock);
-      ++shards;
-    }
-    void finish_one();
-  };
-
-protected:
-  CephContext *cct;
-
-  struct item {
-    Job *job;
-    const OSDMap *osdmap;
-    OSDMapMapping *mapping;
-    int64_t pool;
-    unsigned begin, end;
-
-    item(Job *j, const OSDMap *m, OSDMapMapping *mg,
-        int64_t p, unsigned b, unsigned e)
-      : job(j),
-       osdmap(m),
-       mapping(mg),
-       pool(p),
-       begin(b),
-       end(e) {}
-  };
-  std::deque<item*> q;
-
-  struct WQ : public ThreadPool::WorkQueue<item> {
-    ParallelOSDMapper *m;
-
-    WQ(ParallelOSDMapper *m_, ThreadPool *tp)
-      : ThreadPool::WorkQueue<item>("ParallelOSDMapper::WQ", 0, 0, tp),
-        m(m_) {}
-
-    bool _enqueue(item *i) override {
-      m->q.push_back(i);
-      return true;
-    }
-    void _dequeue(item *i) override {
-      ceph_abort();
-    }
-    item *_dequeue() override {
-      while (!m->q.empty()) {
-       item *i = m->q.front();
-       m->q.pop_front();
-       if (i->job->aborted) {
-         i->job->finish_one();
-         delete i;
-       } else {
-         return i;
-       }
-      }
-      return nullptr;
-    }
-
-    void _process(item *i, ThreadPool::TPHandle &h) override;
-
-    void _clear() override {
-      assert(_empty());
-    }
-
-    bool _empty() override {
-      return m->q.empty();
-    }
-  } wq;
-
-public:
-  ParallelOSDMapper(CephContext *cct, ThreadPool *tp)
-    : cct(cct),
-      wq(this, tp) {}
-
-  std::unique_ptr<Job> queue(
-    const OSDMap& osdmap,
-    OSDMapMapping *mapping,
-    unsigned pgs_per_item);
-
-  void drain() {
-    wq.drain();
-  }
-};
 
 #endif