return -1;
// start up network
- SimpleMessenger *messenger = new SimpleMessenger(g_ceph_context);
- messenger->register_entity(entity_name_t::CLIENT());
+ SimpleMessenger *messenger = new SimpleMessenger(g_ceph_context, entity_name_t::CLIENT());
Client *client = new Client(messenger, &mc);
if (filer_flags) {
client->set_filer_flags(filer_flags);
const std::string &dump_file, int rank)
{
common_init_finish(g_ceph_context);
- SimpleMessenger *messenger = new SimpleMessenger(g_ceph_context);
+ SimpleMessenger *messenger = new SimpleMessenger(g_ceph_context,
+ entity_name_t::CLIENT());
int r = messenger->bind(g_conf->public_addr, getpid());
if (r < 0)
return r;
global_print_banner();
- SimpleMessenger *messenger = new SimpleMessenger(g_ceph_context);
+ SimpleMessenger *messenger = new SimpleMessenger(g_ceph_context,
+ entity_name_t::MDS(-1));
messenger->set_cluster_protocol(CEPH_MDS_PROTOCOL);
int r = messenger->bind(g_conf->public_addr, getpid());
cout << "starting " << g_conf->name << " at " << messenger->get_ms_addr()
<< std::endl;
- messenger->register_entity(entity_name_t::MDS(-1));
uint64_t supported =
CEPH_FEATURE_UID |
CEPH_FEATURE_NOSRCADDR |
}
// bind
- SimpleMessenger *messenger = new SimpleMessenger(g_ceph_context);
- messenger->set_cluster_protocol(CEPH_MON_PROTOCOL);
-
int rank = monmap.get_rank(g_conf->name.get_id());
+ SimpleMessenger *messenger = new SimpleMessenger(g_ceph_context,
+ entity_name_t::MON(rank));
+ messenger->set_cluster_protocol(CEPH_MON_PROTOCOL);
global_print_banner();
return 1;
// start monitor
- messenger->register_entity(entity_name_t::MON(rank));
messenger->set_default_send_priority(CEPH_MSG_PRIO_HIGH);
mon = new Monitor(g_ceph_context, g_conf->name.get_id(), &store, messenger, &monmap);
<< TEXT_NORMAL << dendl;
}
- SimpleMessenger *client_messenger = new SimpleMessenger(g_ceph_context);
- SimpleMessenger *cluster_messenger = new SimpleMessenger(g_ceph_context);
- SimpleMessenger *messenger_hbin = new SimpleMessenger(g_ceph_context);
- SimpleMessenger *messenger_hbout = new SimpleMessenger(g_ceph_context);
+ SimpleMessenger *client_messenger = new SimpleMessenger(g_ceph_context, entity_name_t::OSD(whoami));
+ SimpleMessenger *cluster_messenger = new SimpleMessenger(g_ceph_context, entity_name_t::OSD(whoami));
+ SimpleMessenger *messenger_hbin = new SimpleMessenger(g_ceph_context, entity_name_t::OSD(whoami));
+ SimpleMessenger *messenger_hbout = new SimpleMessenger(g_ceph_context, entity_name_t::OSD(whoami));
cluster_messenger->set_cluster_protocol(CEPH_OSD_PROTOCOL);
messenger_hbin->set_cluster_protocol(CEPH_OSD_PROTOCOL);
messenger_hbout->set_cluster_protocol(CEPH_OSD_PROTOCOL);
"(no journal)" : g_conf->osd_journal)
<< std::endl;
- client_messenger->register_entity(entity_name_t::OSD(whoami));
- cluster_messenger->register_entity(entity_name_t::OSD(whoami));
- messenger_hbin->register_entity(entity_name_t::OSD(whoami));
- messenger_hbout->register_entity(entity_name_t::OSD(whoami));
-
Throttle client_throttler(g_conf->osd_client_message_size_cap);
uint64_t supported =
cout << "ceph-syn: starting " << g_conf->num_client << " syn client(s)" << std::endl;
for (int i=0; i<g_conf->num_client; i++) {
- messengers[i] = new SimpleMessenger(g_ceph_context);
- messengers[i]->register_entity(entity_name_t(entity_name_t::TYPE_CLIENT,-1));
+ messengers[i] = new SimpleMessenger(g_ceph_context, entity_name_t(entity_name_t::TYPE_CLIENT,-1));
messengers[i]->bind(g_conf->public_addr, i * 1000000 + getpid());
mclients[i] = new MonClient(g_ceph_context);
mclients[i]->build_initial_monmap();
goto fail;
//network connection
- messenger = new SimpleMessenger(cct);
- if (!messenger->register_entity(entity_name_t::CLIENT())) {
- messenger->destroy();
- messenger = NULL;
- ret = -1001;
- goto fail;
- }
+ messenger = new SimpleMessenger(cct, entity_name_t::CLIENT());
//at last the client
ret = -1002;
goto out;
err = -ENOMEM;
- messenger = new SimpleMessenger(cct);
+ messenger = new SimpleMessenger(cct, entity_name_t::CLIENT(-1));
if (!messenger)
goto out;
ldout(cct, 1) << "starting msgr at " << messenger->get_ms_addr() << dendl;
- messenger->register_entity(entity_name_t::CLIENT(-1));
ldout(cct, 1) << "starting objecter" << dendl;
err = -ENOMEM;
objecter->set_client_incarnation(0);
- messenger->register_entity(entity_name_t::CLIENT());
messenger->add_dispatcher_head(this);
messenger->start_with_nonce(getpid());
objecter->set_client_incarnation(0);
- messenger->register_entity(entity_name_t::CLIENT());
messenger->add_dispatcher_head(this);
messenger->start_with_nonce(getpid());
bool temp_msgr = false;
SimpleMessenger* smessenger = NULL;
if (!messenger) {
- messenger = smessenger = new SimpleMessenger(cct);
- smessenger->register_entity(entity_name_t::CLIENT(-1));
+ messenger = smessenger = new SimpleMessenger(cct, entity_name_t::CLIENT(-1));
messenger->add_dispatcher_head(this);
smessenger->start_with_nonce(getpid());
temp_msgr = true;
return ms_deliver_verify_authorizer(con, peer_type, protocol, authorizer, authorizer_reply, isvalid);
}
-
-
-/* register_entity
- */
-bool SimpleMessenger::register_entity(entity_name_t name)
-{
- ldout(cct,10) << "register_entity " << name << dendl;
- lock.Lock();
-
- if (!destination_stopped) { //already have a working entity set
- lock.Unlock();
- return false;
- }
-
- // set it up
- Messenger::set_myname(name);
- // now i know my type.
- if (my_type >= 0)
- assert(my_type == name.type());
- else
- my_type = name.type();
-
- destination_stopped = false;
-
- ldout(cct,10) << "register_entity " << name << " at " << get_myaddr() << dendl;
-
- msgr->init_local_pipe();
-
- lock.Unlock();
- return true;
-}
-
void SimpleMessenger::submit_message(Message *m, Pipe *pipe)
{
lock.Lock();
int get_proto_version(int peer_type, bool connect);
public:
- SimpleMessenger(CephContext *cct) :
- Messenger(cct, entity_name_t()),
+ SimpleMessenger(CephContext *cct, entity_name_t name) :
+ Messenger(cct, name),
accepter(this),
lock("SimpleMessenger::lock"), started(false), did_bind(false),
dispatch_throttler(cct->_conf->ms_dispatch_throttle_bytes), need_addr(true),
- destination_stopped(true), my_type(-1),
+ destination_stopped(false), my_type(name.type()),
global_seq_lock("SimpleMessenger::global_seq_lock"), global_seq(0),
reaper_thread(this), reaper_started(false), reaper_stop(false),
dispatch_thread(this), msgr(this),
{
// for local dmsg delivery
dispatch_queue.local_pipe = new Pipe(this, Pipe::STATE_OPEN);
+ init_local_pipe();
}
~SimpleMessenger() {
delete dispatch_queue.local_pipe;
bool verify_authorizer(Connection *con, int peer_type, int protocol, bufferlist& auth, bufferlist& auth_reply,
bool& isvalid);
- bool register_entity(entity_name_t addr);
-
void submit_message(Message *m, const entity_addr_t& addr, int dest_type, bool lazy);
void submit_message(Message *m, Pipe *pipe);
std::string sss(ss.str());
g_ceph_context->_conf->set_val("public_addr", sss.c_str());
g_ceph_context->_conf->apply_changes(NULL);
- SimpleMessenger *rank = new SimpleMessenger(g_ceph_context);
+ SimpleMessenger *rank = new SimpleMessenger(g_ceph_context,
+ entity_name_t::MON(whoami));
int err = rank->bind(g_ceph_context->_conf->public_addr, getpid());
if (err < 0)
return 1;
// start monitor
- rank->register_entity(entity_name_t::MON(whoami));
messenger = rank;
messenger->set_default_send_priority(CEPH_MSG_PRIO_HIGH);
messenger->add_dispatcher_head(&dispatcher);
tok = tok_init(NULL);
// start up network
- messenger = new SimpleMessenger(g_ceph_context);
- messenger->register_entity(entity_name_t::CLIENT());
+ messenger = new SimpleMessenger(g_ceph_context, entity_name_t::CLIENT());
messenger->start_with_nonce(getpid());
ctx->dispatcher = new Admin(ctx.get());
messenger->add_dispatcher_head(ctx->dispatcher);