}
[[maybe_unused]] const auto num_removed = watchers.erase(watch);
assert(num_removed > 0);
- return maybe_send_completion();
+ if (watchers.empty()) {
+ return send_completion();
+ } else {
+ return seastar::now();
+ }
}
return remove_watcher(std::move(watch));
}
-seastar::future<> Notify::maybe_send_completion(
+seastar::future<> Notify::send_completion(
std::set<WatchRef> timedout_watchers)
{
logger().info("{} -- {} in progress watchers, {} timedout watchers {}",
__func__, watchers.size(), timedout_watchers.size());
- if (watchers.empty() || !timedout_watchers.empty()) {
- logger().debug("{} sending notify replies: {}", __func__, notify_replies);
- complete = true;
- timeout_timer.cancel();
-
- ceph::bufferlist empty;
- auto reply = make_message<MWatchNotify>(
- ninfo.cookie,
- user_version,
- ninfo.notify_id,
- CEPH_WATCH_EVENT_NOTIFY_COMPLETE,
- empty,
- client_gid);
- ceph::bufferlist reply_bl;
- {
- std::vector<std::pair<uint64_t,uint64_t>> missed;
- missed.reserve(std::size(timedout_watchers));
- boost::insert(
- missed, std::begin(missed),
- timedout_watchers | boost::adaptors::transformed([] (auto w) {
- return std::make_pair(w->get_watcher_gid(), w->get_cookie());
- }));
- ceph::encode(notify_replies, reply_bl);
- ceph::encode(missed, reply_bl);
- }
- reply->set_data(std::move(reply_bl));
- if (!timedout_watchers.empty()) {
- reply->return_code = -ETIMEDOUT;
- }
- return conn->send(std::move(reply));
+ logger().debug("{} sending notify replies: {}", __func__, notify_replies);
+ complete = true;
+ timeout_timer.cancel();
+
+ ceph::bufferlist empty;
+ auto reply = make_message<MWatchNotify>(
+ ninfo.cookie,
+ user_version,
+ ninfo.notify_id,
+ CEPH_WATCH_EVENT_NOTIFY_COMPLETE,
+ empty,
+ client_gid);
+ ceph::bufferlist reply_bl;
+ {
+ std::vector<std::pair<uint64_t,uint64_t>> missed;
+ missed.reserve(std::size(timedout_watchers));
+ boost::insert(
+ missed, std::begin(missed),
+ timedout_watchers | boost::adaptors::transformed([] (auto w) {
+ return std::make_pair(w->get_watcher_gid(), w->get_cookie());
+ }));
+ ceph::encode(notify_replies, reply_bl);
+ ceph::encode(missed, reply_bl);
}
- return seastar::now();
+ reply->set_data(std::move(reply_bl));
+ if (!timedout_watchers.empty()) {
+ reply->return_code = -ETIMEDOUT;
+ }
+ return conn->send(std::move(reply));
}
void Notify::do_timeout()
for (auto& watcher : watchers) {
watcher->cancel_notify(ninfo.notify_id);
}
- std::ignore = maybe_send_completion(std::move(watchers));
+ std::ignore = send_completion(std::move(watchers));
watchers.clear();
}
uint64_t get_id() const { return ninfo.notify_id; }
/// Sends notify completion if watchers.empty() or timeout
- seastar::future<> maybe_send_completion(
+ seastar::future<> send_completion(
std::set<WatchRef> timedout_watchers = {});
/// Called on Notify timeout
begin,
end,
std::forward<Args>(args)...);
- return seastar::do_for_each(begin, end, [=] (auto& watchref) {
- return watchref->start_notify(notify);
- }).then([notify = std::move(notify)] {
- return notify->maybe_send_completion();
- });
+ if (begin == end) {
+ return notify->send_completion();
+ } else {
+ return seastar::do_for_each(begin, end, [=] (auto& watchref) {
+ return watchref->start_notify(notify);
+ });
+ }
}
} // namespace crimson::osd