}
waiting_for_osdmap.push_back(op);
+ op->mark_delayed();
}
if (!require_same_or_newer_map(op, m->epoch)) return;
+ op->mark_started();
+
map< int, map<pg_t,PG::Query> > query_map;
map<int, MOSDPGInfo*> info_map;
if (!require_same_or_newer_map(op, m->get_epoch())) return;
+ op->mark_started();
+
// look for unknown PGs i'm primary for
map< int, map<pg_t,PG::Query> > query_map;
map<int, MOSDPGInfo*> info_map;
return;
}
+ op->mark_started();
+
map< int, map<pg_t,PG::Query> > query_map;
map< int, MOSDPGInfo* > info_map;
PG::RecoveryCtx rctx(&query_map, &info_map, 0, &fin->contexts, t);
int from = m->get_source().num();
if (!require_same_or_newer_map(op, m->get_epoch())) return;
+
+ op->mark_started();
+
map< int, MOSDPGInfo* > info_map;
int created = 0;
int from = m->get_source().num();
if (!require_same_or_newer_map(op, m->epoch)) return;
+ op->mark_started();
+
if (!_have_pg(m->pgid)) {
dout(10) << " don't have pg " << m->pgid << dendl;
} else {
if (!require_same_or_newer_map(op, m->get_epoch()))
return;
+ op->mark_started();
+
map< int, map<pg_t,PG::Query> > query_map;
PG::Log empty_log;
int created = 0;
if (!require_same_or_newer_map(op, m->get_epoch())) return;
+ op->mark_started();
+
map< int, vector<PG::Info> > notify_list;
for (map<pg_t,PG::Query>::iterator it = m->pg_list.begin();
if (!require_same_or_newer_map(op, m->get_epoch())) return;
+ op->mark_started();
+
for (vector<pg_t>::iterator it = m->pg_list.begin();
it != m->pg_list.end();
it++) {
if (osdmap->get_pg_role(pgid, whoami) >= 0) {
dout(7) << "we are valid target for op, waiting" << dendl;
waiting_for_pg[pgid].push_back(op);
+ op->mark_delayed();
return;
}
if (!pg->is_active()) {
dout(7) << *pg << " not active (yet)" << dendl;
pg->waiting_for_active.push_back(op);
+ op->mark_delayed();
return false;
}
dout(7) << *pg << " queueing replay at " << m->get_version()
<< " for " << *m << dendl;
pg->replay_queue[m->get_version()] = op;
+ op->mark_delayed();
return false;
}
}
pg->op_queue.push_back(op);
op_wq.queue(pg);
+
+ op->mark_queued_for_pg();
}
bool OSD::OpWQ::_enqueue(PG *pg)
}
osd_lock.Unlock();
+ op->mark_reached_pg();
+
switch (op->request->get_type()) {
case CEPH_MSG_OSD_OP:
if (op_is_discardable((MOSDOp*)op->request))
return;
}
+ op->mark_started();
+
int from = m->get_source().num();
dout(10) << " got osd." << from << " scrub map" << dendl;
return;
}
+ op->mark_started();
+
scrub_reserved = osd->inc_scrubs_pending();
MOSDSubOpReply *reply = new MOSDSubOpReply(m, 0, get_osdmap()->get_epoch(), CEPH_OSD_FLAG_ACK);
return;
}
+ op->mark_started();
+
int from = reply->get_source().num();
bufferlist::iterator p = reply->get_data().begin();
bool reserved;
assert(op->request->get_header().type == MSG_OSD_SUBOP);
dout(7) << "sub_op_scrub_unreserve" << dendl;
+ op->mark_started();
+
clear_scrub_reserved();
op->put();
void PG::sub_op_scrub_stop(OpRequest *op)
{
+ op->mark_started();
+
MOSDSubOp *m = (MOSDSubOp*)op->request;
assert(m->get_header().type == MSG_OSD_SUBOP);
dout(7) << "sub_op_scrub_stop" << dendl;
pull(soid, v);
}
waiting_for_missing_object[soid].push_back(op);
+ op->mark_delayed();
}
void ReplicatedPG::wait_for_all_missing(OpRequest *op)
recover_object_replicas(soid, v);
}
waiting_for_degraded_object[soid].push_back(op);
+ op->mark_delayed();
}
bool PGLSParentFilter::filter(bufferlist& xattr_data, bufferlist& outdata)
assert(m->get_header().type == CEPH_MSG_OSD_OP);
dout(10) << "do_pg_op " << *m << dendl;
+ op->mark_started();
+
bufferlist outdata;
int result = 0;
string cname, mname;
if (finalizing_scrub && m->may_write()) {
dout(20) << __func__ << ": waiting for scrub" << dendl;
waiting_for_active.push_back(op);
+ op->mark_delayed();
return;
}
if (!ok) {
dout(10) << "do_op waiting on mode " << mode << dendl;
mode.waiting.push_back(op);
+ op->mark_delayed();
return;
}
return;
}
+ op->mark_started();
+
const hobject_t& soid = obc->obs.oi.soid;
OpContext *ctx = new OpContext(op, m->get_reqid(), m->ops,
&obc->obs, obc->ssc,
} else {
dout(10) << " waiting for " << oldv << " to commit" << dendl;
waiting_for_ondisk[oldv].push_back(op);
+ op->mark_delayed();
}
return;
}
+ op->mark_started();
+
// version
ctx->at_version = log.head;
assert(m->get_header().type == MSG_OSD_PG_SCAN);
dout(10) << "do_scan " << *m << dendl;
+ op->mark_started();
+
switch (m->op) {
case MOSDPGScan::OP_SCAN_GET_DIGEST:
{
assert(m->get_header().type == MSG_OSD_PG_BACKFILL);
dout(10) << "do_backfill " << *m << dendl;
+ op->mark_started();
+
switch (m->op) {
case MOSDPGBackfill::OP_BACKFILL_FINISH:
{
int acks_wanted = CEPH_OSD_FLAG_ACK | CEPH_OSD_FLAG_ONDISK;
for (unsigned i=1; i<acting.size(); i++) {
+ ctx->op->mark_sub_op_sent();
int peer = acting[i];
Info &pinfo = peer_info[peer];
{
MOSDSubOpReply *r = (MOSDSubOpReply*)op->request;
assert(r->get_header().type == MSG_OSD_SUBOPREPLY);
+
+ op->mark_started();
+
// must be replication.
tid_t rep_tid = r->get_tid();
int fromosd = r->get_source().num();
MOSDSubOpReply *reply = (MOSDSubOpReply*)op->request;
assert(reply->get_header().type == MSG_OSD_SUBOPREPLY);
dout(10) << "sub_op_push_reply from " << reply->get_source() << " " << *reply << dendl;
+
+ op->mark_started();
int peer = reply->get_source().num();
const hobject_t& soid = reply->get_poid();
MOSDSubOp *m = (MOSDSubOp*)op->request;
assert(m->get_header().type == MSG_OSD_SUBOP);
+ op->mark_started();
+
const hobject_t soid = m->poid;
dout(7) << "op_pull " << soid << " v " << m->version
return;
}
+ op->mark_started();
+
interval_set<uint64_t> data_subset;
map<hobject_t, interval_set<uint64_t> > clone_subsets;
assert(m->get_header().type == MSG_OSD_SUBOP);
dout(7) << "sub_op_remove " << m->poid << dendl;
+ op->mark_started();
+
ObjectStore::Transaction *t = new ObjectStore::Transaction;
remove_object_with_snap_hardlinks(*t, m->poid);
int r = osd->store->queue_transaction(&osr, t);