From: Greg Farnum Date: Mon, 30 Jan 2012 22:50:28 +0000 (-0800) Subject: osd: "mark" OpRequests as they move through the system. X-Git-Tag: v0.44~99^2~5 X-Git-Url: http://git-server-git.apps.pok.os.sepia.ceph.com/?a=commitdiff_plain;h=fd3108ee9dec87235a9f294d77690bb53d9363bf;p=ceph.git osd: "mark" OpRequests as they move through the system. Right now these are just informational flags which can be read out. Later they might extend to timing information, separate lists for more precise control over latency warnings, etc. Signed-off-by: Greg Farnum --- diff --git a/src/osd/OSD.cc b/src/osd/OSD.cc index 7dfb07f6e22..ae39befafea 100644 --- a/src/osd/OSD.cc +++ b/src/osd/OSD.cc @@ -3105,6 +3105,7 @@ void OSD::wait_for_new_map(OpRequest *op) } waiting_for_osdmap.push_back(op); + op->mark_delayed(); } @@ -4162,6 +4163,8 @@ void OSD::handle_pg_create(OpRequest *op) if (!require_same_or_newer_map(op, m->epoch)) return; + op->mark_started(); + map< int, map > query_map; map info_map; @@ -4346,6 +4349,8 @@ void OSD::handle_pg_notify(OpRequest *op) if (!require_same_or_newer_map(op, m->get_epoch())) return; + op->mark_started(); + // look for unknown PGs i'm primary for map< int, map > query_map; map info_map; @@ -4419,6 +4424,8 @@ void OSD::handle_pg_log(OpRequest *op) return; } + op->mark_started(); + map< int, map > query_map; map< int, MOSDPGInfo* > info_map; PG::RecoveryCtx rctx(&query_map, &info_map, 0, &fin->contexts, t); @@ -4446,6 +4453,9 @@ void OSD::handle_pg_info(OpRequest *op) int from = m->get_source().num(); if (!require_same_or_newer_map(op, m->get_epoch())) return; + + op->mark_started(); + map< int, MOSDPGInfo* > info_map; int created = 0; @@ -4498,6 +4508,8 @@ void OSD::handle_pg_trim(OpRequest *op) int from = m->get_source().num(); if (!require_same_or_newer_map(op, m->epoch)) return; + op->mark_started(); + if (!_have_pg(m->pgid)) { dout(10) << " don't have pg " << m->pgid << dendl; } else { @@ -4633,6 +4645,8 @@ void OSD::handle_pg_missing(OpRequest *op) 
if (!require_same_or_newer_map(op, m->get_epoch())) return; + op->mark_started(); + map< int, map > query_map; PG::Log empty_log; int created = 0; @@ -4665,6 +4679,8 @@ void OSD::handle_pg_query(OpRequest *op) if (!require_same_or_newer_map(op, m->get_epoch())) return; + op->mark_started(); + map< int, vector > notify_list; for (map::iterator it = m->pg_list.begin(); @@ -4764,6 +4780,8 @@ void OSD::handle_pg_remove(OpRequest *op) if (!require_same_or_newer_map(op, m->get_epoch())) return; + op->mark_started(); + for (vector::iterator it = m->pg_list.begin(); it != m->pg_list.end(); it++) { @@ -5258,6 +5276,7 @@ void OSD::handle_op(OpRequest *op) if (osdmap->get_pg_role(pgid, whoami) >= 0) { dout(7) << "we are valid target for op, waiting" << dendl; waiting_for_pg[pgid].push_back(op); + op->mark_delayed(); return; } @@ -5454,6 +5473,7 @@ bool OSD::op_is_queueable(PG *pg, OpRequest *op) if (!pg->is_active()) { dout(7) << *pg << " not active (yet)" << dendl; pg->waiting_for_active.push_back(op); + op->mark_delayed(); return false; } @@ -5462,6 +5482,7 @@ bool OSD::op_is_queueable(PG *pg, OpRequest *op) dout(7) << *pg << " queueing replay at " << m->get_version() << " for " << *m << dendl; pg->replay_queue[m->get_version()] = op; + op->mark_delayed(); return false; } } @@ -5533,6 +5554,8 @@ void OSD::enqueue_op(PG *pg, OpRequest *op) pg->op_queue.push_back(op); op_wq.queue(pg); + + op->mark_queued_for_pg(); } bool OSD::OpWQ::_enqueue(PG *pg) @@ -5615,6 +5638,8 @@ void OSD::dequeue_op(PG *pg) } osd_lock.Unlock(); + op->mark_reached_pg(); + switch (op->request->get_type()) { case CEPH_MSG_OSD_OP: if (op_is_discardable((MOSDOp*)op->request)) diff --git a/src/osd/PG.cc b/src/osd/PG.cc index 6b0361bfd19..43802a7715c 100644 --- a/src/osd/PG.cc +++ b/src/osd/PG.cc @@ -2352,6 +2352,8 @@ void PG::sub_op_scrub_map(OpRequest *op) return; } + op->mark_started(); + int from = m->get_source().num(); dout(10) << " got osd." 
<< from << " scrub map" << dendl; @@ -2423,6 +2425,8 @@ void PG::sub_op_scrub_reserve(OpRequest *op) return; } + op->mark_started(); + scrub_reserved = osd->inc_scrubs_pending(); MOSDSubOpReply *reply = new MOSDSubOpReply(m, 0, get_osdmap()->get_epoch(), CEPH_OSD_FLAG_ACK); @@ -2444,6 +2448,8 @@ void PG::sub_op_scrub_reserve_reply(OpRequest *op) return; } + op->mark_started(); + int from = reply->get_source().num(); bufferlist::iterator p = reply->get_data().begin(); bool reserved; @@ -2471,6 +2477,8 @@ void PG::sub_op_scrub_unreserve(OpRequest *op) assert(op->request->get_header().type == MSG_OSD_SUBOP); dout(7) << "sub_op_scrub_unreserve" << dendl; + op->mark_started(); + clear_scrub_reserved(); op->put(); @@ -2478,6 +2486,8 @@ void PG::sub_op_scrub_unreserve(OpRequest *op) void PG::sub_op_scrub_stop(OpRequest *op) { + op->mark_started(); + MOSDSubOp *m = (MOSDSubOp*)op->request; assert(m->get_header().type == MSG_OSD_SUBOP); dout(7) << "sub_op_scrub_stop" << dendl; diff --git a/src/osd/ReplicatedPG.cc b/src/osd/ReplicatedPG.cc index d40ade61070..49e5a1fcb19 100644 --- a/src/osd/ReplicatedPG.cc +++ b/src/osd/ReplicatedPG.cc @@ -120,6 +120,7 @@ void ReplicatedPG::wait_for_missing_object(const hobject_t& soid, OpRequest *op) pull(soid, v); } waiting_for_missing_object[soid].push_back(op); + op->mark_delayed(); } void ReplicatedPG::wait_for_all_missing(OpRequest *op) @@ -175,6 +176,7 @@ void ReplicatedPG::wait_for_degraded_object(const hobject_t& soid, OpRequest *op recover_object_replicas(soid, v); } waiting_for_degraded_object[soid].push_back(op); + op->mark_delayed(); } bool PGLSParentFilter::filter(bufferlist& xattr_data, bufferlist& outdata) @@ -263,6 +265,8 @@ void ReplicatedPG::do_pg_op(OpRequest *op) assert(m->get_header().type == CEPH_MSG_OSD_OP); dout(10) << "do_pg_op " << *m << dendl; + op->mark_started(); + bufferlist outdata; int result = 0; string cname, mname; @@ -465,6 +469,7 @@ void ReplicatedPG::do_op(OpRequest *op) if (finalizing_scrub && 
m->may_write()) { dout(20) << __func__ << ": waiting for scrub" << dendl; waiting_for_active.push_back(op); + op->mark_delayed(); return; } @@ -560,6 +565,7 @@ void ReplicatedPG::do_op(OpRequest *op) if (!ok) { dout(10) << "do_op waiting on mode " << mode << dendl; mode.waiting.push_back(op); + op->mark_delayed(); return; } @@ -648,6 +654,8 @@ void ReplicatedPG::do_op(OpRequest *op) return; } + op->mark_started(); + const hobject_t& soid = obc->obs.oi.soid; OpContext *ctx = new OpContext(op, m->get_reqid(), m->ops, &obc->obs, obc->ssc, @@ -688,10 +696,13 @@ void ReplicatedPG::do_op(OpRequest *op) } else { dout(10) << " waiting for " << oldv << " to commit" << dendl; waiting_for_ondisk[oldv].push_back(op); + op->mark_delayed(); } return; } + op->mark_started(); + // version ctx->at_version = log.head; @@ -933,6 +944,8 @@ void ReplicatedPG::do_scan(OpRequest *op) assert(m->get_header().type == MSG_OSD_PG_SCAN); dout(10) << "do_scan " << *m << dendl; + op->mark_started(); + switch (m->op) { case MOSDPGScan::OP_SCAN_GET_DIGEST: { @@ -977,6 +990,8 @@ void ReplicatedPG::do_backfill(OpRequest *op) assert(m->get_header().type == MSG_OSD_PG_BACKFILL); dout(10) << "do_backfill " << *m << dendl; + op->mark_started(); + switch (m->op) { case MOSDPGBackfill::OP_BACKFILL_FINISH: { @@ -3068,6 +3083,7 @@ void ReplicatedPG::issue_repop(RepGather *repop, utime_t now, int acks_wanted = CEPH_OSD_FLAG_ACK | CEPH_OSD_FLAG_ONDISK; for (unsigned i=1; i<acting.size(); i++) { + ctx->op->mark_sub_op_sent(); int peer = acting[i]; Info &pinfo = peer_info[peer]; @@ -3778,6 +3794,9 @@ void ReplicatedPG::sub_op_modify_reply(OpRequest *op) { MOSDSubOpReply *r = (MOSDSubOpReply*)op->request; assert(r->get_header().type == MSG_OSD_SUBOPREPLY); + + op->mark_started(); + // must be replication. 
tid_t rep_tid = r->get_tid(); int fromosd = r->get_source().num(); @@ -4257,6 +4276,8 @@ void ReplicatedPG::sub_op_push_reply(OpRequest *op) MOSDSubOpReply *reply = (MOSDSubOpReply*)op->request; assert(reply->get_header().type == MSG_OSD_SUBOPREPLY); dout(10) << "sub_op_push_reply from " << reply->get_source() << " " << *reply << dendl; + + op->mark_started(); int peer = reply->get_source().num(); const hobject_t& soid = reply->get_poid(); @@ -4341,6 +4362,8 @@ void ReplicatedPG::sub_op_pull(OpRequest *op) MOSDSubOp *m = (MOSDSubOp*)op->request; assert(m->get_header().type == MSG_OSD_SUBOP); + op->mark_started(); + const hobject_t soid = m->poid; dout(7) << "op_pull " << soid << " v " << m->version @@ -4481,6 +4504,8 @@ void ReplicatedPG::sub_op_push(OpRequest *op) return; } + op->mark_started(); + interval_set data_subset; map > clone_subsets; @@ -4831,6 +4856,8 @@ void ReplicatedPG::sub_op_remove(OpRequest *op) assert(m->get_header().type == MSG_OSD_SUBOP); dout(7) << "sub_op_remove " << m->poid << dendl; + op->mark_started(); + ObjectStore::Transaction *t = new ObjectStore::Transaction; remove_object_with_snap_hardlinks(*t, m->poid); int r = osd->store->queue_transaction(&osr, t);