- or, only put frag size/mtime in inode when frag is closed. otherwise, soft (journaled) state, possibly on another mds.
-
-- fix auth_pin/export vs mds recovery deadlock: unfrozen exports should abort. maybe delay exportdiscover until after frozen?
-
- discover
/ - hard link dentries
- open_remote_ino needs major work...
- journaled bit in MDRequest
- journal
-/untested- truncate
-
+/untested- truncate...
- mds failure vs clients
-/ - clean up client op redirection
- idempotent client ops
- how to track in memory?
- how to trim?
- EMetablob replay, expire logic
- journal+recovery
-/ - test anchortable, anchorclient
-/ - local link
-/ - local unlink
- local rename
/ - fix dir renames vs subtrees
- how to notify replicas...
/ - stray purge
- stray reintegration
- remote link
+ - impl remote inode xlock
+ - ESlaveUpdate replay, resolution, etc.
- remote unlink
+ - rewrite to look link _link
- remote rename
- open(wr cap), open+create
- file capabilities i/o
- client readdir for dirfrags
- consistency points/snapshots
- dentry versions vs dirfrags...
-- sync clients on stat
- - will need to ditch 10s client metadata caching before this is useful
- - implement truncate
- real chdir (directory "open")
- relative metadata ops
- statfs?
-foreign link
--
foreign rename
- question: can we generalize foreign and local rename?
// set inode version
//in->inode.version = dn->get_version();
- // clear dangling
- in->state_clear(CInode::STATE_DANGLING);
-
// pin dentry?
if (in->get_num_ref())
dn->get(CDentry::PIN_INODEPIN);
// primary
assert(dn->is_primary());
- // explicitly define auth
- in->dangling_auth = in->authority();
- //dout(10) << "unlink_inode " << *in << " dangling_auth now " << in->dangling_auth << endl;
-
// unpin dentry?
if (in->get_num_ref())
dn->put(CDentry::PIN_INODEPIN);
if (in->auth_pins + in->nested_auth_pins)
adjust_nested_auth_pins( 0 - (in->auth_pins + in->nested_auth_pins) );
- // set dangling flag
- in->state_set(CInode::STATE_DANGLING);
-
// detach inode
in->remove_primary_parent(dn);
dn->inode = 0;
// -- wait masks --
- static const int WAIT_DENTRY = (1<<0); // wait for item to be in cache
- static const int WAIT_COMPLETE = (1<<1); // wait for complete dir contents
- static const int WAIT_FREEZEABLE = (1<<2); // hard_pins removed
- static const int WAIT_UNFREEZE = (1<<3); // unfreeze
+ static const int WAIT_DENTRY = (1<<0); // wait for item to be in cache
+ static const int WAIT_COMPLETE = (1<<1); // wait for complete dir contents
+ static const int WAIT_FREEZEABLE = (1<<2); // hard_pins removed
+ static const int WAIT_UNFREEZE = (1<<3); // unfreeze
static const int WAIT_AUTHPINNABLE = WAIT_UNFREEZE;
static const int WAIT_IMPORTED = (1<<4); // import finish
static const int WAIT_SINGLEAUTH = (1<<5);
void CInode::set_auth(bool a)
{
- if (!is_dangling() && !is_root() &&
- is_auth() != a) {
- }
-
if (a) state_set(STATE_AUTH);
else state_clear(STATE_AUTH);
}
trace.push_back(Anchor(ino(), parent->dir->dirfrag()));
dout(10) << "make_anchor_trace added " << trace.back() << endl;
}
- else if (state_test(STATE_DANGLING)) {
- dout(10) << "make_anchor_trace dangling " << ino() << " on mds " << dangling_auth << endl;
- assert(0);
- //trace.push_back( Anchor(ino(),
- //MDS_INO_INODEFILE_OFFSET+dangling_auth.first) );
- }
else
assert(is_root());
}
pair<int,int> CInode::authority()
{
- if (is_dangling())
- return dangling_auth; // explicit
-
if (is_root())
return CDIR_AUTH_ROOTINODE; // root _inode_ is locked to mds0.
static const int STATE_AUTH = (1<<0);
static const int STATE_ROOT = (1<<1);
static const int STATE_DIRTY = (1<<2);
- static const int STATE_UNSAFE = (1<<3); // not logged yet
- static const int STATE_DANGLING = (1<<4); // delete me when i expire; i have no dentry
- static const int STATE_UNLINKING = (1<<5);
+ //static const int STATE_UNSAFE = (1<<3); // not logged yet
+ //static const int STATE_DANGLING = (1<<4); // delete me when i expire; i have no dentry
static const int STATE_EXPORTING = (1<<6); // on nonauth bystander.
static const int STATE_ANCHORING = (1<<7);
static const int STATE_UNANCHORING = (1<<8);
//static const int STATE_RENAMINGTO = (1<<9); // rename target (will be unlinked)
// -- waiters --
- static const int WAIT_AUTHPINNABLE = (1<<10);
- // waiters: write_hard_start, read_file_start, write_file_start (mdcache)
- // handle_client_chmod, handle_client_touch (mds)
- // trigger: (see CDIR_WAIT_UNFREEZE)
- static const int WAIT_SINGLEAUTH = (1<<11);
-
- static const int WAIT_DIR = (1<<13);
- // waiters: traverse_path
- // triggers: handle_disocver_reply
- static const int WAIT_LINK = (1<<14); // as in remotely nlink++
- static const int WAIT_ANCHORED = (1<<15);
- static const int WAIT_UNANCHORED = (1<<16);
- static const int WAIT_UNLINK = (1<<17); // as in remotely nlink--
- static const int WAIT_HARDR = (1<<18); // 131072
- static const int WAIT_HARDW = (1<<19); // 262...
- static const int WAIT_HARDB = (1<<20);
+ static const int WAIT_SLAVEAGREE = (1<<0);
+ static const int WAIT_AUTHPINNABLE = (1<<1);
+ static const int WAIT_SINGLEAUTH = (1<<2);
+ static const int WAIT_DIR = (1<<3);
+ static const int WAIT_LINK = (1<<4); // as in remotely nlink++
+ static const int WAIT_ANCHORED = (1<<5);
+ static const int WAIT_UNANCHORED = (1<<6);
+ static const int WAIT_UNLINK = (1<<7); // as in remotely nlink--
+ static const int WAIT_HARDR = (1<<8); // 131072
+ static const int WAIT_HARDW = (1<<9); // 262...
+ static const int WAIT_HARDB = (1<<10);
static const int WAIT_HARDRWB = (WAIT_HARDR|WAIT_HARDW|WAIT_HARDB);
- static const int WAIT_HARDSTABLE = (1<<21);
- static const int WAIT_HARDNORD = (1<<22);
- static const int WAIT_FILER = (1<<23);
- static const int WAIT_FILEW = (1<<24);
- static const int WAIT_FILEB = (1<<25);
+ static const int WAIT_HARDSTABLE = (1<<11);
+ static const int WAIT_HARDNORD = (1<<12);
+ static const int WAIT_FILER = (1<<13);
+ static const int WAIT_FILEW = (1<<14);
+ static const int WAIT_FILEB = (1<<15);
static const int WAIT_FILERWB = (WAIT_FILER|WAIT_FILEW|WAIT_FILEB);
- static const int WAIT_FILESTABLE = (1<<26);
- static const int WAIT_FILENORD = (1<<27);
- static const int WAIT_FILENOWR = (1<<28);
- static const int WAIT_RENAMEACK =(1<<29);
- static const int WAIT_RENAMENOTIFYACK =(1<<30);
- static const int WAIT_CAPS =(1<<31);
+ static const int WAIT_FILESTABLE = (1<<16);
+ static const int WAIT_FILENORD = (1<<17);
+ static const int WAIT_FILENOWR = (1<<18);
+ static const int WAIT_RENAMEACK =(1<<19);
+ static const int WAIT_RENAMENOTIFYACK =(1<<20);
+ static const int WAIT_CAPS =(1<<21);
static const int WAIT_ANY = 0xffffffff;
// misc
// -- other --
// distributed caching (old)
- pair<int,int> dangling_auth; // explicit auth, when dangling.
+ //pair<int,int> dangling_auth; // explicit auth, when dangling.
// waiters
multimap<int, Context*> waiting;
void name_stray_dentry(string& dname);
-
- // -- state --
- bool is_unsafe() { return state & STATE_UNSAFE; }
- bool is_dangling() { return state & STATE_DANGLING; }
- bool is_unlinking() { return state & STATE_UNLINKING; }
-
- void mark_unsafe() { state |= STATE_UNSAFE; }
- void mark_safe() { state &= ~STATE_UNSAFE; }
-
// -- state encoding --
- //void encode_basic_state(bufferlist& r);
- //void decode_basic_state(bufferlist& r, int& off);
-
-
void encode_file_state(bufferlist& r);
void decode_file_state(bufferlist& r, int& off);
#include "MDS.h"
#include "MDCache.h"
#include "Locker.h"
-#include "Server.h"
#include "CInode.h"
#include "CDir.h"
#include "CDentry.h"
-#include "Migrator.h"
-#include "MDBalancer.h"
#include "MDLog.h"
#include "MDSMap.h"
if (dn->gather_set.size() == 0) {
dout(7) << "handle_lock_dn finish gather, now xlock on " << *dn << endl;
dn->lockstate = DN_LOCK_XLOCK;
- mdcache->active_requests[dn->xlockedby->reqid].dentry_xlocks.insert(dn);
- mdcache->active_requests[dn->xlockedby->reqid].dentry_locks.insert(dn);
+ // FIXME
+ mdcache->slave_requests[dn->xlockedby->reqid]->dentry_xlocks.insert(dn);
+ mdcache->slave_requests[dn->xlockedby->reqid]->dentry_locks.insert(dn);
dir->finish_waiting(CDir::WAIT_DNLOCK, dname);
}
break;
#include "events/EImportMap.h"
#include "events/EMetaBlob.h"
#include "events/EUpdate.h"
+#include "events/ESlaveUpdate.h"
#include "events/EUnlink.h"
#include "events/EMount.h"
#include "events/EClientMap.h"
case EVENT_STRING: le = new EString(); break;
case EVENT_IMPORTMAP: le = new EImportMap; break;
case EVENT_UPDATE: le = new EUpdate; break;
+ case EVENT_SLAVEUPDATE: le = new ESlaveUpdate; break;
case EVENT_UNLINK: le = new EUnlink(); break;
case EVENT_PURGEFINISH: le = new EPurgeFinish(); break;
case EVENT_MOUNT: le = new EMount(); break;
#define EVENT_IMPORTMAP 4
#define EVENT_UPDATE 5
+#define EVENT_SLAVEUPDATE 6
-#define EVENT_MOUNT 6
-#define EVENT_CLIENTMAP 7
-
-#define EVENT_ANCHOR 8
-#define EVENT_ANCHORCLIENT 9
+#define EVENT_MOUNT 7
+#define EVENT_CLIENTMAP 8
#define EVENT_ALLOC 10
#define EVENT_MKNOD 11
#define EVENT_IMPORTSTART 31
#define EVENT_IMPORTFINISH 32
+#define EVENT_ANCHOR 40
+#define EVENT_ANCHORCLIENT 41
+
+
#include <string>
#include "events/EImportMap.h"
#include "events/EUpdate.h"
+#include "events/ESlaveUpdate.h"
#include "events/EString.h"
#include "events/EUnlink.h"
#include "events/EPurgeFinish.h"
case MSG_MDS_INODELINK:
handle_inode_link((MInodeLink*)m);
break;
- case MSG_MDS_INODELINKACK:
- handle_inode_link_ack((MInodeLinkAck*)m);
- break;
case MSG_MDS_DIRUPDATE:
handle_dir_update((MDirUpdate*)m);
MDRequest *MDCache::request_start(metareqid_t ri)
{
- assert(active_requests.count(ri) == 0);
- active_requests[ri].reqid = ri;
- MDRequest *mdr = &active_requests[ri];
+ MDRequest *mdr = new MDRequest(ri);
dout(7) << "request_start " << *mdr << endl;
return mdr;
}
MDRequest *MDCache::request_start(MClientRequest *req)
{
- metareqid_t ri = req->get_reqid();
- MDRequest *mdr = request_start(ri);
- mdr->request = req;
+ MDRequest *mdr = new MDRequest(req->get_reqid(), req);
+ dout(7) << "request_start " << *mdr << endl;
return mdr;
}
void MDCache::request_cleanup(MDRequest *mdr)
{
metareqid_t ri = mdr->reqid;
- assert(active_requests.count(ri));
// clear ref, trace
mdr->ref = 0;
(*it)->put(CDir::PIN_REQUEST);
mdr->dir_pins.clear();
- // remove from map
- active_requests.erase(ri);
-
// log some stats *****
if (mds->logger) {
mds->logger->set("c", lru.lru_get_size());
// HARD LINKS
+class C_MDC_InodeLinkAgree : public Context {
+ MDS *mds;
+ MInodeLink *m;
+public:
+ C_MDC_InodeLinkAgree(MDS *_mds, MInodeLink *_m) : mds(_mds), m(_m) {}
+ void finish(int r) {
+ mds->send_message_mds(new MInodeLink(MInodeLink::OP_AGREE,
+ m->get_ino(),
+ m->get_inc(),
+ m->get_reqid()),
+ m->get_source().num(),
+ m->get_source_port());
+ delete m;
+ }
+};
+
void MDCache::handle_inode_link(MInodeLink *m)
{
CInode *in = get_inode(m->get_ino());
assert(in);
-
- if (!in->is_auth()) {
- dout(7) << "handle_inode_link not auth for " << *in << ", fw to auth" << endl;
- mds->send_message_mds(m, in->authority().first, MDS_PORT_CACHE);
+ dout(7) << "handle_inode_link " << *m << " on " << *in << endl;
+
+ // get request.
+ // we should have this bc the inode is xlocked.
+ assert(slave_requests.count(m->get_reqid()));
+ MDRequest *mdr = slave_requests[m->get_reqid()];
+
+ switch (m->get_op()) {
+ // auth
+ case MInodeLink::OP_PREPARE:
+ assert(in->is_auth());
+ {
+ version_t pv = in->pre_dirty();
+ ESlaveUpdate *le = new ESlaveUpdate("link_prepare", m->get_reqid(), 0);
+ le->metablob.add_dir_context(in->get_parent_dir());
+ inode_t *pi = le->metablob.add_primary_dentry(in->parent, true, in);
+ if (m->get_inc())
+ pi->nlink++;
+ else
+ pi->nlink--;
+ pi->ctime = m->get_ctime();
+ pi->version = pv;
+ mdr->projected_inode[in->ino()] = *pi;
+ mds->mdlog->submit_entry(le);
+ mds->mdlog->wait_for_sync(new C_MDC_InodeLinkAgree(mds, m));
+ }
return;
- }
-
- dout(7) << "handle_inode_link on " << *in << endl;
-
- if (!in->is_anchored()) {
- assert(in->inode.nlink == 1);
- dout(7) << "needs anchor, nlink=" << in->inode.nlink << ", creating anchor" << endl;
- anchor_create(in, new C_MDS_RetryMessage(mds, m));
+ case MInodeLink::OP_COMMIT:
+ assert(in->is_auth());
+ {
+ // make the update to our cache
+ in->inode = mdr->projected_inode[in->ino()];
+ in->mark_dirty(in->inode.version);
+
+ // journal the commit
+ ESlaveUpdate *le = new ESlaveUpdate("link_commit", m->get_reqid(), 1);
+ mds->mdlog->submit_entry(le);
+ }
+ delete m;
return;
- }
-
- in->inode.nlink++;
- in->_mark_dirty(); // fixme
- // reply
- dout(7) << " nlink++, now " << in->inode.nlink++ << endl;
- mds->send_message_mds(new MInodeLinkAck(m->get_ino(), true), m->get_from(), MDS_PORT_CACHE);
- delete m;
+ case MInodeLink::OP_AGREE:
+ assert(!in->is_auth());
+ in->finish_waiting(CInode::WAIT_SLAVEAGREE);
+ delete m;
+ return;
+
+ default:
+ assert(0);
+ }
}
-void MDCache::handle_inode_link_ack(MInodeLinkAck *m)
-{
- CInode *in = get_inode(m->get_ino());
- assert(in);
-
- dout(7) << "handle_inode_link_ack success = " << m->is_success() << " on " << *in << endl;
- in->finish_waiting(CInode::WAIT_LINK,
- m->is_success() ? 1:-1);
-}
set< CInode* > inode_file_rdlocks;
set< CInode* > inode_file_xlocks;
+ // projected updates
+ map< inodeno_t, inode_t > projected_inode;
+
// old
set< CDentry* > xlocks; // xlocks (local)
set< CDentry* > foreign_xlocks; // xlocks on foreign hosts
MDRequest() : request(0), ref(0) {}
- MDRequest(metareqid_t ri) : reqid(ri), request(0), ref(0) {}
+ MDRequest(metareqid_t ri, Message *req=0) : reqid(ri), request(req), ref(0) {}
// requeest
MClientRequest *client_request() {
protected:
- hash_map<metareqid_t, MDRequest> active_requests;
+ hash_map<metareqid_t, MDRequest*> slave_requests;
public:
MDRequest* request_start(metareqid_t rid);
void reintegrate_stray(CDentry *dn, CDentry *rlink);
void migrate_stray(CDentry *dn, int dest);
- // -- hard links --
- void handle_inode_link(class MInodeLink *m);
- void handle_inode_link_ack(class MInodeLinkAck *m);
// == messages ==
public:
list<Context*>& finished);
CDir* forge_replica_dir(CInode *diri, frag_t fg, int from);
+
+ // -- hard links --
+ void handle_inode_link(class MInodeLink *m);
+
// -- namespace --
void handle_dentry_unlink(MDentryUnlink *m);
-
// -- updates --
//int send_inode_updates(CInode *in);
//void handle_inode_update(MInodeUpdate *m);
int send_dir_updates(CDir *in, bool bcast=false);
void handle_dir_update(MDirUpdate *m);
+ // -- cache expiration --
void handle_cache_expire(MCacheExpire *m);
void process_delayed_expire(CDir *dir);
void discard_delayed_expire(CDir *dir);
break;
}
+ // register + dispatch
+ MDRequest *mdr = mdcache->request_start(req);
+
if (ref) {
- MDRequest *mdr = mdcache->request_start(req);
dout(10) << "inode op on ref " << *ref << endl;
mdr->ref = ref;
mdr->pin(ref);
- dispatch_request(mdr);
- return;
- }
-
-
- // -----
- // some ops are on existing inodes
-
- bool follow_trailing_symlink = false;
-
- switch (req->get_op()) {
- case MDS_OP_LSTAT:
- follow_trailing_symlink = false;
- case MDS_OP_OPEN:
- if (req->args.open.flags & O_CREAT) break; // handled below.
- case MDS_OP_STAT:
- case MDS_OP_UTIME:
- case MDS_OP_CHMOD:
- case MDS_OP_CHOWN:
- case MDS_OP_READDIR:
- {
- filepath refpath = req->get_filepath();
- Context *ondelay = new C_MDS_RetryMessage(mds, req);
- vector<CDentry*> trace;
-
- int r = mdcache->path_traverse(0, 0,
- refpath, trace, follow_trailing_symlink,
- req, ondelay,
- MDS_TRAVERSE_FORWARD,
- true); // is MClientRequest
-
- if (r > 0) return; // delayed
- if (r < 0) {
- dout(10) << "traverse error " << r << " " << strerror(-r) << endl;
-
- // send error. don't bother registering request.
- messenger->send_message(new MClientReply(req, r),
- req->get_client_inst());
-
- // <HACK>
- // is this a special debug command?
- if (refpath.depth() - 1 == trace.size() &&
- refpath.last_dentry().find(".ceph.") == 0) {
- // ...
- }
- // </HACK>
- }
-
- // can we dnlock whole path?
- if (!mds->locker->dentry_can_rdlock_trace(trace, req))
- return;
-
- // go
- MDRequest *mdr = mdcache->request_start(req);
- mds->locker->dentry_anon_rdlock_trace_start(trace);
- dispatch_request(mdr);
- return;
- }
- }
-
-
- // ----
- // the rest handle things themselves.
-
- switch (req->get_op()) {
- case MDS_OP_OPEN:
- assert(req->args.open.flags & O_CREAT);
- case MDS_OP_MKNOD:
- case MDS_OP_MKDIR:
- case MDS_OP_SYMLINK:
- case MDS_OP_LINK:
- case MDS_OP_UNLINK:
- case MDS_OP_RMDIR:
- case MDS_OP_RENAME:
- {
- // register request
- MDRequest *mdr = mdcache->request_start(req);
- dispatch_request(mdr);
- return;
- }
}
- assert(0); // we missed something!
+ dispatch_request(mdr);
+ return;
}
// HELPERS
-/** request_pin_ref
- * return the ref inode, referred to by the last dentry in the trace.
- * open if it is remote.
- * pin.
- * return existing, if mdr->ref already set.
- */
-CInode *Server::request_pin_ref(MDRequest *mdr)
-{
- // already did it?
- if (mdr->ref)
- return mdr->ref;
-
- // open and pin ref inode in cache too
- CInode *ref = 0;
- if (mdr->trace.empty())
- ref = mdcache->get_root();
- else {
- ref = mdcache->get_dentry_inode(mdr->trace[mdr->trace.size()-1], mdr);
- if (!ref) return 0;
- }
- mdr->pin(ref);
- mdr->ref = ref;
- return ref;
-}
-
-
-
/** validate_dentry_dir
*
* verify that the dir exists and would own the dname.
}
+
+CInode* Server::rdlock_path_pin_ref(MDRequest *mdr, bool want_auth)
+{
+ // already got ref?
+ if (mdr->ref)
+ return mdr->ref;
+
+ MClientRequest *req = mdr->client_request();
+
+ // traverse
+ filepath refpath = req->get_filepath();
+ Context *ondelay = new C_MDS_RetryRequest(mdcache, mdr);
+ vector<CDentry*> trace;
+ int r = mdcache->path_traverse(mdr, 0,
+ refpath, trace, req->follow_trailing_symlink(),
+ req, ondelay,
+ MDS_TRAVERSE_FORWARD,
+ true); // is MClientRequest
+ if (r > 0) return false; // delayed
+ if (r < 0) { // error
+ reply_request(mdr, r);
+ return 0;
+ }
+
+ // open ref inode
+ CInode *ref = 0;
+ if (mdr->trace.empty())
+ ref = mdcache->get_root();
+ else {
+ CDentry *dn = mdr->trace[mdr->trace.size()-1];
+
+ // if no inode, fw to dentry auth?
+ if (want_auth &&
+ dn->is_remote() &&
+ !dn->inode &&
+ !dn->is_auth()) {
+ if (dn->dir->auth_is_ambiguous()) {
+ dout(10) << "waiting for single auth on " << *dn << endl;
+ dn->dir->add_waiter(CInode::WAIT_SINGLEAUTH,
+ dn->get_name(),
+ new C_MDS_RetryMessage(mds, req));
+ } else {
+ dout(10) << "fw to auth for " << *dn << endl;
+ mds->forward_message_mds(req, dn->authority().first, MDS_PORT_SERVER);
+ }
+ }
+
+ // open ref inode
+ ref = mdcache->get_dentry_inode(dn, mdr);
+ if (!ref) return 0;
+ }
+
+ // fw to inode auth?
+ if (want_auth && !ref->is_auth()) {
+ if (ref->auth_is_ambiguous()) {
+ dout(10) << "waiting for single auth on " << *ref << endl;
+ ref->add_waiter(CInode::WAIT_SINGLEAUTH,
+ new C_MDS_RetryMessage(mds, req));
+ } else {
+ dout(10) << "fw to auth for " << *ref << endl;
+ mds->forward_message_mds(req, ref->authority().first, MDS_PORT_SERVER);
+ }
+ }
+
+ // auth_pin?
+ if (want_auth) {
+ if (!ref->can_auth_pin()) {
+ dout(7) << "waiting for authpinnable on " << *ref << endl;
+ ref->add_waiter(CInode::WAIT_AUTHPINNABLE, new C_MDS_RetryRequest(mdcache, mdr));
+ return 0;
+ }
+ mdr->auth_pin(ref);
+ }
+
+ // lock the path
+ set<CDentry*> dentry_rdlocks;
+ set<CDentry*> dentry_xlocks;
+ set<CInode*> inode_empty;
+
+ for (unsigned i=0; i<trace.size(); i++)
+ dentry_rdlocks.insert(trace[i]);
+
+ if (!mds->locker->acquire_locks(mdr,
+ dentry_rdlocks, dentry_xlocks,
+ inode_empty, inode_empty))
+ return 0;
+
+ // set and pin ref
+ mdr->pin(ref);
+ mdr->ref = ref;
+
+ // save the locked trace.
+ mdr->trace.swap(trace);
+
+ return ref;
+}
+
+
/** rdlock_path_xlock_dentry
* traverse path to the directory that could/would contain dentry.
* make sure i am auth for that dentry, forward as necessary.
// not open and inode frozen?
if (!dir && diri->is_frozen_dir()) {
- dout(10) << "try_open_dir: dir inode is frozen, waiting " << *diri << endl;
+ dout(10) << "try_open_auth_dir: dir inode is frozen, waiting " << *diri << endl;
assert(diri->get_parent_dir());
diri->get_parent_dir()->add_waiter(CDir::WAIT_UNFREEZE,
new C_MDS_RetryRequest(mdcache, mdr));
return dir;
}
+/*
CDir* Server::try_open_dir(CInode *diri, frag_t fg, MDRequest *mdr)
{
CDir *dir = diri->get_dirfrag(fg);
return 0;
}
}
-
+*/
// ===============================================================================
// STAT
void Server::handle_client_stat(MDRequest *mdr)
{
MClientRequest *req = mdr->client_request();
- CInode *ref = request_pin_ref(mdr);
+ CInode *ref = rdlock_path_pin_ref(mdr, false);
if (!ref) return;
// FIXME: this is really not the way to handle the statlite mask.
void Server::handle_client_utime(MDRequest *mdr)
{
MClientRequest *req = mdr->client_request();
- CInode *cur = request_pin_ref(mdr);
+ CInode *cur = rdlock_path_pin_ref(mdr, true);
if (!cur) return;
- // auth pin
- if (!cur->can_auth_pin()) {
- dout(7) << "waiting for authpinnable on " << *cur << endl;
- cur->add_waiter(CInode::WAIT_AUTHPINNABLE, new C_MDS_RetryRequest(mdcache, mdr));
- return;
- }
- mdr->auth_pin(cur);
-
// write
if (!mds->locker->inode_file_xlock_start(cur, mdr))
- return; // fw or (wait for) sync
+ return;
mds->balancer->hit_inode(cur, META_POP_IWR);
void Server::handle_client_chmod(MDRequest *mdr)
{
MClientRequest *req = mdr->client_request();
- CInode *cur = request_pin_ref(mdr);
+ CInode *cur = rdlock_path_pin_ref(mdr, true);
if (!cur) return;
- // auth pin
- if (!cur->can_auth_pin()) {
- dout(7) << "waiting for authpinnable on " << *cur << endl;
- cur->add_waiter(CInode::WAIT_AUTHPINNABLE, new C_MDS_RetryRequest(mdcache, mdr));
- return;
- }
- mdr->auth_pin(cur);
-
// write
if (!mds->locker->inode_hard_xlock_start(cur, mdr))
- return; // fw or (wait for) lock
+ return;
mds->balancer->hit_inode(cur, META_POP_IWR);
void Server::handle_client_chown(MDRequest *mdr)
{
MClientRequest *req = mdr->client_request();
- CInode *cur = request_pin_ref(mdr);
+ CInode *cur = rdlock_path_pin_ref(mdr, true);
if (!cur) return;
- // auth pin
- if (!cur->can_auth_pin()) {
- dout(7) << "waiting for authpinnable on " << *cur << endl;
- cur->add_waiter(CInode::WAIT_AUTHPINNABLE, new C_MDS_RetryRequest(mdcache, mdr));
- return;
- }
- mdr->auth_pin(cur);
-
// write
if (!mds->locker->inode_hard_xlock_start(cur, mdr))
- return; // fw or (wait for) lock
+ return;
mds->balancer->hit_inode(cur, META_POP_IWR);
void Server::handle_client_readdir(MDRequest *mdr)
{
MClientRequest *req = mdr->client_request();
- CInode *diri = request_pin_ref(mdr);
+ CInode *diri = rdlock_path_pin_ref(mdr, false);
if (!diri) return;
// it's a directory, right?
req, ondelay,
MDS_TRAVERSE_DISCOVER);
if (r > 0) return; // wait
+ if (targettrace.empty()) r = -EINVAL;
if (r < 0) {
reply_request(mdr, r);
return;
}
// identify target inode
- CInode *targeti;
- if (targettrace.empty())
- targeti = mdcache->get_root();
- else
- targeti = targettrace[targettrace.size()-1]->inode;
+ CInode *targeti = targettrace[targettrace.size()-1]->inode;
assert(targeti);
- assert(r == 0);
// dir?
dout(7) << "target is " << *targeti << endl;
if (targeti->is_dir()) {
- dout(7) << "target is a dir, failing" << endl;
+ dout(7) << "target is a dir, failing..." << endl;
reply_request(mdr, -EINVAL);
return;
}
{
dout(10) << "_link_remote " << *dn << " to " << *targeti << endl;
- // ??
- // 1. send LinkPrepare to dest (lock target on dest, journal target update)
+ // 1. send LinkPrepare to dest (journal nlink++ prepare)
// 2. create+journal new dentry, as with link_local.
- // 3. send LinkCommit to dest (unlocks target on dest, journals commit)
+ // 3. send LinkCommit to dest (journals commit)
// IMPLEMENT ME
reply_request(mdr, -EXDEV);
CDentry *dn = trace[trace.size()-1];
assert(dn);
- // readable?
- if (!dn->can_read(mdr)) {
- dout(10) << "waiting on unreadable dentry " << *dn << endl;
- dn->dir->add_waiter(CDir::WAIT_DNREAD, dn->name, new C_MDS_RetryRequest(mdcache, mdr));
- return;
- }
-
// rmdir or unlink?
bool rmdir = false;
if (req->get_op() == MDS_OP_RMDIR) rmdir = true;
} else {
dout(7) << "handle_client_unlink on " << *dn << endl;
}
+
+ // readable?
+ if (!dn->can_read(mdr)) {
+ dout(10) << "waiting on unreadable dentry " << *dn << endl;
+ dn->dir->add_waiter(CDir::WAIT_DNREAD, dn->name, new C_MDS_RetryRequest(mdcache, mdr));
+ return;
+ }
+
// dn looks ok.
// get/open inode.
mdr->trace.swap(trace);
- CInode *in = request_pin_ref(mdr);
+ CInode *in = mdcache->get_dentry_inode(dn, mdr);
if (!in) return;
dout(7) << "dn links to " << *in << endl;
}
}
- dout(7) << "inode is " << *in << endl;
+ // lock
+ set<CDentry*> dentry_rdlocks;
+ set<CDentry*> dentry_xlocks;
+ set<CInode*> inode_hard_rdlocks;
+ set<CInode*> inode_hard_xlocks;
+
+ for (unsigned i=0; i<trace.size()-1; i++)
+ dentry_rdlocks.insert(trace[i]);
+ dentry_xlocks.insert(dn);
+ inode_hard_xlocks.insert(in);
+
+ if (!mds->locker->acquire_locks(mdr,
+ dentry_rdlocks, dentry_xlocks,
+ inode_hard_rdlocks, inode_hard_xlocks))
+ return;
// ok!
if (dn->is_remote() && !dn->inode->is_auth())
{
dout(10) << "_unlink_local " << *dn << endl;
- // auth pin inode
- if (!mdr->is_auth_pinned(dn->inode) &&
- !dn->inode->can_auth_pin()) {
- dn->inode->add_waiter(CInode::WAIT_AUTHPINNABLE, new C_MDS_RetryRequest(mdcache, mdr));
-
- // drop all locks while we wait (racey?)
- mdcache->request_drop_locks(mdr);
- mdr->drop_auth_pins();
- return;
- }
- mdr->auth_pin(dn->inode);
-
- // lock inode
- if (!mds->locker->inode_hard_xlock_start(dn->inode, mdr))
- return;
-
-
// get stray dn ready?
CDentry *straydn = 0;
if (dn->is_primary()) {
// ===================================
// TRUNCATE, FSYNC
-class C_MDS_ReplyRequest : public Context {
- MDS *mds;
- MDRequest *mdr;
-public:
- C_MDS_ReplyRequest(MDS *m, MDRequest *r) : mds(m), mdr(r) {}
- void finish(int r) {
- // reply
- MClientReply *reply = new MClientReply(mdr->client_request(), 0);
- reply->set_result(0);
- mds->server->reply_request(mdr, reply, mdr->ref);
- }
-};
-
-class C_MDS_truncate_finish : public Context {
+class C_MDS_truncate_purged : public Context {
MDS *mds;
MDRequest *mdr;
CInode *in;
off_t size;
time_t ctime;
public:
- C_MDS_truncate_finish(MDS *m, MDRequest *r, CInode *i, version_t pdv, off_t sz, time_t ct) :
+ C_MDS_truncate_purged(MDS *m, MDRequest *r, CInode *i, version_t pdv, off_t sz, time_t ct) :
mds(m), mdr(r), in(i),
pv(pdv),
size(sz), ctime(ct) { }
// hit pop
mds->balancer->hit_inode(in, META_POP_IWR);
+ // reply
+ mds->server->reply_request(mdr, 0);
+ }
+};
+
+class C_MDS_truncate_logged : public Context {
+ MDS *mds;
+ MDRequest *mdr;
+ CInode *in;
+ version_t pv;
+ off_t size;
+ time_t ctime;
+public:
+ C_MDS_truncate_logged(MDS *m, MDRequest *r, CInode *i, version_t pdv, off_t sz, time_t ct) :
+ mds(m), mdr(r), in(i),
+ pv(pdv),
+ size(sz), ctime(ct) { }
+ void finish(int r) {
+ assert(r == 0);
+
// purge
mds->mdcache->purge_inode(&in->inode, size);
- mds->mdcache->wait_for_purge(in->inode.ino, size, new C_MDS_ReplyRequest(mds, mdr));
+ mds->mdcache->wait_for_purge(in->inode.ino, size,
+ new C_MDS_truncate_purged(mds, mdr, in, pv, size, ctime));
}
};
void Server::handle_client_truncate(MDRequest *mdr)
{
MClientRequest *req = mdr->client_request();
- CInode *cur = request_pin_ref(mdr);
+ CInode *cur = rdlock_path_pin_ref(mdr, true);
if (!cur) return;
- // not auth?
- if (!cur->is_auth()) {
- mdcache->request_forward(mdr, cur->authority().first);
- return;
- }
-
- // lock inode
- if (!cur->can_auth_pin()) {
- dout(7) << "waiting for authpinnable on " << *cur << endl;
- cur->add_waiter(CInode::WAIT_AUTHPINNABLE, new C_MDS_RetryRequest(mdcache, mdr));
- mdcache->request_drop_locks(mdr);
- mdr->drop_auth_pins();
- return;
- }
- mdr->auth_pin(cur);
-
// check permissions?
// xlock inode
// prepare
version_t pdv = cur->pre_dirty();
time_t ctime = g_clock.gettime();
- C_MDS_truncate_finish *fin = new C_MDS_truncate_finish(mds, mdr, cur,
- pdv, req->args.truncate.length, ctime);
+ Context *fin = new C_MDS_truncate_logged(mds, mdr, cur,
+ pdv, req->args.truncate.length, ctime);
// log + wait
EUpdate *le = new EUpdate("truncate");
void Server::handle_client_open(MDRequest *mdr)
{
MClientRequest *req = mdr->client_request();
- CInode *cur = request_pin_ref(mdr);
- if (!cur) return;
int flags = req->args.open.flags;
int cmode = req->get_open_file_mode();
-
- dout(7) << "open " << flags << " on " << *cur << endl;
- dout(10) << "open flags = " << flags << " filemode = " << cmode << endl;
-
+ bool need_auth = ((cmode != FILE_MODE_R && cmode != FILE_MODE_LAZY) ||
+ (flags & O_TRUNC));
+ dout(10) << "open flags = " << flags
+ << ", filemode = " << cmode
+ << ", need_auth = " << need_auth
+ << endl;
+
+ CInode *cur = rdlock_path_pin_ref(mdr, need_auth);
+ if (!cur) return;
+
// regular file?
if ((cur->inode.mode & INODE_TYPE_MASK) != INODE_MODE_FILE) {
dout(7) << "not a regular file " << *cur << endl;
return;
}
- // auth for write access
- if (cmode != FILE_MODE_R && cmode != FILE_MODE_LAZY &&
- !cur->is_auth()) {
- int auth = cur->authority().first;
- assert(auth != mds->get_nodeid());
- dout(9) << "open writeable on replica for " << *cur << " fw to auth " << auth << endl;
-
- mdcache->request_forward(mdr, auth);
- return;
- }
+ // hmm, check permissions or something.
+
// O_TRUNC
if (flags & O_TRUNC) {
- // auth pin
- if (!cur->can_auth_pin()) {
- dout(7) << "waiting for authpinnable on " << *cur << endl;
- cur->add_waiter(CInode::WAIT_AUTHPINNABLE, new C_MDS_RetryRequest(mdcache, mdr));
- return;
- }
- mdr->auth_pin(cur);
+ assert(cur->is_auth());
- // write
+ // xlock file size
if (!mds->locker->inode_file_xlock_start(cur, mdr))
- return; // fw or (wait for) lock
-
- // do update
- cur->inode.size = 0;
- cur->_mark_dirty(); // fixme
+ return;
- mds->locker->inode_file_xlock_finish(cur, mdr);
+ if (cur->inode.size > 0) {
+ handle_client_opent(mdr);
+ return;
+ }
}
+
+ // do it
+ _do_open(mdr, cur);
+}
-
- // hmm, check permissions or something.
-
+void Server::_do_open(MDRequest *mdr, CInode *cur)
+{
+ MClientRequest *req = mdr->client_request();
+ int cmode = req->get_open_file_mode();
// can we issue the caps they want?
version_t fdv = mds->locker->issue_file_data_version(cur);
Capability *cap = mds->locker->issue_new_caps(cur, cmode, req);
if (!cap) return; // can't issue (yet), so wait!
-
- dout(12) << "open gets caps " << cap_string(cap->pending()) << " for " << req->get_source() << " on " << *cur << endl;
-
+
+ dout(12) << "_do_open issuing caps " << cap_string(cap->pending())
+ << " for " << req->get_source()
+ << " on " << *cur << endl;
+
+ // hit pop
mds->balancer->hit_inode(cur, META_POP_IRD);
// reply
}
+class C_MDS_open_truncate_purged : public Context {
+ MDS *mds;
+ MDRequest *mdr;
+ CInode *in;
+ version_t pv;
+ time_t ctime;
+public:
+ C_MDS_open_truncate_purged(MDS *m, MDRequest *r, CInode *i, version_t pdv, time_t ct) :
+ mds(m), mdr(r), in(i),
+ pv(pdv),
+ ctime(ct) { }
+ void finish(int r) {
+ assert(r == 0);
+
+ // apply to cache
+ in->inode.size = 0;
+ in->inode.ctime = ctime;
+ in->inode.mtime = ctime;
+ in->mark_dirty(pv);
+
+ // hit pop
+ mds->balancer->hit_inode(in, META_POP_IWR);
+
+ // do the open
+ mds->server->_do_open(mdr, in);
+ }
+};
+
+class C_MDS_open_truncate_logged : public Context {
+ MDS *mds;
+ MDRequest *mdr;
+ CInode *in;
+ version_t pv;
+ time_t ctime;
+public:
+ C_MDS_open_truncate_logged(MDS *m, MDRequest *r, CInode *i, version_t pdv, time_t ct) :
+ mds(m), mdr(r), in(i),
+ pv(pdv),
+ ctime(ct) { }
+ void finish(int r) {
+ assert(r == 0);
+
+ // purge also...
+ mds->mdcache->purge_inode(&in->inode, 0);
+ mds->mdcache->wait_for_purge(in->inode.ino, 0,
+ new C_MDS_open_truncate_purged(mds, mdr, in, pv, ctime));
+ }
+};
+
+
+void Server::handle_client_opent(MDRequest *mdr)
+{
+ CInode *cur = mdr->ref;
+ assert(cur);
+
+ // prepare
+ version_t pdv = cur->pre_dirty();
+ time_t ctime = g_clock.gettime();
+ Context *fin = new C_MDS_open_truncate_logged(mds, mdr, cur,
+ pdv, ctime);
+
+ // log + wait
+ EUpdate *le = new EUpdate("open_truncate");
+ le->metablob.add_client_req(mdr->reqid);
+ le->metablob.add_dir_context(cur->get_parent_dir());
+ le->metablob.add_inode_truncate(cur->inode, 0);
+ inode_t *pi = le->metablob.add_dentry(cur->parent, true);
+ pi->mtime = ctime;
+ pi->ctime = ctime;
+ pi->version = pdv;
+ pi->size = 0;
+
+ mdlog->submit_entry(le);
+ mdlog->wait_for_sync(fin);
+}
+
+
+
class C_MDS_openc_finish : public Context {
MDS *mds;
MDRequest *mdr;
void reply_request(MDRequest *mdr, MClientReply *reply, CInode *tracei);
// some helpers
- CInode *request_pin_ref(MDRequest *mdr);
CDir *validate_dentry_dir(MDRequest *mdr, CInode *diri, const string& dname);
CDir *traverse_to_auth_dir(MDRequest *mdr, vector<CDentry*> &trace, filepath refpath);
CDentry *prepare_null_dentry(MDRequest *mdr, CDir *dir, const string& dname, bool okexist=false);
CInode* prepare_new_inode(MClientRequest *req, CDir *dir);
+
+ CInode* rdlock_path_pin_ref(MDRequest *mdr, bool want_auth);
CDentry* rdlock_path_xlock_dentry(MDRequest *mdr, bool okexist, bool mustexist);
+
CDir* try_open_auth_dir(CInode *diri, frag_t fg, MDRequest *mdr);
-
- CDir* try_open_dir(CInode *diri, frag_t fg, MDRequest *mdr);
+ //CDir* try_open_dir(CInode *diri, frag_t fg, MDRequest *mdr);
// requests on existing inodes.
void handle_client_stat(MDRequest *mdr);
// open
void handle_client_open(MDRequest *mdr);
void handle_client_openc(MDRequest *mdr); // O_CREAT variant.
+ void handle_client_opent(MDRequest *mdr); // O_TRUNC variant.
+ void _do_open(MDRequest *mdr, CInode *ref);
// namespace changes
void handle_client_mknod(MDRequest *mdr);
--- /dev/null
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+/*
+ * Ceph - scalable distributed file system
+ *
+ * Copyright (C) 2004-2006 Sage Weil <sage@newdream.net>
+ *
+ * This is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License version 2.1, as published by the Free Software
+ * Foundation. See file COPYING.
+ *
+ */
+
+#ifndef __MDS_ESLAVEUPDATE_H
+#define __MDS_ESLAVEUPDATE_H
+
+#include "../LogEvent.h"
+#include "EMetaBlob.h"
+
+class ESlaveUpdate : public LogEvent {
+public:
+ string type;
+ metareqid_t reqid;
+ int op; // prepare, commit, abort
+ EMetaBlob metablob;
+
+ ESlaveUpdate() : LogEvent(EVENT_SLAVEUPDATE) { }
+ ESlaveUpdate(const char *s, metareqid_t ri, int o) :
+ LogEvent(EVENT_SLAVEUPDATE),
+ type(s),
+ reqid(ri),
+ op(o) { }
+
+ void print(ostream& out) {
+ if (type.length())
+ out << type << " ";
+ out << " " << op;
+ out << " " << reqid;
+ out << metablob;
+ }
+
+ void encode_payload(bufferlist& bl) {
+ ::_encode(type, bl);
+ ::_encode(reqid, bl);
+ ::_encode(op, bl);
+ metablob._encode(bl);
+ }
+ void decode_payload(bufferlist& bl, int& off) {
+ ::_decode(type, bl, off);
+ ::_decode(reqid, bl, off);
+ ::_decode(op, bl, off);
+ metablob._decode(bl, off);
+ }
+
+ bool has_expired(MDS *mds);
+ void expire(MDS *mds, Context *c);
+ void replay(MDS *mds);
+};
+
+#endif
#include "events/EAnchor.h"
#include "events/EAnchorClient.h"
#include "events/EUpdate.h"
+#include "events/ESlaveUpdate.h"
#include "events/EImportMap.h"
#include "events/EMount.h"
}
+// -----------------------
+// EUpdate
+
+bool ESlaveUpdate::has_expired(MDS *mds)
+{
+ return true;
+ //return metablob.has_expired(mds);
+}
+
+void ESlaveUpdate::expire(MDS *mds, Context *c)
+{
+ metablob.expire(mds, c);
+}
+
+void ESlaveUpdate::replay(MDS *mds)
+{
+ //metablob.replay(mds);
+}
+
+
// -----------------------
// EImportMap
return open_file_mode_is_readonly();
return (st.op < 1000);
}
+ bool follow_trailing_symlink() {
+ switch (st.op) {
+ case MDS_OP_LSTAT:
+ case MDS_OP_LINK:
+ case MDS_OP_UNLINK:
+ case MDS_OP_RENAME:
+ return false;
+
+ case MDS_OP_STAT:
+ case MDS_OP_UTIME:
+ case MDS_OP_CHMOD:
+ case MDS_OP_CHOWN:
+ case MDS_OP_READDIR:
+ case MDS_OP_OPEN:
+ return true;
+
+ default:
+ assert(0);
+ }
+ }
+
+
// normal fields
void set_tid(long t) { st.tid = t; }
#ifndef __MINODELINK_H
#define __MINODELINK_H
-typedef struct {
- inodeno_t ino;
- int from;
-} MInodeLink_st;
-
class MInodeLink : public Message {
- inodeno_t ino;
- filepath link_name;
- bool prepare;
- MInodeLink_st st;
+public:
+ static const int OP_PREPARE = 1;
+ static const int OP_AGREE = 2;
+ static const int OP_COMMIT = 3;
+ static const int OP_ACK = 4;
+ static const int OP_ROLLBACK = 5;
+
+ const char *get_opname(int o) {
+ switch (o) {
+ case OP_PREPARE: return "prepare";
+ case OP_AGREE: return "agree";
+ case OP_COMMIT: return "commit";
+ case OP_ACK: return "ack";
+ case OP_ROLLBACK: return "rollback";
+ default: assert(0);
+ }
+ }
+
+private:
+ struct _st {
+ inodeno_t ino; // inode to nlink++
+ metareqid_t reqid; // relevant request
+ int op; // see above
+ bool inc; // true == ++, false == --
- public:
+ time_t ctime;
+ } st;
+
+public:
inodeno_t get_ino() { return st.ino; }
- int get_from() { return st.from; }
+ metareqid_t get_reqid() { return st.reqid; }
+ int get_op() { return st.op; }
+ bool get_inc() { return st.inc; }
+
+ time_t get_ctime() { return st.ctime; }
+ void set_ctime(time_t ct) { st.ctime = ct; }
MInodeLink() {}
- MInodeLink(inodeno_t ino, int from) :
+ MInodeLink(int op, inodeno_t ino, bool inc, metareqid_t ri) :
Message(MSG_MDS_INODELINK) {
+ st.op = op;
st.ino = ino;
- st.from = from;
+ st.inc = inc;
+ st.reqid = ri;
+ }
+
+ virtual char *get_type_name() { return "inode_link"; }
+ void print(ostream& o) {
+ o << "inode_link(" << get_opname(st.op)
+ << " " << st.ino
+ << " nlink" << (st.inc ? "++":"--")
+ << " " << st.reqid << ")";
}
- virtual char *get_type_name() { return "InL";}
virtual void decode_payload() {
int off = 0;
- payload.copy(off, sizeof(st), (char*)&st);
- off += sizeof(st);
+ _decoderaw(st, payload, off);
}
virtual void encode_payload() {
- payload.append((char*)&st,sizeof(st));
+ _encode(st, payload);
}
};