if (waiting_for_session.count(mds) == 0) {
dout(10) << "opening session to mds" << mds << dendl;
messenger->send_message(new MClientSession(MClientSession::OP_REQUEST_OPEN),
- mdsmap->get_inst(mds), MDS_PORT_SERVER);
+ mdsmap->get_inst(mds));
}
// wait
request->request = 0;
dout(10) << "send_request " << *r << " to mds" << mds << dendl;
- messenger->send_message(r, mdsmap->get_inst(mds), MDS_PORT_SERVER);
+ messenger->send_message(r, mdsmap->get_inst(mds));
request->mds.insert(mds);
}
m->closed = true;
}
- messenger->send_message(m, mdsmap->get_inst(mds), MDS_PORT_SERVER);
+ messenger->send_message(m, mdsmap->get_inst(mds));
}
<< ", which we don't want caps for, releasing." << dendl;
m->set_caps(0);
m->set_wanted(0);
- messenger->send_message(m, m->get_source_inst(), MDS_PORT_LOCKER);
+ messenger->send_message(m, m->get_source_inst());
return;
}
in->file_wr_size = 0;
}
- messenger->send_message(m, m->get_source_inst(), MDS_PORT_LOCKER);
+ messenger->send_message(m, m->get_source_inst());
}
it->second.seq,
it->second.caps,
in->file_caps_wanted());
- messenger->send_message(m, mdsmap->get_inst(it->first), MDS_PORT_LOCKER);
+ messenger->send_message(m, mdsmap->get_inst(it->first));
}
}
it->second.seq,
it->second.caps,
in->file_caps_wanted());
- messenger->send_message(m,
- mdsmap->get_inst(it->first), MDS_PORT_LOCKER);
+ messenger->send_message(m, mdsmap->get_inst(it->first));
}
}
dout(10) << "_try_mount" << dendl;
int mon = monmap->pick_mon();
dout(2) << "sending client_mount to mon" << mon << " as instance " << my_instance << dendl;
- messenger->send_first_message(this, // simultaneously go active (if we haven't already)
- new MClientMount(messenger->get_myaddr(), my_instance),
- monmap->get_inst(mon));
+ messenger->set_dispatcher(this);
+ messenger->send_message(new MClientMount(messenger->get_myaddr(), my_instance),
+ monmap->get_inst(mon));
// schedule timeout?
assert(mount_timeout_event == 0);
dout(2) << "sending client_session close to mds" << p->first << " seq " << p->second << dendl;
messenger->send_message(new MClientSession(MClientSession::OP_REQUEST_CLOSE,
p->second),
- mdsmap->get_inst(p->first), MDS_PORT_SERVER);
+ mdsmap->get_inst(p->first));
}
// send unmount!
* message header
*/
struct ceph_message_header {
+ __u32 seq;
__u32 type;
struct ceph_entity_inst src, dst;
- __u32 source_port, dest_port;
__u32 nchunks;
};
// anchor ops
#define ANCHOR_OP_LOOKUP 1
-#define ANCHOR_OP_LOOKUP_REPLY 2
+#define ANCHOR_OP_LOOKUP_REPLY -2
#define ANCHOR_OP_CREATE_PREPARE 11
-#define ANCHOR_OP_CREATE_AGREE 12
+#define ANCHOR_OP_CREATE_AGREE -12
#define ANCHOR_OP_DESTROY_PREPARE 21
-#define ANCHOR_OP_DESTROY_AGREE 22
+#define ANCHOR_OP_DESTROY_AGREE -22
#define ANCHOR_OP_UPDATE_PREPARE 31
-#define ANCHOR_OP_UPDATE_AGREE 32
+#define ANCHOR_OP_UPDATE_AGREE -32
#define ANCHOR_OP_COMMIT 41
-#define ANCHOR_OP_ACK 42
+#define ANCHOR_OP_ACK -42
#define ANCHOR_OP_ROLLBACK 43
<< dendl;
MAnchor *req = new MAnchor(ANCHOR_OP_COMMIT, 0, atid);
mds->messenger->send_message(req,
- mds->mdsmap->get_inst(mds->mdsmap->get_anchortable()),
- MDS_PORT_ANCHORTABLE, MDS_PORT_ANCHORCLIENT);
+ mds->mdsmap->get_inst(mds->mdsmap->get_anchortable()));
}
else {
dout(10) << "stray create_agree on " << ino
<< dendl;
MAnchor *req = new MAnchor(ANCHOR_OP_ROLLBACK, 0, atid);
mds->messenger->send_message(req,
- mds->mdsmap->get_inst(mds->mdsmap->get_anchortable()),
- MDS_PORT_ANCHORTABLE, MDS_PORT_ANCHORCLIENT);
+ mds->mdsmap->get_inst(mds->mdsmap->get_anchortable()));
}
break;
<< dendl;
MAnchor *req = new MAnchor(ANCHOR_OP_COMMIT, 0, atid);
mds->messenger->send_message(req,
- mds->mdsmap->get_inst(mds->mdsmap->get_anchortable()),
- MDS_PORT_ANCHORTABLE, MDS_PORT_ANCHORCLIENT);
+ mds->mdsmap->get_inst(mds->mdsmap->get_anchortable()));
}
else {
dout(10) << "stray destroy_agree on " << ino
<< dendl;
MAnchor *req = new MAnchor(ANCHOR_OP_ROLLBACK, 0, atid);
mds->messenger->send_message(req,
- mds->mdsmap->get_inst(mds->mdsmap->get_anchortable()),
- MDS_PORT_ANCHORTABLE, MDS_PORT_ANCHORCLIENT);
+ mds->mdsmap->get_inst(mds->mdsmap->get_anchortable()));
}
break;
<< dendl;
MAnchor *req = new MAnchor(ANCHOR_OP_COMMIT, 0, atid);
mds->messenger->send_message(req,
- mds->mdsmap->get_inst(mds->mdsmap->get_anchortable()),
- MDS_PORT_ANCHORTABLE, MDS_PORT_ANCHORCLIENT);
+ mds->mdsmap->get_inst(mds->mdsmap->get_anchortable()));
}
else {
dout(10) << "stray update_agree on " << ino
<< dendl;
MAnchor *req = new MAnchor(ANCHOR_OP_ROLLBACK, 0, atid);
mds->messenger->send_message(req,
- mds->mdsmap->get_inst(mds->mdsmap->get_anchortable()),
- MDS_PORT_ANCHORTABLE, MDS_PORT_ANCHORCLIENT);
+ mds->mdsmap->get_inst(mds->mdsmap->get_anchortable()));
}
break;
pending_lookup[ino].trace = &trace;
mds->send_message_mds(req,
- mds->mdsmap->get_anchortable(),
- MDS_PORT_ANCHORTABLE, MDS_PORT_ANCHORCLIENT);
+ mds->mdsmap->get_anchortable());
}
pending_create_prepare[ino].onfinish = onfinish;
mds->send_message_mds(req,
- mds->mdsmap->get_anchortable(),
- MDS_PORT_ANCHORTABLE, MDS_PORT_ANCHORCLIENT);
+ mds->mdsmap->get_anchortable());
}
void AnchorClient::prepare_destroy(inodeno_t ino,
pending_destroy_prepare[ino].onfinish = onfinish;
pending_destroy_prepare[ino].patid = patid;
mds->messenger->send_message(req,
- mds->mdsmap->get_inst(mds->mdsmap->get_anchortable()),
- MDS_PORT_ANCHORTABLE, MDS_PORT_ANCHORCLIENT);
+ mds->mdsmap->get_inst(mds->mdsmap->get_anchortable()));
}
pending_update_prepare[ino].onfinish = onfinish;
mds->messenger->send_message(req,
- mds->mdsmap->get_inst(mds->mdsmap->get_anchortable()),
- MDS_PORT_ANCHORTABLE, MDS_PORT_ANCHORCLIENT);
+ mds->mdsmap->get_inst(mds->mdsmap->get_anchortable()));
}
// send message
MAnchor *req = new MAnchor(ANCHOR_OP_COMMIT, 0, atid);
mds->messenger->send_message(req,
- mds->mdsmap->get_inst(mds->mdsmap->get_anchortable()),
- MDS_PORT_ANCHORTABLE, MDS_PORT_ANCHORCLIENT);
+ mds->mdsmap->get_inst(mds->mdsmap->get_anchortable()));
}
dout(10) << "resending commit on " << p->first << dendl;
MAnchor *req = new MAnchor(ANCHOR_OP_COMMIT, 0, p->first);
mds->send_message_mds(req,
- mds->mdsmap->get_anchortable(),
- MDS_PORT_ANCHORTABLE, MDS_PORT_ANCHORCLIENT);
+ mds->mdsmap->get_anchortable());
}
}
MAnchor *req = new MAnchor(op, p->first);
req->set_trace(p->second.trace);
mds->send_message_mds(req,
- mds->mdsmap->get_anchortable(),
- MDS_PORT_ANCHORTABLE, MDS_PORT_ANCHORCLIENT);
+ mds->mdsmap->get_anchortable());
}
}
p++) {
dout(10) << "resending lookup on " << p->first << dendl;
mds->send_message_mds(new MAnchor(ANCHOR_OP_LOOKUP, p->first),
- mds->mdsmap->get_anchortable(),
- MDS_PORT_ANCHORTABLE, MDS_PORT_ANCHORCLIENT);
+ mds->mdsmap->get_anchortable());
}
// resend any pending prepares.
// reply
MAnchor *reply = new MAnchor(ANCHOR_OP_LOOKUP_REPLY, req->get_ino());
reply->set_trace(trace);
- mds->messenger->send_message(reply, req->get_source_inst(), req->get_source_port());
+ mds->messenger->send_message(reply, req->get_source_inst());
delete req;
}
// reply
MAnchor *reply = new MAnchor(ANCHOR_OP_CREATE_AGREE, ino, atid);
- mds->messenger->send_message(reply, req->get_source_inst(), req->get_source_port());
+ mds->messenger->send_message(reply, req->get_source_inst());
delete req;
}
// reply
MAnchor *reply = new MAnchor(ANCHOR_OP_DESTROY_AGREE, ino, atid);
- mds->messenger->send_message(reply, req->get_source_inst(), req->get_source_port());
+ mds->messenger->send_message(reply, req->get_source_inst());
delete req;
}
// reply
MAnchor *reply = new MAnchor(ANCHOR_OP_UPDATE_AGREE, ino, atid);
- mds->messenger->send_message(reply, req->get_source_inst(), req->get_source_port());
+ mds->messenger->send_message(reply, req->get_source_inst());
delete req;
}
<< ", already committed, sending ack."
<< dendl;
MAnchor *reply = new MAnchor(ANCHOR_OP_ACK, 0, atid);
- mds->messenger->send_message(reply, req->get_source_inst(), req->get_source_port());
+ mds->messenger->send_message(reply, req->get_source_inst());
delete req;
return;
}
{
dout(7) << "_commit_logged, sending ACK" << dendl;
MAnchor *reply = new MAnchor(ANCHOR_OP_ACK, req->get_ino(), req->get_atid());
- mds->messenger->send_message(reply, req->get_source_inst(), req->get_source_port());
+ mds->messenger->send_message(reply, req->get_source_inst());
delete req;
}
{
if (pending_create.count(v)) {
MAnchor *reply = new MAnchor(ANCHOR_OP_CREATE_AGREE, pending_create[v], v);
- mds->send_message_mds(reply, who, MDS_PORT_ANCHORCLIENT);
+ mds->send_message_mds(reply, who);
}
else if (pending_destroy.count(v)) {
MAnchor *reply = new MAnchor(ANCHOR_OP_DESTROY_AGREE, pending_destroy[v], v);
- mds->send_message_mds(reply, who, MDS_PORT_ANCHORCLIENT);
+ mds->send_message_mds(reply, who);
}
else {
assert(pending_update.count(v));
MAnchor *reply = new MAnchor(ANCHOR_OP_UPDATE_AGREE, pending_update[v].first, v);
- mds->send_message_mds(reply, who, MDS_PORT_ANCHORCLIENT);
+ mds->send_message_mds(reply, who);
}
}
if (mds->mdsmap->get_state(it->first) < MDSMap::STATE_REJOIN)
continue;
MLock *m = new MLock(lock, msg, mds->get_nodeid());
- mds->send_message_mds(m, it->first, MDS_PORT_LOCKER);
+ mds->send_message_mds(m, it->first);
}
}
continue;
MLock *m = new MLock(lock, msg, mds->get_nodeid());
m->set_data(data);
- mds->send_message_mds(m, it->first, MDS_PORT_LOCKER);
+ mds->send_message_mds(m, it->first);
}
}
(*q)->set_object_info(info);
req->get_authpins().push_back(info);
}
- mds->send_message_mds(req, p->first, MDS_PORT_SERVER);
+ mds->send_message_mds(req, p->first);
// put in waiting list
assert(mdr->more()->waiting_on_slave.count(p->first) == 0);
if (mds->mdsmap->get_state(auth) >= MDSMap::STATE_REJOIN)
mds->send_message_mds(new MInodeFileCaps(in->ino(), mds->get_nodeid(),
in->replica_caps_wanted),
- auth, MDS_PORT_LOCKER);
+ auth);
} else {
in->replica_caps_wanted_keep_until.sec_ref() = 0;
}
} else {
// update lock and reply
lock->set_state(LOCK_LOCK);
- mds->send_message_mds(new MLock(lock, LOCK_AC_LOCKACK, mds->get_nodeid()),
- from, MDS_PORT_LOCKER);
+ mds->send_message_mds(new MLock(lock, LOCK_AC_LOCKACK, mds->get_nodeid()), from);
}
break;
int auth = lock->get_parent()->authority().first;
if (mds->mdsmap->get_state(auth) >= MDSMap::STATE_REJOIN)
mds->send_message_mds(new MLock(lock, LOCK_AC_LOCKACK, mds->get_nodeid()),
- lock->get_parent()->authority().first, MDS_PORT_LOCKER);
+ lock->get_parent()->authority().first);
}
lock->set_state(LOCK_LOCK);
MMDSSlaveRequest *r = new MMDSSlaveRequest(mdr->reqid, MMDSSlaveRequest::OP_XLOCK);
r->set_lock_type(lock->get_type());
lock->get_parent()->set_object_info(r->get_object_info());
- mds->send_message_mds(r, auth, MDS_PORT_SERVER);
+ mds->send_message_mds(r, auth);
// wait
lock->add_waiter(SimpleLock::WAIT_REMOTEXLOCK, new C_MDS_RetryRequest(mdcache, mdr));
MMDSSlaveRequest *slavereq = new MMDSSlaveRequest(mdr->reqid, MMDSSlaveRequest::OP_UNXLOCK);
slavereq->set_lock_type(lock->get_type());
lock->get_parent()->set_object_info(slavereq->get_object_info());
- mds->send_message_mds(slavereq, auth, MDS_PORT_SERVER);
+ mds->send_message_mds(slavereq, auth);
}
}
int auth = lock->get_parent()->authority().first;
dout(10) << "requesting scatter from auth on "
<< *lock << " on " << *lock->get_parent() << dendl;
- mds->send_message_mds(new MLock(lock, LOCK_AC_REQSCATTER, mds->get_nodeid()),
- auth, MDS_PORT_LOCKER);
+ mds->send_message_mds(new MLock(lock, LOCK_AC_REQSCATTER, mds->get_nodeid()), auth);
}
}
if (mds->mdsmap->get_state(auth) >= MDSMap::STATE_REJOIN) {
bufferlist data;
lock->encode_locked_state(data);
- mds->send_message_mds(new MLock(lock, LOCK_AC_LOCKACK, mds->get_nodeid(), data),
- auth, MDS_PORT_LOCKER);
+ mds->send_message_mds(new MLock(lock, LOCK_AC_LOCKACK, mds->get_nodeid(), data), auth);
}
lock->set_state(LOCK_LOCK);
}
int auth = lock->get_parent()->authority().first;
if (lock->get_state() == LOCK_SCATTER &&
mds->mdsmap->get_state(auth) >= MDSMap::STATE_ACTIVE)
- mds->send_message_mds(new MLock(lock, LOCK_AC_REQUNSCATTER, mds->get_nodeid()),
- auth, MDS_PORT_LOCKER);
+ mds->send_message_mds(new MLock(lock, LOCK_AC_REQUNSCATTER, mds->get_nodeid()), auth);
// wait...
lock->add_waiter(SimpleLock::WAIT_STABLE, c);
// encode and reply
bufferlist data;
lock->encode_locked_state(data);
- mds->send_message_mds(new MLock(lock, LOCK_AC_LOCKACK, mds->get_nodeid(), data),
- from, MDS_PORT_LOCKER);
+ mds->send_message_mds(new MLock(lock, LOCK_AC_LOCKACK, mds->get_nodeid(), data), from);
lock->set_state(LOCK_LOCK);
}
break;
// ack
MLock *reply = new MLock(lock, LOCK_AC_MIXEDACK, mds->get_nodeid());
- mds->send_message_mds(reply, in->authority().first, MDS_PORT_LOCKER);
+ mds->send_message_mds(reply, in->authority().first);
}
break;
// ack
MLock *reply = new MLock(lock, LOCK_AC_LOCKACK, mds->get_nodeid());
- mds->send_message_mds(reply, in->authority().first, MDS_PORT_LOCKER);
+ mds->send_message_mds(reply, in->authority().first);
}
break;
lock->set_state(LOCK_LOCK);
MLock *reply = new MLock(lock, LOCK_AC_LOCKACK, mds->get_nodeid());
- mds->send_message_mds(reply, from, MDS_PORT_LOCKER);
+ mds->send_message_mds(reply, from);
}
break;
// ack
MLock *reply = new MLock(lock, LOCK_AC_MIXEDACK, mds->get_nodeid());
- mds->send_message_mds(reply, from, MDS_PORT_LOCKER);
+ mds->send_message_mds(reply, from);
}
} else {
// LOCK
MHeartbeat *hb = new MHeartbeat(load, beat_epoch);
hb->get_import_map() = import_map;
mds->messenger->send_message(hb,
- mds->mdsmap->get_inst(*p),
- MDS_PORT_BALANCER, MDS_PORT_BALANCER);
+ mds->mdsmap->get_inst(*p));
}
}
// send
- mds->send_message_mds(m, who, MDS_PORT_CACHE);
+ mds->send_message_mds(m, who);
}
}
}
- mds->send_message_mds(ack, from, MDS_PORT_CACHE);
+ mds->send_message_mds(ack, from);
}
// am i a surviving ambiguous importer?
assert(rejoin_ack_gather.count(p->first) == 0);
rejoin_sent.insert(p->first);
rejoin_ack_gather.insert(p->first);
- mds->send_message_mds(p->second, p->first, MDS_PORT_CACHE);
+ mds->send_message_mds(p->second, p->first);
}
// nothing?
if (survivor) {
// survivor. do everything now.
rejoin_scour_survivor_replicas(from, ack);
- mds->send_message_mds(ack, from, MDS_PORT_CACHE);
+ mds->send_message_mds(ack, from);
} else {
// done?
assert(rejoin_gather.count(from));
// send missing?
if (missing) {
// we expect a FULL soon.
- mds->send_message_mds(missing, from, MDS_PORT_CACHE);
+ mds->send_message_mds(missing, from);
} else {
// done?
assert(rejoin_gather.count(from));
full->add_full_inode(in->inode, in->symlink, in->dirfragtree);
}
- mds->send_message_mds(full, missing->get_source().num(), MDS_PORT_CACHE);
+ mds->send_message_mds(full, missing->get_source().num());
}
void MDCache::handle_cache_rejoin_full(MMDSCacheRejoin *full)
in->client_caps[client].wanted());
reap->set_mds( frommds ); // reap from whom?
- mds->messenger->send_message(reap,
- mds->clientmap.get_inst(client),
- 0, MDS_PORT_CACHE);
+ mds->messenger->send_message(reap, mds->clientmap.get_inst(client));
}
void MDCache::rejoin_send_acks()
for (map<int,MMDSCacheRejoin*>::iterator p = ack.begin();
p != ack.end();
++p)
- mds->send_message_mds(p->second, p->first, MDS_PORT_CACHE);
+ mds->send_message_mds(p->second, p->first);
}
it != expiremap.end();
it++) {
dout(7) << "sending cache_expire to " << it->first << dendl;
- mds->send_message_mds(it->second, it->first, MDS_PORT_CACHE);
+ mds->send_message_mds(it->second, it->first);
}
}
reply->add_dentry( dn->replicate_to( from ) );
if (dn->is_primary())
reply->add_inode( dn->inode->replicate_to( from ) );
- mds->send_message_mds(reply, req->get_source().num(), MDS_PORT_CACHE);
+ mds->send_message_mds(reply, req->get_source().num());
}
}
}
}
if (mdr)
- request_forward(mdr, dauth.first, req->get_dest_port());
+ request_forward(mdr, dauth.first);
else
- mds->forward_message_mds(req, dauth.first, req->get_dest_port());
+ mds->forward_message_mds(req, dauth.first);
if (mds->logger) mds->logger->inc("tfw");
return 2;
void MDCache::request_forward(MDRequest *mdr, int who, int port)
{
- if (!port) port = MDS_PORT_SERVER;
dout(7) << "request_forward " << *mdr << " to mds" << who << " req " << *mdr << dendl;
- mds->forward_message_mds(mdr->client_request, who, port);
+ mds->forward_message_mds(mdr->client_request, who);
request_cleanup(mdr);
if (mds->logger) mds->logger->inc("fw");
p != mdr->more()->slaves.end();
++p) {
MMDSSlaveRequest *r = new MMDSSlaveRequest(mdr->reqid, MMDSSlaveRequest::OP_FINISH);
- mds->send_message_mds(r, *p, MDS_PORT_SERVER);
+ mds->send_message_mds(r, *p);
}
// strip foreign xlocks out of lock lists, since the OP_FINISH drops them implicitly.
request_forget_foreign_locks(mdr);
want_ino,
want_path,
false);
- mds->send_message_mds(dis, from, MDS_PORT_CACHE);
+ mds->send_message_mds(dis, from);
}
waiting_for_base_ino[from][want_ino].push_back(onfinish);
want_path,
true); // need the base dir open
dis->set_base_dir_frag(approx_fg);
- mds->send_message_mds(dis, from, MDS_PORT_CACHE);
+ mds->send_message_mds(dis, from);
}
// register + wait
want_path,
true, // we want the base dir; we are relative to ino.
want_xlocked);
- mds->send_message_mds(dis, from, MDS_PORT_CACHE);
+ mds->send_message_mds(dis, from);
}
// register + wait
want_path,
false, // no base dir; we are relative to dir
want_xlocked);
- mds->send_message_mds(dis, from, MDS_PORT_CACHE);
+ mds->send_message_mds(dis, from);
}
// register + wait
base->dirfrag(),
want_ino,
want_xlocked);
- mds->send_message_mds(dis, from, MDS_PORT_CACHE);
+ mds->send_message_mds(dis, from);
}
// register + wait
// how did we do?
assert(!reply->is_empty());
dout(7) << "handle_discover sending result back to asker mds" << dis->get_asker() << dendl;
- mds->send_message_mds(reply, dis->get_asker(), MDS_PORT_CACHE);
+ mds->send_message_mds(reply, dis->get_asker());
delete dis;
}
it++) {
dout(7) << "sending inode_update on " << *in << " to " << *it << dendl;
assert(*it != mds->get_nodeid());
- mds->send_message_mds(new MInodeUpdate(in, in->get_cached_by_nonce(*it)), *it, MDS_PORT_CACHE);
+ mds->send_message_mds(new MInodeUpdate(in, in->get_cached_by_nonce(*it)), *it);
}
return 0;
dout(7) << "inode_update on " << m->get_ino() << ", don't have it, sending expire" << dendl;
MCacheExpire *expire = new MCacheExpire(mds->get_nodeid());
expire->add_inode(m->get_ino(), m->get_nonce());
- mds->send_message_mds(expire, m->get_source().num(), MDS_PORT_CACHE);
+ mds->send_message_mds(expire, m->get_source().num());
goto out;
}
dir->dir_rep_by,
path,
bcast),
- *it, MDS_PORT_CACHE);
+ *it);
}
return 0;
basedis->_encode(notify->basebl);
delete basedis;
}
- mds->send_message_mds(notify, *p, MDS_PORT_CACHE);
+ mds->send_message_mds(notify, *p);
}
}
#include "messages/MClientRequest.h"
#include "messages/MClientRequestForward.h"
+#include "messages/MAnchor.h"
#include "config.h"
server->reopen_logger(start, append);
}
-void MDS::send_message_mds(Message *m, int mds, int port, int fromport)
+void MDS::send_message_mds(Message *m, int mds)
{
// send mdsmap first?
if (peer_mdsmap_epoch[mds] < mdsmap->get_epoch()) {
}
// send message
- if (port && !fromport)
- fromport = port;
- messenger->send_message(m, mdsmap->get_inst(mds), port, fromport);
+ messenger->send_message(m, mdsmap->get_inst(mds));
}
-void MDS::forward_message_mds(Message *req, int mds, int port)
+void MDS::forward_message_mds(Message *req, int mds)
{
// client request?
if (req->get_type() == MSG_CLIENT_REQUEST) {
}
// forward
- send_message_mds(req, mds, port);
+ send_message_mds(req, mds);
}
}
- switch (m->get_dest_port()) {
-
- case MDS_PORT_ANCHORTABLE:
- anchortable->dispatch(m);
- break;
- case MDS_PORT_ANCHORCLIENT:
- anchorclient->dispatch(m);
- break;
-
+ int port = m->get_type() & 0xff00;
+ switch (port) {
case MDS_PORT_CACHE:
mdcache->dispatch(m);
break;
+
case MDS_PORT_LOCKER:
locker->dispatch(m);
break;
case MDS_PORT_MIGRATOR:
mdcache->migrator->dispatch(m);
break;
- case MDS_PORT_RENAMER:
- //mdcache->renamer->dispatch(m);
- break;
- case MDS_PORT_BALANCER:
- balancer->proc_message(m);
- break;
-
- case MDS_PORT_MAIN:
- proc_message(m);
- break;
+ default:
+ switch (m->get_type()) {
+ // SERVER
+ case MSG_CLIENT_SESSION:
+ case MSG_CLIENT_REQUEST:
+ case MSG_MDS_SLAVE_REQUEST:
+ server->dispatch(m);
+ break;
+
+ case MSG_MDS_HEARTBEAT:
+ balancer->proc_message(m);
+ break;
- case MDS_PORT_SERVER:
- server->dispatch(m);
- break;
+ // anchor
+ case MSG_MDS_ANCHOR:
+ if (((MAnchor*)m)->get_op() < 0)
+ anchorclient->dispatch(m);
+ else
+ anchortable->dispatch(m);
+ break;
- default:
- dout(1) << "MDS dispatch unknown message port" << m->get_dest_port() << dendl;
- assert(0);
+ // OSD
+ case MSG_OSD_OPREPLY:
+ objecter->handle_osd_op_reply((class MOSDOpReply*)m);
+ break;
+ case MSG_OSD_MAP:
+ handle_osd_map((MOSDMap*)m);
+ break;
+
+ // MDS
+ case MSG_MDS_MAP:
+ handle_mds_map((MMDSMap*)m);
+ break;
+ case MSG_MDS_BEACON:
+ handle_mds_beacon((MMDSBeacon*)m);
+ break;
+
+ default:
+ dout(1) << "MDS unknown messge " << m->get_type() << dendl;
+ assert(0);
+ }
}
// finish any triggered contexts
MDSMap *get_mds_map() { return mdsmap; }
OSDMap *get_osd_map() { return osdmap; }
- void send_message_mds(Message *m, int mds, int port=0, int fromport=0);
- void forward_message_mds(Message *req, int mds, int port=0);
+ void send_message_mds(Message *m, int mds);
+ void forward_message_mds(Message *req, int mds);
void send_message_client(Message *m, int client);
void send_message_client(Message *m, entity_inst_t clientinst);
export_state.erase(dir); // clean up
dir->state_clear(CDir::STATE_EXPORTING);
if (export_peer[dir] != who) // tell them.
- mds->send_message_mds(new MExportDirCancel(dir->dirfrag()), export_peer[dir], MDS_PORT_MIGRATOR);
+ mds->send_message_mds(new MExportDirCancel(dir->dirfrag()), export_peer[dir]);
break;
case EXPORT_FREEZING:
export_state.erase(dir); // clean up
dir->state_clear(CDir::STATE_EXPORTING);
if (export_peer[dir] != who) // tell them.
- mds->send_message_mds(new MExportDirCancel(dir->dirfrag()), export_peer[dir], MDS_PORT_MIGRATOR);
+ mds->send_message_mds(new MExportDirCancel(dir->dirfrag()), export_peer[dir]);
break;
// NOTE: state order reversal, warning comes after loggingstart+prepping
dir->state_set(CDir::STATE_EXPORTING);
// send ExportDirDiscover (ask target)
- mds->send_message_mds(new MExportDirDiscover(dir), dest, MDS_PORT_MIGRATOR);
+ mds->send_message_mds(new MExportDirDiscover(dir), dest);
// start the freeze, but hold it up with an auth_pin.
dir->auth_pin();
// send.
export_state[dir] = EXPORT_PREPPING;
- mds->send_message_mds(prep, dest, MDS_PORT_MIGRATOR);
+ mds->send_message_mds(prep, dest);
}
void Migrator::handle_export_prep_ack(MExportDirPrepAck *m)
pair<int,int>(mds->get_nodeid(),CDIR_AUTH_UNKNOWN),
pair<int,int>(mds->get_nodeid(),export_peer[dir]));
notify->copy_bounds(bounds);
- mds->send_message_mds(notify, p->first, MDS_PORT_MIGRATOR);
+ mds->send_message_mds(notify, p->first);
}
export_state[dir] = EXPORT_WARNING;
req->add_export((*p)->dirfrag());
// send
- mds->send_message_mds(req, dest, MDS_PORT_MIGRATOR);
+ mds->send_message_mds(req, dest);
// stats
if (mds->logger) mds->logger->inc("ex");
notify->copy_bounds(bounds);
- mds->send_message_mds(notify, *p, MDS_PORT_MIGRATOR);
+ mds->send_message_mds(notify, *p);
}
// wait for notifyacks
// send finish/commit to new auth
if (mds->mdsmap->is_active_or_stopping(export_peer[dir])) {
- mds->send_message_mds(new MExportDirFinish(dir->dirfrag()),
- export_peer[dir], MDS_PORT_MIGRATOR);
+ mds->send_message_mds(new MExportDirFinish(dir->dirfrag()), export_peer[dir]);
} else {
dout(7) << "not sending MExportDirFinish, dest has failed" << dendl;
}
// reply
dout(7) << " sending export_discover_ack on " << *in << dendl;
- mds->send_message_mds(new MExportDirDiscoverAck(df),
- import_peer[df], MDS_PORT_MIGRATOR);
+ mds->send_message_mds(new MExportDirDiscoverAck(df), import_peer[df]);
}
void Migrator::handle_export_cancel(MExportDirCancel *m)
// ok!
dout(7) << " sending export_prep_ack on " << *dir << dendl;
- mds->send_message_mds(new MExportDirPrepAck(dir->dirfrag()),
- m->get_source().num(), MDS_PORT_MIGRATOR);
+ mds->send_message_mds(new MExportDirPrepAck(dir->dirfrag()), m->get_source().num());
// note new state
import_state[dir->dirfrag()] = IMPORT_PREPPED;
pair<int,int>(mds->get_nodeid(), CDIR_AUTH_UNKNOWN),
pair<int,int>(import_peer[dir->dirfrag()], CDIR_AUTH_UNKNOWN));
notify->copy_bounds(bounds);
- mds->send_message_mds(notify, *p, MDS_PORT_MIGRATOR);
+ mds->send_message_mds(notify, *p);
}
}
// send notify's etc.
dout(7) << "sending ack for " << *dir << " to old auth mds" << from << dendl;
- mds->send_message_mds(new MExportDirAck(dir->dirfrag()),
- from, MDS_PORT_MIGRATOR);
+ mds->send_message_mds(new MExportDirAck(dir->dirfrag()), from);
cache->show_subtrees();
}
// send ack
if (m->wants_ack()) {
- mds->send_message_mds(new MExportDirNotifyAck(m->get_dirfrag()),
- from, MDS_PORT_MIGRATOR);
+ mds->send_message_mds(new MExportDirNotifyAck(m->get_dirfrag()), from);
} else {
// aborted. no ack.
dout(7) << "handle_export_notify no ack requested" << dendl;
if (req->get_retry_attempt()) {
if (mds->clientmap.have_completed_request(req->get_reqid())) {
dout(5) << "already completed " << req->get_reqid() << dendl;
- mds->messenger->send_message(new MClientReply(req, 0),
- req->get_client_inst());
+ mds->messenger->send_message(new MClientReply(req, 0), req->get_client_inst());
delete req;
return;
}
MMDSSlaveRequest *r = new MMDSSlaveRequest(mdr->reqid, MMDSSlaveRequest::OP_XLOCKACK);
r->set_lock_type(lock->get_type());
lock->get_parent()->set_object_info(r->get_object_info());
- mds->send_message_mds(r, mdr->slave_request->get_source().num(), MDS_PORT_SERVER);
+ mds->send_message_mds(r, mdr->slave_request->get_source().num());
} else {
if (lock) {
dout(10) << "not auth for remote xlock attempt, dropping on "
reply->get_authpins().push_back(info);
}
- mds->send_message_mds(reply, mdr->slave_to_mds, MDS_PORT_SERVER);
+ mds->send_message_mds(reply, mdr->slave_to_mds);
// clean up this request
delete mdr->slave_request;
MMDSSlaveRequest *req = new MMDSSlaveRequest(mdr->reqid, MMDSSlaveRequest::OP_LINKPREP);
targeti->set_object_info(req->get_object_info());
req->now = mdr->now;
- mds->send_message_mds(req, linkauth, MDS_PORT_SERVER);
+ mds->send_message_mds(req, linkauth);
assert(mdr->more()->waiting_on_slave.count(linkauth) == 0);
mdr->more()->waiting_on_slave.insert(linkauth);
// ack
MMDSSlaveRequest *reply = new MMDSSlaveRequest(mdr->reqid, MMDSSlaveRequest::OP_LINKPREPACK);
- mds->send_message_mds(reply, mdr->slave_to_mds, MDS_PORT_SERVER);
+ mds->send_message_mds(reply, mdr->slave_to_mds);
// set up commit waiter
mdr->more()->slave_commit = new C_MDS_SlaveLinkCommit(this, mdr, targeti, old_ctime, old_version, inc);
unlink->straydir = straydn->dir->replicate_to(it->first);
unlink->straydn = straydn->replicate_to(it->first);
}
- mds->send_message_mds(unlink, it->first, MDS_PORT_CACHE);
+ mds->send_message_mds(unlink, it->first);
}
// commit anchor update?
MMDSSlaveRequest *req = new MMDSSlaveRequest(mdr->reqid, MMDSSlaveRequest::OP_UNLINKPREP);
dn->inode->set_object_info(req->get_object_info());
req->now = mdr->now;
- mds->send_message_mds(req, inauth, MDS_PORT_SERVER);
+ mds->send_message_mds(req, inauth);
assert(mdr->more()->waiting_on_slave.count(inauth) == 0);
mdr->more()->waiting_on_slave.insert(inauth);
it++) {
dout(7) << "_unlink_remote_finish sending MDentryUnlink to mds" << it->first << dendl;
MDentryUnlink *unlink = new MDentryUnlink(dn->dir->dirfrag(), dn->name);
- mds->send_message_mds(unlink, it->first, MDS_PORT_CACHE);
+ mds->send_message_mds(unlink, it->first);
}
// commit anchor update?
// srcdn auth will verify our current witness list is sufficient
req->witnesses = mdr->more()->witnessed;
- mds->send_message_mds(req, who, MDS_PORT_SERVER);
+ mds->send_message_mds(req, who);
assert(mdr->more()->waiting_on_slave.count(who) == 0);
mdr->more()->waiting_on_slave.insert(who);
dout(10) << " witness list insufficient; providing srcdn replica list" << dendl;
MMDSSlaveRequest *reply = new MMDSSlaveRequest(mdr->reqid, MMDSSlaveRequest::OP_RENAMEPREPACK);
reply->witnesses.swap(srcdnrep);
- mds->send_message_mds(reply, mdr->slave_to_mds, MDS_PORT_SERVER);
+ mds->send_message_mds(reply, mdr->slave_to_mds);
delete mdr->slave_request;
mdr->slave_request = 0;
return;
// apply
_rename_apply(mdr, srcdn, destdn, straydn);
- mds->send_message_mds(reply, mdr->slave_to_mds, MDS_PORT_SERVER);
+ mds->send_message_mds(reply, mdr->slave_to_mds);
// bump popularity
//if (srcdn->is_auth())
#define MDS_REF_SET // define me for improved debug output, sanity checking
-#define MDS_PORT_MAIN 0
-#define MDS_PORT_SERVER 1
-#define MDS_PORT_CACHE 2
-#define MDS_PORT_LOCKER 3
-#define MDS_PORT_STORE 4
-#define MDS_PORT_BALANCER 5
-#define MDS_PORT_MIGRATOR 6
-#define MDS_PORT_RENAMER 7
-#define MDS_PORT_ANCHORCLIENT 10
-#define MDS_PORT_ANCHORTABLE 11
+
+#define MDS_PORT_CACHE 0x200
+#define MDS_PORT_LOCKER 0x300
+#define MDS_PORT_MIGRATOR 0x400
+
#define MAX_MDS 0x100
}
-int FakeMessenger::send_message(Message *m, entity_inst_t inst, int port, int fromport)
+int FakeMessenger::send_message(Message *m, entity_inst_t inst)
{
entity_name_t dest = inst.name;
- m->set_source(get_myname(), fromport);
+ m->set_source(get_myname());
m->set_source_addr(get_myaddr());
m->set_dest_inst(inst);
- m->set_dest_port(port);
lock.Lock();
void reset_myname(entity_name_t m);
// msg interface
- virtual int send_message(Message *m, entity_inst_t dest, int port=0, int fromport=0);
+ int send_message(Message *m, entity_inst_t dest);
// events
//virtual void trigger_timer(Timer *t);
case MSG_CLOSE:
case MSG_SHUTDOWN:
- case MSG_MDS_SHUTDOWNSTART:
- case MSG_MDS_SHUTDOWNFINISH:
- case MSG_OSD_MKFS_ACK:
m = new MGenericMessage(env.type);
break;
default:
- dout(1) << "can't decode unknown message type " << env.type << dendl;
+ dout(0) << "can't decode unknown message type " << env.type << dendl;
assert(0);
}
#define MSG_OSD_MAP 44
#define MSG_OSD_BOOT 45
-#define MSG_OSD_MKFS_ACK 46
#define MSG_OSD_FAILURE 47
#define MSG_CLIENT_REQUEST 80
#define MSG_CLIENT_REQUEST_FORWARD 81
#define MSG_CLIENT_REPLY 82
-#define MSG_CLIENT_FILECAPS 83
+#define MSG_CLIENT_FILECAPS 0x310 //
// *** MDS ***
+
+#define MSG_MDS_RESOLVE 0x200
+#define MSG_MDS_RESOLVEACK 0x201
+#define MSG_MDS_CACHEREJOIN 0x202
+#define MSG_MDS_DISCOVER 0x203
+#define MSG_MDS_DISCOVERREPLY 0x204
+#define MSG_MDS_INODEUPDATE 0x205
+#define MSG_MDS_DIRUPDATE 0x206
+#define MSG_MDS_CACHEEXPIRE 0x207
+#define MSG_MDS_DENTRYUNLINK 0x208
+#define MSG_MDS_FRAGMENTNOTIFY 0x209
+
+#define MSG_MDS_LOCK 0x300
+#define MSG_MDS_INODEFILECAPS 0x301
+
+#define MSG_MDS_EXPORTDIRDISCOVER 0x449
+#define MSG_MDS_EXPORTDIRDISCOVERACK 0x450
+#define MSG_MDS_EXPORTDIRCANCEL 0x451
+#define MSG_MDS_EXPORTDIRPREP 0x452
+#define MSG_MDS_EXPORTDIRPREPACK 0x453
+#define MSG_MDS_EXPORTDIRWARNING 0x454
+#define MSG_MDS_EXPORTDIRWARNINGACK 0x455
+#define MSG_MDS_EXPORTDIR 0x456
+#define MSG_MDS_EXPORTDIRACK 0x457
+#define MSG_MDS_EXPORTDIRNOTIFY 0x458
+#define MSG_MDS_EXPORTDIRNOTIFYACK 0x459
+#define MSG_MDS_EXPORTDIRFINISH 0x460
+
+
#define MSG_MDS_GETMAP 102
#define MSG_MDS_MAP 103
-#define MSG_MDS_HEARTBEAT 104 // for mds load balancer
#define MSG_MDS_BEACON 105 // to monitor
-#define MSG_MDS_RESOLVE 106
-#define MSG_MDS_RESOLVEACK 107
-
-#define MSG_MDS_CACHEREJOIN 108
+#define MSG_MDS_ANCHOR 0x100
+#define MSG_MDS_HEARTBEAT 0x500 // for mds load balancer
-#define MSG_MDS_DISCOVER 110
-#define MSG_MDS_DISCOVERREPLY 111
+#define MSG_MDS_SLAVE_REQUEST 170
+/*
#define MSG_MDS_INODEGETREPLICA 112
#define MSG_MDS_INODEGETREPLICAACK 113
-#define MSG_MDS_INODEFILECAPS 115
-
-#define MSG_MDS_INODEUPDATE 120
-#define MSG_MDS_DIRUPDATE 121
-#define MSG_MDS_INODEEXPIRE 122
-#define MSG_MDS_DIREXPIRE 123
-
#define MSG_MDS_DIREXPIREREQ 124
+*/
-#define MSG_MDS_CACHEEXPIRE 125
-
-#define MSG_MDS_ANCHOR 130
-
-#define MSG_MDS_FRAGMENTNOTIFY 140
-
-#define MSG_MDS_EXPORTDIRDISCOVER 149
-#define MSG_MDS_EXPORTDIRDISCOVERACK 150
-#define MSG_MDS_EXPORTDIRCANCEL 151
-#define MSG_MDS_EXPORTDIRPREP 152
-#define MSG_MDS_EXPORTDIRPREPACK 153
-#define MSG_MDS_EXPORTDIRWARNING 154
-#define MSG_MDS_EXPORTDIRWARNINGACK 155
-#define MSG_MDS_EXPORTDIR 156
-#define MSG_MDS_EXPORTDIRACK 157
-#define MSG_MDS_EXPORTDIRNOTIFY 158
-#define MSG_MDS_EXPORTDIRNOTIFYACK 159
-#define MSG_MDS_EXPORTDIRFINISH 160
-
-#define MSG_MDS_SLAVE_REQUEST 170
-
-#define MSG_MDS_DENTRYUNLINK 200
-
-#define MSG_MDS_LOCK 500
-#define MSG_MDS_SHUTDOWNSTART 900
-#define MSG_MDS_SHUTDOWNFINISH 901
#include <stdlib.h>
public:
Message() {
- env.source_port = env.dest_port = 0;
env.nchunks = 0;
};
Message(int t) {
- env.source_port = env.dest_port = 0;
env.nchunks = 0;
env.type = t;
}
void set_source_inst(entity_inst_t& inst) { env.src = *(ceph_entity_inst*)&inst; }
entity_name_t& get_dest() { return *(entity_name_t*)&env.dst.name; }
- void set_dest(entity_name_t a, int p) { env.dst.name = *(ceph_entity_name*)&a; env.dest_port = p; }
- int get_dest_port() { return env.dest_port; }
- void set_dest_port(int p) { env.dest_port = p; }
+ void set_dest(entity_name_t a) { env.dst.name = *(ceph_entity_name*)&a; }
entity_name_t& get_source() { return *(entity_name_t*)&env.src.name; }
- void set_source(entity_name_t a, int p) { env.src.name = *(ceph_entity_name*)&a; env.source_port = p; }
- int get_source_port() { return env.source_port; }
+ void set_source(entity_name_t a) { env.src.name = *(ceph_entity_name*)&a; }
entity_addr_t& get_source_addr() { return *(entity_addr_t*)&env.src.addr; }
void set_source_addr(const entity_addr_t &i) { env.src.addr = *(ceph_entity_addr*)&i; }
// send message
virtual void prepare_dest(const entity_addr_t& addr) {}
- virtual int send_message(Message *m, entity_inst_t dest,
- int port=0, int fromport=0) = 0;
- virtual int send_first_message(Dispatcher *d,
- Message *m, entity_inst_t dest,
- int port=0, int fromport=0) {
- set_dispatcher(d);
- return send_message(m, dest, port, fromport);
- }
-
- // make a procedure call
- //virtual Message* sendrecv(Message *m, msg_name_t dest, int port=0);
+ virtual int send_message(Message *m, entity_inst_t dest) = 0;
virtual void mark_down(entity_addr_t a) {}
dout(10) << "accepter.start bound to " << listen_addr << dendl;
// listen!
- rc = ::listen(listen_sd, 1000);
+ rc = ::listen(listen_sd, 128);
assert(rc >= 0);
// figure out my_addr
break;
}
- dout(10) << "pipe(" << peer_addr << ' ' << this << ").reader got message "
- << m << " " << *m
+ in_seq++;
+
+ dout(10) << "pipe(" << peer_addr << ' ' << this << ").reader got message "
+ << in_seq << " " << m << " " << *m
<< " for " << m->get_dest() << dendl;
// deliver
rank.lock.Unlock();
}
-int Rank::EntityMessenger::send_message(Message *m, entity_inst_t dest,
- int port, int fromport)
-{
- // set envelope
- m->set_source(get_myname(), fromport);
- m->set_source_addr(my_addr);
- m->set_dest_inst(dest);
- m->set_dest_port(port);
-
- dout(1) << m->get_source()
- << " --> " << dest.name << " " << dest.addr
- << " -- " << *m
- << " -- " << m
- << dendl;
-
- rank.submit_message(m, dest.addr);
-
- return 0;
-}
-
-int Rank::EntityMessenger::send_first_message(Dispatcher *d,
- Message *m, entity_inst_t dest,
- int port, int fromport)
+int Rank::EntityMessenger::send_message(Message *m, entity_inst_t dest)
{
- /* hacky thing for csyn and newsyn:
- * set dispatcher (go active) AND set sender for this
- * message while holding rank.lock. this prevents any
- * races against incoming unnamed messages naming us before
- * we fire off our first message.
- */
- rank.lock.Lock();
- set_dispatcher(d);
-
// set envelope
- m->set_source(get_myname(), fromport);
+ m->set_source(get_myname());
m->set_source_addr(my_addr);
m->set_dest_inst(dest);
- m->set_dest_port(port);
- rank.lock.Unlock();
dout(1) << m->get_source()
<< " --> " << dest.name << " " << dest.addr
bool writer_running;
list<Message*> q;
+ list<Message*> sent;
Mutex lock;
Cond cond;
+
+ int out_seq, out_acked;
+ int in_seq;
int accept(); // server handshake
int connect(); // client handshake
done(false), server(true),
need_to_send_close(true),
reader_running(false), writer_running(false),
+ out_seq(0), out_acked(0), in_seq(0),
reader_thread(this), writer_thread(this) {
// server
reader_running = true;
int shutdown();
void suicide();
void prepare_dest(const entity_addr_t& addr);
- int send_message(Message *m, entity_inst_t dest,
- int port=0, int fromport=0);
- int send_first_message(Dispatcher *d,
- Message *m, entity_inst_t dest,
- int port=0, int fromport=0);
+ int send_message(Message *m, entity_inst_t dest);
void mark_down(entity_addr_t a);
void mark_up(entity_name_t a, entity_addr_t& i);