if (ret < 0)
goto done_err;
- len = bl.length();
+ off_t len = bl.length();
cur_ofs += len;
ofs += len;
ret = 0;
perfcounter->tinc(l_rgw_get_lat,
(ceph_clock_now(s->cct) - start_time));
- send_response_data(bl);
+ send_response_data(bl, 0, len);
start_time = ceph_clock_now(s->cct);
}
return 0;
}
+/* Thin RGWGetDataCB adapter that forwards each data chunk coming out
+ * of the rados iteration layer to the owning RGWGetObj op. */
+class RGWGetObj_CB : public RGWGetDataCB
+{
+ RGWGetObj *op;
+public:
+ RGWGetObj_CB(RGWGetObj *_op) : op(_op) {}
+ virtual ~RGWGetObj_CB() {}
+
+ /* invoked once per completed chunk; delegates to the op */
+ int handle_data(bufferlist& bl, off_t bl_ofs, off_t bl_len) {
+ return op->get_data_cb(bl, bl_ofs, bl_len);
+ }
+};
+
+/* Per-chunk callback for GET: periodically re-defers garbage
+ * collection on the object so a slow download cannot race with gc
+ * reclaiming it, then streams the chunk to the client. */
+int RGWGetObj::get_data_cb(bufferlist& bl, off_t bl_ofs, off_t bl_len)
+{
+ /* garbage collection related handling */
+ utime_t start_time = ceph_clock_now(s->cct);
+ if (start_time > gc_invalidate_time) {
+ int r = store->defer_gc(s->obj_ctx, obj);
+ if (r < 0) {
+ /* best effort: keep sending data even if the defer failed */
+ dout(0) << "WARNING: could not defer gc entry for obj" << dendl;
+ }
+ /* next refresh in half the configured gc min wait */
+ gc_invalidate_time = start_time;
+ gc_invalidate_time += (s->cct->_conf->rgw_gc_obj_min_wait / 2);
+ }
+ return send_response_data(bl, bl_ofs, bl_len);
+}
+
void RGWGetObj::execute()
{
void *handle = NULL;
utime_t start_time = s->time;
bufferlist bl;
- utime_t gc_invalidate_time = ceph_clock_now(s->cct);
+ /* gc deadline is now a member so get_data_cb() can refresh it */
+ gc_invalidate_time = ceph_clock_now(s->cct);
gc_invalidate_time += (s->cct->_conf->rgw_gc_obj_min_wait / 2);
+ /* adapter that streams each completed chunk back into this op */
+ RGWGetObj_CB cb(this);
+
map<string, bufferlist>::iterator attr_iter;
perfcounter->inc(l_rgw_get);
ret = get_params();
if (ret < 0)
- goto done;
+ goto done_err;
ret = init_common();
if (ret < 0)
- goto done;
+ goto done_err;
new_ofs = ofs;
new_end = end;
ret = store->prepare_get_obj(s->obj_ctx, obj, &new_ofs, &new_end, &attrs, mod_ptr,
unmod_ptr, &lastmod, if_match, if_nomatch, &total_len, &s->obj_size, &handle, &s->err);
if (ret < 0)
- goto done;
+ goto done_err;
attr_iter = attrs.find(RGW_ATTR_USER_MANIFEST);
if (attr_iter != attrs.end()) {
start = ofs;
if (!get_data || ofs > end)
- goto done;
+ goto done_err;
perfcounter->inc(l_rgw_get_b, end - ofs);
- while (ofs <= end) {
- ret = store->get_obj(s->obj_ctx, &handle, obj, bl, ofs, end);
- if (ret < 0) {
- goto done;
- }
- len = ret;
-
- if (!len) {
- dout(0) << "WARNING: failed to read object, returned zero length" << dendl;
- ret = -EIO;
- goto done;
- }
-
- ofs += len;
- ret = 0;
+ /* replace the synchronous read loop with windowed async reads;
+  * chunks reach the client via cb -> get_data_cb() as they land */
+ ret = store->get_obj_iterate(s->obj_ctx, &handle, obj, ofs, end, &cb);
- perfcounter->tinc(l_rgw_get_lat,
- (ceph_clock_now(s->cct) - start_time));
- ret = send_response_data(bl);
- bl.clear();
- if (ret < 0) {
- dout(0) << "NOTICE: failed to send response to client" << dendl;
- goto done;
- }
-
- start_time = ceph_clock_now(s->cct);
-
- if (ofs <= end) {
- if (start_time > gc_invalidate_time) {
- int r = store->defer_gc(s->obj_ctx, obj);
- if (r < 0) {
- dout(0) << "WARNING: could not defer gc entry for obj" << dendl;
- }
- gc_invalidate_time = start_time;
- gc_invalidate_time += (s->cct->_conf->rgw_gc_obj_min_wait / 2);
- }
- }
+ perfcounter->tinc(l_rgw_get_lat,
+ (ceph_clock_now(s->cct) - start_time));
+ if (ret < 0) {
+ goto done_err;
}
- return;
+ store->finish_get_obj(&handle);
-done:
- send_response_data(bl);
+ /* NOTE(review): the success path falls through this label too -- the
+  * zero-length send_response_data() appears to signal end-of-data and
+  * the repeated finish_get_obj() looks like a no-op once the handle is
+  * cleared; confirm against the protocol handler implementations */
+done_err:
+ send_response_data(bl, 0, 0);
store->finish_get_obj(&handle);
}
#include "common/errno.h"
#include "common/Formatter.h"
+#include "common/Throttle.h"
#include "rgw_rados.h"
#include "rgw_cache.h"
done_err:
delete new_ctx;
- delete state;
- *handle = NULL;
+ finish_get_obj(handle);
return r;
}
r = bl.length();
}
if (r < 0 || !len || ((off_t)(ofs + len - 1) == end)) {
- delete state;
- *handle = NULL;
+ finish_get_obj(handle);
}
done_ret:
return r;
}
+struct get_obj_data;
+
+/* Per-aio bookkeeping passed as the librados completion callback
+ * argument: identifies the owning request and this read's range. */
+struct get_obj_aio_data {
+ struct get_obj_data *op_data;
+ off_t ofs;
+ off_t len;
+};
+
+/* Destination buffer for one pending read, keyed by object offset. */
+struct get_obj_io {
+ off_t len;
+ bufferlist bl;
+};
+
+static void _get_obj_aio_completion_cb(completion_t cb, void *arg);
+
+/* Shared, refcounted state for one get_obj_iterate() request.  One
+ * reference belongs to the caller; add_io() takes an extra reference
+ * per in-flight aio, dropped in the completion callback, so io_ctx
+ * stays alive until the last pending read has called back. */
+struct get_obj_data : public RefCountedObject {
+ CephContext *cct;
+ RGWRados *rados;
+ void *ctx;
+ IoCtx io_ctx;
+ /* pending read buffers and their completions, keyed by object
+  * offset; both maps are protected by 'lock' */
+ map<off_t, get_obj_io> io_map;
+ map<off_t, librados::AioCompletion *> completion_map;
+ uint64_t total_read;
+ Mutex lock;
+ /* serializes delivery to client_cb so chunks arrive in order */
+ Mutex data_lock;
+ list<get_obj_aio_data> aio_data;
+ RGWGetDataCB *client_cb;
+ atomic_t cancelled;
+ atomic_t err_code;
+ /* bounds outstanding read bytes (rgw_get_obj_window_size) */
+ Throttle throttle;
+
+ get_obj_data(CephContext *_cct) : cct(_cct),
+ total_read(0), lock("get_obj_data"), data_lock("get_obj_data::data_lock"),
+ throttle(cct, "get_obj_data", cct->_conf->rgw_get_obj_window_size, false) {}
+ virtual ~get_obj_data() { }
+ /* record a fatal error; stops further io from being issued/delivered */
+ void set_cancelled(int r) {
+ cancelled.set(1);
+ err_code.set(r);
+ }
+
+ bool is_cancelled() {
+ return cancelled.read() == 1;
+ }
+
+ int get_err_code() {
+ return err_code.read();
+ }
+
+ /* block until the lowest-offset pending io completes (and its
+  * callback has run); sets *done when no completions remain.
+  * Returns that io's result code. */
+ int wait_next_io(bool *done) {
+ lock.Lock();
+ map<off_t, librados::AioCompletion *>::iterator iter = completion_map.begin();
+ if (iter == completion_map.end()) {
+ *done = true;
+ lock.Unlock();
+ return 0;
+ }
+ off_t cur_ofs = iter->first;
+ librados::AioCompletion *c = iter->second;
+ lock.Unlock();
+
+ c->wait_for_complete_and_cb();
+ int r = c->get_return_value();
+ c->release();
+
+ lock.Lock();
+ completion_map.erase(cur_ofs);
+
+ if (completion_map.empty()) {
+ *done = true;
+ }
+ lock.Unlock();
+
+ return r;
+ }
+
+ /* register a read at 'ofs': creates the buffer slot, the aio
+  * completion, and the per-io bookkeeping (kept in a std::list so its
+  * address stays stable for the C callback), and takes a reference
+  * on *this that the completion callback drops */
+ void add_io(off_t ofs, off_t len, bufferlist **pbl, AioCompletion **pc) {
+ Mutex::Locker l(lock);
+
+ get_obj_io& io = io_map[ofs];
+ *pbl = &io.bl;
+
+ struct get_obj_aio_data aio;
+ aio.ofs = ofs;
+ aio.len = len;
+ aio.op_data = this;
+
+ aio_data.push_back(aio);
+
+ struct get_obj_aio_data *paio_data = &aio_data.back(); /* last element */
+
+ librados::AioCompletion *c = librados::Rados::aio_create_completion((void *)paio_data, _get_obj_aio_completion_cb, NULL);
+ completion_map[ofs] = c;
+
+ *pc = c;
+
+ /* we have a reference per IO, plus one reference for the calling function.
+ * reference is dropped for each callback, plus when we're done iterating
+ * over the parts */
+ get();
+ }
+
+ /* forget the io at 'ofs' after a failed submission: release its
+  * completion and drop it from both maps */
+ void cancel_io(off_t ofs) {
+ ldout(cct, 20) << "get_obj_data::cancel_io() ofs=" << ofs << dendl;
+ lock.Lock();
+ map<off_t, AioCompletion *>::iterator iter = completion_map.find(ofs);
+ if (iter != completion_map.end()) {
+ AioCompletion *c = iter->second;
+ c->release();
+ completion_map.erase(ofs);
+ io_map.erase(ofs);
+ }
+ lock.Unlock();
+
+ /* we don't drop a reference here -- e.g., not calling d->put(), because we still
+ * need IoCtx to live, as io callback may still be called
+ */
+ }
+
+ /* release every pending completion after a fatal error.
+  * NOTE(review): the released completions are not erased from
+  * completion_map -- the caller must stop touching the map after this
+  * (wait_next_io would act on freed completions); confirm callers */
+ void cancel_all_io() {
+ ldout(cct, 20) << "get_obj_data::cancel_all_io()" << dendl;
+ Mutex::Locker l(lock);
+ for (map<off_t, librados::AioCompletion *>::iterator iter = completion_map.begin();
+ iter != completion_map.end(); ++iter) {
+ librados::AioCompletion *c = iter->second;
+ c->release();
+ }
+ }
+
+ /* called from the completion callback of the io at 'ofs', with
+  * data_lock held.  If 'ofs' is the lowest outstanding io, move the
+  * maximal contiguous run of completed buffers into bl_list (in
+  * offset order) and retire them; otherwise deliver nothing yet so
+  * data always reaches the client in order. */
+ int get_complete_ios(off_t ofs, list<bufferlist>& bl_list) {
+ Mutex::Locker l(lock);
+
+ map<off_t, get_obj_io>::iterator liter = io_map.begin();
+
+ if (liter == io_map.end() ||
+ liter->first != ofs) {
+ return 0;
+ }
+
+ map<off_t, librados::AioCompletion *>::iterator aiter;
+ aiter = completion_map.find(ofs);
+ if (aiter == completion_map.end()) {
+ /* completion map does not hold this io, it was cancelled */
+ return 0;
+ }
+
+ /* NOTE(review): this io's own callback is running, so presumably it
+  * is complete even though is_complete() is only checked below */
+ AioCompletion *completion = aiter->second;
+ int r = completion->get_return_value();
+ if (r < 0)
+ return r;
+
+ for (; aiter != completion_map.end(); aiter++) {
+ completion = aiter->second;
+ if (!completion->is_complete()) {
+ /* reached a request that is not yet complete, stop */
+ break;
+ }
+
+ r = completion->get_return_value();
+ if (r < 0) {
+ set_cancelled(r); /* mark it as cancelled, so that we don't continue processing next operations */
+ return r;
+ }
+
+ total_read += r;
+
+ map<off_t, get_obj_io>::iterator old_liter = liter++;
+ bl_list.push_back(old_liter->second.bl);
+ io_map.erase(old_liter);
+ }
+
+ return 0;
+ }
+};
+
+/* C trampoline handed to iterate_obj(): forwards each chunk to the
+ * RGWRados member callback along with the request's get_obj_data. */
+static int _get_obj_iterate_cb(rgw_obj& obj, off_t obj_ofs, off_t read_ofs, off_t len, bool is_head_obj, RGWObjState *astate, void *arg)
+{
+ struct get_obj_data *d = (struct get_obj_data *)arg;
+
+ return d->rados->get_obj_iterate_cb(d->ctx, astate, obj, obj_ofs, read_ofs, len, is_head_obj, arg);
+}
+
+/* C trampoline for librados aio completions: recovers the RGWRados
+ * instance from the per-io data and forwards to the member callback. */
+static void _get_obj_aio_completion_cb(completion_t cb, void *arg)
+{
+ struct get_obj_aio_data *aio_data = (struct get_obj_aio_data *)arg;
+ struct get_obj_data *d = aio_data->op_data;
+
+ d->rados->get_obj_aio_completion_cb(cb, arg);
+}
+
+
+/* aio completion handler (runs in the rados callback thread): returns
+ * the throttle window for this read, then, under data_lock, delivers
+ * any contiguous run of completed ios to the client callback in
+ * offset order.  Always drops the per-io reference from add_io(). */
+void RGWRados::get_obj_aio_completion_cb(completion_t c, void *arg)
+{
+ struct get_obj_aio_data *aio_data = (struct get_obj_aio_data *)arg;
+ struct get_obj_data *d = aio_data->op_data;
+ off_t ofs = aio_data->ofs;
+ off_t len = aio_data->len;
+
+ list<bufferlist> bl_list;
+ list<bufferlist>::iterator iter;
+ int r;
+
+ ldout(cct, 20) << "get_obj_aio_completion_cb: io completion ofs=" << ofs << " len=" << len << dendl;
+ /* release the throttle slot first so the submitter can make
+  * progress even on the cancelled/error paths */
+ d->throttle.put(len);
+
+ if (d->is_cancelled())
+ goto done;
+
+ d->data_lock.Lock();
+
+ r = d->get_complete_ios(ofs, bl_list);
+ if (r < 0) {
+ goto done_unlock;
+ }
+
+ for (iter = bl_list.begin(); iter != bl_list.end(); ++iter) {
+ bufferlist& bl = *iter;
+ d->client_cb->handle_data(bl, 0, bl.length());
+ }
+
+done_unlock:
+ d->data_lock.Unlock();
+done:
+ /* drop the reference taken for this io in add_io() */
+ d->put();
+ return;
+}
+
+/* Invoked by iterate_obj() for each chunk of the object.  Head-object
+ * data already cached in the obj state is delivered synchronously;
+ * everything else becomes a throttled async rados read whose
+ * completion feeds get_obj_aio_completion_cb(). */
+int RGWRados::get_obj_iterate_cb(void *ctx, RGWObjState *astate,
+ rgw_obj& obj,
+ off_t obj_ofs,
+ off_t read_ofs, off_t len,
+ bool is_head_obj, void *arg)
+{
+ RGWRadosCtx *rctx = (RGWRadosCtx *)ctx;
+ ObjectReadOperation op;
+ struct get_obj_data *d = (struct get_obj_data *)arg;
+
+ if (is_head_obj) {
+ /* only when reading from the head object do we need to do the atomic test */
+ int r = append_atomic_test(rctx, obj, op, &astate);
+ if (r < 0)
+ return r;
+
+ if (astate &&
+ obj_ofs < astate->data.length()) {
+ unsigned chunk_len = min((uint64_t)astate->data.length() - obj_ofs, (uint64_t)len);
+
+ /* serve the prefetched head data synchronously, in order */
+ d->data_lock.Lock();
+ d->client_cb->handle_data(astate->data, obj_ofs, chunk_len);
+ d->data_lock.Unlock();
+
+ d->lock.Lock();
+ d->total_read += chunk_len;
+ d->lock.Unlock();
+
+ len -= chunk_len;
+ read_ofs += chunk_len;
+ obj_ofs += chunk_len;
+ if (!len)
+ return 0;
+ }
+ }
+
+ string oid, key;
+ rgw_bucket bucket;
+ get_obj_bucket_and_oid_key(obj, bucket, oid, key);
+
+ bufferlist *pbl;
+ AioCompletion *c;
+
+ /* wait on the throttle *before* registering the io: if we blocked
+  * after add_io() and the request got cancelled meanwhile, we would
+  * return without submitting the registered io, leaking its
+  * completion, its io_map slot and the data reference add_io() took */
+ d->throttle.get(len);
+ if (d->is_cancelled()) {
+ return d->get_err_code();
+ }
+
+ d->add_io(obj_ofs, len, &pbl, &c);
+
+ ldout(cct, 20) << "rados->get_obj_iterate_cb oid=" << oid << " obj-ofs=" << obj_ofs << " read_ofs=" << read_ofs << " len=" << len << dendl;
+ op.read(read_ofs, len, pbl, NULL);
+
+ librados::IoCtx io_ctx(d->io_ctx);
+ io_ctx.locator_set_key(key);
+
+ int r = io_ctx.aio_operate(oid, c, &op, NULL);
+ ldout(cct, 20) << "rados->aio_operate r=" << r << " bl.length=" << pbl->length() << dendl;
+
+ if (r < 0) {
+ /* submission failed: cancel the whole request and drop the io we
+  * just registered so nobody waits on it */
+ d->set_cancelled(r);
+ d->cancel_io(obj_ofs);
+ }
+
+ return r;
+}
+
+/* Read the object range [ofs, end] using a window of throttled async
+ * rados reads, delivering data to cb strictly in offset order.
+ * *handle must come from prepare_get_obj() (its io_ctx is dup'd). */
+int RGWRados::get_obj_iterate(void *ctx, void **handle, rgw_obj& obj,
+ off_t ofs, off_t end,
+ RGWGetDataCB *cb)
+{
+ struct get_obj_data *data = new get_obj_data(cct);
+ bool done = false;
+
+ GetObjState *state = *(GetObjState **)handle;
+
+ data->rados = this;
+ data->ctx = ctx;
+ data->io_ctx.dup(state->io_ctx);
+ data->client_cb = cb;
+
+ /* submit reads; completions deliver data as they land */
+ int r = iterate_obj(ctx, obj, ofs, end, cct->_conf->rgw_get_obj_max_req_size, _get_obj_iterate_cb, (void *)data);
+ if (r < 0) {
+ /* NOTE(review): ios already submitted are not cancelled/waited here;
+  * the per-io references keep *data alive until their callbacks run --
+  * confirm the early put() below is safe in that case */
+ goto done;
+ }
+
+ /* drain completions in offset order; on the first error, stop and
+  * release every remaining pending completion */
+ while (!done) {
+ r = data->wait_next_io(&done);
+ if (r < 0) {
+ dout(10) << "get_obj_iterate() r=" << r << ", canceling all io" << dendl;
+ data->cancel_all_io();
+ break;
+ }
+ }
+
+done:
+ /* drop the caller's reference; pending callbacks hold their own */
+ data->put();
+ return r;
+}
+
void RGWRados::finish_get_obj(void **handle)
{
if (*handle) {
}
}
+/* Walk the logical byte range [ofs, end] of obj, invoking
+ * iterate_obj_cb once per chunk of at most max_chunk_size bytes.
+ * For manifest (multipart) objects the logical range is mapped onto
+ * each part's backing object; otherwise the head object is read
+ * directly.  A negative 'end' yields a zero-length walk. */
+int RGWRados::iterate_obj(void *ctx, rgw_obj& obj,
+ off_t ofs, off_t end,
+ uint64_t max_chunk_size,
+ int (*iterate_obj_cb)(rgw_obj&, off_t, off_t, off_t, bool, RGWObjState *, void *),
+ void *arg)
+{
+ rgw_bucket bucket;
+ rgw_obj read_obj = obj;
+ uint64_t read_ofs = ofs;
+ uint64_t len;
+ RGWRadosCtx *rctx = (RGWRadosCtx *)ctx;
+ RGWRadosCtx *new_ctx = NULL;
+ bool reading_from_head = true;
+ RGWObjState *astate = NULL;
+
+ if (!rctx) {
+ /* caller gave no obj ctx; make a temporary one for the state lookup */
+ new_ctx = new RGWRadosCtx(this);
+ rctx = new_ctx;
+ }
+
+ int r = get_obj_state(rctx, obj, &astate);
+ if (r < 0)
+ goto done_err;
+
+ if (end < 0)
+ len = 0;
+ else
+ len = end - ofs + 1;
+
+ if (astate->has_manifest) {
+ /* now get the relevant object part */
+ map<uint64_t, RGWObjManifestPart>::iterator iter = astate->manifest.objs.upper_bound(ofs);
+ /* we're now pointing at the next part (unless the first part starts at a higher ofs),
+ so retract to previous part */
+ if (iter != astate->manifest.objs.begin()) {
+ --iter;
+ }
+
+ for (; iter != astate->manifest.objs.end() && ofs <= end; ++iter) {
+ RGWObjManifestPart& part = iter->second;
+ off_t part_ofs = iter->first;
+ off_t next_part_ofs = part_ofs + part.size;
+
+ /* chunk up this part, translating the logical offset to the
+  * part's backing object and offset within it */
+ while (ofs < next_part_ofs && ofs <= end) {
+ read_obj = part.loc;
+ uint64_t read_len = min(len, part.size - (ofs - part_ofs));
+ read_ofs = part.loc_ofs + (ofs - part_ofs);
+
+ if (read_len > max_chunk_size) {
+ read_len = max_chunk_size;
+ }
+
+ /* the atomic test is only needed when reading the head object */
+ reading_from_head = (read_obj == obj);
+ r = iterate_obj_cb(read_obj, ofs, read_ofs, read_len, reading_from_head, astate, arg);
+ if (r < 0)
+ goto done_err;
+
+ len -= read_len;
+ ofs += read_len;
+ }
+ }
+ } else {
+ while (ofs <= end) {
+ uint64_t read_len = min(len, max_chunk_size);
+
+ r = iterate_obj_cb(obj, ofs, ofs, read_len, reading_from_head, astate, arg);
+ if (r < 0)
+ goto done_err;
+
+ len -= read_len;
+ ofs += read_len;
+ }
+ }
+
+ return 0;
+
+done_err:
+ delete new_ctx;
+ return r;
+}
+
/* a simple object read */
int RGWRados::read(void *ctx, rgw_obj& obj, off_t ofs, size_t size, bufferlist& bl)
{