]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
ceph_objectstore_tool: Check for splits and fail import if there were splits
authorDavid Zafman <dzafman@redhat.com>
Wed, 19 Nov 2014 19:47:36 +0000 (11:47 -0800)
committerDavid Zafman <dzafman@redhat.com>
Tue, 3 Mar 2015 19:20:59 +0000 (11:20 -0800)
Add osdmap into metadata_section
On export put metadata_section before file data

Fixes: #9780
Signed-off-by: David Zafman <dzafman@redhat.com>
(cherry picked from commit 19fdeea8b67091ed044ebce25799d3237b4d734a)

src/tools/ceph_objectstore_tool.cc

index 52f1dc10374a4d21f252c4274529a16b693fe695..1360b14830e61113689d5225ab8e5dfe35b1f4de 100644 (file)
@@ -312,6 +312,7 @@ struct metadata_section {
   pg_info_t info;
   pg_log_t log;
   map<epoch_t,pg_interval_t> past_intervals;
+  OSDMap osdmap;
 
   metadata_section(__u8 struct_ver, epoch_t map_epoch, const pg_info_t &info,
                   const pg_log_t &log, map<epoch_t,pg_interval_t> &past_intervals)
@@ -325,16 +326,17 @@ struct metadata_section {
       map_epoch(0) { }
 
   void encode(bufferlist& bl) const {
-    ENCODE_START(2, 1, bl);
+    ENCODE_START(3, 1, bl);
     ::encode(struct_ver, bl);
     ::encode(map_epoch, bl);
     ::encode(info, bl);
     ::encode(log, bl);
     ::encode(past_intervals, bl);
+    osdmap.encode(bl);
     ENCODE_FINISH(bl);
   }
   void decode(bufferlist::iterator& bl) {
-    DECODE_START(2, bl);
+    DECODE_START(3, bl);
     ::decode(struct_ver, bl);
     ::decode(map_epoch, bl);
     ::decode(info, bl);
@@ -344,6 +346,11 @@ struct metadata_section {
     } else {
       cout << "NOTICE: Older export without past_intervals" << std::endl;
     }
+    if (struct_v > 2) {
+      osdmap.decode(bl);
+    } else {
+      cout << "WARNING: Older export without OSDMap information" << std::endl;
+    }
     DECODE_FINISH(bl);
   }
 };
@@ -942,6 +949,11 @@ int get_osdmap(ObjectStore *store, epoch_t e, OSDMap &osdmap)
   return 0;
 }
 
+int add_osdmap(ObjectStore *store, metadata_section &ms)
+{
+  return get_osdmap(store, ms.map_epoch, ms.osdmap);
+}
+
 //Write super_header with its fixed 16 byte length
 void write_super()
 {
@@ -989,17 +1001,22 @@ int do_export(ObjectStore *fs, coll_t coll, spg_t pgid, pg_info_t &info,
   if (ret)
     return ret;
 
+  // The metadata_section is now before files, so import can detect
+  // errors and abort without wasting time.
+  metadata_section ms(struct_ver, map_epoch, info, log, past_intervals);
+  ret = add_osdmap(fs, ms);
+  if (ret)
+    return ret;
+  ret = write_section(TYPE_PG_METADATA, ms, file_fd);
+  if (ret)
+    return ret;
+
   ret = export_files(fs, coll);
   if (ret) {
     cerr << "export_files error " << ret << std::endl;
     return ret;
   }
 
-  metadata_section ms(struct_ver, map_epoch, info, log, past_intervals);
-  ret = write_section(TYPE_PG_METADATA, ms, file_fd);
-  if (ret)
-    return ret;
-
   ret = write_simple(TYPE_PG_END, file_fd);
   if (ret)
     return ret;
@@ -1396,7 +1413,7 @@ int get_object(ObjectStore *store, coll_t coll, bufferlist &bl)
   return 0;
 }
 
-int get_pg_metadata(bufferlist &bl, metadata_section &ms)
+int get_pg_metadata(ObjectStore *store, bufferlist &bl, metadata_section &ms, const OSDSuperblock& sb)
 {
   bufferlist::iterator ebliter = bl.begin();
   ms.decode(ebliter);
@@ -1418,6 +1435,25 @@ int get_pg_metadata(bufferlist &bl, metadata_section &ms)
   cout << std::endl;
 #endif
 
+  // If the osdmap was present in the metadata we can check for splits.
+  // Pool verified to exist for call to get_pg_num().
+  if (ms.osdmap.get_epoch() != 0) {
+    OSDMap curmap;
+    int ret = get_osdmap(store, sb.current_epoch, curmap);
+    if (ret)
+      return ret;
+    spg_t parent(ms.info.pgid);
+    if (parent.is_split(
+       ms.osdmap.get_pg_num(ms.info.pgid.pgid.m_pool),
+       curmap.get_pg_num(ms.info.pgid.pgid.m_pool),
+       NULL)) {
+      // ms.past_intervals.clear();
+      // ms.map_epoch = sb.current_epoch;
+      cerr << "Import failed due to a split" << std::endl;
+      return EINVAL;    
+    }
+  }
+
   return 0;
 }
 
@@ -1660,7 +1696,7 @@ int do_import(ObjectStore *store, OSDSuperblock& sb)
       if (ret) return ret;
       break;
     case TYPE_PG_METADATA:
-      ret = get_pg_metadata(ebl, ms);
+      ret = get_pg_metadata(store, ebl, ms, sb);
       if (ret) return ret;
       found_metadata = true;
       break;