]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
rgw: identify racing writes when using copy-if-newer
authorYehuda Sadeh <yehuda@redhat.com>
Thu, 26 Mar 2015 00:35:40 +0000 (17:35 -0700)
committerYehuda Sadeh <yehuda@redhat.com>
Fri, 12 Feb 2016 00:12:49 +0000 (16:12 -0800)
When copying an object from a different zone, and copy-if-newer is
specified, if the final meta write is canceled check whether the
destinatioin that was created is actually newer than our mtime,
otherwise retry.

Signed-off-by: Yehuda Sadeh <yehuda@redhat.com>
src/rgw/rgw_rados.cc
src/rgw/rgw_rados.h

index 6ebc4699715385ad7e50a52a58811fdd40797186..c1024c608d20c5937f43a7f82144ae997292b16f 100644 (file)
@@ -1455,7 +1455,7 @@ int RGWPutObjProcessor::complete(string& etag, time_t *mtime, time_t set_mtime,
   if (r < 0)
     return r;
 
-  is_complete = true;
+  is_complete = canceled;
   return 0;
 }
 
@@ -1798,6 +1798,8 @@ int RGWPutObjProcessor_Atomic::do_complete(string& etag, time_t *mtime, time_t s
     return r;
   }
 
+  canceled = obj_op.meta.canceled;
+
   return 0;
 }
 
@@ -4696,6 +4698,7 @@ int RGWRados::Object::Write::write_meta(uint64_t size,
       /* ignoring error, nothing we can do at this point */
     }
   }
+  meta.canceled = false;
 
   /* update quota cache */
   store->quota_handler->update_stats(meta.owner, bucket, (orig_exists ? 0 : 1), size, orig_size);
@@ -4708,6 +4711,8 @@ done_cancel:
     ldout(store->ctx(), 0) << "ERROR: index_op.cancel()() returned ret=" << ret << dendl;
   }
 
+  meta.canceled = true;
+
   /* we lost in a race. There are a few options:
    * - existing object was rewritten (ECANCELED)
    * - non existing object was created (EEXIST)
@@ -4968,6 +4973,10 @@ public:
   int complete(string& etag, time_t *mtime, time_t set_mtime, map<string, bufferlist>& attrs, time_t delete_at) {
     return processor->complete(etag, mtime, set_mtime, attrs, delete_at);
   }
+
+  bool is_canceled() {
+    return processor->is_canceled();
+  }
 };
 
 /*
@@ -5066,6 +5075,7 @@ int RGWRados::fetch_remote_obj(RGWObjectCtx& obj_ctx,
   RGWRESTStreamReadRequest *in_stream_req;
   string tag;
   map<string, bufferlist> src_attrs;
+  int i;
   append_rand_alpha(cct, tag, tag, 32);
 
   RGWPutObjProcessor_Atomic processor(obj_ctx,
@@ -5187,8 +5197,34 @@ int RGWRados::fetch_remote_obj(RGWObjectCtx& obj_ctx,
     attrs = src_attrs;
   }
 
-  ret = cb.complete(etag, mtime, set_mtime, attrs, delete_at);
-  if (ret < 0) {
+#define MAX_COMPLETE_RETRY 100
+  for (i = 0; i < MAX_COMPLETE_RETRY; i++) {
+    ret = cb.complete(etag, mtime, set_mtime, attrs, delete_at);
+    if (ret < 0) {
+      goto set_err_state;
+    }
+    if (copy_if_newer && cb.is_canceled()) {
+      ldout(cct, 20) << "raced with another write of obj: " << dest_obj << dendl;
+      obj_ctx.invalidate(dest_obj); /* object was overwritten */
+      ret = get_obj_state(&obj_ctx, dest_obj, &dest_state, NULL);
+      if (ret < 0) {
+        ldout(cct, 0) << "ERROR: " << __func__ << ": get_err_state() returned ret=" << ret << dendl;
+        goto set_err_state;
+      }
+      if (!dest_state->exists ||
+        dest_state->mtime < set_mtime) {
+        ldout(cct, 20) << "retrying writing object mtime=" << set_mtime << " dest_state->mtime=" << dest_state->mtime << " dest_state->exists=" << dest_state->exists << dendl;
+        continue;
+      } else {
+        ldout(cct, 20) << "not retrying writing object mtime=" << set_mtime << " dest_state->mtime=" << dest_state->mtime << " dest_state->exists=" << dest_state->exists << dendl;
+      }
+    }
+    break;
+  }
+
+  if (i == MAX_COMPLETE_RETRY) {
+    ldout(cct, 0) << "ERROR: retried object completion too many times, something is wrong!" << dendl;
+    ret = -EIO;
     goto set_err_state;
   }
 
@@ -5199,7 +5235,12 @@ int RGWRados::fetch_remote_obj(RGWObjectCtx& obj_ctx,
 
   return 0;
 set_err_state:
-  int r = opstate.set_state(RGWOpState::OPSTATE_ERROR);
+  RGWOpState::OpState state = RGWOpState::OPSTATE_ERROR;
+  if (copy_if_newer && ret == -ERR_NOT_MODIFIED) {
+    state = RGWOpState::OPSTATE_COMPLETE;
+    ret = 0;
+  }
+  int r = opstate.set_state(state);
   if (r < 0) {
     ldout(cct, 0) << "ERROR: failed to set opstate r=" << ret << dendl;
   }
index dfd92c59606678deb2ca07f09927dd44063e62c2..9d1598cb2a78a99200f95ac364873506c45e9c45 100644 (file)
@@ -1958,10 +1958,11 @@ public:
         const char *if_nomatch;
         uint64_t olh_epoch;
        time_t delete_at;
+        bool canceled;
 
         MetaParams() : mtime(NULL), rmattrs(NULL), data(NULL), manifest(NULL), ptag(NULL),
                  remove_objs(NULL), set_mtime(0), category(RGW_OBJ_CATEGORY_MAIN), flags(0),
-                 if_match(NULL), if_nomatch(NULL), olh_epoch(0), delete_at(0) {}
+                 if_match(NULL), if_nomatch(NULL), olh_epoch(0), delete_at(0), canceled(false) {}
       } meta;
 
       explicit Write(RGWRados::Object *_target) : target(_target) {}
@@ -2757,13 +2758,14 @@ protected:
   RGWObjectCtx& obj_ctx;
   bool is_complete;
   RGWBucketInfo bucket_info;
+  bool canceled;
 
   virtual int do_complete(string& etag, time_t *mtime, time_t set_mtime,
                           map<string, bufferlist>& attrs, time_t delete_at,
                           const char *if_match = NULL, const char *if_nomatch = NULL) = 0;
 
 public:
-  RGWPutObjProcessor(RGWObjectCtx& _obj_ctx, RGWBucketInfo& _bi) : store(NULL), obj_ctx(_obj_ctx), is_complete(false), bucket_info(_bi) {}
+  RGWPutObjProcessor(RGWObjectCtx& _obj_ctx, RGWBucketInfo& _bi) : store(NULL), obj_ctx(_obj_ctx), is_complete(false), bucket_info(_bi), canceled(false) {}
   virtual ~RGWPutObjProcessor() {}
   virtual int prepare(RGWRados *_store, string *oid_rand) {
     store = _store;
@@ -2779,6 +2781,8 @@ public:
                        const char *if_match = NULL, const char *if_nomatch = NULL);
 
   CephContext *ctx();
+
+  bool is_canceled() { return canceled; }
 };
 
 struct put_obj_aio_info {