cout << "cfuse[" << getpid() << "]: starting ceph client" << std::endl;
- messenger->start(false); // Do not daemonize here
+ messenger->start(false, getpid()); // Do not daemonize here
// start client
client->init();
return -1;
SimpleMessenger *messenger = new SimpleMessenger();
- messenger->bind();
+ messenger->bind(getpid());
if (dump_journal >= 0) {
Dumper *journal_dumper = new Dumper(messenger, &mc);
journal_dumper->init(dump_journal);
<< " mon_data " << g_conf.mon_data
<< " fsid " << monmap.get_fsid()
<< std::endl;
- g_conf.public_addr = monmap.get_addr(g_conf.name->get_id());
- err = messenger->bind();
+ err = messenger->bind(monmap.get_addr(g_conf.name->get_id()), 0);
if (err < 0)
return 1;
SimpleMessenger *messenger_hb = new SimpleMessenger();
if (client_addr_set)
- client_messenger->bind(g_conf.public_addr);
+ client_messenger->bind(g_conf.public_addr, getpid());
else
- client_messenger->bind();
+ client_messenger->bind(getpid());
entity_addr_t hb_addr; // hb should bind to same ip ad cluster_addr (if specified)
if (cluster_addr_set) {
- cluster_messenger->bind(g_conf.cluster_addr);
+ cluster_messenger->bind(g_conf.cluster_addr, getpid());
hb_addr = g_conf.cluster_addr;
hb_addr.set_port(0);
} else {
- cluster_messenger->bind();
+ cluster_messenger->bind(getpid());
}
- messenger_hb->bind(hb_addr);
+ messenger_hb->bind(hb_addr, getpid());
cout << "starting osd" << whoami
<< " at " << client_messenger->get_ms_addr()
for (int i=0; i<g_conf.num_client; i++) {
messengers[i] = new SimpleMessenger();
messengers[i]->register_entity(entity_name_t(entity_name_t::TYPE_CLIENT,-1));
- messengers[i]->bind();
+ messengers[i]->bind(i * 1000000 + getpid());
mclients[i] = new MonClient();
mclients[i]->build_initial_monmap();
Client *client = new Client(messengers[i], mclients[i]);
static Client *client = NULL;
static MonClient *monclient = NULL;
static SimpleMessenger *messenger = NULL;
+static int instance = 0;
extern "C" int ceph_initialize(int argc, const char **argv)
{
//at last the client
client = new Client(messenger, monclient);
- messenger->start(false); // do not daemonize
+ uint64_t nonce = (uint64_t)++instance * 1000000ull + (uint64_t)getpid();
+ messenger->start(false, nonce); // do not daemonize
client->init();
}
#define dout_prefix *_dout << "librados: "
+static atomic_t rados_instance;
+
+
/*
* Structure of this file
*
messenger->add_dispatcher_head(this);
- messenger->start(false); // do not daemonize
+ uint64_t nonce;
+ rados_instance.inc();
+ nonce = getpid() + (1000000 * (uint64_t)rados_instance.read());
+
+ messenger->start(false, nonce); // do not daemonize
messenger->add_dispatcher_head(this);
dout(1) << "setting wanted keys" << dendl;
messenger->register_entity(entity_name_t::CLIENT());
messenger->add_dispatcher_head(this);
- messenger->start(false); // do not daemonize
+ messenger->start(false, getpid()); // do not daemonize
monc->set_want_keys(CEPH_ENTITY_TYPE_MON|CEPH_ENTITY_TYPE_OSD|CEPH_ENTITY_TYPE_MDS);
monc->set_messenger(messenger);
messenger->register_entity(entity_name_t::CLIENT());
messenger->add_dispatcher_head(this);
- messenger->start(true);
+ messenger->start(true, getpid());
monc->set_want_keys(CEPH_ENTITY_TYPE_MON|CEPH_ENTITY_TYPE_OSD|CEPH_ENTITY_TYPE_MDS);
monc->set_messenger(messenger);
messenger = smessenger = new SimpleMessenger();
smessenger->register_entity(entity_name_t::CLIENT(-1));
messenger->add_dispatcher_head(this);
- smessenger->start(false); // do not daemonize!
+ smessenger->start(false, getpid()); // do not daemonize!
temp_msgr = true;
}
* Accepter
*/
-int SimpleMessenger::Accepter::bind(int64_t force_nonce, entity_addr_t &bind_addr, int avoid_port1, int avoid_port2)
+int SimpleMessenger::Accepter::bind(uint64_t nonce, entity_addr_t &bind_addr, int avoid_port1, int avoid_port2)
{
// bind to a socket
dout(10) << "accepter.bind" << dendl;
if (messenger->ms_addr.get_port() == 0) {
messenger->ms_addr = listen_addr;
- if (force_nonce >= 0)
- messenger->ms_addr.nonce = force_nonce;
- else
- messenger->ms_addr.nonce = getpid(); // FIXME: pid might not be best choice here.
+ messenger->ms_addr.nonce = nonce;
}
messenger->init_local_pipe();
-int SimpleMessenger::bind(entity_addr_t &bind_addr, int64_t force_nonce)
+int SimpleMessenger::bind(entity_addr_t bind_addr, int64_t nonce)
{
lock.Lock();
if (started) {
lock.Unlock();
// bind to a socket
- return accepter.bind(force_nonce, bind_addr);
+ return accepter.bind(nonce, bind_addr);
}
int SimpleMessenger::rebind(int avoid_port)
return 0;
}
-int SimpleMessenger::start(bool daemonize)
+int SimpleMessenger::start(bool daemonize, uint64_t nonce)
{
// register at least one entity, first!
assert(my_type >= 0);
lock.Unlock();
return 0;
}
-
- if (!did_bind)
- ms_addr.nonce = getpid();
+
+ if (!did_bind) {
+ ms_addr.nonce = nonce;
+ }
dout(1) << "messenger.start" << dendl;
started = true;
void *entry();
void stop();
- int bind(int64_t force_nonce, entity_addr_t &bind_addr, int avoid_port1=0, int avoid_port2=0);
+ int bind(uint64_t nonce, entity_addr_t &bind_addr, int avoid_port1=0, int avoid_port2=0);
int rebind(int avoid_port);
int start();
} accepter;
//void set_listen_addr(tcpaddr_t& a);
- int bind(entity_addr_t& bind_addr, int64_t force_nonce = -1);
- int bind(int64_t force_nonce = -1) { return bind(g_conf.public_addr, force_nonce); }
- int start(bool daemonize);
+ int bind(entity_addr_t bind_addr, int64_t nonce);
+ int bind(uint64_t nonce) {
+ return bind(g_conf.public_addr, nonce);
+ }
+ int start(bool daemonize, uint64_t nonce); // if we didn't bind
+ int start(bool daemonize) { // if we did
+ assert(did_bind);
+ start(daemonize, 0);
+ }
void wait();
int write_pid_file(int pid);
// start up network
messenger = new SimpleMessenger();
messenger->register_entity(entity_name_t::CLIENT());
- messenger->start(false); // do not daemonize
+ messenger->start(false, getpid()); // do not daemonize
messenger->add_dispatcher_head(&dispatcher);
g.lock.Lock();