flush_snaps(in);
}
-void Client::flush_snaps(Inode *in, bool all_again, CapSnap *again)
+void Client::flush_snaps(Inode *in, bool all_again)
{
- ldout(cct, 10) << "flush_snaps on " << *in
- << " all_again " << all_again
- << " again " << again << dendl;
+ ldout(cct, 10) << "flush_snaps on " << *in << " all_again " << all_again << dendl;
assert(in->cap_snaps.size());
// pick auth mds
for (map<snapid_t,CapSnap*>::iterator p = in->cap_snaps.begin(); p != in->cap_snaps.end(); ++p) {
CapSnap *capsnap = p->second;
- if (again) {
- // only one capsnap
- if (again != capsnap)
- continue;
- } else if (!all_again) {
+ if (!all_again) {
// only flush once per session
- if (capsnap->flushing_item.is_on_list())
+ if (capsnap->flush_tid > 0)
continue;
}
if (capsnap->dirty_data || capsnap->writing)
continue;
- in->auth_cap->session->flushing_capsnaps.push_back(&capsnap->flushing_item);
+ if (capsnap->flush_tid == 0) {
+ capsnap->flush_tid = ++last_flush_tid;
+ if (!in->flushing_cap_item.is_on_list())
+ session->flushing_caps.push_back(&in->flushing_cap_item);
+ session->flushing_caps_tids.insert(capsnap->flush_tid);
+ }
- capsnap->flush_tid = ++last_flush_tid;
MClientCaps *m = new MClientCaps(CEPH_CAP_OP_FLUSHSNAP, in->ino, in->snaprealm->ino, 0, mseq,
cap_epoch_barrier);
if (user_id >= 0)
m->inline_data = in->inline_data;
}
+ assert(!session->flushing_caps_tids.empty());
+ m->set_oldest_flush_tid(*session->flushing_caps_tids.begin());
+
session->con->send_message(m);
}
}
in->flushing_caps |= flushing;
in->dirty_caps = 0;
-
- session->flushing_caps.push_back(&in->flushing_cap_item);
+ if (!in->flushing_cap_item.is_on_list())
+ session->flushing_caps.push_back(&in->flushing_cap_item);
session->flushing_caps_tids.insert(flush_tid);
*ptid = flush_tid;
void Client::adjust_session_flushing_caps(Inode *in, MetaSession *old_s, MetaSession *new_s)
{
+ for (auto p = in->cap_snaps.begin(); p != in->cap_snaps.end(); ++p) {
+ CapSnap *capsnap = p->second;
+ if (capsnap->flush_tid > 0) {
+ old_s->flushing_caps_tids.erase(capsnap->flush_tid);
+ new_s->flushing_caps_tids.insert(capsnap->flush_tid);
+ }
+ }
for (map<ceph_tid_t, int>::iterator it = in->flushing_cap_tids.begin();
it != in->flushing_cap_tids.end();
++it) {
for (map<ceph_tid_t,int>::iterator p = in->flushing_cap_tids.begin();
p != in->flushing_cap_tids.end();
++p) {
- if (session->kicked_flush_tids.count(p->first))
- continue;
send_cap(in, session, cap, (get_caps_used(in) | in->caps_dirty()),
in->caps_wanted(), (cap->issued | cap->implemented),
p->second, p->first);
mds_rank_t mds = session->mds_num;
ldout(cct, 10) << "kick_flushing_caps mds." << mds << dendl;
- for (xlist<CapSnap*>::iterator p = session->flushing_capsnaps.begin(); !p.end(); ++p) {
- CapSnap *capsnap = *p;
- InodeRef& in = capsnap->in;
- ldout(cct, 20) << " reflushing capsnap " << capsnap
- << " on " << *in << " to mds." << mds << dendl;
- flush_snaps(in.get(), false, capsnap);
- }
for (xlist<Inode*>::iterator p = session->flushing_caps.begin(); !p.end(); ++p) {
Inode *in = *p;
+ if (session->early_flushing_caps.count(in))
+ continue;
ldout(cct, 20) << " reflushing caps on " << *in << " to mds." << mds << dendl;
+ if (in->cap_snaps.size())
+ flush_snaps(in, true);
if (in->flushing_caps)
flush_caps(in, session);
}
- session->kicked_flush_tids.clear();
+ session->early_flushing_caps.clear();
}
void Client::early_kick_flushing_caps(MetaSession *session)
{
- session->kicked_flush_tids.clear();
+ session->early_flushing_caps.clear();
for (xlist<Inode*>::iterator p = session->flushing_caps.begin(); !p.end(); ++p) {
Inode *in = *p;
- if (!in->flushing_caps)
- continue;
assert(in->auth_cap);
- Cap *cap = in->auth_cap;
// if flushing caps were revoked, we re-send the cap flush in client reconnect
// stage. This guarantees that MDS processes the cap flush message before issuing
ldout(cct, 20) << " reflushing caps (revoked) on " << *in
<< " to mds." << session->mds_num << dendl;
- for (map<ceph_tid_t,int>::iterator q = in->flushing_cap_tids.begin();
- q != in->flushing_cap_tids.end();
- ++q) {
- send_cap(in, session, cap, (get_caps_used(in) | in->caps_dirty()),
- in->caps_wanted(), (cap->issued | cap->implemented),
- q->second, q->first);
- session->kicked_flush_tids.insert(q->first);
- }
+ session->early_flushing_caps.insert(in);
+
+ if (in->cap_snaps.size())
+ flush_snaps(in, true);
+ if (in->flushing_caps)
+ flush_caps(in, session);
+
}
}
in->flushing_caps &= ~cleaned;
if (in->flushing_caps == 0) {
ldout(cct, 10) << " " << *in << " !flushing" << dendl;
- in->flushing_cap_item.remove_myself();
num_flushing_caps--;
+ if (in->cap_snaps.empty())
+ in->flushing_cap_item.remove_myself();
}
if (!in->caps_dirty())
put_inode(in);
ldout(cct, 5) << "handle_cap_flushedsnap mds." << mds << " flushed snap follows " << follows
<< " on " << *in << dendl;
in->cap_snaps.erase(follows);
- capsnap->flushing_item.remove_myself();
+ if (in->flushing_caps == 0 && in->cap_snaps.empty())
+ in->flushing_cap_item.remove_myself();
+ session->flushing_caps_tids.erase(capsnap->flush_tid);
+
delete capsnap;
}
} else {