::encode(v, final);
::encode(mapbl, final);
- MonitorDBStore::Transaction t;
+ MonitorDBStore::TransactionRef t(new MonitorDBStore::Transaction);
// save it
- t.put("monmap", v, mapbl);
- t.put("monmap", "latest", final);
- t.put("monmap", "last_committed", v);
+ t->put("monmap", v, mapbl);
+ t->put("monmap", "latest", final);
+ t->put("monmap", "last_committed", v);
store->apply_transaction(t);
dout(0) << "done." << dendl;
mon->key_server.set_ver(keys_ver);
if (keys_ver == 1 && mon->is_keyring_required()) {
- MonitorDBStore::Transaction t;
- t.erase("mkfs", "keyring");
+ MonitorDBStore::TransactionRef t(new MonitorDBStore::Transaction);
+ t->erase("mkfs", "keyring");
mon->store->apply_transaction(t);
}
}
dout(10) << "create_pending v " << (get_last_committed() + 1) << dendl;
}
-void AuthMonitor::encode_pending(MonitorDBStore::Transaction *t)
+void AuthMonitor::encode_pending(MonitorDBStore::TransactionRef t)
{
dout(10) << __func__ << " v " << (get_last_committed() + 1) << dendl;
put_last_committed(t, version);
}
-void AuthMonitor::encode_full(MonitorDBStore::Transaction *t)
+void AuthMonitor::encode_full(MonitorDBStore::TransactionRef t)
{
version_t version = mon->key_server.get_ver();
// do not stash full version 0 as it will never be removed nor read
void increase_max_global_id();
uint64_t assign_global_id(MAuth *m, bool should_increase_max);
// propose pending update to peers
- void encode_pending(MonitorDBStore::Transaction *t);
- virtual void encode_full(MonitorDBStore::Transaction *t);
+ void encode_pending(MonitorDBStore::TransactionRef t);
+ virtual void encode_full(MonitorDBStore::TransactionRef t);
version_t get_trim_to();
bool preprocess_query(PaxosServiceMessage *m); // true if processed.
void ConfigKeyService::store_put(string key, bufferlist &bl, Context *cb)
{
bufferlist proposal_bl;
- MonitorDBStore::Transaction t;
- t.put(STORE_PREFIX, key, bl);
- t.encode(proposal_bl);
-
+ MonitorDBStore::TransactionRef t(new MonitorDBStore::Transaction);
+ t->put(STORE_PREFIX, key, bl);
+ t->encode(proposal_bl);
paxos->propose_new_value(proposal_bl, cb);
}
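/*
 * The overall shape of this conversion, as a hedged sketch rather than a hunk
 * of the patch: stack-allocated MonitorDBStore::Transaction objects become
 * heap-allocated ones held by MonitorDBStore::TransactionRef (a
 * ceph::shared_ptr<Transaction>), presumably so a pending transaction can be
 * passed around and kept alive without copying its ops. Assuming a store and
 * a bufferlist bl in scope:
 *
 *   // before
 *   MonitorDBStore::Transaction t;
 *   t.put("prefix", "key", bl);
 *   store->apply_transaction(t);
 *
 *   // after
 *   MonitorDBStore::TransactionRef t(new MonitorDBStore::Transaction);
 *   t->put("prefix", "key", bl);
 *   store->apply_transaction(t);
 */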
dout(10) << "bump_epoch " << epoch << " to " << e << dendl;
assert(epoch <= e);
epoch = e;
- MonitorDBStore::Transaction t;
- t.put(Monitor::MONITOR_NAME, "election_epoch", epoch);
+ MonitorDBStore::TransactionRef t(new MonitorDBStore::Transaction);
+ t->put(Monitor::MONITOR_NAME, "election_epoch", epoch);
mon->store->apply_transaction(t);
mon->join_election();
<< ", taking it"
<< dendl;
mon->monmap->decode(em->monmap_bl);
- MonitorDBStore::Transaction t;
- t.put("monmap", mon->monmap->epoch, em->monmap_bl);
- t.put("monmap", "last_committed", mon->monmap->epoch);
+ MonitorDBStore::TransactionRef t(new MonitorDBStore::Transaction);
+ t->put("monmap", mon->monmap->epoch, em->monmap_bl);
+ t->put("monmap", "last_committed", mon->monmap->epoch);
mon->store->apply_transaction(t);
//mon->monmon()->paxos->stash_latest(mon->monmap->epoch, em->monmap_bl);
cancel_timer();
check_subs();
}
-void LogMonitor::store_do_append(MonitorDBStore::Transaction *t,
+void LogMonitor::store_do_append(MonitorDBStore::TransactionRef t,
const string& key, bufferlist& bl)
{
bufferlist existing_bl;
dout(10) << "create_pending v " << (get_last_committed() + 1) << dendl;
}
-void LogMonitor::encode_pending(MonitorDBStore::Transaction *t)
+void LogMonitor::encode_pending(MonitorDBStore::TransactionRef t)
{
version_t version = get_last_committed() + 1;
bufferlist bl;
put_last_committed(t, version);
}
-void LogMonitor::encode_full(MonitorDBStore::Transaction *t)
+void LogMonitor::encode_full(MonitorDBStore::TransactionRef t)
{
dout(10) << __func__ << " log v " << summary.version << dendl;
assert(get_last_committed() == summary.version);
void update_from_paxos(bool *need_bootstrap);
void create_pending(); // prepare a new pending
// propose pending update to peers
- void encode_pending(MonitorDBStore::Transaction *t);
- virtual void encode_full(MonitorDBStore::Transaction *t);
+ void encode_pending(MonitorDBStore::TransactionRef t);
+ virtual void encode_full(MonitorDBStore::TransactionRef t);
version_t get_trim_to();
bool preprocess_query(PaxosServiceMessage *m); // true if processed.
bool prepare_update(PaxosServiceMessage *m);
bool _create_sub_summary(MLog *mlog, int level);
void _create_sub_incremental(MLog *mlog, int level, version_t sv);
- void store_do_append(MonitorDBStore::Transaction *t,
+ void store_do_append(MonitorDBStore::TransactionRef t,
const string& key, bufferlist& bl);
public:
dout(10) << "create_pending e" << pending_mdsmap.epoch << dendl;
}
-void MDSMonitor::encode_pending(MonitorDBStore::Transaction *t)
+void MDSMonitor::encode_pending(MonitorDBStore::TransactionRef t)
{
dout(10) << "encode_pending e" << pending_mdsmap.epoch << dendl;
void create_initial();
void update_from_paxos(bool *need_bootstrap);
void create_pending();
- void encode_pending(MonitorDBStore::Transaction *t);
+ void encode_pending(MonitorDBStore::TransactionRef t);
// we don't require full versions; don't encode any.
- virtual void encode_full(MonitorDBStore::Transaction *t) { }
+ virtual void encode_full(MonitorDBStore::TransactionRef t) { }
void update_logger();
bufferlist bl;
features->encode(bl);
- MonitorDBStore::Transaction t;
- t.put(MONITOR_NAME, COMPAT_SET_LOC, bl);
+ MonitorDBStore::TransactionRef t(new MonitorDBStore::Transaction);
+ t->put(MONITOR_NAME, COMPAT_SET_LOC, bl);
store->apply_transaction(t);
} else {
bufferlist::iterator it = featuresbl.begin();
dout(10) << "required_features " << required_features << dendl;
}
-void Monitor::write_features(MonitorDBStore::Transaction &t)
+void Monitor::write_features(MonitorDBStore::TransactionRef t)
{
bufferlist bl;
features.encode(bl);
- t.put(MONITOR_NAME, COMPAT_SET_LOC, bl);
+ t->put(MONITOR_NAME, COMPAT_SET_LOC, bl);
}
const char** Monitor::get_tracked_conf_keys() const
if (sync_full) {
// stash key state, and mark that we are syncing
- MonitorDBStore::Transaction t;
- sync_stash_critical_state(&t);
- t.put("mon_sync", "in_sync", 1);
+ MonitorDBStore::TransactionRef t(new MonitorDBStore::Transaction);
+ sync_stash_critical_state(t);
+ t->put("mon_sync", "in_sync", 1);
sync_last_committed_floor = MAX(sync_last_committed_floor, paxos->get_version());
dout(10) << __func__ << " marking sync in progress, storing sync_last_committed_floor "
<< sync_last_committed_floor << dendl;
- t.put("mon_sync", "last_committed_floor", sync_last_committed_floor);
+ t->put("mon_sync", "last_committed_floor", sync_last_committed_floor);
store->apply_transaction(t);
messenger->send_message(m, sync_provider);
}
-void Monitor::sync_stash_critical_state(MonitorDBStore::Transaction *t)
+void Monitor::sync_stash_critical_state(MonitorDBStore::TransactionRef t)
{
dout(10) << __func__ << dendl;
bufferlist backup_monmap;
if (sync_full) {
// finalize the paxos commits
- MonitorDBStore::Transaction tx;
- paxos->read_and_prepare_transactions(&tx, sync_start_version, last_committed);
- tx.put(paxos->get_name(), "last_committed", last_committed);
+ MonitorDBStore::TransactionRef tx(new MonitorDBStore::Transaction);
+ paxos->read_and_prepare_transactions(tx, sync_start_version,
+ last_committed);
+ tx->put(paxos->get_name(), "last_committed", last_committed);
dout(30) << __func__ << " final tx dump:\n";
JSONFormatter f(true);
- tx.dump(&f);
+ tx->dump(&f);
f.flush(*_dout);
*_dout << dendl;
assert(g_conf->mon_sync_requester_kill_at != 8);
- MonitorDBStore::Transaction t;
- t.erase("mon_sync", "in_sync");
- t.erase("mon_sync", "force_sync");
- t.erase("mon_sync", "last_committed_floor");
+ MonitorDBStore::TransactionRef t(new MonitorDBStore::Transaction);
+ t->erase("mon_sync", "in_sync");
+ t->erase("mon_sync", "force_sync");
+ t->erase("mon_sync", "last_committed_floor");
store->apply_transaction(t);
assert(g_conf->mon_sync_requester_kill_at != 9);
}
MMonSync *reply = new MMonSync(MMonSync::OP_CHUNK, sp.cookie);
- MonitorDBStore::Transaction tx;
+ MonitorDBStore::TransactionRef tx(new MonitorDBStore::Transaction);
int left = g_conf->mon_sync_max_payload_size;
while (sp.last_committed < paxos->get_version() && left > 0) {
bufferlist bl;
sp.last_committed++;
store->get(paxos->get_name(), sp.last_committed, bl);
- tx.put(paxos->get_name(), sp.last_committed, bl);
+ tx->put(paxos->get_name(), sp.last_committed, bl);
left -= bl.length();
- dout(20) << __func__ << " including paxos state " << sp.last_committed << dendl;
+ dout(20) << __func__ << " including paxos state " << sp.last_committed
+ << dendl;
}
reply->last_committed = sp.last_committed;
if ((sp.full && sp.synchronizer->has_next_chunk()) ||
sp.last_committed < paxos->get_version()) {
- dout(10) << __func__ << " chunk, through version " << sp.last_committed << " key " << sp.last_key << dendl;
+ dout(10) << __func__ << " chunk, through version " << sp.last_committed
+ << " key " << sp.last_key << dendl;
} else {
- dout(10) << __func__ << " last chunk, through version " << sp.last_committed << " key " << sp.last_key << dendl;
+ dout(10) << __func__ << " last chunk, through version " << sp.last_committed
+ << " key " << sp.last_key << dendl;
reply->op = MMonSync::OP_LAST_CHUNK;
assert(g_conf->mon_sync_provider_kill_at != 3);
sync_providers.erase(sp.cookie);
}
- ::encode(tx, reply->chunk_bl);
+ ::encode(*tx, reply->chunk_bl);
m->get_connection()->send_message(reply);
}
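/*
 * Wire-compatibility sketch (illustrative, not part of the patch): the chunk
 * payload is still an encoded Transaction, so only the in-memory handle
 * changes. The provider dereferences the ref when encoding, and the requester
 * (next hunk) decodes into a freshly allocated transaction:
 *
 *   ::encode(*tx, reply->chunk_bl);          // provider side
 *   ...
 *   MonitorDBStore::TransactionRef rx(new MonitorDBStore::Transaction);
 *   rx->append_from_encoded(m->chunk_bl);    // requester side
 */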
assert(state == STATE_SYNCHRONIZING);
assert(g_conf->mon_sync_requester_kill_at != 5);
- MonitorDBStore::Transaction tx;
- tx.append_from_encoded(m->chunk_bl);
+ MonitorDBStore::TransactionRef tx(new MonitorDBStore::Transaction);
+ tx->append_from_encoded(m->chunk_bl);
dout(30) << __func__ << " tx dump:\n";
JSONFormatter f(true);
- tx.dump(&f);
+ tx->dump(&f);
f.flush(*_dout);
*_dout << dendl;
if (!sync_full) {
dout(10) << __func__ << " applying recent paxos transactions as we go" << dendl;
- MonitorDBStore::Transaction tx;
- paxos->read_and_prepare_transactions(&tx, paxos->get_version() + 1, m->last_committed);
- tx.put(paxos->get_name(), "last_committed", m->last_committed);
+ MonitorDBStore::TransactionRef tx(new MonitorDBStore::Transaction);
+ paxos->read_and_prepare_transactions(tx, paxos->get_version() + 1,
+ m->last_committed);
+ tx->put(paxos->get_name(), "last_committed", m->last_committed);
dout(30) << __func__ << " tx dump:\n";
JSONFormatter f(true);
- tx.dump(&f);
+ tx->dump(&f);
f.flush(*_dout);
*_dout << dendl;
dout(1) << __func__ << " enabling new quorum features: " << diff << dendl;
features = new_features;
- MonitorDBStore::Transaction t;
+ MonitorDBStore::TransactionRef t(new MonitorDBStore::Transaction);
write_features(t);
store->apply_transaction(t);
free_formatter = true;
}
- MonitorDBStore::Transaction tx;
- sync_stash_critical_state(&tx);
- tx.put("mon_sync", "force_sync", 1);
+ MonitorDBStore::TransactionRef tx(new MonitorDBStore::Transaction);
+ sync_stash_critical_state(tx);
+ tx->put("mon_sync", "force_sync", 1);
store->apply_transaction(tx);
f->open_object_section("sync_force");
if (is_leader() && paxos->is_active() && fingerprint.is_zero()) {
// this is only necessary on upgraded clusters.
- MonitorDBStore::Transaction t;
- prepare_new_fingerprint(&t);
+ MonitorDBStore::TransactionRef t(new MonitorDBStore::Transaction);
+ prepare_new_fingerprint(t);
bufferlist tbl;
- t.encode(tbl);
+ t->encode(tbl);
paxos->propose_new_value(tbl, new C_NoopContext);
}
new_tick();
}
-void Monitor::prepare_new_fingerprint(MonitorDBStore::Transaction *t)
+void Monitor::prepare_new_fingerprint(MonitorDBStore::TransactionRef t)
{
uuid_d nf;
nf.generate_random();
int Monitor::write_fsid()
{
- MonitorDBStore::Transaction t;
+ MonitorDBStore::TransactionRef t(new MonitorDBStore::Transaction);
int r = write_fsid(t);
store->apply_transaction(t);
return r;
}
-int Monitor::write_fsid(MonitorDBStore::Transaction &t)
+int Monitor::write_fsid(MonitorDBStore::TransactionRef t)
{
ostringstream ss;
ss << monmap->get_fsid() << "\n";
bufferlist b;
b.append(us);
- t.put(MONITOR_NAME, "cluster_uuid", b);
+ t->put(MONITOR_NAME, "cluster_uuid", b);
return 0;
}
*/
int Monitor::mkfs(bufferlist& osdmapbl)
{
- MonitorDBStore::Transaction t;
+ MonitorDBStore::TransactionRef t(new MonitorDBStore::Transaction);
// verify cluster fsid
int r = check_fsid();
bufferlist magicbl;
magicbl.append(CEPH_MON_ONDISK_MAGIC);
magicbl.append("\n");
- t.put(MONITOR_NAME, "magic", magicbl);
+ t->put(MONITOR_NAME, "magic", magicbl);
features = get_supported_features();
bufferlist monmapbl;
monmap->encode(monmapbl, CEPH_FEATURES_ALL);
monmap->set_epoch(0); // must be 0 to avoid confusing first MonmapMonitor::update_from_paxos()
- t.put("mkfs", "monmap", monmapbl);
+ t->put("mkfs", "monmap", monmapbl);
if (osdmapbl.length()) {
// make sure it's a valid osdmap
derr << "error decoding provided osdmap: " << e.what() << dendl;
return -EINVAL;
}
- t.put("mkfs", "osdmap", osdmapbl);
+ t->put("mkfs", "osdmap", osdmapbl);
}
if (is_keyring_required()) {
bufferlist keyringbl;
keyring.encode_plaintext(keyringbl);
- t.put("mkfs", "keyring", keyringbl);
+ t->put("mkfs", "keyring", keyringbl);
}
write_fsid(t);
store->apply_transaction(t);
#define dout_prefix *_dout
void Monitor::StoreConverter::_convert_finish_features(
- MonitorDBStore::Transaction &t)
+ MonitorDBStore::TransactionRef t)
{
dout(20) << __func__ << dendl;
features.encode(features_bl);
dout(20) << __func__ << " new features " << features << dendl;
- t.put(MONITOR_NAME, COMPAT_SET_LOC, features_bl);
+ t->put(MONITOR_NAME, COMPAT_SET_LOC, features_bl);
}
assert(store->exists_bl_ss("feature_set"));
assert(store->exists_bl_ss("election_epoch"));
- MonitorDBStore::Transaction tx;
+ MonitorDBStore::TransactionRef tx(new MonitorDBStore::Transaction);
if (store->exists_bl_ss("joined")) {
version_t joined = store->get_int("joined");
- tx.put(MONITOR_NAME, "joined", joined);
+ tx->put(MONITOR_NAME, "joined", joined);
}
vector<string> keys;
bufferlist bl;
int r = store->get_bl_ss(bl, (*it).c_str(), 0);
assert(r > 0);
- tx.put(MONITOR_NAME, *it, bl);
+ tx->put(MONITOR_NAME, *it, bl);
}
version_t election_epoch = store->get_int("election_epoch");
- tx.put(MONITOR_NAME, "election_epoch", election_epoch);
+ tx->put(MONITOR_NAME, "election_epoch", election_epoch);
- assert(!tx.empty());
+ assert(!tx->empty());
db->apply_transaction(tx);
dout(10) << __func__ << " finished" << dendl;
}
dout(20) << __func__ << " " << machine
<< " ver " << ver << " bl " << bl.length() << dendl;
- MonitorDBStore::Transaction tx;
- tx.put(machine, ver, bl);
- tx.put(machine, "last_committed", ver);
+ MonitorDBStore::TransactionRef tx(new MonitorDBStore::Transaction);
+ tx->put(machine, ver, bl);
+ tx->put(machine, "last_committed", ver);
if (has_gv && store->exists_bl_sn(machine_gv.c_str(), ver)) {
stringstream s;
dout(20) << __func__ << " " << machine
<< " ver " << ver << " -> " << gv << dendl;
- MonitorDBStore::Transaction paxos_tx;
+ MonitorDBStore::TransactionRef paxos_tx(new MonitorDBStore::Transaction);
if (gvs.count(gv) == 0) {
gvs.insert(gv);
bufferlist paxos_bl;
int r = db->get("paxos", gv, paxos_bl);
assert(r >= 0);
- paxos_tx.append_from_encoded(paxos_bl);
+ paxos_tx->append_from_encoded(paxos_bl);
}
gv_map[gv].insert(make_pair(machine,ver));
bufferlist tx_bl;
- tx.encode(tx_bl);
- paxos_tx.append_from_encoded(tx_bl);
+ tx->encode(tx_bl);
+ paxos_tx->append_from_encoded(tx_bl);
bufferlist paxos_bl;
- paxos_tx.encode(paxos_bl);
- tx.put("paxos", gv, paxos_bl);
+ paxos_tx->encode(paxos_bl);
+ tx->put("paxos", gv, paxos_bl);
}
db->apply_transaction(tx);
}
dout(20) << __func__ << " lc " << lc << " last_committed " << last_committed << dendl;
assert(lc == last_committed);
- MonitorDBStore::Transaction tx;
- tx.put(machine, "first_committed", first_committed);
- tx.put(machine, "last_committed", last_committed);
- tx.put(machine, "conversion_first", first_committed);
+ MonitorDBStore::TransactionRef tx(new MonitorDBStore::Transaction);
+ tx->put(machine, "first_committed", first_committed);
+ tx->put(machine, "last_committed", last_committed);
+ tx->put(machine, "conversion_first", first_committed);
if (store->exists_bl_ss(machine.c_str(), "latest")) {
bufferlist latest_bl_raw;
goto out;
}
- tx.put(machine, "latest", latest_bl_raw);
+ tx->put(machine, "latest", latest_bl_raw);
bufferlist::iterator lbl_it = latest_bl_raw.begin();
bufferlist latest_bl;
dout(20) << __func__ << " machine " << machine
<< " latest ver " << latest_ver << dendl;
- tx.put(machine, "full_latest", latest_ver);
+ tx->put(machine, "full_latest", latest_ver);
stringstream os;
os << "full_" << latest_ver;
- tx.put(machine, os.str(), latest_bl);
+ tx->put(machine, os.str(), latest_bl);
}
out:
db->apply_transaction(tx);
<< " bl " << bl.length() << " bytes" << dendl;
string full_key = "full_" + stringify(ver);
- MonitorDBStore::Transaction tx;
- tx.put("osdmap", full_key, bl);
+ MonitorDBStore::TransactionRef tx(new MonitorDBStore::Transaction);
+ tx->put("osdmap", full_key, bl);
db->apply_transaction(tx);
}
dout(10) << __func__ << " found " << err << " conversion errors!" << dendl;
// erase all paxos versions between [first, last_gv[, with first being the
// first gv in the map.
- MonitorDBStore::Transaction tx;
+ MonitorDBStore::TransactionRef tx(new MonitorDBStore::Transaction);
set<version_t>::iterator it = gvs.begin();
dout(1) << __func__ << " first gv " << (*it)
<< " last gv " << last_gv << dendl;
for (; it != gvs.end() && (*it < last_gv); ++it) {
- tx.erase("paxos", *it);
+ tx->erase("paxos", *it);
}
- tx.put("paxos", "first_committed", last_gv);
- tx.put("paxos", "last_committed", highest_gv);
- tx.put("paxos", "accepted_pn", highest_accepted_pn);
- tx.put("paxos", "last_pn", highest_last_pn);
- tx.put("paxos", "conversion_first", last_gv);
+ tx->put("paxos", "first_committed", last_gv);
+ tx->put("paxos", "last_committed", highest_gv);
+ tx->put("paxos", "accepted_pn", highest_accepted_pn);
+ tx->put("paxos", "last_pn", highest_last_pn);
+ tx->put("paxos", "conversion_first", last_gv);
db->apply_transaction(tx);
dout(10) << __func__ << " finished" << dendl;
const utime_t &get_leader_since() const;
- void prepare_new_fingerprint(MonitorDBStore::Transaction *t);
+ void prepare_new_fingerprint(MonitorDBStore::TransactionRef t);
// -- elector --
private:
* We store a few things on the side that we don't want to get clobbered by sync. This
* includes the latest monmap and a lower bound on last_committed.
*/
- void sync_stash_critical_state(MonitorDBStore::Transaction *tx);
+ void sync_stash_critical_state(MonitorDBStore::TransactionRef tx);
/**
* reset the sync timeout
/// read the ondisk features into the CompatSet pointed to by read_features
static void read_features_off_disk(MonitorDBStore *store, CompatSet *read_features);
void read_features();
- void write_features(MonitorDBStore::Transaction &t);
+ void write_features(MonitorDBStore::TransactionRef t);
public:
Monitor(CephContext *cct_, string nm, MonitorDBStore *s,
* @return 0 on success, or negative error code
*/
int write_fsid();
- int write_fsid(MonitorDBStore::Transaction &t);
+ int write_fsid(MonitorDBStore::TransactionRef t);
void do_admin_command(std::string command, cmdmap_t& cmdmap,
std::string format, ostream& ss);
}
void _mark_convert_start() {
- MonitorDBStore::Transaction tx;
- tx.put("mon_convert", "on_going", 1);
+ MonitorDBStore::TransactionRef tx(new MonitorDBStore::Transaction);
+ tx->put("mon_convert", "on_going", 1);
db->apply_transaction(tx);
}
- void _convert_finish_features(MonitorDBStore::Transaction &t);
+ void _convert_finish_features(MonitorDBStore::TransactionRef t);
void _mark_convert_finish() {
- MonitorDBStore::Transaction tx;
- tx.erase("mon_convert", "on_going");
+ MonitorDBStore::TransactionRef tx(new MonitorDBStore::Transaction);
+ tx->erase("mon_convert", "on_going");
_convert_finish_features(tx);
db->apply_transaction(tx);
}
}
};
+ struct Transaction;
+ typedef ceph::shared_ptr<Transaction> TransactionRef;
struct Transaction {
list<Op> ops;
uint64_t bytes, keys;
ls.back()->compact_range("prefix4", "from", "to");
}
- void append(Transaction& other) {
- ops.splice(ops.end(), other.ops);
- keys += other.keys;
- bytes += other.bytes;
+ void append(TransactionRef other) {
+ ops.splice(ops.end(), other->ops);
+ keys += other->keys;
+ bytes += other->bytes;
}
void append_from_encoded(bufferlist& bl) {
- Transaction other;
+ TransactionRef other(new Transaction);
bufferlist::iterator it = bl.begin();
- other.decode(it);
+ other->decode(it);
append(other);
}
}
};
- int apply_transaction(const MonitorDBStore::Transaction& t) {
+ int apply_transaction(MonitorDBStore::TransactionRef t) {
KeyValueDB::Transaction dbt = db->get_transaction();
if (do_dump) {
bufferlist bl;
- t.encode(bl);
+ t->encode(bl);
bl.write_fd(dump_fd);
}
list<pair<string, pair<string,string> > > compact;
- for (list<Op>::const_iterator it = t.ops.begin(); it != t.ops.end(); ++it) {
+ for (list<Op>::const_iterator it = t->ops.begin();
+ it != t->ops.end();
+ ++it) {
const Op& op = *it;
switch (op.type) {
case Transaction::OP_PUT:
StoreIteratorImpl() : done(false) { }
virtual ~StoreIteratorImpl() { }
- bool add_chunk_entry(Transaction &tx,
+ bool add_chunk_entry(TransactionRef tx,
string &prefix,
string &key,
bufferlist &value,
uint64_t max) {
- Transaction tmp;
+ TransactionRef tmp(new Transaction);
bufferlist tmp_bl;
- tmp.put(prefix, key, value);
- tmp.encode(tmp_bl);
+ tmp->put(prefix, key, value);
+ tmp->encode(tmp_bl);
bufferlist tx_bl;
- tx.encode(tx_bl);
+ tx->encode(tx_bl);
size_t len = tx_bl.length() + tmp_bl.length();
- if (!tx.empty() && (len > max)) {
+ if (!tx->empty() && (len > max)) {
return false;
}
- tx.append(tmp);
+ tx->append(tmp);
last_key.first = prefix;
last_key.second = key;
virtual bool has_next_chunk() {
return !done && _is_valid();
}
- virtual void get_chunk_tx(Transaction &tx, uint64_t max) = 0;
+ virtual void get_chunk_tx(TransactionRef tx, uint64_t max) = 0;
virtual pair<string,string> get_next_key() = 0;
};
typedef ceph::shared_ptr<StoreIteratorImpl> Synchronizer;
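/*
 * Usage sketch for the new Synchronizer interface (an assumption-laden
 * illustration, not part of the patch): the sync provider now fills a
 * caller-owned TransactionRef chunk, bounded by the sync payload size, before
 * encoding it onto the wire:
 *
 *   MonitorDBStore::TransactionRef tx(new MonitorDBStore::Transaction);
 *   sp.synchronizer->get_chunk_tx(tx, g_conf->mon_sync_max_payload_size);
 *   ::encode(*tx, reply->chunk_bl);
 */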
* differ from the one passed on to the function)
* @param last_key[out] Last key in the chunk
*/
- virtual void get_chunk_tx(Transaction &tx, uint64_t max) {
+ virtual void get_chunk_tx(TransactionRef tx, uint64_t max) {
assert(done == false);
assert(iter->valid() == true);
mon->monmap->decode(monmap_bl);
if (mon->store->exists("mkfs", "monmap")) {
- MonitorDBStore::Transaction t;
- t.erase("mkfs", "monmap");
+ MonitorDBStore::TransactionRef t(new MonitorDBStore::Transaction);
+ t->erase("mkfs", "monmap");
mon->store->apply_transaction(t);
}
}
dout(10) << "create_pending monmap epoch " << pending_map.epoch << dendl;
}
-void MonmapMonitor::encode_pending(MonitorDBStore::Transaction *t)
+void MonmapMonitor::encode_pending(MonitorDBStore::TransactionRef t)
{
dout(10) << "encode_pending epoch " << pending_map.epoch << dendl;
single-threaded process and, truth be told, no one else relies on this
thing besides us.
*/
- MonitorDBStore::Transaction t;
- t.put(Monitor::MONITOR_NAME, "joined", 1);
+ MonitorDBStore::TransactionRef t(new MonitorDBStore::Transaction);
+ t->put(Monitor::MONITOR_NAME, "joined", 1);
mon->store->apply_transaction(t);
mon->has_ever_joined = true;
}
void create_pending();
- void encode_pending(MonitorDBStore::Transaction *t);
+ void encode_pending(MonitorDBStore::TransactionRef t);
// we always encode the full map; we have no use for full versions
- virtual void encode_full(MonitorDBStore::Transaction *t) { }
+ virtual void encode_full(MonitorDBStore::TransactionRef t) { }
void on_active();
// state, and we shouldn't want to work around it without knowing what
// exactly happened.
assert(latest_full > 0);
- MonitorDBStore::Transaction t;
- put_version_latest_full(&t, latest_full);
+ MonitorDBStore::TransactionRef t(new MonitorDBStore::Transaction);
+ put_version_latest_full(t, latest_full);
mon->store->apply_transaction(t);
dout(10) << __func__ << " updated the on-disk full map version to "
<< latest_full << dendl;
}
// walk through incrementals
- MonitorDBStore::Transaction *t = NULL;
+ MonitorDBStore::TransactionRef t;
size_t tx_size = 0;
while (version > osdmap.epoch) {
bufferlist inc_bl;
err = osdmap.apply_incremental(inc);
assert(err == 0);
- if (t == NULL)
- t = new MonitorDBStore::Transaction;
+ if (!t)
+ t.reset(new MonitorDBStore::Transaction);
// Write out the full map for all past epochs. Encode the full
// map with the same features as the incremental. If we don't
}
if (tx_size > g_conf->mon_sync_max_payload_size*2) {
- mon->store->apply_transaction(*t);
- delete t;
- t = NULL;
+ mon->store->apply_transaction(t);
+ t = MonitorDBStore::TransactionRef();
tx_size = 0;
}
}
- if (t != NULL) {
- mon->store->apply_transaction(*t);
- delete t;
+ if (t) {
+ mon->store->apply_transaction(t);
}
for (int o = 0; o < osdmap.get_max_osd(); o++) {
* @note receiving a transaction in this function gives a fair amount of
* freedom to the service implementation if it does need it. It shouldn't.
*/
-void OSDMonitor::encode_pending(MonitorDBStore::Transaction *t)
+void OSDMonitor::encode_pending(MonitorDBStore::TransactionRef t)
{
dout(10) << "encode_pending e " << pending_inc.epoch
<< dendl;
return 0;
}
-void OSDMonitor::encode_trim_extra(MonitorDBStore::Transaction *tx, version_t first)
+void OSDMonitor::encode_trim_extra(MonitorDBStore::TransactionRef tx,
+ version_t first)
{
dout(10) << __func__ << " including full map for e " << first << dendl;
bufferlist bl;
private:
void update_from_paxos(bool *need_bootstrap);
void create_pending(); // prepare a new pending
- void encode_pending(MonitorDBStore::Transaction *t);
+ void encode_pending(MonitorDBStore::TransactionRef t);
void on_active();
void on_shutdown();
* we haven't delegated full version stashing to paxosservice for some time
* now, making this function useless in current context.
*/
- virtual void encode_full(MonitorDBStore::Transaction *t) { }
+ virtual void encode_full(MonitorDBStore::TransactionRef t) { }
/**
* do not let paxosservice periodically stash full osdmaps, or we will break our
* locally-managed full maps. (update_from_paxos loads the latest and writes them
* This ensures that anyone post-sync will have enough to rebuild their
* full osdmaps.
*/
- void encode_trim_extra(MonitorDBStore::Transaction *tx, version_t first);
+ void encode_trim_extra(MonitorDBStore::TransactionRef tx, version_t first);
void update_msgr_features();
int check_cluster_features(uint64_t features, stringstream &ss);
}
-void PGMonitor::encode_pending(MonitorDBStore::Transaction *t)
+void PGMonitor::encode_pending(MonitorDBStore::TransactionRef t)
{
version_t version = pending_inc.version;
dout(10) << __func__ << " v " << version << dendl;
version_t get_trim_to();
void update_logger();
- void encode_pending(MonitorDBStore::Transaction *t);
+ void encode_pending(MonitorDBStore::TransactionRef t);
void read_pgmap_meta();
void read_pgmap_full();
void apply_pgmap_delta(bufferlist& bl);
bool should_stash_full() {
return false; // never
}
- virtual void encode_full(MonitorDBStore::Transaction *t) {
+ virtual void encode_full(MonitorDBStore::TransactionRef t) {
assert(0 == "unimplemented encode_full");
}
return mon->store;
}
-void Paxos::read_and_prepare_transactions(MonitorDBStore::Transaction *tx, version_t first, version_t last)
+void Paxos::read_and_prepare_transactions(MonitorDBStore::TransactionRef tx,
+ version_t first, version_t last)
{
dout(10) << __func__ << " first " << first << " last " << last << dendl;
for (version_t v = first; v <= last; ++v) {
int err = get_store()->get(get_name(), v, bl);
assert(err == 0);
assert(bl.length());
- decode_append_transaction(*tx, bl);
+ decode_append_transaction(tx, bl);
}
dout(15) << __func__ << " total versions " << (last-first) << dendl;
}
dout(10) << "accepting pn " << accepted_pn << " from "
<< accepted_pn_from << dendl;
- MonitorDBStore::Transaction t;
- t.put(get_name(), "accepted_pn", accepted_pn);
+ MonitorDBStore::TransactionRef t(new MonitorDBStore::Transaction);
+ t->put(get_name(), "accepted_pn", accepted_pn);
dout(30) << __func__ << " transaction dump:\n";
JSONFormatter f(true);
- t.dump(&f);
+ t->dump(&f);
f.flush(*_dout);
*_dout << dendl;
logger->inc(l_paxos_collect);
- logger->inc(l_paxos_collect_keys, t.get_keys());
- logger->inc(l_paxos_collect_bytes, t.get_bytes());
+ logger->inc(l_paxos_collect_keys, t->get_keys());
+ logger->inc(l_paxos_collect_bytes, t->get_bytes());
utime_t start = ceph_clock_now(NULL);
get_store()->apply_transaction(t);
*/
bool Paxos::store_state(MMonPaxos *m)
{
- MonitorDBStore::Transaction t;
+ MonitorDBStore::TransactionRef t(new MonitorDBStore::Transaction);
map<version_t,bufferlist>::iterator start = m->values.begin();
bool changed = false;
} else {
dout(10) << "store_state [" << start->first << ".."
<< last_committed << "]" << dendl;
- t.put(get_name(), "last_committed", last_committed);
+ t->put(get_name(), "last_committed", last_committed);
// we should apply the state here -- decode every single bufferlist in the
// map and append the transactions to 't'.
map<version_t,bufferlist>::iterator it;
for (it = start; it != end; ++it) {
// write the bufferlist as the version's value
- t.put(get_name(), it->first, it->second);
+ t->put(get_name(), it->first, it->second);
// decode the bufferlist and append it to the transaction we will shortly
// apply.
decode_append_transaction(t, it->second);
uncommitted_value.clear();
}
}
- if (!t.empty()) {
+ if (!t->empty()) {
dout(30) << __func__ << " transaction dump:\n";
JSONFormatter f(true);
- t.dump(&f);
+ t->dump(&f);
f.flush(*_dout);
*_dout << dendl;
logger->inc(l_paxos_store_state);
- logger->inc(l_paxos_store_state_bytes, t.get_bytes());
- logger->inc(l_paxos_store_state_keys, t.get_keys());
+ logger->inc(l_paxos_store_state_bytes, t->get_bytes());
+ logger->inc(l_paxos_store_state_keys, t->get_keys());
utime_t start = ceph_clock_now(NULL);
get_store()->apply_transaction(t);
void Paxos::remove_legacy_versions()
{
if (get_store()->exists(get_name(), "conversion_first")) {
- MonitorDBStore::Transaction t;
+ MonitorDBStore::TransactionRef t(new MonitorDBStore::Transaction);
version_t v = get_store()->get(get_name(), "conversion_first");
dout(10) << __func__ << " removing pre-conversion paxos states from " << v
<< " until " << first_committed << dendl;
for (; v < first_committed; ++v) {
- t.erase(get_name(), v);
+ t->erase(get_name(), v);
}
- t.erase(get_name(), "conversion_first");
+ t->erase(get_name(), "conversion_first");
get_store()->apply_transaction(t);
}
}
new_value = v;
if (last_committed == 0) {
- MonitorDBStore::Transaction t;
+ MonitorDBStore::TransactionRef t(new MonitorDBStore::Transaction);
// initial base case; set first_committed too
- t.put(get_name(), "first_committed", 1);
+ t->put(get_name(), "first_committed", 1);
decode_append_transaction(t, new_value);
bufferlist tx_bl;
- t.encode(tx_bl);
+ t->encode(tx_bl);
new_value = tx_bl;
}
// store the proposed value in the store. IF it is accepted, we will then
// have to decode it into a transaction and apply it.
- MonitorDBStore::Transaction t;
- t.put(get_name(), last_committed+1, new_value);
+ MonitorDBStore::TransactionRef t(new MonitorDBStore::Transaction);
+ t->put(get_name(), last_committed+1, new_value);
// note which pn this pending value is for.
- t.put(get_name(), "pending_v", last_committed + 1);
- t.put(get_name(), "pending_pn", accepted_pn);
+ t->put(get_name(), "pending_v", last_committed + 1);
+ t->put(get_name(), "pending_pn", accepted_pn);
dout(30) << __func__ << " transaction dump:\n";
JSONFormatter f(true);
- t.dump(&f);
+ t->dump(&f);
f.flush(*_dout);
- MonitorDBStore::Transaction debug_tx;
+ MonitorDBStore::TransactionRef debug_tx(new MonitorDBStore::Transaction);
bufferlist::iterator new_value_it = new_value.begin();
- debug_tx.decode(new_value_it);
- debug_tx.dump(&f);
+ debug_tx->decode(new_value_it);
+ debug_tx->dump(&f);
*_dout << "\nbl dump:\n";
f.flush(*_dout);
*_dout << dendl;
logger->inc(l_paxos_begin);
- logger->inc(l_paxos_begin_keys, t.get_keys());
- logger->inc(l_paxos_begin_bytes, t.get_bytes());
+ logger->inc(l_paxos_begin_keys, t->get_keys());
+ logger->inc(l_paxos_begin_bytes, t->get_bytes());
utime_t start = ceph_clock_now(NULL);
get_store()->apply_transaction(t);
dout(10) << "accepting value for " << v << " pn " << accepted_pn << dendl;
// store the accepted value onto our store. We will have to decode it and
// apply its transaction once we receive permission to commit.
- MonitorDBStore::Transaction t;
- t.put(get_name(), v, begin->values[v]);
+ MonitorDBStore::TransactionRef t(new MonitorDBStore::Transaction);
+ t->put(get_name(), v, begin->values[v]);
// note which pn this pending value is for.
- t.put(get_name(), "pending_v", v);
- t.put(get_name(), "pending_pn", accepted_pn);
+ t->put(get_name(), "pending_v", v);
+ t->put(get_name(), "pending_pn", accepted_pn);
dout(30) << __func__ << " transaction dump:\n";
JSONFormatter f(true);
- t.dump(&f);
+ t->dump(&f);
f.flush(*_dout);
*_dout << dendl;
- logger->inc(l_paxos_begin_bytes, t.get_bytes());
+ logger->inc(l_paxos_begin_bytes, t->get_bytes());
utime_t start = ceph_clock_now(NULL);
get_store()->apply_transaction(t);
assert(g_conf->paxos_kill_at != 7);
- MonitorDBStore::Transaction t;
+ MonitorDBStore::TransactionRef t(new MonitorDBStore::Transaction);
// commit locally
last_committed++;
last_commit_time = ceph_clock_now(g_ceph_context);
- t.put(get_name(), "last_committed", last_committed);
+ t->put(get_name(), "last_committed", last_committed);
// decode the value and apply its transaction to the store.
// this value can now be read from last_committed.
dout(30) << __func__ << " transaction dump:\n";
JSONFormatter f(true);
- t.dump(&f);
+ t->dump(&f);
f.flush(*_dout);
*_dout << dendl;
logger->inc(l_paxos_commit);
- logger->inc(l_paxos_commit_keys, t.get_keys());
- logger->inc(l_paxos_commit_bytes, t.get_bytes());
+ logger->inc(l_paxos_commit_keys, t->get_keys());
+ logger->inc(l_paxos_commit_bytes, t->get_bytes());
utime_t start = ceph_clock_now(NULL);
get_store()->apply_transaction(t);
dout(10) << "trim to " << end << " (was " << first_committed << ")" << dendl;
- MonitorDBStore::Transaction t;
+ MonitorDBStore::TransactionRef t(new MonitorDBStore::Transaction);
for (version_t v = first_committed; v < end; ++v) {
dout(10) << "trim " << v << dendl;
- t.erase(get_name(), v);
+ t->erase(get_name(), v);
}
- t.put(get_name(), "first_committed", end);
+ t->put(get_name(), "first_committed", end);
if (g_conf->mon_compact_on_trim) {
dout(10) << " compacting trimmed range" << dendl;
- t.compact_range(get_name(), stringify(first_committed - 1), stringify(end));
+ t->compact_range(get_name(), stringify(first_committed - 1), stringify(end));
}
dout(30) << __func__ << " transaction dump:\n";
JSONFormatter f(true);
- t.dump(&f);
+ t->dump(&f);
f.flush(*_dout);
*_dout << dendl;
bufferlist bl;
- t.encode(bl);
+ t->encode(bl);
trimming = true;
queue_proposal(bl, new C_Trimmed(this));
last_pn += (version_t)mon->rank;
// write
- MonitorDBStore::Transaction t;
- t.put(get_name(), "last_pn", last_pn);
+ MonitorDBStore::TransactionRef t(new MonitorDBStore::Transaction);
+ t->put(get_name(), "last_pn", last_pn);
dout(30) << __func__ << " transaction dump:\n";
JSONFormatter f(true);
- t.dump(&f);
+ t->dump(&f);
f.flush(*_dout);
*_dout << dendl;
void dispatch(PaxosServiceMessage *m);
- void read_and_prepare_transactions(MonitorDBStore::Transaction *tx, version_t from, version_t last);
+ void read_and_prepare_transactions(MonitorDBStore::TransactionRef tx,
+ version_t from, version_t last);
void init();
* @param t The transaction to which we will append the operations
* @param bl A bufferlist containing an encoded transaction
*/
- static void decode_append_transaction(MonitorDBStore::Transaction& t,
- bufferlist& bl) {
- MonitorDBStore::Transaction vt;
+ static void decode_append_transaction(MonitorDBStore::TransactionRef t,
+ bufferlist& bl) {
+ MonitorDBStore::TransactionRef vt(new MonitorDBStore::Transaction);
bufferlist::iterator it = bl.begin();
- vt.decode(it);
- t.append(vt);
+ vt->decode(it);
+ t->append(vt);
}
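/*
 * Usage sketch for the helper above (not part of the patch): a paxos value is
 * an encoded Transaction, so callers decode it into a scratch transaction and
 * splice its ops onto the one they are building, as store_state() does above.
 * accepted_value_bl is a hypothetical name for such an encoded value:
 *
 *   MonitorDBStore::TransactionRef t(new MonitorDBStore::Transaction);
 *   t->put(get_name(), "last_committed", last_committed);
 *   decode_append_transaction(t, accepted_value_bl);
 *   get_store()->apply_transaction(t);
 */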
/**
out << " " << proposed
<< " queued " << (ceph_clock_now(NULL) - p.proposal_time)
<< " tx dump:\n";
- MonitorDBStore::Transaction t;
+ MonitorDBStore::TransactionRef t(new MonitorDBStore::Transaction);
bufferlist::iterator p_it = p.bl.begin();
- t.decode(p_it);
+ t->decode(p_it);
JSONFormatter f(true);
- t.dump(&f);
+ t->dump(&f);
f.flush(out);
return out;
}
dout(10) << __func__ << " conversion_first " << cf
<< " first committed " << fc << dendl;
- MonitorDBStore::Transaction t;
+ MonitorDBStore::TransactionRef t(new MonitorDBStore::Transaction);
if (cf < fc) {
- trim(&t, cf, fc);
+ trim(t, cf, fc);
}
- t.erase(get_service_name(), "conversion_first");
+ t->erase(get_service_name(), "conversion_first");
mon->store->apply_transaction(t);
}
* to encode whatever is pending on the implementation class into a
* bufferlist, so we can then propose that as a value through Paxos.
*/
- MonitorDBStore::Transaction t;
+ MonitorDBStore::TransactionRef t(new MonitorDBStore::Transaction);
bufferlist bl;
if (should_stash_full())
- encode_full(&t);
+ encode_full(t);
- encode_pending(&t);
+ encode_pending(t);
have_pending = false;
if (format_version > 0) {
- t.put(get_service_name(), "format_version", format_version);
+ t->put(get_service_name(), "format_version", format_version);
}
dout(30) << __func__ << " transaction dump:\n";
JSONFormatter f(true);
- t.dump(&f);
+ t->dump(&f);
f.flush(*_dout);
*_dout << dendl;
- t.encode(bl);
+ t->encode(bl);
// apply to paxos
proposing = true;
}
dout(10) << __func__ << " trimming to " << trim_to << ", " << to_remove << " states" << dendl;
- MonitorDBStore::Transaction t;
- trim(&t, get_first_committed(), trim_to);
- put_first_committed(&t, trim_to);
+ MonitorDBStore::TransactionRef t(new MonitorDBStore::Transaction);
+ trim(t, get_first_committed(), trim_to);
+ put_first_committed(t, trim_to);
// let the service add any extra stuff
- encode_trim_extra(&t, trim_to);
+ encode_trim_extra(t, trim_to);
bufferlist bl;
- t.encode(bl);
+ t->encode(bl);
paxos->propose_new_value(bl, NULL);
}
-void PaxosService::trim(MonitorDBStore::Transaction *t,
+void PaxosService::trim(MonitorDBStore::TransactionRef t,
version_t from, version_t to)
{
dout(10) << __func__ << " from " << from << " to " << to << dendl;
*
* @param t The transaction to hold all changes.
*/
- virtual void encode_pending(MonitorDBStore::Transaction *t) = 0;
+ virtual void encode_pending(MonitorDBStore::TransactionRef t) = 0;
/**
* Discard the pending state
* @param from the lower limit of the interval to be trimmed
* @param to the upper limit of the interval to be trimmed (not including)
*/
- void trim(MonitorDBStore::Transaction *t, version_t from, version_t to);
+ void trim(MonitorDBStore::TransactionRef t, version_t from, version_t to);
/**
* encode service-specific extra bits into trim transaction
* @param tx transaction
* @param first new first_committed value
*/
- virtual void encode_trim_extra(MonitorDBStore::Transaction *tx, version_t first) {}
+ virtual void encode_trim_extra(MonitorDBStore::TransactionRef tx,
+ version_t first) {}
/**
* Get the version we should trim to.
*
* @param t Transaction on which the full version shall be encoded.
*/
- virtual void encode_full(MonitorDBStore::Transaction *t) = 0;
+ virtual void encode_full(MonitorDBStore::TransactionRef t) = 0;
/**
* @}
* purposes
* @{
*/
- void put_first_committed(MonitorDBStore::Transaction *t, version_t ver) {
+ void put_first_committed(MonitorDBStore::TransactionRef t, version_t ver) {
t->put(get_service_name(), first_committed_name, ver);
}
/**
* @param t A transaction to which we add this put operation
* @param ver The last committed version number being put
*/
- void put_last_committed(MonitorDBStore::Transaction *t, version_t ver) {
+ void put_last_committed(MonitorDBStore::TransactionRef t, version_t ver) {
t->put(get_service_name(), last_committed_name, ver);
/* We only need to do this once, and that is when we are about to make our
* @param ver The version to which we will add the value
* @param bl A bufferlist containing the version's value
*/
- void put_version(MonitorDBStore::Transaction *t, version_t ver,
+ void put_version(MonitorDBStore::TransactionRef t, version_t ver,
bufferlist& bl) {
t->put(get_service_name(), ver, bl);
}
* @param ver A version number
* @param bl A bufferlist containing the version's value
*/
- void put_version_full(MonitorDBStore::Transaction *t,
+ void put_version_full(MonitorDBStore::TransactionRef t,
version_t ver, bufferlist& bl) {
string key = mon->store->combine_strings(full_prefix_name, ver);
t->put(get_service_name(), key, bl);
* @param t The transaction to which we will add this put operation
* @param ver A version number
*/
- void put_version_latest_full(MonitorDBStore::Transaction *t, version_t ver) {
+ void put_version_latest_full(MonitorDBStore::TransactionRef t, version_t ver) {
string key = mon->store->combine_strings(full_prefix_name, full_latest_name);
t->put(get_service_name(), key, ver);
}
* @param key The key to which we will add the value
* @param bl A bufferlist containing the value
*/
- void put_value(MonitorDBStore::Transaction *t, const string& key, bufferlist& bl) {
+ void put_value(MonitorDBStore::TransactionRef t,
+ const string& key, bufferlist& bl) {
t->put(get_service_name(), key, bl);
}
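/*
 * Minimal sketch of a service built on the helpers above under the new
 * signatures (FooMonitor and pending_foo are hypothetical, not part of the
 * patch):
 *
 *   void FooMonitor::encode_pending(MonitorDBStore::TransactionRef t)
 *   {
 *     version_t version = get_last_committed() + 1;
 *     bufferlist bl;
 *     ::encode(pending_foo, bl);
 *     put_version(t, version, bl);
 *     put_last_committed(t, version);
 *   }
 */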
class TraceIter {
int fd;
unsigned idx;
- MonitorDBStore::Transaction t;
+ MonitorDBStore::TransactionRef t;
public:
TraceIter(string fname) : fd(-1), idx(-1) {
fd = ::open(fname.c_str(), O_RDONLY);
+ t.reset(new MonitorDBStore::Transaction);
}
bool valid() {
return fd != -1;
}
- const MonitorDBStore::Transaction &cur() {
+ MonitorDBStore::TransactionRef cur() {
assert(valid());
return t;
}
return;
}
bliter = bl.begin();
- t.decode(bliter);
+ t.reset(new MonitorDBStore::Transaction);
+ t->decode(bliter);
}
void init() {
next();
if (bl.length() == 0)
break;
cout << "\n--- " << v << " ---" << std::endl;
- MonitorDBStore::Transaction tx;
+ MonitorDBStore::TransactionRef tx(new MonitorDBStore::Transaction);
Paxos::decode_append_transaction(tx, bl);
JSONFormatter f(true);
- tx.dump(&f);
+ tx->dump(&f);
f.flush(cout);
}
} else if (cmd == "dump-trace") {
}
if (iter.num() >= dstart) {
JSONFormatter f(true);
- iter.cur().dump(&f, false);
+ iter.cur()->dump(&f, false);
f.flush(std::cout);
std::cout << std::endl;
}
unsigned num = 0;
for (unsigned i = 0; i < ntrans; ++i) {
std::cerr << "Applying trans " << i << std::endl;
- MonitorDBStore::Transaction t;
+ MonitorDBStore::TransactionRef t(new MonitorDBStore::Transaction);
string prefix;
prefix.push_back((i%26)+'a');
for (unsigned j = 0; j < tsize; ++j) {
os << num;
bufferlist bl;
for (unsigned k = 0; k < tvalsize; ++k) bl.append(rand());
- t.put(prefix, os.str(), bl);
+ t->put(prefix, os.str(), bl);
++num;
}
- t.compact_prefix(prefix);
+ t->compact_prefix(prefix);
st.apply_transaction(t);
}
} else if (cmd == "store-copy") {
do {
uint64_t num_keys = 0;
- MonitorDBStore::Transaction tx;
+ MonitorDBStore::TransactionRef tx(new MonitorDBStore::Transaction);
while (it->valid() && num_keys < 128) {
pair<string,string> k = it->raw_key();
bufferlist v = it->value();
- tx.put(k.first, k.second, v);
+ tx->put(k.first, k.second, v);
num_keys ++;
total_tx ++;
total_keys += num_keys;
- if (!tx.empty())
+ if (!tx->empty())
out_store.apply_transaction(tx);
std::cout << "copied " << total_keys << " keys so far ("
}
void _mark_convert_start() {
- MonitorDBStore::Transaction tx;
- tx.put("mon_convert", "on_going", 1);
+ MonitorDBStore::TransactionRef tx(new MonitorDBStore::Transaction);
+ tx->put("mon_convert", "on_going", 1);
db->apply_transaction(tx);
}
void _mark_convert_finish() {
- MonitorDBStore::Transaction tx;
- tx.erase("mon_convert", "on_going");
+ MonitorDBStore::TransactionRef tx(new MonitorDBStore::Transaction);
+ tx->erase("mon_convert", "on_going");
db->apply_transaction(tx);
}
assert(store->exists_bl_ss("feature_set"));
assert(store->exists_bl_ss("election_epoch"));
- MonitorDBStore::Transaction tx;
+ MonitorDBStore::TransactionRef tx(new MonitorDBStore::Transaction);
if (store->exists_bl_ss("joined")) {
version_t joined = store->get_int("joined");
- tx.put(MONITOR_NAME, "joined", joined);
+ tx->put(MONITOR_NAME, "joined", joined);
}
vector<string> keys;
bufferlist bl;
int r = store->get_bl_ss(bl, (*it).c_str(), 0);
assert(r > 0);
- tx.put(MONITOR_NAME, *it, bl);
+ tx->put(MONITOR_NAME, *it, bl);
}
- assert(!tx.empty());
+ assert(!tx->empty());
db->apply_transaction(tx);
}
std::cout << __func__ << " " << machine
<< " ver " << ver << " bl " << bl.length() << std::endl;
- MonitorDBStore::Transaction tx;
- tx.put(machine, ver, bl);
- tx.put(machine, "last_committed", ver);
+ MonitorDBStore::TransactionRef tx(new MonitorDBStore::Transaction);
+ tx->put(machine, ver, bl);
+ tx->put(machine, "last_committed", ver);
if (has_gv && store->exists_bl_sn(machine_gv.c_str(), ver)) {
stringstream s;
}
bufferlist tx_bl;
- tx.encode(tx_bl);
- tx.put("paxos", gv, tx_bl);
+ tx->encode(tx_bl);
+ tx->put("paxos", gv, tx_bl);
}
db->apply_transaction(tx);
}
version_t lc = db->get(machine, "last_committed");
assert(lc == last_committed);
- MonitorDBStore::Transaction tx;
- tx.put(machine, "first_committed", first_committed);
- tx.put(machine, "last_committed", last_committed);
+ MonitorDBStore::TransactionRef tx(new MonitorDBStore::Transaction);
+ tx->put(machine, "first_committed", first_committed);
+ tx->put(machine, "last_committed", last_committed);
if (store->exists_bl_ss(machine.c_str(), "latest")) {
bufferlist latest_bl_raw;
goto out;
}
- tx.put(machine, "latest", latest_bl_raw);
+ tx->put(machine, "latest", latest_bl_raw);
bufferlist::iterator lbl_it = latest_bl_raw.begin();
bufferlist latest_bl;
std::cout << __func__ << " machine " << machine
<< " latest ver " << latest_ver << std::endl;
- tx.put(machine, "full_latest", latest_ver);
+ tx->put(machine, "full_latest", latest_ver);
stringstream os;
os << "full_" << latest_ver;
- tx.put(machine, os.str(), latest_bl);
+ tx->put(machine, os.str(), latest_bl);
}
out:
db->apply_transaction(tx);
// erase all paxos versions between [first, last_gv[, with first being the
// first gv in the map.
- MonitorDBStore::Transaction tx;
+ MonitorDBStore::TransactionRef tx(new MonitorDBStore::Transaction);
set<version_t>::iterator it = gvs.begin();
std::cout << __func__ << " first gv " << (*it)
<< " last gv " << last_gv << std::endl;
for (; it != gvs.end() && (*it < last_gv); ++it) {
- tx.erase("paxos", *it);
+ tx->erase("paxos", *it);
}
- tx.put("paxos", "first_committed", last_gv);
- tx.put("paxos", "last_committed", highest_gv);
- tx.put("paxos", "accepted_pn", highest_accepted_pn);
- tx.put("paxos", "last_pn", highest_last_pn);
+ tx->put("paxos", "first_committed", last_gv);
+ tx->put("paxos", "last_committed", highest_gv);
+ tx->put("paxos", "accepted_pn", highest_accepted_pn);
+ tx->put("paxos", "last_pn", highest_last_pn);
db->apply_transaction(tx);
}