pg_info_t info;
pg_log_t log;
map<epoch_t,pg_interval_t> past_intervals;
+ OSDMap osdmap;
metadata_section(__u8 struct_ver, epoch_t map_epoch, const pg_info_t &info,
const pg_log_t &log, map<epoch_t,pg_interval_t> &past_intervals)
map_epoch(0) { }
void encode(bufferlist& bl) const {
- ENCODE_START(2, 1, bl);
+ ENCODE_START(3, 1, bl);
::encode(struct_ver, bl);
::encode(map_epoch, bl);
::encode(info, bl);
::encode(log, bl);
::encode(past_intervals, bl);
+ osdmap.encode(bl);
ENCODE_FINISH(bl);
}
void decode(bufferlist::iterator& bl) {
- DECODE_START(2, bl);
+ DECODE_START(3, bl);
::decode(struct_ver, bl);
::decode(map_epoch, bl);
::decode(info, bl);
} else {
cout << "NOTICE: Older export without past_intervals" << std::endl;
}
+ if (struct_v > 2) {
+ osdmap.decode(bl);
+ } else {
+ cout << "WARNING: Older export without OSDMap information" << std::endl;
+ }
DECODE_FINISH(bl);
}
};
return 0;
}
+int add_osdmap(ObjectStore *store, metadata_section &ms)
+{
+ return get_osdmap(store, ms.map_epoch, ms.osdmap);
+}
+
//Write super_header with its fixed 16 byte length
void write_super()
{
if (ret)
return ret;
+ // The metadata_section is now before files, so import can detect
+ // errors and abort without wasting time.
+ metadata_section ms(struct_ver, map_epoch, info, log, past_intervals);
+ ret = add_osdmap(fs, ms);
+ if (ret)
+ return ret;
+ ret = write_section(TYPE_PG_METADATA, ms, file_fd);
+ if (ret)
+ return ret;
+
ret = export_files(fs, coll);
if (ret) {
cerr << "export_files error " << ret << std::endl;
return ret;
}
- metadata_section ms(struct_ver, map_epoch, info, log, past_intervals);
- ret = write_section(TYPE_PG_METADATA, ms, file_fd);
- if (ret)
- return ret;
-
ret = write_simple(TYPE_PG_END, file_fd);
if (ret)
return ret;
return 0;
}
-int get_pg_metadata(bufferlist &bl, metadata_section &ms)
+int get_pg_metadata(ObjectStore *store, bufferlist &bl, metadata_section &ms, const OSDSuperblock& sb)
{
bufferlist::iterator ebliter = bl.begin();
ms.decode(ebliter);
cout << std::endl;
#endif
+ // If the osdmap was present in the metadata we can check for splits.
+ // Pool verified to exist for call to get_pg_num().
+ if (ms.osdmap.get_epoch() != 0) {
+ OSDMap curmap;
+ int ret = get_osdmap(store, sb.current_epoch, curmap);
+ if (ret)
+ return ret;
+ spg_t parent(ms.info.pgid);
+ if (parent.is_split(
+ ms.osdmap.get_pg_num(ms.info.pgid.pgid.m_pool),
+ curmap.get_pg_num(ms.info.pgid.pgid.m_pool),
+ NULL)) {
+ // ms.past_intervals.clear();
+ // ms.map_epoch = sb.current_epoch;
+ cerr << "Import failed due to a split" << std::endl;
+ return EINVAL;
+ }
+ }
+
return 0;
}
if (ret) return ret;
break;
case TYPE_PG_METADATA:
- ret = get_pg_metadata(ebl, ms);
+ ret = get_pg_metadata(store, ebl, ms, sb);
if (ret) return ret;
found_metadata = true;
break;