]> git-server-git.apps.pok.os.sepia.ceph.com Git - ceph.git/commitdiff
osd/: add PGBackend interfaces and stubs
authorSamuel Just <sam.just@inktank.com>
Fri, 30 Aug 2013 01:13:26 +0000 (18:13 -0700)
committerSamuel Just <sam.just@inktank.com>
Thu, 26 Sep 2013 18:24:25 +0000 (11:24 -0700)
Signed-off-by: Samuel Just <sam.just@inktank.com>
doc/dev/osd_internals/erasure_coding/recovery.rst [new file with mode: 0644]
src/osd/Makefile.am
src/osd/PGBackend.h [new file with mode: 0644]
src/osd/ReplicatedBackend.cc [new file with mode: 0644]
src/osd/ReplicatedBackend.h [new file with mode: 0644]
src/osd/ReplicatedPG.cc
src/osd/ReplicatedPG.h
src/osd/osd_types.h

diff --git a/doc/dev/osd_internals/erasure_coding/recovery.rst b/doc/dev/osd_internals/erasure_coding/recovery.rst
new file mode 100644 (file)
index 0000000..793a5b0
--- /dev/null
@@ -0,0 +1,4 @@
+===================
+PGBackend Recovery
+===================
+
index ea7c036f858037d4783f62e447dfd2e3759e5c66..9d3bc1d5e478900525cf786cb3dbe5bbad0ba37a 100644 (file)
@@ -9,6 +9,7 @@ libosd_la_SOURCES = \
        osd/PG.cc \
        osd/PGLog.cc \
        osd/ReplicatedPG.cc \
+       osd/ReplicatedBackend.cc \
        osd/Ager.cc \
        osd/OSD.cc \
        osd/OSDCap.cc \
@@ -35,6 +36,8 @@ noinst_HEADERS += \
        osd/PG.h \
        osd/PGLog.h \
        osd/ReplicatedPG.h \
+       osd/PGBackend.h \
+       osd/ReplicatedBackend.h \
        osd/Watch.h \
        osd/osd_types.h
 
diff --git a/src/osd/PGBackend.h b/src/osd/PGBackend.h
new file mode 100644 (file)
index 0000000..6a77c72
--- /dev/null
@@ -0,0 +1,194 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+/*
+ * Ceph - scalable distributed file system
+ *
+ * Copyright (C) 2013 Inktank Storage, Inc.
+ *
+ * This is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License version 2.1, as published by the Free Software
+ * Foundation.  See file COPYING.
+ *
+ */
+
+#ifndef PGBACKEND_H
+#define PGBACKEND_H
+
+#include "osd_types.h"
+#include "include/Context.h"
+#include <string>
+
+ /**
+  * PGBackend
+  *
+  * PGBackend defines an interface for logic handling IO and
+  * replication on RADOS objects.  The PGBackend implementation
+  * is responsible for:
+  *
+  * 1) Handling client operations
+  * 2) Handling object recovery
+  * 3) Handling object access
+  */
+ class PGBackend {
+ public:       
+   /**
+    * Provides interfaces for PGBackend callbacks
+    *
+    * The intention is that the parent calls into the PGBackend
+    * implementation holding a lock and that the callbacks are
+    * called under the same locks.
+    */
+   class Listener {
+   public:
+     /// Recovery
+
+     virtual void on_local_recover_start(
+       const hobject_t &oid,
+       ObjectStore::Transaction *t) = 0;
+     /**
+      * Called with the transaction recovering oid
+      */
+     virtual void on_local_recover(
+       const hobject_t &oid,
+       const object_stat_sum_t &stat_diff,
+       const ObjectRecoveryInfo &recovery_info,
+       ObjectStore::Transaction *t
+       ) = 0;
+
+     /**
+      * Called when transaction recovering oid is durable and
+      * applied on all replicas
+      */
+     virtual void on_global_recover(const hobject_t &oid) = 0;
+
+     /**
+      * Called when peer is recovered
+      */
+     virtual void on_peer_recover(
+       int peer,
+       const hobject_t &oid,
+       const ObjectRecoveryInfo &recovery_info
+       ) = 0;
+
+     virtual void failed_push(int from, const hobject_t &soid) = 0;
+
+     /**
+      * Bless a context
+      *
+      * Wraps a context in whatever outer layers the parent usually
+      * uses to call into the PGBackend
+      */
+     virtual Context *bless_context(Context *c) = 0;
+     virtual GenContext<ThreadPool::TPHandle&> *bless_gencontext(
+       GenContext<ThreadPool::TPHandle&> *c) = 0;
+
+     virtual void send_message(int to_osd, Message *m) = 0;
+     virtual void queue_transaction(ObjectStore::Transaction *t) = 0;
+     virtual epoch_t get_epoch() = 0;
+     virtual const vector<int> &get_acting() = 0;
+     virtual std::string gen_dbg_prefix() const = 0;
+
+     virtual const map<hobject_t, set<int> > &get_missing_loc() = 0;
+     virtual const map<int, pg_missing_t> &get_peer_missing() = 0;
+     virtual const pg_missing_t &get_local_missing() = 0;
+     virtual const PGLog &get_log() = 0;
+     virtual bool pgb_is_primary() const = 0;
+     virtual OSDMapRef pgb_get_osdmap() const = 0;
+     virtual const pg_info_t &get_info() const = 0;
+
+     virtual ObjectContextRef get_obc(
+       const hobject_t &hoid,
+       map<string, bufferptr> &attrs) = 0;
+
+     virtual ~Listener() {}
+   };
+   Listener *parent;
+   Listener *get_parent() const { return parent; }
+   PGBackend(Listener *l) : parent(l) {}
+   bool is_primary() const { return get_parent()->pgb_is_primary(); }
+   OSDMapRef get_osdmap() const { return get_parent()->pgb_get_osdmap(); }
+   const pg_info_t &get_info() { return get_parent()->get_info(); }
+
+   std::string gen_prefix() const {
+     return parent->gen_dbg_prefix();
+   }
+
+   /**
+    * RecoveryHandle
+    *
+    * We may want to recover multiple objects in the same set of
+    * messages.  RecoveryHandle is an interface for the opaque
+    * object used by the implementation to store the details of
+    * the pending recovery operations.
+    */
+   struct RecoveryHandle {
+     virtual ~RecoveryHandle() {}
+   };
+
+   /// Get a fresh recovery operation
+   virtual RecoveryHandle *open_recovery_op() = 0;
+
+   /// run_recovery_op: finish the operation represented by h
+   virtual void run_recovery_op(
+     RecoveryHandle *h,     ///< [in] op to finish
+     int priority           ///< [in] msg priority
+     ) = 0;
+
+   /**
+    * recover_object
+    *
+    * Triggers a recovery operation on the specified hobject_t
+    * onreadable must be called before onwriteable
+    *
+    * On each replica (primary included), get_parent()->on_not_missing()
+    * must be called when the transaction finalizing the recovery
+    * is queued.  Similarly, get_parent()->on_readable() must be called
+    * when the transaction is applied in the backing store.
+    *
+    * get_parent()->on_not_degraded() should be called on the primary
+    * when writes can resume on the object.
+    *
+    * obc may be NULL if the primary lacks the object.
+    *
+    * head may be NULL only if the head/snapdir is missing
+    *
+    * @param missing [in] set of info, missing pairs for queried nodes
+    * @param overlaps [in] mapping of object to file offset overlaps
+    */
+   virtual void recover_object(
+     const hobject_t &hoid, ///< [in] object to recover
+     ObjectContextRef head,  ///< [in] context of the head/snapdir object
+     ObjectContextRef obc,  ///< [in] context of the object
+     RecoveryHandle *h      ///< [in,out] handle to attach recovery op to
+     ) = 0;
+
+   /// gives PGBackend a crack at an incoming message
+   virtual bool handle_message(
+     OpRequestRef op ///< [in] message received
+     ) = 0; ///< @return true if the message was handled
+
+   /**
+    * implementation should clear itself, contexts blessed prior to on_change
+    * won't be called after on_change()
+    */
+   virtual void on_change(ObjectStore::Transaction *t) = 0;
+   virtual void clear_state() = 0;
+
+   virtual void on_flushed() = 0;
+
+
+   virtual void split_colls(
+     pg_t child,
+     int split_bits,
+     int seed,
+     ObjectStore::Transaction *t) = 0;
+
+   virtual void temp_colls(list<coll_t> *out) = 0;
+
+   virtual void dump_recovery_info(Formatter *f) const = 0;
+
+   virtual ~PGBackend() {}
+ };
+
+#endif
diff --git a/src/osd/ReplicatedBackend.cc b/src/osd/ReplicatedBackend.cc
new file mode 100644 (file)
index 0000000..ecbfea9
--- /dev/null
@@ -0,0 +1,158 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+/*
+ * Ceph - scalable distributed file system
+ *
+ * Copyright (C) 2013 Inktank Storage, Inc.
+ *
+ * This is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License version 2.1, as published by the Free Software
+ * Foundation.  See file COPYING.
+ *
+ */
+#include "ReplicatedBackend.h"
+#include "messages/MOSDSubOp.h"
+#include "messages/MOSDSubOpReply.h"
+#include "messages/MOSDPGPush.h"
+#include "messages/MOSDPGPull.h"
+#include "messages/MOSDPGPushReply.h"
+
+#define dout_subsys ceph_subsys_osd
+#define DOUT_PREFIX_ARGS this
+#undef dout_prefix
+#define dout_prefix _prefix(_dout, this)
+static ostream& _prefix(std::ostream *_dout, ReplicatedBackend *pgb) {
+  return *_dout << pgb->get_parent()->gen_dbg_prefix();
+}
+
+ReplicatedBackend::ReplicatedBackend(
+  PGBackend::Listener *pg, coll_t coll, OSDService *osd) :
+  PGBackend(pg), temp_created(false), coll(coll), osd(osd) {}
+
+void ReplicatedBackend::run_recovery_op(
+  PGBackend::RecoveryHandle *h,
+  int priority)
+{
+}
+
+void ReplicatedBackend::recover_object(
+  const hobject_t &hoid,
+  ObjectContextRef head,
+  ObjectContextRef obc,
+  RecoveryHandle *h
+  )
+{
+#if 0
+  op.recovery_progress.data_complete = false;
+  op.recovery_progress.omap_complete = false;
+  op.recovery_progress.data_recovered_to = 0;
+  op.recovery_progress.first = true;
+
+  assert(!pulling.count(soid));
+  pull_from_peer[fromosd].insert(soid);
+  PullInfo &pi = pulling[soid];
+  pi.recovery_info = op.recovery_info;
+  pi.recovery_progress = op.recovery_progress;
+  pi.priority = priority;
+#endif
+  dout(10) << __func__ << dendl;
+}
+
+bool ReplicatedBackend::handle_message(
+  OpRequestRef op
+  )
+{
+  dout(10) << __func__ << ": " << op << dendl;
+  switch (op->request->get_type()) {
+  case MSG_OSD_PG_PUSH:
+    // TODOXXX: needs to be active possibly
+    do_push(op);
+    return true;
+
+  case MSG_OSD_PG_PULL:
+    do_pull(op);
+    return true;
+
+  case MSG_OSD_PG_PUSH_REPLY:
+    do_push_reply(op);
+    return true;
+
+  case MSG_OSD_SUBOP: {
+    MOSDSubOp *m = static_cast<MOSDSubOp*>(op->request);
+    if (m->ops.size() >= 1) {
+      OSDOp *first = &m->ops[0];
+      switch (first->op.op) {
+      case CEPH_OSD_OP_PULL:
+       sub_op_pull(op);
+       return true;
+      case CEPH_OSD_OP_PUSH:
+        // TODOXXX: needs to be active possibly
+       sub_op_push(op);
+       return true;
+      }
+    }
+    break;
+  }
+
+  case MSG_OSD_SUBOPREPLY:
+    MOSDSubOpReply *r = static_cast<MOSDSubOpReply*>(op->request);
+    if (r->ops.size() >= 1) {
+      OSDOp &first = r->ops[0];
+      switch (first.op.op) {
+      case CEPH_OSD_OP_PUSH:
+       // continue peer recovery
+       sub_op_push_reply(op);
+       return true;
+      }
+    }
+    break;
+  }
+  return false;
+}
+
+void ReplicatedBackend::clear_state()
+{
+  // clear pushing/pulling maps
+  pushing.clear();
+  pulling.clear();
+  pull_from_peer.clear();
+}
+
+void ReplicatedBackend::on_change(ObjectStore::Transaction *t)
+{
+  dout(10) << __func__ << dendl;
+  // clear temp
+  for (set<hobject_t>::iterator i = temp_contents.begin();
+       i != temp_contents.end();
+       ++i) {
+    dout(10) << __func__ << ": Removing oid "
+            << *i << " from the temp collection" << dendl;
+    t->remove(get_temp_coll(t), *i);
+  }
+  temp_contents.clear();
+  clear_state();
+}
+
+coll_t ReplicatedBackend::get_temp_coll(ObjectStore::Transaction *t)
+{
+  if (temp_created)
+    return temp_coll;
+  if (!osd->store->collection_exists(temp_coll))
+      t->create_collection(temp_coll);
+  temp_created = true;
+  return temp_coll;
+}
+
+void ReplicatedBackend::on_flushed()
+{
+  if (have_temp_coll() &&
+      !osd->store->collection_empty(get_temp_coll())) {
+    vector<hobject_t> objects;
+    osd->store->collection_list(get_temp_coll(), objects);
+    derr << __func__ << ": found objects in the temp collection: "
+        << objects << ", crashing now"
+        << dendl;
+    assert(0 == "found garbage in the temp collection");
+  }
+}
diff --git a/src/osd/ReplicatedBackend.h b/src/osd/ReplicatedBackend.h
new file mode 100644 (file)
index 0000000..f2a7e4c
--- /dev/null
@@ -0,0 +1,252 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+/*
+ * Ceph - scalable distributed file system
+ *
+ * Copyright (C) 2013 Inktank Storage, Inc.
+ *
+ * This is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License version 2.1, as published by the Free Software
+ * Foundation.  See file COPYING.
+ *
+ */
+
+#ifndef REPBACKEND_H
+#define REPBACKEND_H
+
+#include "OSD.h"
+#include "PGBackend.h"
+#include "osd_types.h"
+
+class ReplicatedBackend : public PGBackend {
+  struct RPGHandle : public PGBackend::RecoveryHandle {
+    map<int, vector<PushOp> > pushes;
+    map<int, vector<PushReplyOp> > push_replies;
+    map<int, vector<PullOp> > pulls;
+  };
+private:
+  bool temp_created;
+  coll_t temp_coll;
+  coll_t get_temp_coll(ObjectStore::Transaction *t);
+  coll_t get_temp_coll() const {
+    return temp_coll;
+  }
+  bool have_temp_coll() const { return temp_created; }
+
+  // Track contents of temp collection, clear on reset
+  set<hobject_t> temp_contents;
+public:
+  coll_t coll;
+  OSDService *osd;
+
+  ReplicatedBackend(PGBackend::Listener *pg, coll_t coll, OSDService *osd);
+
+  /// @see PGBackend::open_recovery_op
+  PGBackend::RecoveryHandle *open_recovery_op() {
+    return new RPGHandle();
+  }
+
+  /// @see PGBackend::run_recovery_op
+  void run_recovery_op(
+    PGBackend::RecoveryHandle *h,
+    int priority);
+
+  /// @see PGBackend::recover_object
+  void recover_object(
+    const hobject_t &hoid,
+    ObjectContextRef head,
+    ObjectContextRef obc,
+    RecoveryHandle *h
+    );
+
+  /// @see PGBackend::handle_message
+  bool handle_message(
+    OpRequestRef op
+    );
+
+  void on_change(ObjectStore::Transaction *t);
+  void clear_state();
+  void on_flushed();
+
+  void temp_colls(list<coll_t> *out) {
+    if (temp_created)
+      out->push_back(temp_coll);
+  }
+  void split_colls(
+    pg_t child,
+    int split_bits,
+    int seed,
+    ObjectStore::Transaction *t) {
+    if (!temp_created)
+      return;
+    t->create_collection(temp_coll);
+    t->split_collection(
+      temp_coll,
+      split_bits,
+      seed,
+      coll_t::make_temp_coll(child));
+  }
+
+  virtual void dump_recovery_info(Formatter *f) const {
+    {
+      f->open_array_section("pull_from_peer");
+      for (map<int, set<hobject_t> >::const_iterator i = pull_from_peer.begin();
+          i != pull_from_peer.end();
+          ++i) {
+       f->open_object_section("pulling_from");
+       f->dump_int("pull_from", i->first);
+       {
+         f->open_array_section("pulls");
+         for (set<hobject_t>::const_iterator j = i->second.begin();
+              j != i->second.end();
+              ++j) {
+           f->open_object_section("pull_info");
+           assert(pulling.count(*j));
+           pulling.find(*j)->second.dump(f);
+           f->close_section();
+         }
+         f->close_section();
+       }
+       f->close_section();
+      }
+      f->close_section();
+    }
+    {
+      f->open_array_section("pushing");
+      for (map<hobject_t, map<int, PushInfo> >::const_iterator i =
+            pushing.begin();
+          i != pushing.end();
+          ++i) {
+       f->open_object_section("object");
+       f->dump_stream("pushing") << i->first;
+       {
+         f->open_array_section("pushing_to");
+         for (map<int, PushInfo>::const_iterator j = i->second.begin();
+              j != i->second.end();
+              ++j) {
+           f->open_object_section("push_progress");
+           f->dump_stream("object_pushing") << j->first;
+           {
+             f->open_object_section("push_info");
+             j->second.dump(f);
+             f->close_section();
+           }
+           f->close_section();
+         }
+         f->close_section();
+       }
+       f->close_section();
+      }
+      f->close_section();
+    }
+  }
+private:
+  // push
+  struct PushInfo {
+    ObjectRecoveryProgress recovery_progress;
+    ObjectRecoveryInfo recovery_info;
+    int priority;
+
+    void dump(Formatter *f) const {
+      {
+       f->open_object_section("recovery_progress");
+       recovery_progress.dump(f);
+       f->close_section();
+      }
+      {
+       f->open_object_section("recovery_info");
+       recovery_info.dump(f);
+       f->close_section();
+      }
+    }
+  };
+  map<hobject_t, map<int, PushInfo> > pushing;
+
+  // pull
+  struct PullInfo {
+    ObjectRecoveryProgress recovery_progress;
+    ObjectRecoveryInfo recovery_info;
+    int priority;
+
+    void dump(Formatter *f) const {
+      {
+       f->open_object_section("recovery_progress");
+       recovery_progress.dump(f);
+       f->close_section();
+      }
+      {
+       f->open_object_section("recovery_info");
+       recovery_info.dump(f);
+       f->close_section();
+      }
+    }
+
+    bool is_complete() const {
+      return recovery_progress.is_complete(recovery_info);
+    }
+  };
+  map<hobject_t, PullInfo> pulling;
+
+  // Reverse mapping from osd peer to objects beging pulled from that peer
+  map<int, set<hobject_t> > pull_from_peer;
+
+  void sub_op_push(OpRequestRef op) {}
+  void sub_op_push_reply(OpRequestRef op) {}
+  void sub_op_pull(OpRequestRef op) {}
+
+  void _do_push(OpRequestRef op) {}
+  void _do_pull_response(OpRequestRef op) {}
+  void do_push(OpRequestRef op) {
+    if (is_primary()) {
+      _do_pull_response(op);
+    } else {
+      _do_push(op);
+    }
+  }
+  void do_pull(OpRequestRef op) {}
+  void do_push_reply(OpRequestRef op) {}
+
+  bool handle_push_reply(int peer, PushReplyOp &op, PushOp *reply) { return true; }
+  void handle_pull(int peer, PullOp &op, PushOp *reply) {}
+  bool handle_pull_response(int from, PushOp &op, PullOp *response,
+                           ObjectStore::Transaction *t) { return true; }
+  void handle_push(int from, PushOp &op, PushReplyOp *response,
+                  ObjectStore::Transaction *t) {}
+
+  static void trim_pushed_data(const interval_set<uint64_t> &copy_subset,
+                              const interval_set<uint64_t> &intervals_received,
+                              bufferlist data_received,
+                              interval_set<uint64_t> *intervals_usable,
+                              bufferlist *data_usable) {}
+  void _failed_push(int from, const hobject_t &soid) {}
+
+  void send_pushes(int prio, map<int, vector<PushOp> > &pushes) {}
+  void prep_push_op_blank(const hobject_t& soid, PushOp *op) {}
+  int send_push_op_legacy(int priority, int peer,
+                         PushOp &pop) { return 1; }
+  int send_pull_legacy(int priority, int peer,
+                      const ObjectRecoveryInfo& recovery_info,
+                      ObjectRecoveryProgress progress) { return 1;}
+  void send_pulls(
+    int priority,
+    map<int, vector<PullOp> > &pulls) {}
+
+  int build_push_op(const ObjectRecoveryInfo &recovery_info,
+                   const ObjectRecoveryProgress &progress,
+                   ObjectRecoveryProgress *out_progress,
+                   PushOp *out_op) { return 1; }
+  void submit_push_data(ObjectRecoveryInfo &recovery_info,
+                       bool first,
+                       bool complete,
+                       const interval_set<uint64_t> &intervals_included,
+                       bufferlist data_included,
+                       bufferlist omap_header,
+                       map<string, bufferptr> &attrs,
+                       map<string, bufferlist> &omap_entries,
+                       ObjectStore::Transaction *t) {}
+  void submit_push_complete(ObjectRecoveryInfo &recovery_info,
+                           ObjectStore::Transaction *t) {}
+};
+
+#endif
index 8b72c8494ed2e9792da9ba7b8502afba1eb63ab0..f158a3b4d7476b15446cf063cc077b97f1ce029f 100644 (file)
@@ -628,6 +628,7 @@ ReplicatedPG::ReplicatedPG(OSDService *o, OSDMapRef curmap,
                           const PGPool &_pool, pg_t p, const hobject_t& oid,
                           const hobject_t& ioid) :
   PG(o, curmap, _pool, p, oid, ioid),
+  pgbackend(new ReplicatedBackend(this, coll_t(p), o)),
   snapset_contexts_lock("ReplicatedPG::snapset_contexts"),
   temp_created(false),
   temp_coll(coll_t::make_temp_coll(p)),
@@ -7228,6 +7229,7 @@ void ReplicatedPG::on_change(ObjectStore::Transaction *t)
   // any dups
   apply_and_flush_repops(is_primary());
 
+  pgbackend->on_change(t);
   // clear pushing/pulling maps
   pushing.clear();
   pulling.clear();
@@ -7267,6 +7269,7 @@ void ReplicatedPG::_clear_recovery_state()
   backfill_pos = hobject_t();
   backfills_in_flight.clear();
   pending_backfill_updates.clear();
+  pgbackend->clear_state();
   pulling.clear();
   pushing.clear();
   pull_from_peer.clear();
index 4fb5e4d2d8ecc43b5846001e53a80160c57936b6..595fec5eed4189d848dce1c541c3edb36936e984 100644 (file)
@@ -33,6 +33,9 @@
 
 #include "common/sharedptr_registry.hpp"
 
+#include "PGBackend.h"
+#include "ReplicatedBackend.h"
+
 class MOSDSubOpReply;
 
 class ReplicatedPG;
@@ -80,7 +83,7 @@ public:
   virtual bool filter(bufferlist& xattr_data, bufferlist& outdata);
 };
 
-class ReplicatedPG : public PG {
+class ReplicatedPG : public PG, public PGBackend::Listener {
   friend class OSD;
   friend class Watch;
 
@@ -122,6 +125,109 @@ public:
   };
   typedef boost::shared_ptr<CopyOp> CopyOpRef;
 
+  boost::scoped_ptr<PGBackend> pgbackend;
+
+  /// Listener methods
+  void on_local_recover_start(
+    const hobject_t &oid,
+    ObjectStore::Transaction *t) {}
+  void on_local_recover(
+    const hobject_t &oid,
+    const object_stat_sum_t &stat_diff,
+    const ObjectRecoveryInfo &recovery_info,
+    ObjectStore::Transaction *t
+    ) {}
+  void on_peer_recover(
+    int peer,
+    const hobject_t &oid,
+    const ObjectRecoveryInfo &recovery_info) {}
+  void on_global_recover(
+    const hobject_t &oid) {}
+  void failed_push(int from, const hobject_t &soid);
+
+  template <typename T>
+  class BlessedGenContext : public GenContext<T> {
+    ReplicatedPG *pg;
+    GenContext<T> *c;
+    epoch_t e;
+  public:
+    BlessedGenContext(ReplicatedPG *pg, GenContext<T> *c, epoch_t e)
+      : pg(pg), c(c), e(e) {}
+    void finish(T t) {
+      pg->lock();
+      if (pg->pg_has_reset_since(e))
+       delete c;
+      else
+       c->complete(t);
+      pg->unlock();
+    }
+  };
+  class BlessedContext : public Context {
+    ReplicatedPG *pg;
+    Context *c;
+    epoch_t e;
+  public:
+    BlessedContext(ReplicatedPG *pg, Context *c, epoch_t e)
+      : pg(pg), c(c), e(e) {}
+    void finish(int r) {
+      pg->lock();
+      if (pg->pg_has_reset_since(e))
+       delete c;
+      else
+       c->complete(r);
+      pg->unlock();
+    }
+  };
+  Context *bless_context(Context *c) {
+    return new BlessedContext(this, c, get_osdmap()->get_epoch());
+  }
+  GenContext<ThreadPool::TPHandle&> *bless_gencontext(
+    GenContext<ThreadPool::TPHandle&> *c) {
+    return new BlessedGenContext<ThreadPool::TPHandle&>(
+      this, c, get_osdmap()->get_epoch());
+  }
+    
+  void send_message(int to_osd, Message *m) {
+    osd->send_message_osd_cluster(to_osd, m, get_osdmap()->get_epoch());
+  }
+  void queue_transaction(ObjectStore::Transaction *t) {
+    osd->store->queue_transaction(osr.get(), t);
+  }
+  epoch_t get_epoch() {
+    return get_osdmap()->get_epoch();
+  }
+  const vector<int> &get_acting() {
+    return acting;
+  }
+  std::string gen_dbg_prefix() const { return gen_prefix(); }
+  
+  const map<hobject_t, set<int> > &get_missing_loc() {
+    return missing_loc;
+  }
+  const map<int, pg_missing_t> &get_peer_missing() {
+    return peer_missing;
+  }
+  const pg_missing_t &get_local_missing() {
+    return pg_log.get_missing();
+  }
+  const PGLog &get_log() {
+    return pg_log;
+  }
+  bool pgb_is_primary() const {
+    return is_primary();
+  }
+  OSDMapRef pgb_get_osdmap() const {
+    return get_osdmap();
+  }
+  const pg_info_t &get_info() const {
+    return info;
+  }
+  ObjectContextRef get_obc(
+    const hobject_t &hoid,
+    map<string, bufferptr> &attrs) {
+    return get_object_context(hoid, true, &attrs);
+  }
+
   /*
    * Capture all object state associated with an in-progress read or write.
    */
@@ -955,7 +1061,10 @@ public:
   void on_role_change();
   void on_change(ObjectStore::Transaction *t);
   void on_activate();
-  void on_flushed();
+  void on_flushed() {
+    assert(object_contexts.empty());
+    pgbackend->on_flushed();
+  }
   void on_removal(ObjectStore::Transaction *t);
   void on_shutdown();
 };
index 091b2b95e8f099d61b799768793a5f87b3d288ba..1df55ce5cab41de1e952aef9ab380f5688ae5ea0 100644 (file)
@@ -45,6 +45,7 @@
 
 typedef hobject_t collection_list_handle_t;
 
+typedef uint8_t shard_id_t;
 
 /**
  * osd request identifier