using permission_denied = ct_error_code<std::errc::permission_denied>;
using operation_not_supported =
ct_error_code<std::errc::operation_not_supported>;
+ using not_connected = ct_error_code<std::errc::not_connected>;
}
using stateful_errc = stateful_error_t<std::errc>;
});
}
+OpsExecuter::watch_errorator::future<> OpsExecuter::do_op_watch_subop_reconnect(
+ OSDOp& osd_op,
+ ObjectState& os,
+ ceph::os::Transaction& txn)
+{
+ const entity_name_t& entity = get_message().get_reqid().name;
+ const auto& cookie = osd_op.op.watch.cookie;
+ if (!os.oi.watchers.count(std::make_pair(cookie, entity))) {
+ return crimson::ct_error::not_connected::make();
+ } else {
+ logger().info("found existing watch by {}", entity);
+ return do_op_watch_subop_watch(osd_op, os, txn);
+ }
+}
+
OpsExecuter::watch_errorator::future<> OpsExecuter::do_op_watch_subop_unwatch(
OSDOp& osd_op,
ObjectState& os,
case CEPH_OSD_WATCH_OP_WATCH:
return do_op_watch_subop_watch(osd_op, os, txn);
case CEPH_OSD_WATCH_OP_RECONNECT:
- // TODO: implement reconnect
- break;
+ return do_op_watch_subop_reconnect(osd_op, os, txn);
case CEPH_OSD_WATCH_OP_PING:
// TODO: implement ping
break;
using get_attr_errorator = PGBackend::get_attr_errorator;
using watch_errorator = crimson::errorator<
crimson::ct_error::enoent,
- crimson::ct_error::invarg>;
+ crimson::ct_error::invarg,
+ crimson::ct_error::not_connected>;
public:
// because OpsExecuter is pretty heavy-weight object we want to ensure
class OSDOp& osd_op,
class ObjectState& os,
ceph::os::Transaction& txn);
+ watch_errorator::future<> do_op_watch_subop_reconnect(
+ class OSDOp& osd_op,
+ class ObjectState& os,
+ ceph::os::Transaction& txn);
watch_errorator::future<> do_op_watch_subop_unwatch(
class OSDOp& osd_op,
class ObjectState& os,