#endif
PG::RecoveryCtx rctx = create_context();
- int started = pg->start_recovery_ops(max, &rctx);
+ int started = pg->start_recovery_ops(max, &rctx, handle);
dout(10) << "do_recovery started " << started << "/" << max << " on " << *pg << dendl;
/*
if (!(pg_for_processing[&*pg].size()))
pg_for_processing.erase(&*pg);
}
- osd->dequeue_op(pg, op);
+ osd->dequeue_op(pg, op, handle);
pg->unlock();
}
/*
* NOTE: dequeue called in worker thread, with pg lock
*/
-void OSD::dequeue_op(PGRef pg, OpRequestRef op)
+void OSD::dequeue_op(
+ PGRef pg, OpRequestRef op,
+ ThreadPool::TPHandle &handle)
{
utime_t latency = ceph_clock_now(g_ceph_context) - op->request->get_recv_stamp();
dout(10) << "dequeue_op " << op << " prio " << op->request->get_priority()
op->mark_reached_pg();
- pg->do_request(op);
+ pg->do_request(op, handle);
// finish
dout(10) << "dequeue_op " << op << " finish" << dendl;
} op_wq;
void enqueue_op(PG *pg, OpRequestRef op);
- void dequeue_op(PGRef pg, OpRequestRef op);
+ void dequeue_op(
+ PGRef pg, OpRequestRef op,
+ ThreadPool::TPHandle &handle);
// -- peering queue --
struct PeeringWQ : public ThreadPool::BatchWorkQueue<PG> {
osd->op_wq.queue(make_pair(PGRef(this), op));
}
-void PG::do_request(OpRequestRef op)
+void PG::do_request(
+ OpRequestRef op,
+ ThreadPool::TPHandle &handle)
{
// do any pending flush
do_pending_flush();
break;
case MSG_OSD_PG_SCAN:
- do_scan(op);
+ do_scan(op, handle);
break;
case MSG_OSD_PG_BACKFILL:
virtual void check_local() = 0;
- virtual int start_recovery_ops(int max, RecoveryCtx *prctx) = 0;
+ virtual int start_recovery_ops(
+ int max, RecoveryCtx *prctx,
+ ThreadPool::TPHandle &handle) = 0;
void purge_strays();
// abstract bits
- void do_request(OpRequestRef op);
+ void do_request(
+ OpRequestRef op,
+ ThreadPool::TPHandle &handle
+ );
virtual void do_op(OpRequestRef op) = 0;
virtual void do_sub_op(OpRequestRef op) = 0;
virtual void do_sub_op_reply(OpRequestRef op) = 0;
- virtual void do_scan(OpRequestRef op) = 0;
+ virtual void do_scan(
+ OpRequestRef op,
+ ThreadPool::TPHandle &handle
+ ) = 0;
virtual void do_backfill(OpRequestRef op) = 0;
virtual void do_push(OpRequestRef op) = 0;
virtual void do_pull(OpRequestRef op) = 0;
sub_op_modify_reply(op);
}
-void ReplicatedPG::do_scan(OpRequestRef op)
+void ReplicatedPG::do_scan(
+ OpRequestRef op,
+ ThreadPool::TPHandle &handle)
{
MOSDPGScan *m = static_cast<MOSDPGScan*>(op->request);
assert(m->get_header().type == MSG_OSD_PG_SCAN);
BackfillInterval bi;
osr->flush();
- scan_range(m->begin, g_conf->osd_backfill_scan_min, g_conf->osd_backfill_scan_max, &bi);
+ scan_range(
+ m->begin, g_conf->osd_backfill_scan_min,
+ g_conf->osd_backfill_scan_max, &bi, handle);
MOSDPGScan *reply = new MOSDPGScan(MOSDPGScan::OP_SCAN_DIGEST,
get_osdmap()->get_epoch(), m->query_epoch,
info.pgid, bi.begin, bi.end);
}
-int ReplicatedPG::start_recovery_ops(int max, RecoveryCtx *prctx)
+int ReplicatedPG::start_recovery_ops(
+ int max, RecoveryCtx *prctx,
+ ThreadPool::TPHandle &handle)
{
int started = 0;
assert(is_primary());
}
deferred_backfill = true;
} else {
- started += recover_backfill(max - started);
+ started += recover_backfill(max - started, handle);
}
}
* peer_info[backfill_target].last_backfill = MIN(peer_backfill_info.begin,
* backfill_info.begin, backfills_in_flight)
*/
-int ReplicatedPG::recover_backfill(int max)
+int ReplicatedPG::recover_backfill(
+ int max,
+ ThreadPool::TPHandle &handle)
{
dout(10) << "recover_backfill (" << max << ")" << dendl;
assert(backfill_target >= 0);
dout(10) << " rescanning local backfill_info from " << backfill_pos << dendl;
backfill_info.clear();
osr->flush();
- scan_range(backfill_pos, local_min, local_max, &backfill_info);
+ scan_range(backfill_pos, local_min, local_max, &backfill_info, handle);
int ops = 0;
map<hobject_t, pair<eversion_t, eversion_t> > to_push;
if (backfill_info.begin <= pbi.begin &&
!backfill_info.extends_to_end() && backfill_info.empty()) {
osr->flush();
- scan_range(backfill_info.end, local_min, local_max, &backfill_info);
+ scan_range(backfill_info.end, local_min, local_max, &backfill_info,
+ handle);
backfill_info.trim();
}
backfill_pos = backfill_info.begin > pbi.begin ? pbi.begin : backfill_info.begin;
put_object_context(obc);
}
-void ReplicatedPG::scan_range(hobject_t begin, int min, int max, BackfillInterval *bi)
+void ReplicatedPG::scan_range(
+ hobject_t begin, int min, int max, BackfillInterval *bi,
+ ThreadPool::TPHandle &handle)
{
assert(is_locked());
dout(10) << "scan_range from " << begin << dendl;
dout(20) << ls << dendl;
for (vector<hobject_t>::iterator p = ls.begin(); p != ls.end(); ++p) {
+ handle.reset_tp_timeout();
ObjectContext *obc = NULL;
if (is_primary())
obc = _lookup_object_context(*p);
void _clear_recovery_state();
void queue_for_recovery();
- int start_recovery_ops(int max, RecoveryCtx *prctx);
+ int start_recovery_ops(
+ int max, RecoveryCtx *prctx,
+ ThreadPool::TPHandle &handle);
+
int recover_primary(int max);
int recover_replicas(int max);
- int recover_backfill(int max);
+ int recover_backfill(int max, ThreadPool::TPHandle &handle);
/**
* scan a (hash) range of objects in the current pg
* @max return no more than this many items
* @bi [out] resulting map of objects to eversion_t's
*/
- void scan_range(hobject_t begin, int min, int max, BackfillInterval *bi);
+ void scan_range(
+ hobject_t begin, int min, int max, BackfillInterval *bi,
+ ThreadPool::TPHandle &handle
+ );
void prep_backfill_object_push(
hobject_t oid, eversion_t v, eversion_t have, int peer,
void do_pg_op(OpRequestRef op);
void do_sub_op(OpRequestRef op);
void do_sub_op_reply(OpRequestRef op);
- void do_scan(OpRequestRef op);
+ void do_scan(
+ OpRequestRef op,
+ ThreadPool::TPHandle &handle);
void do_backfill(OpRequestRef op);
void _do_push(OpRequestRef op);
void _do_pull_response(OpRequestRef op);