struct MForward : public Message {
uint64_t tid;
- entity_inst_t client;
+ uint8_t client_type;
+ entity_addrvec_t client_addrs;
+ entity_addr_t client_socket_addr;
MonCap client_caps;
uint64_t con_features;
EntityName entity_name;
string msg_desc; // for operator<< only
- static const int HEAD_VERSION = 3;
- static const int COMPAT_VERSION = 3;
+ static const int HEAD_VERSION = 4;
+ static const int COMPAT_VERSION = 4;
MForward() : Message(MSG_FORWARD, HEAD_VERSION, COMPAT_VERSION),
tid(0), con_features(0), msg(NULL) {}
MForward(uint64_t t, PaxosServiceMessage *m, uint64_t feat) :
Message(MSG_FORWARD, HEAD_VERSION, COMPAT_VERSION),
tid(t), msg(NULL) {
- client = m->get_source_inst();
+ client_type = m->get_source().type();
+ client_addrs = m->get_source_addrs();
+ if (auto con = m->get_connection()) {
+ client_socket_addr = con->get_peer_socket_addr();
+ }
client_caps = m->get_session()->caps;
con_features = feat;
// we may need to reencode for the target mon
const MonCap& caps) :
Message(MSG_FORWARD, HEAD_VERSION, COMPAT_VERSION),
tid(t), client_caps(caps), msg(NULL) {
- client = m->get_source_inst();
+ client_type = m->get_source().type();
+ client_addrs = m->get_source_addrs();
+ if (auto con = m->get_connection()) {
+ client_socket_addr = con->get_peer_socket_addr();
+ }
con_features = feat;
msg = (PaxosServiceMessage*)m->get();
}
public:
void encode_payload(uint64_t features) override {
using ceph::encode;
+ if (!HAVE_FEATURE(features, SERVER_NAUTILUS)) {
+ header.version = 3;
+ header.compat_version = 3;
+ encode(tid, payload);
+ entity_inst_t client;
+ client.name = entity_name_t(client_type, -1);
+ client.addr = client_addrs.legacy_addr();
+ encode(client, payload, features);
+ encode(client_caps, payload, features);
+ // Encode client message with intersection of target and source
+ // features. This could matter if the semantics of the encoded
+ // message are changed when reencoding with more features than the
+ // client had originally. That should never happen, but we may as
+ // well be defensive here.
+ if (con_features != features) {
+ msg->clear_payload();
+ }
+ encode_message(msg, features & con_features, payload);
+ encode(con_features, payload);
+ encode(entity_name, payload);
+ return;
+ }
+ header.version = HEAD_VERSION;
+ header.compat_version = COMPAT_VERSION;
encode(tid, payload);
- encode(client, payload, features);
+ encode(client_type, payload, features);
+ encode(client_addrs, payload, features);
+ encode(client_socket_addr, payload, features);
encode(client_caps, payload, features);
// Encode client message with intersection of target and source
// features. This could matter if the semantics of the encoded
void decode_payload() override {
auto p = payload.cbegin();
decode(tid, p);
- decode(client, p);
+ if (header.version < 4) {
+ entity_inst_t client;
+ decode(client, p);
+ client_type = client.name.type();
+ client_addrs = entity_addrvec_t(client.addr);
+ client_socket_addr = client.addr;
+ } else {
+ decode(client_type, p);
+ decode(client_addrs, p);
+ decode(client_socket_addr, p);
+ }
decode(client_caps, p);
msg = (PaxosServiceMessage *)decode_message(NULL, 0, p);
decode(con_features, p);
} else if (!session->closed) {
RoutedRequest *rr = new RoutedRequest;
rr->tid = ++routed_request_tid;
- rr->client_inst = req->get_source_inst();
rr->con = req->get_connection();
rr->con_features = rr->con->get_features();
encode_message(req, CEPH_FEATURES_ALL, rr->request_bl); // for my use only; use all features
// fake connection attached to forwarded messages
struct AnonConnection : public Connection {
- explicit AnonConnection(CephContext *cct) : Connection(cct, NULL) {}
+ entity_addr_t socket_addr;
+ explicit AnonConnection(CephContext *cct,
+ const entity_addr_t& sa)
+ : Connection(cct, NULL),
+ socket_addr(sa) {}
int send_message(Message *m) override {
assert(!"send_message on anonymous connection");
// silengtly ignore
}
bool is_connected() override { return false; }
+ entity_addr_t get_peer_socket_addr() const override {
+ return socket_addr;
+ }
};
//extract the original message and put it into the regular dispatch function
void Monitor::handle_forward(MonOpRequestRef op)
{
MForward *m = static_cast<MForward*>(op->get_req());
- dout(10) << "received forwarded message from " << m->client
+ dout(10) << "received forwarded message from "
+ << ceph_entity_type_name(m->client_type)
+ << " " << m->client_addrs
<< " via " << m->get_source_inst() << dendl;
MonSession *session = op->get_session();
assert(session);
PaxosServiceMessage *req = m->claim_message();
assert(req != NULL);
- ConnectionRef c(new AnonConnection(cct));
+ ConnectionRef c(new AnonConnection(cct, m->client_socket_addr));
MonSession *s = new MonSession(req->get_source(),
req->get_source_addrs(),
static_cast<Connection*>(c.get()));
c->set_priv(RefCountedPtr{s, false});
- c->set_peer_addr(m->client.addr);
- c->set_peer_type(m->client.name.type());
+ c->set_peer_addrs(m->client_addrs);
+ c->set_peer_type(m->client_type);
c->set_features(m->con_features);
s->authenticated = true;
delete rr;
} else {
auto q = rr->request_bl.cbegin();
- PaxosServiceMessage *req = (PaxosServiceMessage *)decode_message(cct, 0, q);
+ PaxosServiceMessage *req =
+ (PaxosServiceMessage *)decode_message(cct, 0, q);
rr->op->mark_event("resend forwarded message to leader");
- dout(10) << " resend to mon." << mon << " tid " << rr->tid << " " << *req << dendl;
+ dout(10) << " resend to mon." << mon << " tid " << rr->tid << " " << *req
+ << dendl;
MForward *forward = new MForward(rr->tid, req, rr->con_features,
rr->session->caps);
req->put(); // forward takes its own ref; drop ours.
- forward->client = rr->client_inst;
+ forward->client_type = rr->con->get_peer_type();
+ forward->client_addrs = rr->con->get_peer_addrs();
+ forward->client_socket_addr = rr->con->get_peer_socket_addr();
forward->set_priority(req->get_priority());
send_mon_message(forward, mon);
}