From: Xiaoxi Chen Date: Fri, 21 Nov 2014 00:34:54 +0000 (+0800) Subject: Add MOSDRepOp and MOSDRepOpReply X-Git-Tag: v0.92~69^2~1 X-Git-Url: http://git-server-git.apps.pok.os.sepia.ceph.com/?a=commitdiff_plain;h=78d2d310ba50b89000aa5a51069e6702de2e8c80;p=ceph.git Add MOSDRepOp and MOSDRepOpReply Add the two new message type and change the corresponding code flow as well. Basically the idea to have MOSDRepOp is to seperate subop(read/write) out of other subop(pull/push,etc), so that we can cleanup some unused fields in the message type, then save some encoding/decoding overhead. The backward compatibility is also remian, if talking with old version OSD who doesn't support osd_client_subop/subopreply, will fall back to osd_subop/subopreply. Sage: rename MOSDClientSubOp -> MOSDRepOp Signed-off-by: Xiaoxi Chen Signed-off-by: Sage Weil --- diff --git a/src/include/ceph_features.h b/src/include/ceph_features.h index 151e547780eb..a723b97dca48 100644 --- a/src/include/ceph_features.h +++ b/src/include/ceph_features.h @@ -55,6 +55,7 @@ #define CEPH_FEATURE_ERASURE_CODE_PLUGINS_V2 (1ULL<<44) #define CEPH_FEATURE_OSD_SET_ALLOC_HINT (1ULL<<45) #define CEPH_FEATURE_OSD_FADVISE_FLAGS (1ULL<<46) +#define CEPH_FEATURE_OSD_REPOP (1ULL<<46) /* overlap with fadvise */ #define CEPH_FEATURE_OSD_OBJECT_DIGEST (1ULL<<46) /* overlap with fadvise */ #define CEPH_FEATURE_MDS_QUOTA (1ULL<<47) @@ -136,6 +137,7 @@ static inline unsigned long long ceph_sanitize_features(unsigned long long f) { CEPH_FEATURE_ERASURE_CODE_PLUGINS_V2 | \ CEPH_FEATURE_OSD_SET_ALLOC_HINT | \ CEPH_FEATURE_OSD_FADVISE_FLAGS | \ + CEPH_FEATURE_OSD_REPOP | \ CEPH_FEATURE_OSD_OBJECT_DIGEST | \ CEPH_FEATURE_MDS_QUOTA | \ 0ULL) diff --git a/src/messages/MOSDRepOp.h b/src/messages/MOSDRepOp.h new file mode 100644 index 000000000000..25cd09d970a9 --- /dev/null +++ b/src/messages/MOSDRepOp.h @@ -0,0 +1,138 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab +/* + * Ceph - scalable distributed file system + * + * Copyright (C) 2004-2006 Sage Weil + * + * This is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public + * License version 2.1, as published by the Free Software + * Foundation. See file COPYING. + * + */ + + +#ifndef CEPH_MOSDREPOP_H +#define CEPH_MOSDREPOP_H + +#include "msg/Message.h" +#include "osd/osd_types.h" + +/* + * OSD sub op - for internal ops on pobjects between primary and replicas(/stripes/whatever) + */ + +class MOSDRepOp : public Message { + + static const int HEAD_VERSION = 1; + static const int COMPAT_VERSION = 1; + +public: + epoch_t map_epoch; + + // metadata from original request + osd_reqid_t reqid; + + // subop + pg_shard_t from; + spg_t pgid; + hobject_t poid; + + __u8 acks_wanted; + + // transaction to exec + bufferlist logbl; + pg_stat_t pg_stats; + + // subop metadata + eversion_t version; + + // piggybacked osd/og state + eversion_t pg_trim_to; // primary->replica: trim to here + eversion_t pg_trim_rollback_to; // primary->replica: trim rollback + // info to here + + hobject_t new_temp_oid; ///< new temp object that we must now start tracking + hobject_t discard_temp_oid; ///< previously used temp object that we can now stop tracking + + /// non-empty if this transaction involves a hit_set history update + boost::optional updated_hit_set_history; + + int get_cost() const { + return data.length(); + } + + virtual void decode_payload() { + bufferlist::iterator p = payload.begin(); + ::decode(map_epoch, p); + ::decode(reqid, p); + ::decode(pgid, p); + ::decode(poid, p); + + ::decode(acks_wanted, p); + ::decode(version, p); + ::decode(logbl, p); + ::decode(pg_stats, p); + ::decode(pg_trim_to, p); + + + ::decode(new_temp_oid, p); + ::decode(discard_temp_oid, p); + + ::decode(from, p); + ::decode(updated_hit_set_history, p); + ::decode(pg_trim_rollback_to, p); + } + + virtual void encode_payload(uint64_t features) { + ::encode(map_epoch, payload); + ::encode(reqid, payload); + ::encode(pgid, payload); + ::encode(poid, payload); + + ::encode(acks_wanted, payload); + ::encode(version, payload); + ::encode(logbl, payload); + ::encode(pg_stats, payload); + ::encode(pg_trim_to, payload); + ::encode(new_temp_oid, payload); + ::encode(discard_temp_oid, payload); + ::encode(from, payload); + ::encode(updated_hit_set_history, payload); + ::encode(pg_trim_rollback_to, payload); + } + + MOSDRepOp() + : Message(MSG_OSD_REPOP, HEAD_VERSION, COMPAT_VERSION) { } + MOSDRepOp(osd_reqid_t r, pg_shard_t from, + spg_t p, const hobject_t& po, int aw, + epoch_t mape, ceph_tid_t rtid, eversion_t v) + : Message(MSG_OSD_REPOP, HEAD_VERSION, COMPAT_VERSION), + map_epoch(mape), + reqid(r), + from(from), + pgid(p), + poid(po), + acks_wanted(aw), + version(v) { + set_tid(rtid); + } +private: + ~MOSDRepOp() {} + +public: + const char *get_type_name() const { return "osd_repop"; } + void print(ostream& out) const { + out << "osd_repop(" << reqid + << " " << pgid + << " " << poid; + out << " v " << version; + if (updated_hit_set_history) + out << ", has_updated_hit_set_history"; + out << ")"; + } +}; + + +#endif diff --git a/src/messages/MOSDRepOpReply.h b/src/messages/MOSDRepOpReply.h new file mode 100644 index 000000000000..957502dddddf --- /dev/null +++ b/src/messages/MOSDRepOpReply.h @@ -0,0 +1,120 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab +/* + * Ceph - scalable distributed file system + * + * Copyright (C) 2004-2006 Sage Weil + * + * This is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public + * License version 2.1, as published by the Free Software + * Foundation. See file COPYING. + * + */ + + +#ifndef CEPH_MOSDREPOPREPLY_H +#define CEPH_MOSDREPOPREPLY_H + +#include "msg/Message.h" + +#include "os/ObjectStore.h" + +/* + * OSD Client Subop reply + * + * oid - object id + * op - OSD_OP_DELETE, etc. + * + */ + +class MOSDRepOpReply : public Message { + static const int HEAD_VERSION = 1; + static const int COMPAT_VERSION = 1; +public: + epoch_t map_epoch; + + // subop metadata + osd_reqid_t reqid; + pg_shard_t from; + spg_t pgid; + + // result + __u8 ack_type; + int32_t result; + + // piggybacked osd state + eversion_t last_complete_ondisk; + + + virtual void decode_payload() { + bufferlist::iterator p = payload.begin(); + ::decode(map_epoch, p); + ::decode(reqid, p); + ::decode(pgid, p); + + ::decode(ack_type, p); + ::decode(result, p); + ::decode(last_complete_ondisk, p); + + ::decode(from, p); + } + virtual void encode_payload(uint64_t features) { + ::encode(map_epoch, payload); + ::encode(reqid, payload); + ::encode(pgid, payload); + ::encode(ack_type, payload); + ::encode(result, payload); + ::encode(last_complete_ondisk, payload); + ::encode(from, payload); + } + + epoch_t get_map_epoch() { return map_epoch; } + + spg_t get_pg() { return pgid; } + + int get_ack_type() { return ack_type; } + bool is_ondisk() { return ack_type & CEPH_OSD_FLAG_ONDISK; } + bool is_onnvram() { return ack_type & CEPH_OSD_FLAG_ONNVRAM; } + + int get_result() { return result; } + + void set_last_complete_ondisk(eversion_t v) { last_complete_ondisk = v; } + eversion_t get_last_complete_ondisk() { return last_complete_ondisk; } + +public: + MOSDRepOpReply( + MOSDRepOp *req, pg_shard_t from, int result_, epoch_t e, int at) : + Message(MSG_OSD_REPOPREPLY, HEAD_VERSION, COMPAT_VERSION), + map_epoch(e), + reqid(req->reqid), + from(from), + pgid(req->pgid.pgid, req->from.shard), + ack_type(at), + result(result_) { + set_tid(req->get_tid()); + } + MOSDRepOpReply() : Message(MSG_OSD_REPOPREPLY) {} +private: + ~MOSDRepOpReply() {} + +public: + const char *get_type_name() const { return "osd_repop_reply"; } + + void print(ostream& out) const { + out << "osd_repop_reply(" << reqid + << " " << pgid; + if (ack_type & CEPH_OSD_FLAG_ONDISK) + out << " ondisk"; + if (ack_type & CEPH_OSD_FLAG_ONNVRAM) + out << " onnvram"; + if (ack_type & CEPH_OSD_FLAG_ACK) + out << " ack"; + out << ", result = " << result; + out << ")"; + } + +}; + + +#endif diff --git a/src/messages/MOSDSubOpReply.h b/src/messages/MOSDSubOpReply.h index 7b441ae31f48..a084246e7f94 100644 --- a/src/messages/MOSDSubOpReply.h +++ b/src/messages/MOSDSubOpReply.h @@ -144,7 +144,7 @@ private: ~MOSDSubOpReply() {} public: - const char *get_type_name() const { return "osd_op_reply"; } + const char *get_type_name() const { return "osd_subop_reply"; } void print(ostream& out) const { out << "osd_sub_op_reply(" << reqid diff --git a/src/messages/Makefile.am b/src/messages/Makefile.am index e9a74a3e1b3f..d1be28b0614a 100644 --- a/src/messages/Makefile.am +++ b/src/messages/Makefile.am @@ -104,6 +104,8 @@ noinst_HEADERS += \ messages/MOSDScrub.h \ messages/MOSDSubOp.h \ messages/MOSDSubOpReply.h \ + messages/MOSDRepOp.h \ + messages/MOSDRepOpReply.h \ messages/MPGStats.h \ messages/MPGStatsAck.h \ messages/MPing.h \ diff --git a/src/msg/Message.cc b/src/msg/Message.cc index 0fffe55c7ff4..9b652f1293b4 100644 --- a/src/msg/Message.cc +++ b/src/msg/Message.cc @@ -65,6 +65,8 @@ using namespace std; #include "messages/MOSDOpReply.h" #include "messages/MOSDSubOp.h" #include "messages/MOSDSubOpReply.h" +#include "messages/MOSDRepOp.h" +#include "messages/MOSDRepOpReply.h" #include "messages/MOSDMap.h" #include "messages/MMonGetOSDMap.h" @@ -419,6 +421,12 @@ Message *decode_message(CephContext *cct, ceph_msg_header& header, ceph_msg_foot case MSG_OSD_SUBOPREPLY: m = new MOSDSubOpReply(); break; + case MSG_OSD_REPOP: + m = new MOSDRepOp(); + break; + case MSG_OSD_REPOPREPLY: + m = new MOSDRepOpReply(); + break; case CEPH_MSG_OSD_MAP: m = new MOSDMap; diff --git a/src/msg/Message.h b/src/msg/Message.h index 2bedde5b9aed..bf447ab1f982 100644 --- a/src/msg/Message.h +++ b/src/msg/Message.h @@ -114,6 +114,10 @@ #define MSG_OSD_EC_READ 110 #define MSG_OSD_EC_READ_REPLY 111 +#define MSG_OSD_REPOP 112 +#define MSG_OSD_REPOPREPLY 113 + + // *** MDS *** #define MSG_MDS_BEACON 100 // to monitor diff --git a/src/osd/OSD.cc b/src/osd/OSD.cc index c50284ad45aa..ac0c9c749b20 100644 --- a/src/osd/OSD.cc +++ b/src/osd/OSD.cc @@ -65,6 +65,8 @@ #include "messages/MOSDMarkMeDown.h" #include "messages/MOSDOp.h" #include "messages/MOSDOpReply.h" +#include "messages/MOSDRepOp.h" +#include "messages/MOSDRepOpReply.h" #include "messages/MOSDSubOp.h" #include "messages/MOSDSubOpReply.h" #include "messages/MOSDBoot.h" @@ -5535,9 +5537,14 @@ epoch_t op_required_epoch(OpRequestRef op) } case MSG_OSD_SUBOP: return replica_op_required_epoch(op); + case MSG_OSD_REPOP: + return replica_op_required_epoch(op); case MSG_OSD_SUBOPREPLY: return replica_op_required_epoch( op); + case MSG_OSD_REPOPREPLY: + return replica_op_required_epoch( + op); case MSG_OSD_PG_PUSH: return replica_op_required_epoch( op); @@ -5638,9 +5645,15 @@ bool OSD::dispatch_op_fast(OpRequestRef& op, OSDMapRef& osdmap) case MSG_OSD_SUBOP: handle_replica_op(op, osdmap); break; + case MSG_OSD_REPOP: + handle_replica_op(op, osdmap); + break; case MSG_OSD_SUBOPREPLY: handle_replica_op(op, osdmap); break; + case MSG_OSD_REPOPREPLY: + handle_replica_op(op, osdmap); + break; case MSG_OSD_PG_PUSH: handle_replica_op(op, osdmap); break; diff --git a/src/osd/OSD.h b/src/osd/OSD.h index aeb857b140bd..c80920aa7860 100644 --- a/src/osd/OSD.h +++ b/src/osd/OSD.h @@ -2228,7 +2228,9 @@ protected: switch (m->get_type()) { case CEPH_MSG_OSD_OP: case MSG_OSD_SUBOP: + case MSG_OSD_REPOP: case MSG_OSD_SUBOPREPLY: + case MSG_OSD_REPOPREPLY: case MSG_OSD_PG_PUSH: case MSG_OSD_PG_PULL: case MSG_OSD_PG_PUSH_REPLY: diff --git a/src/osd/OpRequest.cc b/src/osd/OpRequest.cc index 5de80f30d036..52d957141b3f 100644 --- a/src/osd/OpRequest.cc +++ b/src/osd/OpRequest.cc @@ -9,6 +9,7 @@ #include "msg/Message.h" #include "messages/MOSDOp.h" #include "messages/MOSDSubOp.h" +#include "messages/MOSDRepOp.h" #include "include/assert.h" #include "osd/osd_types.h" @@ -30,6 +31,8 @@ OpRequest::OpRequest(Message *req, OpTracker *tracker) : reqid = static_cast(req)->get_reqid(); } else if (req->get_type() == MSG_OSD_SUBOP) { reqid = static_cast(req)->reqid; + } else if (req->get_type() == MSG_OSD_REPOP) { + reqid = static_cast(req)->reqid; } tracker->mark_event(this, "header_read", request->get_recv_stamp()); tracker->mark_event(this, "throttled", request->get_throttle_stamp()); diff --git a/src/osd/PG.cc b/src/osd/PG.cc index 80c9d74149f0..e8bbc969e339 100644 --- a/src/osd/PG.cc +++ b/src/osd/PG.cc @@ -39,7 +39,9 @@ #include "messages/MOSDECSubOpReadReply.h" #include "messages/MOSDSubOp.h" +#include "messages/MOSDRepOp.h" #include "messages/MOSDSubOpReply.h" +#include "messages/MOSDRepOpReply.h" #include "common/BackTrace.h" #ifdef WITH_LTTNG @@ -4993,6 +4995,8 @@ bool PG::can_discard_request(OpRequestRef& op) return can_discard_op(op); case MSG_OSD_SUBOP: return can_discard_replica_op(op); + case MSG_OSD_REPOP: + return can_discard_replica_op(op); case MSG_OSD_PG_PUSH: return can_discard_replica_op(op); case MSG_OSD_PG_PULL: @@ -5001,6 +5005,8 @@ bool PG::can_discard_request(OpRequestRef& op) return can_discard_replica_op(op); case MSG_OSD_SUBOPREPLY: return can_discard_replica_op(op); + case MSG_OSD_REPOPREPLY: + return can_discard_replica_op(op); case MSG_OSD_EC_WRITE: return can_discard_replica_op(op); @@ -5032,11 +5038,21 @@ bool PG::op_must_wait_for_map(epoch_t cur_epoch, OpRequestRef& op) cur_epoch, static_cast(op->get_req())->map_epoch); + case MSG_OSD_REPOP: + return !have_same_or_newer_map( + cur_epoch, + static_cast(op->get_req())->map_epoch); + case MSG_OSD_SUBOPREPLY: return !have_same_or_newer_map( cur_epoch, static_cast(op->get_req())->map_epoch); + case MSG_OSD_REPOPREPLY: + return !have_same_or_newer_map( + cur_epoch, + static_cast(op->get_req())->map_epoch); + case MSG_OSD_PG_SCAN: return !have_same_or_newer_map( cur_epoch, diff --git a/src/osd/PG.h b/src/osd/PG.h index 6cc7aa472b21..0e02dcaa73c8 100644 --- a/src/osd/PG.h +++ b/src/osd/PG.h @@ -2013,7 +2013,7 @@ public: int get_nrep() const { return acting.size(); } void reset_peer_features() { peer_features = (uint64_t)-1; } - uint64_t get_min_peer_features() { return peer_features; } + uint64_t get_min_peer_features() const { return peer_features; } void apply_peer_features(uint64_t f) { peer_features &= f; } void init_primary_up_acting( diff --git a/src/osd/PGBackend.h b/src/osd/PGBackend.h index 614cb9f0232c..3a99c53832a3 100644 --- a/src/osd/PGBackend.h +++ b/src/osd/PGBackend.h @@ -204,6 +204,8 @@ virtual spg_t primary_spg_t() const = 0; virtual pg_shard_t primary_shard() const = 0; + virtual uint64_t min_peer_features() const = 0; + virtual void send_message_osd_cluster( int peer, Message *m, epoch_t from_epoch) = 0; virtual void send_message_osd_cluster( diff --git a/src/osd/ReplicatedBackend.cc b/src/osd/ReplicatedBackend.cc index e646b2e4ff15..c1c7217f91b6 100644 --- a/src/osd/ReplicatedBackend.cc +++ b/src/osd/ReplicatedBackend.cc @@ -15,7 +15,9 @@ #include "ReplicatedBackend.h" #include "messages/MOSDOp.h" #include "messages/MOSDSubOp.h" +#include "messages/MOSDRepOp.h" #include "messages/MOSDSubOpReply.h" +#include "messages/MOSDRepOpReply.h" #include "messages/MOSDPGPush.h" #include "messages/MOSDPGPull.h" #include "messages/MOSDPGPushReply.h" @@ -163,6 +165,11 @@ bool ReplicatedBackend::handle_message( break; } + case MSG_OSD_REPOP: { + sub_op_modify(op); + return true; + } + case MSG_OSD_SUBOPREPLY: { MOSDSubOpReply *r = static_cast(op->get_req()); if (r->ops.size() >= 1) { @@ -173,13 +180,19 @@ bool ReplicatedBackend::handle_message( sub_op_push_reply(op); return true; } - } else { - sub_op_modify_reply(op); + } + else { + sub_op_modify_reply(op); return true; } break; } + case MSG_OSD_REPOPREPLY: { + sub_op_modify_reply(op); + return true; + } + default: break; } @@ -626,10 +639,12 @@ void ReplicatedBackend::op_commit( } } +template void ReplicatedBackend::sub_op_modify_reply(OpRequestRef op) { - MOSDSubOpReply *r = static_cast(op->get_req()); - assert(r->get_type() == MSG_OSD_SUBOPREPLY); + T *r = static_cast(op->get_req()); + assert(r->get_header().type == MSGTYPE); + assert(MSGTYPE == MSG_OSD_SUBOPREPLY || MSGTYPE == MSG_OSD_REPOPREPLY); op->mark_started(); diff --git a/src/osd/ReplicatedBackend.h b/src/osd/ReplicatedBackend.h index 927ebb87a270..6784bdffc284 100644 --- a/src/osd/ReplicatedBackend.h +++ b/src/osd/ReplicatedBackend.h @@ -355,6 +355,22 @@ public: ); private: + template + Message * generate_subop( + const hobject_t &soid, + const eversion_t &at_version, + ceph_tid_t tid, + osd_reqid_t reqid, + eversion_t pg_trim_to, + eversion_t pg_trim_rollback_to, + hobject_t new_temp_oid, + hobject_t discard_temp_oid, + vector &log_entries, + boost::optional &hset_history, + InProgressOp *op, + ObjectStore::Transaction *op_t, + pg_shard_t peer, + const pg_info_t &pinfo); void issue_op( const hobject_t &soid, const eversion_t &at_version, @@ -370,8 +386,11 @@ private: ObjectStore::Transaction *op_t); void op_applied(InProgressOp *op); void op_commit(InProgressOp *op); + template void sub_op_modify_reply(OpRequestRef op); void sub_op_modify(OpRequestRef op); + template + void sub_op_modify_impl(OpRequestRef op); struct RepModify { OpRequestRef op; diff --git a/src/osd/ReplicatedPG.cc b/src/osd/ReplicatedPG.cc index aeb138f93202..147e0286273a 100644 --- a/src/osd/ReplicatedPG.cc +++ b/src/osd/ReplicatedPG.cc @@ -28,6 +28,8 @@ #include "messages/MOSDOpReply.h" #include "messages/MOSDSubOp.h" #include "messages/MOSDSubOpReply.h" +#include "messages/MOSDRepOp.h" +#include "messages/MOSDRepOpReply.h" #include "messages/MOSDPGNotify.h" #include "messages/MOSDPGInfo.h" @@ -7334,7 +7336,63 @@ void ReplicatedPG::issue_repop(RepGather *repop, utime_t now) repop->ctx->op); repop->ctx->op_t = NULL; } + +template +Message * ReplicatedBackend::generate_subop( + const hobject_t &soid, + const eversion_t &at_version, + ceph_tid_t tid, + osd_reqid_t reqid, + eversion_t pg_trim_to, + eversion_t pg_trim_rollback_to, + hobject_t new_temp_oid, + hobject_t discard_temp_oid, + vector &log_entries, + boost::optional &hset_hist, + InProgressOp *op, + ObjectStore::Transaction *op_t, + pg_shard_t peer, + const pg_info_t &pinfo) +{ + int acks_wanted = CEPH_OSD_FLAG_ACK | CEPH_OSD_FLAG_ONDISK; + assert(MSGTYPE == MSG_OSD_SUBOP || MSGTYPE == MSG_OSD_REPOP); + // forward the write/update/whatever + T *wr = new T( + reqid, parent->whoami_shard(), + spg_t(get_info().pgid.pgid, peer.shard), + soid, acks_wanted, + get_osdmap()->get_epoch(), + tid, at_version); + + // ship resulting transaction, log entries, and pg_stats + if (!parent->should_send_op(peer, soid)) { + dout(10) << "issue_repop shipping empty opt to osd." << peer + <<", object " << soid + << " beyond MAX(last_backfill_started " + << ", pinfo.last_backfill " + << pinfo.last_backfill << ")" << dendl; + ObjectStore::Transaction t; + ::encode(t, wr->get_data()); + } else { + ::encode(*op_t, wr->get_data()); + } + + ::encode(log_entries, wr->logbl); + + if (pinfo.is_incomplete()) + wr->pg_stats = pinfo.stats; // reflects backfill progress + else + wr->pg_stats = get_info().stats; + wr->pg_trim_to = pg_trim_to; + wr->pg_trim_rollback_to = pg_trim_rollback_to; + + wr->new_temp_oid = new_temp_oid; + wr->discard_temp_oid = discard_temp_oid; + wr->updated_hit_set_history = hset_hist; + return wr; +} + void ReplicatedBackend::issue_op( const hobject_t &soid, const eversion_t &at_version, @@ -7349,7 +7407,6 @@ void ReplicatedBackend::issue_op( InProgressOp *op, ObjectStore::Transaction *op_t) { - int acks_wanted = CEPH_OSD_FLAG_ACK | CEPH_OSD_FLAG_ONDISK; if (parent->get_actingbackfill_shards().size() > 1) { ostringstream ss; @@ -7367,42 +7424,43 @@ void ReplicatedBackend::issue_op( pg_shard_t peer = *i; const pg_info_t &pinfo = parent->get_shard_info().find(peer)->second; - // forward the write/update/whatever - MOSDSubOp *wr = new MOSDSubOp( - reqid, parent->whoami_shard(), - spg_t(get_info().pgid.pgid, i->shard), - soid, - acks_wanted, - get_osdmap()->get_epoch(), - tid, at_version); - - // ship resulting transaction, log entries, and pg_stats - if (!parent->should_send_op(peer, soid)) { - dout(10) << "issue_repop shipping empty opt to osd." << peer - <<", object " << soid - << " beyond MAX(last_backfill_started " - << ", pinfo.last_backfill " - << pinfo.last_backfill << ")" << dendl; - ObjectStore::Transaction t; - ::encode(t, wr->get_data()); + Message *wr; + uint64_t min_features = parent->min_peer_features(); + if (!(min_features & CEPH_FEATURE_OSD_REPOP)) { + dout(20) << "Talking to old version of OSD, doesn't support RepOp, fall back to SubOp" << dendl; + wr = generate_subop( + soid, + at_version, + tid, + reqid, + pg_trim_to, + pg_trim_rollback_to, + new_temp_oid, + discard_temp_oid, + log_entries, + hset_hist, + op, + op_t, + peer, + pinfo); } else { - ::encode(*op_t, wr->get_data()); + wr = generate_subop( + soid, + at_version, + tid, + reqid, + pg_trim_to, + pg_trim_rollback_to, + new_temp_oid, + discard_temp_oid, + log_entries, + hset_hist, + op, + op_t, + peer, + pinfo); } - ::encode(log_entries, wr->logbl); - - if (pinfo.is_incomplete()) - wr->pg_stats = pinfo.stats; // reflects backfill progress - else - wr->pg_stats = get_info().stats; - - wr->pg_trim_to = pg_trim_to; - wr->pg_trim_rollback_to = pg_trim_rollback_to; - - wr->new_temp_oid = new_temp_oid; - wr->discard_temp_oid = discard_temp_oid; - wr->updated_hit_set_history = hset_hist; - get_parent()->send_message_osd_cluster( peer.osd, wr, get_osdmap()->get_epoch()); } @@ -8112,21 +8170,29 @@ void ReplicatedPG::put_snapset_context(SnapSetContext *ssc) } // sub op modify +void ReplicatedBackend::sub_op_modify(OpRequestRef op) { + Message *m = op->get_req(); + int msg_type = m->get_type(); + if (msg_type == MSG_OSD_SUBOP) { + sub_op_modify_impl(op); + } else if (msg_type == MSG_OSD_REPOP) { + sub_op_modify_impl(op); + } else { + assert(0); + } +} -void ReplicatedBackend::sub_op_modify(OpRequestRef op) +template +void ReplicatedBackend::sub_op_modify_impl(OpRequestRef op) { - MOSDSubOp *m = static_cast(op->get_req()); - assert(m->get_type() == MSG_OSD_SUBOP); + T *m = static_cast(op->get_req()); + int msg_type = m->get_type(); + assert(MSGTYPE == msg_type); + assert(msg_type == MSG_OSD_SUBOP || msg_type == MSG_OSD_REPOP); const hobject_t& soid = m->poid; - const char *opname; - if (m->ops.size()) - opname = ceph_osd_op_name(m->ops[0].op.op); - else - opname = "trans"; - - dout(10) << "sub_op_modify " << opname + dout(10) << "sub_op_modify trans" << " " << soid << " v " << m->version << (m->logbl.length() ? " (transaction)" : " (parallel exec") @@ -8174,6 +8240,8 @@ void ReplicatedBackend::sub_op_modify(OpRequestRef op) p = m->logbl.begin(); ::decode(log, p); + rm->opt.set_fadvise_flag(CEPH_OSD_OP_FLAG_FADVISE_DONTNEED); + bool update_snaps = false; if (!rm->opt.empty()) { // If the opt is non-empty, we infer we are before @@ -8213,20 +8281,38 @@ void ReplicatedBackend::sub_op_modify_applied(RepModifyRef rm) dout(10) << "sub_op_modify_applied on " << rm << " op " << *rm->op->get_req() << dendl; - MOSDSubOp *m = static_cast(rm->op->get_req()); - assert(m->get_type() == MSG_OSD_SUBOP); - - if (!rm->committed) { - // send ack to acker only if we haven't sent a commit already - MOSDSubOpReply *ack = new MOSDSubOpReply( - m, parent->whoami_shard(), - 0, get_osdmap()->get_epoch(), CEPH_OSD_FLAG_ACK); + Message *m = rm->op->get_req(); + + Message *ack = NULL; + eversion_t version; + + if (m->get_type() == MSG_OSD_SUBOP) { + // doesn't have CLIENT SUBOP feature ,use Subop + MOSDSubOp *req = static_cast(m); + version = req->version; + if (!rm->committed) + ack = new MOSDSubOpReply( + req, parent->whoami_shard(), + 0, get_osdmap()->get_epoch(), CEPH_OSD_FLAG_ACK); + } else if (m->get_type() == MSG_OSD_REPOP) { + MOSDRepOp *req = static_cast(m); + version = req->version; + if (!rm->committed) + ack = new MOSDRepOpReply( + static_cast(m), parent->whoami_shard(), + 0, get_osdmap()->get_epoch(), CEPH_OSD_FLAG_ACK); + } else { + assert(0); + } + + // send ack to acker only if we haven't sent a commit already + if (ack) { ack->set_priority(CEPH_MSG_PRIO_HIGH); // this better match commit priority! get_parent()->send_message_osd_cluster( rm->ackerosd, ack, get_osdmap()->get_epoch()); } - parent->op_applied(m->version); + parent->op_applied(version); } void ReplicatedBackend::sub_op_modify_commit(RepModifyRef rm) @@ -8241,11 +8327,29 @@ void ReplicatedBackend::sub_op_modify_commit(RepModifyRef rm) assert(get_osdmap()->is_up(rm->ackerosd)); get_parent()->update_last_complete_ondisk(rm->last_complete); - MOSDSubOpReply *commit = new MOSDSubOpReply( - static_cast(rm->op->get_req()), - get_parent()->whoami_shard(), - 0, get_osdmap()->get_epoch(), CEPH_OSD_FLAG_ONDISK); - commit->set_last_complete_ondisk(rm->last_complete); + + Message *m = rm->op->get_req(); + Message *commit; + if (m->get_type() == MSG_OSD_SUBOP) { + // doesn't have CLIENT SUBOP feature ,use Subop + MOSDSubOpReply *reply = new MOSDSubOpReply( + static_cast(m), + get_parent()->whoami_shard(), + 0, get_osdmap()->get_epoch(), CEPH_OSD_FLAG_ONDISK); + reply->set_last_complete_ondisk(rm->last_complete); + commit = reply; + } else if (m->get_type() == MSG_OSD_REPOP) { + MOSDRepOpReply *reply = new MOSDRepOpReply( + static_cast(m), + get_parent()->whoami_shard(), + 0, get_osdmap()->get_epoch(), CEPH_OSD_FLAG_ONDISK); + reply->set_last_complete_ondisk(rm->last_complete); + commit = reply; + } + else { + assert(0); + } + commit->set_priority(CEPH_MSG_PRIO_HIGH); // this better match ack priority! get_parent()->send_message_osd_cluster( rm->ackerosd, commit, get_osdmap()->get_epoch()); diff --git a/src/osd/ReplicatedPG.h b/src/osd/ReplicatedPG.h index c6e3a39d36ea..687455826b3a 100644 --- a/src/osd/ReplicatedPG.h +++ b/src/osd/ReplicatedPG.h @@ -414,6 +414,9 @@ public: pg_shard_t primary_shard() const { return primary; } + uint64_t min_peer_features() const { + return get_min_peer_features(); + } void send_message_osd_cluster( int peer, Message *m, epoch_t from_epoch);