//
root = 0;
+ num_flushing_caps = 0;
+
lru.lru_set_max(g_conf.client_cache_size);
// file handles
}
dout(1) << "handle_mds_map epoch " << m->get_epoch() << dendl;
+
+ MDSMap *oldmap = mdsmap;
+ mdsmap = new MDSMap;
mdsmap->decode(m->get_encoded());
// reset session
for (map<int,MDSSession>::iterator p = mds_sessions.begin();
p != mds_sessions.end();
- p++)
+ p++) {
+ int oldstate = oldmap->get_state(p->first);
+ int newstate = mdsmap->get_state(p->first);
if (!mdsmap->is_up(p->first) ||
- mdsmap->get_inst(p->first) != p->second.inst)
+ mdsmap->get_inst(p->first) != p->second.inst) {
messenger->mark_down(p->second.inst.addr);
-
- // send reconnect?
- if (frommds >= 0 &&
- mdsmap->get_state(frommds) == MDSMap::STATE_RECONNECT) {
- send_reconnect(frommds);
- }
+ } else if (oldstate == newstate)
+ continue; // no change
+
+ if (newstate == MDSMap::STATE_RECONNECT)
+ send_reconnect(p->first);
- // kick requests?
- if (frommds >= 0 &&
- mdsmap->get_state(frommds) == MDSMap::STATE_ACTIVE) {
- kick_requests(frommds, false);
+ if (oldstate < MDSMap::STATE_ACTIVE &&
+ newstate >= MDSMap::STATE_ACTIVE) {
+ kick_requests(p->first, false);
+ kick_flushing_caps(p->first);
+ }
}
// kick any waiting threads
ls.swap(waiting_for_mdsmap);
signal_cond_list(ls);
+ delete oldmap;
delete m;
}
}
}
-void Client::resend_unsafe_requests(int mds_num) {
+void Client::resend_unsafe_requests(int mds_num)
+{
MDSSession& mds = mds_sessions[mds_num];
for (xlist<MetaRequest*>::iterator iter = mds.unsafe_requests.begin();
!iter.end();
send_request(*iter, mds_num);
}
+
+
+
/************
* leases
*/
ack:
if (cap == in->auth_cap) {
flush = in->dirty_caps;
+ if (flush && !in->flushing_caps) {
+ dout(10) << " " << *in << " flushing" << dendl;
+ mds_sessions[mds].flushing_caps.push_back(&in->flushing_cap_item);
+ in->get();
+ num_flushing_caps++;
+ }
in->flushing_caps |= flush;
in->dirty_caps = 0;
dout(10) << " flushing " << ccap_string(flush) << dendl;
in->dirty_caps |= caps;
}
+void Client::flush_caps()  // calls check_caps(in, true) on every inode in delayed_caps (emptying that list), then on every inode in cap_list
+{
+ dout(10) << "flush_caps" << dendl;
+ xlist<Inode*>::iterator p = delayed_caps.begin();
+ while (!p.end()) {
+ Inode *in = *p;
+ ++p;  // advance before pop_front() so the iterator is no longer on the entry being removed
+ delayed_caps.pop_front();
+ check_caps(in, true);
+ }
+
+ // other caps, too: cap_list is walked in place, nothing is popped by this loop
+ p = cap_list.begin();
+ while (!p.end()) {
+ Inode *in = *p;
+ ++p;  // advance first; NOTE(review): presumably guards against check_caps() relinking 'in' -- confirm
+ check_caps(in, true);
+ }
+}
+
+void Client::kick_flushing_caps(int mds)  // re-sends a cap message for every inode on this mds session's flushing_caps list
+{
+ dout(10) << "kick_flushing_caps" << dendl;
+ MDSSession *session = &mds_sessions[mds];  // map operator[]: creates an empty session entry if 'mds' has none
+
+ for (xlist<Inode*>::iterator p = session->flushing_caps.begin(); !p.end(); ++p) {
+ Inode *in = *p;
+ dout(20) << " reflushing caps on " << *in << " to mds" << mds << dendl;
+ InodeCap *cap = in->auth_cap;  // NOTE(review): dereferenced below without a null check -- assumes a flushing inode always has an auth cap; confirm
+ assert(cap->session == session);  // the inode's auth cap must belong to the session being kicked
+ send_cap(in, mds, cap, in->caps_used(), in->caps_wanted(),
+ cap->issued | cap->implemented,
+ in->flushing_caps);  // last argument is the inode's current flushing_caps bits; presumably the flush mask -- confirm against send_cap()
+ }
+}
+
+
void SnapRealm::build_snap_context()
{
set<snapid_t> snaps;
dout(5) << " flushing_caps " << ccap_string(in->flushing_caps)
<< " -> " << ccap_string(in->flushing_caps & ~cleaned) << dendl;
in->flushing_caps &= ~cleaned;
+ if (in->flushing_caps == 0) {
+ dout(10) << " " << *in << " !flushing" << dendl;
+ in->flushing_cap_item.remove_myself();
+ num_flushing_caps--;
+ put_inode(in);
+ if (!num_flushing_caps)
+ sync_cond.Signal();
+ else
+ dout(20) << " still " << num_flushing_caps << " more flushing caps" << dendl;
+ }
if (!in->caps_dirty())
put_inode(in);
}
}
}
- // flush delayed caps
- xlist<Inode*>::iterator p = delayed_caps.begin();
- while (!p.end()) {
- Inode *in = *p;
- ++p;
- delayed_caps.pop_front();
- check_caps(in, true);
- }
-
- // other caps, too
- p = cap_list.begin();
- while (!p.end()) {
- Inode *in = *p;
- ++p;
- check_caps(in, true);
- }
+ flush_caps();
//if (0) {// hack
while (lru.lru_get_size() > 0 ||
int Client::_sync_fs()
{
dout(10) << "_sync_fs" << dendl;
+
+ // step 1: wait for unsafe mds requests
+ // FIXME: not implemented -- nothing is waited on here yet
+
+ // step 2: flush caps, then block on sync_cond until num_flushing_caps drops to zero
+ flush_caps();
+ while (num_flushing_caps) {
+ // FIXME: starvation -- the loop only exits at exactly zero, and each inode that newly starts flushing bumps the counter
+ dout(10) << "waiting on " << num_flushing_caps << " flushing caps" << dendl;
+ sync_cond.Wait(client_lock);  // NOTE(review): presumably releases client_lock while asleep -- confirm Cond::Wait semantics
+ }
+
+ // step 3: flush file data
+ // FIXME: not implemented -- the function returns 0 without flushing file data
+
return 0;
}
bool was_stale;
xlist<InodeCap*> caps;
+ xlist<Inode*> flushing_caps;
xlist<MetaRequest*> unsafe_requests;
MClientCapRelease *release;
int exporting_mds;
ceph_seq_t exporting_mseq;
utime_t hold_caps_until;
- xlist<Inode*>::item cap_item;
+ xlist<Inode*>::item cap_item, flushing_cap_item;
SnapRealm *snaprealm;
xlist<Inode*>::item snaprealm_item;
dirty_caps(0), flushing_caps(0), shared_gen(0), cache_gen(0),
snap_caps(0), snap_cap_refs(0),
exporting_issued(0), exporting_mds(-1), exporting_mseq(0),
- cap_item(this),
+ cap_item(this), flushing_cap_item(this),
snaprealm(0), snaprealm_item(this), snapdir_parent(0),
reported_size(0), wanted_max_size(0), requested_max_size(0),
ref(0), ll_ref(0),
// all inodes with caps sit on either cap_list or delayed_caps.
xlist<Inode*> delayed_caps, cap_list;
+ int num_flushing_caps;
hash_map<inodeno_t,SnapRealm*> snap_realms;
SnapRealm *get_snap_realm(inodeno_t r) {
ofstream traceout;
- Cond mount_cond;
+ Cond mount_cond, sync_cond;
// friends
void remove_session_caps(int mds_num);
void trim_caps(int mds, int max);
void mark_caps_dirty(Inode *in, int caps);
+ void flush_caps();
+ void kick_flushing_caps(int mds);
void maybe_update_snaprealm(SnapRealm *realm, snapid_t snap_created, snapid_t snap_highwater,
vector<snapid_t>& snaps);