Add the two new message type and change the corresponding code flow as well.
Basically the idea to have MOSDRepOp is to seperate subop(read/write)
out of other subop(pull/push,etc), so that we can cleanup some unused fields in
the message type, then save some encoding/decoding overhead.
The backward compatibility is also remian, if talking with old version OSD who
doesn't support osd_client_subop/subopreply, will fall back to osd_subop/subopreply.
Sage: rename MOSDClientSubOp -> MOSDRepOp
Signed-off-by: Xiaoxi Chen <xiaoxi.chen@intel.com>
Signed-off-by: Sage Weil <sage@redhat.com>
#define CEPH_FEATURE_ERASURE_CODE_PLUGINS_V2 (1ULL<<44)
#define CEPH_FEATURE_OSD_SET_ALLOC_HINT (1ULL<<45)
#define CEPH_FEATURE_OSD_FADVISE_FLAGS (1ULL<<46)
+#define CEPH_FEATURE_OSD_REPOP (1ULL<<46) /* overlap with fadvise */
#define CEPH_FEATURE_OSD_OBJECT_DIGEST (1ULL<<46) /* overlap with fadvise */
#define CEPH_FEATURE_MDS_QUOTA (1ULL<<47)
CEPH_FEATURE_ERASURE_CODE_PLUGINS_V2 | \
CEPH_FEATURE_OSD_SET_ALLOC_HINT | \
CEPH_FEATURE_OSD_FADVISE_FLAGS | \
+ CEPH_FEATURE_OSD_REPOP | \
CEPH_FEATURE_OSD_OBJECT_DIGEST | \
CEPH_FEATURE_MDS_QUOTA | \
0ULL)
--- /dev/null
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+/*
+ * Ceph - scalable distributed file system
+ *
+ * Copyright (C) 2004-2006 Sage Weil <sage@newdream.net>
+ *
+ * This is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License version 2.1, as published by the Free Software
+ * Foundation. See file COPYING.
+ *
+ */
+
+
+#ifndef CEPH_MOSDREPOP_H
+#define CEPH_MOSDREPOP_H
+
+#include "msg/Message.h"
+#include "osd/osd_types.h"
+
+/*
+ * OSD sub op - for internal ops on pobjects between primary and replicas(/stripes/whatever)
+ */
+
+class MOSDRepOp : public Message {
+
+ static const int HEAD_VERSION = 1;
+ static const int COMPAT_VERSION = 1;
+
+public:
+ epoch_t map_epoch;
+
+ // metadata from original request
+ osd_reqid_t reqid;
+
+ // subop
+ pg_shard_t from;
+ spg_t pgid;
+ hobject_t poid;
+
+ __u8 acks_wanted;
+
+ // transaction to exec
+ bufferlist logbl;
+ pg_stat_t pg_stats;
+
+ // subop metadata
+ eversion_t version;
+
+ // piggybacked osd/og state
+ eversion_t pg_trim_to; // primary->replica: trim to here
+ eversion_t pg_trim_rollback_to; // primary->replica: trim rollback
+ // info to here
+
+ hobject_t new_temp_oid; ///< new temp object that we must now start tracking
+ hobject_t discard_temp_oid; ///< previously used temp object that we can now stop tracking
+
+ /// non-empty if this transaction involves a hit_set history update
+ boost::optional<pg_hit_set_history_t> updated_hit_set_history;
+
+ int get_cost() const {
+ return data.length();
+ }
+
+ virtual void decode_payload() {
+ bufferlist::iterator p = payload.begin();
+ ::decode(map_epoch, p);
+ ::decode(reqid, p);
+ ::decode(pgid, p);
+ ::decode(poid, p);
+
+ ::decode(acks_wanted, p);
+ ::decode(version, p);
+ ::decode(logbl, p);
+ ::decode(pg_stats, p);
+ ::decode(pg_trim_to, p);
+
+
+ ::decode(new_temp_oid, p);
+ ::decode(discard_temp_oid, p);
+
+ ::decode(from, p);
+ ::decode(updated_hit_set_history, p);
+ ::decode(pg_trim_rollback_to, p);
+ }
+
+ virtual void encode_payload(uint64_t features) {
+ ::encode(map_epoch, payload);
+ ::encode(reqid, payload);
+ ::encode(pgid, payload);
+ ::encode(poid, payload);
+
+ ::encode(acks_wanted, payload);
+ ::encode(version, payload);
+ ::encode(logbl, payload);
+ ::encode(pg_stats, payload);
+ ::encode(pg_trim_to, payload);
+ ::encode(new_temp_oid, payload);
+ ::encode(discard_temp_oid, payload);
+ ::encode(from, payload);
+ ::encode(updated_hit_set_history, payload);
+ ::encode(pg_trim_rollback_to, payload);
+ }
+
+ MOSDRepOp()
+ : Message(MSG_OSD_REPOP, HEAD_VERSION, COMPAT_VERSION) { }
+ MOSDRepOp(osd_reqid_t r, pg_shard_t from,
+ spg_t p, const hobject_t& po, int aw,
+ epoch_t mape, ceph_tid_t rtid, eversion_t v)
+ : Message(MSG_OSD_REPOP, HEAD_VERSION, COMPAT_VERSION),
+ map_epoch(mape),
+ reqid(r),
+ from(from),
+ pgid(p),
+ poid(po),
+ acks_wanted(aw),
+ version(v) {
+ set_tid(rtid);
+ }
+private:
+ ~MOSDRepOp() {}
+
+public:
+ const char *get_type_name() const { return "osd_repop"; }
+ void print(ostream& out) const {
+ out << "osd_repop(" << reqid
+ << " " << pgid
+ << " " << poid;
+ out << " v " << version;
+ if (updated_hit_set_history)
+ out << ", has_updated_hit_set_history";
+ out << ")";
+ }
+};
+
+
+#endif
--- /dev/null
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+/*
+ * Ceph - scalable distributed file system
+ *
+ * Copyright (C) 2004-2006 Sage Weil <sage@newdream.net>
+ *
+ * This is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License version 2.1, as published by the Free Software
+ * Foundation. See file COPYING.
+ *
+ */
+
+
+#ifndef CEPH_MOSDREPOPREPLY_H
+#define CEPH_MOSDREPOPREPLY_H
+
+#include "msg/Message.h"
+
+#include "os/ObjectStore.h"
+
+/*
+ * OSD Client Subop reply
+ *
+ * oid - object id
+ * op - OSD_OP_DELETE, etc.
+ *
+ */
+
+class MOSDRepOpReply : public Message {
+ static const int HEAD_VERSION = 1;
+ static const int COMPAT_VERSION = 1;
+public:
+ epoch_t map_epoch;
+
+ // subop metadata
+ osd_reqid_t reqid;
+ pg_shard_t from;
+ spg_t pgid;
+
+ // result
+ __u8 ack_type;
+ int32_t result;
+
+ // piggybacked osd state
+ eversion_t last_complete_ondisk;
+
+
+ virtual void decode_payload() {
+ bufferlist::iterator p = payload.begin();
+ ::decode(map_epoch, p);
+ ::decode(reqid, p);
+ ::decode(pgid, p);
+
+ ::decode(ack_type, p);
+ ::decode(result, p);
+ ::decode(last_complete_ondisk, p);
+
+ ::decode(from, p);
+ }
+ virtual void encode_payload(uint64_t features) {
+ ::encode(map_epoch, payload);
+ ::encode(reqid, payload);
+ ::encode(pgid, payload);
+ ::encode(ack_type, payload);
+ ::encode(result, payload);
+ ::encode(last_complete_ondisk, payload);
+ ::encode(from, payload);
+ }
+
+ epoch_t get_map_epoch() { return map_epoch; }
+
+ spg_t get_pg() { return pgid; }
+
+ int get_ack_type() { return ack_type; }
+ bool is_ondisk() { return ack_type & CEPH_OSD_FLAG_ONDISK; }
+ bool is_onnvram() { return ack_type & CEPH_OSD_FLAG_ONNVRAM; }
+
+ int get_result() { return result; }
+
+ void set_last_complete_ondisk(eversion_t v) { last_complete_ondisk = v; }
+ eversion_t get_last_complete_ondisk() { return last_complete_ondisk; }
+
+public:
+ MOSDRepOpReply(
+ MOSDRepOp *req, pg_shard_t from, int result_, epoch_t e, int at) :
+ Message(MSG_OSD_REPOPREPLY, HEAD_VERSION, COMPAT_VERSION),
+ map_epoch(e),
+ reqid(req->reqid),
+ from(from),
+ pgid(req->pgid.pgid, req->from.shard),
+ ack_type(at),
+ result(result_) {
+ set_tid(req->get_tid());
+ }
+ MOSDRepOpReply() : Message(MSG_OSD_REPOPREPLY) {}
+private:
+ ~MOSDRepOpReply() {}
+
+public:
+ const char *get_type_name() const { return "osd_repop_reply"; }
+
+ void print(ostream& out) const {
+ out << "osd_repop_reply(" << reqid
+ << " " << pgid;
+ if (ack_type & CEPH_OSD_FLAG_ONDISK)
+ out << " ondisk";
+ if (ack_type & CEPH_OSD_FLAG_ONNVRAM)
+ out << " onnvram";
+ if (ack_type & CEPH_OSD_FLAG_ACK)
+ out << " ack";
+ out << ", result = " << result;
+ out << ")";
+ }
+
+};
+
+
+#endif
~MOSDSubOpReply() {}
public:
- const char *get_type_name() const { return "osd_op_reply"; }
+ const char *get_type_name() const { return "osd_subop_reply"; }
void print(ostream& out) const {
out << "osd_sub_op_reply(" << reqid
messages/MOSDScrub.h \
messages/MOSDSubOp.h \
messages/MOSDSubOpReply.h \
+ messages/MOSDRepOp.h \
+ messages/MOSDRepOpReply.h \
messages/MPGStats.h \
messages/MPGStatsAck.h \
messages/MPing.h \
#include "messages/MOSDOpReply.h"
#include "messages/MOSDSubOp.h"
#include "messages/MOSDSubOpReply.h"
+#include "messages/MOSDRepOp.h"
+#include "messages/MOSDRepOpReply.h"
#include "messages/MOSDMap.h"
#include "messages/MMonGetOSDMap.h"
case MSG_OSD_SUBOPREPLY:
m = new MOSDSubOpReply();
break;
+ case MSG_OSD_REPOP:
+ m = new MOSDRepOp();
+ break;
+ case MSG_OSD_REPOPREPLY:
+ m = new MOSDRepOpReply();
+ break;
case CEPH_MSG_OSD_MAP:
m = new MOSDMap;
#define MSG_OSD_EC_READ 110
#define MSG_OSD_EC_READ_REPLY 111
+#define MSG_OSD_REPOP 112
+#define MSG_OSD_REPOPREPLY 113
+
+
// *** MDS ***
#define MSG_MDS_BEACON 100 // to monitor
#include "messages/MOSDMarkMeDown.h"
#include "messages/MOSDOp.h"
#include "messages/MOSDOpReply.h"
+#include "messages/MOSDRepOp.h"
+#include "messages/MOSDRepOpReply.h"
#include "messages/MOSDSubOp.h"
#include "messages/MOSDSubOpReply.h"
#include "messages/MOSDBoot.h"
}
case MSG_OSD_SUBOP:
return replica_op_required_epoch<MOSDSubOp, MSG_OSD_SUBOP>(op);
+ case MSG_OSD_REPOP:
+ return replica_op_required_epoch<MOSDRepOp, MSG_OSD_REPOP>(op);
case MSG_OSD_SUBOPREPLY:
return replica_op_required_epoch<MOSDSubOpReply, MSG_OSD_SUBOPREPLY>(
op);
+ case MSG_OSD_REPOPREPLY:
+ return replica_op_required_epoch<MOSDRepOpReply, MSG_OSD_REPOPREPLY>(
+ op);
case MSG_OSD_PG_PUSH:
return replica_op_required_epoch<MOSDPGPush, MSG_OSD_PG_PUSH>(
op);
case MSG_OSD_SUBOP:
handle_replica_op<MOSDSubOp, MSG_OSD_SUBOP>(op, osdmap);
break;
+ case MSG_OSD_REPOP:
+ handle_replica_op<MOSDRepOp, MSG_OSD_REPOP>(op, osdmap);
+ break;
case MSG_OSD_SUBOPREPLY:
handle_replica_op<MOSDSubOpReply, MSG_OSD_SUBOPREPLY>(op, osdmap);
break;
+ case MSG_OSD_REPOPREPLY:
+ handle_replica_op<MOSDRepOpReply, MSG_OSD_REPOPREPLY>(op, osdmap);
+ break;
case MSG_OSD_PG_PUSH:
handle_replica_op<MOSDPGPush, MSG_OSD_PG_PUSH>(op, osdmap);
break;
switch (m->get_type()) {
case CEPH_MSG_OSD_OP:
case MSG_OSD_SUBOP:
+ case MSG_OSD_REPOP:
case MSG_OSD_SUBOPREPLY:
+ case MSG_OSD_REPOPREPLY:
case MSG_OSD_PG_PUSH:
case MSG_OSD_PG_PULL:
case MSG_OSD_PG_PUSH_REPLY:
#include "msg/Message.h"
#include "messages/MOSDOp.h"
#include "messages/MOSDSubOp.h"
+#include "messages/MOSDRepOp.h"
#include "include/assert.h"
#include "osd/osd_types.h"
reqid = static_cast<MOSDOp*>(req)->get_reqid();
} else if (req->get_type() == MSG_OSD_SUBOP) {
reqid = static_cast<MOSDSubOp*>(req)->reqid;
+ } else if (req->get_type() == MSG_OSD_REPOP) {
+ reqid = static_cast<MOSDRepOp*>(req)->reqid;
}
tracker->mark_event(this, "header_read", request->get_recv_stamp());
tracker->mark_event(this, "throttled", request->get_throttle_stamp());
#include "messages/MOSDECSubOpReadReply.h"
#include "messages/MOSDSubOp.h"
+#include "messages/MOSDRepOp.h"
#include "messages/MOSDSubOpReply.h"
+#include "messages/MOSDRepOpReply.h"
#include "common/BackTrace.h"
#ifdef WITH_LTTNG
return can_discard_op(op);
case MSG_OSD_SUBOP:
return can_discard_replica_op<MOSDSubOp, MSG_OSD_SUBOP>(op);
+ case MSG_OSD_REPOP:
+ return can_discard_replica_op<MOSDRepOp, MSG_OSD_REPOP>(op);
case MSG_OSD_PG_PUSH:
return can_discard_replica_op<MOSDPGPush, MSG_OSD_PG_PUSH>(op);
case MSG_OSD_PG_PULL:
return can_discard_replica_op<MOSDPGPushReply, MSG_OSD_PG_PUSH_REPLY>(op);
case MSG_OSD_SUBOPREPLY:
return can_discard_replica_op<MOSDSubOpReply, MSG_OSD_SUBOPREPLY>(op);
+ case MSG_OSD_REPOPREPLY:
+ return can_discard_replica_op<MOSDRepOpReply, MSG_OSD_REPOPREPLY>(op);
case MSG_OSD_EC_WRITE:
return can_discard_replica_op<MOSDECSubOpWrite, MSG_OSD_EC_WRITE>(op);
cur_epoch,
static_cast<MOSDSubOp*>(op->get_req())->map_epoch);
+ case MSG_OSD_REPOP:
+ return !have_same_or_newer_map(
+ cur_epoch,
+ static_cast<MOSDRepOp*>(op->get_req())->map_epoch);
+
case MSG_OSD_SUBOPREPLY:
return !have_same_or_newer_map(
cur_epoch,
static_cast<MOSDSubOpReply*>(op->get_req())->map_epoch);
+ case MSG_OSD_REPOPREPLY:
+ return !have_same_or_newer_map(
+ cur_epoch,
+ static_cast<MOSDRepOpReply*>(op->get_req())->map_epoch);
+
case MSG_OSD_PG_SCAN:
return !have_same_or_newer_map(
cur_epoch,
int get_nrep() const { return acting.size(); }
void reset_peer_features() { peer_features = (uint64_t)-1; }
- uint64_t get_min_peer_features() { return peer_features; }
+ uint64_t get_min_peer_features() const { return peer_features; }
void apply_peer_features(uint64_t f) { peer_features &= f; }
void init_primary_up_acting(
virtual spg_t primary_spg_t() const = 0;
virtual pg_shard_t primary_shard() const = 0;
+ virtual uint64_t min_peer_features() const = 0;
+
virtual void send_message_osd_cluster(
int peer, Message *m, epoch_t from_epoch) = 0;
virtual void send_message_osd_cluster(
#include "ReplicatedBackend.h"
#include "messages/MOSDOp.h"
#include "messages/MOSDSubOp.h"
+#include "messages/MOSDRepOp.h"
#include "messages/MOSDSubOpReply.h"
+#include "messages/MOSDRepOpReply.h"
#include "messages/MOSDPGPush.h"
#include "messages/MOSDPGPull.h"
#include "messages/MOSDPGPushReply.h"
break;
}
+ case MSG_OSD_REPOP: {
+ sub_op_modify(op);
+ return true;
+ }
+
case MSG_OSD_SUBOPREPLY: {
MOSDSubOpReply *r = static_cast<MOSDSubOpReply*>(op->get_req());
if (r->ops.size() >= 1) {
sub_op_push_reply(op);
return true;
}
- } else {
- sub_op_modify_reply(op);
+ }
+ else {
+ sub_op_modify_reply<MOSDSubOpReply, MSG_OSD_SUBOPREPLY>(op);
return true;
}
break;
}
+ case MSG_OSD_REPOPREPLY: {
+ sub_op_modify_reply<MOSDRepOpReply, MSG_OSD_REPOPREPLY>(op);
+ return true;
+ }
+
default:
break;
}
}
}
+template<typename T, int MSGTYPE>
void ReplicatedBackend::sub_op_modify_reply(OpRequestRef op)
{
- MOSDSubOpReply *r = static_cast<MOSDSubOpReply*>(op->get_req());
- assert(r->get_type() == MSG_OSD_SUBOPREPLY);
+ T *r = static_cast<T *>(op->get_req());
+ assert(r->get_header().type == MSGTYPE);
+ assert(MSGTYPE == MSG_OSD_SUBOPREPLY || MSGTYPE == MSG_OSD_REPOPREPLY);
op->mark_started();
);
private:
+ template<typename T, int MSGTYPE>
+ Message * generate_subop(
+ const hobject_t &soid,
+ const eversion_t &at_version,
+ ceph_tid_t tid,
+ osd_reqid_t reqid,
+ eversion_t pg_trim_to,
+ eversion_t pg_trim_rollback_to,
+ hobject_t new_temp_oid,
+ hobject_t discard_temp_oid,
+ vector<pg_log_entry_t> &log_entries,
+ boost::optional<pg_hit_set_history_t> &hset_history,
+ InProgressOp *op,
+ ObjectStore::Transaction *op_t,
+ pg_shard_t peer,
+ const pg_info_t &pinfo);
void issue_op(
const hobject_t &soid,
const eversion_t &at_version,
ObjectStore::Transaction *op_t);
void op_applied(InProgressOp *op);
void op_commit(InProgressOp *op);
+ template<typename T, int MSGTYPE>
void sub_op_modify_reply(OpRequestRef op);
void sub_op_modify(OpRequestRef op);
+ template<typename T, int MSGTYPE>
+ void sub_op_modify_impl(OpRequestRef op);
struct RepModify {
OpRequestRef op;
#include "messages/MOSDOpReply.h"
#include "messages/MOSDSubOp.h"
#include "messages/MOSDSubOpReply.h"
+#include "messages/MOSDRepOp.h"
+#include "messages/MOSDRepOpReply.h"
#include "messages/MOSDPGNotify.h"
#include "messages/MOSDPGInfo.h"
repop->ctx->op);
repop->ctx->op_t = NULL;
}
+
+template<typename T, int MSGTYPE>
+Message * ReplicatedBackend::generate_subop(
+ const hobject_t &soid,
+ const eversion_t &at_version,
+ ceph_tid_t tid,
+ osd_reqid_t reqid,
+ eversion_t pg_trim_to,
+ eversion_t pg_trim_rollback_to,
+ hobject_t new_temp_oid,
+ hobject_t discard_temp_oid,
+ vector<pg_log_entry_t> &log_entries,
+ boost::optional<pg_hit_set_history_t> &hset_hist,
+ InProgressOp *op,
+ ObjectStore::Transaction *op_t,
+ pg_shard_t peer,
+ const pg_info_t &pinfo)
+{
+ int acks_wanted = CEPH_OSD_FLAG_ACK | CEPH_OSD_FLAG_ONDISK;
+ assert(MSGTYPE == MSG_OSD_SUBOP || MSGTYPE == MSG_OSD_REPOP);
+ // forward the write/update/whatever
+ T *wr = new T(
+ reqid, parent->whoami_shard(),
+ spg_t(get_info().pgid.pgid, peer.shard),
+ soid, acks_wanted,
+ get_osdmap()->get_epoch(),
+ tid, at_version);
+
+ // ship resulting transaction, log entries, and pg_stats
+ if (!parent->should_send_op(peer, soid)) {
+ dout(10) << "issue_repop shipping empty opt to osd." << peer
+ <<", object " << soid
+ << " beyond MAX(last_backfill_started "
+ << ", pinfo.last_backfill "
+ << pinfo.last_backfill << ")" << dendl;
+ ObjectStore::Transaction t;
+ ::encode(t, wr->get_data());
+ } else {
+ ::encode(*op_t, wr->get_data());
+ }
+
+ ::encode(log_entries, wr->logbl);
+
+ if (pinfo.is_incomplete())
+ wr->pg_stats = pinfo.stats; // reflects backfill progress
+ else
+ wr->pg_stats = get_info().stats;
+ wr->pg_trim_to = pg_trim_to;
+ wr->pg_trim_rollback_to = pg_trim_rollback_to;
+
+ wr->new_temp_oid = new_temp_oid;
+ wr->discard_temp_oid = discard_temp_oid;
+ wr->updated_hit_set_history = hset_hist;
+ return wr;
+}
+
void ReplicatedBackend::issue_op(
const hobject_t &soid,
const eversion_t &at_version,
InProgressOp *op,
ObjectStore::Transaction *op_t)
{
- int acks_wanted = CEPH_OSD_FLAG_ACK | CEPH_OSD_FLAG_ONDISK;
if (parent->get_actingbackfill_shards().size() > 1) {
ostringstream ss;
pg_shard_t peer = *i;
const pg_info_t &pinfo = parent->get_shard_info().find(peer)->second;
- // forward the write/update/whatever
- MOSDSubOp *wr = new MOSDSubOp(
- reqid, parent->whoami_shard(),
- spg_t(get_info().pgid.pgid, i->shard),
- soid,
- acks_wanted,
- get_osdmap()->get_epoch(),
- tid, at_version);
-
- // ship resulting transaction, log entries, and pg_stats
- if (!parent->should_send_op(peer, soid)) {
- dout(10) << "issue_repop shipping empty opt to osd." << peer
- <<", object " << soid
- << " beyond MAX(last_backfill_started "
- << ", pinfo.last_backfill "
- << pinfo.last_backfill << ")" << dendl;
- ObjectStore::Transaction t;
- ::encode(t, wr->get_data());
+ Message *wr;
+ uint64_t min_features = parent->min_peer_features();
+ if (!(min_features & CEPH_FEATURE_OSD_REPOP)) {
+ dout(20) << "Talking to old version of OSD, doesn't support RepOp, fall back to SubOp" << dendl;
+ wr = generate_subop<MOSDSubOp, MSG_OSD_SUBOP>(
+ soid,
+ at_version,
+ tid,
+ reqid,
+ pg_trim_to,
+ pg_trim_rollback_to,
+ new_temp_oid,
+ discard_temp_oid,
+ log_entries,
+ hset_hist,
+ op,
+ op_t,
+ peer,
+ pinfo);
} else {
- ::encode(*op_t, wr->get_data());
+ wr = generate_subop<MOSDRepOp, MSG_OSD_REPOP>(
+ soid,
+ at_version,
+ tid,
+ reqid,
+ pg_trim_to,
+ pg_trim_rollback_to,
+ new_temp_oid,
+ discard_temp_oid,
+ log_entries,
+ hset_hist,
+ op,
+ op_t,
+ peer,
+ pinfo);
}
- ::encode(log_entries, wr->logbl);
-
- if (pinfo.is_incomplete())
- wr->pg_stats = pinfo.stats; // reflects backfill progress
- else
- wr->pg_stats = get_info().stats;
-
- wr->pg_trim_to = pg_trim_to;
- wr->pg_trim_rollback_to = pg_trim_rollback_to;
-
- wr->new_temp_oid = new_temp_oid;
- wr->discard_temp_oid = discard_temp_oid;
- wr->updated_hit_set_history = hset_hist;
-
get_parent()->send_message_osd_cluster(
peer.osd, wr, get_osdmap()->get_epoch());
}
}
// sub op modify
+void ReplicatedBackend::sub_op_modify(OpRequestRef op) {
+ Message *m = op->get_req();
+ int msg_type = m->get_type();
+ if (msg_type == MSG_OSD_SUBOP) {
+ sub_op_modify_impl<MOSDSubOp, MSG_OSD_SUBOP>(op);
+ } else if (msg_type == MSG_OSD_REPOP) {
+ sub_op_modify_impl<MOSDRepOp, MSG_OSD_REPOP>(op);
+ } else {
+ assert(0);
+ }
+}
-void ReplicatedBackend::sub_op_modify(OpRequestRef op)
+template<typename T, int MSGTYPE>
+void ReplicatedBackend::sub_op_modify_impl(OpRequestRef op)
{
- MOSDSubOp *m = static_cast<MOSDSubOp*>(op->get_req());
- assert(m->get_type() == MSG_OSD_SUBOP);
+ T *m = static_cast<T *>(op->get_req());
+ int msg_type = m->get_type();
+ assert(MSGTYPE == msg_type);
+ assert(msg_type == MSG_OSD_SUBOP || msg_type == MSG_OSD_REPOP);
const hobject_t& soid = m->poid;
- const char *opname;
- if (m->ops.size())
- opname = ceph_osd_op_name(m->ops[0].op.op);
- else
- opname = "trans";
-
- dout(10) << "sub_op_modify " << opname
+ dout(10) << "sub_op_modify trans"
<< " " << soid
<< " v " << m->version
<< (m->logbl.length() ? " (transaction)" : " (parallel exec")
p = m->logbl.begin();
::decode(log, p);
+ rm->opt.set_fadvise_flag(CEPH_OSD_OP_FLAG_FADVISE_DONTNEED);
+
bool update_snaps = false;
if (!rm->opt.empty()) {
// If the opt is non-empty, we infer we are before
dout(10) << "sub_op_modify_applied on " << rm << " op "
<< *rm->op->get_req() << dendl;
- MOSDSubOp *m = static_cast<MOSDSubOp*>(rm->op->get_req());
- assert(m->get_type() == MSG_OSD_SUBOP);
-
- if (!rm->committed) {
- // send ack to acker only if we haven't sent a commit already
- MOSDSubOpReply *ack = new MOSDSubOpReply(
- m, parent->whoami_shard(),
- 0, get_osdmap()->get_epoch(), CEPH_OSD_FLAG_ACK);
+ Message *m = rm->op->get_req();
+
+ Message *ack = NULL;
+ eversion_t version;
+
+ if (m->get_type() == MSG_OSD_SUBOP) {
+ // doesn't have CLIENT SUBOP feature ,use Subop
+ MOSDSubOp *req = static_cast<MOSDSubOp*>(m);
+ version = req->version;
+ if (!rm->committed)
+ ack = new MOSDSubOpReply(
+ req, parent->whoami_shard(),
+ 0, get_osdmap()->get_epoch(), CEPH_OSD_FLAG_ACK);
+ } else if (m->get_type() == MSG_OSD_REPOP) {
+ MOSDRepOp *req = static_cast<MOSDRepOp*>(m);
+ version = req->version;
+ if (!rm->committed)
+ ack = new MOSDRepOpReply(
+ static_cast<MOSDRepOp*>(m), parent->whoami_shard(),
+ 0, get_osdmap()->get_epoch(), CEPH_OSD_FLAG_ACK);
+ } else {
+ assert(0);
+ }
+
+ // send ack to acker only if we haven't sent a commit already
+ if (ack) {
ack->set_priority(CEPH_MSG_PRIO_HIGH); // this better match commit priority!
get_parent()->send_message_osd_cluster(
rm->ackerosd, ack, get_osdmap()->get_epoch());
}
- parent->op_applied(m->version);
+ parent->op_applied(version);
}
void ReplicatedBackend::sub_op_modify_commit(RepModifyRef rm)
assert(get_osdmap()->is_up(rm->ackerosd));
get_parent()->update_last_complete_ondisk(rm->last_complete);
- MOSDSubOpReply *commit = new MOSDSubOpReply(
- static_cast<MOSDSubOp*>(rm->op->get_req()),
- get_parent()->whoami_shard(),
- 0, get_osdmap()->get_epoch(), CEPH_OSD_FLAG_ONDISK);
- commit->set_last_complete_ondisk(rm->last_complete);
+
+ Message *m = rm->op->get_req();
+ Message *commit;
+ if (m->get_type() == MSG_OSD_SUBOP) {
+ // doesn't have CLIENT SUBOP feature ,use Subop
+ MOSDSubOpReply *reply = new MOSDSubOpReply(
+ static_cast<MOSDSubOp*>(m),
+ get_parent()->whoami_shard(),
+ 0, get_osdmap()->get_epoch(), CEPH_OSD_FLAG_ONDISK);
+ reply->set_last_complete_ondisk(rm->last_complete);
+ commit = reply;
+ } else if (m->get_type() == MSG_OSD_REPOP) {
+ MOSDRepOpReply *reply = new MOSDRepOpReply(
+ static_cast<MOSDRepOp*>(m),
+ get_parent()->whoami_shard(),
+ 0, get_osdmap()->get_epoch(), CEPH_OSD_FLAG_ONDISK);
+ reply->set_last_complete_ondisk(rm->last_complete);
+ commit = reply;
+ }
+ else {
+ assert(0);
+ }
+
commit->set_priority(CEPH_MSG_PRIO_HIGH); // this better match ack priority!
get_parent()->send_message_osd_cluster(
rm->ackerosd, commit, get_osdmap()->get_epoch());
pg_shard_t primary_shard() const {
return primary;
}
+ uint64_t min_peer_features() const {
+ return get_min_peer_features();
+ }
void send_message_osd_cluster(
int peer, Message *m, epoch_t from_epoch);