From 0690b4c3d2d2820356497906eceee7efe418200a Mon Sep 17 00:00:00 2001 From: sageweil Date: Wed, 11 Oct 2006 21:36:38 +0000 Subject: [PATCH] mds client map; send_message cleanup git-svn-id: https://ceph.svn.sf.net/svnroot/ceph@924 29311d96-e01e-0410-9327-a35deaab8ce9 --- ceph/mds/ClientMap.h | 74 ++++++++ ceph/mds/MDCache.cc | 392 ++++++++++++++++--------------------------- ceph/mds/MDS.cc | 80 +++++---- ceph/mds/MDS.h | 12 +- 4 files changed, 273 insertions(+), 285 deletions(-) create mode 100644 ceph/mds/ClientMap.h diff --git a/ceph/mds/ClientMap.h b/ceph/mds/ClientMap.h new file mode 100644 index 0000000000000..63f310358cae8 --- /dev/null +++ b/ceph/mds/ClientMap.h @@ -0,0 +1,74 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +/* + * Ceph - scalable distributed file system + * + * Copyright (C) 2004-2006 Sage Weil + * + * This is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public + * License version 2.1, as published by the Free Software + * Foundation. See file COPYING. + * + */ + +#ifndef __CLIENTMAP_H +#define __CLIENTMAP_H + +#include "msg/Message.h" + +#include +using namespace std; + +#include +using namespace __gnu_cxx; + +class ClientMap { + hash_map client_inst; + set client_mount; + hash_map client_ref; + + void inc_ref(int client, const entity_inst_t& inst) { + if (client_inst.count(client)) { + assert(client_inst[client] == inst); + assert(client_ref.count(client)); + } else { + client_inst[client] = inst; + } + client_ref[client]++; + } + void dec_ref(int client) { + assert(client_ref.count(client)); + assert(client_ref[client] > 0); + client_ref[client]--; + if (client_ref[client] == 0) { + client_ref.erase(client); + client_inst.erase(client); + } + } + +public: + const entity_inst_t& get_inst(int client) { + assert(client_inst.count(client)); + return client_inst[client]; + } + const set& get_mount_set() { return client_mount; } + + void add_mount(int client, const entity_inst_t& inst) { + inc_ref(client, inst); + client_mount.insert(client); + } + void rem_mount(int client) { + dec_ref(client); + client_mount.erase(client); + } + + + void add_open(int client, const entity_inst_t& inst) { + inc_ref(client, inst); + } + void dec_open(int client) { + dec_ref(client); + } +}; + +#endif diff --git a/ceph/mds/MDCache.cc b/ceph/mds/MDCache.cc index b93cde1533399..a3e1c4de39ecc 100644 --- a/ceph/mds/MDCache.cc +++ b/ceph/mds/MDCache.cc @@ -582,8 +582,7 @@ bool MDCache::trim(int max) { it != expiremap.end(); it++) { dout(7) << "sending cache_expire to " << it->first << endl; - mds->messenger->send_message(it->second, - MSG_ADDR_MDS(it->first), MDS_PORT_CACHE, MDS_PORT_CACHE); + mds->send_message_mds(it->second, it->first, MDS_PORT_CACHE); } @@ -787,8 +786,7 @@ bool MDCache::shutdown_pass() dout(1) << "shutdown done, sending shutdown_finish" << endl; if (mds->get_nodeid() != 0) { //g_conf.debug = 25; - mds->messenger->send_message(new MGenericMessage(MSG_MDS_SHUTDOWNFINISH), - MSG_ADDR_MDS(0), MDS_PORT_MAIN, MDS_PORT_MAIN); + mds->send_message_mds(new MGenericMessage(MSG_MDS_SHUTDOWNFINISH), 0, MDS_PORT_MAIN); } else { mds->handle_shutdown_finish(NULL); } @@ -851,9 +849,7 @@ int MDCache::open_root(Context *c) 0, want, false); // there _is_ no base dir for the root inode - mds->messenger->send_message(req, - MSG_ADDR_MDS(0), MDS_PORT_CACHE, - MDS_PORT_CACHE); + mds->send_message_mds(req, 0, MDS_PORT_CACHE); } else { dout(7) << "waiting for root" << endl; } @@ -1211,12 +1207,11 @@ int MDCache::path_traverse(filepath& origpath, } else { filepath want = path.postfixpath(depth); dout(10) << "traverse: need dir for " << *cur << ", doing discover, want " << want.get_path() << endl; - mds->messenger->send_message(new MDiscover(mds->get_nodeid(), - cur->ino(), - want, - true), // need this dir too - MSG_ADDR_MDS(cur->authority()), MDS_PORT_CACHE, - MDS_PORT_CACHE); + mds->send_message_mds(new MDiscover(mds->get_nodeid(), + cur->ino(), + want, + true), // need this dir too + cur->authority(), MDS_PORT_CACHE); } cur->add_waiter(CINODE_WAIT_DIR, ondelay); if (onfinish) delete onfinish; @@ -1353,8 +1348,7 @@ int MDCache::path_traverse(filepath& origpath, MDiscoverReply *reply = new MDiscoverReply(cur->dir->ino()); reply->add_dentry( dn->get_name(), !dn->can_read()); reply->add_inode( dn->inode->replicate_to( from ) ); - mds->messenger->send_message(reply, - req->get_source(), MDS_PORT_CACHE, MDS_PORT_CACHE); + mds->send_message_mds(reply, req->get_source().num(), MDS_PORT_CACHE); } } } @@ -1423,12 +1417,11 @@ int MDCache::path_traverse(filepath& origpath, touch_inode(cur); - mds->messenger->send_message(new MDiscover(mds->get_nodeid(), - cur->ino(), - want, - false), - MSG_ADDR_MDS(dauth), MDS_PORT_CACHE, - MDS_PORT_CACHE); + mds->send_message_mds(new MDiscover(mds->get_nodeid(), + cur->ino(), + want, + false), + dauth, MDS_PORT_CACHE); mds->logger->inc("dis"); } @@ -1452,9 +1445,7 @@ int MDCache::path_traverse(filepath& origpath, req->clear_payload(); // reencode! } - mds->messenger->send_message(req, - MSG_ADDR_MDS(dauth), req->get_dest_port(), - req->get_dest_port()); + mds->send_message_mds(req, dauth, req->get_dest_port()); //show_imports(); mds->logger->inc("cfw"); @@ -1497,12 +1488,11 @@ void MDCache::open_remote_dir(CInode *diri, assert(diri->dir == 0); filepath want; // no dentries, i just want the dir open - mds->messenger->send_message(new MDiscover(mds->get_nodeid(), - diri->ino(), - want, - true), // need the dir open - MSG_ADDR_MDS(diri->authority()), MDS_PORT_CACHE, - MDS_PORT_CACHE); + mds->send_message_mds(new MDiscover(mds->get_nodeid(), + diri->ino(), + want, + true), // need the dir open + diri->authority(), MDS_PORT_CACHE); diri->add_waiter(CINODE_WAIT_DIR, fin); } @@ -1731,8 +1721,7 @@ void MDCache::request_cleanup(Message *req) int dauth = dn->dir->dentry_authority(dn->name); MLock *m = new MLock(LOCK_AC_UNXLOCK, mds->get_nodeid()); m->set_dn(dn->dir->ino(), dn->name); - mds->messenger->send_message(m, - MSG_ADDR_MDS(dauth), MDS_PORT_CACHE, MDS_PORT_CACHE); + mds->send_message_mds(m, dauth, MDS_PORT_CACHE); } } @@ -1807,9 +1796,7 @@ void MDCache::request_forward(Message *req, int who, int port) dout(7) << "request_forward to " << who << " req " << *req << endl; request_cleanup(req); - mds->messenger->send_message(req, - MSG_ADDR_MDS(who), port, - port); + mds->send_message_mds(req, who, port); mds->logger->inc("fw"); } @@ -1883,8 +1870,7 @@ void MDCache::handle_inode_link(MInodeLink *m) if (!in->is_auth()) { assert(in->is_proxy()); dout(7) << "handle_inode_link not auth for " << *in << ", fw to auth" << endl; - mds->messenger->send_message(m, - MSG_ADDR_MDS(in->authority()), MDS_PORT_CACHE, MDS_PORT_CACHE); + mds->send_message_mds(m, in->authority(), MDS_PORT_CACHE); return; } @@ -1905,8 +1891,7 @@ void MDCache::handle_inode_link(MInodeLink *m) // reply dout(7) << " nlink++, now " << in->inode.nlink++ << endl; - mds->messenger->send_message(new MInodeLinkAck(m->get_ino(), true), - MSG_ADDR_MDS(m->get_from()), MDS_PORT_CACHE, MDS_PORT_CACHE); + mds->send_message_mds(new MInodeLinkAck(m->get_ino(), true), m->get_from(), MDS_PORT_CACHE); delete m; } @@ -1976,8 +1961,7 @@ void MDCache::handle_discover(MDiscover *dis) if (!cur->dir && !cur->is_auth()) { int iauth = cur->authority(); dout(7) << "no dir and not inode auth; fwd to auth " << iauth << endl; - mds->messenger->send_message( dis, - MSG_ADDR_MDS( iauth ), MDS_PORT_CACHE, MDS_PORT_CACHE ); + mds->send_message_mds( dis, iauth, MDS_PORT_CACHE); return; } @@ -2128,8 +2112,7 @@ void MDCache::handle_discover(MDiscover *dis) delete dis; } else { dout(7) << "fwd to dir auth " << dirauth << endl; - mds->messenger->send_message( dis, - MSG_ADDR_MDS( dirauth ), MDS_PORT_CACHE, MDS_PORT_CACHE ); + mds->send_message_mds( dis, dirauth, MDS_PORT_CACHE ); } return; } @@ -2140,8 +2123,7 @@ void MDCache::handle_discover(MDiscover *dis) } else { // send back to asker dout(7) << "sending result back to asker mds" << dis->get_asker() << endl; - mds->messenger->send_message(reply, - MSG_ADDR_MDS(dis->get_asker()), MDS_PORT_CACHE, MDS_PORT_CACHE); + mds->send_message_mds(reply, dis->get_asker(), MDS_PORT_CACHE); } // done. @@ -2343,9 +2325,7 @@ int MDCache::send_inode_updates(CInode *in) it++) { dout(7) << "sending inode_update on " << *in << " to " << *it << endl; assert(*it != mds->get_nodeid()); - mds->messenger->send_message(new MInodeUpdate(in, in->get_cached_by_nonce(*it)), - MSG_ADDR_MDS(*it), MDS_PORT_CACHE, - MDS_PORT_CACHE); + mds->send_message_mds(new MInodeUpdate(in, in->get_cached_by_nonce(*it)), *it, MDS_PORT_CACHE); } return 0; @@ -2361,9 +2341,7 @@ void MDCache::handle_inode_update(MInodeUpdate *m) dout(7) << "inode_update on " << hex << m->get_ino() << dec << ", don't have it, sending expire" << endl; MCacheExpire *expire = new MCacheExpire(mds->get_nodeid()); expire->add_inode(m->get_ino(), m->get_nonce()); - mds->messenger->send_message(expire, - m->get_source(), MDS_PORT_CACHE, - MDS_PORT_CACHE); + mds->send_message_mds(expire, m->get_source().num(), MDS_PORT_CACHE); goto out; } @@ -2503,8 +2481,7 @@ void MDCache::handle_cache_expire(MCacheExpire *m) it != proxymap.end(); it++) { dout(7) << "sending proxy forward to " << it->first << endl; - mds->messenger->send_message(it->second, - MSG_ADDR_MDS(it->first), MDS_PORT_CACHE, MDS_PORT_CACHE); + mds->send_message_mds(it->second, it->first, MDS_PORT_CACHE); } // done @@ -2534,13 +2511,12 @@ int MDCache::send_dir_updates(CDir *dir, bool bcast) //if (*it == except) continue; dout(7) << "sending dir_update on " << *dir << " to " << *it << endl; - mds->messenger->send_message(new MDirUpdate(dir->ino(), - dir->dir_rep, - dir->dir_rep_by, - path, - bcast), - MSG_ADDR_MDS(*it), MDS_PORT_CACHE, - MDS_PORT_CACHE); + mds->send_message_mds(new MDirUpdate(dir->ino(), + dir->dir_rep, + dir->dir_rep_by, + path, + bcast), + *it, MDS_PORT_CACHE); } return 0; @@ -2634,8 +2610,7 @@ void MDCache::dentry_unlink(CDentry *dn, Context *c) it++) { dout(7) << "inode_unlink sending DentryUnlink to " << *it << endl; - mds->messenger->send_message(new MDentryUnlink(dir->ino(), dn->name), - MSG_ADDR_MDS(*it), MDS_PORT_CACHE, MDS_PORT_CACHE); + mds->send_message_mds(new MDentryUnlink(dir->ino(), dn->name), *it, MDS_PORT_CACHE); } // don't need ack. @@ -2707,8 +2682,8 @@ void MDCache::dentry_unlink(CDentry *dn, Context *c) int auth = dn->inode->authority(); dout(7) << "remote target is remote, sending unlink request to " << auth << endl; - mds->messenger->send_message(new MInodeUnlink(dn->inode->ino(), mds->get_nodeid()), - MSG_ADDR_MDS(auth), MDS_PORT_CACHE, MDS_PORT_CACHE); + mds->send_message_mds(new MInodeUnlink(dn->inode->ino(), mds->get_nodeid()), + auth, MDS_PORT_CACHE); // unlink locally CInode *in = dn->inode; @@ -2801,8 +2776,7 @@ void MDCache::handle_inode_unlink(MInodeUnlink *m) // proxy? if (in->is_proxy()) { dout(7) << "handle_inode_unlink proxy on " << *in << endl; - mds->messenger->send_message(m, - MSG_ADDR_MDS(in->authority()), MDS_PORT_CACHE, MDS_PORT_CACHE); + mds->send_message_mds(m, in->authority(), MDS_PORT_CACHE); return; } assert(in->is_auth()); @@ -2840,8 +2814,7 @@ void MDCache::handle_inode_unlink(MInodeUnlink *m) } // ack - mds->messenger->send_message(new MInodeUnlinkAck(m->get_ino()), - MSG_ADDR_MDS(m->get_from()), MDS_PORT_CACHE, MDS_PORT_CACHE); + mds->send_message_mds(new MInodeUnlinkAck(m->get_ino()), m->get_from(), MDS_PORT_CACHE); } void MDCache::handle_inode_unlink_ack(MInodeUnlinkAck *m) @@ -2943,8 +2916,7 @@ void MDCache::file_rename(CDentry *srcdn, CDentry *destdn, Context *onfinish) srcdir->ino(), srcname, srcpath, destdir->ino(), destname, destpath, srcauth); // tell dest who src is (maybe even me) - mds->messenger->send_message(m, - MSG_ADDR_MDS(destauth), MDS_PORT_CACHE, MDS_PORT_CACHE); + mds->send_message_mds(m, destauth, MDS_PORT_CACHE); show_imports(); @@ -2961,8 +2933,7 @@ void MDCache::file_rename(CDentry *srcdn, CDentry *destdn, Context *onfinish) MRenameReq *m = new MRenameReq(mds->get_nodeid(), // i'm the initiator srcdir->ino(), srcname, destdir->ino(), destname, destpath, destauth); // tell src who dest is (they may not know) - mds->messenger->send_message(m, - MSG_ADDR_MDS(srcauth), MDS_PORT_CACHE, MDS_PORT_CACHE); + mds->send_message_mds(m, srcauth, MDS_PORT_CACHE); } else @@ -3097,8 +3068,7 @@ void MDCache::file_rename_foreign_src(CDentry *srcdn, MRename *m = new MRename(initiator, srcdir->ino(), srcdn->name, destdirino, destname, inode_state); - mds->messenger->send_message(m, - MSG_ADDR_MDS(destauth), MDS_PORT_CACHE, MDS_PORT_CACHE); + mds->send_message_mds(m, destauth, MDS_PORT_CACHE); // have dest? CInode *destdiri = get_inode(m->get_destdirino()); @@ -3161,8 +3131,7 @@ void MDCache::file_rename_warn(CInode *in, it != notify.end(); it++) { dout(10) << "file_rename_warn to " << *it << " for " << *in << endl; - mds->messenger->send_message(new MRenameWarning(in->ino()), - MSG_ADDR_MDS(*it), MDS_PORT_CACHE, MDS_PORT_CACHE); + mds->send_message_mds(new MRenameWarning(in->ino()), *it, MDS_PORT_CACHE); } } @@ -3203,8 +3172,7 @@ void MDCache::file_rename_ack(CInode *in, int initiator) } else { // send ack dout(7) << "file_rename_ack sending MRenameAck to initiator " << initiator << endl; - mds->messenger->send_message(new MRenameAck(in->ino()), - MSG_ADDR_MDS(initiator), MDS_PORT_CACHE, MDS_PORT_CACHE); + mds->send_message_mds(new MRenameAck(in->ino()), initiator, MDS_PORT_CACHE); } } @@ -3254,8 +3222,7 @@ void MDCache::handle_rename_prep(MRenamePrep *m) m->get_srcdirino(), m->get_srcname(), m->get_destdirino(), m->get_destname(), m->get_destpath(), mds->get_nodeid()); // i am dest - mds->messenger->send_message(req, - MSG_ADDR_MDS(m->get_srcauth()), MDS_PORT_CACHE, MDS_PORT_CACHE); + mds->send_message_mds(req, m->get_srcauth(), MDS_PORT_CACHE); delete m; return; } @@ -3348,14 +3315,14 @@ void MDCache::file_rename_notify(CInode *in, it != notify.end(); it++) { dout(10) << "file_rename_notify to " << *it << " for " << *in << endl; - mds->messenger->send_message(new MRenameNotify(in->ino(), - srcdir->ino(), - srcname, - destdir->ino(), - destdirpath, - destname, - srcauth), - MSG_ADDR_MDS(*it), MDS_PORT_CACHE, MDS_PORT_CACHE); + mds->send_message_mds(new MRenameNotify(in->ino(), + srcdir->ino(), + srcname, + destdir->ino(), + destdirpath, + destname, + srcauth), + *it, MDS_PORT_CACHE); } } @@ -3480,8 +3447,7 @@ void MDCache::handle_rename_notify(MRenameNotify *m) // ack dout(10) << "sending RenameNotifyAck back to srcauth " << m->get_srcauth() << endl; MRenameNotifyAck *ack = new MRenameNotifyAck(m->get_ino()); - mds->messenger->send_message(ack, - MSG_ADDR_MDS(m->get_srcauth()), MDS_PORT_CACHE, MDS_PORT_CACHE); + mds->send_message_mds(ack, m->get_srcauth(), MDS_PORT_CACHE); stray_rename_warnings.erase( m->get_ino() ); @@ -3524,6 +3490,10 @@ Capability* MDCache::issue_new_caps(CInode *in, Capability c(my_want); in->add_client_cap(my_client, c); cap = in->get_client_cap(my_client); + + // note client addr + mds->clientmap.add_open(my_client, req->get_client_inst()); + } else { // make sure it has sufficient caps if (cap->wanted() & ~my_want) { @@ -3608,7 +3578,8 @@ bool MDCache::issue_caps(CInode *in) it->second.get_last_seq(), it->second.pending(), it->second.wanted()), - MSG_ADDR_CLIENT(it->first), 0, MDS_PORT_CACHE); + MSG_ADDR_CLIENT(it->first), mds->clientmap.get_inst(it->first), + 0, MDS_PORT_CACHE); } } } @@ -3659,9 +3630,9 @@ void MDCache::request_inode_file_caps(CInode *in) assert(!in->is_auth()); in->replica_caps_wanted = wanted; - mds->messenger->send_message(new MInodeFileCaps(in->ino(), mds->get_nodeid(), - in->replica_caps_wanted), - MSG_ADDR_MDS(auth), MDS_PORT_CACHE, MDS_PORT_CACHE); + mds->send_message_mds(new MInodeFileCaps(in->ino(), mds->get_nodeid(), + in->replica_caps_wanted), + auth, MDS_PORT_CACHE); } else { in->replica_caps_wanted_keep_until.sec_ref() = 0; } @@ -3677,7 +3648,7 @@ void MDCache::handle_inode_file_caps(MInodeFileCaps *m) if (in->is_proxy()) { dout(7) << "proxy, fw" << endl; - mds->messenger->send_message(m, MSG_ADDR_MDS(in->authority()), MDS_PORT_CACHE, MDS_PORT_CACHE); + mds->send_message_mds(m, in->authority(), MDS_PORT_CACHE); return; } @@ -3739,11 +3710,14 @@ void MDCache::handle_client_file_caps(MClientFileCaps *m) if (!in->is_auth()) request_inode_file_caps(in); + // dec client addr counter + mds->clientmap.dec_open(client); + // tell client. MClientFileCaps *r = new MClientFileCaps(in->inode, 0, 0, 0, MClientFileCaps::FILECAP_RELEASE); - mds->messenger->send_message(r, m->get_source(), 0, MDS_PORT_CACHE); + mds->messenger->send_message(r, m->get_source(), m->get_source_inst(), 0, MDS_PORT_CACHE); } // merge in atime? @@ -4089,9 +4063,7 @@ void MDCache::inode_hard_sync(CInode *in) MLock *m = new MLock(LOCK_AC_SYNC, mds->get_nodeid()); m->set_ino(in->ino(), LOCK_OTYPE_IHARD); m->set_data(harddata); - mds->messenger->send_message(m, - MSG_ADDR_MDS(*it), MDS_PORT_CACHE, - MDS_PORT_CACHE); + mds->send_message_mds(m, *it, MDS_PORT_CACHE); } // change lock @@ -4118,9 +4090,7 @@ void MDCache::inode_hard_lock(CInode *in) it++) { MLock *m = new MLock(LOCK_AC_LOCK, mds->get_nodeid()); m->set_ino(in->ino(), LOCK_OTYPE_IHARD); - mds->messenger->send_message(m, - MSG_ADDR_MDS(*it), MDS_PORT_CACHE, - MDS_PORT_CACHE); + mds->send_message_mds(m, *it, MDS_PORT_CACHE); } // change lock @@ -4158,9 +4128,7 @@ void MDCache::handle_lock_inode_hard(MLock *m) delete m; } else { dout(7) << "handle_lock " << m->get_ino() << " from " << from << ": proxy, fw to " << newauth << endl; - mds->messenger->send_message(m, - MSG_ADDR_MDS(newauth), MDS_PORT_CACHE, - MDS_PORT_CACHE); + mds->send_message_mds(m, newauth, MDS_PORT_CACHE); } return; } @@ -4222,9 +4190,7 @@ void MDCache::handle_lock_inode_hard(MLock *m) { MLock *reply = new MLock(LOCK_AC_LOCKACK, mds->get_nodeid()); reply->set_ino(in->ino(), LOCK_OTYPE_IHARD); - mds->messenger->send_message(reply, - MSG_ADDR_MDS(from), MDS_PORT_CACHE, - MDS_PORT_CACHE); + mds->send_message_mds(reply, from, MDS_PORT_CACHE); } } break; @@ -4454,9 +4420,7 @@ void MDCache::inode_file_eval(CInode *in) MLock *m = new MLock(LOCK_AC_MIXED, mds->get_nodeid()); m->set_ino(in->ino(), LOCK_OTYPE_IFILE); m->set_data(softdata); - mds->messenger->send_message(m, - MSG_ADDR_MDS(*it), MDS_PORT_CACHE, - MDS_PORT_CACHE); + mds->send_message_mds(m, *it, MDS_PORT_CACHE); } } @@ -4495,9 +4459,7 @@ void MDCache::inode_file_eval(CInode *in) MLock *reply = new MLock(LOCK_AC_SYNC, mds->get_nodeid()); reply->set_ino(in->ino(), LOCK_OTYPE_IFILE); reply->set_data(softdata); - mds->messenger->send_message(reply, - MSG_ADDR_MDS(*it), MDS_PORT_CACHE, - MDS_PORT_CACHE); + mds->send_message_mds(reply, *it, MDS_PORT_CACHE); } } @@ -4526,9 +4488,7 @@ void MDCache::inode_file_eval(CInode *in) // ack MLock *reply = new MLock(LOCK_AC_MIXEDACK, mds->get_nodeid()); reply->set_ino(in->ino(), LOCK_OTYPE_IFILE); - mds->messenger->send_message(reply, - MSG_ADDR_MDS(in->authority()), MDS_PORT_CACHE, - MDS_PORT_CACHE); + mds->send_message_mds(reply, in->authority(), MDS_PORT_CACHE); } break; @@ -4539,9 +4499,7 @@ void MDCache::inode_file_eval(CInode *in) // ack MLock *reply = new MLock(LOCK_AC_LOCKACK, mds->get_nodeid()); reply->set_ino(in->ino(), LOCK_OTYPE_IFILE); - mds->messenger->send_message(reply, - MSG_ADDR_MDS(in->authority()), MDS_PORT_CACHE, - MDS_PORT_CACHE); + mds->send_message_mds(reply, in->authority(), MDS_PORT_CACHE); } break; @@ -4640,14 +4598,12 @@ bool MDCache::inode_file_sync(CInode *in) // bcast to replicas for (set::iterator it = in->cached_by_begin(); - it != in->cached_by_end(); + it != in->cached_by_end(); it++) { MLock *m = new MLock(LOCK_AC_SYNC, mds->get_nodeid()); m->set_ino(in->ino(), LOCK_OTYPE_IFILE); - m->set_data(softdata); - mds->messenger->send_message(m, - MSG_ADDR_MDS(*it), MDS_PORT_CACHE, - MDS_PORT_CACHE); + m->set_data(softdata); + mds->send_message_mds(m, *it, MDS_PORT_CACHE); } } @@ -4675,9 +4631,7 @@ bool MDCache::inode_file_sync(CInode *in) it++) { MLock *m = new MLock(LOCK_AC_SYNC, mds->get_nodeid()); m->set_ino(in->ino(), LOCK_OTYPE_IFILE); - mds->messenger->send_message(m, - MSG_ADDR_MDS(*it), MDS_PORT_CACHE, - MDS_PORT_CACHE); + mds->send_message_mds(m, *it, MDS_PORT_CACHE); } } @@ -4702,9 +4656,7 @@ bool MDCache::inode_file_sync(CInode *in) it++) { MLock *m = new MLock(LOCK_AC_SYNC, mds->get_nodeid()); m->set_ino(in->ino(), LOCK_OTYPE_IFILE); - mds->messenger->send_message(m, - MSG_ADDR_MDS(*it), MDS_PORT_CACHE, - MDS_PORT_CACHE); + mds->send_message_mds(m, *it, MDS_PORT_CACHE); } } @@ -4745,9 +4697,7 @@ void MDCache::inode_file_lock(CInode *in) it++) { MLock *m = new MLock(LOCK_AC_LOCK, mds->get_nodeid()); m->set_ino(in->ino(), LOCK_OTYPE_IFILE); - mds->messenger->send_message(m, - MSG_ADDR_MDS(*it), MDS_PORT_CACHE, - MDS_PORT_CACHE); + mds->send_message_mds(m, *it, MDS_PORT_CACHE); } in->filelock.init_gather(in->get_cached_by()); @@ -4776,9 +4726,7 @@ void MDCache::inode_file_lock(CInode *in) it++) { MLock *m = new MLock(LOCK_AC_LOCK, mds->get_nodeid()); m->set_ino(in->ino(), LOCK_OTYPE_IFILE); - mds->messenger->send_message(m, - MSG_ADDR_MDS(*it), MDS_PORT_CACHE, - MDS_PORT_CACHE); + mds->send_message_mds(m, *it, MDS_PORT_CACHE); } in->filelock.init_gather(in->get_cached_by()); @@ -4840,9 +4788,7 @@ void MDCache::inode_file_mixed(CInode *in) it++) { MLock *m = new MLock(LOCK_AC_MIXED, mds->get_nodeid()); m->set_ino(in->ino(), LOCK_OTYPE_IFILE); - mds->messenger->send_message(m, - MSG_ADDR_MDS(*it), MDS_PORT_CACHE, - MDS_PORT_CACHE); + mds->send_message_mds(m, *it, MDS_PORT_CACHE); } in->filelock.init_gather(in->get_cached_by()); @@ -4871,9 +4817,7 @@ void MDCache::inode_file_mixed(CInode *in) MLock *m = new MLock(LOCK_AC_MIXED, mds->get_nodeid()); m->set_ino(in->ino(), LOCK_OTYPE_IFILE); m->set_data(softdata); - mds->messenger->send_message(m, - MSG_ADDR_MDS(*it), MDS_PORT_CACHE, - MDS_PORT_CACHE); + mds->send_message_mds(m, *it, MDS_PORT_CACHE); } } @@ -4895,9 +4839,7 @@ void MDCache::inode_file_mixed(CInode *in) it++) { MLock *m = new MLock(LOCK_AC_MIXED, mds->get_nodeid()); m->set_ino(in->ino(), LOCK_OTYPE_IFILE); - mds->messenger->send_message(m, - MSG_ADDR_MDS(*it), MDS_PORT_CACHE, - MDS_PORT_CACHE); + mds->send_message_mds(m, *it, MDS_PORT_CACHE); } in->filelock.set_state(LOCK_MIXED); issue_caps(in); @@ -4935,9 +4877,7 @@ void MDCache::inode_file_loner(CInode *in) it++) { MLock *m = new MLock(LOCK_AC_LOCK, mds->get_nodeid()); m->set_ino(in->ino(), LOCK_OTYPE_IFILE); - mds->messenger->send_message(m, - MSG_ADDR_MDS(*it), MDS_PORT_CACHE, - MDS_PORT_CACHE); + mds->send_message_mds(m, *it, MDS_PORT_CACHE); } in->filelock.init_gather(in->get_cached_by()); @@ -4964,9 +4904,7 @@ void MDCache::inode_file_loner(CInode *in) it++) { MLock *m = new MLock(LOCK_AC_LOCK, mds->get_nodeid()); m->set_ino(in->ino(), LOCK_OTYPE_IFILE); - mds->messenger->send_message(m, - MSG_ADDR_MDS(*it), MDS_PORT_CACHE, - MDS_PORT_CACHE); + mds->send_message_mds(m, *it, MDS_PORT_CACHE); } in->filelock.init_gather(in->get_cached_by()); @@ -5008,9 +4946,7 @@ void MDCache::handle_lock_inode_file(MLock *m) delete m; } else { dout(7) << "handle_lock " << m->get_ino() << " from " << from << ": proxy, fw to " << newauth << endl; - mds->messenger->send_message(m, - MSG_ADDR_MDS(newauth), MDS_PORT_CACHE, - MDS_PORT_CACHE); + mds->send_message_mds(m, newauth, MDS_PORT_CACHE); } return; } @@ -5082,9 +5018,7 @@ void MDCache::handle_lock_inode_file(MLock *m) MLock *reply = new MLock(LOCK_AC_LOCKACK, mds->get_nodeid()); reply->set_ino(in->ino(), LOCK_OTYPE_IFILE); - mds->messenger->send_message(reply, - MSG_ADDR_MDS(from), MDS_PORT_CACHE, - MDS_PORT_CACHE); + mds->send_message_mds(reply, from, MDS_PORT_CACHE); } break; @@ -5106,9 +5040,7 @@ void MDCache::handle_lock_inode_file(MLock *m) // ack MLock *reply = new MLock(LOCK_AC_MIXEDACK, mds->get_nodeid()); reply->set_ino(in->ino(), LOCK_OTYPE_IFILE); - mds->messenger->send_message(reply, - MSG_ADDR_MDS(from), MDS_PORT_CACHE, - MDS_PORT_CACHE); + mds->send_message_mds(reply, from, MDS_PORT_CACHE); } } else { // LOCK @@ -5303,9 +5235,7 @@ bool MDCache::dentry_xlock_start(CDentry *dn, Message *m, CInode *ref) MLock *m = new MLock(LOCK_AC_LOCK, mds->get_nodeid()); m->set_dn(dn->dir->ino(), dn->name); m->set_path(path); - mds->messenger->send_message(m, - MSG_ADDR_MDS(*it), MDS_PORT_CACHE, - MDS_PORT_CACHE); + mds->send_message_mds(m, *it, MDS_PORT_CACHE); } // wait @@ -5349,9 +5279,7 @@ void MDCache::dentry_xlock_finish(CDentry *dn, bool quiet) it++) { MLock *m = new MLock(LOCK_AC_SYNC, mds->get_nodeid()); m->set_dn(dn->dir->ino(), dn->name); - mds->messenger->send_message(m, - MSG_ADDR_MDS(*it), MDS_PORT_CACHE, - MDS_PORT_CACHE); + mds->send_message_mds(m, *it, MDS_PORT_CACHE); } } } @@ -5412,9 +5340,7 @@ void MDCache::dentry_xlock_request(CDir *dir, string& dname, bool create, int dauth = dir->dentry_authority(dname); MLock *m = new MLock(create ? LOCK_AC_REQXLOCKC:LOCK_AC_REQXLOCK, mds->get_nodeid()); m->set_dn(dir->ino(), dname); - mds->messenger->send_message(m, - MSG_ADDR_MDS(dauth), MDS_PORT_CACHE, - MDS_PORT_CACHE); + mds->send_message_mds(m, dauth, MDS_PORT_CACHE); // add waiter dir->add_waiter(CDIR_WAIT_DNREQXLOCK, dname, @@ -5474,9 +5400,7 @@ void MDCache::handle_lock_dn(MLock *m) } else { // not an xlock req, or it is and we just didn't register the request yet // forward normally - mds->messenger->send_message(m, - MSG_ADDR_MDS(dauth), MDS_PORT_CACHE, - MDS_PORT_CACHE); + mds->send_message_mds(m, dauth, MDS_PORT_CACHE); } return; } @@ -5505,9 +5429,7 @@ void MDCache::handle_lock_dn(MLock *m) MLock *reply = new MLock(LOCK_AC_REQXLOCKNAK, mds->get_nodeid()); reply->set_dn(dir->ino(), dname); reply->set_path(m->get_path()); - mds->messenger->send_message(reply, - MSG_ADDR_MDS(m->get_asker()), MDS_PORT_CACHE, - MDS_PORT_CACHE); + mds->send_message_mds(reply, m->get_asker(), MDS_PORT_CACHE); } // finish request (if we got that far) @@ -5559,9 +5481,7 @@ void MDCache::handle_lock_dn(MLock *m) // NAK MLock *reply = new MLock(LOCK_AC_LOCKNAK, mds->get_nodeid()); reply->set_dn(m->get_ino(), dname); - mds->messenger->send_message(reply, - MSG_ADDR_MDS(m->get_asker()), MDS_PORT_CACHE, - MDS_PORT_CACHE); + mds->send_message_mds(reply, m->get_asker(), MDS_PORT_CACHE); } } else { dout(7) << "safely ignoring." << endl; @@ -5602,9 +5522,7 @@ void MDCache::handle_lock_dn(MLock *m) // ack now MLock *reply = new MLock(LOCK_AC_LOCKACK, mds->get_nodeid()); reply->set_dn(diri->ino(), dname); - mds->messenger->send_message(reply, - MSG_ADDR_MDS(from), MDS_PORT_CACHE, - MDS_PORT_CACHE); + mds->send_message_mds(reply, from, MDS_PORT_CACHE); } // wake up waiters @@ -5664,9 +5582,7 @@ void MDCache::handle_lock_dn(MLock *m) MLock *reply = new MLock(LOCK_AC_REQXLOCKNAK, mds->get_nodeid()); reply->set_dn(dir->ino(), dname); reply->set_path(path); - mds->messenger->send_message(reply, - MSG_ADDR_MDS(m->get_asker()), MDS_PORT_CACHE, - MDS_PORT_CACHE); + mds->send_message_mds(reply, m->get_asker(), MDS_PORT_CACHE); // done if (active_requests.count(m)) @@ -5714,9 +5630,7 @@ void MDCache::handle_lock_dn(MLock *m) MLock *reply = new MLock(LOCK_AC_REQXLOCKACK, mds->get_nodeid()); reply->set_dn(dir->ino(), dname); reply->set_path(path); - mds->messenger->send_message(reply, - MSG_ADDR_MDS(m->get_asker()), MDS_PORT_CACHE, - MDS_PORT_CACHE); + mds->send_message_mds(reply, m->get_asker(), MDS_PORT_CACHE); // note: keep request around in memory (to hold the xlock/pins on behalf of requester) return; @@ -5825,8 +5739,7 @@ void MDCache::export_dir(CDir *dir, // send ExportDirDiscover (ask target) export_gather[dir].insert(dest); - mds->messenger->send_message(new MExportDirDiscover(dir->inode), - MSG_ADDR_MDS(dest), MDS_PORT_CACHE, MDS_PORT_CACHE); + mds->send_message_mds(new MExportDirDiscover(dir->inode), dest, MDS_PORT_CACHE); dir->auth_pin(); // pin dir, to hang up our freeze (unpin on prep ack) // take away the popularity we're sending. FIXME: do this later? @@ -5935,8 +5848,7 @@ void MDCache::export_dir_frozen(CDir *dir, } // send it! - mds->messenger->send_message(prep, - MSG_ADDR_MDS(dest), MDS_PORT_CACHE, MDS_PORT_CACHE); + mds->send_message_mds(prep, dest, MDS_PORT_CACHE); } void MDCache::handle_export_dir_prep_ack(MExportDirPrepAck *m) @@ -6047,8 +5959,7 @@ void MDCache::export_dir_go(CDir *dir, // send warning to all but dest if (*it != dest) { dout(10) << " sending export_dir_warning to mds" << *it << endl; - mds->messenger->send_message(new MExportDirWarning( dir->ino() ), - MSG_ADDR_MDS( *it ), MDS_PORT_CACHE, MDS_PORT_CACHE); + mds->send_message_mds(new MExportDirWarning( dir->ino() ), *it, MDS_PORT_CACHE); } } assert(export_notify_ack_waiting[dir].count( dest )); @@ -6062,9 +5973,7 @@ void MDCache::export_dir_go(CDir *dir, dest ); // send the export data! - mds->messenger->send_message(req, - MSG_ADDR_MDS(dest), MDS_PORT_CACHE, - MDS_PORT_CACHE); + mds->send_message_mds(req, dest, MDS_PORT_CACHE); // queue up the finisher dir->add_waiter( CDIR_WAIT_UNFREEZE, fin ); @@ -6097,7 +6006,8 @@ void MDCache::encode_export_inode(CInode *in, bufferlist& enc_state, int new_aut it->second.pending(), it->second.wanted(), MClientFileCaps::FILECAP_STALE); - mds->messenger->send_message(m, MSG_ADDR_CLIENT(it->first), 0, MDS_PORT_CACHE); + mds->messenger->send_message(m, MSG_ADDR_CLIENT(it->first), mds->clientmap.get_inst(it->first), + 0, MDS_PORT_CACHE); } // relax locks? @@ -6373,9 +6283,7 @@ void MDCache::export_dir_finish(CDir *dir) // FIXME log it // send finish to new auth - mds->messenger->send_message(new MExportDirFinish(dir->ino()), - MSG_ADDR_MDS(dir->authority()), - MDS_PORT_CACHE, MDS_PORT_CACHE); + mds->send_message_mds(new MExportDirFinish(dir->ino()), dir->authority(), MDS_PORT_CACHE); // unfreeze dout(7) << "export_dir_finish " << *dir << ", unfreezing" << endl; @@ -6450,9 +6358,8 @@ void MDCache::handle_export_dir_discover_2(MExportDirDiscover *m, CInode *in, in assert(0); // this shouldn't happen if the auth pins his path properly!!!! - mds->messenger->send_message(new MExportDirDiscoverAck(m->get_ino(), false), - m->get_source(), MDS_PORT_CACHE, MDS_PORT_CACHE); - + mds->send_message_mds(new MExportDirDiscoverAck(m->get_ino(), false), + m->get_source().num(), MDS_PORT_CACHE); delete m; return; } @@ -6474,8 +6381,8 @@ void MDCache::handle_export_dir_discover_2(MExportDirDiscover *m, CInode *in, in // reply dout(7) << " sending export_dir_discover_ack on " << *in << endl; - mds->messenger->send_message(new MExportDirDiscoverAck(in->ino()), - m->get_source(), MDS_PORT_CACHE, MDS_PORT_CACHE); + mds->send_message_mds(new MExportDirDiscoverAck(in->ino()), + m->get_source().num(), MDS_PORT_CACHE); delete m; } @@ -6619,8 +6526,8 @@ void MDCache::handle_export_dir_prep(MExportDirPrep *m) } else { // ok! dout(7) << " all ready, sending export_dir_prep_ack on " << *dir << endl; - mds->messenger->send_message(new MExportDirPrepAck(dir->ino()), - m->get_source(), MDS_PORT_CACHE, MDS_PORT_CACHE); + mds->send_message_mds(new MExportDirPrepAck(dir->ino()), + m->get_source().num(), MDS_PORT_CACHE); // done delete m; @@ -6785,9 +6692,8 @@ void MDCache::handle_export_dir(MExportDir *m) // send notify's etc. dout(7) << "sending notifyack for " << *dir << " to old auth " << MSG_ADDR_NUM(m->get_source()) << endl; - mds->messenger->send_message(new MExportDirNotifyAck(dir->inode->ino()), - m->get_source(), MDS_PORT_CACHE, - MDS_PORT_CACHE); + mds->send_message_mds(new MExportDirNotifyAck(dir->inode->ino()), + m->get_source().num(), MDS_PORT_CACHE); dout(7) << "sending notify to others" << endl; for (set::iterator it = dir->open_by.begin(); @@ -6802,9 +6708,7 @@ void MDCache::handle_export_dir(MExportDir *m) if (g_conf.mds_verify_export_dirauth) notify->copy_subdirs(imported_subdirs); // copy subdir list (DEBUG) - mds->messenger->send_message(notify, - MSG_ADDR_MDS( *it ), MDS_PORT_CACHE, - MDS_PORT_CACHE); + mds->send_message_mds(notify, *it, MDS_PORT_CACHE); } // done @@ -6930,8 +6834,9 @@ void MDCache::decode_import_inode(CDentry *dn, bufferlist& bl, int& off, int old in->client_caps[*it].wanted(), MClientFileCaps::FILECAP_REAP); caps->set_mds( oldauth ); // reap from whom? - mds->messenger->send_message(caps, - MSG_ADDR_CLIENT(*it), 0, MDS_PORT_CACHE); + mds->messenger->send_message(caps, + MSG_ADDR_CLIENT(*it), mds->clientmap.get_inst(*it), + 0, MDS_PORT_CACHE); } // filelock @@ -7169,8 +7074,8 @@ void MDCache::handle_export_dir_notify(MExportDirNotify *m) // send notify ack to old auth dout(7) << "handle_export_dir_notify sending ack to old_auth " << m->get_old_auth() << endl; - mds->messenger->send_message(new MExportDirNotifyAck(m->get_ino()), - MSG_ADDR_MDS(m->get_old_auth()), MDS_PORT_CACHE, MDS_PORT_CACHE); + mds->send_message_mds(new MExportDirNotifyAck(m->get_ino()), + m->get_old_auth(), MDS_PORT_CACHE); // done @@ -7366,8 +7271,7 @@ void MDCache::hash_dir(CDir *dir) for (int i=0; iget_mds_map()->get_num_mds(); i++) { if (i == mds->get_nodeid()) continue; // except me hash_gather[dir].insert(i); - mds->messenger->send_message(new MHashDirDiscover(dir->inode), - MSG_ADDR_MDS(i), MDS_PORT_CACHE, MDS_PORT_CACHE); + mds->send_message_mds(new MHashDirDiscover(dir->inode), i, MDS_PORT_CACHE); } dir->auth_pin(); // pin until discovers are all acked. @@ -7481,7 +7385,7 @@ void MDCache::hash_dir_frozen(CDir *dir) assert(hash_gather[dir].empty()); for (unsigned i=0; imessenger->send_message(msgs[i], MSG_ADDR_MDS(i), MDS_PORT_CACHE, MDS_PORT_CACHE); + mds->send_message_mds(msgs[i], i, MDS_PORT_CACHE); hash_gather[dir].insert(i); } } @@ -7698,8 +7602,7 @@ void MDCache::hash_dir_go(CDir *dir) hash_notify_gather[dir][i].insert(j); } - mds->messenger->send_message(msgs[i], - MSG_ADDR_MDS(i), MDS_PORT_CACHE, MDS_PORT_CACHE); + mds->send_message_mds(msgs[i], i, MDS_PORT_CACHE); } // wait for all the acks. @@ -7865,7 +7768,7 @@ void MDCache::handle_hash_dir_notify(MHashDirNotify *m) } // fw notify to auth - mds->messenger->send_message(m, MSG_ADDR_MDS(dir->authority()), MDS_PORT_CACHE, MDS_PORT_CACHE); + mds->send_message_mds(m, dir->authority(), MDS_PORT_CACHE); } } @@ -7958,8 +7861,8 @@ void MDCache::handle_hash_dir_discover_2(MHashDirDiscover *m, CInode *in, int r) // reply dout(7) << " sending hash_dir_discover_ack on " << *dir << endl; - mds->messenger->send_message(new MHashDirDiscoverAck(dir->ino()), - m->get_source(), MDS_PORT_CACHE, MDS_PORT_CACHE); + mds->send_message_mds(new MHashDirDiscoverAck(dir->ino()), + m->get_source().num(), MDS_PORT_CACHE); delete m; } @@ -8035,8 +7938,8 @@ void MDCache::handle_hash_dir_prep(MHashDirPrep *m) } // ack! - mds->messenger->send_message(new MHashDirPrepAck(dir->ino()), - m->get_source(), MDS_PORT_CACHE, MDS_PORT_CACHE); + mds->send_message_mds(new MHashDirPrepAck(dir->ino()), + m->get_source().num(), MDS_PORT_CACHE); // done. delete m; @@ -8082,14 +7985,14 @@ void MDCache::handle_hash_dir(MHashDir *m) for (int i=0; iget_mds_map()->get_num_mds(); i++) { if (i == mds->get_nodeid()) continue; if (i == MSG_ADDR_NUM(m->get_source())) continue; - mds->messenger->send_message(new MHashDirNotify(dir->ino(), mds->get_nodeid()), - MSG_ADDR_MDS(i), MDS_PORT_CACHE, MDS_PORT_CACHE); + mds->send_message_mds(new MHashDirNotify(dir->ino(), mds->get_nodeid()), + i, MDS_PORT_CACHE); } // ack dout(7) << "acking" << endl; - mds->messenger->send_message(new MHashDirAck(dir->ino()), - m->get_source(), MDS_PORT_CACHE, MDS_PORT_CACHE); + mds->send_message_mds(new MHashDirAck(dir->ino()), + m->get_source().num(), MDS_PORT_CACHE); // done. delete m; @@ -8196,8 +8099,8 @@ void MDCache::unhash_dir_prep(CDir *dir) for (int i=0; iget_mds_map()->get_num_mds(); i++) { if (i == mds->get_nodeid()) continue; hash_gather[dir].insert(i); - mds->messenger->send_message(new MUnhashDirPrep(dir->ino()), - MSG_ADDR_MDS(i), MDS_PORT_CACHE, MDS_PORT_CACHE); + mds->send_message_mds(new MUnhashDirPrep(dir->ino()), + i, MDS_PORT_CACHE); } } @@ -8304,8 +8207,8 @@ void MDCache::unhash_dir_go(CDir *dir) for (int i=0; iget_mds_map()->get_num_mds(); i++) { if (i == mds->get_nodeid()) continue; hash_gather[dir].insert(i); - mds->messenger->send_message(new MUnhashDir(dir->ino()), - MSG_ADDR_MDS(i), MDS_PORT_CACHE, MDS_PORT_CACHE); + mds->send_message_mds(new MUnhashDir(dir->ino()), + i, MDS_PORT_CACHE); } } @@ -8377,8 +8280,8 @@ void MDCache::handle_unhash_dir_ack(MUnhashDirAck *m) hash_gather[dir].insert(i); - mds->messenger->send_message(new MUnhashDirNotify(dir->ino()), - MSG_ADDR_MDS(i), MDS_PORT_CACHE, MDS_PORT_CACHE); + mds->send_message_mds(new MUnhashDirNotify(dir->ino()), + i, MDS_PORT_CACHE); } } @@ -8570,8 +8473,7 @@ void MDCache::unhash_dir_prep_finish(CDir *dir) } // ack - mds->messenger->send_message(ack, - MSG_ADDR_MDS(auth), MDS_PORT_CACHE, MDS_PORT_CACHE); + mds->send_message_mds(ack, auth, MDS_PORT_CACHE); } @@ -8706,8 +8608,8 @@ void MDCache::handle_unhash_dir(MUnhashDir *m) hash_gather[dir].erase(mds->get_nodeid()); // send unhash message - mds->messenger->send_message(new MUnhashDirAck(dir->ino(), bl, nden), - MSG_ADDR_MDS(dir->authority()), MDS_PORT_CACHE, MDS_PORT_CACHE); + mds->send_message_mds(new MUnhashDirAck(dir->ino(), bl, nden), + dir->authority(), MDS_PORT_CACHE); } @@ -8743,8 +8645,7 @@ void MDCache::handle_unhash_dir_notify(MUnhashDirNotify *m) for (int i=0; iget_mds_map()->get_num_mds(); i++) { if (i == from) continue; if (i == mds->get_nodeid()) continue; - mds->messenger->send_message(new MUnhashDirNotify(dir->ino()), - MSG_ADDR_MDS(i), MDS_PORT_CACHE, MDS_PORT_CACHE); + mds->send_message_mds(new MUnhashDirNotify(dir->ino()), i, MDS_PORT_CACHE); } } @@ -8773,8 +8674,7 @@ void MDCache::handle_unhash_dir_notify(MUnhashDirNotify *m) // ack dout(7) << "sending notify_ack to auth for unhash of " << *dir << endl; - mds->messenger->send_message(new MUnhashDirNotifyAck(dir->ino()), - MSG_ADDR_MDS(dir->authority()), MDS_PORT_CACHE, MDS_PORT_CACHE); + mds->send_message_mds(new MUnhashDirNotifyAck(dir->ino()), dir->authority(), MDS_PORT_CACHE); } diff --git a/ceph/mds/MDS.cc b/ceph/mds/MDS.cc index 9374d74928d86..4e4379e52f1fe 100644 --- a/ceph/mds/MDS.cc +++ b/ceph/mds/MDS.cc @@ -76,8 +76,8 @@ LogType mds_logtype, mds_cache_logtype; #include "config.h" #undef dout -#define dout(l) if (l<=g_conf.debug || l <= g_conf.debug_mds) cout << "mds" << whoami << " " -#define dout3(l,mds) if (l<=g_conf.debug || l <= g_conf.debug_mds) cout << "mds" << mds->get_nodeid() << " " +#define dout(l) if (l<=g_conf.debug || l <= g_conf.debug_mds) cout << g_clock.now() << " mds" << whoami << " " +#define derr(l) if (l<=g_conf.debug || l <= g_conf.debug_mds) cout << g_clock.now() << " mds" << whoami << " " @@ -192,11 +192,19 @@ MDS::~MDS() { } +void MDS::send_message_mds(Message *m, int mds, int port, int fromport) +{ + if (port && !fromport) + fromport = port; + messenger->send_message(m, MSG_ADDR_MDS(mds), mdsmap->get_inst(mds), port, fromport); +} + + int MDS::init() { // request osd map - dout(5) << "requesting osdmap, mdsmap from mon0" << endl; - messenger->send_message(new MMDSBoot, MSG_ADDR_MON(0)); + dout(5) << "requesting mdsmap from mon0" << endl; + messenger->send_message(new MMDSBoot, MSG_ADDR_MON(0), monmap->get_inst(0)); return 0; } @@ -204,14 +212,14 @@ int MDS::init() int MDS::shutdown_start() { dout(1) << "shutdown_start" << endl; - cerr << "mds shutdown start" << endl; + derr(0) << "mds shutdown start" << endl; for (set::iterator p = mdsmap->get_mds().begin(); p != mdsmap->get_mds().end(); p++) { dout(1) << "sending MShutdownStart to mds" << *p << endl; - messenger->send_message(new MGenericMessage(MSG_MDS_SHUTDOWNSTART), - MSG_ADDR_MDS(*p), MDS_PORT_MAIN, MDS_PORT_MAIN); + send_message_mds(new MGenericMessage(MSG_MDS_SHUTDOWNSTART), + *p, MDS_PORT_MAIN); } if (idalloc) idalloc->shutdown(); @@ -259,16 +267,15 @@ void MDS::handle_shutdown_finish(Message *m) if (did_shut_down.size() == (unsigned)mdsmap->get_num_mds()) { // MDS's all ready to shut down! - cerr << "mds shutdown final" << endl; + derr(0) << "mds shutdown final" << endl; dout(1) << "sending shutdown to remaining MDSs, OSDs" << endl; for (int i=1; isend_message(new MGenericMessage(MSG_SHUTDOWN), - MSG_ADDR_MDS(i), 0, 0); + send_message_mds(new MGenericMessage(MSG_SHUTDOWN), i); if (g_conf.mds_local_osd) messenger->send_message(new MGenericMessage(MSG_SHUTDOWN), - MSG_ADDR_OSD(i+10000), 0, 0); + MSG_ADDR_OSD(i+10000), osdmap->get_inst(i+10000)); } // shut down osd's @@ -280,7 +287,7 @@ void MDS::handle_shutdown_finish(Message *m) if (osdmap->is_down(*it)) continue; dout(10) << "sending shutdown to osd" << *it << endl; messenger->send_message(new MGenericMessage(MSG_SHUTDOWN), - MSG_ADDR_OSD(*it), 0, 0); + MSG_ADDR_OSD(*it), osdmap->get_inst(*it)); } // shut myself down. @@ -650,6 +657,13 @@ void MDS::handle_mds_map(MMDSMap *m) mdsmap->decode(p->second); delete m; + + // do we need an osdmap too? + if (!osdmap) { + int mon = monmap->pick_mon(); + messenger->send_message(new MOSDGetMap(0), + MSG_ADDR_MON(mon), monmap->get_inst(mon)); + } } @@ -664,13 +678,13 @@ void MDS::handle_osd_map(MOSDMap *m) } // pass on to clients - for (set::iterator it = mounted_clients.begin(); - it != mounted_clients.end(); + for (set::iterator it = clientmap.get_mount_set().begin(); + it != clientmap.get_mount_set().end(); it++) { MOSDMap *n = new MOSDMap; n->maps = m->maps; n->incremental_maps = m->incremental_maps; - messenger->send_message(n, MSG_ADDR_CLIENT(*it)); + messenger->send_message(n, MSG_ADDR_CLIENT(*it), clientmap.get_inst(*it)); } // process locally @@ -709,13 +723,13 @@ void MDS::handle_client_mount(MClientMount *m) int n = MSG_ADDR_NUM(m->get_source()); dout(3) << "mount by client" << n << endl; - mounted_clients.insert(n); + clientmap.add_mount(n, m->get_source_inst()); assert(whoami == 0); // mds0 mounts/unmounts // ack messenger->send_message(new MClientMountAck(m, mdsmap, osdmap), - m->get_source(), m->get_source_port()); + m->get_source(), m->get_source_inst()); delete m; } @@ -726,16 +740,16 @@ void MDS::handle_client_unmount(Message *m) assert(whoami == 0); // mds0 mounts/unmounts - assert(mounted_clients.count(n)); - mounted_clients.erase(n); + clientmap.rem_mount(n); - if (mounted_clients.empty()) { + if (clientmap.get_mount_set().empty()) { dout(3) << "all clients done, initiating shutdown" << endl; shutdown_start(); } // ack by sending back to client - messenger->send_message(m, m->get_source(), m->get_source_port()); + entity_inst_t srcinst = m->get_source_inst(); // make a copy! + messenger->send_message(m, m->get_source(), srcinst); } @@ -744,8 +758,7 @@ void MDS::handle_ping(MPing *m) dout(10) << " received ping from " << MSG_ADDR_NICE(m->get_source()) << " with seq " << m->seq << endl; messenger->send_message(new MPingAck(m), - m->get_source(), m->get_source_port(), - MDS_PORT_MAIN); + m->get_source(), m->get_source_inst()); delete m; } @@ -813,8 +826,7 @@ void MDS::reply_request(MClientRequest *req, MClientReply *reply, CInode *tracei // send reply messenger->send_message(reply, - MSG_ADDR_CLIENT(req->get_client()), 0, - MDS_PORT_SERVER); + MSG_ADDR_CLIENT(req->get_client()), req->get_client_inst()); // discard request mdcache->request_finish(req); @@ -864,6 +876,10 @@ void MDS::handle_client_request(MClientRequest *req) { dout(4) << "req " << *req << endl; + // note original client addr + if (req->get_source().is_client()) + req->set_client_inst( req->get_source_inst() ); + if (is_shutting_down()) { dout(5) << " shutting down, discarding client request." << endl; delete req; @@ -901,8 +917,7 @@ void MDS::handle_client_request(MClientRequest *req) int next = whoami + 1; if (next >= mdsmap->get_num_mds()) next = 0; dout(10) << "got request on ino we don't have, passing buck to " << next << endl; - messenger->send_message(req, - MSG_ADDR_MDS(next), MDS_PORT_SERVER, MDS_PORT_SERVER); + send_message_mds(req, next, MDS_PORT_SERVER); return; } } @@ -951,8 +966,7 @@ void MDS::handle_client_request(MClientRequest *req) // send error messenger->send_message(new MClientReply(req, r), - MSG_ADDR_CLIENT(req->get_client()), 0, - MDS_PORT_SERVER); + MSG_ADDR_CLIENT(req->get_client()), req->get_client_inst()); // if (refpath.last_bit() == ".hash" && @@ -1307,7 +1321,7 @@ void MDS::handle_hash_readdir(MHashReaddir *m) // sent it back! messenger->send_message(new MHashReaddirReply(dir->ino(), inls, dnls, num), - m->get_source(), MDS_PORT_CACHE, MDS_PORT_CACHE); + m->get_source(), m->get_source_inst(), MDS_PORT_CACHE); } @@ -1486,8 +1500,7 @@ void MDS::handle_client_readdir(MClientRequest *req, // request other bits for (int i=0; iget_num_mds(); i++) { if (i == get_nodeid()) continue; - messenger->send_message(new MHashReaddir(dir->ino()), - MSG_ADDR_MDS(i), MDS_PORT_SERVER, MDS_PORT_SERVER); + send_message_mds(new MHashReaddir(dir->ino()), i, MDS_PORT_SERVER); } // wait @@ -1836,8 +1849,7 @@ void MDS::handle_client_link_2(int r, MClientRequest *req, CInode *ref, vectorsend_message(new MInodeLink(targeti->ino(), whoami), - MSG_ADDR_MDS(targeti->authority()), MDS_PORT_CACHE, MDS_PORT_CACHE); + send_message_mds(new MInodeLink(targeti->ino(), whoami), targeti->authority(), MDS_PORT_CACHE); // wait targeti->add_waiter(CINODE_WAIT_LINK, diff --git a/ceph/mds/MDS.h b/ceph/mds/MDS.h index ec9945dc3d1f7..d44344d3f4353 100644 --- a/ceph/mds/MDS.h +++ b/ceph/mds/MDS.h @@ -36,6 +36,8 @@ using namespace __gnu_cxx; #include "mon/MonMap.h" +#include "ClientMap.h" + #define MDS_PORT_MAIN 0 #define MDS_PORT_SERVER 1 @@ -114,8 +116,10 @@ class MDS : public Dispatcher { Filer *filer; // for reading/writing to/from osds AnchorTable *anchormgr; // PGManager *pgmanager; - protected: + ClientMap clientmap; + + protected: // shutdown crap bool shutting_down; @@ -138,10 +142,6 @@ class MDS : public Dispatcher { friend class MDStore; - // stats - set mounted_clients; - - public: list finished_queue; @@ -169,6 +169,8 @@ class MDS : public Dispatcher { MDSMap *get_mds_map() { return mdsmap; } OSDMap *get_osd_map() { return osdmap; } + void send_message_mds(Message *m, int mds, int port=0, int fromport=0); + // start up, shutdown bool is_shutting_down() { return shutting_down; } bool is_shut_down(int who=-1) { -- 2.39.5