git-server-git.apps.pok.os.sepia.ceph.com Git - ceph.git/commitdiff
osd: some infrastructure for primary to trim replica logs
author Sage Weil <sage@newdream.net>
Thu, 18 Jun 2009 16:37:03 +0000 (09:37 -0700)
committer Sage Weil <sage@newdream.net>
Thu, 18 Jun 2009 16:37:03 +0000 (09:37 -0700)
src/messages/MOSDPGTrim.h [new file with mode: 0644]
src/msg/Message.cc
src/msg/Message.h
src/osd/OSD.cc
src/osd/OSD.h
src/osd/PG.cc
src/osd/PG.h
src/osd/ReplicatedPG.cc
src/osd/ReplicatedPG.h

diff --git a/src/messages/MOSDPGTrim.h b/src/messages/MOSDPGTrim.h
new file mode 100644 (file)
index 0000000..c3139f6
--- /dev/null
@@ -0,0 +1,52 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- 
+// vim: ts=8 sw=2 smarttab
+/*
+ * Ceph - scalable distributed file system
+ *
+ * Copyright (C) 2004-2006 Sage Weil <sage@newdream.net>
+ *
+ * This is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License version 2.1, as published by the Free Software 
+ * Foundation.  See file COPYING.
+ * 
+ */
+
+
+#ifndef __MOSDPGTRIM_H
+#define __MOSDPGTRIM_H
+
+#include "msg/Message.h"
+
+/**
+ * MOSDPGTrim
+ * Message carrying a map epoch, a pg id and the version that pg's log
+ * should be trimmed to.
+ */
+class MOSDPGTrim : public Message {
+public:
+  epoch_t epoch;        // map epoch supplied by the sender
+  pg_t pgid;            // pg whose log is to be trimmed
+  eversion_t trim_to;   // trim point
+
+  epoch_t get_epoch() { return epoch; }
+
+  // Default form, used when a received message is instantiated and then
+  // filled in by decode_payload().  epoch is zeroed so that print() or
+  // get_epoch() never read an indeterminate value before decoding.
+  MOSDPGTrim() : epoch(0) {}
+
+  // mv: sender's map epoch; p: target pg; tt: version to trim to.
+  // The epoch is taken as epoch_t to match the member it initializes.
+  MOSDPGTrim(epoch_t mv, pg_t p, eversion_t tt) :
+    Message(MSG_OSD_PG_TRIM),
+    epoch(mv), pgid(p), trim_to(tt) { }
+
+  const char *get_type_name() { return "pg_trim"; }
+  void print(ostream& out) {
+    out << "pg_trim(" << pgid << " to " << trim_to << " e" << epoch << ")";
+  }
+
+  // Field order here must mirror decode_payload() below.
+  void encode_payload() {
+    ::encode(epoch, payload);
+    ::encode(pgid, payload);
+    ::encode(trim_to, payload);
+  }
+  // Reads the fields back in the same order encode_payload() wrote them.
+  void decode_payload() {
+    bufferlist::iterator p = payload.begin();
+    ::decode(epoch, p);
+    ::decode(pgid, p);
+    ::decode(trim_to, p);
+  }
+};
+
+#endif
index b0714d801b6be58d27d6c94ab0e0d397e394f84e..40512fde3ca56de7de79bc56bb4e0c463996ed09 100644 (file)
@@ -53,6 +53,7 @@ using namespace std;
 #include "messages/MOSDPGRemove.h"
 #include "messages/MOSDPGInfo.h"
 #include "messages/MOSDPGCreate.h"
+#include "messages/MOSDPGTrim.h"
 #include "messages/MOSDScrub.h"
 
 #include "messages/MRemoveSnaps.h"
@@ -271,6 +272,9 @@ Message *decode_message(ceph_msg_header& header, ceph_msg_footer& footer,
   case MSG_OSD_PG_CREATE:
     m = new MOSDPGCreate;
     break;
+  case MSG_OSD_PG_TRIM:
+    m = new MOSDPGTrim;
+    break;
 
   case MSG_OSD_SCRUB:
     m = new MOSDScrub;
index d6c2feba4cf7debee8cde743fe6accd3275dc42a..ade923ef06df329dc70042e3e4352a7312719ba2 100644 (file)
 #define MSG_OSD_PG_LOG         83
 #define MSG_OSD_PG_REMOVE      84
 #define MSG_OSD_PG_INFO        85
+#define MSG_OSD_PG_TRIM        86
 
-#define MSG_PGSTATS            86
-#define MSG_PGSTATSACK         87
+#define MSG_PGSTATS            87
+#define MSG_PGSTATSACK         88
 
-#define MSG_OSD_PG_CREATE      88
-#define MSG_REMOVE_SNAPS       89
+#define MSG_OSD_PG_CREATE      89
+#define MSG_REMOVE_SNAPS       90
 
-#define MSG_OSD_SCRUB          90
+#define MSG_OSD_SCRUB          91
 
 
 
index 0f9e0fa339252fe48133678b5cb6fcfb0440ad83..75b2e382ef7f85a8743d18aab2d855030f710266 100644 (file)
@@ -57,6 +57,7 @@
 #include "messages/MOSDPGRemove.h"
 #include "messages/MOSDPGInfo.h"
 #include "messages/MOSDPGCreate.h"
+#include "messages/MOSDPGTrim.h"
 
 #include "messages/MOSDAlive.h"
 
@@ -1640,6 +1641,9 @@ void OSD::_dispatch(Message *m)
       case MSG_OSD_PG_INFO:
         handle_pg_info((MOSDPGInfo*)m);
         break;
+      case MSG_OSD_PG_TRIM:
+        handle_pg_trim((MOSDPGTrim*)m);
+        break;
 
        // client ops
       case CEPH_MSG_OSD_OP:
@@ -3055,6 +3059,37 @@ void OSD::handle_pg_info(MOSDPGInfo *m)
   delete m;
 }
 
+/** handle_pg_trim
+ * Trim the local log of the pg named in m up to m->trim_to and persist
+ * the updated pg info.  m is deleted on every path except when the map
+ * epoch check fails (presumably require_same_or_newer_map takes over the
+ * message in that case -- TODO confirm).
+ */
+void OSD::handle_pg_trim(MOSDPGTrim *m)
+{
+  dout(7) << "handle_pg_trim " << *m << " from " << m->get_source() << dendl;
+
+  int from = m->get_source().num();
+  if (!require_same_or_newer_map(m, m->epoch)) return;
+
+  if (!_have_pg(m->pgid)) {
+    dout(10) << " don't have pg " << m->pgid << dendl;
+  } else {
+    PG *pg = _lookup_lock_pg(m->pgid);
+    // validate the pointer before the first dereference, not after it
+    assert(pg);
+
+    if (m->epoch < pg->info.history.same_since) {
+      // request predates the pg's current same_since epoch; drop it
+      dout(10) << *pg << " got old trim to " << m->trim_to << ", ignoring" << dendl;
+      pg->unlock();
+    } else {
+      // trim requests must come from the first osd in the acting set
+      assert(from == pg->acting[0]);
+
+      ObjectStore::Transaction t;
+      pg->trim(t, m->trim_to);
+      pg->write_info(t);
+      pg->unlock();
+
+      // the transaction is applied after the pg lock has been dropped
+      store->apply_transaction(t);
+    }
+  }
+
+  delete m;
+}
+
 
 /** PGQuery
  * from primary to replica | stray
index cea761132ce4d85a23aebd6350803625f80842bd..8a0dbbf35d0fef2e3845f09cf540ac77014d2755 100644 (file)
@@ -567,6 +567,7 @@ protected:
   void handle_pg_notify(class MOSDPGNotify *m);
   void handle_pg_log(class MOSDPGLog *m);
   void handle_pg_info(class MOSDPGInfo *m);
+  void handle_pg_trim(class MOSDPGTrim *m);
   void handle_pg_remove(class MOSDPGRemove *m);
 
   // helper for handle_pg_log and handle_pg_info
index 12915a3285309efd4d9359435bf7fe63e23b92bd..5c9e97f233f4790b3b3e3710e0c39ea77e073547 100644 (file)
@@ -1691,6 +1691,18 @@ void PG::write_log(ObjectStore::Transaction& t)
   dirty_log = false;
 }
 
+/** trim
+ * Trim the in-memory log and the on-disk log to trim_to, queuing the
+ * changes on t, and copy the resulting log bounds into info.
+ * Does nothing unless trim_to is beyond the current log bottom.
+ */
+void PG::trim(ObjectStore::Transaction& t, eversion_t trim_to)
+{
+  // nothing to trim unless the target lies past the current bottom
+  if (!(trim_to > log.bottom))
+    return;
+
+  dout(10) << "trim " << log << " to " << trim_to << dendl;
+
+  // in-memory log first, then mirror its new bounds into the pg info
+  log.trim(t, trim_to);
+  info.log_bottom = log.bottom;
+  info.log_backlog = log.backlog;
+
+  // finally bring the on-disk log in line
+  trim_ondisklog_to(t, trim_to);
+}
+
 void PG::trim_ondisklog_to(ObjectStore::Transaction& t, eversion_t v) 
 {
   dout(15) << "trim_ondisk_log_to v " << v << dendl;
@@ -1747,7 +1759,7 @@ void PG::add_log_entry(Log::Entry& e, bufferlist& log_bl)
 
 
 void PG::append_log(ObjectStore::Transaction &t, bufferlist& bl,
-                   eversion_t log_version, eversion_t trim_to)
+                   eversion_t log_version)
 {
   dout(10) << "append_log " << ondisklog.bottom << "~" << ondisklog.length()
           << " adding " << bl.length() <<  dendl;
@@ -1763,17 +1775,7 @@ void PG::append_log(ObjectStore::Transaction &t, bufferlist& bl,
   bufferlist blb(sizeof(ondisklog));
   ::encode(ondisklog, blb);
   t.collection_setattr(info.pgid.to_coll(), "ondisklog", blb);
-
-  
-  // trim?
-  if (is_complete() && trim_to > log.bottom) {
-    dout(10) << " trimming " << log << " to " << trim_to << dendl;
-    log.trim(t, trim_to);
-    info.log_bottom = log.bottom;
-    info.log_backlog = log.backlog;
-    trim_ondisklog_to(t, trim_to);
-  }
-  dout(10) << " ondisklog " << ondisklog.bottom << "~" << ondisklog.length() << dendl;
+  dout(10) << "append_log  now " << ondisklog.bottom << "~" << ondisklog.length() << dendl;
 }
 
 void PG::read_log(ObjectStore *store)
index e8e6cd0b22bc94eeee7a5a6f3a8540ac9ffd34c5..3e01e3b2742eb9e51b8a15fb963dd70f1fc974b2 100644 (file)
@@ -851,8 +851,9 @@ public:
   void write_info(ObjectStore::Transaction& t);
   void write_log(ObjectStore::Transaction& t);
   void append_log(ObjectStore::Transaction &t, bufferlist& bl,
-                 eversion_t log_version, eversion_t trim_to);
+                 eversion_t log_version);
   void read_log(ObjectStore *store);
+  void trim(ObjectStore::Transaction& t, eversion_t v);
   void trim_ondisklog_to(ObjectStore::Transaction& t, eversion_t v);
 
   void read_state(ObjectStore *store);
index 3461b9821365b64f7e0b54a620b9993939fe0af7..c9bc4f5bc0e5db9f8d95c41027adc2b3addf1e75 100644 (file)
@@ -26,6 +26,7 @@
 
 #include "messages/MOSDPGNotify.h"
 #include "messages/MOSDPGRemove.h"
+#include "messages/MOSDPGTrim.h"
 
 #include "messages/MOSDPing.h"
 
@@ -1441,7 +1442,8 @@ void ReplicatedPG::log_op(vector<Log::Entry>& logv, eversion_t trim_to,
        p != logv.end();
        p++)
     add_log_entry(*p, log_bl);
-  append_log(t, log_bl, logv[0].version, trim_to);
+  append_log(t, log_bl, logv[0].version);
+  trim(t, trim_to);
 
   // update the local pg, pg log
   write_info(t);
@@ -1729,7 +1731,7 @@ void ReplicatedPG::repop_ack(RepGather *repop, int result, int ack_type,
       repop->waitfor_disk.erase(fromosd);
       repop->waitfor_nvram.erase(fromosd);
       repop->waitfor_ack.erase(fromosd);
-      repop->pg_complete_thru[fromosd] = pg_complete_thru;
+      peer_info[fromosd].last_complete;
     }
   } else if (ack_type & CEPH_OSD_FLAG_ONNVRAM) {
     // nvram
@@ -2104,6 +2106,12 @@ void ReplicatedPG::sub_op_modify(MOSDSubOp *op)
       tls.push_back(&ctx->op_t);
       tls.push_back(&ctx->local_t);
     }
+  } else {
+    // just trim the log
+    if (op->pg_trim_to != eversion_t()) {
+      trim(ctx->local_t, op->pg_trim_to);
+      tls.push_back(&ctx->local_t);
+    }
   }
     
   C_OSD_RepModifyCommit *oncommit = new C_OSD_RepModifyCommit(this, op, ackerosd,
@@ -2972,6 +2980,7 @@ int ReplicatedPG::recover_primary(int max)
   if (is_all_uptodate()) {
     dout(-7) << "recover_primary complete" << dendl;
     finish_recovery();
+    trim_replicas();
   } else {
     dout(-10) << "recover_primary primary now complete, starting peer recovery" << dendl;
   }
@@ -3016,9 +3025,10 @@ int ReplicatedPG::recover_replicas(int max)
   // nothing to do!
   dout(-10) << "recover_replicas - nothing to do!" << dendl;
 
-  if (is_all_uptodate()) 
+  if (is_all_uptodate()) {
     finish_recovery();
-  else {
+    trim_replicas();
+  } else {
     dout(10) << "recover_replicas not all uptodate, acting " << acting << ", uptodate " << uptodate_set << dendl;
   }
 
@@ -3026,6 +3036,21 @@ int ReplicatedPG::recover_replicas(int max)
 }
 
 
+/** trim_replicas
+ * Stub.  Logs and returns immediately; the code after the return, which
+ * would send an MOSDPGTrim to every acting osd except the first, is
+ * currently unreachable.
+ */
+void ReplicatedPG::trim_replicas()
+{
+  dout(10) << "trim_replicas" << dendl;
+
+  return;  // FIXME: disabled for now; nothing below this line executes
+
+  // NOTE(review): trim_to is default-constructed and never assigned, so
+  // a real trim point still has to be chosen before this is enabled.
+  eversion_t trim_to;
+
+  // acting[0] is skipped; every other acting osd gets a trim message
+  for (unsigned idx = 1; idx < acting.size(); ++idx) {
+    MOSDPGTrim *trim_msg =
+      new MOSDPGTrim(osd->osdmap->get_epoch(), info.pgid, trim_to);
+    osd->messenger->send_message(trim_msg,
+                                 osd->osdmap->get_inst(acting[idx]));
+  }
+}
+
 
 /** clean_up_local
  * remove any objects that we're storing but shouldn't.
index b8089f16e99412f2b6b2e99f5246a03cc30184ba..e1075bfc23649d95a60a1d081efe20051e82a24a 100644 (file)
@@ -419,6 +419,7 @@ protected:
   void finish_recovery_op();
   int recover_primary(int max);
   int recover_replicas(int max);
+  void trim_replicas();
 
   void sub_op_modify(MOSDSubOp *op);
   void sub_op_modify_ondisk(MOSDSubOp *op, int ackerosd, eversion_t last_complete);