// help find socket resource leaks
-int sockopen = 0;
+//static int sockopen = 0;
#define closed_socket() //dout(20) << "closed_socket " << --sockopen << dendl;
#define opened_socket() //dout(20) << "opened_socket " << ++sockopen << dendl;
* Accepter
*/
-void simplemessenger_sigint(int r)
-{
- rank.sigint();
- if (old_sigint_handler)
- old_sigint_handler(r);
-}
-
-void Rank::sigint()
-{
- lock.Lock();
- derr(0) << "got control-c, exiting" << dendl;
-
- // force close listener socket
- if (accepter.listen_sd >= 0) {
- ::close(accepter.listen_sd);
- accepter.listen_sd = -1;
- closed_socket();
- }
-
- // force close all pipe sockets, too
- for (hash_map<entity_addr_t, Pipe*>::iterator p = rank_pipe.begin();
- p != rank_pipe.end();
- ++p)
- p->second->force_close();
-
- lock.Unlock();
-}
-
-
-
void noop_signal_handler(int s)
{
//dout(0) << "blah_handler got " << s << dendl;
}
- rank.rank_addr = g_my_addr;
- if (rank.rank_addr != entity_addr_t())
- rank.need_addr = false;
+ rank->rank_addr = g_my_addr;
+ if (rank->rank_addr != entity_addr_t())
+ rank->need_addr = false;
else
- rank.need_addr = true;
- if (rank.rank_addr.get_port() == 0) {
+ rank->need_addr = true;
+ if (rank->rank_addr.get_port() == 0) {
entity_addr_t tmp;
tmp.ipaddr = listen_addr;
- rank.rank_addr.set_port(tmp.get_port());
+ rank->rank_addr.set_port(tmp.get_port());
if (force_nonce >= 0)
- rank.rank_addr.nonce = force_nonce;
+ rank->rank_addr.nonce = force_nonce;
else
- rank.rank_addr.nonce = getpid(); // FIXME: pid might not be best choice here.
+ rank->rank_addr.nonce = getpid(); // FIXME: pid might not be the best choice here.
}
- rank.rank_addr.erank = 0;
+ rank->rank_addr.erank = 0;
- dout(1) << "accepter.bind rank_addr is " << rank.rank_addr
- << " need_addr=" << rank.need_addr
+ dout(1) << "accepter.bind rank_addr is " << rank->rank_addr
+ << " need_addr=" << rank->need_addr
<< dendl;
return 0;
}
int Rank::Accepter::start()
{
dout(1) << "accepter.start" << dendl;
- // set up signal handler
- //old_sigint_handler = signal(SIGINT, simplemessenger_sigint);
// set a harmless handler for SIGUSR1 (we'll use it to stop the accepter)
struct sigaction sa;
dout(0) << "accepter could't set TCP_NODELAY: " << strerror(errno) << dendl;
}
- rank.lock.Lock();
- if (rank.num_local > 0) {
- Pipe *p = new Pipe(Pipe::STATE_ACCEPTING);
+ rank->lock.Lock();
+ if (rank->num_local > 0) {
+ Pipe *p = new Pipe(rank, Pipe::STATE_ACCEPTING);
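+ // the new pipe's reader thread will run the accept handshake on this socket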
p->sd = sd;
p->start_reader();
- rank.pipes.insert(p);
+ rank->pipes.insert(p);
}
- rank.lock.Unlock();
+ rank->lock.Unlock();
} else {
dout(0) << "accepter no incoming connection? sd = " << sd << " errno " << errno << " " << strerror(errno) << dendl;
if (++errors > 4)
int Rank::start(bool nodaemon)
{
// register at least one entity, first!
- assert(rank.my_type >= 0);
+ assert(my_type >= 0);
lock.Lock();
if (started) {
*/
Rank::Pipe *Rank::connect_rank(const entity_addr_t& addr, const Policy& p)
{
- assert(rank.lock.is_locked());
- assert(addr != rank.rank_addr);
+ assert(lock.is_locked());
+ assert(addr != rank_addr);
dout(10) << "connect_rank to " << addr << ", creating pipe and registering" << dendl;
// create pipe
- Pipe *pipe = new Pipe(Pipe::STATE_CONNECTING);
+ Pipe *pipe = new Pipe(this, Pipe::STATE_CONNECTING);
pipe->policy = p;
pipe->peer_addr = addr;
pipe->start_writer();
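+ // the pipe's writer thread will open the socket and run the connect handshake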
/* register_entity
*/
-Rank::EntityMessenger *Rank::register_entity(entity_name_t name)
+Rank::Endpoint *Rank::register_entity(entity_name_t name)
{
dout(10) << "register_entity " << name << dendl;
lock.Lock();
// create messenger
int erank = max_local;
- EntityMessenger *msgr = new EntityMessenger(name, erank);
+ Endpoint *msgr = new Endpoint(this, name, erank);
// now i know my type.
if (my_type >= 0)
}
-void Rank::unregister_entity(EntityMessenger *msgr)
+void Rank::unregister_entity(Endpoint *msgr)
{
lock.Lock();
dout(10) << "unregister_entity " << msgr->get_myname() << dendl;
/**********************************
- * EntityMessenger
+ * Endpoint
*/
-void Rank::EntityMessenger::dispatch_entry()
+void Rank::Endpoint::dispatch_entry()
{
lock.Lock();
while (!stop) {
dout(15) << "dispatch: ending loop " << dendl;
// deregister
- rank.unregister_entity(this);
+ rank->unregister_entity(this);
put();
}
-void Rank::EntityMessenger::ready()
+void Rank::Endpoint::ready()
{
dout(10) << "ready " << get_myaddr() << dendl;
assert(!dispatch_thread.is_started());
}
-int Rank::EntityMessenger::shutdown()
+int Rank::Endpoint::shutdown()
{
dout(10) << "shutdown " << get_myaddr() << dendl;
return 0;
}
-void Rank::EntityMessenger::suicide()
+void Rank::Endpoint::suicide()
{
dout(10) << "suicide " << get_myaddr() << dendl;
shutdown();
// hmm, or exit(0)?
}
-void Rank::EntityMessenger::prepare_dest(const entity_inst_t& inst)
+void Rank::Endpoint::prepare_dest(const entity_inst_t& inst)
{
- rank.lock.Lock();
+ rank->lock.Lock();
{
- if (rank.rank_pipe.count(inst.addr) == 0)
- rank.connect_rank(inst.addr, rank.policy_map[inst.name.type()]);
+ if (rank->rank_pipe.count(inst.addr) == 0)
+ rank->connect_rank(inst.addr, rank->policy_map[inst.name.type()]);
}
- rank.lock.Unlock();
+ rank->lock.Unlock();
}
-int Rank::EntityMessenger::send_message(Message *m, entity_inst_t dest)
+int Rank::Endpoint::send_message(Message *m, entity_inst_t dest)
{
// set envelope
m->set_source_inst(_myinst);
<< " " << m
<< dendl;
- rank.submit_message(m, dest.addr);
+ rank->submit_message(m, dest.addr);
return 0;
}
-int Rank::EntityMessenger::forward_message(Message *m, entity_inst_t dest)
+int Rank::Endpoint::forward_message(Message *m, entity_inst_t dest)
{
// set envelope
m->set_source_inst(_myinst);
<< " " << m
<< dendl;
- rank.submit_message(m, dest.addr);
+ rank->submit_message(m, dest.addr);
return 0;
}
-int Rank::EntityMessenger::lazy_send_message(Message *m, entity_inst_t dest)
+int Rank::Endpoint::lazy_send_message(Message *m, entity_inst_t dest)
{
// set envelope
m->set_source_inst(_myinst);
<< " " << m
<< dendl;
- rank.submit_message(m, dest.addr, true);
+ rank->submit_message(m, dest.addr, true);
return 0;
}
-void Rank::EntityMessenger::reset_myname(entity_name_t newname)
+void Rank::Endpoint::reset_myname(entity_name_t newname)
{
entity_name_t oldname = get_myname();
dout(10) << "reset_myname " << oldname << " to " << newname << dendl;
}
-void Rank::EntityMessenger::mark_down(entity_addr_t a)
+void Rank::Endpoint::mark_down(entity_addr_t a)
{
- rank.mark_down(a);
+ rank->mark_down(a);
}
void Rank::mark_down(entity_addr_t addr)
#define dout_prefix _pipe_prefix()
ostream& Rank::Pipe::_pipe_prefix() {
return *_dout << dbeginl << pthread_self()
- << " -- " << rank.rank_addr << " >> " << peer_addr << " pipe(" << this
+ << " -- " << rank->rank_addr << " >> " << peer_addr << " pipe(" << this
<< " sd=" << sd
<< " pgs=" << peer_global_seq
<< " cs=" << connect_seq
}
// and my addr
- rc = tcp_write(sd, (char*)&rank.rank_addr, sizeof(rank.rank_addr));
+ rc = tcp_write(sd, (char*)&rank->rank_addr, sizeof(rank->rank_addr));
if (rc < 0) {
dout(10) << "accept couldn't write my addr" << dendl;
state = STATE_CLOSED;
<< " global_seq " << connect.global_seq
<< dendl;
- rank.lock.Lock();
+ rank->lock.Lock();
// note peer's type, flags
- policy = rank.policy_map[connect.host_type]; /* apply policy */
+ policy = rank->policy_map[connect.host_type]; /* apply policy */
lossy_rx = connect.flags & CEPH_MSG_CONNECT_LOSSY;
memset(&reply, 0, sizeof(reply));
// existing?
- if (rank.rank_pipe.count(peer_addr)) {
- existing = rank.rank_pipe[peer_addr];
+ if (rank->rank_pipe.count(peer_addr)) {
+ existing = rank->rank_pipe[peer_addr];
existing->lock.Lock();
if (connect.global_seq < existing->peer_global_seq) {
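+ // peer's global_seq is older than what we last saw from this address; make it retry with a newer one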
reply.tag = CEPH_MSGR_TAG_RETRY_GLOBAL;
reply.global_seq = existing->peer_global_seq; // so we can send it below..
existing->lock.Unlock();
- rank.lock.Unlock();
+ rank->lock.Unlock();
goto reply;
} else {
dout(10) << "accept existing " << existing << ".gseq " << existing->peer_global_seq
reply.tag = CEPH_MSGR_TAG_RETRY_SESSION;
reply.connect_seq = existing->connect_seq; // so we can send it below..
existing->lock.Unlock();
- rank.lock.Unlock();
+ rank->lock.Unlock();
goto reply;
}
}
if (connect.connect_seq == existing->connect_seq) {
// connection race?
- if (peer_addr < rank.rank_addr) {
+ if (peer_addr < rank->rank_addr) {
// incoming wins
dout(10) << "accept connection race, existing " << existing << ".cseq " << existing->connect_seq
<< " == " << connect.connect_seq << ", replacing my attempt" << dendl;
// our existing outgoing wins
dout(10) << "accept connection race, existing " << existing << ".cseq " << existing->connect_seq
<< " == " << connect.connect_seq << ", sending WAIT" << dendl;
- assert(peer_addr > rank.rank_addr);
+ assert(peer_addr > rank->rank_addr);
assert(existing->state == STATE_CONNECTING); // this will win
reply.tag = CEPH_MSGR_TAG_WAIT;
existing->lock.Unlock();
- rank.lock.Unlock();
+ rank->lock.Unlock();
goto reply;
}
}
<< ", " << existing << ".cseq = " << existing->connect_seq
<< "), sending RESETSESSION" << dendl;
reply.tag = CEPH_MSGR_TAG_RESETSESSION;
- rank.lock.Unlock();
+ rank->lock.Unlock();
existing->lock.Unlock();
goto reply;
}
else if (connect.connect_seq > 0) {
// we reset, and they are opening a new session
dout(0) << "accept we reset (peer sent cseq " << connect.connect_seq << "), sending RESETSESSION" << dendl;
- rank.lock.Unlock();
+ rank->lock.Unlock();
reply.tag = CEPH_MSGR_TAG_RESETSESSION;
goto reply;
} else {
// send READY reply
reply.tag = CEPH_MSGR_TAG_READY;
- reply.global_seq = rank.get_global_seq();
+ reply.global_seq = rank->get_global_seq();
reply.connect_seq = connect_seq;
reply.flags = 0;
if (policy.lossy_tx)
// ok!
register_pipe();
- rank.lock.Unlock();
+ rank->lock.Unlock();
rc = tcp_write(sd, (char*)&reply, sizeof(reply));
if (rc < 0)
fail:
- rank.lock.Unlock();
+ rank->lock.Unlock();
fail_unlocked:
lock.Lock();
state = STATE_CLOSED;
closed_socket();
}
__u32 cseq = connect_seq;
- __u32 gseq = rank.get_global_seq();
+ __u32 gseq = rank->get_global_seq();
// stop reader thread
join_reader();
// identify myself
memset(&msg, 0, sizeof(msg));
- msgvec[0].iov_base = (char*)&rank.rank_addr;
- msgvec[0].iov_len = sizeof(rank.rank_addr);
+ msgvec[0].iov_base = (char*)&rank->rank_addr;
+ msgvec[0].iov_len = sizeof(rank->rank_addr);
msg.msg_iov = msgvec;
msg.msg_iovlen = 1;
msglen = msgvec[0].iov_len;
dout(2) << "connect couldn't write my addr, " << strerror(errno) << dendl;
goto fail;
}
- dout(10) << "connect sent my addr " << rank.rank_addr << dendl;
+ dout(10) << "connect sent my addr " << rank->rank_addr << dendl;
while (1) {
ceph_msg_connect connect;
- connect.host_type = rank.my_type;
+ connect.host_type = rank->my_type;
connect.global_seq = gseq;
connect.connect_seq = cseq;
connect.flags = 0;
continue;
}
if (reply.tag == CEPH_MSGR_TAG_RETRY_GLOBAL) {
- gseq = rank.get_global_seq(reply.global_seq);
+ gseq = rank->get_global_seq(reply.global_seq);
dout(10) << "connect got RETRY_GLOBAL " << reply.global_seq
<< " chose new " << gseq << dendl;
lock.Unlock();
void Rank::Pipe::register_pipe()
{
dout(10) << "register_pipe" << dendl;
- assert(rank.lock.is_locked());
- assert(rank.rank_pipe.count(peer_addr) == 0);
- rank.rank_pipe[peer_addr] = this;
+ assert(rank->lock.is_locked());
+ assert(rank->rank_pipe.count(peer_addr) == 0);
+ rank->rank_pipe[peer_addr] = this;
}
void Rank::Pipe::unregister_pipe()
{
- assert(rank.lock.is_locked());
- if (rank.rank_pipe.count(peer_addr) &&
- rank.rank_pipe[peer_addr] == this) {
+ assert(rank->lock.is_locked());
+ if (rank->rank_pipe.count(peer_addr) &&
+ rank->rank_pipe[peer_addr] == this) {
dout(10) << "unregister_pipe" << dendl;
- rank.rank_pipe.erase(peer_addr);
+ rank->rank_pipe.erase(peer_addr);
} else {
dout(10) << "unregister_pipe - not registered" << dendl;
}
stop();
report_failures();
- for (unsigned i=0; i<rank.local.size(); i++)
- if (rank.local[i] && rank.local[i]->get_dispatcher())
- rank.local[i]->queue_reset(peer_addr, last_dest_name);
+ for (unsigned i=0; i<rank->local.size(); i++)
+ if (rank->local[i] && rank->local[i]->get_dispatcher())
+ rank->local[i]->queue_reset(peer_addr, last_dest_name);
// unregister
lock.Unlock();
- rank.lock.Lock();
+ rank->lock.Lock();
unregister_pipe();
- rank.lock.Unlock();
+ rank->lock.Unlock();
lock.Lock();
}
dout(10) << "was_session_reset" << dendl;
report_failures();
- for (unsigned i=0; i<rank.local.size(); i++)
- if (rank.local[i] && rank.local[i]->get_dispatcher())
- rank.local[i]->queue_remote_reset(peer_addr, last_dest_name);
+ for (unsigned i=0; i<rank->local.size(); i++)
+ if (rank->local[i] && rank->local[i]->get_dispatcher())
+ rank->local[i]->queue_remote_reset(peer_addr, last_dest_name);
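+ // restart message sequence numbers for the new session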
out_seq = 0;
in_seq = 0;
if (policy.drop_msg_callback) {
unsigned srcrank = m->get_source_inst().addr.erank;
- if (srcrank >= rank.max_local || rank.local[srcrank] == 0) {
+ if (srcrank >= rank->max_local || rank->local[srcrank] == 0) {
dout(1) << "fail on " << *m << ", srcrank " << srcrank << " dne, dropping" << dendl;
- } else if (rank.local[srcrank]->is_stopped()) {
+ } else if (rank->local[srcrank]->is_stopped()) {
dout(1) << "fail on " << *m << ", dispatcher stopping, ignoring." << dendl;
} else {
dout(10) << "fail on " << *m << dendl;
- rank.local[srcrank]->queue_failure(m, m->get_dest_inst());
+ rank->local[srcrank]->queue_failure(m, m->get_dest_inst());
}
}
m->put();
<< " for " << m->get_dest() << dendl;
// deliver
- EntityMessenger *entity = 0;
+ Endpoint *entity = 0;
- rank.lock.Lock();
+ rank->lock.Lock();
{
unsigned erank = m->get_dest_inst().addr.erank;
- if (erank < rank.max_local && rank.local[erank]) {
+ if (erank < rank->max_local && rank->local[erank]) {
// find entity
- entity = rank.local[erank];
+ entity = rank->local[erank];
entity->get();
// first message?
entity->need_addr = false;
}
- if (rank.need_addr) {
- rank.rank_addr = m->get_dest_inst().addr;
- rank.rank_addr.erank = 0;
- dout(2) << "reader rank_addr is " << rank.rank_addr << dendl;
- rank.need_addr = false;
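+ // learn our externally visible address from the first message addressed to us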
+ if (rank->need_addr) {
+ rank->rank_addr = m->get_dest_inst().addr;
+ rank->rank_addr.erank = 0;
+ dout(2) << "reader rank_addr is " << rank->rank_addr << dendl;
+ rank->need_addr = false;
}
} else {
derr(0) << "reader got message " << *m << " for " << m->get_dest() << ", which isn't local" << dendl;
}
}
- rank.lock.Unlock();
+ rank->lock.Unlock();
if (entity) {
entity->queue_message(m); // queue
sd = -1;
closed_socket();
}
- rank.lock.Lock();
+ rank->lock.Lock();
{
- rank.pipe_reap_queue.push_back(this);
- rank.wait_cond.Signal();
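+ // queue this pipe for reaping and wake the waiting thread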
+ rank->pipe_reap_queue.push_back(this);
+ rank->wait_cond.Signal();
}
- rank.lock.Unlock();
+ rank->lock.Unlock();
}
dout(10) << "reader done" << dendl;
sd = -1;
closed_socket();
}
- rank.lock.Lock();
+ rank->lock.Lock();
{
- rank.pipe_reap_queue.push_back(this);
- rank.wait_cond.Signal();
+ rank->pipe_reap_queue.push_back(this);
+ rank->wait_cond.Signal();
}
- rank.lock.Unlock();
+ rank->lock.Unlock();
}
dout(10) << "writer done" << dendl;