class MOSDRepOp : public MOSDFastDispatchOp {
- static const int HEAD_VERSION = 1;
+ static const int HEAD_VERSION = 2;
static const int COMPAT_VERSION = 1;
public:
- epoch_t map_epoch;
+ epoch_t map_epoch, min_epoch;
// metadata from original request
osd_reqid_t reqid;
epoch_t get_map_epoch() const override {
return map_epoch;
}
+ epoch_t get_min_epoch() const override {
+ return min_epoch;
+ }
spg_t get_spg() const override {
return pgid;
}
p = payload.begin();
// splitted to partial and final
::decode(map_epoch, p);
+ if (header.version >= 2)
+ ::decode(min_epoch, p);
+ else
+ min_epoch = map_epoch;
::decode(reqid, p);
::decode(pgid, p);
}
void encode_payload(uint64_t features) override {
::encode(map_epoch, payload);
+ if (HAVE_FEATURE(features, SERVER_LUMINOUS))
+ ::encode(min_epoch, payload);
+ else
+ header.version = 1;
::encode(reqid, payload);
::encode(pgid, payload);
::encode(poid, payload);
final_decode_needed(true), acks_wanted (0) {}
MOSDRepOp(osd_reqid_t r, pg_shard_t from,
spg_t p, const hobject_t& po, int aw,
- epoch_t mape, ceph_tid_t rtid, eversion_t v)
+ epoch_t mape, epoch_t min_epoch, ceph_tid_t rtid, eversion_t v)
: MOSDFastDispatchOp(MSG_OSD_REPOP, HEAD_VERSION, COMPAT_VERSION),
map_epoch(mape),
+ min_epoch(min_epoch),
reqid(r),
pgid(p),
final_decode_needed(false),
const char *get_type_name() const override { return "osd_repop"; }
void print(ostream& out) const override {
out << "osd_repop(" << reqid
- << " " << pgid;
+ << " " << pgid << " e" << map_epoch << "/" << min_epoch;
if (!final_decode_needed) {
- out << " " << poid << " v " << version;
+ out << " " << poid << " v " << version;
if (updated_hit_set_history)
out << ", has_updated_hit_set_history";
}
*/
class MOSDRepOpReply : public MOSDFastDispatchOp {
- static const int HEAD_VERSION = 1;
+ static const int HEAD_VERSION = 2;
static const int COMPAT_VERSION = 1;
public:
- epoch_t map_epoch;
+ epoch_t map_epoch, min_epoch;
// subop metadata
osd_reqid_t reqid;
epoch_t get_map_epoch() const override {
return map_epoch;
}
+ epoch_t get_min_epoch() const override {
+ return min_epoch;
+ }
spg_t get_spg() const override {
return pgid;
}
void decode_payload() override {
p = payload.begin();
::decode(map_epoch, p);
+ if (header.version >= 2)
+ ::decode(min_epoch, p);
+ else
+ min_epoch = map_epoch;
::decode(reqid, p);
::decode(pgid, p);
}
}
void encode_payload(uint64_t features) override {
::encode(map_epoch, payload);
+ if (HAVE_FEATURE(features, SERVER_LUMINOUS))
+ ::encode(min_epoch, payload);
+ else
+ header.version = 1;
::encode(reqid, payload);
::encode(pgid, payload);
::encode(ack_type, payload);
::encode(from, payload);
}
- epoch_t get_map_epoch() { return map_epoch; }
-
spg_t get_pg() { return pgid; }
int get_ack_type() { return ack_type; }
public:
MOSDRepOpReply(
- const MOSDRepOp *req, pg_shard_t from, int result_, epoch_t e, int at) :
+ const MOSDRepOp *req, pg_shard_t from, int result_, epoch_t e, epoch_t mine,
+ int at) :
MOSDFastDispatchOp(MSG_OSD_REPOPREPLY, HEAD_VERSION, COMPAT_VERSION),
map_epoch(e),
+ min_epoch(mine),
reqid(req->reqid),
from(from),
pgid(req->pgid.pgid, req->from.shard),
MOSDRepOpReply()
: MOSDFastDispatchOp(MSG_OSD_REPOPREPLY, HEAD_VERSION, COMPAT_VERSION),
map_epoch(0),
+ min_epoch(0),
ack_type(0), result(0),
final_decode_needed(true) {}
private:
void print(ostream& out) const override {
out << "osd_repop_reply(" << reqid
- << " " << pgid;
+ << " " << pgid << " e" << map_epoch << "/" << min_epoch;
if (!final_decode_needed) {
if (ack_type & CEPH_OSD_FLAG_ONDISK)
out << " ondisk";
spg_t(get_info().pgid.pgid, peer.shard),
soid, acks_wanted,
get_osdmap()->get_epoch(),
+ parent->get_last_peering_reset_epoch(),
tid, at_version);
// ship resulting transaction, log entries, and pg_stats
if (!rm->committed)
ack = new MOSDRepOpReply(
static_cast<const MOSDRepOp*>(m), parent->whoami_shard(),
- 0, get_osdmap()->get_epoch(), CEPH_OSD_FLAG_ACK);
+ 0, get_osdmap()->get_epoch(), req->min_epoch, CEPH_OSD_FLAG_ACK);
// send ack to acker only if we haven't sent a commit already
if (ack) {
rm->committed = true;
// send commit.
- const Message *m = rm->op->get_req();
+ const MOSDRepOp *m = static_cast<const MOSDRepOp*>(rm->op->get_req());
+ assert(m->get_type() == MSG_OSD_REPOP);
dout(10) << __func__ << " on op " << *m
<< ", sending commit to osd." << rm->ackerosd
<< dendl;
- assert(m->get_type() == MSG_OSD_REPOP);
assert(get_osdmap()->is_up(rm->ackerosd));
get_parent()->update_last_complete_ondisk(rm->last_complete);
MOSDRepOpReply *reply = new MOSDRepOpReply(
- static_cast<const MOSDRepOp*>(m),
+ m,
get_parent()->whoami_shard(),
- 0, get_osdmap()->get_epoch(), CEPH_OSD_FLAG_ONDISK);
+ 0, get_osdmap()->get_epoch(), m->get_min_epoch(), CEPH_OSD_FLAG_ONDISK);
reply->set_last_complete_ondisk(rm->last_complete);
reply->set_priority(CEPH_MSG_PRIO_HIGH); // this better match ack priority!
get_parent()->send_message_osd_cluster(