]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
PG.cc: build_scrub_map now drops the PG lock while scanning the PG
authorSamuel Just <samuelj@hq.newdream.net>
Thu, 21 Oct 2010 23:54:01 +0000 (16:54 -0700)
committerSamuel Just <samuelj@hq.newdream.net>
Fri, 5 Nov 2010 17:57:09 +0000 (10:57 -0700)
       build_inc_scrub_map scans all files modified since the given
           version number and creates an incremental scrub map to
           be merged with a scrub map created with build_scrub_map.
           This scan is done while holding the pg lock.
       ScrubMap.objects is now represented as a map rather than as
           a vector.

PG.h:  Added last_update_applied and finalizing_scrub members to
           PG.

ReplicatedPG.cc:
       calc_trim_to will not trim the log during a scrub (since
           replicas need the log to construct incremental maps)
       sub_op_modify_oplied and op_applied maintain a
   last_update_applied PG member to be used for determining
           how far back a replica need go to construct an
           incremental scrub map.

osd_types.h:
       Added merge_incr method for combining a scrub map with
           a subsequent incremental scrub map.
       ScrubMap.objects is now a map from sobject_t to object.

PG scrubs will now drop the PG lock while initially scanning the PG
collection allowing writes to continue.  The scrub map will be tagged
with the most recent version applied.  After halting writes, the
primary will request an incremental map from any replicas whose map
versions do not match log.head.

Signed-off-by: Samuel Just <samuelj@hq.newdream.net>
src/osd/OSD.cc
src/osd/PG.cc
src/osd/PG.h
src/osd/ReplicatedPG.cc
src/osd/osd_types.h

index 2275399ca9b44504fa2bf90aa008b82dfe7119dd..bab90241d7f64b4db0325c8c748a6f914346ff89 100644 (file)
@@ -4592,13 +4592,6 @@ void OSD::handle_op(MOSDOp *op)
       return;
     }
     
-    // scrubbing?
-    if (pg->is_scrubbing()) {
-      dout(10) << *pg << " is scrubbing, deferring op " << *op << dendl;
-      pg->waiting_for_active.push_back(op);
-      pg->unlock();
-      return;
-    }
   } else {
     // read
     if (!pg->same_for_read_since(op->get_map_epoch())) {
index d2f6b0a95350507b14f328b393529db39dd0c894..dccd98f6ab80ebeac2525a92148979755aefd678 100644 (file)
@@ -1597,6 +1597,7 @@ void PG::activate(ObjectStore::Transaction& t, list<Context*>& tfin,
     last_update_ondisk = info.last_update;
     min_last_complete_ondisk = eversion_t(0,0);  // we don't know (yet)!
   }
+  last_update_applied = info.last_update;
 
   assert(info.last_complete >= log.tail || log.backlog);
 
@@ -2446,7 +2447,20 @@ void PG::sub_op_scrub(MOSDSubOp *op)
   }
 
   ScrubMap map;
-  build_scrub_map(map);
+  if (op->version > eversion_t()) {
+    epoch_t epoch = info.history.same_acting_since;
+    while (last_update_applied != info.last_update) {
+      wait();
+      if (epoch != info.history.same_acting_since ||
+         osd->is_stopping()) {
+       dout(10) << "scrub  pg changed, aborting" << dendl;
+       return;
+      }
+    }
+    build_inc_scrub_map(map, op->version);
+  } else {
+    build_scrub_map(map);
+  }
 
   MOSDSubOpReply *reply = new MOSDSubOpReply(op, 0, osd->osdmap->get_epoch(), CEPH_OSD_FLAG_ACK); 
   ::encode(map, reply->get_data());
@@ -2480,9 +2494,48 @@ void PG::sub_op_scrub_reply(MOSDSubOpReply *op)
   op->put();
 }
 
+/* 
+ * pg lock may or may not be held
+ */
+void PG::_scan_list(ScrubMap &map, vector<sobject_t> &ls)
+{
+  dout(10) << " scanning " << ls.size() << " objects" << dendl;
+  int i = 0;
+  for (vector<sobject_t>::iterator p = ls.begin(); 
+       p != ls.end(); 
+       p++, i++) {
+    sobject_t poid = *p;
+
+    struct stat st;
+    int r = osd->store->stat(coll, poid, &st);
+    if (r == 0) {
+      ScrubMap::object &o = map.objects[poid];
+      o.size = st.st_size;
+      osd->store->getattrs(coll, poid, o.attrs);
+    }
+
+    dout(25) << "   " << poid << dendl;
+  }
+}
+
+void PG::_request_scrub_map(int replica, eversion_t version)
+{
+    dout(10) << "scrub  requesting scrubmap from osd" << replica << dendl;
+    vector<OSDOp> scrub(1);
+    scrub[0].op.op = CEPH_OSD_OP_SCRUB;
+    sobject_t poid;
+    eversion_t v = version;
+    osd_reqid_t reqid;
+    MOSDSubOp *subop = new MOSDSubOp(reqid, info.pgid, poid, false, 0,
+                                    osd->osdmap->get_epoch(), osd->get_tid(), v);
+    subop->ops = scrub;
+    osd->cluster_messenger->send_message(subop, //new MOSDPGScrub(info.pgid, osd->osdmap->get_epoch()),
+                                        osd->osdmap->get_cluster_inst(replica));
+}
 
 /*
  * build a (sorted) summary of pg content for purposes of scrubbing
+ * called while holding pg lock
  */ 
 void PG::build_scrub_map(ScrubMap &map)
 {
@@ -2492,47 +2545,25 @@ void PG::build_scrub_map(ScrubMap &map)
   vector<sobject_t> ls;
   osd->store->collection_list(coll, ls);
 
-  // sort
-  dout(10) << "sorting " << ls.size() << " objects" << dendl;
-  vector< pair<sobject_t,int> > tab(ls.size());
-  vector< pair<sobject_t,int> >::iterator q = tab.begin();
-  int i = 0;
-  for (vector<sobject_t>::iterator p = ls.begin(); 
-       p != ls.end(); 
-       p++, i++, q++) {
-    q->first = *p;
-    q->second = i;
-  }
-  sort(tab.begin(), tab.end());
-  // tab is now sorted, with ->second indicating object's original position
-  vector<int> pos(ls.size());
-  i = 0;
-  for (vector< pair<sobject_t,int> >::iterator p = tab.begin(); 
-       p != tab.end(); 
-       p++, i++)
-    pos[p->second] = i;
-  // now, pos[orig pos] = sorted pos
+  map.valid_through = last_update_applied;
+  epoch_t epoch = info.history.same_acting_since;
 
-  dout(10) << " scanning " << ls.size() << " objects" << dendl;
-  map.objects.resize(ls.size());
-  i = 0;
-  for (vector<sobject_t>::iterator p = ls.begin(); 
-       p != ls.end(); 
-       p++, i++) {
-    sobject_t poid = *p;
+  unlock();
+  _scan_list(map, ls);
+  lock();
 
-    ScrubMap::object& o = map.objects[pos[i]];
-    o.poid = *p;
+  if (epoch != info.history.same_acting_since) {
+    dout(10) << "scrub  pg changed, aborting" << dendl;
+    return;
+  }
 
-    struct stat st;
-    int r = osd->store->stat(coll, poid, &st);
-    assert(r == 0);
-    o.size = st.st_size;
 
-    osd->store->getattrs(coll, poid, o.attrs);
+  dout(10) << "PG relocked, finalizing" << dendl;
 
-    dout(25) << "   " << poid << dendl;
-  }
+  // Catch up
+  ScrubMap incr;
+  build_inc_scrub_map(incr, map.valid_through);
+  map.merge_incr(incr);
 
   // pg attrs
   osd->store->collection_getattrs(coll, map.attrs);
@@ -2543,6 +2574,42 @@ void PG::build_scrub_map(ScrubMap &map)
 }
 
 
+/* 
+ * build a summary of pg content changed starting after v
+ * called while holding pg lock
+ */
+void PG::build_inc_scrub_map(ScrubMap &map, eversion_t v)
+{
+  map.valid_through = last_update_applied;
+  map.incr_since = v;
+  vector<sobject_t> ls;
+  list<Log::Entry>::iterator p;
+  if (v == log.tail) {
+    p = log.log.begin();
+  } else if (v > log.tail) {
+    p = log.find_entry(v);
+    p++;
+  } else {
+    assert(0);
+  }
+  
+  for (; p != log.log.end(); p++) {
+    if (p->is_update()) {
+      ls.push_back(p->soid);
+    } else if (p->is_delete()) {
+      map.objects[p->soid];
+      map.objects[p->soid].poid = p->soid;
+      map.objects[p->soid].negative = 1;
+    }
+  }
+
+  _scan_list(map, ls);
+  // pg attrs
+  osd->store->collection_getattrs(coll, map.attrs);
+
+  // log
+  osd->store->read(coll_t(), log_oid, 0, 0, map.logbl);
+}
 
 void PG::repair_object(ScrubMap::object *po, int bad_peer, int ok_peer)
 {
@@ -2568,6 +2635,8 @@ void PG::scrub()
   int errors = 0, fixed = 0;
   bool repair = state_test(PG_STATE_REPAIR);
   const char *mode = repair ? "repair":"scrub";
+  map<int,ScrubMap> received_maps;
+  int waiting_on = 0;
 
   osd->map_lock.get_read();
   lock();
@@ -2594,58 +2663,59 @@ void PG::scrub()
 
   // request maps from replicas
   for (unsigned i=1; i<acting.size(); i++) {
-    dout(10) << "scrub  requesting scrubmap from osd" << acting[i] << dendl;
-    vector<OSDOp> scrub(1);
-    scrub[0].op.op = CEPH_OSD_OP_SCRUB;
-    sobject_t poid;
-    eversion_t v;
-    osd_reqid_t reqid;
-    MOSDSubOp *subop = new MOSDSubOp(reqid, info.pgid, poid, false, 0,
-                                    osd->osdmap->get_epoch(), osd->get_tid(), v);
-    subop->ops = scrub;
-    osd->cluster_messenger->send_message(subop, //new MOSDPGScrub(info.pgid, osd->osdmap->get_epoch()),
-                                osd->osdmap->get_cluster_inst(acting[i]));
+    _request_scrub_map(acting[i], eversion_t());
   }
   osd->map_lock.put_read();
 
+  build_scrub_map(scrubmap);
 
-  // wait for any ops in progress
-  while (is_write_in_progress()) {
-    dout(10) << "scrub  write(s) in progress, waiting" << dendl;
+  finalizing_scrub = true;
+  while (last_update_applied != info.last_update) {
     wait();
-    if (osd->is_stopping())
+    if (epoch != info.history.same_acting_since ||
+       osd->is_stopping()) {
+      dout(10) << "scrub  pg changed, aborting" << dendl;
       goto out;
+    }
   }
 
 
-  //unlock();
+  waiting_on = acting.size() - 1;
+  while (waiting_on > 0) {
+    while (peer_scrub_map.size() == 0) {
+      wait();
+      if (epoch != info.history.same_acting_since ||
+         osd->is_stopping()) {
+       dout(10) << "scrub  pg changed, aborting" << dendl;
+       goto out;
+      }
+    }
 
-  dout(10) << "scrub  building my map" << dendl;
-  build_scrub_map(scrubmap);
 
-  /*
-  lock();
-  if (epoch != info.history.same_acting_since) {
-    dout(10) << "scrub  pg changed, aborting" << dendl;
-    goto out;
-  }
-  */
+    for (map<int,ScrubMap>::iterator p = peer_scrub_map.begin();
+        p != peer_scrub_map.end();
+        peer_scrub_map.erase(p++)) {
 
-  while (peer_scrub_map.size() < acting.size() - 1) {
-    dout(10) << "scrub  has " << (peer_scrub_map.size()+1) << " / " << acting.size()
-            << " maps, waiting" << dendl;
-    wait();
+      if (received_maps.count(p->first)) {
+       received_maps[p->first].merge_incr(p->second);
+      } else {
+       received_maps[p->first] = p->second;
+      }
 
-    if (epoch != info.history.same_acting_since ||
-       osd->is_stopping()) {
-      dout(10) << "scrub  pg changed, aborting" << dendl;
-      goto out;
+      if (received_maps[p->first].valid_through == log.head) {
+       waiting_on--;
+      } else {
+       // Need to request another incremental map
+       _request_scrub_map(p->first, p->second.valid_through);
+      }
     }
-  }
 
-  /*
-  unlock();
-  */
+    if (scrubmap.valid_through != log.head) {
+      ScrubMap incr;
+      build_inc_scrub_map(incr, scrubmap.valid_through);
+      scrubmap.merge_incr(incr);
+    }
+  }
 
   if (acting.size() > 1) {
     dout(10) << "scrub  comparing replica scrub maps" << dendl;
@@ -2654,8 +2724,8 @@ void PG::scrub()
     vector<ScrubMap*> m(acting.size());
     m[0] = &scrubmap;
     for (unsigned i=1; i<acting.size(); i++)
-      m[i] = &peer_scrub_map[acting[i]];
-    vector<ScrubMap::object>::iterator p[acting.size()];
+      m[i] = &received_maps[acting[i]];
+    map<sobject_t,ScrubMap::object>::iterator p[acting.size()];
     for (unsigned i=0; i<acting.size(); i++)
       p[i] = m[i]->objects.begin();
     
@@ -2672,22 +2742,23 @@ void PG::scrub()
          continue;
        }
        if (!po) {
-         po = &(*p[i]);
+         po = &(p[i]->second);
          pi = i;
        }
-       else if (po->poid != p[i]->poid) {
+       else if (po->poid != p[i]->second.poid) {
          anymissing = true;
-         if (po->poid > p[i]->poid) {
-           po = &(*p[i]);
+         if (po->poid > p[i]->second.poid) {
+           po = &(p[i]->second);
            pi = i;
          }
        }
       }
-      if (!po)
+      if (!po) {
        break;
+      }
       if (anymissing) {
        for (unsigned i=0; i<acting.size(); i++) {
-         if (p[i] == m[i]->objects.end() || po->poid != p[i]->poid) {
+         if (p[i] == m[i]->objects.end() || po->poid != p[i]->second.poid) {
            ss << info.pgid << " " << mode << " osd" << acting[i] << " missing " << po->poid;
            osd->get_logclient()->log(LOG_ERROR, ss);
            num_missing++;
@@ -2704,27 +2775,27 @@ void PG::scrub()
       bool ok = true;
       for (unsigned i=1; i<acting.size(); i++) {
        bool peerok = true;
-       if (po->size != p[i]->size) {
+       if (po->size != p[i]->second.size) {
          dout(0) << "scrub osd" << acting[i] << " " << po->poid
-                 << " size " << p[i]->size << " != " << po->size << dendl;
+                 << " size " << p[i]->second.size << " != " << po->size << dendl;
          ss << info.pgid << " " << mode << " osd" << acting[i] << " " << po->poid
-            << " size " << p[i]->size << " != " << po->size;
+            << " size " << p[i]->second.size << " != " << po->size;
          osd->get_logclient()->log(LOG_ERROR, ss);
          peerok = ok = false;
          num_bad++;
        }
-       if (po->attrs.size() != p[i]->attrs.size()) {
+       if (po->attrs.size() != p[i]->second.attrs.size()) {
          dout(0) << "scrub osd" << acting[i] << " " << po->poid
-                 << " attr count " << p[i]->attrs.size() << " != " << po->attrs.size() << dendl;
+                 << " attr count " << p[i]->second.attrs.size() << " != " << po->attrs.size() << dendl;
          ss << info.pgid << " " << mode << " osd" << acting[i] << " " << po->poid
-            << " attr count " << p[i]->attrs.size() << " != " << po->attrs.size();
+            << " attr count " << p[i]->second.attrs.size() << " != " << po->attrs.size();
          osd->get_logclient()->log(LOG_ERROR, ss);
          peerok = ok = false;
          num_bad++;
        }
        for (map<string,bufferptr>::iterator q = po->attrs.begin(); q != po->attrs.end(); q++) {
-         if (p[i]->attrs.count(q->first)) {
-           if (q->second.cmp(p[i]->attrs[q->first])) {
+         if (p[i]->second.attrs.count(q->first)) {
+           if (q->second.cmp(p[i]->second.attrs[q->first])) {
              dout(0) << "scrub osd" << acting[i] << " " << po->poid
                      << " attr " << q->first << " value mismatch" << dendl;
              ss << info.pgid << " " << mode << " osd" << acting[i] << " " << po->poid
@@ -2818,6 +2889,7 @@ void PG::scrub()
 
   osd->take_waiters(waiting_for_active);
 
+  finalizing_scrub = false;
   unlock();
 }
 
index 9f2c6f2d1d2e7d7bc631d52486ff980da3a74dd2..876912ebd86516c4c9f4f9cbfa6edc6c3bbe7fbe 100644 (file)
@@ -756,6 +756,7 @@ protected:
 public:
   eversion_t  last_update_ondisk;    // last_update that has committed; ONLY DEFINED WHEN is_active()
   eversion_t  last_complete_ondisk;  // last_complete that has committed.
+  eversion_t  last_update_applied;
 
   // primary state
  public:
@@ -900,10 +901,14 @@ public:
 
   // -- scrub --
   map<int,ScrubMap> peer_scrub_map;
+  bool finalizing_scrub; 
 
   void repair_object(ScrubMap::object *po, int bad_peer, int ok_peer);
   void scrub();
+  void _scan_list(ScrubMap &map, vector<sobject_t> &ls);
+  void _request_scrub_map(int replica, eversion_t version);
   void build_scrub_map(ScrubMap &map);
+  void build_inc_scrub_map(ScrubMap &map, eversion_t v);
   virtual int _scrub(ScrubMap &map, int& errors, int& fixed) { return 0; }
 
   void sub_op_scrub(class MOSDSubOp *op);
@@ -925,7 +930,8 @@ public:
     need_up_thru(false),
     pg_stats_lock("PG::pg_stats_lock"),
     pg_stats_valid(false),
-    finish_sync_event(NULL)
+    finish_sync_event(NULL),
+    finalizing_scrub(false)
   {
     pool->get();
   }
index 9cdd6349d87744ec67b5ba8e9291d6e9726fae99..f409e7a991530f7b092418e605b6c9a75eac35f8 100644 (file)
@@ -215,7 +215,7 @@ void ReplicatedPG::do_pg_op(MOSDOp *op)
 
 void ReplicatedPG::calc_trim_to()
 {
-  if (!is_degraded() &&
+  if (!is_degraded() && !is_scrubbing() &&
       (is_clean() ||
        log.head.version - log.tail.version > info.stats.num_objects)) {
     if (min_last_complete_ondisk != eversion_t() &&
@@ -238,6 +238,10 @@ void ReplicatedPG::do_op(MOSDOp *op)
     return do_pg_op(op);
 
   dout(10) << "do_op " << *op << dendl;
+  if (finalizing_scrub && op->may_write()) {
+    waiting_for_active.push_back(op);
+    return;
+  }
 
   entity_inst_t client = op->get_source_inst();
 
@@ -1877,6 +1881,10 @@ void ReplicatedPG::op_applied(RepGather *repop)
   put_object_context(repop->obc);
   repop->obc = 0;
 
+  last_update_applied = repop->v;
+  if (last_update_applied == info.last_update && finalizing_scrub) {
+    kick();
+  }
   update_stats();
 
 #if 0
@@ -2497,6 +2505,11 @@ void ReplicatedPG::sub_op_modify_applied(RepModify *rm)
   rm->applied = true;
   bool done = rm->applied && rm->committed;
 
+  last_update_applied = rm->op->version;
+  if (last_update_applied == info.last_update && finalizing_scrub) {
+    kick();
+  }
+
   unlock();
   if (done) {
     delete rm->ctx;
@@ -3855,22 +3868,22 @@ int ReplicatedPG::_scrub(ScrubMap& scrubmap, int& errors, int& fixed)
 
   bufferlist last_data;
 
-  for (vector<ScrubMap::object>::reverse_iterator p = scrubmap.objects.rbegin(); 
+  for (map<sobject_t,ScrubMap::object>::reverse_iterator p = scrubmap.objects.rbegin(); 
        p != scrubmap.objects.rend(); 
        p++) {
-    const sobject_t& soid = p->poid;
+    const sobject_t& soid = p->second.poid;
     stat.num_objects++;
 
     // new snapset?
     if (soid.snap == CEPH_SNAPDIR ||
        soid.snap == CEPH_NOSNAP) {
-      if (p->attrs.count(SS_ATTR) == 0) {
+      if (p->second.attrs.count(SS_ATTR) == 0) {
        dout(0) << mode << " no '" << SS_ATTR << "' attr on " << soid << dendl;
        errors++;
        continue;
       }
       bufferlist bl;
-      bl.push_back(p->attrs[SS_ATTR]);
+      bl.push_back(p->second.attrs[SS_ATTR]);
       bufferlist::iterator blp = bl.begin();
       ::decode(snapset, blp);
 
@@ -3903,19 +3916,19 @@ int ReplicatedPG::_scrub(ScrubMap& scrubmap, int& errors, int& fixed)
       continue;
 
     // basic checks.
-    if (p->attrs.count(OI_ATTR) == 0) {
+    if (p->second.attrs.count(OI_ATTR) == 0) {
       dout(0) << mode << " no '" << OI_ATTR << "' attr on " << soid << dendl;
       errors++;
       continue;
     }
     bufferlist bv;
-    bv.push_back(p->attrs[OI_ATTR]);
+    bv.push_back(p->second.attrs[OI_ATTR]);
     object_info_t oi(bv);
 
     dout(20) << mode << "  " << soid << " " << oi << dendl;
 
-    stat.num_bytes += p->size;
-    stat.num_kb += SHIFT_ROUND_UP(p->size, 10);
+    stat.num_bytes += p->second.size;
+    stat.num_kb += SHIFT_ROUND_UP(p->second.size, 10);
 
     //bufferlist data;
     //osd->store->read(c, poid, 0, 0, data);
@@ -3935,7 +3948,7 @@ int ReplicatedPG::_scrub(ScrubMap& scrubmap, int& errors, int& fixed)
       
       assert(soid.snap == snapset.clones[curclone]);
 
-      assert(p->size == snapset.clone_size[curclone]);
+      assert(p->second.size == snapset.clone_size[curclone]);
 
       // verify overlap?
       // ...
index b74dc1e02b4694529ebb0c9a17f543cffbb9c497..baf170d9cbb8eb023a1823087b7d775591989eb9 100644 (file)
@@ -1326,13 +1326,17 @@ struct ScrubMap {
   struct object {
     sobject_t poid;
     uint64_t size;
+    bool negative;
     map<string,bufferptr> attrs;
 
+    object(): poid(),size(0),negative(0),attrs() {}
+
     void encode(bufferlist& bl) const {
       __u8 struct_v = 1;
       ::encode(struct_v, bl);
       ::encode(poid, bl);
       ::encode(size, bl);
+      ::encode(negative, bl);
       ::encode(attrs, bl);
     }
     void decode(bufferlist::iterator& bl) {
@@ -1340,14 +1344,38 @@ struct ScrubMap {
       ::decode(struct_v, bl);
       ::decode(poid, bl);
       ::decode(size, bl);
+      ::decode(negative, bl);
       ::decode(attrs, bl);
     }
   };
   WRITE_CLASS_ENCODER(object)
 
-  vector<object> objects;
+  map<sobject_t,object> objects;
   map<string,bufferptr> attrs;
   bufferlist logbl;
+  eversion_t valid_through;
+  eversion_t incr_since;
+
+  void merge_incr(const ScrubMap &l) {
+    assert(valid_through == l.incr_since);
+    attrs = l.attrs;
+    logbl = l.logbl;
+    valid_through = l.valid_through;
+
+    for (map<sobject_t,object>::const_iterator p = l.objects.begin();
+         p != l.objects.end();
+         p++){
+      if (p->second.negative) {
+        map<sobject_t,object>::iterator q = objects.find(p->first);
+        if (q != objects.end()) {
+          objects.erase(q);
+        }
+      } else {
+        objects[p->first] = p->second;
+      }
+    }
+  }
+          
 
   void encode(bufferlist& bl) const {
     __u8 struct_v = 1;
@@ -1355,6 +1383,8 @@ struct ScrubMap {
     ::encode(objects, bl);
     ::encode(attrs, bl);
     ::encode(logbl, bl);
+    ::encode(valid_through, bl);
+    ::encode(incr_since, bl);
   }
   void decode(bufferlist::iterator& bl) {
     __u8 struct_v;
@@ -1362,6 +1392,8 @@ struct ScrubMap {
     ::decode(objects, bl);
     ::decode(attrs, bl);
     ::decode(logbl, bl);
+    ::decode(valid_through, bl);
+    ::decode(incr_since, bl);
   }
 };
 WRITE_CLASS_ENCODER(ScrubMap::object)