]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
rgw: set olh if object has been versioned
authorYehuda Sadeh <yehuda@redhat.com>
Fri, 3 Oct 2014 00:05:06 +0000 (17:05 -0700)
committerYehuda Sadeh <yehuda@redhat.com>
Fri, 16 Jan 2015 22:41:43 +0000 (14:41 -0800)
Bucket could have versioning suspended, but object has been versioned,
so once object has olh pointer in it, we need to continue maintainig it.

Signed-off-by: Yehuda Sadeh <yehuda@redhat.com>
src/rgw/rgw_rados.cc
src/rgw/rgw_rados.h

index 2ff8074ad181c68c5089f171e23b7ec92afd21bb..2dc5919f336aff566dbffbfe6ed8a6ed4e7c411d 100644 (file)
@@ -1219,6 +1219,18 @@ int RGWPutObjProcessor_Atomic::do_complete(string& etag, time_t *mtime, time_t s
   extra_params.set_mtime = set_mtime;
   extra_params.owner = bucket_owner;
 
+  RGWRadosCtx *rctx = static_cast<RGWRadosCtx *>(obj_ctx);
+
+  bool is_olh = false;
+  if (head_obj.get_instance().empty()) {
+    RGWObjState *astate = NULL;
+    r = store->get_obj_state(rctx, head_obj, &astate, NULL, false); /* don't follow olh */
+    if (r < 0) {
+      return r;
+    }
+    is_olh = astate->is_olh;
+  }
+
   r = store->put_obj_meta(obj_ctx, head_obj, obj_len, attrs,
                           RGW_OBJ_CATEGORY_MAIN, PUT_OBJ_CREATE,
                           extra_params);
@@ -1226,7 +1238,7 @@ int RGWPutObjProcessor_Atomic::do_complete(string& etag, time_t *mtime, time_t s
     return r;
   }
 
-  if (versioned_object) {
+  if (versioned_object || is_olh) {
     r = store->set_olh(obj_ctx, bucket_owner, head_obj, false);
     if (r < 0) {
       return r;
@@ -1254,6 +1266,11 @@ RGWObjState *RGWRadosCtx::get_state(rgw_obj& obj) {
   }
 }
 
+void RGWRadosCtx::invalidate(rgw_obj& obj)
+{
+  objs_state.erase(obj);
+}
+
 void RGWRadosCtx::set_atomic(rgw_obj& obj) {
   if (!obj.get_object().empty()) {
     objs_state[obj].is_atomic = true;
@@ -4216,7 +4233,7 @@ int RGWRados::prepare_atomic_for_write_impl(RGWRadosCtx *rctx, rgw_obj& obj,
                            bool reset_obj, const string *ptag,
                             const char *if_match, const char *if_nomatch)
 {
-  int r = get_obj_state(rctx, obj, pstate, NULL);
+  int r = get_obj_state(rctx, obj, pstate, NULL, false);
   if (r < 0)
     return r;
 
@@ -5369,9 +5386,9 @@ int RGWRados::olh_init_modification(RGWObjState *state, rgw_obj& obj, string *ob
 
   do {
     ret = olh_init_modification_impl(state, obj, obj_tag, op_tag);
-  } while (ret == -ECANCELED || ret == -EEXIST);
+  } while (ret == -EEXIST);
 
-  return 0;
+  return ret;
 }
 
 int RGWRados::bucket_index_link_olh(rgw_obj& obj_instance, bool delete_marker, const string& op_tag)
@@ -5585,15 +5602,24 @@ int RGWRados::set_olh(void *ctx, const string& bucket_owner, rgw_obj& target_obj
 
   RGWObjState *state = NULL;
   RGWRadosCtx *rctx = static_cast<RGWRadosCtx *>(ctx);
-  int r = get_obj_state(rctx, olh_obj, &state, NULL, false); /* don't follow olh */
-  if (r < 0)
-    return r;
 
-  int ret = olh_init_modification(state, olh_obj, &obj_tag, &op_tag);
-  if (ret < 0) {
-    ldout(cct, 20) << "olh_init_modification() target_obj=" << target_obj << " delete_marker=" << (int)delete_marker << " returned " << ret << dendl;
-    return ret;
-  }
+  int ret;
+
+  do {
+    ret = get_obj_state(rctx, olh_obj, &state, NULL, false); /* don't follow olh */
+    if (ret < 0)
+      return ret;
+
+    ret = olh_init_modification(state, olh_obj, &obj_tag, &op_tag);
+    if (ret == -ECANCELED) {
+      rctx->invalidate(olh_obj);
+      continue;
+    }
+    if (ret < 0) {
+      ldout(cct, 20) << "olh_init_modification() target_obj=" << target_obj << " delete_marker=" << (int)delete_marker << " returned " << ret << dendl;
+      return ret;
+    }
+  } while (ret == -ECANCELED);
 
   ret = bucket_index_link_olh(target_obj, delete_marker, op_tag);
   if (ret < 0) {
index ef7326dc5da215ec10799836790ab1b779c0f09e..bc52a438995673320455274595178b047149167a 100644 (file)
@@ -823,6 +823,8 @@ struct RGWRadosCtx {
     }
     return 0;
   }
+
+  void invalidate(rgw_obj& obj);
 };
 
 struct RGWPoolIterCtx {
@@ -1338,10 +1340,6 @@ class RGWRados
   int get_olh_target_state(RGWRadosCtx *rctx, rgw_obj& obj, RGWObjState *olh_state,
                            RGWObjState **target_state, RGWObjVersionTracker *objv_tracker);
   int get_obj_state_impl(RGWRadosCtx *rctx, rgw_obj& obj, RGWObjState **state, RGWObjVersionTracker *objv_tracker, bool follow_olh);
-  int get_obj_state(RGWRadosCtx *rctx, rgw_obj& obj, RGWObjState **state, RGWObjVersionTracker *objv_tracker, bool follow_olh);
-  int get_obj_state(RGWRadosCtx *rctx, rgw_obj& obj, RGWObjState **state, RGWObjVersionTracker *objv_tracker) {
-    return get_obj_state(rctx, obj, state, objv_tracker, true);
-  }
   int append_atomic_test(RGWRadosCtx *rctx, rgw_obj& obj,
                          librados::ObjectOperation& op, RGWObjState **state);
   int prepare_atomic_for_write_impl(RGWRadosCtx *rctx, rgw_obj& obj,
@@ -1688,6 +1686,10 @@ public:
                         map<string, bufferlist>* rmattrs,
                         RGWObjVersionTracker *objv_tracker);
 
+  int get_obj_state(RGWRadosCtx *rctx, rgw_obj& obj, RGWObjState **state, RGWObjVersionTracker *objv_tracker, bool follow_olh);
+  int get_obj_state(RGWRadosCtx *rctx, rgw_obj& obj, RGWObjState **state, RGWObjVersionTracker *objv_tracker) {
+    return get_obj_state(rctx, obj, state, objv_tracker, true);
+  }
 /**
  * Get data about an object out of RADOS and into memory.
  * bucket: name of the bucket the object is in.