]> git-server-git.apps.pok.os.sepia.ceph.com Git - ceph.git/commitdiff
rgw: use new BucketShard structure for index manipulation calls
authorYehuda Sadeh <yehuda@redhat.com>
Mon, 1 Dec 2014 23:34:37 +0000 (15:34 -0800)
committerYehuda Sadeh <yehuda@redhat.com>
Wed, 14 Jan 2015 03:21:25 +0000 (19:21 -0800)
Instead of recalculating the hash every call, do it once, and pass this
structure around. Also, will be used for logging changes into the data
log.

Signed-off-by: Yehuda Sadeh <yehuda@redhat.com>
src/rgw/rgw_bucket.cc
src/rgw/rgw_bucket.h
src/rgw/rgw_rados.cc
src/rgw/rgw_rados.h

index b8c2778b0ec5c0d6ddd0b86c680d4f184b3dfa16..1242f546229c69d9afda772bafe25e99ffedae9f 100644 (file)
@@ -1102,9 +1102,10 @@ void rgw_data_change::dump(Formatter *f) const
 }
 
 
-int RGWDataChangesLog::choose_oid(rgw_bucket& bucket) {
+int RGWDataChangesLog::choose_oid(rgw_bucket& bucket, int shard_id) {
     string& name = bucket.name;
-    uint32_t r = ceph_str_hash_linux(name.c_str(), name.size()) % num_shards;
+    int shard_shift = (shard_id > 0 ? shard_id : 0);
+    uint32_t r = (ceph_str_hash_linux(name.c_str(), name.size()) + shard_shift) % num_shards;
 
     return (int)r;
 }
@@ -1197,7 +1198,7 @@ void RGWDataChangesLog::update_renewed(string& bucket_name, utime_t& expiration)
   status->cur_expiration = expiration;
 }
 
-int RGWDataChangesLog::add_entry(rgw_bucket& bucket) {
+int RGWDataChangesLog::add_entry(rgw_bucket& bucket, int shard_id) {
   if (!store->need_to_log_data())
     return 0;
 
@@ -1243,7 +1244,7 @@ int RGWDataChangesLog::add_entry(rgw_bucket& bucket) {
   status->cond = new RefCountedCond;
   status->pending = true;
 
-  string& oid = oids[choose_oid(bucket)];
+  string& oid = oids[choose_oid(bucket, shard_id)];
   utime_t expiration;
 
   int ret;
index d53da2f9ba63e485e1b14980726e3f997bfe253c..03215f1a4cf501121d72fbfc02207f60eb6a719f 100644 (file)
@@ -365,7 +365,7 @@ public:
   ~RGWDataChangesLog();
 
   int choose_oid(rgw_bucket& bucket);
-  int add_entry(rgw_bucket& bucket);
+  int add_entry(rgw_bucket& bucket, int shard_id);
   int renew_entries();
   int list_entries(int shard, utime_t& start_time, utime_t& end_time, int max_entries,
                   list<rgw_data_change>& entries,
index 6cf4aaa9e85b252a6bc3085e7d002d6a3f497587..7db1e9d0be34ff22e1a1976422871e1333ed8b83 100644 (file)
@@ -2828,6 +2828,25 @@ int RGWRados::get_obj_ref(const rgw_obj& obj, rgw_rados_ref *ref, rgw_bucket *bu
   return 0;
 }
 
+int RGWRados::BucketShard::init(rgw_bucket& _bucket, rgw_obj& obj)
+{
+  bucket = _bucket;
+
+  if (store->bucket_is_system(bucket)) {
+    return 0;
+  }
+
+  int ret = store->open_bucket_index_shard(bucket, index_ctx, obj.get_hash_object(), &bucket_obj, &shard_id);
+  if (ret < 0) {
+    ldout(store->ctx(), 0) << "ERROR: open_bucket_index_shard() returned ret=" << ret << dendl;
+    return ret;
+  }
+  ldout(store->ctx(), 20) << " bucket index object: " << bucket_obj << dendl;
+
+  return 0;
+}
+
+
 /**
  * Write/overwrite an object to the bucket storage.
  * bucket: the bucket to store the object in
@@ -2949,7 +2968,14 @@ int RGWRados::put_obj_meta_impl(void *ctx, rgw_obj& obj,  uint64_t size,
     index_tag = state->write_tag;
   }
 
-  r = prepare_update_index(NULL, bucket, CLS_RGW_OP_ADD, obj, index_tag);
+  librados::IoCtx index_ctx;
+  BucketShard bs(this);
+  r = bs.init(bucket, obj);
+  if (r < 0) {
+    return r;
+  }
+
+  r = prepare_update_index(NULL, bs, CLS_RGW_OP_ADD, obj, index_tag);
   if (r < 0)
     return r;
 
@@ -2971,7 +2997,7 @@ int RGWRados::put_obj_meta_impl(void *ctx, rgw_obj& obj,  uint64_t size,
     ldout(cct, 0) << "ERROR: complete_atomic_overwrite returned r=" << r << dendl;
   }
 
-  r = complete_update_index(bucket, obj, index_tag, poolid, epoch, size,
+  r = complete_update_index(bs, obj, index_tag, poolid, epoch, size,
       ut, etag, content_type, &acl_bl, category, remove_objs);
   if (r < 0)
     goto done_cancel;
@@ -2988,7 +3014,7 @@ int RGWRados::put_obj_meta_impl(void *ctx, rgw_obj& obj,  uint64_t size,
   return 0;
 
 done_cancel:
-  int ret = complete_update_index_cancel(bucket, obj, index_tag);
+  int ret = complete_update_index_cancel(bs, obj, index_tag);
   if (ret < 0) {
     ldout(cct, 0) << "ERROR: complete_update_index_cancel() returned ret=" << ret << dendl;
   }
@@ -3841,7 +3867,7 @@ int RGWRados::open_bucket_index(rgw_bucket& bucket, librados::IoCtx& index_ctx,
 }
 
 int RGWRados::open_bucket_index_shard(rgw_bucket& bucket, librados::IoCtx& index_ctx,
-    const string& obj_key, string *bucket_obj)
+    const string& obj_key, string *bucket_obj, int *shard_id)
 {
   string bucket_oid_base;
   int ret = open_bucket_index_base(bucket, index_ctx, bucket_oid_base);
@@ -3855,7 +3881,7 @@ int RGWRados::open_bucket_index_shard(rgw_bucket& bucket, librados::IoCtx& index
     return ret;
 
   get_bucket_index_object(bucket_oid_base, obj_key, binfo.num_shards,
-      (RGWBucketInfo::BIShardsHashType)binfo.bucket_index_shard_hash_type, bucket_obj);
+      (RGWBucketInfo::BIShardsHashType)binfo.bucket_index_shard_hash_type, bucket_obj, shard_id);
   return 0;
 }
 
@@ -3969,8 +3995,14 @@ int RGWRados::delete_obj_impl(void *ctx, const string& bucket_owner, rgw_obj& ob
 
   bool ret_not_existed = (state && !state->exists);
 
+  BucketShard bs(this);
+  r = bs.init(bucket, obj);
+  if (r < 0) {
+    return r;
+  }
+
   string tag;
-  r = prepare_update_index(state, bucket, CLS_RGW_OP_DEL, obj, tag);
+  r = prepare_update_index(state, bs, CLS_RGW_OP_DEL, obj, tag);
   if (r < 0)
     return r;
 
@@ -3985,9 +4017,9 @@ int RGWRados::delete_obj_impl(void *ctx, const string& bucket_owner, rgw_obj& ob
   int64_t poolid = ref.ioctx.get_id();
   if (r >= 0 || r == -ENOENT) {
     uint64_t epoch = ref.ioctx.get_last_version();
-    r = complete_update_index_del(bucket, obj, tag, poolid, epoch);
+    r = complete_update_index_del(bs, obj, tag, poolid, epoch);
   } else {
-    int ret = complete_update_index_cancel(bucket, obj, tag);
+    int ret = complete_update_index_cancel(bs, obj, tag);
     if (ret < 0) {
       ldout(cct, 0) << "ERROR: complete_update_index_cancel returned ret=" << ret << dendl;
     }
@@ -4045,8 +4077,14 @@ int RGWRados::delete_obj_index(rgw_obj& obj)
   std::string oid, key;
   get_obj_bucket_and_oid_key(obj, bucket, oid, key);
 
+  BucketShard bs(this);
+  int r = bs.init(bucket, obj);
+  if (r < 0) {
+    return r;
+  }
+
   string tag;
-  int r = complete_update_index_del(bucket, obj, tag, -1 /* pool */, 0);
+  r = complete_update_index_del(bs, obj, tag, -1 /* pool */, 0);
 
   return r;
 }
@@ -4599,13 +4637,13 @@ done_err:
   return r;
 }
 
-int RGWRados::prepare_update_index(RGWObjState *state, rgw_bucket& bucket,
+int RGWRados::prepare_update_index(RGWObjState *state, BucketShard& bs,
                                    RGWModifyOp op, rgw_obj& obj, string& tag)
 {
-  if (bucket_is_system(bucket))
+  if (bucket_is_system(bs.bucket))
     return 0;
 
-  int ret = data_log->add_entry(obj.bucket);
+  int ret = data_log->add_entry(bs.bucket, bs.shard_id);
   if (ret < 0) {
     lderr(cct) << "ERROR: failed writing data log" << dendl;
     return ret;
@@ -4622,16 +4660,16 @@ int RGWRados::prepare_update_index(RGWObjState *state, rgw_bucket& bucket,
       append_rand_alpha(cct, tag, tag, 32);
     }
   }
-  ret = cls_obj_prepare_op(bucket, op, tag, obj.object, obj.key, obj.get_hash_object());
+  ret = cls_obj_prepare_op(bs, op, tag, obj.object, obj.key);
 
   return ret;
 }
 
-int RGWRados::complete_update_index(rgw_bucket& bucket, rgw_obj& oid, string& tag, int64_t poolid, uint64_t epoch, uint64_t size,
+int RGWRados::complete_update_index(BucketShard& bs, rgw_obj& oid, string& tag, int64_t poolid, uint64_t epoch, uint64_t size,
                                     utime_t& ut, string& etag, string& content_type, bufferlist *acl_bl, RGWObjCategory category,
                                     list<string> *remove_objs)
 {
-  if (bucket_is_system(bucket))
+  if (bucket_is_system(bs.bucket))
     return 0;
 
   RGWObjEnt ent;
@@ -4650,7 +4688,7 @@ int RGWRados::complete_update_index(rgw_bucket& bucket, rgw_obj& oid, string& ta
   ent.owner_display_name = owner.get_display_name();
   ent.content_type = content_type;
 
-  int ret = cls_obj_complete_add(bucket, tag, poolid, epoch, ent, category, remove_objs, oid.get_hash_object());
+  int ret = cls_obj_complete_add(bs, tag, poolid, epoch, ent, category, remove_objs);
 
   return ret;
 }
@@ -4674,6 +4712,7 @@ int RGWRados::clone_objs_impl(void *ctx, rgw_obj& dst_obj,
   string etag;
   string content_type;
   bufferlist acl_bl;
+  BucketShard bs(this);
   bool update_index = (category == RGW_OBJ_CATEGORY_MAIN ||
                        category == RGW_OBJ_CATEGORY_MULTIMETA);
 
@@ -4754,8 +4793,13 @@ int RGWRados::clone_objs_impl(void *ctx, rgw_obj& dst_obj,
   int64_t poolid = io_ctx.get_id();
   int ret;
 
+  ret = bs.init(bucket, dst_obj);
+  if (ret < 0) {
+    goto done;
+  }
+
   if (update_index) {
-    ret = prepare_update_index(state, bucket, CLS_RGW_OP_ADD, dst_obj, tag);
+    ret = prepare_update_index(state, bs, CLS_RGW_OP_ADD, dst_obj, tag);
     if (ret < 0)
       goto done;
   }
@@ -4769,10 +4813,10 @@ done:
 
   if (update_index) {
     if (ret >= 0) {
-      ret = complete_update_index(bucket, dst_obj, tag, poolid, epoch, size,
+      ret = complete_update_index(bs, dst_obj, tag, poolid, epoch, size,
                                   ut, etag, content_type, &acl_bl, category, NULL);
     } else {
-      int r = complete_update_index_cancel(bucket, dst_obj, tag);
+      int r = complete_update_index_cancel(bs, dst_obj, tag);
       if (r < 0) {
         ldout(cct, 0) << "ERROR: comlete_update_index_cancel() returned r=" << r << dendl;
       }
@@ -6241,34 +6285,20 @@ int RGWRados::cls_rgw_init_index(librados::IoCtx& index_ctx, librados::ObjectWri
   return r;
 }
 
-int RGWRados::cls_obj_prepare_op(rgw_bucket& bucket, RGWModifyOp op, string& tag,
-                                 string& name, string& locator, const string& index_hash_object)
+int RGWRados::cls_obj_prepare_op(BucketShard& bs, RGWModifyOp op, string& tag,
+                                 string& name, string& locator)
 {
-  librados::IoCtx index_ctx;
-  string bucket_obj;
-  int ret = open_bucket_index_shard(bucket, index_ctx, index_hash_object, &bucket_obj);
-  ldout(cct, 20) << " bucket index object: " << bucket_obj << dendl;
-  if (ret < 0)
-    return ret;
-
   ObjectWriteOperation o;
   cls_rgw_bucket_prepare_op(o, op, tag, name, locator, zone_public_config.log_data);
-  ret = index_ctx.operate(bucket_obj, &o);
+  int ret = bs.index_ctx.operate(bs.bucket_obj, &o);
   return ret;
 }
 
-int RGWRados::cls_obj_complete_op(rgw_bucket& bucket, RGWModifyOp op, string& tag,
+int RGWRados::cls_obj_complete_op(BucketShard& bs, RGWModifyOp op, string& tag,
                                   int64_t pool, uint64_t epoch,
                                   RGWObjEnt& ent, RGWObjCategory category,
-                                 list<string> *remove_objs, const string& index_hash_object)
+                                 list<string> *remove_objs)
 {
-  librados::IoCtx index_ctx;
-  string bucket_obj;
-  int ret = open_bucket_index_shard(bucket, index_ctx, index_hash_object, &bucket_obj);
-  ldout(cct, 20) << " bucket index object: " << bucket_obj << dendl;
-  if (ret < 0)
-    return ret;
-
   ObjectWriteOperation o;
   rgw_bucket_dir_entry_meta dir_meta;
   dir_meta.size = ent.size;
@@ -6285,33 +6315,33 @@ int RGWRados::cls_obj_complete_op(rgw_bucket& bucket, RGWModifyOp op, string& ta
   cls_rgw_bucket_complete_op(o, op, tag, ver, ent.name, dir_meta, remove_objs, zone_public_config.log_data);
 
   AioCompletion *c = librados::Rados::aio_create_completion(NULL, NULL, NULL);
-  ret = index_ctx.aio_operate(bucket_obj, c, &o);
+  int ret = bs.index_ctx.aio_operate(bs.bucket_obj, c, &o);
   c->release();
   return ret;
 }
 
-int RGWRados::cls_obj_complete_add(rgw_bucket& bucket, string& tag,
+int RGWRados::cls_obj_complete_add(BucketShard& bs, string& tag,
                                    int64_t pool, uint64_t epoch,
                                    RGWObjEnt& ent, RGWObjCategory category,
-                                   list<string> *remove_obj, const string& index_hash_object)
+                                   list<string> *remove_obj)
 {
-  return cls_obj_complete_op(bucket, CLS_RGW_OP_ADD, tag, pool, epoch, ent, category, remove_obj, index_hash_object);
+  return cls_obj_complete_op(bs, CLS_RGW_OP_ADD, tag, pool, epoch, ent, category, remove_obj);
 }
 
-int RGWRados::cls_obj_complete_del(rgw_bucket& bucket, string& tag,
+int RGWRados::cls_obj_complete_del(BucketShard& bs, string& tag,
                                    int64_t pool, uint64_t epoch,
-                                   string& name, const string& index_hash_object)
+                                   string& name)
 {
   RGWObjEnt ent;
   ent.name = name;
-  return cls_obj_complete_op(bucket, CLS_RGW_OP_DEL, tag, pool, epoch, ent, RGW_OBJ_CATEGORY_NONE, NULL, index_hash_object);
+  return cls_obj_complete_op(bs, CLS_RGW_OP_DEL, tag, pool, epoch, ent, RGW_OBJ_CATEGORY_NONE, NULL);
 }
 
-int RGWRados::cls_obj_complete_cancel(rgw_bucket& bucket, string& tag, string& name, const string& index_hash_object)
+int RGWRados::cls_obj_complete_cancel(BucketShard& bs, string& tag, string& name)
 {
   RGWObjEnt ent;
   ent.name = name;
-  return cls_obj_complete_op(bucket, CLS_RGW_OP_ADD, tag, -1 /* pool id */, 0, ent, RGW_OBJ_CATEGORY_NONE, NULL, index_hash_object);
+  return cls_obj_complete_op(bs, CLS_RGW_OP_ADD, tag, -1 /* pool id */, 0, ent, RGW_OBJ_CATEGORY_NONE, NULL);
 }
 
 int RGWRados::cls_obj_set_bucket_tag_timeout(rgw_bucket& bucket, uint64_t timeout)
@@ -6972,7 +7002,7 @@ void RGWRados::get_bucket_index_objects(const string& bucket_oid_base,
 }
 
 int RGWRados::get_bucket_index_object(const string& bucket_oid_base, const string& obj_key,
-    uint32_t num_shards, RGWBucketInfo::BIShardsHashType hash_type, string *bucket_obj)
+    uint32_t num_shards, RGWBucketInfo::BIShardsHashType hash_type, string *bucket_obj, int *shard_id)
 {
   int r = 0;
   switch (hash_type) {
@@ -6980,12 +7010,18 @@ int RGWRados::get_bucket_index_object(const string& bucket_oid_base, const strin
       if (!num_shards) {
         // By default with no sharding, we use the bucket oid as itself
         (*bucket_obj) = bucket_oid_base;
+        if (shard_id) {
+          *shard_id = -1;
+        }
       } else {
         uint32_t sid = ceph_str_hash_linux(obj_key.c_str(),
           obj_key.size()) % MAX_BUCKET_INDEX_SHARDS_PRIME % num_shards;
         char buf[bucket_oid_base.size() + 32];
         snprintf(buf, sizeof(buf), "%s.%d", bucket_oid_base.c_str(), sid);
         (*bucket_obj) = buf;
+        if (shard_id) {
+          *shard_id = (int)sid;
+        }
       }
       break;
     default:
index d8f6c168a0f8bcb95f0cfe6bd0d25c51adc2fdf4..66ef2dbfd9b1c144ed0831f2887b6f924fa20203 100644 (file)
@@ -1255,7 +1255,7 @@ class RGWRados
   int open_bucket_index_base(rgw_bucket& bucket, librados::IoCtx&  index_ctx,
       string& bucket_oid_base);
   int open_bucket_index_shard(rgw_bucket& bucket, librados::IoCtx& index_ctx,
-      const string& obj_key, string *bucket_obj);
+      const string& obj_key, string *bucket_obj, int *shard_id);
   int open_bucket_index(rgw_bucket& bucket, librados::IoCtx& index_ctx,
       vector<string>& bucket_objs, int shard_id = -1);
   template<typename T>
@@ -1843,36 +1843,47 @@ public:
   virtual int put_linked_bucket_info(RGWBucketInfo& info, bool exclusive, time_t mtime, obj_version *pep_objv,
                                      map<string, bufferlist> *pattrs, bool create_entry_point);
 
+  struct BucketShard {
+    RGWRados *store;
+    rgw_bucket bucket;
+    int shard_id;
+    librados::IoCtx index_ctx;
+    string bucket_obj;
+
+    BucketShard(RGWRados *_store) : store(_store), shard_id(-1) {}
+    int init(rgw_bucket& _bucket, rgw_obj& obj);
+  };
+
   int cls_rgw_init_index(librados::IoCtx& io_ctx, librados::ObjectWriteOperation& op, string& oid);
-  int cls_obj_prepare_op(rgw_bucket& bucket, RGWModifyOp op, string& tag,
-                         string& name, string& locator, const string& index_hash_object);
-  int cls_obj_complete_op(rgw_bucket& bucket, RGWModifyOp op, string& tag, int64_t pool, uint64_t epoch,
-                          RGWObjEnt& ent, RGWObjCategory category, list<string> *remove_objs, const string& index_hash_object);
-  int cls_obj_complete_add(rgw_bucket& bucket, string& tag, int64_t pool, uint64_t epoch, RGWObjEnt& ent, RGWObjCategory category, list<string> *remove_objs, const string& index_hash_object);
-  int cls_obj_complete_del(rgw_bucket& bucket, string& tag, int64_t pool, uint64_t epoch, string& name, const string& index_hash_object);
-  int cls_obj_complete_cancel(rgw_bucket& bucket, string& tag, string& name, const string& index_hash_object);
+  int cls_obj_prepare_op(BucketShard& bs, RGWModifyOp op, string& tag, string& name, string& locator);
+  int cls_obj_complete_op(BucketShard& bs, RGWModifyOp op, string& tag, int64_t pool, uint64_t epoch,
+                          RGWObjEnt& ent, RGWObjCategory category, list<string> *remove_objs);
+  int cls_obj_complete_add(BucketShard& bs, string& tag, int64_t pool, uint64_t epoch, RGWObjEnt& ent, RGWObjCategory category, list<string> *remove_objs);
+  int cls_obj_complete_del(BucketShard& bs, string& tag, int64_t pool, uint64_t epoch, string& name);
+  int cls_obj_complete_cancel(BucketShard& bs, string& tag, string& name);
   int cls_obj_set_bucket_tag_timeout(rgw_bucket& bucket, uint64_t timeout);
   int cls_bucket_list(rgw_bucket& bucket, const string& start, const string& prefix, uint32_t hint_num,
                       map<string, RGWObjEnt>& m, bool *is_truncated, string *last_entry,
                       bool (*force_check_filter)(const string&  name) = NULL);
   int cls_bucket_head(rgw_bucket& bucket, map<string, struct rgw_bucket_dir_header>& headers);
   int cls_bucket_head_async(rgw_bucket& bucket, RGWGetDirHeader_CB *ctx, int *num_aio);
-  int prepare_update_index(RGWObjState *state, rgw_bucket& bucket,
+
+  int prepare_update_index(RGWObjState *state, BucketShard& bucket_shard,
                            RGWModifyOp op, rgw_obj& oid, string& tag);
-  int complete_update_index(rgw_bucket& bucket, rgw_obj& oid, string& tag, int64_t poolid, uint64_t epoch, uint64_t size,
+  int complete_update_index(BucketShard& bucket_shard, rgw_obj& oid, string& tag, int64_t poolid, uint64_t epoch, uint64_t size,
                             utime_t& ut, string& etag, string& content_type, bufferlist *acl_bl, RGWObjCategory category,
                            list<string> *remove_objs);
-  int complete_update_index_del(rgw_bucket& bucket, rgw_obj& oid, string& tag, int64_t pool, uint64_t epoch) {
-    if (bucket_is_system(bucket))
+  int complete_update_index_del(BucketShard& bucket_shard, rgw_obj& oid, string& tag, int64_t pool, uint64_t epoch) {
+    if (bucket_is_system(bucket_shard.bucket))
       return 0;
 
-    return cls_obj_complete_del(bucket, tag, pool, epoch, oid.object, oid.get_hash_object());
+    return cls_obj_complete_del(bucket_shard, tag, pool, epoch, oid.object);
   }
-  int complete_update_index_cancel(rgw_bucket& bucket, rgw_obj& oid, string& tag) {
-    if (bucket_is_system(bucket))
+  int complete_update_index_cancel(BucketShard& bucket_shard, rgw_obj& oid, string& tag) {
+    if (bucket_is_system(bucket_shard.bucket))
       return 0;
 
-    return cls_obj_complete_cancel(bucket, tag, oid.object, oid.get_hash_object());
+    return cls_obj_complete_cancel(bucket_shard, tag, oid.object);
   }
   int list_bi_log_entries(rgw_bucket& bucket, int shard_id, string& marker, uint32_t max, std::list<rgw_bi_log_entry>& result, bool *truncated);
   int trim_bi_log_entries(rgw_bucket& bucket, int shard_id, string& marker, string& end_marker);
@@ -1976,7 +1987,7 @@ public:
    * Return 0 on success, a failure code otherwise.
    */
   int get_bucket_index_object(const string& bucket_oid_base, const string& obj_key,
-      uint32_t num_shards, RGWBucketInfo::BIShardsHashType hash_type, string *bucket_obj);
+      uint32_t num_shards, RGWBucketInfo::BIShardsHashType hash_type, string *bucket_obj, int *shard);
 
   int process_intent_log(rgw_bucket& bucket, string& oid,
                         time_t epoch, int flags, bool purge);