]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
rgw: support partial mesh for zone sync
authorYehuda Sadeh <yehuda@redhat.com>
Fri, 26 Aug 2016 21:09:00 +0000 (14:09 -0700)
committerYehuda Sadeh <yehuda@redhat.com>
Fri, 7 Oct 2016 17:31:27 +0000 (10:31 -0700)
zone configuration now includes two new fields: sync_from_all
which is boolean, and sync_from, which is a least of zones to
sync from. By default sync_from_all is set to true. Sync will
only happen from all the zones, or from the specified zones if
sync_from all is false. We also check to see whether zone can
export data (depending on tier_type).

Signed-off-by: Yehuda Sadeh <yehuda@redhat.com>
src/rgw/rgw_json_enc.cc
src/rgw/rgw_rados.cc
src/rgw/rgw_rados.h

index c9584ff78d5d5b9b1d89a5663719f185134d3574..225adbd10a25d9fc8fa75d3609e424df81891f1e 100644 (file)
@@ -919,6 +919,8 @@ void RGWZone::dump(Formatter *f) const
   encode_json("bucket_index_max_shards", bucket_index_max_shards, f);
   encode_json("read_only", read_only, f);
   encode_json("tier_type", tier_type, f);
+  encode_json("sync_from_all", sync_from_all, f);
+  encode_json("sync_from", sync_from, f);
 }
 
 void RGWZone::decode_json(JSONObj *obj)
@@ -934,6 +936,8 @@ void RGWZone::decode_json(JSONObj *obj)
   JSONDecoder::decode_json("bucket_index_max_shards", bucket_index_max_shards, obj);
   JSONDecoder::decode_json("read_only", read_only, obj);
   JSONDecoder::decode_json("tier_type", tier_type, obj);
+  JSONDecoder::decode_json("sync_from_all", sync_from_all, obj);
+  JSONDecoder::decode_json("sync_from", sync_from, obj);
 }
 
 void RGWZoneGroupPlacementTarget::dump(Formatter *f) const
index 49cf4e0d4e08418404a8b0864bb491aaa5435467..48751e0466423ca3ff677d7b04b6f23b7c50ad6e 100644 (file)
@@ -288,6 +288,18 @@ int RGWZoneGroup::add_zone(const RGWZoneParams& zone_params, bool *is_master, bo
     zone.tier_type = *ptier_type;
   }
 
+  if (psync_from_all) {
+    zone.sync_from_all = *psync_from_all;
+  }
+
+  for (auto add : sync_from) {
+    zone.sync_from.insert(add);
+  }
+
+  for (auto rm : sync_from_rm) {
+    zone.sync_from.erase(rm);
+  }
+
   post_process_params();
 
   return update();
@@ -2950,7 +2962,7 @@ int RGWDataNotifier::process()
     ldout(cct, 20) << __func__ << "(): notifying datalog change, shard_id=" << iter->first << ": " << iter->second << dendl;
   }
 
-  notify_mgr.notify_all(store->zone_conn_map, shards);
+  notify_mgr.notify_all(store->zone_data_notify_to_map, shards);
 
   return 0;
 }
@@ -3706,6 +3718,13 @@ int RGWRados::init_zg_from_local(bool *creating_defaults)
   return 0;
 }
 
+
+bool RGWRados::zone_syncs_from(RGWZone& target_zone, RGWZone& source_zone)
+{
+  return target_zone.syncs_from(source_zone.id) &&
+         sync_modules_manager->supports_data_export(source_zone.tier_type);
+}
+
 /** 
  * Initialize the RADOS instance and prepare to do other ops
  * Returns 0 on success, -ERR# on failure.
@@ -3816,22 +3835,41 @@ int RGWRados::init_complete()
     }
   }
 
-  map<string, RGWZone>::iterator ziter;
-  for (ziter = get_zonegroup().zones.begin(); ziter != get_zonegroup().zones.end(); ++ziter) {
-    const string& id = ziter->first;
-    RGWZone& z = ziter->second;
+  /* first build all zones index */
+  for (auto ziter : get_zonegroup().zones) {
+    const string& id = ziter.first;
+    RGWZone& z = ziter.second;
     zone_id_by_name[z.name] = id;
     zone_by_id[id] = z;
-    if (id != zone_id()) {
-      if (!z.endpoints.empty()) {
-        ldout(cct, 20) << "generating connection object for zone " << z.name << " id " << z.id << dendl;
-        RGWRESTConn *conn = new RGWRESTConn(cct, this, z.id, z.endpoints);
-        zone_conn_map[id] = conn;
-      } else {
-        ldout(cct, 0) << "WARNING: can't generate connection for zone " << z.id << " id " << z.name << ": no endpoints defined" << dendl;
+  }
+  
+  if (zone_by_id.find(zone_id()) == zone_by_id.end()) {
+    ldout(cct, 0) << "WARNING: could not find zone config in zonegroup for local zone (" << zone_id() << "), will use defaults" << dendl;
+  }
+  zone_public_config = zone_by_id[zone_id()];
+  for (auto ziter : get_zonegroup().zones) {
+    const string& id = ziter.first;
+    RGWZone& z = ziter.second;
+    if (id == zone_id()) {
+      continue;
+    }
+    if (z.endpoints.empty()) {
+      ldout(cct, 0) << "WARNING: can't generate connection for zone " << z.id << " id " << z.name << ": no endpoints defined" << dendl;
+      continue;
+    }
+    ldout(cct, 20) << "generating connection object for zone " << z.name << " id " << z.id << dendl;
+    RGWRESTConn *conn = new RGWRESTConn(cct, this, z.id, z.endpoints);
+    zone_conn_map[id] = conn;
+    if (zone_syncs_from(zone_public_config, z) ||
+        zone_syncs_from(z, zone_public_config)) {
+      if (zone_syncs_from(zone_public_config, z)) {
+        zone_data_sync_from_map[id] = conn;
+      }
+      if (zone_syncs_from(z, zone_public_config)) {
+        zone_data_notify_to_map[id] = conn;
       }
     } else {
-      zone_public_config = z;
+      ldout(cct, 20) << "NOTICE: not syncing to/from zone " << z.name << " id " << z.id << dendl;
     }
   }
 
@@ -3903,16 +3941,16 @@ int RGWRados::init_complete()
     meta_sync_processor_thread->start();
 
     Mutex::Locker dl(data_sync_thread_lock);
-    for (map<string, RGWRESTConn *>::iterator iter = zone_conn_map.begin(); iter != zone_conn_map.end(); ++iter) {
-      ldout(cct, 5) << "starting data sync thread for zone " << iter->first << dendl;
-      RGWDataSyncProcessorThread *thread = new RGWDataSyncProcessorThread(this, async_rados, iter->first);
+    for (auto iter : zone_data_sync_from_map) {
+      ldout(cct, 5) << "starting data sync thread for zone " << iter.first << dendl;
+      RGWDataSyncProcessorThread *thread = new RGWDataSyncProcessorThread(this, async_rados, iter.first);
       ret = thread->init();
       if (ret < 0) {
         ldout(cct, 0) << "ERROR: failed to initialize data sync thread" << dendl;
         return ret;
       }
       thread->start();
-      data_sync_processor_threads[iter->first] = thread;
+      data_sync_processor_threads[iter.first] = thread;
     }
     auto interval = cct->_conf->rgw_sync_log_trim_interval;
     if (interval > 0) {
@@ -3948,7 +3986,7 @@ int RGWRados::init_complete()
   binfo_cache = new RGWChainedCacheImpl<bucket_info_entry>;
   binfo_cache->init(this);
 
-  bool need_tombstone_cache = !zone_conn_map.empty();
+  bool need_tombstone_cache = !zone_data_notify_to_map.empty(); /* have zones syncing from us */
 
   if (need_tombstone_cache) {
     obj_tombstone_cache = new tombstone_cache_t(cct->_conf->rgw_obj_tombstone_cache_size);
index fb956f64fdccac4b9a17e5e8e16b1d616b97aca3..e1263d21b6a2be2ccbfdd074256310bc0988f8ce 100644 (file)
@@ -1019,10 +1019,14 @@ struct RGWZone {
  */
   uint32_t bucket_index_max_shards;
 
-  RGWZone() : log_meta(false), log_data(false), read_only(false), bucket_index_max_shards(0) {}
+  bool sync_from_all;
+  set<string> sync_from; /* list of zones to sync from */
+
+  RGWZone() : log_meta(false), log_data(false), read_only(false), bucket_index_max_shards(0),
+              sync_from_all(true) {}
 
   void encode(bufferlist& bl) const {
-    ENCODE_START(5, 1, bl);
+    ENCODE_START(6, 1, bl);
     ::encode(name, bl);
     ::encode(endpoints, bl);
     ::encode(log_meta, bl);
@@ -1031,11 +1035,13 @@ struct RGWZone {
     ::encode(id, bl);
     ::encode(read_only, bl);
     ::encode(tier_type, bl);
+    ::encode(sync_from_all, bl);
+    ::encode(sync_from, bl);
     ENCODE_FINISH(bl);
   }
 
   void decode(bufferlist::iterator& bl) {
-    DECODE_START(5, bl);
+    DECODE_START(6, bl);
     ::decode(name, bl);
     if (struct_v < 4) {
       id = name;
@@ -1055,6 +1061,10 @@ struct RGWZone {
     if (struct_v >= 5) {
       ::decode(tier_type, bl);
     }
+    if (struct_v >= 6) {
+      ::decode(sync_from_all, bl);
+      ::decode(sync_from, bl);
+    }
     DECODE_FINISH(bl);
   }
   void dump(Formatter *f) const;
@@ -1062,6 +1072,10 @@ struct RGWZone {
   static void generate_test_instances(list<RGWZone*>& o);
 
   bool is_read_only() { return read_only; }
+
+  bool syncs_from(const string& zone_id) {
+    return (sync_from_all || sync_from.find(zone_id) != sync_from.end());
+  }
 };
 WRITE_CLASS_ENCODER(RGWZone)
 
@@ -1964,6 +1978,8 @@ public:
 
   RGWRESTConn *rest_master_conn;
   map<string, RGWRESTConn *> zone_conn_map;
+  map<string, RGWRESTConn *> zone_data_sync_from_map;
+  map<string, RGWRESTConn *> zone_data_notify_to_map;
   map<string, RGWRESTConn *> zonegroup_conn_map;
 
   map<string, string> zone_id_by_name;
@@ -2022,6 +2038,8 @@ public:
     return zone_short_id;
   }
 
+  bool zone_syncs_from(RGWZone& target_zone, RGWZone& source_zone);
+
   const RGWQuotaInfo& get_bucket_quota() {
     return current_period.get_config().bucket_quota;
   }