]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
removed ports from messenger interface
authorsageweil <sageweil@29311d96-e01e-0410-9327-a35deaab8ce9>
Thu, 25 Oct 2007 22:48:33 +0000 (22:48 +0000)
committersageweil <sageweil@29311d96-e01e-0410-9327-a35deaab8ce9>
Thu, 25 Oct 2007 22:48:33 +0000 (22:48 +0000)
git-svn-id: https://ceph.svn.sf.net/svnroot/ceph@1997 29311d96-e01e-0410-9327-a35deaab8ce9

20 files changed:
trunk/ceph/client/Client.cc
trunk/ceph/include/ceph_fs.h
trunk/ceph/mds/Anchor.h
trunk/ceph/mds/AnchorClient.cc
trunk/ceph/mds/AnchorTable.cc
trunk/ceph/mds/Locker.cc
trunk/ceph/mds/MDBalancer.cc
trunk/ceph/mds/MDCache.cc
trunk/ceph/mds/MDS.cc
trunk/ceph/mds/MDS.h
trunk/ceph/mds/Migrator.cc
trunk/ceph/mds/Server.cc
trunk/ceph/mds/mdstypes.h
trunk/ceph/msg/FakeMessenger.cc
trunk/ceph/msg/FakeMessenger.h
trunk/ceph/msg/Message.cc
trunk/ceph/msg/Message.h
trunk/ceph/msg/Messenger.h
trunk/ceph/msg/SimpleMessenger.cc
trunk/ceph/msg/SimpleMessenger.h

index c7a777d750c2cd3746498e3b009a9813d6b296a1..5ffbf890f42e98a5b15fe65957641670573cb0fb 100644 (file)
@@ -691,7 +691,7 @@ MClientReply *Client::make_request(MClientRequest *req,
       if (waiting_for_session.count(mds) == 0) {
        dout(10) << "opening session to mds" << mds << dendl;
        messenger->send_message(new MClientSession(MClientSession::OP_REQUEST_OPEN),
-                               mdsmap->get_inst(mds), MDS_PORT_SERVER);
+                               mdsmap->get_inst(mds));
       }
       
       // wait
@@ -803,7 +803,7 @@ void Client::send_request(MetaRequest *request, int mds)
   request->request = 0;
 
   dout(10) << "send_request " << *r << " to mds" << mds << dendl;
-  messenger->send_message(r, mdsmap->get_inst(mds), MDS_PORT_SERVER);
+  messenger->send_message(r, mdsmap->get_inst(mds));
   
   request->mds.insert(mds);
 }
@@ -1051,7 +1051,7 @@ void Client::send_reconnect(int mds)
     m->closed = true;
   }
 
-  messenger->send_message(m, mdsmap->get_inst(mds), MDS_PORT_SERVER);
+  messenger->send_message(m, mdsmap->get_inst(mds));
 }
 
 
@@ -1198,7 +1198,7 @@ void Client::handle_file_caps(MClientFileCaps *m)
             << ", which we don't want caps for, releasing." << dendl;
     m->set_caps(0);
     m->set_wanted(0);
-    messenger->send_message(m, m->get_source_inst(), MDS_PORT_LOCKER);
+    messenger->send_message(m, m->get_source_inst());
     return;
   }
 
@@ -1308,7 +1308,7 @@ void Client::implemented_caps(MClientFileCaps *m, Inode *in)
     in->file_wr_size = 0;
   }
 
-  messenger->send_message(m, m->get_source_inst(), MDS_PORT_LOCKER);
+  messenger->send_message(m, m->get_source_inst());
 }
 
 
@@ -1334,7 +1334,7 @@ void Client::release_caps(Inode *in,
                                                it->second.seq,
                                                it->second.caps,
                                                in->file_caps_wanted()); 
-      messenger->send_message(m, mdsmap->get_inst(it->first), MDS_PORT_LOCKER);
+      messenger->send_message(m, mdsmap->get_inst(it->first));
     }
   }
   
@@ -1359,8 +1359,7 @@ void Client::update_caps_wanted(Inode *in)
                                              it->second.seq,
                                              it->second.caps,
                                              in->file_caps_wanted());
-    messenger->send_message(m,
-                            mdsmap->get_inst(it->first), MDS_PORT_LOCKER);
+    messenger->send_message(m, mdsmap->get_inst(it->first));
   }
 }
 
@@ -1374,9 +1373,9 @@ void Client::_try_mount()
   dout(10) << "_try_mount" << dendl;
   int mon = monmap->pick_mon();
   dout(2) << "sending client_mount to mon" << mon << " as instance " << my_instance << dendl;
-  messenger->send_first_message(this,  // simultaneously go active (if we haven't already)
-                               new MClientMount(messenger->get_myaddr(), my_instance),
-                               monmap->get_inst(mon));
+  messenger->set_dispatcher(this);
+  messenger->send_message(new MClientMount(messenger->get_myaddr(), my_instance),
+                         monmap->get_inst(mon));
 
   // schedule timeout?
   assert(mount_timeout_event == 0);
@@ -1528,7 +1527,7 @@ int Client::unmount()
     dout(2) << "sending client_session close to mds" << p->first << " seq " << p->second << dendl;
     messenger->send_message(new MClientSession(MClientSession::OP_REQUEST_CLOSE,
                                               p->second),
-                           mdsmap->get_inst(p->first), MDS_PORT_SERVER);
+                           mdsmap->get_inst(p->first));
   }
 
   // send unmount!
index fbe36ab801cfa9231adfcb9083ac2beb38614c80..0a812c70b13d860bc8ee4658ff6e3cc2f5d65651 100644 (file)
@@ -168,9 +168,9 @@ struct ceph_entity_inst {
  * message header
  */
 struct ceph_message_header {
+       __u32 seq;
        __u32 type;
        struct ceph_entity_inst src, dst;
-       __u32 source_port, dest_port;
        __u32 nchunks;
 };
 
index 748091306a44d3c934ada896e6ab62eb63756e2e..a55a07dd3068ec78cb9b3100bcdf01068056f946 100644 (file)
@@ -25,19 +25,19 @@ using std::string;
 
 // anchor ops
 #define ANCHOR_OP_LOOKUP          1
-#define ANCHOR_OP_LOOKUP_REPLY    2
+#define ANCHOR_OP_LOOKUP_REPLY    -2
 
 #define ANCHOR_OP_CREATE_PREPARE  11
-#define ANCHOR_OP_CREATE_AGREE    12
+#define ANCHOR_OP_CREATE_AGREE    -12
 
 #define ANCHOR_OP_DESTROY_PREPARE 21
-#define ANCHOR_OP_DESTROY_AGREE   22
+#define ANCHOR_OP_DESTROY_AGREE   -22
 
 #define ANCHOR_OP_UPDATE_PREPARE  31
-#define ANCHOR_OP_UPDATE_AGREE    32
+#define ANCHOR_OP_UPDATE_AGREE    -32
 
 #define ANCHOR_OP_COMMIT   41
-#define ANCHOR_OP_ACK      42
+#define ANCHOR_OP_ACK      -42
 #define ANCHOR_OP_ROLLBACK 43
 
 
index b2fb1fb50d7bd45b21f66592e6807d2880282fbc..1cc18dc7d8fa4d3205c8c9a8d895ee84db34b22e 100644 (file)
@@ -92,8 +92,7 @@ void AnchorClient::handle_anchor_reply(class MAnchor *m)
               << dendl;      
       MAnchor *req = new MAnchor(ANCHOR_OP_COMMIT, 0, atid);
       mds->messenger->send_message(req, 
-                                  mds->mdsmap->get_inst(mds->mdsmap->get_anchortable()),
-                                  MDS_PORT_ANCHORTABLE, MDS_PORT_ANCHORCLIENT);
+                                  mds->mdsmap->get_inst(mds->mdsmap->get_anchortable()));
     }
     else {
       dout(10) << "stray create_agree on " << ino
@@ -102,8 +101,7 @@ void AnchorClient::handle_anchor_reply(class MAnchor *m)
               << dendl;      
       MAnchor *req = new MAnchor(ANCHOR_OP_ROLLBACK, 0, atid);
       mds->messenger->send_message(req, 
-                                  mds->mdsmap->get_inst(mds->mdsmap->get_anchortable()),
-                                  MDS_PORT_ANCHORTABLE, MDS_PORT_ANCHORCLIENT);
+                                  mds->mdsmap->get_inst(mds->mdsmap->get_anchortable()));
     }
     break;
 
@@ -126,8 +124,7 @@ void AnchorClient::handle_anchor_reply(class MAnchor *m)
               << dendl;      
       MAnchor *req = new MAnchor(ANCHOR_OP_COMMIT, 0, atid);
       mds->messenger->send_message(req, 
-                                  mds->mdsmap->get_inst(mds->mdsmap->get_anchortable()),
-                                  MDS_PORT_ANCHORTABLE, MDS_PORT_ANCHORCLIENT);
+                                  mds->mdsmap->get_inst(mds->mdsmap->get_anchortable()));
     }
     else {
       dout(10) << "stray destroy_agree on " << ino
@@ -136,8 +133,7 @@ void AnchorClient::handle_anchor_reply(class MAnchor *m)
               << dendl;      
       MAnchor *req = new MAnchor(ANCHOR_OP_ROLLBACK, 0, atid);
       mds->messenger->send_message(req, 
-                                  mds->mdsmap->get_inst(mds->mdsmap->get_anchortable()),
-                                  MDS_PORT_ANCHORTABLE, MDS_PORT_ANCHORCLIENT);
+                                  mds->mdsmap->get_inst(mds->mdsmap->get_anchortable()));
     }
     break;
 
@@ -160,8 +156,7 @@ void AnchorClient::handle_anchor_reply(class MAnchor *m)
               << dendl;      
       MAnchor *req = new MAnchor(ANCHOR_OP_COMMIT, 0, atid);
       mds->messenger->send_message(req, 
-                                  mds->mdsmap->get_inst(mds->mdsmap->get_anchortable()),
-                                  MDS_PORT_ANCHORTABLE, MDS_PORT_ANCHORCLIENT);
+                                  mds->mdsmap->get_inst(mds->mdsmap->get_anchortable()));
     }
     else {
       dout(10) << "stray update_agree on " << ino
@@ -170,8 +165,7 @@ void AnchorClient::handle_anchor_reply(class MAnchor *m)
               << dendl;      
       MAnchor *req = new MAnchor(ANCHOR_OP_ROLLBACK, 0, atid);
       mds->messenger->send_message(req, 
-                                  mds->mdsmap->get_inst(mds->mdsmap->get_anchortable()),
-                                  MDS_PORT_ANCHORTABLE, MDS_PORT_ANCHORCLIENT);
+                                  mds->mdsmap->get_inst(mds->mdsmap->get_anchortable()));
     }
     break;
 
@@ -237,8 +231,7 @@ void AnchorClient::lookup(inodeno_t ino, vector<Anchor>& trace, Context *onfinis
   pending_lookup[ino].trace = &trace;
 
   mds->send_message_mds(req, 
-                       mds->mdsmap->get_anchortable(),
-                       MDS_PORT_ANCHORTABLE, MDS_PORT_ANCHORCLIENT);
+                       mds->mdsmap->get_anchortable());
 }
 
 
@@ -258,8 +251,7 @@ void AnchorClient::prepare_create(inodeno_t ino, vector<Anchor>& trace,
   pending_create_prepare[ino].onfinish = onfinish;
 
   mds->send_message_mds(req, 
-                       mds->mdsmap->get_anchortable(),
-                       MDS_PORT_ANCHORTABLE, MDS_PORT_ANCHORCLIENT);
+                       mds->mdsmap->get_anchortable());
 }
 
 void AnchorClient::prepare_destroy(inodeno_t ino, 
@@ -272,8 +264,7 @@ void AnchorClient::prepare_destroy(inodeno_t ino,
   pending_destroy_prepare[ino].onfinish = onfinish;
   pending_destroy_prepare[ino].patid = patid;
   mds->messenger->send_message(req, 
-                         mds->mdsmap->get_inst(mds->mdsmap->get_anchortable()),
-                         MDS_PORT_ANCHORTABLE, MDS_PORT_ANCHORCLIENT);
+                              mds->mdsmap->get_inst(mds->mdsmap->get_anchortable()));
 }
 
 
@@ -291,8 +282,7 @@ void AnchorClient::prepare_update(inodeno_t ino, vector<Anchor>& trace,
   pending_update_prepare[ino].onfinish = onfinish;
   
   mds->messenger->send_message(req, 
-                         mds->mdsmap->get_inst(mds->mdsmap->get_anchortable()),
-                         MDS_PORT_ANCHORTABLE, MDS_PORT_ANCHORCLIENT);
+                              mds->mdsmap->get_inst(mds->mdsmap->get_anchortable()));
 }
 
 
@@ -309,8 +299,7 @@ void AnchorClient::commit(version_t atid, LogSegment *ls)
   // send message
   MAnchor *req = new MAnchor(ANCHOR_OP_COMMIT, 0, atid);
   mds->messenger->send_message(req, 
-                         mds->mdsmap->get_inst(mds->mdsmap->get_anchortable()),
-                         MDS_PORT_ANCHORTABLE, MDS_PORT_ANCHORCLIENT);
+                              mds->mdsmap->get_inst(mds->mdsmap->get_anchortable()));
 }
 
 
@@ -332,8 +321,7 @@ void AnchorClient::resend_commits()
     dout(10) << "resending commit on " << p->first << dendl;
     MAnchor *req = new MAnchor(ANCHOR_OP_COMMIT, 0, p->first);
     mds->send_message_mds(req, 
-                         mds->mdsmap->get_anchortable(),
-                         MDS_PORT_ANCHORTABLE, MDS_PORT_ANCHORCLIENT);
+                         mds->mdsmap->get_anchortable());
   }
 }
 
@@ -346,8 +334,7 @@ void AnchorClient::resend_prepares(hash_map<inodeno_t, _pending_prepare>& prepar
     MAnchor *req = new MAnchor(op, p->first);
     req->set_trace(p->second.trace);
     mds->send_message_mds(req, 
-                         mds->mdsmap->get_anchortable(),
-                         MDS_PORT_ANCHORTABLE, MDS_PORT_ANCHORCLIENT);
+                         mds->mdsmap->get_anchortable());
   } 
 }
 
@@ -365,8 +352,7 @@ void AnchorClient::handle_mds_recovery(int who)
        p++) {
     dout(10) << "resending lookup on " << p->first << dendl;
     mds->send_message_mds(new MAnchor(ANCHOR_OP_LOOKUP, p->first),
-                         mds->mdsmap->get_anchortable(),
-                         MDS_PORT_ANCHORTABLE, MDS_PORT_ANCHORCLIENT);
+                         mds->mdsmap->get_anchortable());
   }
   
   // resend any pending prepares.
index 65c09278c98508ccd2d45ff3f17db1db045644e2..f3c4fb05b772b623588327329cb3e3a4b33fca74 100644 (file)
@@ -134,7 +134,7 @@ void AnchorTable::handle_lookup(MAnchor *req)
   // reply
   MAnchor *reply = new MAnchor(ANCHOR_OP_LOOKUP_REPLY, req->get_ino());
   reply->set_trace(trace);
-  mds->messenger->send_message(reply, req->get_source_inst(), req->get_source_port());
+  mds->messenger->send_message(reply, req->get_source_inst());
 
   delete req;
 }
@@ -284,7 +284,7 @@ void AnchorTable::_create_prepare_logged(MAnchor *req, version_t atid)
 
   // reply
   MAnchor *reply = new MAnchor(ANCHOR_OP_CREATE_AGREE, ino, atid);
-  mds->messenger->send_message(reply, req->get_source_inst(), req->get_source_port());
+  mds->messenger->send_message(reply, req->get_source_inst());
 
   delete req;
 }
@@ -324,7 +324,7 @@ void AnchorTable::_destroy_prepare_logged(MAnchor *req, version_t atid)
 
   // reply
   MAnchor *reply = new MAnchor(ANCHOR_OP_DESTROY_AGREE, ino, atid);
-  mds->messenger->send_message(reply, req->get_source_inst(), req->get_source_port());
+  mds->messenger->send_message(reply, req->get_source_inst());
   delete req;
 }
 
@@ -367,7 +367,7 @@ void AnchorTable::_update_prepare_logged(MAnchor *req, version_t atid)
 
   // reply
   MAnchor *reply = new MAnchor(ANCHOR_OP_UPDATE_AGREE, ino, atid);
-  mds->messenger->send_message(reply, req->get_source_inst(), req->get_source_port());
+  mds->messenger->send_message(reply, req->get_source_inst());
   delete req;
 }
 
@@ -402,7 +402,7 @@ void AnchorTable::handle_commit(MAnchor *req)
            << ", already committed, sending ack." 
            << dendl;
     MAnchor *reply = new MAnchor(ANCHOR_OP_ACK, 0, atid);
-    mds->messenger->send_message(reply, req->get_source_inst(), req->get_source_port());
+    mds->messenger->send_message(reply, req->get_source_inst());
     delete req;
     return;
   } 
@@ -421,7 +421,7 @@ void AnchorTable::_commit_logged(MAnchor *req)
 {
   dout(7) << "_commit_logged, sending ACK" << dendl;
   MAnchor *reply = new MAnchor(ANCHOR_OP_ACK, req->get_ino(), req->get_atid());
-  mds->messenger->send_message(reply, req->get_source_inst(), req->get_source_port());
+  mds->messenger->send_message(reply, req->get_source_inst());
   delete req;
 }
 
@@ -686,16 +686,16 @@ void AnchorTable::resend_agree(version_t v, int who)
 {
   if (pending_create.count(v)) {
     MAnchor *reply = new MAnchor(ANCHOR_OP_CREATE_AGREE, pending_create[v], v);
-    mds->send_message_mds(reply, who, MDS_PORT_ANCHORCLIENT);
+    mds->send_message_mds(reply, who);
   }
   else if (pending_destroy.count(v)) {
     MAnchor *reply = new MAnchor(ANCHOR_OP_DESTROY_AGREE, pending_destroy[v], v);
-    mds->send_message_mds(reply, who, MDS_PORT_ANCHORCLIENT);
+    mds->send_message_mds(reply, who);
   }
   else {
     assert(pending_update.count(v));
     MAnchor *reply = new MAnchor(ANCHOR_OP_UPDATE_AGREE, pending_update[v].first, v);
-    mds->send_message_mds(reply, who, MDS_PORT_ANCHORCLIENT);
+    mds->send_message_mds(reply, who);
   }
 }
 
index 55f38cd799b5fab3bbe9bc32dc6471522a8817cc..10b7adc0d0eaf1588f45005fb480630d3614efdc 100644 (file)
@@ -90,7 +90,7 @@ void Locker::send_lock_message(SimpleLock *lock, int msg)
     if (mds->mdsmap->get_state(it->first) < MDSMap::STATE_REJOIN) 
       continue;
     MLock *m = new MLock(lock, msg, mds->get_nodeid());
-    mds->send_message_mds(m, it->first, MDS_PORT_LOCKER);
+    mds->send_message_mds(m, it->first);
   }
 }
 
@@ -103,7 +103,7 @@ void Locker::send_lock_message(SimpleLock *lock, int msg, const bufferlist &data
       continue;
     MLock *m = new MLock(lock, msg, mds->get_nodeid());
     m->set_data(data);
-    mds->send_message_mds(m, it->first, MDS_PORT_LOCKER);
+    mds->send_message_mds(m, it->first);
   }
 }
 
@@ -238,7 +238,7 @@ bool Locker::acquire_locks(MDRequest *mdr,
        (*q)->set_object_info(info);
        req->get_authpins().push_back(info);      
       }
-      mds->send_message_mds(req, p->first, MDS_PORT_SERVER);
+      mds->send_message_mds(req, p->first);
 
       // put in waiting list
       assert(mdr->more()->waiting_on_slave.count(p->first) == 0);
@@ -625,7 +625,7 @@ void Locker::request_inode_file_caps(CInode *in)
     if (mds->mdsmap->get_state(auth) >= MDSMap::STATE_REJOIN)
       mds->send_message_mds(new MInodeFileCaps(in->ino(), mds->get_nodeid(),
                                               in->replica_caps_wanted),
-                           auth, MDS_PORT_LOCKER);
+                           auth);
   } else {
     in->replica_caps_wanted_keep_until.sec_ref() = 0;
   }
@@ -925,8 +925,7 @@ void Locker::handle_simple_lock(SimpleLock *lock, MLock *m)
     } else {
       // update lock and reply
       lock->set_state(LOCK_LOCK);
-      mds->send_message_mds(new MLock(lock, LOCK_AC_LOCKACK, mds->get_nodeid()), 
-                           from, MDS_PORT_LOCKER);
+      mds->send_message_mds(new MLock(lock, LOCK_AC_LOCKACK, mds->get_nodeid()), from);
     }
     break;
 
@@ -1007,7 +1006,7 @@ void Locker::simple_eval_gather(SimpleLock *lock)
       int auth = lock->get_parent()->authority().first;
       if (mds->mdsmap->get_state(auth) >= MDSMap::STATE_REJOIN) 
        mds->send_message_mds(new MLock(lock, LOCK_AC_LOCKACK, mds->get_nodeid()), 
-                             lock->get_parent()->authority().first, MDS_PORT_LOCKER);
+                             lock->get_parent()->authority().first);
     }
     
     lock->set_state(LOCK_LOCK);
@@ -1200,7 +1199,7 @@ bool Locker::simple_xlock_start(SimpleLock *lock, MDRequest *mdr)
     MMDSSlaveRequest *r = new MMDSSlaveRequest(mdr->reqid, MMDSSlaveRequest::OP_XLOCK);
     r->set_lock_type(lock->get_type());
     lock->get_parent()->set_object_info(r->get_object_info());
-    mds->send_message_mds(r, auth, MDS_PORT_SERVER);
+    mds->send_message_mds(r, auth);
     
     // wait
     lock->add_waiter(SimpleLock::WAIT_REMOTEXLOCK, new C_MDS_RetryRequest(mdcache, mdr));
@@ -1229,7 +1228,7 @@ void Locker::simple_xlock_finish(SimpleLock *lock, MDRequest *mdr)
       MMDSSlaveRequest *slavereq = new MMDSSlaveRequest(mdr->reqid, MMDSSlaveRequest::OP_UNXLOCK);
       slavereq->set_lock_type(lock->get_type());
       lock->get_parent()->set_object_info(slavereq->get_object_info());
-      mds->send_message_mds(slavereq, auth, MDS_PORT_SERVER);
+      mds->send_message_mds(slavereq, auth);
     }
   }
 
@@ -1389,8 +1388,7 @@ bool Locker::scatter_wrlock_start(ScatterLock *lock, MDRequest *mdr)
       int auth = lock->get_parent()->authority().first;
       dout(10) << "requesting scatter from auth on " 
               << *lock << " on " << *lock->get_parent() << dendl;
-      mds->send_message_mds(new MLock(lock, LOCK_AC_REQSCATTER, mds->get_nodeid()),
-                           auth, MDS_PORT_LOCKER);
+      mds->send_message_mds(new MLock(lock, LOCK_AC_REQSCATTER, mds->get_nodeid()), auth);
     }
   }
 
@@ -1467,8 +1465,7 @@ void Locker::scatter_eval_gather(ScatterLock *lock)
       if (mds->mdsmap->get_state(auth) >= MDSMap::STATE_REJOIN) {
        bufferlist data;
        lock->encode_locked_state(data);
-       mds->send_message_mds(new MLock(lock, LOCK_AC_LOCKACK, mds->get_nodeid(), data),
-                             auth, MDS_PORT_LOCKER);
+       mds->send_message_mds(new MLock(lock, LOCK_AC_LOCKACK, mds->get_nodeid(), data), auth);
       }
       lock->set_state(LOCK_LOCK);
     }
@@ -1638,8 +1635,7 @@ void Locker::scatter_try_unscatter(ScatterLock *lock, Context *c)
   int auth = lock->get_parent()->authority().first;
   if (lock->get_state() == LOCK_SCATTER &&
       mds->mdsmap->get_state(auth) >= MDSMap::STATE_ACTIVE) 
-    mds->send_message_mds(new MLock(lock, LOCK_AC_REQUNSCATTER, mds->get_nodeid()),
-                         auth, MDS_PORT_LOCKER);
+    mds->send_message_mds(new MLock(lock, LOCK_AC_REQUNSCATTER, mds->get_nodeid()), auth);
   
   // wait...
   lock->add_waiter(SimpleLock::WAIT_STABLE, c);
@@ -1895,8 +1891,7 @@ void Locker::handle_scatter_lock(ScatterLock *lock, MLock *m)
       // encode and reply
       bufferlist data;
       lock->encode_locked_state(data);
-      mds->send_message_mds(new MLock(lock, LOCK_AC_LOCKACK, mds->get_nodeid(), data),
-                           from, MDS_PORT_LOCKER);
+      mds->send_message_mds(new MLock(lock, LOCK_AC_LOCKACK, mds->get_nodeid(), data), from);
       lock->set_state(LOCK_LOCK);
     }
     break;
@@ -2374,7 +2369,7 @@ void Locker::file_eval_gather(FileLock *lock)
        
        // ack
        MLock *reply = new MLock(lock, LOCK_AC_MIXEDACK, mds->get_nodeid());
-       mds->send_message_mds(reply, in->authority().first, MDS_PORT_LOCKER);
+       mds->send_message_mds(reply, in->authority().first);
       }
       break;
 
@@ -2384,7 +2379,7 @@ void Locker::file_eval_gather(FileLock *lock)
         
         // ack
         MLock *reply = new MLock(lock, LOCK_AC_LOCKACK, mds->get_nodeid());
-        mds->send_message_mds(reply, in->authority().first, MDS_PORT_LOCKER);
+        mds->send_message_mds(reply, in->authority().first);
       }
       break;
 
@@ -2793,7 +2788,7 @@ void Locker::handle_file_lock(FileLock *lock, MLock *m)
       lock->set_state(LOCK_LOCK);
       
       MLock *reply = new MLock(lock, LOCK_AC_LOCKACK, mds->get_nodeid());
-      mds->send_message_mds(reply, from, MDS_PORT_LOCKER);
+      mds->send_message_mds(reply, from);
     }
     break;
     
@@ -2814,7 +2809,7 @@ void Locker::handle_file_lock(FileLock *lock, MLock *m)
 
         // ack
         MLock *reply = new MLock(lock, LOCK_AC_MIXEDACK, mds->get_nodeid());
-        mds->send_message_mds(reply, from, MDS_PORT_LOCKER);
+        mds->send_message_mds(reply, from);
       }
     } else {
       // LOCK
index 8e9d0e2dd46fa93dfe3e87ca7534408baf3e6b00..7bf6ea4f7eb80f4fd81aec70745f8ab0dc695134 100644 (file)
@@ -210,8 +210,7 @@ void MDBalancer::send_heartbeat()
     MHeartbeat *hb = new MHeartbeat(load, beat_epoch);
     hb->get_import_map() = import_map;
     mds->messenger->send_message(hb,
-                                 mds->mdsmap->get_inst(*p),
-                                MDS_PORT_BALANCER, MDS_PORT_BALANCER);
+                                 mds->mdsmap->get_inst(*p));
   }
 }
 
index 32201986d9f40dbb639e87109a627e45fc4ddf52..1fc19c2f5787436ed7938946a92774e9afdaf130 100644 (file)
@@ -1093,7 +1093,7 @@ void MDCache::send_resolve_now(int who)
 
 
   // send
-  mds->send_message_mds(m, who, MDS_PORT_CACHE);
+  mds->send_message_mds(m, who);
 }
 
 
@@ -1263,7 +1263,7 @@ void MDCache::handle_resolve(MMDSResolve *m)
       }
     }
 
-    mds->send_message_mds(ack, from, MDS_PORT_CACHE);
+    mds->send_message_mds(ack, from);
   }
 
   // am i a surviving ambiguous importer?
@@ -1773,7 +1773,7 @@ void MDCache::rejoin_send_rejoins()
     assert(rejoin_ack_gather.count(p->first) == 0);
     rejoin_sent.insert(p->first);
     rejoin_ack_gather.insert(p->first);
-    mds->send_message_mds(p->second, p->first, MDS_PORT_CACHE);
+    mds->send_message_mds(p->second, p->first);
   }
 
   // nothing?
@@ -2065,7 +2065,7 @@ void MDCache::handle_cache_rejoin_weak(MMDSCacheRejoin *weak)
   if (survivor) {
     // survivor.  do everything now.
     rejoin_scour_survivor_replicas(from, ack);
-    mds->send_message_mds(ack, from, MDS_PORT_CACHE);
+    mds->send_message_mds(ack, from);
   } else {
     // done?
     assert(rejoin_gather.count(from));
@@ -2356,7 +2356,7 @@ void MDCache::handle_cache_rejoin_strong(MMDSCacheRejoin *strong)
   // send missing?
   if (missing) {
     // we expect a FULL soon.
-    mds->send_message_mds(missing, from, MDS_PORT_CACHE);
+    mds->send_message_mds(missing, from);
   } else {
     // done?
     assert(rejoin_gather.count(from));
@@ -2487,7 +2487,7 @@ void MDCache::handle_cache_rejoin_missing(MMDSCacheRejoin *missing)
     full->add_full_inode(in->inode, in->symlink, in->dirfragtree);
   }
 
-  mds->send_message_mds(full, missing->get_source().num(), MDS_PORT_CACHE);
+  mds->send_message_mds(full, missing->get_source().num());
 }
 
 void MDCache::handle_cache_rejoin_full(MMDSCacheRejoin *full)
@@ -2658,9 +2658,7 @@ void MDCache::rejoin_import_cap(CInode *in, int client, inode_caps_reconnect_t&
                                              in->client_caps[client].wanted());
   
   reap->set_mds( frommds ); // reap from whom?
-  mds->messenger->send_message(reap, 
-                              mds->clientmap.get_inst(client),
-                              0, MDS_PORT_CACHE);
+  mds->messenger->send_message(reap, mds->clientmap.get_inst(client));
 }
 
 void MDCache::rejoin_send_acks()
@@ -2765,7 +2763,7 @@ void MDCache::rejoin_send_acks()
   for (map<int,MMDSCacheRejoin*>::iterator p = ack.begin();
        p != ack.end();
        ++p) 
-    mds->send_message_mds(p->second, p->first, MDS_PORT_CACHE);
+    mds->send_message_mds(p->second, p->first);
   
 }
 
@@ -2976,7 +2974,7 @@ void MDCache::send_expire_messages(map<int, MCacheExpire*>& expiremap)
        it != expiremap.end();
        it++) {
     dout(7) << "sending cache_expire to " << it->first << dendl;
-    mds->send_message_mds(it->second, it->first, MDS_PORT_CACHE);
+    mds->send_message_mds(it->second, it->first);
   }
 }
 
@@ -3927,7 +3925,7 @@ int MDCache::path_traverse(MDRequest *mdr, Message *req,     // who
            reply->add_dentry( dn->replicate_to( from ) );
            if (dn->is_primary())
              reply->add_inode( dn->inode->replicate_to( from ) );
-           mds->send_message_mds(reply, req->get_source().num(), MDS_PORT_CACHE);
+           mds->send_message_mds(reply, req->get_source().num());
          }
        }
       }
@@ -3990,9 +3988,9 @@ int MDCache::path_traverse(MDRequest *mdr, Message *req,     // who
        }
        
        if (mdr) 
-         request_forward(mdr, dauth.first, req->get_dest_port());
+         request_forward(mdr, dauth.first);
        else
-         mds->forward_message_mds(req, dauth.first, req->get_dest_port());
+         mds->forward_message_mds(req, dauth.first);
        
        if (mds->logger) mds->logger->inc("tfw");
        return 2;
@@ -4318,10 +4316,9 @@ void MDCache::request_finish(MDRequest *mdr)
 
 void MDCache::request_forward(MDRequest *mdr, int who, int port)
 {
-  if (!port) port = MDS_PORT_SERVER;
   dout(7) << "request_forward " << *mdr << " to mds" << who << " req " << *mdr << dendl;
   
-  mds->forward_message_mds(mdr->client_request, who, port);  
+  mds->forward_message_mds(mdr->client_request, who);  
   request_cleanup(mdr);
 
   if (mds->logger) mds->logger->inc("fw");
@@ -4372,7 +4369,7 @@ void MDCache::request_cleanup(MDRequest *mdr)
        p != mdr->more()->slaves.end();
        ++p) {
     MMDSSlaveRequest *r = new MMDSSlaveRequest(mdr->reqid, MMDSSlaveRequest::OP_FINISH);
-    mds->send_message_mds(r, *p, MDS_PORT_SERVER);
+    mds->send_message_mds(r, *p);
   }
   // strip foreign xlocks out of lock lists, since the OP_FINISH drops them implicitly.
   request_forget_foreign_locks(mdr);
@@ -4774,7 +4771,7 @@ void MDCache::discover_base_ino(inodeno_t want_ino,
                                   want_ino,
                                   want_path,
                                   false);
-    mds->send_message_mds(dis, from, MDS_PORT_CACHE);
+    mds->send_message_mds(dis, from);
   }
 
   waiting_for_base_ino[from][want_ino].push_back(onfinish);
@@ -4798,7 +4795,7 @@ void MDCache::discover_dir_frag(CInode *base,
                                   want_path,
                                   true);  // need the base dir open
     dis->set_base_dir_frag(approx_fg);
-    mds->send_message_mds(dis, from, MDS_PORT_CACHE);
+    mds->send_message_mds(dis, from);
   }
 
   // register + wait
@@ -4831,7 +4828,7 @@ void MDCache::discover_path(CInode *base,
                                   want_path,
                                   true,        // we want the base dir; we are relative to ino.
                                   want_xlocked);
-    mds->send_message_mds(dis, from, MDS_PORT_CACHE);
+    mds->send_message_mds(dis, from);
   }
 
   // register + wait
@@ -4862,7 +4859,7 @@ void MDCache::discover_path(CDir *base,
                                   want_path,
                                   false,   // no base dir; we are relative to dir
                                   want_xlocked);
-    mds->send_message_mds(dis, from, MDS_PORT_CACHE);
+    mds->send_message_mds(dis, from);
   }
 
   // register + wait
@@ -4886,7 +4883,7 @@ void MDCache::discover_ino(CDir *base,
                                   base->dirfrag(),
                                   want_ino,
                                   want_xlocked);
-    mds->send_message_mds(dis, from, MDS_PORT_CACHE);
+    mds->send_message_mds(dis, from);
   }
   
   // register + wait
@@ -5187,7 +5184,7 @@ void MDCache::handle_discover(MDiscover *dis)
   // how did we do?
   assert(!reply->is_empty());
   dout(7) << "handle_discover sending result back to asker mds" << dis->get_asker() << dendl;
-  mds->send_message_mds(reply, dis->get_asker(), MDS_PORT_CACHE);
+  mds->send_message_mds(reply, dis->get_asker());
 
   delete dis;
 }
@@ -5522,7 +5519,7 @@ int MDCache::send_inode_updates(CInode *in)
        it++) {
     dout(7) << "sending inode_update on " << *in << " to " << *it << dendl;
     assert(*it != mds->get_nodeid());
-    mds->send_message_mds(new MInodeUpdate(in, in->get_cached_by_nonce(*it)), *it, MDS_PORT_CACHE);
+    mds->send_message_mds(new MInodeUpdate(in, in->get_cached_by_nonce(*it)), *it);
   }
 
   return 0;
@@ -5538,7 +5535,7 @@ void MDCache::handle_inode_update(MInodeUpdate *m)
     dout(7) << "inode_update on " << m->get_ino() << ", don't have it, sending expire" << dendl;
     MCacheExpire *expire = new MCacheExpire(mds->get_nodeid());
     expire->add_inode(m->get_ino(), m->get_nonce());
-    mds->send_message_mds(expire, m->get_source().num(), MDS_PORT_CACHE);
+    mds->send_message_mds(expire, m->get_source().num());
     goto out;
   }
 
@@ -5595,7 +5592,7 @@ int MDCache::send_dir_updates(CDir *dir, bool bcast)
                                         dir->dir_rep_by,
                                         path,
                                         bcast),
-                         *it, MDS_PORT_CACHE);
+                         *it);
   }
 
   return 0;
@@ -6006,7 +6003,7 @@ void MDCache::fragment_stored(CInode *diri, frag_t basefrag, int bits,
       basedis->_encode(notify->basebl);
       delete basedis;
     }
-    mds->send_message_mds(notify, *p, MDS_PORT_CACHE);
+    mds->send_message_mds(notify, *p);
   }
 
 }
index e82e81c26496bf67f8d680f649961f3a381a8f11..69cc54a6bc61ff4da6ac747388e28330b7295f63 100644 (file)
@@ -58,6 +58,7 @@
 #include "messages/MClientRequest.h"
 #include "messages/MClientRequestForward.h"
 
+#include "messages/MAnchor.h"
 
 #include "config.h"
 
@@ -233,7 +234,7 @@ void MDS::reopen_logger(utime_t start)
   server->reopen_logger(start, append);
 }
 
-void MDS::send_message_mds(Message *m, int mds, int port, int fromport)
+void MDS::send_message_mds(Message *m, int mds)
 {
   // send mdsmap first?
   if (peer_mdsmap_epoch[mds] < mdsmap->get_epoch()) {
@@ -243,12 +244,10 @@ void MDS::send_message_mds(Message *m, int mds, int port, int fromport)
   }
 
   // send message
-  if (port && !fromport) 
-    fromport = port;
-  messenger->send_message(m, mdsmap->get_inst(mds), port, fromport);
+  messenger->send_message(m, mdsmap->get_inst(mds));
 }
 
-void MDS::forward_message_mds(Message *req, int mds, int port)
+void MDS::forward_message_mds(Message *req, int mds)
 {
   // client request?
   if (req->get_type() == MSG_CLIENT_REQUEST) {
@@ -266,7 +265,7 @@ void MDS::forward_message_mds(Message *req, int mds, int port)
   }
   
   // forward
-  send_message_mds(req, mds, port);
+  send_message_mds(req, mds);
 }
 
 
@@ -1097,18 +1096,12 @@ void MDS::my_dispatch(Message *m)
   }
 
 
-  switch (m->get_dest_port()) {
-    
-  case MDS_PORT_ANCHORTABLE:
-    anchortable->dispatch(m);
-    break;
-  case MDS_PORT_ANCHORCLIENT:
-    anchorclient->dispatch(m);
-    break;
-    
+  int port = m->get_type() & 0xff00;
+  switch (port) {
   case MDS_PORT_CACHE:
     mdcache->dispatch(m);
     break;
+
   case MDS_PORT_LOCKER:
     locker->dispatch(m);
     break;
@@ -1116,25 +1109,48 @@ void MDS::my_dispatch(Message *m)
   case MDS_PORT_MIGRATOR:
     mdcache->migrator->dispatch(m);
     break;
-  case MDS_PORT_RENAMER:
-    //mdcache->renamer->dispatch(m);
-    break;
 
-  case MDS_PORT_BALANCER:
-    balancer->proc_message(m);
-    break;
-    
-  case MDS_PORT_MAIN:
-    proc_message(m);
-    break;
+  default:
+    switch (m->get_type()) {
+      // SERVER
+    case MSG_CLIENT_SESSION:
+    case MSG_CLIENT_REQUEST:
+    case MSG_MDS_SLAVE_REQUEST:
+      server->dispatch(m);
+      break;
+      
+    case MSG_MDS_HEARTBEAT:
+      balancer->proc_message(m);
+      break;
 
-  case MDS_PORT_SERVER:
-    server->dispatch(m);
-    break;
+      // anchor
+    case MSG_MDS_ANCHOR:
+      if (((MAnchor*)m)->get_op() < 0)
+       anchorclient->dispatch(m);
+      else
+       anchortable->dispatch(m);
+      break;
 
-  default:
-    dout(1) << "MDS dispatch unknown message port" << m->get_dest_port() << dendl;
-    assert(0);
+      // OSD
+    case MSG_OSD_OPREPLY:
+      objecter->handle_osd_op_reply((class MOSDOpReply*)m);
+      break;
+    case MSG_OSD_MAP:
+      handle_osd_map((MOSDMap*)m);
+      break;
+      
+      // MDS
+    case MSG_MDS_MAP:
+      handle_mds_map((MMDSMap*)m);
+      break;
+    case MSG_MDS_BEACON:
+      handle_mds_beacon((MMDSBeacon*)m);
+      break;
+      
+    default:
+      dout(1) << "MDS unknown messge " << m->get_type() << dendl;
+      assert(0);
+    }
   }
   
   // finish any triggered contexts
index ff032f7f8de1a90e0cc2874b3c58386a923e1daa..7dcd921d05f4e79e9a13b3237209fcf3b4a04679 100644 (file)
@@ -218,8 +218,8 @@ class MDS : public Dispatcher {
   MDSMap *get_mds_map() { return mdsmap; }
   OSDMap *get_osd_map() { return osdmap; }
 
-  void send_message_mds(Message *m, int mds, int port=0, int fromport=0);
-  void forward_message_mds(Message *req, int mds, int port=0);
+  void send_message_mds(Message *m, int mds);
+  void forward_message_mds(Message *req, int mds);
 
   void send_message_client(Message *m, int client);
   void send_message_client(Message *m, entity_inst_t clientinst);
index ac02938ddbe886962ce096d59bc557c451f1923b..1c443c7bf6f79e8db647dc01ab8ef6bf47d577a0 100644 (file)
@@ -179,7 +179,7 @@ void Migrator::handle_mds_failure_or_stop(int who)
        export_state.erase(dir); // clean up
        dir->state_clear(CDir::STATE_EXPORTING);
        if (export_peer[dir] != who) // tell them.
-         mds->send_message_mds(new MExportDirCancel(dir->dirfrag()), export_peer[dir], MDS_PORT_MIGRATOR);
+         mds->send_message_mds(new MExportDirCancel(dir->dirfrag()), export_peer[dir]);
        break;
        
       case EXPORT_FREEZING:
@@ -188,7 +188,7 @@ void Migrator::handle_mds_failure_or_stop(int who)
        export_state.erase(dir); // clean up
        dir->state_clear(CDir::STATE_EXPORTING);
        if (export_peer[dir] != who) // tell them.
-         mds->send_message_mds(new MExportDirCancel(dir->dirfrag()), export_peer[dir], MDS_PORT_MIGRATOR);
+         mds->send_message_mds(new MExportDirCancel(dir->dirfrag()), export_peer[dir]);
        break;
 
        // NOTE: state order reversal, warning comes after loggingstart+prepping
@@ -573,7 +573,7 @@ void Migrator::export_dir(CDir *dir, int dest)
   dir->state_set(CDir::STATE_EXPORTING);
 
   // send ExportDirDiscover (ask target)
-  mds->send_message_mds(new MExportDirDiscover(dir), dest, MDS_PORT_MIGRATOR);
+  mds->send_message_mds(new MExportDirDiscover(dir), dest);
 
   // start the freeze, but hold it up with an auth_pin.
   dir->auth_pin();
@@ -704,7 +704,7 @@ void Migrator::export_frozen(CDir *dir)
 
   // send.
   export_state[dir] = EXPORT_PREPPING;
-  mds->send_message_mds(prep, dest, MDS_PORT_MIGRATOR);
+  mds->send_message_mds(prep, dest);
 }
 
 void Migrator::handle_export_prep_ack(MExportDirPrepAck *m)
@@ -744,7 +744,7 @@ void Migrator::handle_export_prep_ack(MExportDirPrepAck *m)
                                                    pair<int,int>(mds->get_nodeid(),CDIR_AUTH_UNKNOWN),
                                                    pair<int,int>(mds->get_nodeid(),export_peer[dir]));
     notify->copy_bounds(bounds);
-    mds->send_message_mds(notify, p->first, MDS_PORT_MIGRATOR);
+    mds->send_message_mds(notify, p->first);
     
   }
   export_state[dir] = EXPORT_WARNING;
@@ -803,7 +803,7 @@ void Migrator::export_go(CDir *dir)
     req->add_export((*p)->dirfrag());
 
   // send
-  mds->send_message_mds(req, dest, MDS_PORT_MIGRATOR);
+  mds->send_message_mds(req, dest);
 
   // stats
   if (mds->logger) mds->logger->inc("ex");
@@ -1167,7 +1167,7 @@ void Migrator::export_logged_finish(CDir *dir)
 
     notify->copy_bounds(bounds);
     
-    mds->send_message_mds(notify, *p, MDS_PORT_MIGRATOR);
+    mds->send_message_mds(notify, *p);
   }
 
   // wait for notifyacks
@@ -1242,8 +1242,7 @@ void Migrator::export_finish(CDir *dir)
 
   // send finish/commit to new auth
   if (mds->mdsmap->is_active_or_stopping(export_peer[dir])) {
-    mds->send_message_mds(new MExportDirFinish(dir->dirfrag()), 
-                         export_peer[dir], MDS_PORT_MIGRATOR);
+    mds->send_message_mds(new MExportDirFinish(dir->dirfrag()), export_peer[dir]);
   } else {
     dout(7) << "not sending MExportDirFinish, dest has failed" << dendl;
   }
@@ -1364,8 +1363,7 @@ void Migrator::handle_export_discover(MExportDirDiscover *m)
 
   // reply
   dout(7) << " sending export_discover_ack on " << *in << dendl;
-  mds->send_message_mds(new MExportDirDiscoverAck(df),
-                       import_peer[df], MDS_PORT_MIGRATOR);
+  mds->send_message_mds(new MExportDirDiscoverAck(df), import_peer[df]);
 }
 
 void Migrator::handle_export_cancel(MExportDirCancel *m)
@@ -1552,8 +1550,7 @@ void Migrator::handle_export_prep(MExportDirPrep *m)
   
   // ok!
   dout(7) << " sending export_prep_ack on " << *dir << dendl;
-  mds->send_message_mds(new MExportDirPrepAck(dir->dirfrag()),
-                       m->get_source().num(), MDS_PORT_MIGRATOR);
+  mds->send_message_mds(new MExportDirPrepAck(dir->dirfrag()), m->get_source().num());
   
   // note new state
   import_state[dir->dirfrag()] = IMPORT_PREPPED;
@@ -1768,7 +1765,7 @@ void Migrator::import_notify_abort(CDir *dir, set<CDir*>& bounds)
                           pair<int,int>(mds->get_nodeid(), CDIR_AUTH_UNKNOWN),
                           pair<int,int>(import_peer[dir->dirfrag()], CDIR_AUTH_UNKNOWN));
     notify->copy_bounds(bounds);
-    mds->send_message_mds(notify, *p, MDS_PORT_MIGRATOR);
+    mds->send_message_mds(notify, *p);
   }
 }
 
@@ -1808,8 +1805,7 @@ void Migrator::import_logged_start(CDir *dir, int from)
 
   // send notify's etc.
   dout(7) << "sending ack for " << *dir << " to old auth mds" << from << dendl;
-  mds->send_message_mds(new MExportDirAck(dir->dirfrag()),
-                       from, MDS_PORT_MIGRATOR);
+  mds->send_message_mds(new MExportDirAck(dir->dirfrag()), from);
 
   cache->show_subtrees();
 }
@@ -2090,8 +2086,7 @@ void Migrator::handle_export_notify(MExportDirNotify *m)
   
   // send ack
   if (m->wants_ack()) {
-    mds->send_message_mds(new MExportDirNotifyAck(m->get_dirfrag()),
-                         from, MDS_PORT_MIGRATOR);
+    mds->send_message_mds(new MExportDirNotifyAck(m->get_dirfrag()), from);
   } else {
     // aborted.  no ack.
     dout(7) << "handle_export_notify no ack requested" << dendl;
index 3be92948cf0b3f20f57699c6f5c4479efe3668c4..0c2559af324b7643fe26b31a396c25807cf9abfd 100644 (file)
@@ -471,8 +471,7 @@ void Server::handle_client_request(MClientRequest *req)
   if (req->get_retry_attempt()) {
     if (mds->clientmap.have_completed_request(req->get_reqid())) {
       dout(5) << "already completed " << req->get_reqid() << dendl;
-      mds->messenger->send_message(new MClientReply(req, 0),
-                                  req->get_client_inst());
+      mds->messenger->send_message(new MClientReply(req, 0), req->get_client_inst());
       delete req;
       return;
     }
@@ -721,7 +720,7 @@ void Server::dispatch_slave_request(MDRequest *mdr)
        MMDSSlaveRequest *r = new MMDSSlaveRequest(mdr->reqid, MMDSSlaveRequest::OP_XLOCKACK);
        r->set_lock_type(lock->get_type());
        lock->get_parent()->set_object_info(r->get_object_info());
-       mds->send_message_mds(r, mdr->slave_request->get_source().num(), MDS_PORT_SERVER);
+       mds->send_message_mds(r, mdr->slave_request->get_source().num());
       } else {
        if (lock) {
          dout(10) << "not auth for remote xlock attempt, dropping on " 
@@ -841,7 +840,7 @@ void Server::handle_slave_auth_pin(MDRequest *mdr)
     reply->get_authpins().push_back(info);
   }
 
-  mds->send_message_mds(reply, mdr->slave_to_mds, MDS_PORT_SERVER);
+  mds->send_message_mds(reply, mdr->slave_to_mds);
   
   // clean up this request
   delete mdr->slave_request;
@@ -1995,7 +1994,7 @@ void Server::_link_remote(MDRequest *mdr, CDentry *dn, CInode *targeti)
     MMDSSlaveRequest *req = new MMDSSlaveRequest(mdr->reqid, MMDSSlaveRequest::OP_LINKPREP);
     targeti->set_object_info(req->get_object_info());
     req->now = mdr->now;
-    mds->send_message_mds(req, linkauth, MDS_PORT_SERVER);
+    mds->send_message_mds(req, linkauth);
 
     assert(mdr->more()->waiting_on_slave.count(linkauth) == 0);
     mdr->more()->waiting_on_slave.insert(linkauth);
@@ -2152,7 +2151,7 @@ void Server::_logged_slave_link(MDRequest *mdr, CInode *targeti, utime_t old_cti
 
   // ack
   MMDSSlaveRequest *reply = new MMDSSlaveRequest(mdr->reqid, MMDSSlaveRequest::OP_LINKPREPACK);
-  mds->send_message_mds(reply, mdr->slave_to_mds, MDS_PORT_SERVER);
+  mds->send_message_mds(reply, mdr->slave_to_mds);
   
   // set up commit waiter
   mdr->more()->slave_commit = new C_MDS_SlaveLinkCommit(this, mdr, targeti, old_ctime, old_version, inc);
@@ -2440,7 +2439,7 @@ void Server::_unlink_local_finish(MDRequest *mdr,
       unlink->straydir = straydn->dir->replicate_to(it->first);
       unlink->straydn = straydn->replicate_to(it->first);
     }
-    mds->send_message_mds(unlink, it->first, MDS_PORT_CACHE);
+    mds->send_message_mds(unlink, it->first);
   }
   
   // commit anchor update?
@@ -2492,7 +2491,7 @@ void Server::_unlink_remote(MDRequest *mdr, CDentry *dn)
     MMDSSlaveRequest *req = new MMDSSlaveRequest(mdr->reqid, MMDSSlaveRequest::OP_UNLINKPREP);
     dn->inode->set_object_info(req->get_object_info());
     req->now = mdr->now;
-    mds->send_message_mds(req, inauth, MDS_PORT_SERVER);
+    mds->send_message_mds(req, inauth);
 
     assert(mdr->more()->waiting_on_slave.count(inauth) == 0);
     mdr->more()->waiting_on_slave.insert(inauth);
@@ -2546,7 +2545,7 @@ void Server::_unlink_remote_finish(MDRequest *mdr,
        it++) {
     dout(7) << "_unlink_remote_finish sending MDentryUnlink to mds" << it->first << dendl;
     MDentryUnlink *unlink = new MDentryUnlink(dn->dir->dirfrag(), dn->name);
-    mds->send_message_mds(unlink, it->first, MDS_PORT_CACHE);
+    mds->send_message_mds(unlink, it->first);
   }
 
   // commit anchor update?
@@ -2974,7 +2973,7 @@ void Server::_rename_prepare_witness(MDRequest *mdr, int who, CDentry *srcdn, CD
   // srcdn auth will verify our current witness list is sufficient
   req->witnesses = mdr->more()->witnessed;
 
-  mds->send_message_mds(req, who, MDS_PORT_SERVER);
+  mds->send_message_mds(req, who);
   
   assert(mdr->more()->waiting_on_slave.count(who) == 0);
   mdr->more()->waiting_on_slave.insert(who);
@@ -3337,7 +3336,7 @@ void Server::handle_slave_rename_prep(MDRequest *mdr)
       dout(10) << " witness list insufficient; providing srcdn replica list" << dendl;
       MMDSSlaveRequest *reply = new MMDSSlaveRequest(mdr->reqid, MMDSSlaveRequest::OP_RENAMEPREPACK);
       reply->witnesses.swap(srcdnrep);
-      mds->send_message_mds(reply, mdr->slave_to_mds, MDS_PORT_SERVER);
+      mds->send_message_mds(reply, mdr->slave_to_mds);
       delete mdr->slave_request;
       mdr->slave_request = 0;
       return;  
@@ -3411,7 +3410,7 @@ void Server::_logged_slave_rename(MDRequest *mdr,
   // apply
   _rename_apply(mdr, srcdn, destdn, straydn);   
 
-  mds->send_message_mds(reply, mdr->slave_to_mds, MDS_PORT_SERVER);
+  mds->send_message_mds(reply, mdr->slave_to_mds);
   
   // bump popularity
   //if (srcdn->is_auth())
index a2f779757255e4264ea49e49046f8ca7cc2134b1..ee14474761ada7f96a57ba170cc0aaf5bf5bf028 100644 (file)
@@ -21,16 +21,11 @@ using namespace std;
 
 #define MDS_REF_SET    // define me for improved debug output, sanity checking
 
-#define MDS_PORT_MAIN     0
-#define MDS_PORT_SERVER   1
-#define MDS_PORT_CACHE    2
-#define MDS_PORT_LOCKER   3
-#define MDS_PORT_STORE    4
-#define MDS_PORT_BALANCER 5
-#define MDS_PORT_MIGRATOR 6
-#define MDS_PORT_RENAMER  7
-#define MDS_PORT_ANCHORCLIENT 10
-#define MDS_PORT_ANCHORTABLE  11
+
+#define MDS_PORT_CACHE   0x200
+#define MDS_PORT_LOCKER  0x300
+#define MDS_PORT_MIGRATOR 0x400
+
 
 #define MAX_MDS                   0x100
 
index 7a7a65792de4b3e8429a275e69d3b1ec59f06657..87019ba010a33707ee7aab1ffef7e4f47cc8bf1c 100644 (file)
@@ -361,15 +361,14 @@ void FakeMessenger::reset_myname(entity_name_t m)
 }
 
 
-int FakeMessenger::send_message(Message *m, entity_inst_t inst, int port, int fromport)
+int FakeMessenger::send_message(Message *m, entity_inst_t inst)
 {
   entity_name_t dest = inst.name;
   
-  m->set_source(get_myname(), fromport);
+  m->set_source(get_myname());
   m->set_source_addr(get_myaddr());
 
   m->set_dest_inst(inst);
-  m->set_dest_port(port);
 
   lock.Lock();
 
index 2284ea110b51f48fd7feed521968ae33a20875f2..ab89b7e47f983ad58724ebb831851fcdf3c58ef6 100644 (file)
@@ -52,7 +52,7 @@ class FakeMessenger : public Messenger {
   void reset_myname(entity_name_t m);
 
   // msg interface
-  virtual int send_message(Message *m, entity_inst_t dest, int port=0, int fromport=0);
+  int send_message(Message *m, entity_inst_t dest);
   
   // events
   //virtual void trigger_timer(Timer *t);
index e3c7ce827ac614740652b387ea6765c9d1b190e0..9a8dbb26f2c18382cd62e9c1443fd4e1961d38b8 100644 (file)
@@ -347,14 +347,11 @@ decode_message(ceph_message_header& env, bufferlist& payload)
 
   case MSG_CLOSE:
   case MSG_SHUTDOWN:
-  case MSG_MDS_SHUTDOWNSTART:
-  case MSG_MDS_SHUTDOWNFINISH:
-  case MSG_OSD_MKFS_ACK:
     m = new MGenericMessage(env.type);
     break;
 
   default:
-    dout(1) << "can't decode unknown message type " << env.type << dendl;
+    dout(0) << "can't decode unknown message type " << env.type << dendl;
     assert(0);
   }
   
index 9f0175e7a7d1e2f7a32547f5212fbda43f19edb8..c4be3da01616beddf17cc311cbdb2c5d292db4f3 100644 (file)
@@ -49,7 +49,6 @@
 #define MSG_OSD_MAP          44
 
 #define MSG_OSD_BOOT         45
-#define MSG_OSD_MKFS_ACK     46
 
 #define MSG_OSD_FAILURE      47
 
 #define MSG_CLIENT_REQUEST         80
 #define MSG_CLIENT_REQUEST_FORWARD 81
 #define MSG_CLIENT_REPLY           82
-#define MSG_CLIENT_FILECAPS        83
+#define MSG_CLIENT_FILECAPS        0x310  // 
 
 
 
 // *** MDS ***
 
+
+#define MSG_MDS_RESOLVE            0x200
+#define MSG_MDS_RESOLVEACK         0x201
+#define MSG_MDS_CACHEREJOIN        0x202
+#define MSG_MDS_DISCOVER           0x203
+#define MSG_MDS_DISCOVERREPLY      0x204
+#define MSG_MDS_INODEUPDATE  0x205
+#define MSG_MDS_DIRUPDATE    0x206
+#define MSG_MDS_CACHEEXPIRE  0x207
+#define MSG_MDS_DENTRYUNLINK      0x208
+#define MSG_MDS_FRAGMENTNOTIFY 0x209
+
+#define MSG_MDS_LOCK             0x300
+#define MSG_MDS_INODEFILECAPS    0x301
+
+#define MSG_MDS_EXPORTDIRDISCOVER     0x449
+#define MSG_MDS_EXPORTDIRDISCOVERACK  0x450
+#define MSG_MDS_EXPORTDIRCANCEL       0x451
+#define MSG_MDS_EXPORTDIRPREP         0x452
+#define MSG_MDS_EXPORTDIRPREPACK      0x453
+#define MSG_MDS_EXPORTDIRWARNING      0x454
+#define MSG_MDS_EXPORTDIRWARNINGACK   0x455
+#define MSG_MDS_EXPORTDIR             0x456
+#define MSG_MDS_EXPORTDIRACK          0x457
+#define MSG_MDS_EXPORTDIRNOTIFY       0x458
+#define MSG_MDS_EXPORTDIRNOTIFYACK    0x459
+#define MSG_MDS_EXPORTDIRFINISH       0x460
+
+
 #define MSG_MDS_GETMAP             102
 #define MSG_MDS_MAP                103
-#define MSG_MDS_HEARTBEAT          104  // for mds load balancer
 #define MSG_MDS_BEACON             105  // to monitor
 
-#define MSG_MDS_RESOLVE            106
-#define MSG_MDS_RESOLVEACK         107
-
-#define MSG_MDS_CACHEREJOIN        108
+#define MSG_MDS_ANCHOR 0x100
+#define MSG_MDS_HEARTBEAT          0x500  // for mds load balancer
 
-#define MSG_MDS_DISCOVER           110
-#define MSG_MDS_DISCOVERREPLY      111
+#define MSG_MDS_SLAVE_REQUEST         170
 
+/*
 #define MSG_MDS_INODEGETREPLICA    112
 #define MSG_MDS_INODEGETREPLICAACK 113
 
-#define MSG_MDS_INODEFILECAPS      115
-
-#define MSG_MDS_INODEUPDATE  120
-#define MSG_MDS_DIRUPDATE    121
-#define MSG_MDS_INODEEXPIRE  122
-#define MSG_MDS_DIREXPIRE    123
-
 #define MSG_MDS_DIREXPIREREQ 124
+*/
 
-#define MSG_MDS_CACHEEXPIRE  125
-
-#define MSG_MDS_ANCHOR 130
-
-#define MSG_MDS_FRAGMENTNOTIFY 140
-
-#define MSG_MDS_EXPORTDIRDISCOVER     149
-#define MSG_MDS_EXPORTDIRDISCOVERACK  150
-#define MSG_MDS_EXPORTDIRCANCEL       151
-#define MSG_MDS_EXPORTDIRPREP         152
-#define MSG_MDS_EXPORTDIRPREPACK      153
-#define MSG_MDS_EXPORTDIRWARNING      154
-#define MSG_MDS_EXPORTDIRWARNINGACK   155
-#define MSG_MDS_EXPORTDIR             156
-#define MSG_MDS_EXPORTDIRACK          157
-#define MSG_MDS_EXPORTDIRNOTIFY       158
-#define MSG_MDS_EXPORTDIRNOTIFYACK    159
-#define MSG_MDS_EXPORTDIRFINISH       160
-
-#define MSG_MDS_SLAVE_REQUEST         170
-
-#define MSG_MDS_DENTRYUNLINK      200
-
-#define MSG_MDS_LOCK             500
 
-#define MSG_MDS_SHUTDOWNSTART  900
-#define MSG_MDS_SHUTDOWNFINISH 901
 
 
 #include <stdlib.h>
@@ -174,11 +167,9 @@ public:
 
  public:
   Message() { 
-    env.source_port = env.dest_port = 0;
     env.nchunks = 0;
   };
   Message(int t) {
-    env.source_port = env.dest_port = 0;
     env.nchunks = 0;
     env.type = t;
   }
@@ -225,13 +216,10 @@ public:
   void set_source_inst(entity_inst_t& inst) { env.src = *(ceph_entity_inst*)&inst; }
 
   entity_name_t& get_dest() { return *(entity_name_t*)&env.dst.name; }
-  void set_dest(entity_name_t a, int p) { env.dst.name = *(ceph_entity_name*)&a; env.dest_port = p; }
-  int get_dest_port() { return env.dest_port; }
-  void set_dest_port(int p) { env.dest_port = p; }
+  void set_dest(entity_name_t a) { env.dst.name = *(ceph_entity_name*)&a; }
   
   entity_name_t& get_source() { return *(entity_name_t*)&env.src.name; }
-  void set_source(entity_name_t a, int p) { env.src.name = *(ceph_entity_name*)&a; env.source_port = p; }
-  int get_source_port() { return env.source_port; }
+  void set_source(entity_name_t a) { env.src.name = *(ceph_entity_name*)&a; }
 
   entity_addr_t& get_source_addr() { return *(entity_addr_t*)&env.src.addr; }
   void set_source_addr(const entity_addr_t &i) { env.src.addr = *(ceph_entity_addr*)&i; }
index d29441a744ca0e7de5ac2710e2f4d82e877c4cd5..1bb9c8acb28edf3eaa9269a0e618ab13032efd24 100644 (file)
@@ -76,17 +76,7 @@ class Messenger {
 
   // send message
   virtual void prepare_dest(const entity_addr_t& addr) {}
-  virtual int send_message(Message *m, entity_inst_t dest,
-                          int port=0, int fromport=0) = 0;
-  virtual int send_first_message(Dispatcher *d,
-                                Message *m, entity_inst_t dest,
-                                int port=0, int fromport=0) {
-    set_dispatcher(d);
-    return send_message(m, dest, port, fromport);
-  }
-
-  // make a procedure call
-  //virtual Message* sendrecv(Message *m, msg_name_t dest, int port=0);
+  virtual int send_message(Message *m, entity_inst_t dest) = 0;
 
   virtual void mark_down(entity_addr_t a) {}
 
index 0761c11ae410cd38e6b7460e0c0b901eea814ff9..b6cac09880eb50c01967cb14ea84fce0be3d240e 100644 (file)
@@ -136,7 +136,7 @@ int Rank::Accepter::start()
   dout(10) << "accepter.start bound to " << listen_addr << dendl;
 
   // listen!
-  rc = ::listen(listen_sd, 1000);
+  rc = ::listen(listen_sd, 128);
   assert(rc >= 0);
   
   // figure out my_addr
@@ -431,8 +431,10 @@ void Rank::Pipe::reader()
       break;
     }
 
-    dout(10) << "pipe(" << peer_addr << ' ' << this << ").reader got message " 
-            << m << " " << *m
+    in_seq++;
+
+    dout(10) << "pipe(" << peer_addr << ' ' << this << ").reader got message "
+            << in_seq << " " << m << " " << *m
             << " for " << m->get_dest() << dendl;
 
     // deliver
@@ -1210,45 +1212,12 @@ void Rank::EntityMessenger::prepare_dest(const entity_addr_t& addr)
   rank.lock.Unlock();
 }
 
-int Rank::EntityMessenger::send_message(Message *m, entity_inst_t dest,
-                                       int port, int fromport)
-{
-  // set envelope
-  m->set_source(get_myname(), fromport);
-  m->set_source_addr(my_addr);
-  m->set_dest_inst(dest);
-  m->set_dest_port(port);
-  dout(1) << m->get_source()
-          << " --> " << dest.name << " " << dest.addr
-          << " -- " << *m
-         << " -- " << m
-          << dendl;
-
-  rank.submit_message(m, dest.addr);
-
-  return 0;
-}
-
-int Rank::EntityMessenger::send_first_message(Dispatcher *d,
-                                             Message *m, entity_inst_t dest,
-                                             int port, int fromport)
+int Rank::EntityMessenger::send_message(Message *m, entity_inst_t dest)
 {
-  /* hacky thing for csyn and newsyn:
-   * set dispatcher (go active) AND set sender for this 
-   * message while holding rank.lock.  this prevents any
-   * races against incoming unnamed messages naming us before
-   * we fire off our first message.  
-   */
-  rank.lock.Lock();
-  set_dispatcher(d);
-
   // set envelope
-  m->set_source(get_myname(), fromport);
+  m->set_source(get_myname());
   m->set_source_addr(my_addr);
   m->set_dest_inst(dest);
-  m->set_dest_port(port);
-  rank.lock.Unlock();
  
   dout(1) << m->get_source()
           << " --> " << dest.name << " " << dest.addr
index e5fa8005df28d72223b515a33d31b35c931d353b..d1c8e72dd30a86be7e611634cae3dd7c884f1673 100644 (file)
@@ -76,8 +76,12 @@ private:
     bool writer_running;
 
     list<Message*> q;
+    list<Message*> sent;
     Mutex lock;
     Cond cond;
+
+    int out_seq, out_acked;
+    int in_seq;
     
     int accept();   // server handshake
     int connect();  // client handshake
@@ -111,6 +115,7 @@ private:
                  done(false), server(true), 
                  need_to_send_close(true),
                  reader_running(false), writer_running(false),
+                 out_seq(0), out_acked(0), in_seq(0),
                  reader_thread(this), writer_thread(this) {
       // server
       reader_running = true;
@@ -228,11 +233,7 @@ private:
     int shutdown();
     void suicide();
     void prepare_dest(const entity_addr_t& addr);
-    int send_message(Message *m, entity_inst_t dest,
-                    int port=0, int fromport=0);
-    int send_first_message(Dispatcher *d,
-                          Message *m, entity_inst_t dest,
-                          int port=0, int fromport=0);
+    int send_message(Message *m, entity_inst_t dest);
     
     void mark_down(entity_addr_t a);
     void mark_up(entity_name_t a, entity_addr_t& i);