]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
ReplicatedPG: ping TPHandle during scan_range
authorSamuel Just <sam.just@inktank.com>
Tue, 6 Aug 2013 00:21:46 +0000 (17:21 -0700)
committerSamuel Just <sam.just@inktank.com>
Tue, 6 Aug 2013 20:51:51 +0000 (13:51 -0700)
Signed-off-by: Samuel Just <sam.just@inktank.com>
src/osd/OSD.cc
src/osd/OSD.h
src/osd/PG.cc
src/osd/PG.h
src/osd/ReplicatedPG.cc
src/osd/ReplicatedPG.h

index 1d51e02ad437c29051197cc55137a767bec6fff8..987f8354c805be007f82691d575b8111d1d56b92 100644 (file)
@@ -6664,7 +6664,7 @@ void OSD::do_recovery(PG *pg, ThreadPool::TPHandle &handle)
 #endif
     
     PG::RecoveryCtx rctx = create_context();
-    int started = pg->start_recovery_ops(max, &rctx);
+    int started = pg->start_recovery_ops(max, &rctx, handle);
     dout(10) << "do_recovery started " << started << "/" << max << " on " << *pg << dendl;
 
     /*
@@ -7052,7 +7052,7 @@ void OSD::OpWQ::_process(PGRef pg, ThreadPool::TPHandle &handle)
     if (!(pg_for_processing[&*pg].size()))
       pg_for_processing.erase(&*pg);
   }
-  osd->dequeue_op(pg, op);
+  osd->dequeue_op(pg, op, handle);
   pg->unlock();
 }
 
@@ -7065,7 +7065,9 @@ void OSDService::dequeue_pg(PG *pg, list<OpRequestRef> *dequeued)
 /*
  * NOTE: dequeue called in worker thread, with pg lock
  */
-void OSD::dequeue_op(PGRef pg, OpRequestRef op)
+void OSD::dequeue_op(
+  PGRef pg, OpRequestRef op,
+  ThreadPool::TPHandle &handle)
 {
   utime_t latency = ceph_clock_now(g_ceph_context) - op->request->get_recv_stamp();
   dout(10) << "dequeue_op " << op << " prio " << op->request->get_priority()
@@ -7078,7 +7080,7 @@ void OSD::dequeue_op(PGRef pg, OpRequestRef op)
 
   op->mark_reached_pg();
 
-  pg->do_request(op);
+  pg->do_request(op, handle);
 
   // finish
   dout(10) << "dequeue_op " << op << " finish" << dendl;
index ae77644eeeb759bc661974fededcdc066de39764..82a251d9a80a6d88b80fd308f686dcc38b40908e 100644 (file)
@@ -915,7 +915,9 @@ private:
   } op_wq;
 
   void enqueue_op(PG *pg, OpRequestRef op);
-  void dequeue_op(PGRef pg, OpRequestRef op);
+  void dequeue_op(
+    PGRef pg, OpRequestRef op,
+    ThreadPool::TPHandle &handle);
 
   // -- peering queue --
   struct PeeringWQ : public ThreadPool::BatchWorkQueue<PG> {
index 63e760e3b212e265451dd0daad40b88b25c86195..8e78eaa7a16d06927e9bf626a4ebcf71c7bc2be3 100644 (file)
@@ -1397,7 +1397,9 @@ void PG::queue_op(OpRequestRef op)
   osd->op_wq.queue(make_pair(PGRef(this), op));
 }
 
-void PG::do_request(OpRequestRef op)
+void PG::do_request(
+  OpRequestRef op,
+  ThreadPool::TPHandle &handle)
 {
   // do any pending flush
   do_pending_flush();
@@ -1435,7 +1437,7 @@ void PG::do_request(OpRequestRef op)
     break;
 
   case MSG_OSD_PG_SCAN:
-    do_scan(op);
+    do_scan(op, handle);
     break;
 
   case MSG_OSD_PG_BACKFILL:
index 8f572c75e19691b911e7bedb9ce117fda4cd4211..d4679ce4fd84f091b88e2bf4f143154fc0e70933 100644 (file)
@@ -645,7 +645,9 @@ public:
 
   virtual void check_local() = 0;
 
-  virtual int start_recovery_ops(int max, RecoveryCtx *prctx) = 0;
+  virtual int start_recovery_ops(
+    int max, RecoveryCtx *prctx,
+    ThreadPool::TPHandle &handle) = 0;
 
   void purge_strays();
 
@@ -1804,12 +1806,18 @@ public:
 
 
   // abstract bits
-  void do_request(OpRequestRef op);
+  void do_request(
+    OpRequestRef op,
+    ThreadPool::TPHandle &handle
+  );
 
   virtual void do_op(OpRequestRef op) = 0;
   virtual void do_sub_op(OpRequestRef op) = 0;
   virtual void do_sub_op_reply(OpRequestRef op) = 0;
-  virtual void do_scan(OpRequestRef op) = 0;
+  virtual void do_scan(
+    OpRequestRef op,
+    ThreadPool::TPHandle &handle
+  ) = 0;
   virtual void do_backfill(OpRequestRef op) = 0;
   virtual void do_push(OpRequestRef op) = 0;
   virtual void do_pull(OpRequestRef op) = 0;
index 658ea7cb746333a138b13c0b5840ac46a5f8e69e..73ac280740429561e2e3e3e00166574b79a54eb2 100644 (file)
@@ -1252,7 +1252,9 @@ void ReplicatedPG::do_sub_op_reply(OpRequestRef op)
   sub_op_modify_reply(op);
 }
 
-void ReplicatedPG::do_scan(OpRequestRef op)
+void ReplicatedPG::do_scan(
+  OpRequestRef op,
+  ThreadPool::TPHandle &handle)
 {
   MOSDPGScan *m = static_cast<MOSDPGScan*>(op->request);
   assert(m->get_header().type == MSG_OSD_PG_SCAN);
@@ -1278,7 +1280,9 @@ void ReplicatedPG::do_scan(OpRequestRef op)
 
       BackfillInterval bi;
       osr->flush();
-      scan_range(m->begin, g_conf->osd_backfill_scan_min, g_conf->osd_backfill_scan_max, &bi);
+      scan_range(
+       m->begin, g_conf->osd_backfill_scan_min,
+       g_conf->osd_backfill_scan_max, &bi, handle);
       MOSDPGScan *reply = new MOSDPGScan(MOSDPGScan::OP_SCAN_DIGEST,
                                         get_osdmap()->get_epoch(), m->query_epoch,
                                         info.pgid, bi.begin, bi.end);
@@ -6875,7 +6879,9 @@ void ReplicatedPG::check_recovery_sources(const OSDMapRef osdmap)
 }
   
 
-int ReplicatedPG::start_recovery_ops(int max, RecoveryCtx *prctx)
+int ReplicatedPG::start_recovery_ops(
+  int max, RecoveryCtx *prctx,
+  ThreadPool::TPHandle &handle)
 {
   int started = 0;
   assert(is_primary());
@@ -6931,7 +6937,7 @@ int ReplicatedPG::start_recovery_ops(int max, RecoveryCtx *prctx)
       }
       deferred_backfill = true;
     } else {
-      started += recover_backfill(max - started);
+      started += recover_backfill(max - started, handle);
     }
   }
 
@@ -7275,7 +7281,9 @@ int ReplicatedPG::recover_replicas(int max)
  * peer_info[backfill_target].last_backfill = MIN(peer_backfill_info.begin,
  * backfill_info.begin, backfills_in_flight)
  */
-int ReplicatedPG::recover_backfill(int max)
+int ReplicatedPG::recover_backfill(
+  int max,
+  ThreadPool::TPHandle &handle)
 {
   dout(10) << "recover_backfill (" << max << ")" << dendl;
   assert(backfill_target >= 0);
@@ -7305,7 +7313,7 @@ int ReplicatedPG::recover_backfill(int max)
   dout(10) << " rescanning local backfill_info from " << backfill_pos << dendl;
   backfill_info.clear();
   osr->flush();
-  scan_range(backfill_pos, local_min, local_max, &backfill_info);
+  scan_range(backfill_pos, local_min, local_max, &backfill_info, handle);
 
   int ops = 0;
   map<hobject_t, pair<eversion_t, eversion_t> > to_push;
@@ -7319,7 +7327,8 @@ int ReplicatedPG::recover_backfill(int max)
     if (backfill_info.begin <= pbi.begin &&
        !backfill_info.extends_to_end() && backfill_info.empty()) {
       osr->flush();
-      scan_range(backfill_info.end, local_min, local_max, &backfill_info);
+      scan_range(backfill_info.end, local_min, local_max, &backfill_info,
+                handle);
       backfill_info.trim();
     }
     backfill_pos = backfill_info.begin > pbi.begin ? pbi.begin : backfill_info.begin;
@@ -7480,7 +7489,9 @@ void ReplicatedPG::prep_backfill_object_push(
   put_object_context(obc);
 }
 
-void ReplicatedPG::scan_range(hobject_t begin, int min, int max, BackfillInterval *bi)
+void ReplicatedPG::scan_range(
+  hobject_t begin, int min, int max, BackfillInterval *bi,
+  ThreadPool::TPHandle &handle)
 {
   assert(is_locked());
   dout(10) << "scan_range from " << begin << dendl;
@@ -7496,6 +7507,7 @@ void ReplicatedPG::scan_range(hobject_t begin, int min, int max, BackfillInterva
   dout(20) << ls << dendl;
 
   for (vector<hobject_t>::iterator p = ls.begin(); p != ls.end(); ++p) {
+    handle.reset_tp_timeout();
     ObjectContext *obc = NULL;
     if (is_primary())
       obc = _lookup_object_context(*p);
index 7b70b4381ea4f8b8d77cfea7d2e16871f1ef6805..ac6694555f56557d618cdcde19b04a8d07acc9ea 100644 (file)
@@ -759,10 +759,13 @@ protected:
   void _clear_recovery_state();
 
   void queue_for_recovery();
-  int start_recovery_ops(int max, RecoveryCtx *prctx);
+  int start_recovery_ops(
+    int max, RecoveryCtx *prctx,
+    ThreadPool::TPHandle &handle);
+
   int recover_primary(int max);
   int recover_replicas(int max);
-  int recover_backfill(int max);
+  int recover_backfill(int max, ThreadPool::TPHandle &handle);
 
   /**
    * scan a (hash) range of objects in the current pg
@@ -772,7 +775,10 @@ protected:
    * @max return no more than this many items
    * @bi [out] resulting map of objects to eversion_t's
    */
-  void scan_range(hobject_t begin, int min, int max, BackfillInterval *bi);
+  void scan_range(
+    hobject_t begin, int min, int max, BackfillInterval *bi,
+    ThreadPool::TPHandle &handle
+    );
 
   void prep_backfill_object_push(
     hobject_t oid, eversion_t v, eversion_t have, int peer,
@@ -939,7 +945,9 @@ public:
   void do_pg_op(OpRequestRef op);
   void do_sub_op(OpRequestRef op);
   void do_sub_op_reply(OpRequestRef op);
-  void do_scan(OpRequestRef op);
+  void do_scan(
+    OpRequestRef op,
+    ThreadPool::TPHandle &handle);
   void do_backfill(OpRequestRef op);
   void _do_push(OpRequestRef op);
   void _do_pull_response(OpRequestRef op);