RGWRadosGetOmapKeysCR::RGWRadosGetOmapKeysCR(RGWRados *_store,
const rgw_raw_obj& _obj,
const string& _marker,
- std::set<std::string> *_entries, int _max_entries) : RGWSimpleCoroutine(_store->ctx()),
+ std::set<std::string> *_entries,
+ int _max_entries, bool *_pmore) : RGWSimpleCoroutine(_store->ctx()),
store(_store),
marker(_marker),
entries(_entries), max_entries(_max_entries),
+ pmore(_pmore),
obj(_obj), cn(NULL)
{
set_description() << "get omap keys dest=" << obj << " marker=" << marker;
set_status() << "send request";
librados::ObjectReadOperation op;
- op.omap_get_keys2(marker, max_entries, entries, nullptr, nullptr);
+ op.omap_get_keys2(marker, max_entries, entries, pmore, nullptr);
cn = stack->create_completion_notifier();
return ref.ioctx.aio_operate(ref.oid, cn->completion(), &op, NULL);
string error_oid = RGWDataSyncStatusManager::shard_obj_name(env->source_zone, shard_id) + ".retry";
spawn(new RGWRadosGetOmapKeysCR(env->store, rgw_raw_obj(env->store->get_zone_params().log_pool, error_oid),
- marker, &entries_map[shard_id], max_entries), false);
+ marker, &entries_map[shard_id], max_entries, nullptr), false);
++shard_id;
return true;
std::set<std::string> entries;
std::set<std::string>::iterator iter;
+ bool more = false;
string oid;
drain_all();
return set_cr_error(-ECANCELED);
}
- yield call(new RGWRadosGetOmapKeysCR(sync_env->store, rgw_raw_obj(pool, oid), sync_marker.marker, &entries, max_entries));
+ yield call(new RGWRadosGetOmapKeysCR(sync_env->store, rgw_raw_obj(pool, oid),
+ sync_marker.marker, &entries,
+ max_entries, &more));
if (retcode < 0) {
tn->log(0, SSTR("ERROR: RGWRadosGetOmapKeysCR() returned ret=" << retcode));
lease_cr->go_down();
}
}
}
- } while ((int)entries.size() == max_entries);
+ } while (more);
drain_all_but_stack(lease_stack.get());
/* process bucket shards that previously failed */
yield call(new RGWRadosGetOmapKeysCR(sync_env->store, rgw_raw_obj(pool, error_oid),
error_marker, &error_entries,
- max_error_entries));
+ max_error_entries, &more));
tn->log(20, SSTR("read error repo, got " << error_entries.size() << " entries"));
iter = error_entries.begin();
for (; iter != error_entries.end(); ++iter) {
tn->log(20, SSTR("handle error entry: " << error_marker));
spawn(new RGWDataSyncSingleEntryCR(sync_env, error_marker, error_marker, nullptr /* no marker tracker */, error_repo, true, tn), false);
}
- if ((int)error_entries.size() != max_error_entries) {
+ if (!more) {
if (error_marker.empty() && error_entries.empty()) {
/* the retry repo is empty, we back off a bit before calling it again */
retry_backoff_secs *= 2;
set<string> error_entries;
int max_omap_entries;
+ bool more = false;
int count;
public:
count = 0;
do {
yield call(new RGWRadosGetOmapKeysCR(store, rgw_raw_obj(store->get_zone_params().log_pool, error_oid),
- marker, &error_entries, max_omap_entries));
+ marker, &error_entries, max_omap_entries, &more));
if (retcode == -ENOENT) {
break;
marker = *error_entries.rbegin();
recovering_buckets.insert(std::make_move_iterator(error_entries.begin()),
std::make_move_iterator(error_entries.end()));
- }while((int)error_entries.size() == max_omap_entries && count < max_entries);
+ } while(more && count < max_entries);
return set_cr_done();
}
const std::string& period_marker; //< max marker stored in next period
std::set<std::string> entries;
+ bool more = false;
std::set<std::string>::iterator iter;
string oid;
break;
}
yield call(new RGWRadosGetOmapKeysCR(sync_env->store, rgw_raw_obj(pool, oid),
- marker, &entries, max_entries));
+ marker, &entries, max_entries, &more));
if (retcode < 0) {
ldout(sync_env->cct, 0) << "ERROR: " << __func__ << "(): RGWRadosGetOmapKeysCR() returned ret=" << retcode << dendl;
tn->log(0, SSTR("ERROR: failed to list omap keys, status=" << retcode));
}
}
collect_children();
- } while ((int)entries.size() == max_entries && can_adjust_marker);
+ } while (more && can_adjust_marker);
tn->unset_flag(RGW_SNS_FLAG_ACTIVE); /* actually have entries to sync */