}
break;
+ case MMDSSlaveRequest::OP_LINKPREPACK:
+ {
+ MDRequest *mdr = mdcache->request_get(m->get_reqid());
+ handle_slave_link_prep_ack(mdr, m);
+ }
+ break;
+
case MMDSSlaveRequest::OP_RENAMEPREPACK:
{
MDRequest *mdr = mdcache->request_get(m->get_reqid());
handle_slave_auth_pin(mdr);
break;
+ case MMDSSlaveRequest::OP_LINKPREP:
+ handle_slave_link_prep(mdr);
+ break;
+
case MMDSSlaveRequest::OP_RENAMEPREP:
handle_slave_rename_prep(mdr);
break;
0, targetpath, targettrace, false,
MDS_TRAVERSE_DISCOVER);
if (r > 0) return; // wait
- if (targettrace.empty()) r = -EINVAL;
+ if (targettrace.empty()) r = -EINVAL;
if (r < 0) {
reply_request(mdr, r);
return;
return;
}
+ // get/make null link dentry
+ CDentry *dn = prepare_null_dentry(mdr, dir, dname, false);
+ if (!dn) return;
+
+ // create lock lists
+ set<SimpleLock*> rdlocks, wrlocks, xlocks;
+
+ for (int i=0; i<(int)linktrace.size(); i++)
+ rdlocks.insert(&linktrace[i]->lock);
+ xlocks.insert(&dn->lock);
+ wrlocks.insert(&dn->dir->inode->dirlock);
+ for (int i=0; i<(int)targettrace.size(); i++)
+ rdlocks.insert(&targettrace[i]->lock);
+ xlocks.insert(&targeti->linklock);
+
+ if (!mds->locker->acquire_locks(mdr, rdlocks, wrlocks, xlocks))
+ return;
+
+ mdr->done_locking = true; // avoid wrlock moving target issues.
+
+ // pick mtime
+ if (mdr->now == utime_t())
+ mdr->now = g_clock.real_now();
+
// does the target need an anchor?
if (targeti->is_auth()) {
/*if (targeti->get_parent_dir() == dn->dir) {
else {
dout(7) << "target needs anchor, nlink=" << targeti->inode.nlink << ", creating anchor" << endl;
- mdcache->anchor_create(targeti,
+ mdcache->anchor_create(mdr, targeti,
new C_MDS_RetryRequest(mdcache, mdr));
return;
}
}
- // can we create the dentry?
- CDentry *dn = 0;
-
- // make null link dentry
- dn = prepare_null_dentry(mdr, dir, dname, false);
- if (!dn) return;
-
- // create lock lists
- set<SimpleLock*> rdlocks, wrlocks, xlocks;
-
- for (int i=0; i<(int)linktrace.size(); i++)
- rdlocks.insert(&linktrace[i]->lock);
- xlocks.insert(&dn->lock);
- wrlocks.insert(&dn->dir->inode->dirlock);
- for (int i=0; i<(int)targettrace.size(); i++)
- rdlocks.insert(&targettrace[i]->lock);
- xlocks.insert(&targeti->linklock);
-
- if (!mds->locker->acquire_locks(mdr, rdlocks, wrlocks, xlocks))
- return;
-
// go!
// local or remote?
CDentry *dn;
CInode *targeti;
version_t dpv;
- utime_t tctime;
version_t tpv;
version_t dirpv;
public:
- C_MDS_link_local_finish(MDS *m, MDRequest *r, CDentry *d, CInode *ti, version_t dirpv_, utime_t ct) :
+ C_MDS_link_local_finish(MDS *m, MDRequest *r, CDentry *d, CInode *ti, version_t dirpv_) :
mds(m), mdr(r), dn(d), targeti(ti),
dpv(d->get_projected_version()),
- tctime(ct),
tpv(targeti->get_parent_dn()->get_projected_version()),
dirpv(dirpv_) { }
void finish(int r) {
assert(r == 0);
- mds->server->_link_local_finish(mdr, dn, targeti, dpv, tctime, tpv, dirpv);
+ mds->server->_link_local_finish(mdr, dn, targeti, dpv, tpv, dirpv);
}
};
// add to event
utime_t now = g_clock.real_now();
- version_t dirpv = predirty_dn_diri(mdr, dn, &le->metablob, now); // dir inode's mtime
+ version_t dirpv = predirty_dn_diri(mdr, dn, &le->metablob, mdr->now); // dir inode's mtime
le->metablob.add_dir_context(dn->get_dir());
le->metablob.add_remote_dentry(dn, true, targeti->ino()); // new remote
le->metablob.add_dir_context(targeti->get_parent_dir());
// update journaled target inode
pi->nlink++;
- pi->ctime = now;
+ pi->ctime = mdr->now;
pi->version = tpdv;
// finisher
- C_MDS_link_local_finish *fin = new C_MDS_link_local_finish(mds, mdr, dn, targeti, dirpv, now);
+ C_MDS_link_local_finish *fin = new C_MDS_link_local_finish(mds, mdr, dn, targeti, dirpv);
// log + wait
mdlog->submit_entry(le);
}
void Server::_link_local_finish(MDRequest *mdr, CDentry *dn, CInode *targeti,
- version_t dpv, utime_t tctime, version_t tpv, version_t dirpv)
+ version_t dpv, version_t tpv, version_t dirpv)
{
dout(10) << "_link_local_finish " << *dn << " to " << *targeti << endl;
// update the target
targeti->inode.nlink++;
- targeti->inode.ctime = tctime;
+ targeti->inode.ctime = mdr->now;
targeti->mark_dirty(tpv);
// dir inode's mtime
- dirty_dn_diri(dn, dirpv, tctime);
+ dirty_dn_diri(dn, dirpv, mdr->now);
// bump target popularity
mds->balancer->hit_inode(targeti, META_POP_IWR);
}
+// remote
+
+class C_MDS_link_remote_finish : public Context {
+ MDS *mds;
+ MDRequest *mdr;
+ CDentry *dn;
+ CInode *targeti;
+ version_t dpv;
+ version_t dirpv;
+public:
+ C_MDS_link_remote_finish(MDS *m, MDRequest *r, CDentry *d, CInode *ti, version_t dirpv_) :
+ mds(m), mdr(r), dn(d), targeti(ti),
+ dpv(d->get_projected_version()),
+ dirpv(dirpv_) { }
+ void finish(int r) {
+ assert(r == 0);
+ mds->server->_link_remote_finish(mdr, dn, targeti, dpv, dirpv);
+ }
+};
void Server::_link_remote(MDRequest *mdr, CDentry *dn, CInode *targeti)
{
dout(10) << "_link_remote " << *dn << " to " << *targeti << endl;
-
+
// 1. send LinkPrepare to dest (journal nlink++ prepare)
+ int linkauth = targeti->authority().first;
+ if (mdr->witnessed.count(linkauth) == 0) {
+ dout(10) << " targeti auth must prepare nlink++" << endl;
+
+ MMDSSlaveRequest *req = new MMDSSlaveRequest(mdr->reqid, MMDSSlaveRequest::OP_LINKPREP);
+ targeti->set_object_info(req->get_object_info());
+ req->now = mdr->now;
+ mds->send_message_mds(req, linkauth, MDS_PORT_SERVER);
+
+ assert(mdr->waiting_on_slave.count(linkauth) == 0);
+ mdr->waiting_on_slave.insert(linkauth);
+ return;
+ }
+ dout(10) << " targeti auth has prepared nlink++" << endl;
+
// 2. create+journal new dentry, as with link_local.
- // 3. send LinkCommit to dest (journals commit)
+ // prepare log entry
+ EUpdate *le = new EUpdate("link_remote");
+ le->metablob.add_client_req(mdr->reqid);
+
+ // predirty
+ dn->pre_dirty();
+
+ // add to event
+ version_t dirpv = predirty_dn_diri(mdr, dn, &le->metablob, mdr->now); // dir inode's mtime
+ le->metablob.add_dir_context(dn->get_dir());
+ le->metablob.add_remote_dentry(dn, true, targeti->ino()); // new remote
- // IMPLEMENT ME
- reply_request(mdr, -EXDEV);
+ // finisher
+ C_MDS_link_remote_finish *fin = new C_MDS_link_remote_finish(mds, mdr, dn, targeti, dirpv);
+
+ // log + wait
+ mdlog->submit_entry(le);
+ mdlog->wait_for_sync(fin);
}
-
-/*
-void Server::handle_client_link_finish(MClientRequest *req, CInode *ref,
- CDentry *dn, CInode *targeti)
+void Server::_link_remote_finish(MDRequest *mdr, CDentry *dn, CInode *targeti,
+ version_t dpv, version_t dirpv)
{
- // create remote link
+ dout(10) << "_link_remote_finish " << *dn << " to " << *targeti << endl;
+
+ // link the new dentry
dn->dir->link_inode(dn, targeti->ino());
- dn->link_remote( targeti ); // since we have it
- dn->_mark_dirty(); // fixme
+ dn->set_version(dpv);
+ dn->mark_dirty(dpv);
+
+ // dir inode's mtime
+ dirty_dn_diri(dn, dirpv, mdr->now);
- mds->balancer->hit_dir(dn->dir, META_POP_DWR);
+ // bump target popularity
+ mds->balancer->hit_inode(targeti, META_POP_IWR);
- // done!
- commit_request(req, new MClientReply(req, 0), ref,
- 0); // FIXME i should log something
+ // reply
+ MClientReply *reply = new MClientReply(mdr->client_request, 0);
+ reply_request(mdr, reply, dn->get_dir()->get_inode()); // FIXME: imprecise ref
}
-*/
-/*
-class C_MDS_RemoteLink : public Context {
+
+class C_MDS_SlaveLinkPrep : public Context {
Server *server;
- MClientRequest *req;
- CInode *ref;
- CDentry *dn;
+ MDRequest *mdr;
CInode *targeti;
+ version_t tpv;
public:
- C_MDS_RemoteLink(Server *server, MClientRequest *req, CInode *ref, CDentry *dn, CInode *targeti) {
- this->server = server;
- this->req = req;
- this->ref = ref;
- this->dn = dn;
- this->targeti = targeti;
- }
+ C_MDS_SlaveLinkPrep(Server *s, MDRequest *r, CInode *t, version_t v) :
+ server(s), mdr(r), targeti(t), tpv(v) { }
void finish(int r) {
- if (r > 0) { // success
- // yay
- server->handle_client_link_finish(req, ref, dn, targeti);
- }
- else if (r == 0) {
- // huh? retry!
- assert(0);
- server->dispatch_request(req, ref);
- } else {
- // link failed
- server->reply_request(req, r);
- }
+ assert(r == 0);
+ server->_logged_slave_link(mdr, targeti, tpv);
}
};
+void Server::handle_slave_link_prep(MDRequest *mdr)
+{
+ dout(10) << "handle_slave_link_prep " << *mdr
+ << " on " << mdr->slave_request->get_object_info()
+ << endl;
- } else {
- // remote: send nlink++ request, wait
- dout(7) << "target is remote, sending InodeLink" << endl;
- mds->send_message_mds(new MInodeLink(targeti->ino(), mds->get_nodeid()), targeti->authority().first, MDS_PORT_CACHE);
-
- // wait
- targeti->add_waiter(CInode::WAIT_LINK, new C_MDS_RemoteLink(this, req, diri, dn, targeti));
+ CInode *targeti = mdcache->get_inode(mdr->slave_request->get_object_info().ino);
+ assert(targeti);
+
+ dout(10) << "targeti " << *targeti << endl;
+
+ mdr->now = mdr->slave_request->now;
+
+ // anchor?
+ if (targeti->is_anchored() && !targeti->is_unanchoring()) {
+ dout(7) << "target anchored already (nlink=" << targeti->inode.nlink << "), sweet" << endl;
+ }
+ else {
+ dout(7) << "target needs anchor, nlink=" << targeti->inode.nlink << ", creating anchor" << endl;
+ mdcache->anchor_create(mdr, targeti,
+ new C_MDS_RetryRequest(mdcache, mdr));
return;
}
-*/
+ // journal it
+ ESlaveUpdate *le = new ESlaveUpdate("slave_link_prep", mdr->reqid, ESlaveUpdate::OP_PREPARE);
+
+ version_t tpv = targeti->pre_dirty();
+
+ // add to event
+ le->metablob.add_remote_dentry(targeti->get_parent_dn(), true, targeti->ino()); // new remote
+ le->metablob.add_dir_context(targeti->get_parent_dir());
+ inode_t *pi = le->metablob.add_primary_dentry(targeti->parent, true, targeti); // update old primary
+
+ // update journaled target inode
+ pi->nlink++;
+ pi->ctime = mdr->now;
+ pi->version = tpv;
+
+ mds->mdlog->submit_entry(le, new C_MDS_SlaveLinkPrep(this, mdr, targeti, tpv));
+}
+
+class C_MDS_SlaveLinkCommit : public Context {
+ Server *server;
+ MDRequest *mdr;
+ CInode *targeti;
+ version_t tpv;
+public:
+ C_MDS_SlaveLinkCommit(Server *s, MDRequest *r, CInode *t, version_t v) :
+ server(s), mdr(r), targeti(t), tpv(v) { }
+ void finish(int r) {
+ assert(r == 0);
+ server->_commit_slave_link(mdr, targeti, tpv);
+ }
+};
+
+void Server::_logged_slave_link(MDRequest *mdr, CInode *targeti, version_t tpv)
+{
+ dout(10) << "_logged_slave_link " << *mdr << " " << *targeti << endl;
+
+ // ack
+ MMDSSlaveRequest *reply = new MMDSSlaveRequest(mdr->reqid, MMDSSlaveRequest::OP_LINKPREPACK);
+ mds->send_message_mds(reply, mdr->slave_to_mds, MDS_PORT_SERVER);
+
+ // set up commit waiter
+ mdr->slave_commit = new C_MDS_SlaveLinkCommit(this, mdr, targeti, tpv);
+
+ // done.
+ delete mdr->slave_request;
+ mdr->slave_request = 0;
+
+}
+
+void Server::_commit_slave_link(MDRequest *mdr, CInode *targeti, version_t tpv)
+{
+ dout(10) << "_commit_slave_link " << *mdr << " " << *targeti << endl;
+
+ // update the target
+ targeti->inode.nlink++;
+ targeti->inode.ctime = mdr->now;
+ targeti->mark_dirty(tpv);
+
+ // write a commit to the journal
+ ESlaveUpdate *le = new ESlaveUpdate("slave_link_commit", mdr->reqid, ESlaveUpdate::OP_COMMIT);
+ mds->mdlog->submit_entry(le);
+}
+
+
+
+
+void Server::handle_slave_link_prep_ack(MDRequest *mdr, MMDSSlaveRequest *m)
+{
+ dout(10) << "handle_slave_link_prep_ack " << *mdr
+ << " " << *m << endl;
+ int from = m->get_source().num();
+
+ // note slave
+ mdr->slaves.insert(from);
+
+ // witnessed!
+ assert(mdr->witnessed.count(from) == 0);
+ mdr->witnessed.insert(from);
+
+ // remove from waiting list
+ assert(mdr->waiting_on_slave.count(from));
+ mdr->waiting_on_slave.erase(from);
+
+ assert(mdr->waiting_on_slave.empty());
+
+ dispatch_client_request(mdr); // go again!
+}