]> git-server-git.apps.pok.os.sepia.ceph.com Git - ceph.git/commitdiff
osd/PrimaryLogPG: cancel in-flight manifest ops on interval changing
authorxie xingguo <xie.xingguo@zte.com.cn>
Tue, 27 Aug 2019 01:51:04 +0000 (09:51 +0800)
committerxie xingguo <xie.xingguo@zte.com.cn>
Wed, 28 Aug 2019 10:03:20 +0000 (18:03 +0800)
We don't want to cache any object_contexts through the interval change.
We actually assert that all currently live references are dead by the
time the flush for the next interval completes.

On detecting a interval changing, let's cancel any in-flight manifest
ops, just as we do for any other cache-tier ops.

Fixes: https://tracker.ceph.com/issues/41514
Signed-off-by: xie xingguo <xie.xingguo@zte.com.cn>
src/osd/PrimaryLogPG.cc
src/osd/PrimaryLogPG.h

index e5a642c44c9541fbc370aa568e74ebcfb169b0de..eb8cf3586688076d4cfec8cd4fba7d095d1a4ad3 100644 (file)
@@ -3174,29 +3174,33 @@ void PrimaryLogPG::do_proxy_chunked_op(OpRequestRef op, const hobject_t& missing
 
 struct RefCountCallback : public Context {
 public:
-  PrimaryLogPG *pg;
   PrimaryLogPG::OpContext *ctx;
   OSDOp& osd_op;
-  epoch_t last_peering_reset;
+  bool requeue = false;
     
-  RefCountCallback(PrimaryLogPG *pg, PrimaryLogPG::OpContext *ctx,
-                  OSDOp &osd_op, epoch_t lpr) 
-    : pg(pg), ctx(ctx), osd_op(osd_op), last_peering_reset(lpr)
-  {}
+  RefCountCallback(PrimaryLogPG::OpContext *ctx, OSDOp &osd_op)
+    : ctx(ctx), osd_op(osd_op) {}
   void finish(int r) override {
-    std::scoped_lock locker{*pg};
-    if (last_peering_reset == pg->get_last_peering_reset()) {
-      if (r >= 0) {
-       osd_op.rval = 0;
-       pg->execute_ctx(ctx);
-      } else {
-       if (ctx->op) {
-         pg->osd->reply_op_error(ctx->op, r);
-       }
-       pg->close_op_ctx(ctx);
+    // NB: caller must already have pg->lock held
+    if (r >= 0) {
+      osd_op.rval = 0;
+      ctx->pg->execute_ctx(ctx);
+    } else {
+       // on cancel simply toss op out,
+       // or requeue as requested
+      if (r != -ECANCELED) {
+        if (ctx->op)
+          ctx->pg->osd->reply_op_error(ctx->op, r);
+      } else if (requeue) {
+        if (ctx->op)
+          ctx->pg->requeue_op(ctx->op);
       }
+      ctx->pg->close_op_ctx(ctx);
     }
   }
+  void set_requeue(bool rq) {
+    requeue = rq;
+  }
 };
 
 struct SetManifestFinisher : public PrimaryLogPG::OpFinisher {
@@ -3210,8 +3214,45 @@ struct SetManifestFinisher : public PrimaryLogPG::OpFinisher {
   }
 };
 
+struct C_SetManifestRefCountDone : public Context {
+  RefCountCallback* cb;
+  hobject_t soid;
+  C_SetManifestRefCountDone(
+    RefCountCallback* cb, hobject_t soid) : cb(cb), soid(soid) {}
+  void finish(int r) override {
+    if (r == -ECANCELED)
+      return;
+    auto pg = cb->ctx->pg;
+    std::scoped_lock locker{*pg};
+    auto it = pg->manifest_ops.find(soid);
+    if (it == pg->manifest_ops.end()) {
+      // raced with cancel_manifest_ops
+      return;
+    }
+    pg->manifest_ops.erase(it);
+    cb->complete(r);
+  }
+};
+
+void PrimaryLogPG::cancel_manifest_ops(bool requeue, vector<ceph_tid_t> *tids)
+{
+  dout(10) << __func__ << dendl;
+  auto p = manifest_ops.begin();
+  while (p != manifest_ops.end()) {
+    auto mop = p->second;
+    // cancel objecter op, if we can
+    if (mop->objecter_tid) {
+      tids->push_back(mop->objecter_tid);
+      mop->objecter_tid = 0;
+    }
+    mop->cb->set_requeue(requeue);
+    mop->cb->complete(-ECANCELED);
+    manifest_ops.erase(p++);
+  }
+}
+
 void PrimaryLogPG::refcount_manifest(ObjectContextRef obc, object_locator_t oloc, hobject_t soid,
-                                     SnapContext snapc, bool get, Context *cb, uint64_t offset)
+                                     SnapContext snapc, bool get, RefCountCallback *cb, uint64_t offset)
 {
   unsigned flags = CEPH_OSD_FLAG_IGNORE_CACHE | CEPH_OSD_FLAG_IGNORE_OVERLAY |
                    CEPH_OSD_FLAG_RWORDERED;                      
@@ -3232,18 +3273,21 @@ void PrimaryLogPG::refcount_manifest(ObjectContextRef obc, object_locator_t oloc
     obj_op.call("cas", "chunk_put", in);         
   }                                                     
   
-  unsigned n = info.pgid.hash_to_shard(osd->m_objecter_finishers);
-  Context *c;
+  Context *c = nullptr;
   if (cb) {
-    c = new C_OnFinisher(cb, osd->objecter_finishers[n]);
-  } else {
-    c = NULL;
+    unsigned n = info.pgid.hash_to_shard(osd->m_objecter_finishers);
+    C_SetManifestRefCountDone *fin =
+      new C_SetManifestRefCountDone(cb, obc->obs.oi.soid);
+    c = new C_OnFinisher(fin, osd->objecter_finishers[n]);
   }
 
-  osd->objecter->mutate(
+  auto tid = osd->objecter->mutate(
     soid.oid, oloc, obj_op, snapc,
     ceph::real_clock::from_ceph_timespec(obc->obs.oi.mtime),
     flags, c);
+  if (cb) {
+    manifest_ops[obc->obs.oi.soid] = std::make_shared<ManifestOp>(cb, tid);
+  }
 }  
 
 void PrimaryLogPG::do_proxy_chunked_read(OpRequestRef op, ObjectContextRef obc, int op_index,
@@ -6644,8 +6688,7 @@ int PrimaryLogPG::do_osd_ops(OpContext *ctx, vector<OSDOp>& ops)
          // start
          ctx->op_finishers[ctx->current_osd_subop_num].reset(
            new SetManifestFinisher(osd_op));
-         RefCountCallback *fin = new RefCountCallback(
-           this, ctx, osd_op, get_last_peering_reset());
+         RefCountCallback *fin = new RefCountCallback(ctx, osd_op);
          refcount_manifest(ctx->obc, target_oloc, target, SnapContext(),
                            true, fin, 0);
          result = -EINPROGRESS;
@@ -6769,8 +6812,7 @@ int PrimaryLogPG::do_osd_ops(OpContext *ctx, vector<OSDOp>& ops)
          // start
          ctx->op_finishers[ctx->current_osd_subop_num].reset(
            new SetManifestFinisher(osd_op));
-         RefCountCallback *fin = new RefCountCallback(
-           this, ctx, osd_op, get_last_peering_reset());
+         RefCountCallback *fin = new RefCountCallback(ctx, osd_op);
          refcount_manifest(ctx->obc, tgt_oloc, target, SnapContext(),
                            true, fin, src_offset);
          result = -EINPROGRESS;
@@ -11849,6 +11891,7 @@ void PrimaryLogPG::on_shutdown()
   cancel_copy_ops(false, &tids);
   cancel_flush_ops(false, &tids);
   cancel_proxy_ops(false, &tids);
+  cancel_manifest_ops(false, &tids);
   osd->objecter->op_cancel(tids, -ECANCELED);
 
   apply_and_flush_repops(false);
@@ -11967,6 +12010,7 @@ void PrimaryLogPG::on_change(ObjectStore::Transaction &t)
   cancel_copy_ops(is_primary(), &tids);
   cancel_flush_ops(is_primary(), &tids);
   cancel_proxy_ops(is_primary(), &tids);
+  cancel_manifest_ops(is_primary(), &tids);
   osd->objecter->op_cancel(tids, -ECANCELED);
 
   // requeue object waiters
index e8eeca9e009406cc8c7292c4e56328d4912bfe9e..c5d05615092ca2e396b400f99a7f40a236e16972 100644 (file)
@@ -34,6 +34,7 @@
 
 class CopyFromCallback;
 class PromoteCallback;
+struct RefCountCallback;
 
 class PrimaryLogPG;
 class PGLSFilter;
@@ -257,6 +258,17 @@ public:
   };
   typedef std::shared_ptr<FlushOp> FlushOpRef;
 
+  friend struct RefCountCallback;
+  struct ManifestOp {
+    RefCountCallback *cb;
+    ceph_tid_t objecter_tid;
+
+    ManifestOp(RefCountCallback* cb, ceph_tid_t tid)
+      : cb(cb), objecter_tid(tid) {}
+  };
+  typedef std::shared_ptr<ManifestOp> ManifestOpRef;
+  map<hobject_t, ManifestOpRef> manifest_ops;
+
   boost::scoped_ptr<PGBackend> pgbackend;
   PGBackend *get_pgbackend() override {
     return pgbackend.get();
@@ -1456,8 +1468,9 @@ protected:
                             uint64_t last_offset);
   void handle_manifest_flush(hobject_t oid, ceph_tid_t tid, int r,
                             uint64_t offset, uint64_t last_offset, epoch_t lpr);
+  void cancel_manifest_ops(bool requeue, vector<ceph_tid_t> *tids);
   void refcount_manifest(ObjectContextRef obc, object_locator_t oloc, hobject_t soid,
-                         SnapContext snapc, bool get, Context *cb, uint64_t offset);
+                         SnapContext snapc, bool get, RefCountCallback *cb, uint64_t offset);
 
   friend struct C_ProxyChunkRead;
   friend class PromoteManifestCallback;