]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
rgw: propagate error from remote gateway when copying object
authorYehuda Sadeh <yehuda@inktank.com>
Tue, 11 Jun 2013 17:43:08 +0000 (10:43 -0700)
committerYehuda Sadeh <yehuda@inktank.com>
Tue, 11 Jun 2013 17:43:08 +0000 (10:43 -0700)
Also make sure that we don't continue iterating locally through
the object.

Signed-off-by: Yehuda Sadeh <yehuda@inktank.com>
src/rgw/rgw_rados.cc
src/rgw/rgw_rest_client.cc
src/rgw/rgw_rest_client.h

index a9a4815036a0b28197dce4691a89086d50f32c4b..d115fc731a47bad19b32e38b4b74c92f7bbe5844 100644 (file)
@@ -3566,7 +3566,11 @@ void RGWRados::get_obj_aio_completion_cb(completion_t c, void *arg)
 
   for (iter = bl_list.begin(); iter != bl_list.end(); ++iter) {
     bufferlist& bl = *iter;
-    d->client_cb->handle_data(bl, 0, bl.length());
+    int r = d->client_cb->handle_data(bl, 0, bl.length());
+    if (r < 0) {
+      d->set_cancelled(r);
+      break;
+    }
   }
 
 done_unlock:
@@ -3585,10 +3589,16 @@ int RGWRados::get_obj_iterate_cb(void *ctx, RGWObjState *astate,
   RGWRadosCtx *rctx = static_cast<RGWRadosCtx *>(ctx);
   ObjectReadOperation op;
   struct get_obj_data *d = (struct get_obj_data *)arg;
+  string oid, key;
+  rgw_bucket bucket;
+  bufferlist *pbl;
+  AioCompletion *c;
+
+  int r;
 
   if (is_head_obj) {
     /* only when reading from the head object do we need to do the atomic test */
-    int r = append_atomic_test(rctx, obj, op, &astate);
+    r = append_atomic_test(rctx, obj, op, &astate);
     if (r < 0)
       return r;
 
@@ -3597,8 +3607,10 @@ int RGWRados::get_obj_iterate_cb(void *ctx, RGWObjState *astate,
       unsigned chunk_len = min((uint64_t)astate->data.length() - obj_ofs, (uint64_t)len);
 
       d->data_lock.Lock();
-      d->client_cb->handle_data(astate->data, obj_ofs, chunk_len);
+      r = d->client_cb->handle_data(astate->data, obj_ofs, chunk_len);
       d->data_lock.Unlock();
+      if (r < 0)
+        return r;
 
       d->lock.Lock();
       d->total_read += chunk_len;
@@ -3612,33 +3624,35 @@ int RGWRados::get_obj_iterate_cb(void *ctx, RGWObjState *astate,
     }
   }
 
-  string oid, key;
-  rgw_bucket bucket;
   get_obj_bucket_and_oid_key(obj, bucket, oid, key);
 
-  bufferlist *pbl;
-  AioCompletion *c;
-
-  d->add_io(obj_ofs, len, &pbl, &c);
-
   d->throttle.get(len);
   if (d->is_cancelled()) {
     return d->get_err_code();
   }
 
+  /* add io after we check that we're not cancelled, otherwise we're going to have trouble
+   * cleaning up
+   */
+  d->add_io(obj_ofs, len, &pbl, &c);
+
   ldout(cct, 20) << "rados->get_obj_iterate_cb oid=" << oid << " obj-ofs=" << obj_ofs << " read_ofs=" << read_ofs << " len=" << len << dendl;
   op.read(read_ofs, len, pbl, NULL);
 
   librados::IoCtx io_ctx(d->io_ctx);
   io_ctx.locator_set_key(key);
 
-  int r = io_ctx.aio_operate(oid, c, &op, NULL);
+  r = io_ctx.aio_operate(oid, c, &op, NULL);
   ldout(cct, 20) << "rados->aio_operate r=" << r << " bl.length=" << pbl->length() << dendl;
+  if (r < 0)
+    goto done_err;
 
-  if (r < 0) {
-    d->set_cancelled(r);
-    d->cancel_io(obj_ofs);
-  }
+  return 0;
+
+done_err:
+  ldout(cct, 20) << "cancelling io r=" << r << " obj_ofs=" << obj_ofs << dendl;
+  d->set_cancelled(r);
+  d->cancel_io(obj_ofs);
 
   return r;
 }
@@ -3659,6 +3673,7 @@ int RGWRados::get_obj_iterate(void *ctx, void **handle, rgw_obj& obj,
 
   int r = iterate_obj(ctx, obj, ofs, end, cct->_conf->rgw_get_obj_max_req_size, _get_obj_iterate_cb, (void *)data);
   if (r < 0) {
+    data->cancel_all_io();
     goto done;
   }
 
index e3e1c0ba7f8c40fcf5b89c970778c35c2e68b025..7a78bb8f9cd7b78a72b8d4ce64b54a2d4a912f7a 100644 (file)
@@ -33,7 +33,10 @@ int RGWRESTSimpleRequest::receive_header(void *ptr, size_t len)
           l++;
  
         if (strcmp(tok, "HTTP") == 0 || strncmp(tok, "HTTP/", 5) == 0) {
-          status = atoi(l);
+          http_status = atoi(l);
+          if (http_status == 100) /* 100-continue response */
+            continue;
+          status = rgw_http_error_to_errno(http_status);
         } else {
           /* convert header field name to upper case  */
           char *src = tok;
@@ -100,7 +103,7 @@ int RGWRESTSimpleRequest::execute(RGWAccessKey& key, const char *method, const c
   if (r < 0)
     return r;
 
-  return rgw_http_error_to_errno(status);
+  return status;
 }
 
 int RGWRESTSimpleRequest::send_data(void *ptr, size_t len)
@@ -245,7 +248,7 @@ int RGWRESTSimpleRequest::forward_request(RGWAccessKey& key, req_info& info, siz
     outbl->claim(response);
   }
 
-  return rgw_http_error_to_errno(status);
+  return status;
 }
 
 class RGWRESTStreamOutCB : public RGWGetDataCB {
@@ -277,6 +280,11 @@ RGWRESTStreamRequest::~RGWRESTStreamRequest()
 int RGWRESTStreamRequest::add_output_data(bufferlist& bl)
 {
   lock.Lock();
+  if (status < 0) {
+    int ret = status;
+    lock.Unlock();
+    return ret;
+  }
   pending_send.push_back(bl);
   lock.Unlock();
 
@@ -442,9 +450,9 @@ int RGWRESTStreamRequest::send_data(void *ptr, size_t len)
 
   dout(20) << "RGWRESTStreamRequest::send_data()" << dendl;
   lock.Lock();
-  if (pending_send.empty()) {
+  if (pending_send.empty() || status < 0) {
     lock.Unlock();
-    return 0;
+    return status;
   }
 
   list<bufferlist>::iterator iter = pending_send.begin();
@@ -483,5 +491,9 @@ int RGWRESTStreamRequest::send_data(void *ptr, size_t len)
 
 int RGWRESTStreamRequest::complete()
 {
-  return complete_request(handle);
+  int ret = complete_request(handle);
+  if (ret < 0)
+    return ret;
+
+  return status;
 }
index f3f9f7ff91c36d422140689894bd72580a808619..cb744f4f695cee51f2900345000d57a54ce0a5d8 100644 (file)
@@ -11,6 +11,7 @@ class RGWRESTSimpleRequest : public RGWHTTPClient {
 protected:
   CephContext *cct;
 
+  int http_status;
   int status;
 
   string url;
@@ -29,8 +30,9 @@ protected:
   int sign_request(RGWAccessKey& key, RGWEnv& env, req_info& info);
 public:
   RGWRESTSimpleRequest(CephContext *_cct, string& _url, list<pair<string, string> > *_headers,
-                list<pair<string, string> > *_params) : cct(_cct), status(0), url(_url), send_iter(NULL),
-                                                        max_response(0) {
+                list<pair<string, string> > *_params) : cct(_cct), http_status(0), status(0),
+                url(_url), send_iter(NULL),
+                max_response(0) {
     if (_headers)
       headers = *_headers;