if (flush) {
flush_tid = ++in->last_flush_tid;
- for (int i = 0; i < CEPH_CAP_BITS; ++i) {
- if (flush & (1<<i))
- in->flushing_cap_tid[i] = flush_tid;
- }
+ in->flushing_cap_tids[flush_tid] = flush;
follows = in->snaprealm->get_snap_context().seq;
}
signal_cond_list(in->waitfor_caps);
if (dirty_caps) {
lderr(cct) << "remove_session_caps still has dirty|flushing caps on " << *in << dendl;
- if (in->flushing_caps)
+ if (in->flushing_caps) {
num_flushing_caps--;
+ in->flushing_cap_tids.clear();
+ }
in->flushing_caps = 0;
in->dirty_caps = 0;
put_inode(in);
(cap->issued | cap->implemented), in->flushing_caps);
}
-void Client::wait_sync_caps(Inode *in, uint16_t flush_tid[])
+void Client::wait_sync_caps(Inode *in, ceph_tid_t want)
{
-retry:
- for (int i = 0; i < CEPH_CAP_BITS; ++i) {
- if (!(in->flushing_caps & (1 << i)))
- continue;
- // handle uint16_t wrapping
- if ((int16_t)(in->flushing_cap_tid[i] - flush_tid[i]) <= 0) {
- ldout(cct, 10) << "wait_sync_caps on " << *in << " flushing "
- << ccap_string(1 << i) << " want " << flush_tid[i]
- << " last " << in->flushing_cap_tid[i] << dendl;
- wait_on_list(in->waitfor_caps);
- goto retry;
- }
+ while (in->flushing_caps) {
+ map<ceph_tid_t, int>::iterator it = in->flushing_cap_tids.begin();
+ assert(it != in->flushing_cap_tids.end());
+ if (it->first > want)
+ break;
+ ldout(cct, 10) << "wait_sync_caps on " << *in << " flushing "
+ << ccap_string(it->second) << " want " << want
+ << " last " << it->first << dendl;
+ wait_on_list(in->waitfor_caps);
}
}
void Client::handle_cap_flush_ack(MetaSession *session, Inode *in, Cap *cap, MClientCaps *m)
{
- mds_rank_t mds = session->mds_num;
+ ceph_tid_t flush_ack_tid = m->get_client_tid();
int dirty = m->get_dirty();
int cleaned = 0;
- uint16_t flush_ack_tid = static_cast<uint16_t>(m->get_client_tid());
- for (int i = 0; i < CEPH_CAP_BITS; ++i) {
- if ((dirty & (1 << i)) &&
- (flush_ack_tid == in->flushing_cap_tid[i]))
- cleaned |= 1 << i;
+ int flushed = 0;
+
+ for (map<ceph_tid_t, int>::iterator it = in->flushing_cap_tids.begin();
+ it != in->flushing_cap_tids.end(); ) {
+ if (it->first == flush_ack_tid)
+ cleaned = it->second;
+ if (it->first <= flush_ack_tid) {
+ in->flushing_cap_tids.erase(it++);
+ ++flushed;
+ continue;
+ }
+ cleaned &= ~it->second;
+ if (!cleaned)
+ break;
+ ++it;
}
- ldout(cct, 5) << "handle_cap_flush_ack mds." << mds
+ ldout(cct, 5) << "handle_cap_flush_ack mds." << session->mds_num
<< " cleaned " << ccap_string(cleaned) << " on " << *in
<< " with " << ccap_string(dirty) << dendl;
+ if (flushed)
+ signal_cond_list(in->waitfor_caps);
if (!cleaned) {
ldout(cct, 10) << " tid " << m->get_client_tid() << " != any cap bit tids" << dendl;
num_flushing_caps--;
sync_cond.Signal();
}
- signal_cond_list(in->waitfor_caps);
if (!in->caps_dirty())
put_inode(in);
}
int Client::_fsync(Inode *in, bool syncdataonly)
{
int r = 0;
- uint16_t wait_on_flush[CEPH_CAP_BITS];
bool flushed_metadata = false;
Mutex lock("Client::_fsync::lock");
Cond cond;
if (!syncdataonly && (in->dirty_caps & ~CEPH_CAP_ANY_FILE_WR)) {
check_caps(in, true);
- if (in->flushing_caps) {
+ if (in->flushing_caps)
flushed_metadata = true;
- memcpy(wait_on_flush, in->flushing_cap_tid, sizeof(wait_on_flush));
- }
} else ldout(cct, 10) << "no metadata needs to commit" << dendl;
if (object_cacher_completion) { // wait on a real reply instead of guessing
if (!r) {
if (flushed_metadata)
- wait_sync_caps(in, wait_on_flush);
+ wait_sync_caps(in, in->last_flush_tid);
ldout(cct, 10) << "ino " << in->ino << " has no uncommitted writes" << dendl;
} else {
Cap *auth_cap;
unsigned dirty_caps, flushing_caps;
uint64_t flushing_cap_seq;
- __u16 flushing_cap_tid[CEPH_CAP_BITS];
+ std::map<ceph_tid_t, int> flushing_cap_tids;
int shared_gen, cache_gen;
int snap_caps, snap_cap_refs;
utime_t hold_caps_until;
{
memset(&dir_layout, 0, sizeof(dir_layout));
memset(&layout, 0, sizeof(layout));
- memset(&flushing_cap_tid, 0, sizeof(__u16)*CEPH_CAP_BITS);
memset("a, 0, sizeof(quota));
}
~Inode() { }