]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
osd: "mark" OpRequests as they move through the system.
authorGreg Farnum <gregory.farnum@dreamhost.com>
Mon, 30 Jan 2012 22:50:28 +0000 (14:50 -0800)
committerGreg Farnum <gregory.farnum@dreamhost.com>
Wed, 1 Feb 2012 23:02:29 +0000 (15:02 -0800)
Right now these are just informational flags which can be read out. Later
they might extend to timing information, separate lists for more precise
control over latency warnings, etc.

Signed-off-by: Greg Farnum <gregory.farnum@dreamhost.com>
src/osd/OSD.cc
src/osd/PG.cc
src/osd/ReplicatedPG.cc

index 7dfb07f6e22fc6c2194cfd5e499ccfe50a4bd8fe..ae39befafea4725dfce56593f6ff313807862fca 100644 (file)
@@ -3105,6 +3105,7 @@ void OSD::wait_for_new_map(OpRequest *op)
   }
   
   waiting_for_osdmap.push_back(op);
+  op->mark_delayed();
 }
 
 
@@ -4162,6 +4163,8 @@ void OSD::handle_pg_create(OpRequest *op)
 
   if (!require_same_or_newer_map(op, m->epoch)) return;
 
+  op->mark_started();
+
   map< int, map<pg_t,PG::Query> > query_map;
   map<int, MOSDPGInfo*> info_map;
 
@@ -4346,6 +4349,8 @@ void OSD::handle_pg_notify(OpRequest *op)
 
   if (!require_same_or_newer_map(op, m->get_epoch())) return;
 
+  op->mark_started();
+
   // look for unknown PGs i'm primary for
   map< int, map<pg_t,PG::Query> > query_map;
   map<int, MOSDPGInfo*> info_map;
@@ -4419,6 +4424,8 @@ void OSD::handle_pg_log(OpRequest *op)
     return;
   }
 
+  op->mark_started();
+
   map< int, map<pg_t,PG::Query> > query_map;
   map< int, MOSDPGInfo* > info_map;
   PG::RecoveryCtx rctx(&query_map, &info_map, 0, &fin->contexts, t);
@@ -4446,6 +4453,9 @@ void OSD::handle_pg_info(OpRequest *op)
 
   int from = m->get_source().num();
   if (!require_same_or_newer_map(op, m->get_epoch())) return;
+
+  op->mark_started();
+
   map< int, MOSDPGInfo* > info_map;
 
   int created = 0;
@@ -4498,6 +4508,8 @@ void OSD::handle_pg_trim(OpRequest *op)
   int from = m->get_source().num();
   if (!require_same_or_newer_map(op, m->epoch)) return;
 
+  op->mark_started();
+
   if (!_have_pg(m->pgid)) {
     dout(10) << " don't have pg " << m->pgid << dendl;
   } else {
@@ -4633,6 +4645,8 @@ void OSD::handle_pg_missing(OpRequest *op)
   if (!require_same_or_newer_map(op, m->get_epoch()))
     return;
 
+  op->mark_started();
+
   map< int, map<pg_t,PG::Query> > query_map;
   PG::Log empty_log;
   int created = 0;
@@ -4665,6 +4679,8 @@ void OSD::handle_pg_query(OpRequest *op)
   
   if (!require_same_or_newer_map(op, m->get_epoch())) return;
 
+  op->mark_started();
+
   map< int, vector<PG::Info> > notify_list;
   
   for (map<pg_t,PG::Query>::iterator it = m->pg_list.begin();
@@ -4764,6 +4780,8 @@ void OSD::handle_pg_remove(OpRequest *op)
   
   if (!require_same_or_newer_map(op, m->get_epoch())) return;
   
+  op->mark_started();
+
   for (vector<pg_t>::iterator it = m->pg_list.begin();
        it != m->pg_list.end();
        it++) {
@@ -5258,6 +5276,7 @@ void OSD::handle_op(OpRequest *op)
     if (osdmap->get_pg_role(pgid, whoami) >= 0) {
       dout(7) << "we are valid target for op, waiting" << dendl;
       waiting_for_pg[pgid].push_back(op);
+      op->mark_delayed();
       return;
     }
 
@@ -5454,6 +5473,7 @@ bool OSD::op_is_queueable(PG *pg, OpRequest *op)
   if (!pg->is_active()) {
     dout(7) << *pg << " not active (yet)" << dendl;
     pg->waiting_for_active.push_back(op);
+    op->mark_delayed();
     return false;
   }
 
@@ -5462,6 +5482,7 @@ bool OSD::op_is_queueable(PG *pg, OpRequest *op)
       dout(7) << *pg << " queueing replay at " << m->get_version()
              << " for " << *m << dendl;
       pg->replay_queue[m->get_version()] = op;
+      op->mark_delayed();
       return false;
     }
   }
@@ -5533,6 +5554,8 @@ void OSD::enqueue_op(PG *pg, OpRequest *op)
   pg->op_queue.push_back(op);
   
   op_wq.queue(pg);
+
+  op->mark_queued_for_pg();
 }
 
 bool OSD::OpWQ::_enqueue(PG *pg)
@@ -5615,6 +5638,8 @@ void OSD::dequeue_op(PG *pg)
   }
   osd_lock.Unlock();
 
+  op->mark_reached_pg();
+
   switch (op->request->get_type()) {
   case CEPH_MSG_OSD_OP:
     if (op_is_discardable((MOSDOp*)op->request))
index 6b0361bfd19d30f2df12f4c1e848a2254adb3622..43802a7715c7ed9424511692fed77f7cc34ca4c9 100644 (file)
@@ -2352,6 +2352,8 @@ void PG::sub_op_scrub_map(OpRequest *op)
     return;
   }
 
+  op->mark_started();
+
   int from = m->get_source().num();
 
   dout(10) << " got osd." << from << " scrub map" << dendl;
@@ -2423,6 +2425,8 @@ void PG::sub_op_scrub_reserve(OpRequest *op)
     return;
   }
 
+  op->mark_started();
+
   scrub_reserved = osd->inc_scrubs_pending();
 
   MOSDSubOpReply *reply = new MOSDSubOpReply(m, 0, get_osdmap()->get_epoch(), CEPH_OSD_FLAG_ACK);
@@ -2444,6 +2448,8 @@ void PG::sub_op_scrub_reserve_reply(OpRequest *op)
     return;
   }
 
+  op->mark_started();
+
   int from = reply->get_source().num();
   bufferlist::iterator p = reply->get_data().begin();
   bool reserved;
@@ -2471,6 +2477,8 @@ void PG::sub_op_scrub_unreserve(OpRequest *op)
   assert(op->request->get_header().type == MSG_OSD_SUBOP);
   dout(7) << "sub_op_scrub_unreserve" << dendl;
 
+  op->mark_started();
+
   clear_scrub_reserved();
 
   op->put();
@@ -2478,6 +2486,8 @@ void PG::sub_op_scrub_unreserve(OpRequest *op)
 
 void PG::sub_op_scrub_stop(OpRequest *op)
 {
+  op->mark_started();
+
   MOSDSubOp *m = (MOSDSubOp*)op->request;
   assert(m->get_header().type == MSG_OSD_SUBOP);
   dout(7) << "sub_op_scrub_stop" << dendl;
index d40ade61070b001f04c76bc3ddd36028700cb2b0..49e5a1fcb19ccb99beaf32c500850decc841ff7d 100644 (file)
@@ -120,6 +120,7 @@ void ReplicatedPG::wait_for_missing_object(const hobject_t& soid, OpRequest *op)
     pull(soid, v);
   }
   waiting_for_missing_object[soid].push_back(op);
+  op->mark_delayed();
 }
 
 void ReplicatedPG::wait_for_all_missing(OpRequest *op)
@@ -175,6 +176,7 @@ void ReplicatedPG::wait_for_degraded_object(const hobject_t& soid, OpRequest *op
     recover_object_replicas(soid, v);
   }
   waiting_for_degraded_object[soid].push_back(op);
+  op->mark_delayed();
 }
 
 bool PGLSParentFilter::filter(bufferlist& xattr_data, bufferlist& outdata)
@@ -263,6 +265,8 @@ void ReplicatedPG::do_pg_op(OpRequest *op)
   assert(m->get_header().type == CEPH_MSG_OSD_OP);
   dout(10) << "do_pg_op " << *m << dendl;
 
+  op->mark_started();
+
   bufferlist outdata;
   int result = 0;
   string cname, mname;
@@ -465,6 +469,7 @@ void ReplicatedPG::do_op(OpRequest *op)
   if (finalizing_scrub && m->may_write()) {
     dout(20) << __func__ << ": waiting for scrub" << dendl;
     waiting_for_active.push_back(op);
+    op->mark_delayed();
     return;
   }
 
@@ -560,6 +565,7 @@ void ReplicatedPG::do_op(OpRequest *op)
   if (!ok) {
     dout(10) << "do_op waiting on mode " << mode << dendl;
     mode.waiting.push_back(op);
+    op->mark_delayed();
     return;
   }
 
@@ -648,6 +654,8 @@ void ReplicatedPG::do_op(OpRequest *op)
     return;
   }
 
+  op->mark_started();
+
   const hobject_t& soid = obc->obs.oi.soid;
   OpContext *ctx = new OpContext(op, m->get_reqid(), m->ops,
                                 &obc->obs, obc->ssc, 
@@ -688,10 +696,13 @@ void ReplicatedPG::do_op(OpRequest *op)
       } else {
        dout(10) << " waiting for " << oldv << " to commit" << dendl;
        waiting_for_ondisk[oldv].push_back(op);
+       op->mark_delayed();
       }
       return;
     }
 
+    op->mark_started();
+
     // version
     ctx->at_version = log.head;
 
@@ -933,6 +944,8 @@ void ReplicatedPG::do_scan(OpRequest *op)
   assert(m->get_header().type == MSG_OSD_PG_SCAN);
   dout(10) << "do_scan " << *m << dendl;
 
+  op->mark_started();
+
   switch (m->op) {
   case MOSDPGScan::OP_SCAN_GET_DIGEST:
     {
@@ -977,6 +990,8 @@ void ReplicatedPG::do_backfill(OpRequest *op)
   assert(m->get_header().type == MSG_OSD_PG_BACKFILL);
   dout(10) << "do_backfill " << *m << dendl;
 
+  op->mark_started();
+
   switch (m->op) {
   case MOSDPGBackfill::OP_BACKFILL_FINISH:
     {
@@ -3068,6 +3083,7 @@ void ReplicatedPG::issue_repop(RepGather *repop, utime_t now,
   int acks_wanted = CEPH_OSD_FLAG_ACK | CEPH_OSD_FLAG_ONDISK;
 
   for (unsigned i=1; i<acting.size(); i++) {
+    ctx->op->mark_sub_op_sent();
     int peer = acting[i];
     Info &pinfo = peer_info[peer];
 
@@ -3778,6 +3794,9 @@ void ReplicatedPG::sub_op_modify_reply(OpRequest *op)
 {
   MOSDSubOpReply *r = (MOSDSubOpReply*)op->request;
   assert(r->get_header().type == MSG_OSD_SUBOPREPLY);
+
+  op->mark_started();
+
   // must be replication.
   tid_t rep_tid = r->get_tid();
   int fromosd = r->get_source().num();
@@ -4257,6 +4276,8 @@ void ReplicatedPG::sub_op_push_reply(OpRequest *op)
   MOSDSubOpReply *reply = (MOSDSubOpReply*)op->request;
   assert(reply->get_header().type == MSG_OSD_SUBOPREPLY);
   dout(10) << "sub_op_push_reply from " << reply->get_source() << " " << *reply << dendl;
+
+  op->mark_started();
   
   int peer = reply->get_source().num();
   const hobject_t& soid = reply->get_poid();
@@ -4341,6 +4362,8 @@ void ReplicatedPG::sub_op_pull(OpRequest *op)
   MOSDSubOp *m = (MOSDSubOp*)op->request;
   assert(m->get_header().type == MSG_OSD_SUBOP);
 
+  op->mark_started();
+
   const hobject_t soid = m->poid;
 
   dout(7) << "op_pull " << soid << " v " << m->version
@@ -4481,6 +4504,8 @@ void ReplicatedPG::sub_op_push(OpRequest *op)
     return;
   }
 
+  op->mark_started();
+
   interval_set<uint64_t> data_subset;
   map<hobject_t, interval_set<uint64_t> > clone_subsets;
 
@@ -4831,6 +4856,8 @@ void ReplicatedPG::sub_op_remove(OpRequest *op)
   assert(m->get_header().type == MSG_OSD_SUBOP);
   dout(7) << "sub_op_remove " << m->poid << dendl;
 
+  op->mark_started();
+
   ObjectStore::Transaction *t = new ObjectStore::Transaction;
   remove_object_with_snap_hardlinks(*t, m->poid);
   int r = osd->store->queue_transaction(&osr, t);