]> git-server-git.apps.pok.os.sepia.ceph.com Git - ceph.git/commitdiff
ReplicatedPG: promote: first draft pass at doing object promotion
authorGreg Farnum <greg@inktank.com>
Thu, 10 Oct 2013 00:48:57 +0000 (17:48 -0700)
committerSage Weil <sage@inktank.com>
Sat, 14 Dec 2013 00:35:52 +0000 (16:35 -0800)
This is not yet at all complete -- among other things, it will
retry forever on any object which doesn't exist in the underlying
pool. But it demonstrates the approach reasonably clearly.

Signed-off-by: Greg Farnum <greg@inktank.com>
Reviewed-by: Sage Weil <sage@inktank.com>y
src/osd/ReplicatedPG.cc
src/osd/ReplicatedPG.h

index f6399e340a64c6dc23048994fbbe4c37fd6d69c8..545d92e200f446fe752c3e71778b4a9a5f412ede 100644 (file)
@@ -1227,10 +1227,10 @@ bool ReplicatedPG::maybe_handle_cache(OpRequestRef op, ObjectContextRef obc,
     return false;
     break;
   case pg_pool_t::CACHEMODE_WRITEBACK:
-    if (obc.get()) {
+    if (obc.get() && obc->obs.exists) { // we have the object already
       return false;
-    } else {
-      do_cache_redirect(op, obc);
+    } else { // try and promote!
+      promote_object(op, obc);
       return true;
     }
     break;
@@ -1238,12 +1238,17 @@ bool ReplicatedPG::maybe_handle_cache(OpRequestRef op, ObjectContextRef obc,
     do_cache_redirect(op, obc);
     return true;
     break;
-  case pg_pool_t::CACHEMODE_READONLY:
-    if (obc.get() && !r) {
+  case pg_pool_t::CACHEMODE_READONLY: // TODO: clean this case up
+    if (!obc.get() && r == -ENOENT) { // we don't have the object and op's a read
+      promote_object(op, obc);
+      return true;
+    } else if (obc.get() && obc->obs.exists) { // we have the object locally
       return false;
-    } else {
+    } else if (!r) { // it must be a write
       do_cache_redirect(op, obc);
       return true;
+    } else { // crap, there was a failure of some kind
+      return false;
     }
     break;
   default:
@@ -1266,6 +1271,32 @@ void ReplicatedPG::do_cache_redirect(OpRequestRef op, ObjectContextRef obc)
   return;
 }
 
+void ReplicatedPG::promote_object(OpRequestRef op, ObjectContextRef obc)
+{
+  MOSDOp *m = static_cast<MOSDOp*>(op->get_req());
+  if (!obc.get()) { // we need to create an ObjectContext
+    int r = find_object_context(
+      hobject_t(m->get_oid(),
+             m->get_object_locator().key,
+             m->get_snapid(),
+             m->get_pg().ps(),
+             m->get_object_locator().get_pool(),
+             m->get_object_locator().nspace),
+      &obc, true, NULL);
+    assert(r == 0); // a lookup that allows creates can't fail now
+  }
+dout(10) << __func__ << " " << obc->obs.oi.soid << dendl;
+
+  hobject_t temp_target = generate_temp_object();
+  PromoteCallback *cb = new PromoteCallback(obc, temp_target, this);
+  object_locator_t oloc(m->get_object_locator());
+  oloc.pool = pool.info.tier_of;
+  start_copy(cb, obc, obc->obs.oi.soid, oloc, 0, temp_target);
+
+  assert(obc->is_blocked());
+  wait_for_blocked_object(obc->obs.oi.soid, op);
+}
+
 void ReplicatedPG::execute_ctx(OpContext *ctx)
 {
   dout(10) << __func__ << " " << ctx << dendl;
@@ -4575,6 +4606,46 @@ void ReplicatedPG::finish_copyfrom(OpContext *ctx)
   ctx->delta_stats.num_wr_kb += SHIFT_ROUND_UP(obs.oi.size, 10);
 }
 
+void ReplicatedPG::finish_promote(CopyResults *results, ObjectContextRef obc,
+                                  hobject_t& temp_obj)
+{
+  vector<OSDOp> ops;
+  tid_t rep_tid = osd->get_tid();
+  osd_reqid_t reqid(osd->get_cluster_msgr_name(), 0, rep_tid);
+  OpContext *tctx = new OpContext(OpRequestRef(), reqid, ops, &obc->obs, obc->ssc, this);
+  tctx->mtime = ceph_clock_now(g_ceph_context);
+  tctx->op_t.swap(results->final_tx);
+  if (results->started_temp_obj) {
+       tctx->discard_temp_oid = temp_obj;
+  }
+
+  RepGather *repop = new_repop(tctx, obc, rep_tid);
+  C_KickBlockedObject *blockedcb = new C_KickBlockedObject(obc, this);
+  repop->ondone = blockedcb;
+  object_stat_sum_t delta;
+  ++delta.num_objects;
+  obc->obs.exists = true;
+  delta.num_bytes += results->object_size;
+  obc->obs.oi.category = results->category;
+  info.stats.stats.add(delta, obc->obs.oi.category);
+  tctx->at_version.epoch = get_osdmap()->get_epoch();
+  tctx->at_version.version = pg_log.get_head().version + 1;
+  tctx->user_at_version = results->user_version;
+
+  tctx->log.push_back(pg_log_entry_t(
+         pg_log_entry_t::MODIFY,
+         obc->obs.oi.soid,
+         tctx->at_version,
+         tctx->obs->oi.version,
+         tctx->user_at_version,
+         osd_reqid_t(),
+         repop->ctx->mtime));
+  append_log(tctx->log, eversion_t(), tctx->local_t);
+  issue_repop(repop, repop->ctx->mtime);
+  eval_repop(repop);
+  repop->put();
+}
+
 void ReplicatedPG::cancel_copy(CopyOpRef cop, bool requeue)
 {
   dout(10) << __func__ << " " << cop->obc->obs.oi.soid
index 7ea1b13f69716e01b1dc0a0a2488c3229d99e9c2..71475f83a3cc8df817ba809b65706a873da32db8 100644 (file)
@@ -210,6 +210,23 @@ public:
   };
   friend class CopyFromCallback;
 
+  class PromoteCallback: public CopyCallback {
+    ObjectContextRef obc;
+    hobject_t temp_obj;
+    ReplicatedPG *pg;
+  public:
+    PromoteCallback(ObjectContextRef obc_, const hobject_t& temp_obj_,
+                    ReplicatedPG *pg_) :
+      obc(obc_), temp_obj(temp_obj_), pg(pg_) {}
+
+    virtual void finish(CopyCallbackResults results) {
+      CopyResults* results_data = results.get<1>();
+      assert(results.get<0>() == 0); // we don't handle errors right now
+      pg->finish_promote(results_data, obc, temp_obj);
+      delete results_data;
+    }
+  };
+
   boost::scoped_ptr<PGBackend> pgbackend;
   PGBackend *get_pgbackend() {
     return pgbackend.get();
@@ -784,8 +801,19 @@ protected:
                                   uint64_t offset, uint64_t length, bool count_bytes);
   void add_interval_usage(interval_set<uint64_t>& s, object_stat_sum_t& st);
 
+  /**
+   * This helper function is called from do_op if the ObjectContext lookup fails.
+   * @returns true if the caching code is handling the Op, false otherwise.
+   */
   inline bool maybe_handle_cache(OpRequestRef op, ObjectContextRef obc, int r);
+  /**
+   * This helper function tells the client to redirect their request elsewhere.
+   */
   void do_cache_redirect(OpRequestRef op, ObjectContextRef obc);
+  /**
+   * This function starts up a copy from
+   */
+  void promote_object(OpRequestRef op, ObjectContextRef obc);
 
   int prepare_transaction(OpContext *ctx);
   
@@ -954,6 +982,8 @@ protected:
   void _build_finish_copy_transaction(CopyOpRef cop,
                                       ObjectStore::Transaction& t);
   void finish_copyfrom(OpContext *ctx);
+  void finish_promote(CopyResults *results, ObjectContextRef obc,
+                      hobject_t& temp_obj);
   void cancel_copy(CopyOpRef cop, bool requeue);
   void cancel_copy_ops(bool requeue);