for (iter = bl_list.begin(); iter != bl_list.end(); ++iter) {
bufferlist& bl = *iter;
- d->client_cb->handle_data(bl, 0, bl.length());
+ int r = d->client_cb->handle_data(bl, 0, bl.length());
+ if (r < 0) {
+ d->set_cancelled(r);
+ break;
+ }
}
done_unlock:
RGWRadosCtx *rctx = static_cast<RGWRadosCtx *>(ctx);
ObjectReadOperation op;
struct get_obj_data *d = (struct get_obj_data *)arg;
+ string oid, key;
+ rgw_bucket bucket;
+ bufferlist *pbl;
+ AioCompletion *c;
+
+ int r;
if (is_head_obj) {
/* only when reading from the head object do we need to do the atomic test */
- int r = append_atomic_test(rctx, obj, op, &astate);
+ r = append_atomic_test(rctx, obj, op, &astate);
if (r < 0)
return r;
unsigned chunk_len = min((uint64_t)astate->data.length() - obj_ofs, (uint64_t)len);
d->data_lock.Lock();
- d->client_cb->handle_data(astate->data, obj_ofs, chunk_len);
+ r = d->client_cb->handle_data(astate->data, obj_ofs, chunk_len);
d->data_lock.Unlock();
+ if (r < 0)
+ return r;
d->lock.Lock();
d->total_read += chunk_len;
}
}
- string oid, key;
- rgw_bucket bucket;
get_obj_bucket_and_oid_key(obj, bucket, oid, key);
- bufferlist *pbl;
- AioCompletion *c;
-
- d->add_io(obj_ofs, len, &pbl, &c);
-
d->throttle.get(len);
if (d->is_cancelled()) {
return d->get_err_code();
}
+ /* add io after we check that we're not cancelled, otherwise we're going to have trouble
+ * cleaning up
+ */
+ d->add_io(obj_ofs, len, &pbl, &c);
+
ldout(cct, 20) << "rados->get_obj_iterate_cb oid=" << oid << " obj-ofs=" << obj_ofs << " read_ofs=" << read_ofs << " len=" << len << dendl;
op.read(read_ofs, len, pbl, NULL);
librados::IoCtx io_ctx(d->io_ctx);
io_ctx.locator_set_key(key);
- int r = io_ctx.aio_operate(oid, c, &op, NULL);
+ r = io_ctx.aio_operate(oid, c, &op, NULL);
ldout(cct, 20) << "rados->aio_operate r=" << r << " bl.length=" << pbl->length() << dendl;
+ if (r < 0)
+ goto done_err;
- if (r < 0) {
- d->set_cancelled(r);
- d->cancel_io(obj_ofs);
- }
+ return 0;
+
+done_err:
+ ldout(cct, 20) << "cancelling io r=" << r << " obj_ofs=" << obj_ofs << dendl;
+ d->set_cancelled(r);
+ d->cancel_io(obj_ofs);
return r;
}
int r = iterate_obj(ctx, obj, ofs, end, cct->_conf->rgw_get_obj_max_req_size, _get_obj_iterate_cb, (void *)data);
if (r < 0) {
+ data->cancel_all_io();
goto done;
}
l++;
if (strcmp(tok, "HTTP") == 0 || strncmp(tok, "HTTP/", 5) == 0) {
- status = atoi(l);
+ http_status = atoi(l);
+ if (http_status == 100) /* 100-continue response */
+ continue;
+ status = rgw_http_error_to_errno(http_status);
} else {
/* convert header field name to upper case */
char *src = tok;
if (r < 0)
return r;
- return rgw_http_error_to_errno(status);
+ return status;
}
int RGWRESTSimpleRequest::send_data(void *ptr, size_t len)
outbl->claim(response);
}
- return rgw_http_error_to_errno(status);
+ return status;
}
class RGWRESTStreamOutCB : public RGWGetDataCB {
int RGWRESTStreamRequest::add_output_data(bufferlist& bl)
{
lock.Lock();
+ if (status < 0) {
+ int ret = status;
+ lock.Unlock();
+ return ret;
+ }
pending_send.push_back(bl);
lock.Unlock();
dout(20) << "RGWRESTStreamRequest::send_data()" << dendl;
lock.Lock();
- if (pending_send.empty()) {
+ if (pending_send.empty() || status < 0) {
lock.Unlock();
- return 0;
+ return status;
}
list<bufferlist>::iterator iter = pending_send.begin();
int RGWRESTStreamRequest::complete()
{
- return complete_request(handle);
+ int ret = complete_request(handle);
+ if (ret < 0)
+ return ret;
+
+ return status;
}
protected:
CephContext *cct;
+ int http_status;
int status;
string url;
int sign_request(RGWAccessKey& key, RGWEnv& env, req_info& info);
public:
RGWRESTSimpleRequest(CephContext *_cct, string& _url, list<pair<string, string> > *_headers,
- list<pair<string, string> > *_params) : cct(_cct), status(0), url(_url), send_iter(NULL),
- max_response(0) {
+ list<pair<string, string> > *_params) : cct(_cct), http_status(0), status(0),
+ url(_url), send_iter(NULL),
+ max_response(0) {
if (_headers)
headers = *_headers;