#include "crimson/common/config_proxy.h"
#include "crimson/net/Connection.h"
#include "crimson/net/Messenger.h"
-#include "crimson/osd/osdmap_service.h"
+#include "crimson/osd/shard_services.h"
#include "crimson/mon/MonClient.h"
#include "osd/OSDMap.h"
}
}
-Heartbeat::Heartbeat(const OSDMapService& service,
+Heartbeat::Heartbeat(const ceph::osd::ShardServices& service,
ceph::mon::Client& monc,
ceph::net::Messenger& front_msgr,
ceph::net::Messenger& back_msgr)
auto found = peers.find(peer);
if (found == peers.end()) {
logger().info("add_peer({})", peer);
- auto osdmap = service.get_map();
+ auto osdmap = service.get_osdmap_service().get_map();
// TODO: use addrs
return seastar::when_all_succeed(
front_msgr.connect(osdmap->get_hb_front_addrs(peer).front(),
}
return seastar::map_reduce(std::move(osds),
[this](auto& osd) {
- auto osdmap = service.get_map();
+ auto osdmap = service.get_osdmap_service().get_map();
if (!osdmap->is_up(osd)) {
return remove_peer(osd).then([] {
return seastar::make_ready_future<osd_id_t>(-1);
void Heartbeat::add_reporter_peers(int whoami)
{
- auto osdmap = service.get_map();
+ auto osdmap = service.get_osdmap_service().get_map();
// include next and previous up osds to ensure we have a fully-connected set
set<int> want;
if (auto next = osdmap->get_next_up_osd_after(whoami); next >= 0) {
});
}).then([=] {
// or too few?
- auto osdmap = service.get_map();
+ auto osdmap = service.get_osdmap_service().get_map();
for (auto next = osdmap->get_next_up_osd_after(whoami);
peers.size() < min_peers && next >= 0 && next != whoami;
next = osdmap->get_next_up_osd_after(next)) {
auto min_message = static_cast<uint32_t>(
local_conf()->osd_heartbeat_min_size);
auto reply =
- make_message<MOSDPing>(m->fsid,
- service.get_map()->get_epoch(),
- MOSDPing::PING_REPLY,
- m->ping_stamp,
- m->mono_ping_stamp,
- service.get_mnow(),
- service.get_up_epoch(),
- min_message);
+ make_message<MOSDPing>(
+ m->fsid,
+ service.get_osdmap_service().get_map()->get_epoch(),
+ MOSDPing::PING_REPLY,
+ m->ping_stamp,
+ m->mono_ping_stamp,
+ service.get_mnow(),
+ service.get_osdmap_service().get_up_epoch(),
+ min_message);
return conn->send(reply);
}
if (con) {
auto min_message = static_cast<uint32_t>(
local_conf()->osd_heartbeat_min_size);
- auto ping = make_message<MOSDPing>(monc.get_fsid(),
- service.get_map()->get_epoch(),
- MOSDPing::PING,
- sent_stamp,
- mnow,
- mnow,
- service.get_up_epoch(),
- min_message);
+ auto ping = make_message<MOSDPing>(
+ monc.get_fsid(),
+ service.get_osdmap_service().get_map()->get_epoch(),
+ MOSDPing::PING,
+ sent_stamp,
+ mnow,
+ mnow,
+ service.get_osdmap_service().get_up_epoch(),
+ min_message);
return con->send(ping).then([&reply] {
reply.unacknowledged++;
return seastar::now();
}
auto failed_for = chrono::duration_cast<chrono::seconds>(
clock::now() - failed_since).count();
- auto osdmap = service.get_map();
+ auto osdmap = service.get_osdmap_service().get_map();
auto failure_report =
make_message<MOSDFailure>(monc.get_fsid(),
osd,
seastar::future<> Heartbeat::send_still_alive(osd_id_t osd,
const entity_addrvec_t& addrs)
{
- auto still_alive = make_message<MOSDFailure>(monc.get_fsid(),
- osd,
- addrs,
- 0,
- service.get_map()->get_epoch(),
- MOSDFailure::FLAG_ALIVE);
+ auto still_alive = make_message<MOSDFailure>(
+ monc.get_fsid(),
+ osd,
+ addrs,
+ 0,
+ service.get_osdmap_service().get_map()->get_epoch(),
+ MOSDFailure::FLAG_ALIVE);
return monc.send_message(still_alive).then([=] {
failure_pending.erase(osd);
return seastar::now();
#include "crimson/net/Fwd.h"
class MOSDPing;
-class OSDMapService;
+
+namespace ceph::osd {
+ class ShardServices;
+}
namespace ceph::mon {
class Client;
public:
using osd_id_t = int;
- Heartbeat(const OSDMapService& service,
+ Heartbeat(const ceph::osd::ShardServices& service,
ceph::mon::Client& monc,
ceph::net::Messenger& front_msgr,
ceph::net::Messenger& back_msgr);
seastar::future<> start_messenger(ceph::net::Messenger& msgr,
const entity_addrvec_t& addrs);
private:
- const OSDMapService& service;
+ const ceph::osd::ShardServices& service;
ceph::mon::Client& monc;
ceph::net::Messenger& front_msgr;
ceph::net::Messenger& back_msgr;
public_msgr{public_msgr},
monc{new ceph::mon::Client{public_msgr, *this}},
mgrc{new ceph::mgr::Client{public_msgr, *this}},
- heartbeat{new Heartbeat{*this, *monc, hb_front_msgr, hb_back_msgr}},
- heartbeat_timer{[this] { update_heartbeat_peers(); }},
store{ceph::os::FuturizedStore::create(
local_conf().get_val<std::string>("osd_objectstore"),
local_conf().get_val<std::string>("osd_data"))},
- shard_services{cluster_msgr, public_msgr, *monc, *mgrc, *store},
+ shard_services{*this, cluster_msgr, public_msgr, *monc, *mgrc, *store},
+ heartbeat{new Heartbeat{shard_services, *monc, hb_front_msgr, hb_back_msgr}},
+ heartbeat_timer{[this] { update_heartbeat_peers(); }},
osdmap_gate("OSD::osdmap_gate", std::make_optional(std::ref(shard_services)))
{
osdmaps[0] = boost::make_local_shared<OSDMap>();
return m;
}
-ceph::signedspan OSD::get_mnow() const
-{
- return ceph::mono_clock::now() - startup_time;
-}
-
OSD::cached_map_t OSD::get_map() const
{
return osdmap;
std::unique_ptr<ceph::mon::Client> monc;
std::unique_ptr<ceph::mgr::Client> mgrc;
- std::unique_ptr<Heartbeat> heartbeat;
- seastar::timer<seastar::lowres_clock> heartbeat_timer;
-
SharedLRU<epoch_t, OSDMap> osdmaps;
SimpleLRU<epoch_t, bufferlist, false> map_bl_cache;
cached_map_t osdmap;
ceph::osd::ShardServices shard_services;
std::unordered_map<spg_t, Ref<PG>> pgs;
+ std::unique_ptr<Heartbeat> heartbeat;
+ seastar::timer<seastar::lowres_clock> heartbeat_timer;
+
public:
OSD(int id, uint32_t nonce,
ceph::net::Messenger& cluster_msgr,
seastar::future<> _send_alive();
// OSDMapService methods
- ceph::signedspan get_mnow() const final;
epoch_t get_up_epoch() const final {
return up_epoch;
}
virtual seastar::future<cached_map_t> get_map(epoch_t e) = 0;
/// get the latest map
virtual cached_map_t get_map() const = 0;
- virtual ceph::signedspan get_mnow() const = 0;
virtual epoch_t get_up_epoch() const = 0;
};
events);
}
+ceph::signedspan PG::get_mnow()
+{
+ return shard_services.get_mnow();
+}
+
+HeartbeatStampsRef PG::get_hb_stamps(int peer)
+{
+ return shard_services.get_hb_stamps(peer);
+}
+
void PG::init(
ceph::os::CollectionRef coll,
int role,
return OstreamTemp(CLOG_ERROR, nullptr);
}
- ceph::signedspan get_mnow() final {
-#warning writeme
- }
- HeartbeatStampsRef get_hb_stamps(int peer) final {
-#warning write me
- }
+ ceph::signedspan get_mnow() final;
+ HeartbeatStampsRef get_hb_stamps(int peer) final;
// Utility
bool is_primary() const {
namespace ceph::osd {
ShardServices::ShardServices(
+ OSDMapService &osdmap_service,
ceph::net::Messenger &cluster_msgr,
ceph::net::Messenger &public_msgr,
ceph::mon::Client &monc,
ceph::mgr::Client &mgrc,
ceph::os::FuturizedStore &store)
- : cluster_msgr(cluster_msgr),
+ : osdmap_service(osdmap_service),
+ cluster_msgr(cluster_msgr),
public_msgr(public_msgr),
monc(monc),
mgrc(mgrc),
}
}
-ceph::signedspan ShardServices::get_mnow()
-{
-#warning write me
-}
-
HeartbeatStampsRef ShardServices::get_hb_stamps(int peer)
{
-#warning write me
+#warning writeme
+ return HeartbeatStampsRef();
}
};
#include "msg/MessageRef.h"
#include "crimson/os/cyan_collection.h"
#include "osd/PeeringState.h"
+#include "crimson/osd/osdmap_service.h"
namespace ceph::net {
class Messenger;
*/
class ShardServices {
using cached_map_t = boost::local_shared_ptr<const OSDMap>;
+ OSDMapService &osdmap_service;
ceph::net::Messenger &cluster_msgr;
ceph::net::Messenger &public_msgr;
ceph::mon::Client &monc;
public:
ShardServices(
+ OSDMapService &osdmap_service,
ceph::net::Messenger &cluster_msgr,
ceph::net::Messenger &public_msgr,
ceph::mon::Client &monc,
return &cct;
}
+ // OSDMapService
+ const OSDMapService &get_osdmap_service() const {
+ return osdmap_service;
+ }
+
// Op Tracking
OperationRegistry registry;
seastar::future<> osdmap_subscribe(version_t epoch, bool force_request);
- ceph::signedspan get_mnow();
+ // Time state
+ ceph::mono_time startup_time = ceph::mono_clock::now();
+ ceph::signedspan get_mnow() const {
+ return ceph::mono_clock::now() - startup_time;
+ }
HeartbeatStampsRef get_hb_stamps(int peer);
-
};