}
void OSD::advance_pg(
- epoch_t osd_epoch, PG *pg, PG::RecoveryCtx *rctx,
+ epoch_t osd_epoch, PG *pg,
+ ThreadPool::TPHandle &handle,
+ PG::RecoveryCtx *rctx,
set<boost::intrusive_ptr<PG> > *new_pgs)
{
assert(pg->is_locked());
}
lastmap = nextmap;
+ handle.reset_tp_timeout();
}
if (!is_booting())
pg->handle_activate_map(rctx);
}
};
-void OSD::process_peering_events(const list<PG*> &pgs)
+void OSD::process_peering_events(
+ const list<PG*> &pgs,
+ ThreadPool::TPHandle &handle
+ )
{
bool need_up_thru = false;
epoch_t same_interval_since = 0;
pg->unlock();
continue;
}
- advance_pg(curmap->get_epoch(), pg, &rctx, &split_pgs);
+ advance_pg(curmap->get_epoch(), pg, handle, &rctx, &split_pgs);
if (!pg->peering_queue.empty()) {
PG::CephPeeringEvtRef evt = pg->peering_queue.front();
pg->peering_queue.pop_front();
dispatch_context_transaction(rctx, pg);
}
pg->unlock();
+ handle.reset_tp_timeout();
}
if (need_up_thru)
queue_want_up_thru(same_interval_since);
}
in_use.insert(got.begin(), got.end());
}
- void _process(const list<PG *> &pgs) {
- osd->process_peering_events(pgs);
+ void _process(
+ const list<PG *> &pgs,
+ ThreadPool::TPHandle &handle) {
+ osd->process_peering_events(pgs, handle);
for (list<PG *>::const_iterator i = pgs.begin();
i != pgs.end();
++i) {
}
} peering_wq;
- void process_peering_events(const list<PG*> &pg);
+ void process_peering_events(
+ const list<PG*> &pg,
+ ThreadPool::TPHandle &handle);
friend class PG;
friend class ReplicatedPG;
void note_up_osd(int osd);
void advance_pg(
- epoch_t advance_to, PG *pg, PG::RecoveryCtx *rctx,
- set<boost::intrusive_ptr<PG> > *split_pgs);
+ epoch_t advance_to, PG *pg,
+ ThreadPool::TPHandle &handle,
+ PG::RecoveryCtx *rctx,
+ set<boost::intrusive_ptr<PG> > *split_pgs
+ );
void advance_map(ObjectStore::Transaction& t, C_Contexts *tfin);
void consume_map();
void activate_map();