timer(g_ceph_context, sync_entry_timeo_lock),
stop(false), sync_thread(this),
default_osr("default"),
- op_queue_len(0), op_queue_bytes(0), op_finisher(g_ceph_context),
+ op_queue_len(0), op_queue_bytes(0),
+ op_throttle_lock("FileStore::op_throttle_lock"),
+ op_finisher(g_ceph_context),
op_tp(g_ceph_context, "FileStore::op_tp", g_conf->filestore_op_threads, "filestore_op_threads"),
op_wq(this, g_conf->filestore_op_thread_timeout,
g_conf->filestore_op_thread_suicide_timeout, &op_tp),
}
void FileStore::op_queue_reserve_throttle(Op *o)
-{
- op_tp.lock();
- _op_queue_reserve_throttle(o, "op_queue_reserve_throttle");
- op_tp.unlock();
-}
-
-void FileStore::_op_queue_reserve_throttle(Op *o, const char *caller)
{
// Do not call while holding the journal lock!
uint64_t max_ops = m_filestore_queue_max_ops;
logger->set(l_os_oq_max_ops, max_ops);
logger->set(l_os_oq_max_bytes, max_bytes);
- while ((max_ops && (op_queue_len + 1) > max_ops) ||
- (max_bytes && op_queue_bytes // let single large ops through!
- && (op_queue_bytes + o->bytes) > max_bytes)) {
- dout(2) << caller << " waiting: "
- << op_queue_len + 1 << " > " << max_ops << " ops || "
- << op_queue_bytes + o->bytes << " > " << max_bytes << dendl;
- op_tp.wait(op_throttle_cond);
- }
+ {
+ Mutex::Locker l(op_throttle_lock);
+ while ((max_ops && (op_queue_len + 1) > max_ops) ||
+ (max_bytes && op_queue_bytes // let single large ops through!
+ && (op_queue_bytes + o->bytes) > max_bytes)) {
+ dout(2) << "waiting " << op_queue_len + 1 << " > " << max_ops << " ops || "
+ << op_queue_bytes + o->bytes << " > " << max_bytes << dendl;
+ op_throttle_cond.Wait(op_throttle_lock);
+ }
- op_queue_len++;
- op_queue_bytes += o->bytes;
+ op_queue_len++;
+ op_queue_bytes += o->bytes;
+ }
logger->set(l_os_oq_ops, op_queue_len);
logger->set(l_os_oq_bytes, op_queue_bytes);
}
-void FileStore::_op_queue_release_throttle(Op *o)
+void FileStore::op_queue_release_throttle(Op *o)
{
- // Called with op_tp lock!
- op_queue_len--;
- op_queue_bytes -= o->bytes;
- op_throttle_cond.Signal();
+ {
+ Mutex::Locker l(op_throttle_lock);
+ op_queue_len--;
+ op_queue_bytes -= o->bytes;
+ op_throttle_cond.Signal();
+ }
logger->set(l_os_oq_ops, op_queue_len);
logger->set(l_os_oq_bytes, op_queue_bytes);
osr->apply_lock.Unlock(); // locked in _do_op
// called with tp lock held
- _op_queue_release_throttle(o);
+ op_queue_release_throttle(o);
utime_t lat = ceph_clock_now(g_ceph_context);
lat -= o->start;
deque<OpSequencer*> op_queue;
uint64_t op_queue_len, op_queue_bytes;
Cond op_throttle_cond;
+ Mutex op_throttle_lock;
Finisher op_finisher;
ThreadPool op_tp;
TrackedOpRef osd_op);
void queue_op(OpSequencer *osr, Op *o);
void op_queue_reserve_throttle(Op *o);
- void _op_queue_reserve_throttle(Op *o, const char *caller = 0);
- void _op_queue_release_throttle(Op *o);
+ void op_queue_release_throttle(Op *o);
void _journaled_ahead(OpSequencer *osr, Op *o, Context *ondisk);
friend class C_JournaledAhead;