gate.dispatch_in_background(__func__, *this, [this, c] {
if (conn == c) {
// ask for the mgrconfigure message
- auto m = ceph::make_message<MMgrOpen>();
+ auto m = crimson::make_message<MMgrOpen>();
m->daemon_name = local_conf()->name.get_id();
return conn->send(std::move(m));
} else {
// implement WithStats if you want to report stats to mgr periodically
class WithStats {
public:
- virtual MessageRef get_stats() const = 0;
+ virtual MessageURef get_stats() const = 0;
virtual ~WithStats() {}
};
/// send a message over a connection that has completed its handshake
virtual seastar::future<> send(MessageURef msg) = 0;
- // The version with MessageRef will be dropped in the future
- virtual seastar::future<> send(MessageRef msg) = 0;
/// send a keepalive message over a connection that has completed its
/// handshake
return bl;
}
-seastar::future<> Protocol::send(MessageRef msg)
+seastar::future<> Protocol::send(MessageURef msg)
{
if (write_state != write_state_t::drop) {
conn.out_q.push_back(std::move(msg));
conn.out_seq -= conn.sent.size();
logger().debug("{} requeue {} items, revert out_seq to {}",
conn, conn.sent.size(), conn.out_seq);
- for (MessageRef& msg : conn.sent) {
+ for (MessageURef& msg : conn.sent) {
msg->clear_payload();
msg->set_seq(0);
}
}
while (!conn.sent.empty() && conn.sent.front()->get_seq() <= seq) {
logger().trace("{} got ack seq {} >= {}, pop {}",
- conn, seq, conn.sent.front()->get_seq(), conn.sent.front());
+ conn, seq, conn.sent.front()->get_seq(), *conn.sent.front());
conn.sent.pop_front();
}
}
virtual void trigger_close() = 0;
virtual ceph::bufferlist do_sweep_messages(
- const std::deque<MessageRef>& msgs,
+ const std::deque<MessageURef>& msgs,
size_t num_msgs,
bool require_keepalive,
std::optional<utime_t> keepalive_ack,
// the write state-machine
public:
- seastar::future<> send(MessageRef msg);
+ seastar::future<> send(MessageURef msg);
seastar::future<> keepalive();
// TODO: encapsulate a SessionedSender class
// READY state
ceph::bufferlist ProtocolV2::do_sweep_messages(
- const std::deque<MessageRef>& msgs,
+ const std::deque<MessageURef>& msgs,
size_t num_msgs,
bool require_keepalive,
std::optional<utime_t> _keepalive_ack,
INTERCEPT_FRAME(ceph::msgr::v2::Tag::ACK, bp_type_t::WRITE);
}
- std::for_each(msgs.begin(), msgs.begin()+num_msgs, [this, &bl](const MessageRef& msg) {
+ std::for_each(msgs.begin(), msgs.begin()+num_msgs, [this, &bl](const MessageURef& msg) {
// TODO: move to common code
// set priority
msg->get_header().src = messenger.get_myname();
void trigger_close() override;
ceph::bufferlist do_sweep_messages(
- const std::deque<MessageRef>& msgs,
+ const std::deque<MessageURef>& msgs,
size_t num_msgs,
bool require_keepalive,
std::optional<utime_t> keepalive_ack,
}
seastar::future<> SocketConnection::send(MessageURef msg)
-{
- assert(seastar::this_shard_id() == shard_id());
- return protocol->send(MessageRef{msg.release(), false});
-}
-
-seastar::future<> SocketConnection::send(MessageRef msg)
{
assert(seastar::this_shard_id() == shard_id());
return protocol->send(std::move(msg));
bool update_rx_seq(seq_num_t seq);
// messages to be resent after connection gets reset
- std::deque<MessageRef> out_q;
+ std::deque<MessageURef> out_q;
// messages sent, but not yet acked by peer
- std::deque<MessageRef> sent;
+ std::deque<MessageURef> sent;
seastar::shard_id shard_id() const;
#endif
seastar::future<> send(MessageURef msg) override;
- seastar::future<> send(MessageRef msg) override;
seastar::future<> keepalive() override;
});
}
-MessageRef OSD::get_stats() const
+MessageURef OSD::get_stats() const
{
// todo: m-to-n: collect stats using map-reduce
// MPGStats::had_map_for is not used since PGMonitor was removed
- auto m = ceph::make_message<MPGStats>(monc->get_fsid(), osdmap->get_epoch());
+ auto m = crimson::make_message<MPGStats>(monc->get_fsid(), osdmap->get_epoch());
m->osd_stat = osd_stat;
for (auto [pgid, pg] : pg_map.get_pgs()) {
if (pg->is_primary()) {
osd_stat_t osd_stat;
uint32_t osd_stat_seq = 0;
void update_stats();
- MessageRef get_stats() const final;
+ MessageURef get_stats() const final;
// AuthHandler methods
void handle_authentication(const EntityName& name,
Ref<PG> &pg)
{
return pg->do_pg_ops(m)
- .then_interruptible([this, pg=std::move(pg)](Ref<MOSDOpReply> reply) {
- return conn->send(reply);
+ .then_interruptible([this, pg=std::move(pg)](MURef<MOSDOpReply> reply) {
+ return conn->send(std::move(reply));
});
}
}).then_interruptible(
[this, pg, all_completed=std::move(all_completed)]() mutable {
return all_completed.safe_then_interruptible(
- [this, pg](Ref<MOSDOpReply> reply) {
+ [this, pg](MURef<MOSDOpReply> reply) {
return with_blocking_future_interruptible<IOInterruptCondition>(
handle.enter(pp(*pg).send_reply)).then_interruptible(
- [this, reply=std::move(reply)] {
+ [this, reply=std::move(reply)]() mutable{
return conn->send(std::move(reply));
});
}, crimson::ct_error::eagain::handle([this, pg]() mutable {
}));
}
-PG::do_osd_ops_iertr::future<PG::pg_rep_op_fut_t<Ref<MOSDOpReply>>>
+PG::do_osd_ops_iertr::future<PG::pg_rep_op_fut_t<MURef<MOSDOpReply>>>
PG::do_osd_ops(
Ref<MOSDOp> m,
ObjectContextRef obc,
if (__builtin_expect(stopping, false)) {
throw crimson::common::system_shutdown_exception();
}
- return do_osd_ops_execute<Ref<MOSDOpReply>>(
+ return do_osd_ops_execute<MURef<MOSDOpReply>>(
seastar::make_lw_shared<OpsExecuter>(
std::move(obc), op_info, get_pool().info, get_backend(), *m),
m->ops,
if (result > 0 && !rvec) {
result = 0;
}
- auto reply = ceph::make_message<MOSDOpReply>(m.get(),
+ auto reply = crimson::make_message<MOSDOpReply>(m.get(),
result,
get_osdmap_epoch(),
0,
"do_osd_ops: {} - object {} sending reply",
*m,
m->get_hobj());
- return do_osd_ops_iertr::make_ready_future<Ref<MOSDOpReply>>(
+ return do_osd_ops_iertr::make_ready_future<MURef<MOSDOpReply>>(
std::move(reply));
},
[m, this] (const std::error_code& e) {
- auto reply = ceph::make_message<MOSDOpReply>(
+ auto reply = crimson::make_message<MOSDOpReply>(
m.get(), -e.value(), get_osdmap_epoch(), 0, false);
reply->set_enoent_reply_versions(
peering_state.get_info().last_update,
peering_state.get_info().last_user_version);
- return do_osd_ops_iertr::make_ready_future<Ref<MOSDOpReply>>(std::move(reply));
+ return do_osd_ops_iertr::make_ready_future<MURef<MOSDOpReply>>(std::move(reply));
});
}
std::move(failure_func));
}
-PG::interruptible_future<Ref<MOSDOpReply>> PG::do_pg_ops(Ref<MOSDOp> m)
+PG::interruptible_future<MURef<MOSDOpReply>> PG::do_pg_ops(Ref<MOSDOp> m)
{
if (__builtin_expect(stopping, false)) {
throw crimson::common::system_shutdown_exception();
logger().debug("will be handling pg op {}", ceph_osd_op_name(osd_op.op.op));
return ox->execute_op(osd_op);
}).then_interruptible([m, this, ox = std::move(ox)] {
- auto reply = ceph::make_message<MOSDOpReply>(m.get(), 0, get_osdmap_epoch(),
+ auto reply = crimson::make_message<MOSDOpReply>(m.get(), 0, get_osdmap_epoch(),
CEPH_OSD_FLAG_ACK | CEPH_OSD_FLAG_ONDISK,
false);
- return seastar::make_ready_future<Ref<MOSDOpReply>>(std::move(reply));
+ return seastar::make_ready_future<MURef<MOSDOpReply>>(std::move(reply));
}).handle_exception_type_interruptible([=](const crimson::osd::error& e) {
- auto reply = ceph::make_message<MOSDOpReply>(
+ auto reply = crimson::make_message<MOSDOpReply>(
m.get(), -e.code().value(), get_osdmap_epoch(), 0, false);
reply->set_enoent_reply_versions(peering_state.get_info().last_update,
peering_state.get_info().last_user_version);
- return seastar::make_ready_future<Ref<MOSDOpReply>>(std::move(reply));
+ return seastar::make_ready_future<MURef<MOSDOpReply>>(std::move(reply));
});
}
using pg_rep_op_fut_t =
std::tuple<interruptible_future<>,
do_osd_ops_iertr::future<Ret>>;
- do_osd_ops_iertr::future<pg_rep_op_fut_t<Ref<MOSDOpReply>>> do_osd_ops(
+ do_osd_ops_iertr::future<pg_rep_op_fut_t<MURef<MOSDOpReply>>> do_osd_ops(
Ref<MOSDOp> m,
ObjectContextRef obc,
const OpInfo &op_info);
const OpInfo &op_info,
SuccessFunc&& success_func,
FailureFunc&& failure_func);
- interruptible_future<Ref<MOSDOpReply>> do_pg_ops(Ref<MOSDOp> m);
+ interruptible_future<MURef<MOSDOpReply>> do_pg_ops(Ref<MOSDOp> m);
std::tuple<interruptible_future<>, interruptible_future<>>
submit_transaction(
const OpInfo& op_info,
// allocate a pair if target is seen for the first time
auto& req = backfill_drop_requests[target];
if (!req) {
- req = ceph::make_message<MOSDPGBackfillRemove>(
+ req = crimson::make_message<MOSDPGBackfillRemove>(
spg_t(pg->get_pgid().pgid, target.shard), pg->get_osdmap_epoch());
}
req->ls.emplace_back(obj, v);
// backfill begin
std::unique_ptr<crimson::osd::BackfillState> backfill_state;
std::map<pg_shard_t,
- ceph::ref_t<MOSDPGBackfillRemove>> backfill_drop_requests;
+ MURef<MOSDPGBackfillRemove>> backfill_drop_requests;
template <class EventT>
void start_backfill_recovery(