- handled by individual MDSCacheObject _finish()'s
- properly recover lock state on rejoin...
+ - recovering mds rejoins replicas it pulled out of its journal
+ - replicas will tell it when they hold an xlock
+ - surviving mds rejoins replicas from a recovering mds
+ - will tell auth if it holds an xlock
+- recovering open files
+ - need to journal EOpen
+ - track openness in hierarchy, so we can tell when to expire vs rejournal EOpen metablobs
+ - need to batch EOpen events when rejournaling to avoid looping
+ - recovery will either have inode (from EOpen), or will provide path+cap to reassert open state.
+ - path+cap window will require fetching metadata from disk before doing the rejoin
- clientmap request history
- hmm, so we need completion codes, but only after recovery.
#define __THREAD_H
#include <pthread.h>
+#include <errno.h>
class Thread {
private:
Thread() : thread_id(0) {}
virtual ~Thread() {}
- pthread_t &get_thread_id() { return thread_id; }
- bool is_started() { return thread_id != 0; }
-
+ protected:
virtual void *entry() = 0;
private:
}
public:
+ pthread_t &get_thread_id() { return thread_id; }
+ bool is_started() { return thread_id != 0; }
+ bool am_self() { return (pthread_self() == thread_id); }
+
int create() {
return pthread_create( &thread_id, NULL, _entry_func, (void*)this );
}
-
- bool am_self() {
- return (pthread_self() == thread_id);
- }
-
int join(void **prval = 0) {
- assert(thread_id);
- //if (thread_id == 0) return -1; // never started.
-
+ if (thread_id == 0) {
+ cerr << "WARNING: join on thread that was never started" << endl;
+ //assert(0);
+ return -EINVAL; // never started.
+ }
+
int status = pthread_join(thread_id, prval);
- if (status == 0)
- thread_id = 0;
- else {
- cout << "join status = " << status << endl;
- assert(0);
+ if (status != 0) {
+ switch (status) {
+ case -EINVAL:
+ cerr << "thread " << thread_id << " join status = EINVAL" << endl;
+ break;
+ case -ESRCH:
+ cerr << "thread " << thread_id << " join status = ESRCH" << endl;
+ assert(0);
+ break;
+ case -EDEADLK:
+ cerr << "thread " << thread_id << " join status = EDEADLK" << endl;
+ break;
+ default:
+ cerr << "thread " << thread_id << " join status = " << status << endl;
+ }
+ assert(0); // none of these should happen.
}
+ thread_id = 0;
return status;
}
+
};
#endif
map< utime_t, set<Context*> > scheduled; // time -> (context ...)
hash_map< Context*, utime_t > event_times; // event -> time
- // get time of the next event
- //Context* get_next_scheduled(utime_t& when);
-
bool get_next_due(utime_t &when);
void register_timer(); // make sure i get a callback
void cancel_timer(); // make sure i get a callback
- //pthread_t thread_id;
bool thread_stop;
Mutex lock;
bool timed_sleep;
mds_decay_halflife: 30,
mds_beacon_interval: 5.0,
- mds_beacon_grace: 10.0,
+ mds_beacon_grace: 15.0,
mds_log: true,
mds_log_max_len: MDS_CACHE_SIZE / 3,
// ====== CInode =======
-CInode::CInode(MDCache *c, bool auth) :
- authlock(this, LOCK_OTYPE_IAUTH, WAIT_AUTHLOCK_OFFSET),
- linklock(this, LOCK_OTYPE_ILINK, WAIT_LINKLOCK_OFFSET),
- dirfragtreelock(this, LOCK_OTYPE_IDIRFRAGTREE, WAIT_DIRFRAGTREELOCK_OFFSET),
- filelock(this, LOCK_OTYPE_IFILE, WAIT_FILELOCK_OFFSET)
-{
- mdcache = c;
-
- //num_parents = 0;
- parent = NULL;
-
- auth_pins = 0;
- nested_auth_pins = 0;
- //num_request_pins = 0;
-
- state = 0;
-
- if (auth) state_set(STATE_AUTH);
-}
-
-CInode::~CInode() {
- for (map<frag_t,CDir*>::iterator p = dirfrags.begin();
- p != dirfrags.end();
- ++p)
- delete p->second;
-}
-
// dirfrags
void CInode::get_dirfrags(list<CDir*>& ls)
{
+ // all dirfrags
for (map<frag_t,CDir*>::iterator p = dirfrags.begin();
p != dirfrags.end();
++p)
ls.push_back(p->second);
}
void CInode::get_nested_dirfrags(list<CDir*>& ls)
-{ // same subtree
+{
+ // dirfrags in same subtree
for (map<frag_t,CDir*>::iterator p = dirfrags.begin();
p != dirfrags.end();
++p)
ls.push_back(p->second);
}
void CInode::get_subtree_dirfrags(list<CDir*>& ls)
-{ // new subtree
+{
+ // dirfrags that are roots of new subtrees
for (map<frag_t,CDir*>::iterator p = dirfrags.begin();
p != dirfrags.end();
++p)
//static const int PIN_REPLICATED = 1;
static const int PIN_DIR = 2;
static const int PIN_PROXY = 5; // can't expire yet
- static const int PIN_CAPS = 7; // local fh's
+ static const int PIN_CAPS = 7; // client caps
static const int PIN_AUTHPIN = 8;
static const int PIN_IMPORTING = -9; // importing
static const int PIN_RENAMESRC = 11; // pinned on dest for foreign rename
string symlink; // symlink dest, if symlink
fragtree_t dirfragtree; // dir frag tree, if any
- frag_t pick_dirfrag(const string &dn);
+ off_t last_open_journaled; // log offset for the last journaled EOpen
// -- cache infrastructure --
map<frag_t,CDir*> dirfrags; // cached dir fragments
+ frag_t pick_dirfrag(const string &dn);
CDir* get_dirfrag(frag_t fg) {
if (dirfrags.count(fg))
return dirfrags[fg];
public:
// ---------------------------
- CInode(MDCache *c, bool auth=true);
- ~CInode();
+ CInode(MDCache *c, bool auth=true) :
+ mdcache(c),
+ last_open_journaled(0),
+ parent(0),
+ replica_caps_wanted(0),
+ auth_pins(0), nested_auth_pins(0),
+ authlock(this, LOCK_OTYPE_IAUTH, WAIT_AUTHLOCK_OFFSET),
+ linklock(this, LOCK_OTYPE_ILINK, WAIT_LINKLOCK_OFFSET),
+ dirfragtreelock(this, LOCK_OTYPE_IDIRFRAGTREE, WAIT_DIRFRAGTREELOCK_OFFSET),
+ filelock(this, LOCK_OTYPE_IFILE, WAIT_FILELOCK_OFFSET)
+ {
+ state = 0;
+ if (auth) state_set(STATE_AUTH);
+ };
+ ~CInode() {
+ close_dirfrags();
+ }
// -- accessors --
// C = cache reads, R = read, W = write, A = append, B = buffer writes, L = lazyio
// -----auth-------- ---replica-------
-#define LOCK_SYNC_ 0 // AR R . / C R . . . L R . / C R . . . L stat()
-#define LOCK_GSYNCL -11 // A . . / C ? . . . L loner -> sync (*) FIXME: let old loner keep R, somehow...
-#define LOCK_GSYNCM -12 // A . . / . R . . . L
+#define LOCK_SYNC_ 1 // AR R . / C R . . . L R . / C R . . . L stat()
+#define LOCK_GSYNCL -12 // A . . / C ? . . . L loner -> sync (*) FIXME: let old loner keep R, somehow...
+#define LOCK_GSYNCM -13 // A . . / . R . . . L
-#define LOCK_LOCK_ 1 // AR R W / C . . . . . . . / C . . . . . truncate()
-#define LOCK_GLOCKR_ -2 // AR R . / C . . . . . . . / C . . . . .
-#define LOCK_GLOCKL -3 // A . . / . . . . . . loner -> lock
-#define LOCK_GLOCKM -4 // A . . / . . . . . .
+#define LOCK_LOCK_ 2 // AR R W / C . . . . . . . / C . . . . . truncate()
+#define LOCK_GLOCKR_ -3 // AR R . / C . . . . . . . / C . . . . .
+#define LOCK_GLOCKL -4 // A . . / . . . . . . loner -> lock
+#define LOCK_GLOCKM -5 // A . . / . . . . . .
-#define LOCK_MIXED 5 // AR . . / . R W A . L . . / . R . . . L
-#define LOCK_GMIXEDR -6 // AR R . / . R . . . L . . / . R . . . L
-#define LOCK_GMIXEDL -7 // A . . / . . . . . L loner -> mixed
+#define LOCK_MIXED 6 // AR . . / . R W A . L . . / . R . . . L
+#define LOCK_GMIXEDR -7 // AR R . / . R . . . L . . / . R . . . L
+#define LOCK_GMIXEDL -8 // A . . / . . . . . L loner -> mixed
-#define LOCK_LONER 8 // A . . / C R W A B L (lock)
-#define LOCK_GLONERR -9 // A . . / . R . . . L
-#define LOCK_GLONERM -10 // A . . / . R W A . L
+#define LOCK_LONER 9 // A . . / C R W A B L (lock)
+#define LOCK_GLONERR -10 // A . . / . R . . . L
+#define LOCK_GLONERM -11 // A . . / . R W A . L
// 4 stable
inline ostream& operator<<(ostream& out, FileLock& l)
{
- out << "(" << get_lock_type_name(l.get_type())
- << " " << get_filelock_state_name(l.get_state());
+ out << "(";
+ //out << get_lock_type_name(l.get_type()) << " ";
+ out << get_filelock_state_name(l.get_state());
if (!l.get_gather_set().empty()) out << " g=" << l.get_gather_set();
if (l.get_num_rdlock())
out << " r=" << l.get_num_rdlock();
#include "events/EString.h"
#include "events/EUpdate.h"
-#include "events/EUnlink.h"
#include "msg/Messenger.h"
// events i know of
#include "events/EString.h"
-#include "events/EImportMap.h"
-#include "events/EMetaBlob.h"
-#include "events/EUpdate.h"
-#include "events/ESlaveUpdate.h"
-#include "events/EUnlink.h"
+
#include "events/EMount.h"
#include "events/EClientMap.h"
-#include "events/EAnchor.h"
-#include "events/EAnchorClient.h"
-#include "events/EAlloc.h"
-#include "events/EPurgeFinish.h"
+#include "events/EImportMap.h"
#include "events/EExport.h"
#include "events/EImportStart.h"
#include "events/EImportFinish.h"
+#include "events/EUpdate.h"
+#include "events/ESlaveUpdate.h"
+#include "events/EOpen.h"
+
+#include "events/EAlloc.h"
+#include "events/EPurgeFinish.h"
+
+#include "events/EAnchor.h"
+#include "events/EAnchorClient.h"
+
+
LogEvent *LogEvent::decode(bufferlist& bl)
{
// parse type, length
// create event
LogEvent *le;
switch (type) {
- case EVENT_STRING: le = new EString(); break;
+ case EVENT_STRING: le = new EString; break;
+
+ case EVENT_MOUNT: le = new EMount; break;
+ case EVENT_CLIENTMAP: le = new EClientMap; break;
case EVENT_IMPORTMAP: le = new EImportMap; break;
- case EVENT_UPDATE: le = new EUpdate; break;
- case EVENT_SLAVEUPDATE: le = new ESlaveUpdate; break;
- case EVENT_UNLINK: le = new EUnlink(); break;
- case EVENT_PURGEFINISH: le = new EPurgeFinish(); break;
- case EVENT_MOUNT: le = new EMount(); break;
- case EVENT_CLIENTMAP: le = new EClientMap(); break;
- case EVENT_ANCHOR: le = new EAnchor(); break;
- case EVENT_ANCHORCLIENT: le = new EAnchorClient(); break;
- case EVENT_ALLOC: le = new EAlloc(); break;
case EVENT_EXPORT: le = new EExport; break;
case EVENT_IMPORTSTART: le = new EImportStart; break;
case EVENT_IMPORTFINISH: le = new EImportFinish; break;
+
+ case EVENT_UPDATE: le = new EUpdate; break;
+ case EVENT_SLAVEUPDATE: le = new ESlaveUpdate; break;
+ case EVENT_OPEN: le = new EOpen; break;
+
+ case EVENT_ALLOC: le = new EAlloc; break;
+ case EVENT_PURGEFINISH: le = new EPurgeFinish; break;
+
+ case EVENT_ANCHOR: le = new EAnchor; break;
+ case EVENT_ANCHORCLIENT: le = new EAnchorClient; break;
default:
dout(1) << "uh oh, unknown log event type " << type << endl;
assert(0);
#define EVENT_STRING 1
-#define EVENT_INODEUPDATE 2
-#define EVENT_DIRUPDATE 3
-
-#define EVENT_IMPORTMAP 4
-#define EVENT_UPDATE 5
-#define EVENT_SLAVEUPDATE 6
-
#define EVENT_MOUNT 7
#define EVENT_CLIENTMAP 8
+#define EVENT_IMPORTMAP 2
+#define EVENT_EXPORT 30
+#define EVENT_IMPORTSTART 31
+#define EVENT_IMPORTFINISH 32
-#define EVENT_ALLOC 10
-#define EVENT_MKNOD 11
-#define EVENT_MKDIR 12
-#define EVENT_LINK 13
+#define EVENT_UPDATE 3
+#define EVENT_SLAVEUPDATE 4
+#define EVENT_OPEN 5
-#define EVENT_UNLINK 20
-#define EVENT_RMDIR 21
+#define EVENT_ALLOC 10
#define EVENT_PURGEFINISH 22
-#define EVENT_EXPORT 30
-#define EVENT_IMPORTSTART 31
-#define EVENT_IMPORTFINISH 32
-
#define EVENT_ANCHOR 40
#define EVENT_ANCHORCLIENT 41
#include "events/EUpdate.h"
#include "events/ESlaveUpdate.h"
#include "events/EString.h"
-#include "events/EUnlink.h"
#include "events/EPurgeFinish.h"
#include "messages/MGenericMessage.h"
#include "messages/MMDSImportMap.h"
#include "messages/MMDSCacheRejoin.h"
-#include "messages/MMDSCacheRejoinAck.h"
#include "messages/MDiscover.h"
#include "messages/MDiscoverReply.h"
* rejoin phase!
* we start out by sending rejoins to everyone in the recovery set.
*
- * if _were_ are rejoining, send for all regions in our cache.
+ * if we are rejoining, send for all regions in our cache.
* if we are active|stopping, send only to nodes that are are rejoining.
*/
void MDCache::send_cache_rejoins()
if (*p == mds->get_nodeid()) continue; // nothing to myself!
if (mds->is_rejoin() ||
mds->mdsmap->is_rejoin(*p))
- rejoins[*p] = new MMDSCacheRejoin;
+ rejoins[*p] = new MMDSCacheRejoin(MMDSCacheRejoin::OP_REJOIN);
}
assert(!migrator->is_importing());
assert(!migrator->is_exporting());
-
+
// check all subtrees
for (map<CDir*, set<CDir*> >::iterator p = subtrees.begin();
p != subtrees.end();
}
// nothing?
- if (rejoins.empty()) {
+ if (mds->is_rejoin() && rejoins.empty()) {
dout(10) << "nothing to rejoin, going active" << endl;
mds->set_want_state(MDSMap::STATE_ACTIVE);
}
void MDCache::cache_rejoin_walk(CDir *dir, MMDSCacheRejoin *rejoin)
{
dout(10) << "cache_rejoin_walk " << *dir << endl;
- rejoin->add_dirfrag(dir->dirfrag());
+ //if (mds->is_rejoin())
+ rejoin->add_weak_dirfrag(dir->dirfrag());
+ //else
+ //rejoin->add_strong_dirfrag(dir->dirfrag(), dir->get_replica_nonce());
+
list<CDir*> nested; // finish this dir, then do nested items
// walk dentries
p != dir->items.end();
++p) {
// dentry
- rejoin->add_dentry(dir->dirfrag(), p->first);
+ CDentry *dn = p->second;
+ if (mds->is_rejoin())
+ rejoin->add_weak_dentry(dir->dirfrag(), p->first);
+ else {
+ rejoin->add_strong_dentry(dir->dirfrag(), p->first,
+ dn->get_replica_nonce(),
+ dn->lock.get_state());
+ if (dn->lock.is_xlocked())
+ rejoin->add_dentry_xlock(dir->dirfrag(), p->first,
+ dn->lock.get_xlocked_by()->reqid);
+ }
// inode?
- if (p->second->is_primary() && p->second->get_inode()) {
- CInode *in = p->second->get_inode();
- rejoin->add_inode(in->ino(),
- in->get_caps_wanted());
+ if (dn->is_primary() && dn->get_inode()) {
+ CInode *in = dn->get_inode();
+ if (mds->is_rejoin() && in->get_caps_wanted() == 0)
+ rejoin->add_weak_inode(in->ino());
+ else {
+ rejoin->add_strong_inode(in->ino(), in->get_replica_nonce(),
+ in->get_caps_wanted(),
+ in->authlock.get_state(),
+ in->linklock.get_state(),
+ in->dirfragtreelock.get_state(),
+ in->filelock.get_state());
+ if (in->authlock.is_xlocked())
+ rejoin->add_inode_xlock(in->ino(), in->authlock.get_type(),
+ in->authlock.get_xlocked_by()->reqid);
+ if (in->linklock.is_xlocked())
+ rejoin->add_inode_xlock(in->ino(), in->linklock.get_type(),
+ in->linklock.get_xlocked_by()->reqid);
+ if (in->dirfragtreelock.is_xlocked())
+ rejoin->add_inode_xlock(in->ino(), in->dirfragtreelock.get_type(),
+ in->dirfragtreelock.get_xlocked_by()->reqid);
+ if (in->filelock.is_xlocked())
+ rejoin->add_inode_xlock(in->ino(), in->filelock.get_type(),
+ in->filelock.get_xlocked_by()->reqid);
+ }
// dirfrags in this subtree?
list<CDir*> dfs;
/*
* i got a rejoin.
- *
* - reply with the lockstate
*
* if i am active|stopping,
*/
void MDCache::handle_cache_rejoin(MMDSCacheRejoin *m)
{
- dout(7) << "handle_cache_rejoin from " << m->get_source() << endl;
+ dout(7) << "handle_cache_rejoin " << *m << " from " << m->get_source() << endl;
+
+ switch (m->op) {  // one message type now carries every rejoin sub-operation; dispatch on its op
+ case MMDSCacheRejoin::OP_REJOIN:
+ handle_cache_rejoin_rejoin(m);
+ break;
+
+ case MMDSCacheRejoin::OP_ACK:
+ handle_cache_rejoin_ack(m);
+ break;
+
+ case MMDSCacheRejoin::OP_MISSING:
+ handle_cache_rejoin_missing(m);
+ break;
+
+ case MMDSCacheRejoin::OP_FULL:
+ handle_cache_rejoin_full(m);
+ break;
+
+ default:
+ assert(0);  // unknown op value
+ }
+ delete m;  // the message is freed here, after the selected handler returns; the per-op handlers do not delete it
+}
+
+// Handle an OP_REJOIN message from another mds.
+//  - if we are active|stopping, first drop replica references the sender did
+//    not list, and build an immediate OP_ACK reply.
+//  - otherwise no ack is built now; the sender is remembered in
+//    want_rejoin_ack instead.
+//  - anything the sender lists that is not in our cache is reported back in
+//    an OP_MISSING message.
+// The message is not deleted here; handle_cache_rejoin() does that.
+void MDCache::handle_cache_rejoin_rejoin(MMDSCacheRejoin *m)
+{
int from = m->get_source().num();
- MMDSCacheRejoinAck *ack = new MMDSCacheRejoinAck;
+ // do immediate ack?
+ MMDSCacheRejoin *ack = 0;
+ MMDSCacheRejoin *missing = 0;
if (mds->is_active() || mds->is_stopping()) {
- dout(10) << "removing stale cache replicas" << endl;
+ dout(10) << "i am active. removing stale cache replicas" << endl;
+
// first, scour cache of replica references
+ // NOTE(review): only the weak_* sets are consulted here; an inode the sender
+ // listed in strong_inodes is dropped and then re-added below -- confirm intended.
for (hash_map<inodeno_t,CInode*>::iterator p = inode_map.begin();
p != inode_map.end();
++p) {
// inode
CInode *in = p->second;
- if (in->is_replica(from) && m->inodes.count(p->first) == 0) {
+ if (in->is_replica(from) && m->weak_inodes.count(p->first) == 0) {
inode_remove_replica(in, from);
dout(10) << " rem " << *in << endl;
}
-
+
// dentry
if (in->parent) {
CDentry *dn = in->parent;
if (dn->is_replica(from) &&
- (m->dentries.count(dn->get_dir()->dirfrag()) == 0 ||
- m->dentries[dn->get_dir()->dirfrag()].count(dn->get_name()) == 0)) {
+ (m->weak_dentries.count(dn->get_dir()->dirfrag()) == 0 ||
+ m->weak_dentries[dn->get_dir()->dirfrag()].count(dn->get_name()) == 0)) {
dn->remove_replica(from);
dout(10) << " rem " << *dn << endl;
}
p != dfs.end();
++p) {
CDir *dir = *p;
- if (dir->is_replica(from) && m->dirfrags.count(dir->dirfrag()) == 0) {
+ if (dir->is_replica(from) && m->weak_dirfrags.count(dir->dirfrag()) == 0) {
dir->remove_replica(from);
dout(10) << " rem " << *dir << endl;
}
}
}
- } else {
- assert(mds->is_rejoin());
+
+ // do immediate ack.
+ ack = new MMDSCacheRejoin(MMDSCacheRejoin::OP_ACK);
}
-
+
// dirs
- for (set<dirfrag_t>::iterator p = m->dirfrags.begin();
- p != m->dirfrags.end();
+ for (set<dirfrag_t>::iterator p = m->weak_dirfrags.begin();
+ p != m->weak_dirfrags.end();
++p) {
CDir *dir = get_dirfrag(*p);
- assert(dir);
- int nonce = dir->add_replica(from);
- dout(10) << " has " << *dir << endl;
- ack->add_dirfrag(*p, nonce);
+ if (dir) {
+ int nonce = dir->add_replica(from);
+ dout(10) << " have " << *dir << endl;
+ if (ack)
+ ack->add_strong_dirfrag(*p, nonce);
- // dentries
- for (set<string>::iterator q = m->dentries[*p].begin();
- q != m->dentries[*p].end();
- ++q) {
- CDentry *dn = dir->lookup(*q);
- assert(dn);
- int nonce = dn->add_replica(from);
- dout(10) << " has " << *dn << endl;
- ack->add_dentry(*p, *q, dn->lock.get_state(), nonce);
+ // dentries
+ for (set<string>::iterator q = m->weak_dentries[*p].begin();
+ q != m->weak_dentries[*p].end();
+ ++q) {
+ CDentry *dn = dir->lookup(*q);
+ if (dn) {
+ int dn_nonce = dn->add_replica(from);
+ dout(10) << " have " << *dn << endl;
+ // ack is null unless we are active|stopping, and dn is only valid in
+ // this branch, so the ack entry must be added here (nonce, then state,
+ // matching the other add_strong_dentry callers).
+ if (ack)
+ ack->add_strong_dentry(*p, *q, dn_nonce, dn->lock.get_state());
+ } else {
+ dout(10) << " missing " << *p << " " << *q << endl;
+ if (!missing) missing = new MMDSCacheRejoin(MMDSCacheRejoin::OP_MISSING);
+ missing->add_weak_dentry(*p, *q);
+ }
+ }
+ } else {
+ dout(10) << " missing " << *p << endl;
+ if (!missing) missing = new MMDSCacheRejoin(MMDSCacheRejoin::OP_MISSING);
+ missing->add_weak_dirfrag(*p);
+
+ // dentries
+ for (set<string>::iterator q = m->weak_dentries[*p].begin();
+ q != m->weak_dentries[*p].end();
+ ++q)
+ missing->add_weak_dentry(*p, *q);
}
}
// inodes
- for (map<inodeno_t,int>::iterator p = m->inodes.begin();
- p != m->inodes.end();
+ for (set<inodeno_t>::iterator p = m->weak_inodes.begin();
+ p != m->weak_inodes.end();
++p) {
- CInode *in = get_inode(p->first);
- assert(in);
- int nonce = in->add_replica(from);
- if (p->second)
- in->mds_caps_wanted[from] = p->second;
- else
+ CInode *in = get_inode(*p);
+ if (in) {
+ int nonce = in->add_replica(from);
in->mds_caps_wanted.erase(from);
- in->authlock.remove_gather(from); // just in case
- in->linklock.remove_gather(from); // just in case
- in->dirfragtreelock.remove_gather(from); // just in case
- in->filelock.remove_gather(from); // just in case
- dout(10) << " has " << *in << endl;
- ack->add_inode(p->first,
- in->authlock.get_replica_state(),
- in->linklock.get_replica_state(),
- in->dirfragtreelock.get_replica_state(),
- in->filelock.get_replica_state(),
- nonce);
+ in->authlock.remove_gather(from); // just in case
+ in->linklock.remove_gather(from); // just in case
+ in->dirfragtreelock.remove_gather(from); // just in case
+ in->filelock.remove_gather(from); // just in case
+ dout(10) << " have (weak) " << *in << endl;
+ if (ack)
+ ack->add_strong_inode(in->ino(),
+ nonce,
+ 0,
+ in->authlock.get_replica_state(),
+ in->linklock.get_replica_state(),
+ in->dirfragtreelock.get_replica_state(),
+ in->filelock.get_replica_state());
+ } else {
+ dout(10) << " missing " << *p << endl;
+ if (!missing) missing = new MMDSCacheRejoin(MMDSCacheRejoin::OP_MISSING);
+ missing->add_weak_inode(*p);
+ }
}
- // send ack
- mds->send_message_mds(ack, from, MDS_PORT_CACHE);
+ // strong inodes too?
+ for (map<inodeno_t, MMDSCacheRejoin::inode_strong>::iterator p = m->strong_inodes.begin();
+ p != m->strong_inodes.end();
+ ++p) {
+ CInode *in = get_inode(p->first);
+ if (in) {
+ dout(10) << " have (strong) " << *in << endl;
+ int nonce = in->add_replica(from);
+ if (p->second.caps_wanted)
+ in->mds_caps_wanted[from] = p->second.caps_wanted;
+ else
+ in->mds_caps_wanted.erase(from);
+ in->authlock.remove_gather(from); // just in case
+ in->linklock.remove_gather(from); // just in case
+ in->dirfragtreelock.remove_gather(from); // just in case
+ in->filelock.remove_gather(from); // just in case
+ if (ack)
+ ack->add_strong_inode(in->ino(),
+ nonce,
+ 0,
+ in->authlock.get_replica_state(),
+ in->linklock.get_replica_state(),
+ in->dirfragtreelock.get_replica_state(),
+ in->filelock.get_replica_state());
+ } else {
+ dout(10) << " missing " << p->first << endl;
+ if (!missing) missing = new MMDSCacheRejoin(MMDSCacheRejoin::OP_MISSING);
+ missing->add_weak_inode(p->first);
+ }
+ }
- delete m;
+ // xlocks
+ for (list<MMDSCacheRejoin::inode_xlock>::iterator p = m->xlocked_inodes.begin();
+ p != m->xlocked_inodes.end();
+ ++p) {
+ CInode *in = get_inode(p->ino);
+ if (!in) continue; // already missing, from strong_inodes list above.
+
+ dout(10) << " inode xlock by " << p->reqid << " on " << *in << endl;
+ assert(0);
+ //SimpleLock *lock = in->get_lock(p->locktype);
+ // .. FIXME IMPLEMENT ME ..
+
+
+ }
+ for (map<dirfrag_t, map<string, metareqid_t> >::iterator p = m->xlocked_dentries.begin();
+ p != m->xlocked_dentries.end();
+ ++p) {
+ CDir *dir = get_dirfrag(p->first);
+ if (!dir) continue; // already missing, from above.
+ for (map<string, metareqid_t>::iterator q = p->second.begin();
+ q != p->second.end();
+ ++q) {
+ CDentry *dn = dir->lookup(q->first);
+ if (!dn) continue; // already missing, from above.
+ dout(10) << " dn xlock by " << q->second << " on " << *dn << endl;
+
+ // FIXME IMPLEMENT ME
+ assert(0);
+ }
+ }
+
+ // send ack?
+ if (ack)
+ mds->send_message_mds(ack, from, MDS_PORT_CACHE);
+ else
+ want_rejoin_ack.insert(from);
+
+ // send missing?
+ if (missing)
+ mds->send_message_mds(missing, from, MDS_PORT_CACHE);
}
-void MDCache::handle_cache_rejoin_ack(MMDSCacheRejoinAck *m)
+void MDCache::handle_cache_rejoin_ack(MMDSCacheRejoin *m)
{
dout(7) << "handle_cache_rejoin_ack from " << m->get_source() << endl;
int from = m->get_source().num();
// dirs
- for (list<MMDSCacheRejoinAck::dirinfo>::iterator p = m->dirfrags.begin();
- p != m->dirfrags.end();
+ for (map<dirfrag_t, MMDSCacheRejoin::dirfrag_strong>::iterator p = m->strong_dirfrags.begin();
+ p != m->strong_dirfrags.end();
++p) {
- CDir *dir = get_dirfrag(p->dirfrag);
+ CDir *dir = get_dirfrag(p->first);
assert(dir);
- dir->set_replica_nonce(p->nonce);
+ dir->set_replica_nonce(p->second.nonce);
dout(10) << " got " << *dir << endl;
// dentries
- for (map<string,MMDSCacheRejoinAck::dninfo>::iterator q = m->dentries[p->dirfrag].begin();
- q != m->dentries[p->dirfrag].end();
+ for (map<string,MMDSCacheRejoin::dn_strong>::iterator q = m->strong_dentries[p->first].begin();
+ q != m->strong_dentries[p->first].end();
++q) {
CDentry *dn = dir->lookup(q->first);
assert(dn);
}
// inodes
- for (list<MMDSCacheRejoinAck::inodeinfo>::iterator p = m->inodes.begin();
- p != m->inodes.end();
+ for (map<inodeno_t, MMDSCacheRejoin::inode_strong>::iterator p = m->strong_inodes.begin();
+ p != m->strong_inodes.end();
++p) {
- CInode *in = get_inode(p->ino);
+ CInode *in = get_inode(p->first);
assert(in);
- in->set_replica_nonce(p->nonce);
- in->authlock.set_state(p->authlock);
- in->linklock.set_state(p->linklock);
- in->dirfragtreelock.set_state(p->dirfragtreelock);
- in->filelock.set_state(p->filelock);
+ in->set_replica_nonce(p->second.nonce);
+ in->authlock.set_state(p->second.authlock);
+ in->linklock.set_state(p->second.linklock);
+ in->dirfragtreelock.set_state(p->second.dirfragtreelock);
+ in->filelock.set_state(p->second.filelock);
dout(10) << " got " << *in << endl;
}
- delete m;
-
// done?
rejoin_ack_gather.erase(from);
if (rejoin_ack_gather.empty()) {
dout(7) << "all done, going active!" << endl;
+ send_cache_rejoin_acks();
+
show_subtrees();
show_cache();
mds->set_want_state(MDSMap::STATE_ACTIVE);
}
+void MDCache::handle_cache_rejoin_missing(MMDSCacheRejoin *m)
+{
+ dout(7) << "handle_cache_rejoin_missing from " << m->get_source() << endl;
+
+ MMDSCacheRejoin *full = new MMDSCacheRejoin(MMDSCacheRejoin::OP_FULL);  // reply built from the items the sender listed in m's weak_* sets
+
+ // dirs
+ for (set<dirfrag_t>::iterator p = m->weak_dirfrags.begin();
+ p != m->weak_dirfrags.end();
+ ++p) {
+ CDir *dir = get_dirfrag(*p);
+ assert(dir);
+ dout(10) << " sending " << *dir << endl;  // NOTE(review): the dirfrag itself is never added to `full`, only its dentries below -- confirm intended
+
+ // dentries
+ for (set<string>::iterator q = m->weak_dentries[*p].begin();
+ q != m->weak_dentries[*p].end();
+ ++q) {
+ CDentry *dn = dir->lookup(*q);
+ assert(dn);
+ dout(10) << " sending " << *dn << endl;
+ if (mds->is_rejoin())  // same weak/strong split on is_rejoin() as cache_rejoin_walk() uses for dentries
+ full->add_weak_dentry(*p, *q);
+ else
+ full->add_strong_dentry(*p, *q, dn->get_replica_nonce(), dn->lock.get_state());
+ }
+ }
+
+ // inodes
+ for (set<inodeno_t>::iterator p = m->weak_inodes.begin();
+ p != m->weak_inodes.end();
+ ++p) {
+ CInode *in = get_inode(*p);
+ assert(in);
+
+ dout(10) << " sending " << *in << endl;
+ full->add_full_inode(in->inode, in->symlink, in->dirfragtree);  // full inode contents are always included, in addition to the weak/strong entry
+ if (mds->is_rejoin())
+ full->add_weak_inode(in->ino());
+ else
+ full->add_strong_inode(in->ino(),
+ in->get_replica_nonce(),
+ in->get_caps_wanted(),
+ in->authlock.get_replica_state(),
+ in->linklock.get_replica_state(),
+ in->dirfragtreelock.get_replica_state(),
+ in->filelock.get_replica_state());
+ }
+
+ mds->send_message_mds(full, m->get_source().num(), MDS_PORT_CACHE);  // reply goes back to the mds that sent the OP_MISSING
+}
+
+void MDCache::handle_cache_rejoin_full(MMDSCacheRejoin *m)
+{
+ assert(0); // write me
+}
+
+void MDCache::send_cache_rejoin_acks()
+{
+ dout(7) << "send_cache_rejoin_acks to " << want_rejoin_ack << endl;
+
+}
case MSG_MDS_CACHEREJOIN:
handle_cache_rejoin((MMDSCacheRejoin*)m);
break;
+ /*
case MSG_MDS_CACHEREJOINACK:
handle_cache_rejoin_ack((MMDSCacheRejoinAck*)m);
break;
+ */
case MSG_MDS_DISCOVER:
// -- recovery --
protected:
+ set<int> recovery_set;
+
// from EImportStart w/o EImportFinish during journal replay
map<dirfrag_t, list<dirfrag_t> > my_ambiguous_imports;
// from MMDSImportMaps
map<int, map<dirfrag_t, list<dirfrag_t> > > other_ambiguous_imports;
- set<int> recovery_set;
set<int> wants_import_map; // nodes i need to send my import map to
set<int> got_import_map; // nodes i got import_maps from
- set<int> rejoin_ack_gather; // nodes i need a rejoin ack from
void handle_import_map(MMDSImportMap *m);
- void handle_cache_rejoin(MMDSCacheRejoin *m);
- void handle_cache_rejoin_ack(MMDSCacheRejoinAck *m);
void disambiguate_imports();
+
+ set<int> rejoin_gather; // nodes from whom i need a rejoin
+ set<int> rejoin_ack_gather; // nodes from whom i need a rejoin ack
+ set<int> want_rejoin_ack; // nodes to whom i need to send a rejoin ack
+
void cache_rejoin_walk(CDir *dir, MMDSCacheRejoin *rejoin);
+ void handle_cache_rejoin(MMDSCacheRejoin *m);
+ void handle_cache_rejoin_rejoin(MMDSCacheRejoin *m);
+ void handle_cache_rejoin_ack(MMDSCacheRejoin *m);
+ void handle_cache_rejoin_missing(MMDSCacheRejoin *m);
+ void handle_cache_rejoin_full(MMDSCacheRejoin *m);
void send_cache_rejoin_acks();
void recalc_auth_bits();
-void MDLog::submit_entry( LogEvent *le,
- Context *c )
+void MDLog::submit_entry( LogEvent *le, Context *c )
{
if (g_conf.mds_log) {
dout(5) << "submit_entry " << journaler->get_write_pos() << " : " << *le << endl;
friend class MDCache;
void init_journaler();
+ public:
+ void add_import_map_expire_waiter(Context *c) {
+ import_map_expire_waiters.push_back(c);
+ }
+
- public:
// replay state
map<inodeno_t, set<inodeno_t> > pending_exports;
// include spanning tree for all nested exports.
// these need to be on the destination _before_ the final export so that
// dir_auth updates on any nested exports are properly absorbed.
+ // this includes inodes and dirfrags included in the subtree, but
+ // only the inodes at the bounds.
set<inodeno_t> inodes_added;
- // include base dir
- prep->add_dir( new CDirDiscover(dir, dir->add_replica(dest)) );
+ // include base dirfrag
+ prep->add_dirfrag( new CDirDiscover(dir, dir->add_replica(dest)) );
// check bounds
for (set<CDir*>::iterator it = bounds.begin();
// don't repeat ourselves
if (inodes_added.count(cur->ino())) break; // did already!
inodes_added.insert(cur->ino());
-
- CDir *parent_dir = cur->get_parent_dir();
- // inode?
+ // inode
assert(cur->inode->is_auth());
inode_trace.push_front(cur->inode);
dout(7) << " will add " << *cur->inode << endl;
- // include dir?
- // note: don't replicate ambiguous auth items! they're
- // frozen anyway.
- if (cur->is_auth() && !cur->is_ambiguous_auth()) {
- prep->add_dir( new CDirDiscover(cur, cur->add_replica(dest)) ); // yay!
+ // include the dirfrag? only if it's not the bounding subtree root.
+ if (cur != bound) {
+ assert(cur->is_auth());
+ prep->add_dirfrag( new CDirDiscover(cur, cur->add_replica(dest)) ); // yay!
dout(7) << " added " << *cur << endl;
}
- cur = parent_dir;
+ cur = cur->get_parent_dir();
}
for (list<CInode*>::iterator it = inode_trace.begin();
#include "events/EString.h"
#include "events/EUpdate.h"
#include "events/EMount.h"
+#include "events/EOpen.h"
#include "include/filepath.h"
#include "common/Timer.h"
reply->set_file_caps_seq(cap->get_last_seq());
reply->set_file_data_version(fdv);
reply_request(mdr, reply, cur);
+
+ // journal?
+ if (cur->last_open_journaled == 0) {
+ cur->last_open_journaled = mdlog->get_write_pos();
+ mdlog->submit_entry(new EOpen(cur));
+ }
+
}
}
// -- lock states --
+#define LOCK_UNDEF 0
// auth rep
-#define LOCK_SYNC 0 // AR R . R .
-#define LOCK_LOCK 1 // AR R W . .
-#define LOCK_GLOCKR 2 // AR R . . .
+#define LOCK_SYNC 1 // AR R . R .
+#define LOCK_LOCK 2 // AR R W . .
+#define LOCK_GLOCKR 3 // AR R . . .
inline const char *get_simplelock_state_name(int n) {
switch (n) {
+ case LOCK_UNDEF: return "undef";
case LOCK_SYNC: return "sync";
case LOCK_LOCK: return "lock";
case LOCK_GLOCKR: return "glockr";
inline ostream& operator<<(ostream& out, SimpleLock& l)
{
- out << "(" << get_lock_type_name(l.get_type())
- << " " << get_simplelock_state_name(l.get_state());
+ out << "(";
+ //out << get_lock_type_name(l.get_type()) << " ";
+ out << get_simplelock_state_name(l.get_state());
if (!l.get_gather_set().empty()) out << " g=" << l.get_gather_set();
if (l.get_num_rdlock())
out << " r=" << l.get_num_rdlock();
--- /dev/null
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+/*
+ * Ceph - scalable distributed file system
+ *
+ * Copyright (C) 2004-2006 Sage Weil <sage@newdream.net>
+ *
+ * This is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License version 2.1, as published by the Free Software
+ * Foundation. See file COPYING.
+ *
+ */
+
+#ifndef __MDS_EOPEN_H
+#define __MDS_EOPEN_H
+
+#include "../LogEvent.h"
+#include "EMetaBlob.h"
+
+class EOpen : public LogEvent {  // journal event (type EVENT_OPEN) for an inode: its number plus a metablob holding its primary dentry
+public:
+ EMetaBlob metablob;  // filled by the CInode* constructor with the inode's parent dentry
+ inodeno_t ino;  // inode this event refers to
+
+ EOpen() : LogEvent(EVENT_OPEN) { }  // ino is not set here; decode_payload() fills it in
+ EOpen(CInode *in) : LogEvent(EVENT_OPEN),
+ ino(in->ino()) {
+ metablob.add_primary_dentry(in->get_parent_dn(), false);
+ }
+ void print(ostream& out) {
+ out << "open " << metablob;
+ }
+
+ void encode_payload(bufferlist& bl) {  // layout: ino first, then the metablob
+ ::_encode(ino, bl);
+ metablob._encode(bl);
+ }
+ void decode_payload(bufferlist& bl, int& off) {  // reads fields in the same order encode_payload() writes them
+ ::_decode(ino, bl, off);
+ metablob._decode(bl, off);
+ }
+
+ bool has_expired(MDS *mds);
+ void expire(MDS *mds, Context *c);
+ void replay(MDS *mds);
+};
+
+#endif
+++ /dev/null
-// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
-/*
- * Ceph - scalable distributed file system
- *
- * Copyright (C) 2004-2006 Sage Weil <sage@newdream.net>
- *
- * This is free software; you can redistribute it and/or
- * modify it under the terms of the GNU Lesser General Public
- * License version 2.1, as published by the Free Software
- * Foundation. See file COPYING.
- *
- */
-
-#ifndef __EUNLINK_H
-#define __EUNLINK_H
-
-#include <assert.h>
-#include "config.h"
-#include "include/types.h"
-
-#include "../LogEvent.h"
-#include "EMetaBlob.h"
-
-#include "../CInode.h"
-#include "../CDentry.h"
-#include "../CDir.h"
-
-/// help rewrite me
-
-class EUnlink : public LogEvent {
- protected:
- version_t dirv;
- string dname;
-
- public:
- EMetaBlob metaglob;
-
- /*
- EUnlink(CDir *dir, CDentry* dn, CInode *in) :
- LogEvent(EVENT_UNLINK),
- diritrace(dir->inode),
- dirv(dir->get_version()),
- dname(dn->get_name()),
- inodetrace(in) {}
- */
- EUnlink() : LogEvent(EVENT_UNLINK) { }
-
- virtual void encode_payload(bufferlist& bl) {
- /*
- diritrace.encode(bl);
- bl.append((char*)&dirv, sizeof(dirv));
- ::_encode(dname, bl);
- inodetrace.encode(bl);
- */
- }
- void decode_payload(bufferlist& bl, int& off) {
- /*
- diritrace.decode(bl,off);
- bl.copy(off, sizeof(dirv), (char*)&dirv);
- off += sizeof(dirv);
- ::_decode(dname, bl, off);
- inodetrace.decode(bl, off);
- */
- }
-
- bool has_expired(MDS *mds);
- void expire(MDS *mds, Context *c);
- void replay(MDS *mds);
-};
-
-#endif
*/
#include "events/EString.h"
+#include "events/EImportMap.h"
+#include "events/EMount.h"
+#include "events/EClientMap.h"
#include "events/EMetaBlob.h"
-#include "events/EAlloc.h"
-#include "events/EAnchor.h"
-#include "events/EAnchorClient.h"
+
#include "events/EUpdate.h"
#include "events/ESlaveUpdate.h"
-#include "events/EImportMap.h"
-
-#include "events/EMount.h"
-#include "events/EClientMap.h"
+#include "events/EOpen.h"
+#include "events/EAlloc.h"
#include "events/EPurgeFinish.h"
-#include "events/EUnlink.h"
+
#include "events/EExport.h"
#include "events/EImportStart.h"
#include "events/EImportFinish.h"
+#include "events/EAnchor.h"
+#include "events/EAnchorClient.h"
+
#include "MDS.h"
#include "MDLog.h"
#include "MDCache.h"
}
+// ------------------------
+// EOpen
+
+bool EOpen::has_expired(MDS *mds)  // returns true when this EOpen no longer needs to be kept in the journal
+{
+ CInode *in = mds->mdcache->get_inode(ino);
+ if (!in) return true;  // inode is not in the cache
+ if (!in->is_any_caps()) return true;  // is_any_caps() is false; presumably no client caps remain -- TODO confirm
+ if (in->last_open_journaled > get_start_off() ||  // an open was journaled for this inode at an offset after this event
+ in->last_open_journaled == 0) return true;  // or no journaled-open offset is recorded on the inode at all
+ return false;  // otherwise not expired; expire() decides whether to wait or rejournal
+}
+
+void EOpen::expire(MDS *mds, Context *c)  // work toward expiring this event: either wait on the import map, or journal a fresh EOpen for the inode
+{
+ CInode *in = mds->mdcache->get_inode(ino);
+ assert(in);  // has_expired() returns true for an uncached inode, so the inode is expected to be present here
+
+ dout(10) << "EOpen.expire " << ino
+ << " last_open_journaled " << in->last_open_journaled << endl;
+
+ // wait?
+ // FIXME this is stupid.
+ if (in->last_open_journaled == get_start_off()) {  // the inode's recorded open offset is this very event
+ //||
+ //(get_start_off() < mds->mdlog->last_import_map &&
+ //in->last_open_journaled < mds->mdlog->last_import_map)) {
+ dout(10) << "waiting." << endl;
+ // wait
+ mds->mdlog->add_import_map_expire_waiter(c);  // c is handed to the log's import-map-expire waiters
+ } else {
+ // rejournal now.
+ dout(10) << "rejournaling" << endl;
+ in->last_open_journaled = mds->mdlog->get_write_pos();  // record the current write position before submitting; presumably where the new EOpen will start -- TODO confirm
+ mds->mdlog->submit_entry(new EOpen(in));  // NOTE(review): c is neither queued nor completed on this path -- confirm the caller does not wait on it
+ }
+}
+
+void EOpen::replay(MDS *mds)  // journal replay: log the ino and replay the embedded metablob
+{
+ dout(10) << "EOpen.replay " << ino << endl;
+ metablob.replay(mds);  // all replay work is delegated to the metablob
+}
+
+
// -----------------------
// EUpdate
-// -----------------------
-// EUnlink
-
-bool EUnlink::has_expired(MDS *mds)
-{
- /*
- // dir
- CInode *diri = mds->mdcache->get_inode( diritrace.back().inode.ino );
- CDir *dir = 0;
- if (diri) dir = diri->dir;
-
- if (dir && dir->get_last_committed_version() < dirv) return false;
-
- if (!inodetrace.trace.empty()) {
- // inode
- CInode *in = mds->mdcache->get_inode( inodetrace.back().inode.ino );
- if (in && in->get_last_committed_version() < inodetrace.back().inode.version)
- return false;
- }
- */
- return true;
-}
-
-void EUnlink::expire(MDS *mds, Context *c)
-{
- /*
- CInode *diri = mds->mdcache->get_inode( diritrace.back().inode.ino );
- CDir *dir = diri->dir;
- assert(dir);
-
- // okay!
- dout(7) << "commiting dirty (from unlink) dir " << *dir << endl;
- mds->mdstore->commit_dir(dir, dirv, c);
- */
-}
-
-void EUnlink::replay(MDS *mds)
-{
-}
-
-
-
// -----------------------
// EPurgeFinish
map<inodeno_t,string> inode_dentry;
map<inodeno_t,list<frag_t> > frags_by_ino;
- map<dirfrag_t,CDirDiscover*> dirs;
+ map<dirfrag_t,CDirDiscover*> dirfrags;
set<int> bystanders;
string& get_dentry(inodeno_t ino) {
return inode_dentry[ino];
}
- bool have_dir(dirfrag_t df) {
- return dirs.count(df);
+ bool have_dirfrag(dirfrag_t df) {  // true if dirfrags has an entry for df
+ return dirfrags.count(df);  // count() does not insert, unlike the operator[] lookup in get_dirfrag_discover()
}
CDirDiscover* get_dirfrag_discover(dirfrag_t df) {
- return dirs[df];
+ return dirfrags[df];  // map operator[]: inserts a null entry if df is absent, so check have_dirfrag() first
}
set<int> &get_bystanders() { return bystanders; }
iit != inodes.end();
iit++)
delete *iit;
- for (map<dirfrag_t,CDirDiscover*>::iterator dit = dirs.begin();
- dit != dirs.end();
+ for (map<dirfrag_t,CDirDiscover*>::iterator dit = dirfrags.begin();
+ dit != dirfrags.end();
dit++)
delete dit->second;
}
inode_dirfrag[in->get_ino()] = df;
inode_dentry[in->get_ino()] = dentry;
}
- void add_dir(CDirDiscover *dir) {
- dirs[dir->get_dirfrag()] = dir;
+ void add_dirfrag(CDirDiscover *dir) {  // index dir by its dirfrag and list its frag under the owning inode in frags_by_ino
+ dirfrags[dir->get_dirfrag()] = dir;  // stores the raw pointer; an existing entry for the same dirfrag is overwritten
frags_by_ino[dir->get_dirfrag().ino].push_back(dir->get_dirfrag().frag);
}
void add_bystander(int who) {
for (int i=0; i<nd; i++) {
CDirDiscover *dir = new CDirDiscover;
dir->_decode(payload, off);
- dirs[dir->get_dirfrag()] = dir;
+ dirfrags[dir->get_dirfrag()] = dir;
}
::_decode(bystanders, payload, off);
}
// dirs
- int nd = dirs.size();
+ int nd = dirfrags.size();
payload.append((char*)&nd, sizeof(int));
- for (map<dirfrag_t,CDirDiscover*>::iterator dit = dirs.begin();
- dit != dirs.end();
+ for (map<dirfrag_t,CDirDiscover*>::iterator dit = dirfrags.begin();
+ dit != dirfrags.end();
dit++)
dit->second->_encode(payload);
class MMDSCacheRejoin : public Message {
public:
- map<inodeno_t,int> inodes; // ino -> caps_wanted
- set<dirfrag_t> dirfrags;
- map<dirfrag_t, set<string> > dentries; // dir -> (dentries...)
+ static const int OP_REJOIN = 1; // replica -> auth, i exist. and maybe my lock state.
+ static const int OP_ACK = 3; // auth -> replica, here is your lock state.
+ static const int OP_MISSING = 4; // auth -> replica, i am missing these items
+ static const int OP_FULL = 5; // replica -> auth, here is the full object.
+ // Map an OP_* code to a short lowercase name for logging.
+ // An unrecognized op is a programming error and trips the assert; if
+ // asserts are compiled out, "unknown" is returned so the function never
+ // flows off its end without a return value.
+ static const char *get_opname(int op) {
+ switch (op) {
+ case OP_REJOIN: return "rejoin";
+ case OP_ACK: return "ack";
+ case OP_MISSING: return "missing";
+ case OP_FULL: return "full";
+ default: assert(0);
+ }
+ return "unknown";  // reached only when assert() is disabled
+ }
+
+ // -- types --
+ struct inode_strong {  // per-inode record stored in strong_inodes (filled by add_strong_inode)
+ int caps_wanted;
+ int nonce;
+ int authlock;  // authlock, linklock, dirfragtreelock, filelock: one int per inode lock; presumably the sender's lock state -- TODO confirm
+ int linklock;
+ int dirfragtreelock;
+ int filelock;
+ inode_strong() {}  // leaves every field uninitialized
+ inode_strong(int n, int cw=0, int a=0, int l=0, int dft=0, int f=0) :  // note the argument order: nonce first, then caps_wanted, then the four locks
+ caps_wanted(cw),
+ nonce(n),
+ authlock(a), linklock(l), dirfragtreelock(dft), filelock(f) { }
+ };
+ struct inode_full {  // full inode description: the inode_t, its symlink string and its dirfragtree
+ inode_t inode;
+ string symlink;
+ fragtree_t dirfragtree;
+ inode_full() {}
+ inode_full(const inode_t& i, const string& s, const fragtree_t& f) :
+ inode(i), symlink(s), dirfragtree(f) {}
+ inode_full(bufferlist& bl, int& off) {  // decoding constructor: reads the three fields from bl, passing off by reference to each decode
+ ::_decode(inode, bl, off);  // field order mirrors _encode() below
+ ::_decode(symlink, bl, off);
+ ::_decode(dirfragtree, bl, off);
+ }
+ void _encode(bufferlist& bl) {  // append inode, symlink, dirfragtree to bl, in that order
+ ::_encode(inode, bl);
+ ::_encode(symlink, bl);
+ ::_encode(dirfragtree, bl);
+ }
+ };
+ struct inode_xlock {  // one entry of xlocked_inodes: an inode, a lock type and a request id
+ inodeno_t ino;
+ int locktype;  // which lock on the inode; value set not visible here
+ metareqid_t reqid;  // presumably the request holding the xlock -- TODO confirm
+ inode_xlock() {}  // leaves fields default/uninitialized
+ inode_xlock(inodeno_t i, int lt, const metareqid_t& ri) :
+ ino(i), locktype(lt), reqid(ri) {}
+ };
+
+ struct dirfrag_strong {  // per-dirfrag record stored in strong_dirfrags: just a nonce
+ int nonce;
+ dirfrag_strong() {}  // leaves nonce uninitialized
+ dirfrag_strong(int n) : nonce(n) {}
+ };
+ struct dn_strong {  // per-dentry record stored in strong_dentries: a nonce and one lock int
+ int nonce;
+ int lock;  // presumably the dentry lock state -- TODO confirm
+ dn_strong() {}  // leaves both fields uninitialized
+ dn_strong(int n, int l) : nonce(n), lock(l) {}
+ };
+
+ // -- data --
+ int op;  // one of the OP_* constants; set only by the MMDSCacheRejoin(int) constructor
+
+ set<inodeno_t> weak_inodes;  // inode numbers only (add_weak_inode)
+ map<inodeno_t, inode_strong> strong_inodes;  // ino -> caps_wanted, nonce and lock ints (add_strong_inode)
+ list<inode_full> full_inodes;  // complete inode copies (add_full_inode)
+ list<inode_xlock> xlocked_inodes;  // (ino, locktype, reqid) entries (add_inode_xlock)
+
+ set<dirfrag_t> weak_dirfrags;  // dirfrag ids only (add_weak_dirfrag)
+ map<dirfrag_t, dirfrag_strong> strong_dirfrags;  // dirfrag -> nonce (add_strong_dirfrag)
+
+ map<dirfrag_t, set<string> > weak_dentries;  // dirfrag -> dentry names (add_weak_dentry)
+ map<dirfrag_t, map<string, dn_strong> > strong_dentries;  // dirfrag -> name -> nonce and lock (add_strong_dentry)
+ map<dirfrag_t, map<string, metareqid_t> > xlocked_dentries;  // dirfrag -> name -> request id (add_dentry_xlock)
MMDSCacheRejoin() : Message(MSG_MDS_CACHEREJOIN) {}
+ MMDSCacheRejoin(int o) :  // construct a rejoin message with op o (one of the OP_* constants)
+ Message(MSG_MDS_CACHEREJOIN),
+ op(o) {}
char *get_type_name() { return "cache_rejoin"; }
+ void print(ostream& out) {  // write "cache_rejoin " followed by the op name
+ out << "cache_rejoin " << get_opname(op);  // get_opname() asserts if op is not a known OP_* value
+ }
- void add_dirfrag(dirfrag_t dirfrag) {
- dirfrags.insert(dirfrag);
+ // -- builders --
+ // inodes
+ void add_weak_inode(inodeno_t ino) {  // list ino in weak_inodes (number only, no extra state)
+ weak_inodes.insert(ino);
}
- void add_dentry(dirfrag_t dirfrag, const string& dn) {
- dentries[dirfrag].insert(dn);
+ void add_strong_inode(inodeno_t i, int n, int cw, int a, int l, int dft, int f) {  // i=ino, n=nonce, cw=caps_wanted, a/l/dft/f = authlock/linklock/dirfragtreelock/filelock
+ strong_inodes[i] = inode_strong(n, cw, a, l, dft, f);  // overwrites any earlier entry for i
}
- void add_inode(inodeno_t ino, int cw) {
- inodes[ino] = cw;
+ void add_full_inode(inode_t &i, const string& s, const fragtree_t &f) {  // append a copy of inode i, symlink s and fragtree f to full_inodes
+ full_inodes.push_back(inode_full(i, s, f));  // i is only read here (inode_full takes it by const reference)
+ }
+ void add_inode_xlock(inodeno_t ino, int lt, const metareqid_t& ri) {  // append an (ino, locktype lt, reqid ri) entry to xlocked_inodes
+ xlocked_inodes.push_back(inode_xlock(ino, lt, ri));
}
+ // dirfrags
+ void add_weak_dirfrag(dirfrag_t df) {  // list df in weak_dirfrags (id only)
+ weak_dirfrags.insert(df);
+ }
+ void add_strong_dirfrag(dirfrag_t df, int n) {  // record df with nonce n in strong_dirfrags, replacing any earlier entry
+ strong_dirfrags[df] = dirfrag_strong(n);
+ }
+
+ // dentries
+ void add_weak_dentry(dirfrag_t df, const string& dname) {  // list dentry name dname under dirfrag df in weak_dentries
+ weak_dentries[df].insert(dname);  // operator[] creates the per-dirfrag set on first use
+ }
+ void add_strong_dentry(dirfrag_t df, const string& dname, int n, int ls) {  // record dentry dname in df with nonce n and lock value ls
+ strong_dentries[df][dname] = dn_strong(n, ls);  // replaces any earlier entry for the same df/dname
+ }
+ void add_dentry_xlock(dirfrag_t df, const string& dname, const metareqid_t& ri) {  // record request id ri against dentry dname in df
+ xlocked_dentries[df][dname] = ri;  // one reqid per dentry; a second call for the same dentry overwrites the first
+ }
+
+ // -- encoding --
void encode_payload() {
- ::_encode(inodes, payload);
- ::_encode(dirfrags, payload);
- for (set<dirfrag_t>::iterator p = dirfrags.begin(); p != dirfrags.end(); ++p)
- ::_encode(dentries[*p], payload);
+ // Serialize the message body into payload. The field order here must
+ // match decode_payload() exactly.
+ // op goes first: the receiver builds the message with the default
+ // constructor, which does not set op, so it has to travel on the wire.
+ ::_encode(op, payload);
+ ::_encode(weak_inodes, payload);
+ ::_encode(strong_inodes, payload);
+
+ // full_inodes holds non-POD members, so it is written as a count
+ // followed by each element's own _encode().
+ __uint32_t nfull = full_inodes.size();
+ ::_encode(nfull, payload);
+ for (list<inode_full>::iterator p = full_inodes.begin(); p != full_inodes.end(); ++p)
+ p->_encode(payload);
+
+ ::_encode(xlocked_inodes, payload);
+ ::_encode(weak_dirfrags, payload);
+ // NOTE(review): strong_dirfrags is not sent; whatever add_strong_dirfrag()
+ // recorded never reaches the receiver until this is enabled on both sides.
+ //::_encode(strong_dirfrags, payload);
+ ::_encode(weak_dentries, payload);
+ ::_encode(strong_dentries, payload);
+ ::_encode(xlocked_dentries, payload);
}
void decode_payload() {
int off = 0;
- ::_decode(inodes, payload, off);
- ::_decode(dirfrags, payload, off);
- for (set<dirfrag_t>::iterator p = dirfrags.begin(); p != dirfrags.end(); ++p)
- ::_decode(dentries[*p], payload, off);
+ // Rebuild the message from payload, reading fields in the same order
+ // encode_payload() wrote them. op is read first so that print() and the
+ // message handler see the sender's op rather than an unset value.
+ ::_decode(op, payload, off);
+ ::_decode(weak_inodes, payload, off);
+ ::_decode(strong_inodes, payload, off);
+
+ // count-prefixed list of inode_full, each decoded by its constructor
+ __uint32_t nfull;
+ ::_decode(nfull, payload, off);
+ for (unsigned i=0; i<nfull; i++)
+ full_inodes.push_back(inode_full(payload, off));
+
+ ::_decode(xlocked_inodes, payload, off);
+ ::_decode(weak_dirfrags, payload, off);
+ // strong_dirfrags is not on the wire (see encode_payload)
+ //::_decode(strong_dirfrags, payload, off);
+ ::_decode(weak_dentries, payload, off);
+ ::_decode(strong_dentries, payload, off);
+ ::_decode(xlocked_dentries, payload, off);
}
+
};
#endif
#include "messages/MMDSBeacon.h"
#include "messages/MMDSImportMap.h"
#include "messages/MMDSCacheRejoin.h"
-#include "messages/MMDSCacheRejoinAck.h"
+//#include "messages/MMDSCacheRejoinAck.h"
#include "messages/MDirUpdate.h"
#include "messages/MDiscover.h"
case MSG_MDS_CACHEREJOIN:
m = new MMDSCacheRejoin;
break;
+ /*
case MSG_MDS_CACHEREJOINACK:
m = new MMDSCacheRejoinAck;
break;
+ */
case MSG_MDS_DIRUPDATE:
m = new MDirUpdate();
#define MSG_MDS_IMPORTMAP 106
#define MSG_MDS_CACHEREJOIN 107
-#define MSG_MDS_CACHEREJOINACK 108
+//#define MSG_MDS_CACHEREJOINACK 108
#define MSG_MDS_DISCOVER 110
#define MSG_MDS_DISCOVERREPLY 111