});
}
+seastar::future<> Watch::send_disconnect_msg()
+{
+ if (!is_connected()) {
+ return seastar::now();
+ }
+ ceph::bufferlist empty;
+ return conn->send(make_message<MWatchNotify>(
+ winfo.cookie,
+ 0,
+ 0,
+ CEPH_WATCH_EVENT_DISCONNECT,
+ empty));
+}
+
+void Watch::discard_state()
+{
+ ceph_assert(obc);
+ in_progress_notifies.clear();
+}
+
+seastar::future<> Watch::remove(const bool send_disconnect)
+{
+ logger().info("{}", __func__);
+ auto disconnected = send_disconnect ? send_disconnect_msg()
+ : seastar::now();
+ return std::move(disconnected).then([this] {
+ return seastar::do_for_each(in_progress_notifies,
+ [this_shared=shared_from_this()] (auto notify) {
+ return notify->remove_watcher(this_shared);
+ }).then([this] {
+ discard_state();
+ return seastar::now();
+ });
+ });
+}
+
bool notify_reply_t::operator<(const notify_reply_t& rhs) const
{
// comparing std::pairs to emphasize our legacy. ceph-osd stores
return lhsp < rhsp;
}
+seastar::future<> Notify::remove_watcher(WatchRef watch)
+{
+ if (discarded || complete) {
+ return seastar::now();
+ }
+ [[maybe_unused]] const auto num_removed = watchers.erase(watch);
+ assert(num_removed > 0);
+ return maybe_send_completion();
+}
+
+
seastar::future<> Notify::complete_watcher(
WatchRef watch,
const ceph::bufferlist& reply_bl)
watch->get_watcher_gid(),
watch->get_cookie(),
reply_bl});
- watchers.erase(watch);
- return maybe_send_completion();
+ return remove_watcher(std::move(watch));
}
seastar::future<> Notify::maybe_send_completion()
seastar::future<> start_notify(NotifyRef);
seastar::future<> send_notify_msg(NotifyRef);
+ seastar::future<> send_disconnect_msg();
+ void discard_state();
friend Notify;
// NOP
}
- seastar::future<> remove(bool) {
- return seastar::now();
- }
+ seastar::future<> remove(bool send_disconnect);
/// Call when notify_ack received on notify_id
seastar::future<> notify_ack(
WatchIteratorT end,
Args&&... args);
+ seastar::future<> remove_watcher(WatchRef watch);
seastar::future<> complete_watcher(WatchRef watch,
const ceph::bufferlist& reply_bl);
};