op_wq.queue(osr);
}
-void FileStore::op_queue_reserve_throttle(Op *o)
+void FileStore::op_queue_reserve_throttle(Op *o, ThreadPool::TPHandle *handle)
{
// Do not call while holding the journal lock!
uint64_t max_ops = m_filestore_queue_max_ops;
uint64_t max_bytes = m_filestore_queue_max_bytes;
Mutex::Locker l(op_throttle_lock);
while ((max_ops && (op_queue_len + 1) > max_ops)
       || (max_bytes && op_queue_bytes   // let a single large op through
           && (op_queue_bytes + o->bytes) > max_bytes)) {
dout(2) << "waiting " << op_queue_len + 1 << " > " << max_ops << " ops || "
<< op_queue_bytes + o->bytes << " > " << max_bytes << dendl;
+ if (handle)
+ handle->suspend_tp_timeout();
op_throttle_cond.Wait(op_throttle_lock);
+ if (handle)
+ handle->reset_tp_timeout();
}
op_queue_len++;
op_queue_bytes += o->bytes;
}
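
The hunk above is the core of the change: when the op queue is over its limits, the calling thread blocks on op_throttle_cond, and the new handle lets it suspend its thread-pool heartbeat timeout for the duration of that legitimate wait, then re-arm it once it can make progress. Below is a minimal, self-contained sketch of the same pattern using standard C++ primitives; TimeoutHandle and Throttle here are hypothetical stand-ins, not the Ceph classes.

// Minimal sketch of the suspend-while-blocked pattern, using standard C++
// primitives. TimeoutHandle and Throttle are hypothetical stand-ins for
// ThreadPool::TPHandle and the FileStore op-queue throttle.
#include <atomic>
#include <condition_variable>
#include <cstdint>
#include <mutex>

struct TimeoutHandle {
  std::atomic<bool> armed{true};
  void suspend_tp_timeout() { armed = false; }  // watchdog ignores this thread
  void reset_tp_timeout()   { armed = true;  }  // re-arm once we make progress
};

class Throttle {
  std::mutex lock;
  std::condition_variable cond;
  uint64_t queue_len = 0;
  const uint64_t max_ops = 50;

public:
  // Block until the queue has room. While blocked, tell the thread-pool
  // watchdog (if the caller passed one) not to count the wait against us.
  void reserve(TimeoutHandle *handle) {
    std::unique_lock<std::mutex> l(lock);
    while (queue_len + 1 > max_ops) {
      if (handle)
        handle->suspend_tp_timeout();
      cond.wait(l);
      if (handle)
        handle->reset_tp_timeout();
    }
    ++queue_len;
  }

  // Called when an op completes; wakes any waiters in reserve().
  void release() {
    std::lock_guard<std::mutex> l(lock);
    --queue_len;
    cond.notify_all();
  }
};
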
int FileStore::queue_transactions(Sequencer *posr, list<Transaction*> &tls,
- TrackedOpRef osd_op)
+ TrackedOpRef osd_op,
+ ThreadPool::TPHandle *handle)
{
Context *onreadable;
Context *ondisk;
if (journal && journal->is_writeable() && !m_filestore_journal_trailing) {
Op *o = build_op(tls, onreadable, onreadable_sync, osd_op);
- op_queue_reserve_throttle(o);
+ op_queue_reserve_throttle(o, handle);
journal->throttle();
uint64_t op_num = submit_manager.op_submit_start();
o->op = op_num;
Op *build_op(list<Transaction*>& tls,
             Context *onreadable, Context *onreadable_sync,
             TrackedOpRef osd_op);
TrackedOpRef osd_op);
void queue_op(OpSequencer *osr, Op *o);
- void op_queue_reserve_throttle(Op *o);
+ void op_queue_reserve_throttle(Op *o, ThreadPool::TPHandle *handle = NULL);
void op_queue_release_throttle(Op *o);
void _journaled_ahead(OpSequencer *osr, Op *o, Context *ondisk);
friend struct C_JournaledAhead;
ThreadPool::TPHandle *handle);
int queue_transactions(Sequencer *osr, list<Transaction*>& tls,
- TrackedOpRef op = TrackedOpRef());
+ TrackedOpRef op = TrackedOpRef(),
+ ThreadPool::TPHandle *handle = NULL);
/**
* set replay guard xattr on given file
#include "include/types.h"
#include "osd/osd_types.h"
#include "common/TrackedOp.h"
+#include "common/WorkQueue.h"
#include "ObjectMap.h"
#include <errno.h>
}
unsigned apply_transactions(Sequencer *osr, list<Transaction*>& tls, Context *ondisk=0);
- int queue_transaction(Sequencer *osr, Transaction* t) {
+ int queue_transaction(Sequencer *osr, Transaction* t,
+ ThreadPool::TPHandle *handle = NULL) {
list<Transaction *> tls;
tls.push_back(t);
- return queue_transactions(osr, tls, new C_DeleteTransaction(t));
+ return queue_transactions(osr, tls, new C_DeleteTransaction(t),
+ NULL, NULL, TrackedOpRef(), handle);
}
int queue_transaction(Sequencer *osr, Transaction *t, Context *onreadable, Context *ondisk=0,
Context *onreadable_sync=0,
- TrackedOpRef op = TrackedOpRef()) {
+ TrackedOpRef op = TrackedOpRef(),
+ ThreadPool::TPHandle *handle = NULL) {
list<Transaction*> tls;
tls.push_back(t);
- return queue_transactions(osr, tls, onreadable, ondisk, onreadable_sync, op);
+ return queue_transactions(osr, tls, onreadable, ondisk, onreadable_sync,
+ op, handle);
}
int queue_transactions(Sequencer *osr, list<Transaction*>& tls,
Context *onreadable, Context *ondisk=0,
Context *onreadable_sync=0,
- TrackedOpRef op = TrackedOpRef()) {
+ TrackedOpRef op = TrackedOpRef(),
+ ThreadPool::TPHandle *handle = NULL) {
assert(!tls.empty());
tls.back()->register_on_applied(onreadable);
tls.back()->register_on_commit(ondisk);
tls.back()->register_on_applied_sync(onreadable_sync);
- return queue_transactions(osr, tls, op);
+ return queue_transactions(osr, tls, op, handle);
}
virtual int queue_transactions(
Sequencer *osr, list<Transaction*>& tls,
- TrackedOpRef op = TrackedOpRef()) = 0;
+ TrackedOpRef op = TrackedOpRef(),
+ ThreadPool::TPHandle *handle = NULL) = 0;
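
The ObjectStore changes above all follow one pattern: the handle is appended as a trailing parameter with a NULL default on each convenience overload and forwarded down to the virtual entry point, so existing call sites compile unchanged and only callers that actually run on a throttled work-queue thread pass a handle. A condensed sketch of that forwarding shape, with Store, Txn and TimeoutHandle as hypothetical stand-ins rather than the Ceph types:

// Condensed sketch of the forwarding chain: every convenience overload takes
// an optional handle and passes it down to the single virtual entry point.
#include <list>

struct TimeoutHandle { void suspend_tp_timeout() {} void reset_tp_timeout() {} };
struct Txn {};

struct Store {
  // Single-transaction convenience wrapper: wrap t in a list and forward,
  // passing the (possibly null) handle straight through.
  int queue_transaction(Txn *t, TimeoutHandle *handle = nullptr) {
    std::list<Txn*> tls;
    tls.push_back(t);
    return queue_transactions(tls, handle);
  }

  // The one virtual entry point backends implement; the defaulted handle
  // keeps pre-existing call sites source-compatible.
  virtual int queue_transactions(std::list<Txn*> &tls,
                                 TimeoutHandle *handle = nullptr) = 0;
  virtual ~Store() {}
};
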
int queue_transactions(
return rctx;
}
-void OSD::dispatch_context_transaction(PG::RecoveryCtx &ctx, PG *pg)
+void OSD::dispatch_context_transaction(PG::RecoveryCtx &ctx, PG *pg,
+ ThreadPool::TPHandle *handle)
{
if (!ctx.transaction->empty()) {
ctx.on_applied->add(new ObjectStore::C_DeleteTransaction(ctx.transaction));
int tr = store->queue_transaction(
pg->osr.get(),
- ctx.transaction, ctx.on_applied, ctx.on_safe);
+ ctx.transaction, ctx.on_applied, ctx.on_safe, NULL,
+ TrackedOpRef(), handle);
assert(tr == 0);
ctx.transaction = new ObjectStore::Transaction;
ctx.on_applied = new C_Contexts(cct);
return false;
}
-void OSD::dispatch_context(PG::RecoveryCtx &ctx, PG *pg, OSDMapRef curmap)
+void OSD::dispatch_context(PG::RecoveryCtx &ctx, PG *pg, OSDMapRef curmap,
+ ThreadPool::TPHandle *handle)
{
if (service.get_osdmap()->is_up(whoami)) {
do_notifies(*ctx.notify_list, curmap);
ctx.on_applied->add(new ObjectStore::C_DeleteTransaction(ctx.transaction));
int tr = store->queue_transaction(
pg->osr.get(),
- ctx.transaction, ctx.on_applied, ctx.on_safe);
+ ctx.transaction, ctx.on_applied, ctx.on_safe, NULL, TrackedOpRef(),
+ handle);
assert(tr == 0);
}
}
split_pgs.clear();
}
if (compat_must_dispatch_immediately(pg)) {
- dispatch_context(rctx, pg, curmap);
+ dispatch_context(rctx, pg, curmap, &handle);
rctx = create_context();
} else {
- dispatch_context_transaction(rctx, pg);
+ dispatch_context_transaction(rctx, pg, &handle);
}
pg->unlock();
handle.reset_tp_timeout();
}
if (need_up_thru)
queue_want_up_thru(same_interval_since);
- dispatch_context(rctx, 0, curmap);
+ dispatch_context(rctx, 0, curmap, &handle);
service.send_pg_temp();
}
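
On the OSD side, the peering work-queue thread already owns a TPHandle; the change threads &handle through dispatch_context()/dispatch_context_transaction() into queue_transaction(), and keeps calling handle.reset_tp_timeout() after each PG as before. A small stand-alone sketch of that worker-thread shape, where Worker, Item, Throttle and TimeoutHandle are hypothetical stand-ins:

// Sketch of the worker-thread side: re-arm the heartbeat after each unit of
// work and hand the handle to anything that might block on a throttle.
#include <vector>

struct TimeoutHandle { void suspend_tp_timeout() {} void reset_tp_timeout() {} };
struct Throttle {
  // Placeholder for a reservation that may block; a real one would bracket
  // its wait with suspend_tp_timeout()/reset_tp_timeout() as sketched earlier.
  void reserve(TimeoutHandle *handle) { (void)handle; }
};
struct Item {};

struct Worker {
  Throttle throttle;

  void process(std::vector<Item> &items, TimeoutHandle &handle) {
    for (Item &item : items) {
      (void)item;                  // per-item work would go here
      throttle.reserve(&handle);   // may block in a real store
      handle.reset_tp_timeout();   // still healthy: re-arm after each item
    }
  }
};
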
// -- generic pg peering --
PG::RecoveryCtx create_context();
bool compat_must_dispatch_immediately(PG *pg);
- void dispatch_context(PG::RecoveryCtx &ctx, PG *pg, OSDMapRef curmap);
- void dispatch_context_transaction(PG::RecoveryCtx &ctx, PG *pg);
+ void dispatch_context(PG::RecoveryCtx &ctx, PG *pg, OSDMapRef curmap,
+ ThreadPool::TPHandle *handle = NULL);
+ void dispatch_context_transaction(PG::RecoveryCtx &ctx, PG *pg,
+ ThreadPool::TPHandle *handle = NULL);
void do_notifies(map< int,vector<pair<pg_notify_t, pg_interval_map_t> > >& notify_list,
OSDMapRef map);
void do_queries(map< int, map<pg_t,pg_query_t> >& query_map,