for (set<int>::iterator it = cached_by.begin();
it != cached_by.end();
it++) {
+ // mds
int i = *it;
r.append( (char*)&i, sizeof(int) );
+ // nonce
+ int j = cached_by_nonce(i);
+ r.append( (char*)&j, sizeof(int) );
}
return r;
for (set<int>::iterator it = cached_by.begin();
it != cached_by.end();
it++) {
- int j = *it;
- r.append((char*)&j, sizeof(j));
+ int mds = *it;
+ r.append((char*)&mds, sizeof(mds));
+ int nonce = cached_by_nonce(mds);
+ r.append((char*)&nonce, sizeof(nonce));
}
// dir_auth
off += sizeof(inode_t);
// cached_by --- although really this is rep_by,
- // since we're non-authoritative
+ // since we're non-authoritative (?????)
int n;
r.copy(off, sizeof(int), (char*)&n);
off += sizeof(int);
cached_by.clear();
for (int i=0; i<n; i++) {
- int j;
- r.copy(off, sizeof(int), (char*)&j);
- cached_by.insert(j);
+ // mds
+ int mds;
+ r.copy(off, sizeof(int), (char*)&mds);
off += sizeof(int);
+ int nonce;
+ r.copy(off, sizeof(int), (char*)&nonce);
+ off += sizeof(int);
+ cached_by_add(mds, nonce);
}
// dir_auth
#define CINODE_PIN_DIRWAIT 10032 // "
#define CINODE_PIN_DIRWAITDN 10033 // "
+#define CINODE_PIN_CACHEPROXY 10050
+
#define CINODE_PIN_IAUTHPIN 20000
#define CINODE_PIN_DAUTHPIN 30000
#define CINODE_PIN_DIRTY 50000 // must flush
#define CINODE_STATE_UNSAFE 4 // not logged yet
#define CINODE_STATE_DANGLING 8 // delete me when i expire; i have no dentry
#define CINODE_STATE_UNLINKING 16
+#define CINODE_STATE_CACHEPROXY 32
+// misc
+// Fixed nonce values passed explicitly to cached_by_add(mds, nonce).
+#define CINODE_EXPORT_NONCE 1 // nonce given to replicas created by export
+#define CINODE_ROOT_NONCE 1 // nonce given to replicas of root
+// Spelled to match its use site: in->cached_by_add( newauth, CINODE_HASHREPLICA_NONCE ).
+#define CINODE_HASHREPLICA_NONCE 1 // hashed inodes that are duped
class Context;
class CDentry;
// dcache lru
CInode *lru_next, *lru_prev;
- // distributed caching
+ // -- distributed caching
bool auth; // safety check; true if this is authoritative.
set<int> cached_by; // mds's that cache me.
/* NOTE: on replicas, this doubles as replicated_by, but the
cached_by_* access methods below should NOT be used in those
cases, as the semantics are different! */
- set<int> soft_tokens; // replicas who can to soft update the inode
+ /* NOTE: if replica is_cacheproxy(), cached_by is still defined! */
+ map<int,int> cached_by_nonce; // nonce issued to each replica
+ int replica_nonce; // defined on replica
+ set<int> soft_tokens; // replicas who can soft update the inode
/* ..and thus may have a newer mtime, size, etc.! .. w/o sync
for authority: set of nodes; self is assumed, but not included
for replica: undefined */
- // -- cached_by -- to be used ONLY when we're authoritative!
- bool is_cached_by_anyone() {
- return !cached_by.empty();
- }
- bool is_cached_by(int mds) {
- return cached_by.count(mds);
- }
- void cached_by_add(int mds) {
- if (is_cached_by(mds)) return;
+ // -- cached_by -- to be used ONLY when we're authoritative or cacheproxy
+ bool is_cacheproxy() { return state & CINODE_STATE_CACHEPROXY; } // true when the CINODE_STATE_CACHEPROXY bit is set in state
+ bool is_cached_by_anyone() { return !cached_by.empty(); } // true when at least one mds is recorded in cached_by
+ bool is_cached_by(int mds) { return cached_by.count(mds); } // set::count() is 0 or 1, so this is a membership test
+ // cached_by_add returns a nonce.
+ // Registers mds in cached_by and returns the nonce recorded for it: 1 on the
+ // first registration, the previous nonce + 1 on every repeat registration.
+ // Takes the CINODE_PIN_CACHED pin when the first mds is added.
+ int cached_by_add(int mds) {
+ if (is_cached_by(mds)) { // already had it?
+ // new nonce (+1).  map::insert() never replaces an existing key, so the
+ // stored value is bumped in place; operator[] also avoids dereferencing
+ // end() if the map has no entry for this mds.
+ return ++cached_by_nonce[mds];
+ }
if (cached_by.empty())
get(CINODE_PIN_CACHED);
cached_by.insert(mds);
+ cached_by_nonce[mds] = 1; // first! serial of 1.  assignment replaces any stale entry
+ return 1; // default nonce
+ }
+ // Registers mds in cached_by with a nonce chosen by the caller (e.g. one
+ // decoded from a message, or one of the fixed CINODE_*_NONCE values).
+ // Takes the CINODE_PIN_CACHED pin when the first mds is added.
+ void cached_by_add(int mds, int nonce) {
+ if (cached_by.empty())
+ get(CINODE_PIN_CACHED);
+ cached_by.insert(mds);
+ // Assign instead of insert(): insert() keeps an existing value, and callers
+ // that reset the set directly (cached_by.clear()) leave old map entries
+ // behind, which would otherwise shadow the nonce passed in here.
+ cached_by_nonce[mds] = nonce;
+ }
+ // Returns the nonce recorded for mds, or 0 when no nonce is recorded
+ // (nonces issued by cached_by_add(mds) start at 1).
+ // NOTE(review): this accessor has the same name as the map<int,int> data
+ // member it reads; a C++ class cannot declare a data member and a member
+ // function with the same name, so one of the two needs renaming before this
+ // builds -- confirm which name the callers should keep.
+ int cached_by_nonce(int mds) {
+ map<int,int>::iterator it = cached_by_nonce.find(mds);
+ if (it == cached_by_nonce.end()) return 0; // unknown mds: never dereference end()
+ return it->second;
}
void cached_by_remove(int mds) {
if (!is_cached_by(mds)) return;
if (cached_by.size())
put(CINODE_PIN_CACHED);
cached_by.clear();
+ cached_by_nonce.clear();
}
- set<int>::iterator cached_by_begin() {
- return cached_by.begin();
- }
- set<int>::iterator cached_by_end() {
- return cached_by.end();
- }
- set<int>& get_cached_by() {
- return cached_by;
- }
+ set<int>::iterator cached_by_begin() { return cached_by.begin(); } // iterator to the first mds in cached_by
+ set<int>::iterator cached_by_end() { return cached_by.end(); } // past-the-end iterator of cached_by
+ set<int>& get_cached_by() { return cached_by; } // reference to the set itself, not a copy
// -- waiting --
}
else {
// use generic range
- free.map_insert(1000000000000 * (mds->get_nodeid()+1),
- 1000000000000 * (mds->get_nodeid()+2) - 1);
+ free.map_insert((long long)1000000000000 * (mds->get_nodeid()+1),
+ (long long)1000000000000 * (mds->get_nodeid()+2) - 1);
}
}
root->dir->dir_rep_by = trace[0].dir_rep_by;
root->state_set(CINODE_STATE_ROOT);
root->set_auth(false);
+ // The root is described by trace[0], like every other root field set here;
+ // take its nonce from the same entry rather than from trace[i].
+ root->replica_nonce = trace[0].replica_nonce;
+ assert(root->replica_nonce == CINODE_ROOT_NONCE);
if (trace[0].is_syncbyauth) root->dist_state |= CINODE_DIST_SYNCBYAUTH;
if (trace[0].is_softasync) root->dist_state |= CINODE_DIST_SOFTASYNC;
in->dir_auth = trace[i].dir_auth;
in->auth = false;
+ in->replica_nonce = trace[i].replica_nonce;
if (in->is_dir()) {
in->dir = new CDir(in, whoami); // can't be ours (an import) or it'd be in our cache.
// just root?
if (dis->just_root()) {
CInode *root = get_root();
- dis->add_bit( root, 0 );
-
- root->cached_by_add(dis->get_asker());
+ dis->add_bit( root, 0, CINODE_ROOT_NONCE );
+ root->cached_by_add(dis->get_asker(), CINODE_ROOT_NONCE);
}
// add bits
dout(7) << "discover adding bit " << *next << " for mds" << dis->get_asker() << endl;
+ // remember who is caching this!
+ int nonce = next->cached_by_add( dis->get_asker() );
+
// add it
- dis->add_bit( next, whoami );
+ dis->add_bit( next, whoami, nonce );
have_added = true;
- // remember who is caching this!
- next->cached_by_add( dis->get_asker() );
-
cur = next; // continue!
} else {
// don't have dentry.
dout(7) << "handle_inode_get_replica from " << m->get_source() << " for " << *in << endl;
// add to cached_by
- in->cached_by_add(m->get_source());
+ int nonce = in->cached_by_add(m->get_source());
// add bit
- //****
+ //**** hmm do we put any data in the reply? not for the limited instances
+ // when is this used? FIXME?
// reply
- mds->messenger->send_message(new MInodeGetReplicaAck(in->ino()),
+ mds->messenger->send_message(new MInodeGetReplicaAck(in->ino(), nonce),
MSG_ADDR_MDS(m->get_source()), MDS_PORT_CACHE, MDS_PORT_CACHE);
// done.
CInode *in = get_inode(m->get_ino());
assert(in);
- dout(7) << "handle_inode_get_replica_ack from " << m->get_source() << " on " << *in << endl;
+ dout(7) << "handle_inode_get_replica_ack from " << m->get_source() << " on " << *in << " nonce " << m->get_nonce() << endl;
+
+ in->replica_nonce = m->get_nonce();
// waiters
in->finish_waiting(CINODE_WAIT_GETREPLICA);
if (!in) {
dout(7) << "got inode_expire on " << m->get_ino() << " from " << from << ", don't have it" << endl;
- goto forward;
+ assert(in); // I BETTER! i shoudl be authority, or cacheproxy.
}
- auth = in->authority(mds->get_cluster());
- if (auth != mds->get_nodeid()) {
- dout(7) << "got inode_expire on " << *in << ", not mine" << endl;
- goto forward;
+ if (!in->is_auth()) {
+ // Not the authority for this inode: look up who is, and pass the expire on.
+ auth = in->authority(mds->get_cluster());
+ dout(7) << "got inode_expire on " << *in << ", auth is " << auth << endl;
+
+ assert(in->is_cacheproxy());
+
+ // Forward to the authority just computed; nothing on this path assigns
+ // any other destination.
+ mds->messenger->send_message(m,
+ MSG_ADDR_MDS(auth), MDS_PORT_CACHE, MDS_PORT_CACHE);
+ mds->logger->inc("iupfw");
+ return;
}
// remove from our cached_by
if (in->is_auth()) {
// it's mine, easy enough: new auth will replicate my inode (i included it above)
if (!in->is_cached_by(newauth))
- in->cached_by_add( newauth );
+ in->cached_by_add( newauth, CINODE_HASHREPLICA_NONCE );
}
else {
// i'm a replica. the recipient had better discover this dir.
// cached_by
in->cached_by.clear();
for (int nby = istate->ncached_by; nby>0; nby--) {
- if (*((int*)p) != mds->get_nodeid())
- in->cached_by_add( *((int*)p) );
+ int node = *((int*)p);
p += sizeof(int);
+ int nonce = *((int*)p);
+ p += sizeof(int);
+
+ if (node != mds->get_nodeid())
+ in->cached_by_add( node, nonce );
}
- in->cached_by_add(from); // old auth still has it too.
+ in->cached_by_add(from, CINODE_EXPORT_NONCE); // old auth still has it too.
// dist state: new authority inherits softasync state only; sync/lock are dropped for import/export
in->dist_state = 0;
class MUnhashDirAck;
-// DCache
+// MDCache
typedef hash_map<inodeno_t, CInode*> inode_map_t;
bool is_syncbyauth;
bool is_softasync;
bool is_lockbyauth;
+ int replica_nonce;
// dir stuff
int dir_rep;
r.append((char*)&is_syncbyauth, sizeof(bool));
r.append((char*)&is_softasync, sizeof(bool));
r.append((char*)&is_lockbyauth, sizeof(bool));
+ r.append((char*)&replica_nonce, sizeof(replica_nonce));
r.append((char*)&dir_rep, sizeof(int));
n = dir_rep_by.size();
off += sizeof(bool);
s.copy(off, sizeof(bool), (char*)&is_lockbyauth);
off += sizeof(bool);
+ s.copy(off, sizeof(int), (char*)&replica_nonce);
+ off += sizeof(int);
s.copy(off, sizeof(int), (char*)&dir_rep);
off += sizeof(int);
// ---
- void add_bit(CInode *in, int auth) {
+ void add_bit(CInode *in, int auth, int nonce) {
MDiscoverRec_t bit;
bit.inode = in->inode;
bit.cached_by = in->get_cached_by();
bit.cached_by.insert( auth ); // obviously the authority has it too
bit.dir_auth = in->dir_auth;
+ bit.replica_nonce = nonce;
// send sync/lock state
bit.is_syncbyauth = in->is_syncbyme() || in->is_presync();
class MInodeGetReplicaAck : public Message {
inodeno_t ino;
+ int nonce;
//crope state;
public:
inodeno_t get_ino() { return ino; }
+ int get_nonce() { return nonce; }
//crope& get_state() { return state; }
MInodeGetReplicaAck() {}
- MInodeGetReplicaAck(inodeno_t ino
- //, crope& state
- ) :
+ MInodeGetReplicaAck(inodeno_t ino, int nonce ) :
Message(MSG_MDS_INODEGETREPLICA) {
this->ino = ino;
+ this->nonce = nonce;
//this->state = state;
}
virtual char *get_type_name() { return "GInoA";}
+ // Decodes the payload: ino first, then the nonce, at the offsets get_payload() writes them.
virtual int decode_payload(crope s) {
s.copy(0, sizeof(ino), (char*)&ino);
+ s.copy(sizeof(ino), sizeof(nonce), (char*)&nonce);
//state = s.substr(sizeof(ino), s.length() - sizeof(ino));
+ // The function is declared int, so it must not run off the end without a value.
+ // NOTE(review): 0 is presumably the success code -- confirm against the base class contract.
+ return 0;
}
virtual crope get_payload() {
crope s;
- s.append((char*)&ino,sizeof(ino));
+ s.append((char*)&ino, sizeof(ino)); // ino goes first, at offset 0
+ s.append((char*)&nonce, sizeof(nonce)); // nonce follows at offset sizeof(ino), where decode_payload() reads it
//s.append(state);
return s;
}