struct {
uint64_t ino; // "file" identifier
uint32_t bno; // "block" in that "file"
- objectrev_t rev; // revision. normally ctime (as epoch).
+ uint64_t snap; // snap revision.
} __attribute__ ((packed));
- };
+ } __attribute__ ((packed));
- object_t() : ino(0), bno(0), rev(0) {}
- object_t(uint64_t i, uint32_t b) : ino(i), bno(b), rev(0) {}
- object_t(uint64_t i, uint32_t b, uint64_t r) : ino(i), bno(b), rev(r) {}
+ object_t() : ino(0), bno(0), snap(0) {}
+ object_t(uint64_t i, uint32_t b) : ino(i), bno(b), snap(0) {}
+ object_t(uint64_t i, uint32_t b, uint64_t r) : ino(i), bno(b), snap(r) {}
// IMPORTANT: make this match struct ceph_object ****
object_t(const ceph_object& co) {
if (op == CEPH_OSD_OP_WRNOOP)
return;
-
- // raise last_complete?
- if (info.last_complete == info.last_update)
- info.last_complete = version;
-
- // raise last_update.
- assert(version > info.last_update);
- info.last_update = version;
+ // clone?
+ if (poid.oid.snap) {
+ assert(poid.oid.snap == NOSNAP);
+ dout(20) << "snaps=" << snaps << " follows_snap=" << follows_snap << dendl;;
+
+ if (follows_snap &&
+ !snaps.empty() && snaps[0] > follows_snap) {
+ // clone
+ pobject_t coid = poid;
+ coid.oid.snap = snaps[0];
+
+ unsigned l;
+ for (l=1; l<snaps.size() && snaps[l] > follows_snap; l++) ;
+ vector<snapid_t> csnaps(l);
+ for (unsigned i=0; i<l; i++)
+ csnaps[i] = snaps[i];
+
+ // log clone
+ dout(10) << "cloning to " << coid << " v " << at_version << " csnaps=" << csnaps << dendl;
+ Log::Entry cloneentry(PG::Log::Entry::CLONE, coid.oid, at_version, reqid);
+ dout(10) << "prepare_transaction " << cloneentry << dendl;
+ log.add(cloneentry);
+ assert(log.top == at_version);
+
+ // prepare clone
+ t.clone(info.pgid, poid, coid);
+ t.setattr(info.pgid, coid, "snaps", &csnaps[0], csnaps.size()*sizeof(snapid_t));
+ t.setattr(info.pgid, coid, "version", &at_version, sizeof(at_version));
+
+ at_version.version++;
+
+ /*
+ // munge delete into a truncate?
+ if (op == CEPH_OSD_OP_DELETE) {
+ op = CEPH_OSD_OP_TRUNCATE;
+ length = 0;
+ }
+ */
+ }
+ }
- // write pg info
- bufferlist infobl;
- ::encode(info, infobl);
- t.collection_setattr(pgid, "info", infobl);
+ // log op
+ int opcode = Log::Entry::MODIFY;
+ if (op == CEPH_OSD_OP_DELETE) opcode = Log::Entry::DELETE;
+ Log::Entry logentry(opcode, poid.oid, at_version, reqid);
+ dout(10) << "prepare_transaction " << op << " " << logentry << dendl;
- // clone?
- if (crev && rev && rev > crev) {
- assert(0);
- pobject_t noid = poid; // FIXME ****
- noid.oid.rev = rev;
- dout(10) << "prepare_op_transaction cloning " << poid << " crev " << crev << " to " << noid << dendl;
- t.clone(info.pgid, poid, noid);
- did_clone = true;
- }
-
- // apply the op
+ assert(at_version > log.top);
+ log.add(logentry);
+ assert(log.top == at_version);
+
+ // prepare op
switch (op) {
// -- locking --
}
// object collection, version
- if (op == CEPH_OSD_OP_DELETE) {
- // remove object from c
- //t.collection_remove(pgid, poid);
- } else {
- // add object to c
- //t.collection_add(pgid, poid);
-
+ if (op != CEPH_OSD_OP_DELETE) {
// object version
- t.setattr(info.pgid, poid, "version", &version, sizeof(version));
+ t.setattr(info.pgid, poid, "version", &at_version, sizeof(at_version));
- // set object crev
- if (crev == 0 || // new object
- did_clone) // we cloned
- t.setattr(info.pgid, poid, "crev", &rev, sizeof(rev));
+ if (snaps.size())
+ t.setattr(info.pgid, poid, "follows_snap", &snaps[0], snaps.size() * sizeof(snapid_t));
}
- t.collection_setattr(info.pgid, "info", &info, sizeof(info));
+
+ // update pg info:
+ // raise last_complete only if we were previously up to date
+ if (info.last_complete == info.last_update)
+ info.last_complete = at_version;
+
+ // raise last_update.
+ assert(at_version > info.last_update);
+ info.last_update = at_version;
+
+ // write pg info
++ bufferlist infobl;
++ ::encode(info, infobl);
++ t.collection_setattr(info.pgid, "info", infobl);
+
+ // prepare log append
+ append_log(t, logentry, trim_to);
}