onfinish->finish(0);
delete onfinish;
}
-
- delete m;
}
void AnchorClient::resend_queries()
//dump();
}
-bool AnchorServer::_is_prepared(version_t tid)
-{
- return
- pending_create.count(tid) ||
- pending_destroy.count(tid) ||
- pending_update.count(tid);
-}
-
void AnchorServer::_commit(version_t tid)
{
if (pending_create.count(tid)) {
else
assert(0);
- pending_for_mds.erase(tid);
-
// bump version.
version++;
//dump();
void AnchorServer::handle_query(MMDSTableRequest *req)
{
bufferlist::iterator p = req->bl.begin();
- inodeno_t curino;
- ::decode(curino, p);
- dout(7) << "handle_lookup " << *req << " ino " << curino << dendl;
+ inodeno_t ino;
+ ::decode(ino, p);
+ dout(7) << "handle_lookup " << *req << " ino " << ino << dendl;
vector<Anchor> trace;
+ inodeno_t curino = ino;
while (true) {
assert(anchor_map.count(curino) == 1);
Anchor &anchor = anchor_map[curino];
dout(10) << "handle_lookup adding " << anchor << dendl;
trace.insert(trace.begin(), anchor); // lame FIXME
- if (anchor.dirino < MDS_INO_BASE) break;
+ if (anchor.dirino < MDS_INO_BASE)
+ break;
curino = anchor.dirino;
}
// reply
MMDSTableRequest *reply = new MMDSTableRequest(table, TABLE_OP_QUERY_REPLY, req->reqid, version);
- ::encode(curino, req->bl);
- ::encode(trace, req->bl);
+ ::encode(ino, reply->bl);
+ ::encode(trace, reply->bl);
mds->send_message_mds(reply, req->get_source().num());
delete req;
// server bits
void _prepare(bufferlist &bl, __u64 reqid, int bymds);
- bool _is_prepared(version_t tid);
void _commit(version_t tid);
void _rollback(version_t tid);
void handle_query(MMDSTableRequest *m);
int from = req->get_source().num();
_prepare(req->bl, req->reqid, from);
- pending_for_mds[req->reqid].mds = from;
- pending_for_mds[req->reqid].reqid = req->reqid;
- pending_for_mds[req->reqid].tid = version;
+ pending_for_mds[version].mds = from;
+ pending_for_mds[version].reqid = req->reqid;
+ pending_for_mds[version].tid = version;
ETableServer *le = new ETableServer(table, TABLE_OP_PREPARE, req->reqid, from, version, version);
le->mutation = req->bl;
version_t tid = req->tid;
- if (_is_prepared(tid)) {
+ if (pending_for_mds.count(tid)) {
_commit(tid);
pending_for_mds.erase(tid);
mds->mdlog->submit_entry(new ETableServer(table, TABLE_OP_COMMIT, 0, -1, tid, version));
for (map<version_t,_pending>::iterator p = pending_for_mds.begin();
p != pending_for_mds.end();
p++) {
- if (who >= 0 && p->second.mds != who) continue;
+ if (who >= 0 && p->second.mds != who)
+ continue;
MMDSTableRequest *reply = new MMDSTableRequest(table, TABLE_OP_AGREE, p->second.reqid, p->second.tid);
- mds->send_message_mds(reply, who);
+ mds->send_message_mds(reply, p->second.mds);
}
}
public:
virtual void handle_query(MMDSTableRequest *m) = 0;
virtual void _prepare(bufferlist &bl, __u64 reqid, int bymds) = 0;
- virtual bool _is_prepared(version_t tid) = 0;
virtual void _commit(version_t tid) = 0;
virtual void _rollback(version_t tid) = 0;
virtual const char *get_type_name() { return "mds_table_request"; }
void print(ostream& o) {
- o << "mds_table_request(" << get_mdstable_opname(op) << " " << reqid;
+ o << "mds_table_request(" << get_mdstable_name(table)
+ << " " << get_mdstable_opname(op);
+ if (reqid) o << " " << reqid;
if (tid) o << " tid " << tid;
if (bl.length()) o << " " << bl.length() << " bytes";
o << ")";