// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
// vim: ts=8 sw=2 smarttab
+#include <algorithm>
+
+#include <boost/range/adaptor/transformed.hpp>
+#include <boost/range/algorithm_ext/insert.hpp>
+
#include "crimson/osd/watch.h"
#include "messages/MWatchNotify.h"
});
}
+void Watch::cancel_notify(const uint64_t notify_id)
+{
+ logger().info("{} notify_id={}", __func__, notify_id);
+ const auto it = std::find_if(
+ std::begin(in_progress_notifies), std::end(in_progress_notifies),
+ [notify_id] (const auto& notify) {
+ return notify->ninfo.notify_id == notify_id;
+ });
+ assert(it != std::end(in_progress_notifies));
+ in_progress_notifies.erase(it);
+}
+
bool notify_reply_t::operator<(const notify_reply_t& rhs) const
{
// comparing std::pairs to emphasize our legacy. ceph-osd stores
return remove_watcher(std::move(watch));
}
-seastar::future<> Notify::maybe_send_completion()
+seastar::future<> Notify::maybe_send_completion(
+ std::set<WatchRef> timedout_watchers)
{
- logger().info("{} -- {} in progress watchers", __func__, watchers.size());
- if (watchers.empty()) {
+ logger().info("{} -- {} in progress watchers, {} timedout watchers {}",
+ __func__, watchers.size(), timedout_watchers.size());
+ if (watchers.empty() || !timedout_watchers.empty()) {
logger().debug("{} sending notify replies: {}", __func__, notify_replies);
- // prepare reply
- ceph::bufferlist bl;
- encode(notify_replies, bl);
- // FIXME: this is just a stub
- std::list<std::pair<uint64_t,uint64_t>> missed;
- encode(missed, bl);
-
complete = true;
+ timeout_timer.cancel();
ceph::bufferlist empty;
auto reply = make_message<MWatchNotify>(
CEPH_WATCH_EVENT_NOTIFY_COMPLETE,
empty,
client_gid);
- reply->set_data(bl);
+ ceph::bufferlist reply_bl;
+ {
+ std::vector<std::pair<uint64_t,uint64_t>> missed;
+ missed.reserve(std::size(timedout_watchers));
+ boost::insert(
+ missed, std::begin(missed),
+ timedout_watchers | boost::adaptors::transformed([] (auto w) {
+ return std::make_pair(w->get_watcher_gid(), w->get_cookie());
+ }));
+ ceph::encode(notify_replies, reply_bl);
+ ceph::encode(missed, reply_bl);
+ }
+ reply->set_data(std::move(reply_bl));
+ if (!timedout_watchers.empty()) {
+ reply->return_code = -ETIMEDOUT;
+ }
return conn->send(std::move(reply));
}
return seastar::now();
}
+void Notify::do_timeout()
+{
+ logger().debug("{} complete={}", __func__, complete);
+ if (complete) {
+ return;
+ }
+ decltype(watchers) timedout_watchers;
+ std::swap(watchers, timedout_watchers);
+ for (auto& watcher : timedout_watchers) {
+ watcher->cancel_notify(ninfo.notify_id);
+ }
+ std::ignore = maybe_send_completion(std::move(timedout_watchers));
+}
+
} // namespace crimson::osd
uint64_t get_cookie() const {
return winfo.cookie;
}
+ void cancel_notify(const uint64_t notify_id);
};
using WatchRef = seastar::shared_ptr<Watch>;
uint64_t user_version;
bool complete = false;
bool discarded = false;
+ seastar::timer<seastar::lowres_clock> timeout_timer{
+ [this] { do_timeout(); }
+ };
/// (gid,cookie) -> reply_bl for everyone who acked the notify
std::multiset<notify_reply_t> notify_replies;
uint64_t get_id() const { return ninfo.notify_id; }
- seastar::future<> maybe_send_completion();
+
+ /// Sends notify completion if watchers.empty() or timeout
+ seastar::future<> maybe_send_completion(
+ std::set<WatchRef> timedout_watchers = {});
+
+ /// Called on Notify timeout
+ void do_timeout();
template <class WatchIteratorT>
Notify(WatchIteratorT begin,
conn(std::move(conn)),
client_gid(client_gid),
user_version(user_version) {
+ if (ninfo.timeout) {
+ timeout_timer.arm(std::chrono::seconds{ninfo.timeout});
+ }
}
template <class WatchIteratorT, class... Args>