${PROJECT_SOURCE_DIR}/src/osd/MissingLoc.cc
${PROJECT_SOURCE_DIR}/src/osd/PGLog.cc
${PROJECT_SOURCE_DIR}/src/osd/osd_perf_counters.cc
+ watch.cc
)
target_link_libraries(crimson-osd
crimson-common crimson-os crimson fmt::fmt)
#pragma once
#include <seastar/core/shared_future.hh>
+#include <seastar/core/shared_ptr.hh>
#include <boost/intrusive_ptr.hpp>
#include <boost/intrusive/list.hpp>
#include "osd/object_state.h"
#include "crimson/common/config_proxy.h"
#include "crimson/osd/osd_operation.h"
-#include "crimson/osd/watch.h"
namespace crimson::osd {
+class Watch;
+
template <typename OBC>
struct obc_to_hoid {
using type = hobject_t;
// frequented paths. std::map is used mostly because of developer's
// convenience.
using watch_key_t = std::pair<uint64_t, entity_name_t>;
- std::map<watch_key_t, crimson::osd::WatchRef> watchers;
+ std::map<watch_key_t, seastar::shared_ptr<crimson::osd::Watch>> watchers;
ObjectContext(const hobject_t &hoid) : obs(hoid), loaded(false) {}
const std::set <std::string> &changed) final;
};
-}
+} // namespace crimson::osd
#include <seastar/core/thread.hh>
#include "crimson/osd/exceptions.h"
+#include "crimson/osd/watch.h"
#include "osd/ClassHandler.h"
namespace {
[] (auto&& ctx, ObjectContextRef obc) {
auto [it, emplaced] = obc->watchers.try_emplace(ctx.key, nullptr);
if (emplaced) {
- it->second = crimson::osd::Watch::create();
+ const auto& cookie = ctx.key.first;
+ it->second = crimson::osd::Watch::create(ctx.info, obc);
logger().info("op_effect: added new watcher: {}", ctx.key);
} else {
logger().info("op_effect: found existing watcher: {}", ctx.key);
ObjectState& os,
ceph::os::Transaction& txn)
{
+ logger().debug("{}", __func__);
if (!os.exists) {
return crimson::ct_error::enoent::make();
}
return crimson::ct_error::invarg::make();
}
+static uint64_t get_next_notify_id(epoch_t e)
+{
+ // FIXME
+ static std::uint64_t next_notify_id = 0;
+ return (((uint64_t)e) << 32) | ((uint64_t)(next_notify_id++));
+}
+
OpsExecuter::watch_errorator::future<> OpsExecuter::do_op_notify(
OSDOp& osd_op,
const ObjectState& os)
{
+ logger().debug("{}, msg epoch: {}", __func__, get_message().get_map_epoch());
+
if (!os.exists) {
return crimson::ct_error::enoent::make();
}
- return with_effect_on_obc(notify_info_t{},
+ struct notify_ctx_t {
+ notify_info_t ninfo;
+ const uint64_t client_gid;
+ const epoch_t epoch;
+
+ notify_ctx_t(const MOSDOp& msg)
+ : client_gid(msg.get_reqid().name.num()),
+ epoch(msg.get_map_epoch()) {
+ }
+ };
+ return with_effect_on_obc(notify_ctx_t{ get_message() },
[&] (auto& ctx) {
try {
auto bp = osd_op.indata.cbegin();
uint32_t ver; // obsolete
ceph::decode(ver, bp);
- ceph::decode(ctx.timeout, bp);
- ceph::decode(ctx.bl, bp);
+ ceph::decode(ctx.ninfo.timeout, bp);
+ ceph::decode(ctx.ninfo.bl, bp);
} catch (const buffer::error&) {
- ctx.timeout = 0;
+ ctx.ninfo.timeout = 0;
}
- if (!ctx.timeout) {
+ if (!ctx.ninfo.timeout) {
using crimson::common::local_conf;
- ctx.timeout = local_conf()->osd_default_notify_timeout;
+ ctx.ninfo.timeout = local_conf()->osd_default_notify_timeout;
}
- // FIXME
- ctx.notify_id = 42; //osd->get_next_id(get_osdmap_epoch());
- ctx.cookie = osd_op.op.notify.cookie;
+ ctx.ninfo.notify_id = get_next_notify_id(ctx.epoch);
+ ctx.ninfo.cookie = osd_op.op.notify.cookie;
// return our unique notify id to the client
- ceph::encode(ctx.notify_id, osd_op.outdata);
+ ceph::encode(ctx.ninfo.notify_id, osd_op.outdata);
return seastar::now();
},
[] (auto&& ctx, ObjectContextRef obc) {
});
return crimson::osd::Notify::create_n_propagate(
std::begin(alive_watchers),
- std::end(alive_watchers));
+ std::end(alive_watchers),
+ ctx.ninfo,
+ ctx.client_gid,
+ obc->obs.oi.user_version);
});
}
OSDOp& osd_op,
const ObjectState& os)
{
+ logger().debug("{}", __func__);
+
struct notifyack_ctx_t {
const entity_name_t entity;
uint64_t watch_cookie;
--- /dev/null
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
+#include "crimson/osd/watch.h"
+#include "messages/MWatchNotify.h"
+
+
+namespace {
+ seastar::logger& logger() {
+ return crimson::get_logger(ceph_subsys_osd);
+ }
+}
+
+namespace crimson::osd {
+
+bool Watch::NotifyCmp::operator()(NotifyRef lhs, NotifyRef rhs) const
+{
+ ceph_assert(lhs);
+ ceph_assert(rhs);
+ return lhs->get_id() < rhs->get_id();
+}
+
+seastar::future<> Watch::connect(crimson::net::ConnectionRef conn, bool)
+{
+ if (this->conn == conn) {
+ logger().debug("conn={} already connected", conn);
+ }
+
+ this->conn = std::move(conn);
+ return seastar::now();
+}
+
+seastar::future<> Watch::send_notify_msg(NotifyRef notify)
+{
+ logger().info("{} for notify(id={})", __func__, notify->ninfo.notify_id);
+ return conn->send(make_message<MWatchNotify>(
+ winfo.cookie,
+ notify->user_version,
+ notify->ninfo.notify_id,
+ CEPH_WATCH_EVENT_NOTIFY,
+ notify->ninfo.bl,
+ notify->client_gid));
+}
+
+seastar::future<> Watch::start_notify(NotifyRef notify)
+{
+ logger().info("{} adding ¬ify={}", __func__, notify.get());
+ auto [ it, emplaced ] = in_progress_notifies.emplace(std::move(notify));
+ ceph_assert(emplaced);
+ ceph_assert(is_alive());
+ return is_connected() ? send_notify_msg(*it) : seastar::now();
+}
+
+} // namespace crimson::osd
#include <seastar/core/shared_ptr.hh>
#include "crimson/net/Connection.h"
+#include "crimson/osd/object_context.h"
namespace crimson::osd {
};
std::set<NotifyRef, NotifyCmp> in_progress_notifies;
crimson::net::ConnectionRef conn;
+ crimson::osd::ObjectContextRef obc;
+
+ watch_info_t winfo;
seastar::future<> start_notify(NotifyRef);
seastar::future<> send_notify_msg(NotifyRef);
friend Notify;
public:
- Watch(private_ctag_t) {
+ Watch(private_ctag_t,
+ const watch_info_t& winfo,
+ crimson::osd::ObjectContextRef obc)
+ : winfo(winfo),
+ obc(std::move(obc)) {
}
seastar::future<> connect(crimson::net::ConnectionRef, bool);
class Notify {
std::set<WatchRef> watchers;
+ notify_info_t ninfo;
+ uint64_t client_gid;
+ uint64_t user_version;
- uint64_t get_id() const { return 0; }
- void propagate() {}
+ uint64_t get_id() const { return ninfo.notify_id; }
+ seastar::future<> propagate() { return seastar::now(); }
template <class WatchIteratorT>
- Notify(WatchIteratorT begin, WatchIteratorT end);
+ Notify(WatchIteratorT begin,
+ WatchIteratorT end,
+ const notify_info_t& ninfo,
+ const uint64_t client_gid,
+ const uint64_t user_version);
// this is a private tag for the public constructor that turns it into
// de facto private one. The motivation behind the hack is make_shared
// used by create_n_propagate factory.
Notify(private_ctag_t, Args&&... args) : Notify(std::forward<Args>(args)...) {
}
- template <class WatchIteratorT>
+ template <class WatchIteratorT, class... Args>
static seastar::future<> create_n_propagate(
WatchIteratorT begin,
- WatchIteratorT end);
+ WatchIteratorT end,
+ Args&&... args);
};
template <class WatchIteratorT>
-Notify::Notify(WatchIteratorT begin, WatchIteratorT end)
- : watchers(begin, end) {
+Notify::Notify(WatchIteratorT begin,
+ WatchIteratorT end,
+ const notify_info_t& ninfo,
+ const uint64_t client_gid,
+ const uint64_t user_version)
+ : watchers(begin, end),
+ ninfo(ninfo),
+ client_gid(client_gid),
+ user_version(user_version) {
}
-template <class WatchIteratorT>
+template <class WatchIteratorT, class... Args>
seastar::future<> Notify::create_n_propagate(
WatchIteratorT begin,
- WatchIteratorT end)
+ WatchIteratorT end,
+ Args&&... args)
{
static_assert(
std::is_same_v<typename std::iterator_traits<WatchIteratorT>::value_type,
crimson::osd::WatchRef>);
- auto notify = seastar::make_shared<Notify>(private_ctag_t{}, begin, end);
- seastar::do_for_each(begin, end, [=] (auto& watchref) {
+ auto notify = seastar::make_shared<Notify>(
+ private_ctag_t{},
+ begin,
+ end,
+ std::forward<Args>(args)...);
+ return seastar::do_for_each(begin, end, [=] (auto& watchref) {
return watchref->start_notify(notify);
- }).then([notify = std::move(notify)] {;
- notify->propagate();
+ }).then([notify = std::move(notify)] {
+ return notify->propagate();
});
}