}
-void CDentry::add_waiter(uint64_t tag, Context *c)
+void CDentry::add_waiter(uint64_t tag, MDSInternalContextBase *c)
{
// wait on the directory?
if (tag & (WAIT_UNFREEZE|WAIT_SINGLEAUTH)) {
// -- wait --
//static const int WAIT_LOCK_OFFSET = 8;
- void add_waiter(uint64_t tag, Context *c);
+ void add_waiter(uint64_t tag, MDSInternalContextBase *c);
static const unsigned EXPORT_NONCE = 1;
}
}
-void CDir::finish_old_fragment(list<Context*>& waiters, bool replay)
+void CDir::finish_old_fragment(list<MDSInternalContextBase*>& waiters, bool replay)
{
// take waiters _before_ unfreeze...
if (!replay) {
get(PIN_SUBTREE);
}
-void CDir::split(int bits, list<CDir*>& subs, list<Context*>& waiters, bool replay)
+void CDir::split(int bits, list<CDir*>& subs, list<MDSInternalContextBase*>& waiters, bool replay)
{
dout(10) << "split by " << bits << " bits on " << *this << dendl;
finish_old_fragment(waiters, replay);
}
-void CDir::merge(list<CDir*>& subs, list<Context*>& waiters, bool replay)
+void CDir::merge(list<CDir*>& subs, list<MDSInternalContextBase*>& waiters, bool replay)
{
dout(10) << "merge " << subs << dendl;
* WAITING
*/
-void CDir::add_dentry_waiter(const string& dname, snapid_t snapid, Context *c)
+void CDir::add_dentry_waiter(const string& dname, snapid_t snapid, MDSInternalContextBase *c)
{
if (waiting_on_dentry.empty())
get(PIN_DNWAITER);
}
void CDir::take_dentry_waiting(const string& dname, snapid_t first, snapid_t last,
- list<Context*>& ls)
+ list<MDSInternalContextBase*>& ls)
{
if (waiting_on_dentry.empty())
return;
string_snap_t lb(dname, first);
string_snap_t ub(dname, last);
- map<string_snap_t, list<Context*> >::iterator p = waiting_on_dentry.lower_bound(lb);
+ map<string_snap_t, list<MDSInternalContextBase*> >::iterator p = waiting_on_dentry.lower_bound(lb);
while (p != waiting_on_dentry.end() &&
!(ub < p->first)) {
dout(10) << "take_dentry_waiting dentry " << dname
put(PIN_DNWAITER);
}
-void CDir::take_sub_waiting(list<Context*>& ls)
+void CDir::take_sub_waiting(list<MDSInternalContextBase*>& ls)
{
dout(10) << "take_sub_waiting" << dendl;
if (!waiting_on_dentry.empty()) {
- for (map<string_snap_t, list<Context*> >::iterator p = waiting_on_dentry.begin();
+ for (map<string_snap_t, list<MDSInternalContextBase*> >::iterator p = waiting_on_dentry.begin();
p != waiting_on_dentry.end();
++p)
ls.splice(ls.end(), p->second);
-void CDir::add_waiter(uint64_t tag, Context *c)
+void CDir::add_waiter(uint64_t tag, MDSInternalContextBase *c)
{
// hierarchical?
/* NOTE: this checks dentry waiters too */
-void CDir::take_waiting(uint64_t mask, list<Context*>& ls)
+void CDir::take_waiting(uint64_t mask, list<MDSInternalContextBase*>& ls)
{
if ((mask & WAIT_DENTRY) && !waiting_on_dentry.empty()) {
// take all dentry waiters
while (!waiting_on_dentry.empty()) {
- map<string_snap_t, list<Context*> >::iterator p = waiting_on_dentry.begin();
+ map<string_snap_t, list<MDSInternalContextBase*> >::iterator p = waiting_on_dentry.begin();
dout(10) << "take_waiting dentry " << p->first.name
<< " snap " << p->first.snapid << " on " << *this << dendl;
ls.splice(ls.end(), p->second);
{
dout(11) << "finish_waiting mask " << hex << mask << dec << " result " << result << " on " << *this << dendl;
- list<Context*> finished;
+ list<MDSInternalContextBase*> finished;
take_waiting(mask, finished);
if (result < 0)
finish_contexts(g_ceph_context, finished, result);
}
-struct C_Dir_Dirty : public Context {
+struct C_Dir_Dirty : public MDSInternalContext {
CDir *dir;
version_t pv;
LogSegment *ls;
- C_Dir_Dirty(CDir *d, version_t p, LogSegment *l) : dir(d), pv(p), ls(l) {}
+ C_Dir_Dirty(CDir *d, version_t p, LogSegment *l) : MDSInternalContext(d->cache->mds), dir(d), pv(p), ls(l) {}
void finish(int r) {
dir->mark_dirty(pv, ls);
}
// -----------------------
// FETCH
-void CDir::fetch(Context *c, bool ignore_authpinnability)
+void CDir::fetch(MDSInternalContextBase *c, bool ignore_authpinnability)
{
string want;
return fetch(c, want, ignore_authpinnability);
}
-void CDir::fetch(Context *c, const string& want_dn, bool ignore_authpinnability)
+void CDir::fetch(MDSInternalContextBase *c, const string& want_dn, bool ignore_authpinnability)
{
dout(10) << "fetch on " << *this << dendl;
* @param want - min version i want committed
* @param c - callback for completion
*/
-void CDir::commit(version_t want, Context *c, bool ignore_authpinnability, int op_prio)
+void CDir::commit(version_t want, MDSInternalContextBase *c, bool ignore_authpinnability, int op_prio)
{
dout(10) << "commit want " << want << " on " << *this << dendl;
if (want == 0) want = get_version();
// note: queue up a noop if necessary, so that we always
// get an auth_pin.
if (!c)
- c = new C_NoopContext;
+ c = new C_MDSInternalNoop;
// auth_pin on first waiter
if (waiting_for_commit.empty())
// finishers?
bool were_waiters = !waiting_for_commit.empty();
- map<version_t, list<Context*> >::iterator p = waiting_for_commit.begin();
+ map<version_t, list<MDSInternalContextBase*> >::iterator p = waiting_for_commit.begin();
while (p != waiting_for_commit.end()) {
- map<version_t, list<Context*> >::iterator n = p;
+ map<version_t, list<MDSInternalContextBase*> >::iterator n = p;
++n;
if (p->first > committed_version) {
dout(10) << " there are waiters for " << p->first << ", committing again" << dendl;
// newly single auth?
if (was_ambiguous && dir_auth.second == CDIR_AUTH_UNKNOWN) {
- list<Context*> ls;
+ list<MDSInternalContextBase*> ls;
take_waiting(WAIT_SINGLEAUTH, ls);
cache->mds->queue_waiters(ls);
}
}
}
-struct C_Dir_AuthUnpin : public Context {
+class C_Dir_AuthUnpin : public MDSInternalContext {
CDir *dir;
- C_Dir_AuthUnpin(CDir *d) : dir(d) {}
+
+ public:
+ C_Dir_AuthUnpin(CDir *d) : MDSInternalContext(dir->cache->mds), dir(d) {}
void finish(int r) {
dir->auth_unpin(dir->get_inode());
}
class CDentry;
class MDCache;
class MDCluster;
-class Context;
class bloom_filter;
struct ObjectOperation;
public:
- void split(int bits, list<CDir*>& subs, list<Context*>& waiters, bool replay);
- void merge(list<CDir*>& subs, list<Context*>& waiters, bool replay);
+ void split(int bits, list<CDir*>& subs, list<MDSInternalContextBase*>& waiters, bool replay);
+ void merge(list<CDir*>& subs, list<MDSInternalContextBase*>& waiters, bool replay);
bool should_split() {
return (int)get_frag_size() > g_conf->mds_bal_split_size;
void prepare_new_fragment(bool replay);
void prepare_old_fragment(bool replay);
void steal_dentry(CDentry *dn); // from another dir. used by merge/split.
- void finish_old_fragment(list<Context*>& waiters, bool replay);
+ void finish_old_fragment(list<MDSInternalContextBase*>& waiters, bool replay);
void init_fragment_pins();
object_t get_ondisk_object() {
return file_object_t(ino(), frag);
}
- void fetch(Context *c, bool ignore_authpinnability=false);
- void fetch(Context *c, const std::string& want_dn, bool ignore_authpinnability=false);
+ void fetch(MDSInternalContextBase *c, bool ignore_authpinnability=false);
+ void fetch(MDSInternalContextBase *c, const std::string& want_dn, bool ignore_authpinnability=false);
protected:
void _omap_fetch(const std::string& want_dn);
void _omap_fetched(bufferlist& hdrbl, std::map<std::string, bufferlist>& omap,
void _tmap_fetched(bufferlist &bl, const std::string& want_dn, int r);
// -- commit --
- std::map<version_t, std::list<Context*> > waiting_for_commit;
+ std::map<version_t, std::list<MDSInternalContextBase*> > waiting_for_commit;
void _commit(version_t want, int op_prio);
void _omap_commit(int op_prio);
void _encode_dentry(CDentry *dn, bufferlist& bl, const std::set<snapid_t> *snaps);
void _committed(version_t v);
public:
+#if 0 // unused?
void wait_for_commit(Context *c, version_t v=0);
+#endif
void commit_to(version_t want);
- void commit(version_t want, Context *c,
+ void commit(version_t want, MDSInternalContextBase *c,
bool ignore_authpinnability=false, int op_prio=-1);
// -- dirtyness --
// -- waiters --
protected:
- std::map< string_snap_t, std::list<Context*> > waiting_on_dentry;
+ std::map< string_snap_t, std::list<MDSInternalContextBase*> > waiting_on_dentry;
public:
bool is_waiting_for_dentry(const std::string& dname, snapid_t snap) {
return waiting_on_dentry.count(string_snap_t(dname, snap));
}
- void add_dentry_waiter(const std::string& dentry, snapid_t snap, Context *c);
- void take_dentry_waiting(const std::string& dentry, snapid_t first, snapid_t last, std::list<Context*>& ls);
- void take_sub_waiting(std::list<Context*>& ls); // dentry or ino
+ void add_dentry_waiter(const std::string& dentry, snapid_t snap, MDSInternalContextBase *c);
+ void take_dentry_waiting(const std::string& dentry, snapid_t first, snapid_t last, std::list<MDSInternalContextBase*>& ls);
+ void take_sub_waiting(std::list<MDSInternalContextBase*>& ls); // dentry or ino
- void add_waiter(uint64_t mask, Context *c);
- void take_waiting(uint64_t mask, std::list<Context*>& ls); // may include dentry waiters
+ void add_waiter(uint64_t mask, MDSInternalContextBase *c);
+ void take_waiting(uint64_t mask, std::list<MDSInternalContextBase*>& ls); // may include dentry waiters
void finish_waiting(uint64_t mask, int result = 0); // ditto
return object_t(n);
}
-void CInode::store(Context *fin)
+void CInode::store(MDSInternalContextBase *fin)
{
dout(10) << "store " << get_version() << dendl;
assert(is_base());
}
};
-void CInode::fetch(Context *fin)
+void CInode::fetch(MDSInternalContextBase *fin)
{
dout(10) << "fetch" << dendl;
}
};
-void CInode::store_backtrace(Context *fin, int op_prio)
+void CInode::store_backtrace(MDSInternalContextBase *fin, int op_prio)
{
dout(10) << "store_backtrace on " << *this << dendl;
assert(is_dirty_parent());
}
}
-struct C_Inode_FragUpdate : public Context {
+
+struct C_Inode_FragUpdate : public MDSInternalContext {
CInode *in;
CDir *dir;
MutationRef mut;
- C_Inode_FragUpdate(CInode *i, CDir *d, MutationRef& m) : in(i), dir(d), mut(m) {}
+ C_Inode_FragUpdate(CInode *i, CDir *d, MutationRef& m) : MDSInternalContext(i->mdcache->mds), in(i), dir(d), mut(m) {}
void finish(int r) {
in->_finish_frag_update(dir, mut);
}
return false;
}
-void CInode::add_dir_waiter(frag_t fg, Context *c)
+void CInode::add_dir_waiter(frag_t fg, MDSInternalContextBase *c)
{
if (waiting_on_dir.empty())
get(PIN_DIRWAITER);
dout(10) << "add_dir_waiter frag " << fg << " " << c << " on " << *this << dendl;
}
-void CInode::take_dir_waiting(frag_t fg, list<Context*>& ls)
+void CInode::take_dir_waiting(frag_t fg, list<MDSInternalContextBase*>& ls)
{
if (waiting_on_dir.empty())
return;
- map<frag_t, list<Context*> >::iterator p = waiting_on_dir.find(fg);
+ map<frag_t, list<MDSInternalContextBase*> >::iterator p = waiting_on_dir.find(fg);
if (p != waiting_on_dir.end()) {
dout(10) << "take_dir_waiting frag " << fg << " on " << *this << dendl;
ls.splice(ls.end(), p->second);
}
}
-void CInode::add_waiter(uint64_t tag, Context *c)
+void CInode::add_waiter(uint64_t tag, MDSInternalContextBase *c)
{
dout(10) << "add_waiter tag " << std::hex << tag << std::dec << " " << c
<< " !ambig " << !state_test(STATE_AMBIGUOUSAUTH)
MDSCacheObject::add_waiter(tag, c);
}
-void CInode::take_waiting(uint64_t mask, list<Context*>& ls)
+void CInode::take_waiting(uint64_t mask, list<MDSInternalContextBase*>& ls)
{
if ((mask & WAIT_DIR) && !waiting_on_dir.empty()) {
// take all dentry waiters
while (!waiting_on_dir.empty()) {
- map<frag_t, list<Context*> >::iterator p = waiting_on_dir.begin();
+ map<frag_t, list<MDSInternalContextBase*> >::iterator p = waiting_on_dir.begin();
dout(10) << "take_waiting dirfrag " << p->first << " on " << *this << dendl;
ls.splice(ls.end(), p->second);
waiting_on_dir.erase(p);
return true;
}
-void CInode::unfreeze_inode(list<Context*>& finished)
+void CInode::unfreeze_inode(list<MDSInternalContextBase*>& finished)
{
dout(10) << "unfreeze_inode" << dendl;
if (state_test(STATE_FREEZING)) {
void CInode::unfreeze_inode()
{
- list<Context*> finished;
+ list<MDSInternalContextBase*> finished;
unfreeze_inode(finished);
mdcache->mds->queue_waiters(finished);
}
assert(state_test(CInode::STATE_FROZENAUTHPIN));
state_clear(CInode::STATE_FROZENAUTHPIN);
if (!state_test(STATE_FREEZING|STATE_FROZEN)) {
- list<Context*> finished;
+ list<MDSInternalContextBase*> finished;
take_waiting(WAIT_UNFREEZE, finished);
mdcache->mds->queue_waiters(finished);
}
}
-void CInode::clear_ambiguous_auth(list<Context*>& finished)
+void CInode::clear_ambiguous_auth(list<MDSInternalContextBase*>& finished)
{
assert(state_test(CInode::STATE_AMBIGUOUSAUTH));
state_clear(CInode::STATE_AMBIGUOUSAUTH);
void CInode::clear_ambiguous_auth()
{
- list<Context*> finished;
+ list<MDSInternalContextBase*> finished;
clear_ambiguous_auth(finished);
mdcache->mds->queue_waiters(finished);
}
bool fcntl_removed = fcntl_locks.remove_all_from(client);
bool flock_removed = flock_locks.remove_all_from(client);
if (fcntl_removed || flock_removed) {
- list<Context*> waiters;
+ list<MDSInternalContextBase*> waiters;
take_waiting(CInode::WAIT_FLOCK, waiters);
mdcache->mds->queue_waiters(waiters);
}
flocklock.decode_state(p, is_new);
policylock.decode_state(p, is_new);
}
-void CInode::_decode_locks_rejoin(bufferlist::iterator& p, list<Context*>& waiters,
+void CInode::_decode_locks_rejoin(bufferlist::iterator& p, list<MDSInternalContextBase*>& waiters,
list<SimpleLock*>& eval_locks)
{
authlock.decode_state_rejoin(p, waiters);
void set_ambiguous_auth() {
state_set(STATE_AMBIGUOUSAUTH);
}
- void clear_ambiguous_auth(std::list<Context*>& finished);
+ void clear_ambiguous_auth(std::list<MDSInternalContextBase*>& finished);
void clear_ambiguous_auth();
inodeno_t ino() const { return inode.ino; }
void mark_dirty(version_t projected_dirv, LogSegment *ls);
void mark_clean();
- void store(Context *fin);
+ void store(MDSInternalContextBase *fin);
void _stored(version_t cv, Context *fin);
- void fetch(Context *fin);
+ void fetch(MDSInternalContextBase *fin);
void _fetched(bufferlist& bl, bufferlist& bl2, Context *fin);
void build_backtrace(int64_t pool, inode_backtrace_t& bt);
- void store_backtrace(Context *fin, int op_prio=-1);
+ void store_backtrace(MDSInternalContextBase *fin, int op_prio=-1);
void _stored_backtrace(version_t v, Context *fin);
void _mark_dirty_parent(LogSegment *ls, bool dirty_pool=false);
void clear_dirty_parent();
// -- waiting --
protected:
- std::map<frag_t, std::list<Context*> > waiting_on_dir;
+ std::map<frag_t, std::list<MDSInternalContextBase*> > waiting_on_dir;
public:
- void add_dir_waiter(frag_t fg, Context *c);
- void take_dir_waiting(frag_t fg, std::list<Context*>& ls);
+ void add_dir_waiter(frag_t fg, MDSInternalContextBase *c);
+ void take_dir_waiting(frag_t fg, std::list<MDSInternalContextBase*>& ls);
bool is_waiting_for_dir(frag_t fg) {
return waiting_on_dir.count(fg);
}
- void add_waiter(uint64_t tag, Context *c);
- void take_waiting(uint64_t tag, std::list<Context*>& ls);
+ void add_waiter(uint64_t tag, MDSInternalContextBase *c);
+ void take_waiting(uint64_t tag, std::list<MDSInternalContextBase*>& ls);
// -- encode/decode helpers --
void _encode_base(bufferlist& bl);
void _encode_locks_state_for_replica(bufferlist& bl);
void _encode_locks_state_for_rejoin(bufferlist& bl, int rep);
void _decode_locks_state(bufferlist::iterator& p, bool is_new);
- void _decode_locks_rejoin(bufferlist::iterator& p, std::list<Context*>& waiters,
+ void _decode_locks_rejoin(bufferlist::iterator& p, std::list<MDSInternalContextBase*>& waiters,
std::list<SimpleLock*>& eval_locks);
// -- import/export --
/* Freeze the inode. auth_pin_allowance lets the caller account for any
* auth_pins it is itself holding/responsible for. */
bool freeze_inode(int auth_pin_allowance=0);
- void unfreeze_inode(std::list<Context*>& finished);
+ void unfreeze_inode(std::list<MDSInternalContextBase*>& finished);
void unfreeze_inode();
void freeze_auth_pin();
#include "CDir.h"
#include "CDentry.h"
#include "Mutation.h"
+#include "MDSContext.h"
#include "MDLog.h"
#include "MDSMap.h"
return *_dout << "mds." << mds->get_nodeid() << ".locker ";
}
+
+class LockerContext : public MDSInternalContextBase {
+ protected:
+ Locker *locker;
+ MDS *get_mds()
+ {
+ assert(locker != NULL);
+ return locker->mds;
+ }
+
+ public:
+ LockerContext(Locker *locker_) : locker(locker_) {}
+};
+
+
/* This function DOES put the passed message before returning */
void Locker::dispatch(Message *m)
{
// generics
-void Locker::eval_gather(SimpleLock *lock, bool first, bool *pneed_issue, list<Context*> *pfinishers)
+void Locker::eval_gather(SimpleLock *lock, bool first, bool *pneed_issue, list<MDSInternalContextBase*> *pfinishers)
{
dout(10) << "eval_gather " << *lock << " on " << *lock->get_parent() << dendl;
assert(!lock->is_stable());
bool Locker::eval(CInode *in, int mask, bool caps_imported)
{
bool need_issue = caps_imported;
- list<Context*> finishers;
+ list<MDSInternalContextBase*> finishers;
dout(10) << "eval " << mask << " " << *in << dendl;
return need_issue;
}
-class C_Locker_Eval : public Context {
- Locker *locker;
+class C_Locker_Eval : public LockerContext {
MDSCacheObject *p;
int mask;
public:
- C_Locker_Eval(Locker *l, MDSCacheObject *pp, int m) : locker(l), p(pp), mask(m) {
+ C_Locker_Eval(Locker *l, MDSCacheObject *pp, int m) : LockerContext(l), p(pp), mask(m) {
// We are used as an MDSCacheObject waiter, so should
// only be invoked by someone already holding the big lock.
assert(locker->mds->mds_lock.is_locked_by_me());
void Locker::eval_cap_gather(CInode *in, set<CInode*> *issue_set)
{
bool need_issue = false;
- list<Context*> finishers;
+ list<MDSInternalContextBase*> finishers;
// kick locks now
if (!in->filelock.is_stable())
void Locker::eval_scatter_gathers(CInode *in)
{
bool need_issue = false;
- list<Context*> finishers;
+ list<MDSInternalContextBase*> finishers;
dout(10) << "eval_scatter_gathers " << *in << dendl;
return false;
}
-bool Locker::rdlock_try(SimpleLock *lock, client_t client, Context *con)
+bool Locker::rdlock_try(SimpleLock *lock, client_t client, MDSInternalContextBase *con)
{
dout(7) << "rdlock_try on " << *lock << " on " << *lock->get_parent() << dendl;
return in->inode.file_data_version;
}
-struct C_IO_Locker_FileUpdate_finish : public Context {
- Locker *locker;
+class C_Locker_FileUpdate_finish : public LockerContext {
CInode *in;
MutationRef mut;
bool share;
client_t client;
Capability *cap;
MClientCaps *ack;
- C_IO_Locker_FileUpdate_finish(Locker *l, CInode *i, MutationRef& m,
+public:
+ C_Locker_FileUpdate_finish(Locker *l, CInode *i, MutationRef& m,
bool e=false, client_t c=-1,
Capability *cp = 0,
MClientCaps *ac = 0)
- : locker(l), in(i), mut(m), share(e), client(c), cap(cp),
+ : LockerContext(l), in(i), mut(m), share(e), client(c), cap(cp),
ack(ac) {
in->get(CInode::PIN_PTRWAITER);
}
void finish(int r) {
- Mutex::Locker l(locker->mds->mds_lock);
locker->file_update_finish(in, mut, share, client, cap, ack);
}
};
}
-class C_MDL_RequestInodeFileCaps : public Context {
- Locker *locker;
+class C_MDL_RequestInodeFileCaps : public LockerContext {
CInode *in;
public:
- C_MDL_RequestInodeFileCaps(Locker *l, CInode *i) : locker(l), in(i) {
+ C_MDL_RequestInodeFileCaps(Locker *l, CInode *i) : LockerContext(l), in(i) {
in->get(CInode::PIN_PTRWAITER);
}
void finish(int r) {
- assert(locker->mds->mds_lock.is_locked_by_me());
in->put(CInode::PIN_PTRWAITER);
if (!in->is_auth())
locker->request_inode_file_caps(in);
}
-class C_MDL_CheckMaxSize : public Context {
- Locker *locker;
+class C_MDL_CheckMaxSize : public LockerContext {
CInode *in;
bool update_size;
uint64_t newsize;
public:
C_MDL_CheckMaxSize(Locker *l, CInode *i, bool _update_size, uint64_t _newsize,
bool _update_max, uint64_t _new_max_size, utime_t _mtime) :
- locker(l), in(i),
+ LockerContext(l), in(i),
update_size(_update_size), newsize(_newsize),
update_max(_update_max), new_max_size(_new_max_size),
mtime(_mtime)
in->get(CInode::PIN_PTRWAITER);
}
void finish(int r) {
- assert(locker->mds->mds_lock.is_locked_by_me());
-
in->put(CInode::PIN_PTRWAITER);
if (in->is_auth())
locker->check_inode_max_size(in, false, update_size, newsize,
mdcache->journal_dirty_inode(mut.get(), metablob, in);
}
mds->mdlog->submit_entry(le,
- new C_IO_Locker_FileUpdate_finish(this, in, mut, true));
+ new C_Locker_FileUpdate_finish(this, in, mut, true));
wrlock_force(&in->filelock, mut); // wrlock for duration of journal
mut->auth_pin(in);
m->put();
}
-class C_Locker_RetryRequestCapRelease : public Context {
- Locker *locker;
+
+class C_Locker_RetryRequestCapRelease : public LockerContext {
client_t client;
ceph_mds_request_release item;
public:
C_Locker_RetryRequestCapRelease(Locker *l, client_t c, const ceph_mds_request_release& it) :
- locker(l), client(c), item(it) { }
+ LockerContext(l), client(c), item(it) { }
void finish(int r) {
string dname;
MDRequestRef null_ref;
mdr->cap_releases[in->vino()] = cap->get_last_seq();
}
-class C_Locker_RetryKickIssueCaps : public Context {
- Locker *locker;
+class C_Locker_RetryKickIssueCaps : public LockerContext {
CInode *in;
client_t client;
ceph_seq_t seq;
public:
C_Locker_RetryKickIssueCaps(Locker *l, CInode *i, client_t c, ceph_seq_t s) :
- locker(l), in(i), client(c), seq(s) {
+ LockerContext(l), in(i), client(c), seq(s) {
in->get(CInode::PIN_PTRWAITER);
}
void finish(int r) {
mdcache->predirty_journal_parents(mut, &le->metablob, in, 0, PREDIRTY_PRIMARY, 0, follows);
mdcache->journal_dirty_inode(mut.get(), &le->metablob, in, follows);
- mds->mdlog->submit_entry(le, new C_IO_Locker_FileUpdate_finish(this, in, mut,
+ mds->mdlog->submit_entry(le, new C_Locker_FileUpdate_finish(this, in, mut,
false,
client, NULL,
ack));
mdcache->predirty_journal_parents(mut, &le->metablob, in, 0, PREDIRTY_PRIMARY, 0, follows);
mdcache->journal_dirty_inode(mut.get(), &le->metablob, in, follows);
- mds->mdlog->submit_entry(le, new C_IO_Locker_FileUpdate_finish(this, in, mut,
+ mds->mdlog->submit_entry(le, new C_Locker_FileUpdate_finish(this, in, mut,
change_max,
client, cap,
ack));
m->put();
}
-class C_Locker_RetryCapRelease : public Context {
- Locker *locker;
+class C_Locker_RetryCapRelease : public LockerContext {
client_t client;
inodeno_t ino;
uint64_t cap_id;
public:
C_Locker_RetryCapRelease(Locker *l, client_t c, inodeno_t i, uint64_t id,
ceph_seq_t mseq, ceph_seq_t seq) :
- locker(l), client(c), ino(i), cap_id(id), migrate_seq(mseq), issue_seq(seq) {}
+ LockerContext(l), client(c), ino(i), cap_id(id), migrate_seq(mseq), issue_seq(seq) {}
void finish(int r) {
locker->_do_cap_release(client, ino, cap_id, migrate_seq, issue_seq);
}
*/
+class C_Locker_ScatterWB : public LockerContext {
+ ScatterLock *lock;
+ MutationRef mut;
+public:
+ C_Locker_ScatterWB(Locker *l, ScatterLock *sl, MutationRef& m) :
+ LockerContext(l), lock(sl), mut(m) {}
+ void finish(int r) {
+ locker->scatter_writebehind_finish(lock, mut);
+ }
+};
+
void Locker::scatter_writebehind(ScatterLock *lock)
{
CInode *in = static_cast<CInode*>(lock->get_parent());
in->finish_scatter_gather_update_accounted(lock->get_type(), mut, &le->metablob);
- mds->mdlog->submit_entry(le, new C_IO_Locker_ScatterWB(this, lock, mut));
+ mds->mdlog->submit_entry(le, new C_Locker_ScatterWB(this, lock, mut));
}
void Locker::scatter_writebehind_finish(ScatterLock *lock, MutationRef& mut)
* we need to lock|scatter in order to push fnode changes into the
* inode.dirstat.
*/
-void Locker::scatter_nudge(ScatterLock *lock, Context *c, bool forcelockchange)
+void Locker::scatter_nudge(ScatterLock *lock, MDSInternalContextBase *c, bool forcelockchange)
{
CInode *p = static_cast<CInode *>(lock->get_parent());
void drop_non_rdlocks(MutationImpl *mut, set<CInode*> *pneed_issue=0);
void drop_rdlocks(MutationImpl *mut, set<CInode*> *pneed_issue=0);
- void eval_gather(SimpleLock *lock, bool first=false, bool *need_issue=0, list<Context*> *pfinishers=0);
+ void eval_gather(SimpleLock *lock, bool first=false, bool *need_issue=0, list<MDSInternalContextBase*> *pfinishers=0);
void eval(SimpleLock *lock, bool *need_issue);
- void eval_any(SimpleLock *lock, bool *need_issue, list<Context*> *pfinishers=0, bool first=false) {
+ void eval_any(SimpleLock *lock, bool *need_issue, list<MDSInternalContextBase*> *pfinishers=0, bool first=false) {
if (!lock->is_stable())
eval_gather(lock, first, need_issue, pfinishers);
else if (lock->get_parent()->is_auth())
eval(lock, need_issue);
}
- class C_EvalScatterGathers : public Context {
- Locker *locker;
- CInode *in;
- public:
- C_EvalScatterGathers(Locker *l, CInode *i) : locker(l), in(i) {
- in->get(CInode::PIN_PTRWAITER);
- }
- void finish(int r) {
- in->put(CInode::PIN_PTRWAITER);
- locker->eval_scatter_gathers(in);
- }
- };
void eval_scatter_gathers(CInode *in);
void eval_cap_gather(CInode *in, set<CInode*> *issue_set=0);
void try_eval(SimpleLock *lock, bool *pneed_issue);
bool _rdlock_kick(SimpleLock *lock, bool as_anon);
- bool rdlock_try(SimpleLock *lock, client_t client, Context *c);
+ bool rdlock_try(SimpleLock *lock, client_t client, MDSInternalContextBase *c);
bool rdlock_start(SimpleLock *lock, MDRequestRef& mut, bool as_anon=false);
void rdlock_finish(SimpleLock *lock, MutationImpl *mut, bool *pneed_issue);
bool can_rdlock_set(set<SimpleLock*>& locks);
// simple
public:
void try_simple_eval(SimpleLock *lock);
- bool simple_rdlock_try(SimpleLock *lock, Context *con);
+ bool simple_rdlock_try(SimpleLock *lock, MDSInternalContextBase *con);
protected:
void simple_eval(SimpleLock *lock, bool *need_issue);
void handle_simple_lock(SimpleLock *lock, MLock *m);
void scatter_eval(ScatterLock *lock, bool *need_issue); // public for MDCache::adjust_subtree_auth()
void scatter_tick();
- void scatter_nudge(ScatterLock *lock, Context *c, bool forcelockchange=false);
+ void scatter_nudge(ScatterLock *lock, MDSInternalContextBase *c, bool forcelockchange=false);
protected:
void handle_scatter_lock(ScatterLock *lock, MLock *m);
void scatter_tempsync(ScatterLock *lock, bool *need_issue=0);
void scatter_writebehind(ScatterLock *lock);
- class C_IO_Locker_ScatterWB : public Context {
- Locker *locker;
- ScatterLock *lock;
- MutationRef mut;
- public:
- C_IO_Locker_ScatterWB(Locker *l, ScatterLock *sl, MutationRef& m) :
- locker(l), lock(sl), mut(m) {}
- void finish(int r) {
- Mutex::Locker l(locker->mds->mds_lock);
- locker->scatter_writebehind_finish(lock, mut);
- }
- };
+
void scatter_writebehind_finish(ScatterLock *lock, MutationRef& mut);
xlist<ScatterLock*> updated_scatterlocks;
private:
friend class C_MDL_CheckMaxSize;
friend class C_MDL_RequestInodeFileCaps;
- friend struct C_IO_Locker_FileUpdate_finish;
+ friend class C_Locker_FileUpdate_finish;
friend class C_Locker_RetryCapRelease;
friend class C_Locker_Eval;
+ friend class LockerContext;
+ friend class C_Locker_ScatterWB;
// -- client leases --
#include "CInode.h"
#include "CDentry.h"
#include "CDir.h"
+#include "MDSContext.h"
#include "include/unordered_set.h"
using ceph::unordered_set;
map<int,version_t> tablev;
// try to expire
- void try_to_expire(MDS *mds, C_GatherBuilder &gather_bld, int op_prio);
+ void try_to_expire(MDS *mds, MDSGatherBuilder &gather_bld, int op_prio);
// cons
LogSegment(uint64_t _seq, loff_t off=-1) :
-class C_Bal_SendHeartbeat : public Context {
+class C_Bal_SendHeartbeat : public MDSInternalContext {
public:
- MDS *mds;
- C_Bal_SendHeartbeat(MDS *mds) {
- this->mds = mds;
- }
+ C_Bal_SendHeartbeat(MDS *mds_) : MDSInternalContext(mds_) { }
virtual void finish(int f) {
mds->balancer->send_heartbeat();
}
class Message;
class MHeartbeat;
class CInode;
-class Context;
class CDir;
class MDBalancer {
set<int> SimpleLock::empty_gather_set;
+/**
+ * All non-I/O contexts that require a reference
+ * to an MDCache instance descend from this.
+ */
+class MDCacheContext : public virtual MDSInternalContextBase {
+protected:
+ MDCache *mdcache;
+ virtual MDS *get_mds()
+ {
+ assert(mdcache != NULL);
+ return mdcache->mds;
+ }
+public:
+ MDCacheContext(MDCache *mdc_) : mdcache(mdc_) {}
+};
+
+
+/**
+ * Only for contexts called back from an I/O completion
+ *
+ * Note: duplication of members wrt MDCacheContext, because
+ * it'ls the lesser of two evils compared with introducing
+ * yet another piece of (multiple) inheritance.
+ */
+class MDCacheIOContext : public virtual MDSIOContextBase {
+protected:
+ MDCache *mdcache;
+ virtual MDS *get_mds()
+ {
+ assert(mdcache != NULL);
+ return mdcache->mds;
+ }
+public:
+ MDCacheIOContext(MDCache *mdc_) : mdcache(mdc_) {}
+};
+
+
MDCache::MDCache(MDS *m) :
recovery_queue(m),
delayed_eval_stray(member_offset(CDentry, item_stray))
return i;
}
-void MDCache::create_empty_hierarchy(C_Gather *gather)
+void MDCache::create_empty_hierarchy(MDSGather *gather)
{
// create root dir
CInode *root = create_root_inode();
root->store(gather->new_sub());
}
-void MDCache::create_mydir_hierarchy(C_Gather *gather)
+void MDCache::create_mydir_hierarchy(MDSGather *gather)
{
// create mds dir
char myname[10];
myin->store(gather->new_sub());
}
-struct C_MDC_CreateSystemFile : public Context {
- MDCache *cache;
+struct C_MDC_CreateSystemFile : public MDCacheContext {
MutationRef mut;
CDentry *dn;
version_t dpv;
- Context *fin;
- C_MDC_CreateSystemFile(MDCache *c, MutationRef& mu, CDentry *d, version_t v, Context *f) :
- cache(c), mut(mu), dn(d), dpv(v), fin(f) {}
+ MDSInternalContextBase *fin;
+ C_MDC_CreateSystemFile(MDCache *c, MutationRef& mu, CDentry *d, version_t v, MDSInternalContextBase *f) :
+ MDCacheContext(c), mut(mu), dn(d), dpv(v), fin(f) {}
void finish(int r) {
- cache->_create_system_file_finish(mut, dn, dpv, fin);
+ mdcache->_create_system_file_finish(mut, dn, dpv, fin);
}
};
-void MDCache::_create_system_file(CDir *dir, const char *name, CInode *in, Context *fin)
+void MDCache::_create_system_file(CDir *dir, const char *name, CInode *in, MDSInternalContextBase *fin)
{
dout(10) << "_create_system_file " << name << " in " << *dir << dendl;
CDentry *dn = dir->add_null_dentry(name);
mds->mdlog->flush();
}
-void MDCache::_create_system_file_finish(MutationRef& mut, CDentry *dn, version_t dpv, Context *fin)
+void MDCache::_create_system_file_finish(MutationRef& mut, CDentry *dn, version_t dpv, MDSInternalContextBase *fin)
{
dout(10) << "_create_system_file_finish " << *dn << dendl;
-struct C_MDS_RetryOpenRoot : public Context {
+struct C_MDS_RetryOpenRoot : public MDSInternalContext {
MDCache *cache;
- C_MDS_RetryOpenRoot(MDCache *c) : cache(c) {}
+ C_MDS_RetryOpenRoot(MDCache *c) : MDSInternalContext(c->mds), cache(c) {}
void finish(int r) {
if (r < 0)
cache->mds->suicide();
}
};
-void MDCache::open_root_inode(Context *c)
+void MDCache::open_root_inode(MDSInternalContextBase *c)
{
if (mds->whoami == mds->mdsmap->get_root()) {
CInode *in;
}
}
-void MDCache::open_mydir_inode(Context *c)
+void MDCache::open_mydir_inode(MDSInternalContextBase *c)
{
CInode *in = create_system_inode(MDS_INO_MDSDIR(mds->whoami), S_IFDIR|0755); // initially inaccurate!
in->fetch(c);
scan_stray_dir();
}
-void MDCache::open_foreign_mdsdir(inodeno_t ino, Context *fin)
+void MDCache::open_foreign_mdsdir(inodeno_t ino, MDSInternalContextBase *fin)
{
discover_base_ino(ino, fin, ino & (MAX_MDS-1));
}
try_subtree_merge_at(*p);
}
-class C_MDC_SubtreeMergeWB : public Context {
- MDCache *mdcache;
+class C_MDC_SubtreeMergeWB : public MDCacheContext {
CInode *in;
MutationRef mut;
public:
- C_MDC_SubtreeMergeWB(MDCache *mdc, CInode *i, MutationRef& m) : mdcache(mdc), in(i), mut(m) {}
+ C_MDC_SubtreeMergeWB(MDCache *mdc, CInode *i, MutationRef& m) : MDCacheContext(mdc), in(i), mut(m) {}
void finish(int r) {
mdcache->subtree_merge_writebehind_finish(in, mut);
}
* remove them from the uncommitted_masters map (used during recovery
* to commit|abort slaves).
*/
-struct C_MDC_CommittedMaster : public Context {
- MDCache *cache;
+struct C_MDC_CommittedMaster : public MDCacheContext {
metareqid_t reqid;
- C_MDC_CommittedMaster(MDCache *s, metareqid_t r) : cache(s), reqid(r) {}
+ C_MDC_CommittedMaster(MDCache *s, metareqid_t r) : MDCacheContext(s), reqid(r) {}
void finish(int r) {
- cache->_logged_master_commit(reqid);
+ mdcache->_logged_master_commit(reqid);
}
};
* masters when it reaches up:active (all other recovering nodes must
* complete resolve before that happens).
*/
-struct C_MDC_SlaveCommit : public Context {
- MDCache *cache;
+struct C_MDC_SlaveCommit : public MDCacheContext {
int from;
metareqid_t reqid;
- C_MDC_SlaveCommit(MDCache *c, int f, metareqid_t r) : cache(c), from(f), reqid(r) {}
+ C_MDC_SlaveCommit(MDCache *c, int f, metareqid_t r) : MDCacheContext(c), from(f), reqid(r) {}
void finish(int r) {
- cache->_logged_slave_commit(from, reqid);
+ mdcache->_logged_slave_commit(from, reqid);
}
};
static const uint64_t i_mask = CInode::WAIT_ANY_MASK & ~CInode::WAIT_DIR;
static const uint64_t d_mask = CDir::WAIT_ANY_MASK & ~CDir::WAIT_DENTRY;
- list<Context*> waiters;
+ list<MDSInternalContextBase*> waiters;
// wake up any waiters in their subtrees
for (map<CDir*,set<CDir*> >::iterator p = subtrees.begin();
}
}
-class C_MDC_RejoinGatherFinish : public Context {
- MDCache *cache;
+class C_MDC_RejoinGatherFinish : public MDCacheContext {
public:
- C_MDC_RejoinGatherFinish(MDCache *c) : cache(c) {}
+ C_MDC_RejoinGatherFinish(MDCache *c) : MDCacheContext(c) {}
void finish(int r) {
- cache->rejoin_gather_finish();
+ mdcache->rejoin_gather_finish();
}
};
}
}
-class C_MDC_RejoinOpenInoFinish: public Context {
- MDCache *cache;
+class C_MDC_RejoinOpenInoFinish: public MDCacheContext {
inodeno_t ino;
public:
- C_MDC_RejoinOpenInoFinish(MDCache *c, inodeno_t i) : cache(c), ino(i) {}
+ C_MDC_RejoinOpenInoFinish(MDCache *c, inodeno_t i) : MDCacheContext(c), ino(i) {}
void finish(int r) {
- cache->rejoin_open_ino_finish(ino, r);
+ mdcache->rejoin_open_ino_finish(ino, r);
}
};
}
}
-class C_MDC_RejoinSessionsOpened : public Context {
- MDCache *cache;
+class C_MDC_RejoinSessionsOpened : public MDCacheContext {
public:
map<client_t,entity_inst_t> client_map;
map<client_t,uint64_t> sseqmap;
C_MDC_RejoinSessionsOpened(MDCache *c, map<client_t,entity_inst_t>& cm) :
- cache(c), client_map(cm) {}
+ MDCacheContext(c), client_map(cm) {}
void finish(int r) {
assert(r == 0);
- cache->rejoin_open_sessions_finish(client_map, sseqmap);
+ mdcache->rejoin_open_sessions_finish(client_map, sseqmap);
}
};
#endif
}
-struct C_MDC_OpenSnapParents : public Context {
- MDCache *mdcache;
- C_MDC_OpenSnapParents(MDCache *c) : mdcache(c) {}
+struct C_MDC_OpenSnapParents : public MDCacheContext {
+ C_MDC_OpenSnapParents(MDCache *c) : MDCacheContext(c) {}
void finish(int r) {
mdcache->open_snap_parents();
}
dout(10) << "open_snap_parents" << dendl;
map<client_t,MClientSnap*> splits;
- C_GatherBuilder gather(g_ceph_context);
+ MDSGatherBuilder gather(g_ceph_context);
map<CInode*,map<client_t,set<inodeno_t> > >::iterator p = missing_snap_parents.begin();
while (p != missing_snap_parents.end()) {
if (fetch_queue.empty())
return false;
- C_GatherBuilder gather(g_ceph_context, new C_MDC_RejoinGatherFinish(this));
+ MDSGatherBuilder gather(g_ceph_context, new C_MDC_RejoinGatherFinish(this));
for (set<CDir*>::iterator p = fetch_queue.begin();
p != fetch_queue.end();
++p) {
// ===============================================================================
-struct C_MDC_QueuedCow : public Context {
- MDCache *mdcache;
+struct C_MDC_QueuedCow : public MDCacheContext {
CInode *in;
MutationRef mut;
C_MDC_QueuedCow(MDCache *mdc, CInode *i, MutationRef& m) :
- mdcache(mdc), in(i), mut(m) {}
+ MDCacheContext(mdc), in(i), mut(m) {}
void finish(int r) {
mdcache->_queued_file_recover_cow(in, mut);
}
_truncate_inode(in, ls);
}
-struct C_IO_MDC_TruncateFinish : public Context {
- MDCache *mdc;
+struct C_IO_MDC_TruncateFinish : public MDCacheIOContext {
CInode *in;
LogSegment *ls;
C_IO_MDC_TruncateFinish(MDCache *c, CInode *i, LogSegment *l) :
- mdc(c), in(i), ls(l) {}
+ MDCacheIOContext(c), in(i), ls(l) {}
void finish(int r) {
assert(r == 0 || r == -ENOENT);
- Mutex::Locker l(mdc->mds->mds_lock);
- mdc->truncate_inode_finish(in, ls);
+ mdcache->truncate_inode_finish(in, ls);
}
};
&mds->finisher));
}
-struct C_IO_MDC_TruncateLogged : public Context {
- MDCache *mdc;
+struct C_MDC_TruncateLogged : public MDCacheContext {
CInode *in;
MutationRef mut;
- C_IO_MDC_TruncateLogged(MDCache *m, CInode *i, MutationRef& mu) :
- mdc(m), in(i), mut(mu) {}
+ C_MDC_TruncateLogged(MDCache *m, CInode *i, MutationRef& mu) :
+ MDCacheContext(m), in(i), mut(mu) {}
void finish(int r) {
- mdc->truncate_inode_logged(in, mut);
+ mdcache->truncate_inode_logged(in, mut);
}
};
le->metablob.add_truncate_finish(in->ino(), ls->seq);
journal_dirty_inode(mut.get(), &le->metablob, in);
- mds->mdlog->submit_entry(le, new C_IO_MDC_TruncateLogged(this, in, mut));
+ mds->mdlog->submit_entry(le, new C_MDC_TruncateLogged(this, in, mut));
// flush immediately if there are readers/writers waiting
if (in->get_caps_wanted() & (CEPH_CAP_FILE_RD|CEPH_CAP_FILE_WR))
in->put(CInode::PIN_TRUNCATING);
in->auth_unpin(this);
- list<Context*> waiters;
+ list<MDSInternalContextBase*> waiters;
in->take_waiting(CInode::WAIT_TRUNC, waiters);
mds->queue_waiters(waiters);
}
// =========================================================================================
// shutdown
-class C_MDC_ShutdownCheck : public Context {
- MDCache *mdc;
+class C_MDC_ShutdownCheck : public MDCacheContext {
public:
- C_MDC_ShutdownCheck(MDCache *m) : mdc(m) {}
+ C_MDC_ShutdownCheck(MDCache *m) : MDCacheContext(m) {}
void finish(int) {
- mdc->shutdown_check();
+ mdcache->shutdown_check();
}
};
}
}
-Context *MDCache::_get_waiter(MDRequestRef& mdr, Message *req, Context *fin)
+MDSInternalContextBase *MDCache::_get_waiter(MDRequestRef& mdr, Message *req, MDSInternalContextBase *fin)
{
if (mdr) {
dout(20) << "_get_waiter retryrequest" << dendl;
}
}
-int MDCache::path_traverse(MDRequestRef& mdr, Message *req, Context *fin, // who
+int MDCache::path_traverse(MDRequestRef& mdr, Message *req, MDSInternalContextBase *fin, // who
const filepath& path, // what
vector<CDentry*> *pdnvec, // result
CInode **pin,
* @param approxfg approximate fragment.
* @param fin completion callback
*/
-void MDCache::open_remote_dirfrag(CInode *diri, frag_t approxfg, Context *fin)
+void MDCache::open_remote_dirfrag(CInode *diri, frag_t approxfg, MDSInternalContextBase *fin)
{
dout(10) << "open_remote_dir on " << *diri << dendl;
}
}
-struct C_MDC_OpenRemoteDentry : public Context {
- MDCache *mdc;
+struct C_MDC_OpenRemoteDentry : public MDCacheContext {
CDentry *dn;
inodeno_t ino;
- Context *onfinish;
+ MDSInternalContextBase *onfinish;
bool want_xlocked;
- C_MDC_OpenRemoteDentry(MDCache *m, CDentry *d, inodeno_t i, Context *f, bool wx) :
- mdc(m), dn(d), ino(i), onfinish(f), want_xlocked(wx) {}
+ C_MDC_OpenRemoteDentry(MDCache *m, CDentry *d, inodeno_t i, MDSInternalContextBase *f, bool wx) :
+ MDCacheContext(m), dn(d), ino(i), onfinish(f), want_xlocked(wx) {}
void finish(int r) {
- mdc->_open_remote_dentry_finish(dn, ino, onfinish, want_xlocked, r);
+ mdcache->_open_remote_dentry_finish(dn, ino, onfinish, want_xlocked, r);
}
};
-void MDCache::open_remote_dentry(CDentry *dn, bool projected, Context *fin, bool want_xlocked)
+void MDCache::open_remote_dentry(CDentry *dn, bool projected, MDSInternalContextBase *fin, bool want_xlocked)
{
dout(10) << "open_remote_dentry " << *dn << dendl;
CDentry::linkage_t *dnl = projected ? dn->get_projected_linkage() : dn->get_linkage();
inodeno_t ino = dnl->get_remote_ino();
uint64_t pool = dnl->get_remote_d_type() == DT_DIR ? mds->mdsmap->get_metadata_pool() : -1;
- Context *fin2 = new C_MDC_OpenRemoteDentry(this, dn, ino, fin, want_xlocked);
- open_ino(ino, pool, fin2, true, want_xlocked); // backtrace
+ open_ino(ino, pool,
+ new C_MDC_OpenRemoteDentry(this, dn, ino, fin, want_xlocked), true, want_xlocked); // backtrace
}
-void MDCache::_open_remote_dentry_finish(CDentry *dn, inodeno_t ino, Context *fin,
+void MDCache::_open_remote_dentry_finish(CDentry *dn, inodeno_t ino, MDSInternalContextBase *fin,
bool want_xlocked, int r)
{
if (r < 0) {
// -------------------------------------------------------------------------------
// Open inode by inode number
-class C_IO_MDC_OpenInoBacktraceFetched : public Context {
- MDCache *cache;
+class C_IO_MDC_OpenInoBacktraceFetched : public MDCacheIOContext {
inodeno_t ino;
public:
bufferlist bl;
C_IO_MDC_OpenInoBacktraceFetched(MDCache *c, inodeno_t i) :
- cache(c), ino(i) {}
+ MDCacheIOContext(c), ino(i) {}
void finish(int r) {
- Mutex::Locker l(cache->mds->mds_lock);
- cache->_open_ino_backtrace_fetched(ino, bl, r);
+ mdcache->_open_ino_backtrace_fetched(ino, bl, r);
}
};
-struct C_MDC_OpenInoTraverseDir : public Context {
- MDCache *cache;
+struct C_MDC_OpenInoTraverseDir : public MDCacheContext {
inodeno_t ino;
public:
- C_MDC_OpenInoTraverseDir(MDCache *c, inodeno_t i) : cache(c), ino(i) {}
+ C_MDC_OpenInoTraverseDir(MDCache *c, inodeno_t i) : MDCacheContext(c), ino(i) {}
void finish(int r) {
- assert(cache->opening_inodes.count(ino));
- cache->_open_ino_traverse_dir(ino, cache->opening_inodes[ino], r);
+ assert(mdcache->opening_inodes.count(ino));
+ mdcache->_open_ino_traverse_dir(ino, mdcache->opening_inodes[ino], r);
}
};
-struct C_MDC_OpenInoParentOpened : public Context {
- MDCache *cache;
+struct C_MDC_OpenInoParentOpened : public MDCacheContext {
inodeno_t ino;
public:
- C_MDC_OpenInoParentOpened(MDCache *c, inodeno_t i) : cache(c), ino(i) {}
+ C_MDC_OpenInoParentOpened(MDCache *c, inodeno_t i) : MDCacheContext(c), ino(i) {}
void finish(int r) {
- cache->_open_ino_parent_opened(ino, r);
+ mdcache->_open_ino_parent_opened(ino, r);
}
};
}
}
-Context* MDCache::_open_ino_get_waiter(inodeno_t ino, MMDSOpenIno *m)
+MDSInternalContextBase* MDCache::_open_ino_get_waiter(inodeno_t ino, MMDSOpenIno *m)
{
if (m)
return new C_MDS_RetryMessage(mds, m);
{
dout(10) << "open_ino_finish ino " << ino << " ret " << ret << dendl;
- list<Context*> waiters;
+ list<MDSInternalContextBase*> waiters;
waiters.swap(info.waiters);
opening_inodes.erase(ino);
finish_contexts(g_ceph_context, waiters, ret);
}
}
-void MDCache::open_ino(inodeno_t ino, int64_t pool, Context* fin,
+void MDCache::open_ino(inodeno_t ino, int64_t pool, MDSInternalContextBase* fin,
bool want_replica, bool want_xlocked)
{
dout(10) << "open_ino " << ino << " pool " << pool << " want_replica "
- traverse path
*/
-void MDCache::find_ino_peers(inodeno_t ino, Context *c, int hint)
+void MDCache::find_ino_peers(inodeno_t ino, MDSInternalContextBase *c, int hint)
{
dout(5) << "find_ino_peers " << ino << " hint " << hint << dendl;
assert(!have_inode(ino));
// -------------------------------------------------------------------------------
// SNAPREALMS
-struct C_MDC_snaprealm_create_finish : public Context {
- MDCache *cache;
+struct C_MDC_snaprealm_create_finish : public MDCacheContext {
MDRequestRef mdr;
MutationRef mut;
CInode *in;
C_MDC_snaprealm_create_finish(MDCache *c, MDRequestRef& m,
MutationRef& mu, CInode *i) :
- cache(c), mdr(m), mut(mu), in(i) {}
+ MDCacheContext(c), mdr(m), mut(mu), in(i) {}
void finish(int r) {
- cache->_snaprealm_create_finish(mdr, mut, in);
+ mdcache->_snaprealm_create_finish(mdr, mut, in);
}
};
// -------------------------------------------------------------------------------
// STRAYS
-struct C_MDC_RetryScanStray : public Context {
- MDCache *cache;
+struct C_MDC_RetryScanStray : public MDCacheContext {
dirfrag_t next;
- C_MDC_RetryScanStray(MDCache *c, dirfrag_t n) : cache(c), next(n) { }
+ C_MDC_RetryScanStray(MDCache *c, dirfrag_t n) : MDCacheContext(c), next(n) { }
void finish(int r) {
- cache->scan_stray_dir(next);
+ mdcache->scan_stray_dir(next);
}
};
}
}
-struct C_MDC_EvalStray : public Context {
- MDCache *mdcache;
+struct C_MDC_EvalStray : public MDCacheContext {
CDentry *dn;
- C_MDC_EvalStray(MDCache *c, CDentry *d) : mdcache(c), dn(d) {}
+ C_MDC_EvalStray(MDCache *c, CDentry *d) : MDCacheContext(c), dn(d) {}
void finish(int r) {
mdcache->eval_stray(dn);
}
mds->objecter->getxattr(oid, object_locator_t(pool), "parent", CEPH_NOSNAP, &bl, 0, fin);
}
-class C_IO_MDC_PurgeStrayPurged : public Context {
- MDCache *cache;
+class C_IO_MDC_PurgeStrayPurged : public MDCacheIOContext {
CDentry *dn;
public:
C_IO_MDC_PurgeStrayPurged(MDCache *c, CDentry *d) :
- cache(c), dn(d) { }
+ MDCacheIOContext(c), dn(d) { }
void finish(int r) {
- Mutex::Locker l(cache->mds->mds_lock);
assert(r == 0 || r == -ENOENT);
- cache->_purge_stray_purged(dn, r);
+ mdcache->_purge_stray_purged(dn, r);
}
};
gather.activate();
}
-class C_MDC_PurgeStrayLogged : public Context {
- MDCache *cache;
+class C_MDC_PurgeStrayLogged : public MDCacheContext {
CDentry *dn;
version_t pdv;
LogSegment *ls;
public:
C_MDC_PurgeStrayLogged(MDCache *c, CDentry *d, version_t v, LogSegment *s) :
- cache(c), dn(d), pdv(v), ls(s) { }
+ MDCacheContext(c), dn(d), pdv(v), ls(s) { }
void finish(int r) {
- cache->_purge_stray_logged(dn, pdv, ls);
+ mdcache->_purge_stray_logged(dn, pdv, ls);
}
};
-class C_MDC_PurgeStrayLoggedTruncate : public Context {
- MDCache *cache;
+class C_MDC_PurgeStrayLoggedTruncate : public MDCacheContext {
CDentry *dn;
LogSegment *ls;
public:
C_MDC_PurgeStrayLoggedTruncate(MDCache *c, CDentry *d, LogSegment *s) :
- cache(c), dn(d), ls(s) { }
+ MDCacheContext(c), dn(d), ls(s) { }
void finish(int r) {
- cache->_purge_stray_logged_truncate(dn, ls);
+ mdcache->_purge_stray_logged_truncate(dn, ls);
}
};
}
void MDCache::discover_base_ino(inodeno_t want_ino,
- Context *onfinish,
+ MDSInternalContextBase *onfinish,
int from)
{
dout(7) << "discover_base_ino " << want_ino << " from mds." << from << dendl;
void MDCache::discover_dir_frag(CInode *base,
frag_t approx_fg,
- Context *onfinish,
+ MDSInternalContextBase *onfinish,
int from)
{
if (from < 0)
base->add_dir_waiter(approx_fg, onfinish);
}
-struct C_MDC_RetryDiscoverPath : public Context {
- MDCache *mdc;
+struct C_MDC_RetryDiscoverPath : public MDCacheContext {
CInode *base;
snapid_t snapid;
filepath path;
int from;
C_MDC_RetryDiscoverPath(MDCache *c, CInode *b, snapid_t s, filepath &p, int f) :
- mdc(c), base(b), snapid(s), path(p), from(f) {}
+ MDCacheContext(c), base(b), snapid(s), path(p), from(f) {}
void finish(int r) {
- mdc->discover_path(base, snapid, path, 0, from);
+ mdcache->discover_path(base, snapid, path, 0, from);
}
};
void MDCache::discover_path(CInode *base,
snapid_t snap,
filepath want_path,
- Context *onfinish,
+ MDSInternalContextBase *onfinish,
bool want_xlocked,
int from)
{
base->add_waiter(CInode::WAIT_SINGLEAUTH, onfinish);
return;
} else if (from == mds->get_nodeid()) {
- list<Context*> finished;
+ list<MDSInternalContextBase*> finished;
base->take_waiting(CInode::WAIT_DIR, finished);
mds->queue_waiters(finished);
return;
base->add_dir_waiter(fg, onfinish);
}
-struct C_MDC_RetryDiscoverPath2 : public Context {
- MDCache *mdc;
+struct C_MDC_RetryDiscoverPath2 : public MDCacheContext {
CDir *base;
snapid_t snapid;
filepath path;
C_MDC_RetryDiscoverPath2(MDCache *c, CDir *b, snapid_t s, filepath &p) :
- mdc(c), base(b), snapid(s), path(p) {}
+ MDCacheContext(c), base(b), snapid(s), path(p) {}
void finish(int r) {
- mdc->discover_path(base, snapid, path, 0);
+ mdcache->discover_path(base, snapid, path, 0);
}
};
void MDCache::discover_path(CDir *base,
snapid_t snap,
filepath want_path,
- Context *onfinish,
+ MDSInternalContextBase *onfinish,
bool want_xlocked)
{
int from = base->authority().first;
base->add_waiter(CDir::WAIT_SINGLEAUTH, onfinish);
return;
} else if (from == mds->get_nodeid()) {
- list<Context*> finished;
+ list<MDSInternalContextBase*> finished;
base->take_sub_waiting(finished);
mds->queue_waiters(finished);
return;
if (m->is_flag_error_dn())
dout(7) << " flag error, dentry = " << m->get_error_dentry() << dendl;
- list<Context*> finished, error;
+ list<MDSInternalContextBase*> finished, error;
int from = m->get_source().num();
// starting point
// REPLICAS
CDir *MDCache::add_replica_dir(bufferlist::iterator& p, CInode *diri, int from,
- list<Context*>& finished)
+ list<MDSInternalContextBase*>& finished)
{
dirfrag_t df;
::decode(df, p);
return dir;
}
-CDentry *MDCache::add_replica_dentry(bufferlist::iterator& p, CDir *dir, list<Context*>& finished)
+CDentry *MDCache::add_replica_dentry(bufferlist::iterator& p, CDir *dir, list<MDSInternalContextBase*>& finished)
{
string name;
snapid_t last;
return dn;
}
-CInode *MDCache::add_replica_inode(bufferlist::iterator& p, CDentry *dn, list<Context*>& finished)
+CInode *MDCache::add_replica_inode(bufferlist::iterator& p, CDentry *dn, list<MDSInternalContextBase*>& finished)
{
inodeno_t ino;
snapid_t last;
CDentry *MDCache::add_replica_stray(bufferlist &bl, int from)
{
- list<Context*> finished;
+ list<MDSInternalContextBase*> finished;
bufferlist::iterator p = bl.begin();
CInode *mdsin = add_replica_inode(p, NULL, finished);
}
bufferlist::iterator p = m->bl.begin();
- list<Context*> finished;
+ list<MDSInternalContextBase*> finished;
if (dn) {
if (m->get_is_primary()) {
// primary link.
*/
void MDCache::adjust_dir_fragments(CInode *diri, frag_t basefrag, int bits,
list<CDir*>& resultfrags,
- list<Context*>& waiters,
+ list<MDSInternalContextBase*>& waiters,
bool replay)
{
dout(10) << "adjust_dir_fragments " << basefrag << " " << bits
dout(10) << "force_dir_fragment " << fg << " on " << *diri << dendl;
list<CDir*> src, result;
- list<Context*> waiters;
+ list<MDSInternalContextBase*> waiters;
// split a parent?
frag_t parent = diri->dirfragtree.get_branch_or_leaf(fg);
list<CDir*>& srcfrags,
frag_t basefrag, int bits,
list<CDir*>& resultfrags,
- list<Context*>& waiters,
+ list<MDSInternalContextBase*>& waiters,
bool replay)
{
dout(10) << "adjust_dir_fragments " << basefrag << " bits " << bits
}
-class C_MDC_FragmentFrozen : public Context {
+class C_MDC_FragmentFrozen : public MDSInternalContext {
MDCache *mdcache;
MDRequestRef mdr;
public:
C_MDC_FragmentFrozen(MDCache *m, MDRequestRef& r) :
- mdcache(m), mdr(r) {}
+ MDSInternalContext(m->mds), mdcache(m), mdr(r) {}
virtual void finish(int r) {
mdcache->fragment_frozen(mdr, r);
}
}
}
-class C_MDC_FragmentMarking : public Context {
- MDCache *mdcache;
+class C_MDC_FragmentMarking : public MDCacheContext {
MDRequestRef mdr;
public:
- C_MDC_FragmentMarking(MDCache *m, MDRequestRef& r) : mdcache(m), mdr(r) {}
+ C_MDC_FragmentMarking(MDCache *m, MDRequestRef& r) : MDCacheContext(m), mdr(r) {}
virtual void finish(int r) {
mdcache->fragment_mark_and_complete(mdr);
}
CInode *diri = info.dirs.front()->get_inode();
dout(10) << "fragment_mark_and_complete " << info.dirs << " on " << *diri << dendl;
- C_GatherBuilder gather(g_ceph_context);
+ MDSGatherBuilder gather(g_ceph_context);
for (list<CDir*>::iterator p = info.dirs.begin();
p != info.dirs.end();
}
}
-class C_MDC_FragmentPrep : public Context {
+class C_MDC_FragmentPrep : public MDSInternalContext {
MDCache *mdcache;
MDRequestRef mdr;
public:
- C_MDC_FragmentPrep(MDCache *m, MDRequestRef& r) : mdcache(m), mdr(r) {}
+ C_MDC_FragmentPrep(MDCache *m, MDRequestRef& r) : MDSInternalContext(m->mds), mdcache(m), mdr(r) {}
virtual void finish(int r) {
mdcache->_fragment_logged(mdr);
}
};
-class C_MDC_FragmentStore : public Context {
+class C_MDC_FragmentStore : public MDSInternalContext {
MDCache *mdcache;
MDRequestRef mdr;
public:
- C_MDC_FragmentStore(MDCache *m, MDRequestRef& r) : mdcache(m), mdr(r) {}
+ C_MDC_FragmentStore(MDCache *m, MDRequestRef& r) : MDSInternalContext(m->mds), mdcache(m), mdr(r) {}
virtual void finish(int r) {
mdcache->_fragment_stored(mdr);
}
};
-class C_MDC_FragmentCommit : public Context {
+class C_MDC_FragmentCommit : public MDSInternalContext {
MDCache *mdcache;
dirfrag_t basedirfrag;
list<CDir*> resultfrags;
public:
C_MDC_FragmentCommit(MDCache *m, dirfrag_t df, list<CDir*>& l) :
- mdcache(m), basedirfrag(df), resultfrags(l) {}
+ MDSInternalContext(m->mds), mdcache(m), basedirfrag(df), resultfrags(l) {}
virtual void finish(int r) {
mdcache->_fragment_committed(basedirfrag, resultfrags);
}
};
-class C_IO_MDC_FragmentFinish : public Context {
- MDCache *mdcache;
+class C_IO_MDC_FragmentFinish : public MDCacheIOContext {
dirfrag_t basedirfrag;
list<CDir*> resultfrags;
public:
C_IO_MDC_FragmentFinish(MDCache *m, dirfrag_t f, list<CDir*>& l) :
- mdcache(m), basedirfrag(f) {
+ MDCacheIOContext(m), basedirfrag(f) {
resultfrags.swap(l);
}
virtual void finish(int r) {
- Mutex::Locker l(mdcache->mds->mds_lock);
assert(r == 0 || r == -ENOENT);
mdcache->_fragment_finish(basedirfrag, resultfrags);
}
}
// refragment
- list<Context*> waiters;
+ list<MDSInternalContextBase*> waiters;
adjust_dir_fragments(diri, info.dirs, basedirfrag.frag, info.bits,
info.resultfrags, waiters, false);
if (g_conf->mds_debug_frag)
mdr->apply(); // mark scatterlock
// store resulting frags
- C_GatherBuilder gather(g_ceph_context, new C_MDC_FragmentStore(this, mdr));
+ MDSGatherBuilder gather(g_ceph_context, new C_MDC_FragmentStore(this, mdr));
for (list<CDir*>::iterator p = info.resultfrags.begin();
p != info.resultfrags.end();
*/
// refragment
- list<Context*> waiters;
+ list<MDSInternalContextBase*> waiters;
list<CDir*> resultfrags;
adjust_dir_fragments(diri, base, bits, resultfrags, waiters, false);
if (g_conf->mds_debug_frag)
list<CDir*> resultfrags;
if (uf.old_frags.empty()) {
// created by old format EFragment
- list<Context*> waiters;
+ list<MDSInternalContextBase*> waiters;
adjust_dir_fragments(diri, p->first.frag, -uf.bits, resultfrags, waiters, true);
} else {
bufferlist::iterator bp = uf.rollback.begin();
C_MDS_RetryRequest::C_MDS_RetryRequest(MDCache *c, MDRequestRef& r)
- : cache(c), mdr(r)
+ : MDSInternalContext(c->mds), cache(c), mdr(r)
{}
void C_MDS_RetryRequest::finish(int r)
#include "include/Context.h"
#include "events/EMetaBlob.h"
#include "RecoveryQueue.h"
+#include "MDSContext.h"
#include "messages/MClientRequest.h"
#include "messages/MMDSSlaveRequest.h"
}
// waiters
- map<int, map<inodeno_t, list<Context*> > > waiting_for_base_ino;
+ map<int, map<inodeno_t, list<MDSInternalContextBase*> > > waiting_for_base_ino;
- void discover_base_ino(inodeno_t want_ino, Context *onfinish, int from=-1);
- void discover_dir_frag(CInode *base, frag_t approx_fg, Context *onfinish,
+ void discover_base_ino(inodeno_t want_ino, MDSInternalContextBase *onfinish, int from=-1);
+ void discover_dir_frag(CInode *base, frag_t approx_fg, MDSInternalContextBase *onfinish,
int from=-1);
- void discover_path(CInode *base, snapid_t snap, filepath want_path, Context *onfinish,
+ void discover_path(CInode *base, snapid_t snap, filepath want_path, MDSInternalContextBase *onfinish,
bool want_xlocked=false, int from=-1);
- void discover_path(CDir *base, snapid_t snap, filepath want_path, Context *onfinish,
+ void discover_path(CDir *base, snapid_t snap, filepath want_path, MDSInternalContextBase *onfinish,
bool want_xlocked=false);
void kick_discovers(int who); // after a failure.
uncommitted_masters[reqid].slaves = slaves;
uncommitted_masters[reqid].safe = safe;
}
- void wait_for_uncommitted_master(metareqid_t reqid, Context *c) {
+ void wait_for_uncommitted_master(metareqid_t reqid, MDSInternalContextBase *c) {
uncommitted_masters[reqid].waiters.push_back(c);
}
void log_master_commit(metareqid_t reqid);
struct umaster {
set<int> slaves;
LogSegment *ls;
- list<Context*> waiters;
+ list<MDSInternalContextBase*> waiters;
bool safe;
bool committing;
bool recovering;
vector<CInode*> rejoin_recover_q, rejoin_check_q;
list<SimpleLock*> rejoin_eval_locks;
- list<Context*> rejoin_waiters;
+ list<MDSInternalContextBase*> rejoin_waiters;
void rejoin_walk(CDir *dir, MMDSCacheRejoin *rejoin);
void handle_cache_rejoin(MMDSCacheRejoin *m);
private:
bool opening_root, open;
- list<Context*> waiting_for_open;
+ list<MDSInternalContextBase*> waiting_for_open;
public:
void init_layouts();
CInode *create_system_inode(inodeno_t ino, int mode);
CInode *create_root_inode();
- void create_empty_hierarchy(C_Gather *gather);
- void create_mydir_hierarchy(C_Gather *gather);
+ void create_empty_hierarchy(MDSGather *gather);
+ void create_mydir_hierarchy(MDSGather *gather);
bool is_open() { return open; }
- void wait_for_open(Context *c) {
+ void wait_for_open(MDSInternalContextBase *c) {
waiting_for_open.push_back(c);
}
- void open_root_inode(Context *c);
+ void open_root_inode(MDSInternalContextBase *c);
void open_root();
- void open_mydir_inode(Context *c);
+ void open_mydir_inode(MDSInternalContextBase *c);
void populate_mydir();
- void _create_system_file(CDir *dir, const char *name, CInode *in, Context *fin);
+ void _create_system_file(CDir *dir, const char *name, CInode *in, MDSInternalContextBase *fin);
void _create_system_file_finish(MutationRef& mut, CDentry *dn,
- version_t dpv, Context *fin);
+ version_t dpv, MDSInternalContextBase *fin);
- void open_foreign_mdsdir(inodeno_t ino, Context *c);
+ void open_foreign_mdsdir(inodeno_t ino, MDSInternalContextBase *c);
CDentry *get_or_create_stray_dentry(CInode *in);
- Context *_get_waiter(MDRequestRef& mdr, Message *req, Context *fin);
+ MDSInternalContextBase *_get_waiter(MDRequestRef& mdr, Message *req, MDSInternalContextBase *fin);
/**
* Find the given dentry (and whether it exists or not), its ancestors,
* If it returns 2 the request has been forwarded, and again the requester
* should unwind itself and back out.
*/
- int path_traverse(MDRequestRef& mdr, Message *req, Context *fin, const filepath& path,
+ int path_traverse(MDRequestRef& mdr, Message *req, MDSInternalContextBase *fin, const filepath& path,
vector<CDentry*> *pdnvec, CInode **pin, int onfail);
bool path_is_mine(filepath& path);
bool path_is_mine(string& p) {
CInode *cache_traverse(const filepath& path);
- void open_remote_dirfrag(CInode *diri, frag_t fg, Context *fin);
+ void open_remote_dirfrag(CInode *diri, frag_t fg, MDSInternalContextBase *fin);
CInode *get_dentry_inode(CDentry *dn, MDRequestRef& mdr, bool projected=false);
bool parallel_fetch(map<inodeno_t,filepath>& pathmap, set<inodeno_t>& missing);
set<CDir*>& fetch_queue, set<inodeno_t>& missing,
C_GatherBuilder &gather_bld);
- void open_remote_dentry(CDentry *dn, bool projected, Context *fin,
+ void open_remote_dentry(CDentry *dn, bool projected, MDSInternalContextBase *fin,
bool want_xlocked=false);
- void _open_remote_dentry_finish(CDentry *dn, inodeno_t ino, Context *fin,
+ void _open_remote_dentry_finish(CDentry *dn, inodeno_t ino, MDSInternalContextBase *fin,
bool want_xlocked, int r);
void make_trace(vector<CDentry*>& trace, CInode *in);
bool want_xlocked;
version_t tid;
int64_t pool;
- list<Context*> waiters;
+ list<MDSInternalContextBase*> waiters;
open_ino_info_t() : checking(-1), auth_hint(-1),
check_peers(true), fetch_backtrace(true), discover(false) {}
};
void _open_ino_parent_opened(inodeno_t ino, int ret);
void _open_ino_traverse_dir(inodeno_t ino, open_ino_info_t& info, int err);
void _open_ino_fetch_dir(inodeno_t ino, MMDSOpenIno *m, CDir *dir);
- Context* _open_ino_get_waiter(inodeno_t ino, MMDSOpenIno *m);
+ MDSInternalContextBase* _open_ino_get_waiter(inodeno_t ino, MMDSOpenIno *m);
int open_ino_traverse_dir(inodeno_t ino, MMDSOpenIno *m,
vector<inode_backpointer_t>& ancestors,
bool discover, bool want_xlocked, int *hint);
public:
void kick_open_ino_peers(int who);
- void open_ino(inodeno_t ino, int64_t pool, Context *fin,
+ void open_ino(inodeno_t ino, int64_t pool, MDSInternalContextBase *fin,
bool want_replica=true, bool want_xlocked=false);
// -- find_ino_peer --
struct find_ino_peer_info_t {
inodeno_t ino;
ceph_tid_t tid;
- Context *fin;
+ MDSInternalContextBase *fin;
int hint;
int checking;
set<int> checked;
map<ceph_tid_t, find_ino_peer_info_t> find_ino_peer;
ceph_tid_t find_ino_peer_last_tid;
- void find_ino_peers(inodeno_t ino, Context *c, int hint=-1);
+ void find_ino_peers(inodeno_t ino, MDSInternalContextBase *c, int hint=-1);
void _do_find_ino_peer(find_ino_peer_info_t& fip);
void handle_find_ino(MMDSFindIno *m);
void handle_find_ino_reply(MMDSFindInoReply *m);
in->encode_replica(to, bl);
}
- CDir* add_replica_dir(bufferlist::iterator& p, CInode *diri, int from, list<Context*>& finished);
+ CDir* add_replica_dir(bufferlist::iterator& p, CInode *diri, int from, list<MDSInternalContextBase*>& finished);
CDir* forge_replica_dir(CInode *diri, frag_t fg, int from);
- CDentry *add_replica_dentry(bufferlist::iterator& p, CDir *dir, list<Context*>& finished);
- CInode *add_replica_inode(bufferlist::iterator& p, CDentry *dn, list<Context*>& finished);
+ CDentry *add_replica_dentry(bufferlist::iterator& p, CDir *dir, list<MDSInternalContextBase*>& finished);
+ CInode *add_replica_inode(bufferlist::iterator& p, CDentry *dn, list<MDSInternalContextBase*>& finished);
void replicate_stray(CDentry *straydn, int who, bufferlist& bl);
CDentry *add_replica_stray(bufferlist &bl, int from);
int bits;
bool committed;
LogSegment *ls;
- list<Context*> waiters;
+ list<MDSInternalContextBase*> waiters;
list<frag_t> old_frags;
bufferlist rollback;
ufragment() : bits(0), committed(false), ls(NULL) {}
map<dirfrag_t,fragment_info_t> fragments;
void adjust_dir_fragments(CInode *diri, frag_t basefrag, int bits,
- list<CDir*>& frags, list<Context*>& waiters, bool replay);
+ list<CDir*>& frags, list<MDSInternalContextBase*>& waiters, bool replay);
void adjust_dir_fragments(CInode *diri,
list<CDir*>& srcfrags,
frag_t basefrag, int bits,
list<CDir*>& resultfrags,
- list<Context*>& waiters,
+ list<MDSInternalContextBase*>& waiters,
bool replay);
CDir *force_dir_fragment(CInode *diri, frag_t fg, bool replay=true);
void get_force_dirfrag_bound_set(vector<dirfrag_t>& dfs, set<CDir*>& bounds);
void finish_uncommitted_fragment(dirfrag_t basedirfrag, int op);
void rollback_uncommitted_fragment(dirfrag_t basedirfrag, list<frag_t>& old_frags);
public:
- void wait_for_uncommitted_fragment(dirfrag_t dirfrag, Context *c) {
+ void wait_for_uncommitted_fragment(dirfrag_t dirfrag, MDSInternalContextBase *c) {
assert(uncommitted_fragments.count(dirfrag));
uncommitted_fragments[dirfrag].waiters.push_back(c);
}
};
-class C_MDS_RetryRequest : public Context {
+class C_MDS_RetryRequest : public MDSInternalContext {
MDCache *cache;
MDRequestRef mdr;
public:
#include "MDS.h"
#include "MDCache.h"
#include "LogEvent.h"
+#include "MDSContext.h"
#include "osdc/Journaler.h"
#include "mds/JournalPointer.h"
g_ceph_context->get_perfcounters_collection()->add(logger);
}
-void MDLog::handle_journaler_write_error(int r)
-{
- if (r == -EBLACKLISTED) {
- derr << "we have been blacklisted (fenced), respawning..." << dendl;
- mds->respawn();
- } else {
- derr << "unhandled error " << cpp_strerror(r) << ", shutting down..." << dendl;
- mds->suicide();
+class C_MDL_WriteError : public MDSIOContextBase {
+ protected:
+ MDLog *mdlog;
+ MDS *get_mds() {return mdlog->mds;}
+
+ void finish(int r) {
+ MDS *mds = get_mds();
+
+ if (r == -EBLACKLISTED) {
+ derr << "we have been blacklisted (fenced), respawning..." << dendl;
+ mds->respawn();
+ } else {
+ derr << "unhandled error " << cpp_strerror(r) << ", shutting down..." << dendl;
+ mds->suicide();
+ }
}
-}
-void MDLog::write_head(Context *c)
+ public:
+ C_MDL_WriteError(MDLog *m) : mdlog(m) {}
+};
+
+
+void MDLog::write_head(MDSInternalContextBase *c)
{
- journaler->write_head(c);
+ MDSIOContext *fin = NULL;
+ if (c != NULL) {
+ fin = new C_IO_Wrapper(mds, c);
+ }
+ journaler->write_head(fin);
}
uint64_t MDLog::get_read_pos()
-void MDLog::create(Context *c)
+void MDLog::create(MDSInternalContextBase *c)
{
dout(5) << "create empty log" << dendl;
C_GatherBuilder gather(g_ceph_context);
- gather.set_finisher(c);
+ gather.set_finisher(new C_IO_Wrapper(mds, c));
// The inode of the default Journaler we will create
ino = MDS_INO_LOG_OFFSET + mds->get_nodeid();
submit_thread.create();
}
-void MDLog::open(Context *c)
+void MDLog::open(MDSInternalContextBase *c)
{
dout(5) << "open discovering log bounds" << dendl;
* Final part of reopen() procedure, after recovery_thread
* has done its thing we call append()
*/
-class C_ReopenComplete : public Context {
+class C_ReopenComplete : public MDSInternalContext {
MDLog *mdlog;
- Context *on_complete;
+ MDSInternalContextBase *on_complete;
public:
- C_ReopenComplete(MDLog *mdlog_, Context *on_complete_) : mdlog(mdlog_), on_complete(on_complete_) {}
+ C_ReopenComplete(MDLog *mdlog_, MDSInternalContextBase *on_complete_) : MDSInternalContext(mdlog->mds), mdlog(mdlog_), on_complete(on_complete_) {}
void finish(int r) {
mdlog->append();
on_complete->complete(r);
* recovery procedure again, potentially reformatting the journal if it
* was in an old format.
*/
-void MDLog::reopen(Context *c)
+void MDLog::reopen(MDSInternalContextBase *c)
{
dout(5) << "reopen" << dendl;
delete le;
}
-void MDLog::_submit_entry(LogEvent *le, Context *c)
+void MDLog::_submit_entry(LogEvent *le, MDSInternalContextBase *c)
{
assert(submit_mutex.is_locked_by_me());
assert(!mds->is_any_replay());
ls->end = journaler->get_write_pos();
if (data.fin)
- journaler->wait_for_flush(data.fin);
+ journaler->wait_for_flush(new C_IO_Wrapper(mds, data.fin));
if (data.flush)
journaler->flush();
} else {
mds->mds_lock.Lock();
if (data.fin)
- journaler->wait_for_flush(data.fin);
+ journaler->wait_for_flush(new C_IO_Wrapper(mds, data.fin));
if (data.flush)
journaler->flush();
mds->mds_lock.Unlock();
submit_mutex.Unlock();
}
-void MDLog::wait_for_safe(Context *c)
+void MDLog::wait_for_safe(MDSInternalContextBase *c)
{
if (!g_conf->mds_log) {
// hack: bypass.
submit_mutex.Unlock();
if (no_pending && c)
- journaler->wait_for_flush(c);
+ journaler->wait_for_flush(new C_IO_Wrapper(mds, c));
}
void MDLog::flush()
mds->mdcache->advance_stray();
}
-void MDLog::_journal_segment_subtree_map(Context *onsync)
+void MDLog::_journal_segment_subtree_map(MDSInternalContextBase *onsync)
{
assert(submit_mutex.is_locked_by_me());
void MDLog::try_expire(LogSegment *ls, int op_prio)
{
- C_GatherBuilder gather_bld(g_ceph_context);
+ MDSGatherBuilder gather_bld(g_ceph_context);
ls->try_to_expire(mds, gather_bld, op_prio);
if (gather_bld.has_subs()) {
-void MDLog::replay(Context *c)
+void MDLog::replay(MDSInternalContextBase *c)
{
assert(journaler->is_active());
assert(journaler->is_readonly());
* When this function completes, the `journaler` attribute will be set to
* a Journaler instance using the latest available serialization format.
*/
-void MDLog::_recovery_thread(Context *completion)
+void MDLog::_recovery_thread(MDSInternalContextBase *completion)
{
assert(journaler == NULL);
if (g_conf->mds_journal_format > JOURNAL_FORMAT_MAX) {
if (recovery_result != 0) {
derr << "Error recovering journal " << jp.front << ": " << cpp_strerror(recovery_result) << dendl;
+ mds->mds_lock.Lock();
completion->complete(recovery_result);
+ mds->mds_lock.Unlock();
return;
}
* swapping pointers to make that one the front journal only when we have
* safely completed.
*/
-void MDLog::_reformat_journal(JournalPointer const &jp_in, Journaler *old_journal, Context *completion)
+void MDLog::_reformat_journal(JournalPointer const &jp_in, Journaler *old_journal, MDSInternalContextBase *completion)
{
assert(!jp_in.is_null());
assert(completion != NULL);
friend class ReplayThread;
friend class C_MDL_Replay;
- list<Context*> waitfor_replay;
+ list<MDSInternalContextBase*> waitfor_replay;
void _replay(); // old way
void _replay_thread(); // new way
// Journal recovery/rewrite logic
class RecoveryThread : public Thread {
MDLog *log;
- Context *completion;
+ MDSInternalContextBase *completion;
public:
- void set_completion(Context *c) {completion = c;}
+ void set_completion(MDSInternalContextBase *c) {completion = c;}
RecoveryThread(MDLog *l) : log(l), completion(NULL) {}
void* entry() {
log->_recovery_thread(completion);
return 0;
}
} recovery_thread;
- void _recovery_thread(Context *completion);
- void _reformat_journal(JournalPointer const &jp, Journaler *old_journal, Context *completion);
+ void _recovery_thread(MDSInternalContextBase *completion);
+ void _reformat_journal(JournalPointer const &jp, Journaler *old_journal, MDSInternalContextBase *completion);
// -- segments --
map<uint64_t,LogSegment*> segments;
struct PendingEvent {
LogEvent *le;
- Context *fin;
+ MDSInternalContextBase *fin;
bool flush;
- PendingEvent(LogEvent *e, Context *c, bool f=false) : le(e), fin(c), flush(f) {}
+ PendingEvent(LogEvent *e, MDSInternalContextBase *c, bool f=false) : le(e), fin(c), flush(f) {}
};
map<uint64_t,list<PendingEvent> > pending_events; // log segment -> event list
segments.erase(p);
}
- struct C_MDL_WriteError : public Context {
- MDLog *mdlog;
- C_MDL_WriteError(MDLog *m) : mdlog(m) {}
- void finish(int r) {
- mdlog->handle_journaler_write_error(r);
- }
- };
- void handle_journaler_write_error(int r);
-
public:
void create_logger();
// -- segments --
void _start_new_segment();
void _prepare_new_segment();
- void _journal_segment_subtree_map(Context *onsync);
+ void _journal_segment_subtree_map(MDSInternalContextBase *onsync);
public:
void start_new_segment() {
Mutex::Locker l(submit_mutex);
Mutex::Locker l(submit_mutex);
_prepare_new_segment();
}
- void journal_segment_subtree_map(Context *onsync=NULL) {
+ void journal_segment_subtree_map(MDSInternalContextBase *onsync=NULL) {
submit_mutex.Lock();
_journal_segment_subtree_map(onsync);
submit_mutex.Unlock();
_start_entry(e);
}
void cancel_entry(LogEvent *e);
- void _submit_entry(LogEvent *e, Context *c);
- void submit_entry(LogEvent *e, Context *c = 0) {
+ void _submit_entry(LogEvent *e, MDSInternalContextBase *c);
+ void submit_entry(LogEvent *e, MDSInternalContextBase *c = 0) {
Mutex::Locker l(submit_mutex);
_submit_entry(e, c);
submit_cond.Signal();
}
- void start_submit_entry(LogEvent *e, Context *c = 0) {
+ void start_submit_entry(LogEvent *e, MDSInternalContextBase *c = 0) {
Mutex::Locker l(submit_mutex);
_start_entry(e);
_submit_entry(e, c);
}
bool entry_is_open() { return cur_event != NULL; }
- void wait_for_safe( Context *c );
+ void wait_for_safe( MDSInternalContextBase *c );
void flush();
bool is_flushed() {
return unflushed == 0;
}
private:
- class C_MaybeExpiredSegment : public Context {
+ class C_MaybeExpiredSegment : public MDSInternalContext {
MDLog *mdlog;
LogSegment *ls;
int op_prio;
public:
- C_MaybeExpiredSegment(MDLog *mdl, LogSegment *s, int p) : mdlog(mdl), ls(s), op_prio(p) {}
+ C_MaybeExpiredSegment(MDLog *mdl, LogSegment *s, int p) : MDSInternalContext(mdl->mds), mdlog(mdl), ls(s), op_prio(p) {}
void finish(int res) {
mdlog->_maybe_expired(ls, op_prio);
}
void trim(int max=-1);
private:
- void write_head(Context *onfinish);
+ void write_head(MDSInternalContextBase *onfinish);
public:
- void create(Context *onfinish); // fresh, empty log!
- void open(Context *onopen); // append() or replay() to follow!
- void reopen(Context *onopen);
+ void create(MDSInternalContextBase *onfinish); // fresh, empty log!
+ void open(MDSInternalContextBase *onopen); // append() or replay() to follow!
+ void reopen(MDSInternalContextBase *onopen);
void append();
- void replay(Context *onfinish);
+ void replay(MDSInternalContextBase *onfinish);
void standby_trim_segments();
};
balancer->try_rebalance();
{
- map<epoch_t,list<Context*> >::iterator p = waiting_for_mdsmap.begin();
+ map<epoch_t,list<MDSInternalContextBase*> >::iterator p = waiting_for_mdsmap.begin();
while (p != waiting_for_mdsmap.end() && p->first <= mdsmap->get_epoch()) {
- list<Context*> ls;
+ list<MDSInternalContextBase*> ls;
ls.swap(p->second);
waiting_for_mdsmap.erase(p++);
finish_contexts(g_ceph_context, ls);
}
-class C_MDS_CreateFinish : public Context {
- MDS *mds;
+class C_MDS_CreateFinish : public MDSInternalContext {
public:
- C_MDS_CreateFinish(MDS *m) : mds(m) {}
+ C_MDS_CreateFinish(MDS *m) : MDSInternalContext(m) {}
void finish(int r) { mds->creating_done(); }
};
{
dout(3) << "boot_create" << dendl;
- C_GatherBuilder fin(g_ceph_context, new C_MDS_CreateFinish(this));
+ MDSGatherBuilder fin(g_ceph_context, new C_MDS_CreateFinish(this));
mdcache->init_layouts();
}
-class C_MDS_BootStart : public Context {
- MDS *mds;
+class C_MDS_BootStart : public MDSInternalContext {
MDS::BootStep nextstep;
public:
- C_MDS_BootStart(MDS *m, MDS::BootStep n) : mds(m), nextstep(n) {}
+ C_MDS_BootStart(MDS *m, MDS::BootStep n) : MDSInternalContext(m), nextstep(n) {}
void finish(int r) {
- Mutex::Locker l(mds->mds_lock);
mds->boot_start(nextstep, r);
}
};
{
mdcache->init_layouts();
- C_GatherBuilder gather(g_ceph_context, new C_OnFinisher(new C_MDS_BootStart(this, MDS_BOOT_OPEN_ROOT), &finisher));
+ MDSGatherBuilder gather(g_ceph_context,
+ new C_MDS_BootStart(this, MDS_BOOT_OPEN_ROOT));
dout(2) << "boot_start " << step << ": opening inotable" << dendl;
inotable->load(gather.new_sub());
{
dout(2) << "boot_start " << step << ": loading/discovering base inodes" << dendl;
- C_GatherBuilder gather(g_ceph_context, new C_OnFinisher(new C_MDS_BootStart(this, MDS_BOOT_PREPARE_LOG), &finisher));
+ MDSGatherBuilder gather(g_ceph_context,
+ new C_MDS_BootStart(this, MDS_BOOT_PREPARE_LOG));
mdcache->open_mydir_inode(gather.new_sub());
case MDS_BOOT_PREPARE_LOG:
if (is_any_replay()) {
dout(2) << "boot_start " << step << ": replaying mds log" << dendl;
- mdlog->replay(new C_OnFinisher(new C_MDS_BootStart(this, MDS_BOOT_REPLAY_DONE), &finisher));
+ mdlog->replay(new C_MDS_BootStart(this, MDS_BOOT_REPLAY_DONE));
} else {
dout(2) << "boot_start " << step << ": positioning at end of old mds log" << dendl;
mdlog->append();
calc_recovery_set();
// Check if we need to wait for a newer OSD map before starting
- Context *fin = new C_OnFinisher(new C_MDS_BootStart(this, MDS_BOOT_INITIAL),
- &finisher);
+ Context *fin = new C_OnFinisher(new C_IO_Wrapper(this, new C_MDS_BootStart(this, MDS_BOOT_INITIAL)), &finisher);
bool const ready = objecter->wait_for_map(
mdsmap->get_last_failure_osd_epoch(),
fin);
}
-class MDS::C_MDS_StandbyReplayRestartFinish : public Context {
- MDS *mds;
+class MDS::C_MDS_StandbyReplayRestartFinish : public MDSIOContext {
uint64_t old_read_pos;
public:
C_MDS_StandbyReplayRestartFinish(MDS *mds_, uint64_t old_read_pos_) :
- mds(mds_), old_read_pos(old_read_pos_) {}
+ MDSIOContext(mds_), old_read_pos(old_read_pos_) {}
void finish(int r) {
mds->_standby_replay_restart_finish(r, old_read_pos);
}
} else {
/* We are transitioning out of standby: wait for OSD map update
before making final pass */
- Context *fin = new C_OnFinisher(
- new C_MDS_BootStart(this, MDS_BOOT_PREPARE_LOG),
+ Context *fin = new C_OnFinisher(new C_IO_Wrapper(this,
+ new C_MDS_BootStart(this, MDS_BOOT_PREPARE_LOG)),
&finisher);
bool const ready =
objecter->wait_for_map(mdsmap->get_last_failure_osd_epoch(), fin);
}
}
-class MDS::C_MDS_StandbyReplayRestart : public Context {
- MDS *mds;
+class MDS::C_MDS_StandbyReplayRestart : public MDSInternalContext {
public:
- C_MDS_StandbyReplayRestart(MDS *m) : mds(m) {}
+ C_MDS_StandbyReplayRestart(MDS *m) : MDSInternalContext(m) {}
void finish(int r) {
assert(!r);
mds->standby_replay_restart();
if (g_conf->mds_wipe_sessions) {
dout(1) << "wiping out client sessions" << dendl;
sessionmap.wipe();
- sessionmap.save(new C_NoopContext);
+ sessionmap.save(new C_MDSInternalNoop);
}
if (g_conf->mds_wipe_ino_prealloc) {
dout(1) << "wiping out ino prealloc from sessions" << dendl;
sessionmap.wipe_ino_prealloc();
- sessionmap.save(new C_NoopContext);
+ sessionmap.save(new C_MDSInternalNoop);
}
if (g_conf->mds_skip_ino) {
inodeno_t i = g_conf->mds_skip_ino;
dout(1) << "skipping " << i << " inodes" << dendl;
inotable->skip_inos(i);
- inotable->save(new C_NoopContext);
+ inotable->save(new C_MDSInternalNoop);
}
if (mdsmap->get_num_in_mds() == 1 &&
while (!finished_queue.empty()) {
dout(7) << "mds has " << finished_queue.size() << " queued contexts" << dendl;
dout(10) << finished_queue << dendl;
- list<Context*> ls;
+ list<MDSInternalContextBase*> ls;
ls.swap(finished_queue);
while (!ls.empty()) {
dout(10) << " finish " << ls.front() << dendl;
+
+ dout(1) << "trigger: " << this << dendl;
+ dout(1) << " " << this->mds_lock.is_locked() << dendl;
+ dout(1) << " " << this->mds_lock.is_locked_by_me() << dendl;
ls.front()->complete(0);
ls.pop_front();
class MDCache;
class MDLog;
class MDBalancer;
+class MDSInternalContextBase;
class CInode;
class CDir;
MDSMap::DaemonState state; // my confirmed state
MDSMap::DaemonState want_state; // the state i want
- list<Context*> waiting_for_active, waiting_for_replay, waiting_for_reconnect, waiting_for_resolve;
- list<Context*> replay_queue;
- map<int, list<Context*> > waiting_for_active_peer;
+ list<MDSInternalContextBase*> waiting_for_active, waiting_for_replay, waiting_for_reconnect, waiting_for_resolve;
+ list<MDSInternalContextBase*> replay_queue;
+ map<int, list<MDSInternalContextBase*> > waiting_for_active_peer;
list<Message*> waiting_for_nolaggy;
- map<epoch_t, list<Context*> > waiting_for_mdsmap;
+ map<epoch_t, list<MDSInternalContextBase*> > waiting_for_mdsmap;
map<int,version_t> peer_mdsmap_epoch;
ceph_tid_t last_tid; // for mds-initiated requests (e.g. stray rename)
public:
- void wait_for_active(Context *c) {
+ void wait_for_active(MDSInternalContextBase *c) {
waiting_for_active.push_back(c);
}
- void wait_for_active_peer(int who, Context *c) {
+ void wait_for_active_peer(int who, MDSInternalContextBase *c) {
waiting_for_active_peer[who].push_back(c);
}
- void wait_for_replay(Context *c) {
+ void wait_for_replay(MDSInternalContextBase *c) {
waiting_for_replay.push_back(c);
}
- void wait_for_reconnect(Context *c) {
+ void wait_for_reconnect(MDSInternalContextBase *c) {
waiting_for_reconnect.push_back(c);
}
- void wait_for_resolve(Context *c) {
+ void wait_for_resolve(MDSInternalContextBase *c) {
waiting_for_resolve.push_back(c);
}
- void wait_for_mdsmap(epoch_t e, Context *c) {
+ void wait_for_mdsmap(epoch_t e, MDSInternalContextBase *c) {
waiting_for_mdsmap[e].push_back(c);
}
- void enqueue_replay(Context *c) {
+ void enqueue_replay(MDSInternalContextBase *c) {
replay_queue.push_back(c);
}
// -- waiters --
- list<Context*> finished_queue;
+ list<MDSInternalContextBase*> finished_queue;
- void queue_waiter(Context *c) {
+ void queue_waiter(MDSInternalContextBase *c) {
finished_queue.push_back(c);
}
- void queue_waiters(list<Context*>& ls) {
+ void queue_waiters(list<MDSInternalContextBase*>& ls) {
finished_queue.splice( finished_queue.end(), ls );
}
bool queue_one_replay() {
bool is_laggy();
utime_t get_laggy_until() { return laggy_until; }
- class C_MDS_BeaconSender : public Context {
- MDS *mds;
+ class C_MDS_BeaconSender : public MDSInternalContext {
public:
- C_MDS_BeaconSender(MDS *m) : mds(m) {}
+ C_MDS_BeaconSender(MDS *m) : MDSInternalContext(m) {}
void finish(int r) {
mds->beacon_sender = 0;
mds->beacon_send();
} *beacon_sender;
// tick and other timer fun
- class C_MDS_Tick : public Context {
- MDS *mds;
+ class C_MDS_Tick : public MDSInternalContext {
public:
- C_MDS_Tick(MDS *m) : mds(m) {}
+ C_MDS_Tick(MDS *m) : MDSInternalContext(m) {}
void finish(int r) {
mds->tick_event = 0;
mds->tick();
} BootStep;
friend class C_MDS_BootStart;
+ friend class C_MDS_InternalBootStart;
void boot_start(BootStep step=MDS_BOOT_INITIAL, int r=0); // starting|replay
void calc_recovery_set();
public:
/* This expects to be given a reference which it is responsible for.
* The finish function calls functions which
* will put the Message exactly once.*/
-class C_MDS_RetryMessage : public Context {
+class C_MDS_RetryMessage : public MDSInternalContext {
Message *m;
- MDS *mds;
public:
- C_MDS_RetryMessage(MDS *mds, Message *m) {
+ C_MDS_RetryMessage(MDS *mds, Message *m) : MDSInternalContext(mds) {
assert(m);
this->m = m;
- this->mds = mds;
}
virtual void finish(int r) {
mds->inc_dispatch_depth();
}
};
-void MDSTable::save(Context *onfinish, version_t v)
+void MDSTable::save(MDSInternalContextBase *onfinish, version_t v)
{
if (v > 0 && v <= committing_version) {
dout(10) << "save v " << version << " - already saving "
assert(r >= 0);
committed_version = v;
- list<Context*> ls;
+ list<MDSInternalContextBase*> ls;
while (!waitfor_save.empty()) {
if (waitfor_save.begin()->first > v) break;
ls.splice(ls.end(), waitfor_save.begin()->second);
return object_t(n);
}
-void MDSTable::load(Context *onfinish)
+void MDSTable::load(MDSInternalContextBase *onfinish)
{
dout(10) << "load" << dendl;
#include "mdstypes.h"
#include "mds_table_types.h"
#include "include/buffer.h"
-#include "include/Context.h"
class MDS;
+class Context;
+class MDSInternalContextBase;
class MDSTable {
public:
version_t version, committing_version, committed_version, projected_version;
- map<version_t, list<Context*> > waitfor_save;
+ map<version_t, list<MDSInternalContextBase*> > waitfor_save;
public:
MDSTable(MDS *m, const char *n, bool is_per_mds) :
bool is_opening() { return state == STATE_OPENING; }
void reset();
- void save(Context *onfinish=0, version_t need=0);
+ void save(MDSInternalContextBase *onfinish=0, version_t need=0);
void save_2(int r, version_t v);
void shutdown() {
if (is_active()) save(0);
}
- void load(Context *onfinish);
+ void load(MDSInternalContextBase *onfinish);
void load_2(int, bufferlist&, Context *onfinish);
// child must overload these
#include "MDSMap.h"
-#include "include/Context.h"
+#include "MDSContext.h"
#include "msg/Messenger.h"
#include "MDS.h"
#define dout_prefix *_dout << "mds." << mds->get_nodeid() << ".tableclient(" << get_mdstable_name(table) << ") "
+class C_LoggedAck : public MDSInternalContext {
+ MDSTableClient *tc;
+ version_t tid;
+public:
+ C_LoggedAck(MDSTableClient *a, version_t t) : MDSInternalContext(a->mds), tc(a), tid(t) {}
+ void finish(int r) {
+ tc->_logged_ack(tid);
+ }
+};
+
+
void MDSTableClient::handle_request(class MMDSTableRequest *m)
{
dout(10) << "handle_request " << *m << dendl;
assert(g_conf->mds_kill_mdstable_at != 3);
- Context *onfinish = pending_prepare[reqid].onfinish;
+ MDSInternalContextBase *onfinish = pending_prepare[reqid].onfinish;
*pending_prepare[reqid].ptid = tid;
if (pending_prepare[reqid].pbl)
*pending_prepare[reqid].pbl = m->bl;
}
void MDSTableClient::_prepare(bufferlist& mutation, version_t *ptid, bufferlist *pbl,
- Context *onfinish)
+ MDSInternalContextBase *onfinish)
{
if (last_reqid == ~0ULL) {
dout(10) << "tableserver is not ready yet, waiting for request id" << dendl;
#define CEPH_MDSTABLECLIENT_H
#include "include/types.h"
-#include "include/Context.h"
+#include "MDSContext.h"
#include "mds_table_types.h"
class MDS;
// prepares
struct _pending_prepare {
- Context *onfinish;
+ MDSInternalContextBase *onfinish;
version_t *ptid;
bufferlist *pbl;
bufferlist mutation;
_pending_prepare() : onfinish(0), ptid(0), pbl(0) {}
- _pending_prepare(Context *c, version_t *pt, bufferlist *pb, bufferlist& m) :
+ _pending_prepare(MDSInternalContextBase *c, version_t *pt, bufferlist *pb, bufferlist& m) :
onfinish(c), ptid(pt), pbl(pb), mutation(m) {}
};
// pending commits
map<version_t, LogSegment*> pending_commit;
- map<version_t, list<Context*> > ack_waiters;
+ map<version_t, list<MDSInternalContextBase*> > ack_waiters;
void handle_reply(class MMDSTableQuery *m);
-
- class C_LoggedAck : public Context {
- MDSTableClient *tc;
- version_t tid;
- public:
- C_LoggedAck(MDSTableClient *a, version_t t) : tc(a), tid(t) {}
- void finish(int r) {
- tc->_logged_ack(tid);
- }
- };
void _logged_ack(version_t tid);
+ friend class C_LoggedAck;
public:
MDSTableClient(MDS *m, int tab) :
void handle_request(MMDSTableRequest *m);
- void _prepare(bufferlist& mutation, version_t *ptid, bufferlist *pbl, Context *onfinish);
+ void _prepare(bufferlist& mutation, version_t *ptid, bufferlist *pbl, MDSInternalContextBase *onfinish);
void commit(version_t tid, LogSegment *ls);
void resend_commits();
bool has_committed(version_t tid) {
return pending_commit.count(tid) == 0;
}
- void wait_for_ack(version_t tid, Context *c) {
+ void wait_for_ack(version_t tid, MDSInternalContextBase *c) {
ack_waiters[tid].push_back(c);
}
}
}
+class C_Prepare : public MDSInternalContext {
+ MDSTableServer *server;
+ MMDSTableRequest *req;
+ version_t tid;
+public:
+
+ C_Prepare(MDSTableServer *s, MMDSTableRequest *r, version_t v) : MDSInternalContext(s->mds), server(s), req(r), tid(v) {}
+ void finish(int r) {
+ server->_prepare_logged(req, tid);
+ }
+};
+
// prepare
/* This function DOES put the passed message before returning */
void MDSTableServer::handle_prepare(MMDSTableRequest *req)
req->put();
}
+class C_Commit : public MDSInternalContext {
+ MDSTableServer *server;
+ MMDSTableRequest *req;
+public:
+ C_Commit(MDSTableServer *s, MMDSTableRequest *r) : MDSInternalContext(s->mds), server(s), req(r) {}
+ void finish(int r) {
+ server->_commit_logged(req);
+ }
+};
// commit
/* This function DOES put the passed message before returning */
private:
void handle_prepare(MMDSTableRequest *m);
void _prepare_logged(MMDSTableRequest *m, version_t tid);
- struct C_Prepare : public Context {
- MDSTableServer *server;
- MMDSTableRequest *req;
- version_t tid;
- C_Prepare(MDSTableServer *s, MMDSTableRequest *r, version_t v) : server(s), req(r), tid(v) {}
- void finish(int r) {
- server->_prepare_logged(req, tid);
- }
- };
+ friend class C_Prepare;
void handle_commit(MMDSTableRequest *m);
void _commit_logged(MMDSTableRequest *m);
- struct C_Commit : public Context {
- MDSTableServer *server;
- MMDSTableRequest *req;
- C_Commit(MDSTableServer *s, MMDSTableRequest *r) : server(s), req(r) {}
- void finish(int r) {
- server->_commit_logged(req);
- }
- };
+ friend class C_Commit;
+
void handle_rollback(MMDSTableRequest *m);
#undef dout_prefix
#define dout_prefix *_dout << "mds." << mds->get_nodeid() << ".migrator "
+
+class MigratorContext : public MDSInternalContextBase {
+protected:
+ Migrator *mig;
+ MDS *get_mds() {
+ return mig->mds;
+ }
+public:
+ MigratorContext(Migrator *mig_) : mig(mig_) {
+ assert(mig != NULL);
+ }
+};
+
+
/* This function DOES put the passed message before returning*/
void Migrator::dispatch(Message *m)
{
}
-class C_MDC_EmptyImport : public Context {
- Migrator *mig;
+class C_MDC_EmptyImport : public MigratorContext {
CDir *dir;
public:
- C_MDC_EmptyImport(Migrator *m, CDir *d) : mig(m), dir(d) {}
+ C_MDC_EmptyImport(Migrator *m, CDir *d) : MigratorContext(m), dir(d) {}
void finish(int r) {
mig->export_empty_import(dir);
}
-class C_MDC_ExportFreeze : public Context {
- Migrator *mig;
+class C_MDC_ExportFreeze : public MigratorContext {
CDir *ex; // dir i'm exporting
uint64_t tid;
public:
C_MDC_ExportFreeze(Migrator *m, CDir *e, uint64_t t) :
- mig(m), ex(e), tid(t) {}
+ MigratorContext(m), ex(e), tid(t) {
+ assert(ex != NULL);
+ }
virtual void finish(int r) {
if (r >= 0)
mig->export_frozen(ex, tid);
m->put(); // done
}
-class C_M_ExportSessionsFlushed : public Context {
- Migrator *migrator;
+class C_M_ExportSessionsFlushed : public MigratorContext {
CDir *dir;
uint64_t tid;
public:
- C_M_ExportSessionsFlushed(Migrator *m, CDir *d, uint64_t t) :
- migrator(m), dir(d), tid(t) {}
+ C_M_ExportSessionsFlushed(Migrator *m, CDir *d, uint64_t t)
+ : MigratorContext(m), dir(d), tid(t) {
+ assert(dir != NULL);
+ }
void finish(int r) {
- migrator->export_sessions_flushed(dir, tid);
+ mig->export_sessions_flushed(dir, tid);
}
};
set<client_t> export_client_set;
get_export_client_set(dir, export_client_set);
- C_GatherBuilder gather(g_ceph_context);
+ MDSGatherBuilder gather(g_ceph_context);
mds->server->flush_client_sessions(export_client_set, gather);
if (gather.has_subs()) {
it->second.warning_ack_waiting.insert(-1);
}
-class C_M_ExportGo : public Context {
- Migrator *migrator;
+class C_M_ExportGo : public MigratorContext {
CDir *dir;
uint64_t tid;
public:
C_M_ExportGo(Migrator *m, CDir *d, uint64_t t) :
- migrator(m), dir(d), tid(t) {}
+ MigratorContext(m), dir(d), tid(t) {
+ assert(dir != NULL);
+ }
void finish(int r) {
- migrator->export_go_synced(dir, tid);
+ mig->export_go_synced(dir, tid);
}
};
void Migrator::finish_export_inode(CInode *in, utime_t now, int peer,
map<client_t,Capability::Import>& peer_imported,
- list<Context*>& finished)
+ list<MDSInternalContextBase*>& finished)
{
dout(12) << "finish_export_inode " << *in << dendl;
void Migrator::finish_export_dir(CDir *dir, utime_t now, int peer,
map<inodeno_t,map<client_t,Capability::Import> >& peer_imported,
- list<Context*>& finished, int *num_dentries)
+ list<MDSInternalContextBase*>& finished, int *num_dentries)
{
dout(10) << "finish_export_dir " << *dir << dendl;
finish_export_dir(*it, now, peer, peer_imported, finished, num_dentries);
}
-class C_MDS_ExportFinishLogged : public Context {
- Migrator *migrator;
+class C_MDS_ExportFinishLogged : public MigratorContext {
CDir *dir;
public:
- C_MDS_ExportFinishLogged(Migrator *m, CDir *d) : migrator(m), dir(d) {}
+ C_MDS_ExportFinishLogged(Migrator *m, CDir *d) : MigratorContext(m), dir(d) {}
void finish(int r) {
- migrator->export_logged_finish(dir);
+ mig->export_logged_finish(dir);
}
};
// finish export (adjust local cache state)
int num_dentries = 0;
- C_Contexts *fin = new C_Contexts(g_ceph_context);
+ C_ContextsBase<MDSInternalContextBase, MDSInternalContextGather> *fin = new C_ContextsBase<MDSInternalContextBase, MDSInternalContextGather>(g_ceph_context);
finish_export_dir(dir, ceph_clock_now(g_ceph_context), it->second.peer,
it->second.peer_imported, fin->contexts, &num_dentries);
dir->add_waiter(CDir::WAIT_UNFREEZE, fin);
CDir *dir;
CInode *diri;
- list<Context*> finished;
+ list<MDSInternalContextBase*> finished;
// assimilate root dir.
map<dirfrag_t,import_state_t>::iterator it = import_state.find(m->get_dirfrag());
-class C_MDS_ImportDirLoggedStart : public Context {
- Migrator *migrator;
+class C_MDS_ImportDirLoggedStart : public MigratorContext {
dirfrag_t df;
CDir *dir;
int from;
map<client_t,uint64_t> sseqmap;
C_MDS_ImportDirLoggedStart(Migrator *m, CDir *d, int f) :
- migrator(m), df(d->dirfrag()), dir(d), from(f) {
+ MigratorContext(m), df(d->dirfrag()), dir(d), from(f) {
}
void finish(int r) {
- migrator->import_logged_start(df, dir, from, imported_client_map, sseqmap);
+ mig->import_logged_start(df, dir, from, imported_client_map, sseqmap);
}
};
assert(dir);
dout(7) << "import_reverse_unfreeze " << *dir << dendl;
dir->unfreeze_tree();
- list<Context*> ls;
+ list<MDSInternalContextBase*> ls;
mds->queue_waiters(ls);
cache->discard_delayed_expire(dir);
import_reverse_final(dir);
// take all waiters on this dir
// NOTE: a pass of imported data is guaranteed to get all of my waiters because
// a replica's presense in my cache implies/forces it's presense in authority's.
- list<Context*> waiters;
+ list<MDSInternalContextBase*> waiters;
dir->take_waiting(CDir::WAIT_ANY_MASK, waiters);
- for (list<Context*>::iterator it = waiters.begin();
+ for (list<MDSInternalContextBase*>::iterator it = waiters.begin();
it != waiters.end();
++it)
import_root->add_waiter(CDir::WAIT_UNFREEZE, *it); // UNFREEZE will get kicked both on success or failure
mds->send_message_mds(ex, dest);
}
-class C_M_LoggedImportCaps : public Context {
- Migrator *migrator;
+class C_M_LoggedImportCaps : public MigratorContext {
CInode *in;
int from;
public:
map<client_t,entity_inst_t> client_map;
map<client_t,uint64_t> sseqmap;
- C_M_LoggedImportCaps(Migrator *m, CInode *i, int f) : migrator(m), in(i), from(f) {}
+ C_M_LoggedImportCaps(Migrator *m, CInode *i, int f) : MigratorContext(m), in(i), from(f) {}
void finish(int r) {
- migrator->logged_import_caps(in, from, peer_exports, client_map, sseqmap);
+ mig->logged_import_caps(in, from, peer_exports, client_map, sseqmap);
}
};
set<int> warning_ack_waiting;
set<int> notify_ack_waiting;
map<inodeno_t,map<client_t,Capability::Import> > peer_imported;
- list<Context*> waiting_for_finish;
+ list<MDSInternalContextBase*> waiting_for_finish;
MutationRef mut;
// for freeze tree deadlock detection
utime_t last_cum_auth_pins_change;
map<client_t,entity_inst_t>& exported_client_map);
void finish_export_inode(CInode *in, utime_t now, int target,
map<client_t,Capability::Import>& peer_imported,
- list<Context*>& finished);
+ list<MDSInternalContextBase*>& finished);
void finish_export_inode_caps(CInode *in, int target,
map<client_t,Capability::Import>& peer_imported);
utime_t now);
void finish_export_dir(CDir *dir, utime_t now, int target,
map<inodeno_t,map<client_t,Capability::Import> >& peer_imported,
- list<Context*>& finished, int *num_dentries);
+ list<MDSInternalContextBase*>& finished, int *num_dentries);
- void add_export_finish_waiter(CDir *dir, Context *c) {
+ void add_export_finish_waiter(CDir *dir, MDSInternalContextBase *c) {
map<CDir*, export_state_t>::iterator it = export_state.find(dir);
assert(it != export_state.end());
it->second.waiting_for_finish.push_back(c);
friend class C_MDS_ExportFinishLogged;
friend class C_M_ExportGo;
friend class C_M_ExportSessionsFlushed;
+ friend class MigratorContext;
// importer
void handle_export_discover(MExportDirDiscover *m);
Context *slave_commit;
bufferlist rollback_bl;
- list<Context*> waiting_for_finish;
+ list<MDSInternalContextBase*> waiting_for_finish;
// export & fragment
CDir* export_dir;
#define dout_subsys ceph_subsys_mds
-struct C_MDC_Recover : public Context {
+class C_MDC_Recover : public MDSIOContextBase {
+protected:
RecoveryQueue *rq;
CInode *in;
- uint64_t size;
- utime_t mtime;
- C_MDC_Recover(RecoveryQueue *rq_, CInode *i) : rq(rq_), in(i), size(0) {}
void finish(int r) {
rq->_recovered(in, r, size, mtime);
}
+
+ MDS *get_mds() {
+ return rq->mds;
+ }
+
+public:
+ uint64_t size;
+ utime_t mtime;
+
+ C_MDC_Recover(RecoveryQueue *rq_, CInode *i) : rq(rq_), in(i), size(0) {
+ assert(rq != NULL);
+ }
};
::encode(s, bl);
}
- void decode_state_rejoin(bufferlist::iterator& p, list<Context*>& waiters) {
+ void decode_state_rejoin(bufferlist::iterator& p, list<MDSInternalContextBase*>& waiters) {
SimpleLock::decode_state_rejoin(p, waiters);
if (is_flushing()) {
set_dirty();
#undef dout_prefix
#define dout_prefix *_dout << "mds." << mds->get_nodeid() << ".server "
+
+class C_ServerContext : public MDSInternalContextBase {
+ protected:
+ Server *server;
+ MDS *get_mds()
+ {
+ return server->mds;
+ }
+
+ public:
+ C_ServerContext(Server *s) : server(s) {
+ assert(server != NULL);
+ }
+};
+
+
void Server::create_logger()
{
PerfCountersBuilder plb(g_ceph_context, "mds_server", l_mdss_first, l_mdss_last);
// ----------------------------------------------------------
// SESSION management
-class C_MDS_session_finish : public Context {
- MDS *mds;
+class C_MDS_session_finish : public MDSInternalContext {
Session *session;
uint64_t state_seq;
bool open;
Context *fin;
public:
C_MDS_session_finish(MDS *m, Session *se, uint64_t sseq, bool s, version_t mv, Context *fin_ = NULL) :
- mds(m), session(se), state_seq(sseq), open(s), cmapv(mv), inotablev(0), fin(fin_) { }
+ MDSInternalContext(m), session(se), state_seq(sseq), open(s), cmapv(mv), inotablev(0), fin(fin_) { }
C_MDS_session_finish(MDS *m, Session *se, uint64_t sseq, bool s, version_t mv, interval_set<inodeno_t>& i, version_t iv, Context *fin_ = NULL) :
- mds(m), session(se), state_seq(sseq), open(s), cmapv(mv), inos(i), inotablev(iv), fin(fin_) { }
+ MDSInternalContext(m), session(se), state_seq(sseq), open(s), cmapv(mv), inos(i), inotablev(iv), fin(fin_) { }
void finish(int r) {
assert(r == 0);
mds->server->_session_logged(session, state_seq, open, cmapv, inos, inotablev);
m->put();
}
-void Server::flush_client_sessions(set<client_t>& client_set, C_GatherBuilder& gather)
+void Server::flush_client_sessions(set<client_t>& client_set, MDSGatherBuilder& gather)
{
for (set<client_t>::iterator p = client_set.begin(); p != client_set.end(); ++p) {
Session *session = mds->sessionmap.get_session(entity_name_t::CLIENT(p->v));
void Server::finish_flush_session(Session *session, version_t seq)
{
- list<Context*> finished;
+ list<MDSInternalContextBase*> finished;
session->finish_flush(seq, finished);
mds->queue_waiters(finished);
}
mds->sessionmap.version++;
}
-struct C_MDS_TerminatedSessions : public Context {
- Server *server;
- C_MDS_TerminatedSessions(Server *s) : server(s) {}
+class C_MDS_TerminatedSessions : public C_ServerContext {
void finish(int r) {
server->terminating_sessions = false;
}
+ public:
+ C_MDS_TerminatedSessions(Server *s) : C_ServerContext(s) {}
};
void Server::terminate_sessions()
}
}
+/*
+ * XXX bump in the interface here, not using an MDSInternalContextBase here
+ * because all the callers right now happen to use a SaferCond
+ */
void Server::kill_session(Session *session, Context *on_safe)
{
if ((session->is_opening() ||
/*******
* some generic stuff for finishing off requests
*/
-void Server::journal_and_reply(MDRequestRef& mdr, CInode *in, CDentry *dn, LogEvent *le, Context *fin)
+void Server::journal_and_reply(MDRequestRef& mdr, CInode *in, CDentry *dn, LogEvent *le, MDSInternalContextBase *fin)
{
dout(10) << "journal_and_reply tracei " << in << " tracedn " << dn << dendl;
mdlog->flush();
}
-class C_IO_MarkEvent : public Context
+class C_MarkEvent : public MDSInternalContext
{
- MDS *mds;
Context *true_finisher;
MDRequestRef mdr;
string event_str;
public:
- C_IO_MarkEvent(MDS *mds_, Context *f, MDRequestRef& _mdr,
+ C_MarkEvent(MDS *mds_, Context *f, MDRequestRef& _mdr,
const char *evt)
- : mds(mds_), true_finisher(f), mdr(_mdr),
+ : MDSInternalContext(mds_), true_finisher(f), mdr(_mdr),
event_str("journal_committed: ") {
event_str += evt;
}
virtual void finish(int r) {
- Mutex::Locker l(mds->mds_lock);
mdr->mark_event(event_str);
true_finisher->complete(r);
}
};
-void Server::submit_mdlog_entry(LogEvent *le, Context *fin, MDRequestRef& mdr,
+
+void Server::submit_mdlog_entry(LogEvent *le, MDSInternalContextBase *fin, MDRequestRef& mdr,
const char *event)
{
if (mdr) {
string event_str("submit entry: ");
event_str += event;
mdr->mark_event(event_str);
- mdlog->submit_entry(le, new C_IO_MarkEvent(mds, fin, mdr, event));
+ mdlog->submit_entry(le, new C_MarkEvent(mds, fin, mdr, event));
} else
mdlog->submit_entry(le, fin);
}
return dir;
}
-class C_MDS_TryFindInode : public Context {
- Server *server;
+class C_MDS_TryFindInode : public C_ServerContext {
MDRequestRef mdr;
public:
- C_MDS_TryFindInode(Server *s, MDRequestRef& r) : server(s), mdr(r) {}
+ C_MDS_TryFindInode(Server *s, MDRequestRef& r) : C_ServerContext(s), mdr(r) {}
virtual void finish(int r) {
if (r == -ESTALE) // :( find_ino_peers failed
server->reply_request(mdr, r);
reply_request(mdr, r, NULL, no_lookup ? NULL : mdr->dn[n][mdr->dn[n].size()-1]);
} else if (r == -ESTALE) {
dout(10) << "FAIL on ESTALE but attempting recovery" << dendl;
- Context *c = new C_MDS_TryFindInode(this, mdr);
+ MDSInternalContextBase *c = new C_MDS_TryFindInode(this, mdr);
mdcache->find_ino_peers(refpath.get_ino(), c);
} else {
dout(10) << "FAIL on error " << r << dendl;
is_lookup ? mdr->dn[0].back() : 0);
}
-struct C_MDS_LookupIno2 : public Context {
- Server *server;
+struct C_MDS_LookupIno2 : public C_ServerContext {
MDRequestRef mdr;
- C_MDS_LookupIno2(Server *s, MDRequestRef& r) : server(s), mdr(r) {}
+ C_MDS_LookupIno2(Server *s, MDRequestRef& r) : C_ServerContext(s), mdr(r) {}
void finish(int r) {
server->_lookup_ino_2(mdr, r);
}
reply_request(mdr, 0, cur, dn);
}
-class C_MDS_openc_finish : public Context {
- MDS *mds;
+class C_MDS_openc_finish : public MDSInternalContext {
MDRequestRef mdr;
CDentry *dn;
CInode *newi;
snapid_t follows;
public:
C_MDS_openc_finish(MDS *m, MDRequestRef& r, CDentry *d, CInode *ni, snapid_t f) :
- mds(m), mdr(r), dn(d), newi(ni), follows(f) {}
+ MDSInternalContext(m), mdr(r), dn(d), newi(ni), follows(f) {}
void finish(int r) {
assert(r == 0);
if (r < 0 && r != -ENOENT) {
if (r == -ESTALE) {
dout(10) << "FAIL on ESTALE but attempting recovery" << dendl;
- Context *c = new C_MDS_TryFindInode(this, mdr);
+ MDSInternalContextBase *c = new C_MDS_TryFindInode(this, mdr);
mdcache->find_ino_peers(req->get_filepath().get_ino(), c);
} else {
dout(10) << "FAIL on error " << r << dendl;
// already issued caps and leases, reply immediately.
if (dnbl.length() > 0) {
- mdcache->open_remote_dentry(dn, dnp, new C_NoopContext);
+ mdcache->open_remote_dentry(dn, dnp, new C_MDSInternalNoop);
dout(10) << " open remote dentry after caps were issued, stopping at "
<< dnbl.length() << " < " << bytes_left << dendl;
break;
/*
* finisher for basic inode updates
*/
-class C_MDS_inode_update_finish : public Context {
- MDS *mds;
+class C_MDS_inode_update_finish : public MDSInternalContext {
MDRequestRef mdr;
CInode *in;
bool truncating_smaller, changed_ranges;
public:
C_MDS_inode_update_finish(MDS *m, MDRequestRef& r, CInode *i,
bool sm=false, bool cr=false) :
- mds(m), mdr(r), in(i), truncating_smaller(sm), changed_ranges(cr) { }
+ MDSInternalContext(m), mdr(r), in(i), truncating_smaller(sm), changed_ranges(cr) { }
void finish(int r) {
assert(r == 0);
dout(10) << " state prior to lock change: " << *lock_state << dendl;
if (CEPH_LOCK_UNLOCK == set_lock.type) {
list<ceph_filelock> activated_locks;
- list<Context*> waiters;
+ list<MDSInternalContextBase*> waiters;
if (lock_state->is_waiting(set_lock)) {
dout(10) << " unlock removing waiting lock " << set_lock << dendl;
lock_state->remove_waiting(set_lock);
reply_request(mdr, -ENODATA);
}
-class C_MDS_inode_xattr_update_finish : public Context {
- MDS *mds;
+class C_MDS_inode_xattr_update_finish : public MDSInternalContext {
MDRequestRef mdr;
CInode *in;
public:
C_MDS_inode_xattr_update_finish(MDS *m, MDRequestRef& r, CInode *i) :
- mds(m), mdr(r), in(i) { }
+ MDSInternalContext(m), mdr(r), in(i) { }
void finish(int r) {
assert(r == 0);
// MKNOD
-class C_MDS_mknod_finish : public Context {
- MDS *mds;
+class C_MDS_mknod_finish : public MDSInternalContext {
MDRequestRef mdr;
CDentry *dn;
CInode *newi;
snapid_t follows;
public:
C_MDS_mknod_finish(MDS *m, MDRequestRef& r, CDentry *d, CInode *ni, snapid_t f) :
- mds(m), mdr(r), dn(d), newi(ni), follows(f) {}
+ MDSInternalContext(m), mdr(r), dn(d), newi(ni), follows(f) {}
void finish(int r) {
assert(r == 0);
}
-class C_MDS_link_local_finish : public Context {
- MDS *mds;
+class C_MDS_link_local_finish : public MDSInternalContext {
MDRequestRef mdr;
CDentry *dn;
CInode *targeti;
public:
C_MDS_link_local_finish(MDS *m, MDRequestRef& r, CDentry *d, CInode *ti,
version_t dnpv_, version_t tipv_) :
- mds(m), mdr(r), dn(d), targeti(ti),
+ MDSInternalContext(m), mdr(r), dn(d), targeti(ti),
dnpv(dnpv_), tipv(tipv_) { }
void finish(int r) {
assert(r == 0);
// link / unlink remote
-class C_MDS_link_remote_finish : public Context {
- MDS *mds;
+class C_MDS_link_remote_finish : public MDSInternalContext {
MDRequestRef mdr;
bool inc;
CDentry *dn;
version_t dpv;
public:
C_MDS_link_remote_finish(MDS *m, MDRequestRef& r, bool i, CDentry *d, CInode *ti) :
- mds(m), mdr(r), inc(i), dn(d), targeti(ti),
+ MDSInternalContext(m), mdr(r), inc(i), dn(d), targeti(ti),
dpv(d->get_projected_version()) {}
void finish(int r) {
assert(r == 0);
// remote linking/unlinking
-class C_MDS_SlaveLinkPrep : public Context {
- Server *server;
+class C_MDS_SlaveLinkPrep : public C_ServerContext {
MDRequestRef mdr;
CInode *targeti;
public:
C_MDS_SlaveLinkPrep(Server *s, MDRequestRef& r, CInode *t) :
- server(s), mdr(r), targeti(t) { }
+ C_ServerContext(s), mdr(r), targeti(t) { }
void finish(int r) {
assert(r == 0);
server->_logged_slave_link(mdr, targeti);
}
};
-class C_MDS_SlaveLinkCommit : public Context {
- Server *server;
+class C_MDS_SlaveLinkCommit : public C_ServerContext {
MDRequestRef mdr;
CInode *targeti;
public:
C_MDS_SlaveLinkCommit(Server *s, MDRequestRef& r, CInode *t) :
- server(s), mdr(r), targeti(t) { }
+ C_ServerContext(s), mdr(r), targeti(t) { }
void finish(int r) {
server->_commit_slave_link(mdr, r, targeti);
}
}
-struct C_MDS_CommittedSlave : public Context {
- Server *server;
+struct C_MDS_CommittedSlave : public C_ServerContext {
MDRequestRef mdr;
- C_MDS_CommittedSlave(Server *s, MDRequestRef& m) : server(s), mdr(m) {}
+ C_MDS_CommittedSlave(Server *s, MDRequestRef& m) : C_ServerContext(s), mdr(m) {}
void finish(int r) {
server->_committed_slave(mdr);
}
mds->mdcache->request_finish(mdr);
}
-struct C_MDS_LoggedLinkRollback : public Context {
- Server *server;
+struct C_MDS_LoggedLinkRollback : public C_ServerContext {
MutationRef mut;
MDRequestRef mdr;
- C_MDS_LoggedLinkRollback(Server *s, MutationRef& m, MDRequestRef& r) : server(s), mut(m), mdr(r) {}
+ C_MDS_LoggedLinkRollback(Server *s, MutationRef& m, MDRequestRef& r) : C_ServerContext(s), mut(m), mdr(r) {}
void finish(int r) {
server->_link_rollback_finish(mut, mdr);
}
_unlink_local(mdr, dn, straydn);
}
-class C_MDS_unlink_local_finish : public Context {
- MDS *mds;
+class C_MDS_unlink_local_finish : public MDSInternalContext {
MDRequestRef mdr;
CDentry *dn;
CDentry *straydn;
version_t dnpv; // deleted dentry
public:
C_MDS_unlink_local_finish(MDS *m, MDRequestRef& r, CDentry *d, CDentry *sd) :
- mds(m), mdr(r), dn(d), straydn(sd),
+ MDSInternalContext(m), mdr(r), dn(d), straydn(sd),
dnpv(d->get_projected_version()) {}
void finish(int r) {
assert(r == 0);
return true;
}
-struct C_MDS_SlaveRmdirPrep : public Context {
- Server *server;
+struct C_MDS_SlaveRmdirPrep : public C_ServerContext {
MDRequestRef mdr;
CDentry *dn, *straydn;
C_MDS_SlaveRmdirPrep(Server *s, MDRequestRef& r, CDentry *d, CDentry *st)
- : server(s), mdr(r), dn(d), straydn(st) {}
+ : C_ServerContext(s), mdr(r), dn(d), straydn(st) {}
void finish(int r) {
server->_logged_slave_rmdir(mdr, dn, straydn);
}
}
}
-struct C_MDS_LoggedRmdirRollback : public Context {
- Server *server;
+struct C_MDS_LoggedRmdirRollback : public C_ServerContext {
MDRequestRef mdr;
metareqid_t reqid;
CDentry *dn;
CDentry *straydn;
C_MDS_LoggedRmdirRollback(Server *s, MDRequestRef& m, metareqid_t mr, CDentry *d, CDentry *st)
- : server(s), mdr(m), reqid(mr), dn(d), straydn(st) {}
+ : C_ServerContext(s), mdr(m), reqid(mr), dn(d), straydn(st) {}
void finish(int r) {
server->_rmdir_rollback_finish(mdr, reqid, dn, straydn);
}
// ======================================================
-class C_MDS_rename_finish : public Context {
- MDS *mds;
+class C_MDS_rename_finish : public MDSInternalContext {
MDRequestRef mdr;
CDentry *srcdn;
CDentry *destdn;
public:
C_MDS_rename_finish(MDS *m, MDRequestRef& r,
CDentry *sdn, CDentry *ddn, CDentry *stdn) :
- mds(m), mdr(r),
+ MDSInternalContext(m), mdr(r),
srcdn(sdn), destdn(ddn), straydn(stdn) { }
void finish(int r) {
assert(r == 0);
if (r < 0) {
if (r == -ESTALE) {
dout(10) << "FAIL on ESTALE but attempting recovery" << dendl;
- Context *c = new C_MDS_TryFindInode(this, mdr);
- mdcache->find_ino_peers(srcpath.get_ino(), c);
+ mdcache->find_ino_peers(srcpath.get_ino(), new C_MDS_TryFindInode(this, mdr));
} else {
dout(10) << "FAIL on error " << r << dendl;
reply_request(mdr, r);
-
-
// ------------
// SLAVE
-class C_MDS_SlaveRenamePrep : public Context {
- Server *server;
+class C_MDS_SlaveRenamePrep : public C_ServerContext {
MDRequestRef mdr;
CDentry *srcdn, *destdn, *straydn;
public:
C_MDS_SlaveRenamePrep(Server *s, MDRequestRef& m, CDentry *sr, CDentry *de, CDentry *st) :
- server(s), mdr(m), srcdn(sr), destdn(de), straydn(st) {}
+ C_ServerContext(s), mdr(m), srcdn(sr), destdn(de), straydn(st) {}
void finish(int r) {
server->_logged_slave_rename(mdr, srcdn, destdn, straydn);
}
};
-class C_MDS_SlaveRenameCommit : public Context {
- Server *server;
+class C_MDS_SlaveRenameCommit : public C_ServerContext {
MDRequestRef mdr;
CDentry *srcdn, *destdn, *straydn;
public:
C_MDS_SlaveRenameCommit(Server *s, MDRequestRef& m, CDentry *sr, CDentry *de, CDentry *st) :
- server(s), mdr(m), srcdn(sr), destdn(de), straydn(st) {}
+ C_ServerContext(s), mdr(m), srcdn(sr), destdn(de), straydn(st) {}
void finish(int r) {
server->_commit_slave_rename(mdr, r, srcdn, destdn, straydn);
}
};
-class C_MDS_SlaveRenameSessionsFlushed : public Context {
- Server *server;
+class C_MDS_SlaveRenameSessionsFlushed : public C_ServerContext {
MDRequestRef mdr;
public:
C_MDS_SlaveRenameSessionsFlushed(Server *s, MDRequestRef& r) :
- server(s), mdr(r) {}
+ C_ServerContext(s), mdr(r) {}
void finish(int r) {
server->_slave_rename_sessions_flushed(mdr);
}
set<client_t> export_client_set;
mdcache->migrator->get_export_client_set(srcdnl->get_inode(), export_client_set);
- C_GatherBuilder gather(g_ceph_context);
+ MDSGatherBuilder gather(g_ceph_context);
flush_client_sessions(export_client_set, gather);
if (gather.has_subs()) {
mdr->more()->waiting_on_slave.insert(-1);
CDentry::linkage_t *destdnl = destdn->get_linkage();
- list<Context*> finished;
+ list<MDSInternalContextBase*> finished;
if (r == 0) {
// unfreeze+singleauth inode
// hmm, do i really need to delay this?
mut->add_updated_lock(&dir->get_inode()->nestlock);
}
-struct C_MDS_LoggedRenameRollback : public Context {
- Server *server;
+struct C_MDS_LoggedRenameRollback : public C_ServerContext {
MutationRef mut;
MDRequestRef mdr;
CDentry *srcdn;
bool finish_mdr;
C_MDS_LoggedRenameRollback(Server *s, MutationRef& m, MDRequestRef& r,
CDentry *sd, version_t pv, CDentry *dd,
- CDentry *st, bool f) :
- server(s), mut(m), mdr(r), srcdn(sd), srcdnpv(pv), destdn(dd),
+ CDentry *st, bool f) :
+ C_ServerContext(s), mut(m), mdr(r), srcdn(sd), srcdnpv(pv), destdn(dd),
straydn(st), finish_mdr(f) {}
void finish(int r) {
server->_rename_rollback_finish(mut, mdr, srcdn, srcdnpv,
assert(!le->commit.empty());
if (mdr)
mdr->more()->slave_update_journaled = false;
- Context *fin = new C_MDS_LoggedRenameRollback(this, mut, mdr, srcdn, srcdnpv,
+ MDSInternalContextBase *fin = new C_MDS_LoggedRenameRollback(this, mut, mdr, srcdn, srcdnpv,
destdn, straydn, finish_mdr);
submit_mdlog_entry(le, fin, mdr, __func__);
mdlog->flush();
}
if (mdr) {
- list<Context*> finished;
+ list<MDSInternalContextBase*> finished;
if (mdr->more()->is_ambiguous_auth) {
if (srcdn->is_auth())
mdr->more()->rename_inode->unfreeze_inode(finished);
// MKSNAP
-struct C_MDS_mksnap_finish : public Context {
- MDS *mds;
+struct C_MDS_mksnap_finish : public MDSInternalContext {
MDRequestRef mdr;
CInode *diri;
SnapInfo info;
C_MDS_mksnap_finish(MDS *m, MDRequestRef& r, CInode *di, SnapInfo &i) :
- mds(m), mdr(r), diri(di), info(i) {}
+ MDSInternalContext(m), mdr(r), diri(di), info(i) {}
void finish(int r) {
mds->server->_mksnap_finish(mdr, diri, info);
}
// RMSNAP
-struct C_MDS_rmsnap_finish : public Context {
- MDS *mds;
+struct C_MDS_rmsnap_finish : public MDSInternalContext {
MDRequestRef mdr;
CInode *diri;
snapid_t snapid;
C_MDS_rmsnap_finish(MDS *m, MDRequestRef& r, CInode *di, snapid_t sn) :
- mds(m), mdr(r), diri(di), snapid(sn) {}
+ MDSInternalContext(m), mdr(r), diri(di), snapid(sn) {}
void finish(int r) {
mds->server->_rmsnap_finish(mdr, diri, snapid);
}
};
class Server {
+public:
+ // XXX FIXME: can probably friend enough contexts to make this not need to be public
MDS *mds;
+private:
MDCache *mdcache;
MDLog *mdlog;
Messenger *messenger;
void finish_force_open_sessions(map<client_t,entity_inst_t> &cm,
map<client_t,uint64_t>& sseqmap,
bool dec_import=true);
- void flush_client_sessions(set<client_t>& client_set, C_GatherBuilder& gather);
+ void flush_client_sessions(set<client_t>& client_set, MDSGatherBuilder& gather);
void finish_flush_session(Session *session, version_t seq);
void terminate_sessions();
void find_idle_sessions();
void handle_client_request(MClientRequest *m);
void journal_and_reply(MDRequestRef& mdr, CInode *tracei, CDentry *tracedn,
- LogEvent *le, Context *fin);
- void submit_mdlog_entry(LogEvent *le, Context *fin,
+ LogEvent *le, MDSInternalContextBase *fin);
+ void submit_mdlog_entry(LogEvent *le, MDSInternalContextBase *fin,
MDRequestRef& mdr, const char *evt);
void dispatch_client_request(MDRequestRef& mdr);
void early_reply(MDRequestRef& mdr, CInode *tracei, CDentry *tracedn);
}
};
-void SessionMap::load(Context *onload)
+void SessionMap::load(MDSInternalContextBase *onload)
{
dout(10) << "load" << dendl;
}
};
-void SessionMap::save(Context *onsave, version_t needv)
+void SessionMap::save(MDSInternalContextBase *onsave, version_t needv)
{
dout(10) << "save needv " << needv << ", v " << version << dendl;
// -- caps --
private:
version_t cap_push_seq; // cap push seq #
- map<version_t, list<Context*> > waitfor_flush; // flush session messages
+ map<version_t, list<MDSInternalContextBase*> > waitfor_flush; // flush session messages
public:
xlist<Capability*> caps; // inodes with caps; front=most recently used
xlist<ClientLease*> leases; // metadata leases to clients
version_t inc_push_seq() { return ++cap_push_seq; }
version_t get_push_seq() const { return cap_push_seq; }
- version_t wait_for_flush(Context* c) {
+ version_t wait_for_flush(MDSInternalContextBase* c) {
waitfor_flush[get_push_seq()].push_back(c);
return get_push_seq();
}
- void finish_flush(version_t seq, list<Context*>& ls) {
+ void finish_flush(version_t seq, list<MDSInternalContextBase*>& ls) {
while (!waitfor_flush.empty()) {
if (waitfor_flush.begin()->first > seq)
break;
public: // i am lazy
version_t version, projected, committing, committed;
- map<version_t, list<Context*> > commit_waiters;
+ map<version_t, list<MDSInternalContextBase*> > commit_waiters;
public:
SessionMap(MDS *m) : mds(m),
// -- loading, saving --
inodeno_t ino;
- list<Context*> waiting_for_load;
+ list<MDSInternalContextBase*> waiting_for_load;
void encode(bufferlist& bl) const;
void decode(bufferlist::iterator& blp);
object_t get_object_name();
- void load(Context *onload);
+ void load(MDSInternalContextBase *onload);
void _load_finish(int r, bufferlist &bl);
- void save(Context *onsave, version_t needv=0);
+ void save(MDSInternalContextBase *onsave, version_t needv=0);
void _save_finish(version_t v);
};
#ifndef CEPH_SIMPLELOCK_H
#define CEPH_SIMPLELOCK_H
+#include "MDSContext.h"
+
// -- lock types --
// see CEPH_LOCK_*
void finish_waiters(uint64_t mask, int r=0) {
parent->finish_waiting(mask << get_wait_shift(), r);
}
- void take_waiting(uint64_t mask, list<Context*>& ls) {
+ void take_waiting(uint64_t mask, list<MDSInternalContextBase*>& ls) {
parent->take_waiting(mask << get_wait_shift(), ls);
}
- void add_waiter(uint64_t mask, Context *c) {
+ void add_waiter(uint64_t mask, MDSInternalContextBase *c) {
parent->add_waiter(mask << get_wait_shift(), c);
}
bool is_waiter_for(uint64_t mask) const {
//assert(!is_stable() || gather_set.size() == 0); // gather should be empty in stable states.
return s;
}
- void set_state_rejoin(int s, list<Context*>& waiters) {
+ void set_state_rejoin(int s, list<MDSInternalContextBase*>& waiters) {
if (!is_stable() && get_parent()->is_auth()) {
state = s;
get_parent()->auth_unpin(this);
if (is_new)
state = s;
}
- void decode_state_rejoin(bufferlist::iterator& p, list<Context*>& waiters) {
+ void decode_state_rejoin(bufferlist::iterator& p, list<MDSInternalContextBase*>& waiters) {
__s16 s;
::decode(s, p);
set_state_rejoin(s, waiters);
#include "MDSTableClient.h"
#include "snap.h"
-class Context;
+class MDSInternalContextBase;
class MDS;
class LogSegment;
void handle_query_result(MMDSTableRequest *m) {}
void prepare_create(inodeno_t dirino, const string& name, utime_t stamp,
- version_t *pstid, bufferlist *pbl, Context *onfinish) {
+ version_t *pstid, bufferlist *pbl, MDSInternalContextBase *onfinish) {
bufferlist bl;
__u32 op = TABLE_OP_CREATE;
::encode(op, bl);
_prepare(bl, pstid, pbl, onfinish);
}
- void prepare_create_realm(inodeno_t ino, version_t *pstid, bufferlist *pbl, Context *onfinish) {
+ void prepare_create_realm(inodeno_t ino, version_t *pstid, bufferlist *pbl, MDSInternalContextBase *onfinish) {
bufferlist bl;
__u32 op = TABLE_OP_CREATE;
::encode(op, bl);
_prepare(bl, pstid, pbl, onfinish);
}
- void prepare_destroy(inodeno_t ino, snapid_t snapid, version_t *pstid, bufferlist *pbl, Context *onfinish) {
+ void prepare_destroy(inodeno_t ino, snapid_t snapid, version_t *pstid, bufferlist *pbl, MDSInternalContextBase *onfinish) {
bufferlist bl;
__u32 op = TABLE_OP_DESTROY;
::encode(op, bl);
}
-
-
void SnapRealm::add_open_past_parent(SnapRealm *parent)
{
open_past_parents[parent->inode->ino()] = parent;
parent->inode->get(CInode::PIN_PASTSNAPPARENT);
}
-bool SnapRealm::_open_parents(Context *finish, snapid_t first, snapid_t last)
+bool SnapRealm::_open_parents(MDSInternalContextBase *finish, snapid_t first, snapid_t last)
{
dout(10) << "open_parents [" << first << "," << last << "]" << dendl;
if (open)
#include "include/elist.h"
#include "common/snap_types.h"
+
struct SnapRealm {
// realm state
return false;
}
- bool _open_parents(Context *retryorfinish, snapid_t first=1, snapid_t last=CEPH_NOSNAP);
- bool open_parents(Context *retryorfinish) {
+ bool _open_parents(MDSInternalContextBase *retryorfinish, snapid_t first=1, snapid_t last=CEPH_NOSNAP);
+ bool open_parents(MDSInternalContextBase *retryorfinish) {
if (!_open_parents(retryorfinish))
return false;
delete retryorfinish;
// -----------------------
// LogSegment
-void LogSegment::try_to_expire(MDS *mds, C_GatherBuilder &gather_bld, int op_prio)
+void LogSegment::try_to_expire(MDS *mds, MDSGatherBuilder &gather_bld, int op_prio)
{
set<CDir*> commit;
dout(10) << "EFragment.replay " << op_name(op) << " " << ino << " " << basefrag << " by " << bits << dendl;
list<CDir*> resultfrags;
- list<Context*> waiters;
+ list<MDSInternalContextBase*> waiters;
list<frag_t> old_frags;
// in may be NULL if it wasn't in our cache yet. if it's a prepare
#include "common/config.h"
#include "common/Clock.h"
#include "common/DecayCounter.h"
-#include "include/Context.h"
+#include "MDSContext.h"
#include "include/frag.h"
#include "include/xlist.h"
#include "include/assert.h"
#include "include/hash_namespace.h"
+
#define CEPH_FS_ONDISK_MAGIC "ceph fs volume v011"
// ---------------------------------------------
// waiting
protected:
- multimap<uint64_t, Context*> waiting;
+ multimap<uint64_t, MDSInternalContextBase*> waiting;
public:
bool is_waiter_for(uint64_t mask, uint64_t min=0) {
while (min & (min-1)) // if more than one bit is set
min &= min-1; // clear LSB
}
- for (multimap<uint64_t,Context*>::iterator p = waiting.lower_bound(min);
+ for (multimap<uint64_t,MDSInternalContextBase*>::iterator p = waiting.lower_bound(min);
p != waiting.end();
++p) {
if (p->first & mask) return true;
}
return false;
}
- virtual void add_waiter(uint64_t mask, Context *c) {
+ virtual void add_waiter(uint64_t mask, MDSInternalContextBase *c) {
if (waiting.empty())
get(PIN_WAITER);
- waiting.insert(pair<uint64_t,Context*>(mask, c));
+ waiting.insert(pair<uint64_t,MDSInternalContextBase*>(mask, c));
// pdout(10,g_conf->debug_mds) << (mdsco_db_line_prefix(this))
// << "add_waiter " << hex << mask << dec << " " << c
// << " on " << *this
// << dendl;
}
- virtual void take_waiting(uint64_t mask, list<Context*>& ls) {
+ virtual void take_waiting(uint64_t mask, list<MDSInternalContextBase*>& ls) {
if (waiting.empty()) return;
- multimap<uint64_t,Context*>::iterator it = waiting.begin();
+ multimap<uint64_t,MDSInternalContextBase*>::iterator it = waiting.begin();
while (it != waiting.end()) {
if (it->first & mask) {
ls.push_back(it->second);
put(PIN_WAITER);
}
void finish_waiting(uint64_t mask, int result = 0) {
- list<Context*> finished;
+ list<MDSInternalContextBase*> finished;
take_waiting(mask, finished);
finish_contexts(g_ceph_context, finished, result);
}
virtual void encode_lock_state(int type, bufferlist& bl) { assert(0); }
virtual void decode_lock_state(int type, bufferlist& bl) { assert(0); }
virtual void finish_lock_waiters(int type, uint64_t mask, int r=0) { assert(0); }
- virtual void add_lock_waiter(int type, uint64_t mask, Context *c) { assert(0); }
+ virtual void add_lock_waiter(int type, uint64_t mask, MDSInternalContextBase *c) { assert(0); }
virtual bool is_lock_waiting(int type, uint64_t mask) { assert(0); return false; }
virtual void clear_dirty_scattered(int type) { assert(0); }