perfcounter->inc(l_rgw_get_b, cur_end - cur_ofs);
filter->fixup_range(cur_ofs, cur_end);
- op_ret = read_op.iterate(cur_ofs, cur_end, filter);
+ op_ret = read_op.iterate(cur_ofs, cur_end, filter, s->yield);
if (op_ret >= 0)
op_ret = filter->flush();
return op_ret;
ofs_x = ofs;
end_x = end;
filter->fixup_range(ofs_x, end_x);
- op_ret = read_op.iterate(ofs_x, end_x, filter);
+ op_ret = read_op.iterate(ofs_x, end_x, filter, s->yield);
if (op_ret >= 0)
op_ret = filter->flush();
return ret;
filter->fixup_range(new_ofs, new_end);
- ret = read_op.iterate(new_ofs, new_end, filter);
+ ret = read_op.iterate(new_ofs, new_end, filter, s->yield);
if (ret >= 0)
ret = filter->flush();
return ret;
}
- ret = read_op.iterate(0, astate->size - 1, out_stream_req->get_out_cb());
+ ret = read_op.iterate(0, astate->size - 1, out_stream_req->get_out_cb(), null_yield);
if (ret < 0) {
delete out_stream_req;
return ret;
return d->flush(std::move(completed));
}
-int RGWRados::Object::Read::iterate(int64_t ofs, int64_t end, RGWGetDataCB *cb)
+int RGWRados::Object::Read::iterate(int64_t ofs, int64_t end, RGWGetDataCB *cb,
+ optional_yield y)
{
RGWRados *store = source->get_store();
CephContext *cct = store->ctx();
const uint64_t chunk_size = cct->_conf->rgw_get_obj_max_req_size;
const uint64_t window_size = cct->_conf->rgw_get_obj_window_size;
- rgw::BlockingAioThrottle aio(window_size);
- get_obj_data data(store, cb, &aio, ofs);
+ auto aio = rgw::make_throttle(window_size, y);
+ get_obj_data data(store, cb, &*aio, ofs);
int r = store->iterate_obj(obj_ctx, source->get_bucket_info(), state.obj,
ofs, end, chunk_size, _get_obj_iterate_cb, &data);
int prepare();
static int range_to_ofs(uint64_t obj_size, int64_t &ofs, int64_t &end);
int read(int64_t ofs, int64_t end, bufferlist& bl);
- int iterate(int64_t ofs, int64_t end, RGWGetDataCB *cb);
+ int iterate(int64_t ofs, int64_t end, RGWGetDataCB *cb, optional_yield y);
int get_attr(const char *name, bufferlist& dest);
};