From: Yehuda Sadeh Date: Mon, 1 Dec 2014 23:34:37 +0000 (-0800) Subject: rgw: use new BucketShard structure for index manipulation calls X-Git-Tag: v0.92~12^2~24 X-Git-Url: http://git-server-git.apps.pok.os.sepia.ceph.com/?a=commitdiff_plain;h=87934706fceb3b7a5bae4679f689f45de140e807;p=ceph.git rgw: use new BucketShard structure for index manipulation calls Instead of recalculating the hash every call, do it once, and pass this structure around. Also, will be used for logging changes into the data log. Signed-off-by: Yehuda Sadeh --- diff --git a/src/rgw/rgw_bucket.cc b/src/rgw/rgw_bucket.cc index b8c2778b0ec5..1242f546229c 100644 --- a/src/rgw/rgw_bucket.cc +++ b/src/rgw/rgw_bucket.cc @@ -1102,9 +1102,10 @@ void rgw_data_change::dump(Formatter *f) const } -int RGWDataChangesLog::choose_oid(rgw_bucket& bucket) { +int RGWDataChangesLog::choose_oid(rgw_bucket& bucket, int shard_id) { string& name = bucket.name; - uint32_t r = ceph_str_hash_linux(name.c_str(), name.size()) % num_shards; + int shard_shift = (shard_id > 0 ? shard_id : 0); + uint32_t r = (ceph_str_hash_linux(name.c_str(), name.size()) + shard_shift) % num_shards; return (int)r; } @@ -1197,7 +1198,7 @@ void RGWDataChangesLog::update_renewed(string& bucket_name, utime_t& expiration) status->cur_expiration = expiration; } -int RGWDataChangesLog::add_entry(rgw_bucket& bucket) { +int RGWDataChangesLog::add_entry(rgw_bucket& bucket, int shard_id) { if (!store->need_to_log_data()) return 0; @@ -1243,7 +1244,7 @@ int RGWDataChangesLog::add_entry(rgw_bucket& bucket) { status->cond = new RefCountedCond; status->pending = true; - string& oid = oids[choose_oid(bucket)]; + string& oid = oids[choose_oid(bucket, shard_id)]; utime_t expiration; int ret; diff --git a/src/rgw/rgw_bucket.h b/src/rgw/rgw_bucket.h index d53da2f9ba63..03215f1a4cf5 100644 --- a/src/rgw/rgw_bucket.h +++ b/src/rgw/rgw_bucket.h @@ -365,7 +365,7 @@ public: ~RGWDataChangesLog(); int choose_oid(rgw_bucket& bucket); - int add_entry(rgw_bucket& bucket); + int add_entry(rgw_bucket& bucket, int shard_id); int renew_entries(); int list_entries(int shard, utime_t& start_time, utime_t& end_time, int max_entries, list& entries, diff --git a/src/rgw/rgw_rados.cc b/src/rgw/rgw_rados.cc index 6cf4aaa9e85b..7db1e9d0be34 100644 --- a/src/rgw/rgw_rados.cc +++ b/src/rgw/rgw_rados.cc @@ -2828,6 +2828,25 @@ int RGWRados::get_obj_ref(const rgw_obj& obj, rgw_rados_ref *ref, rgw_bucket *bu return 0; } +int RGWRados::BucketShard::init(rgw_bucket& _bucket, rgw_obj& obj) +{ + bucket = _bucket; + + if (store->bucket_is_system(bucket)) { + return 0; + } + + int ret = store->open_bucket_index_shard(bucket, index_ctx, obj.get_hash_object(), &bucket_obj, &shard_id); + if (ret < 0) { + ldout(store->ctx(), 0) << "ERROR: open_bucket_index_shard() returned ret=" << ret << dendl; + return ret; + } + ldout(store->ctx(), 20) << " bucket index object: " << bucket_obj << dendl; + + return 0; +} + + /** * Write/overwrite an object to the bucket storage. * bucket: the bucket to store the object in @@ -2949,7 +2968,14 @@ int RGWRados::put_obj_meta_impl(void *ctx, rgw_obj& obj, uint64_t size, index_tag = state->write_tag; } - r = prepare_update_index(NULL, bucket, CLS_RGW_OP_ADD, obj, index_tag); + librados::IoCtx index_ctx; + BucketShard bs(this); + r = bs.init(bucket, obj); + if (r < 0) { + return r; + } + + r = prepare_update_index(NULL, bs, CLS_RGW_OP_ADD, obj, index_tag); if (r < 0) return r; @@ -2971,7 +2997,7 @@ int RGWRados::put_obj_meta_impl(void *ctx, rgw_obj& obj, uint64_t size, ldout(cct, 0) << "ERROR: complete_atomic_overwrite returned r=" << r << dendl; } - r = complete_update_index(bucket, obj, index_tag, poolid, epoch, size, + r = complete_update_index(bs, obj, index_tag, poolid, epoch, size, ut, etag, content_type, &acl_bl, category, remove_objs); if (r < 0) goto done_cancel; @@ -2988,7 +3014,7 @@ int RGWRados::put_obj_meta_impl(void *ctx, rgw_obj& obj, uint64_t size, return 0; done_cancel: - int ret = complete_update_index_cancel(bucket, obj, index_tag); + int ret = complete_update_index_cancel(bs, obj, index_tag); if (ret < 0) { ldout(cct, 0) << "ERROR: complete_update_index_cancel() returned ret=" << ret << dendl; } @@ -3841,7 +3867,7 @@ int RGWRados::open_bucket_index(rgw_bucket& bucket, librados::IoCtx& index_ctx, } int RGWRados::open_bucket_index_shard(rgw_bucket& bucket, librados::IoCtx& index_ctx, - const string& obj_key, string *bucket_obj) + const string& obj_key, string *bucket_obj, int *shard_id) { string bucket_oid_base; int ret = open_bucket_index_base(bucket, index_ctx, bucket_oid_base); @@ -3855,7 +3881,7 @@ int RGWRados::open_bucket_index_shard(rgw_bucket& bucket, librados::IoCtx& index return ret; get_bucket_index_object(bucket_oid_base, obj_key, binfo.num_shards, - (RGWBucketInfo::BIShardsHashType)binfo.bucket_index_shard_hash_type, bucket_obj); + (RGWBucketInfo::BIShardsHashType)binfo.bucket_index_shard_hash_type, bucket_obj, shard_id); return 0; } @@ -3969,8 +3995,14 @@ int RGWRados::delete_obj_impl(void *ctx, const string& bucket_owner, rgw_obj& ob bool ret_not_existed = (state && !state->exists); + BucketShard bs(this); + r = bs.init(bucket, obj); + if (r < 0) { + return r; + } + string tag; - r = prepare_update_index(state, bucket, CLS_RGW_OP_DEL, obj, tag); + r = prepare_update_index(state, bs, CLS_RGW_OP_DEL, obj, tag); if (r < 0) return r; @@ -3985,9 +4017,9 @@ int RGWRados::delete_obj_impl(void *ctx, const string& bucket_owner, rgw_obj& ob int64_t poolid = ref.ioctx.get_id(); if (r >= 0 || r == -ENOENT) { uint64_t epoch = ref.ioctx.get_last_version(); - r = complete_update_index_del(bucket, obj, tag, poolid, epoch); + r = complete_update_index_del(bs, obj, tag, poolid, epoch); } else { - int ret = complete_update_index_cancel(bucket, obj, tag); + int ret = complete_update_index_cancel(bs, obj, tag); if (ret < 0) { ldout(cct, 0) << "ERROR: complete_update_index_cancel returned ret=" << ret << dendl; } @@ -4045,8 +4077,14 @@ int RGWRados::delete_obj_index(rgw_obj& obj) std::string oid, key; get_obj_bucket_and_oid_key(obj, bucket, oid, key); + BucketShard bs(this); + int r = bs.init(bucket, obj); + if (r < 0) { + return r; + } + string tag; - int r = complete_update_index_del(bucket, obj, tag, -1 /* pool */, 0); + r = complete_update_index_del(bs, obj, tag, -1 /* pool */, 0); return r; } @@ -4599,13 +4637,13 @@ done_err: return r; } -int RGWRados::prepare_update_index(RGWObjState *state, rgw_bucket& bucket, +int RGWRados::prepare_update_index(RGWObjState *state, BucketShard& bs, RGWModifyOp op, rgw_obj& obj, string& tag) { - if (bucket_is_system(bucket)) + if (bucket_is_system(bs.bucket)) return 0; - int ret = data_log->add_entry(obj.bucket); + int ret = data_log->add_entry(bs.bucket, bs.shard_id); if (ret < 0) { lderr(cct) << "ERROR: failed writing data log" << dendl; return ret; @@ -4622,16 +4660,16 @@ int RGWRados::prepare_update_index(RGWObjState *state, rgw_bucket& bucket, append_rand_alpha(cct, tag, tag, 32); } } - ret = cls_obj_prepare_op(bucket, op, tag, obj.object, obj.key, obj.get_hash_object()); + ret = cls_obj_prepare_op(bs, op, tag, obj.object, obj.key); return ret; } -int RGWRados::complete_update_index(rgw_bucket& bucket, rgw_obj& oid, string& tag, int64_t poolid, uint64_t epoch, uint64_t size, +int RGWRados::complete_update_index(BucketShard& bs, rgw_obj& oid, string& tag, int64_t poolid, uint64_t epoch, uint64_t size, utime_t& ut, string& etag, string& content_type, bufferlist *acl_bl, RGWObjCategory category, list *remove_objs) { - if (bucket_is_system(bucket)) + if (bucket_is_system(bs.bucket)) return 0; RGWObjEnt ent; @@ -4650,7 +4688,7 @@ int RGWRados::complete_update_index(rgw_bucket& bucket, rgw_obj& oid, string& ta ent.owner_display_name = owner.get_display_name(); ent.content_type = content_type; - int ret = cls_obj_complete_add(bucket, tag, poolid, epoch, ent, category, remove_objs, oid.get_hash_object()); + int ret = cls_obj_complete_add(bs, tag, poolid, epoch, ent, category, remove_objs); return ret; } @@ -4674,6 +4712,7 @@ int RGWRados::clone_objs_impl(void *ctx, rgw_obj& dst_obj, string etag; string content_type; bufferlist acl_bl; + BucketShard bs(this); bool update_index = (category == RGW_OBJ_CATEGORY_MAIN || category == RGW_OBJ_CATEGORY_MULTIMETA); @@ -4754,8 +4793,13 @@ int RGWRados::clone_objs_impl(void *ctx, rgw_obj& dst_obj, int64_t poolid = io_ctx.get_id(); int ret; + ret = bs.init(bucket, dst_obj); + if (ret < 0) { + goto done; + } + if (update_index) { - ret = prepare_update_index(state, bucket, CLS_RGW_OP_ADD, dst_obj, tag); + ret = prepare_update_index(state, bs, CLS_RGW_OP_ADD, dst_obj, tag); if (ret < 0) goto done; } @@ -4769,10 +4813,10 @@ done: if (update_index) { if (ret >= 0) { - ret = complete_update_index(bucket, dst_obj, tag, poolid, epoch, size, + ret = complete_update_index(bs, dst_obj, tag, poolid, epoch, size, ut, etag, content_type, &acl_bl, category, NULL); } else { - int r = complete_update_index_cancel(bucket, dst_obj, tag); + int r = complete_update_index_cancel(bs, dst_obj, tag); if (r < 0) { ldout(cct, 0) << "ERROR: comlete_update_index_cancel() returned r=" << r << dendl; } @@ -6241,34 +6285,20 @@ int RGWRados::cls_rgw_init_index(librados::IoCtx& index_ctx, librados::ObjectWri return r; } -int RGWRados::cls_obj_prepare_op(rgw_bucket& bucket, RGWModifyOp op, string& tag, - string& name, string& locator, const string& index_hash_object) +int RGWRados::cls_obj_prepare_op(BucketShard& bs, RGWModifyOp op, string& tag, + string& name, string& locator) { - librados::IoCtx index_ctx; - string bucket_obj; - int ret = open_bucket_index_shard(bucket, index_ctx, index_hash_object, &bucket_obj); - ldout(cct, 20) << " bucket index object: " << bucket_obj << dendl; - if (ret < 0) - return ret; - ObjectWriteOperation o; cls_rgw_bucket_prepare_op(o, op, tag, name, locator, zone_public_config.log_data); - ret = index_ctx.operate(bucket_obj, &o); + int ret = bs.index_ctx.operate(bs.bucket_obj, &o); return ret; } -int RGWRados::cls_obj_complete_op(rgw_bucket& bucket, RGWModifyOp op, string& tag, +int RGWRados::cls_obj_complete_op(BucketShard& bs, RGWModifyOp op, string& tag, int64_t pool, uint64_t epoch, RGWObjEnt& ent, RGWObjCategory category, - list *remove_objs, const string& index_hash_object) + list *remove_objs) { - librados::IoCtx index_ctx; - string bucket_obj; - int ret = open_bucket_index_shard(bucket, index_ctx, index_hash_object, &bucket_obj); - ldout(cct, 20) << " bucket index object: " << bucket_obj << dendl; - if (ret < 0) - return ret; - ObjectWriteOperation o; rgw_bucket_dir_entry_meta dir_meta; dir_meta.size = ent.size; @@ -6285,33 +6315,33 @@ int RGWRados::cls_obj_complete_op(rgw_bucket& bucket, RGWModifyOp op, string& ta cls_rgw_bucket_complete_op(o, op, tag, ver, ent.name, dir_meta, remove_objs, zone_public_config.log_data); AioCompletion *c = librados::Rados::aio_create_completion(NULL, NULL, NULL); - ret = index_ctx.aio_operate(bucket_obj, c, &o); + int ret = bs.index_ctx.aio_operate(bs.bucket_obj, c, &o); c->release(); return ret; } -int RGWRados::cls_obj_complete_add(rgw_bucket& bucket, string& tag, +int RGWRados::cls_obj_complete_add(BucketShard& bs, string& tag, int64_t pool, uint64_t epoch, RGWObjEnt& ent, RGWObjCategory category, - list *remove_obj, const string& index_hash_object) + list *remove_obj) { - return cls_obj_complete_op(bucket, CLS_RGW_OP_ADD, tag, pool, epoch, ent, category, remove_obj, index_hash_object); + return cls_obj_complete_op(bs, CLS_RGW_OP_ADD, tag, pool, epoch, ent, category, remove_obj); } -int RGWRados::cls_obj_complete_del(rgw_bucket& bucket, string& tag, +int RGWRados::cls_obj_complete_del(BucketShard& bs, string& tag, int64_t pool, uint64_t epoch, - string& name, const string& index_hash_object) + string& name) { RGWObjEnt ent; ent.name = name; - return cls_obj_complete_op(bucket, CLS_RGW_OP_DEL, tag, pool, epoch, ent, RGW_OBJ_CATEGORY_NONE, NULL, index_hash_object); + return cls_obj_complete_op(bs, CLS_RGW_OP_DEL, tag, pool, epoch, ent, RGW_OBJ_CATEGORY_NONE, NULL); } -int RGWRados::cls_obj_complete_cancel(rgw_bucket& bucket, string& tag, string& name, const string& index_hash_object) +int RGWRados::cls_obj_complete_cancel(BucketShard& bs, string& tag, string& name) { RGWObjEnt ent; ent.name = name; - return cls_obj_complete_op(bucket, CLS_RGW_OP_ADD, tag, -1 /* pool id */, 0, ent, RGW_OBJ_CATEGORY_NONE, NULL, index_hash_object); + return cls_obj_complete_op(bs, CLS_RGW_OP_ADD, tag, -1 /* pool id */, 0, ent, RGW_OBJ_CATEGORY_NONE, NULL); } int RGWRados::cls_obj_set_bucket_tag_timeout(rgw_bucket& bucket, uint64_t timeout) @@ -6972,7 +7002,7 @@ void RGWRados::get_bucket_index_objects(const string& bucket_oid_base, } int RGWRados::get_bucket_index_object(const string& bucket_oid_base, const string& obj_key, - uint32_t num_shards, RGWBucketInfo::BIShardsHashType hash_type, string *bucket_obj) + uint32_t num_shards, RGWBucketInfo::BIShardsHashType hash_type, string *bucket_obj, int *shard_id) { int r = 0; switch (hash_type) { @@ -6980,12 +7010,18 @@ int RGWRados::get_bucket_index_object(const string& bucket_oid_base, const strin if (!num_shards) { // By default with no sharding, we use the bucket oid as itself (*bucket_obj) = bucket_oid_base; + if (shard_id) { + *shard_id = -1; + } } else { uint32_t sid = ceph_str_hash_linux(obj_key.c_str(), obj_key.size()) % MAX_BUCKET_INDEX_SHARDS_PRIME % num_shards; char buf[bucket_oid_base.size() + 32]; snprintf(buf, sizeof(buf), "%s.%d", bucket_oid_base.c_str(), sid); (*bucket_obj) = buf; + if (shard_id) { + *shard_id = (int)sid; + } } break; default: diff --git a/src/rgw/rgw_rados.h b/src/rgw/rgw_rados.h index d8f6c168a0f8..66ef2dbfd9b1 100644 --- a/src/rgw/rgw_rados.h +++ b/src/rgw/rgw_rados.h @@ -1255,7 +1255,7 @@ class RGWRados int open_bucket_index_base(rgw_bucket& bucket, librados::IoCtx& index_ctx, string& bucket_oid_base); int open_bucket_index_shard(rgw_bucket& bucket, librados::IoCtx& index_ctx, - const string& obj_key, string *bucket_obj); + const string& obj_key, string *bucket_obj, int *shard_id); int open_bucket_index(rgw_bucket& bucket, librados::IoCtx& index_ctx, vector& bucket_objs, int shard_id = -1); template @@ -1843,36 +1843,47 @@ public: virtual int put_linked_bucket_info(RGWBucketInfo& info, bool exclusive, time_t mtime, obj_version *pep_objv, map *pattrs, bool create_entry_point); + struct BucketShard { + RGWRados *store; + rgw_bucket bucket; + int shard_id; + librados::IoCtx index_ctx; + string bucket_obj; + + BucketShard(RGWRados *_store) : store(_store), shard_id(-1) {} + int init(rgw_bucket& _bucket, rgw_obj& obj); + }; + int cls_rgw_init_index(librados::IoCtx& io_ctx, librados::ObjectWriteOperation& op, string& oid); - int cls_obj_prepare_op(rgw_bucket& bucket, RGWModifyOp op, string& tag, - string& name, string& locator, const string& index_hash_object); - int cls_obj_complete_op(rgw_bucket& bucket, RGWModifyOp op, string& tag, int64_t pool, uint64_t epoch, - RGWObjEnt& ent, RGWObjCategory category, list *remove_objs, const string& index_hash_object); - int cls_obj_complete_add(rgw_bucket& bucket, string& tag, int64_t pool, uint64_t epoch, RGWObjEnt& ent, RGWObjCategory category, list *remove_objs, const string& index_hash_object); - int cls_obj_complete_del(rgw_bucket& bucket, string& tag, int64_t pool, uint64_t epoch, string& name, const string& index_hash_object); - int cls_obj_complete_cancel(rgw_bucket& bucket, string& tag, string& name, const string& index_hash_object); + int cls_obj_prepare_op(BucketShard& bs, RGWModifyOp op, string& tag, string& name, string& locator); + int cls_obj_complete_op(BucketShard& bs, RGWModifyOp op, string& tag, int64_t pool, uint64_t epoch, + RGWObjEnt& ent, RGWObjCategory category, list *remove_objs); + int cls_obj_complete_add(BucketShard& bs, string& tag, int64_t pool, uint64_t epoch, RGWObjEnt& ent, RGWObjCategory category, list *remove_objs); + int cls_obj_complete_del(BucketShard& bs, string& tag, int64_t pool, uint64_t epoch, string& name); + int cls_obj_complete_cancel(BucketShard& bs, string& tag, string& name); int cls_obj_set_bucket_tag_timeout(rgw_bucket& bucket, uint64_t timeout); int cls_bucket_list(rgw_bucket& bucket, const string& start, const string& prefix, uint32_t hint_num, map& m, bool *is_truncated, string *last_entry, bool (*force_check_filter)(const string& name) = NULL); int cls_bucket_head(rgw_bucket& bucket, map& headers); int cls_bucket_head_async(rgw_bucket& bucket, RGWGetDirHeader_CB *ctx, int *num_aio); - int prepare_update_index(RGWObjState *state, rgw_bucket& bucket, + + int prepare_update_index(RGWObjState *state, BucketShard& bucket_shard, RGWModifyOp op, rgw_obj& oid, string& tag); - int complete_update_index(rgw_bucket& bucket, rgw_obj& oid, string& tag, int64_t poolid, uint64_t epoch, uint64_t size, + int complete_update_index(BucketShard& bucket_shard, rgw_obj& oid, string& tag, int64_t poolid, uint64_t epoch, uint64_t size, utime_t& ut, string& etag, string& content_type, bufferlist *acl_bl, RGWObjCategory category, list *remove_objs); - int complete_update_index_del(rgw_bucket& bucket, rgw_obj& oid, string& tag, int64_t pool, uint64_t epoch) { - if (bucket_is_system(bucket)) + int complete_update_index_del(BucketShard& bucket_shard, rgw_obj& oid, string& tag, int64_t pool, uint64_t epoch) { + if (bucket_is_system(bucket_shard.bucket)) return 0; - return cls_obj_complete_del(bucket, tag, pool, epoch, oid.object, oid.get_hash_object()); + return cls_obj_complete_del(bucket_shard, tag, pool, epoch, oid.object); } - int complete_update_index_cancel(rgw_bucket& bucket, rgw_obj& oid, string& tag) { - if (bucket_is_system(bucket)) + int complete_update_index_cancel(BucketShard& bucket_shard, rgw_obj& oid, string& tag) { + if (bucket_is_system(bucket_shard.bucket)) return 0; - return cls_obj_complete_cancel(bucket, tag, oid.object, oid.get_hash_object()); + return cls_obj_complete_cancel(bucket_shard, tag, oid.object); } int list_bi_log_entries(rgw_bucket& bucket, int shard_id, string& marker, uint32_t max, std::list& result, bool *truncated); int trim_bi_log_entries(rgw_bucket& bucket, int shard_id, string& marker, string& end_marker); @@ -1976,7 +1987,7 @@ public: * Return 0 on success, a failure code otherwise. */ int get_bucket_index_object(const string& bucket_oid_base, const string& obj_key, - uint32_t num_shards, RGWBucketInfo::BIShardsHashType hash_type, string *bucket_obj); + uint32_t num_shards, RGWBucketInfo::BIShardsHashType hash_type, string *bucket_obj, int *shard); int process_intent_log(rgw_bucket& bucket, string& oid, time_t epoch, int flags, bool purge);