]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
osd/PrimaryLogPG: cancel all objecter ops atomically 20484/head
authorSage Weil <sage@redhat.com>
Mon, 19 Feb 2018 14:54:58 +0000 (08:54 -0600)
committerSage Weil <sage@redhat.com>
Mon, 19 Feb 2018 14:54:58 +0000 (08:54 -0600)
We want to avoid a situation like this:

- start proxy op A (epoch E)
- start proxy op B (epoch E)
- start proxy op C (epoch E)
- objecter sends none of these because target is down in epoch E
- osdmap update to E+1
- pg cancels requeues A, B
- objecter updates to E+1
- objecter sends C
- pg cancels/requeues C

Note that the key thing is that operations on each object are canceled
atomically.  On the interval change we do it all at once.  In the other
cases, we cancel everything on the given object together.

Fixes: http://tracker.ceph.com/issues/22123
Signed-off-by: Sage Weil <sage@redhat.com>
src/osd/PrimaryLogPG.cc
src/osd/PrimaryLogPG.h

index 02279f0a3373043c1b58c5959bc778832cba7bde..98f6779d867851db8ed8a90b87e57a71fdc5cd53 100644 (file)
@@ -3155,14 +3155,15 @@ void PrimaryLogPG::kick_proxy_ops_blocked(hobject_t& soid)
   in_progress_proxy_ops.erase(p);
 }
 
-void PrimaryLogPG::cancel_proxy_read(ProxyReadOpRef prdop)
+void PrimaryLogPG::cancel_proxy_read(ProxyReadOpRef prdop,
+                                    vector<ceph_tid_t> *tids)
 {
   dout(10) << __func__ << " " << prdop->soid << dendl;
   prdop->canceled = true;
 
   // cancel objecter op, if we can
   if (prdop->objecter_tid) {
-    osd->objecter->op_cancel(prdop->objecter_tid, -ECANCELED);
+    tids->push_back(prdop->objecter_tid);
     for (uint32_t i = 0; i < prdop->ops.size(); i++) {
       prdop->ops[i].outdata.clear();
     }
@@ -3171,20 +3172,20 @@ void PrimaryLogPG::cancel_proxy_read(ProxyReadOpRef prdop)
   }
 }
 
-void PrimaryLogPG::cancel_proxy_ops(bool requeue)
+void PrimaryLogPG::cancel_proxy_ops(bool requeue, vector<ceph_tid_t> *tids)
 {
   dout(10) << __func__ << dendl;
 
   // cancel proxy reads
   map<ceph_tid_t, ProxyReadOpRef>::iterator p = proxyread_ops.begin();
   while (p != proxyread_ops.end()) {
-    cancel_proxy_read((p++)->second);
+    cancel_proxy_read((p++)->second, tids);
   }
 
   // cancel proxy writes
   map<ceph_tid_t, ProxyWriteOpRef>::iterator q = proxywrite_ops.begin();
   while (q != proxywrite_ops.end()) {
-    cancel_proxy_write((q++)->second);
+    cancel_proxy_write((q++)->second, tids);
   }
 
   if (requeue) {
@@ -3523,14 +3524,15 @@ void PrimaryLogPG::finish_proxy_write(hobject_t oid, ceph_tid_t tid, int r)
   pwop->ctx = NULL;
 }
 
-void PrimaryLogPG::cancel_proxy_write(ProxyWriteOpRef pwop)
+void PrimaryLogPG::cancel_proxy_write(ProxyWriteOpRef pwop,
+                                     vector<ceph_tid_t> *tids)
 {
   dout(10) << __func__ << " " << pwop->soid << dendl;
   pwop->canceled = true;
 
   // cancel objecter op, if we can
   if (pwop->objecter_tid) {
-    osd->objecter->op_cancel(pwop->objecter_tid, -ECANCELED);
+    tids->push_back(pwop->objecter_tid);
     delete pwop->ctx;
     pwop->ctx = NULL;
     proxywrite_ops.erase(pwop->objecter_tid);
@@ -8514,7 +8516,9 @@ void PrimaryLogPG::start_copy(CopyCallback *cb, ObjectContextRef obc,
     // FIXME: if the src etc match, we could avoid restarting from the
     // beginning.
     CopyOpRef cop = copy_ops[dest];
-    cancel_copy(cop, false);
+    vector<ceph_tid_t> tids;
+    cancel_copy(cop, false, &tids);
+    osd->objecter->op_cancel(tids, -ECANCELED);
   }
 
   CopyOpRef cop(std::make_shared<CopyOp>(cb, obc, src, oloc, version, flags,
@@ -9000,10 +9004,11 @@ void PrimaryLogPG::process_copy_chunk_manifest(hobject_t oid, ceph_tid_t tid, in
 }
 
 void PrimaryLogPG::cancel_and_requeue_proxy_ops(hobject_t oid) {
+  vector<ceph_tid_t> tids;
   for (map<ceph_tid_t, ProxyReadOpRef>::iterator it = proxyread_ops.begin();
       it != proxyread_ops.end();) {
     if (it->second->soid == oid) {
-      cancel_proxy_read((it++)->second);
+      cancel_proxy_read((it++)->second, &tids);
     } else {
       ++it;
     }
@@ -9011,11 +9016,12 @@ void PrimaryLogPG::cancel_and_requeue_proxy_ops(hobject_t oid) {
   for (map<ceph_tid_t, ProxyWriteOpRef>::iterator it = proxywrite_ops.begin();
        it != proxywrite_ops.end();) {
     if (it->second->soid == oid) {
-      cancel_proxy_write((it++)->second);
+      cancel_proxy_write((it++)->second, &tids);
     } else {
       ++it;
     }
   }
+  osd->objecter->op_cancel(tids, -ECANCELED);
   kick_proxy_ops_blocked(oid);
 }
 
@@ -9394,7 +9400,8 @@ void PrimaryLogPG::finish_promote_manifest(int r, CopyResults *results,
     agent_choose_mode();
 }
 
-void PrimaryLogPG::cancel_copy(CopyOpRef cop, bool requeue)
+void PrimaryLogPG::cancel_copy(CopyOpRef cop, bool requeue,
+                              vector<ceph_tid_t> *tids)
 {
   dout(10) << __func__ << " " << cop->obc->obs.oi.soid
           << " from " << cop->src << " " << cop->oloc
@@ -9402,10 +9409,10 @@ void PrimaryLogPG::cancel_copy(CopyOpRef cop, bool requeue)
 
   // cancel objecter op, if we can
   if (cop->objecter_tid) {
-    osd->objecter->op_cancel(cop->objecter_tid, -ECANCELED);
+    tids->push_back(cop->objecter_tid);
     cop->objecter_tid = 0;
     if (cop->objecter_tid2) {
-      osd->objecter->op_cancel(cop->objecter_tid2, -ECANCELED);
+      tids->push_back(cop->objecter_tid2);
       cop->objecter_tid2 = 0;
     }
   }
@@ -9424,13 +9431,13 @@ void PrimaryLogPG::cancel_copy(CopyOpRef cop, bool requeue)
   cop->obc = ObjectContextRef();
 }
 
-void PrimaryLogPG::cancel_copy_ops(bool requeue)
+void PrimaryLogPG::cancel_copy_ops(bool requeue, vector<ceph_tid_t> *tids)
 {
   dout(10) << __func__ << dendl;
   map<hobject_t,CopyOpRef>::iterator p = copy_ops.begin();
   while (p != copy_ops.end()) {
     // requeue this op? can I queue up all of them?
-    cancel_copy((p++)->second, requeue);
+    cancel_copy((p++)->second, requeue, tids);
   }
 }
 
@@ -9566,7 +9573,9 @@ int PrimaryLogPG::start_flush(
       osd->reply_op_error(fop->dup_ops.front(), -EBUSY);
       fop->dup_ops.pop_front();
     }
-    cancel_flush(fop, false);
+    vector<ceph_tid_t> tids;
+    cancel_flush(fop, false, &tids);
+    osd->objecter->op_cancel(tids, -ECANCELED);
   }
 
   if (obc->obs.oi.has_manifest() && obc->obs.oi.manifest.is_chunked()) {
@@ -9772,7 +9781,9 @@ int PrimaryLogPG::try_flush_mark_clean(FlushOpRef fop)
       return -EAGAIN;    // will retry
     } else {
       osd->logger->inc(l_osd_tier_try_flush_fail);
-      cancel_flush(fop, false);
+      vector<ceph_tid_t> tids;
+      cancel_flush(fop, false, &tids);
+      osd->objecter->op_cancel(tids, -ECANCELED);
       return -ECANCELED;
     }
   }
@@ -9811,7 +9822,9 @@ int PrimaryLogPG::try_flush_mark_clean(FlushOpRef fop)
     dout(10) << __func__ << " failed write lock, no op; failing" << dendl;
     close_op_ctx(ctx.release());
     osd->logger->inc(l_osd_tier_try_flush_fail);
-    cancel_flush(fop, false);
+    vector<ceph_tid_t> tids;
+    cancel_flush(fop, false, &tids);
+    osd->objecter->op_cancel(tids, -ECANCELED);
     return -ECANCELED;
   }
 
@@ -9885,17 +9898,18 @@ int PrimaryLogPG::try_flush_mark_clean(FlushOpRef fop)
   return -EINPROGRESS;
 }
 
-void PrimaryLogPG::cancel_flush(FlushOpRef fop, bool requeue)
+void PrimaryLogPG::cancel_flush(FlushOpRef fop, bool requeue,
+                               vector<ceph_tid_t> *tids)
 {
   dout(10) << __func__ << " " << fop->obc->obs.oi.soid << " tid "
           << fop->objecter_tid << dendl;
   if (fop->objecter_tid) {
-    osd->objecter->op_cancel(fop->objecter_tid, -ECANCELED);
+    tids->push_back(fop->objecter_tid);
     fop->objecter_tid = 0;
   }
   if (fop->io_tids.size()) {
     for (auto &p : fop->io_tids) {
-      osd->objecter->op_cancel(p.second, -ECANCELED);
+      tids->push_back(p.second);
       p.second = 0;
     } 
   }
@@ -9915,12 +9929,12 @@ void PrimaryLogPG::cancel_flush(FlushOpRef fop, bool requeue)
   flush_ops.erase(fop->obc->obs.oi.soid);
 }
 
-void PrimaryLogPG::cancel_flush_ops(bool requeue)
+void PrimaryLogPG::cancel_flush_ops(bool requeue, vector<ceph_tid_t> *tids)
 {
   dout(10) << __func__ << dendl;
   map<hobject_t,FlushOpRef>::iterator p = flush_ops.begin();
   while (p != flush_ops.end()) {
-    cancel_flush((p++)->second, requeue);
+    cancel_flush((p++)->second, requeue, tids);
   }
 }
 
@@ -11659,9 +11673,13 @@ void PrimaryLogPG::on_shutdown()
   scrub_clear_state();
 
   unreg_next_scrub();
-  cancel_copy_ops(false);
-  cancel_flush_ops(false);
-  cancel_proxy_ops(false);
+
+  vector<ceph_tid_t> tids;
+  cancel_copy_ops(false, &tids);
+  cancel_flush_ops(false, &tids);
+  cancel_proxy_ops(false, &tids);
+  osd->objecter->op_cancel(tids, -ECANCELED);
+
   apply_and_flush_repops(false);
   cancel_log_updates();
   // we must remove PGRefs, so do this this prior to release_backoffs() callers
@@ -11766,9 +11784,11 @@ void PrimaryLogPG::on_change(ObjectStore::Transaction *t)
 
   clear_scrub_reserved();
 
-  cancel_copy_ops(is_primary());
-  cancel_flush_ops(is_primary());
-  cancel_proxy_ops(is_primary());
+  vector<ceph_tid_t> tids;
+  cancel_copy_ops(is_primary(), &tids);
+  cancel_flush_ops(is_primary(), &tids);
+  cancel_proxy_ops(is_primary(), &tids);
+  osd->objecter->op_cancel(tids, -ECANCELED);
 
   // requeue object waiters
   for (auto& p : waiting_for_unreadable_object) {
index a283fd2864e83c935a4469fdb273a9a126edbf46..f67b43fecad2420d451534ff1171fb7c6504d421 100644 (file)
@@ -1318,8 +1318,8 @@ protected:
   void _copy_some(ObjectContextRef obc, CopyOpRef cop);
   void finish_copyfrom(CopyFromCallback *cb);
   void finish_promote(int r, CopyResults *results, ObjectContextRef obc);
-  void cancel_copy(CopyOpRef cop, bool requeue);
-  void cancel_copy_ops(bool requeue);
+  void cancel_copy(CopyOpRef cop, bool requeue, vector<ceph_tid_t> *tids);
+  void cancel_copy_ops(bool requeue, vector<ceph_tid_t> *tids);
 
   friend struct C_Copyfrom;
 
@@ -1333,8 +1333,8 @@ protected:
     boost::optional<std::function<void()>> &&on_flush);
   void finish_flush(hobject_t oid, ceph_tid_t tid, int r);
   int try_flush_mark_clean(FlushOpRef fop);
-  void cancel_flush(FlushOpRef fop, bool requeue);
-  void cancel_flush_ops(bool requeue);
+  void cancel_flush(FlushOpRef fop, bool requeue, vector<ceph_tid_t> *tids);
+  void cancel_flush_ops(bool requeue, vector<ceph_tid_t> *tids);
 
   /// @return false if clone is has been evicted
   bool is_present_clone(hobject_t coid);
@@ -1383,14 +1383,14 @@ protected:
 
   map<hobject_t, list<OpRequestRef>> in_progress_proxy_ops;
   void kick_proxy_ops_blocked(hobject_t& soid);
-  void cancel_proxy_ops(bool requeue);
+  void cancel_proxy_ops(bool requeue, vector<ceph_tid_t> *tids);
 
   // -- proxyread --
   map<ceph_tid_t, ProxyReadOpRef> proxyread_ops;
 
   void do_proxy_read(OpRequestRef op, ObjectContextRef obc = NULL);
   void finish_proxy_read(hobject_t oid, ceph_tid_t tid, int r);
-  void cancel_proxy_read(ProxyReadOpRef prdop);
+  void cancel_proxy_read(ProxyReadOpRef prdop, vector<ceph_tid_t> *tids);
 
   friend struct C_ProxyRead;
 
@@ -1399,7 +1399,7 @@ protected:
 
   void do_proxy_write(OpRequestRef op, ObjectContextRef obc = NULL);
   void finish_proxy_write(hobject_t oid, ceph_tid_t tid, int r);
-  void cancel_proxy_write(ProxyWriteOpRef pwop);
+  void cancel_proxy_write(ProxyWriteOpRef pwop, vector<ceph_tid_t> *tids);
 
   friend struct C_ProxyWrite_Commit;