From a6a1e994f91eadbddc0d3c4dbd77b9100a810325 Mon Sep 17 00:00:00 2001 From: Sage Weil Date: Sun, 10 Aug 2014 15:08:33 -0700 Subject: [PATCH] mon: interact with MonitorDBStore::Transactions by shared_ptr Ref TransactionRef everywhere! Signed-off-by: Sage Weil --- src/ceph_mon.cc | 8 +- src/mon/AuthMonitor.cc | 8 +- src/mon/AuthMonitor.h | 4 +- src/mon/ConfigKeyService.cc | 7 +- src/mon/Elector.cc | 10 +- src/mon/LogMonitor.cc | 6 +- src/mon/LogMonitor.h | 6 +- src/mon/MDSMonitor.cc | 2 +- src/mon/MDSMonitor.h | 4 +- src/mon/Monitor.cc | 161 ++++++++++++++++--------------- src/mon/Monitor.h | 18 ++-- src/mon/MonitorDBStore.h | 40 ++++---- src/mon/MonmapMonitor.cc | 10 +- src/mon/MonmapMonitor.h | 4 +- src/mon/OSDMonitor.cc | 25 +++-- src/mon/OSDMonitor.h | 6 +- src/mon/PGMonitor.cc | 2 +- src/mon/PGMonitor.h | 4 +- src/mon/Paxos.cc | 101 +++++++++---------- src/mon/Paxos.h | 19 ++-- src/mon/PaxosService.cc | 30 +++--- src/mon/PaxosService.h | 22 +++-- src/tools/ceph_monstore_tool.cc | 26 ++--- src/tools/mon_store_converter.cc | 50 +++++----- 24 files changed, 293 insertions(+), 280 deletions(-) diff --git a/src/ceph_mon.cc b/src/ceph_mon.cc index 0ddf833f20a27..e8ec3c51400f2 100644 --- a/src/ceph_mon.cc +++ b/src/ceph_mon.cc @@ -554,11 +554,11 @@ int main(int argc, const char **argv) ::encode(v, final); ::encode(mapbl, final); - MonitorDBStore::Transaction t; + MonitorDBStore::TransactionRef t(new MonitorDBStore::Transaction); // save it - t.put("monmap", v, mapbl); - t.put("monmap", "latest", final); - t.put("monmap", "last_committed", v); + t->put("monmap", v, mapbl); + t->put("monmap", "latest", final); + t->put("monmap", "last_committed", v); store->apply_transaction(t); dout(0) << "done." << dendl; diff --git a/src/mon/AuthMonitor.cc b/src/mon/AuthMonitor.cc index 288d478434912..80cd920b6b4d4 100644 --- a/src/mon/AuthMonitor.cc +++ b/src/mon/AuthMonitor.cc @@ -189,8 +189,8 @@ void AuthMonitor::update_from_paxos(bool *need_bootstrap) mon->key_server.set_ver(keys_ver); if (keys_ver == 1 && mon->is_keyring_required()) { - MonitorDBStore::Transaction t; - t.erase("mkfs", "keyring"); + MonitorDBStore::TransactionRef t(new MonitorDBStore::Transaction); + t->erase("mkfs", "keyring"); mon->store->apply_transaction(t); } } @@ -227,7 +227,7 @@ void AuthMonitor::create_pending() dout(10) << "create_pending v " << (get_last_committed() + 1) << dendl; } -void AuthMonitor::encode_pending(MonitorDBStore::Transaction *t) +void AuthMonitor::encode_pending(MonitorDBStore::TransactionRef t) { dout(10) << __func__ << " v " << (get_last_committed() + 1) << dendl; @@ -244,7 +244,7 @@ void AuthMonitor::encode_pending(MonitorDBStore::Transaction *t) put_last_committed(t, version); } -void AuthMonitor::encode_full(MonitorDBStore::Transaction *t) +void AuthMonitor::encode_full(MonitorDBStore::TransactionRef t) { version_t version = mon->key_server.get_ver(); // do not stash full version 0 as it will never be removed nor read diff --git a/src/mon/AuthMonitor.h b/src/mon/AuthMonitor.h index ab95cb7a2abec..2f15518ce8fad 100644 --- a/src/mon/AuthMonitor.h +++ b/src/mon/AuthMonitor.h @@ -133,8 +133,8 @@ private: void increase_max_global_id(); uint64_t assign_global_id(MAuth *m, bool should_increase_max); // propose pending update to peers - void encode_pending(MonitorDBStore::Transaction *t); - virtual void encode_full(MonitorDBStore::Transaction *t); + void encode_pending(MonitorDBStore::TransactionRef t); + virtual void encode_full(MonitorDBStore::TransactionRef t); version_t get_trim_to(); bool preprocess_query(PaxosServiceMessage *m); // true if processed. diff --git a/src/mon/ConfigKeyService.cc b/src/mon/ConfigKeyService.cc index d2e204d4aa99d..9b1eacb035faa 100644 --- a/src/mon/ConfigKeyService.cc +++ b/src/mon/ConfigKeyService.cc @@ -48,10 +48,9 @@ int ConfigKeyService::store_get(string key, bufferlist &bl) void ConfigKeyService::store_put(string key, bufferlist &bl, Context *cb) { bufferlist proposal_bl; - MonitorDBStore::Transaction t; - t.put(STORE_PREFIX, key, bl); - t.encode(proposal_bl); - + MonitorDBStore::TransactionRef t(new MonitorDBStore::Transaction); + t->put(STORE_PREFIX, key, bl); + t->encode(proposal_bl); paxos->propose_new_value(proposal_bl, cb); } diff --git a/src/mon/Elector.cc b/src/mon/Elector.cc index 8f682221893da..1ea2179a76055 100644 --- a/src/mon/Elector.cc +++ b/src/mon/Elector.cc @@ -52,8 +52,8 @@ void Elector::bump_epoch(epoch_t e) dout(10) << "bump_epoch " << epoch << " to " << e << dendl; assert(epoch <= e); epoch = e; - MonitorDBStore::Transaction t; - t.put(Monitor::MONITOR_NAME, "election_epoch", epoch); + MonitorDBStore::TransactionRef t(new MonitorDBStore::Transaction); + t->put(Monitor::MONITOR_NAME, "election_epoch", epoch); mon->store->apply_transaction(t); mon->join_election(); @@ -429,9 +429,9 @@ void Elector::dispatch(Message *m) << ", taking it" << dendl; mon->monmap->decode(em->monmap_bl); - MonitorDBStore::Transaction t; - t.put("monmap", mon->monmap->epoch, em->monmap_bl); - t.put("monmap", "last_committed", mon->monmap->epoch); + MonitorDBStore::TransactionRef t(new MonitorDBStore::Transaction); + t->put("monmap", mon->monmap->epoch, em->monmap_bl); + t->put("monmap", "last_committed", mon->monmap->epoch); mon->store->apply_transaction(t); //mon->monmon()->paxos->stash_latest(mon->monmap->epoch, em->monmap_bl); cancel_timer(); diff --git a/src/mon/LogMonitor.cc b/src/mon/LogMonitor.cc index fc46bd26f7957..4f433a2bb6dad 100644 --- a/src/mon/LogMonitor.cc +++ b/src/mon/LogMonitor.cc @@ -170,7 +170,7 @@ void LogMonitor::update_from_paxos(bool *need_bootstrap) check_subs(); } -void LogMonitor::store_do_append(MonitorDBStore::Transaction *t, +void LogMonitor::store_do_append(MonitorDBStore::TransactionRef t, const string& key, bufferlist& bl) { bufferlist existing_bl; @@ -188,7 +188,7 @@ void LogMonitor::create_pending() dout(10) << "create_pending v " << (get_last_committed() + 1) << dendl; } -void LogMonitor::encode_pending(MonitorDBStore::Transaction *t) +void LogMonitor::encode_pending(MonitorDBStore::TransactionRef t) { version_t version = get_last_committed() + 1; bufferlist bl; @@ -203,7 +203,7 @@ void LogMonitor::encode_pending(MonitorDBStore::Transaction *t) put_last_committed(t, version); } -void LogMonitor::encode_full(MonitorDBStore::Transaction *t) +void LogMonitor::encode_full(MonitorDBStore::TransactionRef t) { dout(10) << __func__ << " log v " << summary.version << dendl; assert(get_last_committed() == summary.version); diff --git a/src/mon/LogMonitor.h b/src/mon/LogMonitor.h index 439ea4d999b3e..799e4ee850de5 100644 --- a/src/mon/LogMonitor.h +++ b/src/mon/LogMonitor.h @@ -37,8 +37,8 @@ private: void update_from_paxos(bool *need_bootstrap); void create_pending(); // prepare a new pending // propose pending update to peers - void encode_pending(MonitorDBStore::Transaction *t); - virtual void encode_full(MonitorDBStore::Transaction *t); + void encode_pending(MonitorDBStore::TransactionRef t); + virtual void encode_full(MonitorDBStore::TransactionRef t); version_t get_trim_to(); bool preprocess_query(PaxosServiceMessage *m); // true if processed. bool prepare_update(PaxosServiceMessage *m); @@ -74,7 +74,7 @@ private: bool _create_sub_summary(MLog *mlog, int level); void _create_sub_incremental(MLog *mlog, int level, version_t sv); - void store_do_append(MonitorDBStore::Transaction *t, + void store_do_append(MonitorDBStore::TransactionRef t, const string& key, bufferlist& bl); public: diff --git a/src/mon/MDSMonitor.cc b/src/mon/MDSMonitor.cc index 3e22a5431a852..f6ef9c118e98e 100644 --- a/src/mon/MDSMonitor.cc +++ b/src/mon/MDSMonitor.cc @@ -120,7 +120,7 @@ void MDSMonitor::create_pending() dout(10) << "create_pending e" << pending_mdsmap.epoch << dendl; } -void MDSMonitor::encode_pending(MonitorDBStore::Transaction *t) +void MDSMonitor::encode_pending(MonitorDBStore::TransactionRef t) { dout(10) << "encode_pending e" << pending_mdsmap.epoch << dendl; diff --git a/src/mon/MDSMonitor.h b/src/mon/MDSMonitor.h index fda64e1484cd8..b1c5a961709fd 100644 --- a/src/mon/MDSMonitor.h +++ b/src/mon/MDSMonitor.h @@ -71,9 +71,9 @@ class MDSMonitor : public PaxosService { void create_initial(); void update_from_paxos(bool *need_bootstrap); void create_pending(); - void encode_pending(MonitorDBStore::Transaction *t); + void encode_pending(MonitorDBStore::TransactionRef t); // we don't require full versions; don't encode any. - virtual void encode_full(MonitorDBStore::Transaction *t) { } + virtual void encode_full(MonitorDBStore::TransactionRef t) { } void update_logger(); diff --git a/src/mon/Monitor.cc b/src/mon/Monitor.cc index 9595417760735..4b357a8af756f 100644 --- a/src/mon/Monitor.cc +++ b/src/mon/Monitor.cc @@ -355,8 +355,8 @@ void Monitor::read_features_off_disk(MonitorDBStore *store, CompatSet *features) bufferlist bl; features->encode(bl); - MonitorDBStore::Transaction t; - t.put(MONITOR_NAME, COMPAT_SET_LOC, bl); + MonitorDBStore::TransactionRef t(new MonitorDBStore::Transaction); + t->put(MONITOR_NAME, COMPAT_SET_LOC, bl); store->apply_transaction(t); } else { bufferlist::iterator it = featuresbl.begin(); @@ -373,11 +373,11 @@ void Monitor::read_features() dout(10) << "required_features " << required_features << dendl; } -void Monitor::write_features(MonitorDBStore::Transaction &t) +void Monitor::write_features(MonitorDBStore::TransactionRef t) { bufferlist bl; features.encode(bl); - t.put(MONITOR_NAME, COMPAT_SET_LOC, bl); + t->put(MONITOR_NAME, COMPAT_SET_LOC, bl); } const char** Monitor::get_tracked_conf_keys() const @@ -1033,14 +1033,14 @@ void Monitor::sync_start(entity_inst_t &other, bool full) if (sync_full) { // stash key state, and mark that we are syncing - MonitorDBStore::Transaction t; - sync_stash_critical_state(&t); - t.put("mon_sync", "in_sync", 1); + MonitorDBStore::TransactionRef t(new MonitorDBStore::Transaction); + sync_stash_critical_state(t); + t->put("mon_sync", "in_sync", 1); sync_last_committed_floor = MAX(sync_last_committed_floor, paxos->get_version()); dout(10) << __func__ << " marking sync in progress, storing sync_last_committed_floor " << sync_last_committed_floor << dendl; - t.put("mon_sync", "last_committed_floor", sync_last_committed_floor); + t->put("mon_sync", "last_committed_floor", sync_last_committed_floor); store->apply_transaction(t); @@ -1071,7 +1071,7 @@ void Monitor::sync_start(entity_inst_t &other, bool full) messenger->send_message(m, sync_provider); } -void Monitor::sync_stash_critical_state(MonitorDBStore::Transaction *t) +void Monitor::sync_stash_critical_state(MonitorDBStore::TransactionRef t) { dout(10) << __func__ << dendl; bufferlist backup_monmap; @@ -1097,13 +1097,14 @@ void Monitor::sync_finish(version_t last_committed) if (sync_full) { // finalize the paxos commits - MonitorDBStore::Transaction tx; - paxos->read_and_prepare_transactions(&tx, sync_start_version, last_committed); - tx.put(paxos->get_name(), "last_committed", last_committed); + MonitorDBStore::TransactionRef tx(new MonitorDBStore::Transaction); + paxos->read_and_prepare_transactions(tx, sync_start_version, + last_committed); + tx->put(paxos->get_name(), "last_committed", last_committed); dout(30) << __func__ << " final tx dump:\n"; JSONFormatter f(true); - tx.dump(&f); + tx->dump(&f); f.flush(*_dout); *_dout << dendl; @@ -1112,10 +1113,10 @@ void Monitor::sync_finish(version_t last_committed) assert(g_conf->mon_sync_requester_kill_at != 8); - MonitorDBStore::Transaction t; - t.erase("mon_sync", "in_sync"); - t.erase("mon_sync", "force_sync"); - t.erase("mon_sync", "last_committed_floor"); + MonitorDBStore::TransactionRef t(new MonitorDBStore::Transaction); + t->erase("mon_sync", "in_sync"); + t->erase("mon_sync", "force_sync"); + t->erase("mon_sync", "last_committed_floor"); store->apply_transaction(t); assert(g_conf->mon_sync_requester_kill_at != 9); @@ -1248,16 +1249,17 @@ void Monitor::handle_sync_get_chunk(MMonSync *m) } MMonSync *reply = new MMonSync(MMonSync::OP_CHUNK, sp.cookie); - MonitorDBStore::Transaction tx; + MonitorDBStore::TransactionRef tx(new MonitorDBStore::Transaction); int left = g_conf->mon_sync_max_payload_size; while (sp.last_committed < paxos->get_version() && left > 0) { bufferlist bl; sp.last_committed++; store->get(paxos->get_name(), sp.last_committed, bl); - tx.put(paxos->get_name(), sp.last_committed, bl); + tx->put(paxos->get_name(), sp.last_committed, bl); left -= bl.length(); - dout(20) << __func__ << " including paxos state " << sp.last_committed << dendl; + dout(20) << __func__ << " including paxos state " << sp.last_committed + << dendl; } reply->last_committed = sp.last_committed; @@ -1269,9 +1271,11 @@ void Monitor::handle_sync_get_chunk(MMonSync *m) if ((sp.full && sp.synchronizer->has_next_chunk()) || sp.last_committed < paxos->get_version()) { - dout(10) << __func__ << " chunk, through version " << sp.last_committed << " key " << sp.last_key << dendl; + dout(10) << __func__ << " chunk, through version " << sp.last_committed + << " key " << sp.last_key << dendl; } else { - dout(10) << __func__ << " last chunk, through version " << sp.last_committed << " key " << sp.last_key << dendl; + dout(10) << __func__ << " last chunk, through version " << sp.last_committed + << " key " << sp.last_key << dendl; reply->op = MMonSync::OP_LAST_CHUNK; assert(g_conf->mon_sync_provider_kill_at != 3); @@ -1280,7 +1284,7 @@ void Monitor::handle_sync_get_chunk(MMonSync *m) sync_providers.erase(sp.cookie); } - ::encode(tx, reply->chunk_bl); + ::encode(*tx, reply->chunk_bl); m->get_connection()->send_message(reply); } @@ -1336,12 +1340,12 @@ void Monitor::handle_sync_chunk(MMonSync *m) assert(state == STATE_SYNCHRONIZING); assert(g_conf->mon_sync_requester_kill_at != 5); - MonitorDBStore::Transaction tx; - tx.append_from_encoded(m->chunk_bl); + MonitorDBStore::TransactionRef tx(new MonitorDBStore::Transaction); + tx->append_from_encoded(m->chunk_bl); dout(30) << __func__ << " tx dump:\n"; JSONFormatter f(true); - tx.dump(&f); + tx->dump(&f); f.flush(*_dout); *_dout << dendl; @@ -1351,13 +1355,14 @@ void Monitor::handle_sync_chunk(MMonSync *m) if (!sync_full) { dout(10) << __func__ << " applying recent paxos transactions as we go" << dendl; - MonitorDBStore::Transaction tx; - paxos->read_and_prepare_transactions(&tx, paxos->get_version() + 1, m->last_committed); - tx.put(paxos->get_name(), "last_committed", m->last_committed); + MonitorDBStore::TransactionRef tx(new MonitorDBStore::Transaction); + paxos->read_and_prepare_transactions(tx, paxos->get_version() + 1, + m->last_committed); + tx->put(paxos->get_name(), "last_committed", m->last_committed); dout(30) << __func__ << " tx dump:\n"; JSONFormatter f(true); - tx.dump(&f); + tx->dump(&f); f.flush(*_dout); *_dout << dendl; @@ -1804,7 +1809,7 @@ void Monitor::apply_quorum_to_compatset_features() dout(1) << __func__ << " enabling new quorum features: " << diff << dendl; features = new_features; - MonitorDBStore::Transaction t; + MonitorDBStore::TransactionRef t(new MonitorDBStore::Transaction); write_features(t); store->apply_transaction(t); @@ -1834,9 +1839,9 @@ void Monitor::sync_force(Formatter *f, ostream& ss) free_formatter = true; } - MonitorDBStore::Transaction tx; - sync_stash_critical_state(&tx); - tx.put("mon_sync", "force_sync", 1); + MonitorDBStore::TransactionRef tx(new MonitorDBStore::Transaction); + sync_stash_critical_state(tx); + tx->put("mon_sync", "force_sync", 1); store->apply_transaction(tx); f->open_object_section("sync_force"); @@ -3944,17 +3949,17 @@ void Monitor::tick() if (is_leader() && paxos->is_active() && fingerprint.is_zero()) { // this is only necessary on upgraded clusters. - MonitorDBStore::Transaction t; - prepare_new_fingerprint(&t); + MonitorDBStore::TransactionRef t(new MonitorDBStore::Transaction); + prepare_new_fingerprint(t); bufferlist tbl; - t.encode(tbl); + t->encode(tbl); paxos->propose_new_value(tbl, new C_NoopContext); } new_tick(); } -void Monitor::prepare_new_fingerprint(MonitorDBStore::Transaction *t) +void Monitor::prepare_new_fingerprint(MonitorDBStore::TransactionRef t) { uuid_d nf; nf.generate_random(); @@ -3999,13 +4004,13 @@ int Monitor::check_fsid() int Monitor::write_fsid() { - MonitorDBStore::Transaction t; + MonitorDBStore::TransactionRef t(new MonitorDBStore::Transaction); int r = write_fsid(t); store->apply_transaction(t); return r; } -int Monitor::write_fsid(MonitorDBStore::Transaction &t) +int Monitor::write_fsid(MonitorDBStore::TransactionRef t) { ostringstream ss; ss << monmap->get_fsid() << "\n"; @@ -4014,7 +4019,7 @@ int Monitor::write_fsid(MonitorDBStore::Transaction &t) bufferlist b; b.append(us); - t.put(MONITOR_NAME, "cluster_uuid", b); + t->put(MONITOR_NAME, "cluster_uuid", b); return 0; } @@ -4024,7 +4029,7 @@ int Monitor::write_fsid(MonitorDBStore::Transaction &t) */ int Monitor::mkfs(bufferlist& osdmapbl) { - MonitorDBStore::Transaction t; + MonitorDBStore::TransactionRef t(new MonitorDBStore::Transaction); // verify cluster fsid int r = check_fsid(); @@ -4034,7 +4039,7 @@ int Monitor::mkfs(bufferlist& osdmapbl) bufferlist magicbl; magicbl.append(CEPH_MON_ONDISK_MAGIC); magicbl.append("\n"); - t.put(MONITOR_NAME, "magic", magicbl); + t->put(MONITOR_NAME, "magic", magicbl); features = get_supported_features(); @@ -4044,7 +4049,7 @@ int Monitor::mkfs(bufferlist& osdmapbl) bufferlist monmapbl; monmap->encode(monmapbl, CEPH_FEATURES_ALL); monmap->set_epoch(0); // must be 0 to avoid confusing first MonmapMonitor::update_from_paxos() - t.put("mkfs", "monmap", monmapbl); + t->put("mkfs", "monmap", monmapbl); if (osdmapbl.length()) { // make sure it's a valid osdmap @@ -4056,7 +4061,7 @@ int Monitor::mkfs(bufferlist& osdmapbl) derr << "error decoding provided osdmap: " << e.what() << dendl; return -EINVAL; } - t.put("mkfs", "osdmap", osdmapbl); + t->put("mkfs", "osdmap", osdmapbl); } if (is_keyring_required()) { @@ -4094,7 +4099,7 @@ int Monitor::mkfs(bufferlist& osdmapbl) bufferlist keyringbl; keyring.encode_plaintext(keyringbl); - t.put("mkfs", "keyring", keyringbl); + t->put("mkfs", "keyring", keyringbl); } write_fsid(t); store->apply_transaction(t); @@ -4247,7 +4252,7 @@ bool Monitor::ms_verify_authorizer(Connection *con, int peer_type, #define dout_prefix *_dout void Monitor::StoreConverter::_convert_finish_features( - MonitorDBStore::Transaction &t) + MonitorDBStore::TransactionRef t) { dout(20) << __func__ << dendl; @@ -4271,7 +4276,7 @@ void Monitor::StoreConverter::_convert_finish_features( features.encode(features_bl); dout(20) << __func__ << " new features " << features << dendl; - t.put(MONITOR_NAME, COMPAT_SET_LOC, features_bl); + t->put(MONITOR_NAME, COMPAT_SET_LOC, features_bl); } @@ -4366,11 +4371,11 @@ void Monitor::StoreConverter::_convert_monitor() assert(store->exists_bl_ss("feature_set")); assert(store->exists_bl_ss("election_epoch")); - MonitorDBStore::Transaction tx; + MonitorDBStore::TransactionRef tx(new MonitorDBStore::Transaction); if (store->exists_bl_ss("joined")) { version_t joined = store->get_int("joined"); - tx.put(MONITOR_NAME, "joined", joined); + tx->put(MONITOR_NAME, "joined", joined); } vector keys; @@ -4386,12 +4391,12 @@ void Monitor::StoreConverter::_convert_monitor() bufferlist bl; int r = store->get_bl_ss(bl, (*it).c_str(), 0); assert(r > 0); - tx.put(MONITOR_NAME, *it, bl); + tx->put(MONITOR_NAME, *it, bl); } version_t election_epoch = store->get_int("election_epoch"); - tx.put(MONITOR_NAME, "election_epoch", election_epoch); + tx->put(MONITOR_NAME, "election_epoch", election_epoch); - assert(!tx.empty()); + assert(!tx->empty()); db->apply_transaction(tx); dout(10) << __func__ << " finished" << dendl; } @@ -4436,9 +4441,9 @@ void Monitor::StoreConverter::_convert_machines(string machine) dout(20) << __func__ << " " << machine << " ver " << ver << " bl " << bl.length() << dendl; - MonitorDBStore::Transaction tx; - tx.put(machine, ver, bl); - tx.put(machine, "last_committed", ver); + MonitorDBStore::TransactionRef tx(new MonitorDBStore::Transaction); + tx->put(machine, ver, bl); + tx->put(machine, "last_committed", ver); if (has_gv && store->exists_bl_sn(machine_gv.c_str(), ver)) { stringstream s; @@ -4449,7 +4454,7 @@ void Monitor::StoreConverter::_convert_machines(string machine) dout(20) << __func__ << " " << machine << " ver " << ver << " -> " << gv << dendl; - MonitorDBStore::Transaction paxos_tx; + MonitorDBStore::TransactionRef paxos_tx(new MonitorDBStore::Transaction); if (gvs.count(gv) == 0) { gvs.insert(gv); @@ -4481,16 +4486,16 @@ void Monitor::StoreConverter::_convert_machines(string machine) bufferlist paxos_bl; int r = db->get("paxos", gv, paxos_bl); assert(r >= 0); - paxos_tx.append_from_encoded(paxos_bl); + paxos_tx->append_from_encoded(paxos_bl); } gv_map[gv].insert(make_pair(machine,ver)); bufferlist tx_bl; - tx.encode(tx_bl); - paxos_tx.append_from_encoded(tx_bl); + tx->encode(tx_bl); + paxos_tx->append_from_encoded(tx_bl); bufferlist paxos_bl; - paxos_tx.encode(paxos_bl); - tx.put("paxos", gv, paxos_bl); + paxos_tx->encode(paxos_bl); + tx->put("paxos", gv, paxos_bl); } db->apply_transaction(tx); } @@ -4499,10 +4504,10 @@ void Monitor::StoreConverter::_convert_machines(string machine) dout(20) << __func__ << " lc " << lc << " last_committed " << last_committed << dendl; assert(lc == last_committed); - MonitorDBStore::Transaction tx; - tx.put(machine, "first_committed", first_committed); - tx.put(machine, "last_committed", last_committed); - tx.put(machine, "conversion_first", first_committed); + MonitorDBStore::TransactionRef tx(new MonitorDBStore::Transaction); + tx->put(machine, "first_committed", first_committed); + tx->put(machine, "last_committed", last_committed); + tx->put(machine, "conversion_first", first_committed); if (store->exists_bl_ss(machine.c_str(), "latest")) { bufferlist latest_bl_raw; @@ -4514,7 +4519,7 @@ void Monitor::StoreConverter::_convert_machines(string machine) goto out; } - tx.put(machine, "latest", latest_bl_raw); + tx->put(machine, "latest", latest_bl_raw); bufferlist::iterator lbl_it = latest_bl_raw.begin(); bufferlist latest_bl; @@ -4525,10 +4530,10 @@ void Monitor::StoreConverter::_convert_machines(string machine) dout(20) << __func__ << " machine " << machine << " latest ver " << latest_ver << dendl; - tx.put(machine, "full_latest", latest_ver); + tx->put(machine, "full_latest", latest_ver); stringstream os; os << "full_" << latest_ver; - tx.put(machine, os.str(), latest_bl); + tx->put(machine, os.str(), latest_bl); } out: db->apply_transaction(tx); @@ -4558,8 +4563,8 @@ void Monitor::StoreConverter::_convert_osdmap_full() << " bl " << bl.length() << " bytes" << dendl; string full_key = "full_" + stringify(ver); - MonitorDBStore::Transaction tx; - tx.put("osdmap", full_key, bl); + MonitorDBStore::TransactionRef tx(new MonitorDBStore::Transaction); + tx->put("osdmap", full_key, bl); db->apply_transaction(tx); } dout(10) << __func__ << " found " << err << " conversion errors!" << dendl; @@ -4592,18 +4597,18 @@ void Monitor::StoreConverter::_convert_paxos() // erase all paxos versions between [first, last_gv[, with first being the // first gv in the map. - MonitorDBStore::Transaction tx; + MonitorDBStore::TransactionRef tx(new MonitorDBStore::Transaction); set::iterator it = gvs.begin(); dout(1) << __func__ << " first gv " << (*it) << " last gv " << last_gv << dendl; for (; it != gvs.end() && (*it < last_gv); ++it) { - tx.erase("paxos", *it); + tx->erase("paxos", *it); } - tx.put("paxos", "first_committed", last_gv); - tx.put("paxos", "last_committed", highest_gv); - tx.put("paxos", "accepted_pn", highest_accepted_pn); - tx.put("paxos", "last_pn", highest_last_pn); - tx.put("paxos", "conversion_first", last_gv); + tx->put("paxos", "first_committed", last_gv); + tx->put("paxos", "last_committed", highest_gv); + tx->put("paxos", "accepted_pn", highest_accepted_pn); + tx->put("paxos", "last_pn", highest_last_pn); + tx->put("paxos", "conversion_first", last_gv); db->apply_transaction(tx); dout(10) << __func__ << " finished" << dendl; diff --git a/src/mon/Monitor.h b/src/mon/Monitor.h index 2f079116d22fc..6f17219f2d42d 100644 --- a/src/mon/Monitor.h +++ b/src/mon/Monitor.h @@ -206,7 +206,7 @@ public: const utime_t &get_leader_since() const; - void prepare_new_fingerprint(MonitorDBStore::Transaction *t); + void prepare_new_fingerprint(MonitorDBStore::TransactionRef t); // -- elector -- private: @@ -373,7 +373,7 @@ private: * We store a few things on the side that we don't want to get clobbered by sync. This * includes the latest monmap and a lower bound on last_committed. */ - void sync_stash_critical_state(MonitorDBStore::Transaction *tx); + void sync_stash_critical_state(MonitorDBStore::TransactionRef tx); /** * reset the sync timeout @@ -780,7 +780,7 @@ public: /// read the ondisk features into the CompatSet pointed to by read_features static void read_features_off_disk(MonitorDBStore *store, CompatSet *read_features); void read_features(); - void write_features(MonitorDBStore::Transaction &t); + void write_features(MonitorDBStore::TransactionRef t); public: Monitor(CephContext *cct_, string nm, MonitorDBStore *s, @@ -819,7 +819,7 @@ public: * @return 0 on success, or negative error code */ int write_fsid(); - int write_fsid(MonitorDBStore::Transaction &t); + int write_fsid(MonitorDBStore::TransactionRef t); void do_admin_command(std::string command, cmdmap_t& cmdmap, std::string format, ostream& ss); @@ -887,15 +887,15 @@ public: } void _mark_convert_start() { - MonitorDBStore::Transaction tx; - tx.put("mon_convert", "on_going", 1); + MonitorDBStore::TransactionRef tx(new MonitorDBStore::Transaction); + tx->put("mon_convert", "on_going", 1); db->apply_transaction(tx); } - void _convert_finish_features(MonitorDBStore::Transaction &t); + void _convert_finish_features(MonitorDBStore::TransactionRef t); void _mark_convert_finish() { - MonitorDBStore::Transaction tx; - tx.erase("mon_convert", "on_going"); + MonitorDBStore::TransactionRef tx(new MonitorDBStore::Transaction); + tx->erase("mon_convert", "on_going"); _convert_finish_features(tx); db->apply_transaction(tx); } diff --git a/src/mon/MonitorDBStore.h b/src/mon/MonitorDBStore.h index 42e14f2a2f04f..a042be639f699 100644 --- a/src/mon/MonitorDBStore.h +++ b/src/mon/MonitorDBStore.h @@ -84,6 +84,8 @@ class MonitorDBStore } }; + struct Transaction; + typedef ceph::shared_ptr TransactionRef; struct Transaction { list ops; uint64_t bytes, keys; @@ -163,16 +165,16 @@ class MonitorDBStore ls.back()->compact_range("prefix4", "from", "to"); } - void append(Transaction& other) { - ops.splice(ops.end(), other.ops); - keys += other.keys; - bytes += other.bytes; + void append(TransactionRef other) { + ops.splice(ops.end(), other->ops); + keys += other->keys; + bytes += other->bytes; } void append_from_encoded(bufferlist& bl) { - Transaction other; + TransactionRef other(new Transaction); bufferlist::iterator it = bl.begin(); - other.decode(it); + other->decode(it); append(other); } @@ -244,17 +246,19 @@ class MonitorDBStore } }; - int apply_transaction(const MonitorDBStore::Transaction& t) { + int apply_transaction(MonitorDBStore::TransactionRef t) { KeyValueDB::Transaction dbt = db->get_transaction(); if (do_dump) { bufferlist bl; - t.encode(bl); + t->encode(bl); bl.write_fd(dump_fd); } list > > compact; - for (list::const_iterator it = t.ops.begin(); it != t.ops.end(); ++it) { + for (list::const_iterator it = t->ops.begin(); + it != t->ops.end(); + ++it) { const Op& op = *it; switch (op.type) { case Transaction::OP_PUT: @@ -295,26 +299,26 @@ class MonitorDBStore StoreIteratorImpl() : done(false) { } virtual ~StoreIteratorImpl() { } - bool add_chunk_entry(Transaction &tx, + bool add_chunk_entry(TransactionRef tx, string &prefix, string &key, bufferlist &value, uint64_t max) { - Transaction tmp; + TransactionRef tmp(new Transaction); bufferlist tmp_bl; - tmp.put(prefix, key, value); - tmp.encode(tmp_bl); + tmp->put(prefix, key, value); + tmp->encode(tmp_bl); bufferlist tx_bl; - tx.encode(tx_bl); + tx->encode(tx_bl); size_t len = tx_bl.length() + tmp_bl.length(); - if (!tx.empty() && (len > max)) { + if (!tx->empty() && (len > max)) { return false; } - tx.append(tmp); + tx->append(tmp); last_key.first = prefix; last_key.second = key; @@ -341,7 +345,7 @@ class MonitorDBStore virtual bool has_next_chunk() { return !done && _is_valid(); } - virtual void get_chunk_tx(Transaction &tx, uint64_t max) = 0; + virtual void get_chunk_tx(TransactionRef tx, uint64_t max) = 0; virtual pair get_next_key() = 0; }; typedef ceph::shared_ptr Synchronizer; @@ -369,7 +373,7 @@ class MonitorDBStore * differ from the one passed on to the function) * @param last_key[out] Last key in the chunk */ - virtual void get_chunk_tx(Transaction &tx, uint64_t max) { + virtual void get_chunk_tx(TransactionRef tx, uint64_t max) { assert(done == false); assert(iter->valid() == true); diff --git a/src/mon/MonmapMonitor.cc b/src/mon/MonmapMonitor.cc index 3890704e852dc..957a4c2f0e80d 100644 --- a/src/mon/MonmapMonitor.cc +++ b/src/mon/MonmapMonitor.cc @@ -72,8 +72,8 @@ void MonmapMonitor::update_from_paxos(bool *need_bootstrap) mon->monmap->decode(monmap_bl); if (mon->store->exists("mkfs", "monmap")) { - MonitorDBStore::Transaction t; - t.erase("mkfs", "monmap"); + MonitorDBStore::TransactionRef t(new MonitorDBStore::Transaction); + t->erase("mkfs", "monmap"); mon->store->apply_transaction(t); } } @@ -86,7 +86,7 @@ void MonmapMonitor::create_pending() dout(10) << "create_pending monmap epoch " << pending_map.epoch << dendl; } -void MonmapMonitor::encode_pending(MonitorDBStore::Transaction *t) +void MonmapMonitor::encode_pending(MonitorDBStore::TransactionRef t) { dout(10) << "encode_pending epoch " << pending_map.epoch << dendl; @@ -116,8 +116,8 @@ void MonmapMonitor::on_active() single-threaded process and, truth be told, no one else relies on this thing besides us. */ - MonitorDBStore::Transaction t; - t.put(Monitor::MONITOR_NAME, "joined", 1); + MonitorDBStore::TransactionRef t(new MonitorDBStore::Transaction); + t->put(Monitor::MONITOR_NAME, "joined", 1); mon->store->apply_transaction(t); mon->has_ever_joined = true; } diff --git a/src/mon/MonmapMonitor.h b/src/mon/MonmapMonitor.h index 198489d701772..22b51ad64558e 100644 --- a/src/mon/MonmapMonitor.h +++ b/src/mon/MonmapMonitor.h @@ -50,9 +50,9 @@ class MonmapMonitor : public PaxosService { void create_pending(); - void encode_pending(MonitorDBStore::Transaction *t); + void encode_pending(MonitorDBStore::TransactionRef t); // we always encode the full map; we have no use for full versions - virtual void encode_full(MonitorDBStore::Transaction *t) { } + virtual void encode_full(MonitorDBStore::TransactionRef t) { } void on_active(); diff --git a/src/mon/OSDMonitor.cc b/src/mon/OSDMonitor.cc index 268bee849bf29..1923bfc5645c2 100644 --- a/src/mon/OSDMonitor.cc +++ b/src/mon/OSDMonitor.cc @@ -174,8 +174,8 @@ void OSDMonitor::update_from_paxos(bool *need_bootstrap) // state, and we shouldn't want to work around it without knowing what // exactly happened. assert(latest_full > 0); - MonitorDBStore::Transaction t; - put_version_latest_full(&t, latest_full); + MonitorDBStore::TransactionRef t(new MonitorDBStore::Transaction); + put_version_latest_full(t, latest_full); mon->store->apply_transaction(t); dout(10) << __func__ << " updated the on-disk full map version to " << latest_full << dendl; @@ -190,7 +190,7 @@ void OSDMonitor::update_from_paxos(bool *need_bootstrap) } // walk through incrementals - MonitorDBStore::Transaction *t = NULL; + MonitorDBStore::TransactionRef t; size_t tx_size = 0; while (version > osdmap.epoch) { bufferlist inc_bl; @@ -203,8 +203,8 @@ void OSDMonitor::update_from_paxos(bool *need_bootstrap) err = osdmap.apply_incremental(inc); assert(err == 0); - if (t == NULL) - t = new MonitorDBStore::Transaction; + if (!t) + t.reset(new MonitorDBStore::Transaction); // Write out the full map for all past epochs. Encode the full // map with the same features as the incremental. If we don't @@ -230,16 +230,14 @@ void OSDMonitor::update_from_paxos(bool *need_bootstrap) } if (tx_size > g_conf->mon_sync_max_payload_size*2) { - mon->store->apply_transaction(*t); - delete t; - t = NULL; + mon->store->apply_transaction(t); + t = MonitorDBStore::TransactionRef(); tx_size = 0; } } - if (t != NULL) { - mon->store->apply_transaction(*t); - delete t; + if (t) { + mon->store->apply_transaction(t); } for (int o = 0; o < osdmap.get_max_osd(); o++) { @@ -602,7 +600,7 @@ void OSDMonitor::create_pending() * @note receiving a transaction in this function gives a fair amount of * freedom to the service implementation if it does need it. It shouldn't. */ -void OSDMonitor::encode_pending(MonitorDBStore::Transaction *t) +void OSDMonitor::encode_pending(MonitorDBStore::TransactionRef t) { dout(10) << "encode_pending e " << pending_inc.epoch << dendl; @@ -728,7 +726,8 @@ version_t OSDMonitor::get_trim_to() return 0; } -void OSDMonitor::encode_trim_extra(MonitorDBStore::Transaction *tx, version_t first) +void OSDMonitor::encode_trim_extra(MonitorDBStore::TransactionRef tx, + version_t first) { dout(10) << __func__ << " including full map for e " << first << dendl; bufferlist bl; diff --git a/src/mon/OSDMonitor.h b/src/mon/OSDMonitor.h index 650c55e2c0447..e28efba40229f 100644 --- a/src/mon/OSDMonitor.h +++ b/src/mon/OSDMonitor.h @@ -158,7 +158,7 @@ public: private: void update_from_paxos(bool *need_bootstrap); void create_pending(); // prepare a new pending - void encode_pending(MonitorDBStore::Transaction *t); + void encode_pending(MonitorDBStore::TransactionRef t); void on_active(); void on_shutdown(); @@ -166,7 +166,7 @@ private: * we haven't delegated full version stashing to paxosservice for some time * now, making this function useless in current context. */ - virtual void encode_full(MonitorDBStore::Transaction *t) { } + virtual void encode_full(MonitorDBStore::TransactionRef t) { } /** * do not let paxosservice periodically stash full osdmaps, or we will break our * locally-managed full maps. (update_from_paxos loads the latest and writes them @@ -182,7 +182,7 @@ private: * This ensures that anyone post-sync will have enough to rebuild their * full osdmaps. */ - void encode_trim_extra(MonitorDBStore::Transaction *tx, version_t first); + void encode_trim_extra(MonitorDBStore::TransactionRef tx, version_t first); void update_msgr_features(); int check_cluster_features(uint64_t features, stringstream &ss); diff --git a/src/mon/PGMonitor.cc b/src/mon/PGMonitor.cc index d6ff07225761e..0d5c684195eae 100644 --- a/src/mon/PGMonitor.cc +++ b/src/mon/PGMonitor.cc @@ -486,7 +486,7 @@ void PGMonitor::apply_pgmap_delta(bufferlist& bl) } -void PGMonitor::encode_pending(MonitorDBStore::Transaction *t) +void PGMonitor::encode_pending(MonitorDBStore::TransactionRef t) { version_t version = pending_inc.version; dout(10) << __func__ << " v " << version << dendl; diff --git a/src/mon/PGMonitor.h b/src/mon/PGMonitor.h index f0073789bb5bd..97a9ac1194b95 100644 --- a/src/mon/PGMonitor.h +++ b/src/mon/PGMonitor.h @@ -69,7 +69,7 @@ private: version_t get_trim_to(); void update_logger(); - void encode_pending(MonitorDBStore::Transaction *t); + void encode_pending(MonitorDBStore::TransactionRef t); void read_pgmap_meta(); void read_pgmap_full(); void apply_pgmap_delta(bufferlist& bl); @@ -180,7 +180,7 @@ public: bool should_stash_full() { return false; // never } - virtual void encode_full(MonitorDBStore::Transaction *t) { + virtual void encode_full(MonitorDBStore::TransactionRef t) { assert(0 == "unimplemented encode_full"); } diff --git a/src/mon/Paxos.cc b/src/mon/Paxos.cc index 2dad7a07d0d69..4635c0ad0bfa3 100644 --- a/src/mon/Paxos.cc +++ b/src/mon/Paxos.cc @@ -43,7 +43,8 @@ MonitorDBStore *Paxos::get_store() return mon->store; } -void Paxos::read_and_prepare_transactions(MonitorDBStore::Transaction *tx, version_t first, version_t last) +void Paxos::read_and_prepare_transactions(MonitorDBStore::TransactionRef tx, + version_t first, version_t last) { dout(10) << __func__ << " first " << first << " last " << last << dendl; for (version_t v = first; v <= last; ++v) { @@ -52,7 +53,7 @@ void Paxos::read_and_prepare_transactions(MonitorDBStore::Transaction *tx, versi int err = get_store()->get(get_name(), v, bl); assert(err == 0); assert(bl.length()); - decode_append_transaction(*tx, bl); + decode_append_transaction(tx, bl); } dout(15) << __func__ << " total versions " << (last-first) << dendl; } @@ -224,18 +225,18 @@ void Paxos::handle_collect(MMonPaxos *collect) dout(10) << "accepting pn " << accepted_pn << " from " << accepted_pn_from << dendl; - MonitorDBStore::Transaction t; - t.put(get_name(), "accepted_pn", accepted_pn); + MonitorDBStore::TransactionRef t(new MonitorDBStore::Transaction); + t->put(get_name(), "accepted_pn", accepted_pn); dout(30) << __func__ << " transaction dump:\n"; JSONFormatter f(true); - t.dump(&f); + t->dump(&f); f.flush(*_dout); *_dout << dendl; logger->inc(l_paxos_collect); - logger->inc(l_paxos_collect_keys, t.get_keys()); - logger->inc(l_paxos_collect_bytes, t.get_bytes()); + logger->inc(l_paxos_collect_keys, t->get_keys()); + logger->inc(l_paxos_collect_bytes, t->get_bytes()); utime_t start = ceph_clock_now(NULL); get_store()->apply_transaction(t); @@ -339,7 +340,7 @@ void Paxos::share_state(MMonPaxos *m, version_t peer_first_committed, */ bool Paxos::store_state(MMonPaxos *m) { - MonitorDBStore::Transaction t; + MonitorDBStore::TransactionRef t(new MonitorDBStore::Transaction); map::iterator start = m->values.begin(); bool changed = false; @@ -372,14 +373,14 @@ bool Paxos::store_state(MMonPaxos *m) } else { dout(10) << "store_state [" << start->first << ".." << last_committed << "]" << dendl; - t.put(get_name(), "last_committed", last_committed); + t->put(get_name(), "last_committed", last_committed); // we should apply the state here -- decode every single bufferlist in the // map and append the transactions to 't'. map::iterator it; for (it = start; it != end; ++it) { // write the bufferlist as the version's value - t.put(get_name(), it->first, it->second); + t->put(get_name(), it->first, it->second); // decode the bufferlist and append it to the transaction we will shortly // apply. decode_append_transaction(t, it->second); @@ -394,16 +395,16 @@ bool Paxos::store_state(MMonPaxos *m) uncommitted_value.clear(); } } - if (!t.empty()) { + if (!t->empty()) { dout(30) << __func__ << " transaction dump:\n"; JSONFormatter f(true); - t.dump(&f); + t->dump(&f); f.flush(*_dout); *_dout << dendl; logger->inc(l_paxos_store_state); - logger->inc(l_paxos_store_state_bytes, t.get_bytes()); - logger->inc(l_paxos_store_state_keys, t.get_keys()); + logger->inc(l_paxos_store_state_bytes, t->get_bytes()); + logger->inc(l_paxos_store_state_keys, t->get_keys()); utime_t start = ceph_clock_now(NULL); get_store()->apply_transaction(t); @@ -426,14 +427,14 @@ bool Paxos::store_state(MMonPaxos *m) void Paxos::remove_legacy_versions() { if (get_store()->exists(get_name(), "conversion_first")) { - MonitorDBStore::Transaction t; + MonitorDBStore::TransactionRef t(new MonitorDBStore::Transaction); version_t v = get_store()->get(get_name(), "conversion_first"); dout(10) << __func__ << " removing pre-conversion paxos states from " << v << " until " << first_committed << dendl; for (; v < first_committed; ++v) { - t.erase(get_name(), v); + t->erase(get_name(), v); } - t.erase(get_name(), "conversion_first"); + t->erase(get_name(), "conversion_first"); get_store()->apply_transaction(t); } } @@ -618,41 +619,41 @@ void Paxos::begin(bufferlist& v) new_value = v; if (last_committed == 0) { - MonitorDBStore::Transaction t; + MonitorDBStore::TransactionRef t(new MonitorDBStore::Transaction); // initial base case; set first_committed too - t.put(get_name(), "first_committed", 1); + t->put(get_name(), "first_committed", 1); decode_append_transaction(t, new_value); bufferlist tx_bl; - t.encode(tx_bl); + t->encode(tx_bl); new_value = tx_bl; } // store the proposed value in the store. IF it is accepted, we will then // have to decode it into a transaction and apply it. - MonitorDBStore::Transaction t; - t.put(get_name(), last_committed+1, new_value); + MonitorDBStore::TransactionRef t(new MonitorDBStore::Transaction); + t->put(get_name(), last_committed+1, new_value); // note which pn this pending value is for. - t.put(get_name(), "pending_v", last_committed + 1); - t.put(get_name(), "pending_pn", accepted_pn); + t->put(get_name(), "pending_v", last_committed + 1); + t->put(get_name(), "pending_pn", accepted_pn); dout(30) << __func__ << " transaction dump:\n"; JSONFormatter f(true); - t.dump(&f); + t->dump(&f); f.flush(*_dout); - MonitorDBStore::Transaction debug_tx; + MonitorDBStore::TransactionRef debug_tx(new MonitorDBStore::Transaction); bufferlist::iterator new_value_it = new_value.begin(); - debug_tx.decode(new_value_it); - debug_tx.dump(&f); + debug_tx->decode(new_value_it); + debug_tx->dump(&f); *_dout << "\nbl dump:\n"; f.flush(*_dout); *_dout << dendl; logger->inc(l_paxos_begin); - logger->inc(l_paxos_begin_keys, t.get_keys()); - logger->inc(l_paxos_begin_bytes, t.get_bytes()); + logger->inc(l_paxos_begin_keys, t->get_keys()); + logger->inc(l_paxos_begin_bytes, t->get_bytes()); utime_t start = ceph_clock_now(NULL); get_store()->apply_transaction(t); @@ -725,20 +726,20 @@ void Paxos::handle_begin(MMonPaxos *begin) dout(10) << "accepting value for " << v << " pn " << accepted_pn << dendl; // store the accepted value onto our store. We will have to decode it and // apply its transaction once we receive permission to commit. - MonitorDBStore::Transaction t; - t.put(get_name(), v, begin->values[v]); + MonitorDBStore::TransactionRef t(new MonitorDBStore::Transaction); + t->put(get_name(), v, begin->values[v]); // note which pn this pending value is for. - t.put(get_name(), "pending_v", v); - t.put(get_name(), "pending_pn", accepted_pn); + t->put(get_name(), "pending_v", v); + t->put(get_name(), "pending_pn", accepted_pn); dout(30) << __func__ << " transaction dump:\n"; JSONFormatter f(true); - t.dump(&f); + t->dump(&f); f.flush(*_dout); *_dout << dendl; - logger->inc(l_paxos_begin_bytes, t.get_bytes()); + logger->inc(l_paxos_begin_bytes, t->get_bytes()); utime_t start = ceph_clock_now(NULL); get_store()->apply_transaction(t); @@ -841,12 +842,12 @@ void Paxos::commit() assert(g_conf->paxos_kill_at != 7); - MonitorDBStore::Transaction t; + MonitorDBStore::TransactionRef t(new MonitorDBStore::Transaction); // commit locally last_committed++; last_commit_time = ceph_clock_now(g_ceph_context); - t.put(get_name(), "last_committed", last_committed); + t->put(get_name(), "last_committed", last_committed); // decode the value and apply its transaction to the store. // this value can now be read from last_committed. @@ -854,13 +855,13 @@ void Paxos::commit() dout(30) << __func__ << " transaction dump:\n"; JSONFormatter f(true); - t.dump(&f); + t->dump(&f); f.flush(*_dout); *_dout << dendl; logger->inc(l_paxos_commit); - logger->inc(l_paxos_commit_keys, t.get_keys()); - logger->inc(l_paxos_commit_bytes, t.get_bytes()); + logger->inc(l_paxos_commit_keys, t->get_keys()); + logger->inc(l_paxos_commit_bytes, t->get_bytes()); utime_t start = ceph_clock_now(NULL); get_store()->apply_transaction(t); @@ -1173,26 +1174,26 @@ void Paxos::trim() dout(10) << "trim to " << end << " (was " << first_committed << ")" << dendl; - MonitorDBStore::Transaction t; + MonitorDBStore::TransactionRef t(new MonitorDBStore::Transaction); for (version_t v = first_committed; v < end; ++v) { dout(10) << "trim " << v << dendl; - t.erase(get_name(), v); + t->erase(get_name(), v); } - t.put(get_name(), "first_committed", end); + t->put(get_name(), "first_committed", end); if (g_conf->mon_compact_on_trim) { dout(10) << " compacting trimmed range" << dendl; - t.compact_range(get_name(), stringify(first_committed - 1), stringify(end)); + t->compact_range(get_name(), stringify(first_committed - 1), stringify(end)); } dout(30) << __func__ << " transaction dump:\n"; JSONFormatter f(true); - t.dump(&f); + t->dump(&f); f.flush(*_dout); *_dout << dendl; bufferlist bl; - t.encode(bl); + t->encode(bl); trimming = true; queue_proposal(bl, new C_Trimmed(this)); @@ -1213,12 +1214,12 @@ version_t Paxos::get_new_proposal_number(version_t gt) last_pn += (version_t)mon->rank; // write - MonitorDBStore::Transaction t; - t.put(get_name(), "last_pn", last_pn); + MonitorDBStore::TransactionRef t(new MonitorDBStore::Transaction); + t->put(get_name(), "last_pn", last_pn); dout(30) << __func__ << " transaction dump:\n"; JSONFormatter f(true); - t.dump(&f); + t->dump(&f); f.flush(*_dout); *_dout << dendl; diff --git a/src/mon/Paxos.h b/src/mon/Paxos.h index b1ecedc16c837..c38debd9fdcd8 100644 --- a/src/mon/Paxos.h +++ b/src/mon/Paxos.h @@ -1071,7 +1071,8 @@ public: void dispatch(PaxosServiceMessage *m); - void read_and_prepare_transactions(MonitorDBStore::Transaction *tx, version_t from, version_t last); + void read_and_prepare_transactions(MonitorDBStore::TransactionRef tx, + version_t from, version_t last); void init(); @@ -1159,12 +1160,12 @@ public: * @param t The transaction to which we will append the operations * @param bl A bufferlist containing an encoded transaction */ - static void decode_append_transaction(MonitorDBStore::Transaction& t, - bufferlist& bl) { - MonitorDBStore::Transaction vt; + static void decode_append_transaction(MonitorDBStore::TransactionRef t, + bufferlist& bl) { + MonitorDBStore::TransactionRef vt(new MonitorDBStore::Transaction); bufferlist::iterator it = bl.begin(); - vt.decode(it); - t.append(vt); + vt->decode(it); + t->append(vt); } /** @@ -1353,11 +1354,11 @@ inline ostream& operator<<(ostream& out, Paxos::C_Proposal& p) out << " " << proposed << " queued " << (ceph_clock_now(NULL) - p.proposal_time) << " tx dump:\n"; - MonitorDBStore::Transaction t; + MonitorDBStore::TransactionRef t(new MonitorDBStore::Transaction); bufferlist::iterator p_it = p.bl.begin(); - t.decode(p_it); + t->decode(p_it); JSONFormatter f(true); - t.dump(&f); + t->dump(&f); f.flush(out); return out; } diff --git a/src/mon/PaxosService.cc b/src/mon/PaxosService.cc index 1b21689863bc6..aba8c959a94da 100644 --- a/src/mon/PaxosService.cc +++ b/src/mon/PaxosService.cc @@ -140,11 +140,11 @@ void PaxosService::remove_legacy_versions() dout(10) << __func__ << " conversion_first " << cf << " first committed " << fc << dendl; - MonitorDBStore::Transaction t; + MonitorDBStore::TransactionRef t(new MonitorDBStore::Transaction); if (cf < fc) { - trim(&t, cf, fc); + trim(t, cf, fc); } - t.erase(get_service_name(), "conversion_first"); + t->erase(get_service_name(), "conversion_first"); mon->store->apply_transaction(t); } @@ -187,26 +187,26 @@ void PaxosService::propose_pending() * to encode whatever is pending on the implementation class into a * bufferlist, so we can then propose that as a value through Paxos. */ - MonitorDBStore::Transaction t; + MonitorDBStore::TransactionRef t(new MonitorDBStore::Transaction); bufferlist bl; if (should_stash_full()) - encode_full(&t); + encode_full(t); - encode_pending(&t); + encode_pending(t); have_pending = false; if (format_version > 0) { - t.put(get_service_name(), "format_version", format_version); + t->put(get_service_name(), "format_version", format_version); } dout(30) << __func__ << " transaction dump:\n"; JSONFormatter f(true); - t.dump(&f); + t->dump(&f); f.flush(*_dout); *_dout << dendl; - t.encode(bl); + t->encode(bl); // apply to paxos proposing = true; @@ -350,19 +350,19 @@ void PaxosService::maybe_trim() } dout(10) << __func__ << " trimming to " << trim_to << ", " << to_remove << " states" << dendl; - MonitorDBStore::Transaction t; - trim(&t, get_first_committed(), trim_to); - put_first_committed(&t, trim_to); + MonitorDBStore::TransactionRef t(new MonitorDBStore::Transaction); + trim(t, get_first_committed(), trim_to); + put_first_committed(t, trim_to); // let the service add any extra stuff - encode_trim_extra(&t, trim_to); + encode_trim_extra(t, trim_to); bufferlist bl; - t.encode(bl); + t->encode(bl); paxos->propose_new_value(bl, NULL); } -void PaxosService::trim(MonitorDBStore::Transaction *t, +void PaxosService::trim(MonitorDBStore::TransactionRef t, version_t from, version_t to) { dout(10) << __func__ << " from " << from << " to " << to << dendl; diff --git a/src/mon/PaxosService.h b/src/mon/PaxosService.h index 5321bebcacefc..3acdf65ffda51 100644 --- a/src/mon/PaxosService.h +++ b/src/mon/PaxosService.h @@ -380,7 +380,7 @@ public: * * @param t The transaction to hold all changes. */ - virtual void encode_pending(MonitorDBStore::Transaction *t) = 0; + virtual void encode_pending(MonitorDBStore::TransactionRef t) = 0; /** * Discard the pending state @@ -680,7 +680,7 @@ public: * @param from the lower limit of the interval to be trimmed * @param to the upper limit of the interval to be trimmed (not including) */ - void trim(MonitorDBStore::Transaction *t, version_t from, version_t to); + void trim(MonitorDBStore::TransactionRef t, version_t from, version_t to); /** * encode service-specific extra bits into trim transaction @@ -688,7 +688,8 @@ public: * @param tx transaction * @param first new first_committed value */ - virtual void encode_trim_extra(MonitorDBStore::Transaction *tx, version_t first) {} + virtual void encode_trim_extra(MonitorDBStore::TransactionRef tx, + version_t first) {} /** * Get the version we should trim to. @@ -722,7 +723,7 @@ public: * * @param t Transaction on which the full version shall be encoded. */ - virtual void encode_full(MonitorDBStore::Transaction *t) = 0; + virtual void encode_full(MonitorDBStore::TransactionRef t) = 0; /** * @} @@ -747,7 +748,7 @@ public: * purposes * @{ */ - void put_first_committed(MonitorDBStore::Transaction *t, version_t ver) { + void put_first_committed(MonitorDBStore::TransactionRef t, version_t ver) { t->put(get_service_name(), first_committed_name, ver); } /** @@ -756,7 +757,7 @@ public: * @param t A transaction to which we add this put operation * @param ver The last committed version number being put */ - void put_last_committed(MonitorDBStore::Transaction *t, version_t ver) { + void put_last_committed(MonitorDBStore::TransactionRef t, version_t ver) { t->put(get_service_name(), last_committed_name, ver); /* We only need to do this once, and that is when we are about to make our @@ -775,7 +776,7 @@ public: * @param ver The version to which we will add the value * @param bl A bufferlist containing the version's value */ - void put_version(MonitorDBStore::Transaction *t, version_t ver, + void put_version(MonitorDBStore::TransactionRef t, version_t ver, bufferlist& bl) { t->put(get_service_name(), ver, bl); } @@ -787,7 +788,7 @@ public: * @param ver A version number * @param bl A bufferlist containing the version's value */ - void put_version_full(MonitorDBStore::Transaction *t, + void put_version_full(MonitorDBStore::TransactionRef t, version_t ver, bufferlist& bl) { string key = mon->store->combine_strings(full_prefix_name, ver); t->put(get_service_name(), key, bl); @@ -799,7 +800,7 @@ public: * @param t The transaction to which we will add this put operation * @param ver A version number */ - void put_version_latest_full(MonitorDBStore::Transaction *t, version_t ver) { + void put_version_latest_full(MonitorDBStore::TransactionRef t, version_t ver) { string key = mon->store->combine_strings(full_prefix_name, full_latest_name); t->put(get_service_name(), key, ver); } @@ -810,7 +811,8 @@ public: * @param key The key to which we will add the value * @param bl A bufferlist containing the value */ - void put_value(MonitorDBStore::Transaction *t, const string& key, bufferlist& bl) { + void put_value(MonitorDBStore::TransactionRef t, + const string& key, bufferlist& bl) { t->put(get_service_name(), key, bl); } diff --git a/src/tools/ceph_monstore_tool.cc b/src/tools/ceph_monstore_tool.cc index a1cec7fe5b117..d26add613c107 100644 --- a/src/tools/ceph_monstore_tool.cc +++ b/src/tools/ceph_monstore_tool.cc @@ -30,15 +30,16 @@ using namespace std; class TraceIter { int fd; unsigned idx; - MonitorDBStore::Transaction t; + MonitorDBStore::TransactionRef t; public: TraceIter(string fname) : fd(-1), idx(-1) { fd = ::open(fname.c_str(), O_RDONLY); + t.reset(new MonitorDBStore::Transaction); } bool valid() { return fd != -1; } - const MonitorDBStore::Transaction &cur() { + MonitorDBStore::TransactionRef cur() { assert(valid()); return t; } @@ -79,7 +80,8 @@ public: return; } bliter = bl.begin(); - t.decode(bliter); + t.reset(new MonitorDBStore::Transaction); + t->decode(bliter); } void init() { next(); @@ -252,10 +254,10 @@ int main(int argc, char **argv) { if (bl.length() == 0) break; cout << "\n--- " << v << " ---" << std::endl; - MonitorDBStore::Transaction tx; + MonitorDBStore::TransactionRef tx(new MonitorDBStore::Transaction); Paxos::decode_append_transaction(tx, bl); JSONFormatter f(true); - tx.dump(&f); + tx->dump(&f); f.flush(cout); } } else if (cmd == "dump-trace") { @@ -274,7 +276,7 @@ int main(int argc, char **argv) { } if (iter.num() >= dstart) { JSONFormatter f(true); - iter.cur().dump(&f, false); + iter.cur()->dump(&f, false); f.flush(std::cout); std::cout << std::endl; } @@ -315,7 +317,7 @@ int main(int argc, char **argv) { unsigned num = 0; for (unsigned i = 0; i < ntrans; ++i) { std::cerr << "Applying trans " << i << std::endl; - MonitorDBStore::Transaction t; + MonitorDBStore::TransactionRef t(new MonitorDBStore::Transaction); string prefix; prefix.push_back((i%26)+'a'); for (unsigned j = 0; j < tsize; ++j) { @@ -323,10 +325,10 @@ int main(int argc, char **argv) { os << num; bufferlist bl; for (unsigned k = 0; k < tvalsize; ++k) bl.append(rand()); - t.put(prefix, os.str(), bl); + t->put(prefix, os.str(), bl); ++num; } - t.compact_prefix(prefix); + t->compact_prefix(prefix); st.apply_transaction(t); } } else if (cmd == "store-copy") { @@ -366,12 +368,12 @@ int main(int argc, char **argv) { do { uint64_t num_keys = 0; - MonitorDBStore::Transaction tx; + MonitorDBStore::TransactionRef tx(new MonitorDBStore::Transaction); while (it->valid() && num_keys < 128) { pair k = it->raw_key(); bufferlist v = it->value(); - tx.put(k.first, k.second, v); + tx->put(k.first, k.second, v); num_keys ++; total_tx ++; @@ -382,7 +384,7 @@ int main(int argc, char **argv) { total_keys += num_keys; - if (!tx.empty()) + if (!tx->empty()) out_store.apply_transaction(tx); std::cout << "copied " << total_keys << " keys so far (" diff --git a/src/tools/mon_store_converter.cc b/src/tools/mon_store_converter.cc index e9376248c8e39..7b8add8cd0627 100644 --- a/src/tools/mon_store_converter.cc +++ b/src/tools/mon_store_converter.cc @@ -94,14 +94,14 @@ class MonitorStoreConverter { } void _mark_convert_start() { - MonitorDBStore::Transaction tx; - tx.put("mon_convert", "on_going", 1); + MonitorDBStore::TransactionRef tx(new MonitorDBStore::Transaction); + tx->put("mon_convert", "on_going", 1); db->apply_transaction(tx); } void _mark_convert_finish() { - MonitorDBStore::Transaction tx; - tx.erase("mon_convert", "on_going"); + MonitorDBStore::TransactionRef tx(new MonitorDBStore::Transaction); + tx->erase("mon_convert", "on_going"); db->apply_transaction(tx); } @@ -112,11 +112,11 @@ class MonitorStoreConverter { assert(store->exists_bl_ss("feature_set")); assert(store->exists_bl_ss("election_epoch")); - MonitorDBStore::Transaction tx; + MonitorDBStore::TransactionRef tx(new MonitorDBStore::Transaction); if (store->exists_bl_ss("joined")) { version_t joined = store->get_int("joined"); - tx.put(MONITOR_NAME, "joined", joined); + tx->put(MONITOR_NAME, "joined", joined); } vector keys; @@ -133,10 +133,10 @@ class MonitorStoreConverter { bufferlist bl; int r = store->get_bl_ss(bl, (*it).c_str(), 0); assert(r > 0); - tx.put(MONITOR_NAME, *it, bl); + tx->put(MONITOR_NAME, *it, bl); } - assert(!tx.empty()); + assert(!tx->empty()); db->apply_transaction(tx); } @@ -179,9 +179,9 @@ class MonitorStoreConverter { std::cout << __func__ << " " << machine << " ver " << ver << " bl " << bl.length() << std::endl; - MonitorDBStore::Transaction tx; - tx.put(machine, ver, bl); - tx.put(machine, "last_committed", ver); + MonitorDBStore::TransactionRef tx(new MonitorDBStore::Transaction); + tx->put(machine, ver, bl); + tx->put(machine, "last_committed", ver); if (has_gv && store->exists_bl_sn(machine_gv.c_str(), ver)) { stringstream s; @@ -201,8 +201,8 @@ class MonitorStoreConverter { } bufferlist tx_bl; - tx.encode(tx_bl); - tx.put("paxos", gv, tx_bl); + tx->encode(tx_bl); + tx->put("paxos", gv, tx_bl); } db->apply_transaction(tx); } @@ -210,9 +210,9 @@ class MonitorStoreConverter { version_t lc = db->get(machine, "last_committed"); assert(lc == last_committed); - MonitorDBStore::Transaction tx; - tx.put(machine, "first_committed", first_committed); - tx.put(machine, "last_committed", last_committed); + MonitorDBStore::TransactionRef tx(new MonitorDBStore::Transaction); + tx->put(machine, "first_committed", first_committed); + tx->put(machine, "last_committed", last_committed); if (store->exists_bl_ss(machine.c_str(), "latest")) { bufferlist latest_bl_raw; @@ -224,7 +224,7 @@ class MonitorStoreConverter { goto out; } - tx.put(machine, "latest", latest_bl_raw); + tx->put(machine, "latest", latest_bl_raw); bufferlist::iterator lbl_it = latest_bl_raw.begin(); bufferlist latest_bl; @@ -235,10 +235,10 @@ class MonitorStoreConverter { std::cout << __func__ << " machine " << machine << " latest ver " << latest_ver << std::endl; - tx.put(machine, "full_latest", latest_ver); + tx->put(machine, "full_latest", latest_ver); stringstream os; os << "full_" << latest_ver; - tx.put(machine, os.str(), latest_bl); + tx->put(machine, os.str(), latest_bl); } out: db->apply_transaction(tx); @@ -269,17 +269,17 @@ class MonitorStoreConverter { // erase all paxos versions between [first, last_gv[, with first being the // first gv in the map. - MonitorDBStore::Transaction tx; + MonitorDBStore::TransactionRef tx(new MonitorDBStore::Transaction); set::iterator it = gvs.begin(); std::cout << __func__ << " first gv " << (*it) << " last gv " << last_gv << std::endl; for (; it != gvs.end() && (*it < last_gv); ++it) { - tx.erase("paxos", *it); + tx->erase("paxos", *it); } - tx.put("paxos", "first_committed", last_gv); - tx.put("paxos", "last_committed", highest_gv); - tx.put("paxos", "accepted_pn", highest_accepted_pn); - tx.put("paxos", "last_pn", highest_last_pn); + tx->put("paxos", "first_committed", last_gv); + tx->put("paxos", "last_committed", highest_gv); + tx->put("paxos", "accepted_pn", highest_accepted_pn); + tx->put("paxos", "last_pn", highest_last_pn); db->apply_transaction(tx); } -- 2.39.5