// do not resend this; we will send a new op to reregister
o->should_resend = false;
+ o->ctx_budgeted = true;
if (info->register_tid) {
- // repeat send. cancel old registeration op, if any.
+ // repeat send. cancel old registration op, if any.
OSDSession::unique_lock sl(info->session->lock);
if (info->session->ops.count(info->register_tid)) {
Op *o = info->session->ops[info->register_tid];
_cancel_linger_op(o);
}
sl.unlock();
-
- _op_submit(o, sul, &info->register_tid);
- } else {
- // first send
- _op_submit_with_budget(o, sul, &info->register_tid);
}
+ _op_submit_with_budget(o, sul, &info->register_tid, &info->ctx_budget);
+
logger->inc(l_osdc_linger_send);
}
const object_locator_t& oloc,
int flags)
{
- LingerOp *info = new LingerOp;
+ LingerOp *info = new LingerOp(this);
info->target.base_oid = oid;
info->target.base_oloc = oloc;
if (info->target.base_oloc.key == oid)
info->pobjver = objver;
info->on_reg_commit = oncommit;
+ info->ctx_budget = take_linger_budget(info);
+
shunique_lock sul(rwlock, ceph::acquire_unique);
_linger_submit(info, sul);
logger->inc(l_osdc_linger_active);
info->pobjver = objver;
info->on_reg_commit = onfinish;
+ info->ctx_budget = take_linger_budget(info);
+
shunique_lock sul(rwlock, ceph::acquire_unique);
_linger_submit(info, sul);
logger->inc(l_osdc_linger_active);
{
assert(sul.owns_lock() && sul.mutex() == &rwlock);
assert(info->linger_id);
+ assert(info->ctx_budget != -1); // caller needs to have taken budget already!
// Populate Op::target
OSDSession *s = NULL;
}
}
+int Objecter::take_linger_budget(LingerOp *info)
+{
+ return 1;
+}
+
void Objecter::unregister_op(Op *op)
{
OSDSession::unique_lock sl(op->session->lock);
OSDSession *session;
+ Objecter *objecter;
+ int ctx_budget;
ceph_tid_t register_tid;
ceph_tid_t ping_tid;
epoch_t map_dne_bound;
watch_pending_async.pop_front();
}
- LingerOp() : linger_id(0),
- target(object_t(), object_locator_t(), 0),
- snap(CEPH_NOSNAP), poutbl(NULL), pobjver(NULL),
- is_watch(false), last_error(0),
- register_gen(0),
- registered(false),
- canceled(false),
- on_reg_commit(NULL),
- on_notify_finish(NULL),
- notify_result_bl(NULL),
- notify_id(0),
- watch_context(NULL),
- session(NULL),
- register_tid(0),
- ping_tid(0),
- map_dne_bound(0) {}
+ LingerOp(Objecter *o) : linger_id(0),
+ target(object_t(), object_locator_t(), 0),
+ snap(CEPH_NOSNAP), poutbl(NULL), pobjver(NULL),
+ is_watch(false), last_error(0),
+ register_gen(0),
+ registered(false),
+ canceled(false),
+ on_reg_commit(NULL),
+ on_notify_finish(NULL),
+ notify_result_bl(NULL),
+ notify_id(0),
+ watch_context(NULL),
+ session(NULL),
+ objecter(o),
+ ctx_budget(-1),
+ register_tid(0),
+ ping_tid(0),
+ map_dne_bound(0) {}
const LingerOp &operator=(const LingerOp& r) = delete;
LingerOp(const LingerOp& o) = delete;
int op_budget = calc_op_budget(op->ops);
if (keep_balanced_budget) {
_throttle_op(op, sul, op_budget);
- } else {
+ } else { // update take_linger_budget to match this!
op_throttle_bytes.take(op_budget);
op_throttle_ops.take(1);
}
op->budgeted = true;
return op_budget;
}
+ int take_linger_budget(LingerOp *info);
+ friend class WatchContext; // to invoke put_up_budget_bytes
void put_op_budget_bytes(int op_budget) {
assert(op_budget >= 0);
op_throttle_bytes.put(op_budget);