#include <vector>
#include <map>
-#define OPS_PER_PTR 32
-
#if defined(DARWIN) || defined(__FreeBSD__)
#include <sys/statvfs.h>
#else
#include <sys/vfs.h> /* or <sys/statfs.h> */
#endif /* DARWIN */
+#define OPS_PER_PTR 32
+
class CephContext;
using std::vector;
std::swap(on_applied, other.on_applied);
std::swap(on_commit, other.on_commit);
std::swap(on_applied_sync, other.on_applied_sync);
+
+ std::swap(use_tbl, other.use_tbl);
tbl.swap(other.tbl);
- }
+ std::swap(coll_index, other.coll_index);
+ std::swap(object_index, other.object_index);
+ std::swap(coll_id, other.coll_id);
+ std::swap(object_id, other.object_id);
+ op_bl.swap(other.op_bl);
+ data_bl.swap(other.data_bl);
+ }
+
+ void _update_op(Op* op,
+ vector<__le32> &cm,
+ vector<__le32> &om) {
+
+ switch (op->op) {
+ case OP_NOP:
+ case OP_STARTSYNC:
+ break;
+
+ case OP_TOUCH:
+ case OP_REMOVE:
+ case OP_SETATTR:
+ case OP_SETATTRS:
+ case OP_RMATTR:
+ case OP_RMATTRS:
+ case OP_COLL_REMOVE:
+ case OP_OMAP_CLEAR:
+ case OP_OMAP_SETKEYS:
+ case OP_OMAP_RMKEYS:
+ case OP_OMAP_RMKEYRANGE:
+ case OP_OMAP_SETHEADER:
+ case OP_WRITE:
+ case OP_ZERO:
+ case OP_TRUNCATE:
+ case OP_CLONERANGE2:
+ case OP_SETALLOCHINT:
+ assert(op->cid < cm.size());
+ assert(op->oid < om.size());
+ op->cid = cm[op->cid];
+ op->oid = om[op->oid];
+ break;
+
+ case OP_CLONE:
+ assert(op->cid < cm.size());
+ assert(op->oid < om.size());
+ assert(op->dest_oid < om.size());
+ op->cid = cm[op->cid];
+ op->oid = om[op->oid];
+ op->dest_oid = om[op->dest_oid];
+ break;
+
+ case OP_MKCOLL:
+ case OP_RMCOLL:
+ case OP_COLL_SETATTR:
+ case OP_COLL_RMATTR:
+ case OP_COLL_SETATTRS:
+ case OP_COLL_HINT:
+ assert(op->cid < cm.size());
+ op->cid = cm[op->cid];
+ break;
+
+ case OP_COLL_ADD:
+ assert(op->cid < cm.size());
+ assert(op->oid < om.size());
+ assert(op->dest_cid < om.size());
+ op->cid = cm[op->cid];
+ op->dest_cid = cm[op->dest_cid];
+ op->oid = om[op->oid];
+ break;
+
+ case OP_COLL_MOVE_RENAME:
+ assert(op->cid < cm.size());
+ assert(op->oid < om.size());
+ assert(op->dest_cid < cm.size());
+ assert(op->dest_oid < om.size());
+ op->cid = cm[op->cid];
+ op->oid = om[op->oid];
+ op->dest_cid = cm[op->dest_cid];
+ op->dest_oid = om[op->dest_oid];
+ break;
+
+ case OP_SPLIT_COLLECTION2:
+ assert(op->cid < cm.size());
+ op->dest_cid = cm[op->dest_oid];
+ op->cid = cm[op->cid];
+ op->dest_cid = cm[op->dest_cid];
+ break;
+
+ default:
+ assert(0 == "Unkown OP");
+ }
+ }
+ void _update_op_bl(
+ bufferlist& bl,
+ vector<__le32> &cm,
+ vector<__le32> &om) {
+
+ list<bufferptr> list = bl.buffers();
+ std::list<bufferptr>::iterator p;
+
+ for(p = list.begin(); p != list.end(); p++) {
+ assert(p->length() % sizeof(Op) == 0);
+
+ char* raw_p = p->c_str();
+ char* raw_end = raw_p + p->length();
+ while (raw_p < raw_end) {
+ _update_op((Op*)raw_p, cm, om);
+ raw_p += sizeof(Op);
+ }
+ }
+ }
/// Append the operations of the parameter to this Transaction. Those operations are removed from the parameter Transaction
void append(Transaction& other) {
+ assert(use_tbl == other.use_tbl);
+
data.ops += other.data.ops;
if (other.data.largest_data_len > data.largest_data_len) {
data.largest_data_len = other.data.largest_data_len;
on_applied.splice(on_applied.end(), other.on_applied);
on_commit.splice(on_commit.end(), other.on_commit);
on_applied_sync.splice(on_applied_sync.end(), other.on_applied_sync);
+
+ //append coll_index & object_index
+ vector<__le32> cm(other.coll_index.size());
+ map<coll_t, __le32>::iterator coll_index_p;
+ for (coll_index_p = other.coll_index.begin();
+ coll_index_p != other.coll_index.end();
+ coll_index_p++) {
+ cm[coll_index_p->second] = _get_coll_id(coll_index_p->first);
+ }
+
+ vector<__le32> om(other.object_index.size());
+ map<ghobject_t, __le32>::iterator object_index_p;
+ for (object_index_p = other.object_index.begin();
+ object_index_p != other.object_index.end();
+ object_index_p++) {
+ om[object_index_p->second] = _get_object_id(object_index_p->first);
+ }
+
+ //update other.op_bl with cm & om
+ //When the other is appended to current transaction, all coll_index and
+ //object_index in other.op_buffer should be updated by new index of the
+ //combined transaction
+ _update_op_bl(other.op_bl, cm, om);
+
+ //append op_bl
+ op_bl.append(other.op_bl);
+ //append data_bl
+ data_bl.append(other.data_bl);
+ }
+ void make_op_bl_continue() {
+ op_bl.get_contiguous(0, op_bl.length());
}
/** Inquires about the Transaction as a whole. */
}
ghobject_t get_oid(__le32 oid_id) {
+ assert(oid_id < objects.size());
return objects[oid_id];
}
coll_t get_cid(__le32 cid_id) {
+ assert(cid_id < colls.size());
return colls[cid_id];
}
uint32_t get_fadvise_flags() const {
iterator_impl* iterator = NULL;
if (t->use_tbl) {
- assert("tbl is not supported" == 0);
+ assert("tbl encode is not supported" == 0);
iterator = new map_iterator(t);
} else {
iterator = new map_iterator(t);
osr(NULL),
use_tbl(false),
coll_id(0),
- object_id(0) {}
+ object_id(0) { }
Transaction(bufferlist::iterator &dp) :
osr(NULL),
::decode(object_index, bl);
data.decode(bl);
use_tbl = false;
+ coll_id = coll_index.size();
+ object_id = object_index.size();
} else {
decode7_5(bl, struct_v);
use_tbl = true;