assert(size == 0);
}
- // TODO: handle this error.
- if (r != 0)
- probe->err = r;
+ bool probe_complete;
+ {
+ Mutex::Locker l(probe->lock);
- filer->_probed(probe, oid, size, mtime);
+ // TODO: handle this error.
+ if (r != 0) {
+ probe->err = r;
+ }
+
+ probe_complete = filer->_probed(probe, oid, size, mtime);
+ }
+ if (probe_complete) {
+ probe->onfinish->complete(probe->err);
+ delete probe;
+ }
}
};
probe->probing_off -= probe->probing_len;
}
- _probe(probe);
+ // Take lock before starting any I/Os, to protect us from concurrent calls
+ // back into C_Probe from OSD op completions from different OSDs
+ {
+ Mutex::Locker l(probe->lock);
+ _probe(probe);
+ }
return 0;
}
void Filer::_probe(Probe *probe)
{
+ assert(probe->lock.is_locked_by_me());
+
ldout(cct, 10) << "_probe " << hex << probe->ino << dec
<< " " << probe->probing_off << "~" << probe->probing_len
<< dendl;
}
}
-void Filer::_probed(Probe *probe, const object_t& oid, uint64_t size, utime_t mtime)
+/**
+ * @return true if probe is complete and Probe object may be freed.
+ */
+bool Filer::_probed(Probe *probe, const object_t& oid, uint64_t size, utime_t mtime)
{
+ assert(probe->lock.is_locked_by_me());
+
ldout(cct, 10) << "_probed " << probe->ino << " object " << oid
<< " has size " << size << " mtime " << mtime << dendl;
probe->ops.erase(oid);
if (!probe->ops.empty())
- return; // waiting for more!
+ return false; // waiting for more!
if (probe->err) { // we hit an error, propagate back up
- probe->onfinish->complete(probe->err);
- delete probe;
- return;
+ return true;
}
// analyze!
probe->probing_off -= period;
}
_probe(probe);
- return;
+ return false;
}
if (probe->pmtime) {
*probe->pmtime = probe->max_mtime;
}
- // done! finish and clean up.
- probe->onfinish->complete(probe->err);
- delete probe;
+ // done!
+ return true;
}
// probes
struct Probe {
+ Mutex lock;
inodeno_t ino;
ceph_file_layout layout;
snapid_t snapid;
Probe(inodeno_t i, ceph_file_layout &l, snapid_t sn,
uint64_t f, uint64_t *e, utime_t *m, int fl, bool fw, Context *c) :
- ino(i), layout(l), snapid(sn),
+ lock("Filer::Probe"), ino(i), layout(l), snapid(sn),
psize(e), pmtime(m), flags(fl), fwd(fw), onfinish(c),
probing_off(f), probing_len(0),
err(0), found_size(false) {}
class C_Probe;
void _probe(Probe *p);
- void _probed(Probe *p, const object_t& oid, uint64_t size, utime_t mtime);
+ bool _probed(Probe *p, const object_t& oid, uint64_t size, utime_t mtime);
public:
Filer(const Filer& other);