]> git-server-git.apps.pok.os.sepia.ceph.com Git - ceph.git/commitdiff
rgw/pubsub: add conf parameter for full/incremental sync
authorYuval Lifshitz <yuvalif@yahoo.com>
Fri, 7 Jun 2019 07:47:45 +0000 (10:47 +0300)
committerYuval Lifshitz <yuvalif@yahoo.com>
Fri, 7 Jun 2019 07:47:45 +0000 (10:47 +0300)
Signed-off-by: Yuval Lifshitz <yuvalif@yahoo.com>
src/rgw/rgw_data_sync.cc
src/rgw/rgw_sync_module.h
src/rgw/rgw_sync_module_pubsub.cc
src/rgw/rgw_sync_module_pubsub.h
src/test/rgw/rgw_multi/tests_ps.py
src/test/rgw/rgw_multi/zone_ps.py
src/test/rgw/test_multi.py

index 9a37e629c33ce34693162cf3dce2482f727e9f9b..afb839c3fd25a7cf8fcad06bf16149a2f05297ed 100644 (file)
@@ -2053,8 +2053,14 @@ public:
         if (info.syncstopped) {
           call(new RGWRadosRemoveCR(store, obj));
         } else {
-          status.state = rgw_bucket_shard_sync_info::StateFullSync;
-          status.inc_marker.position = info.max_marker;
+          // whether or not to do full sync, incremental sync will follow anyway
+          if (sync_env->sync_module->should_full_sync()) {
+            status.state = rgw_bucket_shard_sync_info::StateFullSync;
+            status.inc_marker.position = info.max_marker;
+          } else {
+            status.state = rgw_bucket_shard_sync_info::StateIncrementalSync;
+            status.inc_marker.position = "";
+          }
           map<string, bufferlist> attrs;
           status.encode_all_attrs(attrs);
           call(new RGWSimpleRadosWriteAttrsCR(sync_env->async_rados, store->svc.sysobj, obj, attrs));
index 5c376dc937114e62693adbba8a4b203bdec90c96..aa68934c06b157264e907ca55d9ab6c81354014d 100644 (file)
@@ -51,6 +51,12 @@ public:
   }
   virtual RGWMetadataHandler *alloc_bucket_meta_handler();
   virtual RGWMetadataHandler *alloc_bucket_instance_meta_handler();
+
+  // indicates whether the sync module starts with full sync (default behavior);
+  // incremental sync follows in either case
+  virtual bool should_full_sync() const {
+    return true;
+  }
 };
 
 typedef std::shared_ptr<RGWSyncModuleInstance> RGWSyncModuleInstanceRef;
index 36e25080c0fab1dc287a69dcb71e32d5ab61b505..720cce838fa51a107a5760feecf89b7ffb563064 100644 (file)
@@ -36,6 +36,8 @@ config:
    "uid": <uid>,                   # default: "pubsub"
    "data_bucket_prefix": <prefix>  # default: "pubsub-"
    "data_oid_prefix": <prefix>     #
+   "events_retention_days": <int>  # default: 7
+   "start_with_full_sync": <bool>  # default: false
 
     # non-dynamic config
     "notifications": [
@@ -51,7 +53,7 @@ config:
             "name": <subscription-name>,
             "topic": <topic>,
             "push_endpoint": <endpoint>,
-            "args:" <arg list>.            # any push endpoint specific args (include all args)
+            "push_endpoint_args": <arg list>           # any push endpoint specific args (include all args)
             "data_bucket": <bucket>,       # override name of bucket where subscription data will be store
             "data_oid_prefix": <prefix>    # set prefix for subscription data object ids
             "s3_id": <id>                  # in case of S3 compatible notifications, the notification ID will be set here
@@ -104,7 +106,7 @@ struct PSSubConfig {
     encode_json("name", name, f);
     encode_json("topic", topic, f);
     encode_json("push_endpoint", push_endpoint_name, f);
-    encode_json("args", push_endpoint_args, f);
+    encode_json("push_endpoint_args", push_endpoint_args, f);
     encode_json("data_bucket_name", data_bucket_name, f);
     encode_json("data_oid_prefix", data_oid_prefix, f);
     encode_json("s3_id", s3_id, f);
@@ -199,6 +201,8 @@ struct PSConfig {
   std::map<std::string, PSSubConfigRef> subs;
   std::map<std::string, PSTopicConfigRef> topics;
   std::multimap<std::string, PSNotificationConfig> notifications;
+  
+  bool start_with_full_sync{false};
 
   void dump(Formatter *f) const {
     encode_json("id", id, f);
@@ -238,6 +242,7 @@ struct PSConfig {
         f->close_section();
       }
     }
+    encode_json("start_with_full_sync", start_with_full_sync, f);
   }
 
   void init(CephContext *cct, const JSONFormattable& config) {
@@ -265,6 +270,7 @@ struct PSConfig {
         iter->second->subs.insert(sc->name);
       }
     }
+    start_with_full_sync = config["start_with_full_sync"](false);
 
     ldout(cct, 5) << "pubsub: module config (parsed representation):\n" << json_str("config", *this, true) << dendl;
   }
@@ -1577,8 +1583,13 @@ RGWRESTMgr *RGWPSSyncModuleInstance::get_rest_filter(int dialect, RGWRESTMgr *or
   return new RGWRESTMgr_PubSub_S3(orig);
 }
 
+bool RGWPSSyncModuleInstance::should_full_sync() const {
+  return data_handler->get_conf()->start_with_full_sync;
+}
+
 int RGWPSSyncModule::create_instance(CephContext *cct, const JSONFormattable& config, RGWSyncModuleInstanceRef *instance) {
   instance->reset(new RGWPSSyncModuleInstance(cct, config));
   return 0;
 }
 
+
index 6b4b78153e817399bbf8a09022c9885d11468943..68d397867fa8b6d784871daffe05003a01b5a67c 100644 (file)
@@ -32,6 +32,9 @@ public:
   const JSONFormattable& get_effective_conf() {
     return effective_conf;
   }
+  // start with full sync based on the "start_with_full_sync" tier config parameter
+  // default is incremental sync only
+  bool should_full_sync() const override;
 };
 
 #endif
index 5a45ef2a8d2533ae163179835a5ad18aff9205d8..70ff92d4458256b9c86009db6c58714a96e84ab4 100644 (file)
@@ -271,8 +271,7 @@ def test_ps_s3_notification_records():
     for record in parsed_result['Records']:
         log.debug(record)
     keys = list(bucket.list())
-    # TODO: set exact_match to true
-    verify_s3_records_by_elements(parsed_result['Records'], keys, exact_match=False)
+    verify_s3_records_by_elements(parsed_result['Records'], keys, exact_match=True)
 
     # cleanup
     _, status = s3_notification_conf.del_config()
@@ -516,8 +515,7 @@ def test_ps_subscription():
     for event in parsed_result['events']:
         log.debug('Event: objname: "' + str(event['info']['key']['name']) + '" type: "' + str(event['event']) + '"')
     keys = list(bucket.list())
-    # TODO: set exact_match to true
-    verify_events_by_elements(parsed_result['events'], keys, exact_match=False)
+    verify_events_by_elements(parsed_result['events'], keys, exact_match=True)
     # delete objects from the bucket
     for key in bucket.list():
         key.delete()
@@ -614,8 +612,7 @@ def test_ps_event_type_subscription():
         log.debug('Event (OBJECT_CREATE): objname: "' + str(event['info']['key']['name']) +
                   '" type: "' + str(event['event']) + '"')
     keys = list(bucket.list())
-    # TODO: set exact_match to true
-    verify_events_by_elements(parsed_result['events'], keys, exact_match=False)
+    verify_events_by_elements(parsed_result['events'], keys, exact_match=True)
     # get the events from the deletions subscription
     result, _ = sub_delete_conf.get_events()
     parsed_result = json.loads(result)
@@ -629,8 +626,7 @@ def test_ps_event_type_subscription():
     for event in parsed_result['events']:
         log.debug('Event (OBJECT_CREATE,OBJECT_DELETE): objname: "' +
                   str(event['info']['key']['name']) + '" type: "' + str(event['event']) + '"')
-    # TODO: set exact_match to true
-    verify_events_by_elements(parsed_result['events'], keys, exact_match=False)
+    verify_events_by_elements(parsed_result['events'], keys, exact_match=True)
     # delete objects from the bucket
     for key in bucket.list():
         key.delete()
@@ -645,8 +641,7 @@ def test_ps_event_type_subscription():
         log.debug('Event (OBJECT_CREATE): objname: "' + str(event['info']['key']['name']) +
                   '" type: "' + str(event['event']) + '"')
     # deletions should not change the creation events
-    # TODO: set exact_match to true
-    verify_events_by_elements(parsed_result['events'], keys, exact_match=False)
+    verify_events_by_elements(parsed_result['events'], keys, exact_match=True)
     # get the events from the deletions subscription
     result, _ = sub_delete_conf.get_events()
     parsed_result = json.loads(result)
@@ -654,8 +649,7 @@ def test_ps_event_type_subscription():
         log.debug('Event (OBJECT_DELETE): objname: "' + str(event['info']['key']['name']) +
                   '" type: "' + str(event['event']) + '"')
     # only deletions should be listed here
-    # TODO: set exact_match to true
-    verify_events_by_elements(parsed_result['events'], keys, exact_match=False, deletions=True)
+    verify_events_by_elements(parsed_result['events'], keys, exact_match=True, deletions=True)
     # get the events from the all events subscription
     result, _ = sub_create_conf.get_events()
     parsed_result = json.loads(result)
@@ -663,7 +657,7 @@ def test_ps_event_type_subscription():
         log.debug('Event (OBJECT_CREATE,OBJECT_DELETE): objname: "' + str(event['info']['key']['name']) +
                   '" type: "' + str(event['event']) + '"')
     # both deletions and creations should be here
-    verify_events_by_elements(parsed_result['events'], keys, exact_match=False, deletions=False)
+    verify_events_by_elements(parsed_result['events'], keys, exact_match=True, deletions=False)
     # verify_events_by_elements(parsed_result['events'], keys, exact_match=False, deletions=True)
     # TODO: (1) test deletions (2) test overall number of events
 
@@ -732,8 +726,7 @@ def test_ps_event_fetching():
         if next_marker == '':
             break
     keys = list(bucket.list())
-    # TODO: set exact_match to true
-    verify_events_by_elements(all_events, keys, exact_match=False)
+    verify_events_by_elements(all_events, keys, exact_match=True)
 
     # cleanup
     sub_conf.del_config()
@@ -783,8 +776,7 @@ def test_ps_event_acking():
     for event in events:
         log.debug('Event (before ack)  id: "' + str(event['id']) + '"')
     keys = list(bucket.list())
-    # TODO: set exact_match to true
-    verify_events_by_elements(events, keys, exact_match=False)
+    verify_events_by_elements(events, keys, exact_match=True)
     # ack half of the  events
     events_to_ack = number_of_objects/2
     for event in events:
@@ -1184,8 +1176,7 @@ def test_ps_delete_bucket():
                               topic_name)
     result, _ = sub_conf.get_events()
     parsed_result = json.loads(result)
-    # TODO: set exact_match to true
-    verify_s3_records_by_elements(parsed_result['Records'], keys, exact_match=False)
+    verify_s3_records_by_elements(parsed_result['Records'], keys, exact_match=True)
 
     # s3 notification is deleted with bucket
     _, status = s3_notification_conf.get_config(notification=notification_name)
@@ -1475,16 +1466,14 @@ def test_ps_s3_multiple_topics_notification():
     for record in parsed_result['Records']:
         log.debug(record)
     keys = list(bucket.list())
-    # TODO: set exact_match to true
-    verify_s3_records_by_elements(parsed_result['Records'], keys, exact_match=False)
+    verify_s3_records_by_elements(parsed_result['Records'], keys, exact_match=True)
     
     result, _ = sub_conf2.get_events()
     parsed_result = json.loads(result)
     for record in parsed_result['Records']:
         log.debug(record)
     keys = list(bucket.list())
-    # TODO: set exact_match to true
-    verify_s3_records_by_elements(parsed_result['Records'], keys, exact_match=False)
+    verify_s3_records_by_elements(parsed_result['Records'], keys, exact_match=True)
     
     # cleanup
     s3_notification_conf.del_config()
index 43b98d9f21c3ab49ba091d6a4f089dadbc1d4d7f..b0441f179446f178faa869436ebfa90acac73285 100644 (file)
@@ -15,6 +15,11 @@ log = logging.getLogger('rgw_multi.tests')
 
 class PSZone(Zone):  # pylint: disable=too-many-ancestors
     """ PubSub zone class """
+    def __init__(self, name, full_sync, retention_days, zonegroup = None, cluster = None, data = None, zone_id = None, gateways = None):
+        self.full_sync = full_sync
+        self.retention_days = retention_days
+        super(PSZone, self).__init__(name, zonegroup, cluster, data, zone_id, gateways)
+
     def is_read_only(self):
         return True
 
@@ -24,7 +29,8 @@ class PSZone(Zone):  # pylint: disable=too-many-ancestors
     def create(self, cluster, args=None, **kwargs):
         if args is None:
             args = ''
-        args += ['--tier-type', self.tier_type()]
+        tier_config = ','.join(['start_with_full_sync=' + self.full_sync, 'events_retention_days=' + self.retention_days])
+        args += ['--tier-type', self.tier_type(), '--tier-config', tier_config]
         return self.json_command(cluster, 'create', args)
 
     def has_buckets(self):
@@ -259,3 +265,10 @@ class PSSubscription:
         """ ack events in a subscription """
         parameters = {'ack': None, 'event-id': event_id}
         return self.send_request('POST', parameters)
+
+
+class PSZoneConfig:
+    """ pubsub zone configuration """
+    def __init__(self, cfg, section):
+        self.full_sync = cfg.get(section, 'start_with_full_sync')
+        self.retention_days = cfg.get(section, 'retention_days')
index ea58646544963cf5c41419d18d70c88f5806aa8e..f80eea867f3f01e2a7fa8a066b0196f849155f1a 100644 (file)
@@ -19,6 +19,7 @@ from rgw_multi.zone_es import ESZoneConfig as ESZoneConfig
 from rgw_multi.zone_cloud import CloudZone as CloudZone
 from rgw_multi.zone_cloud import CloudZoneConfig as CloudZoneConfig
 from rgw_multi.zone_ps import PSZone as PSZone
+from rgw_multi.zone_ps import PSZoneConfig as PSZoneConfig
 
 # make tests from rgw_multi.tests available to nose
 from rgw_multi.tests import *
@@ -206,12 +207,16 @@ def init(parse_args):
 
     es_cfg = []
     cloud_cfg = []
+    ps_cfg = []
 
     for s in cfg.sections():
         if s.startswith('elasticsearch'):
             es_cfg.append(ESZoneConfig(cfg, s))
         elif s.startswith('cloud'):
             cloud_cfg.append(CloudZoneConfig(cfg, s))
+        elif s.startswith('pubsub'):
+            ps_cfg.append(PSZoneConfig(cfg, s))
+
 
     argv = []
 
@@ -247,8 +252,12 @@ def init(parse_args):
 
     num_es_zones = len(es_cfg)
     num_cloud_zones = len(cloud_cfg)
+    num_ps_zones_from_conf = len(ps_cfg)
+
+    num_ps_zones = args.num_ps_zones if num_ps_zones_from_conf == 0 else num_ps_zones_from_conf 
+    print 'num_ps_zones = ' + str(num_ps_zones)
 
-    num_zones = args.num_zones + num_es_zones + num_cloud_zones + args.num_ps_zones
+    num_zones = args.num_zones + num_es_zones + num_cloud_zones + num_ps_zones
 
     for zg in range(0, args.num_zonegroups):
         zonegroup = multisite.ZoneGroup(zonegroup_name(zg), period)
@@ -302,7 +311,11 @@ def init(parse_args):
                                  ccfg.target_path, zonegroup, cluster)
             elif ps_zone:
                 zone_index = z - args.num_zones - num_es_zones - num_cloud_zones
-                zone = PSZone(zone_name(zg, z), zonegroup, cluster)
+                if num_ps_zones_from_conf == 0:
+                    zone = PSZone(zone_name(zg, z), "false", "7", zonegroup, cluster)
+                else:
+                    pscfg = ps_cfg[zone_index]
+                    zone = PSZone(zone_name(zg, z), pscfg.full_sync, pscfg.retention_days, zonegroup, cluster)
             else:
                 zone = RadosZone(zone_name(zg, z), zonegroup, cluster)