- DOCUMENT.
- discover
- - hard link dentries
+/ - hard link dentries
- rejoin and replicas that are not in recovered node's cache... fetch storm?
- locking madness
/ - request_auth_pin, request_drop_auth_pins, and _link/_unlink_local should pre-pin dn dir and targeti.
- - move auth_pinning _out_ of locking _start and _finish methods
- - clean up multi-auth_pin code paths (e.g. link_local)
+/ - move auth_pinning _out_ of locking _start and _finish methods
+/ - clean up multi-auth_pin code paths (e.g. link_local)
- mds failure vs clients
- - clean up client op redirection
+/ - clean up client op redirection
- idempotent ops
+ - how to track in memory?
+ - how to trim?
+ - EMetablob replay, expire logic
- journal+recovery
- test anchortable, anchorclient
/ - local link
if (op == MDS_OP_STAT ||
op == MDS_OP_LSTAT ||
op == MDS_OP_READDIR ||
- op == MDS_OP_OPEN ||
- op == MDS_OP_RELEASE)
+ op == MDS_OP_OPEN)
nojournal = true;
MClientReply *reply = sendrecv(req, mds);
// encode payload now, in case we have to resend (in case of mds failure)
req->encode_payload();
request.request_payload = req->get_payload();
+
+ // note idempotency
+ request.idempotent = req->is_idempotent();
// send initial request.
send_request(&request, mds);
MetaRequest *request = mds_requests[tid];
assert(request);
- // note new mds set.
- // there are now exactly two mds's whose failure should trigger a resend
- // of this request.
- if (request->num_fwd < fwd->get_num_fwd()) {
+ if (request->idempotent) {
+ // note new mds set.
+ // there are now exactly two mds's whose failure should trigger a resend
+ // of this request.
+ if (request->num_fwd < fwd->get_num_fwd()) {
+ request->mds.clear();
+ request->mds.insert(fwd->get_source().num());
+ request->mds.insert(fwd->get_dest_mds());
+ request->num_fwd = fwd->get_num_fwd();
+ dout(10) << "handle_client_request tid " << tid
+ << " fwd " << fwd->get_num_fwd()
+ << " to mds" << fwd->get_dest_mds()
+ << ", mds set now " << request->mds
+ << endl;
+ } else {
+ dout(10) << "handle_client_request tid " << tid
+ << " previously forwarded to mds" << fwd->get_dest_mds()
+ << ", mds still " << request->mds
+ << endl;
+ }
+ } else {
request->mds.clear();
- request->mds.insert(fwd->get_source().num());
request->mds.insert(fwd->get_dest_mds());
request->num_fwd = fwd->get_num_fwd();
+
dout(10) << "handle_client_request tid " << tid
<< " fwd " << fwd->get_num_fwd()
<< " to mds" << fwd->get_dest_mds()
- << ", mds set now " << request->mds
- << endl;
- } else {
- dout(10) << "handle_client_request tid " << tid
- << " previously forwarded to mds" << fwd->get_dest_mds()
- << ", mds still " << request->mds
+ << ", non-idempotent, resending to " << fwd->get_dest_mds()
<< endl;
+
+ send_request(request, fwd->get_dest_mds());
}
delete fwd;
/****** file i/o **********/
-int Client::open(const char *relpath, int flags)
+int Client::open(const char *relpath, int flags, mode_t mode)
{
client_lock.Lock();
tout << path << endl;
tout << flags << endl;
- int cmode = 0;
- bool tryauth = false;
- if (flags & O_LAZY)
- cmode = FILE_MODE_LAZY;
- else if (flags & O_WRONLY) {
- cmode = FILE_MODE_W;
- tryauth = true;
- } else if (flags & O_RDWR) {
- cmode = FILE_MODE_RW;
- tryauth = true;
- } else if (flags & O_APPEND) {
- cmode = FILE_MODE_W;
- tryauth = true;
- } else
- cmode = FILE_MODE_R;
-
// go
MClientRequest *req = new MClientRequest(MDS_OP_OPEN, messenger->get_myinst());
req->set_path(path);
req->args.open.flags = flags;
- req->args.open.mode = cmode;
+ req->args.open.mode = mode;
+
+ int cmode = req->get_open_file_mode();
+ bool tryauth = !req->open_file_mode_is_readonly();
// FIXME where does FUSE maintain user information
req->set_caller_uid(getuid());
#include <ext/hash_map>
using namespace __gnu_cxx;
-#define O_LAZY 01000000
-
class MClientRequest;
class MClientRequestForward;
MClientRequest *request;
bufferlist request_payload; // in case i have to retry
+ bool idempotent; // is request idempotent?
set<int> mds; // who i am asking
int num_fwd; // # of times i've been forwarded
MetaRequest(MClientRequest *req, tid_t t) :
tid(t), request(req),
- num_fwd(0),
+ idempotent(false), num_fwd(0),
reply(0),
caller_cond(0), dispatch_cond(0) { }
};
// file ops
int mknod(const char *path, mode_t mode);
- int open(const char *path, int mode);
+ int open(const char *path, int flags, mode_t mode=0);
int close(fh_t fh);
off_t lseek(fh_t fh, off_t offset, int whence);
int read(fh_t fh, char *buf, off_t size, off_t offset=-1);
void SyntheticClient::foo()
{
// link fun
- /*
client->mknod("one", 0755);
client->mknod("two", 0755);
client->link("one", "three");
client->mkdir("dir", 0755);
client->link("two", "/dir/twolink");
client->link("dir/twolink", "four");
- */
// unlink fun
client->mknod("a", 0644);
{
int res;
- res = client->open(path, fi->flags);
+ res = client->open(path, fi->flags, 0);
if (res < 0) return res;
fi->fh = res;
return 0; // fuse wants 0 onsucess
If the MDS fails before the metadata update has been journaled, no
action is taken, since nothing is known about the previously proposed
transaction. If an AGREE message is received and there is no
-corresponding PREPARE state, and ROLLBACK is sent to the anchor table.
+corresponding PREPARE or pending-commit state, and ROLLBACK is sent to
+the anchor table.
If the MDS fails after journaling the metadata update but before
journaling the ACK, it resends COMMIT to the anchor table. If it
+#define O_LAZY 01000000
+
/** object layout
#include "MDS.h"
#include "MDLog.h"
-#include "events/EAnchor.h"
+#include "events/EAnchorClient.h"
#include "messages/MAnchor.h"
#include "config.h"
#undef dout
-#define dout(x) if (x <= g_conf.debug) cout << g_clock.now() << " " << mds->messenger->get_myaddr() << ".anchorclient "
-#define derr(x) if (x <= g_conf.debug) cout << g_clock.now() << " " << mds->messenger->get_myaddr() << ".anchorclient "
+#define dout(x) if (x <= g_conf.debug_mds) cout << g_clock.now() << " " << mds->messenger->get_myaddr() << ".anchorclient "
+#define derr(x) if (x <= g_conf.debug_mds) cout << g_clock.now() << " " << mds->messenger->get_myaddr() << ".anchorclient "
void AnchorClient::dispatch(Message *m)
inodeno_t ino = m->get_ino();
version_t atid = m->get_atid();
+ dout(10) << "handle_anchor_reply " << *m << endl;
+
switch (m->get_op()) {
// lookup
onfinish->finish(0);
delete onfinish;
}
- } else {
+ }
+ else if (pending_commit.count(atid)) {
+ dout(10) << "stray create_agree on " << ino
+ << " atid " << atid
+ << ", already committing, resending COMMIT"
+ << endl;
+ MAnchor *req = new MAnchor(ANCHOR_OP_COMMIT, 0, atid);
+ mds->messenger->send_message(req,
+ mds->mdsmap->get_inst(mds->mdsmap->get_anchortable()),
+ MDS_PORT_ANCHORTABLE, MDS_PORT_ANCHORCLIENT);
+ }
+ else {
dout(10) << "stray create_agree on " << ino
<< " atid " << atid
<< ", sending ROLLBACK"
onfinish->finish(0);
delete onfinish;
}
- } else {
+ }
+ else if (pending_commit.count(atid)) {
+ dout(10) << "stray destroy_agree on " << ino
+ << " atid " << atid
+ << ", already committing, resending COMMIT"
+ << endl;
+ MAnchor *req = new MAnchor(ANCHOR_OP_COMMIT, 0, atid);
+ mds->messenger->send_message(req,
+ mds->mdsmap->get_inst(mds->mdsmap->get_anchortable()),
+ MDS_PORT_ANCHORTABLE, MDS_PORT_ANCHORCLIENT);
+ }
+ else {
dout(10) << "stray destroy_agree on " << ino
<< " atid " << atid
<< ", sending ROLLBACK"
onfinish->finish(0);
delete onfinish;
}
- } else {
+ }
+ else if (pending_commit.count(atid)) {
+ dout(10) << "stray update_agree on " << ino
+ << " atid " << atid
+ << ", already committing, resending COMMIT"
+ << endl;
+ MAnchor *req = new MAnchor(ANCHOR_OP_COMMIT, 0, atid);
+ mds->messenger->send_message(req,
+ mds->mdsmap->get_inst(mds->mdsmap->get_anchortable()),
+ MDS_PORT_ANCHORTABLE, MDS_PORT_ANCHORCLIENT);
+ }
+ else {
dout(10) << "stray update_agree on " << ino
<< " atid " << atid
<< ", sending ROLLBACK"
pending_commit.erase(atid);
// log ACK.
- mds->mdlog->submit_entry(new EAnchor(ANCHOR_OP_ACK, 0, atid)); // ino doesn't matter.
+ mds->mdlog->submit_entry(new EAnchorClient(ANCHOR_OP_ACK, atid));
// kick any waiters
if (ack_waiters.count(atid)) {
pending_lookup[ino].onfinish = onfinish;
pending_lookup[ino].trace = &trace;
- mds->messenger->send_message(req,
- mds->mdsmap->get_inst(mds->mdsmap->get_anchortable()),
- MDS_PORT_ANCHORTABLE, MDS_PORT_ANCHORCLIENT);
+ mds->send_message_mds(req,
+ mds->mdsmap->get_anchortable(),
+ MDS_PORT_ANCHORTABLE, MDS_PORT_ANCHORCLIENT);
}
pending_create_prepare[ino].patid = patid;
pending_create_prepare[ino].onfinish = onfinish;
- mds->messenger->send_message(req,
- mds->mdsmap->get_inst(mds->mdsmap->get_anchortable()),
- MDS_PORT_ANCHORTABLE, MDS_PORT_ANCHORCLIENT);
+ mds->send_message_mds(req,
+ mds->mdsmap->get_anchortable(),
+ MDS_PORT_ANCHORTABLE, MDS_PORT_ANCHORCLIENT);
}
void AnchorClient::prepare_destroy(inodeno_t ino,
void AnchorClient::finish_recovery()
{
- dout(7) << "finish_recovery - sending COMMIT on un-ACKed atids" << endl;
+ dout(7) << "finish_recovery" << endl;
+
+ resend_commits();
+}
+void AnchorClient::resend_commits()
+{
for (set<version_t>::iterator p = pending_commit.begin();
p != pending_commit.end();
++p) {
- dout(10) << " sending COMMIT on " << *p << endl;
+ dout(10) << "resending commit on " << *p << endl;
MAnchor *req = new MAnchor(ANCHOR_OP_COMMIT, 0, *p);
- mds->messenger->send_message(req,
- mds->mdsmap->get_inst(mds->mdsmap->get_anchortable()),
- MDS_PORT_ANCHORTABLE, MDS_PORT_ANCHORCLIENT);
+ mds->send_message_mds(req,
+ mds->mdsmap->get_anchortable(),
+ MDS_PORT_ANCHORTABLE, MDS_PORT_ANCHORCLIENT);
}
}
+void AnchorClient::resend_prepares(hash_map<inodeno_t, _pending_prepare>& prepares, int op)
+{
+ for (hash_map<inodeno_t, _pending_prepare>::iterator p = prepares.begin();
+ p != prepares.end();
+ p++) {
+ dout(10) << "resending " << get_anchor_opname(op) << " on " << p->first << endl;
+ MAnchor *req = new MAnchor(op, p->first);
+ req->set_trace(p->second.trace);
+ mds->send_message_mds(req,
+ mds->mdsmap->get_anchortable(),
+ MDS_PORT_ANCHORTABLE, MDS_PORT_ANCHORCLIENT);
+ }
+}
+
+
+void AnchorClient::handle_mds_recovery(int who)
+{
+ dout(7) << "handle_mds_recovery mds" << who << endl;
+
+ if (who != mds->mdsmap->get_anchortable())
+ return; // do nothing.
+
+ // resend any pending lookups.
+ for (hash_map<inodeno_t, _pending_lookup>::iterator p = pending_lookup.begin();
+ p != pending_lookup.end();
+ p++) {
+ dout(10) << "resending lookup on " << p->first << endl;
+ mds->send_message_mds(new MAnchor(ANCHOR_OP_LOOKUP, p->first),
+ mds->mdsmap->get_anchortable(),
+ MDS_PORT_ANCHORTABLE, MDS_PORT_ANCHORCLIENT);
+ }
+
+ // resend any pending prepares.
+ resend_prepares(pending_create_prepare, ANCHOR_OP_CREATE_PREPARE);
+ resend_prepares(pending_update_prepare, ANCHOR_OP_UPDATE_PREPARE);
+ resend_prepares(pending_destroy_prepare, ANCHOR_OP_DESTROY_PREPARE);
+
+ // resend any pending commits.
+ resend_commits();
+}
// for recovery (by other nodes)
void handle_mds_recovery(int mds); // called when someone else recovers
+ void resend_commits();
+ void resend_prepares(hash_map<inodeno_t, _pending_prepare>& prepares, int op);
+
// for recovery (by me)
void got_journaled_agree(version_t atid) {
pending_commit.insert(atid);
// MIDLEVEL
-void AnchorTable::create_prepare(inodeno_t ino, vector<Anchor>& trace)
+void AnchorTable::create_prepare(inodeno_t ino, vector<Anchor>& trace, int reqmds)
{
// make sure trace is in table
for (unsigned i=0; i<trace.size(); i++)
version++;
pending_create[version] = ino; // so we can undo
+ pending_reqmds[version] = reqmds;
//dump();
}
-void AnchorTable::destroy_prepare(inodeno_t ino)
+void AnchorTable::destroy_prepare(inodeno_t ino, int reqmds)
{
version++;
pending_destroy[version] = ino;
+ pending_reqmds[version] = reqmds;
//dump();
}
-void AnchorTable::update_prepare(inodeno_t ino, vector<Anchor>& trace)
+void AnchorTable::update_prepare(inodeno_t ino, vector<Anchor>& trace, int reqmds)
{
version++;
pending_update[version].first = ino;
pending_update[version].second = trace;
+ pending_reqmds[version] = reqmds;
//dump();
}
else
assert(0);
+ pending_reqmds.erase(atid);
+
// bump version.
version++;
//dump();
else
assert(0);
+ pending_reqmds.erase(atid);
+
// bump version.
version++;
//dump();
dout(7) << "handle_create_prepare " << ino << endl;
- create_prepare(ino, trace);
+ create_prepare(ino, trace, req->get_source().num());
// log it
- EAnchor *le = new EAnchor(ANCHOR_OP_CREATE_PREPARE, ino, version);
+ EAnchor *le = new EAnchor(ANCHOR_OP_CREATE_PREPARE, ino, version, req->get_source().num());
le->set_trace(trace);
mds->mdlog->submit_entry(le,
new C_AT_CreatePrepare(this, req, version));
inodeno_t ino = req->get_ino();
dout(7) << "handle_destroy_prepare " << ino << endl;
- destroy_prepare(ino);
+ destroy_prepare(ino, req->get_source().num());
- mds->mdlog->submit_entry(new EAnchor(ANCHOR_OP_DESTROY_PREPARE, ino, version),
+ mds->mdlog->submit_entry(new EAnchor(ANCHOR_OP_DESTROY_PREPARE, ino, version, req->get_source().num()),
new C_AT_DestroyPrepare(this, req, version));
}
dout(7) << "handle_update_prepare " << ino << endl;
- update_prepare(ino, trace);
+ update_prepare(ino, trace, req->get_source().num());
// log it
- EAnchor *le = new EAnchor(ANCHOR_OP_UPDATE_PREPARE, ino, version);
+ EAnchor *le = new EAnchor(ANCHOR_OP_UPDATE_PREPARE, ino, version, req->get_source().num());
le->set_trace(trace);
mds->mdlog->submit_entry(le,
new C_AT_UpdatePrepare(this, req, version));
return;
}
+ dout(10) << "handle_anchor_request " << *req << endl;
+
// go
switch (req->get_op()) {
void AnchorTable::save(Context *onfinish)
{
dout(7) << "save v " << version << endl;
- if (!opened) return;
+ if (!opened) {
+ assert(!onfinish);
+ return;
+ }
if (onfinish)
waiting_for_save[version].push_back(onfinish);
}
// pending
+ ::_encode(pending_reqmds, bl);
::_encode(pending_create, bl);
::_encode(pending_destroy, bl);
dout(15) << "load_2 decoded " << a << endl;
}
+ ::_decode(pending_reqmds, bl, off);
::_decode(pending_create, bl, off);
::_decode(pending_destroy, bl, off);
finish_contexts(waiting_for_open);
}
+
+//////
+
+void AnchorTable::finish_recovery()
+{
+ dout(7) << "finish_recovery" << endl;
+
+ for (map<version_t, inodeno_t>::iterator p = pending_create.begin();
+ p != pending_create.end();
+ ++p) {
+ MAnchor *reply = new MAnchor(ANCHOR_OP_CREATE_AGREE, p->second, p->first);
+ mds->messenger->send_message(reply, mds->mdsmap->get_inst(pending_reqmds[p->first]), MDS_PORT_ANCHORCLIENT);
+ }
+ for (map<version_t, inodeno_t>::iterator p = pending_destroy.begin();
+ p != pending_destroy.end();
+ ++p) {
+ MAnchor *reply = new MAnchor(ANCHOR_OP_DESTROY_AGREE, p->second, p->first);
+ mds->messenger->send_message(reply, mds->mdsmap->get_inst(pending_reqmds[p->first]), MDS_PORT_ANCHORCLIENT);
+ }
+ for (map<version_t, pair<inodeno_t, vector<Anchor> > >::iterator p = pending_update.begin();
+ p != pending_update.end();
+ ++p) {
+ MAnchor *reply = new MAnchor(ANCHOR_OP_UPDATE_AGREE, p->second.first, p->first);
+ mds->messenger->send_message(reply, mds->mdsmap->get_inst(pending_reqmds[p->first]), MDS_PORT_ANCHORCLIENT);
+ }
+}
+
+
hash_map<inodeno_t, Anchor> anchor_map;
// uncommitted operations
+ map<version_t, int> pending_reqmds;
map<version_t, inodeno_t> pending_create;
map<version_t, inodeno_t> pending_destroy;
map<version_t, pair<inodeno_t, vector<Anchor> > > pending_update;
void dec(inodeno_t ino);
// mid-level
- void create_prepare(inodeno_t ino, vector<Anchor>& trace);
- void destroy_prepare(inodeno_t ino);
- void update_prepare(inodeno_t ino, vector<Anchor>& trace);
+ void create_prepare(inodeno_t ino, vector<Anchor>& trace, int reqmds);
+ void destroy_prepare(inodeno_t ino, int reqmds);
+ void update_prepare(inodeno_t ino, vector<Anchor>& trace, int reqmds);
void commit(version_t atid);
void rollback(version_t atid);
friend class EAnchor; // used for journal replay.
void load(Context *onfinish);
void _loaded(bufferlist& bl);
+ // recovery
+ void finish_recovery();
};
string& get_dname() { return dname; }
int get_nonce() { return replica_nonce; }
+ bool is_remote() { return remote_ino ? true:false; }
+ inodeno_t get_remote_ino() { return remote_ino; }
void update_dentry(CDentry *dn) {
dn->set_replica_nonce( replica_nonce );
+ if (remote_ino)
+ dn->set_remote_ino(remote_ino);
+ }
+ void update_new_dentry(CDentry *dn) {
+ update_dentry(dn);
dn->set_lockstate( lockstate );
}
//assert(null_items.count(dn->name) == 1);
//null_items.erase(dn->name);
nnull--;
+ assert(nnull + nitems == items.size());
}
void CDir::link_inode( CDentry *dn, CInode *in )
class CDirExport {
struct {
dirfrag_t dirfrag;
- long nitems; // actual real entries
long nden; // num dentries (including null ones)
version_t version;
version_t committed_version;
assert(dir->get_version() == dir->get_projected_version());
st.dirfrag = dir->dirfrag();
- st.nitems = dir->nitems;
st.nden = dir->items.size();
st.version = dir->version;
st.committed_version = dir->committed_version;
void update_dir(CDir *dir) {
assert(dir->dirfrag() == st.dirfrag);
- //dir->nitems = st.nitems;
-
// set committed_version at old version
dir->committing_version = dir->committed_version = st.committed_version;
dir->projected_version = dir->version = st.version;
return false;
}
-bool Locker::inode_hard_read_start(CInode *in, MClientRequest *m)
+bool Locker::inode_hard_read_start(CInode *in, MClientRequest *m, CInode *ref)
{
dout(7) << "inode_hard_read_start on " << *in << endl;
// wait!
dout(7) << "inode_hard_read_start waiting on " << *in << endl;
- in->add_waiter(CInode::WAIT_HARDR, new C_MDS_RetryRequest(mds, m, in));
+ in->add_waiter(CInode::WAIT_HARDR, new C_MDS_RetryRequest(mds, m, ref));
return false;
}
}
-bool Locker::inode_hard_write_start(CInode *in, MClientRequest *m)
+bool Locker::inode_hard_write_start(CInode *in, MClientRequest *m, CInode *ref)
{
dout(7) << "inode_hard_write_start on " << *in << endl;
}
dout(7) << "inode_hard_write_start waiting on " << *in << endl;
- in->add_waiter(CInode::WAIT_HARDW, new C_MDS_RetryRequest(mds, m, in));
+ in->add_waiter(CInode::WAIT_HARDW, new C_MDS_RetryRequest(mds, m, ref));
return false;
} else {
// soft inode metadata
-bool Locker::inode_file_read_start(CInode *in, MClientRequest *m)
+bool Locker::inode_file_read_start(CInode *in, MClientRequest *m, CInode *ref)
{
dout(7) << "inode_file_read_start " << *in << " filelock=" << in->filelock << endl;
}
} else {
dout(7) << "inode_file_read_start waiting until stable on " << *in << ", filelock=" << in->filelock << endl;
- in->add_waiter(CInode::WAIT_FILESTABLE, new C_MDS_RetryRequest(mds, m, in));
+ in->add_waiter(CInode::WAIT_FILESTABLE, new C_MDS_RetryRequest(mds, m, ref));
return false;
}
} else {
} else {
// wait until stable
dout(7) << "inode_file_read_start waiting until stable on " << *in << ", filelock=" << in->filelock << endl;
- in->add_waiter(CInode::WAIT_FILESTABLE, new C_MDS_RetryRequest(mds, m, in));
+ in->add_waiter(CInode::WAIT_FILESTABLE, new C_MDS_RetryRequest(mds, m, ref));
return false;
}
}
// wait
dout(7) << "inode_file_read_start waiting on " << *in << ", filelock=" << in->filelock << endl;
- in->add_waiter(CInode::WAIT_FILER, new C_MDS_RetryRequest(mds, m, in));
+ in->add_waiter(CInode::WAIT_FILER, new C_MDS_RetryRequest(mds, m, ref));
return false;
}
}
-bool Locker::inode_file_write_start(CInode *in, MClientRequest *m)
+bool Locker::inode_file_write_start(CInode *in, MClientRequest *m, CInode *ref)
{
dout(7) << "inode_file_write_start on " << *in << endl;
if (!in->filelock.can_write_soon(in->is_auth())) {
if (!in->filelock.is_stable()) {
dout(7) << "inode_file_write_start on auth, waiting for stable on " << *in << endl;
- in->add_waiter(CInode::WAIT_FILESTABLE, new C_MDS_RetryRequest(mds, m, in));
+ in->add_waiter(CInode::WAIT_FILESTABLE, new C_MDS_RetryRequest(mds, m, ref));
return false;
}
return true;
} else {
dout(7) << "inode_file_write_start on auth, waiting for write on " << *in << endl;
- in->add_waiter(CInode::WAIT_FILEW, new C_MDS_RetryRequest(mds, m, in));
+ in->add_waiter(CInode::WAIT_FILEW, new C_MDS_RetryRequest(mds, m, ref));
return false;
}
}
// high level interface
public:
bool inode_hard_read_try(CInode *in, Context *con);
- bool inode_hard_read_start(CInode *in, MClientRequest *m);
+ bool inode_hard_read_start(CInode *in, MClientRequest *m, CInode *ref);
void inode_hard_read_finish(CInode *in);
- bool inode_hard_write_start(CInode *in, MClientRequest *m);
+ bool inode_hard_write_start(CInode *in, MClientRequest *m, CInode *ref);
void inode_hard_write_finish(CInode *in);
- bool inode_file_read_start(CInode *in, MClientRequest *m);
+ bool inode_file_read_start(CInode *in, MClientRequest *m, CInode *ref);
void inode_file_read_finish(CInode *in);
- bool inode_file_write_start(CInode *in, MClientRequest *m);
+ bool inode_file_write_start(CInode *in, MClientRequest *m, CInode *ref);
void inode_file_write_finish(CInode *in);
void inode_hard_eval(CInode *in);
#include "events/EMount.h"
#include "events/EClientMap.h"
#include "events/EAnchor.h"
+#include "events/EAnchorClient.h"
#include "events/EAlloc.h"
#include "events/EPurgeFinish.h"
#include "events/EExport.h"
case EVENT_MOUNT: le = new EMount(); break;
case EVENT_CLIENTMAP: le = new EClientMap(); break;
case EVENT_ANCHOR: le = new EAnchor(); break;
+ case EVENT_ANCHORCLIENT: le = new EAnchorClient(); break;
case EVENT_ALLOC: le = new EAlloc(); break;
case EVENT_EXPORT: le = new EExport; break;
case EVENT_IMPORTSTART: le = new EImportStart; break;
#define EVENT_CLIENTMAP 7
#define EVENT_ANCHOR 8
+#define EVENT_ANCHORCLIENT 9
#define EVENT_ALLOC 10
#define EVENT_MKNOD 11
}
}
- if (lru.lru_get_size() == 0) {
+ if (lru.lru_get_size() == 0 && root) {
list<CDir*> ls;
root->get_dirfrags(ls);
for (list<CDir*>::iterator p = ls.begin();
dout(10) << "traverse: relative symlink, path now " << path << " depth " << depth << endl;
}
continue;
- } else {
- // keep going.
+ }
- // forwarder wants replicas?
- if (is_client_req && ((MClientRequest*)req)->get_mds_wants_replica_in_dirino()) {
- dout(30) << "traverse: REP is here, " << ((MClientRequest*)req)->get_mds_wants_replica_in_dirino() << " vs " << curdir->dirfrag() << endl;
-
- if (((MClientRequest*)req)->get_mds_wants_replica_in_dirino() == curdir->ino() &&
- curdir->is_auth() &&
- curdir->is_rep() &&
- curdir->is_replica(req->get_source().num()) &&
- dn->is_auth()
- ) {
- assert(req->get_source().is_mds());
- int from = req->get_source().num();
-
- if (dn->is_replica(from)) {
- dout(15) << "traverse: REP would replicate to mds" << from << ", but already cached_by "
- << req->get_source() << " dn " << *dn << endl;
- } else {
- dout(10) << "traverse: REP replicating to " << req->get_source() << " dn " << *dn << endl;
- MDiscoverReply *reply = new MDiscoverReply(curdir->ino());
- reply->add_dentry( dn->replicate_to( from ) );
- if (dn->is_primary())
- reply->add_inode( dn->inode->replicate_to( from ) );
- mds->send_message_mds(reply, req->get_source().num(), MDS_PORT_CACHE);
- }
- }
- }
-
- trace.push_back(dn);
- cur = dn->inode;
- touch_inode(cur);
- depth++;
- continue;
+ // forwarder wants replicas?
+ if (is_client_req && ((MClientRequest*)req)->get_mds_wants_replica_in_dirino()) {
+ dout(30) << "traverse: REP is here, " << ((MClientRequest*)req)->get_mds_wants_replica_in_dirino() << " vs " << curdir->dirfrag() << endl;
+
+ if (((MClientRequest*)req)->get_mds_wants_replica_in_dirino() == curdir->ino() &&
+ curdir->is_auth() &&
+ curdir->is_rep() &&
+ curdir->is_replica(req->get_source().num()) &&
+ dn->is_auth()
+ ) {
+ assert(req->get_source().is_mds());
+ int from = req->get_source().num();
+
+ if (dn->is_replica(from)) {
+ dout(15) << "traverse: REP would replicate to mds" << from << ", but already cached_by "
+ << req->get_source() << " dn " << *dn << endl;
+ } else {
+ dout(10) << "traverse: REP replicating to " << req->get_source() << " dn " << *dn << endl;
+ MDiscoverReply *reply = new MDiscoverReply(curdir->ino());
+ reply->add_dentry( dn->replicate_to( from ) );
+ if (dn->is_primary())
+ reply->add_inode( dn->inode->replicate_to( from ) );
+ mds->send_message_mds(reply, req->get_source().num(), MDS_PORT_CACHE);
+ }
+ }
}
+
+ // add to trace, continue.
+ trace.push_back(dn);
+ cur = dn->inode;
+ touch_inode(cur);
+ depth++;
+ continue;
}
- // MISS. don't have it.
+ // MISS. dentry doesn't exist.
dout(12) << "traverse: miss on dentry " << path[depth] << " in " << *curdir << endl;
if (curdir->is_auth()) {
if (dn) {
dout(7) << "had " << *dn << endl;
- dn->replica_nonce = m->get_dentry(i).get_nonce(); // fix nonce.
+ m->get_dentry(i).update_dentry(dn);
} else {
dn = curdir->add_dentry( m->get_dentry(i).get_dname(), 0, false );
- m->get_dentry(i).update_dentry(dn);
+ m->get_dentry(i).update_new_dentry(dn);
dout(7) << "added " << *dn << endl;
}
if (dbl > g_conf.debug && dbl > g_conf.debug_mds)
return; // i won't print anything.
+ if (!root) {
+ dout(dbl) << "no subtrees" << endl;
+ return;
+ }
+
list<pair<CDir*,int> > q;
string indent;
{
// client request?
if (req->get_type() == MSG_CLIENT_REQUEST) {
- // tell the client
MClientRequest *creq = (MClientRequest*)req;
creq->inc_num_fwd(); // inc forward counter
+
+ // tell the client
messenger->send_message(new MClientRequestForward(creq->get_tid(), mds, creq->get_num_fwd()),
creq->get_client_inst());
+
+ if (!creq->is_idempotent())
+ return; // don't forward if non-idempotent
}
// forward
// did i just recover?
if (oldstate == MDSMap::STATE_REJOIN) {
dout(1) << "successful recovery!" << endl;
-
+
+ // kick anchortable (resent AGREEs)
+ if (mdsmap->get_anchortable() == whoami)
+ anchortable->finish_recovery();
+
// kick anchorclient (resent COMMITs)
anchorclient->finish_recovery();
if (*p == whoami) continue; // not me
if (oldactive.count(*p)) continue; // newly so?
mdcache->handle_mds_recovery(*p);
+ anchorclient->handle_mds_recovery(*p);
}
}
friend class MDSMonitor;
public:
- MDSMap() : epoch(0), same_inst_since(0), anchortable(0), root(0) {}
+ MDSMap() : epoch(0), same_inst_since(0), anchortable(1), root(0) {}
epoch_t get_epoch() const { return epoch; }
void inc_epoch() { epoch++; }
case MDS_OP_TRUNCATE:
if (!req->args.truncate.ino) break; // can be called w/ either fh OR path
- case MDS_OP_RELEASE:
case MDS_OP_FSYNC:
ref = mdcache->get_inode(req->args.fsync.ino); // fixme someday no ino needed?
int mask = req->args.stat.mask;
if (mask & (INODE_MASK_SIZE|INODE_MASK_MTIME)) {
// yes. do a full stat.
- if (!mds->locker->inode_file_read_start(ref, req))
+ if (!mds->locker->inode_file_read_start(ref, req, ref))
return; // syncing
mds->locker->inode_file_read_finish(ref);
} else {
mdcache->request_auth_pin(req, cur);
// write
- if (!mds->locker->inode_file_write_start(cur, req))
+ if (!mds->locker->inode_file_write_start(cur, req, cur))
return; // fw or (wait for) sync
mds->balancer->hit_inode(cur, META_POP_IWR);
// log + wait
EUpdate *le = new EUpdate("utime");
+ le->metablob.add_client_req(req->get_reqid());
le->metablob.add_dir_context(cur->get_parent_dir());
inode_t *pi = le->metablob.add_dentry(cur->parent, true);
pi->mtime = mtime;
mdcache->request_auth_pin(req, cur);
// write
- if (!mds->locker->inode_hard_write_start(cur, req))
+ if (!mds->locker->inode_hard_write_start(cur, req, cur))
return; // fw or (wait for) lock
mds->balancer->hit_inode(cur, META_POP_IWR);
// log + wait
EUpdate *le = new EUpdate("chmod");
+ le->metablob.add_client_req(req->get_reqid());
le->metablob.add_dir_context(cur->get_parent_dir());
inode_t *pi = le->metablob.add_dentry(cur->parent, true);
pi->mode = mode;
mdcache->request_auth_pin(req, cur);
// write
- if (!mds->locker->inode_hard_write_start(cur, req))
+ if (!mds->locker->inode_hard_write_start(cur, req, cur))
return; // fw or (wait for) lock
mds->balancer->hit_inode(cur, META_POP_IWR);
// log + wait
EUpdate *le = new EUpdate("chown");
+ le->metablob.add_client_req(req->get_reqid());
le->metablob.add_dir_context(cur->get_parent_dir());
inode_t *pi = le->metablob.add_dentry(cur->parent, true);
if (uid >= 0) pi->uid = uid;
assert(dir->is_auth());
// check perm
- if (!mds->locker->inode_hard_read_start(diri,req))
+ if (!mds->locker->inode_hard_read_start(diri, req, diri))
return;
mds->locker->inode_hard_read_finish(diri);
// prepare finisher
C_MDS_mknod_finish *fin = new C_MDS_mknod_finish(mds, req, dn, newi);
EUpdate *le = new EUpdate("mknod");
+ le->metablob.add_client_req(req->get_reqid());
le->metablob.add_dir_context(dir);
inode_t *pi = le->metablob.add_primary_dentry(dn, true, newi);
pi->version = dn->get_projected_version();
// prepare finisher
C_MDS_mknod_finish *fin = new C_MDS_mknod_finish(mds, req, dn, newi);
EUpdate *le = new EUpdate("mkdir");
+ le->metablob.add_client_req(req->get_reqid());
le->metablob.add_dir_context(dir);
inode_t *pi = le->metablob.add_primary_dentry(dn, true, newi);
pi->version = dn->get_projected_version();
// prepare finisher
C_MDS_mknod_finish *fin = new C_MDS_mknod_finish(mds, req, dn, newi);
EUpdate *le = new EUpdate("symlink");
+ le->metablob.add_client_req(req->get_reqid());
le->metablob.add_dir_context(dir);
inode_t *pi = le->metablob.add_primary_dentry(dn, true, newi);
pi->version = dn->get_projected_version();
mdcache->request_auth_pin(req, targeti);
// sweet. let's get our locks.
- // lock dentry
- if (!mds->locker->dentry_xlock_start(dn, req, diri))
- return;
-
- // lock target inode
- if (!mds->locker->inode_hard_write_start(targeti, req))
+ // lock dentry, target inode
+ if (!mds->locker->dentry_xlock_start(dn, req, diri) ||
+ !mds->locker->inode_hard_write_start(targeti, req, diri))
return;
// ok, let's do it.
// prepare log entry
EUpdate *le = new EUpdate("link_local");
+ le->metablob.add_client_req(req->get_reqid());
// predirty
dn->pre_dirty();
dout(10) << "_link_local_finish " << *dn << " to " << *targeti << endl;
// link and unlock the new dentry
- dn->set_remote_ino(targeti->ino());
+ dn->dir->link_inode(dn, targeti->ino());
dn->set_version(dpv);
dn->mark_dirty(dpv);
mdcache->request_auth_pin(req, in);
// lock
- if (!mds->locker->dentry_xlock_start(dn, req, dn->get_dir()->get_inode()))
- return;
- if (!mds->locker->inode_hard_write_start(in, req))
+ if (!mds->locker->dentry_xlock_start(dn, req, dn->get_dir()->get_inode()) ||
+ !mds->locker->inode_hard_write_start(in, req, dn->get_dir()->get_inode()))
return;
} else {
// the inode will go away.
// ok, let's do it.
// prepare log entry
EUpdate *le = new EUpdate("unlink_local");
+ le->metablob.add_client_req(req->get_reqid());
// predirty
version_t ipv = in->pre_dirty();
mdcache->request_auth_pin(req, cur);
// write
- if (!mds->locker->inode_file_write_start(cur, req))
+ if (!mds->locker->inode_file_write_start(cur, req, cur))
return; // fw or (wait for) lock
// check permissions
void Server::handle_client_open(MClientRequest *req, CInode *cur)
{
int flags = req->args.open.flags;
- int mode = req->args.open.mode;
+ int cmode = req->get_open_file_mode();
dout(7) << "open " << flags << " on " << *cur << endl;
- dout(10) << "open flags = " << flags << " mode = " << mode << endl;
+ dout(10) << "open flags = " << flags << " filemode = " << cmode << endl;
// is it a file?
- if (!(cur->inode.mode & INODE_MODE_FILE)) {
+ if (!(cmode & INODE_MODE_FILE)) {
dout(7) << "not a regular file" << endl;
reply_request(req, -EINVAL); // FIXME what error do we want?
return;
}
// auth for write access
- if (mode != FILE_MODE_R && mode != FILE_MODE_LAZY &&
+ if (cmode != FILE_MODE_R && cmode != FILE_MODE_LAZY &&
!cur->is_auth()) {
int auth = cur->authority().first;
assert(auth != mds->get_nodeid());
mdcache->request_auth_pin(req, cur);
// write
- if (!mds->locker->inode_file_write_start(cur, req))
+ if (!mds->locker->inode_file_write_start(cur, req, cur))
return; // fw or (wait for) lock
// do update
// can we issue the caps they want?
version_t fdv = mds->locker->issue_file_data_version(cur);
- Capability *cap = mds->locker->issue_new_caps(cur, mode, req);
+ Capability *cap = mds->locker->issue_new_caps(cur, cmode, req);
if (!cap) return; // can't issue (yet), so wait!
dout(12) << "open gets caps " << cap_string(cap->pending()) << " for " << req->get_source() << " on " << *cur << endl;
// prepare finisher
C_MDS_openc_finish *fin = new C_MDS_openc_finish(mds, req, dn, in);
EUpdate *le = new EUpdate("openc");
+ le->metablob.add_client_req(req->get_reqid());
le->metablob.add_dir_context(dir);
inode_t *pi = le->metablob.add_primary_dentry(dn, true, in);
pi->version = dn->get_projected_version();
version_t atid;
vector<Anchor> trace;
version_t version; // anchor table version
+ int reqmds;
public:
EAnchor() : LogEvent(EVENT_ANCHOR) { }
- EAnchor(int o, inodeno_t i, version_t v) :
+ EAnchor(int o, inodeno_t i, version_t v, int rm) :
LogEvent(EVENT_ANCHOR),
- op(o), ino(i), atid(0), version(v) { }
- EAnchor(int o, version_t a, version_t v=0) :
+ op(o), ino(i), atid(0), version(v), reqmds(rm) { }
+ EAnchor(int o, version_t a, version_t v) :
LogEvent(EVENT_ANCHOR),
- op(o), atid(a), version(v) { }
+ op(o), atid(a), version(v), reqmds(-1) { }
void set_trace(vector<Anchor>& t) { trace = t; }
vector<Anchor>& get_trace() { return trace; }
bl.append((char*)&atid, sizeof(atid));
::_encode(trace, bl);
bl.append((char*)&version, sizeof(version));
+ bl.append((char*)&reqmds, sizeof(reqmds));
}
void decode_payload(bufferlist& bl, int& off) {
bl.copy(off, sizeof(op), (char*)&op);
::_decode(trace, bl, off);
bl.copy(off, sizeof(version), (char*)&version);
off += sizeof(version);
+ bl.copy(off, sizeof(reqmds), (char*)&reqmds);
+ off += sizeof(reqmds);
}
void print(ostream& out) {
if (ino) out << " " << ino;
if (atid) out << " atid " << atid;
if (version) out << " v " << version;
+ if (reqmds >= 0) out << " by mds" << reqmds;
}
bool has_expired(MDS *mds);
--- /dev/null
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+/*
+ * Ceph - scalable distributed file system
+ *
+ * Copyright (C) 2004-2006 Sage Weil <sage@newdream.net>
+ *
+ * This is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License version 2.1, as published by the Free Software
+ * Foundation. See file COPYING.
+ *
+ */
+
+#ifndef __MDS_EANCHORCLIENT_H
+#define __MDS_EANCHORCLIENT_H
+
+#include <assert.h>
+#include "config.h"
+#include "include/types.h"
+
+#include "../LogEvent.h"
+#include "../Anchor.h"
+
+class EAnchorClient : public LogEvent {
+protected:
+ int op;
+ version_t atid;
+
+ public:
+ EAnchorClient() : LogEvent(EVENT_ANCHORCLIENT) { }
+ EAnchorClient(int o, version_t at) :
+ LogEvent(EVENT_ANCHORCLIENT),
+ op(o), atid(at) { }
+
+ void encode_payload(bufferlist& bl) {
+ bl.append((char*)&op, sizeof(op));
+ bl.append((char*)&atid, sizeof(atid));
+ }
+ void decode_payload(bufferlist& bl, int& off) {
+ bl.copy(off, sizeof(op), (char*)&op);
+ off += sizeof(op);
+ bl.copy(off, sizeof(atid), (char*)&atid);
+ off += sizeof(atid);
+ }
+
+ void print(ostream& out) {
+ out << "EAnchorClient " << get_anchor_opname(op);
+ if (atid) out << " atid " << atid;
+ }
+
+ bool has_expired(MDS *mds);
+ void expire(MDS *mds, Context *c);
+ void replay(MDS *mds);
+
+};
+
+#endif
#include "../CInode.h"
#include "../CDir.h"
#include "../CDentry.h"
-
+#include "include/reqid.h"
class MDS;
// inodes i've destroyed.
list<inode_t> destroyed_inodes;
+ // idempotent op(s)
+ list<reqid_t> client_reqs;
+
public:
+ void add_client_req(reqid_t r) {
+ client_reqs.push_back(r);
+ }
+
void add_anchor_transaction(version_t atid) {
atids.push_back(atid);
}
}
::_encode(atids, bl);
::_encode(destroyed_inodes, bl);
+ ::_encode(client_reqs, bl);
}
void _decode(bufferlist& bl, int& off) {
int n;
}
::_decode(atids, bl, off);
::_decode(destroyed_inodes, bl, off);
+ ::_decode(client_reqs, bl, off);
}
void print(ostream& out) const {
#include "events/EMetaBlob.h"
#include "events/EAlloc.h"
#include "events/EAnchor.h"
+#include "events/EAnchorClient.h"
#include "events/EUpdate.h"
#include "events/EImportMap.h"
switch (op) {
// anchortable
case ANCHOR_OP_CREATE_PREPARE:
- mds->anchortable->create_prepare(ino, trace);
+ mds->anchortable->create_prepare(ino, trace, reqmds);
break;
case ANCHOR_OP_DESTROY_PREPARE:
- mds->anchortable->destroy_prepare(ino);
+ mds->anchortable->destroy_prepare(ino, reqmds);
break;
case ANCHOR_OP_UPDATE_PREPARE:
- mds->anchortable->update_prepare(ino, trace);
+ mds->anchortable->update_prepare(ino, trace, reqmds);
break;
case ANCHOR_OP_COMMIT:
mds->anchortable->commit(atid);
break;
- // anchorclient
- case ANCHOR_OP_ACK:
- mds->anchorclient->got_journaled_ack(atid);
- break;
-
default:
assert(0);
}
}
+// EAnchorClient
+
+bool EAnchorClient::has_expired(MDS *mds)
+{
+ return true;
+}
+
+void EAnchorClient::expire(MDS *mds, Context *c)
+{
+ assert(0);
+}
+
+void EAnchorClient::replay(MDS *mds)
+{
+ dout(10) << " EAnchorClient.replay op " << op << " atid " << atid << endl;
+
+ switch (op) {
+ // anchorclient
+ case ANCHOR_OP_ACK:
+ mds->anchorclient->got_journaled_ack(atid);
+ break;
+
+ default:
+ assert(0);
+ }
+}
+
+
// -----------------------
// EUpdate
#include <utime.h>
#include <sys/stat.h>
#include <sys/types.h>
+#include <fcntl.h>
// md ops
#define MDS_OP_STAT 100
#define MDS_OP_LSTAT 101
-#define MDS_OP_UTIME 102
-#define MDS_OP_CHMOD 103
-#define MDS_OP_CHOWN 104
+#define MDS_OP_UTIME 1102
+#define MDS_OP_CHMOD 1103
+#define MDS_OP_CHOWN 1104
#define MDS_OP_READDIR 200
-#define MDS_OP_MKNOD 201
-#define MDS_OP_LINK 202
-#define MDS_OP_UNLINK 203
-#define MDS_OP_RENAME 204
+#define MDS_OP_MKNOD 1201
+#define MDS_OP_LINK 1202
+#define MDS_OP_UNLINK 1203
+#define MDS_OP_RENAME 1204
-#define MDS_OP_MKDIR 220
-#define MDS_OP_RMDIR 221
-#define MDS_OP_SYMLINK 222
+#define MDS_OP_MKDIR 1220
+#define MDS_OP_RMDIR 1221
+#define MDS_OP_SYMLINK 1222
#define MDS_OP_OPEN 301
-#define MDS_OP_TRUNCATE 306
+#define MDS_OP_TRUNCATE 1306
#define MDS_OP_FSYNC 307
-#define MDS_OP_RELEASE 308
+
+#define MDS_OP_RELEASE 308 // used only by SyntheticClient op_dist thinger
class MClientRequest : public Message {
this->st.client_inst = ci;
}
+ reqid_t get_reqid() {
+ // FIXME: for now, assume clients always have 1 incarnation
+ return reqid_t(st.client_inst.name, 1, st.tid);
+ }
+
+ int get_open_file_mode() {
+ if (args.open.flags & O_LAZY)
+ return FILE_MODE_LAZY;
+ if (args.open.flags & O_WRONLY)
+ return FILE_MODE_W;
+ if (args.open.flags & O_RDWR)
+ return FILE_MODE_RW;
+ if (args.open.flags & O_APPEND)
+ return FILE_MODE_W;
+ return FILE_MODE_R;
+ }
+ bool open_file_mode_is_readonly() {
+ return get_open_file_mode() == FILE_MODE_R;
+ }
+ bool is_idempotent() {
+ if (st.op == MDS_OP_OPEN)
+ return open_file_mode_is_readonly();
+ return (st.op < 1000);
+ }
+
// normal fields
void set_tid(long t) { st.tid = t; }
void inc_num_fwd() { st.num_fwd++; }
out << "truncate"; break;
case MDS_OP_FSYNC:
out << "fsync"; break;
- case MDS_OP_RELEASE:
- out << "release"; break;
+ // case MDS_OP_RELEASE:
+ //out << "release"; break;
default:
out << "unknown=" << get_op();
}
void print(ostream& o) {
o << "client_request_forward(" << tid
<< " to " << dest_mds
- << " num_fwd=" << num_fwd << ")";
+ << " num_fwd=" << num_fwd
+ << ")";
}
void encode_payload() {
void close();
void join() {
- writer_thread.join();
- reader_thread.join();
+ if (writer_thread.is_started()) writer_thread.join();
+ if (reader_thread.is_started()) reader_thread.join();
}
void send(Message *m) {