case CEPH_SESSION_RENEWCAPS: return "renewcaps";
case CEPH_SESSION_STALE: return "stale";
case CEPH_SESSION_RECALL_STATE: return "recall_state";
+ case CEPH_SESSION_FLUSHMSG: return "flushmsg";
+ case CEPH_SESSION_FLUSHMSG_ACK: return "flushmsg_ack";
}
return "???";
}
CEPH_SESSION_RENEWCAPS,
CEPH_SESSION_STALE,
CEPH_SESSION_RECALL_STATE,
+ CEPH_SESSION_FLUSHMSG,
+ CEPH_SESSION_FLUSHMSG_ACK,
};
extern const char *ceph_session_op_name(int op);
m->put(); // done
}
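+// Gather finisher: fires once every client targeted by the export has
+// acked CEPH_SESSION_FLUSHMSG.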
+class C_M_ExportSessionsFlushed : public Context {
+ Migrator *migrator;
+ CDir *dir;
+ uint64_t tid;
+public:
+ C_M_ExportSessionsFlushed(Migrator *m, CDir *d, uint64_t t) :
+ migrator(m), dir(d), tid(t) {}
+ void finish(int r) {
+ migrator->export_sessions_flushed(dir, tid);
+ }
+};
+
+void Migrator::export_sessions_flushed(CDir *dir, uint64_t tid)
+{
+ dout(7) << "export_sessions_flushed " << *dir << dendl;
+
+ map<CDir*,export_state_t>::iterator it = export_state.find(dir);
+ if (it == export_state.end() || it->second.tid != tid) {
+ // export must have aborted.
+ dout(7) << "export must have aborted on " << dir << dendl;
+ return;
+ }
+
+ assert(it->second.state == EXPORT_PREPPING || it->second.state == EXPORT_WARNING);
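+ // drop the -1 placeholder that export_frozen() added while waiting for the flushes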
+ assert(it->second.warning_ack_waiting.count(-1) > 0);
+ it->second.warning_ack_waiting.erase(-1);
+ if (it->second.state == EXPORT_WARNING && it->second.warning_ack_waiting.empty())
+ export_go(dir); // start export.
+}
+
void Migrator::export_frozen(CDir *dir)
{
dout(7) << "export_frozen on " << *dir << dendl;
it->second.state = EXPORT_PREPPING;
mds->send_message_mds(prep, it->second.peer);
assert (g_conf->mds_kill_export_at != 4);
+
+ // make sure clients have received any recently issued caps before we export them
+ assert(it->second.warning_ack_waiting.empty());
+
+ set<client_t> export_client_set;
+ get_export_client_set(dir, export_client_set);
+
+ C_GatherBuilder gather(g_ceph_context);
+ mds->server->flush_client_sessions(export_client_set, gather);
+ if (gather.has_subs()) {
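+ // mds rank -1 is a placeholder: client session flushes are still pending.
+ // export_sessions_flushed() erases it and lets the export proceed.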
+ it->second.warning_ack_waiting.insert(-1);
+ gather.set_finisher(new C_M_ExportSessionsFlushed(this, dir, it->second.tid));
+ gather.activate();
+ }
+}
+
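+// Walk the export subtree (stopping at export bounds) and collect every
+// client that holds caps on an inode inside it.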
+void Migrator::get_export_client_set(CDir *dir, set<client_t>& client_set)
+{
+ list<CDir*> dfs;
+ dfs.push_back(dir);
+ while (!dfs.empty()) {
+ CDir *dir = dfs.front();
+ dfs.pop_front();
+ for (CDir::map_t::iterator p = dir->begin(); p != dir->end(); ++p) {
+ CDentry *dn = p->second;
+ if (!dn->get_linkage()->is_primary())
+ continue;
+ CInode *in = dn->get_linkage()->get_inode();
+ if (in->is_dir()) {
+ // a directory: queue its nested dirfrags too (unless they are export bounds)
+ list<CDir*> ls;
+ in->get_dirfrags(ls);
+ for (list<CDir*>::iterator q = ls.begin(); q != ls.end(); ++q) {
+ if (!(*q)->state_test(CDir::STATE_EXPORTBOUND)) {
+ // include nested dirfrag
+ assert((*q)->get_dir_auth().first == CDIR_AUTH_PARENT);
+ dfs.push_back(*q); // it's ours, recurse (later)
+ }
+ }
+ }
+ for (map<client_t, Capability*>::iterator q = in->client_caps.begin();
+ q != in->client_caps.end();
+ ++q)
+ client_set.insert(q->first);
+ }
+ }
+}
+
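+// Single-inode variant: collect the clients holding caps on this inode.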
+void Migrator::get_export_client_set(CInode *in, set<client_t>& client_set)
+{
+ for (map<client_t, Capability*>::iterator q = in->client_caps.begin();
+ q != in->client_caps.end();
+ ++q)
+ client_set.insert(q->first);
}
/* This function DOES put the passed message before returning*/
set<CDir*> bounds;
cache->get_subtree_bounds(dir, bounds);
- assert(it->second.warning_ack_waiting.empty());
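+ // a lone -1 entry is allowed: client session flushes are still in flight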
+ assert(it->second.warning_ack_waiting.empty() ||
+ (it->second.warning_ack_waiting.size() == 1 &&
+ it->second.warning_ack_waiting.count(-1) > 0));
assert(it->second.notify_ack_waiting.empty());
for (map<int,unsigned>::iterator p = dir->replicas_begin();
}
void get_export_lock_set(CDir *dir, set<SimpleLock*>& locks);
+ void get_export_client_set(CDir *dir, set<client_t> &client_set);
+ void get_export_client_set(CInode *in, set<client_t> &client_set);
void encode_export_inode(CInode *in, bufferlist& bl,
map<client_t,entity_inst_t>& exported_client_map);
void handle_export_discover_ack(MExportDirDiscoverAck *m);
void export_frozen(CDir *dir);
void handle_export_prep_ack(MExportDirPrepAck *m);
+ void export_sessions_flushed(CDir *dir, uint64_t tid);
void export_go(CDir *dir);
void export_go_synced(CDir *dir, uint64_t tid);
void export_try_cancel(CDir *dir);
friend class C_MDC_ExportFreeze;
friend class C_MDS_ExportFinishLogged;
friend class C_M_ExportGo;
+ friend class C_M_ExportSessionsFlushed;
// importer
void handle_export_discover(MExportDirDiscover *m);
}
break;
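+ // the client has received every message we sent before the FLUSHMSG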
+ case CEPH_SESSION_FLUSHMSG_ACK:
+ finish_flush_session(session, m->get_seq());
+ break;
+
default:
assert(0);
}
m->put();
}
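+
+// Send CEPH_SESSION_FLUSHMSG to each listed client and register a gather sub
+// that completes when the matching FLUSHMSG_ACK comes back.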
+void Server::flush_client_sessions(set<client_t>& client_set, C_GatherBuilder& gather)
+{
+ for (set<client_t>::iterator p = client_set.begin(); p != client_set.end(); ++p) {
+ Session *session = mds->sessionmap.get_session(entity_name_t::CLIENT(p->v));
+ assert(session);
+ if (session->is_stale())
+ continue;
+ version_t seq = session->wait_for_flush(gather.new_sub());
+ mds->send_message_client(new MClientSession(CEPH_SESSION_FLUSHMSG, seq), session);
+ }
+}
+
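+// Complete any flush waiters registered at or below the given seq.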
+void Server::finish_flush_session(Session *session, version_t seq)
+{
+ list<Context*> finished;
+ session->finish_flush(seq, finished);
+ mds->queue_waiters(finished);
+}
+
void Server::_session_logged(Session *session, uint64_t state_seq, bool open, version_t pv,
interval_set<inodeno_t>& inos, version_t piv)
{
mds->locker->revoke_stale_caps(session);
mds->locker->remove_stale_leases(session);
mds->send_message_client(new MClientSession(CEPH_SESSION_STALE, session->get_push_seq()), session);
+ finish_flush_session(session, session->get_push_seq());
}
// autoclose
++p;
mdcache->request_kill(mdr);
}
+
+ finish_flush_session(session, session->get_push_seq());
}
void Server::reconnect_clients()
}
};
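+// Finisher for handle_slave_rename_prep(): once the affected clients'
+// sessions have been flushed, resume handling of the slave request.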
+class C_MDS_SlaveRenameSessionsFlushed : public Context {
+ Server *server;
+ MDRequest *mdr;
+public:
+ C_MDS_SlaveRenameSessionsFlushed(Server *s, MDRequest *r) :
+ server(s), mdr(r) {
+ mdr->get();
+ }
+ void finish(int r) {
+ server->_slave_rename_sessions_flushed(mdr);
+ mdr->put();
+ }
+};
+
/* This function DOES put the mdr->slave_request before returning*/
void Server::handle_slave_rename_prep(MDRequest *mdr)
{
mds->send_message_mds(notify, *p);
mdr->more()->waiting_on_slave.insert(*p);
}
+
+ // make sure clients have received all cap-related messages
+ set<client_t> export_client_set;
+ mdcache->migrator->get_export_client_set(srcdnl->get_inode(), export_client_set);
+
+ C_GatherBuilder gather(g_ceph_context);
+ flush_client_sessions(export_client_set, gather);
+ if (gather.has_subs()) {
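+ // -1 is a placeholder entry; _slave_rename_sessions_flushed() removes it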
+ mdr->more()->waiting_on_slave.insert(-1);
+ gather.set_finisher(new C_MDS_SlaveRenameSessionsFlushed(this, mdr));
+ gather.activate();
+ }
}
// is witness list sufficient?
}
}
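+// The client sessions touched by this slave rename have been flushed;
+// re-dispatch the slave request unless other acks are still outstanding.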
+void Server::_slave_rename_sessions_flushed(MDRequest *mdr)
+{
+ dout(10) << "_slave_rename_sessions_flushed " << *mdr << dendl;
+ if (mdr->more()->waiting_on_slave.count(-1)) {
+ mdr->more()->waiting_on_slave.erase(-1);
+
+ if (mdr->more()->waiting_on_slave.empty()) {
+ if (mdr->slave_request)
+ dispatch_slave_request(mdr);
+ } else
+ dout(10) << " still waiting for rename notify acks from "
+ << mdr->more()->waiting_on_slave << dendl;
+ }
+}
// snaps
/* This function takes responsibility for the passed mdr*/
map<client_t,uint64_t>& sseqmap);
void finish_force_open_sessions(map<client_t,entity_inst_t> &cm,
map<client_t,uint64_t>& sseqmap);
+ void flush_client_sessions(set<client_t>& client_set, C_GatherBuilder& gather);
+ void finish_flush_session(Session *session, version_t seq);
void terminate_sessions();
void find_idle_sessions();
void kill_session(Session *session);
void handle_slave_rename_prep(MDRequest *mdr);
void handle_slave_rename_prep_ack(MDRequest *mdr, MMDSSlaveRequest *m);
void handle_slave_rename_notify_ack(MDRequest *mdr, MMDSSlaveRequest *m);
+ void _slave_rename_sessions_flushed(MDRequest *mdr);
void _logged_slave_rename(MDRequest *mdr, CDentry *srcdn, CDentry *destdn, CDentry *straydn);
void _commit_slave_rename(MDRequest *mdr, int r, CDentry *srcdn, CDentry *destdn, CDentry *straydn);
void do_rename_rollback(bufferlist &rbl, int master, MDRequest *mdr, bool finish_mdr=false);
// -- caps --
private:
version_t cap_push_seq; // cap push seq #
+ map<version_t, list<Context*> > waitfor_flush; // flush session messages
public:
xlist<Capability*> caps; // inodes with caps; front=most recently used
xlist<ClientLease*> leases; // metadata leases to clients
version_t inc_push_seq() { return ++cap_push_seq; }
version_t get_push_seq() const { return cap_push_seq; }
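+ // register a waiter keyed on the current push seq; it is completed via
+ // finish_flush() once the client acks a FLUSHMSG with this seq or later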
+ version_t wait_for_flush(Context* c) {
+ waitfor_flush[get_push_seq()].push_back(c);
+ return get_push_seq();
+ }
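+ // hand back every waiter registered at or below the acked seq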
+ void finish_flush(version_t seq, list<Context*>& ls) {
+ while (!waitfor_flush.empty()) {
+ if (waitfor_flush.begin()->first > seq)
+ break;
+ ls.splice(ls.end(), waitfor_flush.begin()->second);
+ waitfor_flush.erase(waitfor_flush.begin());
+ }
+ }
+
void add_cap(Capability *cap) {
caps.push_back(&cap->item_session_caps);
}