]> git-server-git.apps.pok.os.sepia.ceph.com Git - ceph.git/commitdiff
Add MOSDRepOp and MOSDRepOpReply
authorXiaoxi Chen <xiaoxi.chen@intel.com>
Fri, 21 Nov 2014 00:34:54 +0000 (08:34 +0800)
committerSage Weil <sage@redhat.com>
Mon, 5 Jan 2015 20:53:58 +0000 (12:53 -0800)
Add the two new message type and change the corresponding code flow as well.

Basically the idea to have MOSDRepOp is to seperate subop(read/write)
out of other subop(pull/push,etc), so that we can cleanup some unused fields in
the message type, then save some encoding/decoding overhead.

The backward compatibility is also remian, if talking with old version OSD who
doesn't support osd_client_subop/subopreply, will fall back to osd_subop/subopreply.

Sage: rename MOSDClientSubOp -> MOSDRepOp

Signed-off-by: Xiaoxi Chen <xiaoxi.chen@intel.com>
Signed-off-by: Sage Weil <sage@redhat.com>
17 files changed:
src/include/ceph_features.h
src/messages/MOSDRepOp.h [new file with mode: 0644]
src/messages/MOSDRepOpReply.h [new file with mode: 0644]
src/messages/MOSDSubOpReply.h
src/messages/Makefile.am
src/msg/Message.cc
src/msg/Message.h
src/osd/OSD.cc
src/osd/OSD.h
src/osd/OpRequest.cc
src/osd/PG.cc
src/osd/PG.h
src/osd/PGBackend.h
src/osd/ReplicatedBackend.cc
src/osd/ReplicatedBackend.h
src/osd/ReplicatedPG.cc
src/osd/ReplicatedPG.h

index 151e547780ebd5332119f773f6072dc07f6e07ab..a723b97dca48e33e5c585943bbca0d4669b7dab8 100644 (file)
@@ -55,6 +55,7 @@
 #define CEPH_FEATURE_ERASURE_CODE_PLUGINS_V2 (1ULL<<44)
 #define CEPH_FEATURE_OSD_SET_ALLOC_HINT (1ULL<<45)
 #define CEPH_FEATURE_OSD_FADVISE_FLAGS (1ULL<<46)
+#define CEPH_FEATURE_OSD_REPOP         (1ULL<<46)   /* overlap with fadvise */
 #define CEPH_FEATURE_OSD_OBJECT_DIGEST  (1ULL<<46)  /* overlap with fadvise */
 #define CEPH_FEATURE_MDS_QUOTA      (1ULL<<47)
 
@@ -136,6 +137,7 @@ static inline unsigned long long ceph_sanitize_features(unsigned long long f) {
          CEPH_FEATURE_ERASURE_CODE_PLUGINS_V2 |   \
          CEPH_FEATURE_OSD_SET_ALLOC_HINT |   \
         CEPH_FEATURE_OSD_FADVISE_FLAGS |     \
+         CEPH_FEATURE_OSD_REPOP |   \
         CEPH_FEATURE_OSD_OBJECT_DIGEST |    \
         CEPH_FEATURE_MDS_QUOTA | \
         0ULL)
diff --git a/src/messages/MOSDRepOp.h b/src/messages/MOSDRepOp.h
new file mode 100644 (file)
index 0000000..25cd09d
--- /dev/null
@@ -0,0 +1,138 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+/*
+ * Ceph - scalable distributed file system
+ *
+ * Copyright (C) 2004-2006 Sage Weil <sage@newdream.net>
+ *
+ * This is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License version 2.1, as published by the Free Software
+ * Foundation.  See file COPYING.
+ *
+ */
+
+
+#ifndef CEPH_MOSDREPOP_H
+#define CEPH_MOSDREPOP_H
+
+#include "msg/Message.h"
+#include "osd/osd_types.h"
+
+/*
+ * OSD sub op - for internal ops on pobjects between primary and replicas(/stripes/whatever)
+ */
+
+class MOSDRepOp : public Message {
+
+  static const int HEAD_VERSION = 1;
+  static const int COMPAT_VERSION = 1;
+
+public:
+  epoch_t map_epoch;
+
+  // metadata from original request
+  osd_reqid_t reqid;
+
+  // subop
+  pg_shard_t from;
+  spg_t pgid;
+  hobject_t poid;
+
+  __u8 acks_wanted;
+
+  // transaction to exec
+  bufferlist logbl;
+  pg_stat_t pg_stats;
+
+  // subop metadata
+  eversion_t version;
+
+  // piggybacked osd/og state
+  eversion_t pg_trim_to;   // primary->replica: trim to here
+  eversion_t pg_trim_rollback_to;   // primary->replica: trim rollback
+                                    // info to here
+
+  hobject_t new_temp_oid;      ///< new temp object that we must now start tracking
+  hobject_t discard_temp_oid;  ///< previously used temp object that we can now stop tracking
+
+  /// non-empty if this transaction involves a hit_set history update
+  boost::optional<pg_hit_set_history_t> updated_hit_set_history;
+
+  int get_cost() const {
+    return data.length();
+  }
+
+  virtual void decode_payload() {
+    bufferlist::iterator p = payload.begin();
+    ::decode(map_epoch, p);
+    ::decode(reqid, p);
+    ::decode(pgid, p);
+    ::decode(poid, p);
+
+    ::decode(acks_wanted, p);
+    ::decode(version, p);
+    ::decode(logbl, p);
+    ::decode(pg_stats, p);
+    ::decode(pg_trim_to, p);
+
+
+    ::decode(new_temp_oid, p);
+    ::decode(discard_temp_oid, p);
+
+    ::decode(from, p);
+    ::decode(updated_hit_set_history, p);
+    ::decode(pg_trim_rollback_to, p);
+  }
+
+  virtual void encode_payload(uint64_t features) {
+    ::encode(map_epoch, payload);
+    ::encode(reqid, payload);
+    ::encode(pgid, payload);
+    ::encode(poid, payload);
+
+    ::encode(acks_wanted, payload);
+    ::encode(version, payload);
+    ::encode(logbl, payload);
+    ::encode(pg_stats, payload);
+    ::encode(pg_trim_to, payload);
+    ::encode(new_temp_oid, payload);
+    ::encode(discard_temp_oid, payload);
+    ::encode(from, payload);
+    ::encode(updated_hit_set_history, payload);
+    ::encode(pg_trim_rollback_to, payload);
+  }
+
+  MOSDRepOp()
+    : Message(MSG_OSD_REPOP, HEAD_VERSION, COMPAT_VERSION) { }
+  MOSDRepOp(osd_reqid_t r, pg_shard_t from,
+           spg_t p, const hobject_t& po, int aw,
+           epoch_t mape, ceph_tid_t rtid, eversion_t v)
+    : Message(MSG_OSD_REPOP, HEAD_VERSION, COMPAT_VERSION),
+      map_epoch(mape),
+      reqid(r),
+      from(from),
+      pgid(p),
+      poid(po),
+      acks_wanted(aw),
+      version(v) {
+    set_tid(rtid);
+  }
+private:
+  ~MOSDRepOp() {}
+
+public:
+  const char *get_type_name() const { return "osd_repop"; }
+  void print(ostream& out) const {
+    out << "osd_repop(" << reqid
+       << " " << pgid
+       << " " << poid;
+    out << " v " << version;
+    if (updated_hit_set_history)
+      out << ", has_updated_hit_set_history";
+    out << ")";
+  }
+};
+
+
+#endif
diff --git a/src/messages/MOSDRepOpReply.h b/src/messages/MOSDRepOpReply.h
new file mode 100644 (file)
index 0000000..957502d
--- /dev/null
@@ -0,0 +1,120 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+/*
+ * Ceph - scalable distributed file system
+ *
+ * Copyright (C) 2004-2006 Sage Weil <sage@newdream.net>
+ *
+ * This is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License version 2.1, as published by the Free Software
+ * Foundation.  See file COPYING.
+ *
+ */
+
+
+#ifndef CEPH_MOSDREPOPREPLY_H
+#define CEPH_MOSDREPOPREPLY_H
+
+#include "msg/Message.h"
+
+#include "os/ObjectStore.h"
+
+/*
+ * OSD Client Subop reply
+ *
+ * oid - object id
+ * op  - OSD_OP_DELETE, etc.
+ *
+ */
+
+class MOSDRepOpReply : public Message {
+  static const int HEAD_VERSION = 1;
+  static const int COMPAT_VERSION = 1;
+public:
+  epoch_t map_epoch;
+
+  // subop metadata
+  osd_reqid_t reqid;
+  pg_shard_t from;
+  spg_t pgid;
+
+  // result
+  __u8 ack_type;
+  int32_t result;
+
+  // piggybacked osd state
+  eversion_t last_complete_ondisk;
+
+
+  virtual void decode_payload() {
+    bufferlist::iterator p = payload.begin();
+    ::decode(map_epoch, p);
+    ::decode(reqid, p);
+    ::decode(pgid, p);
+
+    ::decode(ack_type, p);
+    ::decode(result, p);
+    ::decode(last_complete_ondisk, p);
+
+    ::decode(from, p);
+  }
+  virtual void encode_payload(uint64_t features) {
+    ::encode(map_epoch, payload);
+    ::encode(reqid, payload);
+    ::encode(pgid, payload);
+    ::encode(ack_type, payload);
+    ::encode(result, payload);
+    ::encode(last_complete_ondisk, payload);
+    ::encode(from, payload);
+  }
+
+  epoch_t get_map_epoch() { return map_epoch; }
+
+  spg_t get_pg() { return pgid; }
+
+  int get_ack_type() { return ack_type; }
+  bool is_ondisk() { return ack_type & CEPH_OSD_FLAG_ONDISK; }
+  bool is_onnvram() { return ack_type & CEPH_OSD_FLAG_ONNVRAM; }
+
+  int get_result() { return result; }
+
+  void set_last_complete_ondisk(eversion_t v) { last_complete_ondisk = v; }
+  eversion_t get_last_complete_ondisk() { return last_complete_ondisk; }
+
+public:
+  MOSDRepOpReply(
+    MOSDRepOp *req, pg_shard_t from, int result_, epoch_t e, int at) :
+    Message(MSG_OSD_REPOPREPLY, HEAD_VERSION, COMPAT_VERSION),
+    map_epoch(e),
+    reqid(req->reqid),
+    from(from),
+    pgid(req->pgid.pgid, req->from.shard),
+    ack_type(at),
+    result(result_) {
+    set_tid(req->get_tid());
+  }
+  MOSDRepOpReply() : Message(MSG_OSD_REPOPREPLY) {}
+private:
+  ~MOSDRepOpReply() {}
+
+public:
+  const char *get_type_name() const { return "osd_repop_reply"; }
+
+  void print(ostream& out) const {
+    out << "osd_repop_reply(" << reqid
+       << " " << pgid;
+    if (ack_type & CEPH_OSD_FLAG_ONDISK)
+      out << " ondisk";
+    if (ack_type & CEPH_OSD_FLAG_ONNVRAM)
+      out << " onnvram";
+    if (ack_type & CEPH_OSD_FLAG_ACK)
+      out << " ack";
+    out << ", result = " << result;
+    out << ")";
+  }
+
+};
+
+
+#endif
index 7b441ae31f48fa8597d6c2ba8825b5bd090521ed..a084246e7f94e3aa483387a786b9367738634d1d 100644 (file)
@@ -144,7 +144,7 @@ private:
   ~MOSDSubOpReply() {}
 
 public:
-  const char *get_type_name() const { return "osd_op_reply"; }
+  const char *get_type_name() const { return "osd_subop_reply"; }
   
   void print(ostream& out) const {
     out << "osd_sub_op_reply(" << reqid
index e9a74a3e1b3fc64447ce72dec6d81b1955896787..d1be28b0614a94328a76c0b3182e2074bd4e9024 100644 (file)
@@ -104,6 +104,8 @@ noinst_HEADERS += \
        messages/MOSDScrub.h \
        messages/MOSDSubOp.h \
        messages/MOSDSubOpReply.h \
+       messages/MOSDRepOp.h \
+       messages/MOSDRepOpReply.h \
        messages/MPGStats.h \
        messages/MPGStatsAck.h \
        messages/MPing.h \
index 0fffe55c7ff48777604e54c919555e4c0a8f5c55..9b652f1293b47efc2936672147e7776e7807af6c 100644 (file)
@@ -65,6 +65,8 @@ using namespace std;
 #include "messages/MOSDOpReply.h"
 #include "messages/MOSDSubOp.h"
 #include "messages/MOSDSubOpReply.h"
+#include "messages/MOSDRepOp.h"
+#include "messages/MOSDRepOpReply.h"
 #include "messages/MOSDMap.h"
 #include "messages/MMonGetOSDMap.h"
 
@@ -419,6 +421,12 @@ Message *decode_message(CephContext *cct, ceph_msg_header& header, ceph_msg_foot
   case MSG_OSD_SUBOPREPLY:
     m = new MOSDSubOpReply();
     break;
+  case MSG_OSD_REPOP:
+    m = new MOSDRepOp();
+    break;
+  case MSG_OSD_REPOPREPLY:
+    m = new MOSDRepOpReply();
+    break;
 
   case CEPH_MSG_OSD_MAP:
     m = new MOSDMap;
index 2bedde5b9aed87e9874a6a5f71587acd5b0d3359..bf447ab1f982236ab3efe793425190397c04a368 100644 (file)
 #define MSG_OSD_EC_READ        110
 #define MSG_OSD_EC_READ_REPLY  111
 
+#define MSG_OSD_REPOP         112
+#define MSG_OSD_REPOPREPLY    113
+
+
 // *** MDS ***
 
 #define MSG_MDS_BEACON             100  // to monitor
index c50284ad45aad85cc0b942dd70b234c448e1a531..ac0c9c749b208c1e6f69bd9a837b6b508581107e 100644 (file)
@@ -65,6 +65,8 @@
 #include "messages/MOSDMarkMeDown.h"
 #include "messages/MOSDOp.h"
 #include "messages/MOSDOpReply.h"
+#include "messages/MOSDRepOp.h"
+#include "messages/MOSDRepOpReply.h"
 #include "messages/MOSDSubOp.h"
 #include "messages/MOSDSubOpReply.h"
 #include "messages/MOSDBoot.h"
@@ -5535,9 +5537,14 @@ epoch_t op_required_epoch(OpRequestRef op)
   }
   case MSG_OSD_SUBOP:
     return replica_op_required_epoch<MOSDSubOp, MSG_OSD_SUBOP>(op);
+  case MSG_OSD_REPOP:
+    return replica_op_required_epoch<MOSDRepOp, MSG_OSD_REPOP>(op);
   case MSG_OSD_SUBOPREPLY:
     return replica_op_required_epoch<MOSDSubOpReply, MSG_OSD_SUBOPREPLY>(
       op);
+  case MSG_OSD_REPOPREPLY:
+    return replica_op_required_epoch<MOSDRepOpReply, MSG_OSD_REPOPREPLY>(
+      op);
   case MSG_OSD_PG_PUSH:
     return replica_op_required_epoch<MOSDPGPush, MSG_OSD_PG_PUSH>(
       op);
@@ -5638,9 +5645,15 @@ bool OSD::dispatch_op_fast(OpRequestRef& op, OSDMapRef& osdmap)
   case MSG_OSD_SUBOP:
     handle_replica_op<MOSDSubOp, MSG_OSD_SUBOP>(op, osdmap);
     break;
+  case MSG_OSD_REPOP:
+    handle_replica_op<MOSDRepOp, MSG_OSD_REPOP>(op, osdmap);
+    break;
   case MSG_OSD_SUBOPREPLY:
     handle_replica_op<MOSDSubOpReply, MSG_OSD_SUBOPREPLY>(op, osdmap);
     break;
+  case MSG_OSD_REPOPREPLY:
+    handle_replica_op<MOSDRepOpReply, MSG_OSD_REPOPREPLY>(op, osdmap);
+    break;
   case MSG_OSD_PG_PUSH:
     handle_replica_op<MOSDPGPush, MSG_OSD_PG_PUSH>(op, osdmap);
     break;
index aeb857b140bd791f7ae1be0a140e50ca3e848897..c80920aa78602c2fe1c6ae44c4ae6e223a1caa56 100644 (file)
@@ -2228,7 +2228,9 @@ protected:
     switch (m->get_type()) {
     case CEPH_MSG_OSD_OP:
     case MSG_OSD_SUBOP:
+    case MSG_OSD_REPOP:
     case MSG_OSD_SUBOPREPLY:
+    case MSG_OSD_REPOPREPLY:
     case MSG_OSD_PG_PUSH:
     case MSG_OSD_PG_PULL:
     case MSG_OSD_PG_PUSH_REPLY:
index 5de80f30d036f97b61e0ea3d25ec3245e788e0f6..52d957141b3fe94873e88a8a3913b105337bd06c 100644 (file)
@@ -9,6 +9,7 @@
 #include "msg/Message.h"
 #include "messages/MOSDOp.h"
 #include "messages/MOSDSubOp.h"
+#include "messages/MOSDRepOp.h"
 #include "include/assert.h"
 #include "osd/osd_types.h"
 
@@ -30,6 +31,8 @@ OpRequest::OpRequest(Message *req, OpTracker *tracker) :
     reqid = static_cast<MOSDOp*>(req)->get_reqid();
   } else if (req->get_type() == MSG_OSD_SUBOP) {
     reqid = static_cast<MOSDSubOp*>(req)->reqid;
+  } else if (req->get_type() == MSG_OSD_REPOP) {
+    reqid = static_cast<MOSDRepOp*>(req)->reqid;
   }
   tracker->mark_event(this, "header_read", request->get_recv_stamp());
   tracker->mark_event(this, "throttled", request->get_throttle_stamp());
index 80c9d74149f0c3a00f2f0fff90162b577b284bfe..e8bbc969e339d0a3fbfdca86130a0ae725648d73 100644 (file)
@@ -39,7 +39,9 @@
 #include "messages/MOSDECSubOpReadReply.h"
 
 #include "messages/MOSDSubOp.h"
+#include "messages/MOSDRepOp.h"
 #include "messages/MOSDSubOpReply.h"
+#include "messages/MOSDRepOpReply.h"
 #include "common/BackTrace.h"
 
 #ifdef WITH_LTTNG
@@ -4993,6 +4995,8 @@ bool PG::can_discard_request(OpRequestRef& op)
     return can_discard_op(op);
   case MSG_OSD_SUBOP:
     return can_discard_replica_op<MOSDSubOp, MSG_OSD_SUBOP>(op);
+  case MSG_OSD_REPOP:
+    return can_discard_replica_op<MOSDRepOp, MSG_OSD_REPOP>(op);
   case MSG_OSD_PG_PUSH:
     return can_discard_replica_op<MOSDPGPush, MSG_OSD_PG_PUSH>(op);
   case MSG_OSD_PG_PULL:
@@ -5001,6 +5005,8 @@ bool PG::can_discard_request(OpRequestRef& op)
     return can_discard_replica_op<MOSDPGPushReply, MSG_OSD_PG_PUSH_REPLY>(op);
   case MSG_OSD_SUBOPREPLY:
     return can_discard_replica_op<MOSDSubOpReply, MSG_OSD_SUBOPREPLY>(op);
+  case MSG_OSD_REPOPREPLY:
+    return can_discard_replica_op<MOSDRepOpReply, MSG_OSD_REPOPREPLY>(op);
 
   case MSG_OSD_EC_WRITE:
     return can_discard_replica_op<MOSDECSubOpWrite, MSG_OSD_EC_WRITE>(op);
@@ -5032,11 +5038,21 @@ bool PG::op_must_wait_for_map(epoch_t cur_epoch, OpRequestRef& op)
       cur_epoch,
       static_cast<MOSDSubOp*>(op->get_req())->map_epoch);
 
+  case MSG_OSD_REPOP:
+    return !have_same_or_newer_map(
+      cur_epoch,
+      static_cast<MOSDRepOp*>(op->get_req())->map_epoch);
+
   case MSG_OSD_SUBOPREPLY:
     return !have_same_or_newer_map(
       cur_epoch,
       static_cast<MOSDSubOpReply*>(op->get_req())->map_epoch);
 
+  case MSG_OSD_REPOPREPLY:
+    return !have_same_or_newer_map(
+      cur_epoch,
+      static_cast<MOSDRepOpReply*>(op->get_req())->map_epoch);
+
   case MSG_OSD_PG_SCAN:
     return !have_same_or_newer_map(
       cur_epoch,
index 6cc7aa472b21e352ac9bf5fbc56f9ac8b4e372b0..0e02dcaa73c8fbc6ab4d1a2ae104536be0a094fb 100644 (file)
@@ -2013,7 +2013,7 @@ public:
   int        get_nrep() const { return acting.size(); }
 
   void reset_peer_features() { peer_features = (uint64_t)-1; }
-  uint64_t get_min_peer_features() { return peer_features; }
+  uint64_t get_min_peer_features() const { return peer_features; }
   void apply_peer_features(uint64_t f) { peer_features &= f; }
 
   void init_primary_up_acting(
index 614cb9f0232ccf5b72313fc459d37655b6befbb2..3a99c53832a3983a06d6c6cef6166d5fd378d511 100644 (file)
      virtual spg_t primary_spg_t() const = 0;
      virtual pg_shard_t primary_shard() const = 0;
 
+     virtual uint64_t min_peer_features() const = 0;
+
      virtual void send_message_osd_cluster(
        int peer, Message *m, epoch_t from_epoch) = 0;
      virtual void send_message_osd_cluster(
index e646b2e4ff153b86f532d90768621e190f1e7880..c1c7217f91b61f74c304e989875d54d0e560d6d9 100644 (file)
@@ -15,7 +15,9 @@
 #include "ReplicatedBackend.h"
 #include "messages/MOSDOp.h"
 #include "messages/MOSDSubOp.h"
+#include "messages/MOSDRepOp.h"
 #include "messages/MOSDSubOpReply.h"
+#include "messages/MOSDRepOpReply.h"
 #include "messages/MOSDPGPush.h"
 #include "messages/MOSDPGPull.h"
 #include "messages/MOSDPGPushReply.h"
@@ -163,6 +165,11 @@ bool ReplicatedBackend::handle_message(
     break;
   }
 
+  case MSG_OSD_REPOP: {
+    sub_op_modify(op);
+    return true;
+  }
+
   case MSG_OSD_SUBOPREPLY: {
     MOSDSubOpReply *r = static_cast<MOSDSubOpReply*>(op->get_req());
     if (r->ops.size() >= 1) {
@@ -173,13 +180,19 @@ bool ReplicatedBackend::handle_message(
        sub_op_push_reply(op);
        return true;
       }
-    } else {
-      sub_op_modify_reply(op);
+    }
+    else {
+      sub_op_modify_reply<MOSDSubOpReply, MSG_OSD_SUBOPREPLY>(op);
       return true;
     }
     break;
   }
 
+  case MSG_OSD_REPOPREPLY: {
+    sub_op_modify_reply<MOSDRepOpReply, MSG_OSD_REPOPREPLY>(op);
+    return true;
+  }
+
   default:
     break;
   }
@@ -626,10 +639,12 @@ void ReplicatedBackend::op_commit(
   }
 }
 
+template<typename T, int MSGTYPE>
 void ReplicatedBackend::sub_op_modify_reply(OpRequestRef op)
 {
-  MOSDSubOpReply *r = static_cast<MOSDSubOpReply*>(op->get_req());
-  assert(r->get_type() == MSG_OSD_SUBOPREPLY);
+  T *r = static_cast<T *>(op->get_req());
+  assert(r->get_header().type == MSGTYPE);
+  assert(MSGTYPE == MSG_OSD_SUBOPREPLY || MSGTYPE == MSG_OSD_REPOPREPLY);
 
   op->mark_started();
 
index 927ebb87a270e1b11af641a544d029bad26838e5..6784bdffc284f32c9ba436c0615a589aa5decc38 100644 (file)
@@ -355,6 +355,22 @@ public:
     );
 
 private:
+  template<typename T, int MSGTYPE>
+  Message * generate_subop(
+    const hobject_t &soid,
+    const eversion_t &at_version,
+    ceph_tid_t tid,
+    osd_reqid_t reqid,
+    eversion_t pg_trim_to,
+    eversion_t pg_trim_rollback_to,
+    hobject_t new_temp_oid,
+    hobject_t discard_temp_oid,
+    vector<pg_log_entry_t> &log_entries,
+    boost::optional<pg_hit_set_history_t> &hset_history,
+    InProgressOp *op,
+    ObjectStore::Transaction *op_t,
+    pg_shard_t peer,
+    const pg_info_t &pinfo);
   void issue_op(
     const hobject_t &soid,
     const eversion_t &at_version,
@@ -370,8 +386,11 @@ private:
     ObjectStore::Transaction *op_t);
   void op_applied(InProgressOp *op);
   void op_commit(InProgressOp *op);
+  template<typename T, int MSGTYPE>
   void sub_op_modify_reply(OpRequestRef op);
   void sub_op_modify(OpRequestRef op);
+  template<typename T, int MSGTYPE>
+  void sub_op_modify_impl(OpRequestRef op);
 
   struct RepModify {
     OpRequestRef op;
index aeb138f932027145443cd3b582b7630c96de52fa..147e0286273af9c644fdc43df8e0700d82d4d19d 100644 (file)
@@ -28,6 +28,8 @@
 #include "messages/MOSDOpReply.h"
 #include "messages/MOSDSubOp.h"
 #include "messages/MOSDSubOpReply.h"
+#include "messages/MOSDRepOp.h"
+#include "messages/MOSDRepOpReply.h"
 
 #include "messages/MOSDPGNotify.h"
 #include "messages/MOSDPGInfo.h"
@@ -7334,7 +7336,63 @@ void ReplicatedPG::issue_repop(RepGather *repop, utime_t now)
     repop->ctx->op);
   repop->ctx->op_t = NULL;
 }
+
+template<typename T, int MSGTYPE>
+Message * ReplicatedBackend::generate_subop(
+  const hobject_t &soid,
+  const eversion_t &at_version,
+  ceph_tid_t tid,
+  osd_reqid_t reqid,
+  eversion_t pg_trim_to,
+  eversion_t pg_trim_rollback_to,
+  hobject_t new_temp_oid,
+  hobject_t discard_temp_oid,
+  vector<pg_log_entry_t> &log_entries,
+  boost::optional<pg_hit_set_history_t> &hset_hist,
+  InProgressOp *op,
+  ObjectStore::Transaction *op_t,
+  pg_shard_t peer,
+  const pg_info_t &pinfo)
+{
+  int acks_wanted = CEPH_OSD_FLAG_ACK | CEPH_OSD_FLAG_ONDISK;
+  assert(MSGTYPE == MSG_OSD_SUBOP || MSGTYPE == MSG_OSD_REPOP);
+  // forward the write/update/whatever
+  T *wr = new T(
+    reqid, parent->whoami_shard(),
+    spg_t(get_info().pgid.pgid, peer.shard),
+    soid, acks_wanted,
+    get_osdmap()->get_epoch(),
+    tid, at_version);
+
+  // ship resulting transaction, log entries, and pg_stats
+  if (!parent->should_send_op(peer, soid)) {
+    dout(10) << "issue_repop shipping empty opt to osd." << peer
+            <<", object " << soid
+            << " beyond MAX(last_backfill_started "
+            << ", pinfo.last_backfill "
+            << pinfo.last_backfill << ")" << dendl;
+    ObjectStore::Transaction t;
+    ::encode(t, wr->get_data());
+  } else {
+    ::encode(*op_t, wr->get_data());
+  }
+
+  ::encode(log_entries, wr->logbl);
+
+  if (pinfo.is_incomplete())
+    wr->pg_stats = pinfo.stats;  // reflects backfill progress
+  else
+    wr->pg_stats = get_info().stats;
     
+  wr->pg_trim_to = pg_trim_to;
+  wr->pg_trim_rollback_to = pg_trim_rollback_to;
+
+  wr->new_temp_oid = new_temp_oid;
+  wr->discard_temp_oid = discard_temp_oid;
+  wr->updated_hit_set_history = hset_hist;
+  return wr;
+}
+
 void ReplicatedBackend::issue_op(
   const hobject_t &soid,
   const eversion_t &at_version,
@@ -7349,7 +7407,6 @@ void ReplicatedBackend::issue_op(
   InProgressOp *op,
   ObjectStore::Transaction *op_t)
 {
-  int acks_wanted = CEPH_OSD_FLAG_ACK | CEPH_OSD_FLAG_ONDISK;
 
   if (parent->get_actingbackfill_shards().size() > 1) {
     ostringstream ss;
@@ -7367,42 +7424,43 @@ void ReplicatedBackend::issue_op(
     pg_shard_t peer = *i;
     const pg_info_t &pinfo = parent->get_shard_info().find(peer)->second;
 
-    // forward the write/update/whatever
-    MOSDSubOp *wr = new MOSDSubOp(
-      reqid, parent->whoami_shard(),
-      spg_t(get_info().pgid.pgid, i->shard),
-      soid,
-      acks_wanted,
-      get_osdmap()->get_epoch(),
-      tid, at_version);
-
-    // ship resulting transaction, log entries, and pg_stats
-    if (!parent->should_send_op(peer, soid)) {
-      dout(10) << "issue_repop shipping empty opt to osd." << peer
-              <<", object " << soid
-              << " beyond MAX(last_backfill_started "
-              << ", pinfo.last_backfill "
-              << pinfo.last_backfill << ")" << dendl;
-      ObjectStore::Transaction t;
-      ::encode(t, wr->get_data());
+    Message *wr;
+    uint64_t min_features = parent->min_peer_features();
+    if (!(min_features & CEPH_FEATURE_OSD_REPOP)) {
+      dout(20) << "Talking to old version of OSD, doesn't support RepOp, fall back to SubOp" << dendl;
+      wr = generate_subop<MOSDSubOp, MSG_OSD_SUBOP>(
+           soid,
+           at_version,
+           tid,
+           reqid,
+           pg_trim_to,
+           pg_trim_rollback_to,
+           new_temp_oid,
+           discard_temp_oid,
+           log_entries,
+           hset_hist,
+           op,
+           op_t,
+           peer,
+           pinfo);
     } else {
-      ::encode(*op_t, wr->get_data());
+      wr = generate_subop<MOSDRepOp, MSG_OSD_REPOP>(
+           soid,
+           at_version,
+           tid,
+           reqid,
+           pg_trim_to,
+           pg_trim_rollback_to,
+           new_temp_oid,
+           discard_temp_oid,
+           log_entries,
+           hset_hist,
+           op,
+           op_t,
+           peer,
+           pinfo);
     }
 
-    ::encode(log_entries, wr->logbl);
-
-    if (pinfo.is_incomplete())
-      wr->pg_stats = pinfo.stats;  // reflects backfill progress
-    else
-      wr->pg_stats = get_info().stats;
-    
-    wr->pg_trim_to = pg_trim_to;
-    wr->pg_trim_rollback_to = pg_trim_rollback_to;
-
-    wr->new_temp_oid = new_temp_oid;
-    wr->discard_temp_oid = discard_temp_oid;
-    wr->updated_hit_set_history = hset_hist;
-
     get_parent()->send_message_osd_cluster(
       peer.osd, wr, get_osdmap()->get_epoch());
   }
@@ -8112,21 +8170,29 @@ void ReplicatedPG::put_snapset_context(SnapSetContext *ssc)
 }
 
 // sub op modify
+void ReplicatedBackend::sub_op_modify(OpRequestRef op) {
+  Message *m = op->get_req();
+  int msg_type = m->get_type();
+  if (msg_type == MSG_OSD_SUBOP) {
+    sub_op_modify_impl<MOSDSubOp, MSG_OSD_SUBOP>(op);
+  } else if (msg_type == MSG_OSD_REPOP) {
+    sub_op_modify_impl<MOSDRepOp, MSG_OSD_REPOP>(op);
+  } else {
+    assert(0);
+  }
+}
 
-void ReplicatedBackend::sub_op_modify(OpRequestRef op)
+template<typename T, int MSGTYPE>
+void ReplicatedBackend::sub_op_modify_impl(OpRequestRef op)
 {
-  MOSDSubOp *m = static_cast<MOSDSubOp*>(op->get_req());
-  assert(m->get_type() == MSG_OSD_SUBOP);
+  T *m = static_cast<T *>(op->get_req());
+  int msg_type = m->get_type();
+  assert(MSGTYPE == msg_type);
+  assert(msg_type == MSG_OSD_SUBOP || msg_type == MSG_OSD_REPOP);
 
   const hobject_t& soid = m->poid;
 
-  const char *opname;
-  if (m->ops.size())
-    opname = ceph_osd_op_name(m->ops[0].op.op);
-  else
-    opname = "trans";
-
-  dout(10) << "sub_op_modify " << opname 
+  dout(10) << "sub_op_modify trans"
            << " " << soid 
            << " v " << m->version
           << (m->logbl.length() ? " (transaction)" : " (parallel exec")
@@ -8174,6 +8240,8 @@ void ReplicatedBackend::sub_op_modify(OpRequestRef op)
   p = m->logbl.begin();
   ::decode(log, p);
 
+  rm->opt.set_fadvise_flag(CEPH_OSD_OP_FLAG_FADVISE_DONTNEED);
+
   bool update_snaps = false;
   if (!rm->opt.empty()) {
     // If the opt is non-empty, we infer we are before
@@ -8213,20 +8281,38 @@ void ReplicatedBackend::sub_op_modify_applied(RepModifyRef rm)
 
   dout(10) << "sub_op_modify_applied on " << rm << " op "
           << *rm->op->get_req() << dendl;
-  MOSDSubOp *m = static_cast<MOSDSubOp*>(rm->op->get_req());
-  assert(m->get_type() == MSG_OSD_SUBOP);
-  
-  if (!rm->committed) {
-    // send ack to acker only if we haven't sent a commit already
-    MOSDSubOpReply *ack = new MOSDSubOpReply(
-      m, parent->whoami_shard(),
-      0, get_osdmap()->get_epoch(), CEPH_OSD_FLAG_ACK);
+  Message *m = rm->op->get_req();
+
+  Message *ack = NULL;
+  eversion_t version;
+
+  if (m->get_type() == MSG_OSD_SUBOP) {
+    // doesn't have CLIENT SUBOP feature ,use Subop
+    MOSDSubOp *req = static_cast<MOSDSubOp*>(m);
+    version = req->version;
+    if (!rm->committed)
+      ack = new MOSDSubOpReply(
+       req, parent->whoami_shard(),
+       0, get_osdmap()->get_epoch(), CEPH_OSD_FLAG_ACK);
+  } else if (m->get_type() == MSG_OSD_REPOP) {
+    MOSDRepOp *req = static_cast<MOSDRepOp*>(m);
+    version = req->version;
+    if (!rm->committed)
+      ack = new MOSDRepOpReply(
+       static_cast<MOSDRepOp*>(m), parent->whoami_shard(),
+       0, get_osdmap()->get_epoch(), CEPH_OSD_FLAG_ACK);
+  } else {
+    assert(0);
+  }
+
+  // send ack to acker only if we haven't sent a commit already
+  if (ack) {
     ack->set_priority(CEPH_MSG_PRIO_HIGH); // this better match commit priority!
     get_parent()->send_message_osd_cluster(
       rm->ackerosd, ack, get_osdmap()->get_epoch());
   }
   
-  parent->op_applied(m->version);
+  parent->op_applied(version);
 }
 
 void ReplicatedBackend::sub_op_modify_commit(RepModifyRef rm)
@@ -8241,11 +8327,29 @@ void ReplicatedBackend::sub_op_modify_commit(RepModifyRef rm)
   
   assert(get_osdmap()->is_up(rm->ackerosd));
   get_parent()->update_last_complete_ondisk(rm->last_complete);
-  MOSDSubOpReply *commit = new MOSDSubOpReply(
-    static_cast<MOSDSubOp*>(rm->op->get_req()),
-    get_parent()->whoami_shard(),
-    0, get_osdmap()->get_epoch(), CEPH_OSD_FLAG_ONDISK);
-  commit->set_last_complete_ondisk(rm->last_complete);
+
+  Message *m = rm->op->get_req();
+  Message *commit;
+  if (m->get_type() == MSG_OSD_SUBOP) {
+    // doesn't have CLIENT SUBOP feature ,use Subop
+    MOSDSubOpReply  *reply = new MOSDSubOpReply(
+      static_cast<MOSDSubOp*>(m),
+      get_parent()->whoami_shard(),
+      0, get_osdmap()->get_epoch(), CEPH_OSD_FLAG_ONDISK);
+    reply->set_last_complete_ondisk(rm->last_complete);
+    commit = reply;
+  } else if (m->get_type() == MSG_OSD_REPOP) {
+    MOSDRepOpReply *reply = new MOSDRepOpReply(
+      static_cast<MOSDRepOp*>(m),
+      get_parent()->whoami_shard(),
+      0, get_osdmap()->get_epoch(), CEPH_OSD_FLAG_ONDISK);
+    reply->set_last_complete_ondisk(rm->last_complete);
+    commit = reply;
+  }
+  else {
+    assert(0);
+  }
+
   commit->set_priority(CEPH_MSG_PRIO_HIGH); // this better match ack priority!
   get_parent()->send_message_osd_cluster(
     rm->ackerosd, commit, get_osdmap()->get_epoch());
index c6e3a39d36eabc8615e1611265abcbdb3944b302..687455826b3ac983061af8017be462dc7be8510c 100644 (file)
@@ -414,6 +414,9 @@ public:
   pg_shard_t primary_shard() const {
     return primary;
   }
+  uint64_t min_peer_features() const {
+    return get_min_peer_features();
+  }
 
   void send_message_osd_cluster(
     int peer, Message *m, epoch_t from_epoch);