From fe32d6ee88ab0b9e9b4b808a60638b425e4eb728 Mon Sep 17 00:00:00 2001 From: John Spray Date: Tue, 7 Oct 2014 11:40:54 +0100 Subject: [PATCH] osdc/Objecter: add op_cancel_writes This is for use by Client when it encounters the FULL flag and wants to abort operations rather than having them pause. Signed-off-by: John Spray --- src/osdc/Objecter.cc | 58 ++++++++++++++++++++++++++++++++++++++++---- src/osdc/Objecter.h | 2 ++ 2 files changed, 55 insertions(+), 5 deletions(-) diff --git a/src/osdc/Objecter.cc b/src/osdc/Objecter.cc index 5ad65d488853f..3f60c71975e40 100644 --- a/src/osdc/Objecter.cc +++ b/src/osdc/Objecter.cc @@ -1881,7 +1881,7 @@ int Objecter::op_cancel(OSDSession *s, ceph_tid_t tid, int r) map::iterator p = s->ops.find(tid); if (p == s->ops.end()) { - ldout(cct, 10) << __func__ << " tid " << tid << " dne" << dendl; + ldout(cct, 10) << __func__ << " tid " << tid << " dne in session " << s->osd << dendl; return -ENOENT; } @@ -1891,7 +1891,7 @@ int Objecter::op_cancel(OSDSession *s, ceph_tid_t tid, int r) s->con->revoke_rx_buffer(tid); } - ldout(cct, 10) << __func__ << " tid " << tid << dendl; + ldout(cct, 10) << __func__ << " tid " << tid << " in session " << s->osd << dendl; Op *op = p->second; if (op->onack) { op->onack->complete(r); @@ -1913,6 +1913,17 @@ int Objecter::op_cancel(ceph_tid_t tid, int r) int ret = 0; rwlock.get_write(); + ret = _op_cancel(tid, r); + rwlock.unlock(); + + return ret; +} + +int Objecter::_op_cancel(ceph_tid_t tid, int r) +{ + int ret = 0; + + ldout(cct, 5) << __func__ << ": cancelling tid " << tid << " r=" << r << dendl; start: @@ -1926,12 +1937,13 @@ start: /* oh no! raced, maybe tid moved to another session, restarting */ goto start; } - rwlock.unlock(); return ret; } s->lock.unlock(); } + ldout(cct, 5) << __func__ << ": tid " << tid << " not found in live sessions" << dendl; + // Handle case where the op is in homeless session homeless_session->lock.get_read(); if (homeless_session->ops.find(tid) != homeless_session->ops.end()) { @@ -1941,18 +1953,54 @@ start: /* oh no! raced, maybe tid moved to another session, restarting */ goto start; } else { - rwlock.unlock(); return ret; } } else { homeless_session->lock.unlock(); } - rwlock.unlock(); + ldout(cct, 5) << __func__ << ": tid " << tid << " not found in homeless session" << dendl; return ret; } +/** + * Any write op which is in progress at the start of this call shall no longer + * be in progress when this call ends. Operations started after the start + * of this call may still be in progress when this call ends. + * + * @return the latest possible epoch in which a cancelled op could have existed + */ +epoch_t Objecter::op_cancel_writes(int r) +{ + rwlock.get_write(); + + std::vector to_cancel; + + for (map::iterator siter = osd_sessions.begin(); siter != osd_sessions.end(); ++siter) { + OSDSession *s = siter->second; + s->lock.get_read(); + for (map::iterator op_i = s->ops.begin(); op_i != s->ops.end(); ++op_i) { + if (op_i->second->target.flags & CEPH_OSD_FLAG_WRITE) { + to_cancel.push_back(op_i->first); + } + } + s->lock.unlock(); + } + + for (std::vector::iterator titer = to_cancel.begin(); titer != to_cancel.end(); ++titer) { + int cancel_result = _op_cancel(*titer, r); + // We hold rwlock across search and cancellation, so cancels should always succeed + assert(cancel_result == 0); + } + + const epoch_t epoch = osdmap->get_epoch(); + + rwlock.unlock(); + + return epoch; +} + bool Objecter::is_pg_changed( int oldprimary, const vector& oldacting, diff --git a/src/osdc/Objecter.h b/src/osdc/Objecter.h index c436bf23686d8..94ca79be7a5ba 100644 --- a/src/osdc/Objecter.h +++ b/src/osdc/Objecter.h @@ -1825,9 +1825,11 @@ public: /// cancel an in-progress request with the given return code private: int op_cancel(OSDSession *s, ceph_tid_t tid, int r); + int _op_cancel(ceph_tid_t tid, int r); friend class C_CancelOp; public: int op_cancel(ceph_tid_t tid, int r); + epoch_t op_cancel_writes(int r); // commands int osd_command(int osd, vector& cmd, -- 2.39.5