::ObjectOperation rd;
rd.hit_set_ls(pls, NULL);
object_locator_t oloc(poolid);
- objecter->pg_read(hash, oloc, rd, NULL, 0, onack, NULL);
+ objecter->pg_read(hash, oloc, rd, NULL, 0, onack, NULL, NULL);
return 0;
}
::ObjectOperation rd;
rd.hit_set_get(utime_t(stamp, 0), pbl, 0);
object_locator_t oloc(poolid);
- objecter->pg_read(hash, oloc, rd, NULL, 0, onack, NULL);
+ objecter->pg_read(hash, oloc, rd, NULL, 0, onack, NULL, NULL);
return 0;
}
}
};
-ceph_tid_t Objecter::op_submit(Op *op)
+ceph_tid_t Objecter::op_submit(Op *op, int *ctx_budget) // ctx_budget: optional in/out budget shared by a series of OPs; may be NULL
{
assert(client_lock.is_locked());
assert(initialized);
// throttle. before we look at any state, because
// take_op_budget() may drop our lock while it blocks.
- take_op_budget(op);
+ if (!op->ctx_budgeted || (ctx_budget && (*ctx_budget == -1))) { // per-OP budgeting, or first OP of a context whose budget is still unset (-1)
+ int op_budget = take_op_budget(op);
+ // record the budget taken for the first OP in the caller's context,
+ // so that later OPs of the same context skip take_op_budget()
+ if (ctx_budget && (*ctx_budget == -1)) {
+ *ctx_budget = op_budget;
+ }
+ }
return _op_submit(op);
}
ldout(cct, 15) << "finish_op " << op->tid << dendl;
op->session_item.remove_myself();
- if (op->budgeted)
+ if (!op->ctx_budgeted && op->budgeted)
put_op_budget(op);
ops.erase(op->tid);
}
}
if (list_context->at_end_of_pool) {
+ // release the listing context's budget once all
+ // OPs (in the session) are finished
+ put_list_context_budget(list_context);
+
onfinish->complete(0);
return;
}
C_List *onack = new C_List(list_context, onfinish, this);
object_locator_t oloc(list_context->pool_id, list_context->nspace);
pg_read(list_context->current_pg, oloc, op,
- &list_context->bl, 0, onack, &onack->epoch);
+ &list_context->bl, 0, onack, &onack->epoch, &list_context->ctx_budget);
}
void Objecter::_list_reply(ListContext *list_context, int r,
}
if (!list_context->list.empty()) {
ldout(cct, 20) << " returning results so far" << dendl;
+ // release the listing context's budget once all
+ // OPs (in the session) are finished
+ put_list_context_budget(list_context);
final_finish->complete(0);
return;
}
list_objects(list_context, final_finish);
}
+// Give back the throttle budget held by a listing context, if any, and
+// mark the context as no longer holding one.
+void Objecter::put_list_context_budget(ListContext *list_context) {
+  // a negative ctx_budget means nothing is held, so there is nothing to release
+  if (list_context->ctx_budget < 0)
+    return;
+  const int held_budget = list_context->ctx_budget;
+  ldout(cct, 10) << " release listing context's budget " << held_budget << dendl;
+  put_op_budget_bytes(held_budget);
+  list_context->ctx_budget = -1;
+}
//snapshots
epoch_t last_force_resend;
+ /// true if the throttle budget is taken/released once for a series of OPs
+ /// instead of per OP. When this flag is set, the budget is acquired before sending
+ /// the very first OP of the series and released upon receiving the last OP reply.
+ bool ctx_budgeted;
+
Op(const object_t& o, const object_locator_t& ol, vector<OSDOp>& op,
int f, Context *ac, Context *co, version_t *ov) :
session(NULL), session_item(this), incarnation(0),
map_dne_bound(0),
budgeted(false),
should_resend(true),
- last_force_resend(0) {
+ last_force_resend(0),
+ ctx_budgeted(false) {
ops.swap(op);
/* initialize out_* to match op vector */
bufferlist extra_info;
+ // The budget associated with this context. Once it is set (>= 0),
+ // the budget is not taken/released per OP; instead, the budget
+ // is acquired before sending the first OP and released upon receiving
+ // the last OP reply.
+ int ctx_budget;
+
ListContext() : current_pg(0), current_pg_epoch(0), starting_pg_num(0),
at_end_of_pool(false),
at_end_of_pg(false),
pool_id(0),
- pool_snap_seq(0), max_entries(0) {}
+ pool_snap_seq(0),
+ max_entries(0),
+ nspace(),
+ bl(),
+ list(),
+ filter(),
+ extra_info(),
+ ctx_budget(-1) {} // -1: no budget has been acquired for this context yet
bool at_end() const {
return at_end_of_pool;
*/
int calc_op_budget(Op *op);
void throttle_op(Op *op, int op_size=0);
- void take_op_budget(Op *op) {
+ int take_op_budget(Op *op) { // returns the budget computed by calc_op_budget() for this op
int op_budget = calc_op_budget(op);
if (keep_balanced_budget) {
throttle_op(op, op_budget);
op_throttle_ops.take(1);
}
op->budgeted = true;
+ return op_budget; // lets op_submit() store it in a caller-provided context budget
+ }
+ void put_op_budget_bytes(int op_budget) { // returns op_budget to op_throttle_bytes and 1 to op_throttle_ops
+ assert(op_budget >= 0);
+ op_throttle_bytes.put(op_budget);
+ op_throttle_ops.put(1);
}
void put_op_budget(Op *op) {
assert(op->budgeted);
int op_budget = calc_op_budget(op);
- op_throttle_bytes.put(op_budget);
- op_throttle_ops.put(1);
+ put_op_budget_bytes(op_budget); // shared release path, also used for a listing context's budget
}
+ void put_list_context_budget(ListContext *list_context);
Throttle op_throttle_bytes, op_throttle_ops;
public:
// public interface
public:
- ceph_tid_t op_submit(Op *op);
+ ceph_tid_t op_submit(Op *op, int *ctx_budget = NULL);
bool is_active() {
return !(ops.empty() && linger_ops.empty() && poolstat_ops.empty() && statfs_ops.empty());
}
ObjectOperation& op,
bufferlist *pbl, int flags,
Context *onack,
- epoch_t *reply_epoch) {
+ epoch_t *reply_epoch,
+ int *ctx_budget) {
Op *o = new Op(object_t(), oloc,
op.ops, flags | global_op_flags | CEPH_OSD_FLAG_READ,
onack, NULL, NULL);
o->out_handler.swap(op.out_handler);
o->out_rval.swap(op.out_rval);
o->reply_epoch = reply_epoch;
- return op_submit(o);
+ if (ctx_budget) {
+ // budget is tracked by listing context
+ o->ctx_budgeted = true;
+ }
+ return op_submit(o, ctx_budget);
}
ceph_tid_t linger_mutate(const object_t& oid, const object_locator_t& oloc,
ObjectOperation& op,