#include "events/EImportFinish.h"
#include "events/EFragment.h"
#include "events/ECommitted.h"
+#include "events/ESessions.h"
#include "messages/MGenericMessage.h"
}
if (mds->is_rejoin()) {
+ map<client_t, set<int> > client_exports;
for (map<inodeno_t,map<client_t,ceph_mds_cap_reconnect> >::iterator p = cap_exports.begin();
p != cap_exports.end();
++p) {
assert(cap_export_targets.count(p->first));
- rejoins[cap_export_targets[p->first]]->cap_exports[p->first] = p->second;
+ int target = cap_export_targets[p->first];
+ rejoins[target]->cap_exports[p->first] = p->second;
+ for (map<client_t,ceph_mds_cap_reconnect>::iterator q = p->second.begin();
+ q != p->second.end();
+ ++q)
+ client_exports[q->first].insert(target);
+ }
+ for (map<client_t, set<int> >::iterator p = client_exports.begin();
+ p != client_exports.end();
+ ++p) {
+ entity_inst_t inst = mds->sessionmap.get_inst(entity_name_t::CLIENT(p->first.v));
+ for (set<int>::iterator q = p->second.begin(); q != p->second.end(); ++q)
+ rejoins[*q]->client_map[p->first] = inst;
}
}
assert(gather_locks.empty());
// check cap exports.
+ rejoin_client_map.insert(weak->client_map.begin(), weak->client_map.end());
+
for (map<inodeno_t,map<client_t,ceph_mds_cap_reconnect> >::iterator p = weak->cap_exports.begin();
p != weak->cap_exports.end();
++p) {
cap_imports_num_opening--;
if (cap_imports_num_opening == 0) {
- if (rejoin_gather.count(mds->get_nodeid()))
- process_imported_caps();
- else
+ if (rejoin_gather.empty())
rejoin_gather_finish();
+ else if (rejoin_gather.count(mds->get_nodeid()))
+ process_imported_caps();
+ }
+}
+
+class C_MDC_RejoinSessionsOpened : public Context {
+ MDCache *cache;
+public:
+ map<client_t,entity_inst_t> client_map;
+ map<client_t,uint64_t> sseqmap;
+
+ C_MDC_RejoinSessionsOpened(MDCache *c, map<client_t,entity_inst_t>& cm) :
+ cache(c), client_map(cm) {}
+ void finish(int r) {
+ assert(r == 0);
+ cache->rejoin_open_sessions_finish(client_map, sseqmap);
}
+};
+
+void MDCache::rejoin_open_sessions_finish(map<client_t,entity_inst_t> client_map,
+ map<client_t,uint64_t>& sseqmap)
+{
+ dout(10) << "rejoin_open_sessions_finish" << dendl;
+ mds->server->finish_force_open_sessions(client_map, sseqmap);
+ if (rejoin_gather.empty())
+ rejoin_gather_finish();
}
bool MDCache::process_imported_caps()
// called by rejoin_gather_finish() ?
if (rejoin_gather.count(mds->get_nodeid()) == 0) {
+ // if sessions for imported caps are all open ?
+ for (map<client_t,entity_inst_t>::iterator p = rejoin_client_map.begin();
+ p != rejoin_client_map.end();
+ ++p) {
+ if (!mds->sessionmap.have_session(entity_name_t::CLIENT(p->first.v))) {
+ C_MDC_RejoinSessionsOpened *finish = new C_MDC_RejoinSessionsOpened(this, rejoin_client_map);
+ version_t pv = mds->server->prepare_force_open_sessions(rejoin_client_map, finish->sseqmap);
+ ESessions *le = new ESessions(pv, rejoin_client_map);
+ mds->mdlog->start_submit_entry(le, finish);
+ mds->mdlog->flush();
+ rejoin_client_map.clear();
+ return true;
+ }
+ }
+ rejoin_client_map.clear();
+
// process caps that were exported by slave rename
for (map<inodeno_t,pair<int,map<client_t,Capability::Export> > >::iterator p = rejoin_slave_exports.begin();
p != rejoin_slave_exports.end();
set<int> rejoin_ack_gather; // nodes from whom i need a rejoin ack
map<int,map<inodeno_t,map<client_t,Capability::Import> > > rejoin_imported_caps;
map<inodeno_t,pair<int,map<client_t,Capability::Export> > > rejoin_slave_exports;
+ map<client_t,entity_inst_t> rejoin_client_map;
map<inodeno_t,map<client_t,ceph_mds_cap_reconnect> > cap_exports; // ino -> client -> capex
map<inodeno_t,int> cap_export_targets; // ino -> auth mds
}
friend class C_MDC_RejoinOpenInoFinish;
+ friend class C_MDC_RejoinSessionsOpened;
void rejoin_open_ino_finish(inodeno_t ino, int ret);
+ void rejoin_open_sessions_finish(map<client_t,entity_inst_t> client_map,
+ map<client_t,uint64_t>& sseqmap);
bool process_imported_caps();
void choose_lock_states_and_reconnect_caps();
void prepare_realm_split(SnapRealm *realm, client_t client, inodeno_t ino,
// open
map<inodeno_t,map<client_t, ceph_mds_cap_reconnect> > cap_exports;
+ map<client_t, entity_inst_t> client_map;
bufferlist imported_caps;
// full
::encode(xlocked_inodes, payload);
::encode(wrlocked_inodes, payload);
::encode(cap_exports, payload);
+ ::encode(client_map, payload);
::encode(imported_caps, payload);
::encode(strong_dirfrags, payload);
::encode(dirfrag_bases, payload);
::decode(xlocked_inodes, p);
::decode(wrlocked_inodes, p);
::decode(cap_exports, p);
+ ::decode(client_map, p);
::decode(imported_caps, p);
::decode(strong_dirfrags, p);
::decode(dirfrag_bases, p);