From 244535619b9bf175918d38ab7bc8d48312de50d3 Mon Sep 17 00:00:00 2001 From: sageweil Date: Thu, 25 Oct 2007 22:48:33 +0000 Subject: [PATCH] removed ports from messenger interface git-svn-id: https://ceph.svn.sf.net/svnroot/ceph@1997 29311d96-e01e-0410-9327-a35deaab8ce9 --- trunk/ceph/client/Client.cc | 23 ++++----- trunk/ceph/include/ceph_fs.h | 2 +- trunk/ceph/mds/Anchor.h | 10 ++-- trunk/ceph/mds/AnchorClient.cc | 42 +++++---------- trunk/ceph/mds/AnchorTable.cc | 18 +++---- trunk/ceph/mds/Locker.cc | 37 ++++++------- trunk/ceph/mds/MDBalancer.cc | 3 +- trunk/ceph/mds/MDCache.cc | 51 +++++++++--------- trunk/ceph/mds/MDS.cc | 78 +++++++++++++++++----------- trunk/ceph/mds/MDS.h | 4 +- trunk/ceph/mds/Migrator.cc | 31 +++++------ trunk/ceph/mds/Server.cc | 23 ++++----- trunk/ceph/mds/mdstypes.h | 15 ++---- trunk/ceph/msg/FakeMessenger.cc | 5 +- trunk/ceph/msg/FakeMessenger.h | 2 +- trunk/ceph/msg/Message.cc | 5 +- trunk/ceph/msg/Message.h | 86 +++++++++++++------------------ trunk/ceph/msg/Messenger.h | 12 +---- trunk/ceph/msg/SimpleMessenger.cc | 45 +++------------- trunk/ceph/msg/SimpleMessenger.h | 11 ++-- 20 files changed, 214 insertions(+), 289 deletions(-) diff --git a/trunk/ceph/client/Client.cc b/trunk/ceph/client/Client.cc index c7a777d750c2c..5ffbf890f42e9 100644 --- a/trunk/ceph/client/Client.cc +++ b/trunk/ceph/client/Client.cc @@ -691,7 +691,7 @@ MClientReply *Client::make_request(MClientRequest *req, if (waiting_for_session.count(mds) == 0) { dout(10) << "opening session to mds" << mds << dendl; messenger->send_message(new MClientSession(MClientSession::OP_REQUEST_OPEN), - mdsmap->get_inst(mds), MDS_PORT_SERVER); + mdsmap->get_inst(mds)); } // wait @@ -803,7 +803,7 @@ void Client::send_request(MetaRequest *request, int mds) request->request = 0; dout(10) << "send_request " << *r << " to mds" << mds << dendl; - messenger->send_message(r, mdsmap->get_inst(mds), MDS_PORT_SERVER); + messenger->send_message(r, mdsmap->get_inst(mds)); request->mds.insert(mds); } @@ -1051,7 +1051,7 @@ void Client::send_reconnect(int mds) m->closed = true; } - messenger->send_message(m, mdsmap->get_inst(mds), MDS_PORT_SERVER); + messenger->send_message(m, mdsmap->get_inst(mds)); } @@ -1198,7 +1198,7 @@ void Client::handle_file_caps(MClientFileCaps *m) << ", which we don't want caps for, releasing." << dendl; m->set_caps(0); m->set_wanted(0); - messenger->send_message(m, m->get_source_inst(), MDS_PORT_LOCKER); + messenger->send_message(m, m->get_source_inst()); return; } @@ -1308,7 +1308,7 @@ void Client::implemented_caps(MClientFileCaps *m, Inode *in) in->file_wr_size = 0; } - messenger->send_message(m, m->get_source_inst(), MDS_PORT_LOCKER); + messenger->send_message(m, m->get_source_inst()); } @@ -1334,7 +1334,7 @@ void Client::release_caps(Inode *in, it->second.seq, it->second.caps, in->file_caps_wanted()); - messenger->send_message(m, mdsmap->get_inst(it->first), MDS_PORT_LOCKER); + messenger->send_message(m, mdsmap->get_inst(it->first)); } } @@ -1359,8 +1359,7 @@ void Client::update_caps_wanted(Inode *in) it->second.seq, it->second.caps, in->file_caps_wanted()); - messenger->send_message(m, - mdsmap->get_inst(it->first), MDS_PORT_LOCKER); + messenger->send_message(m, mdsmap->get_inst(it->first)); } } @@ -1374,9 +1373,9 @@ void Client::_try_mount() dout(10) << "_try_mount" << dendl; int mon = monmap->pick_mon(); dout(2) << "sending client_mount to mon" << mon << " as instance " << my_instance << dendl; - messenger->send_first_message(this, // simultaneously go active (if we haven't already) - new MClientMount(messenger->get_myaddr(), my_instance), - monmap->get_inst(mon)); + messenger->set_dispatcher(this); + messenger->send_message(new MClientMount(messenger->get_myaddr(), my_instance), + monmap->get_inst(mon)); // schedule timeout? assert(mount_timeout_event == 0); @@ -1528,7 +1527,7 @@ int Client::unmount() dout(2) << "sending client_session close to mds" << p->first << " seq " << p->second << dendl; messenger->send_message(new MClientSession(MClientSession::OP_REQUEST_CLOSE, p->second), - mdsmap->get_inst(p->first), MDS_PORT_SERVER); + mdsmap->get_inst(p->first)); } // send unmount! diff --git a/trunk/ceph/include/ceph_fs.h b/trunk/ceph/include/ceph_fs.h index fbe36ab801cfa..0a812c70b13d8 100644 --- a/trunk/ceph/include/ceph_fs.h +++ b/trunk/ceph/include/ceph_fs.h @@ -168,9 +168,9 @@ struct ceph_entity_inst { * message header */ struct ceph_message_header { + __u32 seq; __u32 type; struct ceph_entity_inst src, dst; - __u32 source_port, dest_port; __u32 nchunks; }; diff --git a/trunk/ceph/mds/Anchor.h b/trunk/ceph/mds/Anchor.h index 748091306a44d..a55a07dd3068e 100644 --- a/trunk/ceph/mds/Anchor.h +++ b/trunk/ceph/mds/Anchor.h @@ -25,19 +25,19 @@ using std::string; // anchor ops #define ANCHOR_OP_LOOKUP 1 -#define ANCHOR_OP_LOOKUP_REPLY 2 +#define ANCHOR_OP_LOOKUP_REPLY -2 #define ANCHOR_OP_CREATE_PREPARE 11 -#define ANCHOR_OP_CREATE_AGREE 12 +#define ANCHOR_OP_CREATE_AGREE -12 #define ANCHOR_OP_DESTROY_PREPARE 21 -#define ANCHOR_OP_DESTROY_AGREE 22 +#define ANCHOR_OP_DESTROY_AGREE -22 #define ANCHOR_OP_UPDATE_PREPARE 31 -#define ANCHOR_OP_UPDATE_AGREE 32 +#define ANCHOR_OP_UPDATE_AGREE -32 #define ANCHOR_OP_COMMIT 41 -#define ANCHOR_OP_ACK 42 +#define ANCHOR_OP_ACK -42 #define ANCHOR_OP_ROLLBACK 43 diff --git a/trunk/ceph/mds/AnchorClient.cc b/trunk/ceph/mds/AnchorClient.cc index b2fb1fb50d7bd..1cc18dc7d8fa4 100644 --- a/trunk/ceph/mds/AnchorClient.cc +++ b/trunk/ceph/mds/AnchorClient.cc @@ -92,8 +92,7 @@ void AnchorClient::handle_anchor_reply(class MAnchor *m) << dendl; MAnchor *req = new MAnchor(ANCHOR_OP_COMMIT, 0, atid); mds->messenger->send_message(req, - mds->mdsmap->get_inst(mds->mdsmap->get_anchortable()), - MDS_PORT_ANCHORTABLE, MDS_PORT_ANCHORCLIENT); + mds->mdsmap->get_inst(mds->mdsmap->get_anchortable())); } else { dout(10) << "stray create_agree on " << ino @@ -102,8 +101,7 @@ void AnchorClient::handle_anchor_reply(class MAnchor *m) << dendl; MAnchor *req = new MAnchor(ANCHOR_OP_ROLLBACK, 0, atid); mds->messenger->send_message(req, - mds->mdsmap->get_inst(mds->mdsmap->get_anchortable()), - MDS_PORT_ANCHORTABLE, MDS_PORT_ANCHORCLIENT); + mds->mdsmap->get_inst(mds->mdsmap->get_anchortable())); } break; @@ -126,8 +124,7 @@ void AnchorClient::handle_anchor_reply(class MAnchor *m) << dendl; MAnchor *req = new MAnchor(ANCHOR_OP_COMMIT, 0, atid); mds->messenger->send_message(req, - mds->mdsmap->get_inst(mds->mdsmap->get_anchortable()), - MDS_PORT_ANCHORTABLE, MDS_PORT_ANCHORCLIENT); + mds->mdsmap->get_inst(mds->mdsmap->get_anchortable())); } else { dout(10) << "stray destroy_agree on " << ino @@ -136,8 +133,7 @@ void AnchorClient::handle_anchor_reply(class MAnchor *m) << dendl; MAnchor *req = new MAnchor(ANCHOR_OP_ROLLBACK, 0, atid); mds->messenger->send_message(req, - mds->mdsmap->get_inst(mds->mdsmap->get_anchortable()), - MDS_PORT_ANCHORTABLE, MDS_PORT_ANCHORCLIENT); + mds->mdsmap->get_inst(mds->mdsmap->get_anchortable())); } break; @@ -160,8 +156,7 @@ void AnchorClient::handle_anchor_reply(class MAnchor *m) << dendl; MAnchor *req = new MAnchor(ANCHOR_OP_COMMIT, 0, atid); mds->messenger->send_message(req, - mds->mdsmap->get_inst(mds->mdsmap->get_anchortable()), - MDS_PORT_ANCHORTABLE, MDS_PORT_ANCHORCLIENT); + mds->mdsmap->get_inst(mds->mdsmap->get_anchortable())); } else { dout(10) << "stray update_agree on " << ino @@ -170,8 +165,7 @@ void AnchorClient::handle_anchor_reply(class MAnchor *m) << dendl; MAnchor *req = new MAnchor(ANCHOR_OP_ROLLBACK, 0, atid); mds->messenger->send_message(req, - mds->mdsmap->get_inst(mds->mdsmap->get_anchortable()), - MDS_PORT_ANCHORTABLE, MDS_PORT_ANCHORCLIENT); + mds->mdsmap->get_inst(mds->mdsmap->get_anchortable())); } break; @@ -237,8 +231,7 @@ void AnchorClient::lookup(inodeno_t ino, vector& trace, Context *onfinis pending_lookup[ino].trace = &trace; mds->send_message_mds(req, - mds->mdsmap->get_anchortable(), - MDS_PORT_ANCHORTABLE, MDS_PORT_ANCHORCLIENT); + mds->mdsmap->get_anchortable()); } @@ -258,8 +251,7 @@ void AnchorClient::prepare_create(inodeno_t ino, vector& trace, pending_create_prepare[ino].onfinish = onfinish; mds->send_message_mds(req, - mds->mdsmap->get_anchortable(), - MDS_PORT_ANCHORTABLE, MDS_PORT_ANCHORCLIENT); + mds->mdsmap->get_anchortable()); } void AnchorClient::prepare_destroy(inodeno_t ino, @@ -272,8 +264,7 @@ void AnchorClient::prepare_destroy(inodeno_t ino, pending_destroy_prepare[ino].onfinish = onfinish; pending_destroy_prepare[ino].patid = patid; mds->messenger->send_message(req, - mds->mdsmap->get_inst(mds->mdsmap->get_anchortable()), - MDS_PORT_ANCHORTABLE, MDS_PORT_ANCHORCLIENT); + mds->mdsmap->get_inst(mds->mdsmap->get_anchortable())); } @@ -291,8 +282,7 @@ void AnchorClient::prepare_update(inodeno_t ino, vector& trace, pending_update_prepare[ino].onfinish = onfinish; mds->messenger->send_message(req, - mds->mdsmap->get_inst(mds->mdsmap->get_anchortable()), - MDS_PORT_ANCHORTABLE, MDS_PORT_ANCHORCLIENT); + mds->mdsmap->get_inst(mds->mdsmap->get_anchortable())); } @@ -309,8 +299,7 @@ void AnchorClient::commit(version_t atid, LogSegment *ls) // send message MAnchor *req = new MAnchor(ANCHOR_OP_COMMIT, 0, atid); mds->messenger->send_message(req, - mds->mdsmap->get_inst(mds->mdsmap->get_anchortable()), - MDS_PORT_ANCHORTABLE, MDS_PORT_ANCHORCLIENT); + mds->mdsmap->get_inst(mds->mdsmap->get_anchortable())); } @@ -332,8 +321,7 @@ void AnchorClient::resend_commits() dout(10) << "resending commit on " << p->first << dendl; MAnchor *req = new MAnchor(ANCHOR_OP_COMMIT, 0, p->first); mds->send_message_mds(req, - mds->mdsmap->get_anchortable(), - MDS_PORT_ANCHORTABLE, MDS_PORT_ANCHORCLIENT); + mds->mdsmap->get_anchortable()); } } @@ -346,8 +334,7 @@ void AnchorClient::resend_prepares(hash_map& prepar MAnchor *req = new MAnchor(op, p->first); req->set_trace(p->second.trace); mds->send_message_mds(req, - mds->mdsmap->get_anchortable(), - MDS_PORT_ANCHORTABLE, MDS_PORT_ANCHORCLIENT); + mds->mdsmap->get_anchortable()); } } @@ -365,8 +352,7 @@ void AnchorClient::handle_mds_recovery(int who) p++) { dout(10) << "resending lookup on " << p->first << dendl; mds->send_message_mds(new MAnchor(ANCHOR_OP_LOOKUP, p->first), - mds->mdsmap->get_anchortable(), - MDS_PORT_ANCHORTABLE, MDS_PORT_ANCHORCLIENT); + mds->mdsmap->get_anchortable()); } // resend any pending prepares. diff --git a/trunk/ceph/mds/AnchorTable.cc b/trunk/ceph/mds/AnchorTable.cc index 65c09278c9850..f3c4fb05b772b 100644 --- a/trunk/ceph/mds/AnchorTable.cc +++ b/trunk/ceph/mds/AnchorTable.cc @@ -134,7 +134,7 @@ void AnchorTable::handle_lookup(MAnchor *req) // reply MAnchor *reply = new MAnchor(ANCHOR_OP_LOOKUP_REPLY, req->get_ino()); reply->set_trace(trace); - mds->messenger->send_message(reply, req->get_source_inst(), req->get_source_port()); + mds->messenger->send_message(reply, req->get_source_inst()); delete req; } @@ -284,7 +284,7 @@ void AnchorTable::_create_prepare_logged(MAnchor *req, version_t atid) // reply MAnchor *reply = new MAnchor(ANCHOR_OP_CREATE_AGREE, ino, atid); - mds->messenger->send_message(reply, req->get_source_inst(), req->get_source_port()); + mds->messenger->send_message(reply, req->get_source_inst()); delete req; } @@ -324,7 +324,7 @@ void AnchorTable::_destroy_prepare_logged(MAnchor *req, version_t atid) // reply MAnchor *reply = new MAnchor(ANCHOR_OP_DESTROY_AGREE, ino, atid); - mds->messenger->send_message(reply, req->get_source_inst(), req->get_source_port()); + mds->messenger->send_message(reply, req->get_source_inst()); delete req; } @@ -367,7 +367,7 @@ void AnchorTable::_update_prepare_logged(MAnchor *req, version_t atid) // reply MAnchor *reply = new MAnchor(ANCHOR_OP_UPDATE_AGREE, ino, atid); - mds->messenger->send_message(reply, req->get_source_inst(), req->get_source_port()); + mds->messenger->send_message(reply, req->get_source_inst()); delete req; } @@ -402,7 +402,7 @@ void AnchorTable::handle_commit(MAnchor *req) << ", already committed, sending ack." << dendl; MAnchor *reply = new MAnchor(ANCHOR_OP_ACK, 0, atid); - mds->messenger->send_message(reply, req->get_source_inst(), req->get_source_port()); + mds->messenger->send_message(reply, req->get_source_inst()); delete req; return; } @@ -421,7 +421,7 @@ void AnchorTable::_commit_logged(MAnchor *req) { dout(7) << "_commit_logged, sending ACK" << dendl; MAnchor *reply = new MAnchor(ANCHOR_OP_ACK, req->get_ino(), req->get_atid()); - mds->messenger->send_message(reply, req->get_source_inst(), req->get_source_port()); + mds->messenger->send_message(reply, req->get_source_inst()); delete req; } @@ -686,16 +686,16 @@ void AnchorTable::resend_agree(version_t v, int who) { if (pending_create.count(v)) { MAnchor *reply = new MAnchor(ANCHOR_OP_CREATE_AGREE, pending_create[v], v); - mds->send_message_mds(reply, who, MDS_PORT_ANCHORCLIENT); + mds->send_message_mds(reply, who); } else if (pending_destroy.count(v)) { MAnchor *reply = new MAnchor(ANCHOR_OP_DESTROY_AGREE, pending_destroy[v], v); - mds->send_message_mds(reply, who, MDS_PORT_ANCHORCLIENT); + mds->send_message_mds(reply, who); } else { assert(pending_update.count(v)); MAnchor *reply = new MAnchor(ANCHOR_OP_UPDATE_AGREE, pending_update[v].first, v); - mds->send_message_mds(reply, who, MDS_PORT_ANCHORCLIENT); + mds->send_message_mds(reply, who); } } diff --git a/trunk/ceph/mds/Locker.cc b/trunk/ceph/mds/Locker.cc index 55f38cd799b5f..10b7adc0d0eaf 100644 --- a/trunk/ceph/mds/Locker.cc +++ b/trunk/ceph/mds/Locker.cc @@ -90,7 +90,7 @@ void Locker::send_lock_message(SimpleLock *lock, int msg) if (mds->mdsmap->get_state(it->first) < MDSMap::STATE_REJOIN) continue; MLock *m = new MLock(lock, msg, mds->get_nodeid()); - mds->send_message_mds(m, it->first, MDS_PORT_LOCKER); + mds->send_message_mds(m, it->first); } } @@ -103,7 +103,7 @@ void Locker::send_lock_message(SimpleLock *lock, int msg, const bufferlist &data continue; MLock *m = new MLock(lock, msg, mds->get_nodeid()); m->set_data(data); - mds->send_message_mds(m, it->first, MDS_PORT_LOCKER); + mds->send_message_mds(m, it->first); } } @@ -238,7 +238,7 @@ bool Locker::acquire_locks(MDRequest *mdr, (*q)->set_object_info(info); req->get_authpins().push_back(info); } - mds->send_message_mds(req, p->first, MDS_PORT_SERVER); + mds->send_message_mds(req, p->first); // put in waiting list assert(mdr->more()->waiting_on_slave.count(p->first) == 0); @@ -625,7 +625,7 @@ void Locker::request_inode_file_caps(CInode *in) if (mds->mdsmap->get_state(auth) >= MDSMap::STATE_REJOIN) mds->send_message_mds(new MInodeFileCaps(in->ino(), mds->get_nodeid(), in->replica_caps_wanted), - auth, MDS_PORT_LOCKER); + auth); } else { in->replica_caps_wanted_keep_until.sec_ref() = 0; } @@ -925,8 +925,7 @@ void Locker::handle_simple_lock(SimpleLock *lock, MLock *m) } else { // update lock and reply lock->set_state(LOCK_LOCK); - mds->send_message_mds(new MLock(lock, LOCK_AC_LOCKACK, mds->get_nodeid()), - from, MDS_PORT_LOCKER); + mds->send_message_mds(new MLock(lock, LOCK_AC_LOCKACK, mds->get_nodeid()), from); } break; @@ -1007,7 +1006,7 @@ void Locker::simple_eval_gather(SimpleLock *lock) int auth = lock->get_parent()->authority().first; if (mds->mdsmap->get_state(auth) >= MDSMap::STATE_REJOIN) mds->send_message_mds(new MLock(lock, LOCK_AC_LOCKACK, mds->get_nodeid()), - lock->get_parent()->authority().first, MDS_PORT_LOCKER); + lock->get_parent()->authority().first); } lock->set_state(LOCK_LOCK); @@ -1200,7 +1199,7 @@ bool Locker::simple_xlock_start(SimpleLock *lock, MDRequest *mdr) MMDSSlaveRequest *r = new MMDSSlaveRequest(mdr->reqid, MMDSSlaveRequest::OP_XLOCK); r->set_lock_type(lock->get_type()); lock->get_parent()->set_object_info(r->get_object_info()); - mds->send_message_mds(r, auth, MDS_PORT_SERVER); + mds->send_message_mds(r, auth); // wait lock->add_waiter(SimpleLock::WAIT_REMOTEXLOCK, new C_MDS_RetryRequest(mdcache, mdr)); @@ -1229,7 +1228,7 @@ void Locker::simple_xlock_finish(SimpleLock *lock, MDRequest *mdr) MMDSSlaveRequest *slavereq = new MMDSSlaveRequest(mdr->reqid, MMDSSlaveRequest::OP_UNXLOCK); slavereq->set_lock_type(lock->get_type()); lock->get_parent()->set_object_info(slavereq->get_object_info()); - mds->send_message_mds(slavereq, auth, MDS_PORT_SERVER); + mds->send_message_mds(slavereq, auth); } } @@ -1389,8 +1388,7 @@ bool Locker::scatter_wrlock_start(ScatterLock *lock, MDRequest *mdr) int auth = lock->get_parent()->authority().first; dout(10) << "requesting scatter from auth on " << *lock << " on " << *lock->get_parent() << dendl; - mds->send_message_mds(new MLock(lock, LOCK_AC_REQSCATTER, mds->get_nodeid()), - auth, MDS_PORT_LOCKER); + mds->send_message_mds(new MLock(lock, LOCK_AC_REQSCATTER, mds->get_nodeid()), auth); } } @@ -1467,8 +1465,7 @@ void Locker::scatter_eval_gather(ScatterLock *lock) if (mds->mdsmap->get_state(auth) >= MDSMap::STATE_REJOIN) { bufferlist data; lock->encode_locked_state(data); - mds->send_message_mds(new MLock(lock, LOCK_AC_LOCKACK, mds->get_nodeid(), data), - auth, MDS_PORT_LOCKER); + mds->send_message_mds(new MLock(lock, LOCK_AC_LOCKACK, mds->get_nodeid(), data), auth); } lock->set_state(LOCK_LOCK); } @@ -1638,8 +1635,7 @@ void Locker::scatter_try_unscatter(ScatterLock *lock, Context *c) int auth = lock->get_parent()->authority().first; if (lock->get_state() == LOCK_SCATTER && mds->mdsmap->get_state(auth) >= MDSMap::STATE_ACTIVE) - mds->send_message_mds(new MLock(lock, LOCK_AC_REQUNSCATTER, mds->get_nodeid()), - auth, MDS_PORT_LOCKER); + mds->send_message_mds(new MLock(lock, LOCK_AC_REQUNSCATTER, mds->get_nodeid()), auth); // wait... lock->add_waiter(SimpleLock::WAIT_STABLE, c); @@ -1895,8 +1891,7 @@ void Locker::handle_scatter_lock(ScatterLock *lock, MLock *m) // encode and reply bufferlist data; lock->encode_locked_state(data); - mds->send_message_mds(new MLock(lock, LOCK_AC_LOCKACK, mds->get_nodeid(), data), - from, MDS_PORT_LOCKER); + mds->send_message_mds(new MLock(lock, LOCK_AC_LOCKACK, mds->get_nodeid(), data), from); lock->set_state(LOCK_LOCK); } break; @@ -2374,7 +2369,7 @@ void Locker::file_eval_gather(FileLock *lock) // ack MLock *reply = new MLock(lock, LOCK_AC_MIXEDACK, mds->get_nodeid()); - mds->send_message_mds(reply, in->authority().first, MDS_PORT_LOCKER); + mds->send_message_mds(reply, in->authority().first); } break; @@ -2384,7 +2379,7 @@ void Locker::file_eval_gather(FileLock *lock) // ack MLock *reply = new MLock(lock, LOCK_AC_LOCKACK, mds->get_nodeid()); - mds->send_message_mds(reply, in->authority().first, MDS_PORT_LOCKER); + mds->send_message_mds(reply, in->authority().first); } break; @@ -2793,7 +2788,7 @@ void Locker::handle_file_lock(FileLock *lock, MLock *m) lock->set_state(LOCK_LOCK); MLock *reply = new MLock(lock, LOCK_AC_LOCKACK, mds->get_nodeid()); - mds->send_message_mds(reply, from, MDS_PORT_LOCKER); + mds->send_message_mds(reply, from); } break; @@ -2814,7 +2809,7 @@ void Locker::handle_file_lock(FileLock *lock, MLock *m) // ack MLock *reply = new MLock(lock, LOCK_AC_MIXEDACK, mds->get_nodeid()); - mds->send_message_mds(reply, from, MDS_PORT_LOCKER); + mds->send_message_mds(reply, from); } } else { // LOCK diff --git a/trunk/ceph/mds/MDBalancer.cc b/trunk/ceph/mds/MDBalancer.cc index 8e9d0e2dd46fa..7bf6ea4f7eb80 100644 --- a/trunk/ceph/mds/MDBalancer.cc +++ b/trunk/ceph/mds/MDBalancer.cc @@ -210,8 +210,7 @@ void MDBalancer::send_heartbeat() MHeartbeat *hb = new MHeartbeat(load, beat_epoch); hb->get_import_map() = import_map; mds->messenger->send_message(hb, - mds->mdsmap->get_inst(*p), - MDS_PORT_BALANCER, MDS_PORT_BALANCER); + mds->mdsmap->get_inst(*p)); } } diff --git a/trunk/ceph/mds/MDCache.cc b/trunk/ceph/mds/MDCache.cc index 32201986d9f40..1fc19c2f57874 100644 --- a/trunk/ceph/mds/MDCache.cc +++ b/trunk/ceph/mds/MDCache.cc @@ -1093,7 +1093,7 @@ void MDCache::send_resolve_now(int who) // send - mds->send_message_mds(m, who, MDS_PORT_CACHE); + mds->send_message_mds(m, who); } @@ -1263,7 +1263,7 @@ void MDCache::handle_resolve(MMDSResolve *m) } } - mds->send_message_mds(ack, from, MDS_PORT_CACHE); + mds->send_message_mds(ack, from); } // am i a surviving ambiguous importer? @@ -1773,7 +1773,7 @@ void MDCache::rejoin_send_rejoins() assert(rejoin_ack_gather.count(p->first) == 0); rejoin_sent.insert(p->first); rejoin_ack_gather.insert(p->first); - mds->send_message_mds(p->second, p->first, MDS_PORT_CACHE); + mds->send_message_mds(p->second, p->first); } // nothing? @@ -2065,7 +2065,7 @@ void MDCache::handle_cache_rejoin_weak(MMDSCacheRejoin *weak) if (survivor) { // survivor. do everything now. rejoin_scour_survivor_replicas(from, ack); - mds->send_message_mds(ack, from, MDS_PORT_CACHE); + mds->send_message_mds(ack, from); } else { // done? assert(rejoin_gather.count(from)); @@ -2356,7 +2356,7 @@ void MDCache::handle_cache_rejoin_strong(MMDSCacheRejoin *strong) // send missing? if (missing) { // we expect a FULL soon. - mds->send_message_mds(missing, from, MDS_PORT_CACHE); + mds->send_message_mds(missing, from); } else { // done? assert(rejoin_gather.count(from)); @@ -2487,7 +2487,7 @@ void MDCache::handle_cache_rejoin_missing(MMDSCacheRejoin *missing) full->add_full_inode(in->inode, in->symlink, in->dirfragtree); } - mds->send_message_mds(full, missing->get_source().num(), MDS_PORT_CACHE); + mds->send_message_mds(full, missing->get_source().num()); } void MDCache::handle_cache_rejoin_full(MMDSCacheRejoin *full) @@ -2658,9 +2658,7 @@ void MDCache::rejoin_import_cap(CInode *in, int client, inode_caps_reconnect_t& in->client_caps[client].wanted()); reap->set_mds( frommds ); // reap from whom? - mds->messenger->send_message(reap, - mds->clientmap.get_inst(client), - 0, MDS_PORT_CACHE); + mds->messenger->send_message(reap, mds->clientmap.get_inst(client)); } void MDCache::rejoin_send_acks() @@ -2765,7 +2763,7 @@ void MDCache::rejoin_send_acks() for (map::iterator p = ack.begin(); p != ack.end(); ++p) - mds->send_message_mds(p->second, p->first, MDS_PORT_CACHE); + mds->send_message_mds(p->second, p->first); } @@ -2976,7 +2974,7 @@ void MDCache::send_expire_messages(map& expiremap) it != expiremap.end(); it++) { dout(7) << "sending cache_expire to " << it->first << dendl; - mds->send_message_mds(it->second, it->first, MDS_PORT_CACHE); + mds->send_message_mds(it->second, it->first); } } @@ -3927,7 +3925,7 @@ int MDCache::path_traverse(MDRequest *mdr, Message *req, // who reply->add_dentry( dn->replicate_to( from ) ); if (dn->is_primary()) reply->add_inode( dn->inode->replicate_to( from ) ); - mds->send_message_mds(reply, req->get_source().num(), MDS_PORT_CACHE); + mds->send_message_mds(reply, req->get_source().num()); } } } @@ -3990,9 +3988,9 @@ int MDCache::path_traverse(MDRequest *mdr, Message *req, // who } if (mdr) - request_forward(mdr, dauth.first, req->get_dest_port()); + request_forward(mdr, dauth.first); else - mds->forward_message_mds(req, dauth.first, req->get_dest_port()); + mds->forward_message_mds(req, dauth.first); if (mds->logger) mds->logger->inc("tfw"); return 2; @@ -4318,10 +4316,9 @@ void MDCache::request_finish(MDRequest *mdr) void MDCache::request_forward(MDRequest *mdr, int who, int port) { - if (!port) port = MDS_PORT_SERVER; dout(7) << "request_forward " << *mdr << " to mds" << who << " req " << *mdr << dendl; - mds->forward_message_mds(mdr->client_request, who, port); + mds->forward_message_mds(mdr->client_request, who); request_cleanup(mdr); if (mds->logger) mds->logger->inc("fw"); @@ -4372,7 +4369,7 @@ void MDCache::request_cleanup(MDRequest *mdr) p != mdr->more()->slaves.end(); ++p) { MMDSSlaveRequest *r = new MMDSSlaveRequest(mdr->reqid, MMDSSlaveRequest::OP_FINISH); - mds->send_message_mds(r, *p, MDS_PORT_SERVER); + mds->send_message_mds(r, *p); } // strip foreign xlocks out of lock lists, since the OP_FINISH drops them implicitly. request_forget_foreign_locks(mdr); @@ -4774,7 +4771,7 @@ void MDCache::discover_base_ino(inodeno_t want_ino, want_ino, want_path, false); - mds->send_message_mds(dis, from, MDS_PORT_CACHE); + mds->send_message_mds(dis, from); } waiting_for_base_ino[from][want_ino].push_back(onfinish); @@ -4798,7 +4795,7 @@ void MDCache::discover_dir_frag(CInode *base, want_path, true); // need the base dir open dis->set_base_dir_frag(approx_fg); - mds->send_message_mds(dis, from, MDS_PORT_CACHE); + mds->send_message_mds(dis, from); } // register + wait @@ -4831,7 +4828,7 @@ void MDCache::discover_path(CInode *base, want_path, true, // we want the base dir; we are relative to ino. want_xlocked); - mds->send_message_mds(dis, from, MDS_PORT_CACHE); + mds->send_message_mds(dis, from); } // register + wait @@ -4862,7 +4859,7 @@ void MDCache::discover_path(CDir *base, want_path, false, // no base dir; we are relative to dir want_xlocked); - mds->send_message_mds(dis, from, MDS_PORT_CACHE); + mds->send_message_mds(dis, from); } // register + wait @@ -4886,7 +4883,7 @@ void MDCache::discover_ino(CDir *base, base->dirfrag(), want_ino, want_xlocked); - mds->send_message_mds(dis, from, MDS_PORT_CACHE); + mds->send_message_mds(dis, from); } // register + wait @@ -5187,7 +5184,7 @@ void MDCache::handle_discover(MDiscover *dis) // how did we do? assert(!reply->is_empty()); dout(7) << "handle_discover sending result back to asker mds" << dis->get_asker() << dendl; - mds->send_message_mds(reply, dis->get_asker(), MDS_PORT_CACHE); + mds->send_message_mds(reply, dis->get_asker()); delete dis; } @@ -5522,7 +5519,7 @@ int MDCache::send_inode_updates(CInode *in) it++) { dout(7) << "sending inode_update on " << *in << " to " << *it << dendl; assert(*it != mds->get_nodeid()); - mds->send_message_mds(new MInodeUpdate(in, in->get_cached_by_nonce(*it)), *it, MDS_PORT_CACHE); + mds->send_message_mds(new MInodeUpdate(in, in->get_cached_by_nonce(*it)), *it); } return 0; @@ -5538,7 +5535,7 @@ void MDCache::handle_inode_update(MInodeUpdate *m) dout(7) << "inode_update on " << m->get_ino() << ", don't have it, sending expire" << dendl; MCacheExpire *expire = new MCacheExpire(mds->get_nodeid()); expire->add_inode(m->get_ino(), m->get_nonce()); - mds->send_message_mds(expire, m->get_source().num(), MDS_PORT_CACHE); + mds->send_message_mds(expire, m->get_source().num()); goto out; } @@ -5595,7 +5592,7 @@ int MDCache::send_dir_updates(CDir *dir, bool bcast) dir->dir_rep_by, path, bcast), - *it, MDS_PORT_CACHE); + *it); } return 0; @@ -6006,7 +6003,7 @@ void MDCache::fragment_stored(CInode *diri, frag_t basefrag, int bits, basedis->_encode(notify->basebl); delete basedis; } - mds->send_message_mds(notify, *p, MDS_PORT_CACHE); + mds->send_message_mds(notify, *p); } } diff --git a/trunk/ceph/mds/MDS.cc b/trunk/ceph/mds/MDS.cc index e82e81c26496b..69cc54a6bc61f 100644 --- a/trunk/ceph/mds/MDS.cc +++ b/trunk/ceph/mds/MDS.cc @@ -58,6 +58,7 @@ #include "messages/MClientRequest.h" #include "messages/MClientRequestForward.h" +#include "messages/MAnchor.h" #include "config.h" @@ -233,7 +234,7 @@ void MDS::reopen_logger(utime_t start) server->reopen_logger(start, append); } -void MDS::send_message_mds(Message *m, int mds, int port, int fromport) +void MDS::send_message_mds(Message *m, int mds) { // send mdsmap first? if (peer_mdsmap_epoch[mds] < mdsmap->get_epoch()) { @@ -243,12 +244,10 @@ void MDS::send_message_mds(Message *m, int mds, int port, int fromport) } // send message - if (port && !fromport) - fromport = port; - messenger->send_message(m, mdsmap->get_inst(mds), port, fromport); + messenger->send_message(m, mdsmap->get_inst(mds)); } -void MDS::forward_message_mds(Message *req, int mds, int port) +void MDS::forward_message_mds(Message *req, int mds) { // client request? if (req->get_type() == MSG_CLIENT_REQUEST) { @@ -266,7 +265,7 @@ void MDS::forward_message_mds(Message *req, int mds, int port) } // forward - send_message_mds(req, mds, port); + send_message_mds(req, mds); } @@ -1097,18 +1096,12 @@ void MDS::my_dispatch(Message *m) } - switch (m->get_dest_port()) { - - case MDS_PORT_ANCHORTABLE: - anchortable->dispatch(m); - break; - case MDS_PORT_ANCHORCLIENT: - anchorclient->dispatch(m); - break; - + int port = m->get_type() & 0xff00; + switch (port) { case MDS_PORT_CACHE: mdcache->dispatch(m); break; + case MDS_PORT_LOCKER: locker->dispatch(m); break; @@ -1116,25 +1109,48 @@ void MDS::my_dispatch(Message *m) case MDS_PORT_MIGRATOR: mdcache->migrator->dispatch(m); break; - case MDS_PORT_RENAMER: - //mdcache->renamer->dispatch(m); - break; - case MDS_PORT_BALANCER: - balancer->proc_message(m); - break; - - case MDS_PORT_MAIN: - proc_message(m); - break; + default: + switch (m->get_type()) { + // SERVER + case MSG_CLIENT_SESSION: + case MSG_CLIENT_REQUEST: + case MSG_MDS_SLAVE_REQUEST: + server->dispatch(m); + break; + + case MSG_MDS_HEARTBEAT: + balancer->proc_message(m); + break; - case MDS_PORT_SERVER: - server->dispatch(m); - break; + // anchor + case MSG_MDS_ANCHOR: + if (((MAnchor*)m)->get_op() < 0) + anchorclient->dispatch(m); + else + anchortable->dispatch(m); + break; - default: - dout(1) << "MDS dispatch unknown message port" << m->get_dest_port() << dendl; - assert(0); + // OSD + case MSG_OSD_OPREPLY: + objecter->handle_osd_op_reply((class MOSDOpReply*)m); + break; + case MSG_OSD_MAP: + handle_osd_map((MOSDMap*)m); + break; + + // MDS + case MSG_MDS_MAP: + handle_mds_map((MMDSMap*)m); + break; + case MSG_MDS_BEACON: + handle_mds_beacon((MMDSBeacon*)m); + break; + + default: + dout(1) << "MDS unknown messge " << m->get_type() << dendl; + assert(0); + } } // finish any triggered contexts diff --git a/trunk/ceph/mds/MDS.h b/trunk/ceph/mds/MDS.h index ff032f7f8de1a..7dcd921d05f4e 100644 --- a/trunk/ceph/mds/MDS.h +++ b/trunk/ceph/mds/MDS.h @@ -218,8 +218,8 @@ class MDS : public Dispatcher { MDSMap *get_mds_map() { return mdsmap; } OSDMap *get_osd_map() { return osdmap; } - void send_message_mds(Message *m, int mds, int port=0, int fromport=0); - void forward_message_mds(Message *req, int mds, int port=0); + void send_message_mds(Message *m, int mds); + void forward_message_mds(Message *req, int mds); void send_message_client(Message *m, int client); void send_message_client(Message *m, entity_inst_t clientinst); diff --git a/trunk/ceph/mds/Migrator.cc b/trunk/ceph/mds/Migrator.cc index ac02938ddbe88..1c443c7bf6f79 100644 --- a/trunk/ceph/mds/Migrator.cc +++ b/trunk/ceph/mds/Migrator.cc @@ -179,7 +179,7 @@ void Migrator::handle_mds_failure_or_stop(int who) export_state.erase(dir); // clean up dir->state_clear(CDir::STATE_EXPORTING); if (export_peer[dir] != who) // tell them. - mds->send_message_mds(new MExportDirCancel(dir->dirfrag()), export_peer[dir], MDS_PORT_MIGRATOR); + mds->send_message_mds(new MExportDirCancel(dir->dirfrag()), export_peer[dir]); break; case EXPORT_FREEZING: @@ -188,7 +188,7 @@ void Migrator::handle_mds_failure_or_stop(int who) export_state.erase(dir); // clean up dir->state_clear(CDir::STATE_EXPORTING); if (export_peer[dir] != who) // tell them. - mds->send_message_mds(new MExportDirCancel(dir->dirfrag()), export_peer[dir], MDS_PORT_MIGRATOR); + mds->send_message_mds(new MExportDirCancel(dir->dirfrag()), export_peer[dir]); break; // NOTE: state order reversal, warning comes after loggingstart+prepping @@ -573,7 +573,7 @@ void Migrator::export_dir(CDir *dir, int dest) dir->state_set(CDir::STATE_EXPORTING); // send ExportDirDiscover (ask target) - mds->send_message_mds(new MExportDirDiscover(dir), dest, MDS_PORT_MIGRATOR); + mds->send_message_mds(new MExportDirDiscover(dir), dest); // start the freeze, but hold it up with an auth_pin. dir->auth_pin(); @@ -704,7 +704,7 @@ void Migrator::export_frozen(CDir *dir) // send. export_state[dir] = EXPORT_PREPPING; - mds->send_message_mds(prep, dest, MDS_PORT_MIGRATOR); + mds->send_message_mds(prep, dest); } void Migrator::handle_export_prep_ack(MExportDirPrepAck *m) @@ -744,7 +744,7 @@ void Migrator::handle_export_prep_ack(MExportDirPrepAck *m) pair(mds->get_nodeid(),CDIR_AUTH_UNKNOWN), pair(mds->get_nodeid(),export_peer[dir])); notify->copy_bounds(bounds); - mds->send_message_mds(notify, p->first, MDS_PORT_MIGRATOR); + mds->send_message_mds(notify, p->first); } export_state[dir] = EXPORT_WARNING; @@ -803,7 +803,7 @@ void Migrator::export_go(CDir *dir) req->add_export((*p)->dirfrag()); // send - mds->send_message_mds(req, dest, MDS_PORT_MIGRATOR); + mds->send_message_mds(req, dest); // stats if (mds->logger) mds->logger->inc("ex"); @@ -1167,7 +1167,7 @@ void Migrator::export_logged_finish(CDir *dir) notify->copy_bounds(bounds); - mds->send_message_mds(notify, *p, MDS_PORT_MIGRATOR); + mds->send_message_mds(notify, *p); } // wait for notifyacks @@ -1242,8 +1242,7 @@ void Migrator::export_finish(CDir *dir) // send finish/commit to new auth if (mds->mdsmap->is_active_or_stopping(export_peer[dir])) { - mds->send_message_mds(new MExportDirFinish(dir->dirfrag()), - export_peer[dir], MDS_PORT_MIGRATOR); + mds->send_message_mds(new MExportDirFinish(dir->dirfrag()), export_peer[dir]); } else { dout(7) << "not sending MExportDirFinish, dest has failed" << dendl; } @@ -1364,8 +1363,7 @@ void Migrator::handle_export_discover(MExportDirDiscover *m) // reply dout(7) << " sending export_discover_ack on " << *in << dendl; - mds->send_message_mds(new MExportDirDiscoverAck(df), - import_peer[df], MDS_PORT_MIGRATOR); + mds->send_message_mds(new MExportDirDiscoverAck(df), import_peer[df]); } void Migrator::handle_export_cancel(MExportDirCancel *m) @@ -1552,8 +1550,7 @@ void Migrator::handle_export_prep(MExportDirPrep *m) // ok! dout(7) << " sending export_prep_ack on " << *dir << dendl; - mds->send_message_mds(new MExportDirPrepAck(dir->dirfrag()), - m->get_source().num(), MDS_PORT_MIGRATOR); + mds->send_message_mds(new MExportDirPrepAck(dir->dirfrag()), m->get_source().num()); // note new state import_state[dir->dirfrag()] = IMPORT_PREPPED; @@ -1768,7 +1765,7 @@ void Migrator::import_notify_abort(CDir *dir, set& bounds) pair(mds->get_nodeid(), CDIR_AUTH_UNKNOWN), pair(import_peer[dir->dirfrag()], CDIR_AUTH_UNKNOWN)); notify->copy_bounds(bounds); - mds->send_message_mds(notify, *p, MDS_PORT_MIGRATOR); + mds->send_message_mds(notify, *p); } } @@ -1808,8 +1805,7 @@ void Migrator::import_logged_start(CDir *dir, int from) // send notify's etc. dout(7) << "sending ack for " << *dir << " to old auth mds" << from << dendl; - mds->send_message_mds(new MExportDirAck(dir->dirfrag()), - from, MDS_PORT_MIGRATOR); + mds->send_message_mds(new MExportDirAck(dir->dirfrag()), from); cache->show_subtrees(); } @@ -2090,8 +2086,7 @@ void Migrator::handle_export_notify(MExportDirNotify *m) // send ack if (m->wants_ack()) { - mds->send_message_mds(new MExportDirNotifyAck(m->get_dirfrag()), - from, MDS_PORT_MIGRATOR); + mds->send_message_mds(new MExportDirNotifyAck(m->get_dirfrag()), from); } else { // aborted. no ack. dout(7) << "handle_export_notify no ack requested" << dendl; diff --git a/trunk/ceph/mds/Server.cc b/trunk/ceph/mds/Server.cc index 3be92948cf0b3..0c2559af324b7 100644 --- a/trunk/ceph/mds/Server.cc +++ b/trunk/ceph/mds/Server.cc @@ -471,8 +471,7 @@ void Server::handle_client_request(MClientRequest *req) if (req->get_retry_attempt()) { if (mds->clientmap.have_completed_request(req->get_reqid())) { dout(5) << "already completed " << req->get_reqid() << dendl; - mds->messenger->send_message(new MClientReply(req, 0), - req->get_client_inst()); + mds->messenger->send_message(new MClientReply(req, 0), req->get_client_inst()); delete req; return; } @@ -721,7 +720,7 @@ void Server::dispatch_slave_request(MDRequest *mdr) MMDSSlaveRequest *r = new MMDSSlaveRequest(mdr->reqid, MMDSSlaveRequest::OP_XLOCKACK); r->set_lock_type(lock->get_type()); lock->get_parent()->set_object_info(r->get_object_info()); - mds->send_message_mds(r, mdr->slave_request->get_source().num(), MDS_PORT_SERVER); + mds->send_message_mds(r, mdr->slave_request->get_source().num()); } else { if (lock) { dout(10) << "not auth for remote xlock attempt, dropping on " @@ -841,7 +840,7 @@ void Server::handle_slave_auth_pin(MDRequest *mdr) reply->get_authpins().push_back(info); } - mds->send_message_mds(reply, mdr->slave_to_mds, MDS_PORT_SERVER); + mds->send_message_mds(reply, mdr->slave_to_mds); // clean up this request delete mdr->slave_request; @@ -1995,7 +1994,7 @@ void Server::_link_remote(MDRequest *mdr, CDentry *dn, CInode *targeti) MMDSSlaveRequest *req = new MMDSSlaveRequest(mdr->reqid, MMDSSlaveRequest::OP_LINKPREP); targeti->set_object_info(req->get_object_info()); req->now = mdr->now; - mds->send_message_mds(req, linkauth, MDS_PORT_SERVER); + mds->send_message_mds(req, linkauth); assert(mdr->more()->waiting_on_slave.count(linkauth) == 0); mdr->more()->waiting_on_slave.insert(linkauth); @@ -2152,7 +2151,7 @@ void Server::_logged_slave_link(MDRequest *mdr, CInode *targeti, utime_t old_cti // ack MMDSSlaveRequest *reply = new MMDSSlaveRequest(mdr->reqid, MMDSSlaveRequest::OP_LINKPREPACK); - mds->send_message_mds(reply, mdr->slave_to_mds, MDS_PORT_SERVER); + mds->send_message_mds(reply, mdr->slave_to_mds); // set up commit waiter mdr->more()->slave_commit = new C_MDS_SlaveLinkCommit(this, mdr, targeti, old_ctime, old_version, inc); @@ -2440,7 +2439,7 @@ void Server::_unlink_local_finish(MDRequest *mdr, unlink->straydir = straydn->dir->replicate_to(it->first); unlink->straydn = straydn->replicate_to(it->first); } - mds->send_message_mds(unlink, it->first, MDS_PORT_CACHE); + mds->send_message_mds(unlink, it->first); } // commit anchor update? @@ -2492,7 +2491,7 @@ void Server::_unlink_remote(MDRequest *mdr, CDentry *dn) MMDSSlaveRequest *req = new MMDSSlaveRequest(mdr->reqid, MMDSSlaveRequest::OP_UNLINKPREP); dn->inode->set_object_info(req->get_object_info()); req->now = mdr->now; - mds->send_message_mds(req, inauth, MDS_PORT_SERVER); + mds->send_message_mds(req, inauth); assert(mdr->more()->waiting_on_slave.count(inauth) == 0); mdr->more()->waiting_on_slave.insert(inauth); @@ -2546,7 +2545,7 @@ void Server::_unlink_remote_finish(MDRequest *mdr, it++) { dout(7) << "_unlink_remote_finish sending MDentryUnlink to mds" << it->first << dendl; MDentryUnlink *unlink = new MDentryUnlink(dn->dir->dirfrag(), dn->name); - mds->send_message_mds(unlink, it->first, MDS_PORT_CACHE); + mds->send_message_mds(unlink, it->first); } // commit anchor update? @@ -2974,7 +2973,7 @@ void Server::_rename_prepare_witness(MDRequest *mdr, int who, CDentry *srcdn, CD // srcdn auth will verify our current witness list is sufficient req->witnesses = mdr->more()->witnessed; - mds->send_message_mds(req, who, MDS_PORT_SERVER); + mds->send_message_mds(req, who); assert(mdr->more()->waiting_on_slave.count(who) == 0); mdr->more()->waiting_on_slave.insert(who); @@ -3337,7 +3336,7 @@ void Server::handle_slave_rename_prep(MDRequest *mdr) dout(10) << " witness list insufficient; providing srcdn replica list" << dendl; MMDSSlaveRequest *reply = new MMDSSlaveRequest(mdr->reqid, MMDSSlaveRequest::OP_RENAMEPREPACK); reply->witnesses.swap(srcdnrep); - mds->send_message_mds(reply, mdr->slave_to_mds, MDS_PORT_SERVER); + mds->send_message_mds(reply, mdr->slave_to_mds); delete mdr->slave_request; mdr->slave_request = 0; return; @@ -3411,7 +3410,7 @@ void Server::_logged_slave_rename(MDRequest *mdr, // apply _rename_apply(mdr, srcdn, destdn, straydn); - mds->send_message_mds(reply, mdr->slave_to_mds, MDS_PORT_SERVER); + mds->send_message_mds(reply, mdr->slave_to_mds); // bump popularity //if (srcdn->is_auth()) diff --git a/trunk/ceph/mds/mdstypes.h b/trunk/ceph/mds/mdstypes.h index a2f779757255e..ee14474761ada 100644 --- a/trunk/ceph/mds/mdstypes.h +++ b/trunk/ceph/mds/mdstypes.h @@ -21,16 +21,11 @@ using namespace std; #define MDS_REF_SET // define me for improved debug output, sanity checking -#define MDS_PORT_MAIN 0 -#define MDS_PORT_SERVER 1 -#define MDS_PORT_CACHE 2 -#define MDS_PORT_LOCKER 3 -#define MDS_PORT_STORE 4 -#define MDS_PORT_BALANCER 5 -#define MDS_PORT_MIGRATOR 6 -#define MDS_PORT_RENAMER 7 -#define MDS_PORT_ANCHORCLIENT 10 -#define MDS_PORT_ANCHORTABLE 11 + +#define MDS_PORT_CACHE 0x200 +#define MDS_PORT_LOCKER 0x300 +#define MDS_PORT_MIGRATOR 0x400 + #define MAX_MDS 0x100 diff --git a/trunk/ceph/msg/FakeMessenger.cc b/trunk/ceph/msg/FakeMessenger.cc index 7a7a65792de4b..87019ba010a33 100644 --- a/trunk/ceph/msg/FakeMessenger.cc +++ b/trunk/ceph/msg/FakeMessenger.cc @@ -361,15 +361,14 @@ void FakeMessenger::reset_myname(entity_name_t m) } -int FakeMessenger::send_message(Message *m, entity_inst_t inst, int port, int fromport) +int FakeMessenger::send_message(Message *m, entity_inst_t inst) { entity_name_t dest = inst.name; - m->set_source(get_myname(), fromport); + m->set_source(get_myname()); m->set_source_addr(get_myaddr()); m->set_dest_inst(inst); - m->set_dest_port(port); lock.Lock(); diff --git a/trunk/ceph/msg/FakeMessenger.h b/trunk/ceph/msg/FakeMessenger.h index 2284ea110b51f..ab89b7e47f983 100644 --- a/trunk/ceph/msg/FakeMessenger.h +++ b/trunk/ceph/msg/FakeMessenger.h @@ -52,7 +52,7 @@ class FakeMessenger : public Messenger { void reset_myname(entity_name_t m); // msg interface - virtual int send_message(Message *m, entity_inst_t dest, int port=0, int fromport=0); + int send_message(Message *m, entity_inst_t dest); // events //virtual void trigger_timer(Timer *t); diff --git a/trunk/ceph/msg/Message.cc b/trunk/ceph/msg/Message.cc index e3c7ce827ac61..9a8dbb26f2c18 100644 --- a/trunk/ceph/msg/Message.cc +++ b/trunk/ceph/msg/Message.cc @@ -347,14 +347,11 @@ decode_message(ceph_message_header& env, bufferlist& payload) case MSG_CLOSE: case MSG_SHUTDOWN: - case MSG_MDS_SHUTDOWNSTART: - case MSG_MDS_SHUTDOWNFINISH: - case MSG_OSD_MKFS_ACK: m = new MGenericMessage(env.type); break; default: - dout(1) << "can't decode unknown message type " << env.type << dendl; + dout(0) << "can't decode unknown message type " << env.type << dendl; assert(0); } diff --git a/trunk/ceph/msg/Message.h b/trunk/ceph/msg/Message.h index 9f0175e7a7d1e..c4be3da01616b 100644 --- a/trunk/ceph/msg/Message.h +++ b/trunk/ceph/msg/Message.h @@ -49,7 +49,6 @@ #define MSG_OSD_MAP 44 #define MSG_OSD_BOOT 45 -#define MSG_OSD_MKFS_ACK 46 #define MSG_OSD_FAILURE 47 @@ -77,64 +76,58 @@ #define MSG_CLIENT_REQUEST 80 #define MSG_CLIENT_REQUEST_FORWARD 81 #define MSG_CLIENT_REPLY 82 -#define MSG_CLIENT_FILECAPS 83 +#define MSG_CLIENT_FILECAPS 0x310 // // *** MDS *** + +#define MSG_MDS_RESOLVE 0x200 +#define MSG_MDS_RESOLVEACK 0x201 +#define MSG_MDS_CACHEREJOIN 0x202 +#define MSG_MDS_DISCOVER 0x203 +#define MSG_MDS_DISCOVERREPLY 0x204 +#define MSG_MDS_INODEUPDATE 0x205 +#define MSG_MDS_DIRUPDATE 0x206 +#define MSG_MDS_CACHEEXPIRE 0x207 +#define MSG_MDS_DENTRYUNLINK 0x208 +#define MSG_MDS_FRAGMENTNOTIFY 0x209 + +#define MSG_MDS_LOCK 0x300 +#define MSG_MDS_INODEFILECAPS 0x301 + +#define MSG_MDS_EXPORTDIRDISCOVER 0x449 +#define MSG_MDS_EXPORTDIRDISCOVERACK 0x450 +#define MSG_MDS_EXPORTDIRCANCEL 0x451 +#define MSG_MDS_EXPORTDIRPREP 0x452 +#define MSG_MDS_EXPORTDIRPREPACK 0x453 +#define MSG_MDS_EXPORTDIRWARNING 0x454 +#define MSG_MDS_EXPORTDIRWARNINGACK 0x455 +#define MSG_MDS_EXPORTDIR 0x456 +#define MSG_MDS_EXPORTDIRACK 0x457 +#define MSG_MDS_EXPORTDIRNOTIFY 0x458 +#define MSG_MDS_EXPORTDIRNOTIFYACK 0x459 +#define MSG_MDS_EXPORTDIRFINISH 0x460 + + #define MSG_MDS_GETMAP 102 #define MSG_MDS_MAP 103 -#define MSG_MDS_HEARTBEAT 104 // for mds load balancer #define MSG_MDS_BEACON 105 // to monitor -#define MSG_MDS_RESOLVE 106 -#define MSG_MDS_RESOLVEACK 107 - -#define MSG_MDS_CACHEREJOIN 108 +#define MSG_MDS_ANCHOR 0x100 +#define MSG_MDS_HEARTBEAT 0x500 // for mds load balancer -#define MSG_MDS_DISCOVER 110 -#define MSG_MDS_DISCOVERREPLY 111 +#define MSG_MDS_SLAVE_REQUEST 170 +/* #define MSG_MDS_INODEGETREPLICA 112 #define MSG_MDS_INODEGETREPLICAACK 113 -#define MSG_MDS_INODEFILECAPS 115 - -#define MSG_MDS_INODEUPDATE 120 -#define MSG_MDS_DIRUPDATE 121 -#define MSG_MDS_INODEEXPIRE 122 -#define MSG_MDS_DIREXPIRE 123 - #define MSG_MDS_DIREXPIREREQ 124 +*/ -#define MSG_MDS_CACHEEXPIRE 125 - -#define MSG_MDS_ANCHOR 130 - -#define MSG_MDS_FRAGMENTNOTIFY 140 - -#define MSG_MDS_EXPORTDIRDISCOVER 149 -#define MSG_MDS_EXPORTDIRDISCOVERACK 150 -#define MSG_MDS_EXPORTDIRCANCEL 151 -#define MSG_MDS_EXPORTDIRPREP 152 -#define MSG_MDS_EXPORTDIRPREPACK 153 -#define MSG_MDS_EXPORTDIRWARNING 154 -#define MSG_MDS_EXPORTDIRWARNINGACK 155 -#define MSG_MDS_EXPORTDIR 156 -#define MSG_MDS_EXPORTDIRACK 157 -#define MSG_MDS_EXPORTDIRNOTIFY 158 -#define MSG_MDS_EXPORTDIRNOTIFYACK 159 -#define MSG_MDS_EXPORTDIRFINISH 160 - -#define MSG_MDS_SLAVE_REQUEST 170 - -#define MSG_MDS_DENTRYUNLINK 200 - -#define MSG_MDS_LOCK 500 -#define MSG_MDS_SHUTDOWNSTART 900 -#define MSG_MDS_SHUTDOWNFINISH 901 #include @@ -174,11 +167,9 @@ public: public: Message() { - env.source_port = env.dest_port = 0; env.nchunks = 0; }; Message(int t) { - env.source_port = env.dest_port = 0; env.nchunks = 0; env.type = t; } @@ -225,13 +216,10 @@ public: void set_source_inst(entity_inst_t& inst) { env.src = *(ceph_entity_inst*)&inst; } entity_name_t& get_dest() { return *(entity_name_t*)&env.dst.name; } - void set_dest(entity_name_t a, int p) { env.dst.name = *(ceph_entity_name*)&a; env.dest_port = p; } - int get_dest_port() { return env.dest_port; } - void set_dest_port(int p) { env.dest_port = p; } + void set_dest(entity_name_t a) { env.dst.name = *(ceph_entity_name*)&a; } entity_name_t& get_source() { return *(entity_name_t*)&env.src.name; } - void set_source(entity_name_t a, int p) { env.src.name = *(ceph_entity_name*)&a; env.source_port = p; } - int get_source_port() { return env.source_port; } + void set_source(entity_name_t a) { env.src.name = *(ceph_entity_name*)&a; } entity_addr_t& get_source_addr() { return *(entity_addr_t*)&env.src.addr; } void set_source_addr(const entity_addr_t &i) { env.src.addr = *(ceph_entity_addr*)&i; } diff --git a/trunk/ceph/msg/Messenger.h b/trunk/ceph/msg/Messenger.h index d29441a744ca0..1bb9c8acb28ed 100644 --- a/trunk/ceph/msg/Messenger.h +++ b/trunk/ceph/msg/Messenger.h @@ -76,17 +76,7 @@ class Messenger { // send message virtual void prepare_dest(const entity_addr_t& addr) {} - virtual int send_message(Message *m, entity_inst_t dest, - int port=0, int fromport=0) = 0; - virtual int send_first_message(Dispatcher *d, - Message *m, entity_inst_t dest, - int port=0, int fromport=0) { - set_dispatcher(d); - return send_message(m, dest, port, fromport); - } - - // make a procedure call - //virtual Message* sendrecv(Message *m, msg_name_t dest, int port=0); + virtual int send_message(Message *m, entity_inst_t dest) = 0; virtual void mark_down(entity_addr_t a) {} diff --git a/trunk/ceph/msg/SimpleMessenger.cc b/trunk/ceph/msg/SimpleMessenger.cc index 0761c11ae410c..b6cac09880eb5 100644 --- a/trunk/ceph/msg/SimpleMessenger.cc +++ b/trunk/ceph/msg/SimpleMessenger.cc @@ -136,7 +136,7 @@ int Rank::Accepter::start() dout(10) << "accepter.start bound to " << listen_addr << dendl; // listen! - rc = ::listen(listen_sd, 1000); + rc = ::listen(listen_sd, 128); assert(rc >= 0); // figure out my_addr @@ -431,8 +431,10 @@ void Rank::Pipe::reader() break; } - dout(10) << "pipe(" << peer_addr << ' ' << this << ").reader got message " - << m << " " << *m + in_seq++; + + dout(10) << "pipe(" << peer_addr << ' ' << this << ").reader got message " + << in_seq << " " << m << " " << *m << " for " << m->get_dest() << dendl; // deliver @@ -1210,45 +1212,12 @@ void Rank::EntityMessenger::prepare_dest(const entity_addr_t& addr) rank.lock.Unlock(); } -int Rank::EntityMessenger::send_message(Message *m, entity_inst_t dest, - int port, int fromport) -{ - // set envelope - m->set_source(get_myname(), fromport); - m->set_source_addr(my_addr); - m->set_dest_inst(dest); - m->set_dest_port(port); - - dout(1) << m->get_source() - << " --> " << dest.name << " " << dest.addr - << " -- " << *m - << " -- " << m - << dendl; - - rank.submit_message(m, dest.addr); - - return 0; -} - -int Rank::EntityMessenger::send_first_message(Dispatcher *d, - Message *m, entity_inst_t dest, - int port, int fromport) +int Rank::EntityMessenger::send_message(Message *m, entity_inst_t dest) { - /* hacky thing for csyn and newsyn: - * set dispatcher (go active) AND set sender for this - * message while holding rank.lock. this prevents any - * races against incoming unnamed messages naming us before - * we fire off our first message. - */ - rank.lock.Lock(); - set_dispatcher(d); - // set envelope - m->set_source(get_myname(), fromport); + m->set_source(get_myname()); m->set_source_addr(my_addr); m->set_dest_inst(dest); - m->set_dest_port(port); - rank.lock.Unlock(); dout(1) << m->get_source() << " --> " << dest.name << " " << dest.addr diff --git a/trunk/ceph/msg/SimpleMessenger.h b/trunk/ceph/msg/SimpleMessenger.h index e5fa8005df28d..d1c8e72dd30a8 100644 --- a/trunk/ceph/msg/SimpleMessenger.h +++ b/trunk/ceph/msg/SimpleMessenger.h @@ -76,8 +76,12 @@ private: bool writer_running; list q; + list sent; Mutex lock; Cond cond; + + int out_seq, out_acked; + int in_seq; int accept(); // server handshake int connect(); // client handshake @@ -111,6 +115,7 @@ private: done(false), server(true), need_to_send_close(true), reader_running(false), writer_running(false), + out_seq(0), out_acked(0), in_seq(0), reader_thread(this), writer_thread(this) { // server reader_running = true; @@ -228,11 +233,7 @@ private: int shutdown(); void suicide(); void prepare_dest(const entity_addr_t& addr); - int send_message(Message *m, entity_inst_t dest, - int port=0, int fromport=0); - int send_first_message(Dispatcher *d, - Message *m, entity_inst_t dest, - int port=0, int fromport=0); + int send_message(Message *m, entity_inst_t dest); void mark_down(entity_addr_t a); void mark_up(entity_name_t a, entity_addr_t& i); -- 2.39.5