- drop remote locks on request finish
- handled by individual MDSCacheObject _finish()'s
-- properly recover lock state on rejoin...
- - recovering mds rejoins replicas it pulled out of its journal
- - replicas will tell it when they hold an xlock
- - surviving mds rejoins replicas from a recovering mds
- - will tell auth if it holds an xlock
+/- properly recover lock state on rejoin...
+/ - recovering mds rejoins replicas it pulled out of its journal
+/ - replicas will tell it when they hold an xlock
+/ - surviving mds rejoins replicas from a recovering mds
+/ - will tell auth if it holds an xlock
+- send_rejoin_acks
- recovering open files
- recovery will either have inode (from EOpen), or will provide path+cap to reassert open state.
- path+cap window will require some fetching of metadata from disk before doing the rejoin
+ - failures during migration.. what about client stale/reap stuff and misplaced WR caps?
+
+- inode.max_size
- journal+recovery
- local rename
- remote unlink
- rewrite to look link _link
- remote rename
- - open(wr cap), open+create
- file capabilities i/o
- filelock to control directory mtime, dentry changes
- hmm, may have to change lock ordering, and Server::rdlock_path_pin_ref()
Inode* Client::insert_trace(MClientReply *reply)
{
Inode *cur = root;
- utime_t now = g_clock.now();
+ utime_t now = g_clock.real_now();
dout(10) << "insert_trace got " << reply->get_trace_in().size() << " inodes" << endl;
// -------
-MClientReply *Client::make_request(MClientRequest *req,
- bool auth_best,
- int use_mds) // this param is purely for debug hacking
+int Client::choose_target_mds(MClientRequest *req)
{
- // choose an mds
int mds = 0;
- while (1) {
-
- // find deepest known prefix
- Inode *diri = root; // the deepest known containing dir
- Inode *item = 0; // the actual item... if we know it
- int missing_dn = -1; // which dn we miss on (if we miss)
- unsigned depth = req->get_filepath().depth();
- for (unsigned i=0; i<depth; i++) {
- // dir?
- if (diri && diri->inode.mode & INODE_MODE_DIR && diri->dir) {
- Dir *dir = diri->dir;
-
- // do we have the next dentry?
- if (dir->dentries.count( req->get_filepath()[i] ) == 0) {
- missing_dn = i; // no.
- break;
- }
-
- dout(7) << " have path seg " << i << " on " << diri->dir_auth << " ino " << diri->inode.ino << " " << req->get_filepath()[i] << endl;
-
- if (i == depth-1) { // last one!
- item = dir->dentries[ req->get_filepath()[i] ]->inode;
- break;
- }
-
- // continue..
- diri = dir->dentries[ req->get_filepath()[i] ]->inode;
- assert(diri);
- } else {
- missing_dn = i;
+ // find deepest known prefix
+ Inode *diri = root; // the deepest known containing dir
+ Inode *item = 0; // the actual item... if we know it
+ int missing_dn = -1; // which dn we miss on (if we miss)
+
+ unsigned depth = req->get_filepath().depth();
+ for (unsigned i=0; i<depth; i++) {
+ // dir?
+ if (diri && diri->inode.mode & INODE_MODE_DIR && diri->dir) {
+ Dir *dir = diri->dir;
+
+ // do we have the next dentry?
+ if (dir->dentries.count( req->get_filepath()[i] ) == 0) {
+ missing_dn = i; // no.
break;
}
- }
-
- // pick mds
- if (!diri || g_conf.client_use_random_mds) {
- // no root info, pick a random MDS
- mds = rand() % mdsmap->get_num_mds();
+
+ dout(7) << " have path seg " << i << " on " << diri->dir_auth << " ino " << diri->inode.ino << " " << req->get_filepath()[i] << endl;
+
+ if (i == depth-1) { // last one!
+ item = dir->dentries[ req->get_filepath()[i] ]->inode;
+ break;
+ }
+
+ // continue..
+ diri = dir->dentries[ req->get_filepath()[i] ]->inode;
+ assert(diri);
} else {
- if (auth_best) {
- // pick the actual auth (as best we can)
- if (item) {
- mds = item->authority(mdsmap);
- } else if (diri->dir_hashed && missing_dn >= 0) {
- mds = diri->dentry_authority(req->get_filepath()[missing_dn].c_str(),
- mdsmap);
- } else {
- mds = diri->authority(mdsmap);
- }
+ missing_dn = i;
+ break;
+ }
+ }
+
+ // pick mds
+ if (!diri || g_conf.client_use_random_mds) {
+ // no root info, pick a random MDS
+ mds = rand() % mdsmap->get_num_mds();
+ } else {
+ if (req->auth_is_best()) {
+ // pick the actual auth (as best we can)
+ if (item) {
+ mds = item->authority(mdsmap);
+ } else if (diri->dir_hashed && missing_dn >= 0) {
+ mds = diri->dentry_authority(req->get_filepath()[missing_dn].c_str(),
+ mdsmap);
} else {
- // balance our traffic!
- if (diri->dir_hashed && missing_dn >= 0)
- mds = diri->dentry_authority(req->get_filepath()[missing_dn].c_str(),
- mdsmap);
- else
- mds = diri->pick_replica(mdsmap);
+ mds = diri->authority(mdsmap);
}
+ } else {
+ // balance our traffic!
+ if (diri->dir_hashed && missing_dn >= 0)
+ mds = diri->dentry_authority(req->get_filepath()[missing_dn].c_str(),
+ mdsmap);
+ else
+ mds = diri->pick_replica(mdsmap);
}
- dout(20) << "mds is " << mds << endl;
+ }
+ dout(20) << "mds is " << mds << endl;
+
+ return mds;
+}
- // force use of a particular mds?
- if (use_mds >= 0) mds = use_mds;
+
+MClientReply *Client::make_request(MClientRequest *req,
+ int use_mds) // this param is purely for debug hacking
+{
+ // time the call
+ utime_t start = g_clock.real_now();
+
+ bool nojournal = false;
+ int op = req->get_op();
+ if (op == MDS_OP_STAT ||
+ op == MDS_OP_LSTAT ||
+ op == MDS_OP_READDIR ||
+ op == MDS_OP_OPEN)
+ nojournal = true;
+
+
+ // -- request --
+ // assign a unique tid
+ tid_t tid = ++last_tid;
+ req->set_tid(tid);
+ if (!mds_requests.empty())
+ req->set_oldest_client_tid(mds_requests.begin()->first);
+
+ // make note
+ MetaRequest request(req, tid);
+ mds_requests[tid] = &request;
+
+ // encode payload now, in case we have to resend (in case of mds failure)
+ req->encode_payload();
+ request.request_payload = req->get_payload();
+
+ // note idempotency
+ request.idempotent = req->is_idempotent();
+
+ // hack target mds?
+ if (use_mds)
+ request.resend_mds = use_mds;
+
+ // set up wait cond
+ Cond cond;
+ request.caller_cond = &cond;
+
+ while (1) {
+ // choose mds
+ int mds;
+ // force use of a particular mds?
+ if (request.resend_mds >= 0) {
+ mds = request.resend_mds;
+ request.resend_mds = -1;
+ dout(10) << "target resend_mds specified as mds" << mds << endl;
+ } else {
+ mds = choose_target_mds(req);
+ dout(10) << "chose target mds" << mds << " based on hierarchy" << endl;
+ }
+
// open a session?
if (mds_sessions.count(mds) == 0) {
Cond cond;
messenger->send_message(new MClientSession(MClientSession::OP_OPEN),
mdsmap->get_inst(mds), MDS_PORT_SERVER);
}
-
+
// wait
waiting_for_session[mds].push_back(&cond);
while (waiting_for_session.count(mds)) {
}
}
- break;
+ // send request.
+ send_request(&request, mds);
+
+ // wait for signal
+ dout(20) << "awaiting kick on " << &cond << endl;
+ cond.Wait(client_lock);
+
+ // did we get a reply?
+ if (request.reply)
+ break;
}
- // time the call
- utime_t start = g_clock.now();
+ // got it!
+ MClientReply *reply = request.reply;
- bool nojournal = false;
- int op = req->get_op();
- if (op == MDS_OP_STAT ||
- op == MDS_OP_LSTAT ||
- op == MDS_OP_READDIR ||
- op == MDS_OP_OPEN)
- nojournal = true;
+ // kick dispatcher (we've got it!)
+ assert(request.dispatch_cond);
+ request.dispatch_cond->Signal();
+ dout(20) << "sendrecv kickback on tid " << tid << " " << request.dispatch_cond << endl;
+
+ // clean up.
+ mds_requests.erase(tid);
- MClientReply *reply = sendrecv(req, mds);
+ // -- log times --
if (client_logger) {
- utime_t lat = g_clock.now();
+ utime_t lat = g_clock.real_now();
lat -= start;
dout(20) << "lat " << lat << endl;
client_logger->finc("lsum",(double)lat);
}
-MClientReply* Client::sendrecv(MClientRequest *req, int mds)
-{
- // -- request --
- // assign a unique tid
- tid_t tid = ++last_tid;
- req->set_tid(tid);
- if (!mds_requests.empty())
- req->set_oldest_client_tid(mds_requests.begin()->first);
-
- // make note
- MetaRequest request(req, tid);
- mds_requests[tid] = &request;
-
- // encode payload now, in case we have to resend (in case of mds failure)
- req->encode_payload();
- request.request_payload = req->get_payload();
-
- // note idempotency
- request.idempotent = req->is_idempotent();
-
- // send initial request.
- send_request(&request, mds);
-
- // wait for reply
- Cond cond;
- request.caller_cond = &cond;
- while (request.reply == 0) {
- dout(20) << "sendrecv awaiting reply kick on " << &cond << endl;
- cond.Wait(client_lock);
- }
-
- // got it!
- MClientReply *reply = request.reply;
-
- // kick dispatcher (we've got it!)
- assert(request.dispatch_cond);
- request.dispatch_cond->Signal();
- dout(20) << "sendrecv kickback on tid " << tid << " " << request.dispatch_cond << endl;
-
- // clean up.
- mds_requests.erase(tid);
- return reply;
-}
-
void Client::send_request(MetaRequest *request, int mds)
{
MClientRequest *r = request->request;
// reset retry counter
request->retry_attempt = 0;
- if (request->idempotent) {
+ if (request->idempotent &&
+ mds_sessions.count(fwd->get_dest_mds())) {
+ // dest mds has a session, and request was forwarded for us.
+
// note new mds set.
- // there are now exactly two mds's whose failure should trigger a resend
- // of this request.
if (request->num_fwd < fwd->get_num_fwd()) {
+ // there are now exactly two mds's whose failure should trigger a resend
+ // of this request.
request->mds.clear();
request->mds.insert(fwd->get_source().num());
request->mds.insert(fwd->get_dest_mds());
dout(10) << "handle_client_request tid " << tid
<< " previously forwarded to mds" << fwd->get_dest_mds()
<< ", mds still " << request->mds
- << endl;
+ << endl;
}
} else {
- request->mds.clear();
- request->mds.insert(fwd->get_dest_mds());
- request->num_fwd = fwd->get_num_fwd();
-
+ // request not forwarded, or dest mds has no session.
+ // resend.
dout(10) << "handle_client_request tid " << tid
<< " fwd " << fwd->get_num_fwd()
<< " to mds" << fwd->get_dest_mds()
<< ", non-idempotent, resending to " << fwd->get_dest_mds()
<< endl;
-
- send_request(request, fwd->get_dest_mds());
+
+ request->mds.clear();
+ request->num_fwd = fwd->get_num_fwd();
+ request->resend_mds = fwd->get_dest_mds();
+ request->caller_cond->Signal();
}
delete fwd;
mount_cond.Signal(); // mount might be waiting for this.
- // send reconnect
+ // send reconnect?
if (mdsmap->get_state(from) == MDSMap::STATE_RECONNECT) {
send_reconnect(from);
}
- // resubmit any requests to recovering mds's
- set<int> resolving;
- mdsmap->get_mds_set(resolving, MDSMap::STATE_REJOIN);
- for (set<int>::iterator p = resolving.begin();
- p != resolving.end();
- ++p) {
- kick_requests(*p);
- failed_mds.erase(*p);
+ // kick requests?
+ if (mdsmap->get_state(from) == MDSMap::STATE_ACTIVE) {
+ kick_requests(from);
+ //failed_mds.erase(from);
}
}
m->add_inode_caps(p->first,
p->second->caps[mds].caps,
p->second->caps[mds].seq,
- p->second->file_caps_wanted());
+ p->second->file_caps_wanted(),
+ p->second->inode.size,
+ p->second->inode.mtime, p->second->inode.atime);
string path;
p->second->make_path(path);
dout(10) << " path on " << p->first << " is " << path << endl;
}
}
+ // send session closes!
+ for (set<int>::iterator p = mds_sessions.begin();
+ p != mds_sessions.end();
+ ++p) {
+ dout(2) << "sending client_session close to mds" << *p << endl;
+ messenger->send_message(new MClientSession(MClientSession::OP_CLOSE),
+ mdsmap->get_inst(*p), MDS_PORT_SERVER);
+ }
+
// send unmount!
int mon = monmap->pick_mon();
dout(2) << "sending client_unmount to mon" << mon << endl;
req->set_caller_uid(getuid());
req->set_caller_gid(getgid());
- MClientReply *reply = make_request(req, true);
+ MClientReply *reply = make_request(req);
int res = reply->get_result();
insert_trace(reply);
//FIXME enforce caller uid rights?
- MClientReply *reply = make_request(req, true);
+ MClientReply *reply = make_request(req);
int res = reply->get_result();
if (res == 0) {
// remove from local cache
//FIXME enforce caller uid rights?
- MClientReply *reply = make_request(req, true);
+ MClientReply *reply = make_request(req);
int res = reply->get_result();
insert_trace(reply);
delete reply;
//FIXME enforce caller uid rights?
- MClientReply *reply = make_request(req, true);
+ MClientReply *reply = make_request(req);
int res = reply->get_result();
insert_trace(reply);
delete reply;
//FIXME enforce caller uid rights?
- MClientReply *reply = make_request(req, true);
+ MClientReply *reply = make_request(req);
int res = reply->get_result();
if (res == 0) {
// remove from local cache
//FIXME enforce caller uid rights?
- MClientReply *reply = make_request(req, true);
+ MClientReply *reply = make_request(req);
int res = reply->get_result();
insert_trace(reply); //FIXME assuming trace of link, not of target
delete reply;
Dentry *dn = lookup(fpath);
inode_t inode;
- utime_t now = g_clock.now();
+ utime_t now = g_clock.real_now();
if (dn &&
now <= dn->inode->valid_until &&
((dn->inode->inode.mask & INODE_MASK_ALL_STAT) == INODE_MASK_ALL_STAT)) {
req->set_caller_uid(getuid());
req->set_caller_gid(getgid());
- MClientReply *reply = make_request(req, true);
+ MClientReply *reply = make_request(req);
int res = reply->get_result();
insert_trace(reply);
delete reply;
//FIXME enforce caller uid rights?
- MClientReply *reply = make_request(req, true);
+ MClientReply *reply = make_request(req);
int res = reply->get_result();
insert_trace(reply);
delete reply;
//FIXME enforce caller uid rights?
- MClientReply *reply = make_request(req, true);
+ MClientReply *reply = make_request(req);
int res = reply->get_result();
insert_trace(reply);
delete reply;
//FIXME enforce caller uid rights?
- MClientReply *reply = make_request(req, true);
+ MClientReply *reply = make_request(req);
int res = reply->get_result();
insert_trace(reply);
//FIXME enforce caller uid rights?
- MClientReply *reply = make_request(req, true);
+ MClientReply *reply = make_request(req);
int res = reply->get_result();
insert_trace(reply);
// only open dir if we're actually adding stuff to it!
Dir *dir = diri->open_dir();
assert(dir);
- utime_t now = g_clock.now();
+ utime_t now = g_clock.real_now();
list<string>::const_iterator pdn = reply->get_dir_dn().begin();
for (list<InodeStat*>::const_iterator pin = reply->get_dir_in().begin();
req->args.open.mode = mode;
int cmode = req->get_open_file_mode();
- bool tryauth = !req->open_file_mode_is_readonly();
// FIXME where does FUSE maintain user information
req->set_caller_uid(getuid());
req->set_caller_gid(getgid());
- MClientReply *reply = make_request(req, tryauth); // try auth if writer
+ MClientReply *reply = make_request(req);
assert(reply);
dout(3) << "op: open_files[" << reply->get_result() << "] = fh; // fh = " << reply->get_result() << endl;
dout(10) << "cur file size is " << in->inode.size << " wr size " << in->file_wr_size << endl;
// time it.
- utime_t start = g_clock.now();
+ utime_t start = g_clock.real_now();
// copy into fresh buffer (since our write may be resub, async)
bufferptr bp = buffer::copy(buf, size);
}
// time
- utime_t lat = g_clock.now();
+ utime_t lat = g_clock.real_now();
lat -= start;
if (client_logger) {
client_logger->finc("wrlsum",(double)lat);
}
// mtime
- in->file_wr_mtime = in->inode.mtime = g_clock.now();
+ in->file_wr_mtime = in->inode.mtime = g_clock.real_now();
// ok!
client_lock.Unlock();
req->set_caller_uid(getuid());
req->set_caller_gid(getgid());
- MClientReply *reply = make_request(req, true);
+ MClientReply *reply = make_request(req);
int res = reply->get_result();
insert_trace(reply);
delete reply;
}
else if (dest.is_mds()) {
dout(0) << "ms_handle_failure " << dest << " inst " << inst << endl;
- failed_mds.insert(dest.num());
+ //failed_mds.insert(dest.num());
}
else {
// client?
map<int, list<Cond*> > waiting_for_session;
void handle_client_session(MClientSession *m);
+ void send_reconnect(int mds);
// mds requests
struct MetaRequest {
bool idempotent; // is request idempotent?
set<int> mds; // who i am asking
+ int resend_mds; // someone wants you to (re)send the request here
int num_fwd; // # of times i've been forwarded
int retry_attempt;
MetaRequest(MClientRequest *req, tid_t t) :
tid(t), request(req),
- idempotent(false), num_fwd(0), retry_attempt(0),
+ idempotent(false), resend_mds(-1), num_fwd(0), retry_attempt(0),
reply(0),
caller_cond(0), dispatch_cond(0) { }
};
map<tid_t, MetaRequest*> mds_requests;
set<int> failed_mds;
- MClientReply *make_request(MClientRequest *req, bool auth_best=false, int use_auth=-1);
- MClientReply* sendrecv(MClientRequest *req, int mds);
+ MClientReply *make_request(MClientRequest *req, int use_auth=-1);
+ int choose_target_mds(MClientRequest *req);
void send_request(MetaRequest *request, int mds);
+ void kick_requests(int mds);
void handle_client_request_forward(MClientRequestForward *reply);
void handle_client_reply(MClientReply *reply);
- void kick_requests(int mds);
-
- void send_reconnect(int mds);
// cluster descriptors
mds_log_before_reply: true,
mds_log_flush_on_shutdown: true,
mds_log_import_map_interval: 1024*1024, // frequency (in bytes) of EImportMap in log
- mds_log_eopen_size: 100,
+ mds_log_eopen_size: 100, // # open inodes per log entry
mds_bal_replicate_threshold: 2000,
mds_bal_unreplicate_threshold: 0,//500,
// pairs <f, b>:
// frag_t f is split by b bits.
// if child frag_t does not appear, it is not split.
- std::map<frag_t,int> _splits;
+ std::map<frag_t,__int32_t> _splits;
public:
// accessors
return _splits.empty();
}
int get_split(const frag_t hb) const {
- std::map<frag_t,int>::const_iterator p = _splits.find(hb);
+ std::map<frag_t,__int32_t>::const_iterator p = _splits.find(hb);
if (p == _splits.end())
return 0;
else
// verify that we describe a legal partition of the namespace.
void verify() const {
- std::map<frag_t,int> copy;
+ std::map<frag_t,__int32_t> copy;
std::list<frag_t> q;
q.push_back(frag_t());
bool anchored; // auth only?
// file (data access)
- off_t size;
+ off_t size, max_size;
utime_t mtime; // file data modify time.
utime_t atime; // file data access time.
// -----auth-------- ---replica-------
#define LOCK_SYNC_ 1 // AR R . / C R . . . L R . / C R . . . L stat()
-#define LOCK_GSYNCL -12 // A . . / C ? . . . L loner -> sync (*) FIXME: let old loner keep R, somehow...
+#define LOCK_GSYNCL -12 // A . . / C ? . . . L loner -> sync (*)
#define LOCK_GSYNCM -13 // A . . / . R . . . L
#define LOCK_LOCK_ 2 // AR R W / C . . . . . . . / C . . . . . truncate()
#define LOCK_GLONERR -10 // A . . / . R . . . L
#define LOCK_GLONERM -11 // A . . / . R W A . L
+// (*) FIXME: how to let old loner keep R, somehow, during GSYNCL
// 4 stable
// +9 transition
++p) {
CInode *in = get_inode(p->first);
if (in) {
- dout(10) << " have (strong) " << *in << endl;
int nonce = in->add_replica(from);
if (p->second.caps_wanted)
in->mds_caps_wanted[from] = p->second.caps_wanted;
in->linklock.remove_gather(from); // just in case
in->dirfragtreelock.remove_gather(from); // just in case
in->filelock.remove_gather(from); // just in case
- dout(10) << " have (weak) " << *in << endl;
- if (ack)
+ dout(10) << " have (strong) " << *in << endl;
+ if (ack) {
ack->add_strong_inode(in->ino(),
nonce,
0,
in->linklock.get_replica_state(),
in->dirfragtreelock.get_replica_state(),
in->filelock.get_replica_state());
+ } else {
+ // note strong replica filelock state requests
+ //if (p->second.filelock & CAP_FILE_RD)
+ //filelock_replica_readers.insert(in);
+ }
} else {
dout(10) << " missing " << p->first << endl;
if (!missing) missing = new MMDSCacheRejoin(MMDSCacheRejoin::OP_MISSING);
if (!in) continue; // already missing, from strong_inodes list above.
dout(10) << " inode xlock by " << p->reqid << " on " << *in << endl;
- assert(0);
- //SimpleLock *lock = in->get_lock(p->locktype);
- // .. FIXME IMPLEMENT ME ..
-
-
+
+ // create slave mdrequest
+ MDRequest *mdr = request_start(p->reqid);
+
+ // auth_pin
+ mdr->auth_pin(in);
+
+ // xlock
+ SimpleLock *lock = in->get_lock(p->locktype);
+ lock->set_state(LOCK_LOCK);
+ lock->get_xlock(mdr);
+ mdr->xlocks.insert(lock);
+ mdr->locks.insert(lock);
}
for (map<dirfrag_t, map<string, metareqid_t> >::iterator p = m->xlocked_dentries.begin();
p != m->xlocked_dentries.end();
CDentry *dn = dir->lookup(q->first);
if (!dn) continue; // already missing, from above.
dout(10) << " dn xlock by " << q->second << " on " << *dn << endl;
-
- // FIXME IMPLEMENT ME
- assert(0);
+
+ // create slave mdrequest
+ MDRequest *mdr = request_start(q->second);
+
+ // auth_pin
+ mdr->auth_pin(dn->dir);
+
+ // xlock
+ dn->lock.set_state(LOCK_LOCK);
+ dn->lock.get_xlock(mdr);
+ mdr->xlocks.insert(&dn->lock);
+ mdr->locks.insert(&dn->lock);
}
}
{
dout(7) << "send_cache_rejoin_acks to " << want_rejoin_ack << endl;
+ /* nope, not necessary, we adjust lock state gradually.
+ after we've processed all rejoins, lockstate is legal.
+ we just have to do a final _eval-ish thing at the end...
+
+ // calculate proper filelock states
+ for (set<CInode*>::iterator p = filelock_replica_readers.begin();
+ p != filelock_replica_readers.end();
+ ++p) {
+ dout(10) << "replica(s) have RD caps on " << *p->first << endl;
+
+ for (set<int>::iterator q = p->second.begin();
+ q != p->second.end();
+ ++q) {
+ if (*q == LOCK_
+ }
+ }
+ */
+
+ // send acks
+ map<int,MMDSCacheRejoin*> ack;
+
+ for (map<CDir*,set<CDir*> >::iterator p = subtrees.begin();
+ p != subtrees.end();
+ p++) {
+ CDir *dir = p->first;
+ if (!dir->is_auth()) continue;
+ dout(10) << "subtree " << *dir << endl;
+
+ // auth items in this subtree
+ list<CDir*> dq;
+ dq.push_back(dir);
+
+ while (!dq.empty()) {
+ CDir *dir = dq.front();
+ dq.pop_front();
+
+ // dir
+ for (map<int,int>::iterator r = dir->replicas_begin();
+ r != dir->replicas_end();
+ ++r) {
+ if (!ack[r->first]) ack[r->first] = new MMDSCacheRejoin(MMDSCacheRejoin::OP_ACK);
+ ack[r->first]->add_strong_dirfrag(dir->dirfrag(), r->second);
+ }
+
+ for (map<string,CDentry*>::iterator q = dir->items.begin();
+ q != dir->items.end();
+ ++q) {
+ CDentry *dn = q->second;
+
+ // dentry
+ for (map<int,int>::iterator r = dn->replicas_begin();
+ r != dn->replicas_end();
+ ++r) {
+ //if (!ack[r->first]) ack[r->first] = new MMDSCacheRejoin(MMDSCacheRejoin::OP_ACK);
+ ack[r->first]->add_strong_dentry(dir->dirfrag(), dn->name, r->second,
+ dn->lock.get_replica_state());
+ }
+
+ if (!dn->is_primary()) continue;
+
+ // inode
+ CInode *in = dn->inode;
+
+ // twiddle filelock at all?
+ // hmm.
+
+ for (map<int,int>::iterator r = in->replicas_begin();
+ r != in->replicas_end();
+ ++r) {
+ //if (!ack[r->first]) ack[r->first] = new MMDSCacheRejoin(MMDSCacheRejoin::OP_ACK);
+ ack[r->first]->add_strong_inode(in->ino(), r->second, 0,
+ in->authlock.get_replica_state(),
+ in->linklock.get_replica_state(),
+ in->dirfragtreelock.get_replica_state(),
+ in->filelock.get_replica_state());
+ }
+
+ // subdirs in this subtree?
+ in->get_nested_dirfrags(dq);
+ }
+ }
+ }
+
+ // send acks
+ for (map<int,MMDSCacheRejoin*>::iterator p = ack.begin();
+ p != ack.end();
+ ++p)
+ mds->send_message_mds(p->second, p->first, MDS_PORT_CACHE);
+
}
}
if (lru.lru_get_size() == 0) {
- // root, stray, etc.
- for (hash_map<inodeno_t,CInode*>::iterator p = inode_map.begin();
- p != inode_map.end();
- ++p) {
+ // root, stray, etc.?
+ hash_map<inodeno_t,CInode*>::iterator p = inode_map.begin();
+ while (p != inode_map.end()) {
+ hash_map<inodeno_t,CInode*>::iterator next = p;
+ ++next;
CInode *in = p->second;
-
- list<CDir*> ls;
- in->get_dirfrags(ls);
- for (list<CDir*>::iterator p = ls.begin();
- p != ls.end();
- ++p) {
- assert((*p)->get_num_ref() == 0);
- remove_subtree((*p));
- in->close_dirfrag((*p)->dirfrag().frag);
+ if (!in->is_auth()) {
+ list<CDir*> ls;
+ in->get_dirfrags(ls);
+ for (list<CDir*>::iterator p = ls.begin();
+ p != ls.end();
+ ++p) {
+ assert((*p)->get_num_ref() == 0);
+ remove_subtree((*p));
+ in->close_dirfrag((*p)->dirfrag().frag);
+ }
+ assert(in->get_num_ref() == 0);
+ remove_inode(in);
}
- assert(in->get_num_ref() == 0);
- remove_inode(in);
+ p = next;
}
}
// print
dout(dbl) << indent << "|_" << pad << s << " " << auth << *dir << endl;
+ if (dir->ino() == MDS_INO_ROOT)
+ assert(dir->inode == root);
+ if (dir->ino() == MDS_INO_STRAY(mds->get_nodeid()))
+ assert(dir->inode == stray);
+
// nested items?
if (!subtrees[dir].empty()) {
// more at my level?
MClientRequest *creq = (MClientRequest*)req;
creq->inc_num_fwd(); // inc forward counter
- // tell the client
+ // tell the client where it should go
messenger->send_message(new MClientRequestForward(creq->get_tid(), mds, creq->get_num_fwd()),
creq->get_client_inst());
- if (!creq->is_idempotent())
- return; // don't forward if non-idempotent
+ if (!creq->is_idempotent()) {
+ delete req;
+ return; // don't actually forward if non-idempotent
+ }
}
// forward
anchorclient->finish_recovery();
mdcache->start_recovered_purges();
+
+ // tell connected clients
+ bcast_mds_map();
}
dout(1) << "now active" << endl;
}
// inst set changed?
+ /*
if (state >= MDSMap::STATE_ACTIVE && // only if i'm active+. otherwise they'll get map during reconnect.
mdsmap->get_same_inst_since() > last_client_mdsmap_bcast) {
bcast_mds_map();
}
+ */
delete m;
}
<< " on " << *in << endl;
Capability cap(p->second.wanted, p->second.seq);
in->add_client_cap(from, cap);
+ in->inode.size = MAX(in->inode.size, p->second.size);
+ in->inode.mtime = MAX(in->inode.mtime, p->second.mtime);
+ in->inode.atime = MAX(in->inode.atime, p->second.atime);
reconnected_open_files.insert(in);
}
void Server::client_reconnect_failure(int from)
{
+ dout(5) << "client_reconnect_failure on client" << from << endl;
client_reconnect_gather.erase(from);
if (client_reconnect_gather.empty())
reconnect_finish();
for (set<CInode*>::iterator p = reconnected_open_files.begin();
p != reconnected_open_files.end();
++p) {
-
+ CInode *in = *p;
+ int issued = in->get_caps_issued();
+ if (in->is_auth()) {
+ // wr?
+ if (issued & (CAP_FILE_WR|CAP_FILE_WRBUFFER)) {
+ if (issued & (CAP_FILE_RDCACHE|CAP_FILE_WRBUFFER)) {
+ in->filelock.set_state(LOCK_LONER);
+ } else {
+ in->filelock.set_state(LOCK_MIXED);
+ }
+ }
+ } else {
+ // note that client should perform stale/reap cleanup during reconnect.
+ assert(issued & (CAP_FILE_WR|CAP_FILE_WRBUFFER) == 0); // ????
+ if (in->filelock.is_xlocked())
+ in->filelock.set_state(LOCK_LOCK);
+ else
+ in->filelock.set_state(LOCK_SYNC); // might have been lock, previously
+ }
+ dout(10) << " issued " << cap_string(issued)
+ << " chose " << in->filelock
+ << " on " << *in << endl;
}
reconnected_open_files.clear(); // clean up
void Server::handle_client_request(MClientRequest *req)
{
dout(4) << "handle_client_request " << *req << endl;
+ int client = req->get_client();
if (!mds->is_active()) {
dout(5) << " not active, discarding client request." << endl;
}
// active session?
- if (!mds->clientmap.have_session(req->get_source().num())) {
- dout(1) << "no session for " << req->get_source() << ", dropping" << endl;
+ if (!mds->clientmap.have_session(client)) {
+ dout(1) << "no session for client" << client << ", dropping" << endl;
delete req;
return;
}
if (mds->clientmap.have_completed_request(req->get_reqid())) {
dout(5) << "already completed " << req->get_reqid() << endl;
mds->messenger->send_message(new MClientReply(req, 0),
- req->get_source_inst());
+ req->get_client_inst());
delete req;
return;
}
}
// trim completed_request list
if (req->get_oldest_client_tid() > 0)
- mds->clientmap.trim_completed_requests(req->get_source().num(),
+ mds->clientmap.trim_completed_requests(client,
req->get_oldest_client_tid());
CInode *in = mdcache->create_inode();
in->inode.uid = req->get_caller_uid();
in->inode.gid = req->get_caller_gid();
- in->inode.ctime = in->inode.mtime = in->inode.atime = g_clock.now(); // now
+ in->inode.ctime = in->inode.mtime = in->inode.atime = g_clock.real_now(); // now
dout(10) << "prepare_new_inode " << *in << endl;
// bump modify pop
inode_t *pi = le->metablob.add_dentry(cur->parent, true);
pi->mtime = mtime;
pi->atime = mtime;
- pi->ctime = g_clock.now();
+ pi->ctime = g_clock.real_now();
pi->version = pdv;
mdlog->submit_entry(le);
inode_t *pi = le->metablob.add_dentry(cur->parent, true);
pi->mode = mode;
pi->version = pdv;
- pi->ctime = g_clock.now();
+ pi->ctime = g_clock.real_now();
mdlog->submit_entry(le);
mdlog->wait_for_sync(fin);
if (uid >= 0) pi->uid = uid;
if (gid >= 0) pi->gid = gid;
pi->version = pdv;
- pi->ctime = g_clock.now();
+ pi->ctime = g_clock.real_now();
mdlog->submit_entry(le);
mdlog->wait_for_sync(fin);
// update journaled target inode
pi->nlink++;
- pi->ctime = g_clock.now();
+ pi->ctime = g_clock.real_now();
pi->version = tpdv;
// finisher
// update journaled target inode
pi->nlink--;
- pi->ctime = g_clock.now();
+ pi->ctime = g_clock.real_now();
pi->version = ipv;
// finisher
if (pi) {
// update journaled target inode
pi->nlink--;
- pi->ctime = g_clock.now();
+ pi->ctime = g_clock.real_now();
pi->version = ipv;
}
// prepare
version_t pdv = cur->pre_dirty();
- utime_t ctime = g_clock.now();
+ utime_t ctime = g_clock.real_now();
Context *fin = new C_MDS_truncate_logged(mds, mdr, cur,
pdv, req->args.truncate.length, ctime);
// prepare
version_t pdv = cur->pre_dirty();
- utime_t ctime = g_clock.now();
+ utime_t ctime = g_clock.real_now();
Context *fin = new C_MDS_open_truncate_logged(mds, mdr, cur,
pdv, ctime);
#define MDS_INO_STRAY_OFFSET 0x300
#define MDS_INO_BASE 0x1000
-#define MDS_INO_STRAY(x) (MDS_INO_STRAY_OFFSET+(x))
+#define MDS_INO_STRAY(x) (MDS_INO_STRAY_OFFSET+((unsigned)x))
#define MDS_INO_IS_STRAY(i) ((i) >= MDS_INO_STRAY_OFFSET && (i) < MDS_INO_STRAY_OFFSET+MAX_MDS)
#define MDS_TRAVERSE_FORWARD 1
__int32_t caps;
__int32_t seq;
__int32_t wanted;
+ off_t size;
+ utime_t mtime, atime;
inode_caps_t() {}
- inode_caps_t(int c, int s, int w) : caps(c), seq(s), wanted(w) {}
+ inode_caps_t(int c, int s, int w) :
+ caps(c), seq(s), wanted(w), size(0) {}
+ inode_caps_t(int c, int s, int w, off_t sz, utime_t mt, utime_t at) :
+ caps(c), seq(s), wanted(w), size(sz), mtime(mt), atime(at) {}
};
map<inodeno_t, inode_caps_t> inode_caps;
}
void add_inode_caps(inodeno_t ino,
- int havecaps,
- long seq,
- int wanted) {
- inode_caps[ino] = inode_caps_t(havecaps, seq, wanted);
+ int havecaps, long seq, int wanted,
+ off_t sz, utime_t mt, utime_t at) {
+ inode_caps[ino] = inode_caps_t(havecaps, seq, wanted, sz, mt, at);
}
void add_inode_path(inodeno_t ino, const string& path) {
inode_path[ino] = path;
return open_file_mode_is_readonly();
return (st.op < 1000);
}
+ bool auth_is_best() {
+ if (!is_idempotent()) return true;
+ if (st.op == MDS_OP_READDIR) return true;
+ return false;
+ }
bool follow_trailing_symlink() {
switch (st.op) {
case MDS_OP_LSTAT:
// -- types --
struct inode_strong {
- int caps_wanted;
- int nonce;
- int authlock;
- int linklock;
- int dirfragtreelock;
- int filelock;
+ __int32_t caps_wanted;
+ __int32_t nonce;
+ __int32_t authlock;
+ __int32_t linklock;
+ __int32_t dirfragtreelock;
+ __int32_t filelock;
inode_strong() {}
inode_strong(int n, int cw=0, int a=0, int l=0, int dft=0, int f=0) :
caps_wanted(cw),
};
struct inode_xlock {
inodeno_t ino;
- int locktype;
+ __int32_t locktype;
metareqid_t reqid;
inode_xlock() {}
inode_xlock(inodeno_t i, int lt, const metareqid_t& ri) :
};
struct dirfrag_strong {
- int nonce;
+ __int32_t nonce;
dirfrag_strong() {}
dirfrag_strong(int n) : nonce(n) {}
};
struct dn_strong {
- int nonce;
- int lock;
+ __int32_t nonce;
+ __int32_t lock;
dn_strong() {}
dn_strong(int n, int l) : nonce(n), lock(l) {}
};
// -- data --
- int op;
+ __int32_t op;
set<inodeno_t> weak_inodes;
map<inodeno_t, inode_strong> strong_inodes;
// -- encoding --
void encode_payload() {
+ ::_encode(op, payload);
::_encode(weak_inodes, payload);
::_encode(strong_inodes, payload);
::_encode(xlocked_inodes, payload);
::_encode(weak_dirfrags, payload);
- //::_encode(strong_dirfrags, payload);
+ ::_encode(strong_dirfrags, payload);
::_encode(weak_dentries, payload);
::_encode(strong_dentries, payload);
::_encode(xlocked_dentries, payload);
}
void decode_payload() {
int off = 0;
+ ::_decode(op, payload, off);
::_decode(weak_inodes, payload, off);
::_decode(strong_inodes, payload, off);
::_decode(xlocked_inodes, payload, off);
::_decode(weak_dirfrags, payload, off);
- //::_decode(strong_dirfrags, payload, off);
+ ::_decode(strong_dirfrags, payload, off);
::_decode(weak_dentries, payload, off);
::_decode(strong_dentries, payload, off);
::_decode(xlocked_dentries, payload, off);
#include "msg/Message.h"
#include "mds/MDSMap.h"
-
class MMDSMap : public Message {
public:
/*
mm->encode(encoded);
}
+ char *get_type_name() { return "mdsmap"; }
+ void print(ostream& out) {
+ out << "mdsmap(e " << epoch << ")";
+ }
// marshalling
- virtual void decode_payload() {
+ void decode_payload() {
int off = 0;
- payload.copy(off, sizeof(epoch), (char*)&epoch);
- off += sizeof(epoch);
+ ::_decode(epoch, payload, off);
::_decode(encoded, payload, off);
}
- virtual void encode_payload() {
- payload.append((char*)&epoch, sizeof(epoch));
+ void encode_payload() {
+ ::_encode(epoch, payload);
::_encode(encoded, payload);
}
-
- virtual char *get_type_name() { return "mdsmap"; }
};
#endif
if (client_map.count(from)) {
client_map.erase(from);
- if (client_map.empty()) {
+ if (client_map.empty() &&
+ g_conf.mds_shutdown_on_last_unmount) {
dout(1) << "last client unmounted" << endl;
- if (g_conf.mds_shutdown_on_last_unmount)
- mon->do_stop();
+ mon->do_stop();
}
}
return -1;
}
if (peer_addr != paddr) {
- derr(0) << "pipe(" << peer_addr << ' ' << this << ").connect peer is " << paddr << ", wtf" << endl;
- assert(0);
+ dout(10) << "pipe(" << peer_addr << ' ' << this << ").connect peer identifies itself as " << paddr << ", wrong guy!" << endl;
+ ::close(sd);
+ sd = 0;
return -1;
}