assert(features & CEPH_FEATURE_RESERVED);
features &= ~CEPH_FEATURE_RESERVED;
+ size_t start_offset = bl.length();
+ size_t tail_offset;
+ buffer::list::iterator crc_it;
+
// meta-encoding: how we include client-used and osd-specific data
- ENCODE_START(7, 7, bl);
+ ENCODE_START(8, 7, bl);
{
ENCODE_START(3, 1, bl); // client-usable data
ENCODE_FINISH(bl); // osd-only data
}
+ ::encode((uint32_t)0, bl); // dummy inc_crc
+ crc_it = bl.end();
+ crc_it.advance(-4);
+ tail_offset = bl.length();
+
+ ::encode(full_crc, bl);
+
ENCODE_FINISH(bl); // meta-encoding wrapper
+ // fill in crc
+ bufferlist front;
+ front.substr_of(bl, start_offset, crc_it.get_off() - start_offset);
+ inc_crc = front.crc32c(-1);
+ bufferlist tail;
+ tail.substr_of(bl, tail_offset, bl.length() - tail_offset);
+ inc_crc = tail.crc32c(inc_crc);
+ ceph_le32 crc_le;
+ crc_le = inc_crc;
+ crc_it.copy_in(4, (char*)&crc_le);
+ have_crc = true;
}
void OSDMap::Incremental::decode_classic(bufferlist::iterator &p)
* a struct_v < 7, we must rewind to the beginning and use our
* classic decoder.
*/
- DECODE_START_LEGACY_COMPAT_LEN(7, 7, 7, bl); // wrapper
+ size_t start_offset = bl.get_off();
+ size_t tail_offset = 0;
+ bufferlist crc_front, crc_tail;
+
+ DECODE_START_LEGACY_COMPAT_LEN(8, 7, 7, bl); // wrapper
if (struct_v < 7) {
int struct_v_size = sizeof(struct_v);
bl.advance(-struct_v_size);
DECODE_FINISH(bl); // osd-only data
}
+ if (struct_v >= 8) {
+ have_crc = true;
+ crc_front.substr_of(bl.get_bl(), start_offset, bl.get_off() - start_offset);
+ ::decode(inc_crc, bl);
+ tail_offset = bl.get_off();
+ ::decode(full_crc, bl);
+ } else {
+ have_crc = false;
+ full_crc = 0;
+ inc_crc = 0;
+ }
+
DECODE_FINISH(bl); // wrapper
}
assert(features & CEPH_FEATURE_RESERVED);
features &= ~CEPH_FEATURE_RESERVED;
+ size_t start_offset = bl.length();
+ size_t tail_offset;
+ buffer::list::iterator crc_it;
+
// meta-encoding: how we include client-used and osd-specific data
- ENCODE_START(7, 7, bl);
+ ENCODE_START(8, 7, bl);
{
ENCODE_START(3, 1, bl); // client-usable data
ENCODE_FINISH(bl); // osd-only data
}
+ ::encode((uint32_t)0, bl); // dummy crc
+ crc_it = bl.end();
+ crc_it.advance(-4);
+ tail_offset = bl.length();
+
ENCODE_FINISH(bl); // meta-encoding wrapper
+
+ // fill in crc
+ bufferlist front;
+ front.substr_of(bl, start_offset, crc_it.get_off() - start_offset);
+ crc = front.crc32c(-1);
+ if (tail_offset < bl.length()) {
+ bufferlist tail;
+ tail.substr_of(bl, tail_offset, bl.length() - tail_offset);
+ crc = tail.crc32c(crc);
+ }
+ ceph_le32 crc_le;
+ crc_le = crc;
+ crc_it.copy_in(4, (char*)&crc_le);
+ crc_defined = true;
}
void OSDMap::decode(bufferlist& bl)
* a struct_v < 7, we must rewind to the beginning and use our
* classic decoder.
*/
- DECODE_START_LEGACY_COMPAT_LEN(7, 7, 7, bl); // wrapper
+ size_t start_offset = bl.get_off();
+ size_t tail_offset = 0;
+ bufferlist crc_front, crc_tail;
+
+ DECODE_START_LEGACY_COMPAT_LEN(8, 7, 7, bl); // wrapper
if (struct_v < 7) {
int struct_v_size = sizeof(struct_v);
bl.advance(-struct_v_size);
DECODE_FINISH(bl); // osd-only data
}
+ if (struct_v >= 8) {
+ crc_front.substr_of(bl.get_bl(), start_offset, bl.get_off() - start_offset);
+ ::decode(crc, bl);
+ tail_offset = bl.get_off();
+ crc_defined = true;
+ } else {
+ crc_defined = false;
+ crc = 0;
+ }
+
DECODE_FINISH(bl); // wrapper
post_decode();
string cluster_snapshot;
+ mutable bool have_crc; ///< crc values are defined
+ uint32_t full_crc; ///< crc of the resulting OSDMap
+ mutable uint32_t inc_crc; ///< crc of this incremental
+
int get_net_marked_out(const OSDMap *previous) const;
int get_net_marked_down(const OSDMap *previous) const;
int identify_osd(uuid_d u) const;
Incremental(epoch_t e=0) :
encode_features(0),
- epoch(e), new_pool_max(-1), new_flags(-1), new_max_osd(-1) {
+ epoch(e), new_pool_max(-1), new_flags(-1), new_max_osd(-1),
+ have_crc(false), full_crc(0), inc_crc(0) {
memset(&fsid, 0, sizeof(fsid));
}
Incremental(bufferlist &bl) {
string cluster_snapshot;
bool new_blacklist_entries;
+ mutable bool crc_defined;
+ mutable uint32_t crc;
+
public:
+ bool have_crc() const { return crc_defined; }
+ uint32_t get_crc() const { return crc; }
+
ceph::shared_ptr<CrushWrapper> crush; // hierarchical map
friend class OSDMonitor;
osd_uuid(new vector<uuid_d>),
cluster_snapshot_epoch(0),
new_blacklist_entries(false),
+ crc_defined(false), crc(0),
crush(new CrushWrapper) {
memset(&fsid, 0, sizeof(fsid));
}