used &= ~(CEPH_CAP_FILE_CACHE | CEPH_CAP_FILE_LAZYIO);
}
- if (!in->cap_snaps.empty())
- flush_snaps(in);
for (auto &p : in->caps) {
mds_rank_t mds = p.first;
if (in->flags & I_KICK_FLUSH) {
ldout(cct, 20) << " reflushing caps (check_caps) on " << *in
<< " to mds." << mds << dendl;
- in->flags &= ~I_KICK_FLUSH;
- if (in->cap_snaps.size())
- flush_snaps(in, true);
- if (in->flushing_caps)
- flush_caps(in, session, flags & CHECK_CAPS_SYNCHRONOUS);
+ kick_flushing_caps(in, session);
}
+ if (!in->cap_snaps.empty() &&
+ in->cap_snaps.rbegin()->second.flush_tid == 0)
+ flush_snaps(in);
}
int flushing;
session->con->send_message2(std::move(m));
}
-void Client::flush_snaps(Inode *in, bool all_again)
+void Client::flush_snaps(Inode *in)
{
- ldout(cct, 10) << "flush_snaps on " << *in << " all_again " << all_again << dendl;
+ ldout(cct, 10) << "flush_snaps on " << *in << dendl;
ceph_assert(in->cap_snaps.size());
// pick auth mds
for (auto &p : in->cap_snaps) {
CapSnap &capsnap = p.second;
- if (!all_again) {
- // only flush once per session
- if (capsnap.flush_tid > 0)
- continue;
- }
+ // only do new flush
+ if (capsnap.flush_tid > 0)
+ continue;
ldout(cct, 10) << "flush_snaps mds." << session->mds_num
<< " follows " << p.first
<< " writing=" << capsnap.writing
<< " on " << *in << dendl;
if (capsnap.dirty_data || capsnap.writing)
- continue;
+ break;
- if (capsnap.flush_tid == 0) {
- capsnap.flush_tid = ++last_flush_tid;
- if (!in->flushing_cap_item.is_on_list())
- session->flushing_caps.push_back(&in->flushing_cap_item);
- session->flushing_caps_tids.insert(capsnap.flush_tid);
- }
+ capsnap.flush_tid = ++last_flush_tid;
+ session->flushing_caps_tids.insert(capsnap.flush_tid);
+ in->flushing_cap_tids[capsnap.flush_tid] = 0;
+ if (!in->flushing_cap_item.is_on_list())
+ session->flushing_caps.push_back(&in->flushing_cap_item);
send_flush_snap(in, session, p.first, capsnap);
}
while (s->caps.size()) {
Cap *cap = *s->caps.begin();
InodeRef in(&cap->inode);
- bool dirty_caps = false, cap_snaps = false;
+ bool dirty_caps = false;
if (in->auth_cap == cap) {
- cap_snaps = !in->cap_snaps.empty();
dirty_caps = in->dirty_caps | in->flushing_caps;
in->wanted_max_size = 0;
in->requested_max_size = 0;
if (cap->wanted | cap->issued)
in->flags |= I_CAP_DROPPED;
remove_cap(cap, false);
- if (cap_snaps) {
- in->cap_snaps.clear();
- }
+ in->cap_snaps.clear();
if (dirty_caps) {
lderr(cct) << __func__ << " still has dirty|flushing caps on " << *in << dendl;
if (in->flushing_caps) {
}
}
-void Client::flush_caps(Inode *in, MetaSession *session, bool sync)
-{
- ldout(cct, 10) << __func__ << " " << in << " mds." << session->mds_num << dendl;
- Cap *cap = in->auth_cap;
- ceph_assert(cap->session == session);
-
- for (map<ceph_tid_t,int>::iterator p = in->flushing_cap_tids.begin();
- p != in->flushing_cap_tids.end();
- ++p) {
- bool req_sync = false;
-
- /* If this is a synchronous request, then flush the journal on last one */
- if (sync && (p->first == in->flushing_cap_tids.rbegin()->first))
- req_sync = true;
-
- send_cap(in, session, cap, req_sync,
- (get_caps_used(in) | in->caps_dirty()),
- in->caps_wanted(), (cap->issued | cap->implemented),
- p->second, p->first);
- }
-}
-
void Client::wait_sync_caps(Inode *in, ceph_tid_t want)
{
while (in->flushing_caps) {
}
}
+void Client::kick_flushing_caps(Inode *in, MetaSession *session)
+{
+ in->flags &= ~I_KICK_FLUSH;
+
+ Cap *cap = in->auth_cap;
+ ceph_assert(cap->session == session);
+
+ int wanted = in->caps_wanted();
+ int used = get_caps_used(in) | in->caps_dirty();
+ auto it = in->cap_snaps.begin();
+ for (auto& p : in->flushing_cap_tids) {
+ if (p.second) {
+ send_cap(in, session, cap, false,
+ used, wanted, (cap->issued | cap->implemented),
+ p.second, p.first);
+ } else {
+ ceph_assert(it != in->cap_snaps.end());
+ ceph_assert(it->second.flush_tid == p.first);
+ send_flush_snap(in, session, it->first, it->second);
+ ++it;
+ }
+ }
+}
+
void Client::kick_flushing_caps(MetaSession *session)
{
mds_rank_t mds = session->mds_num;
for (xlist<Inode*>::iterator p = session->flushing_caps.begin(); !p.end(); ++p) {
Inode *in = *p;
- if (!(in->flags & I_KICK_FLUSH))
- continue;
- in->flags &= ~I_KICK_FLUSH;
- ldout(cct, 20) << " reflushing caps on " << *in << " to mds." << mds << dendl;
- if (in->cap_snaps.size())
- flush_snaps(in, true);
- if (in->flushing_caps)
- flush_caps(in, session);
+ if (in->flags & I_KICK_FLUSH) {
+ ldout(cct, 20) << " reflushing caps on " << *in << " to mds." << mds << dendl;
+ kick_flushing_caps(in, session);
+ }
}
}
ldout(cct, 20) << " reflushing caps (early_kick) on " << *in
<< " to mds." << session->mds_num << dendl;
-
- in->flags &= ~I_KICK_FLUSH;
-
// send_reconnect() also will reset these sequence numbers. make sure
// sequence numbers in cap flush message match later reconnect message.
cap->seq = 0;
cap->mseq = 0;
cap->issued = cap->implemented;
- if (in->cap_snaps.size())
- flush_snaps(in, true);
- if (in->flushing_caps)
- flush_caps(in, session);
-
+ kick_flushing_caps(in, session);
}
}
if (in->auth_cap && in->auth_cap->session == session) {
// reflush any/all caps (if we are now the auth_cap)
- in->flags &= ~I_KICK_FLUSH;
- if (in->cap_snaps.size())
- flush_snaps(in, true);
- if (in->flushing_caps)
- flush_caps(in, session);
+ kick_flushing_caps(in, session);
}
}
<< " expected is " << it->first << dendl;
}
for (; it != in->flushing_cap_tids.end(); ) {
+ if (!it->second) {
+ // cap snap
+ ++it;
+ continue;
+ }
if (it->first == flush_ack_tid)
cleaned = it->second;
if (it->first <= flush_ack_tid) {
if (in->flushing_caps == 0) {
ldout(cct, 10) << " " << *in << " !flushing" << dendl;
num_flushing_caps--;
- if (in->cap_snaps.empty())
+ if (in->flushing_cap_tids.empty())
in->flushing_cap_item.remove_myself();
}
if (!in->caps_dirty())
void Client::handle_cap_flushsnap_ack(MetaSession *session, Inode *in, const MConstRef<MClientCaps>& m)
{
+ ceph_tid_t flush_ack_tid = m->get_client_tid();
mds_rank_t mds = session->mds_num;
ceph_assert(in->caps.count(mds));
snapid_t follows = m->get_snap_follows();
if (auto it = in->cap_snaps.find(follows); it != in->cap_snaps.end()) {
auto& capsnap = it->second;
- if (m->get_client_tid() != capsnap.flush_tid) {
- ldout(cct, 10) << " tid " << m->get_client_tid() << " != " << capsnap.flush_tid << dendl;
+ if (flush_ack_tid != capsnap.flush_tid) {
+ ldout(cct, 10) << " tid " << flush_ack_tid << " != " << capsnap.flush_tid << dendl;
} else {
+ InodeRef tmp_ref(in);
ldout(cct, 5) << __func__ << " mds." << mds << " flushed snap follows " << follows
<< " on " << *in << dendl;
- InodeRef tmp_ref;
- if (in->get_num_ref() == 1)
- tmp_ref = in; // make sure inode not get freed while erasing item from in->cap_snaps
- if (in->flushing_caps == 0 && in->cap_snaps.empty())
- in->flushing_cap_item.remove_myself();
session->flushing_caps_tids.erase(capsnap.flush_tid);
+ in->flushing_cap_tids.erase(capsnap.flush_tid);
+ if (in->flushing_caps == 0 && in->flushing_cap_tids.empty())
+ in->flushing_cap_item.remove_myself();
in->cap_snaps.erase(it);
+
+ signal_cond_list(in->waitfor_caps);
+ if (session->flushing_caps_tids.empty() ||
+ *session->flushing_caps_tids.begin() > flush_ack_tid)
+ sync_cond.Signal();
}
} else {
ldout(cct, 5) << __func__ << " DUP(?) mds." << mds << " flushed snap follows " << follows