From 59b37e0ec857ce1b018e352afcbd1507d399ea69 Mon Sep 17 00:00:00 2001 From: liangmingyuan Date: Wed, 21 Feb 2024 10:49:49 +0800 Subject: [PATCH] rgw/reshard: record a duplicated index entry copy together with version bucket writting operations. Signed-off-by: Mingyuan Liang --- src/cls/rgw/cls_rgw.cc | 172 +++++++++++++++++++++++++++-------------- 1 file changed, 113 insertions(+), 59 deletions(-) diff --git a/src/cls/rgw/cls_rgw.cc b/src/cls/rgw/cls_rgw.cc index f3c69098f62d2..aa5ba9ed7dbfd 100644 --- a/src/cls/rgw/cls_rgw.cc +++ b/src/cls/rgw/cls_rgw.cc @@ -142,13 +142,13 @@ static void bi_reshard_log_prefix(string& key) } // 0x802001_idx -static void bi_reshard_log_key(cls_method_context_t hctx, string& key, string& idx) +static void bi_reshard_log_key(cls_method_context_t hctx, string& key, const string& idx) { bi_reshard_log_prefix(key); key.append(idx); } -static int reshard_log_index_operation(cls_method_context_t hctx, string& idx, +static int reshard_log_index_operation(cls_method_context_t hctx, const string& idx, const cls_rgw_obj_key& key, bufferlist* log_bl) { string reshard_log_idx; @@ -1308,11 +1308,19 @@ int rgw_bucket_complete_op(cls_method_context_t hctx, bufferlist *in, bufferlist } // rgw_bucket_complete_op template -static int write_entry(cls_method_context_t hctx, T& entry, const string& key) +static int write_entry(cls_method_context_t hctx, T& entry, const string& key, + const bool is_resharding = false) { bufferlist bl; encode(entry, bl); - return cls_cxx_map_set_val(hctx, key, &bl); + int ret = cls_cxx_map_set_val(hctx, key, &bl); + if (ret < 0) { + return ret; + } + if (is_resharding) { + ret = reshard_log_index_operation(hctx, key, entry.key, &bl); + } + return ret; } static int read_olh(cls_method_context_t hctx,cls_rgw_obj_key& obj_key, rgw_bucket_olh_entry *olh_data_entry, string *index_key, bool *found) @@ -1345,11 +1353,13 @@ static void update_olh_log(rgw_bucket_olh_entry& olh_data_entry, OLHLogOp op, co log.push_back(log_entry); } -static int write_obj_instance_entry(cls_method_context_t hctx, rgw_bucket_dir_entry& instance_entry, const string& instance_idx) +static int write_obj_instance_entry(cls_method_context_t hctx, rgw_bucket_dir_entry& instance_entry, + const string& instance_idx, bool is_resharding) { - CLS_LOG(20, "write_entry() instance=%s idx=%s flags=%d", escape_str(instance_entry.key.instance).c_str(), instance_idx.c_str(), instance_entry.flags); + CLS_LOG(20, "write_entry() instance=%s idx=%s flags=%d", escape_str(instance_entry.key.instance).c_str(), + instance_idx.c_str(), instance_entry.flags); /* write the instance entry */ - int ret = write_entry(hctx, instance_entry, instance_idx); + int ret = write_entry(hctx, instance_entry, instance_idx, is_resharding); if (ret < 0) { CLS_LOG(0, "ERROR: write_entry() instance_key=%s ret=%d", escape_str(instance_idx).c_str(), ret); return ret; @@ -1360,9 +1370,10 @@ static int write_obj_instance_entry(cls_method_context_t hctx, rgw_bucket_dir_en /* * write object instance entry, and if needed also the list entry */ -static int write_obj_entries(cls_method_context_t hctx, rgw_bucket_dir_entry& instance_entry, const string& instance_idx) +static int write_obj_entries(cls_method_context_t hctx, rgw_bucket_dir_entry& instance_entry, + const string& instance_idx, bool is_resharding) { - int ret = write_obj_instance_entry(hctx, instance_entry, instance_idx); + int ret = write_obj_instance_entry(hctx, instance_entry, instance_idx, is_resharding); if (ret < 0) { return ret; } @@ -1372,7 +1383,7 @@ static int write_obj_entries(cls_method_context_t hctx, rgw_bucket_dir_entry& in if (instance_idx != instance_list_idx) { CLS_LOG(20, "write_entry() idx=%s flags=%d", escape_str(instance_list_idx).c_str(), instance_entry.flags); /* write a new list entry for the object instance */ - ret = write_entry(hctx, instance_entry, instance_list_idx); + ret = write_entry(hctx, instance_entry, instance_list_idx, is_resharding); if (ret < 0) { CLS_LOG(0, "ERROR: write_entry() instance=%s instance_list_idx=%s ret=%d", instance_entry.key.instance.c_str(), instance_list_idx.c_str(), ret); return ret; @@ -1428,8 +1439,8 @@ public: instance_entry.versioned_epoch = epoch; } - int unlink_list_entry() { - string list_idx; + int unlink_list_entry(bool is_resharding) { + string list_idx, list_sub_ver; /* this instance has a previous list entry, remove that entry */ get_list_index_key(instance_entry, &list_idx); CLS_LOG(20, "unlink_list_entry() list_idx=%s", escape_str(list_idx).c_str()); @@ -1438,10 +1449,14 @@ public: CLS_LOG(0, "ERROR: cls_cxx_map_remove_key() list_idx=%s ret=%d", list_idx.c_str(), ret); return ret; } + if (is_resharding) { + bufferlist empty; + return reshard_log_index_operation(hctx, list_idx, instance_entry.key, &empty); + } return 0; } - int unlink() { + int unlink(bool is_resharding, const cls_rgw_obj_key& key) { /* remove the instance entry */ CLS_LOG(20, "unlink() idx=%s", escape_str(instance_idx).c_str()); int ret = cls_cxx_map_remove_key(hctx, instance_idx); @@ -1449,10 +1464,14 @@ public: CLS_LOG(0, "ERROR: cls_cxx_map_remove_key() instance_idx=%s ret=%d", instance_idx.c_str(), ret); return ret; } + if (is_resharding) { + bufferlist empty; + return reshard_log_index_operation(hctx, instance_idx, key, &empty); + } return 0; } - int write_entries(uint64_t flags_set, uint64_t flags_reset) { + int write_entries(uint64_t flags_set, uint64_t flags_reset, bool is_resharding) { if (!initialized) { int ret = init(); if (ret < 0) { @@ -1465,7 +1484,7 @@ public: /* write the instance and list entries */ bool special_delete_marker_key = (instance_entry.is_delete_marker() && instance_entry.key.instance.empty()); encode_obj_versioned_data_key(key, &instance_idx, special_delete_marker_key); - int ret = write_obj_entries(hctx, instance_entry, instance_idx); + int ret = write_obj_entries(hctx, instance_entry, instance_idx, is_resharding); if (ret < 0) { CLS_LOG(0, "ERROR: write_obj_entries() instance_idx=%s ret=%d", instance_idx.c_str(), ret); return ret; @@ -1474,11 +1493,11 @@ public: return 0; } - int write(uint64_t epoch, bool current) { + int write(uint64_t epoch, bool current, bool is_resharding) { if (instance_entry.versioned_epoch > 0) { CLS_LOG(20, "%s: instance_entry.versioned_epoch=%d epoch=%d", __func__, (int)instance_entry.versioned_epoch, (int)epoch); /* this instance has a previous list entry, remove that entry */ - int ret = unlink_list_entry(); + int ret = unlink_list_entry(is_resharding); if (ret < 0) { return ret; } @@ -1490,11 +1509,11 @@ public: } instance_entry.versioned_epoch = epoch; - return write_entries(flags, 0); + return write_entries(flags, 0, is_resharding); } - int demote_current() { - return write_entries(0, rgw_bucket_dir_entry::FLAG_CURRENT); + int demote_current(bool is_resharding) { + return write_entries(0, rgw_bucket_dir_entry::FLAG_CURRENT, is_resharding); } bool is_delete_marker() { @@ -1596,9 +1615,9 @@ public: olh_data_entry.key = key; } - int write() { + int write(bool is_resharding) { /* write the olh data entry */ - int ret = write_entry(hctx, olh_data_entry, olh_data_idx); + int ret = write_entry(hctx, olh_data_entry, olh_data_idx, is_resharding); if (ret < 0) { CLS_LOG(0, "ERROR: write_entry() olh_key=%s ret=%d", olh_data_idx.c_str(), ret); return ret; @@ -1632,12 +1651,13 @@ public: } }; -static int write_version_marker(cls_method_context_t hctx, cls_rgw_obj_key& key) +static int write_version_marker(cls_method_context_t hctx, cls_rgw_obj_key& key, + bool is_resharding) { rgw_bucket_dir_entry entry; entry.key = key; entry.flags = rgw_bucket_dir_entry::FLAG_VER_MARKER; - int ret = write_entry(hctx, entry, key.name); + int ret = write_entry(hctx, entry, key.name, is_resharding); if (ret < 0) { CLS_LOG(0, "ERROR: write_entry returned ret=%d", ret); return ret; @@ -1652,9 +1672,10 @@ static int write_version_marker(cls_method_context_t hctx, cls_rgw_obj_key& key) * key. Their version is going to be empty though */ static int convert_plain_entry_to_versioned(cls_method_context_t hctx, - cls_rgw_obj_key& key, - bool demote_current, - bool instance_only) + cls_rgw_obj_key& key, + bool demote_current, + bool instance_only, + bool is_resharding) { if (!key.instance.empty()) { return -EINVAL; @@ -1681,9 +1702,9 @@ static int convert_plain_entry_to_versioned(cls_method_context_t hctx, encode_obj_versioned_data_key(key, &new_idx); if (instance_only) { - ret = write_obj_instance_entry(hctx, entry, new_idx); + ret = write_obj_instance_entry(hctx, entry, new_idx, is_resharding); } else { - ret = write_obj_entries(hctx, entry, new_idx); + ret = write_obj_entries(hctx, entry, new_idx, is_resharding); } if (ret < 0) { CLS_LOG(0, "ERROR: write_obj_entries new_idx=%s returned %d", @@ -1692,7 +1713,7 @@ static int convert_plain_entry_to_versioned(cls_method_context_t hctx, } } - ret = write_version_marker(hctx, key); + ret = write_version_marker(hctx, key, is_resharding); if (ret < 0) { return ret; } @@ -1732,6 +1753,13 @@ static int rgw_bucket_link_olh(cls_method_context_t hctx, bufferlist *in, buffer return -EINVAL; } + struct rgw_bucket_dir_header header; + int rc = read_bucket_header(hctx, &header); + if (rc < 0) { + CLS_LOG(1, "ERROR: %s(): failed to read header\n", __func__); + return rc; + } + /* read instance entry */ BIVerObjEntry obj(hctx, op.key); int ret = obj.init(op.delete_marker); @@ -1805,7 +1833,7 @@ static int rgw_bucket_link_olh(cls_method_context_t hctx, bufferlist *in, buffer * entry */ existed = (ret >= 0 && !other_obj.is_delete_marker()); if (ret >= 0 && other_obj.is_delete_marker() != op.delete_marker) { - ret = other_obj.unlink_list_entry(); + ret = other_obj.unlink_list_entry(header.resharding_in_logrecord()); if (ret < 0) { return ret; } @@ -1813,7 +1841,7 @@ static int rgw_bucket_link_olh(cls_method_context_t hctx, bufferlist *in, buffer removing = existed && op.delete_marker; if (!removing) { - ret = other_obj.unlink(); + ret = other_obj.unlink(header.resharding_in_logrecord(), op.key); if (ret < 0) { return ret; } @@ -1839,7 +1867,7 @@ static int rgw_bucket_link_olh(cls_method_context_t hctx, bufferlist *in, buffer const uint64_t prev_epoch = olh.get_epoch(); if (!olh.start_modify(op.olh_epoch)) { - ret = obj.write(op.olh_epoch, false); + ret = obj.write(op.olh_epoch, false, header.resharding_in_logrecord()); if (ret < 0) { return ret; } @@ -1871,7 +1899,7 @@ static int rgw_bucket_link_olh(cls_method_context_t hctx, bufferlist *in, buffer if (!(olh_entry.key == op.key)) { BIVerObjEntry old_obj(hctx, olh_entry.key); - ret = old_obj.demote_current(); + ret = old_obj.demote_current(header.resharding_in_logrecord()); if (ret < 0) { CLS_LOG(0, "ERROR: could not demote current on previous key ret=%d", ret); return ret; @@ -1882,7 +1910,8 @@ static int rgw_bucket_link_olh(cls_method_context_t hctx, bufferlist *in, buffer } else { bool instance_only = (op.key.instance.empty() && op.delete_marker); cls_rgw_obj_key key(op.key.name); - ret = convert_plain_entry_to_versioned(hctx, key, promote, instance_only); + ret = convert_plain_entry_to_versioned(hctx, key, promote, instance_only, + header.resharding_in_logrecord()); if (ret < 0) { CLS_LOG(0, "ERROR: convert_plain_entry_to_versioned ret=%d", ret); return ret; @@ -1904,14 +1933,14 @@ static int rgw_bucket_link_olh(cls_method_context_t hctx, bufferlist *in, buffer } olh.set_exists(true); - ret = olh.write(); + ret = olh.write(header.resharding_in_logrecord()); if (ret < 0) { CLS_LOG(0, "ERROR: failed to update olh ret=%d", ret); return ret; } /* write the instance and list entries */ - ret = obj.write(olh.get_epoch(), promote); + ret = obj.write(olh.get_epoch(), promote, header.resharding_in_logrecord()); if (ret < 0) { return ret; } @@ -1920,12 +1949,6 @@ static int rgw_bucket_link_olh(cls_method_context_t hctx, bufferlist *in, buffer return 0; } - rgw_bucket_dir_header header; - ret = read_bucket_header(hctx, &header); - if (ret < 0) { - CLS_LOG(1, "ERROR: rgw_bucket_link_olh(): failed to read header\n"); - return ret; - } if (header.syncstopped) { return 0; } @@ -1972,10 +1995,17 @@ static int rgw_bucket_unlink_instance(cls_method_context_t hctx, bufferlist *in, cls_rgw_obj_key dest_key = op.key; + struct rgw_bucket_dir_header header; + int ret = read_bucket_header(hctx, &header); + if (ret < 0) { + CLS_LOG(1, "ERROR: rgw_bucket_unlink_instance(): failed to read header\n"); + return ret; + } + BIVerObjEntry obj(hctx, dest_key); BIOLHEntry olh(hctx, dest_key); - int ret = obj.init(); + ret = obj.init(); if (ret < 0) { if (ret != -ENOENT) { CLS_LOG(0, "ERROR: obj.init() returned ret=%d", ret); @@ -1993,7 +2023,8 @@ static int rgw_bucket_unlink_instance(cls_method_context_t hctx, bufferlist *in, if (!olh_found) { bool instance_only = false; cls_rgw_obj_key key(dest_key.name); - ret = convert_plain_entry_to_versioned(hctx, key, true, instance_only); + ret = convert_plain_entry_to_versioned(hctx, key, true, instance_only, + header.resharding_in_logrecord()); if (ret < 0) { CLS_LOG(0, "ERROR: convert_plain_entry_to_versioned ret=%d", ret); return ret; @@ -2005,7 +2036,7 @@ static int rgw_bucket_unlink_instance(cls_method_context_t hctx, bufferlist *in, } if (!olh.start_modify(op.olh_epoch)) { - ret = obj.unlink_list_entry(); + ret = obj.unlink_list_entry(header.resharding_in_logrecord()); if (ret < 0) { return ret; } @@ -2015,7 +2046,7 @@ static int rgw_bucket_unlink_instance(cls_method_context_t hctx, bufferlist *in, } olh.update_log(CLS_RGW_OLH_OP_REMOVE_INSTANCE, op.op_tag, op.key, false, op.olh_epoch); - return olh.write(); + return olh.write(header.resharding_in_logrecord()); } rgw_bucket_olh_entry& olh_entry = olh.get_entry(); @@ -2035,7 +2066,7 @@ static int rgw_bucket_unlink_instance(cls_method_context_t hctx, bufferlist *in, if (found) { BIVerObjEntry next(hctx, next_key); - ret = next.write(olh.get_epoch(), true); + ret = next.write(olh.get_epoch(), true, header.resharding_in_logrecord()); if (ret < 0) { CLS_LOG(0, "ERROR: next.write() returned ret=%d", ret); return ret; @@ -2062,18 +2093,18 @@ static int rgw_bucket_unlink_instance(cls_method_context_t hctx, bufferlist *in, } else { /* this is a delete marker, it's our responsibility to remove its * instance entry */ - ret = obj.unlink(); + ret = obj.unlink(header.resharding_in_logrecord(), op.key); if (ret < 0) { return ret; } } - ret = obj.unlink_list_entry(); + ret = obj.unlink_list_entry(header.resharding_in_logrecord()); if (ret < 0) { return ret; } - ret = olh.write(); + ret = olh.write(header.resharding_in_logrecord()); if (ret < 0) { return ret; } @@ -2082,12 +2113,6 @@ static int rgw_bucket_unlink_instance(cls_method_context_t hctx, bufferlist *in, return 0; } - rgw_bucket_dir_header header; - ret = read_bucket_header(hctx, &header); - if (ret < 0) { - CLS_LOG(1, "ERROR: rgw_bucket_unlink_instance(): failed to read header\n"); - return ret; - } if (header.syncstopped) { return 0; } @@ -2203,8 +2228,15 @@ static int rgw_bucket_trim_olh_log(cls_method_context_t hctx, bufferlist *in, bu log.erase(rm_iter); } + struct rgw_bucket_dir_header header; + int rc = read_bucket_header(hctx, &header); + if (rc < 0) { + CLS_LOG(1, "ERROR: %s(): failed to read header\n", __func__); + return rc; + } + /* write the olh data entry */ - ret = write_entry(hctx, olh_data_entry, olh_data_key); + ret = write_entry(hctx, olh_data_entry, olh_data_key, header.resharding_in_logrecord()); if (ret < 0) { CLS_LOG(0, "ERROR: write_entry() olh_key=%s ret=%d", olh_data_key.c_str(), ret); return ret; @@ -2231,9 +2263,16 @@ static int rgw_bucket_clear_olh(cls_method_context_t hctx, bufferlist *in, buffe return -EINVAL; } + struct rgw_bucket_dir_header header; + int rc = read_bucket_header(hctx, &header); + if (rc < 0) { + CLS_LOG(1, "ERROR: %s(): failed to read header\n", __func__); + return rc; + } + /* read olh entry */ rgw_bucket_olh_entry olh_data_entry; - string olh_data_key; + string olh_data_key, olh_sub_ver; encode_olh_data_key(op.key, &olh_data_key); int ret = read_index_entry(hctx, olh_data_key, &olh_data_entry); if (ret < 0 && ret != -ENOENT) { @@ -2251,6 +2290,10 @@ static int rgw_bucket_clear_olh(cls_method_context_t hctx, bufferlist *in, buffe CLS_LOG(1, "NOTICE: %s: can't remove key %s ret=%d", __func__, olh_data_key.c_str(), ret); return ret; } + bufferlist empty; + ret = record_duplicate_entry(hctx, olh_data_key, olh_data_entry.key, &empty, header.resharding_in_logrecord()); + if (ret < 0) + return ret; rgw_bucket_dir_entry plain_entry; @@ -2276,6 +2319,10 @@ static int rgw_bucket_clear_olh(cls_method_context_t hctx, bufferlist *in, buffe return ret; } + ret = record_duplicate_entry(hctx, op.key.name, plain_entry.key, &empty, header.resharding_in_logrecord()); + if (ret < 0) + return ret; + return 0; } @@ -2442,6 +2489,10 @@ int rgw_dir_suggest_changes(cls_method_context_t hctx, return ret; } } + if (header.resharding_in_logrecord()) { + bufferlist empty; + return reshard_log_index_operation(hctx, cur_change_key, cur_change.key, &empty); + } break; case CEPH_RGW_UPDATE: CLS_LOG_BITX(bitx_inst, 10, @@ -2475,6 +2526,9 @@ int rgw_dir_suggest_changes(cls_method_context_t hctx, return ret; } } + if (header.resharding_in_logrecord()) { + return reshard_log_index_operation(hctx, cur_change_key, cur_change.key, &cur_state_bl); + } break; } // switch(op) } // if (cur_disk.pending_map.empty()) -- 2.39.5