Messenger *msgr = Messenger::create(g_ceph_context, g_conf->ms_type,
entity_name_t::MDS(-1), "mds",
- nonce, 0, Messenger::HAS_MANY_CONNECTIONS);
+ nonce, Messenger::HAS_MANY_CONNECTIONS);
if (!msgr)
exit(1);
msgr->set_cluster_protocol(CEPH_MDS_PROTOCOL);
int rank = monmap.get_rank(g_conf->name.get_id());
Messenger *msgr = Messenger::create(g_ceph_context, g_conf->ms_type,
entity_name_t::MON(rank), "mon",
- 0, 0, Messenger::HAS_MANY_CONNECTIONS);
+ 0, Messenger::HAS_MANY_CONNECTIONS);
if (!msgr)
exit(1);
msgr->set_cluster_protocol(CEPH_MON_PROTOCOL);
Messenger *ms_public = Messenger::create(g_ceph_context, g_conf->ms_type,
entity_name_t::OSD(whoami), "client",
- getpid(), 0,
+ getpid(),
Messenger::HAS_HEAVY_TRAFFIC |
Messenger::HAS_MANY_CONNECTIONS);
Messenger *ms_cluster = Messenger::create(g_ceph_context, g_conf->ms_type,
entity_name_t::OSD(whoami), "cluster",
- getpid(), CEPH_FEATURES_ALL,
+ getpid(),
Messenger::HAS_HEAVY_TRAFFIC |
Messenger::HAS_MANY_CONNECTIONS);
Messenger *ms_hbclient = Messenger::create(g_ceph_context, g_conf->ms_type,
entity_name_t::OSD(whoami), "hbclient",
- getpid(), 0, Messenger::HEARTBEAT);
+ getpid(), Messenger::HEARTBEAT);
Messenger *ms_hb_back_server = Messenger::create(g_ceph_context, g_conf->ms_type,
entity_name_t::OSD(whoami), "hb_back_server",
- getpid(), 0, Messenger::HEARTBEAT);
+ getpid(), Messenger::HEARTBEAT);
Messenger *ms_hb_front_server = Messenger::create(g_ceph_context, g_conf->ms_type,
entity_name_t::OSD(whoami), "hb_front_server",
- getpid(), 0, Messenger::HEARTBEAT);
+ getpid(), Messenger::HEARTBEAT);
Messenger *ms_objecter = Messenger::create(g_ceph_context, g_conf->ms_type,
entity_name_t::OSD(whoami), "ms_objecter",
- getpid());
+ getpid(), 0);
if (!ms_public || !ms_cluster || !ms_hbclient || !ms_hb_back_server || !ms_hb_front_server || !ms_objecter)
exit(1);
ms_cluster->set_cluster_protocol(CEPH_OSD_PROTOCOL);
{
// Initialize Messenger
msgr = Messenger::create(g_ceph_context, g_conf->ms_type,
- entity_name_t::MGR(gid), "server", getpid());
+ entity_name_t::MGR(gid), "server", getpid(), 0);
int r = msgr->bind(g_conf->public_addr);
if (r < 0)
return r;
Messenger *Messenger::create(CephContext *cct, const string &type,
entity_name_t name, string lname,
- uint64_t nonce, uint64_t features, uint64_t cflags)
+ uint64_t nonce, uint64_t cflags)
{
int r = -1;
if (type == "random") {
r = dis(random_engine);
}
if (r == 0 || type == "simple")
- return new SimpleMessenger(cct, name, lname, nonce, features);
+ return new SimpleMessenger(cct, name, lname, nonce);
else if (r == 1 || type == "async")
- return new AsyncMessenger(cct, name, lname, nonce, features);
+ return new AsyncMessenger(cct, name, lname, nonce);
#ifdef HAVE_XIO
else if ((type == "xio") &&
cct->check_experimental_feature_enabled("ms-type-xio"))
- return new XioMessenger(cct, name, lname, nonce, features, cflags);
+ return new XioMessenger(cct, name, lname, nonce, cflags);
#endif
lderr(cct) << "unrecognized ms_type '" << type << "'" << dendl;
return nullptr;
entity_name_t name,
string lname,
uint64_t nonce,
- uint64_t features = 0,
- uint64_t cflags = 0);
+ uint64_t cflags);
/**
* create a new messenger
*/
AsyncMessenger::AsyncMessenger(CephContext *cct, entity_name_t name,
- string mname, uint64_t _nonce, uint64_t features)
+ string mname, uint64_t _nonce)
: SimplePolicyMessenger(cct, name,mname, _nonce),
dispatch_queue(cct, this, mname),
lock("AsyncMessenger::lock"),
stack->start();
local_worker = stack->get_worker();
local_connection = new AsyncConnection(cct, this, &dispatch_queue, local_worker);
- local_features = features;
init_local_connection();
reap_handler = new C_handle_reap(this);
unsigned processor_num = 1;
* be a value that will be repeated if the daemon restarts.
*/
AsyncMessenger(CephContext *cct, entity_name_t name,
- string mname, uint64_t _nonce, uint64_t features);
+ string mname, uint64_t _nonce);
/**
* Destroy the AsyncMessenger. Pretty simple since all the work is done
assert(lock.is_locked());
local_connection->peer_addr = my_inst.addr;
local_connection->peer_type = my_inst.name.type();
- local_connection->set_features(local_features);
+ local_connection->set_features(CEPH_FEATURES_ALL);
ms_deliver_handle_fast_connect(local_connection.get());
}
/// con used for sending messages to ourselves
ConnectionRef local_connection;
- uint64_t local_features;
/**
* @defgroup AsyncMessenger internals
*/
SimpleMessenger::SimpleMessenger(CephContext *cct, entity_name_t name,
- string mname, uint64_t _nonce, uint64_t features)
+ string mname, uint64_t _nonce)
: SimplePolicyMessenger(cct, name,mname, _nonce),
accepter(this, _nonce),
dispatch_queue(cct, this, mname),
ANNOTATE_BENIGN_RACE_SIZED(&timeout, sizeof(timeout),
"SimpleMessenger read timeout");
ceph_spin_init(&global_seq_lock);
- local_features = features;
init_local_connection();
}
{
local_connection->peer_addr = my_inst.addr;
local_connection->peer_type = my_inst.name.type();
- local_connection->set_features(local_features);
+ local_connection->set_features(CEPH_FEATURES_ALL);
ms_deliver_handle_fast_connect(local_connection.get());
}
* features The local features bits for the local_connection
*/
SimpleMessenger(CephContext *cct, entity_name_t name,
- string mname, uint64_t _nonce, uint64_t features);
+ string mname, uint64_t _nonce);
/**
* Destroy the SimpleMessenger. Pretty simple since all the work is done
/// con used for sending messages to ourselves
ConnectionRef local_connection;
- uint64_t local_features;
/**
* @defgroup SimpleMessenger internals
}
XioMessenger::XioMessenger(CephContext *cct, entity_name_t name,
- string mname, uint64_t _nonce, uint64_t features,
+ string mname, uint64_t _nonce,
uint64_t cflags, DispatchStrategy *ds)
: SimplePolicyMessenger(cct, name, mname, _nonce),
XioInit(cct),
/* update class instance count */
nInstances.inc();
- local_features = features;
loop_con->set_features(features);
ldout(cct,2) << "Create msgr: " << this << " instance: "
return NULL;
XioMsg *xmsg = reinterpret_cast<XioMsg*>(mp_mem.addr);
assert(!!xmsg);
- new (xmsg) XioMsg(m, xcon, mp_mem, ex_cnt,
- static_cast<XioMessenger*>(
- xcon->get_messenger())->local_features);
+ new (xmsg) XioMsg(m, xcon, mp_mem, ex_cnt, CEPH_FEATURES_ALL);
return xmsg;
}
public:
XioMessenger(CephContext *cct, entity_name_t name,
- string mname, uint64_t nonce, uint64_t features,
+ string mname, uint64_t nonce,
uint64_t cflags = 0,
DispatchStrategy* ds = new QueueStrategy(1));
protected:
virtual void ready()
{ }
-
-public:
- uint64_t local_features;
};
XioCommand* pool_alloc_xio_command(XioConnection *xcon);
messenger = Messenger::create(g_ceph_context, g_conf->ms_type,
entity_name_t::MON(-1),
"client",
- getpid());
+ getpid(), 0);
// enable timing prints
messenger->set_magic(MSG_MAGIC_TRACE_CTR);
messenger = Messenger::create(g_ceph_context, g_conf->ms_type,
entity_name_t::MON(-1),
"simple_server",
- 0 /* nonce */);
+ 0 /* nonce */,
+ 0 /* flags */);
// enable timing prints
messenger->set_magic(MSG_MAGIC_TRACE_CTR);
messenger->set_default_policy(
dout(1) << __func__ << dendl;
msg = Messenger::create(cct, cct->_conf->ms_type, entity_name_t::CLIENT(-1),
- "test-mon-msg", 0);
+ "test-mon-msg", 0, 0);
assert(msg != NULL);
msg->set_default_policy(Messenger::Policy::lossy_client(0,0));
dout(0) << __func__ << " starting messenger at "
stringstream ss;
ss << "client-osd" << whoami;
messenger.reset(Messenger::create(cct, cct->_conf->ms_type, entity_name_t::OSD(whoami),
- ss.str().c_str(), getpid()));
+ ss.str().c_str(), getpid(), 0));
Throttle throttler(g_ceph_context, "osd_client_bytes",
g_conf->osd_client_message_size_cap);
addr.parse(serveraddr.c_str());
addr.set_nonce(0);
for (int i = 0; i < jobs; ++i) {
- Messenger *msgr = Messenger::create(g_ceph_context, type, entity_name_t::CLIENT(0), "client", getpid()+i);
+ Messenger *msgr = Messenger::create(g_ceph_context, type, entity_name_t::CLIENT(0), "client", getpid()+i, 0);
msgr->set_default_policy(Messenger::Policy::lossless_client(0, 0));
entity_inst_t inst(entity_name_t::OSD(0), addr);
ConnectionRef conn = msgr->get_connection(inst);
public:
MessengerServer(string t, string addr, int threads, int delay):
msgr(NULL), type(t), bindaddr(addr), dispatcher(threads, delay) {
- msgr = Messenger::create(g_ceph_context, type, entity_name_t::OSD(0), "server", 0);
+ msgr = Messenger::create(g_ceph_context, type, entity_name_t::OSD(0), "server", 0, 0);
msgr->set_default_policy(Messenger::Policy::stateless_server(0, 0));
}
~MessengerServer() {
MessengerTest(): server_msgr(NULL), client_msgr(NULL) {}
virtual void SetUp() {
lderr(g_ceph_context) << __func__ << " start set up " << GetParam() << dendl;
- server_msgr = Messenger::create(g_ceph_context, string(GetParam()), entity_name_t::OSD(0), "server", getpid());
- client_msgr = Messenger::create(g_ceph_context, string(GetParam()), entity_name_t::CLIENT(-1), "client", getpid());
+ server_msgr = Messenger::create(g_ceph_context, string(GetParam()), entity_name_t::OSD(0), "server", getpid(), 0);
+ client_msgr = Messenger::create(g_ceph_context, string(GetParam()), entity_name_t::CLIENT(-1), "client", getpid(), 0);
server_msgr->set_default_policy(Messenger::Policy::stateless_server(0, 0));
client_msgr->set_default_policy(Messenger::Policy::lossy_client(0, 0));
}
char addr[64];
for (int i = 0; i < servers; ++i) {
msgr = Messenger::create(g_ceph_context, type, entity_name_t::OSD(0),
- "server", getpid()+i);
+ "server", getpid()+i, 0);
snprintf(addr, sizeof(addr), "127.0.0.1:%d", base_port+i);
bind_addr.parse(addr);
msgr->bind(bind_addr);
for (int i = 0; i < clients; ++i) {
msgr = Messenger::create(g_ceph_context, type, entity_name_t::CLIENT(-1),
- "client", getpid()+i+servers);
+ "client", getpid()+i+servers, 0);
if (cli_policy.standby) {
snprintf(addr, sizeof(addr), "127.0.0.1:%d", base_port+i+servers);
bind_addr.parse(addr);
// Markdown with external lock
TEST_P(MessengerTest, MarkdownTest) {
- Messenger *server_msgr2 = Messenger::create(g_ceph_context, string(GetParam()), entity_name_t::OSD(0), "server", getpid());
+ Messenger *server_msgr2 = Messenger::create(g_ceph_context, string(GetParam()), entity_name_t::OSD(0), "server", getpid(), 0);
MarkdownDispatcher cli_dispatcher(false), srv_dispatcher(true);
entity_addr_t bind_addr;
bind_addr.parse("127.0.0.1:16800");
g_conf->osd_data,
g_conf->osd_journal);
Messenger *ms = Messenger::create(g_ceph_context, g_conf->ms_type,
- entity_name_t::OSD(0), "make_checker",
- getpid());
+ entity_name_t::OSD(0), "make_checker",
+ getpid(), 0);
ms->set_cluster_protocol(CEPH_OSD_PROTOCOL);
ms->set_default_policy(Messenger::Policy::stateless_server(0, 0));
ms->bind(g_conf->public_addr);