encode_json("bucket_index_max_shards", bucket_index_max_shards, f);
encode_json("read_only", read_only, f);
encode_json("tier_type", tier_type, f);
+ encode_json("sync_from_all", sync_from_all, f);
+ encode_json("sync_from", sync_from, f);
}
void RGWZone::decode_json(JSONObj *obj)
JSONDecoder::decode_json("bucket_index_max_shards", bucket_index_max_shards, obj);
JSONDecoder::decode_json("read_only", read_only, obj);
JSONDecoder::decode_json("tier_type", tier_type, obj);
+ JSONDecoder::decode_json("sync_from_all", sync_from_all, obj);
+ JSONDecoder::decode_json("sync_from", sync_from, obj);
}
void RGWZoneGroupPlacementTarget::dump(Formatter *f) const
zone.tier_type = *ptier_type;
}
+ if (psync_from_all) {
+ zone.sync_from_all = *psync_from_all;
+ }
+
+ for (auto add : sync_from) {
+ zone.sync_from.insert(add);
+ }
+
+ for (auto rm : sync_from_rm) {
+ zone.sync_from.erase(rm);
+ }
+
post_process_params();
return update();
ldout(cct, 20) << __func__ << "(): notifying datalog change, shard_id=" << iter->first << ": " << iter->second << dendl;
}
- notify_mgr.notify_all(store->zone_conn_map, shards);
+ notify_mgr.notify_all(store->zone_data_notify_to_map, shards);
return 0;
}
return 0;
}
+
+bool RGWRados::zone_syncs_from(RGWZone& target_zone, RGWZone& source_zone)
+{
+ return target_zone.syncs_from(source_zone.id) &&
+ sync_modules_manager->supports_data_export(source_zone.tier_type);
+}
+
/**
* Initialize the RADOS instance and prepare to do other ops
* Returns 0 on success, -ERR# on failure.
}
}
- map<string, RGWZone>::iterator ziter;
- for (ziter = get_zonegroup().zones.begin(); ziter != get_zonegroup().zones.end(); ++ziter) {
- const string& id = ziter->first;
- RGWZone& z = ziter->second;
+ /* first build all zones index */
+ for (auto ziter : get_zonegroup().zones) {
+ const string& id = ziter.first;
+ RGWZone& z = ziter.second;
zone_id_by_name[z.name] = id;
zone_by_id[id] = z;
- if (id != zone_id()) {
- if (!z.endpoints.empty()) {
- ldout(cct, 20) << "generating connection object for zone " << z.name << " id " << z.id << dendl;
- RGWRESTConn *conn = new RGWRESTConn(cct, this, z.id, z.endpoints);
- zone_conn_map[id] = conn;
- } else {
- ldout(cct, 0) << "WARNING: can't generate connection for zone " << z.id << " id " << z.name << ": no endpoints defined" << dendl;
+ }
+
+ if (zone_by_id.find(zone_id()) == zone_by_id.end()) {
+ ldout(cct, 0) << "WARNING: could not find zone config in zonegroup for local zone (" << zone_id() << "), will use defaults" << dendl;
+ }
+ zone_public_config = zone_by_id[zone_id()];
+ for (auto ziter : get_zonegroup().zones) {
+ const string& id = ziter.first;
+ RGWZone& z = ziter.second;
+ if (id == zone_id()) {
+ continue;
+ }
+ if (z.endpoints.empty()) {
+ ldout(cct, 0) << "WARNING: can't generate connection for zone " << z.id << " id " << z.name << ": no endpoints defined" << dendl;
+ continue;
+ }
+ ldout(cct, 20) << "generating connection object for zone " << z.name << " id " << z.id << dendl;
+ RGWRESTConn *conn = new RGWRESTConn(cct, this, z.id, z.endpoints);
+ zone_conn_map[id] = conn;
+ if (zone_syncs_from(zone_public_config, z) ||
+ zone_syncs_from(z, zone_public_config)) {
+ if (zone_syncs_from(zone_public_config, z)) {
+ zone_data_sync_from_map[id] = conn;
+ }
+ if (zone_syncs_from(z, zone_public_config)) {
+ zone_data_notify_to_map[id] = conn;
}
} else {
- zone_public_config = z;
+ ldout(cct, 20) << "NOTICE: not syncing to/from zone " << z.name << " id " << z.id << dendl;
}
}
meta_sync_processor_thread->start();
Mutex::Locker dl(data_sync_thread_lock);
- for (map<string, RGWRESTConn *>::iterator iter = zone_conn_map.begin(); iter != zone_conn_map.end(); ++iter) {
- ldout(cct, 5) << "starting data sync thread for zone " << iter->first << dendl;
- RGWDataSyncProcessorThread *thread = new RGWDataSyncProcessorThread(this, async_rados, iter->first);
+ for (auto iter : zone_data_sync_from_map) {
+ ldout(cct, 5) << "starting data sync thread for zone " << iter.first << dendl;
+ RGWDataSyncProcessorThread *thread = new RGWDataSyncProcessorThread(this, async_rados, iter.first);
ret = thread->init();
if (ret < 0) {
ldout(cct, 0) << "ERROR: failed to initialize data sync thread" << dendl;
return ret;
}
thread->start();
- data_sync_processor_threads[iter->first] = thread;
+ data_sync_processor_threads[iter.first] = thread;
}
auto interval = cct->_conf->rgw_sync_log_trim_interval;
if (interval > 0) {
binfo_cache = new RGWChainedCacheImpl<bucket_info_entry>;
binfo_cache->init(this);
- bool need_tombstone_cache = !zone_conn_map.empty();
+ bool need_tombstone_cache = !zone_data_notify_to_map.empty(); /* have zones syncing from us */
if (need_tombstone_cache) {
obj_tombstone_cache = new tombstone_cache_t(cct->_conf->rgw_obj_tombstone_cache_size);
*/
uint32_t bucket_index_max_shards;
- RGWZone() : log_meta(false), log_data(false), read_only(false), bucket_index_max_shards(0) {}
+ bool sync_from_all;
+ set<string> sync_from; /* list of zones to sync from */
+
+ RGWZone() : log_meta(false), log_data(false), read_only(false), bucket_index_max_shards(0),
+ sync_from_all(true) {}
void encode(bufferlist& bl) const {
- ENCODE_START(5, 1, bl);
+ ENCODE_START(6, 1, bl);
::encode(name, bl);
::encode(endpoints, bl);
::encode(log_meta, bl);
::encode(id, bl);
::encode(read_only, bl);
::encode(tier_type, bl);
+ ::encode(sync_from_all, bl);
+ ::encode(sync_from, bl);
ENCODE_FINISH(bl);
}
void decode(bufferlist::iterator& bl) {
- DECODE_START(5, bl);
+ DECODE_START(6, bl);
::decode(name, bl);
if (struct_v < 4) {
id = name;
if (struct_v >= 5) {
::decode(tier_type, bl);
}
+ if (struct_v >= 6) {
+ ::decode(sync_from_all, bl);
+ ::decode(sync_from, bl);
+ }
DECODE_FINISH(bl);
}
void dump(Formatter *f) const;
static void generate_test_instances(list<RGWZone*>& o);
bool is_read_only() { return read_only; }
+
+ bool syncs_from(const string& zone_id) {
+ return (sync_from_all || sync_from.find(zone_id) != sync_from.end());
+ }
};
WRITE_CLASS_ENCODER(RGWZone)
RGWRESTConn *rest_master_conn;
map<string, RGWRESTConn *> zone_conn_map;
+ map<string, RGWRESTConn *> zone_data_sync_from_map;
+ map<string, RGWRESTConn *> zone_data_notify_to_map;
map<string, RGWRESTConn *> zonegroup_conn_map;
map<string, string> zone_id_by_name;
return zone_short_id;
}
+ bool zone_syncs_from(RGWZone& target_zone, RGWZone& source_zone);
+
const RGWQuotaInfo& get_bucket_quota() {
return current_period.get_config().bucket_quota;
}