]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
osdc/Objecter: add op_cancel_writes
authorJohn Spray <john.spray@redhat.com>
Tue, 7 Oct 2014 10:40:54 +0000 (11:40 +0100)
committerJohn Spray <john.spray@redhat.com>
Tue, 16 Dec 2014 20:45:58 +0000 (20:45 +0000)
This is for use by Client when it encounters
the FULL flag and wants to abort operations
rather than having them pause.

Signed-off-by: John Spray <john.spray@redhat.com>
src/osdc/Objecter.cc
src/osdc/Objecter.h

index 5ad65d488853fc9a8f4e9aff209df28251985cd2..3f60c71975e40856cd08677ef98a0ec8130383a1 100644 (file)
@@ -1881,7 +1881,7 @@ int Objecter::op_cancel(OSDSession *s, ceph_tid_t tid, int r)
 
   map<ceph_tid_t, Op*>::iterator p = s->ops.find(tid);
   if (p == s->ops.end()) {
-    ldout(cct, 10) << __func__ << " tid " << tid << " dne" << dendl;
+    ldout(cct, 10) << __func__ << " tid " << tid << " dne in session " << s->osd << dendl;
     return -ENOENT;
   }
 
@@ -1891,7 +1891,7 @@ int Objecter::op_cancel(OSDSession *s, ceph_tid_t tid, int r)
     s->con->revoke_rx_buffer(tid);
   }
 
-  ldout(cct, 10) << __func__ << " tid " << tid << dendl;
+  ldout(cct, 10) << __func__ << " tid " << tid << " in session " << s->osd << dendl;
   Op *op = p->second;
   if (op->onack) {
     op->onack->complete(r);
@@ -1913,6 +1913,17 @@ int Objecter::op_cancel(ceph_tid_t tid, int r)
   int ret = 0;
 
   rwlock.get_write();
+  ret = _op_cancel(tid, r);
+  rwlock.unlock();
+
+  return ret;
+}
+
+int Objecter::_op_cancel(ceph_tid_t tid, int r)
+{
+  int ret = 0;
+
+  ldout(cct, 5) << __func__ << ": cancelling tid " << tid << " r=" << r << dendl;
 
 start:
 
@@ -1926,12 +1937,13 @@ start:
         /* oh no! raced, maybe tid moved to another session, restarting */
         goto start;
       }
-      rwlock.unlock();
       return ret;
     }
     s->lock.unlock();
   }
 
+  ldout(cct, 5) << __func__ << ": tid " << tid << " not found in live sessions" << dendl;
+
   // Handle case where the op is in homeless session
   homeless_session->lock.get_read();
   if (homeless_session->ops.find(tid) != homeless_session->ops.end()) {
@@ -1941,18 +1953,54 @@ start:
       /* oh no! raced, maybe tid moved to another session, restarting */
       goto start;
     } else {
-      rwlock.unlock();
       return ret;
     }
   } else {
     homeless_session->lock.unlock();
   }
 
-  rwlock.unlock();
+  ldout(cct, 5) << __func__ << ": tid " << tid << " not found in homeless session" << dendl;
 
   return ret;
 }
 
+/**
+ * Any write op which is in progress at the start of this call shall no longer
+ * be in progress when this call ends.  Operations started after the start
+ * of this call may still be in progress when this call ends.
+ *
+ * @return the latest possible epoch in which a cancelled op could have existed
+ */
+epoch_t Objecter::op_cancel_writes(int r)
+{
+  rwlock.get_write();
+
+  std::vector<ceph_tid_t> to_cancel;
+
+  for (map<int, OSDSession *>::iterator siter = osd_sessions.begin(); siter != osd_sessions.end(); ++siter) {
+    OSDSession *s = siter->second;
+    s->lock.get_read();
+    for (map<ceph_tid_t, Op*>::iterator op_i = s->ops.begin(); op_i != s->ops.end(); ++op_i) {
+      if (op_i->second->target.flags & CEPH_OSD_FLAG_WRITE) {
+        to_cancel.push_back(op_i->first);
+      }
+    }
+    s->lock.unlock();
+  }
+
+  for (std::vector<ceph_tid_t>::iterator titer = to_cancel.begin(); titer != to_cancel.end(); ++titer) {
+    int cancel_result = _op_cancel(*titer, r);
+    // We hold rwlock across search and cancellation, so cancels should always succeed
+    assert(cancel_result == 0);
+  }
+
+  const epoch_t epoch = osdmap->get_epoch();
+
+  rwlock.unlock();
+
+  return epoch;
+}
+
 bool Objecter::is_pg_changed(
   int oldprimary,
   const vector<int>& oldacting,
index c436bf23686d8e3b90d3cf12bae3cc85d5184ec9..94ca79be7a5ba818f5a347556f000c7cb68df1a7 100644 (file)
@@ -1825,9 +1825,11 @@ public:
   /// cancel an in-progress request with the given return code
 private:
   int op_cancel(OSDSession *s, ceph_tid_t tid, int r);
+  int _op_cancel(ceph_tid_t tid, int r);
   friend class C_CancelOp;
 public:
   int op_cancel(ceph_tid_t tid, int r);
+  epoch_t op_cancel_writes(int r);
 
   // commands
   int osd_command(int osd, vector<string>& cmd,