- mds metadata versioning
- (dir) inode versions..
-- cdir fetch/store versioned dentries
+/- cdir fetch/store versioned dentries
- emetablob.. journaling a versioned update..
- [0,head] foo
- becomes
- [0,10] foo
- [11,head] foo
- and blob contains simply
- [11,head] foo
- that means replay _AND_ fetch have to convert the [0,head] foo into [0,10] foo.
-?
+- set dir mtime on snap create
- migrator import/export of versioned dentries, inodes... drop them on export...
ostream& operator<<(ostream &out, Inode &in)
{
- out << in.inode.ino
- << "s" << in.snapid << "("
+ out << in.vino() << "("
<< " cap_refs=" << in.cap_refs
<< " open=" << in.open_by_mode
<< " ref=" << in.ref
::decode(numd, p);
::decode(snapdirpos, p);
dout(10) << "insert_trace got " << numi << " inodes, " << numd << " dentries, snapdir at " << snapdirpos << dendl;
- int icount = 0;
// decode
- LeaseStat ilease[numi], snapdirlease;
- InodeStat ist[numi], snapdirst;
+ LeaseStat ilease[numi];
+ InodeStat ist[numi];
DirStat dst[numd];
string dname[numd];
LeaseStat dlease[numd];
inode:
if (!ileft) goto done;
- ileft--; icount++;
- if (icount == snapdirpos) {
- snapdirst.decode(p);
- ::decode(snapdirlease, p);
- }
+ ileft--;
ist[ileft].decode(p);
+ dout(20) << " got vino " << ist[ileft].vino << dendl;
::decode(ilease[ileft], p);
dentry:
if (!dleft) goto done;
dleft--;
::decode(dname[dleft], p);
+ dout(20) << " got dname " << dname[dleft] << dendl;
::decode(dlease[dleft], p);
dst[dleft].decode(p);
goto inode;
dout(10) << " curi " << *curi << dendl;
if ((int)i == numi-snapdirpos-1) {
- Inode *snapdiri = open_snapdir(curi);
- dout(10) << " snapdir " << *snapdiri << dendl;
- char s[20];
- sprintf(s, "%llu", (unsigned long long)snapdirst.vino.snapid);
- string snapname = s;
- Dir *snapdir = snapdiri->open_dir();
- curi = insert_dentry_inode(snapdir, snapname, &snapdirlease, // FIXME
- &snapdirst, &snapdirlease, from);
- dout(10) << " snapped diri " << *curi << dendl;
+ curi = open_snapdir(curi);
+ dout(10) << " snapdir " << *curi << dendl;
}
update_dir_dist(curi, &dst[i]); // dir stat info is attached to inode...
filepath path;
diri->make_path(path);
- path.push_dentry(name);
- int r = _mkdir(path.c_str(), mode, uid, gid);
- if (r == 0) {
- string dname(name);
- Inode *in = diri->dir->dentries[dname]->inode;
- fill_stat(in, attr);
- _ll_get(in);
+
+ int r;
+ if (diri->snapid == CEPH_SNAPDIR) {
+ MClientRequest *req = new MClientRequest(CEPH_MDS_OP_MKSNAP);
+ req->set_filepath(path);
+ req->set_path2(name);
+
+ Inode *in;
+ MClientReply *reply = make_request(req, uid, gid, &in);
+ r = reply->get_result();
+ snapid_t snapid = 0;
+ if (reply->get_snaps().size())
+ snapid = reply->get_snaps()[0];
+ delete reply;
+ dout(10) << "mksnap result is " << r << " vino " << in->vino() << " snapid " << snapid << dendl;
+ if (r == 0) {
+ fill_stat(in, attr);
+ _ll_get(in);
+ }
+ } else {
+ path.push_dentry(name);
+ r = _mkdir(path.c_str(), mode, uid, gid);
+ if (r == 0) {
+ string dname(name);
+ Inode *in = diri->dir->dentries[dname]->inode;
+ fill_stat(in, attr);
+ _ll_get(in);
+ }
}
tout << attr->st_ino << std::endl;
dout(3) << "ll_mkdir " << parent << " " << name
continue;
}
if (snapid == CEPH_SNAPDIR) {
- snapid = atoll(path[depth].c_str());
+ SnapRealm *realm = cur->find_snaprealm();
+ snapid = realm->resolve_snapname(path[depth]);
dout(10) << "traverse: snap " << path[depth] << " -> " << snapid << dendl;
if (!snapid)
return -ENOENT;
- SnapRealm *realm = cur->find_snaprealm();
- if (realm->get_snaps().count(snapid) == 0)
- return -ENOENT;
depth++;
continue;
}
}
+static void encode_empty_dirstat(bufferlist& bl)
+{
+ static DirStat empty;
+ empty.encode(bl);
+}
+
+static void encode_empty_lease(bufferlist& bl)
+{
+ LeaseStat e;
+ e.mask = -1;
+ e.duration_ms = -1;
+ ::encode(e, bl);
+}
+
+
/*
* pass inode OR dentry (not both, or we may get confused)
*
// choose lease duration
utime_t now = g_clock.now();
- int lmask;
+ int lmask = 0;
// start with dentry or inode?
if (!in) {
dout(20) << " trace added " << lmask << " snapid " << snapid << " " << *in << dendl;
if (snapid != CEPH_NOSNAP && in == snapdiri) {
+ // do the snap name dentry
+ const string& snapname = in->find_snaprealm()->get_snapname(snapid);
+ dout(10) << " snapname " << snapname << dendl;
+ ::encode(snapname, bl);
+ encode_empty_lease(bl);
+ numdn++;
+ encode_empty_dirstat(bl);
+
+ // back to the live tree
snapid = CEPH_NOSNAP;
- snapdirpos = numi;
- dout(10) << " snapdiri at pos " << snapdirpos << dendl;
in->encode_inodestat(bl, snapid);
lmask = mds->locker->issue_client_lease(in, client, bl, now, session);
+ numi++;
dout(20) << " trace added " << lmask << " snapid " << snapid << " " << *in << dendl;
+
+ snapdirpos = numi;
+ dout(10) << " snapdiri at pos " << snapdirpos << dendl;
}
if (!dn)
-static void encode_empty_dirstat(bufferlist& bl)
-{
- // encode fake dirstat
- frag_t fg;
- __s32 auth = CDIR_AUTH_PARENT;
- __u32 zero;
- ::encode(fg, bl);
- ::encode(auth, bl);
- ::encode(zero, bl);
-}
-
-static void encode_empty_lease(bufferlist& bl)
-{
- LeaseStat e;
- e.mask = -1;
- e.duration_ms = -1;
- ::encode(e, bl);
-}
// snaps
dout(10) << p->first << " -> " << *p->second << dendl;
- char nm[20];
- sprintf(nm, "%llu", (unsigned long long)p->second->snapid);
- ::encode(nm, dnbl);
+ // actual
+ if (p->second->dirino == diri->ino())
+ ::encode(p->second->name, dnbl);
+ else
+ ::encode(p->second->get_long_name(), dnbl);
encode_empty_lease(dnbl);
diri->encode_inodestat(dnbl, p->first);
encode_empty_lease(dnbl);
if (mdr->now == utime_t())
mdr->now = g_clock.now();
+ // make sure name is unique
+ const string &snapname = req->get_path2();
+ if (diri->snaprealm &&
+ diri->snaprealm->exists(snapname)) {
+ reply_request(mdr, -EEXIST);
+ return;
+ }
+ if (snapname.length() == 0 ||
+ snapname[0] == '_') {
+ reply_request(mdr, -EINVAL);
+ return;
+ }
+
// anchor diri
if (!diri->inode.anchored) {
mds->mdcache->anchor_create(mdr, diri, new C_MDS_RetryRequest(mds->mdcache, mdr));
// allocate a snapid
// HACK
- snapid_t snapid = mds->snaptable->create(diri->ino(), req->get_path2(), mdr->now);
+ snapid_t snapid = mds->snaptable->create(diri->ino(), snapname, mdr->now);
dout(10) << " snapid is " << snapid << dendl;
// add the snap
dout(10) << "snaprealm was " << *diri->snaprealm << dendl;
SnapInfo info;
+ info.dirino = diri->ino();
info.snapid = snapid;
info.name = req->get_path2();
info.stamp = mdr->now;
mds->send_message_client(p->second, p->first);
// yay
- reply_request(mdr, 0, diri);
+ mdr->ref = diri;
+ mdr->ref_snapid = snapid;
+ mdr->ref_snapdiri = diri;
+ MClientReply *reply = new MClientReply(req, 0);
+ reply->set_snaps(diri->snaprealm->get_snaps());
+ reply_request(mdr, reply);
}
void Server::handle_client_rmsnap(MDRequest *mdr)
<< ".cache.snaprealm(" << inode->ino() \
<< " " << this << ") "
+
bool SnapRealm::open_parents(MDRequest *mdr)
{
dout(10) << "open_parents" << dendl;
parent->get_snap_info(infomap, thru, last);
}
+const string& SnapInfo::get_long_name()
+{
+ if (long_name.length() == 0) {
+ char nm[80];
+ sprintf(nm, "_%s_%llu", name.c_str(), (unsigned long long)dirino);
+ long_name = nm;
+ }
+ return long_name;
+}
+
+const string& SnapRealm::get_snapname(snapid_t snapid, bool actual)
+{
+ if (snaps.count(snapid)) {
+ if (actual)
+ return snaps[snapid].name;
+ else
+ return snaps[snapid].get_long_name();
+ }
+
+ map<snapid_t, SnapInfo>::iterator p = snaps.lower_bound(snapid);
+ if (p != snaps.end() && p->first <= snapid) {
+ CInode *oldparent = mdcache->get_inode(p->second.dirino);
+ assert(oldparent); // call open_parents first!
+ assert(oldparent->snaprealm);
+
+ return oldparent->snaprealm->get_snapname(snapid, false);
+ }
+
+ return parent->get_snapname(snapid, false);
+}
+
+snapid_t SnapRealm::resolve_snapname(const string& n, bool actual, snapid_t first, snapid_t last)
+{
+ // first try me
+ dout(10) << "resolve_snapname '" << n << "' in [" << first << "," << last << "]" << dendl;
+
+ snapid_t num;
+ if (n[0] == '~') num = atoll(n.c_str()+1);
+
+ string pname;
+ inodeno_t pdirino;
+ if (!actual) {
+ if (!n.length() ||
+ n[0] != '_') return 0;
+ int next_ = n.find('_', 1);
+ if (next_ < 0) return 0;
+ pname = n.substr(1, next_ - 1);
+ pdirino = atoll(n.c_str() + next_ + 1);
+ dout(10) << " " << n << " -> " << pname << " dirino " << pdirino << dendl;
+ }
+
+ for (map<snapid_t, SnapInfo>::iterator p = snaps.lower_bound(first); // first element >= first
+ p != snaps.end() && p->first <= last;
+ p++) {
+ if (num && p->second.snapid == num)
+ return p->first;
+ if (actual && p->second.name == n)
+ return p->first;
+ if (!actual && p->second.name == pname && p->second.dirino == pdirino)
+ return p->first;
+ }
+
+ // include snaps for parents during intervals that intersect [first,last]
+ snapid_t thru = first;
+ for (map<snapid_t, snaplink_t>::iterator p = past_parents.lower_bound(first);
+ p != past_parents.end() && p->first >= first && p->second.first <= last;
+ p++) {
+ CInode *oldparent = mdcache->get_inode(p->second.dirino);
+ assert(oldparent); // call open_parents first!
+ assert(oldparent->snaprealm);
+
+ thru = MIN(last, p->first);
+ snapid_t r = oldparent->snaprealm->resolve_snapname(n, false,
+ MAX(first, p->second.first),
+ thru);
+ if (r)
+ return r;
+ ++thru;
+ }
+ if (thru <= last && parent)
+ return parent->resolve_snapname(n, false, thru, last);
+ return 0;
+}
+
+
void SnapRealm::split_at(SnapRealm *child)
{
snapid_t snapid;
inodeno_t dirino;
utime_t stamp;
- string name;
+ string name, long_name;
void encode(bufferlist& bl) const {
::encode(snapid, bl);
::decode(stamp, bl);
::decode(name, bl);
}
+ const string& get_long_name();
};
WRITE_CLASS_ENCODER(SnapInfo)
snap_highwater(0)
{ }
+ bool exists(const string &name) {
+ for (map<snapid_t,SnapInfo>::iterator p = snaps.begin();
+ p != snaps.end();
+ p++)
+ if (p->second.name == name)
+ return true;
+ return false;
+ }
bool open_parents(MDRequest *mdr);
void build_snap_set(set<snapid_t>& s, snapid_t first, snapid_t last);
void get_snap_info(map<snapid_t,SnapInfo*>& infomap, snapid_t first=0, snapid_t last=CEPH_NOSNAP);
+ const string& get_snapname(snapid_t snapid, bool actual=true);
+ snapid_t resolve_snapname(const string &name, bool actual=true, snapid_t first=0, snapid_t last=CEPH_NOSNAP);
+
const set<snapid_t>& get_snaps();
const vector<snapid_t>& get_snap_vector();
const set<snapid_t>& update_snaps(snapid_t adding=0);
__s32 auth;
set<__s32> dist;
- DirStat() {}
+ DirStat() : auth(CDIR_AUTH_PARENT) {}
DirStat(bufferlist::iterator& p) {
decode(p);
}
+ void encode(bufferlist& bl) {
+ ::encode(frag, bl);
+ ::encode(auth, bl);
+ ::encode(dist, bl);
+ }
void decode(bufferlist::iterator& p) {
::decode(frag, p);
::decode(auth, p);