return -EINVAL;
}
-
- string key;
+ std::string key;
op.entry.get_key(&key);
+ int ret;
bufferlist bl;
+
+ if (op.create_only) {
+ ret = cls_cxx_map_get_val(hctx, key, &bl);
+ if (ret == 0) {
+ // entry already exists; make no changes
+ return -EEXIST;
+ } else if (ret != -ENOENT) {
+ CLS_ERR("error accessing reshard queue for %s with key %s",
+ op.entry.bucket_name.c_str(), key.c_str());
+ return ret;
+ }
+
+ // we got a -ENOENT and can just fall through...
+ }
+
encode(op.entry, bl);
- int ret = cls_cxx_map_set_val(hctx, key, &bl);
+ ret = cls_cxx_map_set_val(hctx, key, &bl);
if (ret < 0) {
- CLS_ERR("error adding reshard job for bucket %s with key %s",op.entry.bucket_name.c_str(), key.c_str());
+ CLS_ERR("error adding reshard job for bucket %s with key %s",
+ op.entry.bucket_name.c_str(), key.c_str());
return ret;
}
op.exec(RGW_CLASS, RGW_MP_UPLOAD_PART_INFO_UPDATE, in);
}
-void cls_rgw_reshard_add(librados::ObjectWriteOperation& op, const cls_rgw_reshard_entry& entry)
+void cls_rgw_reshard_add(librados::ObjectWriteOperation& op,
+ const cls_rgw_reshard_entry& entry,
+ const bool create_only)
{
bufferlist in;
cls_rgw_reshard_add_op call;
call.entry = entry;
+ call.create_only = create_only;
encode(call, in);
op.exec(RGW_CLASS, RGW_RESHARD_ADD, in);
}
void cls_rgw_mp_upload_part_info_update(librados::ObjectWriteOperation& op, const std::string& part_key, const RGWUploadPartInfo& info);
/* resharding */
-void cls_rgw_reshard_add(librados::ObjectWriteOperation& op, const cls_rgw_reshard_entry& entry);
+void cls_rgw_reshard_add(librados::ObjectWriteOperation& op,
+ const cls_rgw_reshard_entry& entry,
+ const bool create_only);
void cls_rgw_reshard_remove(librados::ObjectWriteOperation& op, const cls_rgw_reshard_entry& entry);
// these overloads which call io_ctx.operate() should not be called in the rgw.
// rgw_rados_operate() should be called after the overloads w/o calls to io_ctx.operate()
WRITE_CLASS_ENCODER(cls_rgw_mp_upload_part_info_update_op)
struct cls_rgw_reshard_add_op {
- cls_rgw_reshard_entry entry;
+ cls_rgw_reshard_entry entry;
+
+ // true -> will not overwrite existing entry
+ bool create_only {false};
cls_rgw_reshard_add_op() {}
void encode(ceph::buffer::list& bl) const {
- ENCODE_START(1, 1, bl);
+ ENCODE_START(2, 1, bl);
encode(entry, bl);
+ encode(create_only, bl);
ENCODE_FINISH(bl);
}
void decode(ceph::buffer::list::const_iterator& bl) {
- DECODE_START(1, bl);
+ DECODE_START(2, bl);
decode(entry, bl);
+ create_only = false;
+ if (struct_v >= 2) {
+ decode(create_only, bl);
+ }
DECODE_FINISH(bl);
}
static void generate_test_instances(std::list<cls_rgw_reshard_add_op*>& o);
get_bucket_logshard_oid(entry.tenant, entry.bucket_name, &logshard_oid);
librados::ObjectWriteOperation op;
- cls_rgw_reshard_add(op, entry);
+
+ // if we're reducing, we don't want to overwrite an existing entry
+ // in order to not interfere with the reshard reduction wait period
+ const bool create_only = entry.new_num_shards < entry.old_num_shards;
+
+ cls_rgw_reshard_add(op, entry, create_only);
int ret = rgw_rados_operate(dpp, store->getRados()->reshard_pool_ctx, logshard_oid, &op, y);
- if (ret < 0) {
- ldpp_dout(dpp, -1) << "ERROR: failed to add entry to reshard log, oid=" << logshard_oid << " tenant=" << entry.tenant << " bucket=" << entry.bucket_name << dendl;
+ if (create_only && ret == -EEXIST) {
+ ldpp_dout(dpp, 20) << "INFO: did not write reshard queue entry for oid=" <<
+ logshard_oid << " tenant=" << entry.tenant << " bucket=" <<
+ entry.bucket_name <<
+ ", because it's a reshard reduction and an entry for that bucket already exists" <<
+ dendl;
+ // this is not an error so just fall through
+ } else if (ret < 0) {
+ ldpp_dout(dpp, -1) << "ERROR: failed to add entry to reshard log, oid=" <<
+ logshard_oid << " tenant=" << entry.tenant << " bucket=" <<
+ entry.bucket_name << dendl;
return ret;
}
return 0;