]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
os/newstore: assigned unique nid to each new object
authorSage Weil <sage@redhat.com>
Tue, 18 Aug 2015 14:10:54 +0000 (10:10 -0400)
committerSage Weil <sage@redhat.com>
Tue, 1 Sep 2015 17:39:36 +0000 (13:39 -0400)
Use this as the key for omap (omap_head), but keep the omap_head field
so that we can tell when no omap data is present.

Signed-off-by: Sage Weil <sage@redhat.com>
src/common/config_opts.h
src/os/newstore/NewStore.cc
src/os/newstore/NewStore.h
src/os/newstore/newstore_types.cc
src/os/newstore/newstore_types.h

index 6316c31bd04cfd7f3ce0c9d5cf95e2f66a7dcdd5..96a9f78726b6de85062a79efefe9b8fdf9789432 100644 (file)
@@ -799,6 +799,7 @@ OPTION(newstore_fsync_threads, OPT_INT, 16)  // num threads calling fsync
 OPTION(newstore_fsync_thread_timeout, OPT_INT, 30) // thread timeout value
 OPTION(newstore_fsync_thread_suicide_timeout, OPT_INT, 120) // suicide timeout value
 OPTION(newstore_fid_prealloc, OPT_INT, 1024)
+OPTION(newstore_nid_prealloc, OPT_INT, 1024)
 
 OPTION(filestore_omap_backend, OPT_STR, "leveldb")
 
index d42721ddfda37a202a0b1e525fde2c2ace142197..87a8fbab01ad9760b08f814b43c32b06f47cc6e4 100644 (file)
@@ -565,6 +565,8 @@ NewStore::NewStore(CephContext *cct, const string& path)
     mounted(false),
     coll_lock("NewStore::coll_lock"),
     fid_lock("NewStore::fid_lock"),
+    nid_lock("NewStore::nid_lock"),
+    nid_max(0),
     wal_lock("NewStore::wal_lock"),
     wal_seq(0),
     finisher(cct),
@@ -921,7 +923,7 @@ int NewStore::mount()
   if (r < 0)
     goto out_db;
 
-  r = _recover_next_omap_id();
+  r = _recover_next_nid();
   if (r < 0)
     goto out_db;
 
@@ -1509,7 +1511,7 @@ int NewStore::OmapIteratorImpl::next()
 {
   RWLock::RLocker l(c->lock);
   it->next();
-  if (!it->valid() || it->key() == tail) {
+  if (!it->valid() || it->key() >= tail) {
     it = KeyValueDB::Iterator();
   }
   return 0;
@@ -1559,7 +1561,7 @@ int NewStore::omap_get(
       if (it->key() == head) {
        dout(30) << __func__ << "  got header" << dendl;
        *header = it->value();
-      } else if (it->key() == tail) {
+      } else if (it->key() >= tail) {
        dout(30) << __func__ << "  reached tail" << dendl;
        break;
       } else {
@@ -1642,7 +1644,7 @@ int NewStore::omap_get_keys(
        it->next();
        continue;
       }
-      if (it->key() == tail) {
+      if (it->key() >= tail) {
        dout(30) << __func__ << "  reached tail" << dendl;
        break;
       }
@@ -1741,38 +1743,34 @@ ObjectMap::ObjectMapIterator NewStore::get_omap_iterator(
 // -----------------
 // write helpers
 
-int NewStore::_recover_next_omap_id()
+int NewStore::_recover_next_nid()
 {
-  KeyValueDB::Iterator it = db->get_iterator(PREFIX_OMAP);
-  it->lower_bound("GGGGGGGG");
-  if (!it->valid()) {
-    omap_id.set(1);
-    dout(10) << __func__ << " no omap keys, starting at 1" << dendl;
-    return 0;
-  }
-  dout(20) << __func__ << " last key is " << it->key() << dendl;
-  uint64_t id;
-  int r = sscanf(it->key().c_str(), "%llx", (unsigned long long*)&id);
-  if (r < 0) {
-    derr << "unable to parse " << it->key() << dendl;
-    return -EIO;
+  nid_max = 0;
+  bufferlist bl;
+  db->get(PREFIX_SUPER, "nid_max", &bl);
+  try {
+    ::decode(nid_max, bl);
+  } catch (buffer::error& e) {
   }
-  omap_id.set(id);
+  dout(1) << __func__ << " old nid_max " << nid_max << dendl;
+  nid_last = nid_max;
   return 0;
 }
 
-void NewStore::_get_omap_id(TransContext *txc, OnodeRef o)
+void NewStore::_assign_nid(TransContext *txc, OnodeRef o)
 {
-  if (o->onode.omap_head)
+  if (o->onode.nid)
     return;
-
-  o->onode.omap_head = omap_id.inc();
-  dout(10) << __func__ << " assigned " << o->oid
-          << " id " << o->onode.omap_head << dendl;
-  string tail;
-  get_omap_tail(o->onode.omap_head, &tail);
-  bufferlist empty;
-  txc->t->set(PREFIX_OMAP, tail, empty);
+  Mutex::Locker l(nid_lock);
+  o->onode.nid = ++nid_last;
+  dout(20) << __func__ << " " << o->onode.nid << dendl;
+  if (nid_last > nid_max) {
+    nid_max += g_conf->newstore_nid_prealloc;
+    bufferlist bl;
+    ::encode(nid_max, bl);
+    txc->t->set(PREFIX_SUPER, "nid_max", bl);
+    dout(10) << __func__ << " nid_max now " << nid_max << dendl;
+  }
 }
 
 int NewStore::_recover_next_fid()
@@ -2723,6 +2721,7 @@ int NewStore::_touch(TransContext *txc,
   OnodeRef o = c->get_onode(oid, true);
   assert(o);
   o->exists = true;
+  _assign_nid(txc, o);
   txc->write_onode(o);
   dout(10) << __func__ << " " << c->cid << " " << oid << " = " << r << dendl;
   return r;
@@ -2889,6 +2888,7 @@ int NewStore::_write(TransContext *txc,
           << dendl;
   RWLock::WLocker l(c->lock);
   OnodeRef o = c->get_onode(oid, true);
+  _assign_nid(txc, o);
   int r = _do_write(txc, o, offset, length, bl, fadvise_flags);
   txc->write_onode(o);
 
@@ -2910,6 +2910,7 @@ int NewStore::_zero(TransContext *txc,
 
   RWLock::WLocker l(c->lock);
   OnodeRef o = c->get_onode(oid, true);
+  _assign_nid(txc, o);
 
   if (o->onode.data_map.empty()) {
     // we're already a big hole
@@ -3027,7 +3028,7 @@ int NewStore::_truncate(TransContext *txc,
   int r = 0;
 
   RWLock::WLocker l(c->lock);
-  OnodeRef o = c->get_onode(oid, true);
+  OnodeRef o = c->get_onode(oid, false);
   if (!o->exists) {
     r = -ENOENT;
     goto out;
@@ -3059,7 +3060,7 @@ int NewStore::_do_remove(TransContext *txc,
   o->onode.data_map.clear();
   o->onode.size = 0;
   if (o->onode.omap_head) {
-    _do_omap_clear(txc, o->onode.omap_head, true);
+    _do_omap_clear(txc, o->onode.omap_head);
   }
 
   get_object_key(o->oid, &key);
@@ -3074,7 +3075,7 @@ int NewStore::_remove(TransContext *txc,
   dout(15) << __func__ << " " << c->cid << " " << oid << dendl;
   int r;
   RWLock::WLocker l(c->lock);
-  OnodeRef o = c->get_onode(oid, true);
+  OnodeRef o = c->get_onode(oid, false);
   if (!o || !o->exists) {
     r = -ENOENT;
     goto out;
@@ -3191,8 +3192,7 @@ int NewStore::_rmattrs(TransContext *txc,
   return r;
 }
 
-void NewStore::_do_omap_clear(TransContext *txc, uint64_t id,
-                             bool remove_tail)
+void NewStore::_do_omap_clear(TransContext *txc, uint64_t id)
 {
   KeyValueDB::Iterator it = db->get_iterator(PREFIX_OMAP);
   string prefix, tail;
@@ -3200,10 +3200,7 @@ void NewStore::_do_omap_clear(TransContext *txc, uint64_t id,
   get_omap_tail(id, &tail);
   it->lower_bound(prefix);
   while (it->valid()) {
-    if (it->key() == tail) {
-      if (remove_tail) {
-       txc->t->rmkey(PREFIX_OMAP, it->key());
-      }
+    if (it->key() >= tail) {
       dout(30) << __func__ << "  stop at " << tail << dendl;
       break;
     }
@@ -3227,7 +3224,7 @@ int NewStore::_omap_clear(TransContext *txc,
     goto out;
   }
   if (o->onode.omap_head != 0) {
-    _do_omap_clear(txc, o->onode.omap_head, false);
+    _do_omap_clear(txc, o->onode.omap_head);
   }
   r = 0;
 
@@ -3250,7 +3247,10 @@ int NewStore::_omap_setkeys(TransContext *txc,
     r = -ENOENT;
     goto out;
   }
-  _get_omap_id(txc, o);
+  if (!o->onode.omap_head) {
+    o->onode.omap_head = o->onode.nid;
+    txc->write_onode(o);
+  }
   for (map<string,bufferlist>::const_iterator p = m.begin(); p != m.end(); ++p) {
     string key;
     get_omap_key(o->onode.omap_head, p->first, &key);
@@ -3279,7 +3279,10 @@ int NewStore::_omap_setheader(TransContext *txc,
     r = -ENOENT;
     goto out;
   }
-  _get_omap_id(txc, o);
+  if (!o->onode.omap_head) {
+    o->onode.omap_head = o->onode.nid;
+    txc->write_onode(o);
+  }
   get_omap_header(o->onode.omap_head, &key);
   txc->t->set(PREFIX_OMAP, key, bl);
   r = 0;
@@ -3307,7 +3310,10 @@ int NewStore::_omap_rmkeys(TransContext *txc,
     r = 0;
     goto out;
   }
-  _get_omap_id(txc, o);
+  if (!o->onode.omap_head) {
+    o->onode.omap_head = o->onode.nid;
+    txc->write_onode(o);
+  }
   for (set<string>::const_iterator p = m.begin(); p != m.end(); ++p) {
     string key;
     get_omap_key(o->onode.omap_head, *p, &key);
@@ -3411,6 +3417,7 @@ int NewStore::_clone(TransContext *txc,
   newo = c->get_onode(new_oid, true);
   assert(newo);
   newo->exists = true;
+  _assign_nid(txc, newo);
 
   r = _do_read(oldo, 0, oldo->onode.size, bl, 0);
   if (r < 0)
@@ -3431,11 +3438,13 @@ int NewStore::_clone(TransContext *txc,
   // clone omap
   if (newo->onode.omap_head) {
     dout(20) << __func__ << " clearing old omap data" << dendl;
-    _do_omap_clear(txc, newo->onode.omap_head, true);
+    _do_omap_clear(txc, newo->onode.omap_head);
   }
   if (oldo->onode.omap_head) {
     dout(20) << __func__ << " copying omap data" << dendl;
-    _get_omap_id(txc, newo);
+    if (!newo->onode.omap_head) {
+      newo->onode.omap_head = newo->onode.nid;
+    }
     KeyValueDB::Iterator it = db->get_iterator(PREFIX_OMAP);
     string head, tail;
     get_omap_header(oldo->onode.omap_head, &head);
@@ -3443,7 +3452,7 @@ int NewStore::_clone(TransContext *txc,
     it->lower_bound(head);
     while (it->valid()) {
       string key;
-      if (it->key() == tail) {
+      if (it->key() >= tail) {
        dout(30) << __func__ << "  reached tail" << dendl;
        break;
       } else {
index a8d28e18896be9fb837c69339fefeb5c0e810048..137d1b6403693f33ec087b1708feba04c88c94f2 100644 (file)
@@ -378,7 +378,9 @@ private:
   fid_t fid_last;  ///< last allocated fid
   fid_t fid_max;   ///< max fid we can allocate before reserving more
 
-  atomic64_t omap_id;
+  Mutex nid_lock;
+  uint64_t nid_last;
+  uint64_t nid_max;
 
   Mutex wal_lock;
   atomic64_t wal_seq;
@@ -432,8 +434,8 @@ private:
   int _open_fid(fid_t fid);
   int _remove_fid(fid_t fid);
 
-  int _recover_next_omap_id();
-  void _get_omap_id(TransContext *txc, OnodeRef o);
+  int _recover_next_nid();
+  void _assign_nid(TransContext *txc, OnodeRef o);
 
   int _clean_fid_tail(TransContext *txc, const fragment_t& f);
 
@@ -657,7 +659,7 @@ private:
   int _rmattrs(TransContext *txc,
               CollectionRef& c,
               const ghobject_t& oid);
-  void _do_omap_clear(TransContext *txc, uint64_t id, bool remove_tail);
+  void _do_omap_clear(TransContext *txc, uint64_t id);
   int _omap_clear(TransContext *txc,
                  CollectionRef& c,
                  const ghobject_t& oid);
index 3e5d0aca6642918f3d981e247c263d398ef41d17..53e0887b7615b1ff0123040a48e5083e8238f7fb 100644 (file)
@@ -97,6 +97,7 @@ void fragment_t::generate_test_instances(list<fragment_t*>& o)
 void onode_t::encode(bufferlist& bl) const
 {
   ENCODE_START(1, 1, bl);
+  ::encode(nid, bl);
   ::encode(size, bl);
   ::encode(attrs, bl);
   ::encode(data_map, bl);
@@ -109,6 +110,7 @@ void onode_t::encode(bufferlist& bl) const
 void onode_t::decode(bufferlist::iterator& p)
 {
   DECODE_START(1, p);
+  ::decode(nid, p);
   ::decode(size, p);
   ::decode(attrs, p);
   ::decode(data_map, p);
@@ -120,6 +122,7 @@ void onode_t::decode(bufferlist::iterator& p)
 
 void onode_t::dump(Formatter *f) const
 {
+  f->dump_unsigned("nid", nid);
   f->dump_unsigned("size", size);
   f->open_object_section("attrs");
   for (map<string,bufferptr>::const_iterator p = attrs.begin();
index 6d9ddfc778d1b6422e3f5ea24f15958339b21fdd..8411d6bb44e6f61d0159bc770f9b98cd259b4618 100644 (file)
@@ -90,6 +90,7 @@ WRITE_CLASS_ENCODER(fragment_t)
 
 /// onode: per-object metadata
 struct onode_t {
+  uint64_t nid;                        ///< numeric id (locally unique)
   uint64_t size;                       ///< object size
   map<string, bufferptr> attrs;        ///< attrs
   map<uint64_t, fragment_t> data_map;  ///< data (offset to fragment mapping)
@@ -99,7 +100,9 @@ struct onode_t {
   uint32_t expected_write_size;
 
   onode_t()
-    : size(0), omap_head(0),
+    : nid(0),
+      size(0),
+      omap_head(0),
       expected_object_size(0),
       expected_write_size(0) {}