*
* May be invoked from any core, and the send order will be preserved upon
* the call.
+ *
+ * The returned future will be resolved only after the message is enqueued
+ * remotely.
*/
- virtual seastar::future<> send(MessageURef msg) = 0;
+ virtual seastar::future<> send(
+ MessageURef msg) = 0;
+
+ /**
+ * send_with_throttling
+ *
+ * Send a message over a connection that has completed its handshake.
+ *
+ * May be invoked from any core, and the send order will be preserved upon
+ * the call.
+ *
+ * TODO:
+ *
+ * The returned future is reserved for throttling.
+ *
+ * Gating is needed for graceful shutdown, to wait until the message is
+ * enqueued remotely.
+ */
+ seastar::future<> send_with_throttling(
+ MessageURef msg /* , seastar::gate & */) {
+ std::ignore = send(std::move(msg));
+ return seastar::now();
+ }
/**
* send_keepalive
return pg->do_pg_ops(
m
).then_interruptible([this, pg=std::move(pg)](MURef<MOSDOpReply> reply) {
- return conn->send(std::move(reply));
+ // TODO: gate the crosscore sending
+ return conn->send_with_throttling(std::move(reply));
});
}
!m->has_flag(CEPH_OSD_FLAG_RETURNVEC));
reply->set_reply_versions(eversion_t(), 0);
reply->set_op_returns(std::vector<pg_log_op_return_item_t>{});
- return conn->send(std::move(reply));
+ // TODO: gate the crosscore sending
+ return conn->send_with_throttling(std::move(reply));
}
ClientRequest::interruptible_future<>
m.get(), completed->err, pg->get_osdmap_epoch(),
CEPH_OSD_FLAG_ACK | CEPH_OSD_FLAG_ONDISK, false);
reply->set_reply_versions(completed->version, completed->user_version);
- return conn->send(std::move(reply));
+ // TODO: gate the crosscore sending
+ return conn->send_with_throttling(std::move(reply));
} else {
return ihref.enter_stage<interruptor>(client_pp(*pg).get_obc, *this
).then_interruptible(
SnapContext snapc = get_snapc(pg,obc);
- if ((m->has_flag(CEPH_OSD_FLAG_ORDERSNAP)) &&
- snapc.seq < obc->ssc->snapset.seq) {
- DEBUGI("{} ORDERSNAP flag set and snapc seq {}",
- " < snapset seq {} on {}",
- __func__, snapc.seq, obc->ssc->snapset.seq,
- obc->obs.oi.soid);
- return reply_op_error(pg, -EOLDSNAPC);
+ if (m->has_flag(CEPH_OSD_FLAG_ORDERSNAP) &&
+ snapc.seq < obc->ssc->snapset.seq) {
+ DEBUGI("{} ORDERSNAP flag set and snapc seq {}",
+ " < snapset seq {} on {}",
+ __func__, snapc.seq, obc->ssc->snapset.seq,
+ obc->obs.oi.soid);
+ return reply_op_error(pg, -EOLDSNAPC);
}
if (!pg->is_primary()) {
[this, reply=std::move(reply)]() mutable {
LOG_PREFIX(ClientRequest::do_process);
DEBUGI("{}: sending response", *this);
- return conn->send(std::move(reply));
- });
+ // TODO: gate the crosscore sending
+ return conn->send_with_throttling(std::move(reply));
+ }
+ );
}, crimson::ct_error::eagain::handle([this, pg, &ihref]() mutable {
return process_op(ihref, pg);
}));