__le32 time_warp_seq;
} __attribute__ ((packed));
+/* extra info for cap import/export */
+struct ceph_mds_cap_peer {
+ __le64 cap_id;
+ __le32 seq;
+ __le32 mseq;
+ __le32 mds;
+ __u8 flags;
+} __attribute__ ((packed));
+
/* cap release msg head */
struct ceph_mds_cap_release {
__le32 num; /* number of cap_items that follow */
WRITE_RAW_ENCODER(ceph_mds_request_release)
WRITE_RAW_ENCODER(ceph_filelock)
WRITE_RAW_ENCODER(ceph_mds_caps)
+WRITE_RAW_ENCODER(ceph_mds_cap_peer)
WRITE_RAW_ENCODER(ceph_mds_cap_release)
WRITE_RAW_ENCODER(ceph_mds_cap_item)
WRITE_RAW_ENCODER(ceph_mds_lease)
cap->set_wanted(icr.wanted);
cap->issue_norevoke(icr.issued);
cap->reset_seq();
+ cap->set_cap_id(icr.cap_id);
}
- cap->set_cap_id(icr.cap_id);
cap->set_last_issue_stamp(ceph_clock_now(g_ceph_context));
return cap;
}
}
ceph_seq_t get_mseq() { return mseq; }
+ void inc_mseq() { mseq++; }
ceph_seq_t get_last_sent() { return last_sent; }
utime_t get_last_issue_stamp() { return last_issue_stamp; }
Export make_export() {
return Export(cap_id, _wanted, issued(), pending(), client_follows, last_sent, mseq+1, last_issue_stamp);
}
- void rejoin_import() { mseq++; }
void merge(Export& other, bool auth_cap) {
// issued + pending
int newpending = other.pending | pending();
// mark client caps stale.
MClientCaps *m = new MClientCaps(CEPH_CAP_OP_EXPORT, p->first, 0,
cap_exports[p->first][q->first].cap_id, 0);
+ m->set_cap_peer(q->second.cap_id, q->second.issue_seq, q->second.mseq, from, 0);
mds->send_message_client_counted(m, session);
cap_exports[p->first].erase(q->first);
add_reconnected_cap(in, q->first, inodeno_t(r->second.snaprealm));
Capability *cap = in->reconnect_cap(q->first, r->second, session);
if (r->first >= 0) {
- do_cap_import(session, in, cap);
+ if (cap->get_last_seq() == 0) // don't increase mseq if cap already exists
+ cap->inc_mseq();
+ do_cap_import(session, in, cap, r->second.cap_id, 0, 0, r->first, 0);
Capability::Import& im = rejoin_imported_caps[r->first][p->first][q->first];
im.cap_id = cap->get_cap_id();
Capability *cap = in->reconnect_cap(client, icr, session);
if (frommds >= 0) {
- cap->rejoin_import();
- do_cap_import(session, in, cap);
+ if (cap->get_last_seq() == 0) // don't increase mseq if cap already exists
+ cap->inc_mseq();
+ do_cap_import(session, in, cap, icr.cap_id, 0, 0, frommds, 0);
}
return cap;
if (session) {
// mark client caps stale.
MClientCaps *stale = new MClientCaps(CEPH_CAP_OP_EXPORT, p->first, 0, 0, 0);
+ stale->set_cap_peer(0, 0, 0, -1, 0);
mds->send_message_client_counted(stale, q->first);
}
}
// -------
// cap imports and delayed snap parent opens
-void MDCache::do_cap_import(Session *session, CInode *in, Capability *cap)
+void MDCache::do_cap_import(Session *session, CInode *in, Capability *cap,
+ uint64_t p_cap_id, ceph_seq_t p_seq, ceph_seq_t p_mseq,
+ int peer, int p_flags)
{
client_t client = session->info.inst.name.num();
SnapRealm *realm = in->find_snaprealm();
cap->get_mseq());
in->encode_cap_message(reap, cap);
realm->build_snap_trace(reap->snapbl);
+ reap->set_cap_peer(p_cap_id, p_seq, p_mseq, peer, p_flags);
mds->send_message_client_counted(reap, session);
} else {
dout(10) << "do_cap_import missing past snap parents, delaying " << session->info.inst.name << " mseq "
{
dout(10) << "do_delayed_cap_imports" << dendl;
+ assert(delayed_imported_caps.empty());
+#if 0
map<client_t,set<CInode*> > d;
d.swap(delayed_imported_caps);
mds->locker->issue_caps(in);
}
}
+#endif
}
struct C_MDC_OpenSnapParents : public Context {
map<CInode*,map<client_t, set<inodeno_t> > > missing_snap_parents;
map<client_t,set<CInode*> > delayed_imported_caps;
- void do_cap_import(Session *session, CInode *in, Capability *cap);
+ void do_cap_import(Session *session, CInode *in, Capability *cap,
+ uint64_t p_cap_id, ceph_seq_t p_seq, ceph_seq_t p_mseq,
+ int peer, int p_flags);
void do_delayed_cap_imports();
void check_realm_past_parents(SnapRealm *realm);
void open_snap_parents();
exported_client_map[it->first] = mds->sessionmap.get_inst(entity_name_t::CLIENT(it->first.v));
}
-void Migrator::finish_export_inode_caps(CInode *in)
+void Migrator::finish_export_inode_caps(CInode *in, int peer,
+ map<client_t,Capability::Import>& peer_imported)
{
dout(20) << "finish_export_inode_caps " << *in << dendl;
it != in->client_caps.end();
++it) {
Capability *cap = it->second;
- dout(7) << "finish_export_inode telling client." << it->first
+ dout(7) << "finish_export_inode_caps telling client." << it->first
<< " exported caps on " << *in << dendl;
- MClientCaps *m = new MClientCaps(CEPH_CAP_OP_EXPORT,
- in->ino(),
- in->find_snaprealm()->inode->ino(),
- cap->get_cap_id(), cap->get_last_seq(),
- cap->pending(), cap->wanted(), 0,
- cap->get_mseq());
+ MClientCaps *m = new MClientCaps(CEPH_CAP_OP_EXPORT, in->ino(), 0,
+ cap->get_cap_id(), cap->get_mseq());
+
+ map<client_t,Capability::Import>::iterator q = peer_imported.find(it->first);
+ assert(q != peer_imported.end());
+ m->set_cap_peer(q->second.cap_id, q->second.issue_seq, q->second.mseq, peer, 0);
mds->send_message_client_counted(m, it->first);
}
in->clear_client_caps_after_export();
mds->locker->eval(in, CEPH_CAP_LOCKS);
}
-void Migrator::finish_export_inode(CInode *in, utime_t now, list<Context*>& finished)
+void Migrator::finish_export_inode(CInode *in, utime_t now, int peer,
+ map<client_t,Capability::Import>& peer_imported,
+ list<Context*>& finished)
{
dout(12) << "finish_export_inode " << *in << dendl;
in->finish_export(now);
- finish_export_inode_caps(in);
+ finish_export_inode_caps(in, peer, peer_imported);
// *** other state too?
return num_exported;
}
-void Migrator::finish_export_dir(CDir *dir, list<Context*>& finished, utime_t now)
+void Migrator::finish_export_dir(CDir *dir, utime_t now, int peer,
+ map<inodeno_t,map<client_t,Capability::Import> >& peer_imported,
+ list<Context*>& finished)
{
dout(10) << "finish_export_dir " << *dir << dendl;
// inode?
if (dn->get_linkage()->is_primary()) {
- finish_export_inode(in, now, finished);
+ finish_export_inode(in, now, peer, peer_imported[in->ino()], finished);
// subdirs?
in->get_nested_dirfrags(subdirs);
// subdirs
for (list<CDir*>::iterator it = subdirs.begin(); it != subdirs.end(); ++it)
- finish_export_dir(*it, finished, now);
+ finish_export_dir(*it, now, peer, peer_imported, finished);
}
class C_MDS_ExportFinishLogged : public Context {
// finish export (adjust local cache state)
C_Contexts *fin = new C_Contexts(g_ceph_context);
- finish_export_dir(dir, fin->contexts, ceph_clock_now(g_ceph_context));
+ finish_export_dir(dir, ceph_clock_now(g_ceph_context),
+ it->second.peer, it->second.peer_imported, fin->contexts);
dir->add_waiter(CDir::WAIT_UNFREEZE, fin);
// unfreeze
for (map<CInode*, map<client_t,Capability::Export> >::iterator p = it->second.peer_exports.begin();
p != it->second.peer_exports.end();
++p) {
- finish_import_inode_caps(p->first, true, p->second, imported_caps[p->first->ino()]);
+
+ finish_import_inode_caps(p->first, from, true, p->second, imported_caps[p->first->ino()]);
}
// send notify's etc.
}
}
-void Migrator::finish_import_inode_caps(CInode *in, bool auth_cap,
+void Migrator::finish_import_inode_caps(CInode *in, int peer, bool auth_cap,
map<client_t,Capability::Export> &export_map,
map<client_t,Capability::Import> &import_map)
{
im.issue_seq = cap->get_last_seq() + 1;
cap->merge(it->second, auth_cap);
- mds->mdcache->do_cap_import(session, in, cap);
+ mds->mdcache->do_cap_import(session, in, cap, it->second.cap_id,
+ it->second.seq, it->second.mseq - 1, peer,
+ auth_cap ? CEPH_CAP_FLAG_AUTH : 0);
}
in->replica_caps_wanted = 0;
map<client_t,Capability::Import> imported_caps;
assert(peer_exports.count(in));
- finish_import_inode_caps(in, false, peer_exports[in], imported_caps);
+ finish_import_inode_caps(in, from, false, peer_exports[in], imported_caps);
mds->locker->eval(in, CEPH_CAP_LOCKS, true);
mds->send_message_mds(new MExportCapsAck(in->ino()), from);
map<client_t,entity_inst_t>& exported_client_map);
void encode_export_inode_caps(CInode *in, bufferlist& bl,
map<client_t,entity_inst_t>& exported_client_map);
- void finish_export_inode(CInode *in, utime_t now, list<Context*>& finished);
- void finish_export_inode_caps(CInode *in);
+ void finish_export_inode(CInode *in, utime_t now, int target,
+ map<client_t,Capability::Import>& peer_imported,
+ list<Context*>& finished);
+ void finish_export_inode_caps(CInode *in, int target,
+ map<client_t,Capability::Import>& peer_imported);
+
int encode_export_dir(bufferlist& exportbl,
CDir *dir,
map<client_t,entity_inst_t>& exported_client_map,
utime_t now);
- void finish_export_dir(CDir *dir, list<Context*>& finished, utime_t now);
+ void finish_export_dir(CDir *dir, utime_t now, int target,
+ map<inodeno_t,map<client_t,Capability::Import> >& peer_imported,
+ list<Context*>& finished);
void add_export_finish_waiter(CDir *dir, Context *c) {
map<CDir*, export_state_t>::iterator it = export_state.find(dir);
void decode_import_inode_caps(CInode *in,
bufferlist::iterator &blp,
map<CInode*, map<client_t,Capability::Export> >& cap_imports);
- void finish_import_inode_caps(CInode *in, bool auth_cap,
+ void finish_import_inode_caps(CInode *in, int from, bool auth_cap,
map<client_t,Capability::Export> &export_map,
map<client_t,Capability::Import> &import_map);
int decode_import_dir(bufferlist::iterator& blp,
finish_force_open_sessions(mdr->more()->imported_client_map, mdr->more()->sseq_map);
if (mdr->more()->cap_imports.count(destdnl->get_inode())) {
mds->mdcache->migrator->finish_import_inode_caps(destdnl->get_inode(),
- mdr->more()->srcdn_auth_mds,
+ mdr->more()->srcdn_auth_mds, true,
mdr->more()->cap_imports[destdnl->get_inode()],
imported_caps);
}
::decode(peer_imported, bp);
dout(10) << " finishing inode export on " << *destdnl->get_inode() << dendl;
- mdcache->migrator->finish_export_inode(destdnl->get_inode(), mdr->now, finished);
+ mdcache->migrator->finish_export_inode(destdnl->get_inode(), mdr->now,
+ mdr->slave_to_mds, peer_imported, finished);
mds->queue_waiters(finished); // this includes SINGLEAUTH waiters.
// unfreeze
class MClientCaps : public Message {
- static const int HEAD_VERSION = 2; // added flock metadata
+ static const int HEAD_VERSION = 3; // added flock metadata
static const int COMPAT_VERSION = 1;
public:
struct ceph_mds_caps head;
+ struct ceph_mds_cap_peer peer;
bufferlist snapbl;
bufferlist xattrbl;
bufferlist flockbl;
void set_mtime(const utime_t &t) { t.encode_timeval(&head.mtime); }
void set_atime(const utime_t &t) { t.encode_timeval(&head.atime); }
+ void set_cap_peer(uint64_t id, ceph_seq_t seq, ceph_seq_t mseq, int mds, int flags) {
+ peer.cap_id = id;
+ peer.seq = seq;
+ peer.mseq = mseq;
+ peer.mds = mds;
+ peer.flags = flags;
+ }
+
MClientCaps()
: Message(CEPH_MSG_CLIENT_CAPS, HEAD_VERSION, COMPAT_VERSION) { }
MClientCaps(int op,
head.wanted = wanted;
head.dirty = dirty;
head.migrate_seq = mseq;
+ peer.cap_id = 0;
}
MClientCaps(int op,
inodeno_t ino, inodeno_t realm,
head.realm = realm;
head.cap_id = id;
head.migrate_seq = mseq;
+ peer.cap_id = 0;
}
private:
~MClientCaps() {}
// conditionally decode flock metadata
if (header.version >= 2)
::decode(flockbl, p);
+
+ if (header.version >= 3) {
+ if (head.op == CEPH_CAP_OP_IMPORT)
+ ::decode(peer, p);
+ else if (head.op == CEPH_CAP_OP_EXPORT)
+ memcpy(&peer, &head.size, sizeof(peer));
+ }
}
void encode_payload(uint64_t features) {
head.snap_trace_len = snapbl.length();
head.xattr_len = xattrbl.length();
+
+ // record peer in unused fields of cap export message
+ if (head.op == CEPH_CAP_OP_EXPORT)
+ memcpy(&head.size, &peer, sizeof(peer));
+
::encode(head, payload);
::encode_nohead(snapbl, payload);
if (features & CEPH_FEATURE_FLOCK) {
::encode(flockbl, payload);
} else {
- header.version = 1; // old
+ header.version = 1;
+ return;
+ }
+
+ if (true) {
+ if (head.op == CEPH_CAP_OP_IMPORT)
+ ::encode(peer, payload);
+ } else {
+ header.version = 2;
+ return;
}
}
};