#include <errno.h>
#include <sys/stat.h>
#include <vector>
+#include <map>
#if defined(DARWIN) || defined(__FreeBSD__)
#include <sys/statvfs.h>
using std::vector;
using std::string;
+using std::map;
namespace ceph {
class Formatter;
COLL_HINT_EXPECTED_NUM_OBJECTS = 1,
};
+ struct TransactionData {
+ __le64 ops;
+ __le32 largest_data_len;
+ __le32 largest_data_off;
+ __le32 largest_data_off_in_tbl;
+ __le32 fadvise_flags;
+
+ TransactionData() :
+ ops(0),
+ largest_data_len(0),
+ largest_data_off(0),
+ largest_data_off_in_tbl(0),
+ fadvise_flags(0) { }
+
+ void encode(bufferlist& bl) const {
+ bl.append((char*)this, sizeof(TransactionData));
+ }
+ void decode(bufferlist::iterator &bl) {
+ bl.copy(sizeof(TransactionData), (char*)this);
+ }
+ } __attribute__ ((packed)) ;
+
private:
- uint64_t ops;
- uint32_t largest_data_len, largest_data_off, largest_data_off_in_tbl;
- bufferlist tbl;
- uint32_t fadvise_flags; //record write flags
+ TransactionData data;
+
void *osr; // NULL on replay
+ bufferlist tbl;
+
list<Context *> on_applied;
list<Context *> on_commit;
list<Context *> on_applied_sync;
public:
+
/* Operations on callback contexts */
void register_on_applied(Context *c) {
if (!c) return;
}
void set_fadvise_flags(uint32_t flags) {
- fadvise_flags = flags;
+ data.fadvise_flags = flags;
}
void set_fadvise_flag(uint32_t flag) {
- fadvise_flags |= flag;
+ data.fadvise_flags = data.fadvise_flags | flag;
}
- uint32_t get_fadvise_flags() { return fadvise_flags; }
+ uint32_t get_fadvise_flags() { return data.fadvise_flags; }
void swap(Transaction& other) {
- std::swap(ops, other.ops);
- std::swap(largest_data_len, other.largest_data_len);
- std::swap(largest_data_off, other.largest_data_off);
- std::swap(largest_data_off_in_tbl, other.largest_data_off_in_tbl);
- std::swap(fadvise_flags, other.fadvise_flags);
+ std::swap(data, other.data);
std::swap(on_applied, other.on_applied);
std::swap(on_commit, other.on_commit);
std::swap(on_applied_sync, other.on_applied_sync);
/// Append the operations of the parameter to this Transaction. Those operations are removed from the parameter Transaction
void append(Transaction& other) {
- ops += other.ops;
- if (other.largest_data_len > largest_data_len) {
- largest_data_len = other.largest_data_len;
- largest_data_off = other.largest_data_off;
- largest_data_off_in_tbl = tbl.length() + other.largest_data_off_in_tbl;
+ data.ops += other.data.ops;
+ if (other.data.largest_data_len > data.largest_data_len) {
+ data.largest_data_len = other.data.largest_data_len;
+ data.largest_data_off = other.data.largest_data_off;
+ data.largest_data_off_in_tbl = tbl.length() + other.data.largest_data_off_in_tbl;
}
- fadvise_flags |= other.fadvise_flags;
+ data.fadvise_flags |= other.data.fadvise_flags;
tbl.append(other.tbl);
on_applied.splice(on_applied.end(), other.on_applied);
on_commit.splice(on_commit.end(), other.on_commit);
}
/// Size of largest data buffer to the "write" operation encountered so far
uint32_t get_data_length() {
- return largest_data_len;
+ return data.largest_data_len;
}
/// offset within the encoded buffer to the start of the largest data buffer that's encoded
uint32_t get_data_offset() {
- if (largest_data_off_in_tbl) {
- return largest_data_off_in_tbl +
- sizeof(__u8) + // encode struct_v
- sizeof(__u8) + // encode compat_v
- sizeof(__u32) + // encode len
- sizeof(ops) +
- sizeof(uint64_t) + //encode old pad_unused_bytes
- sizeof(largest_data_len) +
- sizeof(largest_data_off) +
- sizeof(largest_data_off_in_tbl) +
- sizeof(fadvise_flags) +
- sizeof(__u32); // tbl length
+ if (data.largest_data_off_in_tbl) {
+ return data.largest_data_off_in_tbl +
+ sizeof(__u8) + // encode struct_v
+ sizeof(__u8) + // encode compat_v
+ sizeof(__u32) + // encode len
+ sizeof(uint64_t) + //ops
+ sizeof(uint64_t) + //old pad_unused_bytes
+ sizeof(uint32_t) + //largest_data_len
+ sizeof(uint32_t) + //largest_data_off
+ sizeof(uint32_t) + //largest_data_off_in_tbl
+ sizeof(uint32_t) + //fadvise_flags
+ sizeof(__u32); // tbl length
}
return 0; // none
}
/// offset of buffer as aligned to destination within object.
int get_data_alignment() {
- if (!largest_data_len)
+ if (!data.largest_data_len)
return -1;
return (0 - get_data_offset()) & ~CEPH_PAGE_MASK;
}
/// Is the Transaction empty (no operations)
bool empty() {
- return !ops;
+ return !data.ops;
}
/// Number of operations in the transation
int get_num_ops() {
- return ops;
+ return data.ops;
}
void set_osr(void *s) {
uint32_t fadvise_flags;
iterator(Transaction *t)
- : p(t->tbl.begin()),
- fadvise_flags(t->fadvise_flags) {}
+ : p(t->tbl.begin()) {}
friend class Transaction;
void start_sync() {
__u32 op = OP_STARTSYNC;
::encode(op, tbl);
- ops++;
+ data.ops++;
}
/// noop. 'nuf said
void nop() {
__u32 op = OP_NOP;
::encode(op, tbl);
- ops++;
+ data.ops++;
}
/**
* touch
::encode(op, tbl);
::encode(cid, tbl);
::encode(oid, tbl);
- ops++;
+ data.ops++;
}
/**
* Write data to an offset within an object. If the object is too
* "hole" in the file.
*/
void write(coll_t cid, const ghobject_t& oid, uint64_t off, uint64_t len,
- const bufferlist& data, uint32_t flags = 0) {
+ const bufferlist& write_data, uint32_t flags = 0) {
__u32 op = OP_WRITE;
::encode(op, tbl);
::encode(cid, tbl);
::encode(oid, tbl);
::encode(off, tbl);
::encode(len, tbl);
- assert(len == data.length());
- fadvise_flags |= flags;
- if (data.length() > largest_data_len) {
- largest_data_len = data.length();
- largest_data_off = off;
- largest_data_off_in_tbl = tbl.length() + sizeof(__u32); // we are about to
+ assert(len == write_data.length());
+ data.fadvise_flags = data.fadvise_flags | flags;
+ if (write_data.length() > data.largest_data_len) {
+ data.largest_data_len = write_data.length();
+ data.largest_data_off = off;
+ data.largest_data_off_in_tbl = tbl.length() + sizeof(__u32); // we are about to
}
- ::encode(data, tbl);
- ops++;
+ ::encode(write_data, tbl);
+ data.ops++;
}
/**
* zero out the indicated byte range within an object. Some
::encode(oid, tbl);
::encode(off, tbl);
::encode(len, tbl);
- ops++;
+ data.ops++;
}
/// Discard all data in the object beyond the specified size.
void truncate(coll_t cid, const ghobject_t& oid, uint64_t off) {
::encode(cid, tbl);
::encode(oid, tbl);
::encode(off, tbl);
- ops++;
+ data.ops++;
}
/// Remove an object. All four parts of the object are removed.
void remove(coll_t cid, const ghobject_t& oid) {
::encode(op, tbl);
::encode(cid, tbl);
::encode(oid, tbl);
- ops++;
+ data.ops++;
}
/// Set an xattr of an object
void setattr(coll_t cid, const ghobject_t& oid, const char* name, bufferlist& val) {
::encode(oid, tbl);
::encode(s, tbl);
::encode(val, tbl);
- ops++;
+ data.ops++;
}
/// Set multiple xattrs of an object
void setattrs(coll_t cid, const ghobject_t& oid, map<string,bufferptr>& attrset) {
::encode(cid, tbl);
::encode(oid, tbl);
::encode(attrset, tbl);
- ops++;
+ data.ops++;
}
/// Set multiple xattrs of an object
void setattrs(coll_t cid, const ghobject_t& oid, map<string,bufferlist>& attrset) {
::encode(cid, tbl);
::encode(oid, tbl);
::encode(attrset, tbl);
- ops++;
+ data.ops++;
}
/// remove an xattr from an object
void rmattr(coll_t cid, const ghobject_t& oid, const char *name) {
::encode(cid, tbl);
::encode(oid, tbl);
::encode(s, tbl);
- ops++;
+ data.ops++;
}
/// remove all xattrs from an object
void rmattrs(coll_t cid, const ghobject_t& oid) {
::encode(op, tbl);
::encode(cid, tbl);
::encode(oid, tbl);
- ops++;
+ data.ops++;
}
/**
* Clone an object into another object.
::encode(cid, tbl);
::encode(oid, tbl);
::encode(noid, tbl);
- ops++;
+ data.ops++;
}
/**
* Clone a byte range from one object to another.
::encode(srcoff, tbl);
::encode(srclen, tbl);
::encode(dstoff, tbl);
- ops++;
+ data.ops++;
}
/// Create the collection
void create_collection(coll_t cid) {
__u32 op = OP_MKCOLL;
::encode(op, tbl);
::encode(cid, tbl);
- ops++;
+ data.ops++;
}
/**
::encode(cid, tbl);
::encode(type, tbl);
::encode(hint, tbl);
- ops++;
+ data.ops++;
}
/// remove the collection, the collection must be empty
__u32 op = OP_RMCOLL;
::encode(op, tbl);
::encode(cid, tbl);
- ops++;
+ data.ops++;
}
void collection_move(coll_t cid, coll_t oldcid, const ghobject_t& oid) {
// NOTE: we encode this as a fixed combo of ADD + REMOVE. they
::encode(cid, tbl);
::encode(oldcid, tbl);
::encode(oid, tbl);
- ops++;
+ data.ops++;
op = OP_COLL_REMOVE;
::encode(op, tbl);
::encode(oldcid, tbl);
::encode(oid, tbl);
- ops++;
+ data.ops++;
return;
}
void collection_move_rename(coll_t oldcid, const ghobject_t& oldoid,
::encode(oldoid, tbl);
::encode(cid, tbl);
::encode(oid, tbl);
- ops++;
+ data.ops++;
}
// NOTE: Collection attr operations are all DEPRECATED. new
::encode(cid, tbl);
::encode(name, tbl);
::encode(val, tbl);
- ops++;
+ data.ops++;
}
/// Remove an xattr from a collection
::encode(op, tbl);
::encode(cid, tbl);
::encode(name, tbl);
- ops++;
+ data.ops++;
}
/// Set multiple xattrs on a collection
void collection_setattrs(coll_t cid, map<string,bufferptr>& aset)
::encode(op, tbl);
::encode(cid, tbl);
::encode(aset, tbl);
- ops++;
+ data.ops++;
}
/// Set multiple xattrs on a collection
void collection_setattrs(coll_t cid, map<string,bufferlist>& aset)
::encode(op, tbl);
::encode(cid, tbl);
::encode(aset, tbl);
- ops++;
+ data.ops++;
}
-
/// Remove omap from oid
void omap_clear(
coll_t cid, ///< [in] Collection containing oid
::encode(op, tbl);
::encode(cid, tbl);
::encode(oid, tbl);
- ops++;
+ data.ops++;
}
/// Set keys on oid omap. Replaces duplicate keys.
void omap_setkeys(
::encode(cid, tbl);
::encode(oid, tbl);
::encode(attrset, tbl);
- ops++;
+ data.ops++;
}
/// Remove keys from oid omap
void omap_rmkeys(
::encode(cid, tbl);
::encode(oid, tbl);
::encode(keys, tbl);
- ops++;
+ data.ops++;
}
/// Remove key range from oid omap
::encode(oid, tbl);
::encode(first, tbl);
::encode(last, tbl);
- ops++;
+ data.ops++;
}
/// Set omap header
::encode(cid, tbl);
::encode(oid, tbl);
::encode(bl, tbl);
- ops++;
+ data.ops++;
}
/// Split collection based on given prefixes, objects matching the specified bits/rem are
::encode(bits, tbl);
::encode(rem, tbl);
::encode(destination, tbl);
- ++ops;
+ data.ops++;
}
void set_alloc_hint(
::encode(oid, tbl);
::encode(expected_object_size, tbl);
::encode(expected_write_size, tbl);
- ++ops;
+ data.ops++;
}
// etc.
Transaction() :
- ops(0),
- largest_data_len(0),
- largest_data_off(0),
- largest_data_off_in_tbl(0),
- fadvise_flags(0),
osr(NULL) {}
Transaction(bufferlist::iterator &dp) :
- ops(0),
- largest_data_len(0),
- largest_data_off(0),
- largest_data_off_in_tbl(0),
- fadvise_flags(0),
osr(NULL) {
decode(dp);
}
Transaction(bufferlist &nbl) :
- ops(0),
- largest_data_len(0),
- largest_data_off(0),
- largest_data_off_in_tbl(0),
- fadvise_flags(0),
osr(NULL) {
bufferlist::iterator dp = nbl.begin();
decode(dp);
void encode(bufferlist& bl) const {
ENCODE_START(8, 5, bl);
- uint64_t pad_unused_bytes = 0;
- bool tolerate_collection_add_enoent = false;
- ::encode(ops, bl);
- ::encode(pad_unused_bytes, bl);
- ::encode(largest_data_len, bl);
- ::encode(largest_data_off, bl);
- ::encode(largest_data_off_in_tbl, bl);
+ data.encode(bl);
::encode(tbl, bl);
- ::encode(tolerate_collection_add_enoent, bl);
- ::encode(fadvise_flags, bl);
ENCODE_FINISH(bl);
}
void decode(bufferlist::iterator &bl) {
DECODE_START_LEGACY_COMPAT_LEN(8, 5, 5, bl);
DECODE_OLDEST(2);
- uint64_t pad_unused_bytes; //no used
- bool tolerate_collection_add_enoent; //no used
- ::decode(ops, bl);
- ::decode(pad_unused_bytes, bl);
+ if (struct_v == 8) {
+ data.decode(bl);
+ ::decode(tbl, bl);
+ } else {
+ decode7_5(bl, struct_v);
+ }
+ DECODE_FINISH(bl);
+ }
+ void decode7_5(bufferlist::iterator &bl, __u8 struct_v) {
+ uint64_t _ops = 0;
+ uint64_t _pad_unused_bytes = 0;
+ uint32_t _largest_data_len = 0;
+ uint32_t _largest_data_off = 0;
+ uint32_t _largest_data_off_in_tbl = 0;
+ uint32_t _fadvise_flags = 0;
+ bool tolerate_collection_add_enoent = false;
+
+ ::decode(_ops, bl);
+ ::decode(_pad_unused_bytes, bl);
if (struct_v >= 3) {
- ::decode(largest_data_len, bl);
- ::decode(largest_data_off, bl);
- ::decode(largest_data_off_in_tbl, bl);
+ ::decode(_largest_data_len, bl);
+ ::decode(_largest_data_off, bl);
+ ::decode(_largest_data_off_in_tbl, bl);
}
::decode(tbl, bl);
if (struct_v >= 7) {
::decode(tolerate_collection_add_enoent, bl);
}
if (struct_v >= 8) {
- ::decode(fadvise_flags, bl);
+ ::decode(_fadvise_flags, bl);
}
- DECODE_FINISH(bl);
+
+ //assign temp to TransactionData
+ data.ops = _ops;
+ data.largest_data_len = _largest_data_len;
+ data.largest_data_off = _largest_data_off;
+ data.largest_data_off_in_tbl = _largest_data_off_in_tbl;
+ data.fadvise_flags = _fadvise_flags;
}
void dump(ceph::Formatter *f);
virtual void inject_mdata_error(const ghobject_t &oid) {}
};
WRITE_CLASS_ENCODER(ObjectStore::Transaction)
+WRITE_CLASS_ENCODER(ObjectStore::Transaction::TransactionData)
ostream& operator<<(ostream& out, const ObjectStore::Sequencer& s);