return 0;
}
+/**
+ * Bind this handle to the bucket index shard that holds obj's entry.
+ * _bucket: bucket the object belongs to; copied into this->bucket
+ * obj: object whose hash key (obj.get_hash_object()) selects the shard
+ *
+ * For system buckets nothing is opened and 0 is returned right away.
+ * Returns 0 on success, or the negative code handed back by
+ * open_bucket_index_shard() on failure.
+ */
+int RGWRados::BucketShard::init(rgw_bucket& _bucket, rgw_obj& obj)
+{
+  bucket = _bucket;
+
+  /* system buckets: leave index_ctx / bucket_obj / shard_id untouched */
+  if (store->bucket_is_system(bucket))
+    return 0;
+
+  const int r = store->open_bucket_index_shard(bucket, index_ctx, obj.get_hash_object(),
+                                               &bucket_obj, &shard_id);
+  if (r >= 0) {
+    ldout(store->ctx(), 20) << " bucket index object: " << bucket_obj << dendl;
+    return 0;
+  }
+
+  ldout(store->ctx(), 0) << "ERROR: open_bucket_index_shard() returned ret=" << r << dendl;
+  return r;
+}
+
+
/**
* Write/overwrite an object to the bucket storage.
* bucket: the bucket to store the object in
index_tag = state->write_tag;
}
- r = prepare_update_index(NULL, bucket, CLS_RGW_OP_ADD, obj, index_tag);
+ librados::IoCtx index_ctx;
+ BucketShard bs(this);
+ r = bs.init(bucket, obj);
+ if (r < 0) {
+ return r;
+ }
+
+ r = prepare_update_index(NULL, bs, CLS_RGW_OP_ADD, obj, index_tag);
if (r < 0)
return r;
ldout(cct, 0) << "ERROR: complete_atomic_overwrite returned r=" << r << dendl;
}
- r = complete_update_index(bucket, obj, index_tag, poolid, epoch, size,
+ r = complete_update_index(bs, obj, index_tag, poolid, epoch, size,
ut, etag, content_type, &acl_bl, category, remove_objs);
if (r < 0)
goto done_cancel;
return 0;
done_cancel:
- int ret = complete_update_index_cancel(bucket, obj, index_tag);
+ int ret = complete_update_index_cancel(bs, obj, index_tag);
if (ret < 0) {
ldout(cct, 0) << "ERROR: complete_update_index_cancel() returned ret=" << ret << dendl;
}
}
int RGWRados::open_bucket_index_shard(rgw_bucket& bucket, librados::IoCtx& index_ctx,
- const string& obj_key, string *bucket_obj)
+ const string& obj_key, string *bucket_obj, int *shard_id)
{
string bucket_oid_base;
int ret = open_bucket_index_base(bucket, index_ctx, bucket_oid_base);
return ret;
get_bucket_index_object(bucket_oid_base, obj_key, binfo.num_shards,
- (RGWBucketInfo::BIShardsHashType)binfo.bucket_index_shard_hash_type, bucket_obj);
+ (RGWBucketInfo::BIShardsHashType)binfo.bucket_index_shard_hash_type, bucket_obj, shard_id);
return 0;
}
bool ret_not_existed = (state && !state->exists);
+ BucketShard bs(this);
+ r = bs.init(bucket, obj);
+ if (r < 0) {
+ return r;
+ }
+
string tag;
- r = prepare_update_index(state, bucket, CLS_RGW_OP_DEL, obj, tag);
+ r = prepare_update_index(state, bs, CLS_RGW_OP_DEL, obj, tag);
if (r < 0)
return r;
int64_t poolid = ref.ioctx.get_id();
if (r >= 0 || r == -ENOENT) {
uint64_t epoch = ref.ioctx.get_last_version();
- r = complete_update_index_del(bucket, obj, tag, poolid, epoch);
+ r = complete_update_index_del(bs, obj, tag, poolid, epoch);
} else {
- int ret = complete_update_index_cancel(bucket, obj, tag);
+ int ret = complete_update_index_cancel(bs, obj, tag);
if (ret < 0) {
ldout(cct, 0) << "ERROR: complete_update_index_cancel returned ret=" << ret << dendl;
}
std::string oid, key;
get_obj_bucket_and_oid_key(obj, bucket, oid, key);
+ BucketShard bs(this);
+ int r = bs.init(bucket, obj);
+ if (r < 0) {
+ return r;
+ }
+
string tag;
- int r = complete_update_index_del(bucket, obj, tag, -1 /* pool */, 0);
+ r = complete_update_index_del(bs, obj, tag, -1 /* pool */, 0);
return r;
}
return r;
}
-int RGWRados::prepare_update_index(RGWObjState *state, rgw_bucket& bucket,
+int RGWRados::prepare_update_index(RGWObjState *state, BucketShard& bs,
RGWModifyOp op, rgw_obj& obj, string& tag)
{
- if (bucket_is_system(bucket))
+ if (bucket_is_system(bs.bucket))
return 0;
- int ret = data_log->add_entry(obj.bucket);
+ int ret = data_log->add_entry(bs.bucket, bs.shard_id);
if (ret < 0) {
lderr(cct) << "ERROR: failed writing data log" << dendl;
return ret;
append_rand_alpha(cct, tag, tag, 32);
}
}
- ret = cls_obj_prepare_op(bucket, op, tag, obj.object, obj.key, obj.get_hash_object());
+ ret = cls_obj_prepare_op(bs, op, tag, obj.object, obj.key);
return ret;
}
-int RGWRados::complete_update_index(rgw_bucket& bucket, rgw_obj& oid, string& tag, int64_t poolid, uint64_t epoch, uint64_t size,
+int RGWRados::complete_update_index(BucketShard& bs, rgw_obj& oid, string& tag, int64_t poolid, uint64_t epoch, uint64_t size,
utime_t& ut, string& etag, string& content_type, bufferlist *acl_bl, RGWObjCategory category,
list<string> *remove_objs)
{
- if (bucket_is_system(bucket))
+ if (bucket_is_system(bs.bucket))
return 0;
RGWObjEnt ent;
ent.owner_display_name = owner.get_display_name();
ent.content_type = content_type;
- int ret = cls_obj_complete_add(bucket, tag, poolid, epoch, ent, category, remove_objs, oid.get_hash_object());
+ int ret = cls_obj_complete_add(bs, tag, poolid, epoch, ent, category, remove_objs);
return ret;
}
string etag;
string content_type;
bufferlist acl_bl;
+ BucketShard bs(this);
bool update_index = (category == RGW_OBJ_CATEGORY_MAIN ||
category == RGW_OBJ_CATEGORY_MULTIMETA);
int64_t poolid = io_ctx.get_id();
int ret;
+ ret = bs.init(bucket, dst_obj);
+ if (ret < 0) {
+ goto done;
+ }
+
if (update_index) {
- ret = prepare_update_index(state, bucket, CLS_RGW_OP_ADD, dst_obj, tag);
+ ret = prepare_update_index(state, bs, CLS_RGW_OP_ADD, dst_obj, tag);
if (ret < 0)
goto done;
}
if (update_index) {
if (ret >= 0) {
- ret = complete_update_index(bucket, dst_obj, tag, poolid, epoch, size,
+ ret = complete_update_index(bs, dst_obj, tag, poolid, epoch, size,
ut, etag, content_type, &acl_bl, category, NULL);
} else {
- int r = complete_update_index_cancel(bucket, dst_obj, tag);
+ int r = complete_update_index_cancel(bs, dst_obj, tag);
if (r < 0) {
ldout(cct, 0) << "ERROR: comlete_update_index_cancel() returned r=" << r << dendl;
}
return r;
}
-int RGWRados::cls_obj_prepare_op(rgw_bucket& bucket, RGWModifyOp op, string& tag,
- string& name, string& locator, const string& index_hash_object)
+/**
+ * Send the "prepare" step of an index update to an already-opened index shard.
+ * bs: shard handle; the operation goes to bs.bucket_obj through bs.index_ctx,
+ *     so the caller is expected to have run BucketShard::init() on it first
+ * op, tag, name, locator: passed unchanged to cls_rgw_bucket_prepare_op()
+ * Returns whatever bs.index_ctx.operate() returns.
+ */
+int RGWRados::cls_obj_prepare_op(BucketShard& bs, RGWModifyOp op, string& tag,
+ string& name, string& locator)
{
- librados::IoCtx index_ctx;
- string bucket_obj;
- int ret = open_bucket_index_shard(bucket, index_ctx, index_hash_object, &bucket_obj);
- ldout(cct, 20) << " bucket index object: " << bucket_obj << dendl;
- if (ret < 0)
- return ret;
-
ObjectWriteOperation o;
cls_rgw_bucket_prepare_op(o, op, tag, name, locator, zone_public_config.log_data);
- ret = index_ctx.operate(bucket_obj, &o);
+ /* the shard was resolved by the caller; no per-call open_bucket_index_shard() here */
+ int ret = bs.index_ctx.operate(bs.bucket_obj, &o);
return ret;
}
-int RGWRados::cls_obj_complete_op(rgw_bucket& bucket, RGWModifyOp op, string& tag,
+int RGWRados::cls_obj_complete_op(BucketShard& bs, RGWModifyOp op, string& tag,
int64_t pool, uint64_t epoch,
RGWObjEnt& ent, RGWObjCategory category,
- list<string> *remove_objs, const string& index_hash_object)
+ list<string> *remove_objs)
{
- librados::IoCtx index_ctx;
- string bucket_obj;
- int ret = open_bucket_index_shard(bucket, index_ctx, index_hash_object, &bucket_obj);
- ldout(cct, 20) << " bucket index object: " << bucket_obj << dendl;
- if (ret < 0)
- return ret;
-
ObjectWriteOperation o;
rgw_bucket_dir_entry_meta dir_meta;
dir_meta.size = ent.size;
cls_rgw_bucket_complete_op(o, op, tag, ver, ent.name, dir_meta, remove_objs, zone_public_config.log_data);
AioCompletion *c = librados::Rados::aio_create_completion(NULL, NULL, NULL);
- ret = index_ctx.aio_operate(bucket_obj, c, &o);
+ int ret = bs.index_ctx.aio_operate(bs.bucket_obj, c, &o);
c->release();
return ret;
}
-int RGWRados::cls_obj_complete_add(rgw_bucket& bucket, string& tag,
+/**
+ * Complete an index update for an added object on the given shard.
+ * Thin wrapper: calls cls_obj_complete_op() with CLS_RGW_OP_ADD and passes
+ * every other argument through unchanged.
+ * bs: shard handle the completion is issued against
+ * Returns the result of cls_obj_complete_op().
+ */
+int RGWRados::cls_obj_complete_add(BucketShard& bs, string& tag,
int64_t pool, uint64_t epoch,
RGWObjEnt& ent, RGWObjCategory category,
- list<string> *remove_obj, const string& index_hash_object)
+ list<string> *remove_obj)
{
- return cls_obj_complete_op(bucket, CLS_RGW_OP_ADD, tag, pool, epoch, ent, category, remove_obj, index_hash_object);
+ return cls_obj_complete_op(bs, CLS_RGW_OP_ADD, tag, pool, epoch, ent, category, remove_obj);
}
-int RGWRados::cls_obj_complete_del(rgw_bucket& bucket, string& tag,
+/**
+ * Complete an index update for a deleted object on the given shard.
+ * Builds an RGWObjEnt carrying only the entry name and calls
+ * cls_obj_complete_op() with CLS_RGW_OP_DEL, RGW_OBJ_CATEGORY_NONE and no
+ * remove_objs list.
+ * bs: shard handle the completion is issued against
+ * name: index entry name of the deleted object
+ * Returns the result of cls_obj_complete_op().
+ */
+int RGWRados::cls_obj_complete_del(BucketShard& bs, string& tag,
int64_t pool, uint64_t epoch,
- string& name, const string& index_hash_object)
+ string& name)
{
RGWObjEnt ent;
ent.name = name;
- return cls_obj_complete_op(bucket, CLS_RGW_OP_DEL, tag, pool, epoch, ent, RGW_OBJ_CATEGORY_NONE, NULL, index_hash_object);
+ return cls_obj_complete_op(bs, CLS_RGW_OP_DEL, tag, pool, epoch, ent, RGW_OBJ_CATEGORY_NONE, NULL);
}
-int RGWRados::cls_obj_complete_cancel(rgw_bucket& bucket, string& tag, string& name, const string& index_hash_object)
+/**
+ * Cancel a previously prepared index update on the given shard.
+ * Builds an RGWObjEnt carrying only the entry name and calls
+ * cls_obj_complete_op() with pool id -1, epoch 0, RGW_OBJ_CATEGORY_NONE and
+ * no remove_objs list.
+ * NOTE(review): the op code sent is CLS_RGW_OP_ADD (as it was before this
+ * change) - presumably the index class treats pool id -1 as a cancel;
+ * confirm against the cls_rgw implementation.
+ * Returns the result of cls_obj_complete_op().
+ */
+int RGWRados::cls_obj_complete_cancel(BucketShard& bs, string& tag, string& name)
{
RGWObjEnt ent;
ent.name = name;
- return cls_obj_complete_op(bucket, CLS_RGW_OP_ADD, tag, -1 /* pool id */, 0, ent, RGW_OBJ_CATEGORY_NONE, NULL, index_hash_object);
+ return cls_obj_complete_op(bs, CLS_RGW_OP_ADD, tag, -1 /* pool id */, 0, ent, RGW_OBJ_CATEGORY_NONE, NULL);
}
int RGWRados::cls_obj_set_bucket_tag_timeout(rgw_bucket& bucket, uint64_t timeout)
}
int RGWRados::get_bucket_index_object(const string& bucket_oid_base, const string& obj_key,
- uint32_t num_shards, RGWBucketInfo::BIShardsHashType hash_type, string *bucket_obj)
+ uint32_t num_shards, RGWBucketInfo::BIShardsHashType hash_type, string *bucket_obj, int *shard_id)
{
int r = 0;
switch (hash_type) {
if (!num_shards) {
// By default with no sharding, we use the bucket oid as itself
(*bucket_obj) = bucket_oid_base;
+ if (shard_id) {
+ *shard_id = -1;
+ }
} else {
uint32_t sid = ceph_str_hash_linux(obj_key.c_str(),
obj_key.size()) % MAX_BUCKET_INDEX_SHARDS_PRIME % num_shards;
char buf[bucket_oid_base.size() + 32];
snprintf(buf, sizeof(buf), "%s.%d", bucket_oid_base.c_str(), sid);
(*bucket_obj) = buf;
+ if (shard_id) {
+ *shard_id = (int)sid;
+ }
}
break;
default:
int open_bucket_index_base(rgw_bucket& bucket, librados::IoCtx& index_ctx,
string& bucket_oid_base);
int open_bucket_index_shard(rgw_bucket& bucket, librados::IoCtx& index_ctx,
- const string& obj_key, string *bucket_obj);
+ const string& obj_key, string *bucket_obj, int *shard_id);
int open_bucket_index(rgw_bucket& bucket, librados::IoCtx& index_ctx,
vector<string>& bucket_objs, int shard_id = -1);
template<typename T>
virtual int put_linked_bucket_info(RGWBucketInfo& info, bool exclusive, time_t mtime, obj_version *pep_objv,
map<string, bufferlist> *pattrs, bool create_entry_point);
+  /**
+   * Handle for the one bucket index shard that holds a given object's entry.
+   *
+   * init() resolves the shard once; the index helpers that take a
+   * BucketShard& then reuse index_ctx and bucket_obj instead of opening the
+   * bucket index again on every call.
+   */
+  struct BucketShard {
+    RGWRados *store;            // store whose bucket_is_system()/open_bucket_index_shard() init() calls
+    rgw_bucket bucket;          // copy of the bucket passed to init()
+    int shard_id;               // -1 until init() resolves a shard; stays -1 for an unsharded index
+    librados::IoCtx index_ctx;  // io context index operations on this shard are issued through
+    string bucket_obj;          // name of the index object for this shard
+
+    // explicit: a bare RGWRados* must never convert silently into a BucketShard
+    explicit BucketShard(RGWRados *_store) : store(_store), shard_id(-1) {}
+
+    // Returns 0 on success (or for system buckets, where nothing is opened),
+    // a negative error code otherwise.
+    int init(rgw_bucket& _bucket, rgw_obj& obj);
+  };
+
int cls_rgw_init_index(librados::IoCtx& io_ctx, librados::ObjectWriteOperation& op, string& oid);
- int cls_obj_prepare_op(rgw_bucket& bucket, RGWModifyOp op, string& tag,
- string& name, string& locator, const string& index_hash_object);
- int cls_obj_complete_op(rgw_bucket& bucket, RGWModifyOp op, string& tag, int64_t pool, uint64_t epoch,
- RGWObjEnt& ent, RGWObjCategory category, list<string> *remove_objs, const string& index_hash_object);
- int cls_obj_complete_add(rgw_bucket& bucket, string& tag, int64_t pool, uint64_t epoch, RGWObjEnt& ent, RGWObjCategory category, list<string> *remove_objs, const string& index_hash_object);
- int cls_obj_complete_del(rgw_bucket& bucket, string& tag, int64_t pool, uint64_t epoch, string& name, const string& index_hash_object);
- int cls_obj_complete_cancel(rgw_bucket& bucket, string& tag, string& name, const string& index_hash_object);
+ int cls_obj_prepare_op(BucketShard& bs, RGWModifyOp op, string& tag, string& name, string& locator);
+ int cls_obj_complete_op(BucketShard& bs, RGWModifyOp op, string& tag, int64_t pool, uint64_t epoch,
+ RGWObjEnt& ent, RGWObjCategory category, list<string> *remove_objs);
+ int cls_obj_complete_add(BucketShard& bs, string& tag, int64_t pool, uint64_t epoch, RGWObjEnt& ent, RGWObjCategory category, list<string> *remove_objs);
+ int cls_obj_complete_del(BucketShard& bs, string& tag, int64_t pool, uint64_t epoch, string& name);
+ int cls_obj_complete_cancel(BucketShard& bs, string& tag, string& name);
int cls_obj_set_bucket_tag_timeout(rgw_bucket& bucket, uint64_t timeout);
int cls_bucket_list(rgw_bucket& bucket, const string& start, const string& prefix, uint32_t hint_num,
map<string, RGWObjEnt>& m, bool *is_truncated, string *last_entry,
bool (*force_check_filter)(const string& name) = NULL);
int cls_bucket_head(rgw_bucket& bucket, map<string, struct rgw_bucket_dir_header>& headers);
int cls_bucket_head_async(rgw_bucket& bucket, RGWGetDirHeader_CB *ctx, int *num_aio);
- int prepare_update_index(RGWObjState *state, rgw_bucket& bucket,
+
+ int prepare_update_index(RGWObjState *state, BucketShard& bucket_shard,
RGWModifyOp op, rgw_obj& oid, string& tag);
- int complete_update_index(rgw_bucket& bucket, rgw_obj& oid, string& tag, int64_t poolid, uint64_t epoch, uint64_t size,
+ int complete_update_index(BucketShard& bucket_shard, rgw_obj& oid, string& tag, int64_t poolid, uint64_t epoch, uint64_t size,
utime_t& ut, string& etag, string& content_type, bufferlist *acl_bl, RGWObjCategory category,
list<string> *remove_objs);
- int complete_update_index_del(rgw_bucket& bucket, rgw_obj& oid, string& tag, int64_t pool, uint64_t epoch) {
- if (bucket_is_system(bucket))
+ /**
+  * Finish the index update for a deleted object.
+  * Returns 0 without touching the index when bucket_shard.bucket is a system
+  * bucket; otherwise forwards to cls_obj_complete_del() using oid.object as
+  * the entry name and returns its result.
+  */
+ int complete_update_index_del(BucketShard& bucket_shard, rgw_obj& oid, string& tag, int64_t pool, uint64_t epoch) {
+ if (bucket_is_system(bucket_shard.bucket))
return 0;
- return cls_obj_complete_del(bucket, tag, pool, epoch, oid.object, oid.get_hash_object());
+ return cls_obj_complete_del(bucket_shard, tag, pool, epoch, oid.object);
}
- int complete_update_index_cancel(rgw_bucket& bucket, rgw_obj& oid, string& tag) {
- if (bucket_is_system(bucket))
+ /**
+  * Cancel a prepared index update for an object.
+  * Returns 0 without touching the index when bucket_shard.bucket is a system
+  * bucket; otherwise forwards to cls_obj_complete_cancel() using oid.object
+  * as the entry name and returns its result.
+  */
+ int complete_update_index_cancel(BucketShard& bucket_shard, rgw_obj& oid, string& tag) {
+ if (bucket_is_system(bucket_shard.bucket))
return 0;
- return cls_obj_complete_cancel(bucket, tag, oid.object, oid.get_hash_object());
+ return cls_obj_complete_cancel(bucket_shard, tag, oid.object);
}
int list_bi_log_entries(rgw_bucket& bucket, int shard_id, string& marker, uint32_t max, std::list<rgw_bi_log_entry>& result, bool *truncated);
int trim_bi_log_entries(rgw_bucket& bucket, int shard_id, string& marker, string& end_marker);
* Return 0 on success, a failure code otherwise.
*/
int get_bucket_index_object(const string& bucket_oid_base, const string& obj_key,
- uint32_t num_shards, RGWBucketInfo::BIShardsHashType hash_type, string *bucket_obj);
+ uint32_t num_shards, RGWBucketInfo::BIShardsHashType hash_type, string *bucket_obj, int *shard);
int process_intent_log(rgw_bucket& bucket, string& oid,
time_t epoch, int flags, bool purge);