#include "OSDMapMapping.h"
#include "OSDMap.h"
+#define dout_subsys ceph_subsys_mon
+
+#include "common/debug.h"
+
// ensure that we have a PoolMapping for each pool and that
// the dimensions (pg_num and size) match up.
void OSDMapMapping::_init_mappings(const OSDMap& osdmap)
void OSDMapMapping::update(const OSDMap& osdmap)
{
- _init_mappings(osdmap);
+ _start(osdmap);
for (auto& p : osdmap.get_pools()) {
_update_range(osdmap, p.first, 0, p.second.get_pg_num());
}
- _build_rmap(osdmap);
+ _finish(osdmap);
//_dump(); // for debugging
}
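+
+// recompute the mapping for a single PG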
+void OSDMapMapping::update(const OSDMap& osdmap, pg_t pgid)
+{
+ _update_range(osdmap, pgid.pool(), pgid.ps(), pgid.ps() + 1);
+}
+
void OSDMapMapping::_build_rmap(const OSDMap& osdmap)
{
acting_rmap.resize(osdmap.get_max_osd());
}
}
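+
+// finalize once every range has been updated: build the reverse
+// (osd -> pg) index and record the epoch of the source map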
+void OSDMapMapping::_finish(const OSDMap& osdmap)
+{
+ _build_rmap(osdmap);
+ epoch = osdmap.get_epoch();
+}
+
void OSDMapMapping::_dump()
{
for (auto& p : pools) {
std::move(acting), acting_primary);
}
}
+
+// ---------------------------
+
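+// called once per work item as it is retired; the thread that retires the
+// last shard finalizes the mapping and fires any registered completion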
+void ParallelOSDMapper::Job::finish_one()
+{
+ Context *fin = nullptr;
+ {
+ Mutex::Locker l(lock);
+ if (--shards == 0) {
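+ // last shard: finalize the mapping (unless aborted) and wake any waiters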
+ if (!aborted) {
+ mapping->_finish(*osdmap);
+ }
+ cond.Signal();
+ fin = onfinish;
+ onfinish = nullptr;
+ }
+ }
+ if (fin) {
+ fin->complete(0);
+ }
+}
+
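+// worker-thread entry point: compute the mapping for PGs [begin, end) of a
+// single pool, then retire the shard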
+void ParallelOSDMapper::WQ::_process(
+ item *i,
+ ThreadPool::TPHandle &h)
+{
+ ldout(m->cct, 10) << __func__ << " " << i->osdmap << " " << i->pool << " ["
+ << i->begin << "," << i->end << ")" << dendl;
+ i->mapping->_update_range(*i->osdmap, i->pool, i->begin, i->end);
+ i->job->finish_one();
+ delete i;
+}
+
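+// carve every pool's PGs into items of at most pgs_per_item and queue them;
+// the returned Job tracks the outstanding shards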
+std::unique_ptr<ParallelOSDMapper::Job> ParallelOSDMapper::queue(
+ const OSDMap& osdmap,
+ OSDMapMapping *mapping,
+ unsigned pgs_per_item)
+{
+ std::unique_ptr<Job> job(new Job(&osdmap, mapping));
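+ // make sure each pool has a correctly sized PoolMapping before the
+ // shards start filling it in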
+ mapping->_start(osdmap);
+ for (auto& p : osdmap.get_pools()) {
+ for (unsigned ps = 0; ps < p.second.get_pg_num(); ps += pgs_per_item) {
+ unsigned ps_end = MIN(ps + pgs_per_item, p.second.get_pg_num());
+ job->start_one();
+ wq.queue(new item(job.get(), &osdmap, mapping, p.first, ps, ps_end));
+ ldout(cct, 20) << __func__ << " queue " << &osdmap << " "
+ << p.first << " [" << ps
+ << "," << ps_end << ")" << dendl;
+ }
+ }
+ return job;
+}
#include <map>
+#include <deque>
+#include <memory>
#include "osd/osd_types.h"
+#include "include/Context.h"
+#include "common/WorkQueue.h"
class OSDMap;
std::vector<int> *up,
int *up_primary,
std::vector<int> *acting,
- int *acting_primary) {
- int32_t *row = &table[row_size() * ps];
+ int *acting_primary) const {
+ const int32_t *row = &table[row_size() * ps];
if (acting_primary) {
*acting_primary = row[0];
}
std::map<int64_t,PoolMapping> pools;
std::vector<std::vector<pg_t>> acting_rmap; // osd -> pg
std::vector<std::vector<pg_t>> up_rmap; // osd -> pg
+ epoch_t epoch = 0;
void _init_mappings(const OSDMap& osdmap);
void _update_range(
void _build_rmap(const OSDMap& osdmap);
+ void _start(const OSDMap& osdmap) {
+ _init_mappings(osdmap);
+ }
+ void _finish(const OSDMap& osdmap);
+
void _dump();
+ friend class ParallelOSDMapper;
+
public:
void get(pg_t pgid,
std::vector<int> *up,
int *up_primary,
std::vector<int> *acting,
- int *acting_primary) {
+ int *acting_primary) const {
auto p = pools.find(pgid.pool());
assert(p != pools.end());
p->second.get(pgid.ps(), up, up_primary, acting, acting_primary);
}
void update(const OSDMap& map);
+ void update(const OSDMap& map, pg_t pgid);
+
+ epoch_t get_epoch() const {
+ return epoch;
+ }
};
+
+/// calculate PG mappings in parallel across multiple CPUs via a shared ThreadPool
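+//
+// Typical usage (a sketch; assumes the caller already has a started
+// ThreadPool "tp" and owns an OSDMapMapping "mapping"):
+//
+//   ParallelOSDMapper mapper(cct, &tp);
+//   auto job = mapper.queue(osdmap, &mapping, 128);  // ~128 PGs per item (example value)
+//   job->wait();            // or job->set_finish_event(ctx) to be notified
+//   assert(job->is_done());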
+class ParallelOSDMapper {
+public:
+ struct Job {
+ unsigned shards = 0;
+ const OSDMap *osdmap;
+ OSDMapMapping *mapping;
+ bool aborted = false;
+ Context *onfinish = nullptr;
+
+ Mutex lock = {"ParallelOSDMapper::Job::lock"};
+ Cond cond;
+
+ Job(const OSDMap *om, OSDMapMapping *m) : osdmap(om), mapping(m) {}
+ ~Job() {
+ assert(shards == 0);
+ }
+
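+ // register a Context to complete when the last shard finishes; completes
+ // immediately (with 0) if the job is already done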
+ void set_finish_event(Context *fin) {
+ lock.Lock();
+ if (shards == 0) {
+ // already done.
+ lock.Unlock();
+ fin->complete(0);
+ } else {
+ // set finisher
+ onfinish = fin;
+ lock.Unlock();
+ }
+ }
+ bool is_done() {
+ Mutex::Locker l(lock);
+ return shards == 0;
+ }
+ void wait() {
+ Mutex::Locker l(lock);
+ while (shards > 0) {
+ cond.Wait(lock);
+ }
+ }
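+ // skip any shards still queued, wait for in-flight shards to drain, and
+ // complete a pending finish event with -ECANCELED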
+ void abort() {
+ Context *fin = nullptr;
+ {
+ Mutex::Locker l(lock);
+ aborted = true;
+ fin = onfinish;
+ onfinish = nullptr;
+ while (shards > 0) {
+ cond.Wait(lock);
+ }
+ }
+ if (fin) {
+ fin->complete(-ECANCELED);
+ }
+ }
+
+ void start_one() {
+ Mutex::Locker l(lock);
+ ++shards;
+ }
+ void finish_one();
+ };
+
+protected:
+ CephContext *cct;
+
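+ // one unit of work: map PGs [begin, end) of a single pool into *mapping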
+ struct item {
+ Job *job;
+ const OSDMap *osdmap;
+ OSDMapMapping *mapping;
+ int64_t pool;
+ unsigned begin, end;
+
+ item(Job *j, const OSDMap *m, OSDMapMapping *mg,
+ int64_t p, unsigned b, unsigned e)
+ : job(j),
+ osdmap(m),
+ mapping(mg),
+ pool(p),
+ begin(b),
+ end(e) {}
+ };
+ std::deque<item*> q;
+
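+ // work queue run on the shared ThreadPool; pending items live in m->q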
+ struct WQ : public ThreadPool::WorkQueue<item> {
+ ParallelOSDMapper *m;
+
+ WQ(ParallelOSDMapper *m_, ThreadPool *tp)
+ : ThreadPool::WorkQueue<item>("ParallelOSDMapper::WQ", 0, 0, tp),
+ m(m_) {}
+
+ bool _enqueue(item *i) override {
+ m->q.push_back(i);
+ return true;
+ }
+ void _dequeue(item *i) override {
+ ceph_abort();
+ }
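+ // pop the next runnable item, dropping (but still accounting for) items
+ // whose job has already been aborted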
+ item *_dequeue() override {
+ while (!m->q.empty()) {
+ item *i = m->q.front();
+ m->q.pop_front();
+ if (i->job->aborted) {
+ i->job->finish_one();
+ delete i;
+ } else {
+ return i;
+ }
+ }
+ return nullptr;
+ }
+
+ void _process(item *i, ThreadPool::TPHandle &h) override;
+
+ void _clear() override {
+ assert(_empty());
+ }
+
+ bool _empty() override {
+ return m->q.empty();
+ }
+ } wq;
+
+public:
+ ParallelOSDMapper(CephContext *cct, ThreadPool *tp)
+ : cct(cct),
+ wq(this, tp) {}
+
+ std::unique_ptr<Job> queue(
+ const OSDMap& osdmap,
+ OSDMapMapping *mapping,
+ unsigned pgs_per_item);
+
+ void drain() {
+ wq.drain();
+ }
+};
#endif