From: Sage Weil Date: Mon, 6 Apr 2015 20:55:46 +0000 (-0700) Subject: osd: explicitly use a 'meta' Sequencer X-Git-Tag: v9.1.0~324^2~28 X-Git-Url: http://git-server-git.apps.pok.os.sepia.ceph.com/?a=commitdiff_plain;h=e7bbafa3bfbd5e936a8be026a30b83a89f6121c3;p=ceph.git osd: explicitly use a 'meta' Sequencer Use an explicit Sequencer for all transactions. Signed-off-by: Sage Weil --- diff --git a/src/osd/OSD.cc b/src/osd/OSD.cc index 96fae67a5ce..4fbcf7fc7a5 100644 --- a/src/osd/OSD.cc +++ b/src/osd/OSD.cc @@ -195,6 +195,7 @@ CompatSet OSD::get_osd_compat_set() { OSDService::OSDService(OSD *osd) : osd(osd), cct(osd->cct), + meta_osr(new ObjectStore::Sequencer("meta")), whoami(osd->whoami), store(osd->store), log_client(osd->log_client), clog(osd->clog), pg_recovery_stats(osd->pg_recovery_stats), @@ -1323,6 +1324,9 @@ int OSD::mkfs(CephContext *cct, ObjectStore *store, const string &dev, { int ret; + ceph::shared_ptr osr( + new ObjectStore::Sequencer("mkfs")); + try { // if we are fed a uuid for this osd, use it. store->set_fsid(cct->_conf->osd_uuid); @@ -1373,7 +1377,7 @@ int OSD::mkfs(CephContext *cct, ObjectStore *store, const string &dev, ObjectStore::Transaction t; t.create_collection(coll_t::meta(), 0); t.write(coll_t::meta(), OSD_SUPERBLOCK_POBJECT, 0, bl.length(), bl); - ret = store->apply_transaction(t); + ret = store->apply_transaction(osr.get(), t); if (ret) { derr << "OSD::mkfs: error while writing OSD_SUPERBLOCK_POBJECT: " << "apply_transaction returned " << ret << dendl; @@ -1381,7 +1385,10 @@ int OSD::mkfs(CephContext *cct, ObjectStore *store, const string &dev, } } - store->sync_and_flush(); + C_SaferCond waiter; + if (!osr->flush_commit(&waiter)) { + waiter.wait(); + } ret = write_meta(store, sb.cluster_fsid, sb.osd_fsid, whoami); if (ret) { @@ -1830,7 +1837,7 @@ int OSD::init() dout(5) << "Upgrading superblock adding: " << diff << dendl; ObjectStore::Transaction t; write_superblock(t); - r = store->apply_transaction(t); + r = store->apply_transaction(service.meta_osr.get(), t); if (r < 0) goto out; } @@ -1840,7 +1847,7 @@ int OSD::init() dout(10) << "init creating/touching snapmapper object" << dendl; ObjectStore::Transaction t; t.touch(coll_t::meta(), OSD::make_snapmapper_oid()); - r = store->apply_transaction(t); + r = store->apply_transaction(service.meta_osr.get(), t); if (r < 0) goto out; } @@ -2393,7 +2400,7 @@ int OSD::shutdown() superblock.clean_thru = osdmap->get_epoch(); ObjectStore::Transaction t; write_superblock(t); - int r = store->apply_transaction(t); + int r = store->apply_transaction(service.meta_osr.get(), t); if (r) { derr << "OSD::shutdown: error writing superblock: " << cpp_strerror(r) << dendl; @@ -2542,6 +2549,8 @@ void OSD::recursive_remove_collection(ObjectStore *store, spg_t pgid, coll_t tmp coll_t(), make_snapmapper_oid()); + ceph::shared_ptr osr( + new ObjectStore::Sequencer("rm")); ObjectStore::Transaction t; SnapMapper mapper(&driver, 0, 0, 0, pgid.shard); @@ -2560,16 +2569,20 @@ void OSD::recursive_remove_collection(ObjectStore *store, spg_t pgid, coll_t tmp assert(0); t.remove(tmp, *p); if (removed > 300) { - int r = store->apply_transaction(t); + int r = store->apply_transaction(osr.get(), t); assert(r == 0); t = ObjectStore::Transaction(); removed = 0; } } t.remove_collection(tmp); - int r = store->apply_transaction(t); + int r = store->apply_transaction(osr.get(), t); assert(r == 0); - store->sync_and_flush(); + + C_SaferCond waiter; + if (!osr->flush_commit(&waiter)) { + waiter.wait(); + } } @@ -2944,7 +2957,7 @@ void OSD::load_pgs() dout(1) << __func__ << " removing legacy infos object" << dendl; ObjectStore::Transaction t; t.remove(coll_t::meta(), OSD::make_infos_oid()); - int r = store->apply_transaction(t); + int r = store->apply_transaction(service.meta_osr.get(), t); if (r != 0) { derr << __func__ << ": apply_transaction returned " << cpp_strerror(r) << dendl; @@ -3111,13 +3124,13 @@ void OSD::build_past_intervals_parallel() // don't let the transaction get too big if (++num >= cct->_conf->osd_target_transaction_size) { - store->apply_transaction(t); + store->apply_transaction(service.meta_osr.get(), t); t = ObjectStore::Transaction(); num = 0; } } if (!t.empty()) - store->apply_transaction(t); + store->apply_transaction(service.meta_osr.get(), t); } /* @@ -4129,7 +4142,7 @@ void TestOpsSocketHook::test_ops(OSDService *service, ObjectStore *store, val.append(valstr); newattrs[key] = val; t.omap_setkeys(coll_t(pgid), ghobject_t(obj), newattrs); - r = store->apply_transaction(t); + r = store->apply_transaction(service->meta_osr.get(), t); if (r < 0) ss << "error=" << r; else @@ -4141,7 +4154,7 @@ void TestOpsSocketHook::test_ops(OSDService *service, ObjectStore *store, keys.insert(key); t.omap_rmkeys(coll_t(pgid), ghobject_t(obj), keys); - r = store->apply_transaction(t); + r = store->apply_transaction(service->meta_osr.get(), t); if (r < 0) ss << "error=" << r; else @@ -4153,7 +4166,7 @@ void TestOpsSocketHook::test_ops(OSDService *service, ObjectStore *store, cmd_getval(service->cct, cmdmap, "header", headerstr); newheader.append(headerstr); t.omap_setheader(coll_t(pgid), ghobject_t(obj), newheader); - r = store->apply_transaction(t); + r = store->apply_transaction(service->meta_osr.get(), t); if (r < 0) ss << "error=" << r; else @@ -4176,7 +4189,7 @@ void TestOpsSocketHook::test_ops(OSDService *service, ObjectStore *store, int64_t trunclen; cmd_getval(service->cct, cmdmap, "len", trunclen); t.truncate(coll_t(pgid), ghobject_t(obj), trunclen); - r = store->apply_transaction(t); + r = store->apply_transaction(service->meta_osr.get(), t); if (r < 0) ss << "error=" << r; else @@ -5051,6 +5064,9 @@ void OSD::do_command(Connection *con, ceph_tid_t tid, vector& cmd, buffe cmd_getval(cct, cmdmap, "count", count, (int64_t)1 << 30); cmd_getval(cct, cmdmap, "size", bsize, (int64_t)4 << 20); + ceph::shared_ptr osr( + new ObjectStore::Sequencer("bench")); + uint32_t duration = g_conf->osd_bench_duration; if (bsize > (int64_t) g_conf->osd_bench_max_block_size) { @@ -5115,7 +5131,13 @@ void OSD::do_command(Connection *con, ceph_tid_t tid, vector& cmd, buffe ObjectStore::Transaction *cleanupt = new ObjectStore::Transaction; - store->sync_and_flush(); + { + C_SaferCond waiter; + if (!osr->flush_commit(&waiter)) { + waiter.wait(); + } + } + utime_t start = ceph_clock_now(cct); for (int64_t pos = 0; pos < count; pos += bsize) { char nm[30]; @@ -5124,14 +5146,20 @@ void OSD::do_command(Connection *con, ceph_tid_t tid, vector& cmd, buffe hobject_t soid(sobject_t(oid, 0)); ObjectStore::Transaction *t = new ObjectStore::Transaction; t->write(coll_t::meta(), ghobject_t(soid), 0, bsize, bl); - store->queue_transaction_and_cleanup(NULL, t); + store->queue_transaction_and_cleanup(osr.get(), t); cleanupt->remove(coll_t::meta(), ghobject_t(soid)); } - store->sync_and_flush(); + + { + C_SaferCond waiter; + if (!osr->flush_commit(&waiter)) { + waiter.wait(); + } + } utime_t end = ceph_clock_now(cct); // clean up - store->queue_transaction_and_cleanup(NULL, cleanupt); + store->queue_transaction_and_cleanup(osr.get(), cleanupt); uint64_t rate = (double)count / (end - start); if (f) { diff --git a/src/osd/OSD.h b/src/osd/OSD.h index d005e713091..9a762997bcd 100644 --- a/src/osd/OSD.h +++ b/src/osd/OSD.h @@ -388,6 +388,7 @@ public: OSD *osd; CephContext *cct; SharedPtrRegistry osr_registry; + ceph::shared_ptr meta_osr; SharedPtrRegistry deleting_pgs; const int whoami; ObjectStore *&store;