]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
osd/PrimaryLogPG: cancel all objecter ops atomically 20609/head
authorSage Weil <sage@redhat.com>
Mon, 19 Feb 2018 14:54:58 +0000 (08:54 -0600)
committerPrashant D <pdhange@redhat.com>
Tue, 27 Feb 2018 06:04:14 +0000 (01:04 -0500)
We want to avoid a situation like this:

- start proxy op A (epoch E)
- start proxy op B (epoch E)
- start proxy op C (epoch E)
- objecter sends none of these because target is down in epoch E
- osdmap update to E+1
- pg cancels requeues A, B
- objecter updates to E+1
- objecter sends C
- pg cancels/requeues C

Note that the key thing is that operations on each object are canceled
atomically.  On the interval change we do it all at once.  In the other
cases, we cancel everything on the given object together.

Fixes: http://tracker.ceph.com/issues/22123
Signed-off-by: Sage Weil <sage@redhat.com>
(cherry picked from commit 93fd56ed039363c4f169259a0a560b968d1a0333)

Conflicts:
src/osd/PrimaryLogPG.cc: - Resolved in cancel_copy and added
  cancel_and_requeue_proxy_ops.
 - Define io_tids to FlushOp

src/osd/PrimaryLogPG.cc
src/osd/PrimaryLogPG.h

index 7aba1df5d2d9852b45dbc6ebc807c2273dd40fd6..6442b4718fba3758c9288a517d098748391dbe9b 100644 (file)
@@ -2899,14 +2899,15 @@ void PrimaryLogPG::kick_proxy_ops_blocked(hobject_t& soid)
   in_progress_proxy_ops.erase(p);
 }
 
-void PrimaryLogPG::cancel_proxy_read(ProxyReadOpRef prdop)
+void PrimaryLogPG::cancel_proxy_read(ProxyReadOpRef prdop,
+                                    vector<ceph_tid_t> *tids)
 {
   dout(10) << __func__ << " " << prdop->soid << dendl;
   prdop->canceled = true;
 
   // cancel objecter op, if we can
   if (prdop->objecter_tid) {
-    osd->objecter->op_cancel(prdop->objecter_tid, -ECANCELED);
+    tids->push_back(prdop->objecter_tid);
     for (uint32_t i = 0; i < prdop->ops.size(); i++) {
       prdop->ops[i].outdata.clear();
     }
@@ -2915,20 +2916,20 @@ void PrimaryLogPG::cancel_proxy_read(ProxyReadOpRef prdop)
   }
 }
 
-void PrimaryLogPG::cancel_proxy_ops(bool requeue)
+void PrimaryLogPG::cancel_proxy_ops(bool requeue, vector<ceph_tid_t> *tids)
 {
   dout(10) << __func__ << dendl;
 
   // cancel proxy reads
   map<ceph_tid_t, ProxyReadOpRef>::iterator p = proxyread_ops.begin();
   while (p != proxyread_ops.end()) {
-    cancel_proxy_read((p++)->second);
+    cancel_proxy_read((p++)->second, tids);
   }
 
   // cancel proxy writes
   map<ceph_tid_t, ProxyWriteOpRef>::iterator q = proxywrite_ops.begin();
   while (q != proxywrite_ops.end()) {
-    cancel_proxy_write((q++)->second);
+    cancel_proxy_write((q++)->second, tids);
   }
 
   if (requeue) {
@@ -3082,14 +3083,15 @@ void PrimaryLogPG::finish_proxy_write(hobject_t oid, ceph_tid_t tid, int r)
   pwop->ctx = NULL;
 }
 
-void PrimaryLogPG::cancel_proxy_write(ProxyWriteOpRef pwop)
+void PrimaryLogPG::cancel_proxy_write(ProxyWriteOpRef pwop,
+                                     vector<ceph_tid_t> *tids)
 {
   dout(10) << __func__ << " " << pwop->soid << dendl;
   pwop->canceled = true;
 
   // cancel objecter op, if we can
   if (pwop->objecter_tid) {
-    osd->objecter->op_cancel(pwop->objecter_tid, -ECANCELED);
+    tids->push_back(pwop->objecter_tid);
     delete pwop->ctx;
     pwop->ctx = NULL;
     proxywrite_ops.erase(pwop->objecter_tid);
@@ -8026,7 +8028,9 @@ void PrimaryLogPG::start_copy(CopyCallback *cb, ObjectContextRef obc,
     // FIXME: if the src etc match, we could avoid restarting from the
     // beginning.
     CopyOpRef cop = copy_ops[dest];
-    cancel_copy(cop, false);
+    vector<ceph_tid_t> tids;
+    cancel_copy(cop, false, &tids);
+    osd->objecter->op_cancel(tids, -ECANCELED);
   }
 
   CopyOpRef cop(std::make_shared<CopyOp>(cb, obc, src, oloc, version, flags,
@@ -8106,6 +8110,7 @@ void PrimaryLogPG::_copy_some(ObjectContextRef obc, CopyOpRef cop)
 
 void PrimaryLogPG::process_copy_chunk(hobject_t oid, ceph_tid_t tid, int r)
 {
+  vector<ceph_tid_t> tids;
   dout(10) << __func__ << " " << oid << " tid " << tid
           << " " << cpp_strerror(r) << dendl;
   map<hobject_t,CopyOpRef>::iterator p = copy_ops.find(oid);
@@ -8292,7 +8297,7 @@ void PrimaryLogPG::process_copy_chunk(hobject_t oid, ceph_tid_t tid, int r)
     for (map<ceph_tid_t, ProxyReadOpRef>::iterator it = proxyread_ops.begin();
        it != proxyread_ops.end();) {
       if (it->second->soid == cobc->obs.oi.soid) {
-       cancel_proxy_read((it++)->second);
+       cancel_proxy_read((it++)->second, &tids);
       } else {
        ++it;
       }
@@ -8300,17 +8305,40 @@ void PrimaryLogPG::process_copy_chunk(hobject_t oid, ceph_tid_t tid, int r)
     for (map<ceph_tid_t, ProxyWriteOpRef>::iterator it = proxywrite_ops.begin();
         it != proxywrite_ops.end();) {
       if (it->second->soid == cobc->obs.oi.soid) {
-       cancel_proxy_write((it++)->second);
+       cancel_proxy_write((it++)->second, &tids);
       } else {
        ++it;
       }
     }
+    osd->objecter->op_cancel(tids, -ECANCELED);
     kick_proxy_ops_blocked(cobc->obs.oi.soid);
   }
 
   kick_object_context_blocked(cobc);
 }
 
+void PrimaryLogPG::cancel_and_requeue_proxy_ops(hobject_t oid) {
+  vector<ceph_tid_t> tids;
+  for (map<ceph_tid_t, ProxyReadOpRef>::iterator it = proxyread_ops.begin();
+      it != proxyread_ops.end();) {
+    if (it->second->soid == oid) {
+      cancel_proxy_read((it++)->second, &tids);
+    } else {
+      ++it;
+    }
+  }
+  for (map<ceph_tid_t, ProxyWriteOpRef>::iterator it = proxywrite_ops.begin();
+       it != proxywrite_ops.end();) {
+    if (it->second->soid == oid) {
+      cancel_proxy_write((it++)->second, &tids);
+    } else {
+      ++it;
+    }
+  }
+  osd->objecter->op_cancel(tids, -ECANCELED);
+  kick_proxy_ops_blocked(oid);
+}
+
 void PrimaryLogPG::_write_copy_chunk(CopyOpRef cop, PGTransaction *t)
 {
   dout(20) << __func__ << " " << cop
@@ -8645,7 +8673,8 @@ void PrimaryLogPG::finish_promote(int r, CopyResults *results,
     agent_choose_mode();
 }
 
-void PrimaryLogPG::cancel_copy(CopyOpRef cop, bool requeue)
+void PrimaryLogPG::cancel_copy(CopyOpRef cop, bool requeue,
+                              vector<ceph_tid_t> *tids)
 {
   dout(10) << __func__ << " " << cop->obc->obs.oi.soid
           << " from " << cop->src << " " << cop->oloc
@@ -8653,10 +8682,10 @@ void PrimaryLogPG::cancel_copy(CopyOpRef cop, bool requeue)
 
   // cancel objecter op, if we can
   if (cop->objecter_tid) {
-    osd->objecter->op_cancel(cop->objecter_tid, -ECANCELED);
+    tids->push_back(cop->objecter_tid);
     cop->objecter_tid = 0;
     if (cop->objecter_tid2) {
-      osd->objecter->op_cancel(cop->objecter_tid2, -ECANCELED);
+      tids->push_back(cop->objecter_tid2);
       cop->objecter_tid2 = 0;
     }
   }
@@ -8675,13 +8704,13 @@ void PrimaryLogPG::cancel_copy(CopyOpRef cop, bool requeue)
   cop->obc = ObjectContextRef();
 }
 
-void PrimaryLogPG::cancel_copy_ops(bool requeue)
+void PrimaryLogPG::cancel_copy_ops(bool requeue, vector<ceph_tid_t> *tids)
 {
   dout(10) << __func__ << dendl;
   map<hobject_t,CopyOpRef>::iterator p = copy_ops.begin();
   while (p != copy_ops.end()) {
     // requeue this op? can I queue up all of them?
-    cancel_copy((p++)->second, requeue);
+    cancel_copy((p++)->second, requeue, tids);
   }
 }
 
@@ -8817,7 +8846,9 @@ int PrimaryLogPG::start_flush(
       osd->reply_op_error(fop->dup_ops.front(), -EBUSY);
       fop->dup_ops.pop_front();
     }
-    cancel_flush(fop, false);
+    vector<ceph_tid_t> tids;
+    cancel_flush(fop, false, &tids);
+    osd->objecter->op_cancel(tids, -ECANCELED);
   }
 
   /**
@@ -9017,7 +9048,9 @@ int PrimaryLogPG::try_flush_mark_clean(FlushOpRef fop)
       return -EAGAIN;    // will retry
     } else {
       osd->logger->inc(l_osd_tier_try_flush_fail);
-      cancel_flush(fop, false);
+      vector<ceph_tid_t> tids;
+      cancel_flush(fop, false, &tids);
+      osd->objecter->op_cancel(tids, -ECANCELED);
       return -ECANCELED;
     }
   }
@@ -9056,7 +9089,9 @@ int PrimaryLogPG::try_flush_mark_clean(FlushOpRef fop)
     dout(10) << __func__ << " failed write lock, no op; failing" << dendl;
     close_op_ctx(ctx.release());
     osd->logger->inc(l_osd_tier_try_flush_fail);
-    cancel_flush(fop, false);
+    vector<ceph_tid_t> tids;
+    cancel_flush(fop, false, &tids);
+    osd->objecter->op_cancel(tids, -ECANCELED);
     return -ECANCELED;
   }
 
@@ -9096,15 +9131,22 @@ int PrimaryLogPG::try_flush_mark_clean(FlushOpRef fop)
   return -EINPROGRESS;
 }
 
-void PrimaryLogPG::cancel_flush(FlushOpRef fop, bool requeue)
+void PrimaryLogPG::cancel_flush(FlushOpRef fop, bool requeue,
+                               vector<ceph_tid_t> *tids)
 {
   dout(10) << __func__ << " " << fop->obc->obs.oi.soid << " tid "
           << fop->objecter_tid << dendl;
   if (fop->objecter_tid) {
-    osd->objecter->op_cancel(fop->objecter_tid, -ECANCELED);
+    tids->push_back(fop->objecter_tid);
     fop->objecter_tid = 0;
   }
-  if (fop->blocking) {
+  if (fop->io_tids.size()) {
+    for (auto &p : fop->io_tids) {
+      tids->push_back(p.second);
+      p.second = 0;
+    } 
+  }
+  if (fop->blocking && fop->obc->is_blocked()) {
     fop->obc->stop_block();
     kick_object_context_blocked(fop->obc);
   }
@@ -9120,12 +9162,12 @@ void PrimaryLogPG::cancel_flush(FlushOpRef fop, bool requeue)
   flush_ops.erase(fop->obc->obs.oi.soid);
 }
 
-void PrimaryLogPG::cancel_flush_ops(bool requeue)
+void PrimaryLogPG::cancel_flush_ops(bool requeue, vector<ceph_tid_t> *tids)
 {
   dout(10) << __func__ << dendl;
   map<hobject_t,FlushOpRef>::iterator p = flush_ops.begin();
   while (p != flush_ops.end()) {
-    cancel_flush((p++)->second, requeue);
+    cancel_flush((p++)->second, requeue, tids);
   }
 }
 
@@ -11002,9 +11044,13 @@ void PrimaryLogPG::on_shutdown()
   scrub_clear_state();
 
   unreg_next_scrub();
-  cancel_copy_ops(false);
-  cancel_flush_ops(false);
-  cancel_proxy_ops(false);
+
+  vector<ceph_tid_t> tids;
+  cancel_copy_ops(false, &tids);
+  cancel_flush_ops(false, &tids);
+  cancel_proxy_ops(false, &tids);
+  osd->objecter->op_cancel(tids, -ECANCELED);
+
   apply_and_flush_repops(false);
   cancel_log_updates();
   // we must remove PGRefs, so do this this prior to release_backoffs() callers
@@ -11109,9 +11155,11 @@ void PrimaryLogPG::on_change(ObjectStore::Transaction *t)
 
   clear_scrub_reserved();
 
-  cancel_copy_ops(is_primary());
-  cancel_flush_ops(is_primary());
-  cancel_proxy_ops(is_primary());
+  vector<ceph_tid_t> tids;
+  cancel_copy_ops(is_primary(), &tids);
+  cancel_flush_ops(is_primary(), &tids);
+  cancel_proxy_ops(is_primary(), &tids);
+  osd->objecter->op_cancel(tids, -ECANCELED);
 
   // requeue object waiters
   for (auto& p : waiting_for_unreadable_object) {
index e660466b685e03e1eea2a4cfa2cc21be33c503e6..e59f8c662daa6a9a473c855b786fe1f6483973d3 100644 (file)
@@ -228,6 +228,8 @@ public:
     bool blocking;              ///< whether we are blocking updates
     bool removal;               ///< we are removing the backend object
     boost::optional<std::function<void()>> on_flush; ///< callback, may be null
+    // for chunked object
+    map<uint64_t, ceph_tid_t> io_tids;
 
     FlushOp()
       : flushed_version(0), objecter_tid(0), rval(0),
@@ -1288,8 +1290,8 @@ protected:
   void _copy_some(ObjectContextRef obc, CopyOpRef cop);
   void finish_copyfrom(CopyFromCallback *cb);
   void finish_promote(int r, CopyResults *results, ObjectContextRef obc);
-  void cancel_copy(CopyOpRef cop, bool requeue);
-  void cancel_copy_ops(bool requeue);
+  void cancel_copy(CopyOpRef cop, bool requeue, vector<ceph_tid_t> *tids);
+  void cancel_copy_ops(bool requeue, vector<ceph_tid_t> *tids);
 
   friend struct C_Copyfrom;
 
@@ -1303,8 +1305,8 @@ protected:
     boost::optional<std::function<void()>> &&on_flush);
   void finish_flush(hobject_t oid, ceph_tid_t tid, int r);
   int try_flush_mark_clean(FlushOpRef fop);
-  void cancel_flush(FlushOpRef fop, bool requeue);
-  void cancel_flush_ops(bool requeue);
+  void cancel_flush(FlushOpRef fop, bool requeue, vector<ceph_tid_t> *tids);
+  void cancel_flush_ops(bool requeue, vector<ceph_tid_t> *tids);
 
   /// @return false if clone is has been evicted
   bool is_present_clone(hobject_t coid);
@@ -1351,14 +1353,14 @@ protected:
 
   map<hobject_t, list<OpRequestRef>> in_progress_proxy_ops;
   void kick_proxy_ops_blocked(hobject_t& soid);
-  void cancel_proxy_ops(bool requeue);
+  void cancel_proxy_ops(bool requeue, vector<ceph_tid_t> *tids);
 
   // -- proxyread --
   map<ceph_tid_t, ProxyReadOpRef> proxyread_ops;
 
   void do_proxy_read(OpRequestRef op, ObjectContextRef obc = NULL);
   void finish_proxy_read(hobject_t oid, ceph_tid_t tid, int r);
-  void cancel_proxy_read(ProxyReadOpRef prdop);
+  void cancel_proxy_read(ProxyReadOpRef prdop, vector<ceph_tid_t> *tids);
 
   friend struct C_ProxyRead;
 
@@ -1366,8 +1368,9 @@ protected:
   map<ceph_tid_t, ProxyWriteOpRef> proxywrite_ops;
 
   void do_proxy_write(OpRequestRef op, const hobject_t& missing_oid, ObjectContextRef obc = NULL);
+  void cancel_and_requeue_proxy_ops(hobject_t oid);
   void finish_proxy_write(hobject_t oid, ceph_tid_t tid, int r);
-  void cancel_proxy_write(ProxyWriteOpRef pwop);
+  void cancel_proxy_write(ProxyWriteOpRef pwop, vector<ceph_tid_t> *tids);
 
   friend struct C_ProxyWrite_Commit;