cop->put();
}
+ // Split ops can only contain standard ops...
+ while(!splitop_session->ops.empty()) {
+ auto i = splitop_session->ops.begin();
+ ldout(cct, 10) << " op " << i->first << dendl;
+ auto op = i->second;
+ {
+ std::unique_lock swl(splitop_session->lock);
+ _session_op_remove(splitop_session, op);
+ }
+ op->put();
+ }
+
if (tick_event) {
if (timer.cancel_event(tick_event)) {
ldout(cct, 10) << " successfully canceled tick" << dendl;
// read | write ---------------------------
+void Objecter::op_post_split_op_complete(Op* op, bs::error_code ec, int rc) {
+ ceph_assert(op->session == splitop_session);
+
+ op->get(); // Keep alive during async operation
+
+ boost::asio::post(service, [this, op, ec, rc]() {
+ shunique_lock rl(rwlock, ceph::acquire_shared);
+
+ bool freed = op->get_nref() == 1;
+ op->put();
+
+ if (freed || op->session != splitop_session) {
+ ceph_assert(!initialized); // Should only happen during shutdown.
+ return;
+ }
+
+ unique_lock sl(op->session->lock);
-void Objecter::op_post_submit(Op* op) {
- boost::asio::post(service, [this, op]() {
- op_submit(op);
+ if (rc != -EAGAIN) {
+ op->trace.event("post op complete");
+ // This removes from session and unlocks sl.
+ complete_op_reply(op, ec, op->session, sl, rc);
+ } else {
+ _session_op_remove(op->session, op);
+ sl.unlock();
+ op->split_op_tids.reset();
+ ceph_tid_t tid = 0;
+ _op_submit(op, rl, &tid);
+ }
});
}
ptid = &tid;
op->trace.event("op submit");
- bool was_split = SplitOp::create(op, *this, rl, ptid, ctx_budget, cct);
+ _op_submit_with_budget(op, rl, ptid, ctx_budget);
+}
- if (!was_split) {
- _op_submit_with_budget(op, rl, ptid, ctx_budget);
+void Objecter::add_op_to_splitop_session(Op *op) {
+ unique_lock sl(splitop_session->lock);
+ if (op->tid == 0) {
+ op->tid = ++last_tid;
}
+ _session_op_assign(splitop_session, op);
+ inflight_ops++;
+ sl.unlock();
}
void Objecter::_op_submit_with_budget(Op *op,
op_cancel(tid, -ETIMEDOUT); });
}
- _op_submit(op, sul, ptid);
+
+ bool was_split = SplitOp::create(op, *this, sul, cct);
+
+ if (was_split) {
+ *ptid = op->tid;
+ } else {
+ _op_submit(op, sul, ptid);
+ }
}
void Objecter::_send_op_account(Op *op)
}
}
- // Handle case where the op is in homeless session
- shared_lock sl(homeless_session->lock);
- while (auto tid = next_subsystem_op(homeless_session->ops)) {
+ {
+ // Handle case where the op is in homeless session
+ shared_lock sl(homeless_session->lock);
+ while (auto tid = next_subsystem_op(homeless_session->ops)) {
+ sl.unlock();
+ auto ret = op_cancel(homeless_session, *tid, ceph::from_error_code(ec), ec);
+ if (ret == -ENOENT) {
+ /* oh no! raced, maybe tid moved to another session, restarting */
+ goto start;
+ }
+ }
sl.unlock();
- auto ret = op_cancel(homeless_session, *tid, ceph::from_error_code(ec), ec);
- if (ret == -ENOENT) {
- /* oh no! raced, maybe tid moved to another session, restarting */
- goto start;
+ }
+ {
+ // Handle case where the op is in splitop session
+ shared_lock sl(splitop_session->lock);
+ while (auto tid = next_subsystem_op(splitop_session->ops)) {
+ sl.unlock();
+ auto ret = op_cancel(splitop_session, *tid, ceph::from_error_code(ec), ec);
+ if (ret == -ENOENT) {
+ /* oh no! raced, maybe tid moved to another session, restarting */
+ goto start;
+ }
}
+ sl.unlock();
}
- sl.unlock();
}
int Objecter::op_cancel(ceph_tid_t tid, int r)
ldout(cct, 5) << __func__ << ": tid " << tid
<< " not found in live sessions" << dendl;
- // Handle case where the op is in homeless session
- shared_lock sl(homeless_session->lock);
- if (homeless_session->ops.find(tid) != homeless_session->ops.end()) {
- sl.unlock();
- ret = op_cancel(homeless_session, tid, r, osdcode(r));
- if (ret == -ENOENT) {
- /* oh no! raced, maybe tid moved to another session, restarting */
- goto start;
+ {
+ // Handle case where the op is in homeless session
+ shared_lock sl(homeless_session->lock);
+ if (homeless_session->ops.find(tid) != homeless_session->ops.end()) {
+ sl.unlock();
+ ret = op_cancel(homeless_session, tid, r, osdcode(r));
+ if (ret == -ENOENT) {
+ /* oh no! raced, maybe tid moved to another session, restarting */
+ goto start;
+ } else {
+ return ret;
+ }
} else {
- return ret;
+ sl.unlock();
+ }
+ }
+ {
+ shared_lock sl(splitop_session->lock);
+ if (splitop_session->ops.find(tid) != splitop_session->ops.end()) {
+ sl.unlock();
+ ret = op_cancel(splitop_session, tid, r, osdcode(r));
+ if (ret == -ENOENT) {
+ /* oh no! raced, maybe tid moved to another session, restarting */
+ goto start;
+ } else {
+ return ret;
+ }
+ } else {
+ sl.unlock();
}
- } else {
- sl.unlock();
}
ldout(cct, 5) << __func__ << ": tid " << tid
sl.unlock();
}
_dump_active(homeless_session);
+ _dump_active(splitop_session);
}
void Objecter::dump_active()
sl.unlock();
}
_dump_ops(homeless_session, fmt);
+ _dump_ops(splitop_session, fmt);
fmt->close_section(); // ops array
}
sl.unlock();
}
_dump_linger_ops(homeless_session, fmt);
+ // No linger ops in splitop_session
fmt->close_section(); // linger_ops array
}
sl.unlock();
}
_dump_command_ops(homeless_session, fmt);
+ // No command_ops for splitops session.
fmt->close_section(); // command_ops array
}
Objecter::~Objecter()
{
ceph_assert(homeless_session->get_nref() == 1);
+ ceph_assert(splitop_session->get_nref() == 1);
ceph_assert(num_homeless_ops == 0);
homeless_session->put();
+ splitop_session->put();
ceph_assert(osd_sessions.empty());
ceph_assert(poolstat_ops.empty());