using std::string;
#include "include/types.h"
+#include "mdstypes.h"
#include "include/buffer.h"
+
+// anchor ops
+#define ANCHOR_OP_LOOKUP 1
+#define ANCHOR_OP_LOOKUP_REPLY 2
+#define ANCHOR_OP_CREATE_PREPARE 3
+#define ANCHOR_OP_CREATE_ACK 4
+#define ANCHOR_OP_CREATE_COMMIT 5
+#define ANCHOR_OP_DESTROY_PREPARE 6
+#define ANCHOR_OP_DESTROY_ACK 7
+#define ANCHOR_OP_DESTROY_COMMIT 8
+#define ANCHOR_OP_UPDATE_PREPARE 9
+#define ANCHOR_OP_UPDATE_ACK 10
+#define ANCHOR_OP_UPDATE_COMMIT 11
+
+/**
+ * Translate an ANCHOR_OP_* code into a short name for log output.
+ *
+ * @param o  one of the ANCHOR_OP_* constants defined above
+ * @return   pointer to a static string naming the op.  An unknown code trips
+ *           the assert; if assertions are compiled out, "???" is returned
+ *           instead of flowing off the end of the function.
+ */
+inline const char* get_anchor_opname(int o) {
+  switch (o) {
+  case ANCHOR_OP_LOOKUP: return "lookup";
+  case ANCHOR_OP_LOOKUP_REPLY: return "lookup_reply";
+  case ANCHOR_OP_CREATE_PREPARE: return "create_prepare";
+  case ANCHOR_OP_CREATE_ACK: return "create_ack";
+  case ANCHOR_OP_CREATE_COMMIT: return "create_commit";
+  case ANCHOR_OP_DESTROY_PREPARE: return "destroy_prepare";
+  case ANCHOR_OP_DESTROY_ACK: return "destroy_ack";
+  case ANCHOR_OP_DESTROY_COMMIT: return "destroy_commit";
+  case ANCHOR_OP_UPDATE_PREPARE: return "update_prepare";
+  case ANCHOR_OP_UPDATE_ACK: return "update_ack";
+  case ANCHOR_OP_UPDATE_COMMIT: return "update_commit";
+  default:
+    assert(0);
+    return "???";  // only reachable with NDEBUG; keeps every path returning a value
+  }
+}
+
+
+// anchor type
+
+/**
+ * One anchor table record: an anchored inode, the dirfrag containing it,
+ * and a reference count.
+ *
+ * _encode/_decode copy the raw bytes of each field, in the order
+ * ino, dirfrag, nref.
+ */
class Anchor {
public:
- inodeno_t ino; // my ino
- inodeno_t dirino; // containing dir
- string ref_dn; // referring dentry
+ inodeno_t ino; // anchored ino
+ dirfrag_t dirfrag; // containing dirfrag
+ //string ref_dn; // referring dentry
int nref; // reference count
- Anchor() {}
+ // Zero the count so a default-constructed record (for instance one created
+ // by a map's operator[]) never carries an indeterminate nref.
+ Anchor() : nref(0) {}
- Anchor(inodeno_t ino, inodeno_t dirino, string& ref_dn, int nref=0) {
- this->ino = ino;
- this->dirino = dirino;
- this->ref_dn = ref_dn;
- this->nref = nref;
- }
+ Anchor(inodeno_t i, dirfrag_t df,
+ //string& rd,
+ int nr=0) :
+ ino(i), dirfrag(df),
+ //ref_dn(rd),
+ nref(nr) { }
+ // Append ino, dirfrag and nref to bl as raw bytes.
void _encode(bufferlist &bl) {
bl.append((char*)&ino, sizeof(ino));
- bl.append((char*)&dirino, sizeof(dirino));
+ bl.append((char*)&dirfrag, sizeof(dirfrag));
bl.append((char*)&nref, sizeof(nref));
- ::_encode(ref_dn, bl);
+ //::_encode(ref_dn, bl);
}
+ // Read ino, dirfrag and nref back from bl starting at off; off is advanced
+ // past the bytes consumed.
void _decode(bufferlist& bl, int& off) {
bl.copy(off, sizeof(ino), (char*)&ino);
off += sizeof(ino);
- bl.copy(off, sizeof(dirino), (char*)&dirino);
- off += sizeof(dirino);
+ bl.copy(off, sizeof(dirfrag), (char*)&dirfrag);
+ off += sizeof(dirfrag);
bl.copy(off, sizeof(nref), (char*)&nref);
off += sizeof(nref);
- ::_decode(ref_dn, bl, off);
+ //::_decode(ref_dn, bl, off);
}
-} ;
+};
+
+/** Write an anchor to out as "a(<ino> <dirfrag> <nref>)" and return out. */
+inline ostream& operator<<(ostream& out, Anchor& a)
+{
+  out << "a(" << a.ino;
+  out << " " << a.dirfrag;
+  out << " " << a.nref << ")";
+  return out;
+}
#endif
#include "MDS.h"
-#include "messages/MAnchorRequest.h"
-#include "messages/MAnchorReply.h"
+#include "messages/MAnchor.h"
#include "config.h"
#undef dout
void AnchorClient::dispatch(Message *m)
{
switch (m->get_type()) {
- case MSG_MDS_ANCHORREPLY:
- handle_anchor_reply((MAnchorReply*)m);
+ case MSG_MDS_ANCHOR:
+ handle_anchor_reply((MAnchor*)m);
break;
default:
}
}
-void AnchorClient::handle_anchor_reply(class MAnchorReply *m)
+void AnchorClient::handle_anchor_reply(class MAnchor *m)
{
switch (m->get_op()) {
- case ANCHOR_OP_LOOKUP:
+ case ANCHOR_OP_LOOKUP_REPLY:
{
assert(pending_lookup_trace.count(m->get_ino()) == 1);
*(pending_lookup_trace[ m->get_ino() ]) = m->get_trace();
- Context *onfinish = pending_lookup_context[ m->get_ino() ];
+ Context *onfinish = pending_lookup[ m->get_ino() ];
pending_lookup_trace.erase(m->get_ino());
- pending_lookup_context.erase(m->get_ino());
-
+ pending_lookup.erase(m->get_ino());
+
if (onfinish) {
onfinish->finish(0);
delete onfinish;
}
break;
- case ANCHOR_OP_UPDATE:
- case ANCHOR_OP_CREATE:
- case ANCHOR_OP_DESTROY:
+ case ANCHOR_OP_UPDATE_ACK:
+ case ANCHOR_OP_CREATE_ACK:
+ case ANCHOR_OP_DESTROY_ACK:
{
assert(pending_op.count(m->get_ino()) == 1);
* public async interface
*/
-void AnchorClient::lookup(inodeno_t ino, vector<Anchor*>& trace, Context *onfinish)
+
+/*
+ * FIXME: we need to be able to resubmit messages if the anchortable mds fails.
+ */
+
+
// Asynchronously fetch the anchor trace for ino.
// Stores a pointer to `trace` and the `onfinish` callback in the pending-lookup
// maps (keyed by ino), then sends an ANCHOR_OP_LOOKUP message to the MDS that
// mdsmap reports as the anchortable.  handle_anchor_reply later writes the
// reply's trace through the stored pointer and runs/deletes onfinish, so
// `trace` must stay alive until then.
+void AnchorClient::lookup(inodeno_t ino, vector<Anchor>& trace, Context *onfinish)
{
// send message
- MAnchorRequest *req = new MAnchorRequest(ANCHOR_OP_LOOKUP, ino);
+ MAnchor *req = new MAnchor(ANCHOR_OP_LOOKUP, ino);
pending_lookup_trace[ino] = &trace;
- pending_lookup_context[ino] = onfinish;
+ pending_lookup[ino] = onfinish;
messenger->send_message(req,
mdsmap->get_inst(mdsmap->get_anchortable()),
- MDS_PORT_ANCHORMGR, MDS_PORT_ANCHORCLIENT);
+ MDS_PORT_ANCHORTABLE, MDS_PORT_ANCHORCLIENT);
}
// First phase of anchor creation: send ANCHOR_OP_CREATE_PREPARE for ino,
// carrying `trace`, to the anchortable MDS.  onfinish is parked in
// pending_op[ino] for the reply handler.
// NOTE(review): pending_op is keyed by ino alone, so a second prepare for the
// same ino before the ack arrives replaces the stored callback.
-void AnchorClient::create(inodeno_t ino, vector<Anchor*>& trace, Context *onfinish)
+void AnchorClient::prepare_create(inodeno_t ino, vector<Anchor>& trace, Context *onfinish)
{
// send message
- MAnchorRequest *req = new MAnchorRequest(ANCHOR_OP_CREATE, ino);
+ MAnchor *req = new MAnchor(ANCHOR_OP_CREATE_PREPARE, ino);
req->set_trace(trace);
pending_op[ino] = onfinish;
messenger->send_message(req,
mdsmap->get_inst(mdsmap->get_anchortable()),
- MDS_PORT_ANCHORMGR, MDS_PORT_ANCHORCLIENT);
+ MDS_PORT_ANCHORTABLE, MDS_PORT_ANCHORCLIENT);
+}
+
// Second phase of anchor creation: send ANCHOR_OP_CREATE_COMMIT for ino to the
// anchortable MDS.  No callback is registered; nothing here waits for a reply.
+void AnchorClient::commit_create(inodeno_t ino)
+{
+ // send message
+ MAnchor *req = new MAnchor(ANCHOR_OP_CREATE_COMMIT, ino);
+ messenger->send_message(req,
+ mdsmap->get_inst(mdsmap->get_anchortable()),
+ MDS_PORT_ANCHORTABLE, MDS_PORT_ANCHORCLIENT);
+}
+
+
// First phase of anchor removal: send ANCHOR_OP_DESTROY_PREPARE for ino to the
// anchortable MDS and park onfinish in pending_op[ino] for the reply handler.
+void AnchorClient::prepare_destroy(inodeno_t ino, Context *onfinish)
+{
+ // send message
+ MAnchor *req = new MAnchor(ANCHOR_OP_DESTROY_PREPARE, ino);
+ pending_op[ino] = onfinish;
+ messenger->send_message(req,
+ mdsmap->get_inst(mdsmap->get_anchortable()),
+ MDS_PORT_ANCHORTABLE, MDS_PORT_ANCHORCLIENT);
}
// Second phase of anchor removal: send ANCHOR_OP_DESTROY_COMMIT for ino to the
// anchortable MDS.  No callback is registered.
+void AnchorClient::commit_destroy(inodeno_t ino)
+{
+ // send message
+ MAnchor *req = new MAnchor(ANCHOR_OP_DESTROY_COMMIT, ino);
+ messenger->send_message(req,
+ mdsmap->get_inst(mdsmap->get_anchortable()),
+ MDS_PORT_ANCHORTABLE, MDS_PORT_ANCHORCLIENT);
+}
+
+
// First phase of an anchor update: send ANCHOR_OP_UPDATE_PREPARE for ino,
// carrying the new `trace`, to the anchortable MDS.  onfinish is parked in
// pending_op[ino] for the reply handler.
-void AnchorClient::update(inodeno_t ino, vector<Anchor*>& trace, Context *onfinish)
+void AnchorClient::prepare_update(inodeno_t ino, vector<Anchor>& trace, Context *onfinish)
{
// send message
- MAnchorRequest *req = new MAnchorRequest(ANCHOR_OP_UPDATE, ino);
+ MAnchor *req = new MAnchor(ANCHOR_OP_UPDATE_PREPARE, ino);
req->set_trace(trace);
pending_op[ino] = onfinish;
messenger->send_message(req,
mdsmap->get_inst(mdsmap->get_anchortable()),
- MDS_PORT_ANCHORMGR, MDS_PORT_ANCHORCLIENT);
+ MDS_PORT_ANCHORTABLE, MDS_PORT_ANCHORCLIENT);
}
// Second phase of an anchor update: send ANCHOR_OP_UPDATE_COMMIT for ino to the
// anchortable MDS.  No callback is registered.
-void AnchorClient::destroy(inodeno_t ino, Context *onfinish)
+void AnchorClient::commit_update(inodeno_t ino)
{
// send message
- MAnchorRequest *req = new MAnchorRequest(ANCHOR_OP_DESTROY, ino);
-
- pending_op[ino] = onfinish;
-
+ MAnchor *req = new MAnchor(ANCHOR_OP_UPDATE_COMMIT, ino);
messenger->send_message(req,
mdsmap->get_inst(mdsmap->get_anchortable()),
- MDS_PORT_ANCHORMGR, MDS_PORT_ANCHORCLIENT);
+ MDS_PORT_ANCHORTABLE, MDS_PORT_ANCHORCLIENT);
}
Messenger *messenger;
MDSMap *mdsmap;
- // remote state
+ // lookups
+ hash_map<inodeno_t, Context*> pending_lookup;
+ hash_map<inodeno_t, vector<Anchor>*> pending_lookup_trace;
+
+ // updates
hash_map<inodeno_t, Context*> pending_op;
- hash_map<inodeno_t, Context*> pending_lookup_context;
- hash_map<inodeno_t, vector<Anchor*>*> pending_lookup_trace;
- void handle_anchor_reply(class MAnchorReply *m);
+ void handle_anchor_reply(class MAnchor *m);
public:
AnchorClient(Messenger *ms, MDSMap *mm) : messenger(ms), mdsmap(mm) {}
+ void dispatch(Message *m);
+
// async user interface
- void lookup(inodeno_t ino, vector<Anchor*>& trace, Context *onfinish);
- void create(inodeno_t ino, vector<Anchor*>& trace, Context *onfinish);
- void update(inodeno_t ino, vector<Anchor*>& trace, Context *onfinish);
- void destroy(inodeno_t ino, Context *onfinish);
+ void lookup(inodeno_t ino, vector<Anchor>& trace, Context *onfinish);
+
+ void prepare_create(inodeno_t ino, vector<Anchor>& trace, Context *onfinish);
+ void commit_create(inodeno_t ino);
+
+ void prepare_destroy(inodeno_t ino, Context *onfinish);
+ void commit_destroy(inodeno_t ino);
+
+ void prepare_update(inodeno_t ino, vector<Anchor>& trace, Context *onfinish);
+ void commit_update(inodeno_t ino);
+
- void dispatch(Message *m);
};
#endif
#include "osdc/Filer.h"
#include "msg/Messenger.h"
-#include "messages/MAnchorRequest.h"
-#include "messages/MAnchorReply.h"
+#include "messages/MAnchor.h"
#include "common/Clock.h"
+#include "MDLog.h"
+#include "events/EAnchor.h"
+
#include "config.h"
#undef dout
#define dout(x) if (x <= g_conf.debug_mds) cout << g_clock.now() << " " << mds->messenger->get_myaddr() << ".anchortable "
#define derr(x) if (x <= g_conf.debug_mds) cerr << g_clock.now() << " " << mds->messenger->get_myaddr() << ".anchortable "
-AnchorTable::AnchorTable(MDS *mds)
-{
- this->mds = mds;
- opening = false;
- opened = false;
-}
-void AnchorTable::init_inode()
-{
- memset(&table_inode, 0, sizeof(table_inode));
- table_inode.ino = MDS_INO_ANCHORTABLE+mds->get_nodeid();
- table_inode.layout = g_OSD_FileLayout;
-}
-
-void AnchorTable::reset()
-{
- init_inode();
- opened = true;
- anchor_map.clear();
-}
/*
* basic updates
*/
// Ensure a record for ino exists in anchor_map.
// Asserts that the containing directory's ino is either below MDS_INO_BASE or
// already present in the table.  A new record is built with the Anchor
// constructor's default nref (0); an existing record is left untouched.
// Returns true if a record was inserted, false if one was already there.
-bool AnchorTable::add(inodeno_t ino, inodeno_t dirino, string& ref_dn)
+bool AnchorTable::add(inodeno_t ino, dirfrag_t dirfrag)
{
- dout(7) << "add " << std::hex << ino << " dirino " << dirino << std::dec << " ref_dn " << ref_dn << endl;
+ dout(7) << "add " << ino << " dirfrag " << dirfrag << endl;
// parent should be there
- assert(dirino < 1000 || // system dirino
- anchor_map.count(dirino)); // have
+ assert(dirfrag.ino < MDS_INO_BASE || // system dirino
+ anchor_map.count(dirfrag.ino)); // have
if (anchor_map.count(ino) == 0) {
// new item
- anchor_map[ ino ] = new Anchor(ino, dirino, ref_dn);
- dout(10) << " add: added " << std::hex << ino << std::dec << endl;
+ anchor_map[ino] = Anchor(ino, dirfrag);
+ dout(10) << " add: added " << anchor_map[ino] << endl;
return true;
} else {
- dout(10) << " add: had " << std::hex << ino << std::dec << endl;
+ dout(10) << " add: had " << anchor_map[ino] << endl;
return false;
}
}
void AnchorTable::inc(inodeno_t ino)
{
- dout(7) << "inc " << std::hex << ino << std::dec << endl;
+ dout(7) << "inc " << ino << endl;
- assert(anchor_map.count(ino) != 0);
- Anchor *anchor = anchor_map[ino];
- assert(anchor);
+ assert(anchor_map.count(ino));
+ Anchor &anchor = anchor_map[ino];
while (1) {
- anchor->nref++;
+ anchor.nref++;
- dout(10) << " inc: record " << std::hex << ino << std::dec << " now " << anchor->nref << endl;
- ino = anchor->dirino;
+ dout(10) << " inc: record now " << anchor << endl;
+ ino = anchor.dirfrag.ino;
if (ino == 0) break;
if (anchor_map.count(ino) == 0) break;
anchor = anchor_map[ino];
- assert(anchor);
}
}
void AnchorTable::dec(inodeno_t ino)
{
- dout(7) << "dec " << std::hex << ino << std::dec << endl;
+ dout(7) << "dec " << ino << endl;
- assert(anchor_map.count(ino) != 0);
- Anchor *anchor = anchor_map[ino];
- assert(anchor);
+ assert(anchor_map.count(ino));
+ Anchor &anchor = anchor_map[ino];
while (true) {
- anchor->nref--;
+ anchor.nref--;
- if (anchor->nref == 0) {
- dout(10) << " dec: record " << std::hex << ino << std::dec << " now 0, removing" << endl;
- inodeno_t dirino = anchor->dirino;
+ if (anchor.nref == 0) {
+ dout(10) << " dec: record " << anchor << " now 0, removing" << endl;
+ dirfrag_t dirfrag = anchor.dirfrag;
anchor_map.erase(ino);
- delete anchor;
- ino = dirino;
+ ino = dirfrag.ino;
} else {
- dout(10) << " dec: record " << std::hex << ino << std::dec << " now " << anchor->nref << endl;
- ino = anchor->dirino;
+ dout(10) << " dec: record now " << anchor << endl;
+ ino = anchor.dirfrag.ino;
}
if (ino == 0) break;
if (anchor_map.count(ino) == 0) break;
anchor = anchor_map[ino];
- assert(anchor);
}
}
* high level
*/
-void AnchorTable::lookup(inodeno_t ino, vector<Anchor*>& trace)
+
+// LOOKUP
+
+/**
+ * Answer an ANCHOR_OP_LOOKUP request.
+ *
+ * Walks from the requested ino up through dirfrag.ino links, prepending a copy
+ * of each record so the resulting trace is ordered from the topmost ancestor
+ * down to ino.  The walk ends at the first record whose containing dir ino is
+ * below MDS_INO_BASE.  Sends the trace back in an ANCHOR_OP_LOOKUP_REPLY to the
+ * request's source and frees the request.  The table is not modified.
+ */
+void AnchorTable::handle_lookup(MAnchor *req)
{
- dout(7) << "lookup " << std::hex << ino << std::dec << endl;
+ inodeno_t ino = req->get_ino();
+ dout(7) << "handle_lookup " << ino << endl;
assert(anchor_map.count(ino) == 1);
- Anchor *anchor = anchor_map[ino];
- assert(anchor);
+ vector<Anchor> trace;
+ inodeno_t cur = ino;  // record being visited; ino itself is still needed for the reply
while (true) {
- dout(10) << " record " << std::hex << anchor->ino << " dirino " << anchor->dirino << std::dec << " ref_dn " << anchor->ref_dn << endl;
+ // Bind a fresh reference on every pass.  Assigning to a reference declared
+ // outside the loop would overwrite the stored record for ino with its
+ // parent's contents rather than step to the parent.
+ Anchor &anchor = anchor_map[cur];
+ dout(10) << "handle_lookup adding " << anchor << endl;
trace.insert(trace.begin(), anchor); // lame FIXME
- if (anchor->dirino < MDS_INO_BASE) break;
+ if (anchor.dirfrag.ino < MDS_INO_BASE) break;
- assert(anchor_map.count(anchor->dirino) == 1);
- anchor = anchor_map[anchor->dirino];
- assert(anchor);
+ assert(anchor_map.count(anchor.dirfrag.ino) == 1);
+ cur = anchor.dirfrag.ino;
}
+
+ // reply
+ MAnchor *reply = new MAnchor(ANCHOR_OP_LOOKUP_REPLY, ino);
+ reply->set_trace(trace);
+ mds->messenger->send_message(reply, req->get_source_inst(), req->get_source_port());
+
+ delete req;
}
-void AnchorTable::create(inodeno_t ino, vector<Anchor*>& trace)
+
+// MIDLEVEL
+
// create_prepare: make sure every record in `trace` is present (add), take a
// reference along ino's ancestry (inc), remember ino in pending_create, and
// bump the table version.
+void AnchorTable::create_prepare(inodeno_t ino, vector<Anchor>& trace)
{
- dout(7) << "create " << std::hex << ino << std::dec << endl;
-
// make sure trace is in table
for (unsigned i=0; i<trace.size(); i++)
- add(trace[i]->ino, trace[i]->dirino, trace[i]->ref_dn);
+ add(trace[i].ino, trace[i].dirfrag);
+ inc(ino);
+ pending_create.insert(ino); // so we can undo
+ version++;
+}
- inc(ino); // ok!
// create_commit: the create is final; drop ino from pending_create and bump
// the table version.  anchor_map itself was already updated at prepare time.
+void AnchorTable::create_commit(inodeno_t ino)
+{
+ pending_create.erase(ino);
+ version++;
}
// Record the intent to destroy ino's anchor: add ino to pending_destroy and
// bump the table version.  anchor_map is not touched until destroy_commit.
-void AnchorTable::destroy(inodeno_t ino)
+void AnchorTable::destroy_prepare(inodeno_t ino)
{
+ pending_destroy.insert(ino);
+ version++;
+}
+
// Apply a prepared destroy: drop one reference along ino's ancestry (dec may
// erase records whose count reaches 0), remove ino from pending_destroy, and
// bump the table version.
+void AnchorTable::destroy_commit(inodeno_t ino)
+{
+ // apply
+ dec(ino);
+ pending_destroy.erase(ino);
+ version++;
+}
+
// Stash a copy of the new trace for ino in pending_update (replacing any
// previous pending trace for the same ino) and bump the table version.
// anchor_map is not touched until update_commit.
+void AnchorTable::update_prepare(inodeno_t ino, vector<Anchor>& trace)
+{
+ pending_update[ino] = trace;
+ version++;
+}
+
+/**
+ * Apply a prepared update for ino: release the reference held along the old
+ * ancestry, install the trace saved by update_prepare, take a reference along
+ * the new ancestry, then forget the pending entry and bump the version.
+ */
+void AnchorTable::update_commit(inodeno_t ino)
+{
+ // release the old path
dec(ino);
+
+ // install the saved trace, record by record, then reference the new path
+ vector<Anchor> &newtrace = pending_update[ino];
+ for (vector<Anchor>::iterator p = newtrace.begin();
+      p != newtrace.end();
+      ++p)
+   add(p->ino, p->dirfrag);
+ inc(ino);
+
+ pending_update.erase(ino);
+ version++;
+}
+
+
+// CREATE
+
+/**
+ * Context handed to MDLog::submit_entry with a CREATE_PREPARE journal entry.
+ * When finished it calls AnchorTable::_create_prepare_logged with the original
+ * request.  The result code passed to finish() is ignored.
+ */
+class C_AT_CreatePrepare : public Context {
+  AnchorTable *table;
+  MAnchor *request;
+public:
+  C_AT_CreatePrepare(AnchorTable *a, MAnchor *r)
+    : table(a), request(r) { }
+  void finish(int r) {
+    table->_create_prepare_logged(request);
+  }
+};
+
+/**
+ * Handle ANCHOR_OP_CREATE_PREPARE: apply the prepare to in-memory state (which
+ * bumps version), then journal an EAnchor carrying the trace and the new
+ * version.  The ack is sent from _create_prepare_logged, reached through the
+ * C_AT_CreatePrepare context submitted with the entry; req stays alive until
+ * then.
+ */
+void AnchorTable::handle_create_prepare(MAnchor *req)
+{
+  inodeno_t ino = req->get_ino();
+  dout(7) << "handle_create_prepare " << ino << endl;
+
+  vector<Anchor>& trace = req->get_trace();
+  create_prepare(ino, trace);
+
+  // log it
+  EAnchor *le = new EAnchor(ANCHOR_OP_CREATE_PREPARE, ino, version);
+  le->set_trace(trace);
+  Context *fin = new C_AT_CreatePrepare(this, req);
+  mds->mdlog->submit_entry(le, fin);
+}
+
+/**
+ * Second half of create-prepare handling: send ANCHOR_OP_CREATE_ACK for the
+ * request's ino back to the request's source, then free the request.
+ */
+void AnchorTable::_create_prepare_logged(MAnchor *req)
+{
+  inodeno_t ino = req->get_ino();
+  dout(7) << "_create_prepare_logged " << ino << endl;
+
+  // reply
+  mds->messenger->send_message(new MAnchor(ANCHOR_OP_CREATE_ACK, ino),
+                               req->get_source_inst(),
+                               req->get_source_port());
+
+  delete req;
+}
+
+/**
+ * Handle ANCHOR_OP_CREATE_COMMIT: finalize the create in memory (which bumps
+ * version) and journal an EAnchor stamped with the new version.  No reply is
+ * sent.
+ */
+void AnchorTable::handle_create_commit(MAnchor *req)
+{
+  inodeno_t ino = req->get_ino();
+  dout(7) << "handle_create_commit " << ino << endl;
+
+  create_commit(ino);
+
+  mds->mdlog->submit_entry(new EAnchor(ANCHOR_OP_CREATE_COMMIT, ino, version));
+
+  // Nothing else takes ownership of the request (no completion context keeps
+  // it), so it has to be released here or it leaks.
+  delete req;
+}
+
+
+// DESTROY
+
+/**
+ * Context handed to MDLog::submit_entry with a DESTROY_PREPARE journal entry.
+ * When finished it calls AnchorTable::_destroy_prepare_logged with the original
+ * request.  The result code passed to finish() is ignored.
+ */
+class C_AT_DestroyPrepare : public Context {
+  AnchorTable *table;
+  MAnchor *request;
+public:
+  C_AT_DestroyPrepare(AnchorTable *a, MAnchor *r)
+    : table(a), request(r) { }
+  void finish(int r) {
+    table->_destroy_prepare_logged(request);
+  }
+};
+
+/**
+ * Handle ANCHOR_OP_DESTROY_PREPARE: record the pending destroy in memory
+ * (which bumps version), then journal an EAnchor stamped with the new version.
+ * The ack is sent from _destroy_prepare_logged, reached through the
+ * C_AT_DestroyPrepare context submitted with the entry.
+ */
+void AnchorTable::handle_destroy_prepare(MAnchor *req)
+{
+  inodeno_t ino = req->get_ino();
+  dout(7) << "handle_destroy_prepare " << ino << endl;
+
+  destroy_prepare(ino);
+
+  EAnchor *le = new EAnchor(ANCHOR_OP_DESTROY_PREPARE, ino, version);
+  Context *fin = new C_AT_DestroyPrepare(this, req);
+  mds->mdlog->submit_entry(le, fin);
+}
+
+/**
+ * Second half of destroy-prepare handling: send ANCHOR_OP_DESTROY_ACK for the
+ * request's ino back to the request's source, then free the request.
+ */
+void AnchorTable::_destroy_prepare_logged(MAnchor *req)
+{
+  inodeno_t ino = req->get_ino();
+  dout(7) << "_destroy_prepare_logged " << ino << endl;
+
+  // reply
+  mds->messenger->send_message(new MAnchor(ANCHOR_OP_DESTROY_ACK, ino),
+                               req->get_source_inst(),
+                               req->get_source_port());
+
+  delete req;
}
+/**
+ * Handle ANCHOR_OP_DESTROY_COMMIT: apply the destroy in memory (which bumps
+ * version) and journal an EAnchor stamped with the new version.  No reply is
+ * sent.
+ */
+void AnchorTable::handle_destroy_commit(MAnchor *req)
+{
+  inodeno_t ino = req->get_ino();
+  dout(7) << "handle_destroy_commit " << ino << endl;
+
+  destroy_commit(ino);
+
+  // log
+  mds->mdlog->submit_entry(new EAnchor(ANCHOR_OP_DESTROY_COMMIT, ino, version));
+
+  // Nothing else takes ownership of the request (no completion context keeps
+  // it), so it has to be released here or it leaks.
+  delete req;
+}
+
+
+// UPDATE
+
+/**
+ * Context handed to MDLog::submit_entry with an UPDATE_PREPARE journal entry.
+ * When finished it calls AnchorTable::_update_prepare_logged with the original
+ * request.  The result code passed to finish() is ignored.
+ */
+class C_AT_UpdatePrepare : public Context {
+  AnchorTable *table;
+  MAnchor *request;
+public:
+  C_AT_UpdatePrepare(AnchorTable *a, MAnchor *r)
+    : table(a), request(r) { }
+  void finish(int r) {
+    table->_update_prepare_logged(request);
+  }
+};
+
+/**
+ * Handle ANCHOR_OP_UPDATE_PREPARE: stash the new trace in memory (which bumps
+ * version), then journal an EAnchor carrying the trace and the new version.
+ * The ack is sent from _update_prepare_logged, reached through the
+ * C_AT_UpdatePrepare context submitted with the entry.
+ */
+void AnchorTable::handle_update_prepare(MAnchor *req)
+{
+  inodeno_t ino = req->get_ino();
+  dout(7) << "handle_update_prepare " << ino << endl;
+
+  vector<Anchor>& trace = req->get_trace();
+  update_prepare(ino, trace);
+
+  // log it
+  EAnchor *le = new EAnchor(ANCHOR_OP_UPDATE_PREPARE, ino, version);
+  le->set_trace(trace);
+  Context *fin = new C_AT_UpdatePrepare(this, req);
+  mds->mdlog->submit_entry(le, fin);
+}
+
+/**
+ * Second half of update-prepare handling: send ANCHOR_OP_UPDATE_ACK for the
+ * request's ino back to the request's source, then free the request.
+ */
+void AnchorTable::_update_prepare_logged(MAnchor *req)
+{
+  inodeno_t ino = req->get_ino();
+  dout(7) << "_update_prepare_logged " << ino << endl;
+
+  // reply
+  mds->messenger->send_message(new MAnchor(ANCHOR_OP_UPDATE_ACK, ino),
+                               req->get_source_inst(),
+                               req->get_source_port());
+
+  delete req;
+}
+
+/**
+ * Handle ANCHOR_OP_UPDATE_COMMIT: apply the pending update in memory (which
+ * bumps version) and journal an EAnchor stamped with the new version.  No
+ * reply is sent.
+ */
+void AnchorTable::handle_update_commit(MAnchor *req)
+{
+  inodeno_t ino = req->get_ino();
+  dout(7) << "handle_update_commit " << ino << endl;
+
+  update_commit(ino);
+
+  mds->mdlog->submit_entry(new EAnchor(ANCHOR_OP_UPDATE_COMMIT, ino, version));
+
+  // Nothing else takes ownership of the request (no completion context keeps
+  // it), so it has to be released here or it leaks.
+  delete req;
+}
/*
void AnchorTable::dispatch(Message *m)
{
switch (m->get_type()) {
- case MSG_MDS_ANCHORREQUEST:
- handle_anchor_request((MAnchorRequest*)m);
+ case MSG_MDS_ANCHOR:
+ handle_anchor_request((MAnchor*)m);
break;
default:
}
-
-void AnchorTable::handle_anchor_request(class MAnchorRequest *m)
+void AnchorTable::handle_anchor_request(class MAnchor *req)
{
// make sure i'm open!
if (!opened) {
dout(7) << "not open yet" << endl;
- waiting_for_open.push_back(new C_MDS_RetryMessage(mds,m));
+ waiting_for_open.push_back(new C_MDS_RetryMessage(mds, req));
if (!opening) {
opening = true;
}
// go
- MAnchorReply *reply = new MAnchorReply(m);
-
- switch (m->get_op()) {
+ switch (req->get_op()) {
case ANCHOR_OP_LOOKUP:
- lookup( m->get_ino(), reply->get_trace() );
+ handle_lookup(req);
break;
- case ANCHOR_OP_UPDATE:
- destroy( m->get_ino() );
- create( m->get_ino(), m->get_trace() );
+ case ANCHOR_OP_CREATE_PREPARE:
+ handle_create_prepare(req);
+ break;
+ case ANCHOR_OP_CREATE_COMMIT:
+ handle_create_commit(req);
break;
- case ANCHOR_OP_CREATE:
- create( m->get_ino(), m->get_trace() );
+ case ANCHOR_OP_DESTROY_PREPARE:
+ handle_destroy_prepare(req);
+ break;
+ case ANCHOR_OP_DESTROY_COMMIT:
+ handle_destroy_commit(req);
break;
- case ANCHOR_OP_DESTROY:
- destroy( m->get_ino() );
+
+ case ANCHOR_OP_UPDATE_PREPARE:
+ handle_update_prepare(req);
+ break;
+ case ANCHOR_OP_UPDATE_COMMIT:
+ handle_update_commit(req);
break;
default:
assert(0);
}
- // send reply
- mds->messenger->send_message(reply, m->get_source_inst(), m->get_source_port());
- delete m;
}
// Serialize the whole table into one buffer and write it to this MDS's
// anchortable object (object id MDS_INO_ANCHORTABLE + node id).
// Buffer layout, in append order: version; anchor count (size_t); each Anchor
// via Anchor::_encode; pending_create; pending_destroy; pending_update count
// (size_t) followed by one (ino, trace) pair per entry.
// onfinish is passed to objecter->write as its final argument.
// NOTE(review): if the table is not open this returns early without calling
// or freeing onfinish — confirm callers expect that.
void AnchorTable::save(Context *onfinish)
{
- dout(7) << "save" << endl;
+ dout(7) << "save v " << version << endl;
if (!opened) return;
// build up write
- bufferlist tabbl;
+ bufferlist bl;
- int num = anchor_map.size();
- tabbl.append((char*)&num, sizeof(int));
+ // version
+ bl.append((char*)&version, sizeof(version));
+
+ // # anchors
+ size_t size = anchor_map.size();
+ bl.append((char*)&size, sizeof(size));
- for (hash_map<inodeno_t, Anchor*>::iterator it = anchor_map.begin();
+ // anchors
+ for (hash_map<inodeno_t, Anchor>::iterator it = anchor_map.begin();
it != anchor_map.end();
it++) {
- dout(14) << " saving anchor for " << std::hex << it->first << std::dec << endl;
- Anchor *a = it->second;
- assert(a);
- a->_encode(tabbl);
+ it->second._encode(bl);
+ dout(15) << "save encoded " << it->second << endl;
}
- bufferlist bl;
- size_t size = tabbl.length();
- bl.append((char*)&size, sizeof(size));
- bl.claim_append(tabbl);
-
- dout(7) << " " << num << " anchors, " << size << " bytes" << endl;
+ // pending
+ ::_encode(pending_create, bl);
+ ::_encode(pending_destroy, bl);
+ size_t s = pending_update.size();
+ bl.append((char*)&s, sizeof(s));
+ for (map<inodeno_t, vector<Anchor> >::iterator p = pending_update.begin();
+ p != pending_update.end();
+ ++p) {
+ bl.append((char*)&p->first, sizeof(p->first));
+ ::_encode(p->second, bl);
+ }
+
// write!
- mds->filer->write(table_inode,
- 0, bl.length(),
- bl, 0,
- NULL, onfinish);
+ mds->objecter->write(object_t(MDS_INO_ANCHORTABLE+mds->get_nodeid(), 0),
+ 0, bl.length(),
+ bl,
+ NULL, onfinish);
}
// Read-completion context used by AnchorTable::load.  It owns the bufferlist
// that the read fills (public member bl) and, when finished, passes it to
// AnchorTable::_loaded.
// NOTE(review): finish() asserts result > 0, so a failed or empty read aborts
// rather than falling back to an empty table — confirm that is intended.
class C_AT_Load : public Context {
AnchorTable *at;
public:
- size_t size;
bufferlist bl;
- C_AT_Load(size_t size, AnchorTable *at) {
- this->size = size;
- this->at = at;
- }
+ C_AT_Load(AnchorTable *a) : at(a) {}
void finish(int result) {
assert(result > 0);
-
- at->load_2(size, bl);
- }
-};
-
-class C_AT_LoadSize : public Context {
- AnchorTable *at;
- MDS *mds;
-public:
- bufferlist bl;
- C_AT_LoadSize(AnchorTable *at, MDS *mds) {
- this->at = at;
- this->mds = mds;
- }
- void finish(int r) {
- size_t size = 0;
- assert(bl.length() >= sizeof(size));
- bl.copy(0, sizeof(size), (char*)&size);
- cout << "r is " << r << " size is " << size << endl;
- if (r > 0 && size > 0) {
- C_AT_Load *c = new C_AT_Load(size, at);
- mds->filer->read(at->table_inode,
- sizeof(size), size,
- &c->bl,
- c);
- } else {
- // fail
- bufferlist empty;
- at->load_2(0, empty);
- }
+ at->_loaded(bl);
}
};
// Start loading the table from this MDS's anchortable object.
// Must not be called once the table is open (asserted).  onfinish is queued on
// waiting_for_open; the read completes into a C_AT_Load context, which hands
// the data to _loaded.
void AnchorTable::load(Context *onfinish)
{
dout(7) << "load" << endl;
- init_inode();
-
assert(!opened);
waiting_for_open.push_back(onfinish);
-
- C_AT_LoadSize *c = new C_AT_LoadSize(this, mds);
- mds->filer->read(table_inode,
- 0, sizeof(size_t),
- &c->bl,
- c);
+
+ C_AT_Load *fin = new C_AT_Load(this);
// NOTE(review): offset 0 / length 0 presumably means "read the whole object" — confirm against Objecter::read.
+ mds->objecter->read(object_t(MDS_INO_ANCHORTABLE+mds->get_nodeid(), 0),
+ 0, 0, &fin->bl, fin);
}
-void AnchorTable::load_2(size_t size, bufferlist& bl)
+/**
+ * Decode a table image previously written by save() and mark the table open.
+ *
+ * Reads, in order: version; anchor count; that many Anchor records (stored in
+ * anchor_map by ino); pending_create; pending_destroy; pending_update count
+ * followed by (ino, trace) pairs.  Asserts that the whole buffer was consumed,
+ * then sets opened, clears opening, and runs the waiting_for_open contexts.
+ *
+ * @param bl  raw bytes read from the anchortable object
+ */
+void AnchorTable::_loaded(bufferlist& bl)
{
- // num
+ dout(10) << "_loaded got " << bl.length() << " bytes" << endl;
+
int off = 0;
- int num;
- bl.copy(0, sizeof(num), (char*)&num);
- off += sizeof(num);
-
- // parse anchors
- for (int i=0; i<num; i++) {
- Anchor *a = new Anchor;
- a->_decode(bl, off);
- dout(10) << "load_2 decoded " << std::hex << a->ino << " dirino " << a->dirino << std::dec << " ref_dn " << a->ref_dn << endl;
- anchor_map[a->ino] = a;
+ bl.copy(off, sizeof(version), (char*)&version);
+ off += sizeof(version);
+
+ size_t size;
+ bl.copy(off, sizeof(size), (char*)&size);
+ off += sizeof(size);
+
+ for (size_t n=0; n<size; n++) {
+ Anchor a;
+ a._decode(bl, off);
+ anchor_map[a.ino] = a;
+ // log under the current function name (the old "load_2" no longer exists)
+ dout(15) << "_loaded decoded " << a << endl;
}
- dout(7) << "load_2 got " << num << " anchors" << endl;
+ ::_decode(pending_create, bl, off);
+ ::_decode(pending_destroy, bl, off);
+
+ size_t s;
+ bl.copy(off, sizeof(s), (char*)&s);
+ off += sizeof(s);
+ for (size_t i=0; i<s; i++) {
+ inodeno_t ino;
+ bl.copy(off, sizeof(ino), (char*)&ino);
+ off += sizeof(ino);
+ ::_decode(pending_update[ino], bl, off);
+ }
+
+ assert(off == (int)bl.length());
+ // done.
opened = true;
opening = false;
-
- // finish
+
finish_contexts(waiting_for_open);
}
using namespace __gnu_cxx;
class MDS;
-
+class MAnchor;
// In-memory anchor table for one MDS.
// Holds every Anchor record (anchor_map) plus the sets of prepared-but-not-yet-
// committed create/destroy/update operations, all covered by a single version
// counter.  Requests arrive as MAnchor messages; each mutating operation has a
// prepare step and a commit step.  The whole table is saved to and loaded from
// a single object.
class AnchorTable {
MDS *mds;
- hash_map<inodeno_t, Anchor*> anchor_map;
- bool opening, opened;
- list<Context*> waiting_for_open;
+ // keep the entire table in memory.
+ hash_map<inodeno_t, Anchor> anchor_map;
- public:
- inode_t table_inode;
+ // uncommitted operations
+ set<inodeno_t> pending_create;
+ set<inodeno_t> pending_destroy;
+ map<inodeno_t, vector<Anchor> > pending_update;
- public:
- AnchorTable(MDS *mds);
+ version_t version; // this includes anchor_map AND pending_* state.
- protected:
- void init_inode(); // call this before doing anything.
+ // load/save state
+ bool opening, opened;
- //
- bool have_ino(inodeno_t ino) {
- return true; // always in memory for now.
- }
- void fetch_ino(inodeno_t ino, Context *onfinish) {
- assert(!opened);
- load(onfinish);
- }
+ // waiters
+ list<Context*> waiting_for_open;
+
+protected:
- // adjust table
- bool add(inodeno_t ino, inodeno_t dirino, string& ref_dn);
+ // basic updates
+ bool add(inodeno_t ino, dirfrag_t dirfrag);
void inc(inodeno_t ino);
void dec(inodeno_t ino);
-
+ // mid-level
+ void create_prepare(inodeno_t ino, vector<Anchor>& trace);
+ void create_commit(inodeno_t ino);
+ void destroy_prepare(inodeno_t ino);
+ void destroy_commit(inodeno_t ino);
+ void update_prepare(inodeno_t ino, vector<Anchor>& trace);
+ void update_commit(inodeno_t ino);
+ friend class EAnchor; // used for journal replay.
+
// high level interface
- void lookup(inodeno_t ino, vector<Anchor*>& trace);
- void create(inodeno_t ino, vector<Anchor*>& trace);
- void destroy(inodeno_t ino);
+ void handle_lookup(MAnchor *req);
+
+ void handle_create_prepare(MAnchor *req);
+ void _create_prepare_logged(MAnchor *req);
+ void handle_create_commit(MAnchor *req);
+ friend class C_AT_CreatePrepare;
+
+ void handle_destroy_prepare(MAnchor *req);
+ void _destroy_prepare_logged(MAnchor *req);
+ void handle_destroy_commit(MAnchor *req);
+ friend class C_AT_DestroyPrepare;
+
+ void handle_update_prepare(MAnchor *req);
+ void _update_prepare_logged(MAnchor *req);
+ void handle_update_commit(MAnchor *req);
+ friend class C_AT_UpdatePrepare;
// messages
- public:
+ void handle_anchor_request(MAnchor *m);
+
+public:
// Starts closed (opened == false) at version 0; call create_fresh() or load() before use.
+ AnchorTable(MDS *m) :
+ mds(m),
+ version(0),
+ opening(false), opened(false) { }
+
void dispatch(class Message *m);
- protected:
- void handle_anchor_request(class MAnchorRequest *m);
+ version_t get_version() { return version; }
- public:
+ void create_fresh() { // reset on mkfs() to empty, loaded table.
+ version = 0;
+ opened = true;
+ opening = false;
+ anchor_map.clear();
+ pending_create.clear();
+ pending_destroy.clear();
+ pending_update.clear();
+ }
// load/save entire table for now!
- void reset();
void save(Context *onfinish);
void load(Context *onfinish);
- void load_2(size_t size, bufferlist& bl);
+ void _loaded(bufferlist& bl);
};
}
}
-void CInode::make_anchor_trace(vector<Anchor*>& trace)
+void CInode::make_anchor_trace(vector<Anchor>& trace)
{
if (parent) {
parent->dir->inode->make_anchor_trace(trace);
-
- dout(7) << "make_anchor_trace adding " << ino() << " dirino " << parent->dir->inode->ino() << " dn " << parent->name << endl;
- trace.push_back( new Anchor(ino(),
- parent->dir->inode->ino(),
- parent->name) );
+ trace.push_back(Anchor(ino(), parent->dir->dirfrag()));
+ dout(7) << "make_anchor_trace added " << trace.back() << endl;
}
else if (state_test(STATE_DANGLING)) {
dout(7) << "make_anchor_trace dangling " << ino() << " on mds " << dangling_auth << endl;
- string ref_dn;
- trace.push_back( new Anchor(ino(),
- MDS_INO_INODEFILE_OFFSET+dangling_auth.first,
- ref_dn) );
+ assert(0);
+ //trace.push_back( Anchor(ino(),
+ //MDS_INO_INODEFILE_OFFSET+dangling_auth.first) );
}
else
assert(is_root());
static const int PIN_REQUEST = -10; // request is logging, finishing
static const int PIN_RENAMESRC = 11; // pinned on dest for foreign rename
static const int PIN_ANCHORING = 12;
- static const int PIN_OPENINGDIR = 13;
- static const int PIN_REMOTEPARENT = 14;
- static const int PIN_DENTRYLOCK = 15;
+ static const int PIN_UNANCHORING = 13;
+ static const int PIN_OPENINGDIR = 14;
+ static const int PIN_REMOTEPARENT = 15;
+ static const int PIN_DENTRYLOCK = 16;
const char *pin_name(int p) {
switch (p) {
case PIN_REQUEST: return "request";
case PIN_RENAMESRC: return "renamesrc";
case PIN_ANCHORING: return "anchoring";
+ case PIN_UNANCHORING: return "unanchoring";
case PIN_OPENINGDIR: return "openingdir";
case PIN_REMOTEPARENT: return "remoteparent";
case PIN_DENTRYLOCK: return "dentrylock";
static const int STATE_UNSAFE = (1<<3); // not logged yet
static const int STATE_DANGLING = (1<<4); // delete me when i expire; i have no dentry
static const int STATE_UNLINKING = (1<<5);
- static const int STATE_PROXY = (1<<6); // can't expire yet
- static const int STATE_EXPORTING = (1<<7); // on nonauth bystander.
- static const int STATE_ANCHORING = (1<<8);
+ static const int STATE_EXPORTING = (1<<6); // on nonauth bystander.
+ static const int STATE_ANCHORING = (1<<7);
+ static const int STATE_UNANCHORING = (1<<8);
static const int STATE_OPENINGDIR = (1<<9);
//static const int STATE_RENAMING = (1<<8); // moving me
//static const int STATE_RENAMINGTO = (1<<9); // rename target (will be unlinked)
// triggers: handle_disocver_reply
// Waiter bit flags.  This change inserts WAIT_UNANCHORED at bit 16 and shifts
// every later flag up by one bit.
static const int WAIT_LINK = (1<<14); // as in remotely nlink++
static const int WAIT_ANCHORED = (1<<15);
- static const int WAIT_UNLINK = (1<<16); // as in remotely nlink--
- static const int WAIT_HARDR = (1<<17); // 131072
- static const int WAIT_HARDW = (1<<18); // 262...
- static const int WAIT_HARDB = (1<<19);
+ static const int WAIT_UNANCHORED = (1<<16);
+ static const int WAIT_UNLINK = (1<<17); // as in remotely nlink--
+ static const int WAIT_HARDR = (1<<18); // 131072
+ static const int WAIT_HARDW = (1<<19); // 262...
+ static const int WAIT_HARDB = (1<<20);
static const int WAIT_HARDRWB = (WAIT_HARDR|WAIT_HARDW|WAIT_HARDB);
- static const int WAIT_HARDSTABLE = (1<<20);
- static const int WAIT_HARDNORD = (1<<21);
- static const int WAIT_FILER = (1<<22);
- static const int WAIT_FILEW = (1<<23);
- static const int WAIT_FILEB = (1<<24);
+ static const int WAIT_HARDSTABLE = (1<<21);
+ static const int WAIT_HARDNORD = (1<<22);
+ static const int WAIT_FILER = (1<<23);
+ static const int WAIT_FILEW = (1<<24);
+ static const int WAIT_FILEB = (1<<25);
static const int WAIT_FILERWB = (WAIT_FILER|WAIT_FILEW|WAIT_FILEB);
- static const int WAIT_FILESTABLE = (1<<25);
- static const int WAIT_FILENORD = (1<<26);
- static const int WAIT_FILENOWR = (1<<27);
- static const int WAIT_RENAMEACK =(1<<28);
- static const int WAIT_RENAMENOTIFYACK =(1<<29);
- static const int WAIT_CAPS =(1<<30);
+ static const int WAIT_FILESTABLE = (1<<26);
+ static const int WAIT_FILENORD = (1<<27);
+ static const int WAIT_FILENOWR = (1<<28);
+ static const int WAIT_RENAMEACK =(1<<29);
+ static const int WAIT_RENAMENOTIFYACK =(1<<30);
// NOTE(review): where int is 32 bits, 1<<31 shifts into the sign bit of a
// signed int, so WAIT_CAPS is negative (and the shift is not portable in older
// C++ standards).  The flag space is now full; consider an unsigned type
// before adding another bit.
+ static const int WAIT_CAPS =(1<<31);
static const int WAIT_ANY = 0xffffffff;
// misc
bool is_anchored() { return inode.anchored; }
bool is_root() { return state & STATE_ROOT; }
- bool is_proxy() { return state & STATE_PROXY; }
bool is_auth() { return state & STATE_AUTH; }
void set_auth(bool auth);
// -- misc --
void make_path(string& s);
- void make_anchor_trace(vector<class Anchor*>& trace);
+ void make_anchor_trace(vector<class Anchor>& trace);
{
CInode *in = mdcache->get_inode(m->get_ino());
assert(in);
- assert(in->is_auth() || in->is_proxy());
+ assert(in->is_auth());// || in->is_proxy());
dout(7) << "handle_inode_file_caps replica mds" << m->get_from() << " wants caps " << cap_string(m->get_caps()) << " on " << *in << endl;
- if (in->is_proxy()) {
+ /*if (in->is_proxy()) {
dout(7) << "proxy, fw" << endl;
mds->send_message_mds(m, in->authority().first, MDS_PORT_LOCKER);
return;
}
+ */
if (m->get_caps())
in->mds_caps_wanted[m->get_from()] = m->get_caps();
if (LOCK_AC_FOR_AUTH(m->get_action())) {
// auth
assert(in);
- assert(in->is_auth() || in->is_proxy());
+ assert(in->is_auth());// || in->is_proxy());
dout(7) << "handle_lock_inode_hard " << *in << " hardlock=" << in->hardlock << endl;
- if (in->is_proxy()) {
+ /*if (in->is_proxy()) {
// fw
int newauth = in->authority().first;
assert(newauth >= 0);
}
return;
}
+ */
} else {
// replica
if (!in) {
if (LOCK_AC_FOR_AUTH(m->get_action())) {
// auth
assert(in);
- assert(in->is_auth() || in->is_proxy());
+ assert(in->is_auth());// || in->is_proxy());
dout(7) << "handle_lock_inode_file " << *in << " hardlock=" << in->hardlock << endl;
- if (in->is_proxy()) {
+ /*if (in->is_proxy()) {
// fw
int newauth = in->authority().first;
assert(newauth >= 0);
}
return;
}
+ */
} else {
// replica
if (!in) {
#include "events/EUnlink.h"
#include "events/EMount.h"
#include "events/EClientMap.h"
+#include "events/EAnchor.h"
#include "events/EAlloc.h"
#include "events/EPurgeFinish.h"
#include "events/EExport.h"
case EVENT_PURGEFINISH: le = new EPurgeFinish(); break;
case EVENT_MOUNT: le = new EMount(); break;
case EVENT_CLIENTMAP: le = new EClientMap(); break;
+ case EVENT_ANCHOR: le = new EAnchor(); break;
case EVENT_ALLOC: le = new EAlloc(); break;
case EVENT_EXPORT: le = new EExport; break;
case EVENT_IMPORTSTART: le = new EImportStart; break;
#define EVENT_MOUNT 6
#define EVENT_CLIENTMAP 7
+#define EVENT_ANCHOR 8
+
#define EVENT_ALLOC 10
#define EVENT_MKNOD 11
#define EVENT_MKDIR 12
#include "osdc/Filer.h"
#include "events/EImportMap.h"
+#include "events/EUpdate.h"
#include "events/EString.h"
#include "events/EUnlink.h"
#include "events/EPurgeFinish.h"
Message *req;
Context *onfinish;
public:
- vector<Anchor*> anchortrace;
+ vector<Anchor> anchortrace;
C_MDC_OpenRemoteInoLookup(MDCache *mdc, inodeno_t ino, Message *req, Context *onfinish) {
this->mdc = mdc;
this->ino = ino;
void MDCache::open_remote_ino_2(inodeno_t ino,
Message *req,
- vector<Anchor*>& anchortrace,
+ vector<Anchor>& anchortrace,
Context *onfinish)
{
dout(7) << "open_remote_ino_2 on " << ino << ", trace depth is " << anchortrace.size() << endl;
+ // REWRITE ME: we're not going to use ref_dn.
+
+ /*
// construct path
filepath path;
for (unsigned i=0; i<anchortrace.size(); i++)
- path.push_dentry(anchortrace[i]->ref_dn);
+ path.push_dentry(anchortrace[i].ref_dn);
dout(7) << " path is " << path << endl;
onfinish->finish(r);
delete onfinish;
+ */
}
}
+// --------------------------------------------------------------------
// ANCHORS
-class C_MDC_AnchorInode : public Context {
+// CREATE
+
+class C_MDC_AnchorCreatePrepared : public Context {  // callback handed to anchorclient->prepare_create() by anchor_create()
+ MDCache *cache;  // cache whose _anchor_create_prepared() is called from finish()
 CInode *in;
-
 public:
- C_MDC_AnchorInode(CInode *in) {
- this->in = in;
+ C_MDC_AnchorCreatePrepared(MDCache *c, CInode *i) : cache(c), in(i) {}
+ void finish(int r) {  // r is not examined
+ cache->_anchor_create_prepared(in);
+ }
+};
+
+void MDCache::anchor_create(CInode *in, Context *onfinish)  // start anchoring 'in' (must be auth); onfinish is queued on WAIT_ANCHORED
+{
+ assert(in->is_auth());
+
+ // wait: queue onfinish first, so it is registered whether or not an anchor request is already in flight. NOTE(review): unlike anchor_destroy(), onfinish is not NULL-checked here -- confirm callers never pass NULL
+ in->add_waiter(CInode::WAIT_ANCHORED, onfinish);
+
+ // already anchoring? then a request was started earlier; do not start a second one
+ if (in->state_test(CInode::STATE_ANCHORING)) {
+ dout(7) << "anchor_create already anchoring " << *in << endl;
+ return;
 }
+
+ dout(7) << "anchor_create " << *in << endl;
+
+ // auth: do it -- flag the inode as anchoring and pin it; both are undone in _anchor_create_logged()
+ in->state_set(CInode::STATE_ANCHORING);
+ in->get(CInode::PIN_ANCHORING);
+
+ // make trace (filled in by the inode itself)
+ vector<Anchor> trace;
+ in->make_anchor_trace(trace);
+
+ // do it: hand ino + trace to the anchor client; the callback's finish() continues in _anchor_create_prepared(in)
+ mds->anchorclient->prepare_create(in->ino(), trace,
+ new C_MDC_AnchorCreatePrepared(this, in));
+}
+
+class C_MDC_AnchorCreateLogged : public Context {  // callback passed to mdlog->submit_entry() by _anchor_create_prepared()
+ MDCache *cache;  // cache whose _anchor_create_logged() is called from finish()
+ CInode *in;  // inode being anchored
+ version_t pdv;  // version returned by in->pre_dirty() when the log entry was built
+public:
+ C_MDC_AnchorCreateLogged(MDCache *c, CInode *i, version_t v) : cache(c), in(i), pdv(v) {}
 void finish(int r) {
- if (r == 0) {
- assert(in->inode.anchored == false);
- in->inode.anchored = true;
+ cache->_anchor_create_logged(in, pdv);  // r is not examined
+ }
+};
+
+// Invoked from C_MDC_AnchorCreatePrepared::finish().  Builds an EUpdate
+// ("anchor_create") whose inode copy has anchored set and carries the
+// pre-dirtied version, then submits it to the log.  The cached inode is
+// not modified here; C_MDC_AnchorCreateLogged continues the sequence.
+void MDCache::_anchor_create_prepared(CInode *in)
+{
+  dout(10) << "_anchor_create_prepared " << *in << endl;
+
+  assert(!(in->inode.anchored != false));
+
+  // version the inode will carry once this change is applied
+  const version_t projected = in->pre_dirty();
+
+  // journal entry, with the parent dir as context
+  EUpdate *entry = new EUpdate("anchor_create");
+  entry->metablob.add_dir_context(in->get_parent_dir());
+
+  // fill in the inode copy stored in the entry
+  inode_t *logged = entry->metablob.add_dentry(in->parent, true);
+  logged->version = projected;
+  logged->anchored = true;
+
+  // submit; the callback runs _anchor_create_logged(in, projected)
+  Context *on_logged = new C_MDC_AnchorCreateLogged(this, in, projected);
+  mds->mdlog->submit_entry(entry, on_logged);
+}
+
+
+void MDCache::_anchor_create_logged(CInode *in, version_t pdv)  // called from C_MDC_AnchorCreateLogged::finish(); pdv is the version taken from pre_dirty() in _anchor_create_prepared()
+{
+ dout(10) << "_anchor_create_logged pdv " << pdv << " on " << *in << endl;
+
+ // unpin: clear the STATE_ANCHORING flag and drop the PIN_ANCHORING pin taken in anchor_create()
+ assert(in->state_test(CInode::STATE_ANCHORING));
+ in->state_clear(CInode::STATE_ANCHORING);
+ in->put(CInode::PIN_ANCHORING);
+
+ // apply update to cache: same anchored/version values that were written into the log entry
+ in->inode.anchored = true;
+ in->inode.version = pdv;
+
+ // tell the anchortable we've committed
+ mds->anchorclient->commit_create(in->ino());
+
+ // trigger waiters queued on WAIT_ANCHORED by anchor_create(), with result 0
+ in->finish_waiting(CInode::WAIT_ANCHORED, 0);
+}
- in->state_clear(CInode::STATE_ANCHORING);
- in->put(CInode::PIN_ANCHORING);
-
- in->_mark_dirty(); // fixme
- }
- // trigger
- in->finish_waiting(CInode::WAIT_ANCHORED, r);
+// DESTROY
+
+class C_MDC_AnchorDestroyPrepared : public Context {  // callback handed to anchorclient->prepare_destroy() by anchor_destroy()
+ MDCache *cache;  // cache whose _anchor_destroy_prepared() is called from finish()
+ CInode *in;  // inode being unanchored
+public:
+ C_MDC_AnchorDestroyPrepared(MDCache *c, CInode *i) : cache(c), in(i) {}
+ void finish(int r) {  // r is not examined
+ cache->_anchor_destroy_prepared(in);
 }
 };
-void MDCache::anchor_inode(CInode *in, Context *onfinish)
+void MDCache::anchor_destroy(CInode *in, Context *onfinish)  // begin removing in's anchor (in must be auth); onfinish may be NULL
 {
 assert(in->is_auth());
+ // wait: queue the optional completion on WAIT_UNANCHORED; it is finished in _anchor_destroy_logged()
+ if (onfinish)
+ in->add_waiter(CInode::WAIT_UNANCHORED, onfinish);
+
 // already anchoring?
- if (in->state_test(CInode::STATE_ANCHORING)) {
- dout(7) << "anchor_inode already anchoring " << *in << endl;
+ if (in->state_test(CInode::STATE_UNANCHORING)) {  // an unanchor was already started; do not start a second one
+ dout(7) << "anchor_destroy already unanchoring " << *in << endl;
+ return;
+ }
- // wait
- in->add_waiter(CInode::WAIT_ANCHORED,
- onfinish);
+ dout(7) << "anchor_destroy " << *in << endl;
- } else {
- dout(7) << "anchor_inode anchoring " << *in << endl;
+ // auth: do it -- flag the inode as unanchoring and pin it; both are undone in _anchor_destroy_logged()
+ in->state_set(CInode::STATE_UNANCHORING);
+ in->get(CInode::PIN_UNANCHORING);
+
+ // do it: the callback's finish() continues in _anchor_destroy_prepared(in)
+ mds->anchorclient->prepare_destroy(in->ino(), new C_MDC_AnchorDestroyPrepared(this, in));
+}
- // auth: do it
- in->state_set(CInode::STATE_ANCHORING);
- in->get(CInode::PIN_ANCHORING);
-
- // wait
- in->add_waiter(CInode::WAIT_ANCHORED,
- onfinish);
-
- // make trace
- vector<Anchor*> trace;
- in->make_anchor_trace(trace);
-
- // do it
- mds->anchorclient->create(in->ino(), trace,
- new C_MDC_AnchorInode( in ));
+class C_MDC_AnchorDestroyLogged : public Context {  // callback passed to mdlog->submit_entry() by _anchor_destroy_prepared()
+ MDCache *cache;  // cache whose _anchor_destroy_logged() is called from finish()
+ CInode *in;  // inode being unanchored
+ version_t pdv;  // version returned by in->pre_dirty() when the log entry was built
+public:
+ C_MDC_AnchorDestroyLogged(MDCache *c, CInode *i, version_t v) : cache(c), in(i), pdv(v) {}
+ void finish(int r) {  // r is not examined
+ cache->_anchor_destroy_logged(in, pdv);
 }
+};
+
+// Called from C_MDC_AnchorDestroyPrepared::finish(), i.e. after
+// anchorclient->prepare_destroy() has finished for this inode.
+// Journals an EUpdate ("anchor_destroy") whose inode copy records the
+// inode as no longer anchored.  The cached inode itself is only changed
+// afterwards, in _anchor_destroy_logged().
+void MDCache::_anchor_destroy_prepared(CInode *in)
+{
+  dout(10) << "_anchor_destroy_prepared " << *in << endl;
+
+  // we can only destroy an anchor that exists
+  assert(in->inode.anchored == true);
+
+  // predirty, prepare log entry
+  version_t pdv = in->pre_dirty();
+
+  EUpdate *le = new EUpdate("anchor_destroy");
+  le->metablob.add_dir_context(in->get_parent_dir());
+
+  // update the logged inode copy.  The anchor is going away, so the
+  // journaled inode must say anchored == false; this is the same value
+  // _anchor_destroy_logged() applies to the cache.  (Logging 'true' here
+  // would leave the inode anchored again after a journal replay.)
+  inode_t *pi = le->metablob.add_dentry(in->parent, true);
+  pi->anchored = false;
+  pi->version = pdv;
+
+  // log + wait
+  mds->mdlog->submit_entry(le, new C_MDC_AnchorDestroyLogged(this, in, pdv));
 }
+void MDCache::_anchor_destroy_logged(CInode *in, version_t pdv)  // called from C_MDC_AnchorDestroyLogged::finish(); pdv is the version taken from pre_dirty() in _anchor_destroy_prepared()
+{
+ dout(10) << "_anchor_destroy_logged pdv " << pdv << " on " << *in << endl;
+
+ // unpin: clear the STATE_UNANCHORING flag and drop the PIN_UNANCHORING pin taken in anchor_destroy()
+ assert(in->state_test(CInode::STATE_UNANCHORING));
+ in->state_clear(CInode::STATE_UNANCHORING);
+ in->put(CInode::PIN_UNANCHORING);
+
+ // apply update to cache: clear the anchored flag and adopt the pre-dirtied version
+ in->inode.anchored = false;
+ in->inode.version = pdv;
+
+ // tell the anchortable we've committed
+ mds->anchorclient->commit_destroy(in->ino());
+
+ // trigger waiters queued on WAIT_UNANCHORED by anchor_destroy(), with result 0
+ in->finish_waiting(CInode::WAIT_UNANCHORED, 0);
+}
+
+
+
+
+// -------------------------------------------------------------------------------
+// HARD LINKS
+
+
void MDCache::handle_inode_link(MInodeLink *m)
{
CInode *in = get_inode(m->get_ino());
assert(in->inode.nlink == 1);
dout(7) << "needs anchor, nlink=" << in->inode.nlink << ", creating anchor" << endl;
- anchor_inode(in,
- new C_MDS_RetryMessage(mds, m));
+ anchor_create(in, new C_MDS_RetryMessage(mds, m));
return;
}
in->_mark_dirty(); // fixme
// update anchor to point to inode file+mds
- vector<Anchor*> atrace;
+ vector<Anchor> atrace;
in->make_anchor_trace(atrace);
assert(atrace.size() == 1); // it's dangling
- mds->anchorclient->update(in->ino(), atrace,
- new C_MDC_DentryUnlink(this, dn, dir, c));
+ assert(0); // rewrite me w/ new anchor interface
+ //mds->anchorclient->prepare_update(in->ino(), atrace,
+ // new C_MDC_DentryUnlink(this, dn, dir, c));
return;
}
}
dout(7) << "nlink=1+primary or 0+dangling, removing anchor" << endl;
// remove anchor (async)
- mds->anchorclient->destroy(dn->inode->ino(), NULL);
+ anchor_destroy(dn->inode, NULL);
}
} else {
int auth = dn->inode->authority().first;
{
CInode *in = get_inode(m->get_ino());
assert(in);
-
- // proxy?
- if (in->is_proxy()) {
- dout(7) << "handle_inode_unlink proxy on " << *in << endl;
- mds->send_message_mds(m, in->authority().first, MDS_PORT_CACHE);
- return;
- }
assert(in->is_auth());
// do it.
in->mark_clean(); // mark it clean.
// remove anchor (async)
- mds->anchorclient->destroy(in->ino(), NULL);
+ anchor_destroy(in, NULL);
}
else {
in->_mark_dirty(); // fixme
dout(7) << "nlink=1, removing anchor" << endl;
// remove anchor (async)
- mds->anchorclient->destroy(in->ino(), NULL);
+ anchor_destroy(in, NULL);
}
}
void open_remote_dir(CInode *diri, frag_t fg, Context *fin);
void open_remote_ino(inodeno_t ino, Message *req, Context *fin);
void open_remote_ino_2(inodeno_t ino, Message *req,
- vector<Anchor*>& anchortrace,
+ vector<Anchor>& anchortrace,
Context *onfinish);
bool path_pin(vector<CDentry*>& trace, Message *m, Context *c);
void request_pin_inode(Message *req, CInode *in);
void request_pin_dir(Message *req, CDir *dir);
- // anchors
- void anchor_inode(CInode *in, Context *onfinish);
- //void unanchor_inode(CInode *in, Context *c);
+ // -- anchors --
+public:
+ void anchor_create(CInode *in, Context *onfinish);
+ void anchor_destroy(CInode *in, Context *onfinish);
+protected:
+ void _anchor_create_prepared(CInode *in);
+ void _anchor_create_logged(CInode *in, version_t pdv);
+ void _anchor_destroy_prepared(CInode *in);
+ void _anchor_destroy_logged(CInode *in, version_t pdv);
+
+ friend class C_MDC_AnchorCreatePrepared;
+ friend class C_MDC_AnchorCreateLogged;
+ friend class C_MDC_AnchorDestroyPrepared;
+ friend class C_MDC_AnchorDestroyLogged;
+
+ // -- hard links --
void handle_inode_link(class MInodeLink *m);
void handle_inode_link_ack(class MInodeLinkAck *m);
anchorclient = new AnchorClient(messenger, mdsmap);
idalloc = new IdAllocator(this);
- anchormgr = new AnchorTable(this);
+ anchortable = new AnchorTable(this);
server = new Server(this);
locker = new Locker(this, mdcache);
if (mdlog) { delete mdlog; mdlog = NULL; }
if (balancer) { delete balancer; balancer = NULL; }
if (idalloc) { delete idalloc; idalloc = NULL; }
- if (anchormgr) { delete anchormgr; anchormgr = NULL; }
+ if (anchortable) { delete anchortable; anchortable = NULL; }
if (anchorclient) { delete anchorclient; anchorclient = NULL; }
if (osdmap) { delete osdmap; osdmap = 0; }
if (mdsmap) { delete mdsmap; mdsmap = 0; }
// save anchor table
if (mdsmap->get_anchortable() == whoami)
- anchormgr->save(0); // FIXME? or detect completion via filer?
+ anchortable->save(0); // FIXME? or detect completion via filer?
if (idalloc)
idalloc->save(0); // FIXME? or detect completion via filer?
// fixme: fake out anchortable
if (mdsmap->get_anchortable() == whoami) {
dout(10) << "boot_create creating fresh anchortable" << endl;
- anchormgr->reset();
- anchormgr->save(fin->new_sub());
+ anchortable->create_fresh();
+ anchortable->save(fin->new_sub());
}
}
if (mdsmap->get_anchortable() == whoami) {
dout(2) << "boot_start opening anchor table" << endl;
- anchormgr->load(fin->new_sub());
+ anchortable->load(fin->new_sub());
} else {
dout(2) << "boot_start i have no anchor table" << endl;
}
case 2:
if (mdsmap->get_anchortable() == whoami) {
dout(2) << "boot_replay " << step << ": opening anchor table" << endl;
- anchormgr->load(new C_MDS_BootRecover(this, 3));
+ anchortable->load(new C_MDS_BootRecover(this, 3));
break;
}
dout(2) << "boot_replay " << step << ": i have no anchor table" << endl;
switch (m->get_dest_port()) {
- case MDS_PORT_ANCHORMGR:
- anchormgr->dispatch(m);
+ case MDS_PORT_ANCHORTABLE:
+ anchortable->dispatch(m);
break;
case MDS_PORT_ANCHORCLIENT:
anchorclient->dispatch(m);
IdAllocator *idalloc;
- AnchorTable *anchormgr;
+ AnchorTable *anchortable;
AnchorClient *anchorclient;
Logger *logger, *logger2;
srcdn->_mark_dirty(); // fixme
// proxy!
- in->state_set(CInode::STATE_PROXY);
- in->get(CInode::PIN_PROXY);
+ //in->state_set(CInode::STATE_PROXY);
+ //in->get(CInode::PIN_PROXY);
// generate notify list (everybody but src|dst) and send warnings
set<int> notify;
// we got all our MNotifyAck's.
// was i proxy (if not, it's cuz this was a local rename)
- if (in->state_test(CInode::STATE_PROXY)) {
+ /*if (in->state_test(CInode::STATE_PROXY)) {
dout(10) << "file_rename_ack clearing proxy bit on " << *in << endl;
in->state_clear(CInode::STATE_PROXY);
in->put(CInode::PIN_PROXY);
- }
+ }*/
// done!
if (initiator == mds->get_nodeid()) {
assert(targeti->inode.nlink == 1);
dout(7) << "target needs anchor, nlink=" << targeti->inode.nlink << ", creating anchor" << endl;
- mdcache->anchor_inode(targeti,
- new C_MDS_RetryRequest(mds, req, diri));
+ mdcache->anchor_create(targeti,
+ new C_MDS_RetryRequest(mds, req, diri));
return;
}
--- /dev/null
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+/*
+ * Ceph - scalable distributed file system
+ *
+ * Copyright (C) 2004-2006 Sage Weil <sage@newdream.net>
+ *
+ * This is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License version 2.1, as published by the Free Software
+ * Foundation. See file COPYING.
+ *
+ */
+
+#ifndef __MDS_EANCHOR_H
+#define __MDS_EANCHOR_H
+
+#include <assert.h>
+#include "config.h"
+#include "include/types.h"
+
+#include "../LogEvent.h"
+#include "../Anchor.h"
+
+// Journal event describing one anchor-table operation.
+//   op      - ANCHOR_OP_* code (printed via get_anchor_opname()); replay()
+//             handles the *_PREPARE and *_COMMIT codes and asserts otherwise
+//   ino     - inode the operation refers to
+//   trace   - anchor trace; handed to create_prepare()/update_prepare() on replay
+//   version - anchor table version, compared with the table's get_version()
+class EAnchor : public LogEvent {
+protected:
+  int op;
+  inodeno_t ino;
+  vector<Anchor> trace;
+  version_t version;   // anchor table version
+
+ public:
+  // Default constructor (presumably used when an event is about to be
+  // decoded).  The scalar fields are zeroed so they never hold
+  // indeterminate values before decode_payload() fills them in.
+  EAnchor() : LogEvent(EVENT_ANCHOR), op(0), ino(), version(0) { }
+  EAnchor(int o, inodeno_t i, version_t v) :
+    LogEvent(EVENT_ANCHOR),
+    op(o), ino(i), version(v) { }
+
+  // The trace is copied; the argument is not modified.
+  void set_trace(const vector<Anchor>& t) { trace = t; }
+  vector<Anchor>& get_trace() { return trace; }
+
+  // Appends, in order: op, ino (raw bytes), the encoded trace, version (raw bytes).
+  void encode_payload(bufferlist& bl) {
+    bl.append((char*)&op, sizeof(op));
+    bl.append((char*)&ino, sizeof(ino));
+    ::_encode(trace, bl);
+    bl.append((char*)&version, sizeof(version));
+  }
+  // Reads the fields back in the order encode_payload() wrote them,
+  // starting at offset 'off' in bl and advancing 'off' past each field.
+  void decode_payload(bufferlist& bl, int& off) {
+    bl.copy(off, sizeof(op), (char*)&op);
+    off += sizeof(op);
+    bl.copy(off, sizeof(ino), (char*)&ino);
+    off += sizeof(ino);
+    ::_decode(trace, bl, off);
+    bl.copy(off, sizeof(version), (char*)&version);
+    off += sizeof(version);
+  }
+
+
+  // One-line description: op name and ino.  get_anchor_opname() asserts on
+  // an op code it does not know.
+  void print(ostream& out) {
+    out << "EAnchor " << get_anchor_opname(op) << " " << ino << endl;
+  }
+
+
+  bool has_expired(MDS *mds);
+  void expire(MDS *mds, Context *c);
+  void replay(MDS *mds);
+
+};
+
+#endif
#include "events/EMetaBlob.h"
#include "events/EAlloc.h"
+#include "events/EAnchor.h"
#include "events/EUpdate.h"
#include "events/EImportMap.h"
#include "MDLog.h"
#include "MDCache.h"
#include "Migrator.h"
+#include "AnchorTable.h"
#include "config.h"
#undef dout
}
+// -----------------------
+// EAnchor
+
+// Returns true when the anchor table's current version is at or beyond
+// this event's version, false while the table version is still behind it.
+bool EAnchor::has_expired(MDS *mds)
+{
+  const version_t table_v = mds->anchortable->get_version();
+  const bool flushed = !(table_v < version);
+
+  if (flushed) {
+    dout(10) << "EAnchor.has_expired v " << version << " <= " << table_v
+	     << ", already flushed" << endl;
+  } else {
+    dout(10) << "EAnchor.has_expired v " << version << " > " << table_v
+	     << ", still dirty" << endl;
+  }
+  return flushed;
+}
+
+// Expires the event by asking the anchor table to save itself; the
+// context 'c' is passed straight through to save().
+void EAnchor::expire(MDS *mds, Context *c)
+{
+  dout(10) << "EAnchor.expire saving anchor table" << endl;
+
+  AnchorTable *table = mds->anchortable;
+  table->save(c);
+}
+
+// Re-applies this event to the anchor table unless the table's version
+// already covers it.  When applied, the table must be exactly one version
+// behind the event beforehand and exactly at the event's version afterwards.
+void EAnchor::replay(MDS *mds)
+{
+  AnchorTable *table = mds->anchortable;
+
+  // table version already at or past this event: nothing to re-apply
+  if (table->get_version() >= version) {
+    dout(10) << "EAnchor.replay event " << version
+	     << " <= table " << table->get_version() << endl;
+    return;
+  }
+
+  dout(10) << " EAnchor.replay event " << version
+	   << " - 1 == table " << table->get_version() << endl;
+  assert(version-1 == table->get_version());
+
+  // dispatch on the recorded op; only the prepare ops take the trace
+  switch (op) {
+  case ANCHOR_OP_CREATE_PREPARE:
+    table->create_prepare(ino, trace);
+    break;
+  case ANCHOR_OP_UPDATE_PREPARE:
+    table->update_prepare(ino, trace);
+    break;
+  case ANCHOR_OP_DESTROY_PREPARE:
+    table->destroy_prepare(ino);
+    break;
+  case ANCHOR_OP_CREATE_COMMIT:
+    table->create_commit(ino);
+    break;
+  case ANCHOR_OP_UPDATE_COMMIT:
+    table->update_commit(ino);
+    break;
+  case ANCHOR_OP_DESTROY_COMMIT:
+    table->destroy_commit(ino);
+    break;
+  default:
+    assert(0);
+  }
+
+  assert(version == table->get_version());
+}
+
+
// -----------------------
// EUpdate
#define MDS_PORT_RENAMER 7
#define MDS_PORT_ANCHORCLIENT 10
-#define MDS_PORT_ANCHORMGR 11
+#define MDS_PORT_ANCHORTABLE 11
frag_t frag;
dirfrag_t() { }
- //dirfrag_t(inodeno_t i) : ino(i) { }
dirfrag_t(inodeno_t i, frag_t f) : ino(i), frag(f) { }
};
--- /dev/null
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+/*
+ * Ceph - scalable distributed file system
+ *
+ * Copyright (C) 2004-2006 Sage Weil <sage@newdream.net>
+ *
+ * This is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License version 2.1, as published by the Free Software
+ * Foundation. See file COPYING.
+ *
+ */
+
+
+#ifndef __MANCHORREQUEST_H
+#define __MANCHORREQUEST_H
+
+#include <vector>
+
+#include "msg/Message.h"
+#include "mds/AnchorTable.h"
+
+
+// Single message type for anchor traffic (MSG_MDS_ANCHOR).
+//   op    - ANCHOR_OP_* code (printed via get_anchor_opname())
+//   ino   - inode the operation refers to
+//   trace - anchor trace; may be empty
+class MAnchor : public Message {
+  int op;
+  inodeno_t ino;
+  vector<Anchor> trace;
+
+ public:
+  // Default constructor (presumably used when a received message is about
+  // to be decoded).  op and ino are zeroed so they never hold
+  // indeterminate values before decode_payload() fills them in.
+  MAnchor() : op(0), ino() {}
+  MAnchor(int o, inodeno_t i) :
+    Message(MSG_MDS_ANCHOR),
+    op(o), ino(i) { }
+
+
+  virtual char *get_type_name() { return "anchor"; }
+  // Prints "anchor(<opname> <ino> <trace entries...>)".
+  void print(ostream& o) {
+    o << "anchor(" << get_anchor_opname(op) << " " << ino;
+    for (unsigned i=0; i<trace.size(); i++) {
+      o << ' ' << trace[i];
+    }
+    o << ")";
+  }
+
+  // The trace is copied; the argument is not modified.
+  void set_trace(const vector<Anchor>& trace) {
+    this->trace = trace;
+  }
+
+  int get_op() { return op; }
+  inodeno_t get_ino() { return ino; }
+  vector<Anchor>& get_trace() { return trace; }
+
+  // Reads op, ino and the trace from the start of the payload, in the
+  // order encode_payload() wrote them.
+  virtual void decode_payload() {
+    int off = 0;
+    payload.copy(off, sizeof(op), (char*)&op);
+    off += sizeof(op);
+    payload.copy(off, sizeof(ino), (char*)&ino);
+    off += sizeof(ino);
+    ::_decode(trace, payload, off);
+  }
+
+  // Appends op and ino as raw bytes, followed by the encoded trace.
+  virtual void encode_payload() {
+    payload.append((char*)&op, sizeof(op));
+    payload.append((char*)&ino, sizeof(ino));
+    ::_encode(trace, payload);
+  }
+};
+
+#endif
+++ /dev/null
-// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
-/*
- * Ceph - scalable distributed file system
- *
- * Copyright (C) 2004-2006 Sage Weil <sage@newdream.net>
- *
- * This is free software; you can redistribute it and/or
- * modify it under the terms of the GNU Lesser General Public
- * License version 2.1, as published by the Free Software
- * Foundation. See file COPYING.
- *
- */
-
-
-#ifndef __MANCHORREPLY_H
-#define __MANCHORREPLY_H
-
-#include <vector>
-
-#include "msg/Message.h"
-#include "mds/AnchorTable.h"
-
-#include "MAnchorRequest.h"
-
-
-class MAnchorReply : public Message {
- int op;
- inodeno_t ino;
- vector<Anchor*> trace;
-
- public:
- MAnchorReply() {}
- MAnchorReply(MAnchorRequest *req) : Message(MSG_MDS_ANCHORREPLY) {
- this->op = req->get_op();
- this->ino = req->get_ino();
- }
- ~MAnchorReply() {
- for (unsigned i=0; i<trace.size(); i++) delete trace[i];
- }
- virtual char *get_type_name() { return "arep"; }
-
- void set_trace(vector<Anchor*>& trace) { this->trace = trace; }
-
- int get_op() { return op; }
- inodeno_t get_ino() { return ino; }
- vector<Anchor*>& get_trace() { return trace; }
-
- virtual void decode_payload() {
- int off = 0;
- payload.copy(off, sizeof(op), (char*)&op);
- off += sizeof(op);
- payload.copy(off, sizeof(ino), (char*)&ino);
- off += sizeof(ino);
- int n;
- payload.copy(off, sizeof(int), (char*)&n);
- off += sizeof(int);
- for (int i=0; i<n; i++) {
- Anchor *a = new Anchor;
- a->_decode(payload, off);
- trace.push_back(a);
- }
- }
-
- virtual void encode_payload() {
- payload.append((char*)&op, sizeof(op));
- payload.append((char*)&ino, sizeof(ino));
- int n = trace.size();
- payload.append((char*)&n, sizeof(int));
- for (int i=0; i<n; i++)
- trace[i]->_encode(payload);
- }
-};
-
-#endif
+++ /dev/null
-// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
-/*
- * Ceph - scalable distributed file system
- *
- * Copyright (C) 2004-2006 Sage Weil <sage@newdream.net>
- *
- * This is free software; you can redistribute it and/or
- * modify it under the terms of the GNU Lesser General Public
- * License version 2.1, as published by the Free Software
- * Foundation. See file COPYING.
- *
- */
-
-
-#ifndef __MANCHORREQUEST_H
-#define __MANCHORREQUEST_H
-
-#include <vector>
-
-#include "msg/Message.h"
-#include "mds/AnchorTable.h"
-
-#define ANCHOR_OP_CREATE 1
-#define ANCHOR_OP_DESTROY 2
-#define ANCHOR_OP_LOOKUP 3
-#define ANCHOR_OP_UPDATE 4
-
-class MAnchorRequest : public Message {
- int op;
- inodeno_t ino;
- vector<Anchor*> trace;
-
- public:
- MAnchorRequest() {}
- MAnchorRequest(int op, inodeno_t ino) : Message(MSG_MDS_ANCHORREQUEST) {
- this->op = op;
- this->ino = ino;
- }
- ~MAnchorRequest() {
- for (unsigned i=0; i<trace.size(); i++) delete trace[i];
- }
- virtual char *get_type_name() { return "areq"; }
-
- void set_trace(vector<Anchor*>& trace) { this->trace = trace; }
-
- int get_op() { return op; }
- inodeno_t get_ino() { return ino; }
- vector<Anchor*>& get_trace() { return trace; }
-
- virtual void decode_payload() {
- int off = 0;
- payload.copy(off, sizeof(op), (char*)&op);
- off += sizeof(op);
- payload.copy(off, sizeof(ino), (char*)&ino);
- off += sizeof(ino);
- int n;
- payload.copy(off, sizeof(int), (char*)&n);
- off += sizeof(int);
- for (int i=0; i<n; i++) {
- Anchor *a = new Anchor;
- a->_decode(payload, off);
- trace.push_back(a);
- }
- }
-
- virtual void encode_payload() {
- payload.append((char*)&op, sizeof(op));
- payload.append((char*)&ino, sizeof(ino));
- int n = trace.size();
- payload.append((char*)&n, sizeof(int));
- for (int i=0; i<n; i++)
- trace[i]->_encode(payload);
- }
-};
-
-#endif
#include "messages/MHeartbeat.h"
-#include "messages/MAnchorRequest.h"
-#include "messages/MAnchorReply.h"
+#include "messages/MAnchor.h"
#include "messages/MInodeLink.h"
#include "messages/MInodeLinkAck.h"
m = new MCacheExpire();
break;
- case MSG_MDS_ANCHORREQUEST:
- m = new MAnchorRequest();
- break;
- case MSG_MDS_ANCHORREPLY:
- m = new MAnchorReply();
+ case MSG_MDS_ANCHOR:
+ m = new MAnchor();
break;
case MSG_MDS_INODELINK:
#define MSG_MDS_CACHEEXPIRE 125
-#define MSG_MDS_ANCHORREQUEST 130
-#define MSG_MDS_ANCHORREPLY 131
+#define MSG_MDS_ANCHOR 130
#define MSG_MDS_INODELINK 140
#define MSG_MDS_INODELINKACK 141