cout << " data sync status\n";
cout << " required for: \n";
cout << " mdlog trim\n";
+ cout << " --gen=<gen-id> optional for: \n";
+ cout << " bilog list\n";
+ cout << " bilog trim\n";
+ cout << " bilog status\n";
cout << " --max-entries=<entries> max entries for listing operations\n";
cout << " --metadata-key=<key> key to retrieve metadata from with metadata get\n";
cout << " --remote=<remote> zone or zonegroup id of remote gateway\n";
string sub_name;
string event_id;
+ std::optional<uint64_t> gen;
std::optional<std::string> str_script_ctx;
std::optional<std::string> script_package;
int allow_compilation = false;
return EINVAL;
}
specified_shard_id = true;
+ } else if (ceph_argparse_witharg(args, i, &val, "--gen", (char*)NULL)) {
+ gen = strict_strtoll(val.c_str(), 10, &err);
+ if (!err.empty()) {
+ cerr << "ERROR: failed to parse gen id: " << err << std::endl;
+ return EINVAL;
+ }
} else if (ceph_argparse_witharg(args, i, &val, "--access", (char*)NULL)) {
access = val;
perm_mask = rgw_str_to_perm(access.c_str());
RGWRados::ent_map_t result;
result.reserve(NUM_ENTRIES);
+ const auto& current_index = bucket->get_info().layout.current_index;
int r = static_cast<rgw::sal::RadosStore*>(store)->getRados()->cls_bucket_list_ordered(
- dpp(), bucket->get_info(), RGW_NO_SHARD,
+ dpp(), bucket->get_info(), current_index, RGW_NO_SHARD,
marker, empty_prefix, empty_delimiter,
NUM_ENTRIES, true, expansion_factor,
result, &is_truncated, &cls_filtered, &marker,
if (max_entries < 0)
max_entries = 1000;
+ const auto& logs = bucket->get_info().layout.logs;
+ auto log_layout = std::reference_wrapper{logs.back()};
+ if (gen) {
+ auto i = std::find_if(logs.begin(), logs.end(), rgw::matches_gen(*gen));
+ if (i == logs.end()) {
+ cerr << "ERROR: no log layout with gen=" << *gen << std::endl;
+ return ENOENT;
+ }
+ log_layout = *i;
+ }
+
do {
list<rgw_bi_log_entry> entries;
- ret = static_cast<rgw::sal::RadosStore*>(store)->svc()->bilog_rados->log_list(dpp(), bucket->get_info(), shard_id, marker, max_entries - count, entries, &truncated);
+ ret = static_cast<rgw::sal::RadosStore*>(store)->svc()->bilog_rados->log_list(dpp(), bucket->get_info(), log_layout, shard_id, marker, max_entries - count, entries, &truncated);
if (ret < 0) {
cerr << "ERROR: list_bi_log_entries(): " << cpp_strerror(-ret) << std::endl;
return -ret;
cerr << "ERROR: could not init bucket: " << cpp_strerror(-ret) << std::endl;
return -ret;
}
- ret = static_cast<rgw::sal::RadosStore*>(store)->svc()->bilog_rados->log_trim(dpp(), bucket->get_info(), shard_id, start_marker, end_marker);
+
+ const auto& logs = bucket->get_info().layout.logs;
+ auto log_layout = std::reference_wrapper{logs.back()};
+ if (gen) {
+ auto i = std::find_if(logs.begin(), logs.end(), rgw::matches_gen(*gen));
+ if (i == logs.end()) {
+ cerr << "ERROR: no log layout with gen=" << *gen << std::endl;
+ return ENOENT;
+ }
+ log_layout = *i;
+ }
+
+ ret = static_cast<rgw::sal::RadosStore*>(store)->svc()->bilog_rados->log_trim(dpp(), bucket->get_info(), log_layout, shard_id, start_marker, end_marker);
if (ret < 0) {
cerr << "ERROR: trim_bi_log_entries(): " << cpp_strerror(-ret) << std::endl;
return -ret;
return -ret;
}
map<int, string> markers;
- ret = static_cast<rgw::sal::RadosStore*>(store)->svc()->bilog_rados->get_log_status(dpp(), bucket->get_info(), shard_id,
+ const auto& logs = bucket->get_info().layout.logs;
+ auto log_layout = std::reference_wrapper{logs.back()};
+ if (gen) {
+ auto i = std::find_if(logs.begin(), logs.end(), rgw::matches_gen(*gen));
+ if (i == logs.end()) {
+ cerr << "ERROR: no log layout with gen=" << *gen << std::endl;
+ return ENOENT;
+ }
+ log_layout = *i;
+ }
+
+ ret = static_cast<rgw::sal::RadosStore*>(store)->svc()->bilog_rados->get_log_status(dpp(), bucket->get_info(), log_layout, shard_id,
&markers, null_yield);
if (ret < 0) {
cerr << "ERROR: get_bi_log_status(): " << cpp_strerror(-ret) << std::endl;
Formatter *formatter = flusher.get_formatter();
formatter->open_object_section("objects");
+
while (results.is_truncated) {
rgw::sal::Bucket::ListParams params;
params.marker = results.next_marker;
int shards_num = bucket->get_info().layout.current_index.layout.normal.num_shards? bucket->get_info().layout.current_index.layout.normal.num_shards : 1;
int shard_id = bucket->get_info().layout.current_index.layout.normal.num_shards? 0 : -1;
+ const auto& log_layout = bucket->get_info().layout.logs.back();
if (!sync) {
- r = static_cast<rgw::sal::RadosStore*>(store)->svc()->bilog_rados->log_stop(dpp, bucket->get_info(), -1);
+ r = static_cast<rgw::sal::RadosStore*>(store)->svc()->bilog_rados->log_stop(dpp, bucket->get_info(), log_layout, -1);
if (r < 0) {
set_err_msg(err_msg, "ERROR: failed writing stop bilog:" + cpp_strerror(-r));
return r;
}
} else {
- r = static_cast<rgw::sal::RadosStore*>(store)->svc()->bilog_rados->log_start(dpp, bucket->get_info(), -1);
+ r = static_cast<rgw::sal::RadosStore*>(store)->svc()->bilog_rados->log_start(dpp, bucket->get_info(), log_layout, -1);
if (r < 0) {
set_err_msg(err_msg, "ERROR: failed writing resync bilog:" + cpp_strerror(-r));
return r;
string bucket_ver, master_ver;
string max_marker;
- ret = bucket->read_stats(dpp, RGW_NO_SHARD, &bucket_ver, &master_ver, stats, &max_marker);
+ const auto& latest_log = bucket->get_info().layout.logs.back();
+ const auto& index = log_to_index_layout(latest_log);
+ ret = bucket->read_stats(dpp, index, RGW_NO_SHARD, &bucket_ver, &master_ver, stats, &max_marker);
if (ret < 0) {
cerr << "error getting bucket stats bucket=" << bucket->get_name() << " ret=" << ret << std::endl;
return ret;
/* need stats for num_entries */
string bucket_ver, master_ver;
std::map<RGWObjCategory, RGWStorageStats> stats;
- ret = bucket->read_stats(dpp, RGW_NO_SHARD, &bucket_ver, &master_ver, stats, nullptr);
+ const auto& latest_log = bucket->get_info().layout.logs.back();
+ const auto& index = log_to_index_layout(latest_log);
+ ret = bucket->read_stats(dpp, index, RGW_NO_SHARD, &bucket_ver, &master_ver, stats, nullptr);
if (ret < 0)
continue;
return {gen, {BucketLogType::InIndex, {gen, index}}};
}
+inline auto matches_gen(uint64_t gen)
+{
+ return [gen] (const bucket_log_layout_generation& l) { return l.gen == gen; };
+}
+
+inline bucket_index_layout_generation log_to_index_layout(const bucket_log_layout_generation& log_layout)
+{
+ ceph_assert(log_layout.layout.type == BucketLogType::InIndex);
+ bucket_index_layout_generation index;
+ index.gen = log_layout.layout.in_index.gen;
+ index.layout.normal = log_layout.layout.in_index.layout;
+ return index;
+}
+
enum class BucketReshardState : uint8_t {
None,
InProgress,
public:
RGWReadRemoteBucketIndexLogInfoCR(RGWDataSyncCtx *_sc,
- const rgw_bucket_shard& bs,
+ const rgw_bucket& bucket,
rgw_bucket_index_marker_info *_info)
: RGWCoroutine(_sc->cct), sc(_sc), sync_env(_sc->env),
- instance_key(bs.get_key()), info(_info) {}
+ instance_key(bucket.get_key()), info(_info) {}
int operate(const DoutPrefixProvider *dpp) override {
reenter(this) {
if (retcode < 0) {
return set_cr_error(retcode);
}
+
return set_cr_done();
}
return 0;
rgw_bucket_shard_sync_info& status;
RGWObjVersionTracker& objv_tracker;
- rgw_bucket_index_marker_info info;
+ rgw_bucket_index_marker_info& info;
+ const BucketIndexShardsManager& marker_mgr;
bool exclusive;
public:
RGWInitBucketShardSyncStatusCoroutine(RGWDataSyncCtx *_sc,
const rgw_bucket_sync_pair_info& _sync_pair,
rgw_bucket_shard_sync_info& _status,
+ rgw_bucket_index_marker_info& _info,
+ const BucketIndexShardsManager& _marker_mgr,
RGWObjVersionTracker& objv_tracker,
bool exclusive)
: RGWCoroutine(_sc->cct), sc(_sc), sync_env(_sc->env),
sync_pair(_sync_pair),
sync_status_oid(RGWBucketPipeSyncStatusManager::inc_status_oid(sc->source_zone, _sync_pair)),
- status(_status), objv_tracker(objv_tracker), exclusive(exclusive)
+ status(_status), objv_tracker(objv_tracker), info(_info), marker_mgr(_marker_mgr), exclusive(exclusive)
{}
int operate(const DoutPrefixProvider *dpp) override {
reenter(this) {
- /* fetch current position in logs */
- yield call(new RGWReadRemoteBucketIndexLogInfoCR(sc, sync_pair.source_bs, &info));
- if (retcode < 0 && retcode != -ENOENT) {
- ldout(cct, 0) << "ERROR: failed to fetch bucket index status" << dendl;
- return set_cr_error(retcode);
- }
yield {
auto store = sync_env->store;
rgw_raw_obj obj(sync_env->svc->zone->get_zone_params().log_pool, sync_status_oid);
const bool stopped = status.state == rgw_bucket_shard_sync_info::StateStopped;
bool write_status = false;
+ auto max_marker = marker_mgr.get(sync_pair.dest_bs.shard_id, "");
+
if (info.syncstopped) {
if (stopped && !sync_env->sync_module->should_full_sync()) {
// preserve our current incremental marker position
} else {
// whether or not to do full sync, incremental sync will follow anyway
if (sync_env->sync_module->should_full_sync()) {
- status.inc_marker.position = info.max_marker;
+ status.inc_marker.position = max_marker;
}
write_status = true;
status.inc_marker.timestamp = ceph::real_clock::now();
const rgw_bucket_sync_pair_info& pair;
rgw_bucket_shard_sync_info status;
RGWObjVersionTracker objv;
+ rgw_bucket_index_marker_info& info;
+ const BucketIndexShardsManager& marker_mgr;
+
int tries = 10; // retry on racing writes
bool exclusive = true; // first try is exclusive
using ReadCR = RGWReadBucketPipeSyncStatusCoroutine;
using InitCR = RGWInitBucketShardSyncStatusCoroutine;
public:
- InitBucketShardStatusCR(RGWDataSyncCtx* sc, const rgw_bucket_sync_pair_info& pair)
- : RGWCoroutine(sc->cct), sc(sc), pair(pair)
+ InitBucketShardStatusCR(RGWDataSyncCtx* sc,
+ const rgw_bucket_sync_pair_info& pair,
+ rgw_bucket_index_marker_info& info,
+ const BucketIndexShardsManager& marker_mgr)
+ : RGWCoroutine(sc->cct), sc(sc), pair(pair), info(info), marker_mgr(marker_mgr)
{}
int operate(const DoutPrefixProvider *dpp) {
reenter(this) {
// try exclusive create with empty status
objv.generate_new_write_ver(cct);
- yield call(new InitCR(sc, pair, status, objv, exclusive));
+ yield call(new InitCR(sc, pair, status, info, marker_mgr, objv, exclusive));
if (retcode >= 0) {
return set_cr_done();
} else if (retcode != -EEXIST) {
if (retcode < 0) {
return set_cr_error(retcode);
}
- yield call(new InitCR(sc, pair, status, objv, exclusive));
+ yield call(new InitCR(sc, pair, status, info, marker_mgr, objv, exclusive));
if (retcode >= 0) {
return set_cr_done();
} else if (retcode != -ECANCELED) {
static constexpr int max_concurrent_shards = 16;
RGWDataSyncCtx* sc;
rgw_bucket_sync_pair_info sync_pair;
+ rgw_bucket_index_marker_info& info;
+ const BucketIndexShardsManager& marker_mgr;
+
const int num_shards;
int shard = 0;
public:
InitBucketShardStatusCollectCR(RGWDataSyncCtx* sc,
const rgw_bucket_sync_pair_info& sync_pair,
+ rgw_bucket_index_marker_info& info,
+ const BucketIndexShardsManager& marker_mgr,
int num_shards)
: RGWShardCollectCR(sc->cct, max_concurrent_shards),
- sc(sc), sync_pair(sync_pair), num_shards(num_shards)
+ sc(sc), sync_pair(sync_pair), info(info), marker_mgr(marker_mgr), num_shards(num_shards)
{}
bool spawn_next() override {
return false;
}
sync_pair.dest_bs.shard_id = shard++;
- spawn(new InitBucketShardStatusCR(sc, sync_pair), false);
+ spawn(new InitBucketShardStatusCR(sc, sync_pair, info, marker_mgr), false);
return true;
}
};
const int num_shards;
const bool check_compat;
+ rgw_bucket_index_marker_info info;
+ BucketIndexShardsManager marker_mgr;
+
bool all_incremental = true;
public:
InitBucketFullSyncStatusCR(RGWDataSyncCtx* sc,
int operate(const DoutPrefixProvider *dpp) override {
reenter(this) {
+ yield call(new RGWReadRemoteBucketIndexLogInfoCR(sc, sync_pair.dest_bs.bucket, &info));
+ if (retcode < 0) {
+ lderr(cct) << "failed to read remote bilog info: "
+ << cpp_strerror(retcode) << dendl;
+ return set_cr_error(retcode);
+ }
+
+ retcode = marker_mgr.from_string(info.max_marker, -1);
+ if (retcode < 0) {
+ lderr(cct) << "failed to parse bilog shard markers: "
+ << cpp_strerror(retcode) << dendl;
+ return set_cr_error(retcode);
+ }
+
status.state = BucketSyncState::Init;
- if (check_compat) {
- // try to convert existing per-shard incremental status for backward compatibility
- yield call(new CheckAllBucketShardStatusIsIncremental(sc, sync_pair, num_shards, &all_incremental));
- if (retcode < 0) {
- return set_cr_error(retcode);
- }
- if (all_incremental) {
- // we can use existing status and resume incremental sync
- status.state = BucketSyncState::Incremental;
+ if (info.oldest_gen == 0) {
+ if (check_compat) {
+ // try to convert existing per-shard incremental status for backward compatibility
+ yield call(new CheckAllBucketShardStatusIsIncremental(sc, sync_pair, num_shards, &all_incremental));
+ if (retcode < 0) {
+ return set_cr_error(retcode);
+ }
+ if (all_incremental) {
+ // we can use existing status and resume incremental sync
+ status.state = BucketSyncState::Incremental;
+ }
}
}
if (status.state != BucketSyncState::Incremental) {
// initialize all shard sync status. this will populate the log marker
// positions where incremental sync will resume after full sync
- yield call(new InitBucketShardStatusCollectCR(sc, sync_pair, num_shards));
+ yield call(new InitBucketShardStatusCollectCR(sc, sync_pair, info, marker_mgr, num_shards));
if (retcode < 0) {
ldout(cct, 20) << "failed to init bucket shard status: "
<< cpp_strerror(retcode) << dendl;
}
}
+ status.incremental_gen = info.latest_gen;
+
ldout(cct, 20) << "writing bucket sync state=" << status.state << dendl;
// write bucket sync status
}
};
+struct next_bilog_result {
+ uint64_t generation = 0;
+ int num_shards = 0;
+
+ void decode_json(JSONObj *obj) {
+ JSONDecoder::decode_json("generation", generation, obj);
+ JSONDecoder::decode_json("num_shards", num_shards, obj);
+ }
+};
+
+struct bilog_list_result {
+ list<rgw_bi_log_entry> entries;
+ bool truncated{false};
+ std::optional<next_bilog_result> next_log;
+
+ void decode_json(JSONObj *obj) {
+ JSONDecoder::decode_json("entries", entries, obj);
+ JSONDecoder::decode_json("truncated", truncated, obj);
+ JSONDecoder::decode_json("next_log", next_log, obj);
+ }
+};
+
class RGWListBucketIndexLogCR: public RGWCoroutine {
RGWDataSyncCtx *sc;
RGWDataSyncEnv *sync_env;
const string instance_key;
string marker;
- list<rgw_bi_log_entry> *result;
+ bilog_list_result *result;
std::optional<PerfGuard> timer;
+ uint64_t generation;
+ std::string gen_str = std::to_string(generation);
+ uint32_t format_ver{1};
public:
- RGWListBucketIndexLogCR(RGWDataSyncCtx *_sc, const rgw_bucket_shard& bs,
- string& _marker, list<rgw_bi_log_entry> *_result)
+ RGWListBucketIndexLogCR(RGWDataSyncCtx *_sc, const rgw_bucket_shard& bs, string& _marker,
+ uint64_t _generation, bilog_list_result *_result)
: RGWCoroutine(_sc->cct), sc(_sc), sync_env(_sc->env),
- instance_key(bs.get_key()), marker(_marker), result(_result) {}
+ instance_key(bs.get_key()), marker(_marker), result(_result), generation(_generation) {}
int operate(const DoutPrefixProvider *dpp) override {
reenter(this) {
{ "format" , "json" },
{ "marker" , marker.c_str() },
{ "type", "bucket-index" },
+ { "generation", gen_str.c_str() },
+ { "format-ver", "2"},
{ NULL, NULL } };
- call(new RGWReadRESTResourceCR<list<rgw_bi_log_entry> >(sync_env->cct, sc->conn, sync_env->http_manager, "/admin/log", pairs, result));
+ call(new RGWReadRESTResourceCR<bilog_list_result>(sync_env->cct, sc->conn, sync_env->http_manager,
+ "/admin/log", pairs, result));
}
timer.reset();
if (retcode < 0) {
RGWBucketSyncFlowManager::pipe_rules_ref rules;
rgw_bucket_shard& bs;
boost::intrusive_ptr<const RGWContinuousLeaseCR> lease_cr;
+ bilog_list_result extended_result;
list<rgw_bi_log_entry> list_result;
+ uint64_t generation;
+ int next_num_shards;
+ uint64_t next_gen;
+ bool truncated;
+
list<rgw_bi_log_entry>::iterator entries_iter, entries_end;
map<pair<string, string>, pair<real_time, RGWModifyOp> > squash_map;
rgw_bucket_shard_sync_info& sync_info;
set_status("init");
rules = sync_pipe.get_rules();
target_location_key = sync_pipe.info.dest_bs.bucket.get_key();
+ generation = 0; // TODO: remove once datalog shard is done
}
bool check_key_handled(const rgw_obj_key& key) {
}
tn->log(20, SSTR("listing bilog for incremental sync; position=" << sync_info.inc_marker.position));
set_status() << "listing bilog; position=" << sync_info.inc_marker.position;
- yield call(new RGWListBucketIndexLogCR(sc, bs, sync_info.inc_marker.position,
- &list_result));
+ yield call(new RGWListBucketIndexLogCR(sc, bs, sync_info.inc_marker.position, generation, &extended_result));
if (retcode < 0 && retcode != -ENOENT) {
/* wait for all operations to complete */
drain_all();
return set_cr_error(retcode);
}
+ list_result = std::move(extended_result.entries);
+ truncated = extended_result.truncated;
+ if (extended_result.next_log) {
+ next_gen = extended_result.next_log->generation;
+ next_num_shards = extended_result.next_log->num_shards;
+ }
+
squash_map.clear();
entries_iter = list_result.begin();
entries_end = list_result.end();
struct rgw_bucket_sync_status {
BucketSyncState state = BucketSyncState::Init;
rgw_bucket_full_sync_status full;
+ uint64_t incremental_gen;
+ std::vector<bool> shards_done_with_gen;
void encode(bufferlist& bl) const {
- ENCODE_START(1, 1, bl);
+ ENCODE_START(2, 1, bl);
encode(state, bl);
encode(full, bl);
+ encode(incremental_gen, bl);
+ encode(shards_done_with_gen, bl);
ENCODE_FINISH(bl);
}
void decode(bufferlist::const_iterator& bl) {
- DECODE_START(1, bl);
+ DECODE_START(2, bl);
decode(state, bl);
decode(full, bl);
+ if (struct_v > 1) {
+ decode(incremental_gen, bl);
+ decode(shards_done_with_gen, bl);
+ }
DECODE_FINISH(bl);
}
std::string master_ver;
std::string max_marker;
bool syncstopped{false};
+ uint64_t oldest_gen;
+ uint64_t latest_gen;
void decode_json(JSONObj *obj) {
JSONDecoder::decode_json("bucket_ver", bucket_ver, obj);
JSONDecoder::decode_json("master_ver", master_ver, obj);
JSONDecoder::decode_json("max_marker", max_marker, obj);
JSONDecoder::decode_json("syncstopped", syncstopped, obj);
+ JSONDecoder::decode_json("oldest_gen", oldest_gen, obj);
+ JSONDecoder::decode_json("latest_gen", latest_gen, obj);
}
};
ldpp_dout(&dp, 20) << "initiating async quota refresh for bucket=" << bucket << dendl;
- r = rbucket->read_stats_async(&dp, RGW_NO_SHARD, this);
+ const auto& latest_log = rbucket->get_info().layout.logs.back();
+ const auto& index = log_to_index_layout(latest_log);
+ r = rbucket->read_stats_async(&dp, index, RGW_NO_SHARD, this);
if (r < 0) {
ldpp_dout(&dp, 0) << "could not get bucket info for bucket=" << bucket.name << dendl;
string bucket_ver;
string master_ver;
+ const auto& latest_log = bucket->get_info().layout.logs.back();
+ const auto& index = log_to_index_layout(latest_log);
+
map<RGWObjCategory, RGWStorageStats> bucket_stats;
- r = bucket->read_stats(dpp, RGW_NO_SHARD, &bucket_ver, &master_ver, bucket_stats);
+ r = bucket->read_stats(dpp, index, RGW_NO_SHARD, &bucket_ver,
+ &master_ver, bucket_stats, nullptr);
if (r < 0) {
ldpp_dout(dpp, 0) << "could not get bucket stats for bucket="
<< _b.name << dendl;
return ret;
}
-int RGWRados::decode_policy(const DoutPrefixProvider *dpp, bufferlist& bl, ACLOwner *owner)
+int RGWRados::decode_policy(const DoutPrefixProvider *dpp,
+ ceph::buffer::list& bl,
+ ACLOwner *owner)
{
auto i = bl.cbegin();
RGWAccessControlPolicy policy(cct);
RGWRados *store = target->get_store();
CephContext *cct = store->ctx();
int shard_id = target->get_shard_id();
+ const auto& current_index = target->get_bucket_info().layout.current_index;
int count = 0;
bool truncated = true;
ent_map.reserve(read_ahead);
int r = store->cls_bucket_list_ordered(dpp,
target->get_bucket_info(),
- shard_id,
+ current_index,
+ shard_id,
cur_marker,
cur_prefix,
params.delim,
{
RGWRados *store = target->get_store();
int shard_id = target->get_shard_id();
+ const auto& current_index = target->get_bucket_info().layout.current_index;
int count = 0;
bool truncated = true;
int r = store->cls_bucket_list_unordered(dpp,
target->get_bucket_info(),
- shard_id,
+ current_index,
+ shard_id,
cur_marker,
cur_prefix,
read_ahead,
int r = cls_bucket_list_unordered(dpp,
bucket_info,
- RGW_NO_SHARD,
+ bucket_info.layout.current_index,
+ RGW_NO_SHARD,
marker,
prefix,
NUM_ENTRIES,
const rgw_bucket& bucket = bucket_info.bucket;
RGWSI_RADOS::Pool index_pool;
map<int, string> bucket_objs;
- int r = svc.bi_rados->open_bucket_index(dpp, bucket_info, std::nullopt, &index_pool, &bucket_objs, nullptr);
+ int r = svc.bi_rados->open_bucket_index(dpp, bucket_info, std::nullopt, bucket_info.layout.current_index, &index_pool, &bucket_objs, nullptr);
if (r < 0)
return r;
// key - bucket index object id
// value - bucket index check OP returned result with the given bucket index object (shard)
map<int, string> oids;
- int ret = svc.bi_rados->open_bucket_index(dpp, bucket_info, std::nullopt, &index_pool, &oids, nullptr);
+
+ int ret = svc.bi_rados->open_bucket_index(dpp, bucket_info, std::nullopt, bucket_info.layout.current_index, &index_pool, &oids, nullptr);
if (ret < 0) {
return ret;
}
RGWSI_RADOS::Pool index_pool;
map<int, string> bucket_objs;
- int r = svc.bi_rados->open_bucket_index(dpp, bucket_info, std::nullopt, &index_pool, &bucket_objs, nullptr);
+ int r = svc.bi_rados->open_bucket_index(dpp, bucket_info, std::nullopt, bucket_info.layout.current_index, &index_pool, &bucket_objs, nullptr);
if (r < 0) {
return r;
}
RGWSI_RADOS::Pool index_pool;
map<int, string> bucket_objs;
- int r = svc.bi_rados->open_bucket_index(dpp, bucket_info, std::nullopt, &index_pool, &bucket_objs, nullptr);
+ int r = svc.bi_rados->open_bucket_index(dpp, bucket_info, std::nullopt, bucket_info.layout.current_index, &index_pool, &bucket_objs, nullptr);
if (r < 0) {
return r;
}
return 0;
}
-int RGWRados::get_bucket_stats(const DoutPrefixProvider *dpp, RGWBucketInfo& bucket_info, int shard_id, string *bucket_ver, string *master_ver,
- map<RGWObjCategory, RGWStorageStats>& stats, string *max_marker, bool *syncstopped)
+int RGWRados::get_bucket_stats(const DoutPrefixProvider *dpp,
+ RGWBucketInfo& bucket_info,
+ const rgw::bucket_index_layout_generation& idx_layout,
+ int shard_id, string *bucket_ver, string *master_ver,
+ map<RGWObjCategory, RGWStorageStats>& stats,
+ string *max_marker, bool *syncstopped)
{
vector<rgw_bucket_dir_header> headers;
map<int, string> bucket_instance_ids;
- int r = cls_bucket_head(dpp, bucket_info, shard_id, headers, &bucket_instance_ids);
+ int r = cls_bucket_head(dpp, bucket_info, idx_layout, shard_id, headers, &bucket_instance_ids);
if (r < 0) {
return r;
}
}
};
-int RGWRados::get_bucket_stats_async(const DoutPrefixProvider *dpp, RGWBucketInfo& bucket_info, int shard_id, RGWGetBucketStats_CB *ctx)
+int RGWRados::get_bucket_stats_async(const DoutPrefixProvider *dpp, RGWBucketInfo& bucket_info, const rgw::bucket_index_layout_generation& idx_layout, int shard_id, RGWGetBucketStats_CB *ctx)
{
int num_aio = 0;
RGWGetBucketStatsContext *get_ctx = new RGWGetBucketStatsContext(ctx, bucket_info.layout.current_index.layout.normal.num_shards ? : 1);
ceph_assert(get_ctx);
- int r = cls_bucket_head_async(dpp, bucket_info, shard_id, get_ctx, &num_aio);
+ int r = cls_bucket_head_async(dpp, bucket_info, idx_layout, shard_id, get_ctx, &num_aio);
if (r < 0) {
ctx->put();
if (num_aio) {
return ret;
}
- int r = cls_bucket_head(dpp, bucket_info, RGW_NO_SHARD, headers);
+ int r = cls_bucket_head(dpp, bucket_info, bucket_info.layout.current_index, RGW_NO_SHARD, headers);
if (r < 0)
return r;
{
RGWSI_RADOS::Pool index_pool;
map<int, string> bucket_objs;
- int r = svc.bi_rados->open_bucket_index(dpp, bucket_info, std::nullopt, &index_pool, &bucket_objs, nullptr);
+ int r = svc.bi_rados->open_bucket_index(dpp, bucket_info, std::nullopt, bucket_info.layout.current_index, &index_pool, &bucket_objs, nullptr);
if (r < 0)
return r;
int RGWRados::cls_bucket_list_ordered(const DoutPrefixProvider *dpp,
RGWBucketInfo& bucket_info,
- const int shard_id,
+ const rgw::bucket_index_layout_generation& idx_layout,
+ const int shard_id,
const rgw_obj_index_key& start_after,
const std::string& prefix,
const std::string& delimiter,
// value - list result for the corresponding oid (shard), it is filled by
// the AIO callback
std::map<int, std::string> shard_oids;
- int r = svc.bi_rados->open_bucket_index(dpp, bucket_info, shard_id,
+ int r = svc.bi_rados->open_bucket_index(dpp, bucket_info, shard_id, idx_layout,
&index_pool, &shard_oids,
nullptr);
if (r < 0) {
int RGWRados::cls_bucket_list_unordered(const DoutPrefixProvider *dpp,
RGWBucketInfo& bucket_info,
- int shard_id,
+ const rgw::bucket_index_layout_generation& idx_layout,
+ int shard_id,
const rgw_obj_index_key& start_after,
const std::string& prefix,
uint32_t num_entries,
RGWSI_RADOS::Pool index_pool;
std::map<int, std::string> oids;
- int r = svc.bi_rados->open_bucket_index(dpp, bucket_info, shard_id, &index_pool, &oids, nullptr);
+ int r = svc.bi_rados->open_bucket_index(dpp, bucket_info, shard_id, idx_layout, &index_pool, &oids, nullptr);
if (r < 0) {
return r;
}
RGWSI_RADOS::Pool index_pool;
std::map<int, std::string> index_oids;
- int r = svc.bi_rados->open_bucket_index(dpp, bucket_info, std::nullopt, &index_pool,
- &index_oids, nullptr);
+ int r = svc.bi_rados->open_bucket_index(dpp, bucket_info, std::nullopt,
+ bucket_info.layout.current_index,
+ &index_pool, &index_oids, nullptr);
if (r < 0) {
ldout_bitx(bitx, dpp, 0) << "ERROR: " << __func__ <<
" open_bucket_index returned " << r << dendl_bitx;
return 0;
} // RGWRados::check_disk_state
-int RGWRados::cls_bucket_head(const DoutPrefixProvider *dpp, const RGWBucketInfo& bucket_info, int shard_id, vector<rgw_bucket_dir_header>& headers, map<int, string> *bucket_instance_ids)
+int RGWRados::cls_bucket_head(const DoutPrefixProvider *dpp, const RGWBucketInfo& bucket_info, const rgw::bucket_index_layout_generation& idx_layout, int shard_id, vector<rgw_bucket_dir_header>& headers, map<int, string> *bucket_instance_ids)
{
RGWSI_RADOS::Pool index_pool;
map<int, string> oids;
map<int, struct rgw_cls_list_ret> list_results;
- int r = svc.bi_rados->open_bucket_index(dpp, bucket_info, shard_id, &index_pool, &oids, bucket_instance_ids);
+ int r = svc.bi_rados->open_bucket_index(dpp, bucket_info, -1, idx_layout, &index_pool, &oids, bucket_instance_ids);
if (r < 0) {
ldpp_dout(dpp, 20) << "cls_bucket_head: open_bucket_index() returned "
<< r << dendl;
return 0;
}
-int RGWRados::cls_bucket_head_async(const DoutPrefixProvider *dpp, const RGWBucketInfo& bucket_info, int shard_id, RGWGetDirHeader_CB *ctx, int *num_aio)
+int RGWRados::cls_bucket_head_async(const DoutPrefixProvider *dpp, const RGWBucketInfo& bucket_info, const rgw::bucket_index_layout_generation& idx_layout, int shard_id, RGWGetDirHeader_CB *ctx, int *num_aio)
{
RGWSI_RADOS::Pool index_pool;
map<int, string> bucket_objs;
- int r = svc.bi_rados->open_bucket_index(dpp, bucket_info, shard_id, &index_pool, &bucket_objs, nullptr);
+ int r = svc.bi_rados->open_bucket_index(dpp, bucket_info, shard_id, idx_layout, &index_pool, &bucket_objs, nullptr);
if (r < 0)
return r;
rctx->set_compressed(obj);
}
int decode_policy(const DoutPrefixProvider *dpp, bufferlist& bl, ACLOwner *owner);
- int get_bucket_stats(const DoutPrefixProvider *dpp, RGWBucketInfo& bucket_info, int shard_id, std::string *bucket_ver, std::string *master_ver,
+ int get_bucket_stats(const DoutPrefixProvider *dpp, RGWBucketInfo& bucket_info, const rgw::bucket_index_layout_generation& idx_layout, int shard_id, std::string *bucket_ver, std::string *master_ver,
std::map<RGWObjCategory, RGWStorageStats>& stats, std::string *max_marker, bool* syncstopped = NULL);
- int get_bucket_stats_async(const DoutPrefixProvider *dpp, RGWBucketInfo& bucket_info, int shard_id, RGWGetBucketStats_CB *cb);
+ int get_bucket_stats_async(const DoutPrefixProvider *dpp, RGWBucketInfo& bucket_info, const rgw::bucket_index_layout_generation& idx_layout, int shard_id, RGWGetBucketStats_CB *cb);
int put_bucket_instance_info(RGWBucketInfo& info, bool exclusive, ceph::real_time mtime, std::map<std::string, bufferlist> *pattrs, const DoutPrefixProvider *dpp);
/* xxx dang obj_ctx -> svc */
int cls_bucket_list_ordered(const DoutPrefixProvider *dpp,
RGWBucketInfo& bucket_info,
- const int shard_id,
+ const rgw::bucket_index_layout_generation& idx_layout,
+ const int shard_id,
const rgw_obj_index_key& start_after,
const std::string& prefix,
const std::string& delimiter,
RGWBucketListNameFilter force_check_filter = {});
int cls_bucket_list_unordered(const DoutPrefixProvider *dpp,
RGWBucketInfo& bucket_info,
- int shard_id,
+ const rgw::bucket_index_layout_generation& idx_layout,
+ int shard_id,
const rgw_obj_index_key& start_after,
const std::string& prefix,
uint32_t num_entries,
RGWBucketListNameFilter force_check_filter = {});
int cls_bucket_head(const DoutPrefixProvider *dpp,
const RGWBucketInfo& bucket_info,
- int shard_id,
- std::vector<rgw_bucket_dir_header>& headers,
+ const rgw::bucket_index_layout_generation& idx_layout,
+ int shard_id, std::vector<rgw_bucket_dir_header>& headers,
std::map<int, std::string> *bucket_instance_ids = NULL);
- int cls_bucket_head_async(const DoutPrefixProvider *dpp, const RGWBucketInfo& bucket_info, int shard_id, RGWGetDirHeader_CB *ctx, int *num_aio);
-
+ int cls_bucket_head_async(const DoutPrefixProvider *dpp,
+ const RGWBucketInfo& bucket_info,
+ const rgw::bucket_index_layout_generation& idx_layout,
+ int shard_id, RGWGetDirHeader_CB *ctx, int *num_aio);
int bi_get_instance(const DoutPrefixProvider *dpp, const RGWBucketInfo& bucket_info, const rgw_obj& obj, rgw_bucket_dir_entry *dirent);
int bi_get_olh(const DoutPrefixProvider *dpp, const RGWBucketInfo& bucket_info, const rgw_obj& obj, rgw_bucket_olh_entry *olh);
int bi_get(const DoutPrefixProvider *dpp, const RGWBucketInfo& bucket_info, const rgw_obj& obj, BIIndexType index_type, rgw_cls_bi_entry *entry);
bucket_name = s->info.args.get("bucket"),
marker = s->info.args.get("marker"),
max_entries_str = s->info.args.get("max-entries"),
- bucket_instance = s->info.args.get("bucket-instance");
+ bucket_instance = s->info.args.get("bucket-instance"),
+ gen_str = s->info.args.get("generation"),
+ format_version_str = s->info.args.get("format-ver");
std::unique_ptr<rgw::sal::Bucket> bucket;
rgw_bucket b(rgw_bucket_key(tenant_name, bucket_name));
+
unsigned max_entries;
if (bucket_name.empty() && bucket_instance.empty()) {
return;
}
+ string err;
+ const uint64_t gen = strict_strtoll(gen_str.c_str(), 10, &err);
+ if (!err.empty()) {
+ ldpp_dout(s, 5) << "Error parsing generation param " << gen_str << dendl;
+ op_ret = -EINVAL;
+ return;
+ }
+
+ if (!format_version_str.empty()) {
+ format_ver = strict_strtoll(format_version_str.c_str(), 10, &err);
+ if (!err.empty()) {
+ ldpp_dout(s, 5) << "Failed to parse format-ver param: " << format_ver << dendl;
+ op_ret = -EINVAL;
+ return;
+ }
+ }
+
int shard_id;
string bn;
op_ret = rgw_bucket_parse_bucket_instance(bucket_instance, &bn, &bucket_instance, &shard_id);
return;
}
- bool truncated;
+ const auto& logs = bucket->get_info().layout.logs;
+ auto log = std::prev(logs.end());
+ if (gen) {
+ log = std::find_if(logs.begin(), logs.end(), rgw::matches_gen(gen));
+ if (log == logs.end()) {
+ ldpp_dout(s, 5) << "ERROR: no log layout with gen=" << gen << dendl;
+ op_ret = -ENOENT;
+ return;
+ }
+ }
+ if (auto next = std::next(log); next != logs.end()) {
+ next_log_layout = *next; // get the next log after the current latest
+ }
+ auto& log_layout = *log; // current log layout for log listing
+
unsigned count = 0;
- string err;
+
max_entries = (unsigned)strict_strtol(max_entries_str.c_str(), 10, &err);
if (!err.empty())
send_response();
do {
list<rgw_bi_log_entry> entries;
- int ret = static_cast<rgw::sal::RadosStore*>(store)->svc()->bilog_rados->log_list(s, bucket->get_info(), shard_id,
- marker, max_entries - count,
+ int ret = static_cast<rgw::sal::RadosStore*>(store)->svc()->bilog_rados->log_list(s, bucket->get_info(), log_layout, shard_id,
+ marker, max_entries - count,
entries, &truncated);
if (ret < 0) {
ldpp_dout(this, 5) << "ERROR: list_bi_log_entries()" << dendl;
if (op_ret < 0)
return;
+ if (format_ver >= 2) {
+ s->formatter->open_object_section("result");
+ }
+
s->formatter->open_array_section("entries");
}
void RGWOp_BILog_List::send_response_end() {
s->formatter->close_section();
+
+ if (format_ver >= 2) {
+ encode_json("truncated", truncated, s->formatter);
+
+ if (next_log_layout) {
+ s->formatter->open_object_section("next_log");
+ encode_json("generation", next_log_layout->gen, s->formatter);
+ encode_json("num_shards", next_log_layout->layout.in_index.layout.num_shards, s->formatter);
+ s->formatter->close_section(); // next_log
+ }
+
+ s->formatter->close_section(); // result
+ }
+
flusher.flush();
}
-
+
void RGWOp_BILog_Info::execute(optional_yield y) {
string tenant_name = s->info.args.get("tenant"),
bucket_name = s->info.args.get("bucket"),
}
map<RGWObjCategory, RGWStorageStats> stats;
- int ret = bucket->read_stats(s, shard_id, &bucket_ver, &master_ver, stats, &max_marker, &syncstopped);
+ const auto& latest_log = bucket->get_info().layout.logs.back();
+ const auto& index = log_to_index_layout(latest_log);
+
+ int ret = bucket->read_stats(s, index, shard_id, &bucket_ver, &master_ver, stats, &max_marker, &syncstopped);
if (ret < 0 && ret != -ENOENT) {
op_ret = ret;
return;
}
+
+ oldest_gen = bucket->get_info().layout.logs.front().gen;
+ latest_gen = latest_log.gen;
}
void RGWOp_BILog_Info::send_response() {
encode_json("master_ver", master_ver, s->formatter);
encode_json("max_marker", max_marker, s->formatter);
encode_json("syncstopped", syncstopped, s->formatter);
+ encode_json("oldest_gen", oldest_gen, s->formatter);
+ encode_json("latest_gen", latest_gen, s->formatter);
s->formatter->close_section();
flusher.flush();
return;
}
- op_ret = static_cast<rgw::sal::RadosStore*>(store)->svc()->bilog_rados->log_trim(s, bucket->get_info(), shard_id, start_marker, end_marker);
+ const auto& logs = bucket->get_info().layout.logs;
+ auto log_layout = std::reference_wrapper{logs.back()};
+ auto gen = logs.back().gen; // TODO: remove this once gen is passed here
+ if (gen) {
+ auto i = std::find_if(logs.begin(), logs.end(), rgw::matches_gen(gen));
+ if (i == logs.end()) {
+ ldpp_dout(s, 5) << "ERROR: no log layout with gen=" << gen << dendl;
+ op_ret = -ENOENT;
+ return;
+ }
+ log_layout = *i;
+ }
+
+ op_ret = static_cast<rgw::sal::RadosStore*>(store)->svc()->bilog_rados->log_trim(s, bucket->get_info(), log_layout, shard_id, start_marker, end_marker);
if (op_ret < 0) {
ldpp_dout(this, 5) << "ERROR: trim_bi_log_entries() " << dendl;
}
else if (s->info.args.exists("unlock"))
return new RGWOp_MDLog_Unlock;
else if (s->info.args.exists("notify"))
- return new RGWOp_MDLog_Notify;
+ return new RGWOp_MDLog_Notify;
} else if (type.compare("data") == 0) {
if (s->info.args.exists("notify"))
- return new RGWOp_DATALog_Notify;
+ return new RGWOp_DATALog_Notify;
}
return NULL;
}
class RGWOp_BILog_List : public RGWRESTOp {
bool sent_header;
+ uint32_t format_ver{0};
+ bool truncated{false};
+ std::optional<rgw::bucket_log_layout_generation> next_log_layout;
+
public:
RGWOp_BILog_List() : sent_header(false) {}
~RGWOp_BILog_List() override {}
std::string master_ver;
std::string max_marker;
bool syncstopped;
+ uint64_t oldest_gen = 0;
+ uint64_t latest_gen = 0;
+
public:
RGWOp_BILog_Info() : bucket_ver(), master_ver(), syncstopped(false) {}
~RGWOp_BILog_Info() override {}
* If @a get_stats is true, then statistics on the bucket are also looked up. */
virtual int load_bucket(const DoutPrefixProvider* dpp, optional_yield y, bool get_stats = false) = 0;
/** Read the bucket stats from the backing Store, synchronous */
- virtual int read_stats(const DoutPrefixProvider *dpp, int shard_id,
- std::string* bucket_ver, std::string* master_ver,
- std::map<RGWObjCategory, RGWStorageStats>& stats,
- std::string* max_marker = nullptr,
- bool* syncstopped = nullptr) = 0;
+ virtual int read_stats(const DoutPrefixProvider *dpp,
+ const bucket_index_layout_generation& idx_layout,
+ int shard_id, std::string* bucket_ver, std::string* master_ver,
+ std::map<RGWObjCategory, RGWStorageStats>& stats,
+ std::string* max_marker = nullptr,
+ bool* syncstopped = nullptr) = 0;
/** Read the bucket stats from the backing Store, asynchronous */
- virtual int read_stats_async(const DoutPrefixProvider *dpp, int shard_id, RGWGetBucketStats_CB* ctx) = 0;
+ virtual int read_stats_async(const DoutPrefixProvider *dpp,
+ const bucket_index_layout_generation& idx_layout,
+ int shard_id, RGWGetBucketStats_CB* ctx) = 0;
/** Sync this bucket's stats to the owning user's stats in the backing store */
virtual int sync_user_stats(const DoutPrefixProvider *dpp, optional_yield y) = 0;
/** Refresh the metadata stats (size, count, and so on) from the backing store */
}
/* stats - Not for first pass */
- int DBBucket::read_stats(const DoutPrefixProvider *dpp, int shard_id,
+ int DBBucket::read_stats(const DoutPrefixProvider *dpp,
+ const bucket_index_layout_generation& idx_layout,
+ int shard_id,
std::string *bucket_ver, std::string *master_ver,
std::map<RGWObjCategory, RGWStorageStats>& stats,
std::string *max_marker, bool *syncstopped)
return 0;
}
- int DBBucket::read_stats_async(const DoutPrefixProvider *dpp, int shard_id, RGWGetBucketStats_CB *ctx)
+ int DBBucket::read_stats_async(const DoutPrefixProvider *dpp, const bucket_index_layout_generation& idx_layout, int shard_id, RGWGetBucketStats_CB *ctx)
{
return 0;
}
std::unique_ptr<rgw::sal::Object::DeleteOp> del_op = meta_obj->get_delete_op();
del_op->params.bucket_owner = bucket->get_acl_owner();
del_op->params.versioning_status = 0;
-
+
// Since the data objects are associated with meta obj till
// MultipartUpload::Complete() is done, removing the metadata obj
// should remove all the uploads so far.
virtual RGWAccessControlPolicy& get_acl(void) override { return acls; }
virtual int set_acl(const DoutPrefixProvider *dpp, RGWAccessControlPolicy& acl, optional_yield y) override;
virtual int load_bucket(const DoutPrefixProvider *dpp, optional_yield y, bool get_stats = false) override;
- virtual int read_stats(const DoutPrefixProvider *dpp, int shard_id,
+ virtual int read_stats(const DoutPrefixProvider *dpp,
+ const bucket_index_layout_generation& idx_layout,
+ int shard_id,
std::string *bucket_ver, std::string *master_ver,
std::map<RGWObjCategory, RGWStorageStats>& stats,
std::string *max_marker = nullptr,
bool *syncstopped = nullptr) override;
- virtual int read_stats_async(const DoutPrefixProvider *dpp, int shard_id, RGWGetBucketStats_CB* ctx) override;
+ virtual int read_stats_async(const DoutPrefixProvider *dpp, const bucket_index_layout_generation& idx_layout, int shard_id, RGWGetBucketStats_CB* ctx) override;
virtual int sync_user_stats(const DoutPrefixProvider *dpp, optional_yield y) override;
virtual int update_container_stats(const DoutPrefixProvider *dpp) override;
virtual int check_bucket_shards(const DoutPrefixProvider *dpp) override;
if (ret < 0)
return ret;
- ret = read_stats(dpp, RGW_NO_SHARD, &bucket_ver, &master_ver, stats, NULL);
+ const auto& latest_log = info.layout.logs.back();
+ const auto& index = log_to_index_layout(latest_log);
+
+ ret = read_stats(dpp, index, RGW_NO_SHARD, &bucket_ver, &master_ver, stats, NULL);
if (ret < 0)
return ret;
return ret;
}
-int RadosBucket::read_stats(const DoutPrefixProvider *dpp, int shard_id,
- std::string* bucket_ver, std::string* master_ver,
- std::map<RGWObjCategory, RGWStorageStats>& stats,
- std::string* max_marker, bool* syncstopped)
+int RadosBucket::read_stats(const DoutPrefixProvider *dpp,
+ const bucket_index_layout_generation& idx_layout,
+ int shard_id, std::string* bucket_ver, std::string* master_ver,
+ std::map<RGWObjCategory, RGWStorageStats>& stats,
+ std::string* max_marker, bool* syncstopped)
{
- return store->getRados()->get_bucket_stats(dpp, info, shard_id, bucket_ver, master_ver, stats, max_marker, syncstopped);
+ return store->getRados()->get_bucket_stats(dpp, info, idx_layout, shard_id, bucket_ver, master_ver, stats, max_marker, syncstopped);
}
-int RadosBucket::read_stats_async(const DoutPrefixProvider *dpp, int shard_id, RGWGetBucketStats_CB* ctx)
+int RadosBucket::read_stats_async(const DoutPrefixProvider *dpp,
+ const bucket_index_layout_generation& idx_layout,
+ int shard_id, RGWGetBucketStats_CB* ctx)
{
- return store->getRados()->get_bucket_stats_async(dpp, get_info(), shard_id, ctx);
+ return store->getRados()->get_bucket_stats_async(dpp, get_info(), idx_layout, shard_id, ctx);
}
int RadosBucket::sync_user_stats(const DoutPrefixProvider *dpp, optional_yield y)
}
virtual ~RadosBucket();
-
virtual std::unique_ptr<Object> get_object(const rgw_obj_key& k) override;
virtual int list(const DoutPrefixProvider* dpp, ListParams&, int, ListResults&, optional_yield y) override;
virtual int remove_bucket(const DoutPrefixProvider* dpp, bool delete_children, bool forward_to_master, req_info* req_info, optional_yield y) override;
virtual RGWAccessControlPolicy& get_acl(void) override { return acls; }
virtual int set_acl(const DoutPrefixProvider* dpp, RGWAccessControlPolicy& acl, optional_yield y) override;
virtual int load_bucket(const DoutPrefixProvider* dpp, optional_yield y, bool get_stats = false) override;
- virtual int read_stats(const DoutPrefixProvider *dpp, int shard_id,
- std::string* bucket_ver, std::string* master_ver,
- std::map<RGWObjCategory, RGWStorageStats>& stats,
- std::string* max_marker = nullptr,
- bool* syncstopped = nullptr) override;
- virtual int read_stats_async(const DoutPrefixProvider *dpp, int shard_id, RGWGetBucketStats_CB* ctx) override;
+ virtual int read_stats(const DoutPrefixProvider *dpp,
+ const bucket_index_layout_generation& idx_layout,
+ int shard_id, std::string* bucket_ver, std::string* master_ver,
+ std::map<RGWObjCategory, RGWStorageStats>& stats,
+ std::string* max_marker = nullptr,
+ bool* syncstopped = nullptr) override;
+ virtual int read_stats_async(const DoutPrefixProvider *dpp,
+ const bucket_index_layout_generation& idx_layout,
+ int shard_id, RGWGetBucketStats_CB* ctx) override;
virtual int sync_user_stats(const DoutPrefixProvider *dpp, optional_yield y) override;
virtual int update_container_stats(const DoutPrefixProvider* dpp) override;
virtual int check_bucket_shards(const DoutPrefixProvider* dpp) override;
int RGWSI_BucketIndex_RADOS::open_bucket_index(const DoutPrefixProvider *dpp,
const RGWBucketInfo& bucket_info,
std::optional<int> _shard_id,
+ const rgw::bucket_index_layout_generation& idx_layout,
RGWSI_RADOS::Pool *index_pool,
map<int, string> *bucket_objs,
map<int, string> *bucket_instance_ids)
return ret;
}
+ //auto gen = bucket_info.layout.current_index.gen;
+
+ // TODO: need reshard changes to add gen_id here
get_bucket_index_objects(bucket_oid_base, bucket_info.layout.current_index.layout.normal.num_shards, bucket_objs, shard_id);
if (bucket_instance_ids) {
+ // TODO: generation need to be passed here
get_bucket_instance_ids(bucket_info, shard_id, bucket_instance_ids);
}
return 0;
int RGWSI_BucketIndex_RADOS::cls_bucket_head(const DoutPrefixProvider *dpp,
const RGWBucketInfo& bucket_info,
+ const rgw::bucket_index_layout_generation& idx_layout,
int shard_id,
vector<rgw_bucket_dir_header> *headers,
map<int, string> *bucket_instance_ids,
{
RGWSI_RADOS::Pool index_pool;
map<int, string> oids;
- int r = open_bucket_index(dpp, bucket_info, shard_id, &index_pool, &oids, bucket_instance_ids);
+ int r = open_bucket_index(dpp, bucket_info, shard_id, idx_layout, &index_pool, &oids, bucket_instance_ids);
if (r < 0)
return r;
vector<rgw_bucket_dir_header> headers;
result->bucket = bucket_info.bucket;
- int r = cls_bucket_head(dpp, bucket_info, RGW_NO_SHARD, &headers, nullptr, y);
+ int r = cls_bucket_head(dpp, bucket_info, bucket_info.layout.current_index, RGW_NO_SHARD, &headers, nullptr, y);
if (r < 0) {
return r;
}
int r = open_bucket_index(dpp, bucket_info,
std::nullopt,
+ bucket_info.layout.current_index,
&index_pool,
&bucket_objs,
nullptr);
if (old_sync_enabled != new_sync_enabled) {
int shards_num = info.layout.current_index.layout.normal.num_shards? info.layout.current_index.layout.normal.num_shards : 1;
int shard_id = info.layout.current_index.layout.normal.num_shards? 0 : -1;
+ const auto& log_layout = info.layout.logs.back();
int ret;
if (!new_sync_enabled) {
- ret = svc.bilog->log_stop(dpp, info, -1);
+ ret = svc.bilog->log_stop(dpp, info, log_layout, -1);
} else {
- ret = svc.bilog->log_start(dpp, info, -1);
+ ret = svc.bilog->log_start(dpp, info, log_layout, -1);
}
if (ret < 0) {
ldpp_dout(dpp, -1) << "ERROR: failed writing bilog (bucket=" << info.bucket << "); ret=" << ret << dendl;
std::string *bucket_obj, int *shard_id);
int cls_bucket_head(const DoutPrefixProvider *dpp,
- const RGWBucketInfo& bucket_info,
+ const RGWBucketInfo& bucket_info,
+ const rgw::bucket_index_layout_generation& idx_layout,
int shard_id,
std::vector<rgw_bucket_dir_header> *headers,
std::map<int, std::string> *bucket_instance_ids,
int open_bucket_index(const DoutPrefixProvider *dpp,
const RGWBucketInfo& bucket_info,
std::optional<int> shard_id,
+ const rgw::bucket_index_layout_generation& idx_layout,
RGWSI_RADOS::Pool *index_pool,
std::map<int, std::string> *bucket_objs,
std::map<int, std::string> *bucket_instance_ids);
svc.bi = bi_rados_svc;
}
-int RGWSI_BILog_RADOS::log_trim(const DoutPrefixProvider *dpp, const RGWBucketInfo& bucket_info, int shard_id, string& start_marker, string& end_marker)
+int RGWSI_BILog_RADOS::log_trim(const DoutPrefixProvider *dpp, const RGWBucketInfo& bucket_info, const rgw::bucket_log_layout_generation& log_layout, int shard_id, string& start_marker, string& end_marker)
{
RGWSI_RADOS::Pool index_pool;
map<int, string> bucket_objs;
BucketIndexShardsManager start_marker_mgr;
BucketIndexShardsManager end_marker_mgr;
- int r = svc.bi->open_bucket_index(dpp, bucket_info, shard_id, &index_pool, &bucket_objs, nullptr);
+ const auto& current_index = rgw::log_to_index_layout(log_layout);
+ int r = svc.bi->open_bucket_index(dpp, bucket_info, shard_id, current_index, &index_pool, &bucket_objs, nullptr);
if (r < 0) {
return r;
}
cct->_conf->rgw_bucket_index_max_aio)();
}
-int RGWSI_BILog_RADOS::log_start(const DoutPrefixProvider *dpp, const RGWBucketInfo& bucket_info, int shard_id)
+int RGWSI_BILog_RADOS::log_start(const DoutPrefixProvider *dpp, const RGWBucketInfo& bucket_info, const rgw::bucket_log_layout_generation& log_layout, int shard_id)
{
RGWSI_RADOS::Pool index_pool;
map<int, string> bucket_objs;
- int r = svc.bi->open_bucket_index(dpp, bucket_info, shard_id, &index_pool, &bucket_objs, nullptr);
+ const auto& current_index = rgw::log_to_index_layout(log_layout);
+ int r = svc.bi->open_bucket_index(dpp, bucket_info, shard_id, current_index, &index_pool, &bucket_objs, nullptr);
if (r < 0)
return r;
return CLSRGWIssueResyncBucketBILog(index_pool.ioctx(), bucket_objs, cct->_conf->rgw_bucket_index_max_aio)();
}
-int RGWSI_BILog_RADOS::log_stop(const DoutPrefixProvider *dpp, const RGWBucketInfo& bucket_info, int shard_id)
+int RGWSI_BILog_RADOS::log_stop(const DoutPrefixProvider *dpp, const RGWBucketInfo& bucket_info, const rgw::bucket_log_layout_generation& log_layout, int shard_id)
{
RGWSI_RADOS::Pool index_pool;
map<int, string> bucket_objs;
- int r = svc.bi->open_bucket_index(dpp, bucket_info, shard_id, &index_pool, &bucket_objs, nullptr);
+ const auto& current_index = rgw::log_to_index_layout(log_layout);
+ int r = svc.bi->open_bucket_index(dpp, bucket_info, shard_id, current_index, &index_pool, &bucket_objs, nullptr);
if (r < 0)
return r;
}
}
-int RGWSI_BILog_RADOS::log_list(const DoutPrefixProvider *dpp, const RGWBucketInfo& bucket_info, int shard_id, string& marker, uint32_t max,
+int RGWSI_BILog_RADOS::log_list(const DoutPrefixProvider *dpp,
+ const RGWBucketInfo& bucket_info,
+ const rgw::bucket_log_layout_generation& log_layout,
+ int shard_id, string& marker, uint32_t max,
std::list<rgw_bi_log_entry>& result, bool *truncated)
{
ldpp_dout(dpp, 20) << __func__ << ": " << bucket_info.bucket << " marker " << marker << " shard_id=" << shard_id << " max " << max << dendl;
RGWSI_RADOS::Pool index_pool;
map<int, string> oids;
map<int, cls_rgw_bi_log_list_ret> bi_log_lists;
- int r = svc.bi->open_bucket_index(dpp, bucket_info, shard_id, &index_pool, &oids, nullptr);
+ const auto& current_index = rgw::log_to_index_layout(log_layout);
+ int r = svc.bi->open_bucket_index(dpp, bucket_info, shard_id, current_index, &index_pool, &oids, nullptr);
if (r < 0)
return r;
int RGWSI_BILog_RADOS::get_log_status(const DoutPrefixProvider *dpp,
const RGWBucketInfo& bucket_info,
+ const rgw::bucket_log_layout_generation& log_layout,
int shard_id,
map<int, string> *markers,
optional_yield y)
{
vector<rgw_bucket_dir_header> headers;
map<int, string> bucket_instance_ids;
- int r = svc.bi->cls_bucket_head(dpp, bucket_info, shard_id, &headers, &bucket_instance_ids, y);
+ const auto& current_index = rgw::log_to_index_layout(log_layout);
+ int r = svc.bi->cls_bucket_head(dpp, bucket_info, current_index, shard_id, &headers, &bucket_instance_ids, y);
if (r < 0)
return r;
void init(RGWSI_BucketIndex_RADOS *bi_rados_svc);
- int log_start(const DoutPrefixProvider *dpp, const RGWBucketInfo& bucket_info, int shard_id);
- int log_stop(const DoutPrefixProvider *dpp, const RGWBucketInfo& bucket_info, int shard_id);
+ int log_start(const DoutPrefixProvider *dpp, const RGWBucketInfo& bucket_info, const rgw::bucket_log_layout_generation& log_layout, int shard_id);
+ int log_stop(const DoutPrefixProvider *dpp, const RGWBucketInfo& bucket_info, const rgw::bucket_log_layout_generation& log_layout, int shard_id);
- int log_trim(const DoutPrefixProvider *dpp,
- const RGWBucketInfo& bucket_info,
+ int log_trim(const DoutPrefixProvider *dpp, const RGWBucketInfo& bucket_info,
+ const rgw::bucket_log_layout_generation& log_layout,
int shard_id,
std::string& start_marker,
std::string& end_marker);
- int log_list(const DoutPrefixProvider *dpp,
- const RGWBucketInfo& bucket_info,
+ int log_list(const DoutPrefixProvider *dpp, const RGWBucketInfo& bucket_info,
+ const rgw::bucket_log_layout_generation& log_layout,
int shard_id,
std::string& marker,
uint32_t max,
int get_log_status(const DoutPrefixProvider *dpp,
const RGWBucketInfo& bucket_info,
+ const rgw::bucket_log_layout_generation& log_layout,
int shard_id,
std::map<int, std::string> *markers,
optional_yield y);
};
-