f->close_section();
}
-PGBackend::PGTransaction *ECBackend::get_transaction()
-{
- return new ECTransaction;
-}
-
-struct MustPrependHashInfo : public ObjectModDesc::Visitor {
- enum { EMPTY, FOUND_APPEND, FOUND_CREATE_STASH } state;
- MustPrependHashInfo() : state(EMPTY) {}
- void append(uint64_t) {
- if (state == EMPTY) {
- state = FOUND_APPEND;
- }
- }
- void rmobject(version_t) {
- if (state == EMPTY) {
- state = FOUND_CREATE_STASH;
- }
- }
- void create() {
- if (state == EMPTY) {
- state = FOUND_CREATE_STASH;
- }
- }
- bool must_prepend_hash_info() const { return state == FOUND_APPEND; }
-};
-
void ECBackend::submit_transaction(
const hobject_t &hoid,
const eversion_t &at_version,
op->reqid = reqid;
op->client_op = client_op;
- op->t.reset(static_cast<ECTransaction*>(_t.release()));
+ op->t = std::move(_t);
set<hobject_t, hobject_t::BitwiseComparator> need_hinfos;
- op->t->get_append_objects(&need_hinfos);
+ ECTransaction::get_append_objects(*(op->t), &need_hinfos);
for (set<hobject_t, hobject_t::BitwiseComparator>::iterator i = need_hinfos.begin();
i != need_hinfos.end();
++i) {
ref));
}
- for (vector<pg_log_entry_t>::iterator i = op->log_entries.begin();
- i != op->log_entries.end();
- ++i) {
- MustPrependHashInfo vis;
- i->mod_desc.visit(&vis);
- if (vis.must_prepend_hash_info()) {
- dout(10) << __func__ << ": stashing HashInfo for "
- << i->soid << " for entry " << *i << dendl;
- assert(op->unstable_hash_infos.count(i->soid));
- ObjectModDesc desc;
- map<string, boost::optional<bufferlist> > old_attrs;
- bufferlist old_hinfo;
- ::encode(*(op->unstable_hash_infos[i->soid]), old_hinfo);
- old_attrs[ECUtil::get_hinfo_key()] = old_hinfo;
- desc.setattrs(old_attrs);
- i->mod_desc.swap(desc);
- i->mod_desc.claim_append(desc);
- assert(i->mod_desc.can_rollback());
- }
- }
-
dout(10) << __func__ << ": op " << *op << " starting" << dendl;
start_write(op);
writing.push_back(op);
}
ObjectStore::Transaction empty;
- op->t->generate_transactions(
+ ECTransaction::generate_transactions(
+ *(op->t),
op->unstable_hash_infos,
ec_impl,
get_parent()->get_info().pgid.pgid,
sinfo,
+ op->log_entries,
&trans,
&(op->temp_added),
&(op->temp_cleared));
struct ECSubWriteReply;
struct ECSubRead;
struct ECSubReadReply;
-class ECTransaction;
struct RecoveryMessages;
class ECBackend : public PGBackend {
void dump_recovery_info(Formatter *f) const;
- /// @see osd/ECTransaction.cc/h
- PGTransaction *get_transaction();
-
void submit_transaction(
const hobject_t &hoid,
const eversion_t &at_version,
eversion_t trim_to;
eversion_t trim_rollback_to;
vector<pg_log_entry_t> log_entries;
+ map<hobject_t, ObjectContextRef, hobject_t::BitwiseComparator> obc_map;
boost::optional<pg_hit_set_history_t> updated_hit_set_history;
Context *on_local_applied_sync;
Context *on_all_applied;
osd_reqid_t reqid;
OpRequestRef client_op;
- std::unique_ptr<ECTransaction> t;
+ std::unique_ptr<PGTransaction> t;
set<hobject_t, hobject_t::BitwiseComparator> temp_added;
set<hobject_t, hobject_t::BitwiseComparator> temp_cleared;
#include <iostream>
#include <vector>
+#include <vector>
#include <sstream>
#include "ECTransaction.h"
#include "ECUtil.h"
#include "os/ObjectStore.h"
+#include "common/inline_variant.h"
-struct AppendObjectsGenerator: public boost::static_visitor<void> {
- set<hobject_t, hobject_t::BitwiseComparator> *out;
- explicit AppendObjectsGenerator(set<hobject_t, hobject_t::BitwiseComparator> *out) : out(out) {}
- void operator()(const ECTransaction::AppendOp &op) {
- out->insert(op.oid);
- }
- void operator()(const ECTransaction::TouchOp &op) {
- out->insert(op.oid);
- }
- void operator()(const ECTransaction::CloneOp &op) {
- out->insert(op.source);
- out->insert(op.target);
- }
- void operator()(const ECTransaction::RenameOp &op) {
- out->insert(op.source);
- out->insert(op.destination);
- }
- void operator()(const ECTransaction::StashOp &op) {
- out->insert(op.oid);
- }
- void operator()(const ECTransaction::RemoveOp &op) {
- out->insert(op.oid);
- }
- void operator()(const ECTransaction::SetAttrsOp &op) {}
- void operator()(const ECTransaction::RmAttrOp &op) {}
- void operator()(const ECTransaction::AllocHintOp &op) {}
- void operator()(const ECTransaction::NoOp &op) {}
-};
void ECTransaction::get_append_objects(
- set<hobject_t, hobject_t::BitwiseComparator> *out) const
+ const PGTransaction &t,
+ set<hobject_t, hobject_t::BitwiseComparator> *out)
{
- AppendObjectsGenerator gen(out);
- reverse_visit(gen);
+ for (auto &&i: t.op_map) {
+ out->insert(i.first);
+ hobject_t source;
+ if (i.second.has_source(&source))
+ out->insert(source);
+ }
}
-struct TransGenerator : public boost::static_visitor<void> {
- map<hobject_t, ECUtil::HashInfoRef, hobject_t::BitwiseComparator> &hash_infos;
+void append(
+ pg_t pgid,
+ const hobject_t &oid,
+ const ECUtil::stripe_info_t &sinfo,
+ ErasureCodeInterfaceRef &ecimpl,
+ const set<int> &want,
+ uint64_t offset,
+ bufferlist &bl,
+ uint32_t flags,
+ ECUtil::HashInfoRef hinfo,
+ map<shard_id_t, ObjectStore::Transaction> *transactions) {
- ErasureCodeInterfaceRef &ecimpl;
- const pg_t pgid;
- const ECUtil::stripe_info_t sinfo;
- map<shard_id_t, ObjectStore::Transaction> *trans;
- set<int> want;
- set<hobject_t, hobject_t::BitwiseComparator> *temp_added;
- set<hobject_t, hobject_t::BitwiseComparator> *temp_removed;
- stringstream *out;
- TransGenerator(
- map<hobject_t, ECUtil::HashInfoRef, hobject_t::BitwiseComparator> &hash_infos,
- ErasureCodeInterfaceRef &ecimpl,
- pg_t pgid,
- const ECUtil::stripe_info_t &sinfo,
- map<shard_id_t, ObjectStore::Transaction> *trans,
- set<hobject_t, hobject_t::BitwiseComparator> *temp_added,
- set<hobject_t, hobject_t::BitwiseComparator> *temp_removed,
- stringstream *out)
- : hash_infos(hash_infos),
- ecimpl(ecimpl), pgid(pgid),
- sinfo(sinfo),
- trans(trans),
- temp_added(temp_added), temp_removed(temp_removed),
- out(out) {
- for (unsigned i = 0; i < ecimpl->get_chunk_count(); ++i) {
- want.insert(i);
- }
- }
+ assert(bl.length());
+ assert(offset % sinfo.get_stripe_width() == 0);
+ assert(
+ sinfo.aligned_logical_offset_to_chunk_offset(offset) ==
+ hinfo->get_total_chunk_size());
+ map<int, bufferlist> buffers;
- coll_t get_coll_ct(shard_id_t shard, const hobject_t &hoid) {
- if (hoid.is_temp()) {
- temp_removed->erase(hoid);
- temp_added->insert(hoid);
- }
- return get_coll(shard);
- }
- coll_t get_coll_rm(shard_id_t shard, const hobject_t &hoid) {
- if (hoid.is_temp()) {
- temp_added->erase(hoid);
- temp_removed->insert(hoid);
- }
- return get_coll(shard);
- }
- coll_t get_coll(shard_id_t shard) {
- return coll_t(spg_t(pgid, shard));
+ // align
+ if (bl.length() % sinfo.get_stripe_width())
+ bl.append_zero(
+ sinfo.get_stripe_width() -
+ ((offset + bl.length()) % sinfo.get_stripe_width()));
+ int r = ECUtil::encode(
+ sinfo, ecimpl, bl, want, &buffers);
+
+ hinfo->append(
+ sinfo.aligned_logical_offset_to_chunk_offset(offset),
+ buffers);
+ bufferlist hbuf;
+ ::encode(*hinfo, hbuf);
+
+ assert(r == 0);
+ for (auto &&i : *transactions) {
+ assert(buffers.count(i.first));
+ bufferlist &enc_bl = buffers[i.first];
+ i.second.set_alloc_hint(
+ coll_t(spg_t(pgid, i.first)),
+ ghobject_t(oid, ghobject_t::NO_GEN, i.first),
+ 0, 0,
+ CEPH_OSD_ALLOC_HINT_FLAG_SEQUENTIAL_WRITE |
+ CEPH_OSD_ALLOC_HINT_FLAG_APPEND_ONLY);
+ i.second.write(
+ coll_t(spg_t(pgid, i.first)),
+ ghobject_t(oid, ghobject_t::NO_GEN, i.first),
+ sinfo.logical_to_prev_chunk_offset(
+ offset),
+ enc_bl.length(),
+ enc_bl,
+ flags);
+ i.second.setattr(
+ coll_t(spg_t(pgid, i.first)),
+ ghobject_t(oid, ghobject_t::NO_GEN, i.first),
+ ECUtil::get_hinfo_key(),
+ hbuf);
}
+}
- void operator()(const ECTransaction::TouchOp &op) {
- for (map<shard_id_t, ObjectStore::Transaction>::iterator i = trans->begin();
- i != trans->end();
- ++i) {
- i->second.touch(
- get_coll_ct(i->first, op.oid),
- ghobject_t(op.oid, ghobject_t::NO_GEN, i->first));
+void ECTransaction::generate_transactions(
+ PGTransaction &t,
+ map<
+ hobject_t, ECUtil::HashInfoRef, hobject_t::BitwiseComparator
+ > &hash_infos,
+ ErasureCodeInterfaceRef &ecimpl,
+ pg_t pgid,
+ const ECUtil::stripe_info_t &sinfo,
+ vector<pg_log_entry_t> &entries,
+ map<shard_id_t, ObjectStore::Transaction> *transactions,
+ set<hobject_t, hobject_t::BitwiseComparator> *temp_added,
+ set<hobject_t, hobject_t::BitwiseComparator> *temp_removed,
+ stringstream *out)
+{
+ assert(transactions);
+ assert(temp_added);
+ assert(temp_removed);
- /* No change, but write it out anyway in case the object did not
- * previously exist. */
- assert(hash_infos.count(op.oid));
- ECUtil::HashInfoRef hinfo = hash_infos[op.oid];
- bufferlist hbuf;
- ::encode(
- *hinfo,
- hbuf);
- i->second.setattr(
- get_coll_ct(i->first, op.oid),
- ghobject_t(op.oid, ghobject_t::NO_GEN, i->first),
- ECUtil::get_hinfo_key(),
- hbuf);
- }
+ map<hobject_t, pg_log_entry_t*, hobject_t::BitwiseComparator> obj_to_log;
+ for (auto &&i: entries) {
+ obj_to_log.insert(make_pair(i.soid, &i));
}
- void operator()(const ECTransaction::AppendOp &op) {
- uint64_t offset = op.off;
- bufferlist bl(op.bl);
- assert(bl.length());
- assert(offset % sinfo.get_stripe_width() == 0);
- map<int, bufferlist> buffers;
- assert(hash_infos.count(op.oid));
- ECUtil::HashInfoRef hinfo = hash_infos[op.oid];
+ t.safe_create_traverse(
+ [&](pair<const hobject_t, PGTransaction::ObjectOperation> &opair) {
+ const hobject_t &oid = opair.first;
- // align
- if (bl.length() % sinfo.get_stripe_width())
- bl.append_zero(
- sinfo.get_stripe_width() -
- ((offset + bl.length()) % sinfo.get_stripe_width()));
- assert(bl.length() - op.bl.length() < sinfo.get_stripe_width());
- int r = ECUtil::encode(
- sinfo, ecimpl, bl, want, &buffers);
+ auto iter = obj_to_log.find(oid);
+ pg_log_entry_t *entry = iter != obj_to_log.end() ? iter->second : nullptr;
- hinfo->append(
- sinfo.aligned_logical_offset_to_chunk_offset(op.off),
- buffers);
- bufferlist hbuf;
- ::encode(
- *hinfo,
- hbuf);
+ if (entry && opair.second.updated_snaps) {
+ entry->mod_desc.update_snaps(opair.second.updated_snaps->first);
+ vector<snapid_t> snaps(
+ opair.second.updated_snaps->second.begin(),
+ opair.second.updated_snaps->second.end());
+ ::encode(snaps, entry->snaps);
+ }
- assert(r == 0);
- for (map<shard_id_t, ObjectStore::Transaction>::iterator i = trans->begin();
- i != trans->end();
- ++i) {
- assert(buffers.count(i->first));
- bufferlist &enc_bl = buffers[i->first];
- i->second.set_alloc_hint(
- get_coll_ct(i->first, op.oid),
- ghobject_t(op.oid, ghobject_t::NO_GEN, i->first),
- 0, 0,
- CEPH_OSD_ALLOC_HINT_FLAG_SEQUENTIAL_WRITE |
- CEPH_OSD_ALLOC_HINT_FLAG_APPEND_ONLY);
- i->second.write(
- get_coll_ct(i->first, op.oid),
- ghobject_t(op.oid, ghobject_t::NO_GEN, i->first),
- sinfo.logical_to_prev_chunk_offset(
- offset),
- enc_bl.length(),
- enc_bl,
- op.fadvise_flags);
- i->second.setattr(
- get_coll_ct(i->first, op.oid),
- ghobject_t(op.oid, ghobject_t::NO_GEN, i->first),
- ECUtil::get_hinfo_key(),
- hbuf);
+ ObjectContextRef obc;
+ auto obiter = t.obc_map.find(oid);
+ if (obiter != t.obc_map.end()) {
+ obc = obiter->second;
}
- }
- void operator()(const ECTransaction::CloneOp &op) {
- assert(hash_infos.count(op.source));
- assert(hash_infos.count(op.target));
- *(hash_infos[op.target]) = *(hash_infos[op.source]);
- for (map<shard_id_t, ObjectStore::Transaction>::iterator i = trans->begin();
- i != trans->end();
- ++i) {
- i->second.clone(
- get_coll_ct(i->first, op.source),
- ghobject_t(op.source, ghobject_t::NO_GEN, i->first),
- ghobject_t(op.target, ghobject_t::NO_GEN, i->first));
+ if (entry) {
+ assert(obc);
+ } else {
+ assert(oid.is_temp());
}
- }
- void operator()(const ECTransaction::RenameOp &op) {
- assert(hash_infos.count(op.source));
- assert(hash_infos.count(op.destination));
- *(hash_infos[op.destination]) = *(hash_infos[op.source]);
- hash_infos[op.source]->clear();
- for (map<shard_id_t, ObjectStore::Transaction>::iterator i = trans->begin();
- i != trans->end();
- ++i) {
- i->second.collection_move_rename(
- get_coll_rm(i->first, op.source),
- ghobject_t(op.source, ghobject_t::NO_GEN, i->first),
- get_coll_ct(i->first, op.destination),
- ghobject_t(op.destination, ghobject_t::NO_GEN, i->first));
+
+ map<string, boost::optional<bufferlist> > xattr_rollback;
+ ECUtil::HashInfoRef hinfo;
+ {
+ auto iter = hash_infos.find(oid);
+ assert(iter != hash_infos.end());
+ hinfo = iter->second;
+ bufferlist old_hinfo;
+ ::encode(*hinfo, old_hinfo);
+ xattr_rollback[ECUtil::get_hinfo_key()] = old_hinfo;
}
- }
- void operator()(const ECTransaction::StashOp &op) {
- assert(hash_infos.count(op.oid));
- hash_infos[op.oid]->clear();
- for (map<shard_id_t, ObjectStore::Transaction>::iterator i = trans->begin();
- i != trans->end();
- ++i) {
- coll_t cid(get_coll_rm(i->first, op.oid));
- i->second.collection_move_rename(
- cid,
- ghobject_t(op.oid, ghobject_t::NO_GEN, i->first),
- cid,
- ghobject_t(op.oid, op.version, i->first));
+
+ if (opair.second.is_none() && opair.second.truncate) {
+ assert(opair.second.truncate->first == 0);
+ assert(opair.second.truncate->first ==
+ opair.second.truncate->second);
+ assert(entry);
+ assert(obc);
+
+ opair.second.truncate = boost::none;
+ opair.second.delete_first = true;
+ opair.second.init_type = PGTransaction::ObjectOperation::Init::Create();
+
+ if (obc) {
+ /* We need to reapply all of the cached xattrs.
+ * std::map insert fortunately only writes keys
+ * which don't already exist, so this should do
+ * the right thing. */
+ opair.second.attr_updates.insert(
+ obc->attr_cache.begin(),
+ obc->attr_cache.end());
+ }
}
- }
- void operator()(const ECTransaction::RemoveOp &op) {
- assert(hash_infos.count(op.oid));
- hash_infos[op.oid]->clear();
- for (map<shard_id_t, ObjectStore::Transaction>::iterator i = trans->begin();
- i != trans->end();
- ++i) {
- i->second.remove(
- get_coll_rm(i->first, op.oid),
- ghobject_t(op.oid, ghobject_t::NO_GEN, i->first));
+
+ if (oid.is_temp()) {
+ if (opair.second.is_fresh_object()) {
+ temp_added->insert(oid);
+ } else if (opair.second.is_delete()) {
+ temp_removed->insert(oid);
+ }
}
- }
- void operator()(const ECTransaction::SetAttrsOp &op) {
- map<string, bufferlist> attrs(op.attrs);
- for (map<shard_id_t, ObjectStore::Transaction>::iterator i = trans->begin();
- i != trans->end();
- ++i) {
- i->second.setattrs(
- get_coll_ct(i->first, op.oid),
- ghobject_t(op.oid, ghobject_t::NO_GEN, i->first),
- attrs);
+
+ if (opair.second.delete_first) {
+ /* We also want to remove the boost::none entries since
+ * the keys already won't exist */
+ for (auto j = opair.second.attr_updates.begin();
+ j != opair.second.attr_updates.end();
+ ) {
+ if (j->second) {
+ ++j;
+ } else {
+ opair.second.attr_updates.erase(j++);
+ }
+ }
+ /* Fill in all current entries for xattr rollback */
+ if (obc) {
+ xattr_rollback.insert(
+ obc->attr_cache.begin(),
+ obc->attr_cache.end());
+ obc->attr_cache.clear();
+ }
+ if (entry) {
+ entry->mod_desc.rmobject(entry->version.version);
+ for (auto &&st: *transactions) {
+ st.second.collection_move_rename(
+ coll_t(spg_t(pgid, st.first)),
+ ghobject_t(oid, ghobject_t::NO_GEN, st.first),
+ coll_t(spg_t(pgid, st.first)),
+ ghobject_t(oid, entry->version.version, st.first));
+ }
+ } else {
+ for (auto &&st: *transactions) {
+ st.second.remove(
+ coll_t(spg_t(pgid, st.first)),
+ ghobject_t(oid, ghobject_t::NO_GEN, st.first));
+ }
+ }
+ hinfo->clear();
}
- }
- void operator()(const ECTransaction::RmAttrOp &op) {
- for (map<shard_id_t, ObjectStore::Transaction>::iterator i = trans->begin();
- i != trans->end();
- ++i) {
- i->second.rmattr(
- get_coll_ct(i->first, op.oid),
- ghobject_t(op.oid, ghobject_t::NO_GEN, i->first),
- op.key);
+
+ if (opair.second.is_fresh_object() && entry) {
+ entry->mod_desc.create();
}
- }
- void operator()(const ECTransaction::AllocHintOp &op) {
- // logical_to_next_chunk_offset() scales down both aligned and
- // unaligned offsets
- uint64_t object_size = sinfo.logical_to_next_chunk_offset(
- op.expected_object_size);
- uint64_t write_size = sinfo.logical_to_next_chunk_offset(
- op.expected_write_size);
- for (map<shard_id_t, ObjectStore::Transaction>::iterator i = trans->begin();
- i != trans->end();
- ++i) {
- i->second.set_alloc_hint(
- get_coll_ct(i->first, op.oid),
- ghobject_t(op.oid, ghobject_t::NO_GEN, i->first),
- object_size, write_size, op.flags);
+ match(
+ opair.second.init_type,
+ [&](const PGTransaction::ObjectOperation::Init::None &) {},
+ [&](const PGTransaction::ObjectOperation::Init::Create &op) {
+ for (auto &&st: *transactions) {
+ st.second.touch(
+ coll_t(spg_t(pgid, st.first)),
+ ghobject_t(oid, ghobject_t::NO_GEN, st.first));
+ }
+ },
+ [&](const PGTransaction::ObjectOperation::Init::Clone &op) {
+ for (auto &&st: *transactions) {
+ st.second.clone(
+ coll_t(spg_t(pgid, st.first)),
+ ghobject_t(op.source, ghobject_t::NO_GEN, st.first),
+ ghobject_t(oid, ghobject_t::NO_GEN, st.first));
+ }
+
+ auto siter = hash_infos.find(op.source);
+ assert(siter != hash_infos.end());
+ *hinfo = *(siter->second);
+
+ if (obc) {
+ auto cobciter = t.obc_map.find(op.source);
+ assert(cobciter != t.obc_map.end());
+ obc->attr_cache = cobciter->second->attr_cache;
+ }
+ },
+ [&](const PGTransaction::ObjectOperation::Init::Rename &op) {
+ assert(op.source.is_temp());
+ for (auto &&st: *transactions) {
+ st.second.collection_move_rename(
+ coll_t(spg_t(pgid, st.first)),
+ ghobject_t(op.source, ghobject_t::NO_GEN, st.first),
+ coll_t(spg_t(pgid, st.first)),
+ ghobject_t(oid, ghobject_t::NO_GEN, st.first));
+ }
+ auto siter = hash_infos.find(op.source);
+ assert(siter != hash_infos.end());
+ *hinfo = *(siter->second);
+ assert(obc->attr_cache.empty());
+ });
+
+ // omap, truncate not supported (except 0, handled above)
+ assert(!(opair.second.clear_omap));
+ assert(!(opair.second.truncate));
+ assert(!(opair.second.omap_header));
+ assert(opair.second.omap_updates.empty());
+
+ if (!opair.second.attr_updates.empty()) {
+ map<string, bufferlist> to_set;
+ for (auto &&j: opair.second.attr_updates) {
+ if (j.second) {
+ to_set[j.first] = *(j.second);
+ } else {
+ for (auto &&st : *transactions) {
+ st.second.rmattr(
+ coll_t(spg_t(pgid, st.first)),
+ ghobject_t(oid, ghobject_t::NO_GEN, st.first),
+ j.first);
+ }
+ }
+ if (obc) {
+ auto citer = obc->attr_cache.find(j.first);
+ if (entry) {
+ if (citer != obc->attr_cache.end()) {
+ // won't overwrite anything we put in earlier
+ xattr_rollback.insert(
+ make_pair(
+ j.first,
+ boost::optional<bufferlist>(citer->second)));
+ } else {
+ // won't overwrite anything we put in earlier
+ xattr_rollback.insert(
+ make_pair(
+ j.first,
+ boost::none));
+ }
+ }
+ if (j.second) {
+ obc->attr_cache[j.first] = *(j.second);
+ } else if (citer != obc->attr_cache.end()) {
+ obc->attr_cache.erase(citer);
+ }
+ } else {
+ assert(!entry);
+ }
+ }
+ for (auto &&st : *transactions) {
+ st.second.setattrs(
+ coll_t(spg_t(pgid, st.first)),
+ ghobject_t(oid, ghobject_t::NO_GEN, st.first),
+ to_set);
+ }
+ assert(!xattr_rollback.empty());
+ }
+ if (entry && !xattr_rollback.empty()) {
+ entry->mod_desc.setattrs(xattr_rollback);
}
- }
- void operator()(const ECTransaction::NoOp &op) {}
-};
+ if (opair.second.alloc_hint) {
+ /* logical_to_next_chunk_offset() scales down both aligned and
+ * unaligned offsets
-void ECTransaction::generate_transactions(
- map<hobject_t, ECUtil::HashInfoRef, hobject_t::BitwiseComparator> &hash_infos,
- ErasureCodeInterfaceRef &ecimpl,
- pg_t pgid,
- const ECUtil::stripe_info_t &sinfo,
- map<shard_id_t, ObjectStore::Transaction> *transactions,
- set<hobject_t, hobject_t::BitwiseComparator> *temp_added,
- set<hobject_t, hobject_t::BitwiseComparator> *temp_removed,
- stringstream *out) const
-{
- TransGenerator gen(
- hash_infos,
- ecimpl,
- pgid,
- sinfo,
- transactions,
- temp_added,
- temp_removed,
- out);
- visit(gen);
+ * we don't bother to roll this back at this time for two reasons:
+ * 1) it's advisory
+ * 2) we don't track the old value */
+ uint64_t object_size = sinfo.logical_to_next_chunk_offset(
+ opair.second.alloc_hint->expected_object_size);
+ uint64_t write_size = sinfo.logical_to_next_chunk_offset(
+ opair.second.alloc_hint->expected_write_size);
+
+ for (auto &&st : *transactions) {
+ st.second.set_alloc_hint(
+ coll_t(spg_t(pgid, st.first)),
+ ghobject_t(oid, ghobject_t::NO_GEN, st.first),
+ object_size,
+ write_size,
+ opair.second.alloc_hint->flags);
+ }
+ }
+
+
+ if (!opair.second.buffer_updates.empty()) {
+ set<int> want;
+ for (unsigned i = 0; i < ecimpl->get_chunk_count(); ++i) {
+ want.insert(i);
+ }
+ if (entry) {
+ entry->mod_desc.append(
+ sinfo.aligned_chunk_offset_to_logical_offset(
+ hinfo->get_total_chunk_size()
+ ));
+ }
+ for (auto &&extent: opair.second.buffer_updates) {
+ using BufferUpdate = PGTransaction::ObjectOperation::BufferUpdate;
+ match(
+ extent.get_val(),
+ [&](const BufferUpdate::Write &op) {
+ if (extent.get_len()) {
+ assert(op.buffer.length() == extent.get_len());
+ bufferlist bl = op.buffer;
+ append(
+ pgid,
+ oid,
+ sinfo,
+ ecimpl,
+ want,
+ extent.get_off(),
+ bl,
+ op.fadvise_flags,
+ hinfo,
+ transactions);
+ }
+ },
+ [&](const BufferUpdate::Zero &) {
+ assert(
+ 0 ==
+ "Zero is not allowed, do_op should have returned ENOTSUPP");
+ },
+ [&](const BufferUpdate::CloneRange &) {
+ assert(
+ 0 ==
+ "CloneRange is not allowed, do_op should have returned ENOTSUPP");
+ });
+ }
+ }
+ });
}
#include "PGBackend.h"
#include "ECUtil.h"
#include "erasure-code/ErasureCodeInterface.h"
+#include "PGTransaction.h"
-class ECTransaction : public PGBackend::PGTransaction {
-public:
- struct AppendOp {
- hobject_t oid;
- uint64_t off;
- bufferlist bl;
- uint32_t fadvise_flags;
- AppendOp(const hobject_t &oid, uint64_t off, bufferlist &bl, uint32_t flags)
- : oid(oid), off(off), bl(bl), fadvise_flags(flags) {}
- };
- struct CloneOp {
- hobject_t source;
- hobject_t target;
- CloneOp(const hobject_t &source, const hobject_t &target)
- : source(source), target(target) {}
- };
- struct RenameOp {
- hobject_t source;
- hobject_t destination;
- RenameOp(const hobject_t &source, const hobject_t &destination)
- : source(source), destination(destination) {}
- };
- struct StashOp {
- hobject_t oid;
- version_t version;
- StashOp(const hobject_t &oid, version_t version)
- : oid(oid), version(version) {}
- };
- struct TouchOp {
- hobject_t oid;
- explicit TouchOp(const hobject_t &oid) : oid(oid) {}
- };
- struct RemoveOp {
- hobject_t oid;
- explicit RemoveOp(const hobject_t &oid) : oid(oid) {}
- };
- struct SetAttrsOp {
- hobject_t oid;
- map<string, bufferlist> attrs;
- SetAttrsOp(const hobject_t &oid, map<string, bufferlist> &_attrs)
- : oid(oid) {
- attrs.swap(_attrs);
- }
- SetAttrsOp(const hobject_t &oid, const string &key, bufferlist &val)
- : oid(oid) {
- attrs.insert(make_pair(key, val));
- }
- };
- struct RmAttrOp {
- hobject_t oid;
- string key;
- RmAttrOp(const hobject_t &oid, const string &key) : oid(oid), key(key) {}
- };
- struct AllocHintOp {
- hobject_t oid;
- uint64_t expected_object_size;
- uint64_t expected_write_size;
- uint32_t flags;
- AllocHintOp(const hobject_t &oid,
- uint64_t expected_object_size,
- uint64_t expected_write_size,
- uint32_t flags)
- : oid(oid),
- expected_object_size(expected_object_size),
- expected_write_size(expected_write_size),
- flags(flags) {}
- };
- struct NoOp {};
- typedef boost::variant<
- AppendOp,
- CloneOp,
- RenameOp,
- StashOp,
- TouchOp,
- RemoveOp,
- SetAttrsOp,
- RmAttrOp,
- AllocHintOp,
- NoOp> Op;
- list<Op> ops;
- uint64_t written;
-
- ECTransaction() : written(0) {}
- /// Write
- void touch(
- const hobject_t &hoid) {
- ops.push_back(TouchOp(hoid));
- }
- void append(
- const hobject_t &hoid,
- uint64_t off,
- uint64_t len,
- bufferlist &bl,
- uint32_t fadvise_flags) {
- if (len == 0) {
- touch(hoid);
- return;
- }
- written += len;
- assert(len == bl.length());
- ops.push_back(AppendOp(hoid, off, bl, fadvise_flags));
- }
- void stash(
- const hobject_t &hoid,
- version_t former_version) {
- ops.push_back(StashOp(hoid, former_version));
- }
- void remove(
- const hobject_t &hoid) {
- ops.push_back(RemoveOp(hoid));
- }
- void setattrs(
- const hobject_t &hoid,
- map<string, bufferlist> &attrs) {
- ops.push_back(SetAttrsOp(hoid, attrs));
- }
- void setattr(
- const hobject_t &hoid,
- const string &attrname,
- bufferlist &bl) {
- ops.push_back(SetAttrsOp(hoid, attrname, bl));
- }
- void rmattr(
- const hobject_t &hoid,
- const string &attrname) {
- ops.push_back(RmAttrOp(hoid, attrname));
- }
- void clone(
- const hobject_t &from,
- const hobject_t &to) {
- ops.push_back(CloneOp(from, to));
- }
- void rename(
- const hobject_t &from,
- const hobject_t &to) {
- ops.push_back(RenameOp(from, to));
- }
- void set_alloc_hint(
- const hobject_t &hoid,
- uint64_t expected_object_size,
- uint64_t expected_write_size,
- uint32_t flags) {
- ops.push_back(AllocHintOp(hoid, expected_object_size, expected_write_size,
- flags));
- }
-
- void append(PGTransaction *_to_append) {
- ECTransaction *to_append = static_cast<ECTransaction*>(_to_append);
- written += to_append->written;
- to_append->written = 0;
- ops.splice(ops.end(), to_append->ops,
- to_append->ops.begin(), to_append->ops.end());
- }
- void nop() {
- ops.push_back(NoOp());
- }
- bool empty() const {
- return ops.empty();
- }
- uint64_t get_bytes_written() const {
- return written;
- }
- template <typename T>
- void visit(T &vis) const {
- for (list<Op>::const_iterator i = ops.begin(); i != ops.end(); ++i) {
- boost::apply_visitor(vis, *i);
- }
- }
- template <typename T>
- void reverse_visit(T &vis) const {
- for (list<Op>::const_reverse_iterator i = ops.rbegin();
- i != ops.rend();
- ++i) {
- boost::apply_visitor(vis, *i);
- }
- }
+namespace ECTransaction {
void get_append_objects(
- set<hobject_t, hobject_t::BitwiseComparator> *out) const;
+ const PGTransaction &t,
+ set<hobject_t, hobject_t::BitwiseComparator> *out);
void generate_transactions(
- map<hobject_t, ECUtil::HashInfoRef, hobject_t::BitwiseComparator> &hash_infos,
+ PGTransaction &t,
+ map<
+ hobject_t, ECUtil::HashInfoRef, hobject_t::BitwiseComparator
+ > &hash_infos,
ErasureCodeInterfaceRef &ecimpl,
pg_t pgid,
const ECUtil::stripe_info_t &sinfo,
+ vector<pg_log_entry_t> &entries,
map<shard_id_t, ObjectStore::Transaction> *transactions,
set<hobject_t, hobject_t::BitwiseComparator> *temp_added,
set<hobject_t, hobject_t::BitwiseComparator> *temp_removed,
- stringstream *out = 0) const;
+ stringstream *out = 0);
};
#endif
#include "os/ObjectStore.h"
#include "common/LogClient.h"
#include <string>
+#include "PGTransaction.h"
namespace Scrub {
class Store;
virtual ~PGBackend() {}
- /**
- * Client IO Interface
- */
- class PGTransaction {
- public:
- /// Write
- virtual void touch(
- const hobject_t &hoid ///< [in] obj to touch
- ) = 0;
- virtual void stash(
- const hobject_t &hoid, ///< [in] obj to remove
- version_t former_version ///< [in] former object version
- ) = 0;
- virtual void remove(
- const hobject_t &hoid ///< [in] obj to remove
- ) = 0;
- virtual void setattrs(
- const hobject_t &hoid, ///< [in] object to write
- map<string, bufferlist> &attrs ///< [in] attrs, may be cleared
- ) = 0;
- virtual void setattr(
- const hobject_t &hoid, ///< [in] object to write
- const string &attrname, ///< [in] attr to write
- bufferlist &bl ///< [in] val to write, may be claimed
- ) = 0;
- virtual void rmattr(
- const hobject_t &hoid, ///< [in] object to write
- const string &attrname ///< [in] attr to remove
- ) = 0;
- virtual void clone(
- const hobject_t &from,
- const hobject_t &to
- ) = 0;
- virtual void rename(
- const hobject_t &from,
- const hobject_t &to
- ) = 0;
- virtual void set_alloc_hint(
- const hobject_t &hoid,
- uint64_t expected_object_size,
- uint64_t expected_write_size,
- uint32_t flags
- ) = 0;
-
- /// Optional, not supported on ec-pool
- virtual void write(
- const hobject_t &hoid, ///< [in] object to write
- uint64_t off, ///< [in] off at which to write
- uint64_t len, ///< [in] len to write from bl
- bufferlist &bl, ///< [in] bl to write will be claimed to len
- uint32_t fadvise_flags = 0 ///< [in] fadvise hint
- ) { assert(0); }
- virtual void omap_setkeys(
- const hobject_t &hoid, ///< [in] object to write
- map<string, bufferlist> &keys ///< [in] omap keys, may be cleared
- ) { assert(0); }
- virtual void omap_setkeys(
- const hobject_t &hoid, ///< [in] object to write
- bufferlist &keys_bl ///< [in] omap keys, may be cleared
- ) { assert(0); }
- virtual void omap_rmkeys(
- const hobject_t &hoid, ///< [in] object to write
- set<string> &keys ///< [in] omap keys, may be cleared
- ) { assert(0); }
- virtual void omap_rmkeys(
- const hobject_t &hoid, ///< [in] object to write
- bufferlist &keys_bl ///< [in] omap keys, may be cleared
- ) { assert(0); }
- virtual void omap_clear(
- const hobject_t &hoid ///< [in] object to clear omap
- ) { assert(0); }
- virtual void omap_setheader(
- const hobject_t &hoid, ///< [in] object to write
- bufferlist &header ///< [in] header
- ) { assert(0); }
- virtual void clone_range(
- const hobject_t &from, ///< [in] from
- const hobject_t &to, ///< [in] to
- uint64_t fromoff, ///< [in] offset
- uint64_t len, ///< [in] len
- uint64_t tooff ///< [in] offset
- ) { assert(0); }
- virtual void truncate(
- const hobject_t &hoid,
- uint64_t off
- ) { assert(0); }
- virtual void zero(
- const hobject_t &hoid,
- uint64_t off,
- uint64_t len
- ) { assert(0); }
-
- /// Supported on all backends
-
- /// off must be the current object size
- virtual void append(
- const hobject_t &hoid, ///< [in] object to write
- uint64_t off, ///< [in] off at which to write
- uint64_t len, ///< [in] len to write from bl
- bufferlist &bl, ///< [in] bl to write will be claimed to len
- uint32_t fadvise_flags ///< [in] fadvise hint
- ) { write(hoid, off, len, bl, fadvise_flags); }
-
- /// to_append *must* have come from the same PGBackend (same concrete type)
- virtual void append(
- PGTransaction *to_append ///< [in] trans to append, to_append is cleared
- ) = 0;
- virtual void nop() = 0;
- virtual bool empty() const = 0;
- virtual uint64_t get_bytes_written() const = 0;
- virtual ~PGTransaction() {}
- };
- using PGTransactionUPtr = std::unique_ptr<PGTransaction>;
-
- /// Get implementation specific empty transaction
- virtual PGTransaction *get_transaction() = 0;
-
/// execute implementation specific transaction
virtual void submit_transaction(
const hobject_t &hoid, ///< [in] object
new AsyncReadCallback(r, on_complete)));
}
-
-class RPGTransaction : public PGBackend::PGTransaction {
- coll_t coll;
- set<hobject_t, hobject_t::BitwiseComparator> temp_added;
- set<hobject_t, hobject_t::BitwiseComparator> temp_cleared;
- mutable ObjectStore::Transaction t;
- uint64_t written;
- const coll_t &get_coll_ct(const hobject_t &hoid) {
- if (hoid.is_temp()) {
- temp_cleared.erase(hoid);
- temp_added.insert(hoid);
- }
- return get_coll(hoid);
- }
- const coll_t &get_coll_rm(const hobject_t &hoid) {
- if (hoid.is_temp()) {
- temp_added.erase(hoid);
- temp_cleared.insert(hoid);
- }
- return get_coll(hoid);
- }
- const coll_t &get_coll(const hobject_t &hoid) {
- return coll;
- }
-public:
- RPGTransaction(coll_t coll)
- : coll(coll), written(0) {}
-
- /// Yields ownership of contained transaction
- ObjectStore::Transaction&& get_transaction() {
- return std::move(t);
- }
- const set<hobject_t, hobject_t::BitwiseComparator> &get_temp_added() {
- return temp_added;
- }
- const set<hobject_t, hobject_t::BitwiseComparator> &get_temp_cleared() {
- return temp_cleared;
- }
-
- void write(
- const hobject_t &hoid,
- uint64_t off,
- uint64_t len,
- bufferlist &bl,
- uint32_t fadvise_flags
- ) {
- written += len;
- t.write(get_coll_ct(hoid), ghobject_t(hoid), off, len, bl, fadvise_flags);
- }
- void remove(
- const hobject_t &hoid
- ) {
- t.remove(get_coll_rm(hoid), ghobject_t(hoid));
- }
- void stash(
- const hobject_t &hoid,
- version_t former_version) {
- t.collection_move_rename(
- coll, ghobject_t(hoid), coll,
- ghobject_t(hoid, former_version, shard_id_t::NO_SHARD));
- }
- void setattrs(
- const hobject_t &hoid,
- map<string, bufferlist> &attrs
- ) {
- t.setattrs(get_coll(hoid), ghobject_t(hoid), attrs);
- }
- void setattr(
- const hobject_t &hoid,
- const string &attrname,
- bufferlist &bl
- ) {
- t.setattr(get_coll(hoid), ghobject_t(hoid), attrname, bl);
- }
- void rmattr(
- const hobject_t &hoid,
- const string &attrname
- ) {
- t.rmattr(get_coll(hoid), ghobject_t(hoid), attrname);
- }
- void omap_setkeys(
- const hobject_t &hoid,
- map<string, bufferlist> &keys
- ) {
- for (map<string, bufferlist>::iterator p = keys.begin(); p != keys.end(); ++p)
- written += p->first.length() + p->second.length();
- return t.omap_setkeys(get_coll(hoid), ghobject_t(hoid), keys);
- }
- void omap_setkeys(
- const hobject_t &hoid,
- bufferlist &keys_bl
- ) {
- written += keys_bl.length();
- return t.omap_setkeys(get_coll(hoid), ghobject_t(hoid), keys_bl);
- }
- void omap_rmkeys(
- const hobject_t &hoid,
- set<string> &keys
- ) {
- t.omap_rmkeys(get_coll(hoid), ghobject_t(hoid), keys);
- }
- void omap_rmkeys(
- const hobject_t &hoid,
- bufferlist &keys_bl
- ) {
- t.omap_rmkeys(get_coll(hoid), ghobject_t(hoid), keys_bl);
- }
- void omap_clear(
- const hobject_t &hoid
- ) {
- t.omap_clear(get_coll(hoid), ghobject_t(hoid));
- }
- void omap_setheader(
- const hobject_t &hoid,
- bufferlist &header
- ) {
- written += header.length();
- t.omap_setheader(get_coll(hoid), ghobject_t(hoid), header);
- }
- void clone_range(
- const hobject_t &from,
- const hobject_t &to,
- uint64_t fromoff,
- uint64_t len,
- uint64_t tooff
- ) {
- assert(get_coll(from) == get_coll_ct(to) && get_coll(from) == coll);
- t.clone_range(coll, ghobject_t(from), ghobject_t(to), fromoff, len, tooff);
- }
- void clone(
- const hobject_t &from,
- const hobject_t &to
- ) {
- assert(get_coll(from) == get_coll_ct(to) && get_coll(from) == coll);
- t.clone(coll, ghobject_t(from), ghobject_t(to));
- }
- void rename(
- const hobject_t &from,
- const hobject_t &to
- ) {
- t.collection_move_rename(
- get_coll_rm(from),
- ghobject_t(from),
- get_coll_ct(to),
- ghobject_t(to));
- }
-
- void touch(
- const hobject_t &hoid
- ) {
- t.touch(get_coll_ct(hoid), ghobject_t(hoid));
- }
-
- void truncate(
- const hobject_t &hoid,
- uint64_t off
- ) {
- t.truncate(get_coll(hoid), ghobject_t(hoid), off);
- }
- void zero(
- const hobject_t &hoid,
- uint64_t off,
- uint64_t len
- ) {
- t.zero(get_coll(hoid), ghobject_t(hoid), off, len);
- }
-
- void set_alloc_hint(
- const hobject_t &hoid,
- uint64_t expected_object_size,
- uint64_t expected_write_size,
- uint32_t flags
- ) {
- t.set_alloc_hint(get_coll(hoid), ghobject_t(hoid), expected_object_size,
- expected_write_size, flags);
- }
-
- using PGBackend::PGTransaction::append;
- void append(
- PGTransaction *_to_append
- ) {
- RPGTransaction *to_append = dynamic_cast<RPGTransaction*>(_to_append);
- assert(to_append);
- written += to_append->written;
- to_append->written = 0;
- t.append((to_append->t));
- for (set<hobject_t, hobject_t::BitwiseComparator>::iterator i = to_append->temp_added.begin();
- i != to_append->temp_added.end();
- ++i) {
- temp_cleared.erase(*i);
- temp_added.insert(*i);
- }
- for (set<hobject_t, hobject_t::BitwiseComparator>::iterator i = to_append->temp_cleared.begin();
- i != to_append->temp_cleared.end();
- ++i) {
- temp_added.erase(*i);
- temp_cleared.insert(*i);
- }
- }
- void nop() {
- t.nop();
- }
- bool empty() const {
- return t.empty();
- }
- uint64_t get_bytes_written() const {
- return written;
- }
- ~RPGTransaction() { }
-};
-
-PGBackend::PGTransaction *ReplicatedBackend::get_transaction()
-{
- return new RPGTransaction(coll);
-}
-
class C_OSD_OnOpCommit : public Context {
ReplicatedBackend *pg;
ReplicatedBackend::InProgressOp *op;
}
};
+void generate_transaction(
+ PGTransactionUPtr &pgt,
+ const coll_t &coll,
+ vector<pg_log_entry_t> &log_entries,
+ ObjectStore::Transaction *t,
+ set<hobject_t, hobject_t::BitwiseComparator> *added,
+ set<hobject_t, hobject_t::BitwiseComparator> *removed)
+{
+ assert(t);
+ assert(added);
+ assert(removed);
+
+ for (auto &&le: log_entries) {
+ le.mod_desc.mark_unrollbackable();
+ auto oiter = pgt->op_map.find(le.soid);
+ if (oiter != pgt->op_map.end() && oiter->second.updated_snaps) {
+ vector<snapid_t> snaps(
+ oiter->second.updated_snaps->second.begin(),
+ oiter->second.updated_snaps->second.end());
+ ::encode(snaps, le.snaps);
+ }
+ }
+
+ pgt->safe_create_traverse(
+ [&](pair<const hobject_t, PGTransaction::ObjectOperation> &obj_op) {
+ const hobject_t &oid = obj_op.first;
+ const ghobject_t goid =
+ ghobject_t(oid, ghobject_t::NO_GEN, shard_id_t::NO_SHARD);
+ const PGTransaction::ObjectOperation &op = obj_op.second;
+
+ if (oid.is_temp()) {
+ if (op.is_fresh_object()) {
+ added->insert(oid);
+ } else if (op.is_delete()) {
+ removed->insert(oid);
+ }
+ }
+
+ if (op.delete_first) {
+ t->remove(coll, goid);
+ }
+
+ match(
+ op.init_type,
+ [&](const PGTransaction::ObjectOperation::Init::None &) {
+ },
+ [&](const PGTransaction::ObjectOperation::Init::Create &op) {
+ t->touch(coll, goid);
+ },
+ [&](const PGTransaction::ObjectOperation::Init::Clone &op) {
+ t->clone(
+ coll,
+ ghobject_t(
+ op.source, ghobject_t::NO_GEN, shard_id_t::NO_SHARD),
+ goid);
+ },
+ [&](const PGTransaction::ObjectOperation::Init::Rename &op) {
+ assert(op.source.is_temp());
+ t->collection_move_rename(
+ coll,
+ ghobject_t(
+ op.source, ghobject_t::NO_GEN, shard_id_t::NO_SHARD),
+ coll,
+ goid);
+ });
+
+ if (op.truncate) {
+ t->truncate(coll, goid, op.truncate->first);
+ if (op.truncate->first != op.truncate->second)
+ t->truncate(coll, goid, op.truncate->second);
+ }
+
+ if (!op.attr_updates.empty()) {
+ map<string, bufferlist> attrs;
+ for (auto &&p: op.attr_updates) {
+ if (p.second)
+ attrs[p.first] = *(p.second);
+ else
+ t->rmattr(coll, goid, p.first);
+ }
+ t->setattrs(coll, goid, attrs);
+ }
+
+ if (op.clear_omap)
+ t->omap_clear(coll, goid);
+ if (op.omap_header)
+ t->omap_setheader(coll, goid, *(op.omap_header));
+
+ for (auto &&up: op.omap_updates) {
+ using UpdateType = PGTransaction::ObjectOperation::OmapUpdateType;
+ switch (up.first) {
+ case UpdateType::Remove:
+ t->omap_rmkeys(coll, goid, up.second);
+ break;
+ case UpdateType::Insert:
+ t->omap_setkeys(coll, goid, up.second);
+ break;
+ }
+ }
+
+ // updated_snaps doesn't matter since we marked unrollbackable
+
+ if (op.alloc_hint) {
+ auto &hint = *(op.alloc_hint);
+ t->set_alloc_hint(
+ coll,
+ goid,
+ hint.expected_object_size,
+ hint.expected_write_size,
+ hint.flags);
+ }
+
+ for (auto &&extent: op.buffer_updates) {
+ using BufferUpdate = PGTransaction::ObjectOperation::BufferUpdate;
+ match(
+ extent.get_val(),
+ [&](const BufferUpdate::Write &op) {
+ t->write(
+ coll,
+ goid,
+ extent.get_off(),
+ extent.get_len(),
+ op.buffer);
+ },
+ [&](const BufferUpdate::Zero &op) {
+ t->zero(
+ coll,
+ goid,
+ extent.get_off(),
+ extent.get_len());
+ },
+ [&](const BufferUpdate::CloneRange &op) {
+ assert(op.len == extent.get_len());
+ t->clone_range(
+ coll,
+ ghobject_t(op.from, ghobject_t::NO_GEN, shard_id_t::NO_SHARD),
+ goid,
+ op.offset,
+ extent.get_len(),
+ extent.get_off());
+ });
+ }
+ });
+}
+
void ReplicatedBackend::submit_transaction(
const hobject_t &soid,
const eversion_t &at_version,
PGTransactionUPtr &&_t,
const eversion_t &trim_to,
const eversion_t &trim_rollback_to,
- const vector<pg_log_entry_t> &log_entries,
+ const vector<pg_log_entry_t> &_log_entries,
boost::optional<pg_hit_set_history_t> &hset_history,
Context *on_local_applied_sync,
Context *on_all_acked,
osd_reqid_t reqid,
OpRequestRef orig_op)
{
- std::unique_ptr<RPGTransaction> t(
- static_cast<RPGTransaction*>(_t.release()));
- assert(t);
- ObjectStore::Transaction op_t = t->get_transaction();
-
- assert(t->get_temp_added().size() <= 1);
- assert(t->get_temp_cleared().size() <= 1);
+ vector<pg_log_entry_t> log_entries(_log_entries);
+ ObjectStore::Transaction op_t;
+ PGTransactionUPtr t(std::move(_t));
+ set<hobject_t, hobject_t::BitwiseComparator> added, removed;
+ generate_transaction(
+ t,
+ coll,
+ log_entries,
+ &op_t,
+ &added,
+ &removed);
+ assert(added.size() <= 1);
+ assert(removed.size() <= 1);
assert(!in_progress_ops.count(tid));
InProgressOp &op = in_progress_ops.insert(
parent->get_actingbackfill_shards().begin(),
parent->get_actingbackfill_shards().end());
-
issue_op(
soid,
at_version,
reqid,
trim_to,
trim_rollback_to,
- t->get_temp_added().empty() ? hobject_t() : *(t->get_temp_added().begin()),
- t->get_temp_cleared().empty() ?
- hobject_t() : *(t->get_temp_cleared().begin()),
+ added.size() ? *(added.begin()) : hobject_t(),
+ removed.size() ? *(removed.begin()) : hobject_t(),
log_entries,
hset_history,
&op,
op_t);
- if (!(t->get_temp_added().empty())) {
- add_temp_objs(t->get_temp_added());
- }
- clear_temp_objs(t->get_temp_cleared());
+ add_temp_objs(added);
+ clear_temp_objs(removed);
parent->log_operation(
log_entries,
};
map<ceph_tid_t, InProgressOp> in_progress_ops;
public:
- PGTransaction *get_transaction();
friend class C_OSD_OnOpCommit;
friend class C_OSD_OnOpApplied;
void submit_transaction(
close_op_ctx(ctx);
return;
}
+ dout(20) << __func__ << " obc " << *obc << dendl;
if (r) {
dout(20) << __func__ << " returned an error: " << r << dendl;
// this method must be idempotent since we may call it several times
// before we finally apply the resulting transaction.
- ctx->op_t.reset(pgbackend->get_transaction());
+ ctx->op_t.reset(new PGTransaction);
if (op->may_write() || op->may_cache()) {
// snap
ctx->at_version = get_next_version();
- PGBackend::PGTransaction *t = ctx->op_t.get();
+ PGTransaction *t = ctx->op_t.get();
if (new_snaps.empty()) {
// remove clone
ctx->mtime,
0)
);
- if (pool.info.require_rollback()) {
- set<snapid_t> snaps(
- ctx->obc->obs.oi.snaps.begin(),
- ctx->obc->obs.oi.snaps.end());
- ctx->log.back().mod_desc.update_snaps(snaps);
- if (ctx->log.back().mod_desc.rmobject(ctx->at_version.version)) {
- t->stash(coid, ctx->at_version.version);
- } else {
- t->remove(coid);
- }
- } else {
- t->remove(coid);
- ctx->log.back().mod_desc.mark_unrollbackable();
- }
+ t->remove(coid);
+ t->update_snaps(
+ coid,
+ old_snaps,
+ new_snaps);
ctx->at_version.version++;
} else {
// save adjusted snaps for this object
coi.version = ctx->at_version;
bl.clear();
::encode(coi, bl, get_osdmap()->get_up_osd_features());
- setattr_maybe_cache(ctx->obc, ctx.get(), t, OI_ATTR, bl);
+ t->setattr(coid, OI_ATTR, bl);
ctx->log.push_back(
pg_log_entry_t(
ctx->mtime,
0)
);
- if (pool.info.require_rollback()) {
- set<string> changing;
- changing.insert(OI_ATTR);
- ctx->obc->fill_in_setattrs(changing, &(ctx->log.back().mod_desc));
- set<snapid_t> snaps(
- ctx->obc->obs.oi.snaps.begin(),
- ctx->obc->obs.oi.snaps.end());
- ctx->log.back().mod_desc.update_snaps(old_snaps);
- } else {
- ctx->log.back().mod_desc.mark_unrollbackable();
- }
-
- ::encode(coi.snaps, ctx->log.back().snaps);
+
+ t->update_snaps(
+ coid,
+ old_snaps,
+ new_snaps);
ctx->at_version.version++;
}
ctx->snapset_obc->obs.exists = false;
- if (pool.info.require_rollback()) {
- if (ctx->log.back().mod_desc.rmobject(ctx->at_version.version)) {
- t->stash(snapoid, ctx->at_version.version);
- } else {
- t->remove(snapoid);
- }
- } else {
- t->remove(snapoid);
- ctx->log.back().mod_desc.mark_unrollbackable();
- }
+ t->remove(snapoid);
} else {
dout(10) << coid << " filtering snapset on " << snapoid << dendl;
snapset.filter(pool.info);
bl.clear();
::encode(ctx->snapset_obc->obs.oi, bl, get_osdmap()->get_up_osd_features());
attrs[OI_ATTR].claim(bl);
- setattrs_maybe_cache(ctx->snapset_obc, ctx.get(), t, attrs);
-
- if (pool.info.require_rollback()) {
- set<string> changing;
- changing.insert(OI_ATTR);
- changing.insert(SS_ATTR);
- ctx->snapset_obc->fill_in_setattrs(changing, &(ctx->log.back().mod_desc));
- } else {
- ctx->log.back().mod_desc.mark_unrollbackable();
- }
+ t->setattrs(snapoid, attrs);
}
return ctx;
return s;
}
-bool ReplicatedPG::maybe_create_new_object(OpContext *ctx)
+void ReplicatedPG::maybe_create_new_object(
+ OpContext *ctx,
+ bool ignore_transaction)
{
ObjectState& obs = ctx->new_obs;
if (!obs.exists) {
ctx->delta_stats.num_objects++;
obs.exists = true;
obs.oi.new_object();
- return true;
+ if (!ignore_transaction)
+ ctx->op_t->create(obs.oi.soid);
} else if (obs.oi.is_whiteout()) {
dout(10) << __func__ << " clearing whiteout on " << obs.oi.soid << dendl;
ctx->new_obs.oi.clear_flag(object_info_t::FLAG_WHITEOUT);
--ctx->delta_stats.num_whiteouts;
- return true;
}
- return false;
}
int ReplicatedPG::do_osd_ops(OpContext *ctx, vector<OSDOp>& ops)
bool first_read = true;
- PGBackend::PGTransaction* t = ctx->op_t.get();
+ PGTransaction* t = ctx->op_t.get();
dout(10) << "do_osd_op " << soid << " " << ops << dendl;
++ctx->num_write;
{
tracepoint(osd, do_osd_op_pre_setallochint, soid.oid.name.c_str(), soid.snap.val, op.alloc_hint.expected_object_size, op.alloc_hint.expected_write_size);
- if (maybe_create_new_object(ctx)) {
- ctx->mod_desc.create();
- t->touch(soid);
- }
+ maybe_create_new_object(ctx);
oi.expected_object_size = op.alloc_hint.expected_object_size;
oi.expected_write_size = op.alloc_hint.expected_write_size;
oi.alloc_hint_flags = op.alloc_hint.flags;
result = -EOPNOTSUPP;
break;
}
- ctx->mod_desc.create();
- } else if (op.extent.offset == oi.size) {
- ctx->mod_desc.append(oi.size);
- } else {
- ctx->mod_desc.mark_unrollbackable();
- if (pool.info.require_rollback()) {
- result = -EOPNOTSUPP;
- break;
- }
+ } else if (op.extent.offset != oi.size && pool.info.require_rollback()) {
+ result = -EOPNOTSUPP;
+ break;
}
if (seq && (seq > op.extent.truncate_seq) &&
result = check_offset_and_length(op.extent.offset, op.extent.length, cct->_conf->osd_max_object_size);
if (result < 0)
break;
- if (pool.info.require_rollback()) {
- t->append(soid, op.extent.offset, op.extent.length, osd_op.indata, op.flags);
+
+ maybe_create_new_object(ctx);
+
+ if (op.extent.length == 0) {
+ if (op.extent.offset > oi.size) {
+ t->truncate(
+ soid, op.extent.offset);
+ } else {
+ t->nop(soid);
+ }
} else {
- t->write(soid, op.extent.offset, op.extent.length, osd_op.indata, op.flags);
+ t->write(
+ soid, op.extent.offset, op.extent.length, osd_op.indata, op.flags);
}
- maybe_create_new_object(ctx);
if (op.extent.offset == 0 && op.extent.length >= oi.size)
obs.oi.set_data_digest(osd_op.indata.crc32c(-1));
else if (op.extent.offset == oi.size && obs.oi.is_data_digest())
if (pool.info.has_flag(pg_pool_t::FLAG_WRITE_FADVISE_DONTNEED))
op.flags = op.flags | CEPH_OSD_OP_FLAG_FADVISE_DONTNEED;
- if (pool.info.require_rollback()) {
- if (obs.exists) {
- if (ctx->mod_desc.rmobject(ctx->at_version.version)) {
- t->stash(soid, ctx->at_version.version);
- } else {
- t->remove(soid);
- }
- }
- ctx->mod_desc.create();
- t->append(soid, 0, op.extent.length, osd_op.indata, op.flags);
- if (obs.exists) {
- map<string, bufferlist> to_set = ctx->obc->attr_cache;
- map<string, boost::optional<bufferlist> > &overlay =
- ctx->pending_attrs[ctx->obc];
- for (map<string, boost::optional<bufferlist> >::iterator i =
- overlay.begin();
- i != overlay.end();
- ++i) {
- if (i->second) {
- to_set[i->first] = *(i->second);
- } else {
- to_set.erase(i->first);
- }
- }
- t->setattrs(soid, to_set);
- }
- } else {
- ctx->mod_desc.mark_unrollbackable();
+ maybe_create_new_object(ctx);
+ t->truncate(soid, 0);
+ if (op.extent.length) {
t->write(soid, 0, op.extent.length, osd_op.indata, op.flags);
- if (obs.exists && op.extent.length < oi.size) {
- t->truncate(soid, op.extent.length);
- }
}
- maybe_create_new_object(ctx);
obs.oi.set_data_digest(osd_op.indata.crc32c(-1));
write_update_size_and_usage(ctx->delta_stats, oi, ctx->modified_ranges,
break;
assert(op.extent.length);
if (obs.exists && !oi.is_whiteout()) {
- ctx->mod_desc.mark_unrollbackable();
t->zero(soid, op.extent.offset, op.extent.length);
interval_set<uint64_t> ch;
ch.insert(op.extent.offset, op.extent.length);
// category is no longer implemented.
}
if (result >= 0) {
- bool is_whiteout = obs.exists && oi.is_whiteout();
- if (maybe_create_new_object(ctx)) {
- ctx->mod_desc.create();
- t->touch(soid);
- } else if (is_whiteout) {
- // to change whiteout to non-whiteout, it need an op to update xattr
- t->nop();
- }
+ maybe_create_new_object(ctx);
+ t->nop(soid);
}
}
}
break;
}
++ctx->num_write;
- ctx->mod_desc.mark_unrollbackable();
{
// truncate
if (!obs.exists || oi.is_whiteout()) {
oi.truncate_size = op.extent.truncate_size;
}
+ maybe_create_new_object(ctx);
t->truncate(soid, op.extent.offset);
if (oi.size > op.extent.offset) {
interval_set<uint64_t> trim;
case CEPH_OSD_OP_CLONERANGE:
tracepoint(osd, do_osd_op_pre_clonerange, soid.oid.name.c_str(), soid.snap.val, op.clonerange.offset, op.clonerange.length, op.clonerange.src_offset);
- ctx->mod_desc.mark_unrollbackable();
if (pool.info.require_rollback()) {
result = -EOPNOTSUPP;
break;
++ctx->num_read;
++ctx->num_write;
{
- if (maybe_create_new_object(ctx)) {
- t->touch(obs.oi.soid);
- }
+ maybe_create_new_object(ctx);
if (op.clonerange.src_offset + op.clonerange.length > src_obc->obs.oi.size) {
dout(10) << " clonerange source " << osd_op.soid << " "
<< op.clonerange.src_offset << "~" << op.clonerange.length
} else {
dout(10) << " registered new watch " << w << " by " << entity << dendl;
oi.watchers[make_pair(cookie, entity)] = w;
- t->nop(); // make sure update the object_info on disk!
+ t->nop(soid); // make sure update the object_info on disk!
}
bool will_ping = (op.watch.op == CEPH_OSD_WATCH_OP_WATCH);
ctx->watch_connects.push_back(make_pair(w, will_ping));
dout(10) << " removed watch " << oi_iter->second << " by "
<< entity << dendl;
oi.watchers.erase(oi_iter);
- t->nop(); // update oi on disk
+ t->nop(soid); // update oi on disk
ctx->watch_disconnects.push_back(
watch_disconnect_t(cookie, entity, false));
} else {
result = -ENAMETOOLONG;
break;
}
- if (maybe_create_new_object(ctx)) {
- ctx->mod_desc.create();
- t->touch(soid);
- }
+ maybe_create_new_object(ctx);
string aname;
bp.copy(op.xattr.name_len, aname);
tracepoint(osd, do_osd_op_pre_setxattr, soid.oid.name.c_str(), soid.snap.val, aname.c_str());
string name = "_" + aname;
- if (pool.info.require_rollback()) {
- map<string, boost::optional<bufferlist> > to_set;
- bufferlist old;
- int r = getattr_maybe_cache(ctx->obc, name, &old);
- if (r == 0) {
- to_set[name] = old;
- } else {
- to_set[name];
- }
- ctx->mod_desc.setattrs(to_set);
- } else {
- ctx->mod_desc.mark_unrollbackable();
- }
bufferlist bl;
bp.copy(op.xattr.value_len, bl);
- setattr_maybe_cache(ctx->obc, ctx, t, name, bl);
+ t->setattr(soid, name, bl);
ctx->delta_stats.num_wr++;
}
break;
break;
}
string name = "_" + aname;
- if (pool.info.require_rollback()) {
- map<string, boost::optional<bufferlist> > to_set;
- bufferlist old;
- int r = getattr_maybe_cache(ctx->obc, name, &old);
- if (r == 0) {
- to_set[name] = old;
- } else {
- to_set[name];
- }
- ctx->mod_desc.setattrs(to_set);
- } else {
- ctx->mod_desc.mark_unrollbackable();
- }
- rmattr_maybe_cache(ctx->obc, ctx, t, name);
+ t->rmattr(soid, name);
ctx->delta_stats.num_wr++;
}
break;
case CEPH_OSD_OP_STARTSYNC:
tracepoint(osd, do_osd_op_pre_startsync, soid.oid.name.c_str(), soid.snap.val);
- t->nop();
+ t->nop(soid);
break;
tracepoint(osd, do_osd_op_pre_omapsetvals, soid.oid.name.c_str(), soid.snap.val);
break;
}
- ctx->mod_desc.mark_unrollbackable();
++ctx->num_write;
{
- if (maybe_create_new_object(ctx)) {
- t->touch(soid);
- }
+ maybe_create_new_object(ctx);
bufferlist to_set_bl;
try {
decode_str_str_map_to_bl(bp, &to_set_bl);
result = -EOPNOTSUPP;
break;
}
- ctx->mod_desc.mark_unrollbackable();
++ctx->num_write;
{
- if (maybe_create_new_object(ctx)) {
- t->touch(soid);
- }
+ maybe_create_new_object(ctx);
t->omap_setheader(soid, osd_op.indata);
ctx->delta_stats.num_wr++;
}
result = -EOPNOTSUPP;
break;
}
- ctx->mod_desc.mark_unrollbackable();
++ctx->num_write;
{
if (!obs.exists || oi.is_whiteout()) {
tracepoint(osd, do_osd_op_pre_omaprmkeys, soid.oid.name.c_str(), soid.snap.val);
break;
}
- ctx->mod_desc.mark_unrollbackable();
++ctx->num_write;
{
if (!obs.exists || oi.is_whiteout()) {
ObjectState& obs = ctx->new_obs;
object_info_t& oi = obs.oi;
const hobject_t& soid = oi.soid;
- PGBackend::PGTransaction* t = ctx->op_t.get();
+ PGTransaction* t = ctx->op_t.get();
if (!obs.exists || (obs.oi.is_whiteout() && !no_whiteout))
return -ENOENT;
- if (pool.info.require_rollback()) {
- if (ctx->mod_desc.rmobject(ctx->at_version.version)) {
- t->stash(soid, ctx->at_version.version);
- } else {
- t->remove(soid);
- }
- map<string, bufferlist> new_attrs;
- replace_cached_attrs(ctx, ctx->obc, new_attrs);
- } else {
- ctx->mod_desc.mark_unrollbackable();
- t->remove(soid);
- }
+ t->remove(soid);
if (oi.size > 0) {
interval_set<uint64_t> ch;
dout(20) << __func__ << " setting whiteout on " << soid << dendl;
oi.set_flag(object_info_t::FLAG_WHITEOUT);
ctx->delta_stats.num_whiteouts++;
- t->touch(soid);
+ t->create(soid);
osd->logger->inc(l_osd_tier_whiteout);
return 0;
}
ObjectState& obs = ctx->new_obs;
object_info_t& oi = obs.oi;
const hobject_t& soid = oi.soid;
- PGBackend::PGTransaction* t = ctx->op_t.get();
+ PGTransaction* t = ctx->op_t.get();
snapid_t snapid = (uint64_t)op.snap.snapid;
hobject_t missing_oid;
dout(10) << "_rollback_to deleting " << soid.oid
<< " and rolling back to old snap" << dendl;
- if (pool.info.require_rollback()) {
- if (obs.exists) {
- if (ctx->mod_desc.rmobject(ctx->at_version.version)) {
- t->stash(soid, ctx->at_version.version);
- } else {
- t->remove(soid);
- }
- }
- replace_cached_attrs(ctx, ctx->obc, rollback_to->attr_cache);
- } else {
- if (obs.exists) {
- ctx->mod_desc.mark_unrollbackable();
- t->remove(soid);
- }
+ if (obs.exists) {
+ t->remove(soid);
}
- ctx->mod_desc.create();
- t->clone(rollback_to_sobject, soid);
+ t->clone(soid, rollback_to_sobject);
snapset.head_exists = true;
+ t->add_obc(rollback_to);
map<snapid_t, interval_set<uint64_t> >::iterator iter =
snapset.clone_overlap.lower_bound(snapid);
}
// Adjust the cached objectcontext
- maybe_create_new_object(ctx);
+ maybe_create_new_object(ctx, true);
ctx->delta_stats.num_bytes -= obs.oi.size;
ctx->delta_stats.num_bytes += rollback_to->obs.oi.size;
obs.oi.size = rollback_to->obs.oi.size;
void ReplicatedPG::_make_clone(
OpContext *ctx,
- PGBackend::PGTransaction* t,
+ PGTransaction* t,
ObjectContextRef obc,
const hobject_t& head, const hobject_t& coid,
object_info_t *poi)
bufferlist bv;
::encode(*poi, bv, get_osdmap()->get_up_osd_features());
- t->clone(head, coid);
+ t->clone(coid, head);
setattr_maybe_cache(obc, ctx, t, OI_ATTR, bv);
rmattr_maybe_cache(obc, ctx, t, SS_ATTR);
}
snap_oi->copy_user_bits(ctx->obs->oi);
snap_oi->snaps = snaps;
- // prepend transaction to op_t
- PGBackend::PGTransaction *t = pgbackend->get_transaction();
- _make_clone(ctx, t, ctx->clone_obc, soid, coid, snap_oi);
- t->append(ctx->op_t.get());
- ctx->op_t.reset(t);
+ _make_clone(ctx, ctx->op_t.get(), ctx->clone_obc, soid, coid, snap_oi);
ctx->delta_stats.num_objects++;
if (snap_oi->is_dirty()) {
ctx->obs->oi.user_version,
osd_reqid_t(), ctx->new_obs.oi.mtime, 0));
::encode(snaps, ctx->log.back().snaps);
- ctx->log.back().mod_desc.create();
ctx->at_version.version++;
}
ctx->at_version,
ctx->snapset_obc->obs.oi.version,
0, osd_reqid_t(), ctx->mtime, 0));
- if (pool.info.require_rollback()) {
- if (ctx->log.back().mod_desc.rmobject(ctx->at_version.version)) {
- ctx->op_t->stash(snapoid, ctx->at_version.version);
- } else {
- ctx->op_t->remove(snapoid);
- }
- } else {
- ctx->op_t->remove(snapoid);
- ctx->log.back().mod_desc.mark_unrollbackable();
- }
+ ctx->op_t->remove(snapoid);
dout(10) << " removing old " << snapoid << dendl;
ctx->at_version.version++;
}
assert(got);
dout(20) << " got greedy write on snapset_obc " << *ctx->snapset_obc << dendl;
- if (pool.info.require_rollback() && !ctx->snapset_obc->obs.exists) {
- ctx->log.back().mod_desc.create();
- } else if (!pool.info.require_rollback()) {
- ctx->log.back().mod_desc.mark_unrollbackable();
- }
ctx->snapset_obc->obs.exists = true;
ctx->snapset_obc->obs.oi.version = ctx->at_version;
ctx->snapset_obc->obs.oi.last_reqid = ctx->reqid;
map<string, bufferlist> attrs;
bufferlist bv(sizeof(ctx->new_obs.oi));
::encode(ctx->snapset_obc->obs.oi, bv, get_osdmap()->get_up_osd_features());
- ctx->op_t->touch(snapoid);
+ ctx->op_t->create(snapoid);
attrs[OI_ATTR].claim(bv);
attrs[SS_ATTR].claim(bss);
setattrs_maybe_cache(ctx->snapset_obc, ctx, ctx->op_t.get(), attrs);
- if (pool.info.require_rollback()) {
- map<string, boost::optional<bufferlist> > to_set;
- to_set[SS_ATTR];
- to_set[OI_ATTR];
- ctx->log.back().mod_desc.setattrs(to_set);
- } else {
- ctx->log.back().mod_desc.mark_unrollbackable();
- }
ctx->at_version.version++;
}
}
} else {
dout(10) << " no snapset (this is a clone)" << dendl;
}
- setattrs_maybe_cache(ctx->obc, ctx, ctx->op_t.get(), attrs);
-
- if (pool.info.require_rollback()) {
- set<string> changing;
- changing.insert(OI_ATTR);
- if (!soid.is_snap())
- changing.insert(SS_ATTR);
- ctx->obc->fill_in_setattrs(changing, &(ctx->mod_desc));
- } else {
- // replicated pools are never rollbackable in this case
- ctx->mod_desc.mark_unrollbackable();
- }
+ ctx->op_t->setattrs(soid, attrs);
} else {
ctx->new_obs.oi = object_info_t(ctx->obc->obs.oi.soid);
}
}
}
- ctx->log.back().mod_desc.claim(ctx->mod_desc);
if (!ctx->extra_reqids.empty()) {
dout(20) << __func__ << " extra_reqids " << ctx->extra_reqids << dendl;
ctx->log.back().extra_reqids.swap(ctx->extra_reqids);
assert(cop->rval >= 0);
+ if (!cop->temp_cursor.data_complete) {
+ cop->results.data_digest = cop->data.crc32c(cop->results.data_digest);
+ }
+ if (pool.info.supports_omap() && !cop->temp_cursor.omap_complete) {
+ if (cop->omap_header.length()) {
+ cop->results.omap_digest =
+ cop->omap_header.crc32c(cop->results.omap_digest);
+ }
+ if (cop->omap_data.length()) {
+ bufferlist keys;
+ keys.substr_of(cop->omap_data, 4, cop->omap_data.length() - 4);
+ cop->results.omap_digest = keys.crc32c(cop->results.omap_digest);
+ }
+ }
+
+ if (!cop->temp_cursor.attr_complete) {
+ for (map<string,bufferlist>::iterator p = cop->attrs.begin();
+ p != cop->attrs.end();
+ ++p) {
+ cop->results.attrs[string("_") + p->first] = p->second;
+ }
+ cop->attrs.clear();
+ }
+
if (!cop->cursor.is_complete()) {
// write out what we have so far
if (cop->temp_cursor.is_initial()) {
return;
}
- cop->results.final_tx = pgbackend->get_transaction();
- _build_finish_copy_transaction(cop, cop->results.final_tx);
-
// verify digests?
if (cop->results.is_data_digest() || cop->results.is_omap_digest()) {
dout(20) << __func__ << std::hex
goto out;
}
+ cop->results.fill_in_final_tx = std::function<void(PGTransaction*)>(
+ [this, &cop /* avoid ref cycle */](PGTransaction *t) {
+ ObjectState& obs = cop->obc->obs;
+ if (cop->temp_cursor.is_initial()) {
+ // write directly to final object
+ cop->results.temp_oid = obs.oi.soid;
+ _write_copy_chunk(cop, t);
+ } else {
+ // finish writing to temp object, then move into place
+ _write_copy_chunk(cop, t);
+ t->rename(obs.oi.soid, cop->results.temp_oid);
+ }
+ t->setattrs(obs.oi.soid, cop->results.attrs);
+ });
+
dout(20) << __func__ << " success; committing" << dendl;
out:
kick_object_context_blocked(cobc);
}
-void ReplicatedPG::_write_copy_chunk(CopyOpRef cop, PGBackend::PGTransaction *t)
+void ReplicatedPG::_write_copy_chunk(CopyOpRef cop, PGTransaction *t)
{
dout(20) << __func__ << " " << cop
<< " " << cop->attrs.size() << " attrs"
<< " " << cop->omap_data.length() << " omap data bytes"
<< dendl;
if (!cop->temp_cursor.attr_complete) {
- t->touch(cop->results.temp_oid);
- for (map<string,bufferlist>::iterator p = cop->attrs.begin();
- p != cop->attrs.end();
- ++p) {
- cop->results.attrs[string("_") + p->first] = p->second;
- t->setattr(
- cop->results.temp_oid,
- string("_") + p->first, p->second);
- }
- cop->attrs.clear();
+ t->create(cop->results.temp_oid);
}
if (!cop->temp_cursor.data_complete) {
assert(cop->data.length() + cop->temp_cursor.data_offset ==
cop->cursor.data_offset);
}
}
- cop->results.data_digest = cop->data.crc32c(cop->results.data_digest);
- t->append(
- cop->results.temp_oid,
- cop->temp_cursor.data_offset,
- cop->data.length(),
- cop->data,
- cop->dest_obj_fadvise_flags);
+ if (cop->data.length()) {
+ t->write(
+ cop->results.temp_oid,
+ cop->temp_cursor.data_offset,
+ cop->data.length(),
+ cop->data,
+ cop->dest_obj_fadvise_flags);
+ }
cop->data.clear();
}
if (pool.info.supports_omap()) {
if (!cop->temp_cursor.omap_complete) {
if (cop->omap_header.length()) {
- cop->results.omap_digest =
- cop->omap_header.crc32c(cop->results.omap_digest);
t->omap_setheader(
cop->results.temp_oid,
cop->omap_header);
cop->omap_header.clear();
}
if (cop->omap_data.length()) {
- // don't checksum the key count prefix
- bufferlist keys;
- keys.substr_of(cop->omap_data, 4, cop->omap_data.length() - 4);
- cop->results.omap_digest = keys.crc32c(cop->results.omap_digest);
-
map<string,bufferlist> omap;
bufferlist::iterator p = cop->omap_data.begin();
::decode(omap, p);
cop->temp_cursor = cop->cursor;
}
-void ReplicatedPG::_build_finish_copy_transaction(CopyOpRef cop,
- PGBackend::PGTransaction* t)
-{
- ObjectState& obs = cop->obc->obs;
- if (cop->temp_cursor.is_initial()) {
- // write directly to final object
- cop->results.temp_oid = obs.oi.soid;
- _write_copy_chunk(cop, t);
- } else {
- // finish writing to temp object, then move into place
- _write_copy_chunk(cop, t);
- t->rename(cop->results.temp_oid, obs.oi.soid);
- }
-}
-
void ReplicatedPG::finish_copyfrom(OpContext *ctx)
{
dout(20) << "finish_copyfrom on " << ctx->obs->oi.soid << dendl;
ObjectState& obs = ctx->new_obs;
CopyFromCallback *cb = static_cast<CopyFromCallback*>(ctx->copy_cb);
- if (pool.info.require_rollback()) {
- if (obs.exists) {
- if (ctx->mod_desc.rmobject(ctx->at_version.version)) {
- ctx->op_t->stash(obs.oi.soid, ctx->at_version.version);
- } else {
- ctx->op_t->remove(obs.oi.soid);
- }
- }
- ctx->mod_desc.create();
- replace_cached_attrs(ctx, ctx->obc, cb->results->attrs);
+ if (obs.exists) {
+ ctx->op_t->remove(obs.oi.soid);
} else {
- if (obs.exists) {
- ctx->op_t->remove(obs.oi.soid);
- }
- ctx->mod_desc.mark_unrollbackable();
- }
-
- if (!obs.exists) {
ctx->delta_stats.num_objects++;
obs.exists = true;
}
if (cb->is_temp_obj_used()) {
ctx->discard_temp_oid = cb->results->temp_oid;
}
- ctx->op_t->append(cb->results->final_tx);
- delete cb->results->final_tx;
- cb->results->final_tx = NULL;
+ cb->results->fill_in_final_tx(ctx->op_t.get());
// CopyFromCallback fills this in for us
obs.oi.user_version = ctx->user_at_version;
if (whiteout) {
// create a whiteout
- tctx->op_t->touch(soid);
+ tctx->op_t->create(soid);
tctx->new_obs.oi.set_flag(object_info_t::FLAG_WHITEOUT);
++tctx->delta_stats.num_whiteouts;
dout(20) << __func__ << " creating whiteout on " << soid << dendl;
++tctx->delta_stats.num_objects_omap;
}
- tctx->op_t->append(results->final_tx);
- delete results->final_tx;
- results->final_tx = NULL;
+ results->fill_in_final_tx(tctx->op_t.get());
if (results->started_temp_obj) {
tctx->discard_temp_oid = results->temp_oid;
}
}
ctx->obc->ondisk_write_lock();
- if (ctx->clone_obc)
- ctx->clone_obc->ondisk_write_lock();
bool unlock_snapset_obc = false;
+ ctx->op_t->add_obc(ctx->obc);
+ if (ctx->clone_obc) {
+ ctx->clone_obc->ondisk_write_lock();
+ ctx->op_t->add_obc(ctx->clone_obc);
+ }
if (ctx->snapset_obc && ctx->snapset_obc->obs.oi.soid !=
ctx->obc->obs.oi.soid) {
ctx->snapset_obc->ondisk_write_lock();
unlock_snapset_obc = true;
- }
-
- ctx->apply_pending_attrs();
-
- if (pool.info.require_rollback()) {
- for (vector<pg_log_entry_t>::iterator i = ctx->log.begin();
- i != ctx->log.end();
- ++i) {
- assert(i->mod_desc.can_rollback());
- assert(!i->mod_desc.empty());
- }
+ ctx->op_t->add_obc(ctx->snapset_obc);
}
Context *on_all_commit = new C_OSD_RepopCommit(this, repop);
ceph_tid_t rep_tid = osd->get_tid();
osd_reqid_t reqid(osd->get_cluster_msgr_name(), 0, rep_tid);
OpContextUPtr ctx(new OpContext(OpRequestRef(), reqid, ops, obc, this));
- ctx->op_t.reset(pgbackend->get_transaction());
+ ctx->op_t.reset(new PGTransaction());
ctx->mtime = ceph_clock_now(g_ceph_context);
return ctx;
}
});
- PGBackend::PGTransaction *t = ctx->op_t.get();
+ PGTransaction *t = ctx->op_t.get();
ctx->log.push_back(pg_log_entry_t(pg_log_entry_t::MODIFY, obc->obs.oi.soid,
ctx->at_version,
oi.version,
oi.version = ctx->at_version;
bufferlist bl;
::encode(oi, bl, get_osdmap()->get_up_osd_features());
- setattr_maybe_cache(obc, ctx.get(), t, OI_ATTR, bl);
-
- if (pool.info.require_rollback()) {
- map<string, boost::optional<bufferlist> > to_set;
- to_set[OI_ATTR] = bl;
- ctx->log.back().mod_desc.setattrs(to_set);
- } else {
- ctx->log.back().mod_desc.mark_unrollbackable();
- }
-
+ t->setattr(obc->obs.oi.soid, OI_ATTR, bl);
// apply new object state.
ctx->obc->obs = ctx->new_obs;
bufferlist boi(sizeof(ctx->new_obs.oi));
::encode(ctx->new_obs.oi, boi, get_osdmap()->get_up_osd_features());
- ctx->op_t->append(oid, 0, bl.length(), bl, 0);
+ ctx->op_t->create(oid);
+ if (bl.length()) {
+ ctx->op_t->write(oid, 0, bl.length(), bl, 0);
+ }
map <string, bufferlist> attrs;
attrs[OI_ATTR].claim(boi);
attrs[SS_ATTR].claim(bss);
ctx->mtime,
0)
);
- if (pool.info.require_rollback()) {
- ctx->log.back().mod_desc.create();
- } else {
- ctx->log.back().mod_desc.mark_unrollbackable();
- }
hit_set_trim(ctx, max);
osd_reqid_t(),
ctx->mtime,
0));
- if (pool.info.require_rollback()) {
- if (ctx->log.back().mod_desc.rmobject(
- ctx->at_version.version)) {
- ctx->op_t->stash(oid, ctx->at_version.version);
- } else {
- ctx->op_t->remove(oid);
- }
- } else {
- ctx->op_t->remove(oid);
- ctx->log.back().mod_desc.mark_unrollbackable();
- }
+
+ ctx->op_t->remove(oid);
updated_hit_set_hist.history.pop_front();
ObjectContextRef obc = get_object_context(oid, false);
return transit< NotTrimming >();
}
-void ReplicatedPG::replace_cached_attrs(
- OpContext *ctx,
- ObjectContextRef obc,
- const map<string, bufferlist> &new_attrs)
-{
- ctx->pending_attrs[obc].clear();
- for (map<string, bufferlist>::iterator i = obc->attr_cache.begin();
- i != obc->attr_cache.end();
- ++i) {
- ctx->pending_attrs[obc][i->first] = boost::optional<bufferlist>();
- }
- for (map<string, bufferlist>::const_iterator i = new_attrs.begin();
- i != new_attrs.end();
- ++i) {
- ctx->pending_attrs[obc][i->first] = i->second;
- }
-}
-
void ReplicatedPG::setattr_maybe_cache(
ObjectContextRef obc,
OpContext *op,
- PGBackend::PGTransaction *t,
+ PGTransaction *t,
const string &key,
bufferlist &val)
{
- if (pool.info.require_rollback()) {
- op->pending_attrs[obc][key] = val;
- }
t->setattr(obc->obs.oi.soid, key, val);
}
void ReplicatedPG::setattrs_maybe_cache(
ObjectContextRef obc,
OpContext *op,
- PGBackend::PGTransaction *t,
+ PGTransaction *t,
map<string, bufferlist> &attrs)
{
- if (pool.info.require_rollback()) {
- for (map<string, bufferlist>::iterator it = attrs.begin();
- it != attrs.end(); ++it) {
- op->pending_attrs[obc][it->first] = it->second;
- }
- }
t->setattrs(obc->obs.oi.soid, attrs);
}
void ReplicatedPG::rmattr_maybe_cache(
ObjectContextRef obc,
OpContext *op,
- PGBackend::PGTransaction *t,
+ PGTransaction *t,
const string &key)
{
- if (pool.info.require_rollback()) {
- op->pending_attrs[obc][key] = boost::optional<bufferlist>();
- }
t->rmattr(obc->obs.oi.soid, key);
}
#include "messages/MOSDOpReply.h"
#include "common/sharedptr_registry.hpp"
#include "ReplicatedBackend.h"
+#include "PGTransaction.h"
class MOSDSubOpReply;
uint64_t object_size; ///< the copied object's size
bool started_temp_obj; ///< true if the callback needs to delete temp object
hobject_t temp_oid; ///< temp object (if any)
+
/**
- * Final transaction; if non-empty the callback must execute it before any
- * other accesses to the object (in order to complete the copy).
+ * Function to fill in transaction; if non-empty the callback
+ * must execute it before any other accesses to the object
+ * (in order to complete the copy).
*/
- PGBackend::PGTransaction *final_tx;
+ std::function<void(PGTransaction *)> fill_in_final_tx;
+
version_t user_version; ///< The copy source's user version
bool should_requeue; ///< op should be requeued on cancel
vector<snapid_t> snaps; ///< src's snaps (if clone)
snapid_t snap_seq; ///< src's snap_seq (if head)
librados::snap_set_t snapset; ///< src snapset (if head)
bool mirror_snapset;
- map<string, bufferlist> attrs; ///< src user attrs
bool has_omap;
uint32_t flags; // object_copy_data_t::FLAG_*
uint32_t source_data_digest, source_omap_digest;
uint32_t data_digest, omap_digest;
vector<pair<osd_reqid_t, version_t> > reqids; // [(reqid, user_version)]
+ map<string, bufferlist> attrs; // xattrs
uint64_t truncate_seq;
uint64_t truncate_size;
bool is_data_digest() {
}
CopyResults()
: object_size(0), started_temp_obj(false),
- final_tx(NULL), user_version(0),
+ user_version(0),
should_requeue(false), mirror_snapset(false),
has_omap(false),
flags(0),
int current_osd_subop_num;
- PGBackend::PGTransactionUPtr op_t;
+ PGTransactionUPtr op_t;
vector<pg_log_entry_t> log;
boost::optional<pg_hit_set_history_t> updated_hset_history;
hobject_t new_temp_oid, discard_temp_oid; ///< temp objects we should start/stop tracking
- // pending xattr updates
- map<ObjectContextRef,
- map<string, boost::optional<bufferlist> > > pending_attrs;
-
list<std::function<void()>> on_applied;
list<std::function<void()>> on_committed;
list<std::function<void()>> on_finish;
bool sent_ack;
bool sent_disk;
- void apply_pending_attrs() {
- for (map<ObjectContextRef,
- map<string, boost::optional<bufferlist> > >::iterator i =
- pending_attrs.begin();
- i != pending_attrs.end();
- ++i) {
- if (i->first->obs.exists) {
- for (map<string, boost::optional<bufferlist> >::iterator j =
- i->second.begin();
- j != i->second.end();
- ++j) {
- if (j->second)
- i->first->attr_cache[j->first] = j->second.get();
- else
- i->first->attr_cache.erase(j->first);
- }
- } else {
- i->first->attr_cache.clear();
- }
- }
- pending_attrs.clear();
- }
-
// pending async reads <off, len, op_flags> -> <outbl, outr>
list<pair<boost::tuple<uint64_t, uint64_t, unsigned>,
pair<bufferlist*, Context*> > > pending_async_reads;
return inflightreads == 0;
}
- ObjectModDesc mod_desc;
-
ObjectContext::RWState::State lock_type;
ObcLockManager lock_manager;
void _make_clone(
OpContext *ctx,
- PGBackend::PGTransaction* t,
+ PGTransaction* t,
ObjectContextRef obc,
const hobject_t& head, const hobject_t& coid,
object_info_t *poi);
bool mirror_snapset, unsigned src_obj_fadvise_flags,
unsigned dest_obj_fadvise_flags);
void process_copy_chunk(hobject_t oid, ceph_tid_t tid, int r);
- void _write_copy_chunk(CopyOpRef cop, PGBackend::PGTransaction *t);
+ void _write_copy_chunk(CopyOpRef cop, PGTransaction *t);
uint64_t get_copy_chunk_size() const {
uint64_t size = cct->_conf->osd_copyfrom_max_chunk;
if (pool.info.requires_aligned_append()) {
return size;
}
void _copy_some(ObjectContextRef obc, CopyOpRef cop);
- void _build_finish_copy_transaction(CopyOpRef cop,
- PGBackend::PGTransaction *t);
void finish_copyfrom(OpContext *ctx);
void finish_promote(int r, CopyResults *results, ObjectContextRef obc);
void cancel_copy(CopyOpRef cop, bool requeue);
const SnapSet& ss);
// return true if we're creating a local object, false for a
// whiteout or no change.
- bool maybe_create_new_object(OpContext *ctx);
+ void maybe_create_new_object(OpContext *ctx, bool ignore_transaction=false);
int _delete_oid(OpContext *ctx, bool no_whiteout);
int _rollback_to(OpContext *ctx, ceph_osd_op& op);
public:
void on_shutdown() override;
// attr cache handling
- void replace_cached_attrs(
- OpContext *ctx,
- ObjectContextRef obc,
- const map<string, bufferlist> &new_attrs);
void setattr_maybe_cache(
ObjectContextRef obc,
OpContext *op,
- PGBackend::PGTransaction *t,
+ PGTransaction *t,
const string &key,
bufferlist &val);
void setattrs_maybe_cache(
ObjectContextRef obc,
OpContext *op,
- PGBackend::PGTransaction *t,
+ PGTransaction *t,
map<string, bufferlist> &attrs);
void rmattr_maybe_cache(
ObjectContextRef obc,
OpContext *op,
- PGBackend::PGTransaction *t,
+ PGTransaction *t,
const string &key);
int getattr_maybe_cache(
ObjectContextRef obc,