Transaction(const Transaction& other) = default;
Transaction& operator=(const Transaction& other) = default;
+ // expose object_index for FileStore::Op's benefit
+ const map<ghobject_t, __le32>& get_object_index() const {
+ return object_index;
+ }
+
/* Operations on callback contexts */
void register_on_applied(Context *c) {
if (!c) return;
} else if (m_filestore_journal_writeahead) {
dout(5) << __FUNC__ << ": (writeahead) " << o->op << " " << o->tls << dendl;
- osr->queue_journal(o->op);
+ osr->queue_journal(o);
trace.keyval("journal mode", "writeahead");
trace.event("journal started");
uint64_t ops, bytes;
TrackedOpRef osd_op;
ZTracer::Trace trace;
+ bool registered_apply = false;
};
class OpSequencer : public CollectionImpl {
CephContext *cct;
list<pair<uint64_t, Context*> > flush_commit_waiters;
Cond cond;
string osr_name_str;
+ map<ghobject_t,int> applying;
public:
Mutex apply_lock; // for apply mutual exclusion
int id;
}
}
- void queue_journal(uint64_t s) {
+ void queue_journal(Op *o) {
Mutex::Locker l(qlock);
- jq.push_back(s);
+ jq.push_back(o->op);
+ _register_apply(o);
}
void dequeue_journal(list<Context*> *to_queue) {
Mutex::Locker l(qlock);
void queue(Op *o) {
Mutex::Locker l(qlock);
q.push_back(o);
+ _register_apply(o);
o->trace.keyval("queue depth", q.size());
}
+ void _register_apply(Op *o) {
+ if (o->registered_apply)
+ return;
+ o->registered_apply = true;
+ for (auto& t : o->tls) {
+ for (auto& i : t.get_object_index()) {
+ ++applying[i.first];
+ }
+ }
+ }
+ void _unregister_apply(Op *o) {
+ assert(o->registered_apply);
+ for (auto& t : o->tls) {
+ for (auto& i : t.get_object_index()) {
+ auto p = applying.find(i.first);
+ assert(p != applying.end());
+ if (--p->second == 0) {
+ applying.erase(p);
+ }
+ }
+ }
+ }
+ void wait_for_apply(const ghobject_t& oid) {
+ Mutex::Locker l(qlock);
+ while (applying.count(oid)) {
+ cond.Wait(qlock);
+ }
+ }
Op *peek_queue() {
Mutex::Locker l(qlock);
assert(apply_lock.is_locked());
Op *o = q.front();
q.pop_front();
cond.Signal();
-
+ _unregister_apply(o);
_wake_flush_waiters(to_queue);
return o;
}