return conn;
}
-Client::mon_command_t::mon_command_t(ceph::ref_t<MMonCommand> req)
- : req(req)
+Client::mon_command_t::mon_command_t(MURef<MMonCommand> req)
+ : req(std::move(req))
{}
Client::Client(crimson::net::Messenger& messenger,
m->set_tid(tid);
m->cmd = {std::move(cmd)};
m->set_data(std::move(bl));
- auto& command = mon_commands.emplace_back(ceph::make_message<MMonCommand>(*m));
+ auto& command = mon_commands.emplace_back(crimson::make_message<MMonCommand>(*m));
return send_message(std::move(m)).then([&result=command.result] {
return result.get_future();
});
using command_result_t =
seastar::future<std::tuple<std::int32_t, std::string, ceph::bufferlist>>;
struct mon_command_t {
- ceph::ref_t<MMonCommand> req;
+ MURef<MMonCommand> req;
typename command_result_t::promise_type result;
- mon_command_t(ceph::ref_t<MMonCommand> req);
+ mon_command_t(MURef<MMonCommand> req);
};
std::vector<mon_command_t> mon_commands;
class Connection;
using ConnectionRef = seastar::shared_ptr<Connection>;
+using ConnectionFRef = seastar::foreign_ptr<ConnectionRef>;
class Dispatcher;
class ChainedDispatchers;
logger().debug("{}", __func__);
struct connect_ctx_t {
ObjectContext::watch_key_t key;
- crimson::net::ConnectionRef conn;
+ crimson::net::ConnectionFRef conn;
watch_info_t info;
- connect_ctx_t(const OSDOp& osd_op, const ExecutableMessage& msg)
+ connect_ctx_t(
+ const OSDOp& osd_op,
+ const ExecutableMessage& msg,
+ crimson::net::ConnectionFRef conn)
: key(osd_op.op.watch.cookie, msg.get_reqid().name),
- conn(msg.get_connection()),
+ conn(std::move(conn)),
info(create_watch_info(osd_op, msg)) {
}
};
- return with_effect_on_obc(connect_ctx_t{ osd_op, get_message() },
- [&] (auto& ctx) {
- const auto& entity = ctx.key.second;
- auto [it, emplaced] =
- os.oi.watchers.try_emplace(ctx.key, std::move(ctx.info));
- if (emplaced) {
- logger().info("registered new watch {} by {}", it->second, entity);
- txn.nop();
- } else {
- logger().info("found existing watch {} by {}", it->second, entity);
- }
- return seastar::now();
- },
- [] (auto&& ctx, ObjectContextRef obc, Ref<PG> pg) {
- assert(pg);
- auto [it, emplaced] = obc->watchers.try_emplace(ctx.key, nullptr);
- if (emplaced) {
- const auto& [cookie, entity] = ctx.key;
- it->second = crimson::osd::Watch::create(
- obc, ctx.info, entity, std::move(pg));
- logger().info("op_effect: added new watcher: {}", ctx.key);
- } else {
- logger().info("op_effect: found existing watcher: {}", ctx.key);
- }
- return it->second->connect(std::move(ctx.conn), true /* will_ping */);
- });
+ return get_message().get_connection().copy(
+ ).then([&, this](auto &&conn) {
+ return with_effect_on_obc(
+ connect_ctx_t{ osd_op, get_message(), std::move(conn) },
+ [&] (auto& ctx) {
+ const auto& entity = ctx.key.second;
+ auto [it, emplaced] =
+ os.oi.watchers.try_emplace(ctx.key, std::move(ctx.info));
+ if (emplaced) {
+ logger().info("registered new watch {} by {}", it->second, entity);
+ txn.nop();
+ } else {
+ logger().info("found existing watch {} by {}", it->second, entity);
+ }
+ return seastar::now();
+ },
+ [] (auto&& ctx, ObjectContextRef obc, Ref<PG> pg) {
+ assert(pg);
+ auto [it, emplaced] = obc->watchers.try_emplace(ctx.key, nullptr);
+ if (emplaced) {
+ const auto& [cookie, entity] = ctx.key;
+ it->second = crimson::osd::Watch::create(
+ obc, ctx.info, entity, std::move(pg));
+ logger().info("op_effect: added new watcher: {}", ctx.key);
+ } else {
+ logger().info("op_effect: found existing watcher: {}", ctx.key);
+ }
+ return it->second->connect(std::move(ctx.conn), true /* will_ping */);
+ });
+ });
}
OpsExecuter::watch_ierrorator::future<> OpsExecuter::do_op_watch_subop_reconnect(
return crimson::ct_error::enoent::make();
}
struct notify_ctx_t {
- crimson::net::ConnectionRef conn;
+ crimson::net::ConnectionFRef conn;
notify_info_t ninfo;
const uint64_t client_gid;
const epoch_t epoch;
- notify_ctx_t(const ExecutableMessage& msg)
- : conn(msg.get_connection()),
+ notify_ctx_t(const ExecutableMessage& msg, crimson::net::ConnectionFRef conn)
+ : conn(std::move(conn)),
client_gid(msg.get_reqid().name.num()),
epoch(msg.get_map_epoch()) {
}
};
- return with_effect_on_obc(notify_ctx_t{ get_message() },
- [&] (auto& ctx) {
- try {
- auto bp = osd_op.indata.cbegin();
- uint32_t ver; // obsolete
- ceph::decode(ver, bp);
- ceph::decode(ctx.ninfo.timeout, bp);
- ceph::decode(ctx.ninfo.bl, bp);
- } catch (const buffer::error&) {
- ctx.ninfo.timeout = 0;
- }
- if (!ctx.ninfo.timeout) {
- using crimson::common::local_conf;
- ctx.ninfo.timeout = local_conf()->osd_default_notify_timeout;
- }
- ctx.ninfo.notify_id = get_next_notify_id(ctx.epoch);
- ctx.ninfo.cookie = osd_op.op.notify.cookie;
- // return our unique notify id to the client
- ceph::encode(ctx.ninfo.notify_id, osd_op.outdata);
- return seastar::now();
- },
- [] (auto&& ctx, ObjectContextRef obc, Ref<PG>) {
- auto alive_watchers = obc->watchers | boost::adaptors::map_values
- | boost::adaptors::filtered(
- [] (const auto& w) {
- // FIXME: filter as for the `is_ping` in `Watch::start_notify`
- return w->is_alive();
- });
- return crimson::osd::Notify::create_n_propagate(
- std::begin(alive_watchers),
- std::end(alive_watchers),
- std::move(ctx.conn),
- ctx.ninfo,
- ctx.client_gid,
- obc->obs.oi.user_version);
+ return get_message().get_connection().copy(
+ ).then([&, this](auto &&conn) {
+ return with_effect_on_obc(
+ notify_ctx_t{ get_message(), std::move(conn) },
+ [&] (auto& ctx) {
+ try {
+ auto bp = osd_op.indata.cbegin();
+ uint32_t ver; // obsolete
+ ceph::decode(ver, bp);
+ ceph::decode(ctx.ninfo.timeout, bp);
+ ceph::decode(ctx.ninfo.bl, bp);
+ } catch (const buffer::error&) {
+ ctx.ninfo.timeout = 0;
+ }
+ if (!ctx.ninfo.timeout) {
+ using crimson::common::local_conf;
+ ctx.ninfo.timeout = local_conf()->osd_default_notify_timeout;
+ }
+ ctx.ninfo.notify_id = get_next_notify_id(ctx.epoch);
+ ctx.ninfo.cookie = osd_op.op.notify.cookie;
+ // return our unique notify id to the client
+ ceph::encode(ctx.ninfo.notify_id, osd_op.outdata);
+ return seastar::now();
+ },
+ [] (auto&& ctx, ObjectContextRef obc, Ref<PG>) {
+ auto alive_watchers = obc->watchers | boost::adaptors::map_values
+ | boost::adaptors::filtered(
+ [] (const auto& w) {
+ // FIXME: filter as for the `is_ping` in `Watch::start_notify`
+ return w->is_alive();
+ });
+ return crimson::osd::Notify::create_n_propagate(
+ std::begin(alive_watchers),
+ std::end(alive_watchers),
+ std::move(ctx.conn),
+ ctx.ninfo,
+ ctx.client_gid,
+ obc->obs.oi.user_version);
+ });
});
}
// with other message types than just the `MOSDOp`. The type erasure
// happens in the ctor of `OpsExecuter`.
struct ExecutableMessage {
- virtual crimson::net::ConnectionRef get_connection() const = 0;
+ virtual const crimson::net::ConnectionFRef &get_connection() const = 0;
virtual osd_reqid_t get_reqid() const = 0;
virtual utime_t get_mtime() const = 0;
virtual epoch_t get_map_epoch() const = 0;
public:
ExecutableMessagePimpl(const ImplT* pimpl) : pimpl(pimpl) {
}
- crimson::net::ConnectionRef get_connection() const final {
+ const crimson::net::ConnectionFRef &get_connection() const final {
return pimpl->get_connection();
}
osd_reqid_t get_reqid() const final {
};
struct PG::do_osd_ops_params_t {
- crimson::net::ConnectionRef get_connection() const {
- return nullptr;
+ crimson::net::ConnectionFRef &get_connection() const {
+ return conn;
}
osd_reqid_t get_reqid() const {
return reqid;
bool has_flag(uint32_t flag) const {
return false;
}
- crimson::net::ConnectionRef conn;
+ crimson::net::ConnectionFRef &conn;
osd_reqid_t reqid;
utime_t mtime;
epoch_t map_epoch;
std::move(m.begin),
crimson::common::local_conf().get_val<std::int64_t>("osd_backfill_scan_min"),
crimson::common::local_conf().get_val<std::int64_t>("osd_backfill_scan_max")
- ).then_interruptible([this,
- query_epoch=m.query_epoch,
- conn=m.get_connection()] (auto backfill_interval) {
- auto reply = crimson::make_message<MOSDPGScan>(
- MOSDPGScan::OP_SCAN_DIGEST,
- pg.get_pg_whoami(),
- pg.get_osdmap_epoch(),
- query_epoch,
- spg_t(pg.get_info().pgid.pgid, pg.get_primary().shard),
- backfill_interval.begin,
- backfill_interval.end);
- encode(backfill_interval.objects, reply->get_data());
- return conn->send(std::move(reply));
- });
+ ).then_interruptible(
+ [this, query_epoch=m.query_epoch, &conn=*(m.get_connection())
+ ](auto backfill_interval) {
+ auto reply = crimson::make_message<MOSDPGScan>(
+ MOSDPGScan::OP_SCAN_DIGEST,
+ pg.get_pg_whoami(),
+ pg.get_osdmap_epoch(),
+ query_epoch,
+ spg_t(pg.get_info().pgid.pgid, pg.get_primary().shard),
+ backfill_interval.begin,
+ backfill_interval.end);
+ encode(backfill_interval.objects, reply->get_data());
+ return conn.send(std::move(reply));
+ });
}
RecoveryBackend::interruptible_future<>
PG::do_osd_ops_params_t
WatchTimeoutRequest::get_do_osd_ops_params() const
{
- PG::do_osd_ops_params_t params;
- params.conn = watch->conn;
- params.reqid.name = watch->entity_name;
- // as in the classical's simple_opc_create()
- params.mtime = ceph_clock_now();
- params.map_epoch = get_pg().get_osdmap_epoch();
- params.orig_source_inst = { watch->entity_name, watch->winfo.addr };
- //entity_inst_t orig_source_inst;
- params.features = 0;
+ osd_reqid_t reqid;
+ reqid.name = watch->entity_name;
+ PG::do_osd_ops_params_t params{
+ watch->conn,
+ reqid,
+ ceph_clock_now(),
+ get_pg().get_osdmap_epoch(),
+ entity_inst_t{ watch->entity_name, watch->winfo.addr },
+ 0
+ };
logger().debug("{}: params.reqid={}", __func__, params.reqid);
return params;
}
logger().debug("{} gid={} cookie={}", __func__, get_watcher_gid(), get_cookie());
}
-seastar::future<> Watch::connect(crimson::net::ConnectionRef conn, bool)
+seastar::future<> Watch::connect(crimson::net::ConnectionFRef conn, bool)
{
if (this->conn == conn) {
logger().debug("conn={} already connected", conn);
return out;
}
-Notify::Notify(crimson::net::ConnectionRef conn,
+Notify::Notify(crimson::net::ConnectionFRef conn,
const notify_info_t& ninfo,
const uint64_t client_gid,
const uint64_t user_version)
struct private_ctag_t{};
std::set<NotifyRef, std::less<>> in_progress_notifies;
- crimson::net::ConnectionRef conn;
+ crimson::net::ConnectionFRef conn;
crimson::osd::ObjectContextRef obc;
watch_info_t winfo;
}
~Watch();
- seastar::future<> connect(crimson::net::ConnectionRef, bool);
+ seastar::future<> connect(crimson::net::ConnectionFRef, bool);
bool is_alive() const {
return true;
}
class Notify : public seastar::enable_shared_from_this<Notify> {
std::set<WatchRef> watchers;
const notify_info_t ninfo;
- crimson::net::ConnectionRef conn;
+ crimson::net::ConnectionFRef conn;
const uint64_t client_gid;
const uint64_t user_version;
bool complete{false};
/// Called on Notify timeout
void do_notify_timeout();
- Notify(crimson::net::ConnectionRef conn,
+ Notify(crimson::net::ConnectionFRef conn,
const notify_info_t& ninfo,
const uint64_t client_gid,
const uint64_t user_version);
template <class WatchIteratorT>
Notify(WatchIteratorT begin,
WatchIteratorT end,
- crimson::net::ConnectionRef conn,
+ crimson::net::ConnectionFRef conn,
const notify_info_t& ninfo,
const uint64_t client_gid,
const uint64_t user_version);
template <class WatchIteratorT>
Notify::Notify(WatchIteratorT begin,
WatchIteratorT end,
- crimson::net::ConnectionRef conn,
+ crimson::net::ConnectionFRef conn,
const notify_info_t& ninfo,
const uint64_t client_gid,
const uint64_t user_version)
tid(t), client_caps(caps), msg(NULL) {
client_type = m->get_source().type();
client_addrs = m->get_source_addrs();
- if (auto con = m->get_connection()) {
+ if (auto &con = m->get_connection()) {
client_socket_addr = con->get_peer_socket_addr();
}
con_features = feat;
fsid(f)
{ }
-private:
+ MMonCommand(const MMonCommand &other)
+ : PaxosServiceMessage(MSG_MON_COMMAND, 0),
+ fsid(other.fsid),
+ cmd(other.cmd) {
+ set_tid(other.get_tid());
+ set_data(other.get_data());
+ }
+
~MMonCommand() final {}
public:
decode(fsid, p);
decode(cmd, p);
}
+
private:
template<class T, typename... Args>
friend boost::intrusive_ptr<T> ceph::make_message(Args&&... args);
public:
#ifdef WITH_SEASTAR
using ConnectionRef = crimson::net::ConnectionRef;
+ using ConnectionFRef = crimson::net::ConnectionFRef;
#else
using ConnectionRef = ::ConnectionRef;
+ using ConnectionFRef = ::ConnectionRef;
#endif // WITH_SEASTAR
protected:
/* time at which message was fully read */
utime_t recv_complete_stamp;
- ConnectionRef connection;
+ ConnectionFRef connection;
uint32_t magic = 0;
completion_hook->complete(0);
}
public:
- const ConnectionRef& get_connection() const { return connection; }
+ const ConnectionFRef& get_connection() const { return connection; }
void set_connection(ConnectionRef c) {
connection = std::move(c);
}
class SyntheticDispatcher final
: public crimson::net::Dispatcher {
public:
- std::map<crimson::net::ConnectionRef, std::deque<payload_seq_t> > conn_sent;
+ std::map<crimson::net::Connection*, std::deque<payload_seq_t> > conn_sent;
std::map<payload_seq_t, bufferlist> sent;
unsigned index;
SyntheticWorkload *workload;
if (sent.count(pl.seq)) {
logger().info(" {} conn= {} {}", __func__,
m->get_connection(), pl);
- ceph_assert(conn_sent[m->get_connection()].front() == pl.seq);
+ ceph_assert(conn_sent[&*m->get_connection()].front() == pl.seq);
ceph_assert(pl.data.contents_equal(sent[pl.seq]));
- conn_sent[m->get_connection()].pop_front();
+ conn_sent[&*m->get_connection()].pop_front();
sent.erase(pl.seq);
}
encode(pl, bl);
m->set_data(bl);
sent[pl.seq] = pl.data;
- conn_sent[con].push_back(pl.seq);
+ conn_sent[&*con].push_back(pl.seq);
logger().info("{} conn= {} send i= {}",
__func__, con, pl.seq);
}
void clear_pending(crimson::net::ConnectionRef con) {
- for (std::deque<uint64_t>::iterator it = conn_sent[con].begin();
- it != conn_sent[con].end(); ++it)
+ for (std::deque<uint64_t>::iterator it = conn_sent[&*con].begin();
+ it != conn_sent[&*con].end(); ++it)
sent.erase(*it);
- conn_sent.erase(con);
+ conn_sent.erase(&*con);
}
void print() {
- for (auto && [conn, list] : conn_sent) {
+ for (auto && [connptr, list] : conn_sent) {
if (!list.empty()) {
logger().info("{} {} wait {}", __func__,
- conn, list.size());
+ (void*)connptr, list.size());
}
}
}