]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
ReplicatedPG: do not preserve op context during flush
authorSamuel Just <sam.just@inktank.com>
Fri, 11 Apr 2014 01:15:30 +0000 (18:15 -0700)
committerSamuel Just <sam.just@inktank.com>
Mon, 28 Apr 2014 19:45:38 +0000 (12:45 -0700)
Any information stashed in the OpContext may be obsolete by the time we
actually mark the object clean.  Instead, let the start_flush caller
clean up its OpContext and in try_flush_mark_clean we'll create a new
one.  The primary reason to keep the OpContext would have been locking,
but we can set the obc as blocking without holding an OpContext, and
that would allow trimming to happen in the mean time (which is good
since trim_object does not respect rw locks since it doesn't change user
visible state).  In try_flush_mark_clean, we requeue the fop->op along
with (but ahead of) the fop->dup_ops.

Fixes: #8068
Signed-off-by: Samuel Just <sam.just@inktank.com>
src/osd/ReplicatedPG.cc
src/osd/ReplicatedPG.h

index 5d8419944f901a0dc79d40c99c1248c7910fee3e..9bcaa9e7524a4e5328447b634891c49a71e62104 100644 (file)
@@ -3265,7 +3265,9 @@ int ReplicatedPG::do_osd_ops(OpContext *ctx, vector<OSDOp>& ops)
          break;
        }
        if (oi.is_dirty()) {
-         result = start_flush(ctx, false, NULL);
+         result = start_flush(ctx->op, ctx->obc, false, NULL, NULL);
+         if (result == -EINPROGRESS)
+           result = -EAGAIN;
        } else {
          result = 0;
        }
@@ -3290,7 +3292,9 @@ int ReplicatedPG::do_osd_ops(OpContext *ctx, vector<OSDOp>& ops)
        }
        hobject_t missing;
        if (oi.is_dirty()) {
-         result = start_flush(ctx, true, &missing);
+         result = start_flush(ctx->op, ctx->obc, true, &missing, NULL);
+         if (result == -EINPROGRESS)
+           result = -EAGAIN;
        } else {
          result = 0;
        }
@@ -6068,9 +6072,12 @@ struct C_Flush : public Context {
   }
 };
 
-int ReplicatedPG::start_flush(OpContext *ctx, bool blocking, hobject_t *pmissing)
+int ReplicatedPG::start_flush(
+  OpRequestRef op, ObjectContextRef obc,
+  bool blocking, hobject_t *pmissing,
+  Context *on_flush)
 {
-  const object_info_t& oi = ctx->obc->obs.oi;
+  const object_info_t& oi = obc->obs.oi;
   const hobject_t& soid = oi.soid;
   dout(10) << __func__ << " " << soid
           << " v" << oi.version
@@ -6079,7 +6086,7 @@ int ReplicatedPG::start_flush(OpContext *ctx, bool blocking, hobject_t *pmissing
           << dendl;
 
   // verify there are no (older) check for dirty clones
-  SnapSet& snapset = ctx->obc->ssc->snapset;
+  SnapSet& snapset = obc->ssc->snapset;
   {
     dout(20) << " snapset " << snapset << dendl;
     vector<snapid_t>::reverse_iterator p = snapset.clones.rbegin();
@@ -6114,33 +6121,30 @@ int ReplicatedPG::start_flush(OpContext *ctx, bool blocking, hobject_t *pmissing
   }
 
   if (blocking)
-    ctx->obc->start_block();
+    obc->start_block();
 
   map<hobject_t,FlushOpRef>::iterator p = flush_ops.find(soid);
   if (p != flush_ops.end()) {
     FlushOpRef fop = p->second;
-    if (fop->ctx->op == ctx->op) {
+    if (fop->op == op) {
       // we couldn't take the write lock on a cache-try-flush before;
       // now we are trying again for the lock.
-      // clean up the previous ctx and use the new one.
-      close_op_ctx(fop->ctx, -EAGAIN);
-      fop->ctx = ctx;
       return try_flush_mark_clean(fop);
     }
-    if (fop->flushed_version == ctx->obc->obs.oi.user_version &&
+    if (fop->flushed_version == obc->obs.oi.user_version &&
        (fop->blocking || !blocking)) {
       // nonblocking can join anything
       // blocking can only join a blocking flush
       dout(20) << __func__ << " piggybacking on existing flush " << dendl;
-      fop->dup_ops.push_back(ctx->op);
+      fop->dup_ops.push_back(op);
       return -EAGAIN;   // clean up this ctx; op will retry later
     }
 
     // cancel current flush since it will fail anyway, or because we
     // are blocking and the existing flush is nonblocking.
     dout(20) << __func__ << " canceling previous flush; it will fail" << dendl;
-    if (fop->ctx->op)
-      osd->reply_op_error(fop->ctx->op, -EBUSY);
+    if (fop->op)
+      osd->reply_op_error(fop->op, -EBUSY);
     while (!fop->dup_ops.empty()) {
       osd->reply_op_error(fop->dup_ops.front(), -EBUSY);
       fop->dup_ops.pop_front();
@@ -6209,9 +6213,11 @@ int ReplicatedPG::start_flush(OpContext *ctx, bool blocking, hobject_t *pmissing
   }
 
   FlushOpRef fop(new FlushOp);
-  fop->ctx = ctx;
+  fop->obc = obc;
   fop->flushed_version = oi.user_version;
   fop->blocking = blocking;
+  fop->on_flush = on_flush;
+  fop->op = op;
 
   ObjectOperation o;
   if (oi.is_whiteout()) {
@@ -6256,35 +6262,41 @@ void ReplicatedPG::finish_flush(hobject_t oid, ceph_tid_t tid, int r)
             << " tid " << fop->objecter_tid << dendl;
     return;
   }
-  ObjectContextRef obc = fop->ctx->obc;
+  ObjectContextRef obc = fop->obc;
   fop->objecter_tid = 0;
 
   if (r < 0 && !(r == -ENOENT && fop->removal)) {
-    reply_ctx(fop->ctx, -EBUSY, obc->obs.oi.version,
-             obc->obs.oi.user_version);
+    if (fop->op)
+      osd->reply_op_error(fop->op, -EBUSY);
     if (!fop->dup_ops.empty()) {
       dout(20) << __func__ << " requeueing dups" << dendl;
       requeue_ops(fop->dup_ops);
     }
+    if (fop->on_flush) {
+      Context *on_flush = fop->on_flush;
+      fop->on_flush = NULL;
+      on_flush->complete(-EBUSY);
+    }
     flush_ops.erase(oid);
     return;
   }
 
-  delete fop->ctx->op_t;
-  fop->ctx->op_t = pgbackend->get_transaction();
-
   r = try_flush_mark_clean(fop);
-  if (r == -EBUSY) {
-    reply_ctx(fop->ctx, -EBUSY, obc->obs.oi.version,
-             obc->obs.oi.user_version);
+  if (r == -EBUSY && fop->op) {
+    osd->reply_op_error(fop->op, r);
   }
 }
 
 int ReplicatedPG::try_flush_mark_clean(FlushOpRef fop)
 {
-  ObjectContextRef obc = fop->ctx->obc;
+  ObjectContextRef obc = fop->obc;
   const hobject_t& oid = obc->obs.oi.soid;
 
+  if (fop->blocking) {
+    obc->stop_block();
+    kick_object_context_blocked(obc);
+  }
+
   if (fop->flushed_version != obc->obs.oi.user_version ||
       !obc->obs.exists) {
     if (obc->obs.exists)
@@ -6298,9 +6310,10 @@ int ReplicatedPG::try_flush_mark_clean(FlushOpRef fop)
       dout(20) << __func__ << " requeueing dups" << dendl;
       requeue_ops(fop->dup_ops);
     }
-    if (fop->blocking) {
-      obc->stop_block();
-      kick_object_context_blocked(obc);
+    if (fop->on_flush) {
+      Context *on_flush = fop->on_flush;
+      fop->on_flush = NULL;
+      on_flush->complete(-EBUSY);
     }
     flush_ops.erase(oid);
     if (fop->blocking)
@@ -6311,38 +6324,30 @@ int ReplicatedPG::try_flush_mark_clean(FlushOpRef fop)
   }
 
   // successfully flushed; can we clear the dirty bit?
-  if (!fop->blocking) {
-    // non-blocking: try to take the lock manually, since we don't
-    // have a ctx yet.
-    if (obc->get_write(fop->ctx->op)) {
-      dout(20) << __func__ << " took write lock" << dendl;
-    } else if (fop->ctx->op) {
-      dout(10) << __func__ << " waiting on write lock" << dendl;
-      return -EINPROGRESS;    // will retry.   this ctx is still alive!
-    } else {
-      dout(10) << __func__ << " failed write lock, no op; failing" << dendl;
-      osd->logger->inc(l_osd_tier_try_flush_fail);
-      cancel_flush(fop, false);
-      return -ECANCELED;
-    }
+  // try to take the lock manually, since we don't
+  // have a ctx yet.
+  if (obc->get_write(fop->op)) {
+    dout(20) << __func__ << " took write lock" << dendl;
+  } else if (fop->op) {
+    dout(10) << __func__ << " waiting on write lock" << dendl;
+    requeue_op(fop->op);
+    requeue_ops(fop->dup_ops);
+    return -EAGAIN;    // will retry
   } else {
-    dout(20) << __func__ << " already holding write lock: "
-            << obc->rwstate << dendl;
-    assert(obc->rwstate.state == ObjectContext::RWState::RWWRITE);
-    assert(fop->ctx->lock_to_release == OpContext::W_LOCK);
-
-    // let other writes continue
-    obc->stop_block();
-    kick_object_context_blocked(obc);
+    dout(10) << __func__ << " failed write lock, no op; failing" << dendl;
+    osd->logger->inc(l_osd_tier_try_flush_fail);
+    cancel_flush(fop, false);
+    return -ECANCELED;
   }
 
   dout(10) << __func__ << " clearing DIRTY flag for " << oid << dendl;
-  ceph_tid_t rep_tid = osd->get_tid();
-  RepGather *repop = new_repop(fop->ctx, obc, rep_tid);
-  OpContext *ctx = fop->ctx;
-  if (!fop->blocking) {
-    ctx->lock_to_release = OpContext::W_LOCK;  // we took it above
-  }
+  RepGather *repop = simple_repop_create(fop->obc);
+  OpContext *ctx = repop->ctx;
+
+  ctx->on_finish = fop->on_flush;
+  fop->on_flush = NULL;
+
+  ctx->lock_to_release = OpContext::W_LOCK;  // we took it above
   ctx->at_version = get_next_version();
 
   ctx->new_obs = obc->obs;
@@ -6353,10 +6358,13 @@ int ReplicatedPG::try_flush_mark_clean(FlushOpRef fop)
 
   osd->logger->inc(l_osd_tier_clean);
 
-  if (!fop->dup_ops.empty()) {
-    dout(20) << __func__ << " queueing dups for " << ctx->at_version << dendl;
-    list<OpRequestRef>& ls = waiting_for_ondisk[ctx->at_version];
+  if (!fop->dup_ops.empty() || fop->op) {
+    dout(20) << __func__ << " requeueing for " << ctx->at_version << dendl;
+    list<OpRequestRef> ls;
+    if (fop->op)
+      ls.push_back(fop->op);
     ls.splice(ls.end(), fop->dup_ops);
+    requeue_ops(ls);
   }
 
   simple_repop_submit(repop);
@@ -6373,23 +6381,27 @@ int ReplicatedPG::try_flush_mark_clean(FlushOpRef fop)
 
 void ReplicatedPG::cancel_flush(FlushOpRef fop, bool requeue)
 {
-  dout(10) << __func__ << " " << fop->ctx->obc->obs.oi.soid << " tid "
+  dout(10) << __func__ << " " << fop->obc->obs.oi.soid << " tid "
           << fop->objecter_tid << dendl;
   if (fop->objecter_tid) {
     Mutex::Locker l(osd->objecter_lock);
     osd->objecter->op_cancel(fop->objecter_tid, -ECANCELED);
   }
   if (requeue) {
-    if (fop->ctx->op)
-      requeue_op(fop->ctx->op);
+    if (fop->op)
+      requeue_op(fop->op);
     requeue_ops(fop->dup_ops);
   }
   if (fop->blocking) {
-    fop->ctx->obc->stop_block();
-    kick_object_context_blocked(fop->ctx->obc);
+    fop->obc->stop_block();
+    kick_object_context_blocked(fop->obc);
+  }
+  if (fop->on_flush) {
+    Context *on_flush = fop->on_flush;
+    fop->on_flush = NULL;
+    on_flush->complete(-ECANCELED);
   }
-  flush_ops.erase(fop->ctx->obc->obs.oi.soid);
-  close_op_ctx(fop->ctx, -ECANCELED);
+  flush_ops.erase(fop->obc->obs.oi.soid);
 }
 
 void ReplicatedPG::cancel_flush_ops(bool requeue)
@@ -11176,24 +11188,15 @@ bool ReplicatedPG::agent_maybe_flush(ObjectContextRef& obc)
   // FIXME: flush anything dirty, regardless of what distribution of
   // ages we expect.
 
-  vector<OSDOp> ops;
-  ceph_tid_t rep_tid = osd->get_tid();
-  osd_reqid_t reqid(osd->get_cluster_msgr_name(), 0, rep_tid);
-  OpContext *ctx = new OpContext(OpRequestRef(), reqid, ops,
-                                &obc->obs, obc->ssc, this);
-  ctx->op_t = pgbackend->get_transaction();
-  ctx->obc = obc;
-  ctx->mtime = ceph_clock_now(cct);
-  ctx->at_version = get_next_version();
-  ctx->on_finish = new C_AgentFlushStartStop(this, obc->obs.oi.soid);
-
-  int result = start_flush(ctx, false, NULL);
+  Context *on_flush = new C_AgentFlushStartStop(this, obc->obs.oi.soid);
+  int result = start_flush(
+    OpRequestRef(), obc, false, NULL,
+    on_flush);
   if (result != -EINPROGRESS) {
+    on_flush->complete(result);
     dout(10) << __func__ << " start_flush() failed " << obc->obs.oi
       << " with " << result << dendl;
     osd->logger->inc(l_osd_agent_skip);
-    if (result != -ECANCELED)
-      close_op_ctx(ctx, result);
     return false;
   }
 
index 38bdfbe34ad00186f93b3e6d7388c34477e25872..b69a16316138e29d59c3245293b7aecced7054b7 100644 (file)
@@ -198,17 +198,21 @@ public:
   friend class PromoteCallback;
 
   struct FlushOp {
-    OpContext *ctx;             ///< the parent OpContext
-    list<OpRequestRef> dup_ops; ///< dup flush requests
+    ObjectContextRef obc;       ///< obc we are flushing
+    OpRequestRef op;            ///< initiating op
+    list<OpRequestRef> dup_ops; ///< bandwagon jumpers
     version_t flushed_version;  ///< user version we are flushing
     ceph_tid_t objecter_tid;    ///< copy-from request tid
     int rval;                   ///< copy-from result
     bool blocking;              ///< whether we are blocking updates
     bool removal;               ///< we are removing the backend object
+    Context *on_flush;          ///< callback, may be null
 
     FlushOp()
-      : ctx(NULL), objecter_tid(0), rval(0),
-       blocking(false), removal(false) {}
+      : objecter_tid(0), rval(0),
+       blocking(false), removal(false),
+       on_flush(NULL) {}
+    ~FlushOp() { assert(!on_flush); }
   };
   typedef boost::shared_ptr<FlushOp> FlushOpRef;
 
@@ -1221,7 +1225,11 @@ protected:
   // -- flush --
   map<hobject_t, FlushOpRef> flush_ops;
 
-  int start_flush(OpContext *ctx, bool blocking, hobject_t *pmissing);
+  /// start_flush takes ownership of on_flush iff ret == -EINPROGRESS
+  int start_flush(
+    OpRequestRef op, ObjectContextRef obc,
+    bool blocking, hobject_t *pmissing,
+    Context *on_flush);
   void finish_flush(hobject_t oid, ceph_tid_t tid, int r);
   int try_flush_mark_clean(FlushOpRef fop);
   void cancel_flush(FlushOpRef fop, bool requeue);