void Capability::Export::encode(bufferlist &bl) const
{
ENCODE_START(2, 2, bl);
+ ::encode(cap_id, bl);
::encode(wanted, bl);
::encode(issued, bl);
::encode(pending, bl);
::encode(client_follows, bl);
+ ::encode(seq, bl);
::encode(mseq, bl);
::encode(last_issue_stamp, bl);
ENCODE_FINISH(bl);
void Capability::Export::decode(bufferlist::iterator &p)
{
DECODE_START_LEGACY_COMPAT_LEN(2, 2, 2, p);
+ ::decode(cap_id, p);
::decode(wanted, p);
::decode(issued, p);
::decode(pending, p);
::decode(client_follows, p);
+ ::decode(seq, p);
::decode(mseq, p);
::decode(last_issue_stamp, p);
DECODE_FINISH(p);
void Capability::Export::dump(Formatter *f) const
{
+ f->dump_unsigned("cap_id", cap_id);
f->dump_unsigned("wanted", wanted);
f->dump_unsigned("issued", issued);
f->dump_unsigned("pending", pending);
f->dump_unsigned("client_follows", client_follows);
+ f->dump_unsigned("seq", seq);
f->dump_unsigned("migrate_seq", mseq);
f->dump_stream("last_issue_stamp") << last_issue_stamp;
}
ls.back()->last_issue_stamp = utime_t(6, 7);
}
+void Capability::Import::encode(bufferlist &bl) const
+{
+ ENCODE_START(1, 1, bl);
+ ::encode(cap_id, bl);
+ ::encode(issue_seq, bl);
+ ::encode(mseq, bl);
+ ENCODE_FINISH(bl);
+}
+
+void Capability::Import::decode(bufferlist::iterator &bl)
+{
+ DECODE_START(1, bl);
+ ::decode(cap_id, bl);
+ ::decode(issue_seq, bl);
+ ::decode(mseq, bl);
+ DECODE_FINISH(bl);
+}
+
+void Capability::Import::dump(Formatter *f) const
+{
+ f->dump_unsigned("cap_id", cap_id);
+ f->dump_unsigned("issue_seq", issue_seq);
+ f->dump_unsigned("migrate_seq", mseq);
+}
/*
* Capability::revoke_info
}
public:
struct Export {
+ int64_t cap_id;
int32_t wanted;
int32_t issued;
int32_t pending;
snapid_t client_follows;
+ ceph_seq_t seq;
ceph_seq_t mseq;
utime_t last_issue_stamp;
Export() {}
- Export(int w, int i, int p, snapid_t cf, ceph_seq_t s, utime_t lis) :
- wanted(w), issued(i), pending(p), client_follows(cf), mseq(s), last_issue_stamp(lis) {}
+ Export(int64_t id, int w, int i, int p, snapid_t cf, ceph_seq_t s, ceph_seq_t m, utime_t lis) :
+ cap_id(id), wanted(w), issued(i), pending(p), client_follows(cf),
+ seq(s), mseq(m), last_issue_stamp(lis) {}
void encode(bufferlist &bl) const;
void decode(bufferlist::iterator &p);
void dump(Formatter *f) const;
static void generate_test_instances(list<Export*>& ls);
};
+ struct Import {
+ int64_t cap_id;
+ ceph_seq_t issue_seq;
+ ceph_seq_t mseq;
+ Import() {}
+ Import(int64_t i, ceph_seq_t s, ceph_seq_t m) : cap_id(i), issue_seq(s), mseq(m) {}
+ void encode(bufferlist &bl) const;
+ void decode(bufferlist::iterator &p);
+ void dump(Formatter *f) const;
+ };
private:
CInode *inode;
// -- exports --
Export make_export() {
- return Export(_wanted, issued(), pending(), client_follows, mseq+1, last_issue_stamp);
+ return Export(cap_id, _wanted, issued(), pending(), client_follows, last_sent, mseq+1, last_issue_stamp);
}
void rejoin_import() { mseq++; }
void merge(Export& other, bool auth_cap) {
};
WRITE_CLASS_ENCODER(Capability::Export)
+WRITE_CLASS_ENCODER(Capability::Import)
WRITE_CLASS_ENCODER(Capability::revoke_info)
WRITE_CLASS_ENCODER(Capability)
assert(it->second.state == EXPORT_EXPORTING);
assert(it->second.tid == m->get_tid());
+ bufferlist::iterator bp = m->imported_caps.begin();
+ ::decode(it->second.peer_imported, bp);
+
it->second.state = EXPORT_LOGGINGFINISH;
assert (g_conf->mds_kill_export_at != 9);
set<CDir*> bounds;
// force open client sessions and finish cap import
mds->server->finish_force_open_sessions(imported_client_map, sseqmap);
+ map<inodeno_t,map<client_t,Capability::Import> > imported_caps;
+
for (map<CInode*, map<client_t,Capability::Export> >::iterator p = it->second.peer_exports.begin();
p != it->second.peer_exports.end();
++p) {
- finish_import_inode_caps(p->first, true, p->second);
+ finish_import_inode_caps(p->first, true, p->second, imported_caps[p->first->ino()]);
}
// send notify's etc.
// test surviving observer of a failed migration that did not complete
//assert(dir->replica_map.size() < 2 || mds->whoami != 0);
- mds->send_message_mds(new MExportDirAck(dir->dirfrag(), it->second.tid), from);
+ MExportDirAck *ack = new MExportDirAck(dir->dirfrag(), it->second.tid);
+ ::encode(imported_caps, ack->imported_caps);
+
+ mds->send_message_mds(ack, from);
assert (g_conf->mds_kill_import_at != 8);
cache->show_subtrees();
}
void Migrator::finish_import_inode_caps(CInode *in, bool auth_cap,
- map<client_t,Capability::Export> &cap_map)
+ map<client_t,Capability::Export> &export_map,
+ map<client_t,Capability::Import> &import_map)
{
- for (map<client_t,Capability::Export>::iterator it = cap_map.begin();
- it != cap_map.end();
+ for (map<client_t,Capability::Export>::iterator it = export_map.begin();
+ it != export_map.end();
++it) {
dout(10) << "finish_import_inode_caps for client." << it->first << " on " << *in << dendl;
Session *session = mds->sessionmap.get_session(entity_name_t::CLIENT(it->first.v));
if (!cap) {
cap = in->add_client_cap(it->first, session);
}
- cap->merge(it->second, auth_cap);
+ Capability::Import& im = import_map[it->first];
+ im.cap_id = cap->get_cap_id();
+ im.mseq = auth_cap ? it->second.mseq : cap->get_mseq();
+ im.issue_seq = cap->get_last_seq() + 1;
+
+ cap->merge(it->second, auth_cap);
mds->mdcache->do_cap_import(session, in, cap);
}
// force open client sessions and finish cap import
mds->server->finish_force_open_sessions(client_map, sseqmap);
+ map<client_t,Capability::Import> imported_caps;
+
assert(peer_exports.count(in));
- finish_import_inode_caps(in, false, peer_exports[in]);
+ finish_import_inode_caps(in, false, peer_exports[in], imported_caps);
mds->locker->eval(in, CEPH_CAP_LOCKS, true);
mds->send_message_mds(new MExportCapsAck(in->ino()), from);
set<SimpleLock*> locks;
set<int> warning_ack_waiting;
set<int> notify_ack_waiting;
+ map<inodeno_t,map<client_t,Capability::Import> > peer_imported;
list<Context*> waiting_for_finish;
};
bufferlist::iterator &blp,
map<CInode*, map<client_t,Capability::Export> >& cap_imports);
void finish_import_inode_caps(CInode *in, bool auth_cap,
- map<client_t,Capability::Export> &cap_map);
+ map<client_t,Capability::Export> &export_map,
+ map<client_t,Capability::Import> &import_map);
int decode_import_dir(bufferlist::iterator& blp,
int oldauth,
CDir *import_root,
// srcdn inode import?
if (!srcdn->is_auth() && destdn->is_auth()) {
assert(mdr->more()->inode_import.length() > 0);
+
+ map<client_t,Capability::Import> imported_caps;
// finish cap imports
finish_force_open_sessions(mdr->more()->imported_client_map, mdr->more()->sseq_map);
if (mdr->more()->cap_imports.count(destdnl->get_inode())) {
- mds->mdcache->migrator->finish_import_inode_caps(destdnl->get_inode(), srcdn->authority().first,
- mdr->more()->cap_imports[destdnl->get_inode()]);
+ mds->mdcache->migrator->finish_import_inode_caps(destdnl->get_inode(),
+ mdr->more()->srcdn_auth_mds,
+ mdr->more()->cap_imports[destdnl->get_inode()],
+ imported_caps);
}
/* hack: add an auth pin for each xlock we hold. These were
* remote xlocks previously but now they're local and
#include "MExportDir.h"
class MExportDirAck : public Message {
+public:
dirfrag_t dirfrag;
+ bufferlist imported_caps;
- public:
dirfrag_t get_dirfrag() { return dirfrag; }
MExportDirAck() : Message(MSG_MDS_EXPORTDIRACK) {}
void decode_payload() {
bufferlist::iterator p = payload.begin();
::decode(dirfrag, p);
+ ::decode(imported_caps, p);
}
void encode_payload(uint64_t features) {
::encode(dirfrag, payload);
+ ::encode(imported_caps, payload);
}
};