#define DOUT_SUBSYS ms
#undef dout_prefix
-#define dout_prefix _prefix()
-static ostream& _prefix() {
- return *_dout << dbeginl << pthread_self() << " -- " << rank.rank_addr << " ";
+#define dout_prefix _prefix(rank)
+static ostream& _prefix(SimpleMessenger *rank) {
+ return *_dout << dbeginl << pthread_self() << " -- " << rank->rank_addr << " ";
}
#define opened_socket() //dout(20) << "opened_socket " << ++sockopen << dendl;
-Rank rank;
-
#ifdef DARWIN
sig_t old_sigint_handler = 0;
#else
//dout(0) << "blah_handler got " << s << dendl;
}
-int Rank::Accepter::bind(int64_t force_nonce)
+int SimpleMessenger::Accepter::bind(int64_t force_nonce)
{
// bind to a socket
dout(10) << "accepter.bind" << dendl;
return 0;
}
-int Rank::Accepter::start()
+int SimpleMessenger::Accepter::start()
{
dout(1) << "accepter.start" << dendl;
return 0;
}
-void *Rank::Accepter::entry()
+void *SimpleMessenger::Accepter::entry()
{
dout(10) << "accepter starting" << dendl;
return 0;
}
-void Rank::Accepter::stop()
+void SimpleMessenger::Accepter::stop()
{
done = true;
dout(10) << "stop sending SIGUSR1" << dendl;
-/********************************************
- * Rank
- */
-/*
- * note: assumes lock is held
+/**********************************
+ * Endpoint
*/
-void Rank::reaper()
-{
- dout(10) << "reaper" << dendl;
- assert(lock.is_locked());
-
- while (!pipe_reap_queue.empty()) {
- Pipe *p = pipe_reap_queue.front();
- dout(10) << "reaper reaping pipe " << p << " " << p->get_peer_addr() << dendl;
- p->unregister_pipe();
- pipe_reap_queue.pop_front();
- assert(pipes.count(p));
- pipes.erase(p);
- p->join();
- p->discard_queue();
- dout(10) << "reaper reaped pipe " << p << " " << p->get_peer_addr() << dendl;
- assert(p->sd < 0);
- delete p;
- dout(10) << "reaper deleted pipe " << p << dendl;
- }
-}
-
-int Rank::bind(int64_t force_nonce)
+void SimpleMessenger::Endpoint::dispatch_entry()
{
lock.Lock();
- if (started) {
- dout(10) << "rank.bind already started" << dendl;
- lock.Unlock();
- return -1;
+ while (!stop) {
+ if (!dispatch_queue.empty()) {
+ list<Message*> ls;
+
+ // take highest priority message off the queue
+ map<int, list<Message*> >::reverse_iterator p = dispatch_queue.rbegin();
+ ls.push_back(p->second.front());
+ p->second.pop_front();
+ if (p->second.empty())
+ dispatch_queue.erase(p->first);
+ qlen--;
+
+ lock.Unlock();
+ {
+ // deliver
+ while (!ls.empty()) {
+ if (stop) {
+ dout(1) << "dispatch: stop=true, discarding " << ls.size()
+ << " messages in dispatch queue" << dendl;
+ break;
+ }
+ Message *m = ls.front();
+ ls.pop_front();
+ if ((long)m == BAD_REMOTE_RESET) {
+ lock.Lock();
+ entity_addr_t a = remote_reset_q.front().first;
+ entity_name_t n = remote_reset_q.front().second;
+ remote_reset_q.pop_front();
+ lock.Unlock();
+ get_dispatcher()->ms_handle_remote_reset(a, n);
+ } else if ((long)m == BAD_RESET) {
+ lock.Lock();
+ entity_addr_t a = reset_q.front().first;
+ entity_name_t n = reset_q.front().second;
+ reset_q.pop_front();
+ lock.Unlock();
+ get_dispatcher()->ms_handle_reset(a, n);
+ } else if ((long)m == BAD_FAILED) {
+ lock.Lock();
+ m = failed_q.front().first;
+ entity_inst_t i = failed_q.front().second;
+ failed_q.pop_front();
+ lock.Unlock();
+ get_dispatcher()->ms_handle_failure(m, i);
+ m->put();
+ } else {
+ dout(1) << m->get_dest()
+ << " <== " << m->get_source_inst()
+ << " " << m->get_seq()
+ << " ==== " << *m
+ << " ==== " << m->get_payload().length() << "+" << m->get_data().length()
+ << " (" << m->get_footer().front_crc << " " << m->get_footer().data_crc << ")"
+ << " " << m
+ << dendl;
+ dispatch(m);
+ dout(20) << "done calling dispatch on " << m << dendl;
+ }
+ }
+ }
+ lock.Lock();
+ continue;
+ }
+ cond.Wait(lock);
}
- dout(10) << "rank.bind" << dendl;
lock.Unlock();
+ dout(15) << "dispatch: ending loop " << dendl;
- // bind to a socket
- return accepter.bind(force_nonce);
+ // deregister
+ rank->unregister_entity(this);
+ put();
}
+void SimpleMessenger::Endpoint::ready()
+{
+ dout(10) << "ready " << get_myaddr() << dendl;
+ assert(!dispatch_thread.is_started());
+ get();
+ dispatch_thread.create();
+}
-class C_Die : public Context {
-public:
- void finish(int) {
- cerr << "die" << std::endl;
- exit(1);
- }
-};
-static void write_pid_file(int pid)
+int SimpleMessenger::Endpoint::shutdown()
{
- if (!g_conf.pid_file)
- return;
-
- int fd = ::open(g_conf.pid_file, O_CREAT|O_TRUNC|O_WRONLY, 0644);
- if (fd >= 0) {
- char buf[20];
- int len = sprintf(buf, "%d\n", pid);
- ::write(fd, buf, len);
- ::close(fd);
+ dout(10) << "shutdown " << get_myaddr() << dendl;
+
+ // stop my dispatch thread
+ if (dispatch_thread.am_self()) {
+ dout(10) << "shutdown i am dispatch, setting stop flag" << dendl;
+ stop = true;
+ } else {
+ dout(10) << "shutdown i am not dispatch, setting stop flag and joining thread." << dendl;
+ lock.Lock();
+ stop = true;
+ cond.Signal();
+ lock.Unlock();
}
+ return 0;
}
-static void remove_pid_file()
+void SimpleMessenger::Endpoint::suicide()
{
- if (!g_conf.pid_file)
- return;
-
- // only remove it if it has OUR pid in it!
- int fd = ::open(g_conf.pid_file, O_RDONLY);
- if (fd >= 0) {
- char buf[20];
- ::read(fd, buf, 20);
- ::close(fd);
- int a = atoi(buf);
+ dout(10) << "suicide " << get_myaddr() << dendl;
+ shutdown();
+ // hmm, or exit(0)?
+}
- if (a == getpid())
- ::unlink(g_conf.pid_file);
- else
- dout(0) << "strange, pid file " << g_conf.pid_file
- << " has " << a << ", not expected " << getpid()
- << dendl;
+void SimpleMessenger::Endpoint::prepare_dest(const entity_inst_t& inst)
+{
+ rank->lock.Lock();
+ {
+ if (rank->rank_pipe.count(inst.addr) == 0)
+ rank->connect_rank(inst.addr, rank->policy_map[inst.name.type()]);
}
+ rank->lock.Unlock();
}
-int Rank::start(bool nodaemon)
+int SimpleMessenger::Endpoint::send_message(Message *m, entity_inst_t dest)
{
- // register at least one entity, first!
- assert(my_type >= 0);
-
- lock.Lock();
- if (started) {
- dout(10) << "rank.start already started" << dendl;
- lock.Unlock();
- return 0;
- }
+ // set envelope
+ m->set_source_inst(_myinst);
+ m->set_orig_source_inst(_myinst);
+ m->set_dest_inst(dest);
+ if (!g_conf.ms_nocrc)
+ m->calc_data_crc();
+ else
+ m->get_footer().flags = (unsigned)m->get_footer().flags | CEPH_MSG_FOOTER_NOCRC;
+ if (!m->get_priority()) m->set_priority(get_default_send_priority());
+
+ dout(1) << m->get_source()
+ << " --> " << dest.name << " " << dest.addr
+ << " -- " << *m
+ << " -- ?+" << m->get_data().length()
+ << " (? " << m->get_footer().data_crc << ")"
+ << " " << m
+ << dendl;
- dout(1) << "rank.start at " << rank_addr << dendl;
- started = true;
- lock.Unlock();
+ rank->submit_message(m, dest.addr);
- // daemonize?
- if (g_conf.daemonize && !nodaemon) {
- if (Thread::get_num_threads() > 0) {
- derr(0) << "rank.start BUG: there are " << Thread::get_num_threads()
- << " already started that will now die! call rank.start() sooner."
- << dendl;
- }
- dout(1) << "rank.start daemonizing" << dendl;
+ return 0;
+}
- if (1) {
- daemon(1, 0);
- write_pid_file(getpid());
- } else {
- pid_t pid = fork();
- if (pid) {
- // i am parent
- write_pid_file(pid);
- ::close(0);
- ::close(1);
- ::close(2);
- _exit(0);
- }
- }
+int SimpleMessenger::Endpoint::forward_message(Message *m, entity_inst_t dest)
+{
+ // set envelope
+ m->set_source_inst(_myinst);
+ m->set_dest_inst(dest);
+ if (!g_conf.ms_nocrc)
+ m->calc_data_crc();
+ else
+ m->get_footer().flags = (unsigned)m->get_footer().flags | CEPH_MSG_FOOTER_NOCRC;
+ if (!m->get_priority()) m->set_priority(get_default_send_priority());
- if (g_conf.chdir && g_conf.chdir[0]) {
- ::mkdir(g_conf.chdir, 0700);
- ::chdir(g_conf.chdir);
- }
-
- _dout_rename_output_file();
- } else if (g_daemon) {
- write_pid_file(getpid());
- }
+ dout(1) << m->get_source()
+ << " **> " << dest.name << " " << dest.addr
+ << " -- " << *m
+ << " -- ?+" << m->get_data().length()
+ << " (? " << m->get_footer().data_crc << ")"
+ << " " << m
+ << dendl;
- // some debug hackery?
- if (g_conf.kill_after)
- g_timer.add_event_after(g_conf.kill_after, new C_Die);
+ rank->submit_message(m, dest.addr);
- // go!
- accepter.start();
return 0;
}
-/* connect_rank
- * NOTE: assumes rank.lock held.
- */
-Rank::Pipe *Rank::connect_rank(const entity_addr_t& addr, const Policy& p)
+
+int SimpleMessenger::Endpoint::lazy_send_message(Message *m, entity_inst_t dest)
{
- assert(lock.is_locked());
- assert(addr != rank_addr);
-
- dout(10) << "connect_rank to " << addr << ", creating pipe and registering" << dendl;
-
- // create pipe
- Pipe *pipe = new Pipe(this, Pipe::STATE_CONNECTING);
- pipe->policy = p;
- pipe->peer_addr = addr;
- pipe->start_writer();
- pipe->register_pipe();
- pipes.insert(pipe);
-
- return pipe;
-}
-
+ // set envelope
+ m->set_source_inst(_myinst);
+ m->set_orig_source_inst(_myinst);
+ m->set_dest_inst(dest);
+ if (!g_conf.ms_nocrc)
+ m->calc_data_crc();
+ else
+ m->get_footer().flags = (unsigned)m->get_footer().flags | CEPH_MSG_FOOTER_NOCRC;
+ if (!m->get_priority()) m->set_priority(get_default_send_priority());
+
+ dout(1) << "lazy " << m->get_source()
+ << " --> " << dest.name << " " << dest.addr
+ << " -- " << *m
+ << " -- ?+" << m->get_data().length()
+ << " (? " << m->get_footer().data_crc << ")"
+ << " " << m
+ << dendl;
+ rank->submit_message(m, dest.addr, true);
+ return 0;
+}
+void SimpleMessenger::Endpoint::reset_myname(entity_name_t newname)
+{
+ entity_name_t oldname = get_myname();
+ dout(10) << "reset_myname " << oldname << " to " << newname << dendl;
+ _set_myname(newname);
+}
-/* register_entity
- */
-Rank::Endpoint *Rank::register_entity(entity_name_t name)
+void SimpleMessenger::Endpoint::mark_down(entity_addr_t a)
{
- dout(10) << "register_entity " << name << dendl;
- lock.Lock();
-
- // create messenger
- int erank = max_local;
- Endpoint *msgr = new Endpoint(this, name, erank);
+ rank->mark_down(a);
+}
- // now i know my type.
- if (my_type >= 0)
- assert(my_type == name.type());
- else
- my_type = name.type();
- // add to directory
- max_local++;
- local.resize(max_local);
- stopped.resize(max_local);
- msgr->get();
- local[erank] = msgr;
- stopped[erank] = false;
- msgr->_myinst.addr = rank_addr;
- if (msgr->_myinst.addr.ipaddr == entity_addr_t().ipaddr)
- msgr->need_addr = true;
- msgr->_myinst.addr.erank = erank;
- dout(10) << "register_entity " << name << " at " << msgr->_myinst.addr
- << " need_addr=" << need_addr
- << dendl;
- num_local++;
-
- lock.Unlock();
- return msgr;
-}
+/**************************************
+ * Pipe
+ */
+#undef dout_prefix
+#define dout_prefix _pipe_prefix()
+ostream& SimpleMessenger::Pipe::_pipe_prefix() {
+ return *_dout << dbeginl << pthread_self()
+ << " -- " << rank->rank_addr << " >> " << peer_addr << " pipe(" << this
+ << " sd=" << sd
+ << " pgs=" << peer_global_seq
+ << " cs=" << connect_seq
+ << ").";
+}
-void Rank::unregister_entity(Endpoint *msgr)
+int SimpleMessenger::Pipe::accept()
{
- lock.Lock();
- dout(10) << "unregister_entity " << msgr->get_myname() << dendl;
-
- // remove from local directory.
- assert(msgr->my_rank >= 0);
- assert(local[msgr->my_rank] == msgr);
- local[msgr->my_rank] = 0;
- stopped[msgr->my_rank] = true;
- num_local--;
- msgr->my_rank = -1;
-
- assert(msgr->nref.test() > 1);
- msgr->put();
-
- wait_cond.Signal();
+ dout(10) << "accept" << dendl;
- lock.Unlock();
-}
+ // my creator gave me sd via accept()
+ assert(state == STATE_ACCEPTING);
+
+ // announce myself.
+ int rc = tcp_write(sd, CEPH_BANNER, strlen(CEPH_BANNER));
+ if (rc < 0) {
+ dout(10) << "accept couldn't write banner" << dendl;
+ state = STATE_CLOSED;
+ return -1;
+ }
+ // and my addr
+ rc = tcp_write(sd, (char*)&rank->rank_addr, sizeof(rank->rank_addr));
+ if (rc < 0) {
+ dout(10) << "accept couldn't write my addr" << dendl;
+ state = STATE_CLOSED;
+ return -1;
+ }
+ dout(10) << "accept sd=" << sd << dendl;
+
+ // identify peer
+ char banner[strlen(CEPH_BANNER)+1];
+ rc = tcp_read(sd, banner, strlen(CEPH_BANNER));
+ if (rc < 0) {
+ dout(10) << "accept couldn't read banner" << dendl;
+ state = STATE_CLOSED;
+ return -1;
+ }
+ if (memcmp(banner, CEPH_BANNER, strlen(CEPH_BANNER))) {
+ banner[strlen(CEPH_BANNER)] = 0;
+ dout(10) << "accept peer sent bad banner '" << banner << "'" << dendl;
+ state = STATE_CLOSED;
+ return -1;
+ }
+ rc = tcp_read(sd, (char*)&peer_addr, sizeof(peer_addr));
+ if (rc < 0) {
+ dout(10) << "accept couldn't read peer_addr" << dendl;
+ state = STATE_CLOSED;
+ return -1;
+ }
+ dout(10) << "accept peer addr is " << peer_addr << dendl;
+ if (peer_addr.ipaddr.sin_addr.s_addr == htonl(INADDR_ANY)) {
+ // peer apparently doesn't know what ip they have; figure it out for them.
+ entity_addr_t old_addr = peer_addr;
+ socklen_t len = sizeof(peer_addr.ipaddr);
+ int r = ::getpeername(sd, (sockaddr*)&peer_addr.ipaddr, &len);
+ if (r < 0) {
+ dout(0) << "accept failed to getpeername " << errno << " " << strerror(errno) << dendl;
+ state = STATE_CLOSED;
+ return -1;
+ }
+ peer_addr.ipaddr.sin_port = old_addr.ipaddr.sin_port;
+ dout(2) << "accept peer says " << old_addr << ", socket says " << peer_addr << dendl;
+ }
+
+ ceph_msg_connect connect;
+ ceph_msg_connect_reply reply;
+ Pipe *existing = 0;
+
+ // this should roughly mirror pseudocode at
+ // http://ceph.newdream.net/wiki/Messaging_protocol
-void Rank::submit_message(Message *m, const entity_addr_t& dest_addr, bool lazy)
-{
- const entity_name_t dest = m->get_dest();
+ while (1) {
+ rc = tcp_read(sd, (char*)&connect, sizeof(connect));
+ if (rc < 0) {
+ dout(10) << "accept couldn't read connect" << dendl;
+ goto fail_unlocked;
+ }
+ dout(20) << "accept got peer connect_seq " << connect.connect_seq
+ << " global_seq " << connect.global_seq
+ << dendl;
+
+ rank->lock.Lock();
- assert(m->nref.test() == 0);
+ // note peer's type, flags
+ policy = rank->policy_map[connect.host_type]; /* apply policy */
+ lossy_rx = connect.flags & CEPH_MSG_CONNECT_LOSSY;
- m->get_header().mon_protocol = CEPH_MON_PROTOCOL;
- m->get_header().monc_protocol = CEPH_MONC_PROTOCOL;
- m->get_header().mds_protocol = CEPH_MDS_PROTOCOL;
- m->get_header().mdsc_protocol = CEPH_MDSC_PROTOCOL;
- m->get_header().osd_protocol = CEPH_OSD_PROTOCOL;
- m->get_header().osdc_protocol = CEPH_OSDC_PROTOCOL;
+ memset(&reply, 0, sizeof(reply));
- // lookup
- entity_addr_t dest_proc_addr = dest_addr;
- dest_proc_addr.erank = 0;
+ // existing?
+ if (rank->rank_pipe.count(peer_addr)) {
+ existing = rank->rank_pipe[peer_addr];
+ existing->lock.Lock();
- lock.Lock();
- {
- // local?
- if (rank_addr.is_local_to(dest_addr)) {
- if (dest_addr.erank < max_local && local[dest_addr.erank]) {
- // local
- dout(20) << "submit_message " << *m << " dest " << dest << " local" << dendl;
- local[dest_addr.erank]->queue_message(m);
+ if (connect.global_seq < existing->peer_global_seq) {
+ dout(10) << "accept existing " << existing << ".gseq " << existing->peer_global_seq
+ << " > " << connect.global_seq << ", RETRY_GLOBAL" << dendl;
+ reply.tag = CEPH_MSGR_TAG_RETRY_GLOBAL;
+ reply.global_seq = existing->peer_global_seq; // so we can send it below..
+ existing->lock.Unlock();
+ rank->lock.Unlock();
+ goto reply;
} else {
- derr(0) << "submit_message " << *m << " dest " << dest << " " << dest_addr << " local but not in local map? dropping." << dendl;
- //assert(0); // hmpf, this is probably mds->mon beacon from newsyn.
- delete m;
+ dout(10) << "accept existing " << existing << ".gseq " << existing->peer_global_seq
+ << " <= " << connect.global_seq << ", looks ok" << dendl;
}
- }
- else {
- // remote.
- Pipe *pipe = 0;
- if (rank_pipe.count( dest_proc_addr )) {
- // connected?
- pipe = rank_pipe[ dest_proc_addr ];
- pipe->lock.Lock();
- if (pipe->state == Pipe::STATE_CLOSED) {
- dout(20) << "submit_message " << *m << " dest " << dest << " remote, " << dest_addr << ", ignoring old closed pipe." << dendl;
- pipe->unregister_pipe();
- pipe->lock.Unlock();
- pipe = 0;
+
+ if (existing->policy.lossy_tx) {
+ dout(-10) << "accept replacing existing (lossy) channel" << dendl;
+ existing->was_session_reset();
+ goto replace;
+ }
+ if (lossy_rx) {
+ if (existing->state == STATE_STANDBY) {
+ dout(-10) << "accept incoming lossy connection, kicking outgoing lossless "
+ << existing << dendl;
+ existing->state = STATE_CONNECTING;
+ existing->cond.Signal();
} else {
- dout(20) << "submit_message " << *m << " dest " << dest << " remote, " << dest_addr << ", have pipe." << dendl;
-
- // if this pipe was created by an incoming connection, but we haven't received
- // a message yet, then it won't have the policy set.
- if (pipe->get_out_seq() == 0)
- pipe->policy = policy_map[m->get_dest().type()];
-
- pipe->_send(m);
- pipe->lock.Unlock();
+ dout(-10) << "accept incoming lossy connection, our lossless " << existing
+ << " has state " << existing->state << ", doing nothing" << dendl;
}
+ existing->lock.Unlock();
+ goto fail;
}
- if (!pipe) {
- if (lazy) {
- dout(20) << "submit_message " << *m << " dest " << dest << " remote, " << dest_addr << ", lazy, dropping." << dendl;
- delete m;
- } else {
- dout(20) << "submit_message " << *m << " dest " << dest << " remote, " << dest_addr << ", new pipe." << dendl;
- // not connected.
- pipe = connect_rank(dest_proc_addr, policy_map[m->get_dest().type()]);
- pipe->send(m);
+
+ dout(-10) << "accept connect_seq " << connect.connect_seq
+ << " vs existing " << existing->connect_seq
+ << " state " << existing->state << dendl;
+
+ if (connect.connect_seq < existing->connect_seq) {
+ if (connect.connect_seq == 0) {
+ dout(-10) << "accept peer reset, then tried to connect to us, replacing" << dendl;
+ existing->was_session_reset(); // this resets out_queue, msg_ and connect_seq #'s
+ goto replace;
+ } else {
+ // old attempt, or we sent READY but they didn't get it.
+ dout(10) << "accept existing " << existing << ".cseq " << existing->connect_seq
+ << " > " << connect.connect_seq << ", RETRY_SESSION" << dendl;
+ reply.tag = CEPH_MSGR_TAG_RETRY_SESSION;
+ reply.connect_seq = existing->connect_seq; // so we can send it below..
+ existing->lock.Unlock();
+ rank->lock.Unlock();
+ goto reply;
+ }
+ }
+
+ if (connect.connect_seq == existing->connect_seq) {
+ // connection race?
+ if (peer_addr < rank->rank_addr) {
+ // incoming wins
+ dout(10) << "accept connection race, existing " << existing << ".cseq " << existing->connect_seq
+ << " == " << connect.connect_seq << ", replacing my attempt" << dendl;
+ assert(existing->state == STATE_CONNECTING ||
+ existing->state == STATE_STANDBY ||
+ existing->state == STATE_WAIT);
+ goto replace;
+ } else {
+ // our existing outgoing wins
+ dout(10) << "accept connection race, existing " << existing << ".cseq " << existing->connect_seq
+ << " == " << connect.connect_seq << ", sending WAIT" << dendl;
+ assert(peer_addr > rank->rank_addr);
+ assert(existing->state == STATE_CONNECTING); // this will win
+ reply.tag = CEPH_MSGR_TAG_WAIT;
+ existing->lock.Unlock();
+ rank->lock.Unlock();
+ goto reply;
}
}
+
+ assert(connect.connect_seq > existing->connect_seq);
+ assert(connect.global_seq >= existing->peer_global_seq);
+ if (existing->connect_seq == 0) {
+ dout(0) << "accept we reset (peer sent cseq " << connect.connect_seq
+ << ", " << existing << ".cseq = " << existing->connect_seq
+ << "), sending RESETSESSION" << dendl;
+ reply.tag = CEPH_MSGR_TAG_RESETSESSION;
+ rank->lock.Unlock();
+ existing->lock.Unlock();
+ goto reply;
+ }
+
+ // reconnect
+ dout(10) << "accept peer sent cseq " << connect.connect_seq
+ << " > " << existing->connect_seq << dendl;
+ goto replace;
+ } // existing
+ else if (connect.connect_seq > 0) {
+ // we reset, and they are opening a new session
+ dout(0) << "accept we reset (peer sent cseq " << connect.connect_seq << "), sending RESETSESSION" << dendl;
+ rank->lock.Unlock();
+ reply.tag = CEPH_MSGR_TAG_RESETSESSION;
+ goto reply;
+ } else {
+ // new session
+ dout(10) << "accept new session" << dendl;
+ goto open;
}
- }
+ assert(0);
- lock.Unlock();
-}
+ reply:
+ rc = tcp_write(sd, (char*)&reply, sizeof(reply));
+ if (rc < 0)
+ goto fail_unlocked;
+ }
+
+ replace:
+ dout(10) << "accept replacing " << existing << dendl;
+ existing->state = STATE_CLOSED;
+ existing->cond.Signal();
+ existing->reader_thread.kill(SIGUSR1);
+ existing->unregister_pipe();
+
+ // steal queue and out_seq
+ existing->requeue_sent();
+ out_seq = existing->out_seq;
+ in_seq = existing->in_seq;
+ dout(10) << "accept out_seq " << out_seq << " in_seq " << in_seq << dendl;
+ for (map<int, list<Message*> >::iterator p = existing->q.begin();
+ p != existing->q.end();
+ p++)
+ q[p->first].splice(q[p->first].begin(), p->second);
+
+ existing->lock.Unlock();
+ open:
+ // open
+ connect_seq = connect.connect_seq + 1;
+ peer_global_seq = connect.global_seq;
+ dout(10) << "accept success, connect_seq = " << connect_seq << ", sending READY" << dendl;
+ // send READY reply
+ reply.tag = CEPH_MSGR_TAG_READY;
+ reply.global_seq = rank->get_global_seq();
+ reply.connect_seq = connect_seq;
+ reply.flags = 0;
+ if (policy.lossy_tx)
+ reply.flags = reply.flags | CEPH_MSG_CONNECT_LOSSY;
+ // ok!
+ register_pipe();
+ rank->lock.Unlock();
+ rc = tcp_write(sd, (char*)&reply, sizeof(reply));
+ if (rc < 0)
+ goto fail;
-void Rank::wait()
-{
lock.Lock();
- while (1) {
- // reap dead pipes
- reaper();
-
- if (num_local == 0) {
- dout(10) << "wait: everything stopped" << dendl;
- break; // everything stopped.
- } else {
- dout(10) << "wait: local still has " << local.size() << " items, waiting" << dendl;
- }
-
- wait_cond.Wait(lock);
+ if (state != STATE_CLOSED) {
+ dout(10) << "accept starting writer, " << "state=" << state << dendl;
+ start_writer();
}
+ dout(20) << "accept done" << dendl;
lock.Unlock();
-
- // done! clean up.
- dout(20) << "wait: stopping accepter thread" << dendl;
- accepter.stop();
- dout(20) << "wait: stopped accepter thread" << dendl;
+ return 0; // success.
- // close+reap all pipes
- lock.Lock();
- {
- dout(10) << "wait: closing pipes" << dendl;
- list<Pipe*> toclose;
- for (hash_map<entity_addr_t,Pipe*>::iterator i = rank_pipe.begin();
- i != rank_pipe.end();
- i++)
- toclose.push_back(i->second);
- for (list<Pipe*>::iterator i = toclose.begin();
- i != toclose.end();
- i++) {
- (*i)->unregister_pipe();
- (*i)->lock.Lock();
- (*i)->stop();
- (*i)->lock.Unlock();
- }
- reaper();
- dout(10) << "wait: waiting for pipes " << pipes << " to close" << dendl;
- while (!pipes.empty()) {
- wait_cond.Wait(lock);
- reaper();
- }
- }
+ fail:
+ rank->lock.Unlock();
+ fail_unlocked:
+ lock.Lock();
+ state = STATE_CLOSED;
+ fault();
lock.Unlock();
-
- dout(10) << "wait: done." << dendl;
- dout(1) << "shutdown complete." << dendl;
- remove_pid_file();
- started = false;
- my_type = -1;
+ return -1;
}
+int SimpleMessenger::Pipe::connect()
+{
+ dout(10) << "connect " << connect_seq << dendl;
+ assert(lock.is_locked());
+ if (sd >= 0) {
+ ::close(sd);
+ sd = -1;
+ closed_socket();
+ }
+ __u32 cseq = connect_seq;
+ __u32 gseq = rank->get_global_seq();
+ // stop reader thread
+ join_reader();
+ lock.Unlock();
+
+ char tag = -1;
+ int rc;
+ struct sockaddr_in myAddr;
+ struct msghdr msg;
+ struct iovec msgvec[2];
+ int msglen;
+ char banner[strlen(CEPH_BANNER)];
+ entity_addr_t paddr;
+ // create socket?
+ sd = ::socket(AF_INET, SOCK_STREAM, 0);
+ if (sd < 0) {
+ dout(-1) << "connect couldn't created socket " << strerror(errno) << dendl;
+ assert(0);
+ goto fail;
+ }
+ opened_socket();
+
+ // bind any port
+ myAddr.sin_family = AF_INET;
+ myAddr.sin_addr.s_addr = htonl(INADDR_ANY);
+ myAddr.sin_port = htons( 0 );
+ dout(10) << "binding to " << myAddr << dendl;
+ rc = ::bind(sd, (struct sockaddr *)&myAddr, sizeof(myAddr));
+ if (rc < 0) {
+ dout(2) << "bind error " << myAddr
+ << ", " << errno << ": " << strerror(errno) << dendl;
+ goto fail;
+ }
-/**********************************
- * Endpoint
- */
-
-void Rank::Endpoint::dispatch_entry()
-{
- lock.Lock();
- while (!stop) {
- if (!dispatch_queue.empty()) {
- list<Message*> ls;
+ // connect!
+ dout(10) << "connecting to " << peer_addr.ipaddr << dendl;
+ rc = ::connect(sd, (sockaddr*)&peer_addr.ipaddr, sizeof(peer_addr.ipaddr));
+ if (rc < 0) {
+ dout(2) << "connect error " << peer_addr.ipaddr
+ << ", " << errno << ": " << strerror(errno) << dendl;
+ goto fail;
+ }
- // take highest priority message off the queue
- map<int, list<Message*> >::reverse_iterator p = dispatch_queue.rbegin();
- ls.push_back(p->second.front());
- p->second.pop_front();
- if (p->second.empty())
- dispatch_queue.erase(p->first);
- qlen--;
+ // disable Nagle algorithm?
+ if (g_conf.ms_tcp_nodelay) {
+ int flag = 1;
+ int r = ::setsockopt(sd, IPPROTO_TCP, TCP_NODELAY, (char*)&flag, sizeof(flag));
+ if (r < 0)
+ dout(0) << "connect couldn't set TCP_NODELAY: " << strerror(errno) << dendl;
+ }
- lock.Unlock();
- {
- // deliver
- while (!ls.empty()) {
- if (stop) {
- dout(1) << "dispatch: stop=true, discarding " << ls.size()
- << " messages in dispatch queue" << dendl;
- break;
- }
- Message *m = ls.front();
- ls.pop_front();
- if ((long)m == BAD_REMOTE_RESET) {
- lock.Lock();
- entity_addr_t a = remote_reset_q.front().first;
- entity_name_t n = remote_reset_q.front().second;
- remote_reset_q.pop_front();
- lock.Unlock();
- get_dispatcher()->ms_handle_remote_reset(a, n);
- } else if ((long)m == BAD_RESET) {
- lock.Lock();
- entity_addr_t a = reset_q.front().first;
- entity_name_t n = reset_q.front().second;
- reset_q.pop_front();
- lock.Unlock();
- get_dispatcher()->ms_handle_reset(a, n);
- } else if ((long)m == BAD_FAILED) {
- lock.Lock();
- m = failed_q.front().first;
- entity_inst_t i = failed_q.front().second;
- failed_q.pop_front();
- lock.Unlock();
- get_dispatcher()->ms_handle_failure(m, i);
- m->put();
- } else {
- dout(1) << m->get_dest()
- << " <== " << m->get_source_inst()
- << " " << m->get_seq()
- << " ==== " << *m
- << " ==== " << m->get_payload().length() << "+" << m->get_data().length()
- << " (" << m->get_footer().front_crc << " " << m->get_footer().data_crc << ")"
- << " " << m
- << dendl;
- dispatch(m);
- dout(20) << "done calling dispatch on " << m << dendl;
- }
- }
- }
- lock.Lock();
+ // verify banner
+ // FIXME: this should be non-blocking, or in some other way verify the banner as we get it.
+ rc = tcp_read(sd, (char*)&banner, strlen(CEPH_BANNER));
+ if (rc < 0) {
+ dout(2) << "connect couldn't read banner, " << strerror(errno) << dendl;
+ goto fail;
+ }
+ if (memcmp(banner, CEPH_BANNER, strlen(CEPH_BANNER))) {
+ dout(0) << "connect protocol error (bad banner) on peer " << paddr << dendl;
+ goto fail;
+ }
+
+ memset(&msg, 0, sizeof(msg));
+ msgvec[0].iov_base = banner;
+ msgvec[0].iov_len = strlen(CEPH_BANNER);
+ msg.msg_iov = msgvec;
+ msg.msg_iovlen = 1;
+ msglen = msgvec[0].iov_len;
+ if (do_sendmsg(sd, &msg, msglen)) {
+ dout(2) << "connect couldn't write my banner, " << strerror(errno) << dendl;
+ goto fail;
+ }
+
+ // identify peer
+ rc = tcp_read(sd, (char*)&paddr, sizeof(paddr));
+ if (rc < 0) {
+ dout(2) << "connect couldn't read peer addr, " << strerror(errno) << dendl;
+ goto fail;
+ }
+ dout(20) << "connect read peer addr " << paddr << " on socket " << sd << dendl;
+ if (!peer_addr.is_local_to(paddr)) {
+ if (paddr.ipaddr.sin_addr.s_addr == 0 &&
+ peer_addr.ipaddr.sin_port == paddr.ipaddr.sin_port &&
+ peer_addr.nonce == paddr.nonce) {
+ dout(0) << "connect claims to be "
+ << paddr << " not " << peer_addr << " - presumably this is the same node!" << dendl;
+ } else {
+ dout(0) << "connect claims to be "
+ << paddr << " not " << peer_addr << " - wrong node!" << dendl;
+ goto fail;
+ }
+ }
+
+ // identify myself
+ memset(&msg, 0, sizeof(msg));
+ msgvec[0].iov_base = (char*)&rank->rank_addr;
+ msgvec[0].iov_len = sizeof(rank->rank_addr);
+ msg.msg_iov = msgvec;
+ msg.msg_iovlen = 1;
+ msglen = msgvec[0].iov_len;
+ if (do_sendmsg(sd, &msg, msglen)) {
+ dout(2) << "connect couldn't write my addr, " << strerror(errno) << dendl;
+ goto fail;
+ }
+ dout(10) << "connect sent my addr " << rank->rank_addr << dendl;
+
+ while (1) {
+ ceph_msg_connect connect;
+ connect.host_type = rank->my_type;
+ connect.global_seq = gseq;
+ connect.connect_seq = cseq;
+ connect.flags = 0;
+ if (policy.lossy_tx)
+ connect.flags |= CEPH_MSG_CONNECT_LOSSY;
+ memset(&msg, 0, sizeof(msg));
+ msgvec[0].iov_base = (char*)&connect;
+ msgvec[0].iov_len = sizeof(connect);
+ msg.msg_iov = msgvec;
+ msg.msg_iovlen = 1;
+ msglen = msgvec[0].iov_len;
+
+ dout(10) << "connect sending gseq=" << gseq << " cseq=" << cseq << dendl;
+ if (do_sendmsg(sd, &msg, msglen)) {
+ dout(2) << "connect couldn't write gseq, cseq, " << strerror(errno) << dendl;
+ goto fail;
+ }
+
+ dout(20) << "connect wrote (self +) cseq, waiting for reply" << dendl;
+ ceph_msg_connect_reply reply;
+ if (tcp_read(sd, (char*)&reply, sizeof(reply)) < 0) {
+ dout(2) << "connect read reply " << strerror(errno) << dendl;
+ goto fail;
+ }
+ dout(20) << "connect got reply tag " << (int)reply.tag
+ << " connect_seq " << reply.connect_seq
+ << " global_seq " << reply.global_seq
+ << " flags " << (int)reply.flags
+ << dendl;
+
+ lock.Lock();
+ if (state != STATE_CONNECTING) {
+ dout(0) << "connect got RESETSESSION but no longer connecting" << dendl;
+ goto stop_locked;
+ }
+
+ if (reply.tag == CEPH_MSGR_TAG_RESETSESSION) {
+ dout(0) << "connect got RESETSESSION" << dendl;
+ was_session_reset();
+ cseq = 0;
+ lock.Unlock();
continue;
}
- cond.Wait(lock);
+ if (reply.tag == CEPH_MSGR_TAG_RETRY_GLOBAL) {
+ gseq = rank->get_global_seq(reply.global_seq);
+ dout(10) << "connect got RETRY_GLOBAL " << reply.global_seq
+ << " chose new " << gseq << dendl;
+ lock.Unlock();
+ continue;
+ }
+ if (reply.tag == CEPH_MSGR_TAG_RETRY_SESSION) {
+ assert(reply.connect_seq > connect_seq);
+ dout(10) << "connect got RETRY_SESSION " << connect_seq
+ << " -> " << reply.connect_seq << dendl;
+ cseq = connect_seq = reply.connect_seq;
+ lock.Unlock();
+ continue;
+ }
+
+ if (reply.tag == CEPH_MSGR_TAG_WAIT) {
+ dout(3) << "connect got WAIT (connection race)" << dendl;
+ state = STATE_WAIT;
+ goto stop_locked;
+ }
+
+ if (reply.tag == CEPH_MSGR_TAG_READY) {
+ // hooray!
+ peer_global_seq = reply.global_seq;
+ lossy_rx = reply.flags & CEPH_MSG_CONNECT_LOSSY;
+ state = STATE_OPEN;
+ connect_seq = cseq + 1;
+ assert(connect_seq == reply.connect_seq);
+ first_fault = last_attempt = utime_t();
+ dout(20) << "connect success " << connect_seq << ", lossy_rx = " << lossy_rx << dendl;
+
+ if (!reader_running) {
+ dout(20) << "connect starting reader" << dendl;
+ start_reader();
+ }
+ return 0;
+ }
+
+ // protocol error
+ dout(0) << "connect got bad tag " << (int)tag << dendl;
+ goto fail_locked;
}
- lock.Unlock();
- dout(15) << "dispatch: ending loop " << dendl;
- // deregister
- rank->unregister_entity(this);
- put();
+ fail:
+ lock.Lock();
+ fail_locked:
+ if (state == STATE_CONNECTING)
+ fault();
+ else
+ dout(3) << "connect fault, but state != connecting, stopping" << dendl;
+
+ stop_locked:
+ return -1;
}
-void Rank::Endpoint::ready()
+void SimpleMessenger::Pipe::register_pipe()
{
- dout(10) << "ready " << get_myaddr() << dendl;
- assert(!dispatch_thread.is_started());
- get();
- dispatch_thread.create();
+ dout(10) << "register_pipe" << dendl;
+ assert(rank->lock.is_locked());
+ assert(rank->rank_pipe.count(peer_addr) == 0);
+ rank->rank_pipe[peer_addr] = this;
}
-
-int Rank::Endpoint::shutdown()
+void SimpleMessenger::Pipe::unregister_pipe()
{
- dout(10) << "shutdown " << get_myaddr() << dendl;
-
- // stop my dispatch thread
- if (dispatch_thread.am_self()) {
- dout(10) << "shutdown i am dispatch, setting stop flag" << dendl;
- stop = true;
+ assert(rank->lock.is_locked());
+ if (rank->rank_pipe.count(peer_addr) &&
+ rank->rank_pipe[peer_addr] == this) {
+ dout(10) << "unregister_pipe" << dendl;
+ rank->rank_pipe.erase(peer_addr);
} else {
- dout(10) << "shutdown i am not dispatch, setting stop flag and joining thread." << dendl;
- lock.Lock();
- stop = true;
- cond.Signal();
- lock.Unlock();
+ dout(10) << "unregister_pipe - not registered" << dendl;
}
- return 0;
}
-void Rank::Endpoint::suicide()
-{
- dout(10) << "suicide " << get_myaddr() << dendl;
- shutdown();
- // hmm, or exit(0)?
-}
-void Rank::Endpoint::prepare_dest(const entity_inst_t& inst)
+void SimpleMessenger::Pipe::requeue_sent()
{
- rank->lock.Lock();
- {
- if (rank->rank_pipe.count(inst.addr) == 0)
- rank->connect_rank(inst.addr, rank->policy_map[inst.name.type()]);
+ if (sent.empty())
+ return;
+
+ list<Message*>& rq = q[CEPH_MSG_PRIO_HIGHEST];
+ while (!sent.empty()) {
+ Message *m = sent.back();
+ sent.pop_back();
+ dout(10) << "requeue_sent " << *m << " for resend seq " << out_seq
+ << " (" << m->get_seq() << ")" << dendl;
+ rq.push_front(m);
+ out_seq--;
}
- rank->lock.Unlock();
}
-int Rank::Endpoint::send_message(Message *m, entity_inst_t dest)
+void SimpleMessenger::Pipe::discard_queue()
{
- // set envelope
- m->set_source_inst(_myinst);
- m->set_orig_source_inst(_myinst);
- m->set_dest_inst(dest);
- if (!g_conf.ms_nocrc)
- m->calc_data_crc();
- else
- m->get_footer().flags = (unsigned)m->get_footer().flags | CEPH_MSG_FOOTER_NOCRC;
- if (!m->get_priority()) m->set_priority(get_default_send_priority());
-
- dout(1) << m->get_source()
- << " --> " << dest.name << " " << dest.addr
- << " -- " << *m
- << " -- ?+" << m->get_data().length()
- << " (? " << m->get_footer().data_crc << ")"
- << " " << m
- << dendl;
-
- rank->submit_message(m, dest.addr);
-
- return 0;
+ dout(10) << "discard_queue" << dendl;
+ for (list<Message*>::iterator p = sent.begin(); p != sent.end(); p++)
+ (*p)->put();
+ sent.clear();
+ for (map<int,list<Message*> >::iterator p = q.begin(); p != q.end(); p++)
+ for (list<Message*>::iterator r = p->second.begin(); r != p->second.end(); r++)
+ (*r)->put();
+ q.clear();
}
-int Rank::Endpoint::forward_message(Message *m, entity_inst_t dest)
+
+void SimpleMessenger::Pipe::fault(bool onconnect, bool onread)
{
- // set envelope
- m->set_source_inst(_myinst);
- m->set_dest_inst(dest);
- if (!g_conf.ms_nocrc)
- m->calc_data_crc();
- else
- m->get_footer().flags = (unsigned)m->get_footer().flags | CEPH_MSG_FOOTER_NOCRC;
- if (!m->get_priority()) m->set_priority(get_default_send_priority());
-
- dout(1) << m->get_source()
- << " **> " << dest.name << " " << dest.addr
- << " -- " << *m
- << " -- ?+" << m->get_data().length()
- << " (? " << m->get_footer().data_crc << ")"
- << " " << m
- << dendl;
+ assert(lock.is_locked());
+ cond.Signal();
- rank->submit_message(m, dest.addr);
+ if (onread && state == STATE_CONNECTING) {
+ dout(10) << "fault already connecting, reader shutting down" << dendl;
+ return;
+ }
- return 0;
-}
+ if (!onconnect) dout(2) << "fault " << errno << ": " << strerror(errno) << dendl;
+ if (state == STATE_CLOSED ||
+ state == STATE_CLOSING) {
+ dout(10) << "fault already closed|closing" << dendl;
+ return;
+ }
+ if (sd >= 0) {
+ ::close(sd);
+ sd = -1;
+ closed_socket();
+ }
-int Rank::Endpoint::lazy_send_message(Message *m, entity_inst_t dest)
-{
- // set envelope
- m->set_source_inst(_myinst);
- m->set_orig_source_inst(_myinst);
- m->set_dest_inst(dest);
- if (!g_conf.ms_nocrc)
- m->calc_data_crc();
- else
- m->get_footer().flags = (unsigned)m->get_footer().flags | CEPH_MSG_FOOTER_NOCRC;
- if (!m->get_priority()) m->set_priority(get_default_send_priority());
-
- dout(1) << "lazy " << m->get_source()
- << " --> " << dest.name << " " << dest.addr
- << " -- " << *m
- << " -- ?+" << m->get_data().length()
- << " (? " << m->get_footer().data_crc << ")"
- << " " << m
- << dendl;
+ // lossy channel?
+ if (policy.lossy_tx) {
+ dout(10) << "fault on lossy channel, failing" << dendl;
+ fail();
+ return;
+ }
- rank->submit_message(m, dest.addr, true);
+ // requeue sent items
+ requeue_sent();
- return 0;
+ if (q.empty()) {
+ if (state == STATE_CLOSING || onconnect) {
+ dout(10) << "fault on connect, or already closing, and q empty: setting closed." << dendl;
+ state = STATE_CLOSED;
+ } else {
+ dout(0) << "fault nothing to send, going to standby" << dendl;
+ state = STATE_STANDBY;
+ }
+ return;
+ }
+
+ utime_t now = g_clock.now();
+ if (state != STATE_CONNECTING) {
+ if (!onconnect) dout(0) << "fault initiating reconnect" << dendl;
+ connect_seq++;
+ state = STATE_CONNECTING;
+ first_fault = now;
+ } else if (first_fault.sec() == 0) {
+ if (!onconnect) dout(0) << "fault first fault" << dendl;
+ first_fault = now;
+ } else {
+ utime_t failinterval = now - first_fault;
+ utime_t retryinterval = now - last_attempt;
+ if (!onconnect) dout(10) << "fault failure was " << failinterval
+ << " ago, last attempt was at " << last_attempt
+ << ", " << retryinterval << " ago" << dendl;
+ if (policy.fail_interval > 0 && failinterval > policy.fail_interval) {
+ // give up
+ dout(0) << "fault giving up" << dendl;
+ fail();
+ } else if (retryinterval < policy.retry_interval) {
+ // wait
+ now += (policy.retry_interval - retryinterval);
+ dout(10) << "fault waiting until " << now << dendl;
+ cond.WaitUntil(lock, now);
+ dout(10) << "fault done waiting or woke up" << dendl;
+ }
+ }
+ last_attempt = now;
}
+void SimpleMessenger::Pipe::fail()
+{
+ derr(10) << "fail" << dendl;
+ assert(lock.is_locked());
+ stop();
+ report_failures();
-void Rank::Endpoint::reset_myname(entity_name_t newname)
-{
- entity_name_t oldname = get_myname();
- dout(10) << "reset_myname " << oldname << " to " << newname << dendl;
- _set_myname(newname);
-}
+ for (unsigned i=0; i<rank->local.size(); i++)
+ if (rank->local[i] && rank->local[i]->get_dispatcher())
+ rank->local[i]->queue_reset(peer_addr, last_dest_name);
+ // unregister
+ lock.Unlock();
+ rank->lock.Lock();
+ unregister_pipe();
+ rank->lock.Unlock();
+ lock.Lock();
+}
-void Rank::Endpoint::mark_down(entity_addr_t a)
+void SimpleMessenger::Pipe::was_session_reset()
{
- rank->mark_down(a);
+ assert(lock.is_locked());
+
+ dout(10) << "was_session_reset" << dendl;
+ report_failures();
+ for (unsigned i=0; i<rank->local.size(); i++)
+ if (rank->local[i] && rank->local[i]->get_dispatcher())
+ rank->local[i]->queue_remote_reset(peer_addr, last_dest_name);
+
+ out_seq = 0;
+ in_seq = 0;
+ connect_seq = 0;
}
-void Rank::mark_down(entity_addr_t addr)
+void SimpleMessenger::Pipe::report_failures()
{
- lock.Lock();
- if (rank_pipe.count(addr)) {
- Pipe *p = rank_pipe[addr];
- dout(1) << "mark_down " << addr << " -- " << p << dendl;
- p->unregister_pipe();
- p->lock.Lock();
- p->stop();
- p->lock.Unlock();
- } else {
- dout(1) << "mark_down " << addr << " -- pipe dne" << dendl;
+ // report failures
+ q[CEPH_MSG_PRIO_HIGHEST].splice(q[CEPH_MSG_PRIO_HIGHEST].begin(), sent);
+ while (1) {
+ Message *m = _get_next_outgoing();
+ if (!m)
+ break;
+
+ if (policy.drop_msg_callback) {
+ unsigned srcrank = m->get_source_inst().addr.erank;
+ if (srcrank >= rank->max_local || rank->local[srcrank] == 0) {
+ dout(1) << "fail on " << *m << ", srcrank " << srcrank << " dne, dropping" << dendl;
+ } else if (rank->local[srcrank]->is_stopped()) {
+ dout(1) << "fail on " << *m << ", dispatcher stopping, ignoring." << dendl;
+ } else {
+ dout(10) << "fail on " << *m << dendl;
+ rank->local[srcrank]->queue_failure(m, m->get_dest_inst());
+ }
+ }
+ m->put();
}
- lock.Unlock();
}
+void SimpleMessenger::Pipe::stop()
+{
+ dout(10) << "stop" << dendl;
+ state = STATE_CLOSED;
+ cond.Signal();
+ if (reader_running)
+ reader_thread.kill(SIGUSR1);
+ if (writer_running)
+ writer_thread.kill(SIGUSR1);
+}
+/* read msgs from socket.
+ * also, server.
+ */
+void SimpleMessenger::Pipe::reader()
+{
+ if (state == STATE_ACCEPTING)
+ accept();
+ lock.Lock();
-/**************************************
- * Pipe
- */
+ // loop.
+ while (state != STATE_CLOSED &&
+ state != STATE_CONNECTING) {
+ assert(lock.is_locked());
-#undef dout_prefix
-#define dout_prefix _pipe_prefix()
-ostream& Rank::Pipe::_pipe_prefix() {
- return *_dout << dbeginl << pthread_self()
- << " -- " << rank->rank_addr << " >> " << peer_addr << " pipe(" << this
- << " sd=" << sd
- << " pgs=" << peer_global_seq
- << " cs=" << connect_seq
- << ").";
-}
+ // sleep if (re)connecting
+ if (state == STATE_STANDBY) {
+ dout(20) << "reader sleeping during reconnect|standby" << dendl;
+ cond.Wait(lock);
+ continue;
+ }
-int Rank::Pipe::accept()
-{
- dout(10) << "accept" << dendl;
+ lock.Unlock();
- // my creater gave me sd via accept()
- assert(state == STATE_ACCEPTING);
-
- // announce myself.
- int rc = tcp_write(sd, CEPH_BANNER, strlen(CEPH_BANNER));
- if (rc < 0) {
- dout(10) << "accept couldn't write banner" << dendl;
- state = STATE_CLOSED;
- return -1;
- }
+ char tag = -1;
+ dout(20) << "reader reading tag..." << dendl;
+ int rc = tcp_read(sd, (char*)&tag, 1);
+ if (rc < 0) {
+ lock.Lock();
+ dout(2) << "reader couldn't read tag, " << strerror(errno) << dendl;
+ fault(false, true);
+ continue;
+ }
- // and my addr
- rc = tcp_write(sd, (char*)&rank->rank_addr, sizeof(rank->rank_addr));
- if (rc < 0) {
- dout(10) << "accept couldn't write my addr" << dendl;
- state = STATE_CLOSED;
- return -1;
- }
- dout(10) << "accept sd=" << sd << dendl;
-
- // identify peer
- char banner[strlen(CEPH_BANNER)+1];
- rc = tcp_read(sd, banner, strlen(CEPH_BANNER));
- if (rc < 0) {
- dout(10) << "accept couldn't read banner" << dendl;
- state = STATE_CLOSED;
- return -1;
- }
- if (memcmp(banner, CEPH_BANNER, strlen(CEPH_BANNER))) {
- banner[strlen(CEPH_BANNER)] = 0;
- dout(10) << "accept peer sent bad banner '" << banner << "'" << dendl;
- state = STATE_CLOSED;
- return -1;
- }
- rc = tcp_read(sd, (char*)&peer_addr, sizeof(peer_addr));
- if (rc < 0) {
- dout(10) << "accept couldn't read peer_addr" << dendl;
- state = STATE_CLOSED;
- return -1;
- }
- dout(10) << "accept peer addr is " << peer_addr << dendl;
- if (peer_addr.ipaddr.sin_addr.s_addr == htonl(INADDR_ANY)) {
- // peer apparently doesn't know what ip they have; figure it out for them.
- entity_addr_t old_addr = peer_addr;
- socklen_t len = sizeof(peer_addr.ipaddr);
- int r = ::getpeername(sd, (sockaddr*)&peer_addr.ipaddr, &len);
- if (r < 0) {
- dout(0) << "accept failed to getpeername " << errno << " " << strerror(errno) << dendl;
- state = STATE_CLOSED;
- return -1;
- }
- peer_addr.ipaddr.sin_port = old_addr.ipaddr.sin_port;
- dout(2) << "accept peer says " << old_addr << ", socket says " << peer_addr << dendl;
- }
-
- ceph_msg_connect connect;
- ceph_msg_connect_reply reply;
- Pipe *existing = 0;
-
- // this should roughly mirror pseudocode at
- // http://ceph.newdream.net/wiki/Messaging_protocol
-
- while (1) {
- rc = tcp_read(sd, (char*)&connect, sizeof(connect));
- if (rc < 0) {
- dout(10) << "accept couldn't read connect" << dendl;
- goto fail_unlocked;
- }
- dout(20) << "accept got peer connect_seq " << connect.connect_seq
- << " global_seq " << connect.global_seq
- << dendl;
-
- rank->lock.Lock();
-
- // note peer's type, flags
- policy = rank->policy_map[connect.host_type]; /* apply policy */
- lossy_rx = connect.flags & CEPH_MSG_CONNECT_LOSSY;
-
- memset(&reply, 0, sizeof(reply));
-
- // existing?
- if (rank->rank_pipe.count(peer_addr)) {
- existing = rank->rank_pipe[peer_addr];
- existing->lock.Lock();
-
- if (connect.global_seq < existing->peer_global_seq) {
- dout(10) << "accept existing " << existing << ".gseq " << existing->peer_global_seq
- << " > " << connect.global_seq << ", RETRY_GLOBAL" << dendl;
- reply.tag = CEPH_MSGR_TAG_RETRY_GLOBAL;
- reply.global_seq = existing->peer_global_seq; // so we can send it below..
- existing->lock.Unlock();
- rank->lock.Unlock();
- goto reply;
- } else {
- dout(10) << "accept existing " << existing << ".gseq " << existing->peer_global_seq
- << " <= " << connect.global_seq << ", looks ok" << dendl;
- }
-
- if (existing->policy.lossy_tx) {
- dout(-10) << "accept replacing existing (lossy) channel" << dendl;
- existing->was_session_reset();
- goto replace;
- }
- if (lossy_rx) {
- if (existing->state == STATE_STANDBY) {
- dout(-10) << "accept incoming lossy connection, kicking outgoing lossless "
- << existing << dendl;
- existing->state = STATE_CONNECTING;
- existing->cond.Signal();
- } else {
- dout(-10) << "accept incoming lossy connection, our lossless " << existing
- << " has state " << existing->state << ", doing nothing" << dendl;
+ // open ...
+ if (tag == CEPH_MSGR_TAG_ACK) {
+ dout(20) << "reader got ACK" << dendl;
+ __u32 seq;
+ int rc = tcp_read( sd, (char*)&seq, sizeof(seq));
+ lock.Lock();
+ if (rc < 0) {
+ dout(2) << "reader couldn't read ack seq, " << strerror(errno) << dendl;
+ fault(false, true);
+ } else if (state != STATE_CLOSED) {
+ dout(15) << "reader got ack seq " << seq << dendl;
+ // trim sent list
+ while (!sent.empty() &&
+ sent.front()->get_seq() <= seq) {
+ Message *m = sent.front();
+ sent.pop_front();
+ dout(10) << "reader got ack seq "
+ << seq << " >= " << m->get_seq() << " on " << m << " " << *m << dendl;
+ m->put();
}
- existing->lock.Unlock();
- goto fail;
}
+ continue;
+ }
- dout(-10) << "accept connect_seq " << connect.connect_seq
- << " vs existing " << existing->connect_seq
- << " state " << existing->state << dendl;
-
- if (connect.connect_seq < existing->connect_seq) {
- if (connect.connect_seq == 0) {
- dout(-10) << "accept peer reset, then tried to connect to us, replacing" << dendl;
- existing->was_session_reset(); // this resets out_queue, msg_ and connect_seq #'s
- goto replace;
- } else {
- // old attempt, or we sent READY but they didn't get it.
- dout(10) << "accept existing " << existing << ".cseq " << existing->connect_seq
- << " > " << connect.connect_seq << ", RETRY_SESSION" << dendl;
- reply.tag = CEPH_MSGR_TAG_RETRY_SESSION;
- reply.connect_seq = existing->connect_seq; // so we can send it below..
- existing->lock.Unlock();
- rank->lock.Unlock();
- goto reply;
- }
+ else if (tag == CEPH_MSGR_TAG_MSG) {
+ dout(20) << "reader got MSG" << dendl;
+ Message *m = read_message();
+ lock.Lock();
+
+ if (!m) {
+ derr(2) << "reader read null message, " << strerror(errno) << dendl;
+ fault(false, true);
+ continue;
}
- if (connect.connect_seq == existing->connect_seq) {
- // connection race?
- if (peer_addr < rank->rank_addr) {
- // incoming wins
- dout(10) << "accept connection race, existing " << existing << ".cseq " << existing->connect_seq
- << " == " << connect.connect_seq << ", replacing my attempt" << dendl;
- assert(existing->state == STATE_CONNECTING ||
- existing->state == STATE_STANDBY ||
- existing->state == STATE_WAIT);
- goto replace;
- } else {
- // our existing outgoing wins
- dout(10) << "accept connection race, existing " << existing << ".cseq " << existing->connect_seq
- << " == " << connect.connect_seq << ", sending WAIT" << dendl;
- assert(peer_addr > rank->rank_addr);
- assert(existing->state == STATE_CONNECTING); // this will win
- reply.tag = CEPH_MSGR_TAG_WAIT;
- existing->lock.Unlock();
- rank->lock.Unlock();
- goto reply;
- }
- }
+ if (state == STATE_CLOSED ||
+ state == STATE_CONNECTING)
+ continue;
- assert(connect.connect_seq > existing->connect_seq);
- assert(connect.global_seq >= existing->peer_global_seq);
- if (existing->connect_seq == 0) {
- dout(0) << "accept we reset (peer sent cseq " << connect.connect_seq
- << ", " << existing << ".cseq = " << existing->connect_seq
- << "), sending RESETSESSION" << dendl;
- reply.tag = CEPH_MSGR_TAG_RESETSESSION;
- rank->lock.Unlock();
- existing->lock.Unlock();
- goto reply;
+ // check received seq#
+ if (m->get_seq() <= in_seq) {
+ dout(-10) << "reader got old message "
+ << m->get_seq() << " <= " << in_seq << " " << m << " " << *m
+ << " for " << m->get_dest()
+ << ", discarding" << dendl;
+ delete m;
+ continue;
}
+ in_seq++;
- // reconnect
- dout(10) << "accept peer sent cseq " << connect.connect_seq
- << " > " << existing->connect_seq << dendl;
- goto replace;
- } // existing
- else if (connect.connect_seq > 0) {
- // we reset, and they are opening a new session
- dout(0) << "accept we reset (peer sent cseq " << connect.connect_seq << "), sending RESETSESSION" << dendl;
- rank->lock.Unlock();
- reply.tag = CEPH_MSGR_TAG_RESETSESSION;
- goto reply;
- } else {
- // new session
- dout(10) << "accept new session" << dendl;
- goto open;
- }
- assert(0);
-
- reply:
- rc = tcp_write(sd, (char*)&reply, sizeof(reply));
- if (rc < 0)
- goto fail_unlocked;
- }
-
- replace:
- dout(10) << "accept replacing " << existing << dendl;
- existing->state = STATE_CLOSED;
- existing->cond.Signal();
- existing->reader_thread.kill(SIGUSR1);
- existing->unregister_pipe();
-
- // steal queue and out_seq
- existing->requeue_sent();
- out_seq = existing->out_seq;
- in_seq = existing->in_seq;
- dout(10) << "accept out_seq " << out_seq << " in_seq " << in_seq << dendl;
- for (map<int, list<Message*> >::iterator p = existing->q.begin();
- p != existing->q.end();
- p++)
- q[p->first].splice(q[p->first].begin(), p->second);
-
- existing->lock.Unlock();
-
- open:
- // open
- connect_seq = connect.connect_seq + 1;
- peer_global_seq = connect.global_seq;
- dout(10) << "accept success, connect_seq = " << connect_seq << ", sending READY" << dendl;
-
- // send READY reply
- reply.tag = CEPH_MSGR_TAG_READY;
- reply.global_seq = rank->get_global_seq();
- reply.connect_seq = connect_seq;
- reply.flags = 0;
- if (policy.lossy_tx)
- reply.flags = reply.flags | CEPH_MSG_CONNECT_LOSSY;
-
- // ok!
- register_pipe();
- rank->lock.Unlock();
-
- rc = tcp_write(sd, (char*)&reply, sizeof(reply));
- if (rc < 0)
- goto fail;
-
- lock.Lock();
- if (state != STATE_CLOSED) {
- dout(10) << "accept starting writer, " << "state=" << state << dendl;
- start_writer();
- }
- dout(20) << "accept done" << dendl;
- lock.Unlock();
- return 0; // success.
-
-
- fail:
- rank->lock.Unlock();
- fail_unlocked:
- lock.Lock();
- state = STATE_CLOSED;
- fault();
- lock.Unlock();
- return -1;
-}
-
-int Rank::Pipe::connect()
-{
- dout(10) << "connect " << connect_seq << dendl;
- assert(lock.is_locked());
-
- if (sd >= 0) {
- ::close(sd);
- sd = -1;
- closed_socket();
- }
- __u32 cseq = connect_seq;
- __u32 gseq = rank->get_global_seq();
+ if (!lossy_rx && in_seq != m->get_seq()) {
+ dout(0) << "reader got bad seq " << m->get_seq() << " expected " << in_seq
+ << " for " << *m << " from " << m->get_source() << dendl;
+ derr(0) << "reader got bad seq " << m->get_seq() << " expected " << in_seq
+ << " for " << *m << " from " << m->get_source() << dendl;
+ assert(in_seq == m->get_seq()); // for now!
+ fault(false, true);
+ delete m;
+ continue;
+ }
- // stop reader thrad
- join_reader();
+ cond.Signal(); // wake up writer, to ack this
+ lock.Unlock();
+
+ dout(10) << "reader got message "
+ << m->get_seq() << " " << m << " " << *m
+ << " for " << m->get_dest() << dendl;
+
+ // deliver
+ Endpoint *entity = 0;
+
+ rank->lock.Lock();
+ {
+ unsigned erank = m->get_dest_inst().addr.erank;
+ if (erank < rank->max_local && rank->local[erank]) {
+ // find entity
+ entity = rank->local[erank];
+ entity->get();
- lock.Unlock();
-
- char tag = -1;
- int rc;
- struct sockaddr_in myAddr;
- struct msghdr msg;
- struct iovec msgvec[2];
- int msglen;
- char banner[strlen(CEPH_BANNER)];
- entity_addr_t paddr;
+ // first message?
+ if (entity->need_addr) {
+ entity->_set_myaddr(m->get_dest_inst().addr);
+ dout(2) << "reader entity addr is " << entity->get_myaddr() << dendl;
+ entity->need_addr = false;
+ }
- // create socket?
- sd = ::socket(AF_INET, SOCK_STREAM, 0);
- if (sd < 0) {
- dout(-1) << "connect couldn't created socket " << strerror(errno) << dendl;
- assert(0);
- goto fail;
- }
- opened_socket();
-
- // bind any port
- myAddr.sin_family = AF_INET;
- myAddr.sin_addr.s_addr = htonl(INADDR_ANY);
- myAddr.sin_port = htons( 0 );
- dout(10) << "binding to " << myAddr << dendl;
- rc = ::bind(sd, (struct sockaddr *)&myAddr, sizeof(myAddr));
- if (rc < 0) {
- dout(2) << "bind error " << myAddr
- << ", " << errno << ": " << strerror(errno) << dendl;
- goto fail;
- }
+ if (rank->need_addr) {
+ rank->rank_addr = m->get_dest_inst().addr;
+ rank->rank_addr.erank = 0;
+ dout(2) << "reader rank_addr is " << rank->rank_addr << dendl;
+ rank->need_addr = false;
+ }
- // connect!
- dout(10) << "connecting to " << peer_addr.ipaddr << dendl;
- rc = ::connect(sd, (sockaddr*)&peer_addr.ipaddr, sizeof(peer_addr.ipaddr));
- if (rc < 0) {
- dout(2) << "connect error " << peer_addr.ipaddr
- << ", " << errno << ": " << strerror(errno) << dendl;
- goto fail;
- }
+ } else {
+ derr(0) << "reader got message " << *m << " for " << m->get_dest() << ", which isn't local" << dendl;
+ }
+ }
+ rank->lock.Unlock();
+
+ if (entity) {
+ entity->queue_message(m); // queue
+ entity->put();
+ }
- // disable Nagle algorithm?
- if (g_conf.ms_tcp_nodelay) {
- int flag = 1;
- int r = ::setsockopt(sd, IPPROTO_TCP, TCP_NODELAY, (char*)&flag, sizeof(flag));
- if (r < 0)
- dout(0) << "connect couldn't set TCP_NODELAY: " << strerror(errno) << dendl;
+ lock.Lock();
+ }
+
+ else if (tag == CEPH_MSGR_TAG_CLOSE) {
+ dout(20) << "reader got CLOSE" << dendl;
+ lock.Lock();
+ if (state == STATE_CLOSING)
+ state = STATE_CLOSED;
+ else
+ state = STATE_CLOSING;
+ cond.Signal();
+ break;
+ }
+ else {
+ dout(0) << "reader bad tag " << (int)tag << dendl;
+ lock.Lock();
+ fault(false, true);
+ }
}
- // verify banner
- // FIXME: this should be non-blocking, or in some other way verify the banner as we get it.
- rc = tcp_read(sd, (char*)&banner, strlen(CEPH_BANNER));
- if (rc < 0) {
- dout(2) << "connect couldn't read banner, " << strerror(errno) << dendl;
- goto fail;
- }
- if (memcmp(banner, CEPH_BANNER, strlen(CEPH_BANNER))) {
- dout(0) << "connect protocol error (bad banner) on peer " << paddr << dendl;
- goto fail;
- }
+
+ // reap?
+ bool reap = false;
+ reader_running = false;
+ if (!writer_running)
+ reap = true;
- memset(&msg, 0, sizeof(msg));
- msgvec[0].iov_base = banner;
- msgvec[0].iov_len = strlen(CEPH_BANNER);
- msg.msg_iov = msgvec;
- msg.msg_iovlen = 1;
- msglen = msgvec[0].iov_len;
- if (do_sendmsg(sd, &msg, msglen)) {
- dout(2) << "connect couldn't write my banner, " << strerror(errno) << dendl;
- goto fail;
- }
+ lock.Unlock();
- // identify peer
- rc = tcp_read(sd, (char*)&paddr, sizeof(paddr));
- if (rc < 0) {
- dout(2) << "connect couldn't read peer addr, " << strerror(errno) << dendl;
- goto fail;
- }
- dout(20) << "connect read peer addr " << paddr << " on socket " << sd << dendl;
- if (!peer_addr.is_local_to(paddr)) {
- if (paddr.ipaddr.sin_addr.s_addr == 0 &&
- peer_addr.ipaddr.sin_port == paddr.ipaddr.sin_port &&
- peer_addr.nonce == paddr.nonce) {
- dout(0) << "connect claims to be "
- << paddr << " not " << peer_addr << " - presumably this is the same node!" << dendl;
- } else {
- dout(0) << "connect claims to be "
- << paddr << " not " << peer_addr << " - wrong node!" << dendl;
- goto fail;
+ if (reap) {
+ dout(10) << "reader queueing for reap" << dendl;
+ if (sd >= 0) {
+ ::close(sd);
+ sd = -1;
+ closed_socket();
}
+ rank->lock.Lock();
+ {
+ rank->pipe_reap_queue.push_back(this);
+ rank->wait_cond.Signal();
+ }
+ rank->lock.Unlock();
}
-
- // identify myself
- memset(&msg, 0, sizeof(msg));
- msgvec[0].iov_base = (char*)&rank->rank_addr;
- msgvec[0].iov_len = sizeof(rank->rank_addr);
- msg.msg_iov = msgvec;
- msg.msg_iovlen = 1;
- msglen = msgvec[0].iov_len;
- if (do_sendmsg(sd, &msg, msglen)) {
- dout(2) << "connect couldn't write my addr, " << strerror(errno) << dendl;
- goto fail;
- }
- dout(10) << "connect sent my addr " << rank->rank_addr << dendl;
- while (1) {
- ceph_msg_connect connect;
- connect.host_type = rank->my_type;
- connect.global_seq = gseq;
- connect.connect_seq = cseq;
- connect.flags = 0;
- if (policy.lossy_tx)
- connect.flags |= CEPH_MSG_CONNECT_LOSSY;
- memset(&msg, 0, sizeof(msg));
- msgvec[0].iov_base = (char*)&connect;
- msgvec[0].iov_len = sizeof(connect);
- msg.msg_iov = msgvec;
- msg.msg_iovlen = 1;
- msglen = msgvec[0].iov_len;
+ dout(10) << "reader done" << dendl;
+}
- dout(10) << "connect sending gseq=" << gseq << " cseq=" << cseq << dendl;
- if (do_sendmsg(sd, &msg, msglen)) {
- dout(2) << "connect couldn't write gseq, cseq, " << strerror(errno) << dendl;
- goto fail;
- }
+/*
+class FakeSocketError : public Context {
+ int sd;
+public:
+ FakeSocketError(int s) : sd(s) {}
+ void finish(int r) {
+ cout << "faking socket error on " << sd << std::endl;
+ ::close(sd);
+ }
+};
+*/
- dout(20) << "connect wrote (self +) cseq, waiting for reply" << dendl;
- ceph_msg_connect_reply reply;
- if (tcp_read(sd, (char*)&reply, sizeof(reply)) < 0) {
- dout(2) << "connect read reply " << strerror(errno) << dendl;
- goto fail;
- }
- dout(20) << "connect got reply tag " << (int)reply.tag
- << " connect_seq " << reply.connect_seq
- << " global_seq " << reply.global_seq
- << " flags " << (int)reply.flags
- << dendl;
+/* write msgs to socket.
+ * also, client.
+ */
+void SimpleMessenger::Pipe::writer()
+{
+ lock.Lock();
- lock.Lock();
- if (state != STATE_CONNECTING) {
- dout(0) << "connect got RESETSESSION but no longer connecting" << dendl;
- goto stop_locked;
- }
+ while (state != STATE_CLOSED) {// && state != STATE_WAIT) {
+ // standby?
+ if (!q.empty() && state == STATE_STANDBY)
+ state = STATE_CONNECTING;
- if (reply.tag == CEPH_MSGR_TAG_RESETSESSION) {
- dout(0) << "connect got RESETSESSION" << dendl;
- was_session_reset();
- cseq = 0;
- lock.Unlock();
- continue;
- }
- if (reply.tag == CEPH_MSGR_TAG_RETRY_GLOBAL) {
- gseq = rank->get_global_seq(reply.global_seq);
- dout(10) << "connect got RETRY_GLOBAL " << reply.global_seq
- << " chose new " << gseq << dendl;
- lock.Unlock();
+ // connect?
+ if (state == STATE_CONNECTING) {
+ connect();
continue;
}
- if (reply.tag == CEPH_MSGR_TAG_RETRY_SESSION) {
- assert(reply.connect_seq > connect_seq);
- dout(10) << "connect got RETRY_SESSION " << connect_seq
- << " -> " << reply.connect_seq << dendl;
- cseq = connect_seq = reply.connect_seq;
+
+ if (state == STATE_CLOSING) {
+ // write close tag
+ dout(20) << "writer writing CLOSE tag" << dendl;
+ char tag = CEPH_MSGR_TAG_CLOSE;
+ state = STATE_CLOSED;
lock.Unlock();
+ if (sd) ::write(sd, &tag, 1);
+ lock.Lock();
continue;
}
- if (reply.tag == CEPH_MSGR_TAG_WAIT) {
- dout(3) << "connect got WAIT (connection race)" << dendl;
- state = STATE_WAIT;
- goto stop_locked;
- }
-
- if (reply.tag == CEPH_MSGR_TAG_READY) {
- // hooray!
- peer_global_seq = reply.global_seq;
- lossy_rx = reply.flags & CEPH_MSG_CONNECT_LOSSY;
- state = STATE_OPEN;
- connect_seq = cseq + 1;
- assert(connect_seq == reply.connect_seq);
- first_fault = last_attempt = utime_t();
- dout(20) << "connect success " << connect_seq << ", lossy_rx = " << lossy_rx << dendl;
+ if (state != STATE_CONNECTING && state != STATE_WAIT && state != STATE_STANDBY &&
+ (!q.empty() || in_seq > in_seq_acked)) {
- if (!reader_running) {
- dout(20) << "connect starting reader" << dendl;
- start_reader();
+ // send ack?
+ if (in_seq > in_seq_acked) {
+ int send_seq = in_seq;
+ lock.Unlock();
+ int rc = write_ack(send_seq);
+ lock.Lock();
+ if (rc < 0) {
+ dout(2) << "writer couldn't write ack, " << strerror(errno) << dendl;
+ fault();
+ continue;
+ }
+ in_seq_acked = send_seq;
}
- return 0;
- }
-
- // protocol error
- dout(0) << "connect got bad tag " << (int)tag << dendl;
- goto fail_locked;
- }
- fail:
- lock.Lock();
- fail_locked:
- if (state == STATE_CONNECTING)
- fault();
- else
- dout(3) << "connect fault, but state != connecting, stopping" << dendl;
+ // grab outgoing message
+ Message *m = _get_next_outgoing();
+ if (m) {
+ m->set_seq(++out_seq);
+ sent.push_back(m); // move to sent list
+ m->get();
+ lock.Unlock();
- stop_locked:
- return -1;
-}
+ dout(20) << "writer encoding " << m->get_seq() << " " << m << " " << *m << dendl;
-void Rank::Pipe::register_pipe()
-{
- dout(10) << "register_pipe" << dendl;
- assert(rank->lock.is_locked());
- assert(rank->rank_pipe.count(peer_addr) == 0);
- rank->rank_pipe[peer_addr] = this;
-}
+ // encode and copy out of *m
+ if (m->empty_payload())
+ m->encode_payload();
+ m->calc_front_crc();
-void Rank::Pipe::unregister_pipe()
-{
- assert(rank->lock.is_locked());
- if (rank->rank_pipe.count(peer_addr) &&
- rank->rank_pipe[peer_addr] == this) {
- dout(10) << "unregister_pipe" << dendl;
- rank->rank_pipe.erase(peer_addr);
- } else {
- dout(10) << "unregister_pipe - not registered" << dendl;
- }
-}
+ dout(20) << "writer sending " << m->get_seq() << " " << m << dendl;
+ int rc = write_message(m);
+ lock.Lock();
+ if (rc < 0) {
+ derr(1) << "writer error sending " << m << " to " << m->get_header().dst << ", "
+ << errno << ": " << strerror(errno) << dendl;
+ fault();
+ }
+ m->put();
+ }
+ continue;
+ }
+
+ // wait
+ dout(20) << "writer sleeping" << dendl;
+ cond.Wait(lock);
+ }
+
+ dout(20) << "writer finishing" << dendl;
-void Rank::Pipe::requeue_sent()
-{
- if (sent.empty())
- return;
+ // reap?
+ bool reap = false;
+ writer_running = false;
+ if (!reader_running) reap = true;
- list<Message*>& rq = q[CEPH_MSG_PRIO_HIGHEST];
- while (!sent.empty()) {
- Message *m = sent.back();
- sent.pop_back();
- dout(10) << "requeue_sent " << *m << " for resend seq " << out_seq
- << " (" << m->get_seq() << ")" << dendl;
- rq.push_front(m);
- out_seq--;
+ lock.Unlock();
+
+ if (reap) {
+ dout(10) << "writer queueing for reap" << dendl;
+ if (sd >= 0) {
+ ::close(sd);
+ sd = -1;
+ closed_socket();
+ }
+ rank->lock.Lock();
+ {
+ rank->pipe_reap_queue.push_back(this);
+ rank->wait_cond.Signal();
+ }
+ rank->lock.Unlock();
}
-}
-void Rank::Pipe::discard_queue()
-{
- dout(10) << "discard_queue" << dendl;
- for (list<Message*>::iterator p = sent.begin(); p != sent.end(); p++)
- (*p)->put();
- sent.clear();
- for (map<int,list<Message*> >::iterator p = q.begin(); p != q.end(); p++)
- for (list<Message*>::iterator r = p->second.begin(); r != p->second.end(); r++)
- (*r)->put();
- q.clear();
+ dout(10) << "writer done" << dendl;
}
-void Rank::Pipe::fault(bool onconnect, bool onread)
+Message *SimpleMessenger::Pipe::read_message()
{
- assert(lock.is_locked());
- cond.Signal();
-
- if (onread && state == STATE_CONNECTING) {
- dout(10) << "fault already connecting, reader shutting down" << dendl;
- return;
- }
+ // envelope
+ //dout(10) << "receiver.read_message from sd " << sd << dendl;
+
+ ceph_msg_header header;
+ ceph_msg_footer footer;
- if (!onconnect) dout(2) << "fault " << errno << ": " << strerror(errno) << dendl;
+ if (tcp_read( sd, (char*)&header, sizeof(header) ) < 0)
+ return 0;
+
+ dout(20) << "reader got envelope type=" << header.type
+ << " src " << header.src << " dst " << header.dst
+ << " front=" << header.front_len
+ << " data=" << header.data_len
+ << " off " << header.data_off
+ << dendl;
- if (state == STATE_CLOSED ||
- state == STATE_CLOSING) {
- dout(10) << "fault already closed|closing" << dendl;
- return;
+ // verify header crc
+ __u32 header_crc = crc32c_le(0, (unsigned char *)&header, sizeof(header) - sizeof(header.crc));
+ if (header_crc != header.crc) {
+ dout(0) << "reader got bad header crc " << header_crc << " != " << header.crc << dendl;
+ return 0;
}
- if (sd >= 0) {
- ::close(sd);
- sd = -1;
- closed_socket();
+ // ok, now it's safe to change the header..
+ // munge source address?
+ if (header.src.addr.ipaddr.sin_addr.s_addr == htonl(INADDR_ANY)) {
+ dout(10) << "reader munging src addr " << header.src << " to be " << peer_addr << dendl;
+ header.orig_src.addr.ipaddr = header.src.addr.ipaddr = peer_addr.ipaddr;
}
- // lossy channel?
- if (policy.lossy_tx) {
- dout(10) << "fault on lossy channel, failing" << dendl;
- fail();
- return;
+ // read front
+ bufferlist front;
+ bufferptr bp;
+ int front_len = header.front_len;
+ if (front_len) {
+ bp = buffer::create(front_len);
+ if (tcp_read( sd, bp.c_str(), front_len ) < 0)
+ return 0;
+ front.push_back(bp);
+ dout(20) << "reader got front " << front.length() << dendl;
}
- // requeue sent items
- requeue_sent();
-
- if (q.empty()) {
- if (state == STATE_CLOSING || onconnect) {
- dout(10) << "fault on connect, or already closing, and q empty: setting closed." << dendl;
- state = STATE_CLOSED;
- } else {
- dout(0) << "fault nothing to send, going to standby" << dendl;
- state = STATE_STANDBY;
- }
- return;
- }
-
- utime_t now = g_clock.now();
- if (state != STATE_CONNECTING) {
- if (!onconnect) dout(0) << "fault initiating reconnect" << dendl;
- connect_seq++;
- state = STATE_CONNECTING;
- first_fault = now;
- } else if (first_fault.sec() == 0) {
- if (!onconnect) dout(0) << "fault first fault" << dendl;
- first_fault = now;
- } else {
- utime_t failinterval = now - first_fault;
- utime_t retryinterval = now - last_attempt;
- if (!onconnect) dout(10) << "fault failure was " << failinterval
- << " ago, last attempt was at " << last_attempt
- << ", " << retryinterval << " ago" << dendl;
- if (policy.fail_interval > 0 && failinterval > policy.fail_interval) {
- // give up
- dout(0) << "fault giving up" << dendl;
- fail();
- } else if (retryinterval < policy.retry_interval) {
- // wait
- now += (policy.retry_interval - retryinterval);
- dout(10) << "fault waiting until " << now << dendl;
- cond.WaitUntil(lock, now);
- dout(10) << "fault done waiting or woke up" << dendl;
- }
- }
- last_attempt = now;
-}
+ // read data
+ bufferlist data;
+ unsigned data_len = le32_to_cpu(header.data_len);
+ unsigned data_off = le32_to_cpu(header.data_off);
+ if (data_len) {
+ int left = data_len;
+ if (data_off & ~PAGE_MASK) {
+ // head
+ int head = MIN(PAGE_SIZE - (data_off & ~PAGE_MASK),
+ (unsigned)left);
+ bp = buffer::create(head);
+ if (tcp_read( sd, bp.c_str(), head ) < 0)
+ return 0;
+ data.push_back(bp);
+ left -= head;
+ dout(20) << "reader got data head " << head << dendl;
+ }
-void Rank::Pipe::fail()
-{
- derr(10) << "fail" << dendl;
- assert(lock.is_locked());
+ // middle
+ int middle = left & PAGE_MASK;
+ if (middle > 0) {
+ bp = buffer::create_page_aligned(middle);
+ if (tcp_read( sd, bp.c_str(), middle ) < 0)
+ return 0;
+ data.push_back(bp);
+ left -= middle;
+ dout(20) << "reader got data page-aligned middle " << middle << dendl;
+ }
- stop();
- report_failures();
+ if (left) {
+ bp = buffer::create(left);
+ if (tcp_read( sd, bp.c_str(), left ) < 0)
+ return 0;
+ data.push_back(bp);
+ dout(20) << "reader got data tail " << left << dendl;
+ }
+ }
- for (unsigned i=0; i<rank->local.size(); i++)
- if (rank->local[i] && rank->local[i]->get_dispatcher())
- rank->local[i]->queue_reset(peer_addr, last_dest_name);
+ // footer
+ if (tcp_read(sd, (char*)&footer, sizeof(footer)) < 0)
+ return 0;
+
+ int aborted = (le32_to_cpu(footer.flags) & CEPH_MSG_FOOTER_ABORTED);
+ dout(10) << "aborted = " << aborted << dendl;
+ if (aborted) {
+ dout(0) << "reader got " << front.length() << " + " << data.length()
+ << " byte message from " << header.src << ".. ABORTED" << dendl;
+ // MEH FIXME
+ Message *m = new MGenericMessage(CEPH_MSG_PING);
+ header.type = CEPH_MSG_PING;
+ m->set_header(header);
+ return m;
+ }
- // unregister
- lock.Unlock();
- rank->lock.Lock();
- unregister_pipe();
- rank->lock.Unlock();
- lock.Lock();
+ dout(20) << "reader got " << front.length() << " + " << data.length()
+ << " byte message from " << header.src << dendl;
+ return decode_message(header, footer, front, data);
}
-void Rank::Pipe::was_session_reset()
-{
- assert(lock.is_locked());
-
- dout(10) << "was_session_reset" << dendl;
- report_failures();
- for (unsigned i=0; i<rank->local.size(); i++)
- if (rank->local[i] && rank->local[i]->get_dispatcher())
- rank->local[i]->queue_remote_reset(peer_addr, last_dest_name);
-
- out_seq = 0;
- in_seq = 0;
- connect_seq = 0;
-}
-void Rank::Pipe::report_failures()
+int SimpleMessenger::Pipe::do_sendmsg(int sd, struct msghdr *msg, int len)
{
- // report failures
- q[CEPH_MSG_PRIO_HIGHEST].splice(q[CEPH_MSG_PRIO_HIGHEST].begin(), sent);
- while (1) {
- Message *m = _get_next_outgoing();
- if (!m)
- break;
+ while (len > 0) {
+ if (0) { // sanity
+ int l = 0;
+ for (unsigned i=0; i<msg->msg_iovlen; i++)
+ l += msg->msg_iov[i].iov_len;
+ assert(l == len);
+ }
- if (policy.drop_msg_callback) {
- unsigned srcrank = m->get_source_inst().addr.erank;
- if (srcrank >= rank->max_local || rank->local[srcrank] == 0) {
- dout(1) << "fail on " << *m << ", srcrank " << srcrank << " dne, dropping" << dendl;
- } else if (rank->local[srcrank]->is_stopped()) {
- dout(1) << "fail on " << *m << ", dispatcher stopping, ignoring." << dendl;
+ int r = ::sendmsg(sd, msg, 0);
+ if (r == 0)
+ dout(10) << "do_sendmsg hmm do_sendmsg got r==0!" << dendl;
+ if (r < 0) {
+ dout(1) << "do_sendmsg error " << strerror(errno) << dendl;
+ return -1;
+ }
+ if (state == STATE_CLOSED) {
+ dout(10) << "do_sendmsg oh look, state == CLOSED, giving up" << dendl;
+ errno = EINTR;
+ return -1; // close enough
+ }
+ len -= r;
+ if (len == 0) break;
+
+ // hrmph. trim r bytes off the front of our message.
+ dout(20) << "do_sendmsg short write did " << r << ", still have " << len << dendl;
+ while (r > 0) {
+ if (msg->msg_iov[0].iov_len <= (size_t)r) {
+ // lose this whole item
+ //dout(30) << "skipping " << msg->msg_iov[0].iov_len << ", " << (msg->msg_iovlen-1) << " v, " << r << " left" << dendl;
+ r -= msg->msg_iov[0].iov_len;
+ msg->msg_iov++;
+ msg->msg_iovlen--;
} else {
- dout(10) << "fail on " << *m << dendl;
- rank->local[srcrank]->queue_failure(m, m->get_dest_inst());
+ // partial!
+ //dout(30) << "adjusting " << msg->msg_iov[0].iov_len << ", " << msg->msg_iovlen << " v, " << r << " left" << dendl;
+ msg->msg_iov[0].iov_base = (void*)((long)msg->msg_iov[0].iov_base + r);
+ msg->msg_iov[0].iov_len -= r;
+ break;
}
}
- m->put();
}
+ return 0;
}
-void Rank::Pipe::stop()
+
+int SimpleMessenger::Pipe::write_ack(unsigned seq)
{
- dout(10) << "stop" << dendl;
- state = STATE_CLOSED;
- cond.Signal();
- if (reader_running)
- reader_thread.kill(SIGUSR1);
- if (writer_running)
- writer_thread.kill(SIGUSR1);
+ dout(10) << "write_ack " << seq << dendl;
+
+ char c = CEPH_MSGR_TAG_ACK;
+ __le32 s;
+ s = seq;
+
+ struct msghdr msg;
+ memset(&msg, 0, sizeof(msg));
+ struct iovec msgvec[2];
+ msgvec[0].iov_base = &c;
+ msgvec[0].iov_len = 1;
+ msgvec[1].iov_base = &s;
+ msgvec[1].iov_len = sizeof(s);
+ msg.msg_iov = msgvec;
+ msg.msg_iovlen = 2;
+
+ if (do_sendmsg(sd, &msg, 5) < 0)
+ return -1;
+ return 0;
}
-/* read msgs from socket.
- * also, server.
- */
-void Rank::Pipe::reader()
+int SimpleMessenger::Pipe::write_message(Message *m)
{
- if (state == STATE_ACCEPTING)
- accept();
+ ceph_msg_header& header = m->get_header();
+ ceph_msg_footer& footer = m->get_footer();
- lock.Lock();
+ // get envelope, buffers
+ header.front_len = m->get_payload().length();
+ header.data_len = m->get_data().length();
+ footer.flags = 0;
+ m->calc_header_crc();
- // loop.
- while (state != STATE_CLOSED &&
- state != STATE_CONNECTING) {
- assert(lock.is_locked());
+ bufferlist blist = m->get_payload();
+ blist.append(m->get_data());
+
+ dout(20) << "write_message " << m << " to " << header.dst << dendl;
+
+ // set up msghdr and iovecs
+ struct msghdr msg;
+ memset(&msg, 0, sizeof(msg));
+ struct iovec msgvec[3 + blist.buffers().size()]; // conservative upper bound
+ msg.msg_iov = msgvec;
+ int msglen = 0;
+
+ // send tag
+ char tag = CEPH_MSGR_TAG_MSG;
+ msgvec[msg.msg_iovlen].iov_base = &tag;
+ msgvec[msg.msg_iovlen].iov_len = 1;
+ msglen++;
+ msg.msg_iovlen++;
- // sleep if (re)connecting
- if (state == STATE_STANDBY) {
- dout(20) << "reader sleeping during reconnect|standby" << dendl;
- cond.Wait(lock);
- continue;
- }
+ // send envelope
+ msgvec[msg.msg_iovlen].iov_base = (char*)&header;
+ msgvec[msg.msg_iovlen].iov_len = sizeof(header);
+ msglen += sizeof(header);
+ msg.msg_iovlen++;
- lock.Unlock();
+ // payload (front+data)
+ list<bufferptr>::const_iterator pb = blist.buffers().begin();
+ int b_off = 0; // carry-over buffer offset, if any
+ int bl_pos = 0; // blist pos
+ int left = blist.length();
- char tag = -1;
- dout(20) << "reader reading tag..." << dendl;
- int rc = tcp_read(sd, (char*)&tag, 1);
- if (rc < 0) {
- lock.Lock();
- dout(2) << "reader couldn't read tag, " << strerror(errno) << dendl;
- fault(false, true);
- continue;
+ while (left > 0) {
+ int donow = MIN(left, (int)pb->length()-b_off);
+ if (donow == 0) {
+ dout(0) << "donow = " << donow << " left " << left << " pb->length " << pb->length() << " b_off " << b_off << dendl;
}
-
- // open ...
- if (tag == CEPH_MSGR_TAG_ACK) {
- dout(20) << "reader got ACK" << dendl;
- __u32 seq;
- int rc = tcp_read( sd, (char*)&seq, sizeof(seq));
- lock.Lock();
- if (rc < 0) {
- dout(2) << "reader couldn't read ack seq, " << strerror(errno) << dendl;
- fault(false, true);
- } else if (state != STATE_CLOSED) {
- dout(15) << "reader got ack seq " << seq << dendl;
- // trim sent list
- while (!sent.empty() &&
- sent.front()->get_seq() <= seq) {
- Message *m = sent.front();
- sent.pop_front();
- dout(10) << "reader got ack seq "
- << seq << " >= " << m->get_seq() << " on " << m << " " << *m << dendl;
- m->put();
- }
- }
- continue;
+ assert(donow > 0);
+ dout(30) << " bl_pos " << bl_pos << " b_off " << b_off
+ << " leftinchunk " << left
+ << " buffer len " << pb->length()
+ << " writing " << donow
+ << dendl;
+
+ if (msg.msg_iovlen >= IOV_MAX-2) {
+ if (do_sendmsg(sd, &msg, msglen))
+ return -1;
+
+ // and restart the iov
+ msg.msg_iov = msgvec;
+ msg.msg_iovlen = 0;
+ msglen = 0;
+ }
+
+ msgvec[msg.msg_iovlen].iov_base = (void*)(pb->c_str()+b_off);
+ msgvec[msg.msg_iovlen].iov_len = donow;
+ msglen += donow;
+ msg.msg_iovlen++;
+
+ left -= donow;
+ assert(left >= 0);
+ b_off += donow;
+ bl_pos += donow;
+ if (left == 0) break;
+ while (b_off == (int)pb->length()) {
+ pb++;
+ b_off = 0;
}
+ }
+ assert(left == 0);
- else if (tag == CEPH_MSGR_TAG_MSG) {
- dout(20) << "reader got MSG" << dendl;
- Message *m = read_message();
- lock.Lock();
-
- if (!m) {
- derr(2) << "reader read null message, " << strerror(errno) << dendl;
- fault(false, true);
- continue;
- }
-
- if (state == STATE_CLOSED ||
- state == STATE_CONNECTING)
- continue;
+ // send footer
+ msgvec[msg.msg_iovlen].iov_base = (void*)&footer;
+ msgvec[msg.msg_iovlen].iov_len = sizeof(footer);
+ msglen += sizeof(footer);
+ msg.msg_iovlen++;
- // check received seq#
- if (m->get_seq() <= in_seq) {
- dout(-10) << "reader got old message "
- << m->get_seq() << " <= " << in_seq << " " << m << " " << *m
- << " for " << m->get_dest()
- << ", discarding" << dendl;
- delete m;
- continue;
- }
- in_seq++;
+ // send
+ if (do_sendmsg(sd, &msg, msglen))
+ return -1;
- if (!lossy_rx && in_seq != m->get_seq()) {
- dout(0) << "reader got bad seq " << m->get_seq() << " expected " << in_seq
- << " for " << *m << " from " << m->get_source() << dendl;
- derr(0) << "reader got bad seq " << m->get_seq() << " expected " << in_seq
- << " for " << *m << " from " << m->get_source() << dendl;
- assert(in_seq == m->get_seq()); // for now!
- fault(false, true);
- delete m;
- continue;
- }
+ return 0;
+}
- cond.Signal(); // wake up writer, to ack this
- lock.Unlock();
-
- dout(10) << "reader got message "
- << m->get_seq() << " " << m << " " << *m
- << " for " << m->get_dest() << dendl;
-
- // deliver
- Endpoint *entity = 0;
-
- rank->lock.Lock();
- {
- unsigned erank = m->get_dest_inst().addr.erank;
- if (erank < rank->max_local && rank->local[erank]) {
- // find entity
- entity = rank->local[erank];
- entity->get();
- // first message?
- if (entity->need_addr) {
- entity->_set_myaddr(m->get_dest_inst().addr);
- dout(2) << "reader entity addr is " << entity->get_myaddr() << dendl;
- entity->need_addr = false;
- }
+/********************************************
+ * SimpleMessenger
+ */
+#undef dout_prefix
+#define dout_prefix _prefix(this)
- if (rank->need_addr) {
- rank->rank_addr = m->get_dest_inst().addr;
- rank->rank_addr.erank = 0;
- dout(2) << "reader rank_addr is " << rank->rank_addr << dendl;
- rank->need_addr = false;
- }
- } else {
- derr(0) << "reader got message " << *m << " for " << m->get_dest() << ", which isn't local" << dendl;
- }
- }
- rank->lock.Unlock();
-
- if (entity) {
- entity->queue_message(m); // queue
- entity->put();
- }
+/*
+ * note: assumes lock is held
+ */
+void SimpleMessenger::reaper()
+{
+ dout(10) << "reaper" << dendl;
+ assert(lock.is_locked());
- lock.Lock();
- }
-
- else if (tag == CEPH_MSGR_TAG_CLOSE) {
- dout(20) << "reader got CLOSE" << dendl;
- lock.Lock();
- if (state == STATE_CLOSING)
- state = STATE_CLOSED;
- else
- state = STATE_CLOSING;
- cond.Signal();
- break;
- }
- else {
- dout(0) << "reader bad tag " << (int)tag << dendl;
- lock.Lock();
- fault(false, true);
- }
+ while (!pipe_reap_queue.empty()) {
+ Pipe *p = pipe_reap_queue.front();
+ dout(10) << "reaper reaping pipe " << p << " " << p->get_peer_addr() << dendl;
+ p->unregister_pipe();
+ pipe_reap_queue.pop_front();
+ assert(pipes.count(p));
+ pipes.erase(p);
+ p->join();
+ p->discard_queue();
+ dout(10) << "reaper reaped pipe " << p << " " << p->get_peer_addr() << dendl;
+ assert(p->sd < 0);
+ delete p;
+ dout(10) << "reaper deleted pipe " << p << dendl;
}
+}
-
- // reap?
- bool reap = false;
- reader_running = false;
- if (!writer_running)
- reap = true;
-
- lock.Unlock();
- if (reap) {
- dout(10) << "reader queueing for reap" << dendl;
- if (sd >= 0) {
- ::close(sd);
- sd = -1;
- closed_socket();
- }
- rank->lock.Lock();
- {
- rank->pipe_reap_queue.push_back(this);
- rank->wait_cond.Signal();
- }
- rank->lock.Unlock();
+int SimpleMessenger::bind(int64_t force_nonce)
+{
+ lock.Lock();
+ if (started) {
+ dout(10) << "rank.bind already started" << dendl;
+ lock.Unlock();
+ return -1;
}
+ dout(10) << "rank.bind" << dendl;
+ lock.Unlock();
- dout(10) << "reader done" << dendl;
+ // bind to a socket
+ return accepter.bind(force_nonce);
}
-/*
-class FakeSocketError : public Context {
- int sd;
+
+class C_Die : public Context {
public:
- FakeSocketError(int s) : sd(s) {}
- void finish(int r) {
- cout << "faking socket error on " << sd << std::endl;
- ::close(sd);
+ void finish(int) {
+ cerr << "die" << std::endl;
+ exit(1);
}
};
-*/
-/* write msgs to socket.
- * also, client.
- */
-void Rank::Pipe::writer()
+static void write_pid_file(int pid)
{
- lock.Lock();
-
- while (state != STATE_CLOSED) {// && state != STATE_WAIT) {
- // standby?
- if (!q.empty() && state == STATE_STANDBY)
- state = STATE_CONNECTING;
-
- // connect?
- if (state == STATE_CONNECTING) {
- connect();
- continue;
- }
-
- if (state == STATE_CLOSING) {
- // write close tag
- dout(20) << "writer writing CLOSE tag" << dendl;
- char tag = CEPH_MSGR_TAG_CLOSE;
- state = STATE_CLOSED;
- lock.Unlock();
- if (sd) ::write(sd, &tag, 1);
- lock.Lock();
- continue;
- }
-
- if (state != STATE_CONNECTING && state != STATE_WAIT && state != STATE_STANDBY &&
- (!q.empty() || in_seq > in_seq_acked)) {
+ if (!g_conf.pid_file)
+ return;
- // send ack?
- if (in_seq > in_seq_acked) {
- int send_seq = in_seq;
- lock.Unlock();
- int rc = write_ack(send_seq);
- lock.Lock();
- if (rc < 0) {
- dout(2) << "writer couldn't write ack, " << strerror(errno) << dendl;
- fault();
- continue;
- }
- in_seq_acked = send_seq;
- }
+ int fd = ::open(g_conf.pid_file, O_CREAT|O_TRUNC|O_WRONLY, 0644);
+ if (fd >= 0) {
+ char buf[20];
+ int len = sprintf(buf, "%d\n", pid);
+ ::write(fd, buf, len);
+ ::close(fd);
+ }
+}
- // grab outgoing message
- Message *m = _get_next_outgoing();
- if (m) {
- m->set_seq(++out_seq);
- sent.push_back(m); // move to sent list
- m->get();
- lock.Unlock();
+static void remove_pid_file()
+{
+ if (!g_conf.pid_file)
+ return;
- dout(20) << "writer encoding " << m->get_seq() << " " << m << " " << *m << dendl;
+ // only remove it if it has OUR pid in it!
+ int fd = ::open(g_conf.pid_file, O_RDONLY);
+ if (fd >= 0) {
+ char buf[20];
+ ::read(fd, buf, 20);
+ ::close(fd);
+ int a = atoi(buf);
- // encode and copy out of *m
- if (m->empty_payload())
- m->encode_payload();
- m->calc_front_crc();
+ if (a == getpid())
+ ::unlink(g_conf.pid_file);
+ else
+ generic_dout(0) << "strange, pid file " << g_conf.pid_file
+ << " has " << a << ", not expected " << getpid()
+ << dendl;
+ }
+}
- dout(20) << "writer sending " << m->get_seq() << " " << m << dendl;
- int rc = write_message(m);
+int SimpleMessenger::start(bool nodaemon)
+{
+ // register at least one entity, first!
+ assert(my_type >= 0);
- lock.Lock();
- if (rc < 0) {
- derr(1) << "writer error sending " << m << " to " << m->get_header().dst << ", "
- << errno << ": " << strerror(errno) << dendl;
- fault();
- }
- m->put();
- }
- continue;
- }
-
- // wait
- dout(20) << "writer sleeping" << dendl;
- cond.Wait(lock);
+ lock.Lock();
+ if (started) {
+ dout(10) << "rank.start already started" << dendl;
+ lock.Unlock();
+ return 0;
}
-
- dout(20) << "writer finishing" << dendl;
-
- // reap?
- bool reap = false;
- writer_running = false;
- if (!reader_running) reap = true;
+ dout(1) << "rank.start at " << rank_addr << dendl;
+ started = true;
lock.Unlock();
-
- if (reap) {
- dout(10) << "writer queueing for reap" << dendl;
- if (sd >= 0) {
- ::close(sd);
- sd = -1;
- closed_socket();
+
+ // daemonize?
+ if (g_conf.daemonize && !nodaemon) {
+ if (Thread::get_num_threads() > 0) {
+ derr(0) << "rank.start BUG: there are " << Thread::get_num_threads()
+ << " already started that will now die! call rank.start() sooner."
+ << dendl;
}
- rank->lock.Lock();
- {
- rank->pipe_reap_queue.push_back(this);
- rank->wait_cond.Signal();
+ dout(1) << "rank.start daemonizing" << dendl;
+
+ if (1) {
+ daemon(1, 0);
+ write_pid_file(getpid());
+ } else {
+ pid_t pid = fork();
+ if (pid) {
+ // i am parent
+ write_pid_file(pid);
+ ::close(0);
+ ::close(1);
+ ::close(2);
+ _exit(0);
+ }
}
- rank->lock.Unlock();
+
+ if (g_conf.chdir && g_conf.chdir[0]) {
+ ::mkdir(g_conf.chdir, 0700);
+ ::chdir(g_conf.chdir);
+ }
+
+ _dout_rename_output_file();
+ } else if (g_daemon) {
+ write_pid_file(getpid());
}
- dout(10) << "writer done" << dendl;
+ // some debug hackery?
+ if (g_conf.kill_after)
+ g_timer.add_event_after(g_conf.kill_after, new C_Die);
+
+ // go!
+ accepter.start();
+ return 0;
}
-Message *Rank::Pipe::read_message()
+/* connect_rank
+ * NOTE: assumes rank.lock held.
+ */
+SimpleMessenger::Pipe *SimpleMessenger::connect_rank(const entity_addr_t& addr, const Policy& p)
{
- // envelope
- //dout(10) << "receiver.read_message from sd " << sd << dendl;
+ assert(lock.is_locked());
+ assert(addr != rank_addr);
- ceph_msg_header header;
- ceph_msg_footer footer;
-
- if (tcp_read( sd, (char*)&header, sizeof(header) ) < 0)
- return 0;
+ dout(10) << "connect_rank to " << addr << ", creating pipe and registering" << dendl;
- dout(20) << "reader got envelope type=" << header.type
- << " src " << header.src << " dst " << header.dst
- << " front=" << header.front_len
- << " data=" << header.data_len
- << " off " << header.data_off
- << dendl;
-
- // verify header crc
- __u32 header_crc = crc32c_le(0, (unsigned char *)&header, sizeof(header) - sizeof(header.crc));
- if (header_crc != header.crc) {
- dout(0) << "reader got bad header crc " << header_crc << " != " << header.crc << dendl;
- return 0;
- }
+ // create pipe
+ Pipe *pipe = new Pipe(this, Pipe::STATE_CONNECTING);
+ pipe->policy = p;
+ pipe->peer_addr = addr;
+ pipe->start_writer();
+ pipe->register_pipe();
+ pipes.insert(pipe);
- // ok, now it's safe to change the header..
- // munge source address?
- if (header.src.addr.ipaddr.sin_addr.s_addr == htonl(INADDR_ANY)) {
- dout(10) << "reader munging src addr " << header.src << " to be " << peer_addr << dendl;
- header.orig_src.addr.ipaddr = header.src.addr.ipaddr = peer_addr.ipaddr;
- }
+ return pipe;
+}
- // read front
- bufferlist front;
- bufferptr bp;
- int front_len = header.front_len;
- if (front_len) {
- bp = buffer::create(front_len);
- if (tcp_read( sd, bp.c_str(), front_len ) < 0)
- return 0;
- front.push_back(bp);
- dout(20) << "reader got front " << front.length() << dendl;
- }
- // read data
- bufferlist data;
- unsigned data_len = le32_to_cpu(header.data_len);
- unsigned data_off = le32_to_cpu(header.data_off);
- if (data_len) {
- int left = data_len;
- if (data_off & ~PAGE_MASK) {
- // head
- int head = MIN(PAGE_SIZE - (data_off & ~PAGE_MASK),
- (unsigned)left);
- bp = buffer::create(head);
- if (tcp_read( sd, bp.c_str(), head ) < 0)
- return 0;
- data.push_back(bp);
- left -= head;
- dout(20) << "reader got data head " << head << dendl;
- }
- // middle
- int middle = left & PAGE_MASK;
- if (middle > 0) {
- bp = buffer::create_page_aligned(middle);
- if (tcp_read( sd, bp.c_str(), middle ) < 0)
- return 0;
- data.push_back(bp);
- left -= middle;
- dout(20) << "reader got data page-aligned middle " << middle << dendl;
- }
- if (left) {
- bp = buffer::create(left);
- if (tcp_read( sd, bp.c_str(), left ) < 0)
- return 0;
- data.push_back(bp);
- dout(20) << "reader got data tail " << left << dendl;
- }
- }
- // footer
- if (tcp_read(sd, (char*)&footer, sizeof(footer)) < 0)
- return 0;
-
- int aborted = (le32_to_cpu(footer.flags) & CEPH_MSG_FOOTER_ABORTED);
- dout(10) << "aborted = " << aborted << dendl;
- if (aborted) {
- dout(0) << "reader got " << front.length() << " + " << data.length()
- << " byte message from " << header.src << ".. ABORTED" << dendl;
- // MEH FIXME
- Message *m = new MGenericMessage(CEPH_MSG_PING);
- header.type = CEPH_MSG_PING;
- m->set_header(header);
- return m;
- }
- dout(20) << "reader got " << front.length() << " + " << data.length()
- << " byte message from " << header.src << dendl;
- return decode_message(header, footer, front, data);
-}
-int Rank::Pipe::do_sendmsg(int sd, struct msghdr *msg, int len)
+/* register_entity
+ */
+SimpleMessenger::Endpoint *SimpleMessenger::register_entity(entity_name_t name)
{
- while (len > 0) {
- if (0) { // sanity
- int l = 0;
- for (unsigned i=0; i<msg->msg_iovlen; i++)
- l += msg->msg_iov[i].iov_len;
- assert(l == len);
- }
+ dout(10) << "register_entity " << name << dendl;
+ lock.Lock();
+
+ // create messenger
+ int erank = max_local;
+ Endpoint *msgr = new Endpoint(this, name, erank);
- int r = ::sendmsg(sd, msg, 0);
- if (r == 0)
- dout(10) << "do_sendmsg hmm do_sendmsg got r==0!" << dendl;
- if (r < 0) {
- dout(1) << "do_sendmsg error " << strerror(errno) << dendl;
- return -1;
- }
- if (state == STATE_CLOSED) {
- dout(10) << "do_sendmsg oh look, state == CLOSED, giving up" << dendl;
- errno = EINTR;
- return -1; // close enough
- }
- len -= r;
- if (len == 0) break;
-
- // hrmph. trim r bytes off the front of our message.
- dout(20) << "do_sendmail short write did " << r << ", still have " << len << dendl;
- while (r > 0) {
- if (msg->msg_iov[0].iov_len <= (size_t)r) {
- // lose this whole item
- //dout(30) << "skipping " << msg->msg_iov[0].iov_len << ", " << (msg->msg_iovlen-1) << " v, " << r << " left" << dendl;
- r -= msg->msg_iov[0].iov_len;
- msg->msg_iov++;
- msg->msg_iovlen--;
- } else {
- // partial!
- //dout(30) << "adjusting " << msg->msg_iov[0].iov_len << ", " << msg->msg_iovlen << " v, " << r << " left" << dendl;
- msg->msg_iov[0].iov_base = (void*)((long)msg->msg_iov[0].iov_base + r);
- msg->msg_iov[0].iov_len -= r;
- break;
- }
- }
- }
- return 0;
+ // now i know my type.
+ if (my_type >= 0)
+ assert(my_type == name.type());
+ else
+ my_type = name.type();
+
+ // add to directory
+ max_local++;
+ local.resize(max_local);
+ stopped.resize(max_local);
+
+ msgr->get();
+ local[erank] = msgr;
+ stopped[erank] = false;
+ msgr->_myinst.addr = rank_addr;
+ if (msgr->_myinst.addr.ipaddr == entity_addr_t().ipaddr)
+ msgr->need_addr = true;
+ msgr->_myinst.addr.erank = erank;
+
+ dout(10) << "register_entity " << name << " at " << msgr->_myinst.addr
+ << " need_addr=" << need_addr
+ << dendl;
+
+ num_local++;
+
+ lock.Unlock();
+ return msgr;
}
-int Rank::Pipe::write_ack(unsigned seq)
+void SimpleMessenger::unregister_entity(Endpoint *msgr)
{
- dout(10) << "write_ack " << seq << dendl;
+ lock.Lock();
+ dout(10) << "unregister_entity " << msgr->get_myname() << dendl;
+
+ // remove from local directory.
+ assert(msgr->my_rank >= 0);
+ assert(local[msgr->my_rank] == msgr);
+ local[msgr->my_rank] = 0;
+ stopped[msgr->my_rank] = true;
+ num_local--;
+ msgr->my_rank = -1;
- char c = CEPH_MSGR_TAG_ACK;
- __le32 s;
- s = seq;
+ assert(msgr->nref.test() > 1);
+ msgr->put();
- struct msghdr msg;
- memset(&msg, 0, sizeof(msg));
- struct iovec msgvec[2];
- msgvec[0].iov_base = &c;
- msgvec[0].iov_len = 1;
- msgvec[1].iov_base = &s;
- msgvec[1].iov_len = sizeof(s);
- msg.msg_iov = msgvec;
- msg.msg_iovlen = 2;
-
- if (do_sendmsg(sd, &msg, 5) < 0)
- return -1;
- return 0;
+ wait_cond.Signal();
+
+ lock.Unlock();
}
-int Rank::Pipe::write_message(Message *m)
+void SimpleMessenger::submit_message(Message *m, const entity_addr_t& dest_addr, bool lazy)
{
- ceph_msg_header& header = m->get_header();
- ceph_msg_footer& footer = m->get_footer();
+ const entity_name_t dest = m->get_dest();
- // get envelope, buffers
- header.front_len = m->get_payload().length();
- header.data_len = m->get_data().length();
- footer.flags = 0;
- m->calc_header_crc();
+ assert(m->nref.test() == 0);
- bufferlist blist = m->get_payload();
- blist.append(m->get_data());
-
- dout(20) << "write_message " << m << " to " << header.dst << dendl;
-
- // set up msghdr and iovecs
- struct msghdr msg;
- memset(&msg, 0, sizeof(msg));
- struct iovec msgvec[3 + blist.buffers().size()]; // conservative upper bound
- msg.msg_iov = msgvec;
- int msglen = 0;
-
- // send tag
- char tag = CEPH_MSGR_TAG_MSG;
- msgvec[msg.msg_iovlen].iov_base = &tag;
- msgvec[msg.msg_iovlen].iov_len = 1;
- msglen++;
- msg.msg_iovlen++;
+ m->get_header().mon_protocol = CEPH_MON_PROTOCOL;
+ m->get_header().monc_protocol = CEPH_MONC_PROTOCOL;
+ m->get_header().mds_protocol = CEPH_MDS_PROTOCOL;
+ m->get_header().mdsc_protocol = CEPH_MDSC_PROTOCOL;
+ m->get_header().osd_protocol = CEPH_OSD_PROTOCOL;
+ m->get_header().osdc_protocol = CEPH_OSDC_PROTOCOL;
- // send envelope
- msgvec[msg.msg_iovlen].iov_base = (char*)&header;
- msgvec[msg.msg_iovlen].iov_len = sizeof(header);
- msglen += sizeof(header);
- msg.msg_iovlen++;
+ // lookup
+ entity_addr_t dest_proc_addr = dest_addr;
+ dest_proc_addr.erank = 0;
- // payload (front+data)
- list<bufferptr>::const_iterator pb = blist.buffers().begin();
- int b_off = 0; // carry-over buffer offset, if any
- int bl_pos = 0; // blist pos
- int left = blist.length();
+ lock.Lock();
+ {
+ // local?
+ if (rank_addr.is_local_to(dest_addr)) {
+ if (dest_addr.erank < max_local && local[dest_addr.erank]) {
+ // local
+ dout(20) << "submit_message " << *m << " dest " << dest << " local" << dendl;
+ local[dest_addr.erank]->queue_message(m);
+ } else {
+ derr(0) << "submit_message " << *m << " dest " << dest << " " << dest_addr << " local but not in local map? dropping." << dendl;
+ //assert(0); // hmpf, this is probably mds->mon beacon from newsyn.
+ delete m;
+ }
+ }
+ else {
+ // remote.
+ Pipe *pipe = 0;
+ if (rank_pipe.count( dest_proc_addr )) {
+ // connected?
+ pipe = rank_pipe[ dest_proc_addr ];
+ pipe->lock.Lock();
+ if (pipe->state == Pipe::STATE_CLOSED) {
+ dout(20) << "submit_message " << *m << " dest " << dest << " remote, " << dest_addr << ", ignoring old closed pipe." << dendl;
+ pipe->unregister_pipe();
+ pipe->lock.Unlock();
+ pipe = 0;
+ } else {
+ dout(20) << "submit_message " << *m << " dest " << dest << " remote, " << dest_addr << ", have pipe." << dendl;
- while (left > 0) {
- int donow = MIN(left, (int)pb->length()-b_off);
- if (donow == 0) {
- dout(0) << "donow = " << donow << " left " << left << " pb->length " << pb->length() << " b_off " << b_off << dendl;
+ // if this pipe was created by an incoming connection, but we haven't received
+ // a message yet, then it won't have the policy set.
+ if (pipe->get_out_seq() == 0)
+ pipe->policy = policy_map[m->get_dest().type()];
+
+ pipe->_send(m);
+ pipe->lock.Unlock();
+ }
+ }
+ if (!pipe) {
+ if (lazy) {
+ dout(20) << "submit_message " << *m << " dest " << dest << " remote, " << dest_addr << ", lazy, dropping." << dendl;
+ delete m;
+ } else {
+ dout(20) << "submit_message " << *m << " dest " << dest << " remote, " << dest_addr << ", new pipe." << dendl;
+ // not connected.
+ pipe = connect_rank(dest_proc_addr, policy_map[m->get_dest().type()]);
+ pipe->send(m);
+ }
+ }
}
- assert(donow > 0);
- dout(30) << " bl_pos " << bl_pos << " b_off " << b_off
- << " leftinchunk " << left
- << " buffer len " << pb->length()
- << " writing " << donow
- << dendl;
-
- if (msg.msg_iovlen >= IOV_MAX-2) {
- if (do_sendmsg(sd, &msg, msglen))
- return -1;
-
- // and restart the iov
- msg.msg_iov = msgvec;
- msg.msg_iovlen = 0;
- msglen = 0;
+ }
+
+ lock.Unlock();
+}
+
+
+
+
+
+void SimpleMessenger::wait()
+{
+ lock.Lock();
+ while (1) {
+ // reap dead pipes
+ reaper();
+
+ if (num_local == 0) {
+ dout(10) << "wait: everything stopped" << dendl;
+ break; // everything stopped.
+ } else {
+ dout(10) << "wait: local still has " << local.size() << " items, waiting" << dendl;
}
- msgvec[msg.msg_iovlen].iov_base = (void*)(pb->c_str()+b_off);
- msgvec[msg.msg_iovlen].iov_len = donow;
- msglen += donow;
- msg.msg_iovlen++;
-
- left -= donow;
- assert(left >= 0);
- b_off += donow;
- bl_pos += donow;
- if (left == 0) break;
- while (b_off == (int)pb->length()) {
- pb++;
- b_off = 0;
- }
+ wait_cond.Wait(lock);
}
- assert(left == 0);
+ lock.Unlock();
+
+ // done! clean up.
+ dout(20) << "wait: stopping accepter thread" << dendl;
+ accepter.stop();
+ dout(20) << "wait: stopped accepter thread" << dendl;
- // send footer
- msgvec[msg.msg_iovlen].iov_base = (void*)&footer;
- msgvec[msg.msg_iovlen].iov_len = sizeof(footer);
- msglen += sizeof(footer);
- msg.msg_iovlen++;
+ // close+reap all pipes
+ lock.Lock();
+ {
+ dout(10) << "wait: closing pipes" << dendl;
+ list<Pipe*> toclose;
+ for (hash_map<entity_addr_t,Pipe*>::iterator i = rank_pipe.begin();
+ i != rank_pipe.end();
+ i++)
+ toclose.push_back(i->second);
+ for (list<Pipe*>::iterator i = toclose.begin();
+ i != toclose.end();
+ i++) {
+ (*i)->unregister_pipe();
+ (*i)->lock.Lock();
+ (*i)->stop();
+ (*i)->lock.Unlock();
+ }
- // send
- if (do_sendmsg(sd, &msg, msglen))
- return -1;
+ reaper();
+ dout(10) << "wait: waiting for pipes " << pipes << " to close" << dendl;
+ while (!pipes.empty()) {
+ wait_cond.Wait(lock);
+ reaper();
+ }
+ }
+ lock.Unlock();
- return 0;
+ dout(10) << "wait: done." << dendl;
+ dout(1) << "shutdown complete." << dendl;
+ remove_pid_file();
+ started = false;
+ my_type = -1;
}
+
+
+void SimpleMessenger::mark_down(entity_addr_t addr)
+{
+ lock.Lock();
+ if (rank_pipe.count(addr)) {
+ Pipe *p = rank_pipe[addr];
+ dout(1) << "mark_down " << addr << " -- " << p << dendl;
+ p->unregister_pipe();
+ p->lock.Lock();
+ p->stop();
+ p->lock.Unlock();
+ } else {
+ dout(1) << "mark_down " << addr << " -- pipe dne" << dendl;
+ }
+ lock.Unlock();
+}
+