// ---------------------------
-void ParallelOSDMapper::Job::finish_one()
+void ParallelPGMapper::Job::finish_one()
{
  Context *fin = nullptr;
  {
    Mutex::Locker l(lock);
    if (--shards == 0) {
      if (!aborted) {
-        mapping->_finish(*osdmap);
+        finish = ceph_clock_now();
+        complete();
      }
      cond.Signal();
      fin = onfinish;
    }
  }
  if (fin) {
    fin->complete(0);
  }
}
-void ParallelOSDMapper::WQ::_process(
-  item *i,
-  ThreadPool::TPHandle &h)
+void ParallelPGMapper::WQ::_process(Item *i, ThreadPool::TPHandle &h)
{
-  ldout(m->cct, 10) << __func__ << " " << i->osdmap << " " << i->pool << " ["
-                    << i->begin << "," << i->end << ")" << dendl;
-  i->mapping->_update_range(*i->osdmap, i->pool, i->begin, i->end);
+  ldout(m->cct, 20) << __func__ << " " << i->job << " " << i->pool
+                    << " [" << i->begin << "," << i->end << ")" << dendl;
+  i->job->process(i->pool, i->begin, i->end);
  i->job->finish_one();
  delete i;
}
-std::unique_ptr<ParallelOSDMapper::Job> ParallelOSDMapper::queue(
-  const OSDMap& osdmap,
-  OSDMapMapping *mapping,
+void ParallelPGMapper::queue(
+  Job *job,
  unsigned pgs_per_item)
{
-  std::unique_ptr<Job> job(new Job(&osdmap, mapping));
-  mapping->_start(osdmap);
-  for (auto& p : osdmap.get_pools()) {
+  for (auto& p : job->osdmap->get_pools()) {
    for (unsigned ps = 0; ps < p.second.get_pg_num(); ps += pgs_per_item) {
      unsigned ps_end = MIN(ps + pgs_per_item, p.second.get_pg_num());
      job->start_one();
-      wq.queue(new item(job.get(), &osdmap, mapping, p.first, ps, ps_end));
-      ldout(cct, 20) << __func__ << " queue " << &osdmap << " "
-                     << p.first << " [" << ps
+      wq.queue(new Item(job, p.first, ps, ps_end));
+      ldout(cct, 20) << __func__ << " " << job << " " << p.first << " [" << ps
                     << "," << ps_end << ")" << dendl;
    }
  }
-  return job;
}
class OSDMap;
+/// work queue to perform work on batches of pgids on multiple CPUs
+class ParallelPGMapper {
+public:
+  struct Job {
+    utime_t start, finish;
+    unsigned shards = 0;           // work items still outstanding
+    const OSDMap *osdmap;
+    bool aborted = false;
+    Context *onfinish = nullptr;   // optional completion event
+
+    Mutex lock = {"ParallelPGMapper::Job::lock"};
+    Cond cond;
+
+    Job(const OSDMap *om) : start(ceph_clock_now()), osdmap(om) {}
+    virtual ~Job() {
+      assert(shards == 0);
+    }
+
+    // child must implement these
+    virtual void process(int64_t poolid, unsigned ps_begin, unsigned ps_end) = 0;
+    virtual void complete() = 0;
+
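+    // register a Context to be completed with 0 when the job finishes;
+    // completed immediately if the job is already done (abort() completes
+    // a pending event with -ECANCELED instead)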
+    void set_finish_event(Context *fin) {
+      lock.Lock();
+      if (shards == 0) {
+        // already done.
+        lock.Unlock();
+        fin->complete(0);
+      } else {
+        // set finisher
+        onfinish = fin;
+        lock.Unlock();
+      }
+    }
+    bool is_done() {
+      Mutex::Locker l(lock);
+      return shards == 0;
+    }
+    // only meaningful once the job has finished (finish is recorded by the
+    // last finish_one())
+    utime_t get_duration() {
+      return finish - start;
+    }
+    void wait() {
+      Mutex::Locker l(lock);
+      while (shards > 0) {
+        cond.Wait(lock);
+      }
+    }
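+    // return false if the job has not finished within (start + duration)
+    // seconds; the deadline is relative to the job's start time and is
+    // checked on entry and whenever a shard completes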
+    bool wait_for(double duration) {
+      utime_t until = start;
+      until += duration;
+      Mutex::Locker l(lock);
+      while (shards > 0) {
+        if (ceph_clock_now() >= until) {
+          return false;
+        }
+        cond.Wait(lock);
+      }
+      return true;
+    }
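+    // abort the job: set the aborted flag (so the work queue discards any
+    // items not yet processed), wait for outstanding shards to drain, and
+    // complete a pending finish event with -ECANCELED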
+    void abort() {
+      Context *fin = nullptr;
+      {
+        Mutex::Locker l(lock);
+        aborted = true;
+        fin = onfinish;
+        onfinish = nullptr;
+        while (shards > 0) {
+          cond.Wait(lock);
+        }
+      }
+      if (fin) {
+        fin->complete(-ECANCELED);
+      }
+    }
+
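+    // shard accounting: queue() calls start_one() once per work item and the
+    // work queue calls finish_one() as each item is processed or discarded;
+    // the last finish_one() records the finish time, runs complete() (unless
+    // aborted) and fires any registered finish event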
+    void start_one() {
+      Mutex::Locker l(lock);
+      ++shards;
+    }
+    void finish_one();
+  };
+
+protected:
+  CephContext *cct;
+
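+  // one unit of work: a contiguous range of placement seeds [begin, end)
+  // within a single pool, processed on behalf of a Job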
+  struct Item {
+    Job *job;
+    int64_t pool;
+    unsigned begin, end;
+
+    Item(Job *j, int64_t p, unsigned b, unsigned e)
+      : job(j),
+        pool(p),
+        begin(b),
+        end(e) {}
+  };
+  std::deque<Item*> q;
+
+  struct WQ : public ThreadPool::WorkQueue<Item> {
+    ParallelPGMapper *m;
+
+    WQ(ParallelPGMapper *m_, ThreadPool *tp)
+      : ThreadPool::WorkQueue<Item>("ParallelPGMapper::WQ", 0, 0, tp),
+        m(m_) {}
+
+    bool _enqueue(Item *i) override {
+      m->q.push_back(i);
+      return true;
+    }
+    void _dequeue(Item *i) override {
+      ceph_abort();
+    }
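+    // pull the next work item, retiring (finish_one() plus delete) any items
+    // whose job has already been aborted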
+    Item *_dequeue() override {
+      while (!m->q.empty()) {
+        Item *i = m->q.front();
+        m->q.pop_front();
+        if (i->job->aborted) {
+          i->job->finish_one();
+          delete i;
+        } else {
+          return i;
+        }
+      }
+      return nullptr;
+    }
+
+    void _process(Item *i, ThreadPool::TPHandle &h) override;
+
+    void _clear() override {
+      assert(_empty());
+    }
+
+    bool _empty() override {
+      return m->q.empty();
+    }
+  } wq;
+
+public:
+  ParallelPGMapper(CephContext *cct, ThreadPool *tp)
+    : cct(cct),
+      wq(this, tp) {}
+
+  void queue(
+    Job *job,
+    unsigned pgs_per_item);
+
+  void drain() {
+    wq.drain();
+  }
+};
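+
+// Illustrative usage sketch only -- 'cct', 'tp' (an already-started
+// ThreadPool), 'osdmap' and the CountJob subclass below are assumed
+// caller-side names, not part of this header:
+//
+//   struct CountJob : public ParallelPGMapper::Job {
+//     std::atomic<unsigned> pgs{0};   // shards run concurrently
+//     explicit CountJob(const OSDMap *om) : Job(om) {}
+//     void process(int64_t pool, unsigned ps_begin, unsigned ps_end) override {
+//       pgs += ps_end - ps_begin;
+//     }
+//     void complete() override {}
+//   };
+//
+//   ParallelPGMapper mapper(cct, &tp);
+//   CountJob job(&osdmap);
+//   mapper.queue(&job, 128);   // at most 128 placement seeds per work item
+//   job.wait();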
+
+
/// a precalculated mapping of every PG for a given OSDMap
class OSDMapMapping {
  struct PoolMapping {
  void _dump();
-  friend class ParallelOSDMapper;
+  friend class ParallelPGMapper;
+
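+  // Job specialization that fills in this OSDMapMapping in parallel:
+  // _start() at construction, _update_range() per work item, _finish()
+  // when the last shard completes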
+  struct MappingJob : public ParallelPGMapper::Job {
+    OSDMapMapping *mapping;
+    MappingJob(const OSDMap *osdmap, OSDMapMapping *m)
+      : Job(osdmap), mapping(m) {
+      mapping->_start(*osdmap);
+    }
+    void process(int64_t pool, unsigned ps_begin, unsigned ps_end) override {
+      mapping->_update_range(*osdmap, pool, ps_begin, ps_end);
+    }
+    void complete() override {
+      mapping->_finish(*osdmap);
+    }
+  };
public:
  void get(pg_t pgid,
  void update(const OSDMap& map);
  void update(const OSDMap& map, pg_t pgid);
+  std::unique_ptr<MappingJob> start_update(
+    const OSDMap& map,
+    ParallelPGMapper& mapper,
+    unsigned pgs_per_item) {
+    std::unique_ptr<MappingJob> job(new MappingJob(&map, this));
+    mapper.queue(job.get(), pgs_per_item);
+    return job;
+  }
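+  // illustrative only (the caller-side names below are assumed, not part of
+  // this header): a user of start_update() would typically do
+  //   auto job = mapping.start_update(map, mapper, pgs_per_chunk);
+  //   ... do other work ...
+  //   job->wait();   // or job->set_finish_event(...) to be notified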
+
  epoch_t get_epoch() const {
    return epoch;
  }
  }
};
-/// thread pool to calculate mapping on multiple CPUs
-class ParallelOSDMapper {
-public:
-  struct Job {
-    unsigned shards = 0;
-    const OSDMap *osdmap;
-    OSDMapMapping *mapping;
-    bool aborted = false;
-    Context *onfinish = nullptr;
-
-    Mutex lock = {"ParallelOSDMapper::Job::lock"};
-    Cond cond;
-
-    Job(const OSDMap *om, OSDMapMapping *m) : osdmap(om), mapping(m) {}
-    ~Job() {
-      assert(shards == 0);
-    }
-
-    void set_finish_event(Context *fin) {
-      lock.Lock();
-      if (shards == 0) {
-        // already done.
-        lock.Unlock();
-        fin->complete(0);
-      } else {
-        // set finisher
-        onfinish = fin;
-        lock.Unlock();
-      }
-    }
-    bool is_done() {
-      Mutex::Locker l(lock);
-      return shards == 0;
-    }
-    void wait() {
-      Mutex::Locker l(lock);
-      while (shards > 0) {
-        cond.Wait(lock);
-      }
-    }
-    void abort() {
-      Context *fin = nullptr;
-      {
-        Mutex::Locker l(lock);
-        aborted = true;
-        fin = onfinish;
-        onfinish = nullptr;
-        while (shards > 0) {
-          cond.Wait(lock);
-        }
-      }
-      if (fin) {
-        fin->complete(-ECANCELED);
-      }
-    }
-
-    void start_one() {
-      Mutex::Locker l(lock);
-      ++shards;
-    }
-    void finish_one();
-  };
-
-protected:
-  CephContext *cct;
-
-  struct item {
-    Job *job;
-    const OSDMap *osdmap;
-    OSDMapMapping *mapping;
-    int64_t pool;
-    unsigned begin, end;
-
-    item(Job *j, const OSDMap *m, OSDMapMapping *mg,
-         int64_t p, unsigned b, unsigned e)
-      : job(j),
-        osdmap(m),
-        mapping(mg),
-        pool(p),
-        begin(b),
-        end(e) {}
-  };
-  std::deque<item*> q;
-
-  struct WQ : public ThreadPool::WorkQueue<item> {
-    ParallelOSDMapper *m;
-
-    WQ(ParallelOSDMapper *m_, ThreadPool *tp)
-      : ThreadPool::WorkQueue<item>("ParallelOSDMapper::WQ", 0, 0, tp),
-        m(m_) {}
-
-    bool _enqueue(item *i) override {
-      m->q.push_back(i);
-      return true;
-    }
-    void _dequeue(item *i) override {
-      ceph_abort();
-    }
-    item *_dequeue() override {
-      while (!m->q.empty()) {
-        item *i = m->q.front();
-        m->q.pop_front();
-        if (i->job->aborted) {
-          i->job->finish_one();
-          delete i;
-        } else {
-          return i;
-        }
-      }
-      return nullptr;
-    }
-
-    void _process(item *i, ThreadPool::TPHandle &h) override;
-
-    void _clear() override {
-      assert(_empty());
-    }
-
-    bool _empty() override {
-      return m->q.empty();
-    }
-  } wq;
-
-public:
-  ParallelOSDMapper(CephContext *cct, ThreadPool *tp)
-    : cct(cct),
-      wq(this, tp) {}
-
-  std::unique_ptr<Job> queue(
-    const OSDMap& osdmap,
-    OSDMapMapping *mapping,
-    unsigned pgs_per_item);
-
-  void drain() {
-    wq.drain();
-  }
-};
#endif