logger(osd->logger),
recoverystate_perf(osd->recoverystate_perf),
monc(osd->monc),
- peering_wq(osd->peering_wq),
recovery_gen_wq("recovery_gen_wq", cct->_conf->osd_recovery_thread_timeout,
&osd->disk_tp),
class_handler(osd->class_handler),
osd->op_shardedwq.queue_front(std::move(qi));
}
-void OSDService::queue_for_peering(PG *pg)
-{
- peering_wq.queue(pg);
-}
-
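The removed queue_for_peering() was the sole entry point into the dedicated peering work queue. With the PeeringWQ gone, peering work is fed through the same sharded queue as client ops (the op_shardedwq.queue_front() call above). A minimal sketch of what the replacement enqueue path can look like, assuming a PGPeeringItem-style wrapper around the event; the item type, argument order, and get_pgid() accessor are illustrative assumptions, not necessarily the exact names in the tree:

void OSD::enqueue_peering_evt(PG *pg, PGPeeringEventRef evt)
{
  // Wrap the event so the sharded op queue can schedule it alongside
  // client ops; PGPeeringItem/OpQueueItem are assumed names here.
  op_shardedwq.queue(
    OpQueueItem(
      unique_ptr<OpQueueItem::OpQueueable>(
        new PGPeeringItem(pg->get_pgid(), evt)),
      10,                                   // cost
      cct->_conf->osd_peering_op_priority,  // priority
      utime_t(),                            // start time
      0,                                    // owner
      evt->get_epoch_sent()));              // epoch the event was sent at
}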
void OSDService::queue_for_snap_trim(PG *pg)
{
dout(10) << "queueing " << *pg << " for snaptrim" << dendl;
trace_endpoint("0.0.0.0", 0, "osd"),
asok_hook(NULL),
osd_compat(get_osd_compat_set()),
- peering_tp(cct, "OSD::peering_tp", "tp_peering",
- cct->_conf->osd_peering_wq_threads,
- "osd_peering_tp_threads"),
osd_op_tp(cct, "OSD::osd_op_tp", "tp_osd_tp",
get_num_op_threads()),
disk_tp(cct, "OSD::disk_tp", "tp_osd_disk",
cct->_conf->osd_disk_threads, "osd_disk_threads"),
cct->_conf->osd_op_thread_timeout,
cct->_conf->osd_op_thread_suicide_timeout,
&osd_op_tp),
- peering_wq(
- this,
- cct->_conf->osd_op_thread_timeout,
- cct->_conf->osd_op_thread_suicide_timeout,
- &peering_tp),
map_lock("OSD::map_lock"),
pg_map_lock("OSD::pg_map_lock"),
last_pg_create_epoch(0),
monc->set_log_client(&log_client);
update_log_config();
- peering_tp.start();
osd_op_tp.start();
disk_tp.start();
command_tp.start();
dout(10) << "ensuring pgs have consumed prior maps" << dendl;
consume_map();
- peering_wq.drain();
dout(0) << "done with init, starting boot process" << dendl;
heartbeat_lock.Unlock();
heartbeat_thread.join();
- peering_tp.drain();
- peering_wq.clear();
- peering_tp.stop();
- dout(10) << "osd tp stopped" << dendl;
-
osd_op_tp.drain();
osd_op_tp.stop();
dout(10) << "op sharded tp stopped" << dendl;
hb_front_server_messenger->shutdown();
hb_back_server_messenger->shutdown();
- peering_wq.clear();
-
return r;
}
}
};
-void OSD::process_peering_events(
- const list<PG*> &pgs,
- ThreadPool::TPHandle &handle
- )
-{
- bool need_up_thru = false;
- epoch_t same_interval_since = 0;
- OSDMapRef curmap;
- PG::RecoveryCtx rctx = create_context();
- rctx.handle = &handle;
- for (list<PG*>::const_iterator i = pgs.begin();
- i != pgs.end();
- ++i) {
- set<PGRef> split_pgs;
- PG *pg = *i;
- pg->lock_suspend_timeout(handle);
- curmap = service.get_osdmap();
- if (pg->is_deleting()) {
- pg->unlock();
- continue;
- }
- if (!advance_pg(curmap->get_epoch(), pg, handle, &rctx, &split_pgs, false)) {
- // we need to requeue the PG explicitly since we didn't actually
- // handle an event
- peering_wq.queue(pg);
- } else {
- pg->process_peering_event(&rctx);
- }
- need_up_thru = pg->get_need_up_thru() || need_up_thru;
- same_interval_since = MAX(pg->get_same_interval_since(),
- same_interval_since);
- if (!split_pgs.empty()) {
- rctx.on_applied->add(new C_CompleteSplits(this, split_pgs));
- split_pgs.clear();
- }
- dispatch_context_transaction(rctx, pg, &handle);
- pg->unlock();
- }
- if (need_up_thru)
- queue_want_up_thru(same_interval_since);
- dispatch_context(rctx, 0, curmap, &handle);
-
- service.send_pg_temp();
-}
-
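The batch-oriented process_peering_events() above gives way to per-event dispatch: each queued event is handed to its PG individually via dequeue_peering_evt() (context below). A plausible shape for that handler, mirroring one iteration of the removed loop; do_peering_event() and the exact requeue/error handling are assumptions, and the real function may differ:

void OSD::dequeue_peering_evt(
  PG *pg,
  PGPeeringEventRef evt,
  ThreadPool::TPHandle &handle)
{
  PG::RecoveryCtx rctx = create_context();
  pg->lock_suspend_timeout(handle);
  OSDMapRef curmap = service.get_osdmap();
  if (pg->is_deleting()) {
    pg->unlock();
    return;
  }
  set<PGRef> split_pgs;
  // Requeue-on-failed-advance, which the batch loop handled explicitly,
  // is elided in this sketch.
  advance_pg(curmap->get_epoch(), pg, handle, &rctx, &split_pgs, false);
  pg->do_peering_event(evt, &rctx);   // assumed name for per-event handling
  if (pg->get_need_up_thru())
    queue_want_up_thru(pg->get_same_interval_since());
  if (!split_pgs.empty())
    rctx.on_applied->add(new C_CompleteSplits(this, split_pgs));
  dispatch_context_transaction(rctx, pg, &handle);
  pg->unlock();
  dispatch_context(rctx, 0, curmap, &handle);
  service.send_pg_temp();
}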
void OSD::dequeue_peering_evt(
PG *pg,
PGPeeringEventRef evt,
return 0;
}
-void OSD::PeeringWQ::_dequeue(list<PG*> *out) {
- for (list<PG*>::iterator i = peering_queue.begin();
- i != peering_queue.end() &&
- out->size() < osd->cct->_conf->osd_peering_wq_batch_size;
- ) {
- if (in_use.count(*i)) {
- ++i;
- } else {
- out->push_back(*i);
- peering_queue.erase(i++);
- }
- }
- in_use.insert(out->begin(), out->end());
-}
-
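The removed _dequeue() implements a skip-if-busy batch pull: it takes up to osd_peering_wq_batch_size PGs off the queue, leaves behind any PG whose previous batch is still in flight (tracked in in_use and released in _process_finish()), and thereby guarantees a PG is never processed by two batches concurrently. The same pattern, as a self-contained sketch with generic types:

#include <list>
#include <set>

template <typename T>
struct BatchQueue {
  std::list<T*> queue;     // pending items, FIFO
  std::set<T*> in_use;     // items currently owned by a running batch
  size_t batch_size;

  explicit BatchQueue(size_t n) : batch_size(n) {}

  // Pull up to batch_size items, skipping any still in flight.
  void dequeue(std::list<T*> *out) {
    for (auto i = queue.begin();
         i != queue.end() && out->size() < batch_size; ) {
      if (in_use.count(*i)) {
        ++i;                     // busy elsewhere: leave it queued
      } else {
        out->push_back(*i);
        i = queue.erase(i);
      }
    }
    in_use.insert(out->begin(), out->end());
  }

  // Called when a batch finishes; its items become eligible again.
  void finish(const std::list<T*> &batch) {
    for (auto p : batch)
      in_use.erase(p);
  }
};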
// =============================================================
PerfCounters *&logger;
PerfCounters *&recoverystate_perf;
MonClient *&monc;
- ThreadPool::BatchWorkQueue<PG> &peering_wq;
GenContextWQ recovery_gen_wq;
ClassHandler *&class_handler;
void send_pg_created(pg_t pgid);
- void queue_for_peering(PG *pg);
-
Mutex snap_sleep_lock;
SafeTimer snap_sleep_timer;
private:
- ThreadPool peering_tp;
ShardedThreadPool osd_op_tp;
ThreadPool disk_tp;
ThreadPool command_tp;
PGPeeringEventRef ref,
ThreadPool::TPHandle& handle);
- // -- peering queue --
- struct PeeringWQ : public ThreadPool::BatchWorkQueue<PG> {
- list<PG*> peering_queue;
- OSD *osd;
- set<PG*> in_use;
- PeeringWQ(OSD *o, time_t ti, time_t si, ThreadPool *tp)
- : ThreadPool::BatchWorkQueue<PG>(
- "OSD::PeeringWQ", ti, si, tp), osd(o) {}
-
- void _dequeue(PG *pg) override {
- for (list<PG*>::iterator i = peering_queue.begin();
- i != peering_queue.end();
- ) {
- if (*i == pg) {
- peering_queue.erase(i++);
- pg->put("PeeringWQ");
- } else {
- ++i;
- }
- }
- }
- bool _enqueue(PG *pg) override {
- pg->get("PeeringWQ");
- peering_queue.push_back(pg);
- return true;
- }
- bool _empty() override {
- return peering_queue.empty();
- }
- void _dequeue(list<PG*> *out) override;
- void _process(
- const list<PG *> &pgs,
- ThreadPool::TPHandle &handle) override {
- assert(!pgs.empty());
- osd->process_peering_events(pgs, handle);
- for (list<PG *>::const_iterator i = pgs.begin();
- i != pgs.end();
- ++i) {
- (*i)->put("PeeringWQ");
- }
- }
- void _process_finish(const list<PG *> &pgs) override {
- for (list<PG*>::const_iterator i = pgs.begin();
- i != pgs.end();
- ++i) {
- in_use.erase(*i);
- }
- }
- void _clear() override {
- assert(peering_queue.empty());
- }
- } peering_wq;
-
- void process_peering_events(
- const list<PG*> &pg,
- ThreadPool::TPHandle &handle);
-
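Note the reference-counting contract the removed queue maintained: _enqueue() takes a tagged reference on the PG, and every path that removes an entry drops exactly one, either in _process() after the batch runs or in the single-item _dequeue() when a PG is cancelled, so a PG cannot be destroyed while the queue still points at it. A toy illustration of that balanced, tagged get/put discipline (not Ceph's RefCountedObject):

#include <atomic>

struct Refcounted {
  std::atomic<int> nref{1};
  void get(const char *tag) { (void)tag; ++nref; }   // tag aids leak tracing
  void put(const char *tag) {
    (void)tag;
    if (--nref == 0)
      delete this;               // last reference gone
  }
  virtual ~Refcounted() = default;
};

// Queue side, mirroring the removed code:
//   item->get("PeeringWQ");   // on _enqueue()
//   item->put("PeeringWQ");   // after _process(), or on cancel in _dequeue()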
friend class PG;
friend class PrimaryLogPG;