Tag operations with a subsystem so we can cancel them all in one go.
Signed-off-by: Adam C. Emerson <aemerson@redhat.com>
auto execute(Object o, IOContext ioc, ReadOp op,
ceph::buffer::list* bl,
CompletionToken&& token, uint64_t* objver = nullptr,
- const blkin_trace_info* trace_info = nullptr) {
+ const blkin_trace_info* trace_info = nullptr,
+ std::uint64_t subsystem = 0) {
auto consigned = consign(std::forward<CompletionToken>(token));
return boost::asio::async_initiate<decltype(consigned), Op::Signature>(
- [bl, objver, trace_info, this](auto&& handler, Object o, IOContext ioc,
- ReadOp op) {
+ [bl, objver, trace_info,
+ subsystem, this](auto&& handler, Object o, IOContext ioc,
+ ReadOp op) {
execute_(std::move(o), std::move(ioc), std::move(op), bl,
- std::move(handler), objver, trace_info);
+ std::move(handler), objver, trace_info, subsystem);
}, consigned, std::move(o), std::move(ioc), std::move(op));
}
template<boost::asio::completion_token_for<Op::Signature> CompletionToken>
auto execute(Object o, IOContext ioc, WriteOp op,
CompletionToken&& token, uint64_t* objver = nullptr,
- const blkin_trace_info* trace_info = nullptr) {
+ const blkin_trace_info* trace_info = nullptr,
+ std::uint64_t subsystem = 0) {
auto consigned = consign(std::forward<CompletionToken>(token));
return boost::asio::async_initiate<decltype(consigned), Op::Signature>(
- [objver, trace_info, this](auto&& handler, Object o, IOContext ioc,
- WriteOp op) {
+ [objver, trace_info,
+ subsystem, this](auto&& handler, Object o, IOContext ioc, WriteOp op) {
execute_(std::move(o), std::move(ioc), std::move(op),
- std::move(handler), objver, trace_info);
+ std::move(handler), objver, trace_info, subsystem);
}, consigned, std::move(o), std::move(ioc), std::move(op));
}
uint64_t instance_id() const;
+ uint64_t new_subsystem() const;
+ void cancel_subsystem(uint64_t subsystem) const;
+
private:
RADOS();
void execute_(Object o, IOContext ioc, ReadOp op,
ceph::buffer::list* bl, Op::Completion c,
- uint64_t* objver, const blkin_trace_info* trace_info);
+ uint64_t* objver, const blkin_trace_info* trace_info,
+ uint64_t subsystem);
void execute_(Object o, IOContext ioc, WriteOp op,
Op::Completion c, uint64_t* objver,
- const blkin_trace_info* trace_info);
+ const blkin_trace_info* trace_info,
+ uint64_t subsystem);
void lookup_pool_(std::string name, LookupPoolComp c);
void list_pools_(LSPoolsComp c);
void RADOS::execute_(Object o, IOContext _ioc, ReadOp _op,
cb::list* bl,
ReadOp::Completion c, version_t* objver,
- const blkin_trace_info *trace_info) {
+ const blkin_trace_info *trace_info,
+ std::uint64_t subsystem) {
if (_op.size() == 0) {
asio::dispatch(asio::append(std::move(c), bs::error_code{}));
return;
trace.event("init");
impl->objecter->read(
*oid, ioc->oloc, std::move(op->op), ioc->snap_seq, bl, flags,
- std::move(c), objver, nullptr /* data_offset */, 0 /* features */, &trace);
+ std::move(c), objver, nullptr /* data_offset */, 0 /* features */, &trace,
+ subsystem);
trace.event("submitted");
}
void RADOS::execute_(Object o, IOContext _ioc, WriteOp _op,
WriteOp::Completion c, version_t* objver,
- const blkin_trace_info *trace_info) {
+ const blkin_trace_info *trace_info,
+ std::uint64_t subsystem) {
if (_op.size() == 0) {
asio::dispatch(asio::append(std::move(c), bs::error_code{}));
return;
impl->objecter->mutate(
*oid, ioc->oloc, std::move(op->op), ioc->snapc,
mtime, flags,
- std::move(c), objver, osd_reqid_t{}, &trace);
+ std::move(c), objver, osd_reqid_t{}, &trace, subsystem);
trace.event("submitted");
}
return impl->get_instance_id();
}
+uint64_t RADOS::new_subsystem() const
+{
+ return impl->objecter->unique_subsystem_id();
+}
+
+void RADOS::cancel_subsystem(uint64_t subsystem) const
+{
+ impl->objecter->subsystem_cancel(subsystem,
+ asio::error::operation_aborted);
+}
+
#pragma GCC diagnostic push
#pragma GCC diagnostic ignored "-Wnon-virtual-dtor"
#pragma clang diagnostic push
ldout(cct, 5) << num_in_flight << " in flight" << dendl;
}
-int Objecter::op_cancel(OSDSession *s, ceph_tid_t tid, int r)
+int Objecter::op_cancel(OSDSession *s, ceph_tid_t tid, int r,
+ bs::error_code ec)
{
ceph_assert(initialized);
Op *op = p->second;
if (op->has_completion()) {
num_in_flight--;
- op->complete(osdcode(r), r, service.get_executor());
+ op->complete(ec, r, service.get_executor());
}
_op_cancel_map_check(op);
_finish_op(op, r);
return 0;
}
+void Objecter::subsystem_cancel(uint64_t subsystem, bs::error_code ec)
+{
+ unique_lock wl(rwlock);
+
+ ldout(cct, 5) << __func__ << ": cancelling subsystem " << subsystem
+ << " ec=" << ec.message() << dendl;
+
+ auto next_subsystem_op = [subsystem](const decltype(OSDSession::ops)& ops) {
+ auto i = std::find_if(
+ ops.begin(), ops.end(),
+ [subsystem](const auto& el) -> std::optional<ceph_tid_t> {
+ return el.second->subsystem == subsystem;
+ });
+ if (i != ops.cend()) {
+ return std::make_optional(i->second->tid);
+ }
+ return std::optional<ceph_tid_t>{std::nullopt};
+ };
+
+ // Since we only do this at shutdown, better to have it be slow than
+ // make it fast and introduce another cost per op.
+
+start:
+
+ for (auto siter = osd_sessions.begin();
+ siter != osd_sessions.end(); ++siter) {
+ auto s = siter->second;
+ shared_lock sl(s->lock);
+ while (auto tid = next_subsystem_op(s->ops)) {
+ sl.unlock();
+ auto ret = op_cancel(s, *tid, ceph::from_error_code(ec), ec);
+ if (ret == -ENOENT) {
+ /* oh no! raced, maybe tid moved to another session, restarting */
+ goto start;
+ }
+ sl.lock();
+ }
+ }
+
+ // Handle case where the op is in homeless session
+ shared_lock sl(homeless_session->lock);
+ while (auto tid = next_subsystem_op(homeless_session->ops)) {
+ sl.unlock();
+ auto ret = op_cancel(homeless_session, *tid, ceph::from_error_code(ec), ec);
+ if (ret == -ENOENT) {
+ /* oh no! raced, maybe tid moved to another session, restarting */
+ goto start;
+ }
+ }
+ sl.unlock();
+}
+
int Objecter::op_cancel(ceph_tid_t tid, int r)
{
int ret = 0;
shared_lock sl(s->lock);
if (s->ops.find(tid) != s->ops.end()) {
sl.unlock();
- ret = op_cancel(s, tid, r);
+ ret = op_cancel(s, tid, r, osdcode(r));
if (ret == -ENOENT) {
/* oh no! raced, maybe tid moved to another session, restarting */
goto start;
shared_lock sl(homeless_session->lock);
if (homeless_session->ops.find(tid) != homeless_session->ops.end()) {
sl.unlock();
- ret = op_cancel(homeless_session, tid, r);
+ ret = op_cancel(homeless_session, tid, r, osdcode(r));
if (ret == -ENOENT) {
/* oh no! raced, maybe tid moved to another session, restarting */
goto start;
sl.unlock();
for (auto titer = to_cancel.begin(); titer != to_cancel.end(); ++titer) {
- int cancel_result = op_cancel(s, *titer, r);
+ int cancel_result = op_cancel(s, *titer, r, osdcode(r));
// We hold rwlock across search and cancellation, so cancels
// should always succeed
ceph_assert(cancel_result == 0);
private:
std::atomic<uint64_t> last_tid{0};
+ std::atomic<uint64_t> last_subsystem{0};
std::atomic<unsigned> inflight_ops{0};
std::atomic<int> client_inc{-1};
uint64_t max_linger_id{0};
void maybe_request_map();
void enable_blocklist_events();
+
+ uint64_t unique_subsystem_id() {
+ return ++last_subsystem;
+ }
+
private:
void _maybe_request_map();
osd_reqid_t reqid; // explicitly setting reqid
ZTracer::Trace trace;
+ std::uint64_t subsystem = 0;
const jspan_context* otel_trace = nullptr;
static bool has_completion(decltype(onfinish)& f) {
Op(const object_t& o, const object_locator_t& ol, osdc_opvec&& _ops,
int f, OpComp fin, version_t *ov, int *offset = nullptr,
- ZTracer::Trace *parent_trace = nullptr) :
+ ZTracer::Trace *parent_trace = nullptr, uint64_t subsystem = 0) :
target(o, ol, f),
ops(std::move(_ops)),
out_bl(ops.size(), nullptr),
out_ec(ops.size(), nullptr),
onfinish(std::move(fin)),
objver(ov),
- data_offset(offset) {
+ data_offset(offset), subsystem(subsystem) {
if (target.base_oloc.key == o)
target.base_oloc.key.clear();
if (parent_trace && parent_trace->valid()) {
Op(const object_t& o, const object_locator_t& ol, osdc_opvec&& _ops,
int f, Context* fin, version_t *ov, int *offset = nullptr,
- ZTracer::Trace *parent_trace = nullptr, const jspan_context *otel_trace = nullptr) :
+ ZTracer::Trace *parent_trace = nullptr, const jspan_context *otel_trace = nullptr,
+ uint64_t subsystem = 0) :
target(o, ol, f),
ops(std::move(_ops)),
out_bl(ops.size(), nullptr),
onfinish(fin),
objver(ov),
data_offset(offset),
+ subsystem(subsystem),
otel_trace(otel_trace) {
if (target.base_oloc.key == o)
target.base_oloc.key.clear();
/// cancel an in-progress request with the given return code
private:
- int op_cancel(OSDSession *s, ceph_tid_t tid, int r);
+ int op_cancel(OSDSession *s, ceph_tid_t tid, int r,
+ boost::system::error_code ec);
int _op_cancel(ceph_tid_t tid, int r);
int get_read_flags(int flags) {
public:
int op_cancel(ceph_tid_t tid, int r);
+ void subsystem_cancel(uint64_t subsystem, boost::system::error_code e);
int op_cancel(const std::vector<ceph_tid_t>& tidls, int r);
/**
ceph::real_time mtime, int flags,
Op::OpComp oncommit,
version_t *objver = NULL, osd_reqid_t reqid = osd_reqid_t(),
- ZTracer::Trace *parent_trace = nullptr) {
+ ZTracer::Trace *parent_trace = nullptr, uint32_t subsystem = 0) {
Op *o = new Op(oid, oloc, std::move(op.ops), flags | global_op_flags |
CEPH_OSD_FLAG_WRITE, std::move(oncommit), objver,
- nullptr, parent_trace);
+ nullptr, parent_trace, subsystem);
o->priority = op.priority;
o->mtime = mtime;
o->snapc = snapc;
ObjectOperation&& op, snapid_t snapid, ceph::buffer::list *pbl,
int flags, Op::OpComp onack,
version_t *objver = nullptr, int *data_offset = nullptr,
- uint64_t features = 0, ZTracer::Trace *parent_trace = nullptr) {
+ uint64_t features = 0, ZTracer::Trace *parent_trace = nullptr,
+ uint64_t subsystem = 0) {
Op *o = new Op(oid, oloc, std::move(op.ops), get_read_flags(flags),
std::move(onack), objver,
- data_offset, parent_trace);
+ data_offset, parent_trace, subsystem);
o->priority = op.priority;
o->snapid = snapid;
o->outbl = pbl;
void RADOS::execute_(Object o, IOContext ioc, ReadOp op,
ceph::buffer::list* bl, Op::Completion c,
- uint64_t* objver, const blkin_trace_info* trace_info) {
+ uint64_t* objver, const blkin_trace_info* trace_info,
+ uint64_t subsystem) {
auto io_ctx = impl->get_io_ctx(ioc);
if (io_ctx == nullptr) {
asio::dispatch(asio::append(std::move(c), osdc_errc::pool_dne));
void RADOS::execute_(Object o, IOContext ioc, WriteOp op,
Op::Completion c, uint64_t* objver,
- const blkin_trace_info* trace_info) {
+ const blkin_trace_info* trace_info,
+ uint64_t subsystem) {
auto io_ctx = impl->get_io_ctx(ioc);
if (io_ctx == nullptr) {
asio::dispatch(asio::append(std::move(c), osdc_errc::pool_dne));