mut->apply();
- if (ack)
- mds->send_message_client_counted(ack, client);
+ if (ack) {
+ Session *session = mds->get_session(client);
+ if (session) {
+ // "oldest flush tid" > 0 means client uses unique TID for each flush
+ if (ack->get_oldest_flush_tid() > 0)
+ session->add_completed_flush(ack->get_client_tid());
+ mds->send_message_client_counted(ack, session);
+ } else {
+ dout(10) << " no session for client." << client << " " << *ack << dendl;
+ ack->put();
+ }
+ }
set<CInode*> need_issue;
drop_locks(mut.get(), &need_issue);
return;
}
+ // "oldest flush tid" > 0 means client uses unique TID for each flush
+ if (m->get_oldest_flush_tid() > 0) {
+ if (session->trim_completed_flushes(m->get_oldest_flush_tid())) {
+ mds->mdlog->get_current_segment()->touched_sessions.insert(session->info.inst.name);
+ }
+ }
+
CInode *head_in = mdcache->get_inode(m->get_ino());
if (!head_in) {
dout(7) << "handle_client_caps on unknown ino " << m->get_ino() << ", dropping" << dendl;
ack = new MClientCaps(CEPH_CAP_OP_FLUSHSNAP_ACK, in->ino(), 0, 0, 0, 0, 0, m->get_dirty(), 0, mds->get_osd_epoch_barrier());
ack->set_snap_follows(follows);
ack->set_client_tid(m->get_client_tid());
+ ack->set_oldest_flush_tid(m->get_oldest_flush_tid());
}
if (in == head_in ||
ack = new MClientCaps(CEPH_CAP_OP_FLUSH_ACK, in->ino(), 0, cap->get_cap_id(), m->get_seq(),
m->get_caps(), 0, m->get_dirty(), 0, mds->get_osd_epoch_barrier());
ack->set_client_tid(m->get_client_tid());
+ ack->set_oldest_flush_tid(m->get_oldest_flush_tid());
}
// filter wanted based on what we could ever give out (given auth/replica status)
mdcache->predirty_journal_parents(mut, &le->metablob, in, 0, PREDIRTY_PRIMARY, 0, follows);
mdcache->journal_dirty_inode(mut.get(), &le->metablob, in, follows);
+ // "oldest flush tid" > 0 means client uses unique TID for each flush
+ if (ack && ack->get_oldest_flush_tid() > 0)
+ le->metablob.add_client_flush(metareqid_t(m->get_source(), ack->get_client_tid()),
+ ack->get_oldest_flush_tid());
+
mds->mdlog->submit_entry(le, new C_Locker_FileUpdate_finish(this, in, mut,
false,
client, NULL,
mdcache->predirty_journal_parents(mut, &le->metablob, in, 0, PREDIRTY_PRIMARY, 0, follows);
mdcache->journal_dirty_inode(mut.get(), &le->metablob, in, follows);
+ // "oldest flush tid" > 0 means client uses unique TID for each flush
+ if (ack && ack->get_oldest_flush_tid() > 0)
+ le->metablob.add_client_flush(metareqid_t(m->get_source(), ack->get_client_tid()),
+ ack->get_oldest_flush_tid());
+
mds->mdlog->submit_entry(le, new C_Locker_FileUpdate_finish(this, in, mut,
change_max,
client, cap,
return true;
}
+ void add_completed_flush(ceph_tid_t tid) {
+ info.completed_flushes.insert(tid);
+ }
+ bool trim_completed_flushes(ceph_tid_t mintid) {
+ bool erased_any = false;
+ while (!info.completed_flushes.empty() &&
+ (mintid == 0 || *info.completed_flushes.begin() < mintid)) {
+ info.completed_flushes.erase(info.completed_flushes.begin());
+ erased_any = true;
+ }
+ if (erased_any) {
+ completed_requests_dirty = true;
+ }
+ return erased_any;
+ }
+ bool have_completed_flush(ceph_tid_t tid) const {
+ return info.completed_flushes.count(tid);
+ }
+
unsigned get_num_completed_requests() const { return info.completed_requests.size(); }
unsigned get_num_trim_requests_warnings() { return num_trim_requests_warnings; }
void inc_num_trim_requests_warnings() { ++num_trim_requests_warnings; }
*/
void session_info_t::encode(bufferlist& bl) const
{
- ENCODE_START(4, 3, bl);
+ ENCODE_START(5, 3, bl);
::encode(inst, bl);
::encode(completed_requests, bl);
::encode(prealloc_inos, bl); // hacky, see below.
::encode(used_inos, bl);
::encode(client_metadata, bl);
+ ::encode(completed_flushes, bl);
ENCODE_FINISH(bl);
}
if (struct_v >= 4) {
::decode(client_metadata, p);
}
+ if (struct_v >= 5) {
+ ::decode(completed_flushes, p);
+ }
DECODE_FINISH(p);
}