}
}
-void Objecter::send_linger(LingerOp *info)
+void Objecter::send_linger(LingerOp *info, bool first_send)
{
if (!info->registering) {
ldout(cct, 15) << "send_linger " << info->linger_id << dendl;
linger_check_for_latest_map(info);
}
}
- op_submit(o, info->session);
+
+ if (first_send) {
+ op_submit(o, info->session);
+ } else {
+ _op_submit(o, info->session);
+ }
+
OSDSession *s = o->session;
if (info->session != s) {
info->session_item.remove_myself();
logger->set(l_osdc_linger_active, linger_ops.size());
- send_linger(info);
+ send_linger(info, true);
return info->linger_id;
}
LingerOp *op = *p;
if (op->session) {
logger->inc(l_osdc_linger_resend);
- send_linger(op);
+ send_linger(op, false);
}
}
// resend lingers
for (xlist<LingerOp*>::iterator j = session->linger_ops.begin(); !j.end(); ++j) {
logger->inc(l_osdc_linger_resend);
- send_linger(*j);
+ send_linger(*j, false);
}
}
// take_op_budget() may drop our lock while it blocks.
take_op_budget(op);
+ return _op_submit(op, s);
+}
+
+tid_t Objecter::_op_submit(Op *op, OSDSession *s)
+{
// pick tid
tid_t mytid = ++last_tid;
op->tid = mytid;
if (!op->onack && !op->oncommit) {
op->session_item.remove_myself();
ldout(cct, 15) << "handle_osd_op_reply completed tid " << tid << dendl;
- put_op_budget(op);
+ if (op->budgeted)
+ put_op_budget(op);
ops.erase(tid);
logger->set(l_osdc_op_active, ops.size());
if (op->con)
bool precalc_pgid;
+ bool budgeted;
+
Op(const object_t& o, const object_locator_t& ol, vector<OSDOp>& op,
int f, Context *ac, Context *co, eversion_t *ov) :
session(NULL), session_item(this), incarnation(0),
outbl(NULL),
flags(f), priority(0), onack(ac), oncommit(co),
tid(0), attempts(0),
- paused(false), objver(ov), reply_epoch(NULL), precalc_pgid(false) {
+ paused(false), objver(ov), reply_epoch(NULL), precalc_pgid(false),
+ budgeted(false) {
ops.swap(op);
/* initialize out_* to match op vector */
int recalc_op_target(Op *op);
bool recalc_linger_op_target(LingerOp *op);
- void send_linger(LingerOp *info);
+ void send_linger(LingerOp *info, bool first_send);
void _linger_ack(LingerOp *info, int r);
void _linger_commit(LingerOp *info, int r);
op_throttle_bytes.take(op_budget);
op_throttle_ops.take(1);
}
+ op->budgeted = true;
}
void put_op_budget(Op *op) {
+ assert(op->budgeted);
int op_budget = calc_op_budget(op);
op_throttle_bytes.put(op_budget);
op_throttle_ops.put(1);
private:
// low-level
tid_t op_submit(Op *op, OSDSession *s = NULL);
+ tid_t _op_submit(Op *op, OSDSession *s);
// public interface
public: