#include <vector>
#include <map>
+#define OPS_PER_PTR 32
+
#if defined(DARWIN) || defined(__FreeBSD__)
#include <sys/statvfs.h>
#else
COLL_HINT_EXPECTED_NUM_OBJECTS = 1,
};
+ struct Op {
+ __le32 op;
+ __le32 cid;
+ __le32 oid;
+ __le64 off;
+ __le64 len;
+ __le32 dest_cid;
+ __le32 dest_oid; //OP_CLONE, OP_CLONERANGE
+ __le64 dest_off; //OP_CLONERANGE
+ __le32 hint_type; //OP_COLL_HINT
+ __le64 expected_object_size; //OP_SETALLOCHINT
+ __le64 expected_write_size; //OP_SETALLOCHINT
+ __le32 bits; //OP_SPLIT_COLLECTION2
+ __le32 rem; //OP_SPLIT_COLLECTION2
+ } __attribute__ ((packed)) ;
+
struct TransactionData {
__le64 ops;
__le32 largest_data_len;
void *osr; // NULL on replay
+ bool use_tbl; //use_tbl for encode/decode
bufferlist tbl;
+ map<coll_t, __le32> coll_index;
+ map<ghobject_t, __le32> object_index;
+
+ __le32 coll_id;
+ __le32 object_id;
+
+ bufferlist data_bl;
+ bufferlist op_bl;
+
+ bufferptr op_ptr;
+
list<Context *> on_applied;
list<Context *> on_commit;
list<Context *> on_applied_sync;
* right place. Sadly, there's no corresponding version nor any
* form of seat belts for the decoder.
*/
+ Op* _get_next_op() {
+ if (op_ptr.length() == 0 || op_ptr.offset() >= op_ptr.length()) {
+ op_ptr = bufferptr(sizeof(Op) * OPS_PER_PTR);
+ op_ptr.zero();
+ }
+ bufferptr ptr(op_ptr, 0, sizeof(Op));
+ op_bl.append(ptr);
+
+ op_ptr.set_offset(op_ptr.offset() + sizeof(Op));
+
+ char* p = ptr.c_str();
+ return (Op*)p;
+ }
+ __le32 _get_coll_id(const coll_t& coll) {
+ map<coll_t, __le32>::iterator c = coll_index.find(coll);
+ if (c != coll_index.end())
+ return c->second;
+
+ __le32 index_id = coll_id++;
+ coll_index[coll] = index_id;
+ return index_id;
+ }
+ __le32 _get_object_id(const ghobject_t& oid) {
+ map<ghobject_t, __le32>::iterator o = object_index.find(oid);
+ if (o != object_index.end())
+ return o->second;
+
+ __le32 index_id = object_id++;
+ object_index[oid] = index_id;
+ return index_id;
+ }
/// Commence a global file system sync operation.
void start_sync() {
- __u32 op = OP_STARTSYNC;
- ::encode(op, tbl);
+ if (use_tbl) {
+ __u32 op = OP_STARTSYNC;
+ ::encode(op, tbl);
+ } else {
+ Op* _op = _get_next_op();
+ _op->op = OP_STARTSYNC;
+ }
data.ops++;
}
/// noop. 'nuf said
void nop() {
- __u32 op = OP_NOP;
- ::encode(op, tbl);
+ if (use_tbl) {
+ __u32 op = OP_NOP;
+ ::encode(op, tbl);
+ } else {
+ Op* _op = _get_next_op();
+ _op->op = OP_NOP;
+ }
data.ops++;
}
/**
* empty object if necessary
*/
void touch(coll_t cid, const ghobject_t& oid) {
- __u32 op = OP_TOUCH;
- ::encode(op, tbl);
- ::encode(cid, tbl);
- ::encode(oid, tbl);
+ if (use_tbl) {
+ __u32 op = OP_TOUCH;
+ ::encode(op, tbl);
+ ::encode(cid, tbl);
+ ::encode(oid, tbl);
+ } else {
+ Op* _op = _get_next_op();
+ _op->op = OP_TOUCH;
+ _op->cid = _get_coll_id(cid);
+ _op->oid = _get_object_id(oid);
+ }
data.ops++;
}
/**
*/
void write(coll_t cid, const ghobject_t& oid, uint64_t off, uint64_t len,
const bufferlist& write_data, uint32_t flags = 0) {
- __u32 op = OP_WRITE;
- ::encode(op, tbl);
- ::encode(cid, tbl);
- ::encode(oid, tbl);
- ::encode(off, tbl);
- ::encode(len, tbl);
+ if (use_tbl) {
+ __u32 op = OP_WRITE;
+ ::encode(op, tbl);
+ ::encode(cid, tbl);
+ ::encode(oid, tbl);
+ ::encode(off, tbl);
+ ::encode(len, tbl);
+ ::encode(write_data, tbl);
+ } else {
+ Op* _op = _get_next_op();
+ _op->op = OP_WRITE;
+ _op->cid = _get_coll_id(cid);
+ _op->oid = _get_object_id(oid);
+ _op->off = off;
+ _op->len = len;
+ ::encode(write_data, data_bl);
+ }
assert(len == write_data.length());
data.fadvise_flags = data.fadvise_flags | flags;
if (write_data.length() > data.largest_data_len) {
data.largest_data_off = off;
data.largest_data_off_in_tbl = tbl.length() + sizeof(__u32); // we are about to
}
- ::encode(write_data, tbl);
data.ops++;
}
/**
* underlying storage space.
*/
void zero(coll_t cid, const ghobject_t& oid, uint64_t off, uint64_t len) {
- __u32 op = OP_ZERO;
- ::encode(op, tbl);
- ::encode(cid, tbl);
- ::encode(oid, tbl);
- ::encode(off, tbl);
- ::encode(len, tbl);
+ if (use_tbl) {
+ __u32 op = OP_ZERO;
+ ::encode(op, tbl);
+ ::encode(cid, tbl);
+ ::encode(oid, tbl);
+ ::encode(off, tbl);
+ ::encode(len, tbl);
+ } else {
+ Op* _op = _get_next_op();
+ _op->op = OP_ZERO;
+ _op->cid = _get_coll_id(cid);
+ _op->oid = _get_object_id(oid);
+ _op->off = off;
+ _op->len = len;
+ }
data.ops++;
}
/// Discard all data in the object beyond the specified size.
void truncate(coll_t cid, const ghobject_t& oid, uint64_t off) {
- __u32 op = OP_TRUNCATE;
- ::encode(op, tbl);
- ::encode(cid, tbl);
- ::encode(oid, tbl);
- ::encode(off, tbl);
+ if (use_tbl) {
+ __u32 op = OP_TRUNCATE;
+ ::encode(op, tbl);
+ ::encode(cid, tbl);
+ ::encode(oid, tbl);
+ ::encode(off, tbl);
+ } else {
+ Op* _op = _get_next_op();
+ _op->op = OP_TRUNCATE;
+ _op->cid = _get_coll_id(cid);
+ _op->oid = _get_object_id(oid);
+ _op->off = off;
+ }
data.ops++;
}
/// Remove an object. All four parts of the object are removed.
void remove(coll_t cid, const ghobject_t& oid) {
- __u32 op = OP_REMOVE;
- ::encode(op, tbl);
- ::encode(cid, tbl);
- ::encode(oid, tbl);
+ if (use_tbl) {
+ __u32 op = OP_REMOVE;
+ ::encode(op, tbl);
+ ::encode(cid, tbl);
+ ::encode(oid, tbl);
+ } else {
+ Op* _op = _get_next_op();
+ _op->op = OP_REMOVE;
+ _op->cid = _get_coll_id(cid);
+ _op->oid = _get_object_id(oid);
+ }
data.ops++;
}
/// Set an xattr of an object
}
/// Set an xattr of an object
void setattr(coll_t cid, const ghobject_t& oid, const string& s, bufferlist& val) {
- __u32 op = OP_SETATTR;
- ::encode(op, tbl);
- ::encode(cid, tbl);
- ::encode(oid, tbl);
- ::encode(s, tbl);
- ::encode(val, tbl);
+ if (use_tbl) {
+ __u32 op = OP_SETATTR;
+ ::encode(op, tbl);
+ ::encode(cid, tbl);
+ ::encode(oid, tbl);
+ ::encode(s, tbl);
+ ::encode(val, tbl);
+ } else {
+ Op* _op = _get_next_op();
+ _op->op = OP_SETATTR;
+ _op->cid = _get_coll_id(cid);
+ _op->oid = _get_object_id(oid);
+ ::encode(s, data_bl);
+ ::encode(val, data_bl);
+ }
data.ops++;
}
/// Set multiple xattrs of an object
void setattrs(coll_t cid, const ghobject_t& oid, map<string,bufferptr>& attrset) {
- __u32 op = OP_SETATTRS;
- ::encode(op, tbl);
- ::encode(cid, tbl);
- ::encode(oid, tbl);
- ::encode(attrset, tbl);
+ if (use_tbl) {
+ __u32 op = OP_SETATTRS;
+ ::encode(op, tbl);
+ ::encode(cid, tbl);
+ ::encode(oid, tbl);
+ ::encode(attrset, tbl);
+ } else {
+ Op* _op = _get_next_op();
+ _op->op = OP_SETATTRS;
+ _op->cid = _get_coll_id(cid);
+ _op->oid = _get_object_id(oid);
+ ::encode(attrset, data_bl);
+ }
data.ops++;
}
/// Set multiple xattrs of an object
void setattrs(coll_t cid, const ghobject_t& oid, map<string,bufferlist>& attrset) {
- __u32 op = OP_SETATTRS;
- ::encode(op, tbl);
- ::encode(cid, tbl);
- ::encode(oid, tbl);
- ::encode(attrset, tbl);
+ if (use_tbl) {
+ __u32 op = OP_SETATTRS;
+ ::encode(op, tbl);
+ ::encode(cid, tbl);
+ ::encode(oid, tbl);
+ ::encode(attrset, tbl);
+ } else {
+ Op* _op = _get_next_op();
+ _op->op = OP_SETATTRS;
+ _op->cid = _get_coll_id(cid);
+ _op->oid = _get_object_id(oid);
+ ::encode(attrset, data_bl);
+ }
data.ops++;
}
/// remove an xattr from an object
}
/// remove an xattr from an object
void rmattr(coll_t cid, const ghobject_t& oid, const string& s) {
- __u32 op = OP_RMATTR;
- ::encode(op, tbl);
- ::encode(cid, tbl);
- ::encode(oid, tbl);
- ::encode(s, tbl);
+ if (use_tbl) {
+ __u32 op = OP_RMATTR;
+ ::encode(op, tbl);
+ ::encode(cid, tbl);
+ ::encode(oid, tbl);
+ ::encode(s, tbl);
+ } else {
+ Op* _op = _get_next_op();
+ _op->op = OP_RMATTR;
+ _op->cid = _get_coll_id(cid);
+ _op->oid = _get_object_id(oid);
+ ::encode(s, data_bl);
+ }
data.ops++;
}
/// remove all xattrs from an object
void rmattrs(coll_t cid, const ghobject_t& oid) {
- __u32 op = OP_RMATTRS;
- ::encode(op, tbl);
- ::encode(cid, tbl);
- ::encode(oid, tbl);
+ if (use_tbl) {
+ __u32 op = OP_RMATTRS;
+ ::encode(op, tbl);
+ ::encode(cid, tbl);
+ ::encode(oid, tbl);
+ } else {
+ Op* _op = _get_next_op();
+ _op->op = OP_RMATTRS;
+ _op->cid = _get_coll_id(cid);
+ _op->oid = _get_object_id(oid);
+ }
data.ops++;
}
/**
* which case its previous contents are discarded.
*/
void clone(coll_t cid, const ghobject_t& oid, ghobject_t noid) {
- __u32 op = OP_CLONE;
- ::encode(op, tbl);
- ::encode(cid, tbl);
- ::encode(oid, tbl);
- ::encode(noid, tbl);
+ if (use_tbl) {
+ __u32 op = OP_CLONE;
+ ::encode(op, tbl);
+ ::encode(cid, tbl);
+ ::encode(oid, tbl);
+ ::encode(noid, tbl);
+ } else {
+ Op* _op = _get_next_op();
+ _op->op = OP_CLONE;
+ _op->cid = _get_coll_id(cid);
+ _op->oid = _get_object_id(oid);
+ _op->dest_oid = _get_object_id(noid);
+ }
data.ops++;
}
/**
*/
void clone_range(coll_t cid, const ghobject_t& oid, ghobject_t noid,
uint64_t srcoff, uint64_t srclen, uint64_t dstoff) {
- __u32 op = OP_CLONERANGE2;
- ::encode(op, tbl);
- ::encode(cid, tbl);
- ::encode(oid, tbl);
- ::encode(noid, tbl);
- ::encode(srcoff, tbl);
- ::encode(srclen, tbl);
- ::encode(dstoff, tbl);
+ if (use_tbl) {
+ __u32 op = OP_CLONERANGE2;
+ ::encode(op, tbl);
+ ::encode(cid, tbl);
+ ::encode(oid, tbl);
+ ::encode(noid, tbl);
+ ::encode(srcoff, tbl);
+ ::encode(srclen, tbl);
+ ::encode(dstoff, tbl);
+ } else {
+ Op* _op = _get_next_op();
+ _op->op = OP_CLONERANGE2;
+ _op->cid = _get_coll_id(cid);
+ _op->oid = _get_object_id(oid);
+ _op->dest_oid = _get_object_id(noid);
+ _op->off = srcoff;
+ _op->len = srclen;
+ _op->dest_off = dstoff;
+ }
data.ops++;
}
/// Create the collection
void create_collection(coll_t cid) {
- __u32 op = OP_MKCOLL;
- ::encode(op, tbl);
- ::encode(cid, tbl);
+ if (use_tbl) {
+ __u32 op = OP_MKCOLL;
+ ::encode(op, tbl);
+ ::encode(cid, tbl);
+ } else {
+ Op* _op = _get_next_op();
+ _op->op = OP_MKCOLL;
+ _op->cid = _get_coll_id(cid);
+ }
data.ops++;
}
* @param hint - the hint payload, which contains the customized
* data along with the hint type.
*/
- void collection_hint(coll_t cid, uint32_t type, const bufferlist& hint) {
- __u32 op = OP_COLL_HINT;
- ::encode(op, tbl);
- ::encode(cid, tbl);
- ::encode(type, tbl);
- ::encode(hint, tbl);
- data.ops++;
- }
+ void collection_hint(coll_t cid, uint32_t type, const bufferlist& hint) {
+ if (use_tbl) {
+ __u32 op = OP_COLL_HINT;
+ ::encode(op, tbl);
+ ::encode(cid, tbl);
+ ::encode(type, tbl);
+ ::encode(hint, tbl);
+ } else {
+ Op* _op = _get_next_op();
+ _op->op = OP_COLL_HINT;
+ _op->cid = _get_coll_id(cid);
+ _op->hint_type = type;
+ ::encode(hint, data_bl);
+ }
+ data.ops++;
+ }
/// remove the collection, the collection must be empty
void remove_collection(coll_t cid) {
- __u32 op = OP_RMCOLL;
- ::encode(op, tbl);
- ::encode(cid, tbl);
+ if (use_tbl) {
+ __u32 op = OP_RMCOLL;
+ ::encode(op, tbl);
+ ::encode(cid, tbl);
+ } else {
+ Op* _op = _get_next_op();
+ _op->op = OP_RMCOLL;
+ _op->cid = _get_coll_id(cid);
+ }
data.ops++;
}
void collection_move(coll_t cid, coll_t oldcid, const ghobject_t& oid) {
// NOTE: we encode this as a fixed combo of ADD + REMOVE. they
// always appear together, so this is effectively a single MOVE.
- __u32 op = OP_COLL_ADD;
- ::encode(op, tbl);
- ::encode(cid, tbl);
- ::encode(oldcid, tbl);
- ::encode(oid, tbl);
+ if (use_tbl) {
+ __u32 op = OP_COLL_ADD;
+ ::encode(op, tbl);
+ ::encode(cid, tbl);
+ ::encode(oldcid, tbl);
+ ::encode(oid, tbl);
+ } else {
+ Op* _op = _get_next_op();
+ _op->op = OP_COLL_ADD;
+ _op->cid = _get_coll_id(oldcid);
+ _op->oid = _get_object_id(oid);
+ _op->dest_cid = _get_coll_id(cid);
+ }
data.ops++;
- op = OP_COLL_REMOVE;
- ::encode(op, tbl);
- ::encode(oldcid, tbl);
- ::encode(oid, tbl);
+
+ if (use_tbl) {
+ __u32 op = OP_COLL_REMOVE;
+ ::encode(op, tbl);
+ ::encode(oldcid, tbl);
+ ::encode(oid, tbl);
+ } else {
+ Op* _op = _get_next_op();
+ _op->op = OP_COLL_REMOVE;
+ _op->cid = _get_coll_id(oldcid);
+ _op->oid = _get_object_id(oid);
+ }
data.ops++;
- return;
}
void collection_move_rename(coll_t oldcid, const ghobject_t& oldoid,
coll_t cid, const ghobject_t& oid) {
- __u32 op = OP_COLL_MOVE_RENAME;
- ::encode(op, tbl);
- ::encode(oldcid, tbl);
- ::encode(oldoid, tbl);
- ::encode(cid, tbl);
- ::encode(oid, tbl);
+ if (use_tbl) {
+ __u32 op = OP_COLL_MOVE_RENAME;
+ ::encode(op, tbl);
+ ::encode(oldcid, tbl);
+ ::encode(oldoid, tbl);
+ ::encode(cid, tbl);
+ ::encode(oid, tbl);
+ } else {
+ Op* _op = _get_next_op();
+ _op->op = OP_COLL_MOVE_RENAME;
+ _op->cid = _get_coll_id(oldcid);
+ _op->oid = _get_object_id(oldoid);
+ _op->dest_cid = _get_coll_id(cid);
+ _op->dest_oid = _get_object_id(oid);
+ }
data.ops++;
}
/// Set an xattr on a collection
void collection_setattr(coll_t cid, const string& name, bufferlist& val)
__attribute__ ((deprecated)) {
- __u32 op = OP_COLL_SETATTR;
- ::encode(op, tbl);
- ::encode(cid, tbl);
- ::encode(name, tbl);
- ::encode(val, tbl);
+ if (use_tbl) {
+ __u32 op = OP_COLL_SETATTR;
+ ::encode(op, tbl);
+ ::encode(cid, tbl);
+ ::encode(name, tbl);
+ ::encode(val, tbl);
+ } else {
+ Op* _op = _get_next_op();
+ _op->op = OP_COLL_SETATTR;
+ _op->cid = _get_coll_id(cid);
+ ::encode(name, data_bl);
+ ::encode(val, data_bl);
+ }
data.ops++;
}
/// Remove an xattr from a collection
void collection_rmattr(coll_t cid, const string& name)
__attribute__ ((deprecated)) {
- __u32 op = OP_COLL_RMATTR;
- ::encode(op, tbl);
- ::encode(cid, tbl);
- ::encode(name, tbl);
+ if (use_tbl) {
+ __u32 op = OP_COLL_RMATTR;
+ ::encode(op, tbl);
+ ::encode(cid, tbl);
+ ::encode(name, tbl);
+ } else {
+ Op* _op = _get_next_op();
+ _op->op = OP_COLL_RMATTR;
+ _op->cid = _get_coll_id(cid);
+ ::encode(name, data_bl);
+ }
data.ops++;
}
/// Set multiple xattrs on a collection
void collection_setattrs(coll_t cid, map<string,bufferptr>& aset)
__attribute__ ((deprecated)) {
- __u32 op = OP_COLL_SETATTRS;
- ::encode(op, tbl);
- ::encode(cid, tbl);
- ::encode(aset, tbl);
+ if (use_tbl) {
+ __u32 op = OP_COLL_SETATTRS;
+ ::encode(op, tbl);
+ ::encode(cid, tbl);
+ ::encode(aset, tbl);
+ } else {
+ Op* _op = _get_next_op();
+ _op->op = OP_COLL_SETATTRS;
+ _op->cid = _get_coll_id(cid);
+ ::encode(aset, data_bl);
+ }
data.ops++;
}
/// Set multiple xattrs on a collection
void collection_setattrs(coll_t cid, map<string,bufferlist>& aset)
__attribute__ ((deprecated)) {
- __u32 op = OP_COLL_SETATTRS;
- ::encode(op, tbl);
- ::encode(cid, tbl);
- ::encode(aset, tbl);
+ if (use_tbl) {
+ __u32 op = OP_COLL_SETATTRS;
+ ::encode(op, tbl);
+ ::encode(cid, tbl);
+ ::encode(aset, tbl);
+ } else {
+ Op* _op = _get_next_op();
+ _op->op = OP_COLL_SETATTRS;
+ _op->cid = _get_coll_id(cid);
+ ::encode(aset, data_bl);
+ }
data.ops++;
}
/// Remove omap from oid
coll_t cid, ///< [in] Collection containing oid
const ghobject_t &oid ///< [in] Object from which to remove omap
) {
- __u32 op = OP_OMAP_CLEAR;
- ::encode(op, tbl);
- ::encode(cid, tbl);
- ::encode(oid, tbl);
+ if (use_tbl) {
+ __u32 op = OP_OMAP_CLEAR;
+ ::encode(op, tbl);
+ ::encode(cid, tbl);
+ ::encode(oid, tbl);
+ } else {
+ Op* _op = _get_next_op();
+ _op->op = OP_OMAP_CLEAR;
+ _op->cid = _get_coll_id(cid);
+ _op->oid = _get_object_id(oid);
+ }
data.ops++;
}
/// Set keys on oid omap. Replaces duplicate keys.
const ghobject_t &oid, ///< [in] Object to update
const map<string, bufferlist> &attrset ///< [in] Replacement keys and values
) {
- __u32 op = OP_OMAP_SETKEYS;
- ::encode(op, tbl);
- ::encode(cid, tbl);
- ::encode(oid, tbl);
- ::encode(attrset, tbl);
+ if (use_tbl) {
+ __u32 op = OP_OMAP_SETKEYS;
+ ::encode(op, tbl);
+ ::encode(cid, tbl);
+ ::encode(oid, tbl);
+ ::encode(attrset, tbl);
+ } else {
+ Op* _op = _get_next_op();
+ _op->op = OP_OMAP_SETKEYS;
+ _op->cid = _get_coll_id(cid);
+ _op->oid = _get_object_id(oid);
+ ::encode(attrset, data_bl);
+ }
data.ops++;
}
/// Remove keys from oid omap
const ghobject_t &oid, ///< [in] Object from which to remove the omap
const set<string> &keys ///< [in] Keys to clear
) {
- __u32 op = OP_OMAP_RMKEYS;
- ::encode(op, tbl);
- ::encode(cid, tbl);
- ::encode(oid, tbl);
- ::encode(keys, tbl);
+ if (use_tbl) {
+ __u32 op = OP_OMAP_RMKEYS;
+ ::encode(op, tbl);
+ ::encode(cid, tbl);
+ ::encode(oid, tbl);
+ ::encode(keys, tbl);
+ } else {
+ Op* _op = _get_next_op();
+ _op->op = OP_OMAP_RMKEYS;
+ _op->cid = _get_coll_id(cid);
+ _op->oid = _get_object_id(oid);
+ ::encode(keys, data_bl);
+ }
data.ops++;
}
const string& first, ///< [in] first key in range
const string& last ///< [in] first key past range, range is [first,last)
) {
- __u32 op = OP_OMAP_RMKEYRANGE;
- ::encode(op, tbl);
- ::encode(cid, tbl);
- ::encode(oid, tbl);
- ::encode(first, tbl);
- ::encode(last, tbl);
+ if (use_tbl) {
+ __u32 op = OP_OMAP_RMKEYRANGE;
+ ::encode(op, tbl);
+ ::encode(cid, tbl);
+ ::encode(oid, tbl);
+ ::encode(first, tbl);
+ ::encode(last, tbl);
+ } else {
+ Op* _op = _get_next_op();
+ _op->op = OP_OMAP_RMKEYRANGE;
+ _op->cid = _get_coll_id(cid);
+ _op->oid = _get_object_id(oid);
+ ::encode(first, data_bl);
+ ::encode(last, data_bl);
+ }
data.ops++;
}
const ghobject_t &oid, ///< [in] Object
const bufferlist &bl ///< [in] Header value
) {
- __u32 op = OP_OMAP_SETHEADER;
- ::encode(op, tbl);
- ::encode(cid, tbl);
- ::encode(oid, tbl);
- ::encode(bl, tbl);
+ if (use_tbl) {
+ __u32 op = OP_OMAP_SETHEADER;
+ ::encode(op, tbl);
+ ::encode(cid, tbl);
+ ::encode(oid, tbl);
+ ::encode(bl, tbl);
+ } else {
+ Op* _op = _get_next_op();
+ _op->op = OP_OMAP_SETHEADER;
+ _op->cid = _get_coll_id(cid);
+ _op->oid = _get_object_id(oid);
+ ::encode(bl, data_bl);
+ }
data.ops++;
}
uint32_t bits,
uint32_t rem,
coll_t destination) {
- __u32 op = OP_SPLIT_COLLECTION2;
- ::encode(op, tbl);
- ::encode(cid, tbl);
- ::encode(bits, tbl);
- ::encode(rem, tbl);
- ::encode(destination, tbl);
+ if (use_tbl) {
+ __u32 op = OP_SPLIT_COLLECTION2;
+ ::encode(op, tbl);
+ ::encode(cid, tbl);
+ ::encode(bits, tbl);
+ ::encode(rem, tbl);
+ ::encode(destination, tbl);
+ } else {
+ Op* _op = _get_next_op();
+ _op->op = OP_SPLIT_COLLECTION2;
+ _op->cid = _get_coll_id(cid);
+ _op->dest_cid = _get_coll_id(destination);
+ _op->bits = bits;
+ _op->rem = rem;
+ }
data.ops++;
}
uint64_t expected_object_size,
uint64_t expected_write_size
) {
- __u32 op = OP_SETALLOCHINT;
- ::encode(op, tbl);
- ::encode(cid, tbl);
- ::encode(oid, tbl);
- ::encode(expected_object_size, tbl);
- ::encode(expected_write_size, tbl);
+ if (use_tbl) {
+ __u32 op = OP_SETALLOCHINT;
+ ::encode(op, tbl);
+ ::encode(cid, tbl);
+ ::encode(oid, tbl);
+ ::encode(expected_object_size, tbl);
+ ::encode(expected_write_size, tbl);
+ } else {
+ Op* _op = _get_next_op();
+ _op->op = OP_SETALLOCHINT;
+ _op->cid = _get_coll_id(cid);
+ _op->oid = _get_object_id(oid);
+ _op->expected_object_size = expected_object_size;
+ _op->expected_write_size = expected_write_size;
+ }
data.ops++;
}
// etc.
Transaction() :
- osr(NULL) {}
+ osr(NULL),
+ use_tbl(true),
+ coll_id(0),
+ object_id(0) {}
Transaction(bufferlist::iterator &dp) :
- osr(NULL) {
+ osr(NULL),
+ use_tbl(true),
+ coll_id(0),
+ object_id(0) {
decode(dp);
}
Transaction(bufferlist &nbl) :
- osr(NULL) {
+ osr(NULL),
+ use_tbl(true),
+ coll_id(0),
+ object_id(0) {
bufferlist::iterator dp = nbl.begin();
decode(dp);
}
::decode(tbl, bl);
} else {
decode7_5(bl, struct_v);
+ use_tbl = true;
}
DECODE_FINISH(bl);
}