map<ceph_tid_t, Op*>::iterator p = s->ops.find(tid);
if (p == s->ops.end()) {
- ldout(cct, 10) << __func__ << " tid " << tid << " dne" << dendl;
+ ldout(cct, 10) << __func__ << " tid " << tid << " dne in session " << s->osd << dendl;
return -ENOENT;
}
s->con->revoke_rx_buffer(tid);
}
- ldout(cct, 10) << __func__ << " tid " << tid << dendl;
+ ldout(cct, 10) << __func__ << " tid " << tid << " in session " << s->osd << dendl;
Op *op = p->second;
if (op->onack) {
op->onack->complete(r);
int ret = 0;
rwlock.get_write();
+ ret = _op_cancel(tid, r);
+ rwlock.unlock();
+
+ return ret;
+}
+
+int Objecter::_op_cancel(ceph_tid_t tid, int r)
+{
+ int ret = 0;
+
+ ldout(cct, 5) << __func__ << ": cancelling tid " << tid << " r=" << r << dendl;
start:
/* oh no! raced, maybe tid moved to another session, restarting */
goto start;
}
- rwlock.unlock();
return ret;
}
s->lock.unlock();
}
+ ldout(cct, 5) << __func__ << ": tid " << tid << " not found in live sessions" << dendl;
+
// Handle case where the op is in homeless session
homeless_session->lock.get_read();
if (homeless_session->ops.find(tid) != homeless_session->ops.end()) {
/* oh no! raced, maybe tid moved to another session, restarting */
goto start;
} else {
- rwlock.unlock();
return ret;
}
} else {
homeless_session->lock.unlock();
}
- rwlock.unlock();
+ ldout(cct, 5) << __func__ << ": tid " << tid << " not found in homeless session" << dendl;
return ret;
}
+/**
+ * Any write op which is in progress at the start of this call shall no longer
+ * be in progress when this call ends. Operations started after the start
+ * of this call may still be in progress when this call ends.
+ *
+ * @return the latest possible epoch in which a cancelled op could have existed
+ */
+epoch_t Objecter::op_cancel_writes(int r)
+{
+ rwlock.get_write();
+
+ std::vector<ceph_tid_t> to_cancel;
+
+ for (map<int, OSDSession *>::iterator siter = osd_sessions.begin(); siter != osd_sessions.end(); ++siter) {
+ OSDSession *s = siter->second;
+ s->lock.get_read();
+ for (map<ceph_tid_t, Op*>::iterator op_i = s->ops.begin(); op_i != s->ops.end(); ++op_i) {
+ if (op_i->second->target.flags & CEPH_OSD_FLAG_WRITE) {
+ to_cancel.push_back(op_i->first);
+ }
+ }
+ s->lock.unlock();
+ }
+
+ for (std::vector<ceph_tid_t>::iterator titer = to_cancel.begin(); titer != to_cancel.end(); ++titer) {
+ int cancel_result = _op_cancel(*titer, r);
+ // We hold rwlock across search and cancellation, so cancels should always succeed
+ assert(cancel_result == 0);
+ }
+
+ const epoch_t epoch = osdmap->get_epoch();
+
+ rwlock.unlock();
+
+ return epoch;
+}
+
bool Objecter::is_pg_changed(
int oldprimary,
const vector<int>& oldacting,
/// cancel an in-progress request with the given return code
private:
int op_cancel(OSDSession *s, ceph_tid_t tid, int r);
+ int _op_cancel(ceph_tid_t tid, int r);
friend class C_CancelOp;
public:
int op_cancel(ceph_tid_t tid, int r);
+ epoch_t op_cancel_writes(int r);
// commands
int osd_command(int osd, vector<string>& cmd,