goto out_mc_start_failed;
// start up network
- messenger = Messenger::create(g_ceph_context,
+ messenger = Messenger::create(g_ceph_context, g_conf->ms_type,
entity_name_t::CLIENT(), "client",
getpid());
messenger->set_default_policy(Messenger::Policy::lossy_client(0, 0));
"MDS names may not start with a numeric digit." << dendl;
}
- Messenger *messenger = Messenger::create(g_ceph_context,
+ Messenger *messenger = Messenger::create(g_ceph_context, g_conf->ms_type,
entity_name_t::MDS(-1), "mds",
getpid());
messenger->set_cluster_protocol(CEPH_MDS_PROTOCOL);
// bind
int rank = monmap.get_rank(g_conf->name.get_id());
- Messenger *messenger = Messenger::create(g_ceph_context,
+ Messenger *messenger = Messenger::create(g_ceph_context, g_conf->ms_type,
entity_name_t::MON(rank),
"mon",
0);
<< TEXT_NORMAL << dendl;
}
- Messenger *ms_public = Messenger::create(g_ceph_context,
+ Messenger *ms_public = Messenger::create(g_ceph_context, g_conf->ms_type,
entity_name_t::OSD(whoami), "client",
getpid());
- Messenger *ms_cluster = Messenger::create(g_ceph_context,
+ Messenger *ms_cluster = Messenger::create(g_ceph_context, g_conf->ms_type,
entity_name_t::OSD(whoami), "cluster",
getpid());
- Messenger *ms_hbclient = Messenger::create(g_ceph_context,
+ Messenger *ms_hbclient = Messenger::create(g_ceph_context, g_conf->ms_type,
entity_name_t::OSD(whoami), "hbclient",
getpid());
- Messenger *ms_hb_back_server = Messenger::create(g_ceph_context,
+ Messenger *ms_hb_back_server = Messenger::create(g_ceph_context, g_conf->ms_type,
entity_name_t::OSD(whoami), "hb_back_server",
getpid());
- Messenger *ms_hb_front_server = Messenger::create(g_ceph_context,
+ Messenger *ms_hb_front_server = Messenger::create(g_ceph_context, g_conf->ms_type,
entity_name_t::OSD(whoami), "hb_front_server",
getpid());
- Messenger *ms_objecter = Messenger::create(g_ceph_context,
+ Messenger *ms_objecter = Messenger::create(g_ceph_context, g_conf->ms_type,
entity_name_t::OSD(whoami), "ms_objecter",
getpid());
ms_cluster->set_cluster_protocol(CEPH_OSD_PROTOCOL);
cout << "ceph-syn: starting " << g_conf->num_client << " syn client(s)" << std::endl;
for (int i=0; i<g_conf->num_client; i++) {
- messengers[i] = Messenger::create(g_ceph_context,
+ messengers[i] = Messenger::create(g_ceph_context, g_conf->ms_type,
entity_name_t(entity_name_t::TYPE_CLIENT,-1), "synclient",
i * 1000000 + getpid());
messengers[i]->bind(g_conf->public_addr);
goto fail;
//network connection
- messenger = Messenger::create(cct, entity_name_t::CLIENT(), "client", msgr_nonce);
+ messenger = Messenger::create(cct, cct->_conf->ms_type, entity_name_t::CLIENT(), "client", msgr_nonce);
//at last the client
ret = -CEPHFS_ERROR_NEW_CLIENT; //defined in libcephfs.h;
err = -ENOMEM;
nonce = getpid() + (1000000 * (uint64_t)rados_instance.inc());
- messenger = Messenger::create(cct, entity_name_t::CLIENT(-1),
+ messenger = Messenger::create(cct, cct->_conf->ms_type, entity_name_t::CLIENT(-1),
"radosclient", nonce);
if (!messenger)
goto out;
bool temp_msgr = false;
Messenger* smessenger = NULL;
if (!messenger) {
- messenger = smessenger = Messenger::create(cct,
+ messenger = smessenger = Messenger::create(cct, cct->_conf->ms_type,
entity_name_t::CLIENT(-1),
"temp_mon_client", getpid());
messenger->add_dispatcher_head(this);
MonClientPinger *pinger = new MonClientPinger(cct, result_reply);
- Messenger *smsgr = Messenger::create(cct,
+ Messenger *smsgr = Messenger::create(cct, cct->_conf->ms_type,
entity_name_t::CLIENT(-1),
"temp_ping_client", getpid());
smsgr->add_dispatcher_head(pinger);
#include "msg/simple/SimpleMessenger.h"
#include "msg/async/AsyncMessenger.h"
-Messenger *Messenger::create(CephContext *cct,
- entity_name_t name,
- string lname,
- uint64_t nonce)
+Messenger *Messenger::create(CephContext *cct, const string &type,
+ entity_name_t name, string lname,
+ uint64_t nonce)
{
int r = -1;
- if (cct->_conf->ms_type == "random")
+ if (type == "random")
r = rand() % 2;
- if (r == 0 || cct->_conf->ms_type == "simple")
+ if (r == 0 || type == "simple")
return new SimpleMessenger(cct, name, lname, nonce);
- else if (r == 1 || cct->_conf->ms_type == "async")
+ else if (r == 1 || type == "async")
return new AsyncMessenger(cct, name, lname, nonce);
lderr(cct) << "unrecognized ms_type '" << cct->_conf->ms_type << "'" << dendl;
return NULL;
* available or specified via the configuration in cct.
*
* @param cct context
+ * @param type name of messenger type
* @param name entity name to register
* @param lname logical name of the messenger in this process (e.g., "client")
* @param nonce nonce value to uniquely identify this instance on the current host
*/
static Messenger *create(CephContext *cct,
+ const string &type,
entity_name_t name,
string lname,
uint64_t nonce);
int init_messenger() {
dout(1) << __func__ << dendl;
- msg = Messenger::create(cct, entity_name_t::CLIENT(-1),
+ msg = Messenger::create(cct, cct->_conf->ms_type, entity_name_t::CLIENT(-1),
"test-mon-msg", 0);
assert(msg != NULL);
msg->set_default_policy(Messenger::Policy::lossy_client(0,0));
return err;
}
- messenger.reset(Messenger::create(cct, entity_name_t::CLIENT(-1),
+ messenger.reset(Messenger::create(cct, cct->_conf->ms_type, entity_name_t::CLIENT(-1),
"stubclient", getpid()));
assert(messenger.get() != NULL);
<< cct->_conf->auth_supported << dendl;
stringstream ss;
ss << "client-osd" << whoami;
- messenger.reset(Messenger::create(cct, entity_name_t::OSD(whoami),
+ messenger.reset(Messenger::create(cct, cct->_conf->ms_type, entity_name_t::OSD(whoami),
ss.str().c_str(), getpid()));
Throttle throttler(g_ceph_context, "osd_client_bytes",
#include "msg/msg_types.h"
#include "msg/Message.h"
#include "msg/Messenger.h"
-#include "msg/simple/SimpleMessenger.h"
-#include "msg/async/AsyncMessenger.h"
#include "msg/Connection.h"
#include "messages/MPing.h"
MessengerTest(): server_msgr(NULL), client_msgr(NULL) {}
virtual void SetUp() {
cerr << __func__ << " start set up " << GetParam() << std::endl;
- if (strcmp(GetParam(), "simple")) {
- server_msgr = new SimpleMessenger(g_ceph_context, entity_name_t::OSD(0), "server", getpid());
- client_msgr = new SimpleMessenger(g_ceph_context, entity_name_t::CLIENT(-1), "client", getpid());
- } else if (strcmp(GetParam(), "async")) {
- server_msgr = new AsyncMessenger(g_ceph_context, entity_name_t::OSD(0), "server", getpid());
- client_msgr = new AsyncMessenger(g_ceph_context, entity_name_t::CLIENT(-1), "client", getpid());
- server_msgr->set_default_policy(Messenger::Policy::stateless_server(0, 0));
- }
+ server_msgr = Messenger::create(g_ceph_context, string(GetParam()), entity_name_t::OSD(0), "server", getpid());
+ client_msgr = Messenger::create(g_ceph_context, string(GetParam()), entity_name_t::CLIENT(-1), "client", getpid());
server_msgr->set_default_policy(Messenger::Policy::stateless_server(0, 0));
client_msgr->set_default_policy(Messenger::Policy::lossy_client(0, 0));
}
waiting_for_mds_map(NULL)
{
monc = new MonClient(g_ceph_context);
- messenger = Messenger::create(g_ceph_context, entity_name_t::CLIENT(), "mds", getpid());
+ messenger = Messenger::create(g_ceph_context, g_ceph_context->_conf->ms_type, entity_name_t::CLIENT(), "mds", getpid());
mdsmap = new MDSMap();
objecter = new Objecter(g_ceph_context, messenger, monc, 0, 0);
}