]> git-server-git.apps.pok.os.sepia.ceph.com Git - ceph.git/commitdiff
osd/: propogate hit_set history with repop
authorSamuel Just <sam.just@inktank.com>
Thu, 17 Apr 2014 19:27:07 +0000 (12:27 -0700)
committerSamuel Just <sam.just@inktank.com>
Mon, 21 Apr 2014 17:53:04 +0000 (10:53 -0700)
We don't actually send the whole info on each repop, just the log
entries, updated stats, and a few other bits.  For hit_set ops, we need
to also communicate the new hit_set history status atomically with the
log entries and the transaction.  Thus, we add a channel for an optional
pg_hit_set_history_t field in PGBackend::submit_transaction interface
and associated messages and implementations to update the hit_set info
field along with the log entries.

This also means that hit_set_(persist|trim) update an
updated_hit_set_history field on the OpContext instead of directly
modifying the info field.

Fixes: #8124
Signed-off-by: Samuel Just <sam.just@inktank.com>
src/messages/MOSDSubOp.h
src/osd/ECBackend.cc
src/osd/ECBackend.h
src/osd/ECMsgTypes.cc
src/osd/ECMsgTypes.h
src/osd/PGBackend.h
src/osd/ReplicatedBackend.cc
src/osd/ReplicatedBackend.h
src/osd/ReplicatedPG.cc
src/osd/ReplicatedPG.h

index 7c9c5048b2aaba25516a355c869ba35fa6f6bd98..6a381861d613ee2dc2e0d521aa866743c29e7565 100644 (file)
@@ -25,7 +25,7 @@
 
 class MOSDSubOp : public Message {
 
-  static const int HEAD_VERSION = 9;
+  static const int HEAD_VERSION = 10;
   static const int COMPAT_VERSION = 1;
 
 public:
@@ -90,6 +90,9 @@ public:
   hobject_t new_temp_oid;      ///< new temp object that we must now start tracking
   hobject_t discard_temp_oid;  ///< previously used temp object that we can now stop tracking
 
+  /// non-empty if this transaction involves a hit_set history update
+  boost::optional<pg_hit_set_history_t> updated_hit_set_history;
+
   int get_cost() const {
     if (ops.size() == 1 && ops[0].op.op == CEPH_OSD_OP_PULL)
       return ops[0].op.extent.length;
@@ -169,6 +172,9 @@ public:
        ghobject_t::NO_SHARD);
       pgid.shard = ghobject_t::NO_SHARD;
     }
+    if (header.version >= 10) {
+      ::decode(updated_hit_set_history, p);
+    }
   }
 
   virtual void encode_payload(uint64_t features) {
@@ -217,6 +223,7 @@ public:
     ::encode(discard_temp_oid, payload);
     ::encode(from, payload);
     ::encode(pgid.shard, payload);
+    ::encode(updated_hit_set_history, payload);
   }
 
   MOSDSubOp()
@@ -258,6 +265,8 @@ public:
     out << " v " << version
        << " snapset=" << snapset << " snapc=" << snapc;    
     if (!data_subset.empty()) out << " subset " << data_subset;
+    if (updated_hit_set_history)
+      out << ", has_updated_hit_set_history";
     out << ")";
   }
 };
index dc2a26fb619248a29e9bb9c5970893cfed5496fe..96985385941ef1c064f514760cd9f473bc2119ce 100644 (file)
@@ -818,6 +818,7 @@ void ECBackend::handle_sub_write(
   clear_temp_objs(op.temp_removed);
   get_parent()->log_operation(
     op.log_entries,
+    op.updated_hit_set_history,
     op.trim_to,
     !(op.t.empty()),
     localt);
@@ -1201,6 +1202,7 @@ void ECBackend::submit_transaction(
   PGTransaction *_t,
   const eversion_t &trim_to,
   vector<pg_log_entry_t> &log_entries,
+  boost::optional<pg_hit_set_history_t> &hset_history,
   Context *on_local_applied_sync,
   Context *on_all_applied,
   Context *on_all_commit,
@@ -1215,6 +1217,7 @@ void ECBackend::submit_transaction(
   op->version = at_version;
   op->trim_to = trim_to;
   op->log_entries.swap(log_entries);
+  op->updated_hit_set_history.swap(hset_history);
   op->on_local_applied_sync = on_local_applied_sync;
   op->on_all_applied = on_all_applied;
   op->on_all_commit = on_all_commit;
@@ -1520,6 +1523,7 @@ void ECBackend::start_write(Op *op) {
       op->version,
       op->trim_to,
       op->log_entries,
+      op->updated_hit_set_history,
       op->temp_added,
       op->temp_cleared);
     if (*i == get_parent()->whoami_shard()) {
index 0aa37c154fa30cc2b66d3aec70f9759a52c1dadf..2061ea87912b05fa98f28df3dc90267842894998 100644 (file)
@@ -98,6 +98,7 @@ public:
     PGTransaction *t,
     const eversion_t &trim_to,
     vector<pg_log_entry_t> &log_entries,
+    boost::optional<pg_hit_set_history_t> &hset_history,
     Context *on_local_applied_sync,
     Context *on_all_applied,
     Context *on_all_commit,
@@ -326,6 +327,7 @@ public:
     eversion_t version;
     eversion_t trim_to;
     vector<pg_log_entry_t> log_entries;
+    boost::optional<pg_hit_set_history_t> updated_hit_set_history;
     Context *on_local_applied_sync;
     Context *on_all_applied;
     Context *on_all_commit;
index 87e622b0bf1af4e7bb874fb2fa6cda09feaf8e91..4e4c8e3d94237207cff7c3d83aefaaf49ff2a5ff 100644 (file)
@@ -16,7 +16,7 @@
 
 void ECSubWrite::encode(bufferlist &bl) const
 {
-  ENCODE_START(1, 1, bl);
+  ENCODE_START(2, 1, bl);
   ::encode(from, bl);
   ::encode(tid, bl);
   ::encode(reqid, bl);
@@ -28,12 +28,13 @@ void ECSubWrite::encode(bufferlist &bl) const
   ::encode(log_entries, bl);
   ::encode(temp_added, bl);
   ::encode(temp_removed, bl);
+  ::encode(updated_hit_set_history, bl);
   ENCODE_FINISH(bl);
 }
 
 void ECSubWrite::decode(bufferlist::iterator &bl)
 {
-  DECODE_START(1, bl);
+  DECODE_START(2, bl);
   ::decode(from, bl);
   ::decode(tid, bl);
   ::decode(reqid, bl);
@@ -45,17 +46,22 @@ void ECSubWrite::decode(bufferlist::iterator &bl)
   ::decode(log_entries, bl);
   ::decode(temp_added, bl);
   ::decode(temp_removed, bl);
+  if (struct_v >= 2) {
+    ::decode(updated_hit_set_history, bl);
+  }
   DECODE_FINISH(bl);
 }
 
 std::ostream &operator<<(
   std::ostream &lhs, const ECSubWrite &rhs)
 {
-  return lhs
-    << "ECSubWrite(tid=" << rhs.tid
-    << ", reqid=" << rhs.reqid
-    << ", at_version=" << rhs.at_version
-    << ", trim_to=" << rhs.trim_to << ")";
+  lhs << "ECSubWrite(tid=" << rhs.tid
+      << ", reqid=" << rhs.reqid
+      << ", at_version=" << rhs.at_version
+      << ", trim_to=" << rhs.trim_to;
+  if (rhs.updated_hit_set_history)
+    lhs << ", has_updated_hit_set_history";
+  return lhs <<  ")";
 }
 
 void ECSubWrite::dump(Formatter *f) const
@@ -64,6 +70,8 @@ void ECSubWrite::dump(Formatter *f) const
   f->dump_stream("reqid") << reqid;
   f->dump_stream("at_version") << at_version;
   f->dump_stream("trim_to") << trim_to;
+  f->dump_stream("has_updated_hit_set_history")
+    << static_cast<bool>(updated_hit_set_history);
 }
 
 void ECSubWrite::generate_test_instances(list<ECSubWrite*> &o)
index f4cde6dcd4e34a9a1bc628d9e2f23da73252ebb8..11c519d57bb4957a104a9fb77a8443ad537b3453 100644 (file)
@@ -31,6 +31,7 @@ struct ECSubWrite {
   vector<pg_log_entry_t> log_entries;
   set<hobject_t> temp_added;
   set<hobject_t> temp_removed;
+  boost::optional<pg_hit_set_history_t> updated_hit_set_history;
   ECSubWrite() {}
   ECSubWrite(
     pg_shard_t from,
@@ -42,6 +43,7 @@ struct ECSubWrite {
     eversion_t at_version,
     eversion_t trim_to,
     vector<pg_log_entry_t> log_entries,
+    boost::optional<pg_hit_set_history_t> updated_hit_set_history,
     const set<hobject_t> &temp_added,
     const set<hobject_t> &temp_removed)
     : from(from), tid(tid), reqid(reqid),
@@ -49,7 +51,8 @@ struct ECSubWrite {
       at_version(at_version),
       trim_to(trim_to), log_entries(log_entries),
       temp_added(temp_added),
-      temp_removed(temp_removed) {}
+      temp_removed(temp_removed),
+      updated_hit_set_history(updated_hit_set_history) {}
   void encode(bufferlist &bl) const;
   void decode(bufferlist::iterator &bl);
   void dump(Formatter *f) const;
index bd18a4706256ac5ea10c2907d1788ac83ae776fb..1dbf20d2a75d62d9297d96dcd079d843dc64c3df 100644 (file)
 
      virtual void log_operation(
        vector<pg_log_entry_t> &logv,
+       boost::optional<pg_hit_set_history_t> &hset_history,
        const eversion_t &trim_to,
        bool transaction_applied,
        ObjectStore::Transaction *t) = 0;
      PGTransaction *t,                    ///< [in] trans to execute
      const eversion_t &trim_to,           ///< [in] trim log to here
      vector<pg_log_entry_t> &log_entries, ///< [in] log entries for t
+     /// [in] hitset history (if updated with this transaction)
+     boost::optional<pg_hit_set_history_t> &hset_history,
      Context *on_local_applied_sync,      ///< [in] called when applied locally
      Context *on_all_applied,             ///< [in] called when all acked
      Context *on_all_commit,              ///< [in] called when all commit
index 8856461a8bc9265f8af3a555e7a9206f16c343d7..5a9668fc2a161c52e9b546099a37eb0a220be162 100644 (file)
@@ -494,6 +494,7 @@ void ReplicatedBackend::submit_transaction(
   PGTransaction *_t,
   const eversion_t &trim_to,
   vector<pg_log_entry_t> &log_entries,
+  boost::optional<pg_hit_set_history_t> &hset_history,
   Context *on_local_applied_sync,
   Context *on_all_acked,
   Context *on_all_commit,
@@ -536,6 +537,7 @@ void ReplicatedBackend::submit_transaction(
     t->get_temp_cleared().size() ?
       *(t->get_temp_cleared().begin()) :hobject_t(),
     log_entries,
+    hset_history,
     &op,
     op_t);
 
@@ -546,7 +548,7 @@ void ReplicatedBackend::submit_transaction(
   }
   clear_temp_objs(t->get_temp_cleared());
 
-  parent->log_operation(log_entries, trim_to, true, &local_t);
+  parent->log_operation(log_entries, hset_history, trim_to, true, &local_t);
   local_t.append(*op_t);
   local_t.swap(*op_t);
   
index cfad2edae6a41f8577b2dd71f4835f2e25ec6365..371574b35766b49568cebd51bc8dd0d3cae0584e 100644 (file)
@@ -343,6 +343,7 @@ public:
     PGTransaction *t,
     const eversion_t &trim_to,
     vector<pg_log_entry_t> &log_entries,
+    boost::optional<pg_hit_set_history_t> &hset_history,
     Context *on_local_applied_sync,
     Context *on_all_applied,
     Context *on_all_commit,
@@ -361,6 +362,7 @@ private:
     hobject_t new_temp_oid,
     hobject_t discard_temp_oid,
     vector<pg_log_entry_t> &log_entries,
+    boost::optional<pg_hit_set_history_t> &hset_history,
     InProgressOp *op,
     ObjectStore::Transaction *op_t);
   void op_applied(InProgressOp *op);
index cdb049a3d1e4ca387ce5bd8c559ff1fffdbec764..e62649e92372128b218e954e61fd1f0f4c1423b7 100644 (file)
@@ -6553,6 +6553,7 @@ void ReplicatedPG::issue_repop(RepGather *repop, utime_t now)
     repop->ctx->op_t,
     pg_trim_to,
     repop->ctx->log,
+    repop->ctx->updated_hset_history,
     onapplied_sync,
     on_all_applied,
     on_all_commit,
@@ -6571,6 +6572,7 @@ void ReplicatedBackend::issue_op(
   hobject_t new_temp_oid,
   hobject_t discard_temp_oid,
   vector<pg_log_entry_t> &log_entries,
+  boost::optional<pg_hit_set_history_t> &hset_hist,
   InProgressOp *op,
   ObjectStore::Transaction *op_t)
 {
@@ -6625,6 +6627,7 @@ void ReplicatedBackend::issue_op(
 
     wr->new_temp_oid = new_temp_oid;
     wr->discard_temp_oid = discard_temp_oid;
+    wr->updated_hit_set_history = hset_hist;
 
     get_parent()->send_message_osd_cluster(
       peer.osd, wr, get_osdmap()->get_epoch());
@@ -7353,6 +7356,7 @@ void ReplicatedBackend::sub_op_modify(OpRequestRef op)
     parent->update_stats(m->pg_stats);
     parent->log_operation(
       log,
+      m->updated_hit_set_history,
       m->pg_trim_to,
       update_snaps,
       &(rm->localt));
@@ -10564,18 +10568,20 @@ void ReplicatedPG::hit_set_persist()
     repop->on_applied = new C_HitSetFlushing(this, flush_time);
   OpContext *ctx = repop->ctx;
   ctx->at_version = get_next_version();
+  ctx->updated_hset_history = info.hit_set;
+  pg_hit_set_history_t &updated_hit_set_hist = *(ctx->updated_hset_history);
 
-  if (info.hit_set.current_last_stamp != utime_t()) {
+  if (updated_hit_set_hist.current_last_stamp != utime_t()) {
     // FIXME: we cheat slightly here by bundling in a remove on a object
     // other the RepGather object.  we aren't carrying an ObjectContext for
     // the deleted object over this period.
     hobject_t old_obj =
-      get_hit_set_current_object(info.hit_set.current_last_stamp);
+      get_hit_set_current_object(updated_hit_set_hist.current_last_stamp);
     ctx->log.push_back(
       pg_log_entry_t(pg_log_entry_t::DELETE,
                     old_obj,
                     ctx->at_version,
-                    info.hit_set.current_last_update,
+                    updated_hit_set_hist.current_last_update,
                     0,
                     osd_reqid_t(),
                     ctx->mtime));
@@ -10601,13 +10607,13 @@ void ReplicatedPG::hit_set_persist()
     ctx->delta_stats.num_bytes -= st.st_size;
   }
 
-  info.hit_set.current_last_update = info.last_update; // *after* above remove!
-  info.hit_set.current_info.version = ctx->at_version;
+  updated_hit_set_hist.current_last_update = info.last_update; // *after* above remove!
+  updated_hit_set_hist.current_info.version = ctx->at_version;
 
-  info.hit_set.history.push_back(info.hit_set.current_info);
+  updated_hit_set_hist.history.push_back(updated_hit_set_hist.current_info);
   hit_set_create();
-  info.hit_set.current_info = pg_hit_set_info_t();
-  info.hit_set.current_last_stamp = utime_t();
+  updated_hit_set_hist.current_info = pg_hit_set_info_t();
+  updated_hit_set_hist.current_last_stamp = utime_t();
 
   // fabricate an object_info_t and SnapSet
   obc->obs.oi.version = ctx->at_version;
@@ -10655,9 +10661,12 @@ void ReplicatedPG::hit_set_persist()
 
 void ReplicatedPG::hit_set_trim(RepGather *repop, unsigned max)
 {
-  for (unsigned num = info.hit_set.history.size(); num > max; --num) {
-    list<pg_hit_set_info_t>::iterator p = info.hit_set.history.begin();
-    assert(p != info.hit_set.history.end());
+  assert(repop->ctx->updated_hset_history);
+  pg_hit_set_history_t &updated_hit_set_hist =
+    *(repop->ctx->updated_hset_history);
+  for (unsigned num = updated_hit_set_hist.history.size(); num > max; --num) {
+    list<pg_hit_set_info_t>::iterator p = updated_hit_set_hist.history.begin();
+    assert(p != updated_hit_set_hist.history.end());
     hobject_t oid = get_hit_set_archive_object(p->begin, p->end);
 
     assert(!is_degraded_object(oid));
@@ -10685,7 +10694,7 @@ void ReplicatedPG::hit_set_trim(RepGather *repop, unsigned max)
     }
     if (agent_state)
       agent_state->remove_oldest_hit_set();
-    info.hit_set.history.pop_front();
+    updated_hit_set_hist.history.pop_front();
 
     struct stat st;
     int r = osd->store->stat(
index 239d118c9f0ce42819007e8211b05af5b32f3c64..89c13ada8eeef5108a7ba0e79c7eb6356c97b4f2 100644 (file)
@@ -341,9 +341,14 @@ public:
   }
   void log_operation(
     vector<pg_log_entry_t> &logv,
+    boost::optional<pg_hit_set_history_t> &hset_history,
     const eversion_t &trim_to,
     bool transaction_applied,
     ObjectStore::Transaction *t) {
+    if (hset_history) {
+      info.hit_set = *hset_history;
+      dirty_info = true;
+    }
     append_log(logv, trim_to, *t, transaction_applied);
   }
 
@@ -453,6 +458,7 @@ public:
 
     PGBackend::PGTransaction *op_t;
     vector<pg_log_entry_t> log;
+    boost::optional<pg_hit_set_history_t> updated_hset_history;
 
     interval_set<uint64_t> modified_ranges;
     ObjectContextRef obc;