++p) {
dout(10) << "requesting remote auth_pins from mds" << p->first << dendl;
- MMDSSlaveRequest *req = new MMDSSlaveRequest(mdr->reqid, MMDSSlaveRequest::OP_AUTHPIN);
+ MMDSSlaveRequest *req = new MMDSSlaveRequest(mdr->reqid, mdr->attempt,
+ MMDSSlaveRequest::OP_AUTHPIN);
for (set<MDSCacheObject*>::iterator q = p->second.begin();
q != p->second.end();
++q) {
// send lock request
mut->more()->slaves.insert(target);
- MMDSSlaveRequest *r = new MMDSSlaveRequest(mut->reqid, MMDSSlaveRequest::OP_WRLOCK);
+ MMDSSlaveRequest *r = new MMDSSlaveRequest(mut->reqid, mut->attempt,
+ MMDSSlaveRequest::OP_WRLOCK);
r->set_lock_type(lock->get_type());
lock->get_parent()->set_object_info(r->get_object_info());
mds->send_message_mds(r, target);
dout(7) << "remote_wrlock_finish releasing remote wrlock on mds" << target
<< " " << *lock->get_parent() << dendl;
if (mds->mdsmap->get_state(target) >= MDSMap::STATE_REJOIN) {
- MMDSSlaveRequest *slavereq = new MMDSSlaveRequest(mut->reqid, MMDSSlaveRequest::OP_UNWRLOCK);
+ MMDSSlaveRequest *slavereq = new MMDSSlaveRequest(mut->reqid, mut->attempt,
+ MMDSSlaveRequest::OP_UNWRLOCK);
slavereq->set_lock_type(lock->get_type());
lock->get_parent()->set_object_info(slavereq->get_object_info());
mds->send_message_mds(slavereq, target);
// send lock request
int auth = lock->get_parent()->authority().first;
mut->more()->slaves.insert(auth);
- MMDSSlaveRequest *r = new MMDSSlaveRequest(mut->reqid, MMDSSlaveRequest::OP_XLOCK);
+ MMDSSlaveRequest *r = new MMDSSlaveRequest(mut->reqid, mut->attempt,
+ MMDSSlaveRequest::OP_XLOCK);
r->set_lock_type(lock->get_type());
lock->get_parent()->set_object_info(r->get_object_info());
mds->send_message_mds(r, auth);
dout(7) << "xlock_finish releasing remote xlock on " << *lock->get_parent() << dendl;
int auth = lock->get_parent()->authority().first;
if (mds->mdsmap->get_state(auth) >= MDSMap::STATE_REJOIN) {
- MMDSSlaveRequest *slavereq = new MMDSSlaveRequest(mut->reqid, MMDSSlaveRequest::OP_UNXLOCK);
+ MMDSSlaveRequest *slavereq = new MMDSSlaveRequest(mut->reqid, mut->attempt,
+ MMDSSlaveRequest::OP_UNXLOCK);
slavereq->set_lock_type(lock->get_type());
lock->get_parent()->set_object_info(slavereq->get_object_info());
mds->send_message_mds(slavereq, auth);
dout(10) << "_logged_slave_commit from mds" << from << " " << reqid << dendl;
// send a message
- MMDSSlaveRequest *req = new MMDSSlaveRequest(reqid, MMDSSlaveRequest::OP_COMMITTED);
+ MMDSSlaveRequest *req = new MMDSSlaveRequest(reqid, 0, MMDSSlaveRequest::OP_COMMITTED);
mds->send_message_mds(req, from);
}
MDSCacheObjectInfo i;
(*q)->set_object_info(i);
if (i.ino)
- rejoin->add_inode_authpin(vinodeno_t(i.ino, i.snapid), p->second->reqid);
+ rejoin->add_inode_authpin(vinodeno_t(i.ino, i.snapid), p->second->reqid, p->second->attempt);
else
- rejoin->add_dentry_authpin(i.dirfrag, i.dname, i.snapid, p->second->reqid);
+ rejoin->add_dentry_authpin(i.dirfrag, i.dname, i.snapid, p->second->reqid, p->second->attempt);
}
}
// xlocks
MDSCacheObjectInfo i;
(*q)->get_parent()->set_object_info(i);
if (i.ino)
- rejoin->add_inode_xlock(vinodeno_t(i.ino, i.snapid), (*q)->get_type(), p->second->reqid);
+ rejoin->add_inode_xlock(vinodeno_t(i.ino, i.snapid), (*q)->get_type(),
+ p->second->reqid, p->second->attempt);
else
- rejoin->add_dentry_xlock(i.dirfrag, i.dname, i.snapid, p->second->reqid);
+ rejoin->add_dentry_xlock(i.dirfrag, i.dname, i.snapid,
+ p->second->reqid, p->second->attempt);
}
}
}
// dn auth_pin?
if (strong->authpinned_dentries.count(p->first) &&
strong->authpinned_dentries[p->first].count(q->first)) {
- metareqid_t ri = strong->authpinned_dentries[p->first][q->first];
- dout(10) << " dn authpin by " << ri << " on " << *dn << dendl;
+ MMDSCacheRejoin::slave_reqid r = strong->authpinned_dentries[p->first][q->first];
+ dout(10) << " dn authpin by " << r << " on " << *dn << dendl;
// get/create slave mdrequest
MDRequest *mdr;
- if (have_request(ri))
- mdr = request_get(ri);
+ if (have_request(r.reqid))
+ mdr = request_get(r.reqid);
else
- mdr = request_start_slave(ri, from);
+ mdr = request_start_slave(r.reqid, r.attempt, from);
mdr->auth_pin(dn);
}
// dn xlock?
if (strong->xlocked_dentries.count(p->first) &&
strong->xlocked_dentries[p->first].count(q->first)) {
- metareqid_t ri = strong->xlocked_dentries[p->first][q->first];
- dout(10) << " dn xlock by " << ri << " on " << *dn << dendl;
- MDRequest *mdr = request_get(ri); // should have this from auth_pin above.
+ MMDSCacheRejoin::slave_reqid r = strong->xlocked_dentries[p->first][q->first];
+ dout(10) << " dn xlock by " << r << " on " << *dn << dendl;
+ MDRequest *mdr = request_get(r.reqid); // should have this from auth_pin above.
assert(mdr->is_auth_pinned(dn));
dn->lock.set_state(LOCK_LOCK);
dn->lock.get_xlock(mdr, mdr->get_client());
// auth pin?
if (strong->authpinned_inodes.count(in->vino())) {
- metareqid_t ri = strong->authpinned_inodes[in->vino()];
- dout(10) << " inode authpin by " << ri << " on " << *in << dendl;
+ MMDSCacheRejoin::slave_reqid r = strong->authpinned_inodes[in->vino()];
+ dout(10) << " inode authpin by " << r << " on " << *in << dendl;
// get/create slave mdrequest
MDRequest *mdr;
- if (have_request(ri))
- mdr = request_get(ri);
+ if (have_request(r.reqid))
+ mdr = request_get(r.reqid);
else
- mdr = request_start_slave(ri, from);
+ mdr = request_start_slave(r.reqid, r.attempt, from);
mdr->auth_pin(in);
}
// xlock(s)?
if (strong->xlocked_inodes.count(in->vino())) {
- for (map<int,metareqid_t>::iterator r = strong->xlocked_inodes[in->vino()].begin();
+ for (map<int,MMDSCacheRejoin::slave_reqid>::iterator r = strong->xlocked_inodes[in->vino()].begin();
r != strong->xlocked_inodes[in->vino()].end();
++r) {
SimpleLock *lock = in->get_lock(r->first);
dout(10) << " inode xlock by " << r->second << " on " << *lock << " on " << *in << dendl;
- MDRequest *mdr = request_get(r->second); // should have this from auth_pin above.
+ MDRequest *mdr = request_get(r->second.reqid); // should have this from auth_pin above.
assert(mdr->is_auth_pinned(in));
lock->set_state(LOCK_LOCK);
if (lock == &in->filelock)
}
// register new client request
- MDRequest *mdr = new MDRequest(req->get_reqid(), req);
+ MDRequest *mdr = new MDRequest(req->get_reqid(), req->get_retry_attempt(), req);
active_requests[req->get_reqid()] = mdr;
dout(7) << "request_start " << *mdr << dendl;
return mdr;
}
-MDRequest *MDCache::request_start_slave(metareqid_t ri, int by)
+MDRequest *MDCache::request_start_slave(metareqid_t ri, __u32 attempt, int by)
{
- MDRequest *mdr = new MDRequest(ri, by);
+ MDRequest *mdr = new MDRequest(ri, attempt, by);
assert(active_requests.count(mdr->reqid) == 0);
active_requests[mdr->reqid] = mdr;
dout(7) << "request_start_slave " << *mdr << " by mds" << by << dendl;
for (set<int>::iterator p = mdr->more()->slaves.begin();
p != mdr->more()->slaves.end();
++p) {
- MMDSSlaveRequest *r = new MMDSSlaveRequest(mdr->reqid, MMDSSlaveRequest::OP_FINISH);
+ MMDSSlaveRequest *r = new MMDSSlaveRequest(mdr->reqid, mdr->attempt,
+ MMDSSlaveRequest::OP_FINISH);
mds->send_message_mds(r, *p);
}
struct Mutation {
metareqid_t reqid;
+ __u32 attempt; // which attempt for this request
LogSegment *ls; // the log segment i'm committing to
utime_t now;
list<pair<CDentry*,version_t> > dirty_cow_dentries;
// Default constructor: attempt and ls start at 0, slave_to_mds at -1, and the
// done_locking / committing / aborted flags are all cleared. reqid is left to
// its own default construction.
Mutation() :
+ attempt(0),
ls(0),
slave_to_mds(-1),
done_locking(false), committing(false), aborted(false) { }
// Construct a mutation for request id `ri`.
//   att      - stored in `attempt` (defaults to 0)
//   slave_to - stored in `slave_to_mds` (defaults to -1)
// ls starts at 0 and the done_locking / committing / aborted flags are cleared.
// NOTE(review): `att` is inserted ahead of `slave_to` and both are defaulted,
// so a two-argument call written against the old signature, Mutation(ri, x),
// now binds x to `att` instead of `slave_to` -- confirm every caller was updated.
- Mutation(metareqid_t ri, int slave_to=-1) :
- reqid(ri),
+ Mutation(metareqid_t ri, __u32 att=0, int slave_to=-1) :
+ reqid(ri), attempt(att),
ls(0),
slave_to_mds(slave_to),
done_locking(false), committing(false), aborted(false) { }
_more(0) {
in[0] = in[1] = 0;
}
// Construct a request around the client message `req`.
// `ri` and `attempt` are forwarded to the Mutation base; no slave_to value is
// passed, so the base-class default applies. The reference count starts at 1,
// client_request is set to `req`, snapid to CEPH_NOSNAP, and the remaining
// pointer members plus both in[] slots are zeroed.
- MDRequest(metareqid_t ri, MClientRequest *req) :
- Mutation(ri),
+ MDRequest(metareqid_t ri, __u32 attempt, MClientRequest *req) :
+ Mutation(ri, attempt),
ref(1),
session(0), item_session_request(this),
client_request(req), straydn(NULL), snapid(CEPH_NOSNAP), tracei(0), tracedn(0),
_more(0) {
in[0] = in[1] = 0;
}
- MDRequest(metareqid_t ri, int by) :
- Mutation(ri, by),
+ MDRequest(metareqid_t ri, __u32 attempt, int by) :
+ Mutation(ri, attempt, by),
ref(1),
session(0), item_session_request(this),
client_request(0), straydn(NULL), snapid(CEPH_NOSNAP), tracei(0), tracedn(0),
// Number of entries currently held in active_requests.
int get_num_active_requests() { return active_requests.size(); }
MDRequest* request_start(MClientRequest *req);
- MDRequest* request_start_slave(metareqid_t rid, int by);
+ MDRequest* request_start_slave(metareqid_t rid, __u32 attempt, int by);
MDRequest* request_start_internal(int op);
bool have_request(metareqid_t rid) {
return active_requests.count(rid);
m->put();
return;
}
- mdr = mdcache->request_start_slave(m->get_reqid(), m->get_source().num());
+ mdr = mdcache->request_start_slave(m->get_reqid(), m->get_attempt(), m->get_source().num());
}
assert(mdr->slave_request == 0); // only one at a time, please!
mdr->slave_request = m;
return;
// ack
- MMDSSlaveRequest *r = new MMDSSlaveRequest(mdr->reqid, replycode);
+ MMDSSlaveRequest *r = new MMDSSlaveRequest(mdr->reqid, mdr->attempt, replycode);
r->set_lock_type(lock->get_type());
lock->get_parent()->set_object_info(r->get_object_info());
mds->send_message(r, mdr->slave_request->get_connection());
}
// ack!
- MMDSSlaveRequest *reply = new MMDSSlaveRequest(mdr->reqid, MMDSSlaveRequest::OP_AUTHPINACK);
+ MMDSSlaveRequest *reply = new MMDSSlaveRequest(mdr->reqid, mdr->attempt, MMDSSlaveRequest::OP_AUTHPINACK);
// return list of my auth_pins (if any)
for (set<MDSCacheObject*>::iterator p = mdr->auth_pins.begin();
op = MMDSSlaveRequest::OP_LINKPREP;
else
op = MMDSSlaveRequest::OP_UNLINKPREP;
- MMDSSlaveRequest *req = new MMDSSlaveRequest(mdr->reqid, op);
+ MMDSSlaveRequest *req = new MMDSSlaveRequest(mdr->reqid, mdr->attempt, op);
targeti->set_object_info(req->get_object_info());
req->now = mdr->now;
mds->send_message_mds(req, linkauth);
mds->balancer->hit_inode(mdr->now, targeti, META_POP_IWR);
// ack
- MMDSSlaveRequest *reply = new MMDSSlaveRequest(mdr->reqid, MMDSSlaveRequest::OP_LINKPREPACK);
+ MMDSSlaveRequest *reply = new MMDSSlaveRequest(mdr->reqid, mdr->attempt,
+ MMDSSlaveRequest::OP_LINKPREPACK);
mds->send_message_mds(reply, mdr->slave_to_mds);
// set up commit waiter
// Sends an OP_COMMITTED slave request for `mdr` to mdr->slave_to_mds, then
// hands `mdr` to the cache's request_finish().
// The message carries both the request id and (after this change) the attempt
// number taken from `mdr`.
void Server::_committed_slave(MDRequest *mdr)
{
dout(10) << "_committed_slave " << *mdr << dendl;
- MMDSSlaveRequest *req = new MMDSSlaveRequest(mdr->reqid, MMDSSlaveRequest::OP_COMMITTED);
+ MMDSSlaveRequest *req = new MMDSSlaveRequest(mdr->reqid, mdr->attempt,
+ MMDSSlaveRequest::OP_COMMITTED);
mds->send_message_mds(req, mdr->slave_to_mds);
mds->mdcache->request_finish(mdr);
}
{
dout(10) << "_rmdir_prepare_witness mds" << who << " for " << *mdr << dendl;
- MMDSSlaveRequest *req = new MMDSSlaveRequest(mdr->reqid, MMDSSlaveRequest::OP_RMDIRPREP);
+ MMDSSlaveRequest *req = new MMDSSlaveRequest(mdr->reqid, mdr->attempt,
+ MMDSSlaveRequest::OP_RMDIRPREP);
dn->make_path(req->srcdnpath);
straydn->make_path(req->destdnpath);
req->now = mdr->now;
straydn->get_dir()->link_primary_inode(straydn, in);
mdcache->adjust_subtree_after_rename(in, dn->get_dir());
- MMDSSlaveRequest *reply = new MMDSSlaveRequest(mdr->reqid, MMDSSlaveRequest::OP_RMDIRPREPACK);
+ MMDSSlaveRequest *reply = new MMDSSlaveRequest(mdr->reqid, mdr->attempt,
+ MMDSSlaveRequest::OP_RMDIRPREPACK);
mds->send_message_mds(reply, mdr->slave_to_mds);
// set up commit waiter
void Server::_rename_prepare_witness(MDRequest *mdr, int who, CDentry *srcdn, CDentry *destdn, CDentry *straydn)
{
dout(10) << "_rename_prepare_witness mds" << who << dendl;
- MMDSSlaveRequest *req = new MMDSSlaveRequest(mdr->reqid, MMDSSlaveRequest::OP_RENAMEPREP);
+ MMDSSlaveRequest *req = new MMDSSlaveRequest(mdr->reqid, mdr->attempt,
+ MMDSSlaveRequest::OP_RENAMEPREP);
srcdn->make_path(req->srcdnpath);
destdn->make_path(req->destdnpath);
req->now = mdr->now;
if (*p == mdr->slave_to_mds ||
mdr->slave_request->witnesses.count(*p)) continue;
dout(10) << " witness list insufficient; providing srcdn replica list" << dendl;
- MMDSSlaveRequest *reply = new MMDSSlaveRequest(mdr->reqid, MMDSSlaveRequest::OP_RENAMEPREPACK);
+ MMDSSlaveRequest *reply = new MMDSSlaveRequest(mdr->reqid, mdr->attempt,
+ MMDSSlaveRequest::OP_RENAMEPREPACK);
reply->witnesses.swap(srcdnrep);
mds->send_message_mds(reply, mdr->slave_to_mds);
mdr->slave_request->put();
dout(10) << "_logged_slave_rename " << *mdr << dendl;
// prepare ack
- MMDSSlaveRequest *reply = new MMDSSlaveRequest(mdr->reqid, MMDSSlaveRequest::OP_RENAMEPREPACK);
+ MMDSSlaveRequest *reply = new MMDSSlaveRequest(mdr->reqid, mdr->attempt,
+ MMDSSlaveRequest::OP_RENAMEPREPACK);
CDentry::linkage_t *srcdnl = srcdn->get_linkage();
CDentry::linkage_t *destdnl = destdn->get_linkage();
bufferlist inode_locks;
// authpins, xlocks
- map<vinodeno_t, metareqid_t> authpinned_inodes;
- map<vinodeno_t, map<__s32, metareqid_t> > xlocked_inodes;
- map<dirfrag_t, map<string_snap_t, metareqid_t> > authpinned_dentries;
- map<dirfrag_t, map<string_snap_t, metareqid_t> > xlocked_dentries;
-
// Pairs a request id with an attempt number. Used as the mapped value of the
// authpinned_* / xlocked_* maps declared right after this struct.
// Wire order is reqid first, then attempt; encode() and decode() must stay in
// step with each other.
+ struct slave_reqid {
+ metareqid_t reqid;
+ __u32 attempt;
// Default: attempt is 0, reqid is default-constructed.
+ slave_reqid() : attempt(0) {}
+ slave_reqid(const metareqid_t& r, __u32 a)
+ : reqid(r), attempt(a) {}
// Appends reqid followed by attempt to `bl`.
+ void encode(bufferlist& bl) const {
+ ::encode(reqid, bl);
+ ::encode(attempt, bl);
+ }
// Reads reqid followed by attempt from `bl`, mirroring encode().
+ void decode(bufferlist::iterator& bl) {
+ ::decode(reqid, bl);
+ ::decode(attempt, bl);
+ }
+ };
+ map<vinodeno_t, slave_reqid> authpinned_inodes;
+ map<vinodeno_t, map<__s32, slave_reqid> > xlocked_inodes;
+ map<dirfrag_t, map<string_snap_t, slave_reqid> > authpinned_dentries;
+ map<dirfrag_t, map<string_snap_t, slave_reqid> > xlocked_dentries;
+
MMDSCacheRejoin() : Message(MSG_MDS_CACHEREJOIN) {}
MMDSCacheRejoin(int o) :
Message(MSG_MDS_CACHEREJOIN),
in->_encode_base(bl);
::encode(bl, inode_base);
}
// Records in authpinned_inodes that inode `ino` is auth-pinned by request `ri`.
// After this change the stored value is a slave_reqid built from `ri` and
// `attempt`; an existing entry for `ino` is overwritten.
- void add_inode_authpin(vinodeno_t ino, const metareqid_t& ri) {
- authpinned_inodes[ino] = ri;
+ void add_inode_authpin(vinodeno_t ino, const metareqid_t& ri, __u32 attempt) {
+ authpinned_inodes[ino] = slave_reqid(ri, attempt);
}
// Records in xlocked_inodes that the lock of type `lt` on inode `ino` is
// xlocked by request `ri`. After this change the stored value is a slave_reqid
// built from `ri` and `attempt`; an existing (ino, lt) entry is overwritten.
- void add_inode_xlock(vinodeno_t ino, int lt, const metareqid_t& ri) {
- xlocked_inodes[ino][lt] = ri;
+ void add_inode_xlock(vinodeno_t ino, int lt, const metareqid_t& ri, __u32 attempt) {
+ xlocked_inodes[ino][lt] = slave_reqid(ri, attempt);
}
void add_scatterlock_state(CInode *in) {
// Stores a dn_strong(first, pi, ri, rdt, n, ls) record in strong_dentries,
// keyed by dirfrag `df` and then by string_snap_t(dname, last).
// Note that `ri` here is an inodeno_t, not a request id as in the add_*_authpin
// and add_*_xlock helpers.
void add_strong_dentry(dirfrag_t df, const string& dname, snapid_t first, snapid_t last, inodeno_t pi, inodeno_t ri, unsigned char rdt, int n, int ls) {
strong_dentries[df][string_snap_t(dname, last)] = dn_strong(first, pi, ri, rdt, n, ls);
}
// Records in authpinned_dentries that the dentry identified by dirfrag `df`
// and string_snap_t(dname, last) is auth-pinned by request `ri`. After this
// change the stored value is a slave_reqid built from `ri` and `attempt`.
- void add_dentry_authpin(dirfrag_t df, const string& dname, snapid_t last, const metareqid_t& ri) {
- authpinned_dentries[df][string_snap_t(dname, last)] = ri;
+ void add_dentry_authpin(dirfrag_t df, const string& dname, snapid_t last,
+ const metareqid_t& ri, __u32 attempt) {
+ authpinned_dentries[df][string_snap_t(dname, last)] = slave_reqid(ri, attempt);
}
// Records in xlocked_dentries that the dentry identified by dirfrag `df` and
// string_snap_t(dname, last) is xlocked by request `ri`. After this change the
// stored value is a slave_reqid built from `ri` and `attempt`.
- void add_dentry_xlock(dirfrag_t df, const string& dname, snapid_t last, const metareqid_t& ri) {
- xlocked_dentries[df][string_snap_t(dname, last)] = ri;
+ void add_dentry_xlock(dirfrag_t df, const string& dname, snapid_t last,
+ const metareqid_t& ri, __u32 attempt) {
+ xlocked_dentries[df][string_snap_t(dname, last)] = slave_reqid(ri, attempt);
}
// -- encoding --
WRITE_CLASS_ENCODER(MMDSCacheRejoin::dn_strong)
WRITE_CLASS_ENCODER(MMDSCacheRejoin::dn_weak)
WRITE_CLASS_ENCODER(MMDSCacheRejoin::lock_bls)
+WRITE_CLASS_ENCODER(MMDSCacheRejoin::slave_reqid)
+
// Stream a slave_reqid as "<reqid>.<attempt>".
+inline ostream& operator<<(ostream& out, const MMDSCacheRejoin::slave_reqid& r) {
+ return out << r.reqid << '.' << r.attempt;
+}
#endif
private:
metareqid_t reqid;
+ __u32 attempt;
__s16 op;
// for locking
public:
// Simple accessors for the message header fields.
metareqid_t get_reqid() { return reqid; }
// Attempt number carried alongside reqid.
+ __u32 get_attempt() const { return attempt; }
int get_op() { return op; }
// A message counts as a reply when its op code is negative.
bool is_reply() { return op < 0; }
void set_lock_type(int t) { lock_type = t; }
+
// ----
// Default constructor: only the Message base is initialized with
// MSG_MDS_SLAVE_REQUEST.
// NOTE(review): `attempt` and `op` get no initializer here or at their
// declarations; presumably decode_payload() always fills them before use --
// confirm.
MMDSSlaveRequest() : Message(MSG_MDS_SLAVE_REQUEST) { }
// Construct a slave request for request id `ri` with op code `o`.
// After this change the attempt number `att` is a required middle argument
// and is stored in `attempt`.
- MMDSSlaveRequest(metareqid_t ri, int o) :
+ MMDSSlaveRequest(metareqid_t ri, __u32 att, int o) :
Message(MSG_MDS_SLAVE_REQUEST),
- reqid(ri), op(o) { }
+ reqid(ri), attempt(att), op(o) { }
private:
// Declared under `private:`, so code outside the class cannot destroy an
// instance directly. NOTE(review): presumably lifetime is managed through the
// Message base (e.g. put()) -- confirm.
~MMDSSlaveRequest() {}
public:
void encode_payload(CephContext *cct) {
::encode(reqid, payload);
+ ::encode(attempt, payload);
::encode(op, payload);
::encode(lock_type, payload);
::encode(object_info, payload);
void decode_payload(CephContext *cct) {
bufferlist::iterator p = payload.begin();
::decode(reqid, p);
+ ::decode(attempt, p);
::decode(op, p);
::decode(lock_type, p);
::decode(object_info, p);
// Fixed type-name string for this message class.
const char *get_type_name() { return "slave_request"; }
// Writes a one-line summary to `out` in the form
//   slave_request(<reqid>.<attempt> <opname>)
// where <opname> comes from get_opname(op). The ".<attempt>" part is new in
// this change.
void print(ostream& out) {
out << "slave_request(" << reqid
+ << "." << attempt
<< " " << get_opname(op)
<< ")";
}