int librados::IoCtxImpl::watch_check(uint64_t cookie)
{
- auto linger_op = reinterpret_cast<Objecter::LingerOp*>(cookie);
- auto r = objecter->linger_check(linger_op);
+ boost::intrusive_ptr linger_op = objecter->linger_by_cookie(cookie);
+ if (!linger_op) {
+ return -ENOTCONN;
+ }
+ auto r = objecter->linger_check(linger_op.get());
if (r)
return 1 + std::chrono::duration_cast<
std::chrono::milliseconds>(*r).count();
int librados::IoCtxImpl::unwatch(uint64_t cookie)
{
- Objecter::LingerOp *linger_op = reinterpret_cast<Objecter::LingerOp*>(cookie);
+ boost::intrusive_ptr linger_op = objecter->linger_by_cookie(cookie);
+ if (!linger_op) {
+ return -ENOTCONN;
+ }
+
C_SaferCond onfinish;
version_t ver = 0;
objecter->mutate(linger_op->target.base_oid, oloc, wr,
snapc, ceph::real_clock::now(), extra_op_flags,
&onfinish, &ver);
- objecter->linger_cancel(linger_op);
+ objecter->linger_cancel(linger_op.get());
int r = onfinish.wait();
set_sync_op_version(ver);
int librados::IoCtxImpl::aio_unwatch(uint64_t cookie, AioCompletionImpl *c)
{
c->io = this;
- Objecter::LingerOp *linger_op = reinterpret_cast<Objecter::LingerOp*>(cookie);
+ boost::intrusive_ptr linger_op = objecter->linger_by_cookie(cookie);
+ if (!linger_op) {
+ return -ENOTCONN;
+ }
Context *oncomplete = new C_aio_linger_Complete(c, linger_op, true);
::ObjectOperation wr;
}
void RADOS::next_notification_(uint64_t cookie, NextNotificationComp c) {
- Objecter::LingerOp* linger_op = reinterpret_cast<Objecter::LingerOp*>(cookie);
- if (!impl->objecter->is_valid_watch(linger_op)) {
+ boost::intrusive_ptr linger_op = impl->objecter->linger_by_cookie(cookie);
+ if (!linger_op) {
dispatch(asio::append(std::move(c),
bs::error_code(ENOTCONN, bs::generic_category()),
Notification{}));
tl::expected<ceph::timespan, bs::error_code> RADOS::check_watch(uint64_t cookie)
{
- auto linger_op = reinterpret_cast<Objecter::LingerOp*>(cookie);
- if (impl->objecter->is_valid_watch(linger_op)) {
- return impl->objecter->linger_check(linger_op);
+ boost::intrusive_ptr linger_op = impl->objecter->linger_by_cookie(cookie);
+ if (linger_op) {
+ return impl->objecter->linger_check(linger_op.get());
} else {
return tl::unexpected(bs::error_code(ENOTCONN, bs::generic_category()));
}
{
auto ioc = reinterpret_cast<const IOContextImpl*>(&_ioc.impl);
- Objecter::LingerOp *linger_op = reinterpret_cast<Objecter::LingerOp*>(cookie);
+ boost::intrusive_ptr linger_op = impl->objecter->linger_by_cookie(cookie);
+ if (!linger_op) {
+ dispatch(asio::append(std::move(c),
+ bs::error_code(ENOTCONN, bs::generic_category())));
+ return;
+ }
ObjectOperation op;
op.watch(cookie, CEPH_OSD_WATCH_OP_UNWATCH);
[objecter = impl->objecter,
linger_op, c = std::move(c)]
(bs::error_code ec) mutable {
- objecter->linger_cancel(linger_op);
+ objecter->linger_cancel(linger_op.get());
asio::dispatch(asio::append(std::move(c), ec));
}));
}
boost::intrusive_ptr<Objecter::LingerOp> info;
bs::error_code ec;
public:
- CB_DoWatchError(Objecter *o, Objecter::LingerOp *i,
+ CB_DoWatchError(Objecter *o,
+ boost::intrusive_ptr<Objecter::LingerOp> i,
bs::error_code ec)
- : objecter(o), info(i), ec(ec) {
+ : objecter(o), info(std::move(i)), ec(ec) {
info->_queued_async();
}
void operator()() {
}
}
+auto Objecter::linger_by_cookie(uint64_t cookie)
+ -> boost::intrusive_ptr<LingerOp>
+{
+ auto lock = std::shared_lock{rwlock};
+ return _linger_by_cookie(cookie);
+}
+auto Objecter::_linger_by_cookie(uint64_t cookie)
+ -> boost::intrusive_ptr<LingerOp>
+{
+ auto info = reinterpret_cast<LingerOp*>(cookie);
+ if (!linger_ops_set.contains(info)) {
+ return nullptr; // invalid or canceled op
+ }
+ return info;
+}
Objecter::LingerOp *Objecter::linger_register(const object_t& oid,
const object_locator_t& oloc,
Objecter *objecter;
boost::intrusive_ptr<Objecter::LingerOp> info;
boost::intrusive_ptr<MWatchNotify> msg;
- CB_DoWatchNotify(Objecter *o, Objecter::LingerOp *i, MWatchNotify *m)
- : objecter(o), info(i), msg(m) {
+ CB_DoWatchNotify(Objecter *o,
+ boost::intrusive_ptr<Objecter::LingerOp> i,
+ MWatchNotify *m)
+ : objecter(o), info(std::move(i)), msg(m) {
info->_queued_async();
}
void operator()() {
return;
}
- LingerOp *info = reinterpret_cast<LingerOp*>(m->cookie);
- if (linger_ops_set.count(info) == 0) {
+ boost::intrusive_ptr info = _linger_by_cookie(m->cookie);
+ if (!info) {
ldout(cct, 7) << __func__ << " cookie " << m->cookie << " dne" << dendl;
return;
}
if (!info->last_error) {
info->last_error = bs::error_code(ENOTCONN, osd_category());
if (info->handle) {
- asio::defer(finish_strand, CB_DoWatchError(this, info,
- info->last_error));
+ boost::system::error_code ec = info->last_error;
+ asio::defer(finish_strand, CB_DoWatchError(this, std::move(info), ec));
}
}
} else if (!info->is_watch) {
info->on_notify_finish = nullptr;
}
} else {
- asio::defer(finish_strand, CB_DoWatchNotify(this, info, m));
+ asio::defer(finish_strand, CB_DoWatchNotify(this, std::move(info), m));
}
}
friend class CB_DoWatchError;
public:
- bool is_valid_watch(LingerOp* op) {
- std::shared_lock l(rwlock);
- return linger_ops_set.contains(op);
- }
-
template<typename CT>
auto linger_callback_flush(CT&& ct) {
auto consigned = boost::asio::consign(
void linger_cancel(LingerOp *info); // releases a reference
void _linger_cancel(LingerOp *info);
+ // return the LingerOp associated with the given cookie.
+ // may return nullptr if the cookie is no longer valid
+ boost::intrusive_ptr<LingerOp> linger_by_cookie(uint64_t cookie);
+ private:
+ // internal version that expects the caller to hold rwlock
+ boost::intrusive_ptr<LingerOp> _linger_by_cookie(uint64_t cookie);
+ public:
+
void _do_watch_notify(boost::intrusive_ptr<LingerOp> info,
boost::intrusive_ptr<MWatchNotify> m);