From 3dcf5b9636bb9e0cd6484d18f151b457e1a0c328 Mon Sep 17 00:00:00 2001 From: Sage Weil Date: Thu, 21 Aug 2014 09:48:18 -0700 Subject: [PATCH] osd/OSDMap: encode crc Note that we include the logic in OSDMap::encode to add data that comes after the crc, although we do not use it yet. Also note that we do not yet verify the crc on decode. Signed-off-by: Sage Weil --- src/osd/OSDMap.cc | 83 ++++++++++++++++++++++++++++++++++++++++++++--- src/osd/OSDMap.h | 14 +++++++- 2 files changed, 92 insertions(+), 5 deletions(-) diff --git a/src/osd/OSDMap.cc b/src/osd/OSDMap.cc index 593cd982ee638..a1131eda33efc 100644 --- a/src/osd/OSDMap.cc +++ b/src/osd/OSDMap.cc @@ -403,8 +403,12 @@ void OSDMap::Incremental::encode(bufferlist& bl, uint64_t features) const assert(features & CEPH_FEATURE_RESERVED); features &= ~CEPH_FEATURE_RESERVED; + size_t start_offset = bl.length(); + size_t tail_offset; + buffer::list::iterator crc_it; + // meta-encoding: how we include client-used and osd-specific data - ENCODE_START(7, 7, bl); + ENCODE_START(8, 7, bl); { ENCODE_START(3, 1, bl); // client-usable data @@ -448,8 +452,26 @@ void OSDMap::Incremental::encode(bufferlist& bl, uint64_t features) const ENCODE_FINISH(bl); // osd-only data } + ::encode((uint32_t)0, bl); // dummy inc_crc + crc_it = bl.end(); + crc_it.advance(-4); + tail_offset = bl.length(); + + ::encode(full_crc, bl); + ENCODE_FINISH(bl); // meta-encoding wrapper + // fill in crc + bufferlist front; + front.substr_of(bl, start_offset, crc_it.get_off() - start_offset); + inc_crc = front.crc32c(-1); + bufferlist tail; + tail.substr_of(bl, tail_offset, bl.length() - tail_offset); + inc_crc = tail.crc32c(inc_crc); + ceph_le32 crc_le; + crc_le = inc_crc; + crc_it.copy_in(4, (char*)&crc_le); + have_crc = true; } void OSDMap::Incremental::decode_classic(bufferlist::iterator &p) @@ -554,7 +576,11 @@ void OSDMap::Incremental::decode(bufferlist::iterator& bl) * a struct_v < 7, we must rewind to the beginning and use our * classic decoder. */ - DECODE_START_LEGACY_COMPAT_LEN(7, 7, 7, bl); // wrapper + size_t start_offset = bl.get_off(); + size_t tail_offset = 0; + bufferlist crc_front, crc_tail; + + DECODE_START_LEGACY_COMPAT_LEN(8, 7, 7, bl); // wrapper if (struct_v < 7) { int struct_v_size = sizeof(struct_v); bl.advance(-struct_v_size); @@ -615,6 +641,18 @@ void OSDMap::Incremental::decode(bufferlist::iterator& bl) DECODE_FINISH(bl); // osd-only data } + if (struct_v >= 8) { + have_crc = true; + crc_front.substr_of(bl.get_bl(), start_offset, bl.get_off() - start_offset); + ::decode(inc_crc, bl); + tail_offset = bl.get_off(); + ::decode(full_crc, bl); + } else { + have_crc = false; + full_crc = 0; + inc_crc = 0; + } + DECODE_FINISH(bl); // wrapper } @@ -1801,8 +1839,12 @@ void OSDMap::encode(bufferlist& bl, uint64_t features) const assert(features & CEPH_FEATURE_RESERVED); features &= ~CEPH_FEATURE_RESERVED; + size_t start_offset = bl.length(); + size_t tail_offset; + buffer::list::iterator crc_it; + // meta-encoding: how we include client-used and osd-specific data - ENCODE_START(7, 7, bl); + ENCODE_START(8, 7, bl); { ENCODE_START(3, 1, bl); // client-usable data @@ -1862,7 +1904,26 @@ void OSDMap::encode(bufferlist& bl, uint64_t features) const ENCODE_FINISH(bl); // osd-only data } + ::encode((uint32_t)0, bl); // dummy crc + crc_it = bl.end(); + crc_it.advance(-4); + tail_offset = bl.length(); + ENCODE_FINISH(bl); // meta-encoding wrapper + + // fill in crc + bufferlist front; + front.substr_of(bl, start_offset, crc_it.get_off() - start_offset); + crc = front.crc32c(-1); + if (tail_offset < bl.length()) { + bufferlist tail; + tail.substr_of(bl, tail_offset, bl.length() - tail_offset); + crc = tail.crc32c(crc); + } + ceph_le32 crc_le; + crc_le = crc; + crc_it.copy_in(4, (char*)&crc_le); + crc_defined = true; } void OSDMap::decode(bufferlist& bl) @@ -1991,7 +2052,11 @@ void OSDMap::decode(bufferlist::iterator& bl) * a struct_v < 7, we must rewind to the beginning and use our * classic decoder. */ - DECODE_START_LEGACY_COMPAT_LEN(7, 7, 7, bl); // wrapper + size_t start_offset = bl.get_off(); + size_t tail_offset = 0; + bufferlist crc_front, crc_tail; + + DECODE_START_LEGACY_COMPAT_LEN(8, 7, 7, bl); // wrapper if (struct_v < 7) { int struct_v_size = sizeof(struct_v); bl.advance(-struct_v_size); @@ -2058,6 +2123,16 @@ void OSDMap::decode(bufferlist::iterator& bl) DECODE_FINISH(bl); // osd-only data } + if (struct_v >= 8) { + crc_front.substr_of(bl.get_bl(), start_offset, bl.get_off() - start_offset); + ::decode(crc, bl); + tail_offset = bl.get_off(); + crc_defined = true; + } else { + crc_defined = false; + crc = 0; + } + DECODE_FINISH(bl); // wrapper post_decode(); diff --git a/src/osd/OSDMap.h b/src/osd/OSDMap.h index 7a46b256a96d0..d6c0cfe4510f6 100644 --- a/src/osd/OSDMap.h +++ b/src/osd/OSDMap.h @@ -155,6 +155,10 @@ public: string cluster_snapshot; + mutable bool have_crc; ///< crc values are defined + uint32_t full_crc; ///< crc of the resulting OSDMap + mutable uint32_t inc_crc; ///< crc of this incremental + int get_net_marked_out(const OSDMap *previous) const; int get_net_marked_down(const OSDMap *previous) const; int identify_osd(uuid_d u) const; @@ -169,7 +173,8 @@ public: Incremental(epoch_t e=0) : encode_features(0), - epoch(e), new_pool_max(-1), new_flags(-1), new_max_osd(-1) { + epoch(e), new_pool_max(-1), new_flags(-1), new_max_osd(-1), + have_crc(false), full_crc(0), inc_crc(0) { memset(&fsid, 0, sizeof(fsid)); } Incremental(bufferlist &bl) { @@ -240,7 +245,13 @@ private: string cluster_snapshot; bool new_blacklist_entries; + mutable bool crc_defined; + mutable uint32_t crc; + public: + bool have_crc() const { return crc_defined; } + uint32_t get_crc() const { return crc; } + ceph::shared_ptr crush; // hierarchical map friend class OSDMonitor; @@ -258,6 +269,7 @@ private: osd_uuid(new vector), cluster_snapshot_epoch(0), new_blacklist_entries(false), + crc_defined(false), crc(0), crush(new CrushWrapper) { memset(&fsid, 0, sizeof(fsid)); } -- 2.39.5