]> git-server-git.apps.pok.os.sepia.ceph.com Git - ceph.git/commitdiff
osd: explicitly use a 'meta' Sequencer
authorSage Weil <sage@redhat.com>
Mon, 6 Apr 2015 20:55:46 +0000 (13:55 -0700)
committerSage Weil <sage@redhat.com>
Wed, 19 Aug 2015 21:03:54 +0000 (17:03 -0400)
Use an explicit Sequencer for all transactions.

Signed-off-by: Sage Weil <sage@redhat.com>
src/osd/OSD.cc
src/osd/OSD.h

index 96fae67a5ceb930b8a62d3a8266413abbc3b4a65..4fbcf7fc7a53c7cbc643f99dde9f5ea30cde4b8a 100644 (file)
@@ -195,6 +195,7 @@ CompatSet OSD::get_osd_compat_set() {
 OSDService::OSDService(OSD *osd) :
   osd(osd),
   cct(osd->cct),
+  meta_osr(new ObjectStore::Sequencer("meta")),
   whoami(osd->whoami), store(osd->store),
   log_client(osd->log_client), clog(osd->clog),
   pg_recovery_stats(osd->pg_recovery_stats),
@@ -1323,6 +1324,9 @@ int OSD::mkfs(CephContext *cct, ObjectStore *store, const string &dev,
 {
   int ret;
 
+  ceph::shared_ptr<ObjectStore::Sequencer> osr(
+    new ObjectStore::Sequencer("mkfs"));
+
   try {
     // if we are fed a uuid for this osd, use it.
     store->set_fsid(cct->_conf->osd_uuid);
@@ -1373,7 +1377,7 @@ int OSD::mkfs(CephContext *cct, ObjectStore *store, const string &dev,
       ObjectStore::Transaction t;
       t.create_collection(coll_t::meta(), 0);
       t.write(coll_t::meta(), OSD_SUPERBLOCK_POBJECT, 0, bl.length(), bl);
-      ret = store->apply_transaction(t);
+      ret = store->apply_transaction(osr.get(), t);
       if (ret) {
        derr << "OSD::mkfs: error while writing OSD_SUPERBLOCK_POBJECT: "
             << "apply_transaction returned " << ret << dendl;
@@ -1381,7 +1385,10 @@ int OSD::mkfs(CephContext *cct, ObjectStore *store, const string &dev,
       }
     }
 
-    store->sync_and_flush();
+    C_SaferCond waiter;
+    if (!osr->flush_commit(&waiter)) {
+      waiter.wait();
+    }
 
     ret = write_meta(store, sb.cluster_fsid, sb.osd_fsid, whoami);
     if (ret) {
@@ -1830,7 +1837,7 @@ int OSD::init()
     dout(5) << "Upgrading superblock adding: " << diff << dendl;
     ObjectStore::Transaction t;
     write_superblock(t);
-    r = store->apply_transaction(t);
+    r = store->apply_transaction(service.meta_osr.get(), t);
     if (r < 0)
       goto out;
   }
@@ -1840,7 +1847,7 @@ int OSD::init()
     dout(10) << "init creating/touching snapmapper object" << dendl;
     ObjectStore::Transaction t;
     t.touch(coll_t::meta(), OSD::make_snapmapper_oid());
-    r = store->apply_transaction(t);
+    r = store->apply_transaction(service.meta_osr.get(), t);
     if (r < 0)
       goto out;
   }
@@ -2393,7 +2400,7 @@ int OSD::shutdown()
   superblock.clean_thru = osdmap->get_epoch();
   ObjectStore::Transaction t;
   write_superblock(t);
-  int r = store->apply_transaction(t);
+  int r = store->apply_transaction(service.meta_osr.get(), t);
   if (r) {
     derr << "OSD::shutdown: error writing superblock: "
         << cpp_strerror(r) << dendl;
@@ -2542,6 +2549,8 @@ void OSD::recursive_remove_collection(ObjectStore *store, spg_t pgid, coll_t tmp
     coll_t(),
     make_snapmapper_oid());
 
+  ceph::shared_ptr<ObjectStore::Sequencer> osr(
+    new ObjectStore::Sequencer("rm"));
   ObjectStore::Transaction t;
   SnapMapper mapper(&driver, 0, 0, 0, pgid.shard);
 
@@ -2560,16 +2569,20 @@ void OSD::recursive_remove_collection(ObjectStore *store, spg_t pgid, coll_t tmp
       assert(0);
     t.remove(tmp, *p);
     if (removed > 300) {
-      int r = store->apply_transaction(t);
+      int r = store->apply_transaction(osr.get(), t);
       assert(r == 0);
       t = ObjectStore::Transaction();
       removed = 0;
     }
   }
   t.remove_collection(tmp);
-  int r = store->apply_transaction(t);
+  int r = store->apply_transaction(osr.get(), t);
   assert(r == 0);
-  store->sync_and_flush();
+
+  C_SaferCond waiter;
+  if (!osr->flush_commit(&waiter)) {
+    waiter.wait();
+  }
 }
 
 
@@ -2944,7 +2957,7 @@ void OSD::load_pgs()
     dout(1) << __func__ << " removing legacy infos object" << dendl;
     ObjectStore::Transaction t;
     t.remove(coll_t::meta(), OSD::make_infos_oid());
-    int r = store->apply_transaction(t);
+    int r = store->apply_transaction(service.meta_osr.get(), t);
     if (r != 0) {
       derr << __func__ << ": apply_transaction returned "
           << cpp_strerror(r) << dendl;
@@ -3111,13 +3124,13 @@ void OSD::build_past_intervals_parallel()
 
     // don't let the transaction get too big
     if (++num >= cct->_conf->osd_target_transaction_size) {
-      store->apply_transaction(t);
+      store->apply_transaction(service.meta_osr.get(), t);
       t = ObjectStore::Transaction();
       num = 0;
     }
   }
   if (!t.empty())
-    store->apply_transaction(t);
+    store->apply_transaction(service.meta_osr.get(), t);
 }
 
 /*
@@ -4129,7 +4142,7 @@ void TestOpsSocketHook::test_ops(OSDService *service, ObjectStore *store,
       val.append(valstr);
       newattrs[key] = val;
       t.omap_setkeys(coll_t(pgid), ghobject_t(obj), newattrs);
-      r = store->apply_transaction(t);
+      r = store->apply_transaction(service->meta_osr.get(), t);
       if (r < 0)
         ss << "error=" << r;
       else
@@ -4141,7 +4154,7 @@ void TestOpsSocketHook::test_ops(OSDService *service, ObjectStore *store,
 
       keys.insert(key);
       t.omap_rmkeys(coll_t(pgid), ghobject_t(obj), keys);
-      r = store->apply_transaction(t);
+      r = store->apply_transaction(service->meta_osr.get(), t);
       if (r < 0)
         ss << "error=" << r;
       else
@@ -4153,7 +4166,7 @@ void TestOpsSocketHook::test_ops(OSDService *service, ObjectStore *store,
       cmd_getval(service->cct, cmdmap, "header", headerstr);
       newheader.append(headerstr);
       t.omap_setheader(coll_t(pgid), ghobject_t(obj), newheader);
-      r = store->apply_transaction(t);
+      r = store->apply_transaction(service->meta_osr.get(), t);
       if (r < 0)
         ss << "error=" << r;
       else
@@ -4176,7 +4189,7 @@ void TestOpsSocketHook::test_ops(OSDService *service, ObjectStore *store,
       int64_t trunclen;
       cmd_getval(service->cct, cmdmap, "len", trunclen);
       t.truncate(coll_t(pgid), ghobject_t(obj), trunclen);
-      r = store->apply_transaction(t);
+      r = store->apply_transaction(service->meta_osr.get(), t);
       if (r < 0)
        ss << "error=" << r;
       else
@@ -5051,6 +5064,9 @@ void OSD::do_command(Connection *con, ceph_tid_t tid, vector<string>& cmd, buffe
     cmd_getval(cct, cmdmap, "count", count, (int64_t)1 << 30);
     cmd_getval(cct, cmdmap, "size", bsize, (int64_t)4 << 20);
 
+    ceph::shared_ptr<ObjectStore::Sequencer> osr(
+      new ObjectStore::Sequencer("bench"));
+
     uint32_t duration = g_conf->osd_bench_duration;
 
     if (bsize > (int64_t) g_conf->osd_bench_max_block_size) {
@@ -5115,7 +5131,13 @@ void OSD::do_command(Connection *con, ceph_tid_t tid, vector<string>& cmd, buffe
 
     ObjectStore::Transaction *cleanupt = new ObjectStore::Transaction;
 
-    store->sync_and_flush();
+    {
+      C_SaferCond waiter;
+      if (!osr->flush_commit(&waiter)) {
+       waiter.wait();
+      }
+    }
+
     utime_t start = ceph_clock_now(cct);
     for (int64_t pos = 0; pos < count; pos += bsize) {
       char nm[30];
@@ -5124,14 +5146,20 @@ void OSD::do_command(Connection *con, ceph_tid_t tid, vector<string>& cmd, buffe
       hobject_t soid(sobject_t(oid, 0));
       ObjectStore::Transaction *t = new ObjectStore::Transaction;
       t->write(coll_t::meta(), ghobject_t(soid), 0, bsize, bl);
-      store->queue_transaction_and_cleanup(NULL, t);
+      store->queue_transaction_and_cleanup(osr.get(), t);
       cleanupt->remove(coll_t::meta(), ghobject_t(soid));
     }
-    store->sync_and_flush();
+
+    {
+      C_SaferCond waiter;
+      if (!osr->flush_commit(&waiter)) {
+       waiter.wait();
+      }
+    }
     utime_t end = ceph_clock_now(cct);
 
     // clean up
-    store->queue_transaction_and_cleanup(NULL, cleanupt);
+    store->queue_transaction_and_cleanup(osr.get(), cleanupt);
 
     uint64_t rate = (double)count / (end - start);
     if (f) {
index d005e713091b7e3029b633ea46fa8f7932c9f99e..9a762997bcd456d1db68840e0ae876f634bd1129 100644 (file)
@@ -388,6 +388,7 @@ public:
   OSD *osd;
   CephContext *cct;
   SharedPtrRegistry<spg_t, ObjectStore::Sequencer> osr_registry;
+  ceph::shared_ptr<ObjectStore::Sequencer> meta_osr;
   SharedPtrRegistry<spg_t, DeletingState> deleting_pgs;
   const int whoami;
   ObjectStore *&store;