RGWPeriodHistory::Cursor next; //< next period in history
rgw_meta_sync_status sync_status;
- map<int, RGWMetaSyncShardControlCR *> shard_crs;
+ std::mutex mutex; //< protect access to shard_crs
+
+ using ControlCRRef = boost::intrusive_ptr<RGWMetaSyncShardControlCR>;
+ map<int, ControlCRRef> shard_crs;
public:
RGWMetaSyncCR(RGWMetaSyncEnv *_sync_env, RGWPeriodHistory::Cursor cursor,
auto& period_id = sync_status.sync_info.period;
auto mdlog = sync_env->store->meta_mgr->get_log(period_id);
+ // prevent wakeup() from accessing shard_crs while we're spawning them
+ std::lock_guard<std::mutex> lock(mutex);
+
// sync this period on each shard
for (const auto& m : sync_status.sync_markers) {
uint32_t shard_id = m.first;
auto cr = new RGWMetaSyncShardControlCR(sync_env, pool, period_id,
mdlog, shard_id, marker,
std::move(period_marker));
- // XXX: do we need to hold a ref on cr while it's in shard_crs?
shard_crs[shard_id] = cr;
spawn(cr, false);
}
}
// wait for each shard to complete
collect(&ret);
+ drain_all();
+ {
+ // drop shard cr refs under lock
+ std::lock_guard<std::mutex> lock(mutex);
+ shard_crs.clear();
+ }
if (ret < 0) {
- drain_all();
return set_cr_error(ret);
}
- drain_all();
// advance to the next period
assert(next);
cursor = next;
}
void wakeup(int shard_id) {
- map<int, RGWMetaSyncShardControlCR *>::iterator iter = shard_crs.find(shard_id);
+ std::lock_guard<std::mutex> lock(mutex);
+ auto iter = shard_crs.find(shard_id);
if (iter == shard_crs.end()) {
return;
}