]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
osd/OSDMap: encode crc
authorSage Weil <sage@redhat.com>
Thu, 21 Aug 2014 16:48:18 +0000 (09:48 -0700)
committerSage Weil <sage@redhat.com>
Mon, 10 Nov 2014 22:20:25 +0000 (14:20 -0800)
Note that we include the logic in OSDMap::encode to add data that comes
after the crc, although we do not use it yet.

Also note that we do not yet verify the crc on decode.

Signed-off-by: Sage Weil <sage@redhat.com>
src/osd/OSDMap.cc
src/osd/OSDMap.h

index 593cd982ee638aae1b554e68679130f0e07bda25..a1131eda33efc59378134a4a8c3eb3b4ddac3927 100644 (file)
@@ -403,8 +403,12 @@ void OSDMap::Incremental::encode(bufferlist& bl, uint64_t features) const
   assert(features & CEPH_FEATURE_RESERVED);
   features &= ~CEPH_FEATURE_RESERVED;
 
+  size_t start_offset = bl.length();
+  size_t tail_offset;
+  buffer::list::iterator crc_it;
+
   // meta-encoding: how we include client-used and osd-specific data
-  ENCODE_START(7, 7, bl);
+  ENCODE_START(8, 7, bl);
 
   {
     ENCODE_START(3, 1, bl); // client-usable data
@@ -448,8 +452,26 @@ void OSDMap::Incremental::encode(bufferlist& bl, uint64_t features) const
     ENCODE_FINISH(bl); // osd-only data
   }
 
+  ::encode((uint32_t)0, bl); // dummy inc_crc
+  crc_it = bl.end();
+  crc_it.advance(-4);
+  tail_offset = bl.length();
+
+  ::encode(full_crc, bl);
+
   ENCODE_FINISH(bl); // meta-encoding wrapper
 
+  // fill in crc
+  bufferlist front;
+  front.substr_of(bl, start_offset, crc_it.get_off() - start_offset);
+  inc_crc = front.crc32c(-1);
+  bufferlist tail;
+  tail.substr_of(bl, tail_offset, bl.length() - tail_offset);
+  inc_crc = tail.crc32c(inc_crc);
+  ceph_le32 crc_le;
+  crc_le = inc_crc;
+  crc_it.copy_in(4, (char*)&crc_le);
+  have_crc = true;
 }
 
 void OSDMap::Incremental::decode_classic(bufferlist::iterator &p)
@@ -554,7 +576,11 @@ void OSDMap::Incremental::decode(bufferlist::iterator& bl)
    * a struct_v < 7, we must rewind to the beginning and use our
    * classic decoder.
    */
-  DECODE_START_LEGACY_COMPAT_LEN(7, 7, 7, bl); // wrapper
+  size_t start_offset = bl.get_off();
+  size_t tail_offset = 0;
+  bufferlist crc_front, crc_tail;
+
+  DECODE_START_LEGACY_COMPAT_LEN(8, 7, 7, bl); // wrapper
   if (struct_v < 7) {
     int struct_v_size = sizeof(struct_v);
     bl.advance(-struct_v_size);
@@ -615,6 +641,18 @@ void OSDMap::Incremental::decode(bufferlist::iterator& bl)
     DECODE_FINISH(bl); // osd-only data
   }
 
+  if (struct_v >= 8) {
+    have_crc = true;
+    crc_front.substr_of(bl.get_bl(), start_offset, bl.get_off() - start_offset);
+    ::decode(inc_crc, bl);
+    tail_offset = bl.get_off();
+    ::decode(full_crc, bl);
+  } else {
+    have_crc = false;
+    full_crc = 0;
+    inc_crc = 0;
+  }
+
   DECODE_FINISH(bl); // wrapper
 }
 
@@ -1801,8 +1839,12 @@ void OSDMap::encode(bufferlist& bl, uint64_t features) const
   assert(features & CEPH_FEATURE_RESERVED);
   features &= ~CEPH_FEATURE_RESERVED;
 
+  size_t start_offset = bl.length();
+  size_t tail_offset;
+  buffer::list::iterator crc_it;
+
   // meta-encoding: how we include client-used and osd-specific data
-  ENCODE_START(7, 7, bl);
+  ENCODE_START(8, 7, bl);
 
   {
     ENCODE_START(3, 1, bl); // client-usable data
@@ -1862,7 +1904,26 @@ void OSDMap::encode(bufferlist& bl, uint64_t features) const
     ENCODE_FINISH(bl); // osd-only data
   }
 
+  ::encode((uint32_t)0, bl); // dummy crc
+  crc_it = bl.end();
+  crc_it.advance(-4);
+  tail_offset = bl.length();
+
   ENCODE_FINISH(bl); // meta-encoding wrapper
+
+  // fill in crc
+  bufferlist front;
+  front.substr_of(bl, start_offset, crc_it.get_off() - start_offset);
+  crc = front.crc32c(-1);
+  if (tail_offset < bl.length()) {
+    bufferlist tail;
+    tail.substr_of(bl, tail_offset, bl.length() - tail_offset);
+    crc = tail.crc32c(crc);
+  }
+  ceph_le32 crc_le;
+  crc_le = crc;
+  crc_it.copy_in(4, (char*)&crc_le);
+  crc_defined = true;
 }
 
 void OSDMap::decode(bufferlist& bl)
@@ -1991,7 +2052,11 @@ void OSDMap::decode(bufferlist::iterator& bl)
    * a struct_v < 7, we must rewind to the beginning and use our
    * classic decoder.
    */
-  DECODE_START_LEGACY_COMPAT_LEN(7, 7, 7, bl); // wrapper
+  size_t start_offset = bl.get_off();
+  size_t tail_offset = 0;
+  bufferlist crc_front, crc_tail;
+
+  DECODE_START_LEGACY_COMPAT_LEN(8, 7, 7, bl); // wrapper
   if (struct_v < 7) {
     int struct_v_size = sizeof(struct_v);
     bl.advance(-struct_v_size);
@@ -2058,6 +2123,16 @@ void OSDMap::decode(bufferlist::iterator& bl)
     DECODE_FINISH(bl); // osd-only data
   }
 
+  if (struct_v >= 8) {
+    crc_front.substr_of(bl.get_bl(), start_offset, bl.get_off() - start_offset);
+    ::decode(crc, bl);
+    tail_offset = bl.get_off();
+    crc_defined = true;
+  } else {
+    crc_defined = false;
+    crc = 0;
+  }
+
   DECODE_FINISH(bl); // wrapper
 
   post_decode();
index 7a46b256a96d0d3fcccd4c693a792a04d6cef714..d6c0cfe4510f6c0a146ab12ecba99c3f35949e9c 100644 (file)
@@ -155,6 +155,10 @@ public:
 
     string cluster_snapshot;
 
+    mutable bool have_crc;      ///< crc values are defined
+    uint32_t full_crc;  ///< crc of the resulting OSDMap
+    mutable uint32_t inc_crc;   ///< crc of this incremental
+
     int get_net_marked_out(const OSDMap *previous) const;
     int get_net_marked_down(const OSDMap *previous) const;
     int identify_osd(uuid_d u) const;
@@ -169,7 +173,8 @@ public:
 
     Incremental(epoch_t e=0) :
       encode_features(0),
-      epoch(e), new_pool_max(-1), new_flags(-1), new_max_osd(-1) {
+      epoch(e), new_pool_max(-1), new_flags(-1), new_max_osd(-1),
+      have_crc(false), full_crc(0), inc_crc(0) {
       memset(&fsid, 0, sizeof(fsid));
     }
     Incremental(bufferlist &bl) {
@@ -240,7 +245,13 @@ private:
   string cluster_snapshot;
   bool new_blacklist_entries;
 
+  mutable bool crc_defined;
+  mutable uint32_t crc;
+
  public:
+  bool have_crc() const { return crc_defined; }
+  uint32_t get_crc() const { return crc; }
+
   ceph::shared_ptr<CrushWrapper> crush;       // hierarchical map
 
   friend class OSDMonitor;
@@ -258,6 +269,7 @@ private:
             osd_uuid(new vector<uuid_d>),
             cluster_snapshot_epoch(0),
             new_blacklist_entries(false),
+            crc_defined(false), crc(0),
             crush(new CrushWrapper) {
     memset(&fsid, 0, sizeof(fsid));
   }