]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
rgw: move versioning handling to Object::Write::write_meta()
authorYehuda Sadeh <yehuda@redhat.com>
Thu, 11 Dec 2014 18:58:23 +0000 (10:58 -0800)
committerYehuda Sadeh <yehuda@redhat.com>
Mon, 19 Jan 2015 23:57:55 +0000 (15:57 -0800)
Instead of having to call set_olh() after calling write_meta(), do it
inside that method. This way we (mostly) don't need to care about
versioning outside of it. This required moving bucket info around
instead of just passing the bucket owner.

Signed-off-by: Yehuda Sadeh <yehuda@redhat.com>
src/rgw/rgw_admin.cc
src/rgw/rgw_bucket.cc
src/rgw/rgw_bucket.h
src/rgw/rgw_op.cc
src/rgw/rgw_rados.cc
src/rgw/rgw_rados.h

index 689dd659a67e26b05fc5af2e7ab3627de21e7cd9..8a5f2acd7dfa0cc061a8b0dc75d8b311c5d2f4fa 100644 (file)
@@ -820,13 +820,13 @@ static bool bucket_object_check_filter(const string& name)
   return rgw_obj::translate_raw_obj_to_obj_in_ns(obj, instance, ns);
 }
 
-int check_min_obj_stripe_size(RGWRados *store, rgw_obj& obj, uint64_t min_stripe_size, bool *need_rewrite)
+int check_min_obj_stripe_size(RGWRados *store, RGWBucketInfo& bucket_info, rgw_obj& obj, uint64_t min_stripe_size, bool *need_rewrite)
 {
   map<string, bufferlist> attrs;
   uint64_t obj_size;
 
   RGWObjectCtx obj_ctx(store);
-  RGWRados::Object op_target(store, obj_ctx, obj);
+  RGWRados::Object op_target(store, bucket_info, obj_ctx, obj);
   RGWRados::Object::Read read_op(&op_target);
 
   read_op.params.attrs = &attrs;
@@ -2087,7 +2087,7 @@ next:
       return -ret;
     }
     rgw_obj_key key(object, object_version);
-    ret = rgw_remove_object(store, bucket_info.owner, bucket, key, bucket_info.versioning_enabled());
+    ret = rgw_remove_object(store, bucket_info, bucket, key, bucket_info.versioning_enabled());
 
     if (ret < 0) {
       cerr << "ERROR: object remove returned: " << cpp_strerror(-ret) << std::endl;
@@ -2116,7 +2116,7 @@ next:
     obj.set_instance(object_version);
     bool need_rewrite = true;
     if (min_rewrite_stripe_size > 0) {
-      ret = check_min_obj_stripe_size(store, obj, min_rewrite_stripe_size, &need_rewrite);
+      ret = check_min_obj_stripe_size(store, bucket_info, obj, min_rewrite_stripe_size, &need_rewrite);
       if (ret < 0) {
         ldout(store->ctx(), 0) << "WARNING: check_min_obj_stripe_size failed, r=" << ret << dendl;
       }
@@ -2207,7 +2207,7 @@ next:
 
           bool need_rewrite = true;
           if (min_rewrite_stripe_size > 0) {
-            r = check_min_obj_stripe_size(store, obj, min_rewrite_stripe_size, &need_rewrite);
+            r = check_min_obj_stripe_size(store, bucket_info, obj, min_rewrite_stripe_size, &need_rewrite);
             if (r < 0) {
               ldout(store->ctx(), 0) << "WARNING: check_min_obj_stripe_size failed, r=" << r << dendl;
             }
@@ -2264,7 +2264,7 @@ next:
     uint64_t obj_size;
     map<string, bufferlist> attrs;
     RGWObjectCtx obj_ctx(store);
-    RGWRados::Object op_target(store, obj_ctx, obj);
+    RGWRados::Object op_target(store, bucket_info, obj_ctx, obj);
     RGWRados::Object::Read read_op(&op_target);
 
     read_op.params.attrs = &attrs;
index 2d7358032bcc948a27b4be50fced708ac7154cb1..748dd68ca9474f1baf30091fa05d045bdc3ec070 100644 (file)
@@ -342,13 +342,13 @@ static bool bucket_object_check_filter(const string& name)
   return rgw_obj::translate_raw_obj_to_obj_in_ns(obj, ns, ver);
 }
 
-int rgw_remove_object(RGWRados *store, const string& bucket_owner, rgw_bucket& bucket, rgw_obj_key& key, bool use_versioning)
+int rgw_remove_object(RGWRados *store, RGWBucketInfo& bucket_info, rgw_bucket& bucket, rgw_obj_key& key, bool use_versioning)
 {
   RGWObjectCtx rctx(store);
 
   rgw_obj obj(bucket, key);
 
-  int ret = store->delete_obj(rctx, bucket_owner, obj, use_versioning);
+  int ret = store->delete_obj(rctx, bucket_info, obj, use_versioning);
 
   return ret;
 }
@@ -391,7 +391,7 @@ int rgw_remove_bucket(RGWRados *store, const string& bucket_owner, rgw_bucket& b
     while (!objs.empty()) {
       std::vector<RGWObjEnt>::iterator it = objs.begin();
       for (it = objs.begin(); it != objs.end(); ++it) {
-        ret = rgw_remove_object(store, bucket_owner, bucket, (*it).key, false);
+        ret = rgw_remove_object(store, info, bucket, (*it).key, false);
         if (ret < 0)
           return ret;
       }
@@ -585,7 +585,7 @@ int RGWBucket::remove_object(RGWBucketAdminOpState& op_state, std::string *err_m
 
   rgw_obj_key key(object_name);
 
-  int ret = rgw_remove_object(store, bucket_info.owner, bucket, key, bucket_info.versioning_enabled());
+  int ret = rgw_remove_object(store, bucket_info, bucket, key, bucket_info.versioning_enabled());
   if (ret < 0) {
     set_err_msg(err_msg, "unable to remove object" + cpp_strerror(-ret));
     return ret;
@@ -804,11 +804,18 @@ int RGWBucket::get_policy(RGWBucketAdminOpState& op_state, ostream& o)
   rgw_bucket bucket = op_state.get_bucket();
   RGWObjectCtx obj_ctx(store);
 
+  RGWBucketInfo bucket_info;
+  map<string, bufferlist> attrs;
+  int ret = store->get_bucket_info(obj_ctx, bucket.name, bucket_info, NULL, &attrs);
+  if (ret < 0) {
+    return ret;
+  }
+
   if (!object_name.empty()) {
     bufferlist bl;
     rgw_obj obj(bucket, object_name);
 
-    RGWRados::Object op_target(store, obj_ctx, obj);
+    RGWRados::Object op_target(store, bucket_info, obj_ctx, obj);
     RGWRados::Object::Read rop(&op_target);
 
     int ret = rop.get_attr(RGW_ATTR_ACL, bl);
@@ -818,14 +825,6 @@ int RGWBucket::get_policy(RGWBucketAdminOpState& op_state, ostream& o)
     return policy_bl_to_stream(bl, o);
   }
 
-
-  RGWBucketInfo bucket_info;
-  map<string, bufferlist> attrs;
-  int ret = store->get_bucket_info(obj_ctx, bucket.name, bucket_info, NULL, &attrs);
-  if (ret < 0) {
-    return ret;
-  }
-
   map<string, bufferlist>::iterator aiter = attrs.find(RGW_ATTR_ACL);
   if (aiter == attrs.end()) {
     return -ENOENT;
index 86a07481a5f22e6ba16cc4610eb5213aa5a72b63..e42698ab6114ead570b6baa2e5dc30ff2dbe56a6 100644 (file)
@@ -108,7 +108,7 @@ extern int rgw_read_user_buckets(RGWRados *store, string user_id, RGWUserBuckets
 extern int rgw_link_bucket(RGWRados *store, string user_id, rgw_bucket& bucket, time_t creation_time, bool update_entrypoint = true);
 extern int rgw_unlink_bucket(RGWRados *store, string user_id, const string& bucket_name, bool update_entrypoint = true);
 
-extern int rgw_remove_object(RGWRados *store, const string& bucket_owner, rgw_bucket& bucket, rgw_obj_key& key, bool use_versioning);
+extern int rgw_remove_object(RGWRados *store, RGWBucketInfo& bucket_info, rgw_bucket& bucket, rgw_obj_key& key, bool use_versioning);
 extern int rgw_remove_bucket(RGWRados *store, const string& bucket_owner, rgw_bucket& bucket, bool delete_children);
 
 extern int rgw_bucket_set_attrs(RGWRados *store, RGWBucketInfo& bucket_info,
index 30b8b0debd86e048e7732bc9b2b8dda06a97cd10..da6a87663724558b796dddc1f745f610939d4442 100644 (file)
@@ -210,7 +210,7 @@ static int get_obj_policy_from_attr(CephContext *cct, RGWRados *store, RGWObject
   bufferlist bl;
   int ret = 0;
 
-  RGWRados::Object op_target(store, obj_ctx, obj);
+  RGWRados::Object op_target(store, bucket_info, obj_ctx, obj);
   RGWRados::Object::Read rop(&op_target);
 
   ret = rop.get_attr(RGW_ATTR_ACL, bl);
@@ -259,7 +259,7 @@ static int get_policy_from_attr(CephContext *cct, RGWRados *store, void *ctx,
 
 static int get_obj_attrs(RGWRados *store, struct req_state *s, rgw_obj& obj, map<string, bufferlist>& attrs)
 {
-  RGWRados::Object op_target(store, *(RGWObjectCtx *)s->obj_ctx, obj);
+  RGWRados::Object op_target(store, s->bucket_info, *(RGWObjectCtx *)s->obj_ctx, obj);
   RGWRados::Object::Read read_op(&op_target);
 
   read_op.params.attrs = &attrs;
@@ -656,7 +656,7 @@ int RGWGetObj::read_user_manifest_part(rgw_bucket& bucket, RGWObjEnt& ent, RGWAc
   obj_ctx.set_atomic(part);
   store->set_prefetch_data(&obj_ctx, part);
 
-  RGWRados::Object op_target(store, obj_ctx, part);
+  RGWRados::Object op_target(store, s->bucket_info, obj_ctx, part);
   RGWRados::Object::Read read_op(&op_target);
 
   read_op.params.attrs = &attrs;
@@ -876,7 +876,7 @@ void RGWGetObj::execute()
   perfcounter->inc(l_rgw_get);
   int64_t new_ofs, new_end;
 
-  RGWRados::Object op_target(store, *(RGWObjectCtx *)s->obj_ctx, obj);
+  RGWRados::Object op_target(store, s->bucket_info, *(RGWObjectCtx *)s->obj_ctx, obj);
   RGWRados::Object::Read read_op(&op_target);
 
   ret = get_params();
@@ -1487,8 +1487,8 @@ protected:
 
 public:
   bool immutable_head() { return true; }
-  RGWPutObjProcessor_Multipart(RGWObjectCtx& obj_ctx, const string& bucket_owner, uint64_t _p, req_state *_s) :
-                   RGWPutObjProcessor_Atomic(obj_ctx, bucket_owner, _s->bucket, _s->object.name, _p, _s->req_id, false), s(_s) {}
+  RGWPutObjProcessor_Multipart(RGWObjectCtx& obj_ctx, RGWBucketInfo& bucket_info, uint64_t _p, req_state *_s) :
+                   RGWPutObjProcessor_Atomic(obj_ctx, bucket_info, _s->bucket, _s->object.name, _p, _s->req_id, false), s(_s) {}
 };
 
 int RGWPutObjProcessor_Multipart::prepare(RGWRados *store, string *oid_rand)
@@ -1559,7 +1559,7 @@ int RGWPutObjProcessor_Multipart::do_complete(string& etag, time_t *mtime, time_
 {
   complete_writing_data();
 
-  RGWRados::Object op_target(store, obj_ctx, head_obj);
+  RGWRados::Object op_target(store, s->bucket_info, obj_ctx, head_obj);
   RGWRados::Object::Write head_obj_op(&op_target);
 
   head_obj_op.meta.set_mtime = set_mtime;
@@ -1615,14 +1615,12 @@ RGWPutObjProcessor *RGWPutObj::select_processor(RGWObjectCtx& obj_ctx, bool *is_
 
   uint64_t part_size = s->cct->_conf->rgw_obj_stripe_size;
 
-  const string& bucket_owner = s->bucket_owner.get_id();
-
   if (!multipart) {
-    processor = new RGWPutObjProcessor_Atomic(obj_ctx, bucket_owner, s->bucket, s->object.name, part_size, s->req_id, s->bucket_info.versioning_enabled());
+    processor = new RGWPutObjProcessor_Atomic(obj_ctx, s->bucket_info, s->bucket, s->object.name, part_size, s->req_id, s->bucket_info.versioning_enabled());
     ((RGWPutObjProcessor_Atomic *)processor)->set_olh_epoch(olh_epoch);
     ((RGWPutObjProcessor_Atomic *)processor)->set_version_id(version_id);
   } else {
-    processor = new RGWPutObjProcessor_Multipart(obj_ctx, bucket_owner, part_size, s);
+    processor = new RGWPutObjProcessor_Multipart(obj_ctx, s->bucket_info, part_size, s);
   }
 
   if (is_multipart) {
@@ -1896,7 +1894,7 @@ RGWPutObjProcessor *RGWPostObj::select_processor(RGWObjectCtx& obj_ctx)
 
   uint64_t part_size = s->cct->_conf->rgw_obj_stripe_size;
 
-  processor = new RGWPutObjProcessor_Atomic(obj_ctx, s->bucket_owner.get_id(), s->bucket, s->object.name, part_size, s->req_id, s->bucket_info.versioning_enabled());
+  processor = new RGWPutObjProcessor_Atomic(obj_ctx, s->bucket_info, s->bucket, s->object.name, part_size, s->req_id, s->bucket_info.versioning_enabled());
 
   return processor;
 }
@@ -2139,7 +2137,7 @@ void RGWDeleteObj::execute()
 
     obj_ctx->set_atomic(obj);
 
-    RGWRados::Object del_target(store, *obj_ctx, obj);
+    RGWRados::Object del_target(store, s->bucket_info, *obj_ctx, obj);
     RGWRados::Object::Delete del_op(&del_target);
 
     ret = get_system_versioning_params(s, &del_op.params.olh_epoch, &del_op.params.marker_version_id);
@@ -2713,7 +2711,7 @@ void RGWInitMultipart::execute()
     // the meta object will be indexed with 0 size, we c
     obj.set_in_extra_data(true);
 
-    RGWRados::Object op_target(store, *(RGWObjectCtx *)s->obj_ctx, obj);
+    RGWRados::Object op_target(store, s->bucket_info, *(RGWObjectCtx *)s->obj_ctx, obj);
     RGWRados::Object::Write obj_op(&op_target);
 
     obj_op.meta.owner = s->owner.get_id();
@@ -3026,7 +3024,7 @@ void RGWCompleteMultipart::execute()
 
   obj_ctx.set_atomic(target_obj);
 
-  RGWRados::Object op_target(store, *(RGWObjectCtx *)s->obj_ctx, target_obj);
+  RGWRados::Object op_target(store, s->bucket_info, *(RGWObjectCtx *)s->obj_ctx, target_obj);
   RGWRados::Object::Write obj_op(&op_target);
 
   obj_op.meta.manifest = &manifest;
@@ -3040,15 +3038,8 @@ void RGWCompleteMultipart::execute()
   if (ret < 0)
     return;
 
-  if (versioned_object) {
-    ret = store->set_olh(obj_ctx, s->bucket_owner.get_id(), target_obj, false, NULL, olh_epoch);
-    if (ret < 0) {
-      return;
-    }
-  }
-
   // remove the upload obj
-  int r = store->delete_obj(*(RGWObjectCtx *)s->obj_ctx, s->bucket_owner.get_id(), meta_obj, 0);
+  int r = store->delete_obj(*(RGWObjectCtx *)s->obj_ctx, s->bucket_info, meta_obj, 0);
   if (r < 0) {
     ldout(store->ctx(), 0) << "WARNING: failed to remove object " << meta_obj << dendl;
   }
@@ -3078,7 +3069,6 @@ void RGWAbortMultipart::execute()
   map<string, bufferlist> attrs;
   rgw_obj meta_obj;
   RGWMPObj mp;
-  const string& owner = s->bucket_owner.get_id();
 
   if (upload_id.empty() || s->object.empty())
     return;
@@ -3108,7 +3098,7 @@ void RGWAbortMultipart::execute()
         string oid = mp.get_part(obj_iter->second.num);
         rgw_obj obj;
         obj.init_ns(s->bucket, oid, mp_ns);
-        ret = store->delete_obj(*obj_ctx, owner, obj, 0);
+        ret = store->delete_obj(*obj_ctx, s->bucket_info, obj, 0);
         if (ret < 0 && ret != -ENOENT)
           return;
       } else {
@@ -3116,7 +3106,7 @@ void RGWAbortMultipart::execute()
         RGWObjManifest::obj_iterator oiter;
         for (oiter = manifest.obj_begin(); oiter != manifest.obj_end(); ++oiter) {
           rgw_obj loc = oiter.get_location();
-          ret = store->delete_obj(*obj_ctx, owner, loc, 0);
+          ret = store->delete_obj(*obj_ctx, s->bucket_info, loc, 0);
           if (ret < 0 && ret != -ENOENT)
             return;
         }
@@ -3127,7 +3117,7 @@ void RGWAbortMultipart::execute()
   // and also remove the metadata obj
   meta_obj.init_ns(s->bucket, meta_oid, mp_ns);
   meta_obj.set_in_extra_data(true);
-  ret = store->delete_obj(*obj_ctx, owner, meta_obj, 0);
+  ret = store->delete_obj(*obj_ctx, s->bucket_info, meta_obj, 0);
   if (ret == -ENOENT) {
     ret = -ERR_NO_SUCH_BUCKET;
   }
@@ -3289,7 +3279,7 @@ void RGWDeleteMultiObj::execute()
     rgw_obj obj(bucket,(*iter));
 
     obj_ctx->set_atomic(obj);
-    ret = store->delete_obj(*obj_ctx, s->bucket_owner.get_id(), obj, s->bucket_info.versioning_status());
+    ret = store->delete_obj(*obj_ctx, s->bucket_info, obj, s->bucket_info.versioning_status());
     if (ret == -ENOENT) {
       ret = 0;
     }
index f9e27ffa89ab3e1a92f87c30792e0af95a6e1e49..85d3546fc1ea97ee8b50ef10194cc868fceb072a 100644 (file)
@@ -897,7 +897,7 @@ RGWPutObjProcessor_Aio::~RGWPutObjProcessor_Aio()
   list<rgw_obj>::iterator iter;
   for (iter = written_objs.begin(); iter != written_objs.end(); ++iter) {
     rgw_obj& obj = *iter;
-    int r = store->delete_obj(obj_ctx, bucket_owner, obj, false);
+    int r = store->delete_obj(obj_ctx, bucket_info, obj, false);
     if (r < 0 && r != -ENOENT) {
       ldout(store->ctx(), 0) << "WARNING: failed to remove obj (" << obj << "), leaked" << dendl;
     }
@@ -1175,7 +1175,11 @@ int RGWPutObjProcessor_Atomic::do_complete(string& etag, time_t *mtime, time_t s
 
   obj_ctx.set_atomic(head_obj);
 
-  RGWRados::Object op_target(store, obj_ctx, head_obj);
+  RGWRados::Object op_target(store, bucket_info, obj_ctx, head_obj);
+
+  /* some object types shouldn't be versioned, e.g., multipart parts */
+  op_target.set_versioning_disabled(!versioned_object);
+
   RGWRados::Object::Write obj_op(&op_target);
 
   obj_op.meta.data = &first_chunk;
@@ -1185,31 +1189,15 @@ int RGWPutObjProcessor_Atomic::do_complete(string& etag, time_t *mtime, time_t s
   obj_op.meta.if_nomatch = if_nomatch;
   obj_op.meta.mtime = mtime;
   obj_op.meta.set_mtime = set_mtime;
-  obj_op.meta.owner = bucket_owner;
+  obj_op.meta.owner = bucket_info.owner;
   obj_op.meta.flags = PUT_OBJ_CREATE;
   obj_op.meta.olh_epoch = olh_epoch;
 
-  bool is_olh = false;
-  if (head_obj.get_instance().empty()) {
-    RGWObjState *astate = NULL;
-    r = store->get_obj_state(&obj_ctx, head_obj, &astate, NULL, false); /* don't follow olh */
-    if (r < 0) {
-      return r;
-    }
-    is_olh = astate->is_olh;
-  }
-
   r = obj_op.write_meta(obj_len, attrs);
   if (r < 0) {
     return r;
   }
 
-  if (versioned_object || is_olh) {
-    r = store->set_olh(obj_ctx, bucket_owner, head_obj, false, NULL, olh_epoch);
-    if (r < 0) {
-      return r;
-    }
-  }
   return 0;
 }
 
@@ -2846,6 +2834,8 @@ int RGWRados::Object::Write::write_meta(uint64_t size,
   if (r < 0)
     return r;
 
+  bool is_olh = state->is_olh;
+
   bool reset_obj = (meta.flags & PUT_OBJ_CREATE) != 0;
   r = target->prepare_atomic_modification(op, reset_obj, meta.ptag, meta.if_match, meta.if_nomatch);
   if (r < 0)
@@ -2957,6 +2947,13 @@ int RGWRados::Object::Write::write_meta(uint64_t size,
     *meta.mtime = meta.set_mtime;
   }
 
+  if (target->versioning_enabled() || is_olh) {
+    r = store->set_olh(target->get_ctx(), target->get_bucket_info(), obj, false, NULL, meta.olh_epoch);
+    if (r < 0) {
+      return r;
+    }
+  }
+
   /* update quota cache */
   store->quota_handler->update_stats(meta.owner, bucket, (state->exists ? 0 : 1), size, state->size);
 
@@ -3229,7 +3226,7 @@ int RGWRados::rewrite_obj(RGWBucketInfo& dest_bucket_info, rgw_obj& obj)
   uint64_t obj_size;
   RGWObjectCtx rctx(this);
 
-  RGWRados::Object op_target(this, rctx, obj);
+  RGWRados::Object op_target(this, dest_bucket_info, rctx, obj);
   RGWRados::Object::Read read_op(&op_target);
 
   read_op.params.attrs = &attrset;
@@ -3288,7 +3285,7 @@ int RGWRados::fetch_remote_obj(RGWObjectCtx& obj_ctx,
   append_rand_alpha(cct, tag, tag, 32);
 
   RGWPutObjProcessor_Atomic processor(obj_ctx,
-                                      dest_bucket_info.owner, dest_obj.bucket, dest_obj.get_object(),
+                                      dest_bucket_info, dest_obj.bucket, dest_obj.get_object(),
                                       cct->_conf->rgw_obj_stripe_size, tag, dest_bucket_info.versioning_enabled());
   int ret = processor.prepare(this, NULL);
   if (ret < 0)
@@ -3474,7 +3471,7 @@ int RGWRados::copy_obj(RGWObjectCtx& obj_ctx,
   map<string, bufferlist> src_attrs;
   int64_t ofs = 0;
   int64_t end = -1;
-  RGWRados::Object src_op_target(this, obj_ctx, src_obj);
+  RGWRados::Object src_op_target(this, src_bucket_info, obj_ctx, src_obj);
   RGWRados::Object::Read read_op(&src_op_target);
 
   read_op.conds.mod_ptr = mod_ptr;
@@ -3573,14 +3570,9 @@ int RGWRados::copy_obj(RGWObjectCtx& obj_ctx,
   RGWObjManifest *pmanifest; 
   ldout(cct, 0) << "dest_obj=" << dest_obj << " src_obj=" << src_obj << " copy_itself=" << (int)copy_itself << dendl;
 
-  RGWRados::Object dest_op_target(this, obj_ctx, dest_obj);
+  RGWRados::Object dest_op_target(this, dest_bucket_info, obj_ctx, dest_obj);
   RGWRados::Object::Write write_op(&dest_op_target);
 
-  RGWObjState *dest_state = NULL;
-  ret = get_obj_state(&obj_ctx, dest_obj, &dest_state, NULL);
-  if (ret < 0)
-    return ret;
-
   string tag;
 
   if (ptag)
@@ -3640,13 +3632,6 @@ int RGWRados::copy_obj(RGWObjectCtx& obj_ctx,
   if (ret < 0)
     goto done_ret;
 
-  if (versioned_dest || dest_state->is_olh) {
-    ret = set_olh(obj_ctx, dest_bucket_info.owner, dest_obj, false, NULL, olh_epoch);
-    if (ret < 0) {
-      goto done_ret;
-    }
-  }
-
   return 0;
 
 done_ret:
@@ -3696,7 +3681,7 @@ int RGWRados::copy_obj_data(RGWObjectCtx& obj_ctx,
   append_rand_alpha(cct, tag, tag, 32);
 
   RGWPutObjProcessor_Atomic processor(obj_ctx,
-                                      dest_bucket_info.owner, dest_obj.bucket, dest_obj.get_object(),
+                                      dest_bucket_info, dest_obj.bucket, dest_obj.get_object(),
                                       cct->_conf->rgw_obj_stripe_size, tag, dest_bucket_info.versioning_enabled());
   if (version_id) {
     processor.set_version_id(*version_id);
@@ -4031,7 +4016,7 @@ int RGWRados::Object::Delete::delete_obj()
       meta.owner_display_name = params.obj_owner.get_display_name();
       meta.mtime = ceph_clock_now(store->ctx());
 
-      int r = store->set_olh(target->get_ctx(), params.bucket_owner, marker, true, &meta, params.olh_epoch);
+      int r = store->set_olh(target->get_ctx(), target->get_bucket_info(), marker, true, &meta, params.olh_epoch);
       if (r < 0) {
         return r;
       }
@@ -4045,7 +4030,7 @@ int RGWRados::Object::Delete::delete_obj()
       if (exists) {
         result.delete_marker = dirent.is_delete_marker();
       }
-      r = store->unlink_obj_instance(target->get_ctx(), params.bucket_owner, obj, params.olh_epoch);
+      r = store->unlink_obj_instance(target->get_ctx(), target->get_bucket_info(), obj, params.olh_epoch);
       if (r < 0) {
         return r;
       }
@@ -4127,12 +4112,12 @@ int RGWRados::Object::Delete::delete_obj()
   return 0;
 }
 
-int RGWRados::delete_obj(RGWObjectCtx& obj_ctx, const string& bucket_owner, rgw_obj& obj, int versioning_status)
+int RGWRados::delete_obj(RGWObjectCtx& obj_ctx, RGWBucketInfo& bucket_info, rgw_obj& obj, int versioning_status)
 {
-  RGWRados::Object del_target(this, obj_ctx, obj);
+  RGWRados::Object del_target(this, bucket_info, obj_ctx, obj);
   RGWRados::Object::Delete del_op(&del_target);
 
-  del_op.params.bucket_owner = bucket_owner;
+  del_op.params.bucket_owner = bucket_info.owner;
   del_op.params.versioning_status = versioning_status;
 
   return del_op.delete_obj();
@@ -5747,7 +5732,7 @@ int RGWRados::bucket_index_trim_olh_log(RGWObjState& state, rgw_obj& obj_instanc
   return 0;
 }
 
-int RGWRados::apply_olh_log(RGWObjectCtx& obj_ctx, RGWObjState& state, const string& bucket_owner, rgw_obj& obj,
+int RGWRados::apply_olh_log(RGWObjectCtx& obj_ctx, RGWObjState& state, RGWBucketInfo& bucket_info, rgw_obj& obj,
                             bufferlist& olh_tag, map<uint64_t, vector<rgw_bucket_olh_log_entry> >& log,
                             uint64_t *plast_ver)
 {
@@ -5826,7 +5811,7 @@ int RGWRados::apply_olh_log(RGWObjectCtx& obj_ctx, RGWObjState& state, const str
     cls_rgw_obj_key& key = *liter;
     rgw_obj obj_instance(bucket, key.name);
     obj_instance.set_instance(key.instance);
-    int ret = delete_obj(obj_ctx, bucket_owner, obj_instance, 0);
+    int ret = delete_obj(obj_ctx, bucket_info, obj_instance, 0);
     if (ret < 0 && ret != -ENOENT) {
       ldout(cct, 0) << "ERROR: delete_obj() returned " << ret << " obj_instance=" << obj_instance << dendl;
       return ret;
@@ -5850,7 +5835,7 @@ int RGWRados::apply_olh_log(RGWObjectCtx& obj_ctx, RGWObjState& state, const str
 /*
  * read olh log and apply it
  */
-int RGWRados::update_olh(RGWObjectCtx& obj_ctx, RGWObjState *state, const string& bucket_owner, rgw_obj& obj)
+int RGWRados::update_olh(RGWObjectCtx& obj_ctx, RGWObjState *state, RGWBucketInfo& bucket_info, rgw_obj& obj)
 {
   map<uint64_t, vector<rgw_bucket_olh_log_entry> > log;
   bool is_truncated;
@@ -5861,7 +5846,7 @@ int RGWRados::update_olh(RGWObjectCtx& obj_ctx, RGWObjState *state, const string
     if (ret < 0) {
       return ret;
     }
-    ret = apply_olh_log(obj_ctx, *state, bucket_owner, obj, state->olh_tag, log, &ver_marker);
+    ret = apply_olh_log(obj_ctx, *state, bucket_info, obj, state->olh_tag, log, &ver_marker);
     if (ret < 0) {
       return ret;
     }
@@ -5870,7 +5855,7 @@ int RGWRados::update_olh(RGWObjectCtx& obj_ctx, RGWObjState *state, const string
   return 0;
 }
 
-int RGWRados::set_olh(RGWObjectCtx& obj_ctx, const string& bucket_owner, rgw_obj& target_obj, bool delete_marker, rgw_bucket_dir_entry_meta *meta,
+int RGWRados::set_olh(RGWObjectCtx& obj_ctx, RGWBucketInfo& bucket_info, rgw_obj& target_obj, bool delete_marker, rgw_bucket_dir_entry_meta *meta,
                       uint64_t olh_epoch)
 {
   string op_tag;
@@ -5904,7 +5889,7 @@ int RGWRados::set_olh(RGWObjectCtx& obj_ctx, const string& bucket_owner, rgw_obj
       continue;
     }
 
-    ret = update_olh(obj_ctx, state, bucket_owner, olh_obj);
+    ret = update_olh(obj_ctx, state, bucket_info, olh_obj);
     if (ret < 0) {
       ldout(cct, 20) << "update_olh() target_obj=" << target_obj << " returned " << ret << dendl;
       continue;
@@ -5918,7 +5903,7 @@ int RGWRados::set_olh(RGWObjectCtx& obj_ctx, const string& bucket_owner, rgw_obj
   return 0;
 }
 
-int RGWRados::unlink_obj_instance(RGWObjectCtx& obj_ctx, const string& bucket_owner, rgw_obj& target_obj,
+int RGWRados::unlink_obj_instance(RGWObjectCtx& obj_ctx, RGWBucketInfo& bucket_info, rgw_obj& target_obj,
                                   uint64_t olh_epoch)
 {
   string op_tag;
@@ -5951,7 +5936,7 @@ int RGWRados::unlink_obj_instance(RGWObjectCtx& obj_ctx, const string& bucket_ow
       continue;
     }
 
-    ret = update_olh(obj_ctx, state, bucket_owner, olh_obj);
+    ret = update_olh(obj_ctx, state, bucket_info, olh_obj);
     if (ret < 0) {
       ldout(cct, 20) << "update_olh() target_obj=" << target_obj << " returned " << ret << dendl;
       continue;
@@ -6107,7 +6092,7 @@ int RGWRados::follow_olh(RGWObjectCtx& obj_ctx, RGWObjState *state, rgw_obj& olh
     if (ret < 0) {
       ldout(cct, 0) << "ERROR: get_bucket_instance_info() returned " << ret << " olh_obj.bucket=" << olh_obj.bucket << dendl;
     }
-    ret = update_olh(obj_ctx, state, bucket_info.owner, olh_obj);
+    ret = update_olh(obj_ctx, state, bucket_info, olh_obj);
     if (ret < 0) {
       return ret;
     }
index 0a44d37bf925ba7583c7c3f08c2a8a7998c635e3..67662cf9c14a5e628cebf9f0a94913bbbf01d22f 100644 (file)
@@ -1387,11 +1387,14 @@ public:
 
   class Object {
     RGWRados *store;
+    RGWBucketInfo bucket_info;
     RGWObjectCtx& ctx;
     rgw_obj obj;
 
     RGWObjState *state;
 
+    bool versioning_disabled;
+
   protected:
     int get_state(RGWObjState **pstate, bool follow_olh);
 
@@ -1400,11 +1403,21 @@ public:
     int complete_atomic_modification();
 
   public:
-    Object(RGWRados *_store, RGWObjectCtx& _ctx, rgw_obj& _obj) : store(_store), ctx(_ctx), obj(_obj), state(NULL) {}
+    Object(RGWRados *_store, RGWBucketInfo& _bucket_info, RGWObjectCtx& _ctx, rgw_obj& _obj) : store(_store), bucket_info(_bucket_info),
+                                                                                               ctx(_ctx), obj(_obj), state(NULL), versioning_disabled(false) {}
 
     RGWRados *get_store() { return store; }
     rgw_obj& get_obj() { return obj; }
     RGWObjectCtx& get_ctx() { return ctx; }
+    RGWBucketInfo& get_bucket_info() { return bucket_info; }
+
+    void set_versioning_disabled(bool status) {
+      versioning_disabled = status;
+    }
+
+    bool versioning_enabled() {
+      return (!versioning_disabled && bucket_info.versioning_enabled());
+    }
 
     struct Read {
       RGWRados::Object *source;
@@ -1671,7 +1684,7 @@ public:
   int bucket_suspended(rgw_bucket& bucket, bool *suspended);
 
   /** Delete an object.*/
-  virtual int delete_obj(RGWObjectCtx& obj_ctx, const string& bucket_owner, rgw_obj& src_obj, int versioning_status);
+  virtual int delete_obj(RGWObjectCtx& obj_ctx, RGWBucketInfo& bucket_owner, rgw_obj& src_obj, int versioning_status);
 
   /* Delete a system object */
   virtual int delete_system_obj(rgw_obj& src_obj, RGWObjVersionTracker *objv_tracker = NULL);
@@ -1761,13 +1774,13 @@ public:
   int bucket_index_read_olh_log(RGWObjState& state, rgw_obj& obj_instance, uint64_t ver_marker,
                                 map<uint64_t, vector<rgw_bucket_olh_log_entry> > *log, bool *is_truncated);
   int bucket_index_trim_olh_log(RGWObjState& obj_state, rgw_obj& obj_instance, uint64_t ver);
-  int apply_olh_log(RGWObjectCtx& ctx, RGWObjState& obj_state, const string& bucket_owner, rgw_obj& obj,
+  int apply_olh_log(RGWObjectCtx& ctx, RGWObjState& obj_state, RGWBucketInfo& bucket_info, rgw_obj& obj,
                     bufferlist& obj_tag, map<uint64_t, vector<rgw_bucket_olh_log_entry> >& log,
                     uint64_t *plast_ver);
-  int update_olh(RGWObjectCtx& obj_ctx, RGWObjState *state, const string& bucket_owner, rgw_obj& obj);
-  int set_olh(RGWObjectCtx& obj_ctx, const string& bucket_owner, rgw_obj& target_obj, bool delete_marker, rgw_bucket_dir_entry_meta *meta,
+  int update_olh(RGWObjectCtx& obj_ctx, RGWObjState *state, RGWBucketInfo& bucket_info, rgw_obj& obj);
+  int set_olh(RGWObjectCtx& obj_ctx, RGWBucketInfo& bucket_info, rgw_obj& target_obj, bool delete_marker, rgw_bucket_dir_entry_meta *meta,
               uint64_t olh_epoch);
-  int unlink_obj_instance(RGWObjectCtx& obj_ctx, const string& bucket_owner, rgw_obj& target_obj,
+  int unlink_obj_instance(RGWObjectCtx& obj_ctx, RGWBucketInfo& bucket_info, rgw_obj& target_obj,
                           uint64_t olh_epoch);
 
   void check_pending_olh_entries(map<string, bufferlist>& pending_entries, map<string, bufferlist> *rm_pending_entries);
@@ -2031,14 +2044,14 @@ protected:
   RGWRados *store;
   RGWObjectCtx& obj_ctx;
   bool is_complete;
-  string bucket_owner;
+  RGWBucketInfo bucket_info;
 
   virtual int do_complete(string& etag, time_t *mtime, time_t set_mtime,
                           map<string, bufferlist>& attrs,
                           const char *if_match = NULL, const char *if_nomatch = NULL) = 0;
 
 public:
-  RGWPutObjProcessor(RGWObjectCtx& _obj_ctx, const string& _bo) : store(NULL), obj_ctx(_obj_ctx), is_complete(false), bucket_owner(_bo) {}
+  RGWPutObjProcessor(RGWObjectCtx& _obj_ctx, RGWBucketInfo& _bi) : store(NULL), obj_ctx(_obj_ctx), is_complete(false), bucket_info(_bi) {}
   virtual ~RGWPutObjProcessor() {}
   virtual int prepare(RGWRados *_store, string *oid_rand) {
     store = _store;
@@ -2086,7 +2099,7 @@ protected:
 public:
   int throttle_data(void *handle, bool need_to_wait);
 
-  RGWPutObjProcessor_Aio(RGWObjectCtx& obj_ctx, const string& bucket_owner) : RGWPutObjProcessor(obj_ctx, bucket_owner), max_chunks(RGW_MAX_PENDING_CHUNKS), obj_len(0) {}
+  RGWPutObjProcessor_Aio(RGWObjectCtx& obj_ctx, RGWBucketInfo& bucket_info) : RGWPutObjProcessor(obj_ctx, bucket_info), max_chunks(RGW_MAX_PENDING_CHUNKS), obj_len(0) {}
   virtual ~RGWPutObjProcessor_Aio();
 };
 
@@ -2132,9 +2145,9 @@ protected:
 
 public:
   ~RGWPutObjProcessor_Atomic() {}
-  RGWPutObjProcessor_Atomic(RGWObjectCtx& obj_ctx, const string& bucket_owner,
+  RGWPutObjProcessor_Atomic(RGWObjectCtx& obj_ctx, RGWBucketInfo& bucket_info,
                             rgw_bucket& _b, const string& _o, uint64_t _p, const string& _t, bool versioned) :
-                                RGWPutObjProcessor_Aio(obj_ctx, bucket_owner),
+                                RGWPutObjProcessor_Aio(obj_ctx, bucket_info),
                                 part_size(_p),
                                 cur_part_ofs(0),
                                 next_part_ofs(_p),