apply_manager.op_apply_finish(o->op);
dout(10) << __FUNC__ << ": " << o << " seq " << o->op << " r = " << r
<< ", finisher " << o->onreadable << " " << o->onreadable_sync << dendl;
-
- o->tls.clear();
-
}
void FileStore::_finish_op(OpSequencer *osr)
list<Context*> to_queue;
Op *o = osr->dequeue(&to_queue);
+ o->tls.clear();
+
utime_t lat = ceph_clock_now();
lat -= o->start;
o = nullptr;
}
-
struct C_JournaledAhead : public Context {
FileStore *fs;
FileStore::OpSequencer *osr;
bool FileStore::exists(CollectionHandle& ch, const ghobject_t& oid)
{
tracepoint(objectstore, exists_enter, ch->cid.c_str());
+ auto osr = static_cast<OpSequencer*>(ch.get());
+ osr->wait_for_apply(oid);
struct stat st;
bool retval = stat(ch, oid, &st) == 0;
tracepoint(objectstore, exists_exit, retval);
CollectionHandle& ch, const ghobject_t& oid, struct stat *st, bool allow_eio)
{
tracepoint(objectstore, stat_enter, ch->cid.c_str());
+ auto osr = static_cast<OpSequencer*>(ch.get());
+ osr->wait_for_apply(oid);
const coll_t& cid = !_need_temp_object_collection(ch->cid, oid) ? ch->cid : ch->cid.get_temp();
int r = lfn_stat(cid, oid, st);
assert(allow_eio || !m_filestore_fail_eio || r != -EIO);
dout(15) << __FUNC__ << ": " << cid << "/" << oid << " " << offset << "~" << len << dendl;
+ auto osr = static_cast<OpSequencer*>(ch.get());
+ osr->wait_for_apply(oid);
+
FDRef fd;
int r = lfn_open(cid, oid, false, &fd);
if (r < 0) {
dout(15) << __FUNC__ << ": " << cid << "/" << oid << " " << offset << "~" << len << dendl;
+ auto osr = static_cast<OpSequencer*>(ch.get());
+ osr->wait_for_apply(oid);
+
FDRef fd;
int r = lfn_open(cid, oid, false, &fd);
tracepoint(objectstore, getattr_enter, ch->cid.c_str());
const coll_t& cid = !_need_temp_object_collection(ch->cid, oid) ? ch->cid : ch->cid.get_temp();
dout(15) << __FUNC__ << ": " << cid << "/" << oid << " '" << name << "'" << dendl;
+
+ auto osr = static_cast<OpSequencer*>(ch.get());
+ osr->wait_for_apply(oid);
+
FDRef fd;
int r = lfn_open(cid, oid, false, &fd);
if (r < 0) {
map<string, bufferlist> omap_aset;
Index index;
dout(15) << __FUNC__ << ": " << cid << "/" << oid << dendl;
+
+ auto osr = static_cast<OpSequencer*>(ch.get());
+ osr->wait_for_apply(oid);
+
FDRef fd;
bool spill_out = true;
char buf[2];
tracepoint(objectstore, omap_get_enter, ch->cid.c_str());
const coll_t& c = !_need_temp_object_collection(ch->cid, hoid) ? ch->cid : ch->cid.get_temp();
dout(15) << __FUNC__ << ": " << c << "/" << hoid << dendl;
+
+ auto osr = static_cast<OpSequencer*>(ch.get());
+ osr->wait_for_apply(hoid);
+
Index index;
int r = get_index(c, &index);
if (r < 0)
tracepoint(objectstore, omap_get_header_enter, ch->cid.c_str());
const coll_t& c = !_need_temp_object_collection(ch->cid, hoid) ? ch->cid : ch->cid.get_temp();
dout(15) << __FUNC__ << ": " << c << "/" << hoid << dendl;
+
+ auto osr = static_cast<OpSequencer*>(ch.get());
+ osr->wait_for_apply(hoid);
+
Index index;
int r = get_index(c, &index);
if (r < 0)
tracepoint(objectstore, omap_get_keys_enter, ch->cid.c_str());
const coll_t& c = !_need_temp_object_collection(ch->cid, hoid) ? ch->cid : ch->cid.get_temp();
dout(15) << __FUNC__ << ": " << c << "/" << hoid << dendl;
+
+ auto osr = static_cast<OpSequencer*>(ch.get());
+ osr->wait_for_apply(hoid);
+
Index index;
int r = get_index(c, &index);
if (r < 0)
tracepoint(objectstore, omap_get_values_enter, ch->cid.c_str());
const coll_t& c = !_need_temp_object_collection(ch->cid, hoid) ? ch->cid : ch->cid.get_temp();
dout(15) << __FUNC__ << ": " << c << "/" << hoid << dendl;
+
+ auto osr = static_cast<OpSequencer*>(ch.get());
+ osr->wait_for_apply(hoid);
+
Index index;
const char *where = "()";
int r = get_index(c, &index);
const coll_t& c = !_need_temp_object_collection(ch->cid, hoid) ? ch->cid : ch->cid.get_temp();
dout(15) << __FUNC__ << ": " << c << "/" << hoid << dendl;
+ auto osr = static_cast<OpSequencer*>(ch.get());
+ osr->wait_for_apply(hoid);
+
Index index;
int r = get_index(c, &index);
if (r < 0)
z.omap_backend = "rocksdb";
o.push_back(new FSSuperblock(z));
}
+
+#undef dout_prefix
+#define dout_prefix *_dout << "filestore.osr(" << this << ") "
+
+void FileStore::OpSequencer::_register_apply(Op *o)
+{
+ if (o->registered_apply) {
+ dout(20) << __func__ << " " << o << " already registered" << dendl;
+ return;
+ }
+ o->registered_apply = true;
+ for (auto& t : o->tls) {
+ for (auto& i : t.get_object_index()) {
+ ++applying[i.first];
+ dout(20) << __func__ << " " << o << " " << i.first << " now " << applying[i.first]
+ << dendl;
+ }
+ }
+}
+
+void FileStore::OpSequencer::_unregister_apply(Op *o)
+{
+ assert(o->registered_apply);
+ for (auto& t : o->tls) {
+ for (auto& i : t.get_object_index()) {
+ auto p = applying.find(i.first);
+ assert(p != applying.end());
+ if (--p->second == 0) {
+ dout(20) << __func__ << " " << o << " " << i.first << " now 0, removing" << dendl;
+ applying.erase(p);
+ } else {
+ dout(20) << __func__ << " " << o << " " << i.first << " now " << p->second << dendl;
+ }
+ }
+ }
+}
+
+void FileStore::OpSequencer::wait_for_apply(const ghobject_t& oid)
+{
+ Mutex::Locker l(qlock);
+ while (applying.count(oid)) {
+ dout(20) << __func__ << " " << oid << " waiting" << dendl;
+ cond.Wait(qlock);
+ }
+ dout(20) << __func__ << " " << oid << " done" << dendl;
+}