};
asio::io_context::executor_type ex;
+ Objecter::LingerOp* linger_op;
// A capacity of zero means unbounded. I would not recommend this.
const uint32_t capacity;
uint64_t next_id = 0;
void service_shutdown() {
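+ // Release our reference to the linger op when the execution context is torn down.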
+ if (linger_op) {
+ linger_op->put();
+ }
std::unique_lock l(m);
handlers.clear();
}
public:
- Notifier(asio::io_context::executor_type ex, uint32_t capacity)
- : ex(ex), capacity(capacity),
+ Notifier(asio::io_context::executor_type ex, Objecter::LingerOp* linger_op,
+ uint32_t capacity)
+ : ex(ex), linger_op(linger_op), capacity(capacity),
svc(asio::use_service<async::service<Notifier>>(
asio::query(ex, boost::asio::execution::context))) {
// register for service_shutdown() notifications
linger_op, op, ioc->snapc, ceph::real_clock::now(), bl,
asio::bind_executor(
std::move(e),
- [c = std::move(c), cookie](bs::error_code e, cb::list) mutable {
+ [c = std::move(c), cookie, linger_op](bs::error_code e, cb::list) mutable {
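+ // If registration failed, cancel the linger op and report a zero cookie to the caller.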
+ if (e) {
+ linger_op->objecter->linger_cancel(linger_op);
+ cookie = 0;
+ }
asio::dispatch(asio::append(std::move(c), e, cookie));
}), nullptr);
}
uint64_t cookie = linger_op->get_cookie();
// Shared pointer to avoid a potential race condition
linger_op->user_data.emplace<std::shared_ptr<Notifier>>(
- std::make_shared<Notifier>(get_executor(), queue_size));
+ std::make_shared<Notifier>(get_executor(), linger_op, queue_size));
auto& n = ceph::any_cast<std::shared_ptr<Notifier>&>(
linger_op->user_data);
linger_op->handle = std::ref(*n);
linger_op, op, ioc->snapc, ceph::real_clock::now(), bl,
asio::bind_executor(
std::move(e),
- [c = std::move(c), cookie](bs::error_code e, cb::list) mutable {
+ [c = std::move(c), cookie, linger_op](bs::error_code e, cb::list) mutable {
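+ // On failure, drop the Notifier held in user_data, cancel the linger op, and report a zero cookie.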
+ if (e) {
+ linger_op->user_data.reset();
+ linger_op->objecter->linger_cancel(linger_op);
+ cookie = 0;
+ }
asio::dispatch(asio::append(std::move(c), e, cookie));
}), nullptr);
}
[objecter = impl->objecter,
linger_op, c = std::move(c)]
(bs::error_code ec) mutable {
- if (!ec) {
- objecter->linger_cancel(linger_op);
- }
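+ // Cancel the linger op even if the operation completed with an error.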
+ objecter->linger_cancel(linger_op);
asio::dispatch(asio::append(std::move(c), ec));
}));
}
Op *o = new Op(info->target.base_oid, info->target.base_oloc,
std::move(opv), info->target.flags | CEPH_OSD_FLAG_READ,
- fu2::unique_function<Op::OpSig>{CB_Linger_Ping(this, info, now)},
+ CB_Linger_Ping(this, info, now),
nullptr, nullptr);
o->target = info->target;
o->should_resend = false;
ec = _normalize_watch_error(ec);
info->last_error = ec;
if (info->handle) {
- asio::defer(finish_strand, CB_DoWatchError(this, info, ec));
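+ // Deliver the watch error to the registered handler via the finish strand.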
+ asio::post(finish_strand, CB_DoWatchError(this, info, ec));
}
}
} else {
}
Op(const object_t& o, const object_locator_t& ol, osdc_opvec&& _ops,
- int f, OpComp&& fin, version_t *ov, int *offset = nullptr,
+ int f, OpComp fin, version_t *ov, int *offset = nullptr,
ZTracer::Trace *parent_trace = nullptr) :
target(o, ol, f),
ops(std::move(_ops)),
}
}
- Op(const object_t& o, const object_locator_t& ol, osdc_opvec&& _ops,
- int f, fu2::unique_function<OpSig>&& fin, version_t *ov, int *offset = nullptr,
- ZTracer::Trace *parent_trace = nullptr) :
- target(o, ol, f),
- ops(std::move(_ops)),
- out_bl(ops.size(), nullptr),
- out_handler(ops.size()),
- out_rval(ops.size(), nullptr),
- out_ec(ops.size(), nullptr),
- onfinish(std::move(fin)),
- objver(ov),
- data_offset(offset) {
- if (target.base_oloc.key == o)
- target.base_oloc.key.clear();
- if (parent_trace && parent_trace->valid()) {
- trace.init("op", nullptr, parent_trace);
- trace.event("start");
- }
- }
-
bool operator<(const Op& other) const {
return tid < other.tid;
}