inline MClientRequest* Client::make_request_from_Meta(MetaRequest *request)
{
MClientRequest *req = new MClientRequest(request->get_op());
+ req->set_tid(request->tid);
memcpy(&req->head, &request->head, sizeof(ceph_mds_request_head));
//if the filepath's haven't been set, set them!
if (request->path.empty()) {
flush,
cap->mseq);
m->head.issue_seq = cap->issue_seq;
- m->head.client_tid = tid;
+ m->set_tid(tid);
m->head.uid = in->uid;
m->head.gid = in->gid;
class Dentry;
struct MetaRequest {
+ __u64 tid;
ceph_mds_request_head head;
filepath path, path2;
bufferlist data;
target(0) {
memset(&head, 0, sizeof(ceph_mds_request_head));
head.op = op;
-}
+ }
MetaRequest* get() {++ref; return this; }
void put() {if(--ref == 0) delete this; }
// normal fields
- void set_tid(tid_t t) { head.tid = t; }
+ void set_tid(tid_t t) { tid = t; }
void set_oldest_client_tid(tid_t t) { head.oldest_client_tid = t; }
void inc_num_fwd() { head.num_fwd = head.num_fwd + 1; }
void set_retry_attempt(int a) { head.num_retry = a; }
head.flags = head.flags | CEPH_MDS_FLAG_WANT_DENTRY;
}
int get_op() { return head.op; }
- tid_t get_tid() { return head.tid; }
+ tid_t get_tid() { return tid; }
filepath& get_filepath() { return path; }
filepath& get_filepath2() { return path2; }
struct ceph_mon_statfs {
struct ceph_mon_request_header monhdr;
struct ceph_fsid fsid;
- __le64 tid;
} __attribute__ ((packed));
struct ceph_statfs {
struct ceph_mon_statfs_reply {
struct ceph_fsid fsid;
- __le64 tid;
__le64 version;
struct ceph_statfs st;
} __attribute__ ((packed));
#define CEPH_MDS_FLAG_WANT_DENTRY 2 /* want dentry in reply */
struct ceph_mds_request_head {
- __le64 tid, oldest_client_tid;
+ __le64 oldest_client_tid;
__le32 mdsmap_epoch; /* on client */
__le32 flags; /* CEPH_MDS_FLAG_* */
__u8 num_retry, num_fwd; /* count retry, fwd attempts */
/* client reply */
struct ceph_mds_reply_head {
- __le64 tid;
__le32 op;
__le32 result;
__le32 mdsmap_epoch;
__le32 migrate_seq;
__le64 snap_follows;
__le32 snap_trace_len;
- __le64 client_tid; /* for FLUSH(SNAP) -> FLUSH(SNAP)_ACK */
/* authlock */
__le32 uid, gid, mode;
* whenever the wire protocol changes. try to keep this string length
* constant.
*/
-#define CEPH_BANNER "ceph v024"
+#define CEPH_BANNER "ceph v025"
#define CEPH_BANNER_MAX_LEN 30
*/
struct ceph_msg_header {
__le64 seq; /* message seq# for this session */
+ __le64 tid; /* transaction id */
__le16 type; /* message type */
__le16 priority; /* priority. higher value == higher priority */
__le16 version; /* version of message encoding */
* ceph_osd_op object operations.
*/
struct ceph_osd_request_head {
- __le64 tid; /* transaction id */
__le32 client_inc; /* client incarnation */
struct ceph_object_layout layout; /* pgid */
__le32 osdmap_epoch; /* client's osdmap epoch */
} __attribute__ ((packed));
struct ceph_osd_reply_head {
- __le64 tid; /* transaction id */
__le32 client_inc; /* client incarnation */
__le32 flags;
struct ceph_object_layout layout;
dout(10) << "handle_request " << *m << dendl;
assert(m->table == table);
- version_t tid = m->tid;
+ version_t tid = m->get_tid();
__u64 reqid = m->reqid;
switch (m->op) {
{
dout(7) << "handle_commit " << *req << dendl;
- version_t tid = req->tid;
+ version_t tid = req->get_tid();
if (pending_for_mds.count(tid)) {
assert(g_conf.mds_kill_mdstable_at != 6);
- MMDSTableRequest *reply = new MMDSTableRequest(table, TABLESERVER_OP_ACK, req->reqid, req->tid);
+ MMDSTableRequest *reply = new MMDSTableRequest(table, TABLESERVER_OP_ACK, req->reqid, req->get_tid());
mds->send_message_mds(reply, req->get_source().num());
delete req;
}
void MDSTableServer::handle_rollback(MMDSTableRequest *req)
{
dout(7) << "handle_rollback " << *req << dendl;
- _rollback(req->tid);
- _note_rollback(req->tid);
+ _rollback(req->get_tid());
+ _note_rollback(req->get_tid());
mds->mdlog->start_submit_entry(new ETableServer(table, TABLESERVER_OP_ROLLBACK, 0, -1,
- req->tid, version));
+ req->get_tid(), version));
delete req;
}
int get_migrate_seq() { return head.migrate_seq; }
int get_op() { return head.op; }
- __u64 get_client_tid() { return head.client_tid; }
- void set_client_tid(__u64 s) { head.client_tid = s; }
+ __u64 get_client_tid() { return get_tid(); }
+ void set_client_tid(__u64 s) { set_tid(s); }
snapid_t get_snap_follows() { return snapid_t(head.snap_follows); }
void set_snap_follows(snapid_t s) { head.snap_follows = s; }
bufferlist snapbl;
public:
- long get_tid() { return head.tid; }
int get_op() { return head.op; }
void set_mdsmap_epoch(epoch_t e) { head.mdsmap_epoch = e; }
MClientReply(MClientRequest *req, int result = 0) :
Message(CEPH_MSG_CLIENT_REPLY) {
memset(&head, 0, sizeof(head));
- head.tid = req->get_tid();
+ header.tid = req->get_tid();
head.op = req->get_op();
head.result = result;
head.safe = 1;
}
const char *get_type_name() { return "creply"; }
void print(ostream& o) {
- o << "client_reply(???:" << head.tid;
+ o << "client_reply(???:" << get_tid();
o << " = " << get_result();
if (get_result() <= 0) {
char buf[80];
metareqid_t get_reqid() {
// FIXME: for now, assume clients always have 1 incarnation
- return metareqid_t(get_orig_source(), head.tid);
+ return metareqid_t(get_orig_source(), header.tid);
}
/*bool open_file_mode_is_readonly() {
}
// normal fields
- void set_tid(tid_t t) { head.tid = t; }
void set_oldest_client_tid(tid_t t) { head.oldest_client_tid = t; }
void inc_num_fwd() { head.num_fwd = head.num_fwd + 1; }
void set_retry_attempt(int a) { head.num_retry = a; }
head.flags = head.flags | CEPH_MDS_FLAG_REPLAY;
}
- tid_t get_tid() { return head.tid; }
tid_t get_oldest_client_tid() { return head.oldest_client_tid; }
int get_num_fwd() { return head.num_fwd; }
int get_retry_attempt() { return head.num_retry; }
#define __MCLIENTREQUESTFORWARD_H
class MClientRequestForward : public Message {
- tid_t tid;
int32_t dest_mds;
int32_t num_fwd;
bool client_must_resend;
MClientRequestForward() : Message(CEPH_MSG_CLIENT_REQUEST_FORWARD) {}
MClientRequestForward(tid_t t, int dm, int nf, bool cmr) :
Message(CEPH_MSG_CLIENT_REQUEST_FORWARD),
- tid(t), dest_mds(dm), num_fwd(nf), client_must_resend(cmr) { }
+ dest_mds(dm), num_fwd(nf), client_must_resend(cmr) {
+ header.tid = t;
+ }
- tid_t get_tid() { return tid; }
int get_dest_mds() { return dest_mds; }
int get_num_fwd() { return num_fwd; }
bool must_resend() { return client_must_resend; }
const char *get_type_name() { return "cfwd"; }
void print(ostream& o) {
- o << "client_request_forward(" << tid
+ o << "client_request_forward(" << get_tid()
<< " to " << dest_mds
<< " num_fwd=" << num_fwd
<< (client_must_resend ? " client_must_resend":"")
}
void encode_payload() {
- ::encode(tid, payload);
::encode(dest_mds, payload);
::encode(num_fwd, payload);
::encode(client_must_resend, payload);
void decode_payload() {
bufferlist::iterator p = payload.begin();
- ::decode(tid, p);
::decode(dest_mds, p);
::decode(num_fwd, p);
::decode(client_must_resend, p);
class MGetPoolStats : public PaxosServiceMessage {
public:
ceph_fsid_t fsid;
- tid_t tid;
vector<string> pools;
MGetPoolStats() : PaxosServiceMessage(MSG_GETPOOLSTATS, 0) {}
MGetPoolStats(const ceph_fsid_t& f, tid_t t, vector<string>& ls, version_t l) :
PaxosServiceMessage(MSG_GETPOOLSTATS, l),
- fsid(f), tid(t), pools(ls) { }
+ fsid(f), pools(ls) {
+ set_tid(t);
+ }
const char *get_type_name() { return "getpoolstats"; }
void print(ostream& out) {
- out << "getpoolstats(" << tid << " " << pools << " v" << version << ")";
+ out << "getpoolstats(" << get_tid() << " " << pools << " v" << version << ")";
}
void encode_payload() {
paxos_encode();
::encode(fsid, payload);
- ::encode(tid, payload);
::encode(pools, payload);
}
void decode_payload() {
bufferlist::iterator p = payload.begin();
paxos_decode(p);
::decode(fsid, p);
- ::decode(tid, p);
::decode(pools, p);
}
};
class MGetPoolStatsReply : public PaxosServiceMessage {
public:
ceph_fsid_t fsid;
- tid_t tid;
map<string,pool_stat_t> pool_stats;
MGetPoolStatsReply() : PaxosServiceMessage(MSG_GETPOOLSTATSREPLY, 0) {}
MGetPoolStatsReply(ceph_fsid_t& f, tid_t t, version_t v) :
PaxosServiceMessage(MSG_GETPOOLSTATSREPLY, v),
- fsid(f), tid(t) { }
+ fsid(f) {
+ set_tid(t);
+ }
const char *get_type_name() { return "getpoolstats"; }
void print(ostream& out) {
- out << "getpoolstatsreply(" << tid << " v" << version << ")";
+ out << "getpoolstatsreply(" << get_tid() << " v" << version << ")";
}
void encode_payload() {
paxos_encode();
::encode(fsid, payload);
- ::encode(tid, payload);
::encode(pool_stats, payload);
}
void decode_payload() {
bufferlist::iterator p = payload.begin();
paxos_decode(p);
::decode(fsid, p);
- ::decode(tid, p);
::decode(pool_stats, p);
}
};
__u16 table;
__s16 op;
__u64 reqid;
- version_t tid;
bufferlist bl;
MMDSTableRequest() {}
MMDSTableRequest(int tab, int o, __u64 r, version_t v=0) :
Message(MSG_MDS_TABLE_REQUEST),
- table(tab), op(o), reqid(r), tid(v) { }
+ table(tab), op(o), reqid(r) {
+ set_tid(v);
+ }
virtual const char *get_type_name() { return "mds_table_request"; }
void print(ostream& o) {
o << "mds_table_request(" << get_mdstable_name(table)
<< " " << get_mdstableserver_opname(op);
if (reqid) o << " " << reqid;
- if (tid) o << " tid " << tid;
+ if (get_tid()) o << " tid " << get_tid();
if (bl.length()) o << " " << bl.length() << " bytes";
o << ")";
}
::decode(table, p);
::decode(op, p);
::decode(reqid, p);
- ::decode(tid, p);
::decode(bl, p);
}
::encode(table, payload);
::encode(op, payload);
::encode(reqid, payload);
- ::encode(tid, payload);
::encode(bl, payload);
}
};
osd_reqid_t get_reqid() { return osd_reqid_t(get_orig_source(),
head.client_inc,
- head.tid); }
+ header.tid); }
int get_client_inc() { return head.client_inc; }
- tid_t get_client_tid() { return head.tid; }
+ tid_t get_client_tid() { return header.tid; }
object_t& get_oid() { return oid; }
pg_t get_pg() { return pg_t(head.layout.ol_pgid); }
oid(_oid),
rmw_flags(flags) {
memset(&head, 0, sizeof(head));
- head.tid = tid;
+ set_tid(tid);
head.client_inc = inc;
head.layout = ol;
head.osdmap_epoch = mapepoch;
object_t oid;
vector<OSDOp> ops;
- long get_tid() { return head.tid; }
object_t get_oid() { return oid; }
pg_t get_pg() { return pg_t(head.layout.ol_pgid); }
int get_flags() { return head.flags; }
MOSDOpReply(MOSDOp *req, __s32 result, epoch_t e, int acktype) :
Message(CEPH_MSG_OSD_OPREPLY) {
memset(&head, 0, sizeof(head));
- head.tid = req->head.tid;
+ set_tid(req->get_tid());
head.client_inc = req->head.client_inc;
ops = req->ops;
head.result = result;
const char *get_type_name() { return "osd_op_reply"; }
void print(ostream& out) {
- out << "osd_op_reply(" << head.tid
+ out << "osd_op_reply(" << get_tid()
<< " " << oid << " " << ops;
if (may_write()) {
if (is_ondisk())
pg_stat_t pg_stats;
// subop metadata
- tid_t rep_tid;
eversion_t version;
// piggybacked osd/og state
::decode(mtime, p);
::decode(noop, p);
::decode(acks_wanted, p);
- ::decode(rep_tid, p);
::decode(version, p);
::decode(old_exists, p);
::decode(old_size, p);
::encode(mtime, payload);
::encode(noop, payload);
::encode(acks_wanted, payload);
- ::encode(rep_tid, payload);
::encode(version, payload);
::encode(old_exists, payload);
::encode(old_size, payload);
acks_wanted(aw),
noop(noop_),
old_exists(false), old_size(0),
- rep_tid(rtid),
version(v)
{
memset(&peer_stat, 0, sizeof(peer_stat));
+ set_tid(rtid);
}
MOSDSubOp() {}
// subop metadata
osd_reqid_t reqid;
pg_t pgid;
- tid_t rep_tid;
sobject_t poid;
vector<OSDOp> ops;
::decode(map_epoch, p);
::decode(reqid, p);
::decode(pgid, p);
- ::decode(rep_tid, p);
::decode(poid, p);
unsigned num_ops;
::encode(map_epoch, payload);
::encode(reqid, payload);
::encode(pgid, payload);
- ::encode(rep_tid, payload);
::encode(poid, payload);
__u32 num_ops = ops.size();
::encode(num_ops, payload);
epoch_t get_map_epoch() { return map_epoch; }
pg_t get_pg() { return pgid; }
- tid_t get_rep_tid() { return rep_tid; }
sobject_t get_poid() { return poid; }
int get_ack_type() { return ack_type; }
map_epoch(e),
reqid(req->reqid),
pgid(req->pgid),
- rep_tid(req->rep_tid),
poid(req->poid),
ops(req->ops),
ack_type(at),
result(result_) {
memset(&peer_stat, 0, sizeof(peer_stat));
+ set_tid(req->get_tid());
}
MOSDSubOpReply() {}
class MPoolOp : public PaxosServiceMessage {
public:
ceph_fsid_t fsid;
- tid_t tid;
int pool;
string name;
int op;
MPoolOp() : PaxosServiceMessage(MSG_POOLOP, 0) {}
MPoolOp(const ceph_fsid_t& f, tid_t t, int p, string& n, int o, version_t v) :
- PaxosServiceMessage(MSG_POOLOP, v), fsid(f), tid(t), pool(p), name(n), op(o) {}
+ PaxosServiceMessage(MSG_POOLOP, v), fsid(f), pool(p), name(n), op(o) {
+ set_tid(t);
+ }
const char *get_type_name() { return "poolop"; }
void print(ostream& out) {
- out << "poolop(" << get_pool_op_name(op) << " " << tid << " " << name << " v" << version << ")";
+ out << "poolop(" << get_pool_op_name(op) << " " << get_tid() << " " << name << " v" << version << ")";
}
void encode_payload() {
paxos_encode();
::encode(fsid, payload);
- ::encode(tid, payload);
::encode(pool, payload);
::encode(name, payload);
::encode(op, payload);
bufferlist::iterator p = payload.begin();
paxos_decode(p);
::decode(fsid, p);
- ::decode(tid, p);
::decode(pool, p);
::decode(name, p);
::decode(op, p);
class MPoolOpReply : public PaxosServiceMessage {
public:
ceph_fsid_t fsid;
- tid_t tid;
int replyCode;
epoch_t epoch;
MPoolOpReply() : PaxosServiceMessage(MSG_POOLOPREPLY, 0) {}
MPoolOpReply( ceph_fsid_t& f, tid_t t, int rc, int e, version_t v) :
- PaxosServiceMessage(MSG_POOLOPREPLY, v), fsid(f), tid(t), replyCode(rc), epoch(e) {}
+ PaxosServiceMessage(MSG_POOLOPREPLY, v), fsid(f), replyCode(rc), epoch(e) {
+ set_tid(t);
+ }
const char *get_type_name() { return "poolopreply"; }
void print(ostream& out) {
- out << "poolopreply(" << tid << " v" << version << ")";
+ out << "poolopreply(" << get_tid() << " v" << version << ")";
}
void encode_payload() {
paxos_encode();
::encode(fsid, payload);
- ::encode(tid, payload);
::encode(replyCode, payload);
::encode(epoch, payload);
}
bufferlist::iterator p = payload.begin();
paxos_decode(p);
::decode(fsid, p);
- ::decode(tid, p);
::decode(replyCode, p);
::decode(epoch, p);
}
class MStatfs : public PaxosServiceMessage {
public:
ceph_fsid_t fsid;
- tid_t tid;
MStatfs() : PaxosServiceMessage(CEPH_MSG_STATFS, 0) {}
MStatfs(const ceph_fsid_t& f, tid_t t, version_t v) :
- PaxosServiceMessage(CEPH_MSG_STATFS, v), fsid(f), tid(t) {}
+ PaxosServiceMessage(CEPH_MSG_STATFS, v), fsid(f) {
+ set_tid(t);
+ }
const char *get_type_name() { return "statfs"; }
void print(ostream& out) {
- out << "statfs(" << tid << " v" << version << ")";
+ out << "statfs(" << get_tid() << " v" << version << ")";
}
void encode_payload() {
paxos_encode();
::encode(fsid, payload);
- ::encode(tid, payload);
}
void decode_payload() {
bufferlist::iterator p = payload.begin();
paxos_decode(p);
::decode(fsid, p);
- ::decode(tid, p);
}
};
MStatfsReply() : Message(CEPH_MSG_STATFS_REPLY) {}
MStatfsReply(ceph_fsid_t &f, tid_t t, epoch_t epoch) : Message(CEPH_MSG_STATFS_REPLY) {
h.fsid = f;
- h.tid = t;
+ header.tid = t;
h.version = epoch;
}
const char *get_type_name() { return "statfs_reply"; }
void print(ostream& out) {
- out << "statfs_reply(" << h.tid << ")";
+ out << "statfs_reply(" << header.tid << ")";
}
void encode_payload() {
void OSDMonitor::_pool_op(MPoolOp *m, int replyCode, epoch_t epoch)
{
- MPoolOpReply *reply = new MPoolOpReply(m->fsid, m->tid,
+ MPoolOpReply *reply = new MPoolOpReply(m->fsid, m->get_tid(),
replyCode, epoch, mon->get_epoch());
mon->send_reply(m, reply);
delete m;
}
// fill out stfs
- reply = new MStatfsReply(mon->monmap->fsid, statfs->tid, mon->get_epoch());
+ reply = new MStatfsReply(mon->monmap->fsid, statfs->get_tid(), mon->get_epoch());
// these are in KB.
reply->h.st.kb = pg_map.osd_sum.kb;
goto out;
}
- reply = new MGetPoolStatsReply(m->fsid, m->tid, paxos->get_version());
+ reply = new MGetPoolStatsReply(m->fsid, m->get_tid(), paxos->get_version());
for (vector<string>::iterator p = m->pools.begin();
p != m->pools.end();
int get_type() { return header.type; }
void set_type(int t) { header.type = t; }
+ __u64 get_tid() { return header.tid; }
+ void set_tid(__u64 t) { header.tid = t; }
+
unsigned get_seq() { return header.seq; }
void set_seq(unsigned s) { header.seq = s; }
void ReplicatedPG::sub_op_modify_reply(MOSDSubOpReply *r)
{
// must be replication.
- tid_t rep_tid = r->get_rep_tid();
+ tid_t rep_tid = r->get_tid();
int fromosd = r->get_source().num();
osd->take_peer_stat(fromosd, r->get_peer_stat());
void Objecter::handle_pool_op_reply(MPoolOpReply *m) {
dout(10) << "handle_pool_op_reply " << *m << dendl;
- tid_t tid = m->tid;
+ tid_t tid = m->get_tid();
if (op_pool.count(tid)) {
PoolOp *op = op_pool[tid];
dout(10) << "have request " << tid << " at " << op << " Op: " << get_pool_op_name(op->pool_op) << dendl;
void Objecter::handle_get_pool_stats_reply(MGetPoolStatsReply *m)
{
dout(10) << "handle_get_pool_stats_reply " << *m << dendl;
- tid_t tid = m->tid;
+ tid_t tid = m->get_tid();
if (op_poolstat.count(tid)) {
PoolStatOp *op = op_poolstat[tid];
void Objecter::handle_fs_stats_reply(MStatfsReply *m) {
dout(10) << "handle_fs_stats_reply " << *m << dendl;
- tid_t tid = m->h.tid;
+ tid_t tid = m->get_tid();
if (op_statfs.count(tid)) {
StatfsOp *op = op_statfs[tid];