Export make_export() {
return Export(cap_id, _wanted, issued(), pending(), client_follows, last_sent, mseq+1, last_issue_stamp);
}
- void merge(Export& other, bool auth_cap) {
+ void merge(const Export& other, bool auth_cap) {
if (!is_stale()) {
// issued + pending
int newpending = other.pending | pending();
im.cap_id = ++last_cap_id; // assign a new cap ID
im.issue_seq = 1;
im.mseq = q->second.mseq;
+
+ Session *session = mds->sessionmap.get_session(entity_name_t::CLIENT(q->first.v));
+ if (session)
+ rejoin_imported_client_map.emplace(q->first, session->info.inst);
}
// will process these caps in rejoin stage
assert(gather_locks.empty());
// check cap exports.
- rejoin_client_map.insert(weak->client_map.begin(), weak->client_map.end());
+ rejoin_imported_client_map.insert(weak->client_map.begin(), weak->client_map.end());
for (auto p = weak->cap_exports.begin(); p != weak->cap_exports.end(); ++p) {
CInode *in = get_inode(p->first);
dout(10) << " exporting caps for client." << q->first << " ino " << p->first << dendl;
Session *session = mds->sessionmap.get_session(entity_name_t::CLIENT(q->first.v));
- assert(session);
+ if (!session) {
+ dout(10) << " no session for client." << p->first << dendl;
+ ex.second.erase(r);
+ continue;
+ }
// mark client caps stale.
MClientCaps *m = new MClientCaps(CEPH_CAP_OP_EXPORT, p->first, 0,
class C_MDC_RejoinSessionsOpened : public MDCacheLogContext {
public:
- map<client_t,entity_inst_t> client_map;
- map<client_t,uint64_t> sseqmap;
-
- C_MDC_RejoinSessionsOpened(MDCache *c, map<client_t,entity_inst_t>& cm) :
- MDCacheLogContext(c), client_map(cm) {}
+ map<client_t,pair<Session*,uint64_t> > imported_session_map;
+ C_MDC_RejoinSessionsOpened(MDCache *c) : MDCacheLogContext(c) {}
void finish(int r) override {
assert(r == 0);
- mdcache->rejoin_open_sessions_finish(client_map, sseqmap);
+ mdcache->rejoin_open_sessions_finish(imported_session_map);
}
};
-void MDCache::rejoin_open_sessions_finish(map<client_t,entity_inst_t> client_map,
- map<client_t,uint64_t>& sseqmap)
+void MDCache::rejoin_open_sessions_finish(map<client_t,pair<Session*,uint64_t> >& imported_session_map)
{
dout(10) << "rejoin_open_sessions_finish" << dendl;
- mds->server->finish_force_open_sessions(client_map, sseqmap);
+ mds->server->finish_force_open_sessions(imported_session_map);
+ rejoin_imported_session_map.swap(imported_session_map);
if (rejoin_gather.empty())
rejoin_gather_finish();
}
// called by rejoin_gather_finish() ?
if (rejoin_gather.count(mds->get_nodeid()) == 0) {
- // if sessions for imported caps are all open ?
- for (map<client_t,entity_inst_t>::iterator p = rejoin_client_map.begin();
- p != rejoin_client_map.end();
- ++p) {
- if (!mds->sessionmap.have_session(entity_name_t::CLIENT(p->first.v))) {
- C_MDC_RejoinSessionsOpened *finish = new C_MDC_RejoinSessionsOpened(this, rejoin_client_map);
- version_t pv = mds->server->prepare_force_open_sessions(rejoin_client_map, finish->sseqmap);
- ESessions *le = new ESessions(pv, rejoin_client_map);
- mds->mdlog->start_submit_entry(le, finish);
- mds->mdlog->flush();
- rejoin_client_map.clear();
- return true;
- }
+ if (!rejoin_imported_client_map.empty() &&
+ rejoin_imported_session_map.empty()) {
+ C_MDC_RejoinSessionsOpened *finish = new C_MDC_RejoinSessionsOpened(this);
+ version_t pv = mds->server->prepare_force_open_sessions(rejoin_imported_client_map,
+ finish->imported_session_map);
+ mds->mdlog->start_submit_entry(new ESessions(pv, rejoin_imported_client_map), finish);
+ mds->mdlog->flush();
+ rejoin_imported_client_map.clear();
+ return true;
}
- rejoin_client_map.clear();
// process caps that were exported by slave rename
for (map<inodeno_t,pair<mds_rank_t,map<client_t,Capability::Export> > >::iterator p = rejoin_slave_exports.begin();
for (map<client_t,Capability::Export>::iterator q = p->second.second.begin();
q != p->second.second.end();
++q) {
- Session *session = mds->sessionmap.get_session(entity_name_t::CLIENT(q->first.v));
- assert(session);
+ auto r = rejoin_imported_session_map.find(q->first);
+ if (r == rejoin_imported_session_map.end())
+ continue;
+ Session *session = r->second.first;
Capability *cap = in->get_client_cap(q->first);
if (!cap) {
cap = in->add_client_cap(q->first, session);
}
assert(in->is_auth());
for (auto q = p->second.begin(); q != p->second.end(); ++q) {
- Session *session = mds->sessionmap.get_session(entity_name_t::CLIENT(q->first.v));
- assert(session);
+ Session *session;
+ {
+ auto r = rejoin_imported_session_map.find(q->first);
+ session = (r != rejoin_imported_session_map.end() ? r->second.first : nullptr);
+ }
+
for (auto r = q->second.begin(); r != q->second.end(); ++r) {
+ if (!session) {
+ if (r->first >= 0)
+ (void)rejoin_imported_caps[r->first][p->first][q->first]; // all are zero
+ continue;
+ }
+
Capability *cap = in->reconnect_cap(q->first, r->second, session);
add_reconnected_cap(q->first, in->ino(), r->second);
if (r->first >= 0) {
set<mds_rank_t> rejoin_ack_gather; // nodes from whom i need a rejoin ack
map<mds_rank_t,map<inodeno_t,map<client_t,Capability::Import> > > rejoin_imported_caps;
map<inodeno_t,pair<mds_rank_t,map<client_t,Capability::Export> > > rejoin_slave_exports;
- map<client_t,entity_inst_t> rejoin_client_map;
+ map<client_t,entity_inst_t> rejoin_imported_client_map;
+ map<client_t,pair<Session*,uint64_t> > rejoin_imported_session_map;
map<inodeno_t,pair<mds_rank_t,map<client_t,cap_reconnect_t> > > cap_exports; // ino -> target, client -> capex
friend class C_MDC_RejoinSessionsOpened;
void rejoin_open_ino_finish(inodeno_t ino, int ret);
void rejoin_prefetch_ino_finish(inodeno_t ino, int ret);
- void rejoin_open_sessions_finish(map<client_t,entity_inst_t> client_map,
- map<client_t,uint64_t>& sseqmap);
+ void rejoin_open_sessions_finish(map<client_t,pair<Session*,uint64_t> >& imported_session_map);
bool process_imported_caps();
void choose_lock_states_and_reconnect_caps();
void prepare_realm_split(SnapRealm *realm, client_t client, inodeno_t ino,
map<client_t,Capability::Import>::iterator q = peer_imported.find(it->first);
assert(q != peer_imported.end());
- m->set_cap_peer(q->second.cap_id, q->second.issue_seq, q->second.mseq, peer, 0);
+ m->set_cap_peer(q->second.cap_id, q->second.issue_seq, q->second.mseq,
+ (q->second.cap_id > 0 ? peer : -1), 0);
mds->send_message_client_counted(m, it->first);
}
in->clear_client_caps_after_export();
CDir *dir;
mds_rank_t from;
public:
- map<client_t,entity_inst_t> imported_client_map;
- map<client_t,uint64_t> sseqmap;
+ map<client_t,pair<Session*,uint64_t> > imported_session_map;
C_MDS_ImportDirLoggedStart(Migrator *m, CDir *d, mds_rank_t f) :
MigratorLogContext(m), df(d->dirfrag()), dir(d), from(f) {
}
void finish(int r) override {
- mig->import_logged_start(df, dir, from, imported_client_map, sseqmap);
+ mig->import_logged_start(df, dir, from, imported_session_map);
}
};
// new client sessions, open these after we journal
// include imported sessions in EImportStart
bufferlist::iterator cmp = m->client_map.begin();
- decode(onlogged->imported_client_map, cmp);
+ map<client_t,entity_inst_t> client_map;
+ decode(client_map, cmp);
assert(cmp.end());
- le->cmapv = mds->server->prepare_force_open_sessions(onlogged->imported_client_map, onlogged->sseqmap);
- le->client_map.claim(m->client_map);
+ le->cmapv = mds->server->prepare_force_open_sessions(client_map, onlogged->imported_session_map);
+ encode(client_map, le->client_map, mds->mdsmap->get_up_features());
bufferlist::iterator blp = m->export_data.begin();
int num_imported_inodes = 0;
if (stat.state == IMPORT_ACKING) {
// remove imported caps
for (map<CInode*,map<client_t,Capability::Export> >::iterator p = stat.peer_exports.begin();
- p != stat.peer_exports.end();
- ++p) {
+ p != stat.peer_exports.end();
+ ++p) {
CInode *in = p->first;
for (map<client_t,Capability::Export>::iterator q = p->second.begin();
- q != p->second.end();
- ++q) {
+ q != p->second.end();
+ ++q) {
Capability *cap = in->get_client_cap(q->first);
- assert(cap);
+ if (!cap) {
+ assert(!stat.session_map.count(q->first));
+ continue;
+ }
if (cap->is_importing())
in->remove_client_cap(q->first);
}
in->put(CInode::PIN_IMPORTINGCAPS);
}
- for (map<client_t,entity_inst_t>::iterator p = stat.client_map.begin();
- p != stat.client_map.end();
- ++p) {
- Session *session = mds->sessionmap.get_session(entity_name_t::CLIENT(p->first.v));
- assert(session);
+ for (auto& p : stat.session_map) {
+ Session *session = p.second.first;
session->dec_importing();
}
}
void Migrator::import_logged_start(dirfrag_t df, CDir *dir, mds_rank_t from,
- map<client_t,entity_inst_t>& imported_client_map,
- map<client_t,uint64_t>& sseqmap)
+ map<client_t,pair<Session*,uint64_t> >& imported_session_map)
{
map<dirfrag_t, import_state_t>::iterator it = import_state.find(dir->dirfrag());
if (it == import_state.end() ||
it->second.state != IMPORT_LOGGINGSTART) {
dout(7) << "import " << df << " must have aborted" << dendl;
- mds->server->finish_force_open_sessions(imported_client_map, sseqmap);
+ mds->server->finish_force_open_sessions(imported_session_map);
return;
}
assert (g_conf->mds_kill_import_at != 7);
// force open client sessions and finish cap import
- mds->server->finish_force_open_sessions(imported_client_map, sseqmap, false);
- it->second.client_map.swap(imported_client_map);
+ mds->server->finish_force_open_sessions(imported_session_map, false);
map<inodeno_t,map<client_t,Capability::Import> > imported_caps;
for (map<CInode*, map<client_t,Capability::Export> >::iterator p = it->second.peer_exports.begin();
p != it->second.peer_exports.end();
++p) {
// parameter 'peer' is NONE, delay sending cap import messages to client
- finish_import_inode_caps(p->first, MDS_RANK_NONE, true, p->second, imported_caps[p->first->ino()]);
+ finish_import_inode_caps(p->first, MDS_RANK_NONE, true, imported_session_map,
+ p->second, imported_caps[p->first->ino()]);
}
+
+ it->second.session_map.swap(imported_session_map);
// send notify's etc.
dout(7) << "sending ack for " << *dir << " to old auth mds." << from << dendl;
for (map<client_t,Capability::Export>::iterator q = p->second.begin();
q != p->second.end();
++q) {
- Session *session = mds->sessionmap.get_session(entity_name_t::CLIENT(q->first.v));
- assert(session);
+ auto r = it->second.session_map.find(q->first);
+ if (r == it->second.session_map.end())
+ continue;
+
+ Session *session = r->second.first;
Capability *cap = in->get_client_cap(q->first);
assert(cap);
cap->merge(q->second, true);
p->second.clear();
in->replica_caps_wanted = 0;
}
- for (map<client_t,entity_inst_t>::iterator p = it->second.client_map.begin();
- p != it->second.client_map.end();
- ++p) {
- Session *session = mds->sessionmap.get_session(entity_name_t::CLIENT(p->first.v));
- assert(session);
+ for (auto& p : it->second.session_map) {
+ Session *session = p.second.first;
session->dec_importing();
}
}
{
map<client_t,Capability::Export> cap_map;
decode(cap_map, blp);
- if (auth_cap)
if (auth_cap) {
mempool::mds_co::compact_map<int32_t,int32_t> mds_wanted;
decode(mds_wanted, blp);
}
void Migrator::finish_import_inode_caps(CInode *in, mds_rank_t peer, bool auth_cap,
- map<client_t,Capability::Export> &export_map,
+ const map<client_t,pair<Session*,uint64_t> >& session_map,
+ const map<client_t,Capability::Export> &export_map,
map<client_t,Capability::Import> &import_map)
{
- for (map<client_t,Capability::Export>::iterator it = export_map.begin();
- it != export_map.end();
- ++it) {
- dout(10) << "finish_import_inode_caps for client." << it->first << " on " << *in << dendl;
- Session *session = mds->sessionmap.get_session(entity_name_t::CLIENT(it->first.v));
- assert(session);
+ for (auto& it : export_map) {
+ dout(10) << "finish_import_inode_caps for client." << it.first << " on " << *in << dendl;
+
+ auto p = session_map.find(it.first);
+ if (p == session_map.end()) {
+ dout(10) << " no session for client." << it.first << dendl;
+ (void)import_map[it.first];
+ continue;
+ }
- Capability *cap = in->get_client_cap(it->first);
+ Session *session = p->second.first;
+
+ Capability *cap = in->get_client_cap(it.first);
if (!cap) {
- cap = in->add_client_cap(it->first, session);
+ cap = in->add_client_cap(it.first, session);
if (peer < 0)
cap->mark_importing();
}
- Capability::Import& im = import_map[it->first];
+ Capability::Import& im = import_map[it.first];
im.cap_id = cap->get_cap_id();
- im.mseq = auth_cap ? it->second.mseq : cap->get_mseq();
+ im.mseq = auth_cap ? it.second.mseq : cap->get_mseq();
im.issue_seq = cap->get_last_seq() + 1;
if (peer >= 0) {
- cap->merge(it->second, auth_cap);
- mds->mdcache->do_cap_import(session, in, cap, it->second.cap_id,
- it->second.seq, it->second.mseq - 1, peer,
+ cap->merge(it.second, auth_cap);
+ mds->mdcache->do_cap_import(session, in, cap, it.second.cap_id,
+ it.second.seq, it.second.mseq - 1, peer,
auth_cap ? CEPH_CAP_FLAG_AUTH : CEPH_CAP_FLAG_RELEASE);
}
}
CInode *in;
mds_rank_t from;
public:
+ map<client_t,pair<Session*,uint64_t> > imported_session_map;
map<CInode*, map<client_t,Capability::Export> > peer_exports;
- map<client_t,entity_inst_t> client_map;
- map<client_t,uint64_t> sseqmap;
C_M_LoggedImportCaps(Migrator *m, CInode *i, mds_rank_t f) : MigratorLogContext(m), in(i), from(f) {}
void finish(int r) override {
- mig->logged_import_caps(in, from, peer_exports, client_map, sseqmap);
+ mig->logged_import_caps(in, from, imported_session_map, peer_exports);
}
};
return;
in->auth_pin(this);
+ map<client_t,entity_inst_t> client_map;
+ client_map.swap(ex->client_map);
+
C_M_LoggedImportCaps *finish = new C_M_LoggedImportCaps(
this, in, mds_rank_t(ex->get_source().num()));
- finish->client_map = ex->client_map;
+ version_t pv = mds->server->prepare_force_open_sessions(client_map,
+ finish->imported_session_map);
// decode new caps
bufferlist::iterator blp = ex->cap_bl.begin();
decode_import_inode_caps(in, false, blp, finish->peer_exports);
assert(!finish->peer_exports.empty()); // thus, inode is pinned.
// journal open client sessions
- version_t pv = mds->server->prepare_force_open_sessions(finish->client_map, finish->sseqmap);
- ESessions *le = new ESessions(pv, ex->client_map);
+ ESessions *le = new ESessions(pv, client_map);
mds->mdlog->start_submit_entry(le, finish);
mds->mdlog->flush();
void Migrator::logged_import_caps(CInode *in,
mds_rank_t from,
- map<CInode*, map<client_t,Capability::Export> >& peer_exports,
- map<client_t,entity_inst_t>& client_map,
- map<client_t,uint64_t>& sseqmap)
+ map<client_t,pair<Session*,uint64_t> >& imported_session_map,
+ map<CInode*, map<client_t,Capability::Export> >& peer_exports)
{
dout(10) << "logged_import_caps on " << *in << dendl;
// see export_go() vs export_go_synced()
assert(in->is_auth());
// force open client sessions and finish cap import
- mds->server->finish_force_open_sessions(client_map, sseqmap);
+ mds->server->finish_force_open_sessions(imported_session_map);
map<client_t,Capability::Import> imported_caps;
- assert(peer_exports.count(in));
+ auto it = peer_exports.find(in);
+ assert(it != peer_exports.end());
+
// clients will release caps from the exporter when they receive the cap import message.
- finish_import_inode_caps(in, from, false, peer_exports[in], imported_caps);
+ finish_import_inode_caps(in, from, false, imported_session_map, it->second, imported_caps);
mds->locker->eval(in, CEPH_CAP_LOCKS, true);
in->auth_unpin(this);
}
class CDir;
class CInode;
class CDentry;
+class Session;
class MExportDirDiscover;
class MExportDirDiscoverAck;
set<mds_rank_t> bystanders;
list<dirfrag_t> bound_ls;
list<ScatterLock*> updated_scatterlocks;
- map<client_t,entity_inst_t> client_map;
+ map<client_t,pair<Session*,uint64_t> > session_map;
map<CInode*, map<client_t,Capability::Export> > peer_exports;
MutationRef mut;
import_state_t() : state(0), peer(0), tid(0), mut() {}
void import_notify_abort(CDir *dir, set<CDir*>& bounds);
void import_notify_finish(CDir *dir, set<CDir*>& bounds);
void import_logged_start(dirfrag_t df, CDir *dir, mds_rank_t from,
- map<client_t,entity_inst_t> &imported_client_map,
- map<client_t,uint64_t>& sseqmap);
+ map<client_t,pair<Session*,uint64_t> >& imported_session_map);
void handle_export_finish(MExportDirFinish *m);
void handle_export_caps(MExportCaps *m);
void logged_import_caps(CInode *in,
mds_rank_t from,
- map<CInode*, map<client_t,Capability::Export> >& cap_imports,
- map<client_t,entity_inst_t>& client_map,
- map<client_t,uint64_t>& sseqmap);
+ map<client_t,pair<Session*,uint64_t> >& imported_session_map,
+ map<CInode*, map<client_t,Capability::Export> >& cap_imports);
friend class C_MDS_ImportDirLoggedStart;
void decode_import_inode_caps(CInode *in, bool auth_cap, bufferlist::iterator &blp,
map<CInode*, map<client_t,Capability::Export> >& cap_imports);
void finish_import_inode_caps(CInode *in, mds_rank_t from, bool auth_cap,
- map<client_t,Capability::Export> &export_map,
+ const map<client_t,pair<Session*,uint64_t> >& smap,
+ const map<client_t,Capability::Export> &export_map,
map<client_t,Capability::Import> &import_map);
int decode_import_dir(bufferlist::iterator& blp,
mds_rank_t oldauth,
bool is_remote_frozen_authpin;
bool is_inode_exporter;
- map<client_t,entity_inst_t> imported_client_map;
- map<client_t,uint64_t> sseq_map;
+ map<client_t, pair<Session*, uint64_t> > imported_session_map;
map<CInode*, map<client_t,Capability::Export> > cap_imports;
// for lock/flock
* - sessions learned from other MDSs during a cross-MDS rename
*/
version_t Server::prepare_force_open_sessions(map<client_t,entity_inst_t>& cm,
- map<client_t,uint64_t>& sseqmap)
+ map<client_t, pair<Session*,uint64_t> >& smap)
{
version_t pv = mds->sessionmap.get_projected();
dout(10) << "prepare_force_open_sessions " << pv
<< " on " << cm.size() << " clients"
<< dendl;
- for (map<client_t,entity_inst_t>::iterator p = cm.begin(); p != cm.end(); ++p) {
+ mds->objecter->with_osdmap(
+ [this, &cm](const OSDMap &osd_map) {
+ for (auto p = cm.begin(); p != cm.end(); ) {
+ if (osd_map.is_blacklisted(p->second.addr)) {
+ dout(10) << " ignoring blacklisted client." << p->first
+ << " (" << p->second.addr << ")" << dendl;
+ cm.erase(p++);
+ } else {
+ ++p;
+ }
+ }
+ });
+
+ for (map<client_t,entity_inst_t>::iterator p = cm.begin(); p != cm.end(); ++p) {
Session *session = mds->sessionmap.get_or_add_session(p->second);
pv = mds->sessionmap.mark_projected(session);
+ uint64_t sseq;
if (session->is_closed() ||
session->is_closing() ||
- session->is_killing())
- sseqmap[p->first] = mds->sessionmap.set_state(session, Session::STATE_OPENING);
- else
+ session->is_killing()) {
+ sseq = mds->sessionmap.set_state(session, Session::STATE_OPENING);
+ } else {
assert(session->is_open() ||
session->is_opening() ||
session->is_stale());
+ sseq = 0;
+ }
+ smap[p->first] = make_pair(session, sseq);
session->inc_importing();
}
return pv;
}
-void Server::finish_force_open_sessions(map<client_t,entity_inst_t>& cm,
- map<client_t,uint64_t>& sseqmap,
+void Server::finish_force_open_sessions(const map<client_t,pair<Session*,uint64_t> >& smap,
bool dec_import)
{
/*
* client trying to close a session and an MDS doing an import
* trying to force open a session...
*/
- dout(10) << "finish_force_open_sessions on " << cm.size() << " clients,"
+ dout(10) << "finish_force_open_sessions on " << smap.size() << " clients,"
<< " initial v " << mds->sessionmap.get_version() << dendl;
-
-
- for (map<client_t,entity_inst_t>::iterator p = cm.begin(); p != cm.end(); ++p) {
- Session *session = mds->sessionmap.get_session(p->second.name);
- assert(session);
-
- if (sseqmap.count(p->first)) {
- uint64_t sseq = sseqmap[p->first];
+ for (auto &it : smap) {
+ Session *session = it.second.first;
+ uint64_t sseq = it.second.second;
+ if (sseq > 0) {
if (session->get_state_seq() != sseq) {
dout(10) << "force_open_sessions skipping changed " << session->info.inst << dendl;
} else {
bufferlist::iterator blp = mdr->more()->inode_import.begin();
// imported caps
- decode(mdr->more()->imported_client_map, blp);
- encode(mdr->more()->imported_client_map, *client_map_bl,
- mds->mdsmap->get_up_features());
- prepare_force_open_sessions(mdr->more()->imported_client_map, mdr->more()->sseq_map);
+ map<client_t,entity_inst_t> client_map;
+ decode(client_map, blp);
+ prepare_force_open_sessions(client_map, mdr->more()->imported_session_map);
+ encode(client_map, *client_map_bl, mds->mdsmap->get_up_features());
list<ScatterLock*> updated_scatterlocks;
mdcache->migrator->decode_import_inode(srcdn, blp, srcdn->authority().first, mdr->ls,
map<client_t,Capability::Import> imported_caps;
// finish cap imports
- finish_force_open_sessions(mdr->more()->imported_client_map, mdr->more()->sseq_map);
+ finish_force_open_sessions(mdr->more()->imported_session_map);
if (mdr->more()->cap_imports.count(destdnl->get_inode())) {
mdcache->migrator->finish_import_inode_caps(destdnl->get_inode(),
- mdr->more()->srcdn_auth_mds, true,
- mdr->more()->cap_imports[destdnl->get_inode()],
- imported_caps);
+ mdr->more()->srcdn_auth_mds, true,
+ mdr->more()->imported_session_map,
+ mdr->more()->cap_imports[destdnl->get_inode()],
+ imported_caps);
}
mdr->more()->inode_import.clear();
void _session_logged(Session *session, uint64_t state_seq,
bool open, version_t pv, interval_set<inodeno_t>& inos,version_t piv);
version_t prepare_force_open_sessions(map<client_t,entity_inst_t> &cm,
- map<client_t,uint64_t>& sseqmap);
- void finish_force_open_sessions(map<client_t,entity_inst_t> &cm,
- map<client_t,uint64_t>& sseqmap,
+ map<client_t,pair<Session*,uint64_t> >& smap);
+ void finish_force_open_sessions(const map<client_t,pair<Session*,uint64_t> >& smap,
bool dec_import=true);
void flush_client_sessions(set<client_t>& client_set, MDSGatherBuilder& gather);
void finish_flush_session(Session *session, version_t seq);