bool use_pool_override;
bool replica;
+ list<Context *> on_applied;
+ list<Context *> on_commit;
+ list<Context *> on_applied_sync;
+
public:
+ void register_on_applied(Context *c) {
+ on_applied.push_back(c);
+ }
+ void register_on_commit(Context *c) {
+ on_commit.push_back(c);
+ }
+ void register_on_applied_sync(Context *c) {
+ on_applied_sync.push_back(c);
+ }
+
+ static void collect_contexts(
+ list<Transaction *> &t,
+ Context **out_on_applied,
+ Context **out_on_commit,
+ Context **out_on_applied_sync) {
+ assert(out_on_applied);
+ assert(out_on_commit);
+ assert(out_on_applied_sync);
+ list<Context *> on_applied, on_commit, on_applied_sync;
+ for (list<Transaction *>::iterator i = t.begin();
+ i != t.end();
+ ++i) {
+ on_applied.splice(on_applied.end(), (*i)->on_applied);
+ on_commit.splice(on_commit.end(), (*i)->on_commit);
+ on_applied_sync.splice(on_applied_sync.end(), (*i)->on_applied_sync);
+ }
+ *out_on_applied = C_Contexts::list_to_context(on_applied);
+ *out_on_commit = C_Contexts::list_to_context(on_commit);
+ *out_on_applied_sync = C_Contexts::list_to_context(on_applied_sync);
+ }
+
+ Context *get_on_applied() {
+ return C_Contexts::list_to_context(on_applied);
+ }
+ Context *get_on_commit() {
+ return C_Contexts::list_to_context(on_commit);
+ }
+ Context *get_on_applied_sync() {
+ return C_Contexts::list_to_context(on_applied_sync);
+ }
+
void set_pool_override(int64_t pool) {
pool_override = pool;
}
std::swap(largest_data_len, other.largest_data_len);
std::swap(largest_data_off, other.largest_data_off);
std::swap(largest_data_off_in_tbl, other.largest_data_off_in_tbl);
+ std::swap(on_applied, other.on_applied);
+ std::swap(on_commit, other.on_commit);
+ std::swap(on_applied_sync, other.on_applied_sync);
tbl.swap(other.tbl);
}
largest_data_off_in_tbl = tbl.length() + other.largest_data_off_in_tbl;
}
tbl.append(other.tbl);
+ on_applied.splice(on_applied.end(), other.on_applied);
+ on_commit.splice(on_commit.end(), other.on_commit);
+ on_applied_sync.splice(on_applied_sync.end(), other.on_applied_sync);
}
uint64_t get_encoded_bytes() {
tls.push_back(t);
return queue_transactions(osr, tls, new C_DeleteTransaction(t));
}
+
int queue_transaction(Sequencer *osr, Transaction *t, Context *onreadable, Context *ondisk=0,
Context *onreadable_sync=0,
TrackedOpRef op = TrackedOpRef()) {
tls.push_back(t);
return queue_transactions(osr, tls, onreadable, ondisk, onreadable_sync, op);
}
- virtual int queue_transactions(Sequencer *osr, list<Transaction*>& tls, Context *onreadable, Context *ondisk=0,
- Context *onreadable_sync=0,
- TrackedOpRef op = TrackedOpRef()) = 0;
+
+ int queue_transactions(Sequencer *osr, list<Transaction*>& tls,
+ Context *onreadable, Context *ondisk=0,
+ Context *onreadable_sync=0,
+ TrackedOpRef op = TrackedOpRef()) {
+ assert(!tls.empty());
+ tls.back()->register_on_applied(onreadable);
+ tls.back()->register_on_commit(ondisk);
+ tls.back()->register_on_applied_sync(onreadable_sync);
+ return queue_transactions(osr, tls, op);
+ }
+
+ virtual int queue_transactions(
+ Sequencer *osr, list<Transaction*>& tls,
+ TrackedOpRef op = TrackedOpRef()) = 0;
+
int queue_transactions(
Sequencer *osr,