]> git-server-git.apps.pok.os.sepia.ceph.com Git - ceph.git/commitdiff
osd: move pg log into leveldb
authorDavid Zafman <david.zafman@inktank.com>
Fri, 8 Feb 2013 22:43:50 +0000 (14:43 -0800)
committerSamuel Just <sam.just@inktank.com>
Fri, 15 Feb 2013 19:11:13 +0000 (11:11 -0800)
log from wip-pginfo
Fix bugs in PG::read_log() handling
Eliminate compiler warnings

Feature #4075: osd: move pg log into leveldb

Signed-off-by: Sage Weil <sage@inktank.com>
Signed-off-by: David Zafman <david.zafman@inktank.com>
src/osd/PG.cc
src/osd/PG.h
src/osd/osd_types.cc
src/osd/osd_types.h

index dd32a3e38ca0368af9c15f3527013449051de73f..ee1df9fc253f26171667bcecb990ba22bbefe615 100644 (file)
@@ -142,7 +142,7 @@ std::string PG::gen_prefix() const
   
 
 
-void PG::IndexedLog::trim(ObjectStore::Transaction& t, eversion_t s) 
+void PG::IndexedLog::trim(ObjectStore::Transaction& t, hobject_t& log_oid, eversion_t s)
 {
   if (complete_to != log.end() &&
       complete_to->version <= s) {
@@ -150,14 +150,17 @@ void PG::IndexedLog::trim(ObjectStore::Transaction& t, eversion_t s)
                    << " on " << *this << dendl;
   }
 
+  set<string> keys_to_rm;
   while (!log.empty()) {
     pg_log_entry_t &e = *log.begin();
     if (e.version > s)
       break;
     generic_dout(20) << "trim " << e << dendl;
     unindex(e);         // remove from index,
+    keys_to_rm.insert(e.get_key_name());
     log.pop_front();    // from log
   }
+  t.omap_rmkeys(coll_t::META_COLL, log_oid, keys_to_rm);
 
   // raise tail?
   if (tail < s)
@@ -2398,37 +2401,22 @@ epoch_t PG::peek_map_epoch(ObjectStore *store, coll_t coll, hobject_t &infos_oid
 void PG::write_log(ObjectStore::Transaction& t)
 {
   dout(10) << "write_log" << dendl;
-
-  // assemble buffer
-  bufferlist bl;
-
-  // build buffer
-  ondisklog.tail = 0;
+  t.remove(coll_t::META_COLL, log_oid);
+  t.touch(coll_t::META_COLL, log_oid);
+  map<string,bufferlist> keys;
   for (list<pg_log_entry_t>::iterator p = log.log.begin();
        p != log.log.end();
        p++) {
-    uint64_t startoff = bl.length();
-
-    bufferlist ebl(sizeof(*p)*2);
-    ::encode(*p, ebl);
-    __u32 crc = ebl.crc32c(0);
-    ::encode(ebl, bl);
-    ::encode(crc, bl);
-
-    p->offset = startoff;
+    bufferlist bl(sizeof(*p) * 2);
+    p->encode_with_checksum(bl);
+    keys[p->get_key_name()].claim(bl);
   }
-  ondisklog.head = bl.length();
-  ondisklog.has_checksums = true;
+  dout(10) << "write_log " << keys.size() << " keys" << dendl;
 
-  // write it
-  t.remove(coll_t::META_COLL, log_oid );
-  t.write(coll_t::META_COLL, log_oid , 0, bl.length(), bl);
+  ::encode(ondisklog.divergent_priors, keys["divergent_priors"]);
+
+  t.omap_setkeys(coll_t::META_COLL, log_oid, keys);
 
-  bufferlist blb(sizeof(ondisklog));
-  ::encode(ondisklog, blb);
-  t.collection_setattr(coll, "ondisklog", blb);
-  
-  dout(10) << "write_log to " << ondisklog.tail << "~" << ondisklog.length() << dendl;
   dirty_log = false;
 }
 
@@ -2452,47 +2440,11 @@ void PG::trim(ObjectStore::Transaction& t, eversion_t trim_to)
     assert(trim_to <= info.last_complete);
 
     dout(10) << "trim " << log << " to " << trim_to << dendl;
-    log.trim(t, trim_to);
+    log.trim(t, log_oid, trim_to);
     info.log_tail = log.tail;
-    trim_ondisklog(t);
   }
 }
 
-void PG::trim_ondisklog(ObjectStore::Transaction& t) 
-{
-  uint64_t new_tail;
-  if (log.empty()) {
-    new_tail = ondisklog.head;
-  } else {
-    new_tail = log.log.front().offset;
-  }
-  bool same_block = (new_tail & ~4095) == (ondisklog.tail & ~4095);
-  dout(15) << "trim_ondisklog tail " << ondisklog.tail << " -> " << new_tail
-          << ", now " << new_tail << "~" << (ondisklog.head - new_tail)
-          << " " << (same_block ? "(same block)" : "(different block)")
-          << dendl;
-  assert(new_tail >= ondisklog.tail);
-  
-  if (same_block)
-    return;
-
-  ondisklog.tail = new_tail;
-
-  if (!g_conf->osd_preserve_trimmed_log) {
-    uint64_t zt = new_tail & ~4095;
-    if (zt > ondisklog.zero_to) {
-      t.zero(coll_t::META_COLL, log_oid, ondisklog.zero_to, zt - ondisklog.zero_to);
-      dout(15) << "trim_ondisklog zeroing from " << ondisklog.zero_to
-              << " to " << zt << dendl;
-      ondisklog.zero_to = zt;
-    }
-  }
-
-  bufferlist blb(sizeof(ondisklog));
-  ::encode(ondisklog, blb);
-  t.collection_setattr(coll, "ondisklog", blb);
-}
-
 void PG::trim_peers()
 {
   calc_trim_to();
@@ -2518,16 +2470,9 @@ void PG::add_log_entry(pg_log_entry_t& e, bufferlist& log_bl)
 
   // log mutation
   log.add(e);
-  if (ondisklog.has_checksums) {
-    bufferlist ebl(sizeof(e)*2);
-    ::encode(e, ebl);
-    __u32 crc = ebl.crc32c(0);
-    ::encode(ebl, log_bl);
-    ::encode(crc, log_bl);
-  } else {
-    ::encode(e, log_bl);
-  }
   dout(10) << "add_log_entry " << e << dendl;
+
+  e.encode_with_checksum(log_bl);
 }
 
 
@@ -2536,24 +2481,16 @@ void PG::append_log(
 {
   dout(10) << "append_log " << log << " " << logv << dendl;
 
-  bufferlist bl;
+  map<string,bufferlist> keys;
   for (vector<pg_log_entry_t>::iterator p = logv.begin();
        p != logv.end();
        p++) {
-    p->offset = ondisklog.head + bl.length();
-    add_log_entry(*p, bl);
+    p->offset = 0;
+    add_log_entry(*p, keys[p->get_key_name()]);
   }
 
-  dout(10) << "append_log  " << ondisklog.tail << "~" << ondisklog.length()
-          << " adding " << bl.length() << dendl;
-
-  t.write(coll_t::META_COLL, log_oid, ondisklog.head, bl.length(), bl );
-  ondisklog.head += bl.length();
-
-  bufferlist blb(sizeof(ondisklog));
-  ::encode(ondisklog, blb);
-  t.collection_setattr(coll, "ondisklog", blb);
-  dout(10) << "append_log  now " << ondisklog.tail << "~" << ondisklog.length() << dendl;
+  dout(10) << "append_log  adding " << keys.size() << " keys" << dendl;
+  t.omap_setkeys(coll_t::META_COLL, log_oid, keys);
 
   trim(t, trim_to);
 
@@ -2718,39 +2655,10 @@ void PG::read_state(ObjectStore *store, bufferlist &bl)
     osd->infos_oid, snap_collections, info_struct_v);
   assert(r >= 0);
 
-  try {
-    ostringstream oss;
-    read_log(store, coll, log_oid, info, ondisklog, log, missing, oss, this);
-    if (oss.str().length())
-      osd->clog.error() << oss.str();
-  }
-  catch (const buffer::error &e) {
-    string cr_log_coll_name(get_corrupt_pg_log_name());
-    dout(0) << "Got exception '" << e.what() << "' while reading log. "
-            << "Moving corrupted log file to '" << cr_log_coll_name
-           << "' for later " << "analysis." << dendl;
-
-    ondisklog.zero();
-
-    // clear log index
-    log.head = log.tail = info.last_update;
-
-    // reset info
-    info.log_tail = info.last_update;
-
-    // Move the corrupt log to a new place and create a new zero-length log entry.
-    ObjectStore::Transaction t;
-    coll_t cr_log_coll(cr_log_coll_name);
-    t.create_collection(cr_log_coll);
-    t.collection_move(cr_log_coll, coll_t::META_COLL, log_oid);
-    t.touch(coll_t::META_COLL, log_oid);
-    dirty_info = true;
-    write_if_dirty(t);
-    store->apply_transaction(t);
-
-    info.last_backfill = hobject_t();
-    info.stats.stats.clear();
-  }
+  ostringstream oss;
+  read_log(store, coll, log_oid, info, ondisklog, log, missing, oss, this);
+  if (oss.str().length())
+    osd->clog.error() << oss;
 
   // log any weirdness
   log_weirdness();
@@ -5139,16 +5047,125 @@ void PG::read_log(ObjectStore *store, coll_t coll, hobject_t log_oid,
   const pg_info_t &info, OndiskLog &ondisklog, IndexedLog &log,
   pg_missing_t &missing, ostringstream &oss, const PG *passedpg)
 {
-  // load bounds
-  ondisklog.tail = ondisklog.head = 0;
+  dout(10) << "read_log" << dendl;
 
-  bufferlist blb;
-  store->collection_getattr(coll, "ondisklog", blb);
-  bufferlist::iterator p = blb.begin();
-  ::decode(ondisklog, p);
+  // legacy?
+  struct stat st;
+  int r = store->stat(coll_t::META_COLL, log_oid, &st);
+  assert(r == 0);
+  if (st.st_size > 0) {
+    read_log_old(store, coll, log_oid, info, ondisklog, log, missing, oss, passedpg);
+    return;
+  }
+
+  log.tail = info.log_tail;
+  ObjectMap::ObjectMapIterator p = store->get_omap_iterator(coll_t::META_COLL, log_oid);
+  if (p) for (p->seek_to_first(); p->valid() ; p->next()) {
+    bufferlist bl = p->value();//Copy bufferlist before creating iterator
+    bufferlist::iterator bp = bl.begin();
+    if (p->key() == "divergent_priors") {
+      ::decode(ondisklog.divergent_priors, bp);
+      dout(20) << "read_log " << ondisklog.divergent_priors.size() << " divergent_priors" << dendl;
+    } else {
+      pg_log_entry_t e;
+      e.decode_with_checksum(bp);
+      dout(20) << "read_log " << e << dendl;
+      log.log.push_back(e);
+      log.head = e.version;
+    }
+  }
+  log.head = info.last_update;
+  log.index();
 
-  dout(10) << "read_log " << ondisklog.tail << "~" << ondisklog.length() << dendl;
+  // build missing
+  if (info.last_complete < info.last_update) {
+    dout(10) << "read_log checking for missing items over interval (" << info.last_complete
+            << "," << info.last_update << "]" << dendl;
 
+    set<hobject_t> did;
+    for (list<pg_log_entry_t>::reverse_iterator i = log.log.rbegin();
+        i != log.log.rend();
+        i++) {
+      if (i->version <= info.last_complete) break;
+      if (did.count(i->soid)) continue;
+      did.insert(i->soid);
+      
+      if (i->is_delete()) continue;
+      
+      bufferlist bv;
+      int r = store->getattr(coll, i->soid, OI_ATTR, bv);
+      if (r >= 0) {
+       object_info_t oi(bv);
+       if (oi.version < i->version) {
+         dout(15) << "read_log  missing " << *i << " (have " << oi.version << ")" << dendl;
+         missing.add(i->soid, i->version, oi.version);
+       }
+      } else {
+       dout(15) << "read_log  missing " << *i << dendl;
+       missing.add(i->soid, i->version, eversion_t());
+      }
+    }
+    for (map<eversion_t, hobject_t>::reverse_iterator i =
+          ondisklog.divergent_priors.rbegin();
+        i != ondisklog.divergent_priors.rend();
+        ++i) {
+      if (i->first <= info.last_complete) break;
+      if (did.count(i->second)) continue;
+      did.insert(i->second);
+      bufferlist bv;
+      int r = store->getattr(coll, i->second, OI_ATTR, bv);
+      if (r >= 0) {
+       object_info_t oi(bv);
+       /**
+        * 1) we see this entry in the divergent priors mapping
+        * 2) we didn't see an entry for this object in the log
+        *
+        * From 1 & 2 we know that either the object does not exist
+        * or it is at the version specified in the divergent_priors
+        * map since the object would have been deleted atomically
+        * with the addition of the divergent_priors entry, an older
+        * version would not have been recovered, and a newer version
+        * would show up in the log above.
+        */
+       assert(oi.version == i->first);
+      } else {
+       dout(15) << "read_log  missing " << *i << dendl;
+       missing.add(i->second, i->first, eversion_t());
+      }
+    }
+  }
+  dout(10) << "read_log done" << dendl;
+}
+
+void PG::read_log_old(ObjectStore *store, coll_t coll, hobject_t log_oid,
+  const pg_info_t &info, OndiskLog &ondisklog, IndexedLog &log,
+  pg_missing_t &missing, ostringstream &oss, const PG *passedpg)
+{
+  // load bounds, based on old OndiskLog encoding.
+  uint64_t ondisklog_tail = 0;
+  uint64_t ondisklog_head = 0;
+  uint64_t ondisklog_zero_to;
+  bool ondisklog_has_checksums;
+
+  bufferlist blb;
+  store->collection_getattr(coll, "ondisklog", blb);
+  {
+    bufferlist::iterator bl = blb.begin();
+    DECODE_START_LEGACY_COMPAT_LEN(3, 3, 3, bl);
+    ondisklog_has_checksums = (struct_v >= 2);
+    ::decode(ondisklog_tail, bl);
+    ::decode(ondisklog_head, bl);
+    if (struct_v >= 4)
+      ::decode(ondisklog_zero_to, bl);
+    else
+      ondisklog_zero_to = 0;
+    if (struct_v >= 5)
+      ::decode(ondisklog.divergent_priors, bl);
+    DECODE_FINISH(bl);
+  }
+  uint64_t ondisklog_length = ondisklog_head - ondisklog_tail;
+  dout(10) << "read_log " << ondisklog_tail << "~" << ondisklog_length << dendl;
   log.tail = info.log_tail;
 
   // In case of sobject_t based encoding, may need to list objects in the store
@@ -5156,15 +5173,15 @@ void PG::read_log(ObjectStore *store, coll_t coll, hobject_t log_oid,
   bool listed_collection = false;
   vector<hobject_t> ls;
   
-  if (ondisklog.head > 0) {
+  if (ondisklog_head > 0) {
     // read
     bufferlist bl;
-    store->read(coll_t::META_COLL, log_oid, ondisklog.tail, ondisklog.length(), bl);
-    if (bl.length() < ondisklog.length()) {
+    store->read(coll_t::META_COLL, log_oid, ondisklog_tail, ondisklog_length, bl);
+    if (bl.length() < ondisklog_length) {
       std::ostringstream oss;
       oss << "read_log got " << bl.length() << " bytes, expected "
-         << ondisklog.head << "-" << ondisklog.tail << "="
-         << ondisklog.length();
+         << ondisklog_head << "-" << ondisklog_tail << "="
+         << ondisklog_length;
       throw read_log_error(oss.str().c_str());
     }
     
@@ -5174,8 +5191,8 @@ void PG::read_log(ObjectStore *store, coll_t coll, hobject_t log_oid,
     eversion_t last;
     bool reorder = false;
     while (!p.end()) {
-      uint64_t pos = ondisklog.tail + p.get_off();
-      if (ondisklog.has_checksums) {
+      uint64_t pos = ondisklog_tail + p.get_off();
+      if (ondisklog_has_checksums) {
        bufferlist ebl;
        ::decode(ebl, p);
        __u32 crc;
@@ -5243,19 +5260,19 @@ void PG::read_log(ObjectStore *store, coll_t coll, hobject_t log_oid,
       }
 
       e.offset = pos;
-      uint64_t endpos = ondisklog.tail + p.get_off();
+      uint64_t endpos = ondisklog_tail + p.get_off();
       log.log.push_back(e);
       last = e.version;
 
       // [repair] at end of log?
       if (!p.end() && e.version == info.last_update) {
        oss << info.pgid << " log has extra data at "
-           << endpos << "~" << (ondisklog.head-endpos) << " after "
-           << info.last_update << "\n";
+          << endpos << "~" << (ondisklog_head-endpos) << " after "
+          << info.last_update << "\n";
 
        dout(0) << "read_log " << endpos << " *** extra gunk at end of log, "
-               << "adjusting ondisklog.head" << dendl;
-       ondisklog.head = endpos;
+               << "adjusting ondisklog_head" << dendl;
+       ondisklog_head = endpos;
        break;
       }
     }
@@ -5270,68 +5287,6 @@ void PG::read_log(ObjectStore *store, coll_t coll, hobject_t log_oid,
        log.log.push_back(p->second);
     }
   }
-
-  log.head = info.last_update;
-  log.index();
-
-  // build missing
-  if (info.last_complete < info.last_update) {
-    dout(10) << "read_log checking for missing items over interval (" << info.last_complete
-            << "," << info.last_update << "]" << dendl;
-
-    set<hobject_t> did;
-    for (list<pg_log_entry_t>::reverse_iterator i = log.log.rbegin();
-        i != log.log.rend();
-        i++) {
-      if (i->version <= info.last_complete) break;
-      if (did.count(i->soid)) continue;
-      did.insert(i->soid);
-      
-      if (i->is_delete()) continue;
-      
-      bufferlist bv;
-      int r = store->getattr(coll, i->soid, OI_ATTR, bv);
-      if (r >= 0) {
-       object_info_t oi(bv);
-       if (oi.version < i->version) {
-         dout(15) << "read_log  missing " << *i << " (have " << oi.version << ")" << dendl;
-         missing.add(i->soid, i->version, oi.version);
-       }
-      } else {
-       dout(15) << "read_log  missing " << *i << dendl;
-       missing.add(i->soid, i->version, eversion_t());
-      }
-    }
-    for (map<eversion_t, hobject_t>::reverse_iterator i =
-          ondisklog.divergent_priors.rbegin();
-        i != ondisklog.divergent_priors.rend();
-        ++i) {
-      if (i->first <= info.last_complete) break;
-      if (did.count(i->second)) continue;
-      did.insert(i->second);
-      bufferlist bv;
-      int r = store->getattr(coll, i->second, OI_ATTR, bv);
-      if (r >= 0) {
-       object_info_t oi(bv);
-       /**
-        * 1) we see this entry in the divergent priors mapping
-        * 2) we didn't see an entry for this object in the log
-        *
-        * From 1 & 2 we know that either the object does not exist
-        * or it is at the version specified in the divergent_priors
-        * map since the object would have been deleted atomically
-        * with the addition of the divergent_priors entry, an older
-        * version would not have been recovered, and a newer version
-        * would show up in the log above.
-        */
-       assert(oi.version == i->first);
-      } else {
-       dout(15) << "read_log  missing " << *i << dendl;
-       missing.add(i->second, i->first, eversion_t());
-      }
-    }
-  }
-  dout(10) << "read_log done" << dendl;
 }
 
 /*------------ Recovery State Machine----------------*/
index a60de9a5f201cdd0b9453235671fccee4edc3e83..26c5368897155cb36b5cc89b07714525c6243b1c 100644 (file)
@@ -267,7 +267,7 @@ public:
       caller_ops[e.reqid] = &(log.back());
     }
 
-    void trim(ObjectStore::Transaction &t, eversion_t s);
+    void trim(ObjectStore::Transaction &t, hobject_t& oid, eversion_t s);
 
     ostream& print(ostream& out) const;
   };
@@ -334,6 +334,16 @@ public:
       f->dump_unsigned("head", head);
       f->dump_unsigned("tail", tail);
       f->dump_unsigned("zero_to", zero_to);
+      f->open_array_section("divergent_priors");
+      for (map<eversion_t, hobject_t>::const_iterator p = divergent_priors.begin();
+          p != divergent_priors.end();
+          ++p) {
+       f->open_object_section("prior");
+       f->dump_stream("version") << p->first;
+       f->dump_stream("object") << p->second;
+       f->close_section();
+      }
+      f->close_section();
     }
     static void generate_test_instances(list<OndiskLog*>& o) {
       o.push_back(new OndiskLog);
@@ -1791,9 +1801,11 @@ public:
   static void read_log(ObjectStore *store, coll_t coll, hobject_t log_oid,
     const pg_info_t &info, OndiskLog &ondisklog, IndexedLog &log,
     pg_missing_t &missing, ostringstream &oss, const PG *passedpg = NULL);
+  static void read_log_old(ObjectStore *store, coll_t coll, hobject_t log_oid,
+    const pg_info_t &info, OndiskLog &ondisklog, IndexedLog &log,
+    pg_missing_t &missing, ostringstream &oss, const PG *passedpg = NULL);
   bool check_log_for_corruption(ObjectStore *store);
   void trim(ObjectStore::Transaction& t, eversion_t v);
-  void trim_ondisklog(ObjectStore::Transaction& t);
   void trim_peers();
 
   std::string get_corrupt_pg_log_name() const;
index d6faa273c47fcf962864167a12070809329988d3..8ef0c9b58a148f5e950fce949ad3b85fb4a58af6 100644 (file)
@@ -1710,6 +1710,34 @@ void pg_query_t::generate_test_instances(list<pg_query_t*>& o)
 
 // -- pg_log_entry_t --
 
+string pg_log_entry_t::get_key_name() const
+{
+  char key[40];
+  snprintf(key, sizeof(key), "%010u.%020lu", version.epoch, version.version);
+  return string(key);
+}
+
+void pg_log_entry_t::encode_with_checksum(bufferlist& bl) const
+{
+  bufferlist ebl(sizeof(*this)*2);
+  encode(ebl);
+  __u32 crc = ebl.crc32c(0);
+  ::encode(ebl, bl);
+  ::encode(crc, bl);
+}
+
+void pg_log_entry_t::decode_with_checksum(bufferlist::iterator& p)
+{
+  bufferlist bl;
+  ::decode(bl, p);
+  __u32 crc;
+  ::decode(crc, p);
+  if (crc != bl.crc32c(0))
+    throw buffer::malformed_input("bad checksum on pg_log_entry_t");
+  bufferlist::iterator q = bl.begin();
+  decode(q);
+}
+
 void pg_log_entry_t::encode(bufferlist &bl) const
 {
   ENCODE_START(7, 4, bl);
index 66818bc07570b70b1b014a4dd7343710a7ce98d0..1d793ec515b3b0b8dda527d800ea23320a1fdb3b 100644 (file)
@@ -1318,6 +1318,10 @@ struct pg_log_entry_t {
     return reqid != osd_reqid_t() && (op == MODIFY || op == DELETE);
   }
 
+  string get_key_name() const;
+  void encode_with_checksum(bufferlist& bl) const;
+  void decode_with_checksum(bufferlist::iterator& p);
+
   void encode(bufferlist &bl) const;
   void decode(bufferlist::iterator &bl);
   void dump(Formatter *f) const;