]> git-server-git.apps.pok.os.sepia.ceph.com Git - ceph.git/commitdiff
osd: make replica scrub_map generation a subop
authorSage Weil <sage@newdream.net>
Thu, 4 Dec 2008 21:41:29 +0000 (13:41 -0800)
committerSage Weil <sage@newdream.net>
Thu, 4 Dec 2008 21:41:29 +0000 (13:41 -0800)
This puts build_scrub_map in a worker thread, _and_ ensures it is
serialized wrt any in-progress writes.

src/include/ceph_fs.h
src/messages/MOSDPGScrub.h [deleted file]
src/messages/MOSDSubOp.h
src/messages/MOSDSubOpReply.h
src/msg/Message.cc
src/msg/Message.h
src/osd/OSD.cc
src/osd/OSD.h
src/osd/PG.cc
src/osd/PG.h
src/osd/ReplicatedPG.cc

index 9535cfbea808feb9a6146f72fdda77f7cb5a6fc9..23aae8fdcb264625a35e69020de38c1447f21335 100644 (file)
@@ -1068,6 +1068,7 @@ enum {
        CEPH_OSD_OP_PUSH           = CEPH_OSD_OP_MODE_SUB | 2,
        CEPH_OSD_OP_BALANCEREADS   = CEPH_OSD_OP_MODE_SUB | 3,
        CEPH_OSD_OP_UNBALANCEREADS = CEPH_OSD_OP_MODE_SUB | 4,
+       CEPH_OSD_OP_SCRUB          = CEPH_OSD_OP_MODE_SUB | 5,
 
        /* object data */
        CEPH_OSD_OP_WRITE      = CEPH_OSD_OP_MODE_WR | CEPH_OSD_OP_TYPE_DATA | 1,
@@ -1151,6 +1152,7 @@ static inline const char *ceph_osd_op_name(int op)
        case CEPH_OSD_OP_PUSH: return "push";
        case CEPH_OSD_OP_BALANCEREADS: return "balance-reads";
        case CEPH_OSD_OP_UNBALANCEREADS: return "unbalance-reads";
+       case CEPH_OSD_OP_SCRUB: return "scrub";
 
        default: return "???";
        }
diff --git a/src/messages/MOSDPGScrub.h b/src/messages/MOSDPGScrub.h
deleted file mode 100644 (file)
index 4be4368..0000000
+++ /dev/null
@@ -1,52 +0,0 @@
-// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- 
-// vim: ts=8 sw=2 smarttab
-/*
- * Ceph - scalable distributed file system
- *
- * Copyright (C) 2004-2006 Sage Weil <sage@newdream.net>
- *
- * This is free software; you can redistribute it and/or
- * modify it under the terms of the GNU Lesser General Public
- * License version 2.1, as published by the Free Software 
- * Foundation.  See file COPYING.
- * 
- */
-
-
-#ifndef __MOSDPGSCRUB_H
-#define __MOSDPGSCRUB_H
-
-#include "msg/Message.h"
-
-struct MOSDPGScrub : public Message {
-  pg_t pgid;
-  epoch_t epoch;
-  bufferlist map;
-
-  MOSDPGScrub() {}
-  MOSDPGScrub(pg_t p, epoch_t e) :
-    Message(MSG_OSD_PG_SCRUB),
-    pgid(p), epoch(e) {}
-
-  const char *get_type_name() { return "pg_scrub"; }
-  void print(ostream& out) {
-    out << "pg_scrub(" << pgid << " e" << epoch;
-    if (map.length())
-      out << " " << map.length() << " bytes";
-    out << ")";
-  }
-
-  void encode_payload() {
-    ::encode(pgid, payload);
-    ::encode(epoch, payload);
-    ::encode(map, payload);
-  }
-  void decode_payload() {
-    bufferlist::iterator p = payload.begin();
-    ::decode(pgid, p);
-    ::decode(epoch, p);
-    ::decode(map, p);
-  }
-};
-
-#endif
index 6be970ea55d09c6d779c37ae3fe959e2690693ed..cfcd5e4e9c1d25a71aff7fcfca91e60430645659 100644 (file)
@@ -131,6 +131,7 @@ public:
   const char *get_type_name() { return "osd_sub_op"; }
   void print(ostream& out) {
     out << "osd_sub_op(" << reqid
+       << " " << pgid
        << " " << poid
        << " " << ops;
     if (noop)
index a939f4603be825aae536222bb39fdc3e9b1f53dd..110ae555286da2a349b8e969bb13a912bf1abb75 100644 (file)
@@ -30,6 +30,7 @@
  */
 
 class MOSDSubOpReply : public Message {
+public:
   epoch_t map_epoch;
   
   // subop metadata
@@ -38,7 +39,6 @@ class MOSDSubOpReply : public Message {
   tid_t rep_tid;
   pobject_t poid;
 
-public:
   vector<ceph_osd_op> ops;
 
   // result
@@ -119,6 +119,7 @@ public:
   
   void print(ostream& out) {
     out << "osd_sub_op_reply(" << reqid
+       << " " << pgid 
        << " " << poid << " " << ops;
     if (ack_type & CEPH_OSD_OP_ONDISK)
       out << " ondisk";
index 5b7a3e88ef9a7dc287327793eb5fda4a0a97d9c0..3e322a1b40bda0c20db694dbe1470a5bf5867488 100644 (file)
@@ -44,7 +44,6 @@ using namespace std;
 #include "messages/MOSDPGRemove.h"
 #include "messages/MOSDPGInfo.h"
 #include "messages/MOSDPGCreate.h"
-#include "messages/MOSDPGScrub.h"
 #include "messages/MOSDScrub.h"
 
 #include "messages/MRemoveSnaps.h"
@@ -237,9 +236,6 @@ Message *decode_message(ceph_msg_header& header, ceph_msg_footer& footer,
   case MSG_OSD_PG_CREATE:
     m = new MOSDPGCreate;
     break;
-  case MSG_OSD_PG_SCRUB:
-    m = new MOSDPGScrub;
-    break;
 
   case MSG_OSD_SCRUB:
     m = new MOSDScrub;
index bc94c083341436abd9a0aeb15279b619b1d3155f..bf6eca6d497b56e90be25042f0a5725e45f0e50f 100644 (file)
@@ -53,7 +53,6 @@
 #define MSG_REMOVE_SNAPS       89
 
 #define MSG_OSD_SCRUB          90
-#define MSG_OSD_PG_SCRUB       91
 
 
 
index f86db67529d25e919df10ece4d996d300e5ec605..89732f65c8115dcb185d98a9a604e7e69a9f1f99 100644 (file)
@@ -58,7 +58,6 @@
 #include "messages/MOSDPGRemove.h"
 #include "messages/MOSDPGInfo.h"
 #include "messages/MOSDPGCreate.h"
-#include "messages/MOSDPGScrub.h"
 
 #include "messages/MOSDAlive.h"
 
@@ -1536,9 +1535,6 @@ void OSD::dispatch(Message *m)
       case MSG_OSD_PG_INFO:
         handle_pg_info((MOSDPGInfo*)m);
         break;
-      case MSG_OSD_PG_SCRUB:
-       handle_pg_scrub((MOSDPGScrub*)m);
-       break;
 
        // client ops
       case CEPH_MSG_OSD_OP:
@@ -2868,46 +2864,6 @@ void OSD::handle_pg_info(MOSDPGInfo *m)
   delete m;
 }
 
-void OSD::handle_pg_scrub(MOSDPGScrub *m)
-{
-  dout(7) << "handle_pg_scrub " << *m << " from " << m->get_source() << dendl;
-  int from = m->get_source().num();
-  if (!require_same_or_newer_map(m, m->epoch)) return;
-
-  PG *pg = _lookup_lock_pg(m->pgid);
-  if (pg) {
-    if (m->epoch < pg->info.history.same_since) {
-      dout(10) << *pg << " has changed since " << m->epoch << dendl;
-    } else {
-      if (pg->is_primary()) {
-       if (pg->peer_scrub_map.count(from)) {
-         dout(10) << *pg << " already had osd" << from << " scrub map" << dendl;
-       } else {
-         dout(10) << *pg << " got osd" << from << " scrub map" << dendl;
-         bufferlist::iterator p = m->map.begin();
-         pg->peer_scrub_map[from].decode(p);
-         pg->kick();
-       }
-      } else {
-       // replica, reply
-       dout(10) << *pg << " building scrub map for primary" << dendl;
-
-       // do this is a separate thread.. FIXME
-       ScrubMap map;
-       pg->build_scrub_map(map);
-
-       MOSDPGScrub *reply = new MOSDPGScrub(pg->info.pgid, osdmap->get_epoch());
-       ::encode(map, reply->map);
-       messenger->send_message(reply, m->get_source_inst());
-      }
-    }
-    pg->unlock();
-  } else {
-    dout(10) << " pg " << m->pgid << " not found" << dendl;
-  }
-  delete m;
-}
-
 
 /** PGQuery
  * from primary to replica | stray
index ae6b5303591b11e0d4982efe61752aa74ac70252..47122ee24e5957b6efeaa95074511f1f68c0102e 100644 (file)
@@ -450,7 +450,6 @@ private:
   void handle_pg_log(class MOSDPGLog *m);
   void handle_pg_info(class MOSDPGInfo *m);
   void handle_pg_remove(class MOSDPGRemove *m);
-  void handle_pg_scrub(class MOSDPGScrub *m);
 
   // helper for handle_pg_log and handle_pg_info
   void _process_pg_info(epoch_t epoch, int from,
index d456a361c0e3ded36d045419be27f2ac373c99bd..c87eb7e701cad6203860c6fe6c8914d57fc7cdd8 100644 (file)
@@ -25,7 +25,9 @@
 #include "messages/MOSDPGLog.h"
 #include "messages/MOSDPGRemove.h"
 #include "messages/MOSDPGInfo.h"
-#include "messages/MOSDPGScrub.h"
+
+#include "messages/MOSDSubOp.h"
+#include "messages/MOSDSubOpReply.h"
 
 #include <sstream>
 
@@ -1720,6 +1722,52 @@ bool PG::block_if_wrlocked(MOSDOp* op)
 // ==========================================================================================
 // SCRUB
 
+void PG::sub_op_scrub(MOSDSubOp *op)
+{
+  dout(7) << "sub_op_scrub" << dendl;
+
+  if (op->map_epoch < info.history.same_primary_since) {
+    dout(10) << "sub_op_scrub discarding old sub_op from "
+            << op->map_epoch << " < " << info.history.same_primary_since << dendl;
+    delete op;
+    return;
+  }
+
+  ScrubMap map;
+  build_scrub_map(map);
+
+  MOSDSubOpReply *reply = new MOSDSubOpReply(op, 0, osd->osdmap->get_epoch(), CEPH_OSD_OP_ACK); 
+  ::encode(map, reply->get_data());
+  osd->messenger->send_message(reply, op->get_source_inst());
+
+  delete op;
+}
+
+void PG::sub_op_scrub_reply(MOSDSubOpReply *op)
+{
+  dout(7) << "sub_op_scrub_reply" << dendl;
+
+  if (op->map_epoch < info.history.same_primary_since) {
+    dout(10) << "sub_op_scrub discarding old sub_op from "
+            << op->map_epoch << " < " << info.history.same_primary_since << dendl;
+    delete op;
+    return;
+  }
+
+  int from = op->get_source().num();
+
+  if (peer_scrub_map.count(from)) {
+    dout(10) << " already had osd" << from << " scrub map" << dendl;
+  } else {
+    dout(10) << " got osd" << from << " scrub map" << dendl;
+    bufferlist::iterator p = op->get_data().begin();
+    peer_scrub_map[from].decode(p);
+    kick();
+  }
+
+  delete op;
+}
+
 
 /*
  * build a (sorted) summary of pg content for purposes of scrubbing
@@ -1787,10 +1835,12 @@ void PG::build_scrub_map(ScrubMap &map)
 
 void PG::scrub()
 {
-  osd->map_lock.get_read();
+  stringstream ss;
+  ScrubMap scrubmap;
 
+  osd->map_lock.get_read();
   lock();
-
   epoch_t epoch = info.history.same_since;
 
   if (!is_primary()) {
@@ -1814,30 +1864,36 @@ void PG::scrub()
   // request maps from replicas
   for (unsigned i=1; i<acting.size(); i++) {
     dout(10) << "scrub  requesting scrubmap from osd" << acting[i] << dendl;
-    osd->messenger->send_message(new MOSDPGScrub(info.pgid, osd->osdmap->get_epoch()),
+    vector<ceph_osd_op> scrub(1);
+    scrub[0].op = CEPH_OSD_OP_SCRUB;
+    pobject_t poid;
+    eversion_t v;
+    osd_reqid_t reqid;
+    MOSDSubOp *subop = new MOSDSubOp(reqid, info.pgid, poid, scrub, false, 0,
+                                    osd->osdmap->get_epoch(), osd->get_tid(), 0, v);
+    osd->messenger->send_message(subop, //new MOSDPGScrub(info.pgid, osd->osdmap->get_epoch()),
                                 osd->osdmap->get_inst(acting[i]));
   }
-
   osd->map_lock.put_read();
 
+
   // wait for any ops in progress
   while (is_write_in_progress()) {
     dout(10) << "scrub  write(s) in progress, waiting" << dendl;
     wait();
   }
 
+
   //unlock();
 
   dout(10) << "scrub  building my map" << dendl;
-  ScrubMap scrubmap;
   build_scrub_map(scrubmap);
 
   /*
   lock();
   if (epoch != info.history.same_since) {
     dout(10) << "scrub  pg changed, aborting" << dendl;
-    unlock();
-    return;
+    goto out;
   }
   */
 
@@ -1848,8 +1904,7 @@ void PG::scrub()
 
     if (epoch != info.history.same_since) {
       dout(10) << "scrub  pg changed, aborting" << dendl;
-      unlock();
-      return;
+      goto out;
     }
   }
 
@@ -1857,8 +1912,6 @@ void PG::scrub()
   unlock();
   */
 
-  stringstream ss;
-
   if (acting.size() > 1) {
     dout(10) << "scrub  comparing replica scrub maps" << dendl;
 
@@ -1961,8 +2014,7 @@ void PG::scrub()
   lock();
   if (epoch != info.history.same_since) {
     dout(10) << "scrub  pg changed, aborting" << dendl;
-    unlock();
-    return;
+    goto out;
   }
   */
 
@@ -1980,14 +2032,16 @@ void PG::scrub()
   lock();
   if (epoch != info.history.same_since) {
     dout(10) << "scrub  pg changed, aborting" << dendl;
-    unlock();
-    return;
+    goto out;
   }
   */
 
   // finish up
   info.stats.last_scrub = info.last_update;
   info.stats.last_scrub_stamp = g_clock.now();
+
+
+ out:
   state_clear(PG_STATE_SCRUBBING);
   update_stats();
 
index 0e92507bc67ad5a992375327412c218a5a52a113..ee5863e744a86efcfb4640492119a5520b81e546 100644 (file)
@@ -692,6 +692,9 @@ public:
   void build_scrub_map(ScrubMap &map);
   virtual void _scrub(ScrubMap &map) {}
 
+  void sub_op_scrub(class MOSDSubOp *op);
+  void sub_op_scrub_reply(class MOSDSubOpReply *op);
+
 
  public:  
   PG(OSD *o, pg_t p) : 
index aaa1e7d1cbc5ee5a9597c0564c71916454eca4f9..f3686894efb8f60530c94db5d69ef7d3fa78d516 100644 (file)
@@ -399,6 +399,9 @@ void ReplicatedPG::do_sub_op(MOSDSubOp *op)
     case CEPH_OSD_OP_PUSH:
       sub_op_push(op);
       return;
+    case CEPH_OSD_OP_SCRUB:
+      sub_op_scrub(op);
+      return;
     }
   }
 
@@ -414,6 +417,10 @@ void ReplicatedPG::do_sub_op_reply(MOSDSubOpReply *r)
       sub_op_push_reply(r);
       return;
     }
+    if (first.op == CEPH_OSD_OP_SCRUB) {
+      sub_op_scrub_reply(r);
+      return;
+    }
   }
 
   sub_op_modify_reply(r);
@@ -2235,6 +2242,7 @@ void ReplicatedPG::sub_op_pull(MOSDSubOp *op)
 
   // push it back!
   push(poid, op->get_source().num(), op->data_subset, op->clone_subsets);
+  delete op;
 }
 
 
@@ -2899,6 +2907,7 @@ void ReplicatedPG::clean_up_local(ObjectStore::Transaction& t)
 // ==========================================================================================
 // SCRUB
 
+
 void ReplicatedPG::_scrub(ScrubMap& scrubmap)
 {
   dout(10) << "_scrub" << dendl;