std::string public_msgr_type = g_conf()->ms_public_type.empty() ? g_conf().get_val<std::string>("ms_type") : g_conf()->ms_public_type;
Messenger *msgr = Messenger::create(g_ceph_context, public_msgr_type,
entity_name_t::MDS(-1), "mds",
- Messenger::get_random_nonce(),
- Messenger::HAS_MANY_CONNECTIONS);
+ Messenger::get_random_nonce());
if (!msgr)
forker.exit(1);
msgr->set_cluster_protocol(CEPH_MDS_PROTOCOL);
int rank = monmap.get_rank(g_conf()->name.get_id());
std::string public_msgr_type = g_conf()->ms_public_type.empty() ? g_conf().get_val<std::string>("ms_type") : g_conf()->ms_public_type;
Messenger *msgr = Messenger::create(g_ceph_context, public_msgr_type,
- entity_name_t::MON(rank), "mon",
- 0, // zero nonce
- Messenger::HAS_MANY_CONNECTIONS);
+ entity_name_t::MON(rank), "mon", 0);
if (!msgr)
exit(1);
msgr->set_cluster_protocol(CEPH_MON_PROTOCOL);
Messenger *mgr_msgr = Messenger::create(g_ceph_context, public_msgr_type,
entity_name_t::MON(rank), "mon-mgrc",
- Messenger::get_pid_nonce(),
- 0);
+ Messenger::get_pid_nonce());
if (!mgr_msgr) {
derr << "unable to create mgr_msgr" << dendl;
prefork.exit(1);
cluster_msg_type = cluster_msg_type.empty() ? msg_type : cluster_msg_type;
uint64_t nonce = Messenger::get_pid_nonce();
Messenger *ms_public = Messenger::create(g_ceph_context, public_msg_type,
- entity_name_t::OSD(whoami), "client",
- nonce,
- Messenger::HAS_HEAVY_TRAFFIC |
- Messenger::HAS_MANY_CONNECTIONS);
+ entity_name_t::OSD(whoami), "client", nonce);
Messenger *ms_cluster = Messenger::create(g_ceph_context, cluster_msg_type,
- entity_name_t::OSD(whoami), "cluster",
- nonce,
- Messenger::HAS_HEAVY_TRAFFIC |
- Messenger::HAS_MANY_CONNECTIONS);
+ entity_name_t::OSD(whoami), "cluster", nonce);
Messenger *ms_hb_back_client = Messenger::create(g_ceph_context, cluster_msg_type,
- entity_name_t::OSD(whoami), "hb_back_client",
- nonce, Messenger::HEARTBEAT);
+ entity_name_t::OSD(whoami), "hb_back_client", nonce);
Messenger *ms_hb_front_client = Messenger::create(g_ceph_context, public_msg_type,
- entity_name_t::OSD(whoami), "hb_front_client",
- nonce, Messenger::HEARTBEAT);
+ entity_name_t::OSD(whoami), "hb_front_client", nonce);
Messenger *ms_hb_back_server = Messenger::create(g_ceph_context, cluster_msg_type,
- entity_name_t::OSD(whoami), "hb_back_server",
- nonce, Messenger::HEARTBEAT);
+ entity_name_t::OSD(whoami), "hb_back_server", nonce);
Messenger *ms_hb_front_server = Messenger::create(g_ceph_context, public_msg_type,
- entity_name_t::OSD(whoami), "hb_front_server",
- nonce, Messenger::HEARTBEAT);
+ entity_name_t::OSD(whoami), "hb_front_server", nonce);
Messenger *ms_objecter = Messenger::create(g_ceph_context, public_msg_type,
- entity_name_t::OSD(whoami), "ms_objecter",
- nonce, 0);
+ entity_name_t::OSD(whoami), "ms_objecter", nonce);
if (!ms_public || !ms_cluster || !ms_hb_front_client || !ms_hb_back_client || !ms_hb_back_server || !ms_hb_front_server || !ms_objecter)
forker.exit(1);
ms_cluster->set_cluster_protocol(CEPH_OSD_PROTOCOL);
msgr = Messenger::create(g_ceph_context, public_msgr_type,
entity_name_t::MGR(gid),
"mgr",
- Messenger::get_pid_nonce(),
- 0);
+ Messenger::get_pid_nonce());
msgr->set_default_policy(Messenger::Policy::stateless_server(0));
msgr->set_auth_client(monc);
cct->_conf.get_val<std::string>("ms_type"),
entity_name_t::MGR(),
"mgr",
- Messenger::get_pid_nonce(),
- 0)),
+ Messenger::get_pid_nonce())),
objecter{g_ceph_context, client_messenger.get(), &monc, poolctx, 0, 0},
client{client_messenger.get(), &monc, &objecter},
mgrc(g_ceph_context, client_messenger.get(), &monc.monmap),
std::string public_msgr_type = cct->_conf->ms_public_type.empty() ? cct->_conf.get_val<std::string>("ms_type") : cct->_conf->ms_public_type;
auto nonce = get_random_nonce();
return Messenger::create(cct, public_msgr_type, entity_name_t::CLIENT(),
- std::move(lname), nonce, 0);
+ std::move(lname), nonce);
}
uint64_t Messenger::get_pid_nonce()
Messenger *Messenger::create(CephContext *cct, const std::string &type,
entity_name_t name, std::string lname,
- uint64_t nonce, uint64_t cflags)
+ uint64_t nonce)
{
int r = -1;
if (type == "random") {
Interceptor *interceptor = nullptr;
#endif
- /**
- * Various Messenger conditional config/type flags to allow
- * different "transport" Messengers to tune themselves
- */
- static const int HAS_HEAVY_TRAFFIC = 0x0001;
- static const int HAS_MANY_CONNECTIONS = 0x0002;
- static const int HEARTBEAT = 0x0004;
-
/**
* The CephContext this Messenger uses. Many other components initialize themselves
* from this value.
* @param lname logical name of the messenger in this process (e.g., "client")
* @param nonce nonce value to uniquely identify this instance on the current host
* @param features bits for the local connection
- * @param cflags general std::set of flags to configure transport resources
*/
static Messenger *create(CephContext *cct,
const std::string &type,
entity_name_t name,
std::string lname,
- uint64_t nonce,
- uint64_t cflags);
+ uint64_t nonce);
static uint64_t get_random_nonce();
static uint64_t get_pid_nonce();
Server(CephContext* cct, const entity_inst_t& entity)
: dummy_auth(cct), dispatcher(cct)
{
- msgr.reset(Messenger::create(cct, "async",
- entity.name, "pong", entity.addr.get_nonce(), 0));
+ msgr.reset(Messenger::create(cct, "async", entity.name, "pong", entity.addr.get_nonce()));
dummy_auth.auth_registry.refresh_config();
msgr->set_cluster_protocol(CEPH_OSD_PROTOCOL);
msgr->set_default_policy(Messenger::Policy::stateless_server(0));
Client(CephContext *cct)
: dummy_auth(cct), dispatcher(cct)
{
- msgr.reset(Messenger::create(cct, "async",
- entity_name_t::CLIENT(-1), "ping",
- getpid(), 0));
+ msgr.reset(Messenger::create(cct, "async", entity_name_t::CLIENT(-1), "ping", getpid()));
dummy_auth.auth_registry.refresh_config();
msgr->set_cluster_protocol(CEPH_OSD_PROTOCOL);
msgr->set_default_policy(Messenger::Policy::lossy_client(0));
private:
void init(entity_addr_t test_peer_addr, SocketPolicy policy) {
- peer_msgr.reset(Messenger::create(cct, "async", entity_name_t::OSD(4), "TestPeer", 4, 0));
+ peer_msgr.reset(Messenger::create(cct, "async", entity_name_t::OSD(4), "TestPeer", 4));
dummy_auth.auth_registry.refresh_config();
peer_msgr->set_cluster_protocol(CEPH_OSD_PROTOCOL);
peer_msgr->set_default_policy(policy);
}
void init(entity_addr_t cmd_peer_addr) {
- cmd_msgr.reset(Messenger::create(cct, "async", entity_name_t::OSD(3), "CmdSrv", 3, 0));
+ cmd_msgr.reset(Messenger::create(cct, "async", entity_name_t::OSD(3), "CmdSrv", 3));
dummy_auth.auth_registry.refresh_config();
cmd_msgr->set_cluster_protocol(CEPH_OSD_PROTOCOL);
cmd_msgr->set_default_policy(Messenger::Policy::stateless_server(0));
std::string lname = o->is_receiver ?
"receiver" : "sender";
- /* Does anybody really uses those flags in messenger? Seems not. */
- unsigned flags = o->is_receiver ?
- Messenger::HAS_HEAVY_TRAFFIC |
- Messenger::HAS_MANY_CONNECTIONS : 0;
-
std::string ms_type = o->ms_type != CEPH_MSGR_TYPE_UNDEF ?
ceph_msgr_types[o->ms_type] :
g_ceph_context->_conf.get_val<std::string>("ms_type");
/* o->td__>pid doesn't set value, so use getpid() instead*/
auto nonce = o->is_receiver ? 0 : (getpid() + o->td__->thread_number);
Messenger *msgr = Messenger::create(g_ceph_context, ms_type.c_str(),
- ename, lname, nonce, flags);
+ ename, lname, nonce);
if (o->is_receiver) {
msgr->set_default_policy(Messenger::Policy::stateless_server(0));
msgr->bind(hostname_to_addr(o));
std::string public_msgr_type = cct->_conf->ms_public_type.empty() ? cct->_conf.get_val<std::string>("ms_type") : cct->_conf->ms_public_type;
msg = Messenger::create(cct, public_msgr_type, entity_name_t::CLIENT(-1),
- "test-mon-msg", 0, 0);
+ "test-mon-msg", 0);
ceph_assert(msg != NULL);
msg->set_default_policy(Messenger::Policy::lossy_client(0));
dout(0) << __func__ << " starting messenger at "
ss << "client-osd" << whoami;
std::string public_msgr_type = cct->_conf->ms_public_type.empty() ? cct->_conf.get_val<std::string>("ms_type") : cct->_conf->ms_public_type;
messenger.reset(Messenger::create(cct, public_msgr_type, entity_name_t::OSD(whoami),
- ss.str().c_str(), getpid(), 0));
+ ss.str().c_str(), getpid()));
Throttle throttler(g_ceph_context, "osd_client_bytes",
g_conf()->osd_client_message_size_cap);
addr.set_nonce(0);
dummy_auth.auth_registry.refresh_config();
for (int i = 0; i < jobs; ++i) {
- Messenger *msgr = Messenger::create(g_ceph_context, type, entity_name_t::CLIENT(0), "client", getpid()+i, 0);
+ Messenger *msgr = Messenger::create(g_ceph_context, type, entity_name_t::CLIENT(0), "client", getpid()+i);
msgr->set_default_policy(Messenger::Policy::lossless_client(0));
msgr->set_auth_client(&dummy_auth);
msgr->start();
MessengerServer(const string &t, const string &addr, int threads, int delay):
msgr(NULL), type(t), bindaddr(addr), dispatcher(threads, delay),
dummy_auth(g_ceph_context) {
- msgr = Messenger::create(g_ceph_context, type, entity_name_t::OSD(0), "server", 0, 0);
+ msgr = Messenger::create(g_ceph_context, type, entity_name_t::OSD(0), "server", 0);
msgr->set_default_policy(Messenger::Policy::stateless_server(0));
dummy_auth.auth_registry.refresh_config();
msgr->set_auth_server(&dummy_auth);
}
void SetUp() override {
lderr(g_ceph_context) << __func__ << " start set up " << GetParam() << dendl;
- server_msgr = Messenger::create(g_ceph_context, string(GetParam()), entity_name_t::OSD(0), "server", getpid(), 0);
- client_msgr = Messenger::create(g_ceph_context, string(GetParam()), entity_name_t::CLIENT(-1), "client", getpid(), 0);
+ server_msgr = Messenger::create(g_ceph_context, string(GetParam()), entity_name_t::OSD(0), "server", getpid());
+ client_msgr = Messenger::create(g_ceph_context, string(GetParam()), entity_name_t::CLIENT(-1), "client", getpid());
server_msgr->set_default_policy(Messenger::Policy::stateless_server(0));
client_msgr->set_default_policy(Messenger::Policy::lossy_client(0));
server_msgr->set_auth_client(&dummy_auth);
char addr[64];
for (int i = 0; i < servers; ++i) {
msgr = Messenger::create(g_ceph_context, type, entity_name_t::OSD(0),
- "server", getpid()+i, 0);
+ "server", getpid()+i);
snprintf(addr, sizeof(addr), "v2:127.0.0.1:%d",
base_port+i);
bind_addr.parse(addr);
for (int i = 0; i < clients; ++i) {
msgr = Messenger::create(g_ceph_context, type, entity_name_t::CLIENT(-1),
- "client", getpid()+i+servers, 0);
+ "client", getpid()+i+servers);
if (cli_policy.standby) {
snprintf(addr, sizeof(addr), "v2:127.0.0.1:%d",
base_port+i+servers);
// Markdown with external lock
TEST_P(MessengerTest, MarkdownTest) {
- Messenger *server_msgr2 = Messenger::create(g_ceph_context, string(GetParam()), entity_name_t::OSD(0), "server", getpid(), 0);
+ Messenger *server_msgr2 = Messenger::create(g_ceph_context, string(GetParam()), entity_name_t::OSD(0), "server", getpid());
MarkdownDispatcher cli_dispatcher(false), srv_dispatcher(true);
DummyAuthClientServer dummy_auth(g_ceph_context);
dummy_auth.auth_registry.refresh_config();
std::string cluster_msgr_type = g_conf()->ms_cluster_type.empty() ? g_conf().get_val<std::string>("ms_type") : g_conf()->ms_cluster_type;
Messenger *ms = Messenger::create(g_ceph_context, cluster_msgr_type,
entity_name_t::OSD(0), "make_checker",
- getpid(), 0);
+ getpid());
ms->set_cluster_protocol(CEPH_OSD_PROTOCOL);
ms->set_default_policy(Messenger::Policy::stateless_server(0));
ms->bind(g_conf()->public_addr);
Server(CephContext* cct, unsigned msg_len)
: dummy_auth(cct), dispatcher(cct, msg_len)
{
- msgr.reset(Messenger::create(cct, "async", entity_name_t::OSD(0), "server", 0, 0));
+ msgr.reset(Messenger::create(cct, "async", entity_name_t::OSD(0), "server", 0));
dummy_auth.auth_registry.refresh_config();
msgr->set_cluster_protocol(CEPH_OSD_PROTOCOL);
msgr->set_default_policy(Messenger::Policy::stateless_server(0));