]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
PG: split ops for child objects into child
authorSamuel Just <sam.just@inktank.com>
Thu, 20 Sep 2012 03:15:04 +0000 (20:15 -0700)
committerSamuel Just <sam.just@inktank.com>
Fri, 7 Dec 2012 06:51:56 +0000 (22:51 -0800)
Signed-off-by: Samuel Just <sam.just@inktank.com>
src/osd/OSD.cc
src/osd/OSD.h
src/osd/PG.cc
src/osd/PG.h
src/osd/ReplicatedPG.cc
src/osd/ReplicatedPG.h

index ec3035c0a66f409ecbc65cfb9ebe9a36dc85dc20..7e8506a5b6c70fcbf87bd12824db7fb8368762b6 100644 (file)
@@ -5994,7 +5994,10 @@ void OSD::OpWQ::_process(PGRef pg)
   OpRequestRef op;
   {
     Mutex::Locker l(qlock);
-    assert(pg_for_processing.count(&*pg));
+    if (!pg_for_processing.count(&*pg)) {
+      pg->unlock();
+      return;
+    }
     assert(pg_for_processing[&*pg].size());
     op = pg_for_processing[&*pg].front();
     pg_for_processing[&*pg].pop_front();
@@ -6005,6 +6008,12 @@ void OSD::OpWQ::_process(PGRef pg)
   pg->unlock();
 }
 
+
+void OSDService::dequeue_pg(PG *pg, list<OpRequestRef> *dequeued)
+{
+  osd->op_wq.dequeue(pg, dequeued);
+}
+
 /*
  * NOTE: dequeue called in worker thread, with pg lock
  */
index a87431a69d12afcd3b728026f99ca95db541e741..7afa1f9ac21ae32f5677ae8090608e77cb3b67cd 100644 (file)
@@ -184,6 +184,8 @@ public:
   ThreadPool::WorkQueue<MOSDRepScrub> &rep_scrub_wq;
   ClassHandler  *&class_handler;
 
+  void dequeue_pg(PG *pg, list<OpRequestRef> *dequeued);
+
   // -- superblock --
   Mutex publish_lock, pre_publish_lock;
   OSDSuperblock superblock;
@@ -644,9 +646,26 @@ private:
        return op.first == pg;
       }
     };
-    void dequeue(PG *pg) {
+    void dequeue(PG *pg, list<OpRequestRef> *dequeued = 0) {
       lock();
-      pqueue.remove_by_filter(Pred(pg));
+      if (!dequeued) {
+       pqueue.remove_by_filter(Pred(pg));
+       pg_for_processing.erase(pg);
+      } else {
+       list<pair<PGRef, OpRequestRef> > _dequeued;
+       pqueue.remove_by_filter(Pred(pg), &_dequeued);
+       for (list<pair<PGRef, OpRequestRef> >::iterator i = _dequeued.begin();
+            i != _dequeued.end();
+            ++i) {
+         dequeued->push_back(i->second);
+       }
+       if (pg_for_processing.count(pg)) {
+         dequeued->splice(
+           dequeued->begin(),
+           pg_for_processing[pg]);
+         pg_for_processing.erase(pg);
+       }
+      }
       unlock();
     }
     bool _empty() {
index 2323b988f7070f8f34b8ee91e665c1482291176f..36deae95fa455ceadc5f54b585f92c2d38f8f32b 100644 (file)
@@ -1948,6 +1948,56 @@ void PG::IndexedLog::split_into(
   index();
 }
 
+static void split_list(
+  list<OpRequestRef> *from,
+  list<OpRequestRef> *to,
+  unsigned match,
+  unsigned bits)
+{
+  for (list<OpRequestRef>::iterator i = from->begin();
+       i != from->end();
+    ) {
+    if (PG::split_request(*i, match, bits)) {
+      to->push_back(*i);
+      from->erase(i++);
+    } else {
+      ++i;
+    }
+  }
+}
+
+static void split_replay_queue(
+  map<eversion_t, OpRequestRef> *from,
+  map<eversion_t, OpRequestRef> *to,
+  unsigned match,
+  unsigned bits)
+{
+  for (map<eversion_t, OpRequestRef>::iterator i = from->begin();
+       i != from->end();
+       ) {
+    if (PG::split_request(i->second, match, bits)) {
+      to->insert(*i);
+      from->erase(i++);
+    } else {
+      ++i;
+    }
+  }
+}
+
+void PG::split_ops(PG *child, unsigned split_bits) {
+  unsigned match = child->info.pgid.m_seed;
+  assert(waiting_for_map.empty());
+  assert(waiting_for_all_missing.empty());
+  assert(waiting_for_missing_object.empty());
+  assert(waiting_for_degraded_object.empty());
+  assert(waiting_for_ack.empty());
+  assert(waiting_for_ondisk.empty());
+  split_replay_queue(&replay_queue, &(child->replay_queue), match, split_bits);
+
+  osd->dequeue_pg(this, &waiting_for_active);
+  split_list(&waiting_for_active, &(child->waiting_for_active), match, split_bits);
+}
+
 void PG::split_into(pg_t child_pgid, PG *child, unsigned split_bits)
 {
   child->osdmap_ref = osdmap_ref;
@@ -1983,6 +2033,9 @@ void PG::split_into(pg_t child_pgid, PG *child, unsigned split_bits)
 
   // History
   child->past_intervals = past_intervals;
+
+  split_ops(child, split_bits);
+  _split_into(child_pgid, child, split_bits);
 }
 
 void PG::defer_recovery()
@@ -4791,6 +4844,24 @@ bool PG::can_discard_request(OpRequestRef op)
   return true;
 }
 
+bool PG::split_request(OpRequestRef op, unsigned match, unsigned bits)
+{
+  unsigned mask = ~((~0)<<bits);
+  switch (op->request->get_type()) {
+  case CEPH_MSG_OSD_OP:
+    return (static_cast<MOSDOp*>(op->request)->get_pg().m_seed & mask) == match;
+  case MSG_OSD_SUBOP:
+    return false;
+  case MSG_OSD_SUBOPREPLY:
+    return false;
+  case MSG_OSD_PG_SCAN:
+    return false;
+  case MSG_OSD_PG_BACKFILL:
+    return false;
+  }
+  return false;
+}
+
 bool PG::must_delay_request(OpRequestRef op)
 {
   switch (op->request->get_type()) {
index cc2e112f48d47252917eedbde020e6ea6345f84a..f0e57eb120f926065c0c1ddb1b21498dd3e3461a 100644 (file)
@@ -658,6 +658,7 @@ protected:
                                         waiting_for_degraded_object;
   map<eversion_t,list<OpRequestRef> > waiting_for_ack, waiting_for_ondisk;
   map<eversion_t,OpRequestRef>   replay_queue;
+  void split_ops(PG *child, unsigned split_bits);
 
   void requeue_object_waiters(map<hobject_t, list<OpRequestRef> >& m);
   void requeue_ops(list<OpRequestRef> &l);
@@ -795,6 +796,7 @@ public:
   void finish_recovery_op(const hobject_t& soid, bool dequeue=false);
 
   void split_into(pg_t child_pgid, PG *child, unsigned split_bits);
+  virtual void _split_into(pg_t child_pgid, PG *child, unsigned split_bits) = 0;
 
   loff_t get_log_write_pos() {
     return 0;
@@ -1748,6 +1750,8 @@ public:
 
   bool must_delay_request(OpRequestRef op);
 
+  static bool split_request(OpRequestRef op, unsigned match, unsigned bits);
+
   bool old_peering_msg(epoch_t reply_epoch, epoch_t query_epoch);
   bool old_peering_evt(CephPeeringEvtRef evt) {
     return old_peering_msg(evt->get_epoch_sent(), evt->get_epoch_requested());
index 76ad5089493cfd3dce4376431a0429f205d91cef..d3ea2a51935d99af44efad33fd0cab2b1cee91ea 100644 (file)
@@ -6063,6 +6063,10 @@ void ReplicatedPG::_finish_mark_all_unfound_lost(list<ObjectContext*>& obcs)
   unlock();
 }
 
+void ReplicatedPG::_split_into(pg_t child_pgid, PG *child, unsigned split_bits)
+{
+  assert(repop_queue.empty());
+}
 
 /*
  * pg status change notification
index 5abc8e5365777ef54f6f91ba2dce29c6b0bb993e..fbc1b65571cd301e3735f6989414ef28d0c9715b 100644 (file)
@@ -930,6 +930,7 @@ protected:
   virtual void _scrub_finish();
   object_stat_collection_t scrub_cstat;
 
+  virtual void _split_into(pg_t child_pgid, PG *child, unsigned split_bits);
   void apply_and_flush_repops(bool requeue);
 
   void calc_trim_to();