* @param want - min version i want committed
* @param c - callback for completion
*/
-void CDir::commit(version_t want, Context *c, bool ignore_authpinnability)
+void CDir::commit(version_t want, Context *c, bool ignore_authpinnability, int op_prio)
{
dout(10) << "commit want " << want << " on " << *this << dendl;
if (want == 0) want = get_version();
waiting_for_commit[want].push_back(c);
// ok.
- _commit(want);
+ _commit(want, op_prio);
}
class C_Dir_Committed : public Context {
* Flush out the modified dentries in this dir. Keep the bufferlist
* below max_write_size;
*/
-void CDir::_omap_commit()
+void CDir::_omap_commit(int op_prio)
{
dout(10) << "_omap_commit" << dendl;
unsigned max_write_size = cache->max_dir_commit_size;
unsigned write_size = 0;
+ if (op_prio < 0)
+ op_prio = CEPH_MSG_PRIO_DEFAULT;
+
// snap purge?
const set<snapid_t> *snaps = NULL;
SnapRealm *realm = inode->find_snaprealm();
if (write_size >= max_write_size) {
ObjectOperation op;
- op.priority = CEPH_MSG_PRIO_LOW; // set priority lower than journal!
+ op.priority = op_prio;
op.tmap_to_omap(true); // convert tmap to omap
if (!to_set.empty())
}
ObjectOperation op;
- op.priority = CEPH_MSG_PRIO_LOW; // set priority lower than journal!
+ op.priority = op_prio;
op.tmap_to_omap(true); // convert tmap to omap
/*
}
}
-void CDir::_commit(version_t want)
+void CDir::_commit(version_t want, int op_prio)
{
dout(10) << "_commit want " << want << " on " << *this << dendl;
if (cache->mds->logger) cache->mds->logger->inc(l_mds_dir_c);
- _omap_commit();
+ _omap_commit(op_prio);
}
++n;
if (p->first > committed_version) {
dout(10) << " there are waiters for " << p->first << ", committing again" << dendl;
- _commit(p->first);
+ _commit(p->first, -1);
break;
}
cache->mds->queue_waiters(p->second);
// -- commit --
map<version_t, list<Context*> > waiting_for_commit;
- void _commit(version_t want);
- void _omap_commit();
+ void _commit(version_t want, int op_prio);
+ void _omap_commit(int op_prio);
void _encode_dentry(CDentry *dn, bufferlist& bl, const set<snapid_t> *snaps);
void _committed(version_t v);
public:
void wait_for_commit(Context *c, version_t v=0);
void commit_to(version_t want);
- void commit(version_t want, Context *c, bool ignore_authpinnability=false);
+ void commit(version_t want, Context *c,
+ bool ignore_authpinnability=false, int op_prio=-1);
// -- dirtyness --
version_t get_committing_version() { return committing_version; }
map<int,version_t> tablev;
// try to expire
- void try_to_expire(MDS *mds, C_GatherBuilder &gather_bld);
+ void try_to_expire(MDS *mds, C_GatherBuilder &gather_bld, int op_prio);
// cons
LogSegment(loff_t off) :
if (stop < ceph_clock_now(g_ceph_context))
break;
- if ((int)expiring_segments.size() >= g_conf->mds_log_max_expiring)
+ int num_expiring_segments = (int)expiring_segments.size();
+ if (num_expiring_segments >= g_conf->mds_log_max_expiring)
break;
+ int op_prio = CEPH_MSG_PRIO_LOW +
+ (CEPH_MSG_PRIO_HIGH - CEPH_MSG_PRIO_LOW) *
+ num_expiring_segments / g_conf->mds_log_max_expiring;
+
// look at first segment
LogSegment *ls = p->second;
assert(ls);
} else if (expired_segments.count(ls)) {
dout(5) << "trim already expired segment " << ls->offset << ", " << ls->num_events << " events" << dendl;
} else {
- try_expire(ls);
+ try_expire(ls, op_prio);
}
}
}
-void MDLog::try_expire(LogSegment *ls)
+void MDLog::try_expire(LogSegment *ls, int op_prio)
{
C_GatherBuilder gather_bld(g_ceph_context);
- ls->try_to_expire(mds, gather_bld);
+ ls->try_to_expire(mds, gather_bld, op_prio);
if (gather_bld.has_subs()) {
assert(expiring_segments.count(ls) == 0);
expiring_segments.insert(ls);
expiring_events += ls->num_events;
dout(5) << "try_expire expiring segment " << ls->offset << dendl;
- gather_bld.set_finisher(new C_MaybeExpiredSegment(this, ls));
+ gather_bld.set_finisher(new C_MaybeExpiredSegment(this, ls, op_prio));
gather_bld.activate();
} else {
dout(10) << "try_expire expired segment " << ls->offset << dendl;
logger->set(l_mdl_evexg, expiring_events);
}
-void MDLog::_maybe_expired(LogSegment *ls)
+void MDLog::_maybe_expired(LogSegment *ls, int op_prio)
{
dout(10) << "_maybe_expired segment " << ls->offset << " " << ls->num_events << " events" << dendl;
assert(expiring_segments.count(ls));
expiring_segments.erase(ls);
expiring_events -= ls->num_events;
- try_expire(ls);
+ try_expire(ls, op_prio);
}
void MDLog::_trim_expired_segments()
class C_MaybeExpiredSegment : public Context {
MDLog *mdlog;
LogSegment *ls;
+ int op_prio;
public:
- C_MaybeExpiredSegment(MDLog *mdl, LogSegment *s) : mdlog(mdl), ls(s) {}
+ C_MaybeExpiredSegment(MDLog *mdl, LogSegment *s, int p) : mdlog(mdl), ls(s), op_prio(p) {}
void finish(int res) {
- mdlog->_maybe_expired(ls);
+ mdlog->_maybe_expired(ls, op_prio);
}
};
- void try_expire(LogSegment *ls);
- void _maybe_expired(LogSegment *ls);
+ void try_expire(LogSegment *ls, int op_prio);
+ void _maybe_expired(LogSegment *ls, int op_prio);
void _expired(LogSegment *ls);
void _trim_expired_segments();
// -----------------------
// LogSegment
-void LogSegment::try_to_expire(MDS *mds, C_GatherBuilder &gather_bld)
+void LogSegment::try_to_expire(MDS *mds, C_GatherBuilder &gather_bld, int op_prio)
{
set<CDir*> commit;
assert(dir->is_auth());
if (dir->can_auth_pin()) {
dout(15) << "try_to_expire committing " << *dir << dendl;
- dir->commit(0, gather_bld.new_sub());
+ dir->commit(0, gather_bld.new_sub(), false, op_prio);
} else {
dout(15) << "try_to_expire waiting for unfreeze on " << *dir << dendl;
dir->add_waiter(CDir::WAIT_UNFREEZE, gather_bld.new_sub());