RGWRadosGetOmapKeysCR::RGWRadosGetOmapKeysCR(RGWRados *_store,
const rgw_raw_obj& _obj,
const string& _marker,
- std::set<std::string> *_entries,
- int _max_entries, bool *_pmore) : RGWSimpleCoroutine(_store->ctx()),
- store(_store),
- marker(_marker),
- entries(_entries), max_entries(_max_entries),
- pmore(_pmore),
- obj(_obj), cn(NULL)
+ int _max_entries,
+ ResultPtr _result)
+ : RGWSimpleCoroutine(_store->ctx()), store(_store), obj(_obj),
+ marker(_marker), max_entries(_max_entries),
+ result(std::move(_result))
{
+ ceph_assert(result); // must be allocated
set_description() << "get omap keys dest=" << obj << " marker=" << marker;
}
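// For reference, a minimal sketch of the result type the constructor now takes,
// inferred from its usage in this patch (result->ref, result->entries,
// result->more, std::make_shared<RGWRadosGetOmapKeysCR::Result>()). The
// authoritative declaration lives in the header; the field layout here is an
// assumption, not a copy of it.
//
// nested in RGWRadosGetOmapKeysCR:
struct Result {
  rgw_rados_ref ref;              // ioctx/oid resolved by get_raw_obj_ref()
  std::set<std::string> entries;  // omap keys returned by omap_get_keys2()
  bool more = false;              // set when the listing was truncated
};
using ResultPtr = std::shared_ptr<Result>;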
int RGWRadosGetOmapKeysCR::send_request() {
- int r = store->get_raw_obj_ref(obj, &ref);
+ int r = store->get_raw_obj_ref(obj, &result->ref);
if (r < 0) {
lderr(store->ctx()) << "ERROR: failed to get ref for (" << obj << ") ret=" << r << dendl;
return r;
}
set_status() << "send request";
librados::ObjectReadOperation op;
- op.omap_get_keys2(marker, max_entries, entries, pmore, nullptr);
+ op.omap_get_keys2(marker, max_entries, &result->entries, &result->more, nullptr);
- cn = stack->create_completion_notifier();
- return ref.ioctx.aio_operate(ref.oid, cn->completion(), &op, NULL);
+ cn = stack->create_completion_notifier(result);
+ return result->ref.ioctx.aio_operate(result->ref.oid, cn->completion(), &op, NULL);
}
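// Note on send_request() above: handing the shared ResultPtr to
// create_completion_notifier() presumably ties the Result's lifetime to the
// outstanding AIO, so even if the coroutine stack is torn down before the
// omap listing completes, the librados callback writes into memory kept alive
// by the notifier rather than into freed caller-owned buffers.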
int RGWRadosGetOmapKeysCR::request_complete()
uint64_t max_entries;
int num_shards;
- int shard_id{0};;
+ int shard_id{0};
string marker;
- map<int, std::set<std::string>> &entries_map;
+ std::vector<RGWRadosGetOmapKeysCR::ResultPtr>& omapkeys;
public:
RGWReadDataSyncRecoveringShardsCR(RGWDataSyncEnv *env, uint64_t _max_entries, int _num_shards,
- map<int, std::set<std::string>>& _entries_map)
+ std::vector<RGWRadosGetOmapKeysCR::ResultPtr>& omapkeys)
: RGWShardCollectCR(env->cct, MAX_CONCURRENT_SHARDS), env(env),
- max_entries(_max_entries), num_shards(_num_shards), entries_map(_entries_map)
+ max_entries(_max_entries), num_shards(_num_shards), omapkeys(omapkeys)
{}
bool spawn_next() override;
};
return false;
string error_oid = RGWDataSyncStatusManager::shard_obj_name(env->source_zone, shard_id) + ".retry";
+ auto& shard_keys = omapkeys[shard_id];
+ shard_keys = std::make_shared<RGWRadosGetOmapKeysCR::Result>();
spawn(new RGWRadosGetOmapKeysCR(env->store, rgw_raw_obj(env->store->get_zone_params().log_pool, error_oid),
- marker, &entries_map[shard_id], max_entries, nullptr), false);
+ marker, max_entries, shard_keys), false);
++shard_id;
return true;
}
RGWDataSyncEnv sync_env_local = sync_env;
sync_env_local.http_manager = &http_manager;
- map<int, std::set<std::string>> entries_map;
+ std::vector<RGWRadosGetOmapKeysCR::ResultPtr> omapkeys;
+ omapkeys.resize(num_shards);
uint64_t max_entries{1};
- ret = crs.run(new RGWReadDataSyncRecoveringShardsCR(&sync_env_local, max_entries, num_shards, entries_map));
+ ret = crs.run(new RGWReadDataSyncRecoveringShardsCR(&sync_env_local, max_entries, num_shards, omapkeys));
http_manager.stop();
if (ret == 0) {
- for (const auto& entry : entries_map) {
- if (entry.second.size() != 0) {
- recovering_shards.insert(entry.first);
+ for (int i = 0; i < num_shards; i++) {
+ if (omapkeys[i]->entries.size() != 0) {
+ recovering_shards.insert(i);
}
}
}
uint32_t shard_id;
rgw_data_sync_marker sync_marker;
+ RGWRadosGetOmapKeysCR::ResultPtr omapkeys;
std::set<std::string> entries;
std::set<std::string>::iterator iter;
- bool more = false;
string oid;
drain_all();
return set_cr_error(-ECANCELED);
}
+ omapkeys = std::make_shared<RGWRadosGetOmapKeysCR::Result>();
yield call(new RGWRadosGetOmapKeysCR(sync_env->store, rgw_raw_obj(pool, oid),
- sync_marker.marker, &entries,
- max_entries, &more));
+ sync_marker.marker, max_entries, omapkeys));
if (retcode < 0) {
tn->log(0, SSTR("ERROR: RGWRadosGetOmapKeysCR() returned ret=" << retcode));
lease_cr->go_down();
drain_all();
return set_cr_error(retcode);
}
+ entries = std::move(omapkeys->entries);
if (entries.size() > 0) {
tn->set_flag(RGW_SNS_FLAG_ACTIVE); /* actually have entries to sync */
}
}
}
}
- } while (more);
+ } while (omapkeys->more);
+ omapkeys.reset();
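// (The reset() above, and the matching one after the error-retry branch, appear
// to just drop this coroutine's reference once the entries have been moved out,
// so the listing result is not held across the following drain/lease waits.)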
drain_all_but_stack(lease_stack.get());
if (error_retry_time <= ceph::coarse_real_clock::now()) {
/* process bucket shards that previously failed */
+ omapkeys = std::make_shared<RGWRadosGetOmapKeysCR::Result>();
yield call(new RGWRadosGetOmapKeysCR(sync_env->store, rgw_raw_obj(pool, error_oid),
- error_marker, &error_entries,
- max_error_entries, &more));
+ error_marker, max_error_entries, omapkeys));
+ error_entries = std::move(omapkeys->entries);
tn->log(20, SSTR("read error repo, got " << error_entries.size() << " entries"));
iter = error_entries.begin();
for (; iter != error_entries.end(); ++iter) {
tn->log(20, SSTR("handle error entry: " << error_marker));
spawn(new RGWDataSyncSingleEntryCR(sync_env, error_marker, error_marker, nullptr /* no marker tracker */, error_repo, true, tn), false);
}
- if (!more) {
+ if (!omapkeys->more) {
if (error_marker.empty() && error_entries.empty()) {
/* the retry repo is empty, we back off a bit before calling it again */
retry_backoff_secs *= 2;
error_marker.clear();
}
}
+ omapkeys.reset();
#define INCREMENTAL_MAX_ENTRIES 100
tn->log(20, SSTR("shard_id=" << shard_id << " sync_marker=" << sync_marker.marker));
string marker;
string error_oid;
+ RGWRadosGetOmapKeysCR::ResultPtr omapkeys;
set<string> error_entries;
int max_omap_entries;
- bool more = false;
int count;
public:
//read recovering bucket shards
count = 0;
do {
+ omapkeys = std::make_shared<RGWRadosGetOmapKeysCR::Result>();
yield call(new RGWRadosGetOmapKeysCR(store, rgw_raw_obj(store->get_zone_params().log_pool, error_oid),
- marker, &error_entries, max_omap_entries, &more));
+ marker, max_omap_entries, omapkeys));
if (retcode == -ENOENT) {
break;
}
if (retcode < 0) {
return set_cr_error(retcode);
}
+ error_entries = std::move(omapkeys->entries);
if (error_entries.empty()) {
break;
}
count += error_entries.size();
marker = *error_entries.rbegin();
recovering_buckets.insert(std::make_move_iterator(error_entries.begin()),
std::make_move_iterator(error_entries.end()));
- } while(more && count < max_entries);
+ } while (omapkeys->more && count < max_entries);
return set_cr_done();
}
string max_marker;
const std::string& period_marker; //< max marker stored in next period
+ RGWRadosGetOmapKeysCR::ResultPtr omapkeys;
std::set<std::string> entries;
- bool more = false;
std::set<std::string>::iterator iter;
string oid;
lost_lock = true;
break;
}
+ omapkeys = std::make_shared<RGWRadosGetOmapKeysCR::Result>();
yield call(new RGWRadosGetOmapKeysCR(sync_env->store, rgw_raw_obj(pool, oid),
- marker, &entries, max_entries, &more));
+ marker, max_entries, omapkeys));
if (retcode < 0) {
ldout(sync_env->cct, 0) << "ERROR: " << __func__ << "(): RGWRadosGetOmapKeysCR() returned ret=" << retcode << dendl;
tn->log(0, SSTR("ERROR: failed to list omap keys, status=" << retcode));
drain_all();
return retcode;
}
+ entries = std::move(omapkeys->entries);
tn->log(20, SSTR("retrieved " << entries.size() << " entries to sync"));
if (entries.size() > 0) {
tn->set_flag(RGW_SNS_FLAG_ACTIVE); /* actually have entries to sync */
}
}
collect_children();
- } while (more && can_adjust_marker);
+ } while (omapkeys->more && can_adjust_marker);
tn->unset_flag(RGW_SNS_FLAG_ACTIVE); /* actually have entries to sync */