logger->set(l_os_oq_bytes, op_queue_bytes);
}
-void FileStore::_do_op(OpSequencer *osr)
+void FileStore::_do_op(OpSequencer *osr, ThreadPool::TPHandle &handle)
{
// inject a stall?
if (g_conf->filestore_inject_stall) {
Op *o = osr->peek_queue();
apply_manager.op_apply_start(o->op);
dout(5) << "_do_op " << o << " seq " << o->op << " " << *osr << "/" << osr->parent << " start" << dendl;
- int r = do_transactions(o->tls, o->op);
+ int r = _do_transactions(o->tls, o->op, &handle);
apply_manager.op_apply_finish(o->op);
dout(10) << "_do_op " << o << " seq " << o->op << " r = " << r
<< ", finisher " << o->onreadable << " " << o->onreadable_sync << dendl;
}
}
-int FileStore::do_transactions(list<Transaction*> &tls, uint64_t op_seq)
+int FileStore::_do_transactions(
+ list<Transaction*> &tls,
+ uint64_t op_seq,
+ ThreadPool::TPHandle *handle)
{
int r = 0;
r = _do_transaction(**p, op_seq, trans_num);
if (r < 0)
break;
+ if (handle)
+ handle->reset_tp_timeout();
}
return r;
store->op_queue.pop_front();
return osr;
}
- void _process(OpSequencer *osr) {
- store->_do_op(osr);
+ void _process(OpSequencer *osr, ThreadPool::TPHandle &handle) {
+ store->_do_op(osr, handle);
}
void _process_finish(OpSequencer *osr) {
store->_finish_op(osr);
}
} op_wq;
- void _do_op(OpSequencer *o);
+ void _do_op(OpSequencer *o, ThreadPool::TPHandle &handle);
void _finish_op(OpSequencer *o);
Op *build_op(list<Transaction*>& tls,
Context *onreadable, Context *onreadable_sync,
int statfs(struct statfs *buf);
- int do_transactions(list<Transaction*> &tls, uint64_t op_seq);
-
+ int _do_transactions(
+ list<Transaction*> &tls, uint64_t op_seq,
+ ThreadPool::TPHandle *handle);
+ int do_transactions(list<Transaction*> &tls, uint64_t op_seq) {
+ return _do_transactions(tls, op_seq, 0);
+ }
unsigned _do_transaction(Transaction& t, uint64_t op_seq, int trans_num);
int queue_transaction(Sequencer *osr, Transaction* t);