return 0;
}
+static int guard_bucket_resharding(cls_method_context_t hctx,
+ const rgw_bucket_dir_header& header,
+ int error_code = -CLS_RGW_ERR_BUSY_RESHARDING)
+{
+ const ConfigProxy& conf = cls_get_config(hctx);
+ const uint32_t reshardlog_threshold = conf->rgw_reshardlog_threshold;
+
+ if (header.resharding_in_progress() ||
+ (header.resharding_in_logrecord() && header.reshardlog_entries >= reshardlog_threshold)) {
+ CLS_LOG(4, "ERROR: writes are blocked while bucket is "
+ "resharding, returning %d", error_code);
+ return error_code;
+ }
+
+ return 0;
+}
+
int rgw_bucket_prepare_op(cls_method_context_t hctx, bufferlist *in, bufferlist *out)
{
const ConfigProxy& conf = cls_get_config(hctx);
return rc;
}
+ rc = guard_bucket_resharding(hctx, header);
+ if (rc < 0) {
+ return rc;
+ }
+
// get on-disk state
std::string idx;
return -EINVAL;
}
+ rc = guard_bucket_resharding(hctx, header);
+ if (rc < 0) {
+ return rc;
+ }
+
rgw_bucket_dir_entry entry;
bool ondisk = true;
return rc;
}
+ rc = guard_bucket_resharding(hctx, header);
+ if (rc < 0) {
+ return rc;
+ }
+
/* read instance entry */
BIVerObjEntry obj(hctx, op.key);
int ret = obj.init(op.delete_marker);
return ret;
}
+ ret = guard_bucket_resharding(hctx, header);
+ if (ret < 0) {
+ return ret;
+ }
+
BIVerObjEntry obj(hctx, dest_key);
BIOLHEntry olh(hctx, dest_key);
return rc;
}
+ rc = guard_bucket_resharding(hctx, header);
+ if (rc < 0) {
+ return rc;
+ }
+
/* write the olh data entry */
ret = write_entry(hctx, olh_data_entry, olh_data_key, header);
if (ret < 0) {
return rc;
}
+ rc = guard_bucket_resharding(hctx, header);
+ if (rc < 0) {
+ return rc;
+ }
+
/* read olh entry */
rgw_bucket_olh_entry olh_data_entry;
string olh_data_key, olh_sub_ver;
return rc;
}
+ rc = guard_bucket_resharding(hctx, header);
+ if (rc < 0) {
+ return rc;
+ }
+
const uint64_t config_op_expiration =
conf->rgw_pending_bucket_index_op_expiration;
return r;
}
+ r = guard_bucket_resharding(hctx, header);
+ if (r < 0) {
+ return r;
+ }
+
if (op.check_existing) {
// fetch any existing keys and decrement their stats before overwriting
std::set<std::string> keys;
}
static int check_index(cls_method_context_t hctx,
- rgw_bucket_dir_header *existing_header,
+ const rgw_bucket_dir_header& existing_header,
rgw_bucket_dir_header *calc_header)
{
- int rc = read_bucket_header(hctx, existing_header);
- if (rc < 0) {
- CLS_LOG(1, "ERROR: check_index(): failed to read header\n");
- return rc;
- }
-
- calc_header->tag_timeout = existing_header->tag_timeout;
- calc_header->ver = existing_header->ver;
- calc_header->syncstopped = existing_header->syncstopped;
+ calc_header->tag_timeout = existing_header.tag_timeout;
+ calc_header->ver = existing_header.ver;
+ calc_header->syncstopped = existing_header.syncstopped;
std::list<rgw_cls_bi_entry> entries;
string start_obj;
bool more;
do {
- rc = list_plain_entries(hctx, filter_prefix, start_obj, CHECK_CHUNK_SIZE, &entries, &more);
+ int rc = list_plain_entries(hctx, filter_prefix, start_obj, CHECK_CHUNK_SIZE, &entries, &more);
if (rc < 0) {
return rc;
}
start_obj = "";
do {
- rc = list_instance_entries(hctx, filter_prefix, start_obj, CHECK_CHUNK_SIZE, &entries, &more);
+ int rc = list_instance_entries(hctx, filter_prefix, start_obj, CHECK_CHUNK_SIZE, &entries, &more);
if (rc < 0) {
return rc;
}
int rgw_bucket_rebuild_index(cls_method_context_t hctx, bufferlist *in, bufferlist *out)
{
CLS_LOG(10, "entered %s", __func__);
+
rgw_bucket_dir_header existing_header;
+ int rc = read_bucket_header(hctx, &existing_header);
+ if (rc < 0) {
+ CLS_LOG(1, "ERROR: check_index(): failed to read header\n");
+ return rc;
+ }
+
+ rc = guard_bucket_resharding(hctx, existing_header);
+ if (rc < 0) {
+ return rc;
+ }
+
rgw_bucket_dir_header calc_header;
- int rc = check_index(hctx, &existing_header, &calc_header);
+ rc = check_index(hctx, existing_header, &calc_header);
if (rc < 0)
return rc;
{
CLS_LOG(10, "entered %s", __func__);
rgw_cls_check_index_ret ret;
+ int rc = read_bucket_header(hctx, &ret.existing_header);
+ if (rc < 0) {
+ CLS_LOG(1, "ERROR: check_index(): failed to read header\n");
+ return rc;
+ }
- int rc = check_index(hctx, &ret.existing_header, &ret.calculated_header);
+ rc = check_index(hctx, ret.existing_header, &ret.calculated_header);
if (rc < 0)
return rc;
{
CLS_LOG(10, "entered %s", __func__);
- const ConfigProxy& conf = cls_get_config(hctx);
- const uint32_t reshardlog_threshold = conf->rgw_reshardlog_threshold;
-
cls_rgw_guard_bucket_resharding_op op;
-
- auto in_iter = in->cbegin();
try {
+ auto in_iter = in->cbegin();
decode(op, in_iter);
} catch (ceph::buffer::error& err) {
CLS_LOG(1, "ERROR: %s: failed to decode entry", __func__);
return rc;
}
- if (header.resharding_in_progress() ||
- (header.resharding_in_logrecord() && header.reshardlog_entries >= reshardlog_threshold)) {
- return op.ret_err;
- }
-
- return 0;
+ return guard_bucket_resharding(hctx, header, op.ret_err);
}
static int rgw_get_bucket_resharding(cls_method_context_t hctx,