]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
MOSDOp::decode : Splitting message decoding, new version 5211/head
authorJacek J. Lakis <jacek.lakis@intel.com>
Thu, 6 Aug 2015 10:14:17 +0000 (12:14 +0200)
committerJacek J. Łakis <jlakis@gklab-126-033.igk.intel.com>
Mon, 28 Sep 2015 08:54:14 +0000 (10:54 +0200)
Signed-off-by: Jacek J. Lakis <jacek.lakis@intel.com>
src/include/ceph_features.h [changed mode: 0644->0755]
src/messages/MOSDOp.h [changed mode: 0644->0755]
src/osd/OSD.cc
src/osd/PG.cc
src/osd/ReplicatedPG.cc

old mode 100644 (file)
new mode 100755 (executable)
index 4857b0a..96f53f1
@@ -69,6 +69,7 @@
 #define CEPH_FEATURE_ERASURE_CODE_PLUGINS_V3 (1ULL<<53)
 #define CEPH_FEATURE_OSD_HITSET_GMT (1ULL<<54)
 #define CEPH_FEATURE_HAMMER_0_94_4 (1ULL<<55)
+#define CEPH_FEATURE_NEW_OSDOP_ENCODING   (1ULL<<56) /* New, v7 encoding */
 
 #define CEPH_FEATURE_RESERVED2 (1ULL<<61)  /* slow down, we are almost out... */
 #define CEPH_FEATURE_RESERVED  (1ULL<<62)  /* DO NOT USE THIS ... last bit! */
@@ -131,6 +132,7 @@ static inline unsigned long long ceph_sanitize_features(unsigned long long f) {
         CEPH_FEATURE_OSD_HBMSGS |              \
         CEPH_FEATURE_MDSENC |                  \
         CEPH_FEATURE_OSDHASHPSPOOL |       \
+        CEPH_FEATURE_NEW_OSDOP_ENCODING |        \
         CEPH_FEATURE_MON_SINGLE_PAXOS |    \
         CEPH_FEATURE_OSD_SNAPMAPPER |      \
         CEPH_FEATURE_MON_SCRUB |           \
old mode 100644 (file)
new mode 100755 (executable)
index 5d4ab62..345afa1
@@ -32,7 +32,7 @@ class OSD;
 
 class MOSDOp : public Message {
 
-  static const int HEAD_VERSION = 6;
+  static const int HEAD_VERSION = 7;
   static const int COMPAT_VERSION = 3;
 
 private:
@@ -46,6 +46,11 @@ private:
   object_t oid;
   object_locator_t oloc;
   pg_t pgid;
+  bufferlist::iterator p;
+  // Decoding flags. Decoding is only needed for messages catched by pipe reader.
+  bool partial_decode_needed;
+  bool final_decode_needed;
+  //
 public:
   vector<OSDOp> ops;
 private:
@@ -61,12 +66,8 @@ private:
 public:
   friend class MOSDOpReply;
 
-  // read
-  const snapid_t& get_snapid() { return snapid; }
+  ceph_tid_t get_client_tid() { return header.tid; }
   void set_snapid(const snapid_t& s) { snapid = s; }
-  // writ
-  const snapid_t& get_snap_seq() const { return snap_seq; }
-  const vector<snapid_t> &get_snaps() const { return snaps; }
   void set_snaps(const vector<snapid_t>& i) {
     snaps = i;
   }
@@ -75,7 +76,13 @@ public:
     reqid = rid;
   }
 
+  // Fields decoded in partial decoding
+  const pg_t&     get_pg() const { assert(!partial_decode_needed); return pgid; }
+  epoch_t  get_map_epoch() { assert(!partial_decode_needed); return osdmap_epoch; }
+  int get_flags() const { assert(!partial_decode_needed); return flags; }
+  const eversion_t& get_version() { assert(!partial_decode_needed); return reassert_version; }
   osd_reqid_t get_reqid() const {
+    assert(!partial_decode_needed);
     if (reqid != osd_reqid_t())
       return reqid;
     else
@@ -83,32 +90,40 @@ public:
                          client_inc,
                         header.tid);
   }
-  int get_client_inc() { return client_inc; }
-  ceph_tid_t get_client_tid() { return header.tid; }
-  
-  object_t& get_oid() { return oid; }
 
-  const pg_t&     get_pg() const { return pgid; }
-
-  const object_locator_t& get_object_locator() const {
-    return oloc;
+  // Fields decoded in final decoding
+  int get_client_inc() { assert(!final_decode_needed); return client_inc; }
+  utime_t get_mtime() { assert(!final_decode_needed); return mtime; }
+  const object_locator_t& get_object_locator() const { assert(!final_decode_needed); return oloc; }
+  object_t& get_oid() { assert(!final_decode_needed); return oid; }
+  const snapid_t& get_snapid() { assert(!final_decode_needed); return snapid; }
+  const snapid_t& get_snap_seq() const { assert(!final_decode_needed); return snap_seq; }
+  const vector<snapid_t> &get_snaps() const { assert(!final_decode_needed); return snaps; }
+  /**
+   * get retry attempt
+   *
+   * 0 is the first attempt.
+   *
+   * @return retry attempt, or -1 if we don't know
+   */
+  int get_retry_attempt() const {
+    return retry_attempt;
+  }
+  uint64_t get_features() const {
+    if (features)
+      return features;
+    return get_connection()->get_features();
   }
-
-  epoch_t  get_map_epoch() { return osdmap_epoch; }
-
-  const eversion_t& get_version() { return reassert_version; }
-  
-  utime_t get_mtime() { return mtime; }
 
   MOSDOp()
-    : Message(CEPH_MSG_OSD_OP, HEAD_VERSION, COMPAT_VERSION) { }
+    : Message(CEPH_MSG_OSD_OP, HEAD_VERSION, COMPAT_VERSION), partial_decode_needed(true), final_decode_needed(true) { }
   MOSDOp(int inc, long tid,
          object_t& _oid, object_locator_t& _oloc, pg_t& _pgid, epoch_t _osdmap_epoch,
         int _flags, uint64_t feat)
     : Message(CEPH_MSG_OSD_OP, HEAD_VERSION, COMPAT_VERSION),
       client_inc(inc),
       osdmap_epoch(_osdmap_epoch), flags(_flags), retry_attempt(-1),
-      oid(_oid), oloc(_oloc), pgid(_pgid),
+      oid(_oid), oloc(_oloc), pgid(_pgid), partial_decode_needed(false), final_decode_needed(false),
       features(feat) {
     set_tid(tid);
   }
@@ -154,16 +169,7 @@ public:
     add_simple_op(CEPH_OSD_OP_STAT, 0, 0);
   }
 
-  uint64_t get_features() const {
-    if (features)
-      return features;
-    return get_connection()->get_features();
-  }
-
-  // flags
-  int get_flags() const { return flags; }
   bool has_flag(__u32 flag) { return flags & flag; };
-
   bool wants_ack() const { return flags & CEPH_OSD_FLAG_ACK; }
   bool wants_ondisk() const { return flags & CEPH_OSD_FLAG_ONDISK; }
   bool wants_onnvram() const { return flags & CEPH_OSD_FLAG_ONNVRAM; }
@@ -181,17 +187,6 @@ public:
     retry_attempt = a;
   }
 
-  /**
-   * get retry attempt
-   *
-   * 0 is the first attempt.
-   *
-   * @return retry attempt, or -1 if we don't know
-   */
-  int get_retry_attempt() const {
-    return retry_attempt;
-  }
-
   // marshalling
   virtual void encode_payload(uint64_t features) {
 
@@ -248,22 +243,22 @@ struct ceph_osd_request_head {
 
       ::encode_nohead(oid.name, payload);
       ::encode_nohead(snaps, payload);
-    } else {
-      header.version = HEAD_VERSION;
+    } else if ((features & CEPH_FEATURE_NEW_OSDOP_ENCODING) == 0) {
+      header.version = 6;
       ::encode(client_inc, payload);
       ::encode(osdmap_epoch, payload);
       ::encode(flags, payload);
       ::encode(mtime, payload);
       ::encode(reassert_version, payload);
-
       ::encode(oloc, payload);
       ::encode(pgid, payload);
+
       ::encode(oid, payload);
 
       __u16 num_ops = ops.size();
       ::encode(num_ops, payload);
       for (unsigned i = 0; i < ops.size(); i++)
-       ::encode(ops[i].op, payload);
+        ::encode(ops[i].op, payload);
 
       ::encode(snapid, payload);
       ::encode(snap_seq, payload);
@@ -272,11 +267,36 @@ struct ceph_osd_request_head {
       ::encode(retry_attempt, payload);
       ::encode(features, payload);
       ::encode(reqid, payload);
+    } else {
+      // new, reordered, v7 message encoding
+      header.version = HEAD_VERSION;
+      ::encode(pgid, payload);
+      ::encode(osdmap_epoch, payload);
+      ::encode(flags, payload);
+      ::encode(reassert_version, payload);
+      ::encode(reqid, payload);
+      ::encode(client_inc, payload);
+      ::encode(mtime, payload);
+      ::encode(oloc, payload);
+      ::encode(oid, payload);
+
+      __u16 num_ops = ops.size();
+      ::encode(num_ops, payload);
+      for (unsigned i = 0; i < ops.size(); i++)
+       ::encode(ops[i].op, payload);
+
+      ::encode(snapid, payload);
+      ::encode(snap_seq, payload);
+      ::encode(snaps, payload);
+
+      ::encode(retry_attempt, payload);
+      ::encode(features, payload);
     }
   }
 
   virtual void decode_payload() {
-    bufferlist::iterator p = payload.begin();
+    assert(partial_decode_needed && final_decode_needed);
+    p = payload.begin();
 
     if (header.version < 2) {
       // old decode
@@ -320,8 +340,11 @@ struct ceph_osd_request_head {
       retry_attempt = -1;
       features = 0;
       reqid = osd_reqid_t();
-    } else {
-      // new decode 
+      OSDOp::split_osd_op_vector_in_data(ops, data);
+      // In old versions, final decoding is done in first step
+      final_decode_needed = false;
+
+    } else if (header.version < 7) {
       ::decode(client_inc, p);
       ::decode(osdmap_epoch, p);
       ::decode(flags, p);
@@ -338,6 +361,26 @@ struct ceph_osd_request_head {
        ::decode(pgid, p);
       }
 
+    } else {
+      // new, v7 decode, splitted to partial and final
+      ::decode(pgid, p);
+      ::decode(osdmap_epoch, p);
+      ::decode(flags, p);
+      ::decode(reassert_version, p);
+      ::decode(reqid, p);
+    }
+
+    partial_decode_needed = false;
+
+  }
+
+  void finish_decode() {
+    assert(!partial_decode_needed); // partial decoding required
+
+    if (!final_decode_needed)
+      return; //Message is already final decoded
+
+    if (header.version < 7) {
       ::decode(oid, p);
 
       //::decode(ops, p);
@@ -345,19 +388,19 @@ struct ceph_osd_request_head {
       ::decode(num_ops, p);
       ops.resize(num_ops);
       for (unsigned i = 0; i < num_ops; i++)
-       ::decode(ops[i].op, p);
+        ::decode(ops[i].op, p);
 
       ::decode(snapid, p);
       ::decode(snap_seq, p);
       ::decode(snaps, p);
 
       if (header.version >= 4)
-       ::decode(retry_attempt, p);
+        ::decode(retry_attempt, p);
       else
-       retry_attempt = -1;
+        retry_attempt = -1;
 
       if (header.version >= 5)
-       ::decode(features, p);
+        ::decode(features, p);
       else
        features = 0;
 
@@ -365,9 +408,34 @@ struct ceph_osd_request_head {
        ::decode(reqid, p);
       else
        reqid = osd_reqid_t();
+
+      OSDOp::split_osd_op_vector_in_data(ops, data);
+
+    } else { // final decoding for reordered v7
+      ::decode(client_inc, p);
+      ::decode(mtime, p);
+      ::decode(oloc, p);
+      ::decode(oid, p);
+
+      __u16 num_ops;
+      ::decode(num_ops, p);
+      ops.resize(num_ops);
+      for (unsigned i = 0; i < num_ops; i++)
+        ::decode(ops[i].op, p);
+
+      ::decode(snapid, p);
+      ::decode(snap_seq, p);
+      ::decode(snaps, p);
+
+      ::decode(retry_attempt, p);
+
+      ::decode(features, p);
+
+      OSDOp::split_osd_op_vector_in_data(ops, data);
+
     }
 
-    OSDOp::split_osd_op_vector_in_data(ops, data);
+    final_decode_needed = false;
   }
 
   void clear_buffers() {
@@ -376,7 +444,8 @@ struct ceph_osd_request_head {
 
   const char *get_type_name() const { return "osd_op"; }
   void print(ostream& out) const {
-    out << "osd_op(" << get_reqid();
+    if (!partial_decode_needed)
+      out << "osd_op(" << get_reqid();
     out << " ";
     if (!oloc.nspace.empty())
       out << oloc.nspace << "/";
@@ -401,9 +470,10 @@ struct ceph_osd_request_head {
       out << " RETRY=" << get_retry_attempt();
     if (reassert_version != eversion_t())
       out << " reassert_version=" << reassert_version;
-    if (get_snap_seq())
+    if (!final_decode_needed)
       out << " snapc " << get_snap_seq() << "=" << snaps;
-    out << " " << ceph_osd_flag_string(get_flags());
+    if (!partial_decode_needed)
+      out << " " << ceph_osd_flag_string(get_flags());
     out << " e" << osdmap_epoch;
     out << ")";
   }
index 535b96c97077a47180e3662b1d1d2161bcb4dfaa..0a4fc1fbd97ed11f6706bb294844193324970eb2 100644 (file)
@@ -8090,9 +8090,6 @@ void OSD::handle_op(OpRequestRef& op, OSDMapRef& osdmap)
     return;
   }
 
-  // we don't need encoded payload anymore
-  m->clear_payload();
-
   // set up a map send if the Op gets blocked for some reason
   send_map_on_destruct share_map(this, m, osdmap, m->get_map_epoch());
   Session *client_session =
index 79cd54a1c3602bac07640dfda8372ebea6816615..cd409b18f6679adac952abcb534cdce4faaa97f3 100644 (file)
@@ -4952,6 +4952,7 @@ ostream& operator<<(ostream& out, const PG& pg)
 bool PG::can_discard_op(OpRequestRef& op)
 {
   MOSDOp *m = static_cast<MOSDOp*>(op->get_req());
+
   if (OSD::op_is_discardable(m)) {
     dout(20) << " discard " << *m << dendl;
     return true;
index 0ca3d34cd06b4196061ea9029c0721c87e1914f5..7eb12d2e6e9708312d7ce2ffcb7f9e960c67eea8 100644 (file)
@@ -1345,10 +1345,6 @@ void ReplicatedPG::do_request(
   OpRequestRef& op,
   ThreadPool::TPHandle &handle)
 {
-  if (!op_has_sufficient_caps(op)) {
-    osd->reply_op_error(op, -EPERM);
-    return;
-  }
   assert(!op_must_wait_for_map(get_osdmap()->get_epoch(), op));
   if (can_discard_request(op)) {
     return;
@@ -1483,6 +1479,9 @@ void ReplicatedPG::do_op(OpRequestRef& op)
   MOSDOp *m = static_cast<MOSDOp*>(op->get_req());
   assert(m->get_type() == CEPH_MSG_OSD_OP);
 
+  m->finish_decode();
+  m->clear_payload();
+
   if (op->rmw_flags == 0) {
     int r = osd->osd->init_op_flags(op);
     if (r) {
@@ -1499,6 +1498,11 @@ void ReplicatedPG::do_op(OpRequestRef& op)
     return do_pg_op(op);
   }
 
+  if (!op_has_sufficient_caps(op)) {
+    osd->reply_op_error(op, -EPERM);
+    return;
+  }
+
   // object name too long?
   unsigned max_name_len = MIN(g_conf->osd_max_object_name_len,
                               osd->osd->store->get_max_object_name_length());