[] (auto&& ctx, ObjectContextRef obc) {
auto [it, emplaced] = obc->watchers.try_emplace(ctx.key, nullptr);
if (emplaced) {
- const auto& cookie = ctx.key.first;
- it->second = crimson::osd::Watch::create(ctx.info, obc);
+ const auto& [cookie, entity] = ctx.key;
+ it->second = crimson::osd::Watch::create(obc, ctx.info, entity);
logger().info("op_effect: added new watcher: {}", ctx.key);
} else {
logger().info("op_effect: found existing watcher: {}", ctx.key);
return crimson::ct_error::enoent::make();
}
struct notify_ctx_t {
+ crimson::net::ConnectionRef conn;
notify_info_t ninfo;
const uint64_t client_gid;
const epoch_t epoch;
notify_ctx_t(const MOSDOp& msg)
- : client_gid(msg.get_reqid().name.num()),
+ : conn(msg.get_connection()),
+ client_gid(msg.get_reqid().name.num()),
epoch(msg.get_map_epoch()) {
}
};
return crimson::osd::Notify::create_n_propagate(
std::begin(alive_watchers),
std::end(alive_watchers),
+ std::move(ctx.conn),
ctx.ninfo,
ctx.client_gid,
obc->obs.oi.user_version);
return is_connected() ? send_notify_msg(*it) : seastar::now();
}
+seastar::future<> Watch::notify_ack(
+ const uint64_t notify_id,
+ const ceph::bufferlist& reply_bl)
+{
+ logger().info("{}", __func__);
+ return seastar::do_for_each(in_progress_notifies,
+ [this_shared=shared_from_this(), &reply_bl] (auto notify) {
+ return notify->complete_watcher(this_shared, reply_bl);
+ }
+ ).then([this] {
+ in_progress_notifies.clear();
+ return seastar::now();
+ });
+}
+
+bool notify_reply_t::operator<(const notify_reply_t& rhs) const
+{
+ // comparing std::pairs to emphasize our legacy. ceph-osd stores
+ // notify_replies as std::multimap<std::pair<gid, cookie>, bl>.
+ // unfortunately, what seems to be an implementation detail, got
+ // exposed as part of our public API (the `reply_buffer` parameter
+ // of the `rados_notify` family).
+ const auto lhsp = std::make_pair(watcher_gid, watcher_cookie);
+ const auto rhsp = std::make_pair(rhs.watcher_gid, rhs.watcher_cookie);
+ return lhsp < rhsp;
+}
+
+seastar::future<> Notify::complete_watcher(
+ WatchRef watch,
+ const ceph::bufferlist& reply_bl)
+{
+ if (discarded || complete) {
+ return seastar::now();
+ }
+ notify_replies.emplace(notify_reply_t{
+ watch->get_watcher_gid(),
+ watch->get_cookie(),
+ reply_bl});
+ watchers.erase(watch);
+ return maybe_send_completion();
+}
+
+seastar::future<> Notify::maybe_send_completion()
+{
+ logger().info("{} -- {} in progress watchers", __func__, watchers.size());
+ if (watchers.empty()) {
+ // prepare reply
+ ceph::bufferlist bl;
+ encode(notify_replies, bl);
+ // FIXME: this is just a stub
+ std::list<std::pair<uint64_t,uint64_t>> missed;
+ encode(missed, bl);
+
+ complete = true;
+
+ ceph::bufferlist empty;
+ auto reply = make_message<MWatchNotify>(
+ ninfo.cookie,
+ user_version,
+ ninfo.notify_id,
+ CEPH_WATCH_EVENT_NOTIFY_COMPLETE,
+ empty,
+ client_gid);
+ reply->set_data(bl);
+ return conn->send(std::move(reply));
+ }
+ return seastar::now();
+}
+
} // namespace crimson::osd
#pragma once
#include <iterator>
+#include <map>
#include <set>
#include <seastar/core/shared_ptr.hh>
#include "crimson/net/Connection.h"
#include "crimson/osd/object_context.h"
+#include "include/denc.h"
namespace crimson::osd {
crimson::osd::ObjectContextRef obc;
watch_info_t winfo;
+ entity_name_t entity_name;
seastar::future<> start_notify(NotifyRef);
seastar::future<> send_notify_msg(NotifyRef);
public:
Watch(private_ctag_t,
+ crimson::osd::ObjectContextRef obc,
const watch_info_t& winfo,
- crimson::osd::ObjectContextRef obc)
- : winfo(winfo),
- obc(std::move(obc)) {
+ const entity_name_t& entity_name)
+ : obc(std::move(obc)),
+ winfo(winfo),
+ entity_name(entity_name) {
}
seastar::future<> connect(crimson::net::ConnectionRef, bool);
/// Call when notify_ack received on notify_id
seastar::future<> notify_ack(
uint64_t notify_id, ///< [in] id of acked notify
- const ceph::bufferlist& reply_bl) { ///< [in] notify reply buffer
- return seastar::now();
- }
+ const ceph::bufferlist& reply_bl); ///< [in] notify reply buffer
template <class... Args>
static seastar::shared_ptr<Watch> create(Args&&... args) {
return seastar::make_shared<Watch>(private_ctag_t{},
std::forward<Args>(args)...);
};
+
+ uint64_t get_watcher_gid() const {
+ return entity_name.num();
+ }
+ uint64_t get_cookie() const {
+ return winfo.cookie;
+ }
};
using WatchRef = seastar::shared_ptr<Watch>;
+struct notify_reply_t {
+ uint64_t watcher_gid;
+ uint64_t watcher_cookie;
+ ceph::bufferlist bl;
+
+ bool operator<(const notify_reply_t& rhs) const;
+ DENC(notify_reply_t, v, p) {
+ DENC_START(1, 1, p);
+ denc(v.watcher_gid, p);
+ denc(v.watcher_cookie, p);
+ denc(v.bl, p);
+ DENC_FINISH(p);
+ }
+};
+
class Notify {
std::set<WatchRef> watchers;
notify_info_t ninfo;
+ crimson::net::ConnectionRef conn;
uint64_t client_gid;
uint64_t user_version;
+ bool complete = false;
+ bool discarded = false;
+
+ /// (gid,cookie) -> reply_bl for everyone who acked the notify
+ std::multiset<notify_reply_t> notify_replies;
uint64_t get_id() const { return ninfo.notify_id; }
- seastar::future<> propagate() { return seastar::now(); }
+ seastar::future<> maybe_send_completion();
template <class WatchIteratorT>
Notify(WatchIteratorT begin,
WatchIteratorT end,
+ crimson::net::ConnectionRef conn,
const notify_info_t& ninfo,
const uint64_t client_gid,
const uint64_t user_version);
WatchIteratorT begin,
WatchIteratorT end,
Args&&... args);
+
+ seastar::future<> complete_watcher(WatchRef watch,
+ const ceph::bufferlist& reply_bl);
};
template <class WatchIteratorT>
Notify::Notify(WatchIteratorT begin,
WatchIteratorT end,
+ crimson::net::ConnectionRef conn,
const notify_info_t& ninfo,
const uint64_t client_gid,
const uint64_t user_version)
: watchers(begin, end),
ninfo(ninfo),
+ conn(std::move(conn)),
client_gid(client_gid),
user_version(user_version) {
}
return seastar::do_for_each(begin, end, [=] (auto& watchref) {
return watchref->start_notify(notify);
}).then([notify = std::move(notify)] {
- return notify->propagate();
+ return notify->maybe_send_completion();
});
}
} // namespace crimson::osd
+
+WRITE_CLASS_DENC(crimson::osd::notify_reply_t)