bufferlist::iterator bp = bl.begin();
try {
while (!bp.end()) {
- DECODE_START(1, bp);
+ DECODE_START(max_required_version, bp);
uint8_t code;
::decode(code, bp);
switch (code) {
visitor->try_rmobject(old_version);
break;
}
+ case ROLLBACK_EXTENTS: {
+ vector<pair<uint64_t, uint64_t> > extents;
+ version_t gen;
+ ::decode(gen, bp);
+ ::decode(extents, bp);
+ visitor->rollback_extents(gen,extents);
+ break;
+ }
default:
assert(0 == "Invalid rollback code");
}
struct DumpVisitor : public ObjectModDesc::Visitor {
Formatter *f;
explicit DumpVisitor(Formatter *f) : f(f) {}
- void append(uint64_t old_size) {
+ void append(uint64_t old_size) override {
f->open_object_section("op");
f->dump_string("code", "APPEND");
f->dump_unsigned("old_size", old_size);
f->close_section();
}
- void setattrs(map<string, boost::optional<bufferlist> > &attrs) {
+ void setattrs(map<string, boost::optional<bufferlist> > &attrs) override {
f->open_object_section("op");
f->dump_string("code", "SETATTRS");
f->open_array_section("attrs");
f->close_section();
f->close_section();
}
- void rmobject(version_t old_version) {
+ void rmobject(version_t old_version) override {
f->open_object_section("op");
f->dump_string("code", "RMOBJECT");
f->dump_unsigned("old_version", old_version);
f->close_section();
}
- void create() {
+ void try_rmobject(version_t old_version) override {
+ f->open_object_section("op");
+ f->dump_string("code", "TRY_RMOBJECT");
+ f->dump_unsigned("old_version", old_version);
+ f->close_section();
+ }
+ void create() override {
f->open_object_section("op");
f->dump_string("code", "CREATE");
f->close_section();
}
- void update_snaps(set<snapid_t> &snaps) {
+ void update_snaps(const set<snapid_t> &snaps) override {
f->open_object_section("op");
f->dump_string("code", "UPDATE_SNAPS");
f->dump_stream("snaps") << snaps;
f->close_section();
}
+ void rollback_extents(
+ version_t gen,
+ const vector<pair<uint64_t, uint64_t> > &extents) override {
+ f->open_object_section("op");
+ f->dump_string("code", "ROLLBACK_EXTENTS");
+ f->dump_unsigned("gen", gen);
+ f->dump_stream("snaps") << extents;
+ f->close_section();
+ }
};
void ObjectModDesc::dump(Formatter *f) const
void ObjectModDesc::encode(bufferlist &_bl) const
{
- ENCODE_START(1, 1, _bl);
+ ENCODE_START(max_required_version, max_required_version, _bl);
::encode(can_local_rollback, _bl);
::encode(rollback_info_completed, _bl);
::encode(bl, _bl);
}
void ObjectModDesc::decode(bufferlist::iterator &_bl)
{
- DECODE_START(1, _bl);
+ DECODE_START(2, _bl);
+ max_required_version = struct_v;
::decode(can_local_rollback, _bl);
::decode(rollback_info_completed, _bl);
::decode(bl, _bl);
class ObjectModDesc {
bool can_local_rollback;
bool rollback_info_completed;
+
+ // version required to decode, reflected in encode/decode version
+ __u8 max_required_version = 1;
public:
class Visitor {
public:
rmobject(old_version);
}
virtual void create() {}
- virtual void update_snaps(set<snapid_t> &old_snaps) {}
+ virtual void update_snaps(const set<snapid_t> &old_snaps) {}
+ virtual void rollback_extents(
+ version_t gen,
+ const vector<pair<uint64_t, uint64_t> > &extents) {}
virtual ~Visitor() {}
};
void visit(Visitor *visitor) const;
DELETE = 3,
CREATE = 4,
UPDATE_SNAPS = 5,
- TRY_DELETE = 6
+ TRY_DELETE = 6,
+ ROLLBACK_EXTENTS = 7
};
ObjectModDesc() : can_local_rollback(true), rollback_info_completed(false) {}
void claim(ObjectModDesc &other) {
can_local_rollback = other.can_local_rollback;
rollback_info_completed = other.rollback_info_completed;
}
+ void claim_append(ObjectModDesc &other) {
+ if (!can_local_rollback || rollback_info_completed)
+ return;
+ if (!other.can_local_rollback) {
+ mark_unrollbackable();
+ return;
+ }
+ bl.claim_append(other.bl);
+ rollback_info_completed = other.rollback_info_completed;
+ }
void swap(ObjectModDesc &other) {
bl.swap(other.bl);
- bool temp = other.can_local_rollback;
- other.can_local_rollback = can_local_rollback;
- can_local_rollback = temp;
-
- temp = other.rollback_info_completed;
- other.rollback_info_completed = rollback_info_completed;
- rollback_info_completed = temp;
+ ::swap(other.can_local_rollback, can_local_rollback);
+ ::swap(other.rollback_info_completed, rollback_info_completed);
+ ::swap(other.max_required_version, max_required_version);
}
void append_id(ModID id) {
uint8_t _id(id);
::encode(old_snaps, bl);
ENCODE_FINISH(bl);
}
+ void rollback_extents(
+ version_t gen, const vector<pair<uint64_t, uint64_t> > &extents) {
+ assert(can_local_rollback);
+ assert(!rollback_info_completed);
+ if (max_required_version < 2)
+ max_required_version = 2;
+ ENCODE_START(2, 2, bl);
+ append_id(ROLLBACK_EXTENTS);
+ ::encode(gen, bl);
+ ::encode(extents, bl);
+ ENCODE_FINISH(bl);
+ }
// cannot be rolled back
void mark_unrollbackable() {
return can_local_rollback && (bl.length() == 0);
}
+ bool requires_kraken() const {
+ return max_required_version >= 2;
+ }
+
/**
* Create fresh copy of bl bytes to avoid keeping large buffers around
* in the case that bl contains ptrs which point into a much larger
return op == DELETE || op == LOST_DELETE;
}
+ bool can_rollback() const {
+ return mod_desc.can_rollback();
+ }
+
+ void mark_unrollbackable() {
+ mod_desc.mark_unrollbackable();
+ }
+
+ bool requires_kraken() const {
+ return mod_desc.requires_kraken();
+ }
+
// Errors are only used for dup detection, whereas
// the index by objects is used by recovery, copy_get,
// and other facilities that don't expect or need to
(op == MODIFY || op == DELETE || op == ERROR);
}
- bool is_rollforward() const { /* TODO */ return false; }
-
string get_key_name() const;
void encode_with_checksum(bufferlist& bl) const;
void decode_with_checksum(bufferlist::iterator& p);
static pg_log_entry_t mk_ple_mod(
const hobject_t &hoid, eversion_t v, eversion_t pv) {
pg_log_entry_t e;
- e.mod_desc.mark_unrollbackable();
+ e.mark_unrollbackable();
e.op = pg_log_entry_t::MODIFY;
e.soid = hoid;
e.version = v;
static pg_log_entry_t mk_ple_dt(
const hobject_t &hoid, eversion_t v, eversion_t pv) {
pg_log_entry_t e;
- e.mod_desc.mark_unrollbackable();
+ e.mark_unrollbackable();
e.op = pg_log_entry_t::DELETE;
e.soid = hoid;
e.version = v;
{
pg_log_entry_t e;
- e.mod_desc.mark_unrollbackable();
+ e.mark_unrollbackable();
e.version = eversion_t(1, 1);
e.soid.set_hash(0x5);
eversion_t newhead;
{
pg_log_entry_t e;
- e.mod_desc.mark_unrollbackable();
+ e.mark_unrollbackable();
info.log_tail = log.tail = eversion_t(1, 1);
newhead = eversion_t(1, 3);
{
pg_log_entry_t e;
- e.mod_desc.mark_unrollbackable();
+ e.mark_unrollbackable();
e.version = eversion_t(1, 5);
e.soid.set_hash(0x9);
add(e);
}
{
pg_log_entry_t e;
- e.mod_desc.mark_unrollbackable();
+ e.mark_unrollbackable();
e.version = eversion_t(1, 6);
e.soid.set_hash(0x10);
add(e);
ObjectStore::Transaction t;
pg_log_entry_t oe;
- oe.mod_desc.mark_unrollbackable();
+ oe.mark_unrollbackable();
pg_info_t info;
list<hobject_t> remove_snap;
list<hobject_t> remove_snap;
pg_log_entry_t ne;
- ne.mod_desc.mark_unrollbackable();
+ ne.mark_unrollbackable();
ne.version = eversion_t(2,1);
log.add(ne);
{
log.log.front().op = pg_log_entry_t::DELETE;
pg_log_entry_t oe;
- oe.mod_desc.mark_unrollbackable();
+ oe.mark_unrollbackable();
oe.version = eversion_t(1,1);
TestHandler h(remove_snap);
ne.op = pg_log_entry_t::MODIFY;
missing.add_next_event(ne);
pg_log_entry_t oe;
- oe.mod_desc.mark_unrollbackable();
+ oe.mark_unrollbackable();
oe.version = eversion_t(1,1);
TestHandler h(remove_snap);
ObjectStore::Transaction t;
pg_log_entry_t oe;
- oe.mod_desc.mark_unrollbackable();
+ oe.mark_unrollbackable();
pg_info_t info;
list<hobject_t> remove_snap;
pg_log_entry_t ne;
- ne.mod_desc.mark_unrollbackable();
+ ne.mark_unrollbackable();
ne.version = eversion_t(1,1);
ne.op = pg_log_entry_t::DELETE;
log.add(ne);
ObjectStore::Transaction t;
pg_log_entry_t oe;
- oe.mod_desc.mark_unrollbackable();
+ oe.mark_unrollbackable();
pg_info_t info;
list<hobject_t> remove_snap;
pg_log_entry_t ne;
- ne.mod_desc.mark_unrollbackable();
+ ne.mark_unrollbackable();
ne.version = eversion_t(1,1);
ne.op = pg_log_entry_t::DELETE;
log.add(ne);
ObjectStore::Transaction t;
pg_log_entry_t oe;
- oe.mod_desc.mark_unrollbackable();
+ oe.mark_unrollbackable();
pg_info_t info;
list<hobject_t> remove_snap;
ObjectStore::Transaction t;
pg_log_entry_t oe;
- oe.mod_desc.mark_unrollbackable();
+ oe.mark_unrollbackable();
pg_info_t info;
list<hobject_t> remove_snap;
ObjectStore::Transaction t;
pg_log_entry_t oe;
- oe.mod_desc.mark_unrollbackable();
+ oe.mark_unrollbackable();
pg_info_t info;
list<hobject_t> remove_snap;
ObjectStore::Transaction t;
pg_log_entry_t oe;
- oe.mod_desc.mark_unrollbackable();
+ oe.mark_unrollbackable();
pg_info_t info;
list<hobject_t> remove_snap;
{
pg_log_entry_t e;
- e.mod_desc.mark_unrollbackable();
+ e.mark_unrollbackable();
e.version = eversion_t(1, 4);
e.soid.set_hash(0x5);
{
pg_log_entry_t e;
- e.mod_desc.mark_unrollbackable();
+ e.mark_unrollbackable();
e.version = eversion_t(1, 1);
e.soid.set_hash(0x5);
{
pg_log_entry_t e;
- e.mod_desc.mark_unrollbackable();
+ e.mark_unrollbackable();
e.version = eversion_t(1, 1);
e.soid.set_hash(0x5);
{
pg_log_entry_t e;
- e.mod_desc.mark_unrollbackable();
+ e.mark_unrollbackable();
log.tail = eversion_t();
e.version = eversion_t(1, 1);
{
pg_log_entry_t e;
- e.mod_desc.mark_unrollbackable();
+ e.mark_unrollbackable();
e.version = eversion_t(1, 2);
e.soid.set_hash(0x5);
{
pg_log_entry_t e;
- e.mod_desc.mark_unrollbackable();
+ e.mark_unrollbackable();
{
e.soid = divergent_object;
{
pg_log_entry_t e;
- e.mod_desc.mark_unrollbackable();
+ e.mark_unrollbackable();
e.version = eversion_t(1, 1);
e.soid = divergent_object;
{
pg_log_entry_t e;
- e.mod_desc.mark_unrollbackable();
+ e.mark_unrollbackable();
e.version = eversion_t(1, 1);
e.soid = divergent_object;
{
pg_log_entry_t e;
- e.mod_desc.mark_unrollbackable();
+ e.mark_unrollbackable();
e.version = eversion_t(1, 1);
e.soid.set_hash(0x9);