From a6bc29336374b43aa293ac410152a6f6f12ffa2d Mon Sep 17 00:00:00 2001 From: sageweil Date: Sat, 25 Aug 2007 22:40:01 +0000 Subject: [PATCH] fixed parallel client naming race git-svn-id: https://ceph.svn.sf.net/svnroot/ceph@1697 29311d96-e01e-0410-9327-a35deaab8ce9 --- trunk/ceph/TODO | 1 + trunk/ceph/client/Client.cc | 17 ++++++++------- trunk/ceph/msg/Messenger.h | 6 ++++++ trunk/ceph/msg/SimpleMessenger.cc | 35 +++++++++++++++++++++++++++++-- trunk/ceph/msg/SimpleMessenger.h | 3 +++ trunk/ceph/newsyn.cc | 16 ++++++-------- 6 files changed, 58 insertions(+), 20 deletions(-) diff --git a/trunk/ceph/TODO b/trunk/ceph/TODO index d52d60d3acfee..b20a555dff8e1 100644 --- a/trunk/ceph/TODO +++ b/trunk/ceph/TODO @@ -155,6 +155,7 @@ rados snapshots objecter +- maybe_request_map should set a timer event to periodically re-request. - transaction prepare/commit - read+floor_lockout diff --git a/trunk/ceph/client/Client.cc b/trunk/ceph/client/Client.cc index b287950859b62..324832af22e41 100644 --- a/trunk/ceph/client/Client.cc +++ b/trunk/ceph/client/Client.cc @@ -124,7 +124,6 @@ Client::Client(Messenger *m, MonMap *mm) : timer(client_lock) // set up messengers messenger = m; - messenger->set_dispatcher(this); // osd interfaces osdmap = new OSDMap(); // initially blank.. see mount() @@ -232,8 +231,6 @@ void Client::dump_cache() void Client::init() { - - } void Client::shutdown() { @@ -1330,9 +1327,10 @@ void Client::_try_mount() dout(10) << "_try_mount" << dendl; int mon = monmap->pick_mon(); dout(2) << "sending client_mount to mon" << mon << dendl; - messenger->send_message(new MClientMount(messenger->get_myaddr(), - client_instance_this_process), - monmap->get_inst(mon)); + messenger->send_first_message(this, // simultaneously go active (if we haven't already) + new MClientMount(messenger->get_myaddr(), + client_instance_this_process), + monmap->get_inst(mon)); // schedule timeout? if (g_conf.num_client <= 1) { // don't do this if we have multiple instances in our process! @@ -1353,9 +1351,9 @@ int Client::mount() { client_lock.Lock(); assert(!mounted); // caller is confused? - assert(!mdsmap); - + _try_mount(); + //messenger->set_dispatcher(this); // FIXME: there is still a race condition here! while (!mdsmap || !osdmap || @@ -2604,13 +2602,16 @@ int Client::_release(Fh *f) if (g_conf.client_oc) { // caching on. if (in->num_open_rd == 0 && in->num_open_wr == 0) { + dout(20) << "calling empty" << dendl; in->fc.empty(new C_Client_CloseRelease(this, in)); } else if (in->num_open_rd == 0) { + dout(20) << "calling release" << dendl; in->fc.release_clean(); close_release(in); } else if (in->num_open_wr == 0) { + dout(20) << "calling flush dirty" << dendl; in->fc.flush_dirty(new C_Client_CloseRelease(this,in)); } diff --git a/trunk/ceph/msg/Messenger.h b/trunk/ceph/msg/Messenger.h index be60c5061b086..22815ab3c6f73 100644 --- a/trunk/ceph/msg/Messenger.h +++ b/trunk/ceph/msg/Messenger.h @@ -73,6 +73,12 @@ class Messenger { virtual void prepare_dest(const entity_addr_t& addr) {} virtual int send_message(Message *m, entity_inst_t dest, int port=0, int fromport=0) = 0; + virtual int send_first_message(Dispatcher *d, + Message *m, entity_inst_t dest, + int port=0, int fromport=0) { + set_dispatcher(d); + return send_message(m, dest, port, fromport); + } // make a procedure call //virtual Message* sendrecv(Message *m, msg_name_t dest, int port=0); diff --git a/trunk/ceph/msg/SimpleMessenger.cc b/trunk/ceph/msg/SimpleMessenger.cc index ea011bb20f1b4..31477587e4cd3 100644 --- a/trunk/ceph/msg/SimpleMessenger.cc +++ b/trunk/ceph/msg/SimpleMessenger.cc @@ -908,11 +908,12 @@ Rank::Pipe *Rank::connect_rank(const entity_addr_t& addr) Rank::EntityMessenger *Rank::find_unnamed(entity_name_t a) { - // find an unnamed local entity of the right type + // find an unnamed (and _ready_) local entity of the right type for (map::iterator p = local.begin(); p != local.end(); ++p) { - if (p->first.type() == a.type() && p->first.is_new()) + if (p->first.type() == a.type() && p->first.is_new() && + p->second->is_ready()) return p->second; } return 0; @@ -1208,6 +1209,36 @@ int Rank::EntityMessenger::send_message(Message *m, entity_inst_t dest, return 0; } +int Rank::EntityMessenger::send_first_message(Dispatcher *d, + Message *m, entity_inst_t dest, + int port, int fromport) +{ + /* hacky thing for csyn and newsyn: + * set dispatcher (go active) AND set sender for this + * message while holding rank.lock. this prevents any + * races against incoming unnamed messages naming us before + * we fire off our first message. + */ + rank.lock.Lock(); + set_dispatcher(d); + + // set envelope + m->set_source(get_myname(), fromport); + m->set_source_addr(rank.my_addr); + m->set_dest_inst(dest); + m->set_dest_port(port); + rank.lock.Unlock(); + + dout(1) << m->get_source() + << " --> " << dest.name << " " << dest.addr + << " -- " << *m + << " -- " << m + << dendl; + + rank.submit_message(m, dest.addr); + + return 0; +} const entity_addr_t &Rank::EntityMessenger::get_myaddr() diff --git a/trunk/ceph/msg/SimpleMessenger.h b/trunk/ceph/msg/SimpleMessenger.h index 85732307b543e..759a7fbcda473 100644 --- a/trunk/ceph/msg/SimpleMessenger.h +++ b/trunk/ceph/msg/SimpleMessenger.h @@ -209,6 +209,9 @@ private: void prepare_dest(const entity_addr_t& addr); int send_message(Message *m, entity_inst_t dest, int port=0, int fromport=0); + int send_first_message(Dispatcher *d, + Message *m, entity_inst_t dest, + int port=0, int fromport=0); void mark_down(entity_addr_t a); void mark_up(entity_name_t a, entity_addr_t& i); diff --git a/trunk/ceph/newsyn.cc b/trunk/ceph/newsyn.cc index dd9f55683a420..4926fac0eb35a 100644 --- a/trunk/ceph/newsyn.cc +++ b/trunk/ceph/newsyn.cc @@ -321,13 +321,10 @@ int main(int argc, char **argv) clientlist.insert(i); client[i] = new Client(rank.register_entity(entity_name_t(entity_name_t::TYPE_CLIENT, -1-i)), //MSG_ADDR_CLIENT_NEW), monmap); - + syn[i] = new SyntheticClient(client[i]); + // logger? if (client_logger == 0) { - char s[80]; - sprintf(s,"clnode.%d", myrank); - client_logger = new Logger(s, &client_logtype); - client_logtype.add_inc("lsum"); client_logtype.add_inc("lnum"); client_logtype.add_inc("lwsum"); @@ -344,14 +341,13 @@ int main(int argc, char **argv) client_logtype.add_inc("ldirnum"); client_logtype.add_inc("readdir"); client_logtype.add_inc("stat"); + + char s[80]; + sprintf(s,"clnode.%d", myrank); + client_logger = new Logger(s, &client_logtype); } - //client[i]->init(); started++; - - syn[i] = new SyntheticClient(client[i]); - - //client[i]->mount(); nclients++; } -- 2.39.5